astream() — langchain Function Reference

Architecture documentation for the astream() function in chat_models.py from the langchain codebase.

Entity Profile

Dependency Diagram

graph TD
  39e8cf9b_1f2e_9a93_e3f1_aaaffdfd70fa["astream()"]
  d009a608_c505_bd50_7200_0de8a69ba4b7["BaseChatModel"]
  39e8cf9b_1f2e_9a93_e3f1_aaaffdfd70fa -->|defined in| d009a608_c505_bd50_7200_0de8a69ba4b7
  85ef976a_84b2_6421_f697_db0f396b2444["_should_stream()"]
  39e8cf9b_1f2e_9a93_e3f1_aaaffdfd70fa -->|calls| 85ef976a_84b2_6421_f697_db0f396b2444
  39de84e5_4b2e_9b0f_208f_399a7b79ebc1["ainvoke()"]
  39e8cf9b_1f2e_9a93_e3f1_aaaffdfd70fa -->|calls| 39de84e5_4b2e_9b0f_208f_399a7b79ebc1
  b0e1e167_44cd_1c63_6e71_0caf683fc904["_convert_input()"]
  39e8cf9b_1f2e_9a93_e3f1_aaaffdfd70fa -->|calls| b0e1e167_44cd_1c63_6e71_0caf683fc904
  5c320356_b8cd_92a3_38a3_2878a3c460d0["_get_invocation_params()"]
  39e8cf9b_1f2e_9a93_e3f1_aaaffdfd70fa -->|calls| 5c320356_b8cd_92a3_38a3_2878a3c460d0
  47283f0d_d8e7_addf_ea32_c0ecefe3d97c["_get_ls_params()"]
  39e8cf9b_1f2e_9a93_e3f1_aaaffdfd70fa -->|calls| 47283f0d_d8e7_addf_ea32_c0ecefe3d97c
  872a786c_fe60_9c05_0023_8702aaba0a90["_astream()"]
  39e8cf9b_1f2e_9a93_e3f1_aaaffdfd70fa -->|calls| 872a786c_fe60_9c05_0023_8702aaba0a90
  5f652461_f9fa_fdc2_d659_cde32ef53f66["_format_ls_structured_output()"]
  39e8cf9b_1f2e_9a93_e3f1_aaaffdfd70fa -->|calls| 5f652461_f9fa_fdc2_d659_cde32ef53f66
  f1b77769_1c98_a324_e709_fd921b433e56["_format_for_tracing()"]
  39e8cf9b_1f2e_9a93_e3f1_aaaffdfd70fa -->|calls| f1b77769_1c98_a324_e709_fd921b433e56
  3aa65704_c798_ec7b_b231_6600cb1a6a44["_gen_info_and_msg_metadata()"]
  39e8cf9b_1f2e_9a93_e3f1_aaaffdfd70fa -->|calls| 3aa65704_c798_ec7b_b231_6600cb1a6a44
  0a0401bd_a59c_7ac5_1c91_a5f406b3cdc6["_generate_response_from_error()"]
  39e8cf9b_1f2e_9a93_e3f1_aaaffdfd70fa -->|calls| 0a0401bd_a59c_7ac5_1c91_a5f406b3cdc6
  style 39e8cf9b_1f2e_9a93_e3f1_aaaffdfd70fa fill:#6366f1,stroke:#818cf8,color:#fff
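
The diagram shows `astream()` either delegating to `_astream()` or falling back to `ainvoke()` when no streaming implementation exists. That dispatch can be sketched in plain Python; the `MiniChatModel`/`StreamingModel` classes below are a toy illustration of the pattern, not langchain code:

```python
import asyncio
from typing import AsyncIterator


class MiniChatModel:
    """Toy illustration of astream()'s dispatch: stream when a streaming
    implementation exists, otherwise fall back to ainvoke().
    (Hypothetical sketch, not langchain code.)"""

    def _should_stream(self) -> bool:
        # The real BaseChatModel inspects overrides; here we just check
        # whether a subclass replaced _astream().
        return type(self)._astream is not MiniChatModel._astream

    async def ainvoke(self, prompt: str) -> str:
        # Non-streaming path: return the whole answer at once.
        return f"echo: {prompt}"

    async def _astream(self, prompt: str) -> AsyncIterator[str]:
        raise NotImplementedError
        yield  # unreachable; makes this function an async generator

    async def astream(self, prompt: str) -> AsyncIterator[str]:
        if not self._should_stream():
            # No streaming implementation: yield the ainvoke() result once.
            yield await self.ainvoke(prompt)
            return
        async for chunk in self._astream(prompt):
            yield chunk


class StreamingModel(MiniChatModel):
    async def _astream(self, prompt: str) -> AsyncIterator[str]:
        for token in prompt.split():
            yield token


async def collect(model: MiniChatModel, prompt: str) -> list[str]:
    return [chunk async for chunk in model.astream(prompt)]


fallback_chunks = asyncio.run(collect(MiniChatModel(), "hello world"))
streamed_chunks = asyncio.run(collect(StreamingModel(), "hello world"))
```

The base class yields the whole `ainvoke()` result as a single chunk, while the streaming subclass yields one chunk per token, mirroring the `yield cast(..., await self.ainvoke(...))` fallback in the source below.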

Source Code

libs/core/langchain_core/language_models/chat_models.py lines 606–733

    async def astream(
        self,
        input: LanguageModelInput,
        config: RunnableConfig | None = None,
        *,
        stop: list[str] | None = None,
        **kwargs: Any,
    ) -> AsyncIterator[AIMessageChunk]:
        if not self._should_stream(async_api=True, **{**kwargs, "stream": True}):
            # No async or sync stream is implemented, so fall back to ainvoke
            yield cast(
                "AIMessageChunk",
                await self.ainvoke(input, config=config, stop=stop, **kwargs),
            )
            return

        config = ensure_config(config)
        messages = self._convert_input(input).to_messages()

        ls_structured_output_format = kwargs.pop(
            "ls_structured_output_format", None
        ) or kwargs.pop("structured_output_format", None)
        ls_structured_output_format_dict = _format_ls_structured_output(
            ls_structured_output_format
        )

        params = self._get_invocation_params(stop=stop, **kwargs)
        options = {"stop": stop, **kwargs, **ls_structured_output_format_dict}
        inheritable_metadata = {
            **(config.get("metadata") or {}),
            **self._get_ls_params(stop=stop, **kwargs),
        }
        callback_manager = AsyncCallbackManager.configure(
            config.get("callbacks"),
            self.callbacks,
            self.verbose,
            config.get("tags"),
            self.tags,
            inheritable_metadata,
            self.metadata,
        )
        (run_manager,) = await callback_manager.on_chat_model_start(
            self._serialized,
            [_format_for_tracing(messages)],
            invocation_params=params,
            options=options,
            name=config.get("run_name"),
            run_id=config.pop("run_id", None),
            batch_size=1,
        )

        if self.rate_limiter:
            await self.rate_limiter.aacquire(blocking=True)

        chunks: list[ChatGenerationChunk] = []

        try:
            input_messages = _normalize_messages(messages)
            run_id = "-".join((LC_ID_PREFIX, str(run_manager.run_id)))
            yielded = False
            index = -1
            index_type = ""
            async for chunk in self._astream(
                input_messages,
                stop=stop,
                **kwargs,
            ):
                if chunk.message.id is None:
                    chunk.message.id = run_id
                chunk.message.response_metadata = _gen_info_and_msg_metadata(chunk)
                if self.output_version == "v1":
                    # Overwrite .content with .content_blocks
                    chunk.message = _update_message_content_to_blocks(
                        chunk.message, "v1"
                    )
                    for block in cast(
                        "list[types.ContentBlock]", chunk.message.content
                    ):
                        if block["type"] != index_type:
                            index_type = block["type"]
                            index += 1
    # … (excerpt truncated; the full function continues to line 733)
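
The tail of the excerpt keeps a running `index`/`index_type` pair and bumps the index each time the content-block type changes. Isolated from the streaming loop, that bookkeeping might look like the following (simplified sketch; `assign_block_indices` is a hypothetical helper, not a langchain function):

```python
def assign_block_indices(blocks: list[dict]) -> list[dict]:
    """Bump a running index each time the block type changes, mirroring
    the index/index_type bookkeeping in the excerpt above."""
    index = -1
    index_type = ""
    indexed = []
    for block in blocks:
        if block["type"] != index_type:
            index_type = block["type"]
            index += 1
        # Attach the computed index without mutating the input block.
        indexed.append({**block, "index": index})
    return indexed


blocks = [
    {"type": "text"},
    {"type": "text"},       # same type as previous: index unchanged
    {"type": "tool_call"},  # type changed: index bumped
    {"type": "text"},       # changed again: bumped again
]
result = assign_block_indices(blocks)
```

Consecutive blocks of the same type share an index, so downstream consumers can group partial chunks belonging to the same logical content block.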

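Earlier in the excerpt, `astream()` awaits `self.rate_limiter.aacquire(blocking=True)` before starting the stream. A toy token-bucket limiter with the same acquire shape might look like this (`ToyRateLimiter` is a hypothetical sketch, not langchain's rate limiter):

```python
import asyncio
import time


class ToyRateLimiter:
    """Minimal token-bucket limiter exposing aacquire(blocking=True),
    the call shape used in the excerpt. Toy sketch only."""

    def __init__(self, requests_per_second: float, max_bucket_size: float = 1.0):
        self.rate = requests_per_second
        self.max_tokens = max_bucket_size
        self.tokens = max_bucket_size
        self.last = time.monotonic()

    def _refill(self) -> None:
        # Add tokens proportional to elapsed time, capped at bucket size.
        now = time.monotonic()
        self.tokens = min(self.max_tokens, self.tokens + (now - self.last) * self.rate)
        self.last = now

    async def aacquire(self, blocking: bool = True) -> bool:
        while True:
            self._refill()
            if self.tokens >= 1:
                self.tokens -= 1
                return True
            if not blocking:
                return False
            # Sleep roughly one token's worth of time before retrying.
            await asyncio.sleep(1 / self.rate)


async def demo() -> float:
    limiter = ToyRateLimiter(requests_per_second=50)
    start = time.monotonic()
    for _ in range(3):
        await limiter.aacquire(blocking=True)
    return time.monotonic() - start


elapsed = asyncio.run(demo())
```

With a bucket size of one, the first acquire is immediate and each subsequent acquire waits about 1/50 s, so three acquires take at least ~40 ms; awaiting the limiter before `_astream()` throttles how often the model is invoked without blocking the event loop.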
Frequently Asked Questions

What does astream() do?
astream() is BaseChatModel's public asynchronous streaming method. It converts its input to messages, configures callbacks and tracing metadata, then yields AIMessageChunk objects produced by the model's _astream() implementation. When no streaming implementation is available, it falls back to a single ainvoke() call and yields the result as one chunk.
Where is astream() defined?
astream() is defined in libs/core/langchain_core/language_models/chat_models.py at line 606.
What does astream() call?
astream() calls 10 functions: _astream, _convert_input, _format_for_tracing, _format_ls_structured_output, _gen_info_and_msg_metadata, _generate_response_from_error, _get_invocation_params, _get_ls_params, _should_stream, and ainvoke.
