test_queue_for_streaming_via_sync_call() — langchain Function Reference
Architecture documentation for the test_queue_for_streaming_via_sync_call() function in test_memory_stream.py from the langchain codebase.
Entity Profile
Dependency Diagram
graph TD f80a9034_e10b_daaa_c101_d3b154354d57["test_queue_for_streaming_via_sync_call()"] f5c9b9cd_4c4b_26c8_2419_4e2f783c706c["test_memory_stream.py"] f80a9034_e10b_daaa_c101_d3b154354d57 -->|defined in| f5c9b9cd_4c4b_26c8_2419_4e2f783c706c style f80a9034_e10b_daaa_c101_d3b154354d57 fill:#6366f1,stroke:#818cf8,color:#fff
Relationship Graph
Source Code
libs/core/tests/unit_tests/tracers/test_memory_stream.py lines 63–112
async def test_queue_for_streaming_via_sync_call() -> None:
"""Test via async -> sync -> async path."""
reader_loop = asyncio.get_event_loop()
channel = _MemoryStream[dict](reader_loop)
writer = channel.get_send_stream()
reader = channel.get_receive_stream()
async def producer() -> None:
"""Produce items with slight delay."""
tic = time.time()
for i in range(3):
await asyncio.sleep(0.2)
toc = time.time()
await writer.send(
{
"item": i,
"produce_time": toc - tic,
}
)
await writer.aclose()
def sync_call() -> None:
"""Blocking sync call."""
asyncio.run(producer())
async def consumer() -> AsyncIterator[dict]:
tic = time.time()
async for item in reader:
toc = time.time()
yield {
"receive_time": toc - tic,
**item,
}
task = asyncio.create_task(asyncio.to_thread(sync_call))
items = [item async for item in consumer()]
await task
assert len(items) == 3
for item in items:
delta_time = item["receive_time"] - item["produce_time"]
# The test verifies that the producer and consumer are running in parallel
# despite the producer running from another thread via asyncio.to_thread.
# Cross-thread communication has overhead that varies with system load,
# so we use a tolerance of 150ms. This still proves parallelism because
# serial execution would show deltas of 200ms+ (the sleep interval).
assert math.isclose(delta_time, 0, abs_tol=0.15) is True, (
f"delta_time: {delta_time}"
)
Domain
Subdomains
Source
Frequently Asked Questions
What does test_queue_for_streaming_via_sync_call() do?
test_queue_for_streaming_via_sync_call() is a function in the langchain codebase, defined in libs/core/tests/unit_tests/tracers/test_memory_stream.py.
Where is test_queue_for_streaming_via_sync_call() defined?
test_queue_for_streaming_via_sync_call() is defined in libs/core/tests/unit_tests/tracers/test_memory_stream.py at line 63.
Analyze Your Own Codebase
Get architecture documentation, dependency graphs, and domain analysis for your codebase in minutes.
Try Supermodel Free