astream() — langchain Function Reference

Architecture documentation for astream(), an async method of RunnableBranch defined in branch.py in the langchain codebase.

Dependency Diagram

graph TD
  e7b52750_c385_20a6_da26_fddbf6b4f003["astream()"]
  8c770be5_e4c6_a7e0_4227_c2085f2c1314["RunnableBranch"]
  e7b52750_c385_20a6_da26_fddbf6b4f003 -->|defined in| 8c770be5_e4c6_a7e0_4227_c2085f2c1314
  8fe0517c_6208_0ec9_8513_64b9f8430708["ainvoke()"]
  e7b52750_c385_20a6_da26_fddbf6b4f003 -->|calls| 8fe0517c_6208_0ec9_8513_64b9f8430708
  style e7b52750_c385_20a6_da26_fddbf6b4f003 fill:#6366f1,stroke:#818cf8,color:#fff

Source Code

libs/core/langchain_core/runnables/branch.py lines 380–461

    async def astream(
        self,
        input: Input,
        config: RunnableConfig | None = None,
        **kwargs: Any | None,
    ) -> AsyncIterator[Output]:
        """First evaluates the condition, then delegate to `True` or `False` branch.

        Args:
            input: The input to the `Runnable`.
            config: The configuration for the `Runnable`.
            **kwargs: Additional keyword arguments to pass to the `Runnable`.

        Yields:
            The output of the branch that was run.
        """
        config = ensure_config(config)
        callback_manager = get_async_callback_manager_for_config(config)
        run_manager = await callback_manager.on_chain_start(
            None,
            input,
            name=config.get("run_name") or self.get_name(),
            run_id=config.pop("run_id", None),
        )
        final_output: Output | None = None
        final_output_supported = True

        try:
            for idx, branch in enumerate(self.branches):
                condition, runnable = branch

                expression_value = await condition.ainvoke(
                    input,
                    config=patch_config(
                        config,
                        callbacks=run_manager.get_child(tag=f"condition:{idx + 1}"),
                    ),
                )

                if expression_value:
                    async for chunk in runnable.astream(
                        input,
                        config=patch_config(
                            config,
                            callbacks=run_manager.get_child(tag=f"branch:{idx + 1}"),
                        ),
                        **kwargs,
                    ):
                        yield chunk
                        if final_output_supported:
                            if final_output is None:
                                final_output = chunk
                            else:
                                try:
                                    final_output = final_output + chunk  # type: ignore[operator]
                                except TypeError:
                                    final_output = None
                                    final_output_supported = False
                    break
            else:
                async for chunk in self.default.astream(
                    input,
                    config=patch_config(
                        config,
                        callbacks=run_manager.get_child(tag="branch:default"),
                    ),
                    **kwargs,
                ):
                    yield chunk
                    if final_output_supported:
                        if final_output is None:
                            final_output = chunk
                        else:
                            try:
                                final_output = final_output + chunk  # type: ignore[operator]
                            except TypeError:
                                final_output = None
                                final_output_supported = False
        except BaseException as e:
            await run_manager.on_chain_error(e)
            raise
        await run_manager.on_chain_end(final_output)
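
The control flow in the listing above (check conditions in order, stream from the first truthy branch, otherwise fall through to the default via `for`/`else`) can be sketched with the standard library alone. This is a minimal illustration, not langchain code: `stream_first_match`, the `Condition` and `Streamer` aliases, and the helper coroutines are hypothetical names, and plain async functions stand in for `Runnable` objects. Only the tag strings (`branch:1`, `branch:default`) are taken from the source.

```python
import asyncio
from typing import Any, AsyncIterator, Awaitable, Callable

# Hypothetical stand-ins: a condition is an async predicate and a branch body
# is an async generator function. Neither is a langchain type.
Condition = Callable[[Any], Awaitable[bool]]
Streamer = Callable[[Any], AsyncIterator[Any]]


async def stream_first_match(
    value: Any,
    branches: list[tuple[Condition, Streamer]],
    default: Streamer,
) -> AsyncIterator[tuple[str, Any]]:
    """Yield (tag, chunk) pairs from the first branch whose condition is truthy."""
    for idx, (condition, streamer) in enumerate(branches):
        if await condition(value):
            async for chunk in streamer(value):
                yield f"branch:{idx + 1}", chunk
            break  # later conditions are never evaluated
    else:
        # for/else: this runs only when the loop finished without a break
        async for chunk in default(value):
            yield "branch:default", chunk


async def is_str(value: Any) -> bool:
    return isinstance(value, str)


async def is_int(value: Any) -> bool:
    return isinstance(value, int)


async def stream_chars(value: str) -> AsyncIterator[str]:
    for ch in value:
        yield ch


async def stream_countdown(value: int) -> AsyncIterator[int]:
    for n in range(value, 0, -1):
        yield n


async def stream_fallback(value: Any) -> AsyncIterator[str]:
    yield "unsupported"


async def collect(value: Any) -> list[tuple[str, Any]]:
    """Run the dispatcher against two sample branches and gather its output."""
    branches = [(is_str, stream_chars), (is_int, stream_countdown)]
    return [pair async for pair in stream_first_match(value, branches, stream_fallback)]


if __name__ == "__main__":
    print(asyncio.run(collect("ab")))
    print(asyncio.run(collect(2)))
    print(asyncio.run(collect(1.5)))
```

Because the loop breaks as soon as one branch has streamed, conditions after the first match are never awaited, which mirrors the `break` in the listing.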

Frequently Asked Questions

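What does astream() do?
astream() is an async method of RunnableBranch, defined in libs/core/langchain_core/runnables/branch.py. It evaluates each branch condition in order, streams the output of the first branch whose condition is truthy, and falls back to the default branch if no condition matches.

Where is astream() defined?
astream() is defined in libs/core/langchain_core/runnables/branch.py, starting at line 380.

What does astream() call?
The dependency diagram records one call: ainvoke(), which astream() awaits on each condition to decide which branch to stream.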
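
One detail of the source listing that the answers above do not spell out is how astream() folds streamed chunks into `final_output`: the first chunk seeds the value, later chunks are combined with `+`, and the first `TypeError` switches accumulation off for the rest of the stream. The sketch below reproduces only that pattern with the standard library. The names `accumulate`, `from_list`, and `run` are hypothetical helpers, not part of langchain.

```python
import asyncio
from typing import Any, AsyncIterator, Optional


async def accumulate(chunks: AsyncIterator[Any]) -> Optional[Any]:
    """Fold chunks with +, returning None if any pair cannot be added."""
    final_output: Optional[Any] = None
    supported = True
    async for chunk in chunks:
        if not supported:
            continue  # keep draining the stream, but stop accumulating
        if final_output is None:
            final_output = chunk
        else:
            try:
                final_output = final_output + chunk
            except TypeError:
                # Chunks are not addable: discard the partial result for good.
                final_output = None
                supported = False
    return final_output


async def from_list(items: list) -> AsyncIterator[Any]:
    """Hypothetical helper that replays a list as an async stream."""
    for item in items:
        yield item


def run(items: list) -> Optional[Any]:
    return asyncio.run(accumulate(from_list(items)))


if __name__ == "__main__":
    print(run(["Hel", "lo"]))    # addable chunks are concatenated
    print(run(["a", 1, "b"]))    # a str and an int cannot be added
```

The flag matters because resetting `final_output` to `None` alone would let the next chunk reseed the accumulator and produce a misleading partial result.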
