_SendStream Class — langchain Architecture
Architecture documentation for the _SendStream class in memory_stream.py from the langchain codebase.
Dependency Diagram
graph TD
    c43da4ed_d8f0_59cc_df49_f7213014b828["_SendStream"]
    3f0634aa_cb85_70ef_d7c0_8f2e7cb923b8["memory_stream.py"]
    c43da4ed_d8f0_59cc_df49_f7213014b828 -->|defined in| 3f0634aa_cb85_70ef_d7c0_8f2e7cb923b8
    f2556559_1880_00e4_1e44_dd692d512f5c["__init__()"]
    c43da4ed_d8f0_59cc_df49_f7213014b828 -->|method| f2556559_1880_00e4_1e44_dd692d512f5c
    4d496fb1_3366_9e01_6adc_a162f9885c9a["send()"]
    c43da4ed_d8f0_59cc_df49_f7213014b828 -->|method| 4d496fb1_3366_9e01_6adc_a162f9885c9a
    798574fd_5f80_cfef_2608_5304808575ca["send_nowait()"]
    c43da4ed_d8f0_59cc_df49_f7213014b828 -->|method| 798574fd_5f80_cfef_2608_5304808575ca
    3328c663_3336_3009_899f_e83481594b28["aclose()"]
    c43da4ed_d8f0_59cc_df49_f7213014b828 -->|method| 3328c663_3336_3009_899f_e83481594b28
    bb5a6b4f_2632_0263_f468_8f47c97e899b["close()"]
    c43da4ed_d8f0_59cc_df49_f7213014b828 -->|method| bb5a6b4f_2632_0263_f468_8f47c97e899b
Source Code
libs/core/langchain_core/tracers/memory_stream.py lines 19–83
class _SendStream(Generic[T]):
    def __init__(
        self, reader_loop: AbstractEventLoop, queue: Queue, done: object
    ) -> None:
        """Create a writer for the queue and done object.

        Args:
            reader_loop: The event loop to use for the writer.
                This loop will be used to schedule the writes to the queue.
            queue: The queue to write to.
                This is an asyncio queue.
            done: Special sentinel object to indicate that the writer is done.
        """
        self._reader_loop = reader_loop
        self._queue = queue
        self._done = done

    async def send(self, item: T) -> None:
        """Schedule the item to be written to the queue using the original loop.

        This is a coroutine that can be awaited.

        Args:
            item: The item to write to the queue.
        """
        return self.send_nowait(item)

    def send_nowait(self, item: T) -> None:
        """Schedule the item to be written to the queue using the original loop.

        This is a non-blocking call.

        Args:
            item: The item to write to the queue.

        Raises:
            RuntimeError: If the event loop is already closed when trying to write to
                the queue.
        """
        try:
            self._reader_loop.call_soon_threadsafe(self._queue.put_nowait, item)
        except RuntimeError:
            if not self._reader_loop.is_closed():
                raise  # Raise the exception if the loop is not closed

    async def aclose(self) -> None:
        """Async schedule the done object write the queue using the original loop."""
        return self.close()

    def close(self) -> None:
        """Schedule the done object write the queue using the original loop.

        This is a non-blocking call.

        Raises:
            RuntimeError: If the event loop is already closed when trying to write to
                the queue.
        """
        try:
            self._reader_loop.call_soon_threadsafe(self._queue.put_nowait, self._done)
        except RuntimeError:
            if not self._reader_loop.is_closed():
                raise  # Raise the exception if the loop is not closed
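The pattern in the source above can be tried out with a small standalone sketch. The class `SendStreamSketch` and the helpers `read_all` and `send_after_close` are hypothetical names introduced here for illustration; they copy the `send_nowait`/`close` logic shown above rather than importing anything from langchain. The sketch assumes a writer running in a separate thread and a reader that drains the queue until it sees the sentinel.

```python
import asyncio
import threading
from asyncio import AbstractEventLoop, Queue


class SendStreamSketch:
    """Hypothetical stand-in mirroring the send_nowait/close logic above."""

    def __init__(self, reader_loop: AbstractEventLoop, queue: Queue, done: object) -> None:
        self._reader_loop = reader_loop
        self._queue = queue
        self._done = done

    def send_nowait(self, item: object) -> None:
        try:
            # Hand the write to the reader's loop; safe to call from any thread.
            self._reader_loop.call_soon_threadsafe(self._queue.put_nowait, item)
        except RuntimeError:
            if not self._reader_loop.is_closed():
                raise  # Only a closed loop is tolerated.

    def close(self) -> None:
        # Closing is just sending the sentinel.
        self.send_nowait(self._done)


async def read_all() -> list:
    """Start a writer thread and collect items until the sentinel arrives."""
    loop = asyncio.get_running_loop()
    queue: Queue = Queue()
    done = object()
    writer = SendStreamSketch(loop, queue, done)

    def produce() -> None:
        for i in range(3):
            writer.send_nowait(i)
        writer.close()

    thread = threading.Thread(target=produce)
    thread.start()

    received = []
    while True:
        item = await queue.get()
        if item is done:
            break
        received.append(item)
    thread.join()
    return received


def send_after_close() -> bool:
    """Return True if a send to a closed loop is dropped without raising."""
    loop = asyncio.new_event_loop()

    async def make_queue() -> Queue:
        return Queue()

    queue = loop.run_until_complete(make_queue())
    loop.close()
    writer = SendStreamSketch(loop, queue, object())
    writer.send_nowait("late item")  # RuntimeError is swallowed: loop is closed.
    return queue.empty()


result = asyncio.run(read_all())
dropped = send_after_close()
```

As the code is written, the `except RuntimeError` branch re-raises only when the reader loop is still open, so a write attempted after the loop has closed is discarded rather than surfaced to the caller.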
Frequently Asked Questions
What is the _SendStream class?
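_SendStream is the writer side of an in-memory stream in the langchain codebase, defined in libs/core/langchain_core/tracers/memory_stream.py. It schedules each item onto an asyncio queue through the reader's event loop using call_soon_threadsafe, and writes a sentinel "done" object to the queue when closed.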
Where is _SendStream defined?
_SendStream is defined in libs/core/langchain_core/tracers/memory_stream.py at line 19.