_stream() — langchain Function Reference

Architecture documentation for the _stream() function in chat_models.py from the langchain codebase.

Entity Profile

Dependency Diagram

graph TD
  d798c5eb_b3ec_7dcd_afbe_81031dc680c3["_stream()"]
  36b59643_acfc_fb1d_752e_ae7ec32a79a4["ChatPerplexity"]
  d798c5eb_b3ec_7dcd_afbe_81031dc680c3 -->|defined in| 36b59643_acfc_fb1d_752e_ae7ec32a79a4
  cf70f32b_032d_e8a9_45fe_13a6419bea5c["_generate()"]
  cf70f32b_032d_e8a9_45fe_13a6419bea5c -->|calls| d798c5eb_b3ec_7dcd_afbe_81031dc680c3
  b9349daf_2369_217d_7661_9f71b6258a13["_create_message_dicts()"]
  d798c5eb_b3ec_7dcd_afbe_81031dc680c3 -->|calls| b9349daf_2369_217d_7661_9f71b6258a13
  48f104ff_5195_d7be_593e_90381b500a59["_convert_delta_to_message_chunk()"]
  d798c5eb_b3ec_7dcd_afbe_81031dc680c3 -->|calls| 48f104ff_5195_d7be_593e_90381b500a59
  d64af782_4aac_1e87_a23b_e6300fcdc624["_create_usage_metadata()"]
  d798c5eb_b3ec_7dcd_afbe_81031dc680c3 -->|calls| d64af782_4aac_1e87_a23b_e6300fcdc624
  style d798c5eb_b3ec_7dcd_afbe_81031dc680c3 fill:#6366f1,stroke:#818cf8,color:#fff

Source Code

libs/partners/perplexity/langchain_perplexity/chat_models.py lines 415–502

    def _stream(
        self,
        messages: list[BaseMessage],
        stop: list[str] | None = None,
        run_manager: CallbackManagerForLLMRun | None = None,
        **kwargs: Any,
    ) -> Iterator[ChatGenerationChunk]:
        message_dicts, params = self._create_message_dicts(messages, stop)
        params = {**params, **kwargs}
        default_chunk_class = AIMessageChunk
        params.pop("stream", None)
        if stop:
            params["stop_sequences"] = stop
        stream_resp = self.client.chat.completions.create(
            messages=message_dicts, stream=True, **params
        )
        first_chunk = True
        prev_total_usage: UsageMetadata | None = None

        added_model_name: bool = False
        added_search_queries: bool = False
        added_search_context_size: bool = False
        for chunk in stream_resp:
            if not isinstance(chunk, dict):
                chunk = chunk.model_dump()
            # Collect standard usage metadata (transform from aggregate to delta)
            if total_usage := chunk.get("usage"):
                lc_total_usage = _create_usage_metadata(total_usage)
                if prev_total_usage:
                    usage_metadata: UsageMetadata | None = subtract_usage(
                        lc_total_usage, prev_total_usage
                    )
                else:
                    usage_metadata = lc_total_usage
                prev_total_usage = lc_total_usage
            else:
                usage_metadata = None
            if len(chunk["choices"]) == 0:
                continue
            choice = chunk["choices"][0]

            additional_kwargs = {}
            if first_chunk:
                additional_kwargs["citations"] = chunk.get("citations", [])
                for attr in ["images", "related_questions", "search_results"]:
                    if attr in chunk:
                        additional_kwargs[attr] = chunk[attr]

                if chunk.get("videos"):
                    additional_kwargs["videos"] = chunk["videos"]

                if chunk.get("reasoning_steps"):
                    additional_kwargs["reasoning_steps"] = chunk["reasoning_steps"]

            generation_info = {}
            if (model_name := chunk.get("model")) and not added_model_name:
                generation_info["model_name"] = model_name
                added_model_name = True
            # Add num_search_queries to generation_info if present
            if total_usage := chunk.get("usage"):
                if num_search_queries := total_usage.get("num_search_queries"):
                    if not added_search_queries:
                        generation_info["num_search_queries"] = num_search_queries
                        added_search_queries = True
                if not added_search_context_size:
                    if search_context_size := total_usage.get("search_context_size"):
                        generation_info["search_context_size"] = search_context_size
                        added_search_context_size = True

            chunk = self._convert_delta_to_message_chunk(
                choice["delta"], default_chunk_class
            )

            if isinstance(chunk, AIMessageChunk) and usage_metadata:
                chunk.usage_metadata = usage_metadata

            if first_chunk:
                chunk.additional_kwargs |= additional_kwargs
                first_chunk = False

            if finish_reason := choice.get("finish_reason"):
                ...  # snippet truncated here; the full function continues through line 502

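The usage-metadata handling above converts Perplexity's cumulative usage totals into per-chunk deltas: each chunk reports running totals, and the code subtracts the previous totals to recover what this chunk added. A minimal sketch of that transform, using plain dicts as hypothetical stand-ins for LangChain's UsageMetadata and subtract_usage:

```python
def subtract_usage(total: dict, prev: dict) -> dict:
    # Per-chunk delta: subtract the previous cumulative totals
    # from the current ones, field by field.
    return {k: total[k] - prev.get(k, 0) for k in total}

def usage_deltas(chunks):
    """Yield per-chunk usage deltas from cumulative usage reports."""
    prev_total = None
    for chunk in chunks:
        total = chunk.get("usage")
        if total is None:
            # No usage on this chunk: yield None, keep prev_total unchanged.
            yield None
            continue
        yield subtract_usage(total, prev_total) if prev_total else total
        prev_total = total

# Simulated stream: the API reports cumulative token counts.
stream = [
    {"usage": {"input_tokens": 10, "output_tokens": 3, "total_tokens": 13}},
    {"usage": {"input_tokens": 10, "output_tokens": 7, "total_tokens": 17}},
]
deltas = list(usage_deltas(stream))
print(deltas[1])  # the second chunk's delta, not its cumulative total
```

The first chunk's delta equals its totals (there is nothing to subtract yet), mirroring the `if prev_total` branch in the source.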
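One more detail worth calling out from the source: Perplexity-specific metadata (citations, images, related_questions, search_results) arrives on the first streamed chunk, so _stream() attaches it to additional_kwargs exactly once and then clears the first_chunk flag. A minimal sketch of that first-chunk pattern, with plain dicts and hypothetical names:

```python
def enrich_first_chunk(chunks):
    """Attach Perplexity-style metadata to the first yielded chunk only."""
    first = True
    for raw in chunks:
        out = {"content": raw.get("content", "")}
        if first:
            # Citations are always attached; the other attributes
            # only when present in the raw chunk.
            extras = {"citations": raw.get("citations", [])}
            for attr in ("images", "related_questions", "search_results"):
                if attr in raw:
                    extras[attr] = raw[attr]
            out["additional_kwargs"] = extras
            first = False
        yield out

stream = [
    {"content": "Hi", "citations": ["https://example.com"]},
    {"content": " there"},
]
out = list(enrich_first_chunk(stream))
```

Subsequent chunks carry only content, which keeps the metadata from being duplicated across every chunk a consumer aggregates.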
Frequently Asked Questions

What does _stream() do?
_stream() is a method of ChatPerplexity, defined in libs/partners/perplexity/langchain_perplexity/chat_models.py, that streams a chat completion from the Perplexity API. It builds the request from the input messages, calls the chat completions endpoint with stream=True, and yields ChatGenerationChunk objects, converting each response delta into an AIMessageChunk and attaching per-chunk usage metadata and Perplexity-specific metadata (citations, search results) along the way.
Where is _stream() defined?
_stream() is defined in libs/partners/perplexity/langchain_perplexity/chat_models.py at line 415.
What does _stream() call?
_stream() calls three functions: _create_message_dicts(), _convert_delta_to_message_chunk(), and _create_usage_metadata().
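Of these helpers, _convert_delta_to_message_chunk() does the per-chunk work: it turns a raw `delta` dict from the streaming response into a message chunk object. A rough, self-contained sketch of that idea (the real helper also handles roles and tool calls; the class below is a hypothetical stand-in for AIMessageChunk):

```python
from dataclasses import dataclass, field

@dataclass
class MessageChunk:
    # Hypothetical stand-in for LangChain's AIMessageChunk.
    content: str = ""
    additional_kwargs: dict = field(default_factory=dict)

def convert_delta_to_message_chunk(delta: dict) -> MessageChunk:
    # The streaming API sends partial text in delta["content"];
    # it may be absent or None on role-only or final chunks.
    return MessageChunk(content=delta.get("content") or "")

chunk = convert_delta_to_message_chunk({"role": "assistant", "content": "Hel"})
```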
What calls _stream()?
_stream() is called by one function: _generate().
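This follows the usual LangChain pattern of building the non-streaming path on top of the streaming one: _generate() can consume _stream() and merge the yielded chunks into a single result. A simplified, string-based sketch of that relationship (function names here are illustrative, not the library's API):

```python
from typing import Iterator

def fake_stream() -> Iterator[str]:
    # Stand-in for _stream(): yields partial content chunks.
    yield from ["Per", "plex", "ity"]

def generate_from_stream(chunks: Iterator[str]) -> str:
    # A _generate()-style caller concatenates everything the stream yields.
    return "".join(chunks)

result = generate_from_stream(fake_stream())
print(result)  # -> Perplexity
```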
