Home / Function/ __init__() — langchain Function Reference

__init__() — langchain Function Reference

Architecture documentation for the __init__() function in event_stream.py from the langchain codebase.

Entity Profile

Dependency Diagram

graph TD
  44b96f4e_8456_0f2c_7583_c68fd4c7615f["__init__()"]
  33d093c4_1ed0_fc6a_17c6_762d4c5cfa04["_AstreamEventsCallbackHandler"]
  44b96f4e_8456_0f2c_7583_c68fd4c7615f -->|defined in| 33d093c4_1ed0_fc6a_17c6_762d4c5cfa04
  style 44b96f4e_8456_0f2c_7583_c68fd4c7615f fill:#6366f1,stroke:#818cf8,color:#fff

Relationship Graph

Source Code

libs/core/langchain_core/tracers/event_stream.py lines 104–146

    def __init__(
        self,
        *args: Any,
        include_names: Sequence[str] | None = None,
        include_types: Sequence[str] | None = None,
        include_tags: Sequence[str] | None = None,
        exclude_names: Sequence[str] | None = None,
        exclude_types: Sequence[str] | None = None,
        exclude_tags: Sequence[str] | None = None,
        **kwargs: Any,
    ) -> None:
        """Initialize the tracer."""
        super().__init__(*args, **kwargs)
        # Map of run ID to run info.
        # the entry corresponding to a given run id is cleaned
        # up when each corresponding run ends.
        self.run_map: dict[UUID, RunInfo] = {}
        # The callback event that corresponds to the end of a parent run
        # may be invoked BEFORE the callback event that corresponds to the end
        # of a child run, which results in clean up of run_map.
        # So we keep track of the mapping between children and parent run IDs
        # in a separate container. This container is GCed when the tracer is GCed.
        self.parent_map: dict[UUID, UUID | None] = {}

        self.is_tapped: dict[UUID, Any] = {}

        # Filter which events will be sent over the queue.
        self.root_event_filter = _RootEventFilter(
            include_names=include_names,
            include_types=include_types,
            include_tags=include_tags,
            exclude_names=exclude_names,
            exclude_types=exclude_types,
            exclude_tags=exclude_tags,
        )

        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            loop = asyncio.new_event_loop()
        memory_stream = _MemoryStream[StreamEvent](loop)
        self.send_stream = memory_stream.get_send_stream()
        self.receive_stream = memory_stream.get_receive_stream()

Domain

Subdomains

Frequently Asked Questions

What does __init__() do?
__init__() is a function in the langchain codebase, defined in libs/core/langchain_core/tracers/event_stream.py.
Where is __init__() defined?
__init__() is defined in libs/core/langchain_core/tracers/event_stream.py at line 104.

Analyze Your Own Codebase

Get architecture documentation, dependency graphs, and domain analysis for your codebase in minutes.

Try Supermodel Free