astream() — langchain Function Reference
Architecture documentation for astream(), the asynchronous streaming method of RunnableWithFallbacks in fallbacks.py from the langchain codebase.
Dependency Diagram
graph TD
    ffee2861_1e72_5f72_b74a_f7a216fdeee5["astream()"]
    da856a55_079e_fd91_47d6_928251e6cee3["RunnableWithFallbacks"]
    ffee2861_1e72_5f72_b74a_f7a216fdeee5 -->|defined in| da856a55_079e_fd91_47d6_928251e6cee3
    style ffee2861_1e72_5f72_b74a_f7a216fdeee5 fill:#6366f1,stroke:#818cf8,color:#fff
Source Code
libs/core/langchain_core/runnables/fallbacks.py lines 530–591
async def astream(
    self,
    input: Input,
    config: RunnableConfig | None = None,
    **kwargs: Any | None,
) -> AsyncIterator[Output]:
    if self.exception_key is not None and not isinstance(input, dict):
        msg = (
            "If 'exception_key' is specified then input must be a dictionary."
            f"However found a type of {type(input)} for input"
        )
        raise ValueError(msg)
    # setup callbacks
    config = ensure_config(config)
    callback_manager = get_async_callback_manager_for_config(config)
    # start the root run
    run_manager = await callback_manager.on_chain_start(
        None,
        input,
        name=config.get("run_name") or self.get_name(),
        run_id=config.pop("run_id", None),
    )
    first_error = None
    last_error = None
    for runnable in self.runnables:
        try:
            if self.exception_key and last_error is not None:
                input[self.exception_key] = last_error  # type: ignore[index]
            child_config = patch_config(config, callbacks=run_manager.get_child())
            with set_config_context(child_config) as context:
                stream = runnable.astream(
                    input,
                    child_config,
                    **kwargs,
                )
                chunk = await coro_with_context(anext(stream), context)
        except self.exceptions_to_handle as e:
            first_error = e if first_error is None else first_error
            last_error = e
        except BaseException as e:
            await run_manager.on_chain_error(e)
            raise
        else:
            first_error = None
            break
    if first_error:
        await run_manager.on_chain_error(first_error)
        raise first_error

    yield chunk
    output: Output | None = chunk
    try:
        async for chunk in stream:
            yield chunk
            try:
                output = output + chunk  # type: ignore[operator]
            except TypeError:
                output = None
    except BaseException as e:
        await run_manager.on_chain_error(e)
        raise
    await run_manager.on_chain_end(output)
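The tail of the listing folds every streamed chunk into a running output with +, and gives up on aggregation (setting output to None) as soon as two chunks cannot be added. The following is a minimal standalone sketch of that folding step only; aggregate is a hypothetical helper name, not part of langchain, and it works on a plain iterable rather than an async stream.

```python
from typing import Any, Iterable


def aggregate(chunks: Iterable[Any]) -> Any:
    """Fold chunks with +, mirroring the output handling in the listing.

    Hypothetical helper for illustration; assumes at least one chunk.
    """
    iterator = iter(chunks)
    output = next(iterator)  # the first chunk seeds the running output
    for chunk in iterator:
        try:
            output = output + chunk
        except TypeError:
            # Chunks that do not support + make the final output None.
            output = None
    return output


text_output = aggregate(["hel", "lo"])       # str chunks concatenate
dict_output = aggregate([{"a": 1}, {"b": 2}])  # dict + dict raises TypeError
```

Once output becomes None it normally stays None, because None + chunk raises TypeError again for ordinary chunk types. The chunks themselves are still yielded to the caller either way; only the value passed to on_chain_end is affected.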
Frequently Asked Questions
What does astream() do?
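astream() is the asynchronous streaming method of RunnableWithFallbacks, defined in libs/core/langchain_core/runnables/fallbacks.py. It tries each entry of self.runnables in order and awaits the first chunk of its stream. If that raises one of self.exceptions_to_handle, it moves on to the next runnable; if every runnable fails, it re-raises the first error. Once a first chunk arrives, the remaining chunks of that stream are yielded as they come, and any later error is reported through on_chain_error and re-raised without further fallback. When exception_key is set, the input must be a dict, and the most recent error is stored under that key before the next runnable is tried.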
Where is astream() defined?
astream() is defined as a method of RunnableWithFallbacks in libs/core/langchain_core/runnables/fallbacks.py, at lines 530–591.