_astream_events_implementation_v1() — langchain Function Reference

Architecture documentation for the _astream_events_implementation_v1() function in event_stream.py from the langchain codebase.

Dependency Diagram

graph TD
  bdbacc7d_9fa0_723b_a647_864f55986dc6["_astream_events_implementation_v1()"]
  1bf0b874_2a05_f5ee_944a_a70022bbedb8["event_stream.py"]
  bdbacc7d_9fa0_723b_a647_864f55986dc6 -->|defined in| 1bf0b874_2a05_f5ee_944a_a70022bbedb8
  style bdbacc7d_9fa0_723b_a647_864f55986dc6 fill:#6366f1,stroke:#818cf8,color:#fff

Source Code

libs/core/langchain_core/tracers/event_stream.py lines 827–1000 (the listing below is an excerpt and stops partway through the function)

async def _astream_events_implementation_v1(
    runnable: Runnable[Input, Output],
    value: Any,
    config: RunnableConfig | None = None,
    *,
    include_names: Sequence[str] | None = None,
    include_types: Sequence[str] | None = None,
    include_tags: Sequence[str] | None = None,
    exclude_names: Sequence[str] | None = None,
    exclude_types: Sequence[str] | None = None,
    exclude_tags: Sequence[str] | None = None,
    **kwargs: Any,
) -> AsyncIterator[StandardStreamEvent]:
    stream = LogStreamCallbackHandler(
        auto_close=False,
        include_names=include_names,
        include_types=include_types,
        include_tags=include_tags,
        exclude_names=exclude_names,
        exclude_types=exclude_types,
        exclude_tags=exclude_tags,
        _schema_format="streaming_events",
    )

    run_log = RunLog(state=None)  # type: ignore[arg-type]
    encountered_start_event = False

    root_event_filter = _RootEventFilter(
        include_names=include_names,
        include_types=include_types,
        include_tags=include_tags,
        exclude_names=exclude_names,
        exclude_types=exclude_types,
        exclude_tags=exclude_tags,
    )

    config = ensure_config(config)
    root_tags = config.get("tags", [])
    root_metadata = config.get("metadata", {})
    root_name = config.get("run_name", runnable.get_name())

    async for log in _astream_log_implementation(
        runnable,
        value,
        config=config,
        stream=stream,
        diff=True,
        with_streamed_output_list=True,
        **kwargs,
    ):
        run_log += log

        if not encountered_start_event:
            # Yield the start event for the root runnable.
            encountered_start_event = True
            state = run_log.state.copy()

            event = StandardStreamEvent(
                event=f"on_{state['type']}_start",
                run_id=state["id"],
                name=root_name,
                tags=root_tags,
                metadata=root_metadata,
                data={
                    "input": value,
                },
                parent_ids=[],  # Not supported in v1
            )

            if root_event_filter.include_event(event, state["type"]):
                yield event

        paths = {
            op["path"].split("/")[2]
            for op in log.ops
            if op["path"].startswith("/logs/")
        }
        # Elements in a set should be iterated in the same order
        # as they were inserted in modern python versions.
        for path in paths:
            data: EventData = {}
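
The excerpt ends inside the loop over `paths`. The set comprehension just above it collects the third segment of every patch path that starts with `/logs/`, which is the key of the log entry the patch touches. The minimal sketch below reproduces that extraction on made-up patch operations. The helper name `touched_log_keys` and the sample data are hypothetical and not part of langchain. Unlike the excerpt, the sketch de-duplicates with `dict.fromkeys`, because Python guarantees insertion order for `dict` (since 3.7) but not for `set`, so the iteration order of a set should not be relied upon.

```python
def touched_log_keys(ops: list[dict]) -> list[str]:
    """Return the log-entry keys touched by a batch of patch ops, in first-seen order."""
    keys = (
        op["path"].split("/")[2]  # "/logs/<key>/..." -> "<key>"
        for op in ops
        if op["path"].startswith("/logs/")
    )
    # dict.fromkeys drops duplicates while preserving insertion order.
    return list(dict.fromkeys(keys))


# Hypothetical patch operations, shaped like JSON Patch entries.
sample_ops = [
    {"op": "add", "path": "/logs/retriever", "value": {}},
    {"op": "add", "path": "/logs/llm", "value": {}},
    {"op": "add", "path": "/logs/retriever/final_output", "value": "docs"},
    {"op": "replace", "path": "/final_output", "value": "done"},  # not under /logs/
]

result = touched_log_keys(sample_ops)
print(result)
```

Each key appears once even when several operations touch the same log entry, and operations outside `/logs/` are skipped.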

Frequently Asked Questions

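What does _astream_events_implementation_v1() do?
_astream_events_implementation_v1() is an async generator in the langchain codebase, defined in libs/core/langchain_core/tracers/event_stream.py. As the source excerpt shows, it creates a LogStreamCallbackHandler, iterates over _astream_log_implementation(), accumulates each yielded patch into a RunLog, and yields StandardStreamEvent objects. The first event is an on_<type>_start event for the root runnable, emitted only if it passes the include/exclude filters.

Where is _astream_events_implementation_v1() defined?
_astream_events_implementation_v1() is defined in libs/core/langchain_core/tracers/event_stream.py at line 827.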
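
The control flow visible in the source excerpt (consume a stream of log patches, emit a single start event for the root run on the first patch, then process the log entries each patch touches) can be sketched without langchain. Everything below is a simplified illustration under stated assumptions: `fake_log_patches`, `sketch_stream_events`, the patch shape, and the `log_entry_touched` event name are hypothetical and are not the library's API.

```python
import asyncio
from typing import Any, AsyncIterator


async def fake_log_patches() -> AsyncIterator[dict[str, Any]]:
    """Hypothetical stand-in for the log stream; yields simplified patches."""
    yield {"type": "chain", "id": "run-1", "ops": []}
    yield {
        "type": "chain",
        "id": "run-1",
        "ops": [{"path": "/logs/step_a/final_output"}],
    }


async def sketch_stream_events(
    value: Any, root_name: str
) -> AsyncIterator[dict[str, Any]]:
    """Yield one root start event, then one event per touched log entry."""
    encountered_start_event = False
    async for patch in fake_log_patches():
        if not encountered_start_event:
            # Emit the root start event exactly once, on the first patch.
            encountered_start_event = True
            yield {
                "event": f"on_{patch['type']}_start",
                "run_id": patch["id"],
                "name": root_name,
                "data": {"input": value},
            }
        for op in patch["ops"]:
            if op["path"].startswith("/logs/"):
                # Hypothetical event name, used only for this illustration.
                yield {
                    "event": "log_entry_touched",
                    "name": op["path"].split("/")[2],
                }


async def collect(value: Any, root_name: str) -> list[dict[str, Any]]:
    """Drain the async generator into a list."""
    return [event async for event in sketch_stream_events(value, root_name)]


events = asyncio.run(collect("hello", "demo"))
for event in events:
    print(event["event"], event["name"])
```

The boolean flag mirrors `encountered_start_event` in the excerpt: the start event depends on state that only becomes available once the first patch has arrived, so it is produced inside the loop rather than before it.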
