_astream_events_implementation_v1() — langchain Function Reference
Architecture documentation for the _astream_events_implementation_v1() function in event_stream.py from the langchain codebase.
Entity Profile
Dependency Diagram
graph TD
    bdbacc7d_9fa0_723b_a647_864f55986dc6["_astream_events_implementation_v1()"]
    1bf0b874_2a05_f5ee_944a_a70022bbedb8["event_stream.py"]
    bdbacc7d_9fa0_723b_a647_864f55986dc6 -->|defined in| 1bf0b874_2a05_f5ee_944a_a70022bbedb8
    style bdbacc7d_9fa0_723b_a647_864f55986dc6 fill:#6366f1,stroke:#818cf8,color:#fff
Source Code
libs/core/langchain_core/tracers/event_stream.py lines 827–1000
async def _astream_events_implementation_v1(
    runnable: Runnable[Input, Output],
    value: Any,
    config: RunnableConfig | None = None,
    *,
    include_names: Sequence[str] | None = None,
    include_types: Sequence[str] | None = None,
    include_tags: Sequence[str] | None = None,
    exclude_names: Sequence[str] | None = None,
    exclude_types: Sequence[str] | None = None,
    exclude_tags: Sequence[str] | None = None,
    **kwargs: Any,
) -> AsyncIterator[StandardStreamEvent]:
    stream = LogStreamCallbackHandler(
        auto_close=False,
        include_names=include_names,
        include_types=include_types,
        include_tags=include_tags,
        exclude_names=exclude_names,
        exclude_types=exclude_types,
        exclude_tags=exclude_tags,
        _schema_format="streaming_events",
    )
    run_log = RunLog(state=None)  # type: ignore[arg-type]
    encountered_start_event = False
    root_event_filter = _RootEventFilter(
        include_names=include_names,
        include_types=include_types,
        include_tags=include_tags,
        exclude_names=exclude_names,
        exclude_types=exclude_types,
        exclude_tags=exclude_tags,
    )
    config = ensure_config(config)
    root_tags = config.get("tags", [])
    root_metadata = config.get("metadata", {})
    root_name = config.get("run_name", runnable.get_name())
    async for log in _astream_log_implementation(
        runnable,
        value,
        config=config,
        stream=stream,
        diff=True,
        with_streamed_output_list=True,
        **kwargs,
    ):
        run_log += log
        if not encountered_start_event:
            # Yield the start event for the root runnable.
            encountered_start_event = True
            state = run_log.state.copy()
            event = StandardStreamEvent(
                event=f"on_{state['type']}_start",
                run_id=state["id"],
                name=root_name,
                tags=root_tags,
                metadata=root_metadata,
                data={
                    "input": value,
                },
                parent_ids=[],  # Not supported in v1
            )
            if root_event_filter.include_event(event, state["type"]):
                yield event
        paths = {
            op["path"].split("/")[2]
            for op in log.ops
            if op["path"].startswith("/logs/")
        }
        # Elements in a set should be iterated in the same order
        # as they were inserted in modern python versions.
        for path in paths:
            data: EventData = {}
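The `paths` comprehension near the end of the listing can be tried in isolation. The following is a minimal sketch, not library code: `sample_ops` is hypothetical data shaped like the JSONPatch-style entries in `log.ops`, and `touched_log_keys` is an illustrative helper name that does not exist in langchain.

```python
from typing import Any

# Hypothetical ops, shaped like the entries the listing reads from `log.ops`.
sample_ops: list[dict[str, Any]] = [
    {"op": "add", "path": "/logs/ChatModel", "value": {"name": "ChatModel"}},
    {"op": "add", "path": "/logs/ChatModel/streamed_output/-", "value": "Hi"},
    {"op": "add", "path": "/logs/Parser/final_output", "value": "Hi"},
    {"op": "replace", "path": "/final_output", "value": "Hi"},
]


def touched_log_keys(ops: list[dict[str, Any]]) -> set[str]:
    """Collect the log-entry key from every op whose path starts with /logs/.

    "/logs/ChatModel/streamed_output/-".split("/") gives
    ["", "logs", "ChatModel", "streamed_output", "-"], so index 2 is the key.
    """
    return {
        op["path"].split("/")[2]
        for op in ops
        if op["path"].startswith("/logs/")
    }


keys = touched_log_keys(sample_ops)
# Sorted here only to make the printed output deterministic.
print(sorted(keys))  # prints ['ChatModel', 'Parser']
```

Ops that target the root run, such as `/final_output`, are skipped, and several ops against the same log entry collapse into a single key.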
Frequently Asked Questions
What does _astream_events_implementation_v1() do?
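_astream_events_implementation_v1() is an async generator defined in libs/core/langchain_core/tracers/event_stream.py. It passes a LogStreamCallbackHandler to _astream_log_implementation(), accumulates the resulting patches into a RunLog, and yields StandardStreamEvent objects. The first event it can yield is an on_{type}_start event for the root runnable, emitted only if the root event filter includes it.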
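The "emit the root start event once" pattern from the listing can be sketched with the standard library alone. Everything here is an assumption made for illustration: `fake_log_stream` stands in for `_astream_log_implementation()`, plain dicts stand in for `StandardStreamEvent`, and the `exclude_types` check is a simplified substitute for `_RootEventFilter.include_event()`.

```python
import asyncio
from typing import Any, AsyncIterator


async def fake_log_stream() -> AsyncIterator[dict[str, Any]]:
    # Hypothetical stand-in: each item plays the role of the accumulated run state.
    yield {"id": "run-1", "type": "chain"}
    yield {"id": "run-1", "type": "chain"}


async def root_events(
    value: Any, root_name: str, exclude_types: tuple[str, ...] = ()
) -> AsyncIterator[dict[str, Any]]:
    encountered_start_event = False
    async for state in fake_log_stream():
        if not encountered_start_event:
            # Flip the flag first so later chunks never emit a second start event.
            encountered_start_event = True
            event = {
                "event": f"on_{state['type']}_start",
                "run_id": state["id"],
                "name": root_name,
                "data": {"input": value},
                "parent_ids": [],
            }
            # Simplified filter: drop the event if its type is excluded.
            if state["type"] not in exclude_types:
                yield event


async def collect(**kwargs: Any) -> list[dict[str, Any]]:
    return [e async for e in root_events("hello", "demo", **kwargs)]


started = asyncio.run(collect())
filtered = asyncio.run(collect(exclude_types=("chain",)))
print([e["event"] for e in started])  # prints ['on_chain_start']
print(filtered)  # prints []
```

The flag is set whether or not the filter admits the event, so a filtered-out root run stays silent for the rest of the stream instead of emitting a late start event.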
Where is _astream_events_implementation_v1() defined?
_astream_events_implementation_v1() is defined in libs/core/langchain_core/tracers/event_stream.py at line 827.