_MemoryStream Class — langchain Architecture

Architecture documentation for the _MemoryStream class in memory_stream.py from the langchain codebase.

Dependency Diagram

graph TD
  37a7aa96_375d_950b_9f76_b361f654e1f7["_MemoryStream"]
  3f0634aa_cb85_70ef_d7c0_8f2e7cb923b8["memory_stream.py"]
  37a7aa96_375d_950b_9f76_b361f654e1f7 -->|defined in| 3f0634aa_cb85_70ef_d7c0_8f2e7cb923b8
  c6142728_bc4b_30fe_d689_8bc25b858e2c["__init__()"]
  37a7aa96_375d_950b_9f76_b361f654e1f7 -->|method| c6142728_bc4b_30fe_d689_8bc25b858e2c
  f6a82c50_70d2_90e3_fe34_e4522879206d["get_send_stream()"]
  37a7aa96_375d_950b_9f76_b361f654e1f7 -->|method| f6a82c50_70d2_90e3_fe34_e4522879206d
  0c2bd967_3a62_c66c_4ea5_bce1a00bf909["get_receive_stream()"]
  37a7aa96_375d_950b_9f76_b361f654e1f7 -->|method| 0c2bd967_3a62_c66c_4ea5_bce1a00bf909

Source Code

libs/core/langchain_core/tracers/memory_stream.py lines 106–148

class _MemoryStream(Generic[T]):
    """Stream data from a writer to a reader even if they are in different threads.

    Uses asyncio queues to communicate between two co-routines. This implementation
    should work even if the writer and reader co-routines belong to two different event
    loops (e.g. one running from an event loop in the main thread and the other running
    in an event loop in a background thread).

    This implementation is meant to be used with a single writer and a single reader.

    This is an internal implementation to LangChain. Do not use it directly.
    """

    def __init__(self, loop: AbstractEventLoop) -> None:
        """Create a channel for the given loop.

        Args:
            loop: The event loop to use for the channel.

                The reader is assumed to be running in the same loop as the one passed
                to this constructor. This will NOT be validated at run time.
        """
        self._loop = loop
        self._queue: asyncio.Queue = asyncio.Queue(maxsize=0)
        self._done = object()

    def get_send_stream(self) -> _SendStream[T]:
        """Get a writer for the channel.

        Returns:
            The writer for the channel.
        """
        return _SendStream[T](
            reader_loop=self._loop, queue=self._queue, done=self._done
        )

    def get_receive_stream(self) -> _ReceiveStream[T]:
        """Get a reader for the channel.

        Returns:
            The reader for the channel.
        """
        return _ReceiveStream[T](queue=self._queue, done=self._done)
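
The pattern described in the docstring (one unbounded asyncio queue, a unique sentinel object marking the end of the stream, and a writer that may live in another thread) can be sketched as follows. This is a minimal illustration, not LangChain's implementation: `SketchChannel`, `send_nowait`, `close`, `receive`, and `run_demo` are hypothetical names chosen for this example, and the use of `loop.call_soon_threadsafe` to hand items to the reader's loop is an assumption about how such a channel could be made thread-safe.

```python
import asyncio
import threading
from typing import Any, AsyncIterator, List


class SketchChannel:
    """Hypothetical stand-in for the pattern above; not LangChain's code."""

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        self._loop = loop  # the reader's event loop
        self._queue: asyncio.Queue = asyncio.Queue(maxsize=0)  # unbounded
        self._done = object()  # unique sentinel marking end of stream

    def send_nowait(self, item: Any) -> None:
        # Callable from any thread: the put is scheduled on the reader's loop.
        self._loop.call_soon_threadsafe(self._queue.put_nowait, item)

    def close(self) -> None:
        # Enqueue the sentinel so the reader knows the stream has ended.
        self._loop.call_soon_threadsafe(self._queue.put_nowait, self._done)

    async def receive(self) -> AsyncIterator[Any]:
        # Yield items until the sentinel arrives.
        while True:
            item = await self._queue.get()
            if item is self._done:
                break
            yield item


def run_demo() -> List[int]:
    async def main() -> List[int]:
        channel = SketchChannel(asyncio.get_running_loop())

        def writer() -> None:
            # Runs in a background thread, outside the reader's loop.
            for i in range(3):
                channel.send_nowait(i)
            channel.close()

        thread = threading.Thread(target=writer)
        thread.start()
        received = [item async for item in channel.receive()]
        thread.join()
        return received

    return asyncio.run(main())
```

Calling `run_demo()` starts a writer thread that sends three integers and then closes the channel, while the reader consumes them inside the event loop. Comparing against a fresh `object()` by identity means no legitimate payload can be mistaken for the end-of-stream marker.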

Frequently Asked Questions

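What is the _MemoryStream class?
_MemoryStream is an internal generic class in the langchain codebase, defined in libs/core/langchain_core/tracers/memory_stream.py. It streams data from a single writer to a single reader over an asyncio queue, even when the two run in different threads or event loops. Its docstring states that it should not be used directly.

Where is _MemoryStream defined?
_MemoryStream is defined in libs/core/langchain_core/tracers/memory_stream.py at line 106.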
