astream() — langchain Function Reference
Architecture documentation for the astream() function in branch.py from the langchain codebase.
Entity Profile
Dependency Diagram
graph TD e7b52750_c385_20a6_da26_fddbf6b4f003["astream()"] 8c770be5_e4c6_a7e0_4227_c2085f2c1314["RunnableBranch"] e7b52750_c385_20a6_da26_fddbf6b4f003 -->|defined in| 8c770be5_e4c6_a7e0_4227_c2085f2c1314 8fe0517c_6208_0ec9_8513_64b9f8430708["ainvoke()"] e7b52750_c385_20a6_da26_fddbf6b4f003 -->|calls| 8fe0517c_6208_0ec9_8513_64b9f8430708 style e7b52750_c385_20a6_da26_fddbf6b4f003 fill:#6366f1,stroke:#818cf8,color:#fff
Relationship Graph
Source Code
libs/core/langchain_core/runnables/branch.py lines 380–461
async def astream(
self,
input: Input,
config: RunnableConfig | None = None,
**kwargs: Any | None,
) -> AsyncIterator[Output]:
"""First evaluates the condition, then delegate to `True` or `False` branch.
Args:
input: The input to the `Runnable`.
config: The configuration for the `Runnable`.
**kwargs: Additional keyword arguments to pass to the `Runnable`.
Yields:
The output of the branch that was run.
"""
config = ensure_config(config)
callback_manager = get_async_callback_manager_for_config(config)
run_manager = await callback_manager.on_chain_start(
None,
input,
name=config.get("run_name") or self.get_name(),
run_id=config.pop("run_id", None),
)
final_output: Output | None = None
final_output_supported = True
try:
for idx, branch in enumerate(self.branches):
condition, runnable = branch
expression_value = await condition.ainvoke(
input,
config=patch_config(
config,
callbacks=run_manager.get_child(tag=f"condition:{idx + 1}"),
),
)
if expression_value:
async for chunk in runnable.astream(
input,
config=patch_config(
config,
callbacks=run_manager.get_child(tag=f"branch:{idx + 1}"),
),
**kwargs,
):
yield chunk
if final_output_supported:
if final_output is None:
final_output = chunk
else:
try:
final_output = final_output + chunk # type: ignore[operator]
except TypeError:
final_output = None
final_output_supported = False
break
else:
async for chunk in self.default.astream(
input,
config=patch_config(
config,
callbacks=run_manager.get_child(tag="branch:default"),
),
**kwargs,
):
yield chunk
if final_output_supported:
if final_output is None:
final_output = chunk
else:
try:
final_output = final_output + chunk # type: ignore[operator]
except TypeError:
final_output = None
final_output_supported = False
except BaseException as e:
await run_manager.on_chain_error(e)
raise
Domain
Subdomains
Defined In
Calls
Source
Frequently Asked Questions
What does astream() do?
astream() is a function in the langchain codebase, defined in libs/core/langchain_core/runnables/branch.py.
Where is astream() defined?
astream() is defined in libs/core/langchain_core/runnables/branch.py at line 380.
What does astream() call?
astream() calls 1 function(s): ainvoke.
Analyze Your Own Codebase
Get architecture documentation, dependency graphs, and domain analysis for your codebase in minutes.
Try Supermodel Free