memory_stream.py — langchain Source File
Architecture documentation for memory_stream.py, a python file in the langchain codebase. 3 imports, 0 dependents.
Entity Profile
Dependency Diagram
graph LR f69566c6_3a90_2e39_13c8_07ea98ac4497["memory_stream.py"] a327e534_84f6_5308_58ca_5727d5eda0cf["asyncio"] f69566c6_3a90_2e39_13c8_07ea98ac4497 --> a327e534_84f6_5308_58ca_5727d5eda0cf cfe2bde5_180e_e3b0_df2b_55b3ebaca8e7["collections.abc"] f69566c6_3a90_2e39_13c8_07ea98ac4497 --> cfe2bde5_180e_e3b0_df2b_55b3ebaca8e7 8e2034b7_ceb8_963f_29fc_2ea6b50ef9b3["typing"] f69566c6_3a90_2e39_13c8_07ea98ac4497 --> 8e2034b7_ceb8_963f_29fc_2ea6b50ef9b3 style f69566c6_3a90_2e39_13c8_07ea98ac4497 fill:#6366f1,stroke:#818cf8,color:#fff
Relationship Graph
Source Code
"""Module implements a memory stream for communication between two co-routines.
This module provides a way to communicate between two co-routines using a memory
channel. The writer and reader can be in the same event loop or in different event
loops. When they're in different event loops, they will also be in different threads.
Useful in situations when there's a mix of synchronous and asynchronous used in the
code.
"""
import asyncio
from asyncio import AbstractEventLoop, Queue
from collections.abc import AsyncIterator
from typing import Generic, TypeVar
T = TypeVar("T")
class _SendStream(Generic[T]):
def __init__(
self, reader_loop: AbstractEventLoop, queue: Queue, done: object
) -> None:
"""Create a writer for the queue and done object.
Args:
reader_loop: The event loop to use for the writer.
This loop will be used to schedule the writes to the queue.
queue: The queue to write to.
This is an asyncio queue.
done: Special sentinel object to indicate that the writer is done.
"""
self._reader_loop = reader_loop
self._queue = queue
self._done = done
async def send(self, item: T) -> None:
"""Schedule the item to be written to the queue using the original loop.
This is a coroutine that can be awaited.
Args:
item: The item to write to the queue.
"""
return self.send_nowait(item)
def send_nowait(self, item: T) -> None:
"""Schedule the item to be written to the queue using the original loop.
This is a non-blocking call.
Args:
item: The item to write to the queue.
Raises:
RuntimeError: If the event loop is already closed when trying to write to
the queue.
"""
try:
// ... (89 more lines)
Domain
Subdomains
Dependencies
- asyncio
- collections.abc
- typing
Source
Frequently Asked Questions
What does memory_stream.py do?
memory_stream.py is a source file in the langchain codebase, written in python. It belongs to the CoreAbstractions domain, Serialization subdomain.
What does memory_stream.py depend on?
memory_stream.py imports 3 module(s): asyncio, collections.abc, typing.
Where is memory_stream.py in the architecture?
memory_stream.py is located at libs/core/langchain_core/tracers/memory_stream.py (domain: CoreAbstractions, subdomain: Serialization, directory: libs/core/langchain_core/tracers).
Analyze Your Own Codebase
Get architecture documentation, dependency graphs, and domain analysis for your codebase in minutes.
Try Supermodel Free