stream() — langchain Function Reference
Architecture documentation for the stream() function in branch.py from the langchain codebase.
Entity Profile
Dependency Diagram
graph TD b69050e0_7f2d_540b_c705_85c3001d53a4["stream()"] 8c770be5_e4c6_a7e0_4227_c2085f2c1314["RunnableBranch"] b69050e0_7f2d_540b_c705_85c3001d53a4 -->|defined in| 8c770be5_e4c6_a7e0_4227_c2085f2c1314 9f958f06_98f9_3ba8_4239_f2c7fb815242["invoke()"] b69050e0_7f2d_540b_c705_85c3001d53a4 -->|calls| 9f958f06_98f9_3ba8_4239_f2c7fb815242 style b69050e0_7f2d_540b_c705_85c3001d53a4 fill:#6366f1,stroke:#818cf8,color:#fff
Relationship Graph
Source Code
libs/core/langchain_core/runnables/branch.py lines 296–377
def stream(
self,
input: Input,
config: RunnableConfig | None = None,
**kwargs: Any | None,
) -> Iterator[Output]:
"""First evaluates the condition, then delegate to `True` or `False` branch.
Args:
input: The input to the `Runnable`.
config: The configuration for the `Runnable`.
**kwargs: Additional keyword arguments to pass to the `Runnable`.
Yields:
The output of the branch that was run.
"""
config = ensure_config(config)
callback_manager = get_callback_manager_for_config(config)
run_manager = callback_manager.on_chain_start(
None,
input,
name=config.get("run_name") or self.get_name(),
run_id=config.pop("run_id", None),
)
final_output: Output | None = None
final_output_supported = True
try:
for idx, branch in enumerate(self.branches):
condition, runnable = branch
expression_value = condition.invoke(
input,
config=patch_config(
config,
callbacks=run_manager.get_child(tag=f"condition:{idx + 1}"),
),
)
if expression_value:
for chunk in runnable.stream(
input,
config=patch_config(
config,
callbacks=run_manager.get_child(tag=f"branch:{idx + 1}"),
),
**kwargs,
):
yield chunk
if final_output_supported:
if final_output is None:
final_output = chunk
else:
try:
final_output = final_output + chunk # type: ignore[operator]
except TypeError:
final_output = None
final_output_supported = False
break
else:
for chunk in self.default.stream(
input,
config=patch_config(
config,
callbacks=run_manager.get_child(tag="branch:default"),
),
**kwargs,
):
yield chunk
if final_output_supported:
if final_output is None:
final_output = chunk
else:
try:
final_output = final_output + chunk # type: ignore[operator]
except TypeError:
final_output = None
final_output_supported = False
except BaseException as e:
run_manager.on_chain_error(e)
raise
Domain
Subdomains
Defined In
Calls
Source
Frequently Asked Questions
What does stream() do?
stream() is a function in the langchain codebase, defined in libs/core/langchain_core/runnables/branch.py.
Where is stream() defined?
stream() is defined in libs/core/langchain_core/runnables/branch.py at line 296.
What does stream() call?
stream() calls 1 function(s): invoke.
Analyze Your Own Codebase
Get architecture documentation, dependency graphs, and domain analysis for your codebase in minutes.
Try Supermodel Free