Home / Function/ _astream() — langchain Function Reference

_astream() — langchain Function Reference

Architecture documentation for the _astream() function in base.py from the langchain codebase.

Entity Profile

Dependency Diagram

graph TD
  47573e6b_689c_1e1b_b035_1aee9c3d3bba["_astream()"]
  2a683305_667b_3567_cab9_9f77e29d4afa["BaseChatOpenAI"]
  47573e6b_689c_1e1b_b035_1aee9c3d3bba -->|defined in| 2a683305_667b_3567_cab9_9f77e29d4afa
  fe7d2227_7e9d_1aa6_e506_35d77859da4c["_astream()"]
  fe7d2227_7e9d_1aa6_e506_35d77859da4c -->|calls| 47573e6b_689c_1e1b_b035_1aee9c3d3bba
  df9175a2_1cf3_cb55_7d03_6f5b7e1bc76b["_should_stream_usage()"]
  47573e6b_689c_1e1b_b035_1aee9c3d3bba -->|calls| df9175a2_1cf3_cb55_7d03_6f5b7e1bc76b
  36b15b48_0822_029c_4a53_8243405e5a5e["_get_request_payload()"]
  47573e6b_689c_1e1b_b035_1aee9c3d3bba -->|calls| 36b15b48_0822_029c_4a53_8243405e5a5e
  9dd73ff5_bb27_7bf2_5124_b82e93cd60f6["_convert_chunk_to_generation_chunk()"]
  47573e6b_689c_1e1b_b035_1aee9c3d3bba -->|calls| 9dd73ff5_bb27_7bf2_5124_b82e93cd60f6
  2d1ed7ab_3dc5_34eb_9bfb_4f79c345fc6b["_get_generation_chunk_from_completion()"]
  47573e6b_689c_1e1b_b035_1aee9c3d3bba -->|calls| 2d1ed7ab_3dc5_34eb_9bfb_4f79c345fc6b
  fe7d2227_7e9d_1aa6_e506_35d77859da4c["_astream()"]
  47573e6b_689c_1e1b_b035_1aee9c3d3bba -->|calls| fe7d2227_7e9d_1aa6_e506_35d77859da4c
  6a6e1bc7_82ad_0ec6_6f76_46c87a121099["_handle_openai_bad_request()"]
  47573e6b_689c_1e1b_b035_1aee9c3d3bba -->|calls| 6a6e1bc7_82ad_0ec6_6f76_46c87a121099
  9b7290da_4511_6588_b149_1f3f5856fece["_handle_openai_api_error()"]
  47573e6b_689c_1e1b_b035_1aee9c3d3bba -->|calls| 9b7290da_4511_6588_b149_1f3f5856fece
  style 47573e6b_689c_1e1b_b035_1aee9c3d3bba fill:#6366f1,stroke:#818cf8,color:#fff

Relationship Graph

Source Code

libs/partners/openai/langchain_openai/chat_models/base.py lines 1549–1623

    async def _astream(
        self,
        messages: list[BaseMessage],
        stop: list[str] | None = None,
        run_manager: AsyncCallbackManagerForLLMRun | None = None,
        *,
        stream_usage: bool | None = None,
        **kwargs: Any,
    ) -> AsyncIterator[ChatGenerationChunk]:
        kwargs["stream"] = True
        stream_usage = self._should_stream_usage(stream_usage, **kwargs)
        if stream_usage:
            kwargs["stream_options"] = {"include_usage": stream_usage}
        payload = self._get_request_payload(messages, stop=stop, **kwargs)
        default_chunk_class: type[BaseMessageChunk] = AIMessageChunk
        base_generation_info = {}

        try:
            if "response_format" in payload:
                if self.include_response_headers:
                    warnings.warn(
                        "Cannot currently include response headers when "
                        "response_format is specified."
                    )
                payload.pop("stream")
                response_stream = self.root_async_client.beta.chat.completions.stream(
                    **payload
                )
                context_manager = response_stream
            else:
                if self.include_response_headers:
                    raw_response = await self.async_client.with_raw_response.create(
                        **payload
                    )
                    response = raw_response.parse()
                    base_generation_info = {"headers": dict(raw_response.headers)}
                else:
                    response = await self.async_client.create(**payload)
                context_manager = response
            async with context_manager as response:
                is_first_chunk = True
                async for chunk in response:
                    if not isinstance(chunk, dict):
                        chunk = chunk.model_dump()
                    generation_chunk = self._convert_chunk_to_generation_chunk(
                        chunk,
                        default_chunk_class,
                        base_generation_info if is_first_chunk else {},
                    )
                    if generation_chunk is None:
                        continue
                    default_chunk_class = generation_chunk.message.__class__
                    logprobs = (generation_chunk.generation_info or {}).get("logprobs")
                    if run_manager:
                        await run_manager.on_llm_new_token(
                            generation_chunk.text,
                            chunk=generation_chunk,
                            logprobs=logprobs,
                        )
                    is_first_chunk = False
                    yield generation_chunk
        except openai.BadRequestError as e:
            _handle_openai_bad_request(e)
        except openai.APIError as e:
            _handle_openai_api_error(e)
        if hasattr(response, "get_final_completion") and "response_format" in payload:
            final_completion = await response.get_final_completion()
            generation_chunk = self._get_generation_chunk_from_completion(
                final_completion
            )
            if run_manager:
                await run_manager.on_llm_new_token(
                    generation_chunk.text, chunk=generation_chunk
                )
            yield generation_chunk

Domain

Subdomains

Called By

Frequently Asked Questions

What does _astream() do?
_astream() is a function in the langchain codebase, defined in libs/partners/openai/langchain_openai/chat_models/base.py.
Where is _astream() defined?
_astream() is defined in libs/partners/openai/langchain_openai/chat_models/base.py at line 1549.
What does _astream() call?
_astream() calls 7 function(s): _astream, _convert_chunk_to_generation_chunk, _get_generation_chunk_from_completion, _get_request_payload, _handle_openai_api_error, _handle_openai_bad_request, _should_stream_usage.
What calls _astream()?
_astream() is called by 1 function(s): _astream.

Analyze Your Own Codebase

Get architecture documentation, dependency graphs, and domain analysis for your codebase in minutes.

Try Supermodel Free