_SendStream Class — langchain Architecture

Architecture documentation for the _SendStream class in memory_stream.py from the langchain codebase.

Dependency Diagram

graph TD
  c43da4ed_d8f0_59cc_df49_f7213014b828["_SendStream"]
  3f0634aa_cb85_70ef_d7c0_8f2e7cb923b8["memory_stream.py"]
  c43da4ed_d8f0_59cc_df49_f7213014b828 -->|defined in| 3f0634aa_cb85_70ef_d7c0_8f2e7cb923b8
  f2556559_1880_00e4_1e44_dd692d512f5c["__init__()"]
  c43da4ed_d8f0_59cc_df49_f7213014b828 -->|method| f2556559_1880_00e4_1e44_dd692d512f5c
  4d496fb1_3366_9e01_6adc_a162f9885c9a["send()"]
  c43da4ed_d8f0_59cc_df49_f7213014b828 -->|method| 4d496fb1_3366_9e01_6adc_a162f9885c9a
  798574fd_5f80_cfef_2608_5304808575ca["send_nowait()"]
  c43da4ed_d8f0_59cc_df49_f7213014b828 -->|method| 798574fd_5f80_cfef_2608_5304808575ca
  3328c663_3336_3009_899f_e83481594b28["aclose()"]
  c43da4ed_d8f0_59cc_df49_f7213014b828 -->|method| 3328c663_3336_3009_899f_e83481594b28
  bb5a6b4f_2632_0263_f468_8f47c97e899b["close()"]
  c43da4ed_d8f0_59cc_df49_f7213014b828 -->|method| bb5a6b4f_2632_0263_f468_8f47c97e899b

Source Code

libs/core/langchain_core/tracers/memory_stream.py lines 19–83

class _SendStream(Generic[T]):
    def __init__(
        self, reader_loop: AbstractEventLoop, queue: Queue, done: object
    ) -> None:
        """Create a writer for the queue and done object.

        Args:
            reader_loop: The event loop to use for the writer.

                This loop will be used to schedule the writes to the queue.
            queue: The queue to write to.

                This is an asyncio queue.
            done: Special sentinel object to indicate that the writer is done.
        """
        self._reader_loop = reader_loop
        self._queue = queue
        self._done = done

    async def send(self, item: T) -> None:
        """Schedule the item to be written to the queue using the original loop.

        This is a coroutine that can be awaited.

        Args:
            item: The item to write to the queue.
        """
        return self.send_nowait(item)

    def send_nowait(self, item: T) -> None:
        """Schedule the item to be written to the queue using the original loop.

        This is a non-blocking call.

        Args:
            item: The item to write to the queue.

        Raises:
            RuntimeError: If scheduling the write fails while the reader loop is
                still open. The error is suppressed if the loop is already closed.
        """
        try:
            self._reader_loop.call_soon_threadsafe(self._queue.put_nowait, item)
        except RuntimeError:
            if not self._reader_loop.is_closed():
                raise  # Raise the exception if the loop is not closed

    async def aclose(self) -> None:
        """Schedule the done object to be written to the queue using the original loop.

        This is a coroutine that can be awaited.
        """
        return self.close()

    def close(self) -> None:
        """Schedule the done object to be written to the queue using the original loop.

        This is a non-blocking call.

        Raises:
            RuntimeError: If scheduling the write fails while the reader loop is
                still open. The error is suppressed if the loop is already closed.
        """
        try:
            self._reader_loop.call_soon_threadsafe(self._queue.put_nowait, self._done)
        except RuntimeError:
            if not self._reader_loop.is_closed():
                raise  # Raise the exception if the loop is not closed
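
The cross-thread write pattern used by send_nowait() and close() can be sketched with the standard library alone. This is a minimal illustration, not the langchain implementation: SketchSendStream, DONE, consume, and run_demo are hypothetical names, and the reader side shown here is an assumption about how a consumer might drain the queue until it sees the sentinel.

```python
import asyncio
import threading
from typing import Any, List

DONE = object()  # hypothetical sentinel standing in for the `done` argument


class SketchSendStream:
    """Illustrative stand-in mirroring send_nowait() and close() above."""

    def __init__(
        self, reader_loop: asyncio.AbstractEventLoop, queue: asyncio.Queue, done: object
    ) -> None:
        self._reader_loop = reader_loop
        self._queue = queue
        self._done = done

    def send_nowait(self, item: Any) -> None:
        # Hand the write to the reader's loop instead of touching the queue
        # directly; asyncio.Queue itself is not thread-safe.
        try:
            self._reader_loop.call_soon_threadsafe(self._queue.put_nowait, item)
        except RuntimeError:
            if not self._reader_loop.is_closed():
                raise

    def close(self) -> None:
        # Closing is just one more write: the sentinel marks end of stream.
        self.send_nowait(self._done)


async def consume(queue: asyncio.Queue, done: object) -> List[Any]:
    """Assumed reader: collect items until the sentinel arrives."""
    items: List[Any] = []
    while True:
        item = await queue.get()
        if item is done:
            return items
        items.append(item)


def run_demo() -> List[Any]:
    async def main() -> List[Any]:
        queue: asyncio.Queue = asyncio.Queue()
        writer = SketchSendStream(asyncio.get_running_loop(), queue, DONE)

        def produce() -> None:
            # Runs in a worker thread, outside the reader's event loop.
            for i in range(3):
                writer.send_nowait(i)
            writer.close()

        thread = threading.Thread(target=produce)
        thread.start()
        items = await consume(queue, DONE)
        thread.join()
        return items

    return asyncio.run(main())
```

Callbacks scheduled with call_soon_threadsafe run on the reader loop in the order they were submitted, so the reader sees the items in send order, followed by the sentinel.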

Frequently Asked Questions

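What is the _SendStream class?
_SendStream is a generic class (Generic[T]) in the langchain codebase, defined in libs/core/langchain_core/tracers/memory_stream.py. It is the writer side of an in-memory stream: send_nowait() and close() use call_soon_threadsafe to schedule writes to an asyncio queue on the reader's event loop, and close() writes a sentinel object to signal that the writer is done.

Where is _SendStream defined?
_SendStream is defined in libs/core/langchain_core/tracers/memory_stream.py, starting at line 19 (the class spans lines 19–83).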
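
The error handling in send_nowait() and close() can be illustrated in isolation. The sketch below repeats the same try/except shape against a loop that has already been closed. The function name send_after_close is hypothetical, and a plain list stands in for the queue so the example does not depend on how asyncio.Queue binds to a loop.

```python
import asyncio
from typing import List, Tuple


def send_after_close() -> Tuple[bool, List[str]]:
    """Try to schedule a write on a closed loop, as a late writer might."""
    loop = asyncio.new_event_loop()
    received: List[str] = []  # stand-in for the queue
    loop.close()

    suppressed = False
    try:
        # A closed loop rejects new callbacks with RuntimeError.
        loop.call_soon_threadsafe(received.append, "late item")
    except RuntimeError:
        if not loop.is_closed():
            raise  # Unexpected failure on a live loop: propagate it
        suppressed = True  # The reader is gone, so the write is dropped

    return suppressed, received
```

Under these assumptions the write is silently dropped: the RuntimeError is swallowed because the loop is closed, and nothing reaches the stand-in queue. A RuntimeError raised while the loop is still open would propagate to the caller.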
