test_memory_stream.py — langchain Source File
Architecture documentation for test_memory_stream.py, a python file in the langchain codebase. 5 imports, 0 dependents.
Entity Profile
Dependency Diagram
graph LR f5c9b9cd_4c4b_26c8_2419_4e2f783c706c["test_memory_stream.py"] a327e534_84f6_5308_58ca_5727d5eda0cf["asyncio"] f5c9b9cd_4c4b_26c8_2419_4e2f783c706c --> a327e534_84f6_5308_58ca_5727d5eda0cf 6d7cdba5_8e52_34b5_6742_57caf6500c80["math"] f5c9b9cd_4c4b_26c8_2419_4e2f783c706c --> 6d7cdba5_8e52_34b5_6742_57caf6500c80 0c1d9a1b_c553_0388_dbc1_58af49567aa2["time"] f5c9b9cd_4c4b_26c8_2419_4e2f783c706c --> 0c1d9a1b_c553_0388_dbc1_58af49567aa2 cfe2bde5_180e_e3b0_df2b_55b3ebaca8e7["collections.abc"] f5c9b9cd_4c4b_26c8_2419_4e2f783c706c --> cfe2bde5_180e_e3b0_df2b_55b3ebaca8e7 dd98e19e_1fed_98d3_ef00_32b32b2e4e5c["langchain_core.tracers.memory_stream"] f5c9b9cd_4c4b_26c8_2419_4e2f783c706c --> dd98e19e_1fed_98d3_ef00_32b32b2e4e5c style f5c9b9cd_4c4b_26c8_2419_4e2f783c706c fill:#6366f1,stroke:#818cf8,color:#fff
Relationship Graph
Source Code
import asyncio
import math
import time
from collections.abc import AsyncIterator
from langchain_core.tracers.memory_stream import _MemoryStream
async def test_same_event_loop() -> None:
"""Test that the memory stream works when the same event loop is used.
This is the easy case.
"""
reader_loop = asyncio.get_event_loop()
channel = _MemoryStream[dict](reader_loop)
writer = channel.get_send_stream()
reader = channel.get_receive_stream()
async def producer() -> None:
"""Produce items with slight delay."""
tic = time.time()
for i in range(3):
await asyncio.sleep(0.10)
toc = time.time()
await writer.send(
{
"item": i,
"produce_time": toc - tic,
}
)
await writer.aclose()
async def consumer() -> AsyncIterator[dict]:
tic = time.time()
async for item in reader:
toc = time.time()
yield {
"receive_time": toc - tic,
**item,
}
producer_task = asyncio.create_task(producer())
items = [item async for item in consumer()]
for item in items:
delta_time = item["receive_time"] - item["produce_time"]
# Allow a generous 10ms of delay
# The test is meant to verify that the producer and consumer are running in
# parallel despite the fact that the producer is running from another thread.
# abs_tol is used to allow for some delay in the producer and consumer
# due to overhead.
# To verify that the producer and consumer are running in parallel, we
# expect the delta_time to be smaller than the sleep delay in the producer
# * # of items = 30 ms
assert math.isclose(delta_time, 0, abs_tol=0.010) is True, (
f"delta_time: {delta_time}"
)
await producer_task
// ... (81 more lines)
Domain
Subdomains
Functions
Dependencies
- asyncio
- collections.abc
- langchain_core.tracers.memory_stream
- math
- time
Source
Frequently Asked Questions
What does test_memory_stream.py do?
test_memory_stream.py is a source file in the langchain codebase, written in python. It belongs to the CoreAbstractions domain, RunnableInterface subdomain.
What functions are defined in test_memory_stream.py?
test_memory_stream.py defines 4 function(s): test_closed_stream, test_queue_for_streaming_via_sync_call, test_same_event_loop, test_send_to_closed_stream.
What does test_memory_stream.py depend on?
test_memory_stream.py imports 5 module(s): asyncio, collections.abc, langchain_core.tracers.memory_stream, math, time.
Where is test_memory_stream.py in the architecture?
test_memory_stream.py is located at libs/core/tests/unit_tests/tracers/test_memory_stream.py (domain: CoreAbstractions, subdomain: RunnableInterface, directory: libs/core/tests/unit_tests/tracers).
Analyze Your Own Codebase
Get architecture documentation, dependency graphs, and domain analysis for your codebase in minutes.
Try Supermodel Free