Home / Class/ _AstreamEventsCallbackHandler Class — langchain Architecture

_AstreamEventsCallbackHandler Class — langchain Architecture

Architecture documentation for the _AstreamEventsCallbackHandler class in event_stream.py from the langchain codebase.

Entity Profile

Dependency Diagram

graph TD
  7d90a3d0_d943_c43b_2fdd_f76b9bdccd55["_AstreamEventsCallbackHandler"]
  e108f394_9734_a4fd_0bef_19ef9b674d50["AsyncCallbackHandler"]
  7d90a3d0_d943_c43b_2fdd_f76b9bdccd55 -->|extends| e108f394_9734_a4fd_0bef_19ef9b674d50
  9c9ba0ab_539f_aaa1_399f_ec00f912f41f["_StreamingCallbackHandler"]
  7d90a3d0_d943_c43b_2fdd_f76b9bdccd55 -->|extends| 9c9ba0ab_539f_aaa1_399f_ec00f912f41f
  4f8b46c6_9bea_256a_d764_90863752ddf6["event_stream.py"]
  7d90a3d0_d943_c43b_2fdd_f76b9bdccd55 -->|defined in| 4f8b46c6_9bea_256a_d764_90863752ddf6
  35688bfe_32a1_6ae3_4599_0a8dcdb7eccc["__init__()"]
  7d90a3d0_d943_c43b_2fdd_f76b9bdccd55 -->|method| 35688bfe_32a1_6ae3_4599_0a8dcdb7eccc
  2c05b944_8b57_0ec1_2c17_2b3f4c3562c7["_get_parent_ids()"]
  7d90a3d0_d943_c43b_2fdd_f76b9bdccd55 -->|method| 2c05b944_8b57_0ec1_2c17_2b3f4c3562c7
  401affce_c7fe_33fb_1054_b3a67e5e110c["_send()"]
  7d90a3d0_d943_c43b_2fdd_f76b9bdccd55 -->|method| 401affce_c7fe_33fb_1054_b3a67e5e110c
  875b123f_3bb9_808d_36ea_6f338c44659d["__aiter__()"]
  7d90a3d0_d943_c43b_2fdd_f76b9bdccd55 -->|method| 875b123f_3bb9_808d_36ea_6f338c44659d
  eba20684_9ff6_3b4c_db54_51c72958ee21["tap_output_aiter()"]
  7d90a3d0_d943_c43b_2fdd_f76b9bdccd55 -->|method| eba20684_9ff6_3b4c_db54_51c72958ee21
  7e0e4260_c649_7926_5bad_cfb55c871227["tap_output_iter()"]
  7d90a3d0_d943_c43b_2fdd_f76b9bdccd55 -->|method| 7e0e4260_c649_7926_5bad_cfb55c871227
  f3dc5a8a_f29f_d854_e788_e0fa7350b46a["_write_run_start_info()"]
  7d90a3d0_d943_c43b_2fdd_f76b9bdccd55 -->|method| f3dc5a8a_f29f_d854_e788_e0fa7350b46a
  e23900ac_414e_24b8_9643_d43f3281b4a2["on_chat_model_start()"]
  7d90a3d0_d943_c43b_2fdd_f76b9bdccd55 -->|method| e23900ac_414e_24b8_9643_d43f3281b4a2
  095bb696_11f2_5caa_de95_fa70c67c0755["on_llm_start()"]
  7d90a3d0_d943_c43b_2fdd_f76b9bdccd55 -->|method| 095bb696_11f2_5caa_de95_fa70c67c0755
  5ce0066a_597b_8984_5257_86b03f333f12["on_custom_event()"]
  7d90a3d0_d943_c43b_2fdd_f76b9bdccd55 -->|method| 5ce0066a_597b_8984_5257_86b03f333f12
  5f307e5b_1e60_af07_da9e_835f68af6b95["on_llm_new_token()"]
  7d90a3d0_d943_c43b_2fdd_f76b9bdccd55 -->|method| 5f307e5b_1e60_af07_da9e_835f68af6b95

Relationship Graph

Source Code

libs/core/langchain_core/tracers/event_stream.py lines 101–824

class _AstreamEventsCallbackHandler(AsyncCallbackHandler, _StreamingCallbackHandler):
    """An implementation of an async callback handler for astream events."""

    def __init__(
        self,
        *args: Any,
        include_names: Sequence[str] | None = None,
        include_types: Sequence[str] | None = None,
        include_tags: Sequence[str] | None = None,
        exclude_names: Sequence[str] | None = None,
        exclude_types: Sequence[str] | None = None,
        exclude_tags: Sequence[str] | None = None,
        **kwargs: Any,
    ) -> None:
        """Initialize the tracer."""
        super().__init__(*args, **kwargs)
        # Map of run ID to run info.
        # the entry corresponding to a given run id is cleaned
        # up when each corresponding run ends.
        self.run_map: dict[UUID, RunInfo] = {}
        # The callback event that corresponds to the end of a parent run
        # may be invoked BEFORE the callback event that corresponds to the end
        # of a child run, which results in clean up of run_map.
        # So we keep track of the mapping between children and parent run IDs
        # in a separate container. This container is GCed when the tracer is GCed.
        self.parent_map: dict[UUID, UUID | None] = {}

        self.is_tapped: dict[UUID, Any] = {}

        # Filter which events will be sent over the queue.
        self.root_event_filter = _RootEventFilter(
            include_names=include_names,
            include_types=include_types,
            include_tags=include_tags,
            exclude_names=exclude_names,
            exclude_types=exclude_types,
            exclude_tags=exclude_tags,
        )

        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            loop = asyncio.new_event_loop()
        memory_stream = _MemoryStream[StreamEvent](loop)
        self.send_stream = memory_stream.get_send_stream()
        self.receive_stream = memory_stream.get_receive_stream()

    def _get_parent_ids(self, run_id: UUID) -> list[str]:
        """Get the parent IDs of a run (non-recursively) cast to strings."""
        parent_ids = []

        while parent_id := self.parent_map.get(run_id):
            str_parent_id = str(parent_id)
            if str_parent_id in parent_ids:
                msg = (
                    f"Parent ID {parent_id} is already in the parent_ids list. "
                    f"This should never happen."
                )
                raise AssertionError(msg)
            parent_ids.append(str_parent_id)
            run_id = parent_id

        # Return the parent IDs in reverse order, so that the first
        # parent ID is the root and the last ID is the immediate parent.
        return parent_ids[::-1]

    def _send(self, event: StreamEvent, event_type: str) -> None:
        """Send an event to the stream."""
        if self.root_event_filter.include_event(event, event_type):
            self.send_stream.send_nowait(event)

    def __aiter__(self) -> AsyncIterator[Any]:
        """Iterate over the receive stream.

        Returns:
            An async iterator over the receive stream.
        """
        return self.receive_stream.__aiter__()

    async def tap_output_aiter(
        self, run_id: UUID, output: AsyncIterator[T]

Frequently Asked Questions

What is the _AstreamEventsCallbackHandler class?
_AstreamEventsCallbackHandler is a class in the langchain codebase, defined in libs/core/langchain_core/tracers/event_stream.py.
Where is _AstreamEventsCallbackHandler defined?
_AstreamEventsCallbackHandler is defined in libs/core/langchain_core/tracers/event_stream.py at line 101.
What does _AstreamEventsCallbackHandler extend?
_AstreamEventsCallbackHandler extends AsyncCallbackHandler, _StreamingCallbackHandler.

Analyze Your Own Codebase

Get architecture documentation, dependency graphs, and domain analysis for your codebase in minutes.

Try Supermodel Free