_MemoryStream Class — langchain Architecture
Architecture documentation for the _MemoryStream class in memory_stream.py from the langchain codebase.
Entity Profile
Dependency Diagram
graph TD 37a7aa96_375d_950b_9f76_b361f654e1f7["_MemoryStream"] 3f0634aa_cb85_70ef_d7c0_8f2e7cb923b8["memory_stream.py"] 37a7aa96_375d_950b_9f76_b361f654e1f7 -->|defined in| 3f0634aa_cb85_70ef_d7c0_8f2e7cb923b8 c6142728_bc4b_30fe_d689_8bc25b858e2c["__init__()"] 37a7aa96_375d_950b_9f76_b361f654e1f7 -->|method| c6142728_bc4b_30fe_d689_8bc25b858e2c f6a82c50_70d2_90e3_fe34_e4522879206d["get_send_stream()"] 37a7aa96_375d_950b_9f76_b361f654e1f7 -->|method| f6a82c50_70d2_90e3_fe34_e4522879206d 0c2bd967_3a62_c66c_4ea5_bce1a00bf909["get_receive_stream()"] 37a7aa96_375d_950b_9f76_b361f654e1f7 -->|method| 0c2bd967_3a62_c66c_4ea5_bce1a00bf909
Relationship Graph
Source Code
libs/core/langchain_core/tracers/memory_stream.py lines 106–148
class _MemoryStream(Generic[T]):
"""Stream data from a writer to a reader even if they are in different threads.
Uses asyncio queues to communicate between two co-routines. This implementation
should work even if the writer and reader co-routines belong to two different event
loops (e.g. one running from an event loop in the main thread and the other running
in an event loop in a background thread).
This implementation is meant to be used with a single writer and a single reader.
This is an internal implementation to LangChain. Do not use it directly.
"""
def __init__(self, loop: AbstractEventLoop) -> None:
"""Create a channel for the given loop.
Args:
loop: The event loop to use for the channel.
The reader is assumed to be running in the same loop as the one passed
to this constructor. This will NOT be validated at run time.
"""
self._loop = loop
self._queue: asyncio.Queue = asyncio.Queue(maxsize=0)
self._done = object()
def get_send_stream(self) -> _SendStream[T]:
"""Get a writer for the channel.
Returns:
The writer for the channel.
"""
return _SendStream[T](
reader_loop=self._loop, queue=self._queue, done=self._done
)
def get_receive_stream(self) -> _ReceiveStream[T]:
"""Get a reader for the channel.
Returns:
The reader for the channel.
"""
return _ReceiveStream[T](queue=self._queue, done=self._done)
Source
Frequently Asked Questions
What is the _MemoryStream class?
_MemoryStream is a class in the langchain codebase, defined in libs/core/langchain_core/tracers/memory_stream.py.
Where is _MemoryStream defined?
_MemoryStream is defined in libs/core/langchain_core/tracers/memory_stream.py at line 106.
Analyze Your Own Codebase
Get architecture documentation, dependency graphs, and domain analysis for your codebase in minutes.
Try Supermodel Free