_AstreamEventsCallbackHandler Class — langchain Architecture
Architecture documentation for the _AstreamEventsCallbackHandler class in event_stream.py from the langchain codebase.
Dependency Diagram
graph TD
    7d90a3d0_d943_c43b_2fdd_f76b9bdccd55["_AstreamEventsCallbackHandler"]
    e108f394_9734_a4fd_0bef_19ef9b674d50["AsyncCallbackHandler"]
    7d90a3d0_d943_c43b_2fdd_f76b9bdccd55 -->|extends| e108f394_9734_a4fd_0bef_19ef9b674d50
    9c9ba0ab_539f_aaa1_399f_ec00f912f41f["_StreamingCallbackHandler"]
    7d90a3d0_d943_c43b_2fdd_f76b9bdccd55 -->|extends| 9c9ba0ab_539f_aaa1_399f_ec00f912f41f
    4f8b46c6_9bea_256a_d764_90863752ddf6["event_stream.py"]
    7d90a3d0_d943_c43b_2fdd_f76b9bdccd55 -->|defined in| 4f8b46c6_9bea_256a_d764_90863752ddf6
    35688bfe_32a1_6ae3_4599_0a8dcdb7eccc["__init__()"]
    7d90a3d0_d943_c43b_2fdd_f76b9bdccd55 -->|method| 35688bfe_32a1_6ae3_4599_0a8dcdb7eccc
    2c05b944_8b57_0ec1_2c17_2b3f4c3562c7["_get_parent_ids()"]
    7d90a3d0_d943_c43b_2fdd_f76b9bdccd55 -->|method| 2c05b944_8b57_0ec1_2c17_2b3f4c3562c7
    401affce_c7fe_33fb_1054_b3a67e5e110c["_send()"]
    7d90a3d0_d943_c43b_2fdd_f76b9bdccd55 -->|method| 401affce_c7fe_33fb_1054_b3a67e5e110c
    875b123f_3bb9_808d_36ea_6f338c44659d["__aiter__()"]
    7d90a3d0_d943_c43b_2fdd_f76b9bdccd55 -->|method| 875b123f_3bb9_808d_36ea_6f338c44659d
    eba20684_9ff6_3b4c_db54_51c72958ee21["tap_output_aiter()"]
    7d90a3d0_d943_c43b_2fdd_f76b9bdccd55 -->|method| eba20684_9ff6_3b4c_db54_51c72958ee21
    7e0e4260_c649_7926_5bad_cfb55c871227["tap_output_iter()"]
    7d90a3d0_d943_c43b_2fdd_f76b9bdccd55 -->|method| 7e0e4260_c649_7926_5bad_cfb55c871227
    f3dc5a8a_f29f_d854_e788_e0fa7350b46a["_write_run_start_info()"]
    7d90a3d0_d943_c43b_2fdd_f76b9bdccd55 -->|method| f3dc5a8a_f29f_d854_e788_e0fa7350b46a
    e23900ac_414e_24b8_9643_d43f3281b4a2["on_chat_model_start()"]
    7d90a3d0_d943_c43b_2fdd_f76b9bdccd55 -->|method| e23900ac_414e_24b8_9643_d43f3281b4a2
    095bb696_11f2_5caa_de95_fa70c67c0755["on_llm_start()"]
    7d90a3d0_d943_c43b_2fdd_f76b9bdccd55 -->|method| 095bb696_11f2_5caa_de95_fa70c67c0755
    5ce0066a_597b_8984_5257_86b03f333f12["on_custom_event()"]
    7d90a3d0_d943_c43b_2fdd_f76b9bdccd55 -->|method| 5ce0066a_597b_8984_5257_86b03f333f12
    5f307e5b_1e60_af07_da9e_835f68af6b95["on_llm_new_token()"]
    7d90a3d0_d943_c43b_2fdd_f76b9bdccd55 -->|method| 5f307e5b_1e60_af07_da9e_835f68af6b95
Source Code
libs/core/langchain_core/tracers/event_stream.py lines 101–824
class _AstreamEventsCallbackHandler(AsyncCallbackHandler, _StreamingCallbackHandler):
    """An implementation of an async callback handler for astream events."""

    def __init__(
        self,
        *args: Any,
        include_names: Sequence[str] | None = None,
        include_types: Sequence[str] | None = None,
        include_tags: Sequence[str] | None = None,
        exclude_names: Sequence[str] | None = None,
        exclude_types: Sequence[str] | None = None,
        exclude_tags: Sequence[str] | None = None,
        **kwargs: Any,
    ) -> None:
        """Initialize the tracer."""
        super().__init__(*args, **kwargs)
        # Map of run ID to run info.
        # the entry corresponding to a given run id is cleaned
        # up when each corresponding run ends.
        self.run_map: dict[UUID, RunInfo] = {}
        # The callback event that corresponds to the end of a parent run
        # may be invoked BEFORE the callback event that corresponds to the end
        # of a child run, which results in clean up of run_map.
        # So we keep track of the mapping between children and parent run IDs
        # in a separate container. This container is GCed when the tracer is GCed.
        self.parent_map: dict[UUID, UUID | None] = {}

        self.is_tapped: dict[UUID, Any] = {}

        # Filter which events will be sent over the queue.
        self.root_event_filter = _RootEventFilter(
            include_names=include_names,
            include_types=include_types,
            include_tags=include_tags,
            exclude_names=exclude_names,
            exclude_types=exclude_types,
            exclude_tags=exclude_tags,
        )

        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            loop = asyncio.new_event_loop()
        memory_stream = _MemoryStream[StreamEvent](loop)
        self.send_stream = memory_stream.get_send_stream()
        self.receive_stream = memory_stream.get_receive_stream()

    def _get_parent_ids(self, run_id: UUID) -> list[str]:
        """Get the parent IDs of a run (non-recursively) cast to strings."""
        parent_ids = []

        while parent_id := self.parent_map.get(run_id):
            str_parent_id = str(parent_id)
            if str_parent_id in parent_ids:
                msg = (
                    f"Parent ID {parent_id} is already in the parent_ids list. "
                    f"This should never happen."
                )
                raise AssertionError(msg)
            parent_ids.append(str_parent_id)
            run_id = parent_id

        # Return the parent IDs in reverse order, so that the first
        # parent ID is the root and the last ID is the immediate parent.
        return parent_ids[::-1]

    def _send(self, event: StreamEvent, event_type: str) -> None:
        """Send an event to the stream."""
        if self.root_event_filter.include_event(event, event_type):
            self.send_stream.send_nowait(event)

    def __aiter__(self) -> AsyncIterator[Any]:
        """Iterate over the receive stream.

        Returns:
            An async iterator over the receive stream.
        """
        return self.receive_stream.__aiter__()

    async def tap_output_aiter(
        self, run_id: UUID, output: AsyncIterator[T]
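The parent-chain walk in `_get_parent_ids` can be tried in isolation. The sketch below is not the library's code: it copies the loop from the excerpt above into a free function, `get_parent_ids`, that takes a plain `parent_map` dictionary as an argument (both the function name and the sample run IDs are assumptions made for illustration). It shows that ancestors come back root first, and that a root run yields an empty list.

```python
from __future__ import annotations

from uuid import UUID, uuid4


def get_parent_ids(parent_map: dict[UUID, UUID | None], run_id: UUID) -> list[str]:
    """Walk child -> parent links and return ancestor IDs as strings, root first."""
    parent_ids: list[str] = []
    # The loop stops when a run has no entry in the map or maps to None (a root run).
    while parent_id := parent_map.get(run_id):
        str_parent_id = str(parent_id)
        if str_parent_id in parent_ids:
            # A repeated ID would mean the map contains a cycle.
            raise AssertionError(f"Parent ID {parent_id} is already in the list.")
        parent_ids.append(str_parent_id)
        run_id = parent_id
    # Collected nearest-parent first, so reverse to get root first.
    return parent_ids[::-1]


# Hypothetical three-level run tree: root -> child -> grandchild.
root, child, grandchild = uuid4(), uuid4(), uuid4()
parent_map = {root: None, child: root, grandchild: child}
ancestors = get_parent_ids(parent_map, grandchild)
```

Because the map is kept separately from `run_map`, the walk still works after a parent's run info has been cleaned up, which is the situation the comments in `__init__` describe.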
Frequently Asked Questions
What is the _AstreamEventsCallbackHandler class?
_AstreamEventsCallbackHandler is an async callback handler for astream events in the langchain codebase, defined in libs/core/langchain_core/tracers/event_stream.py.
Where is _AstreamEventsCallbackHandler defined?
_AstreamEventsCallbackHandler is defined in libs/core/langchain_core/tracers/event_stream.py at line 101.
What does _AstreamEventsCallbackHandler extend?
_AstreamEventsCallbackHandler extends AsyncCallbackHandler and _StreamingCallbackHandler.
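As a rough illustration of the filter-then-send pattern used by `_send` in the source excerpt, the sketch below uses only the standard library. Every name in it is hypothetical: `NameFilter` is a much-simplified stand-in for `_RootEventFilter` (its name-only include/exclude semantics are an assumption, not the library's behaviour), and an `asyncio.Queue` is swapped in for the `_MemoryStream` send and receive streams.

```python
from __future__ import annotations

import asyncio
from typing import Any, Sequence


class NameFilter:
    """Hypothetical, name-only stand-in for _RootEventFilter (semantics assumed)."""

    def __init__(
        self,
        include_names: Sequence[str] | None = None,
        exclude_names: Sequence[str] | None = None,
    ) -> None:
        self.include_names = include_names
        self.exclude_names = exclude_names

    def include_event(self, event: dict[str, Any], event_type: str) -> bool:
        # With no include list, everything passes; excludes are applied afterwards.
        include = self.include_names is None or event["name"] in self.include_names
        if self.exclude_names is not None:
            include = include and event["name"] not in self.exclude_names
        return include


class EventSink:
    """Hypothetical sink: events that pass the filter are queued without blocking."""

    def __init__(self, event_filter: NameFilter) -> None:
        self.event_filter = event_filter
        self.queue: asyncio.Queue[dict[str, Any]] = asyncio.Queue()

    def send(self, event: dict[str, Any], event_type: str) -> None:
        if self.event_filter.include_event(event, event_type):
            self.queue.put_nowait(event)


async def demo() -> list[str]:
    sink = EventSink(NameFilter(exclude_names=["noisy"]))
    sink.send({"event": "on_llm_start", "name": "model"}, "llm")
    sink.send({"event": "on_chain_start", "name": "noisy"}, "chain")
    received = []
    while not sink.queue.empty():
        received.append((await sink.queue.get())["name"])
    return received


received = asyncio.run(demo())
```

Filtering on the producer side means excluded events never occupy the queue, so the consumer only iterates over events it asked for.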