test_queue_for_streaming_via_sync_call() — langchain Function Reference

Architecture documentation for the test_queue_for_streaming_via_sync_call() function in test_memory_stream.py from the langchain codebase.

Dependency Diagram

graph TD
  f80a9034_e10b_daaa_c101_d3b154354d57["test_queue_for_streaming_via_sync_call()"]
  f5c9b9cd_4c4b_26c8_2419_4e2f783c706c["test_memory_stream.py"]
  f80a9034_e10b_daaa_c101_d3b154354d57 -->|defined in| f5c9b9cd_4c4b_26c8_2419_4e2f783c706c
  style f80a9034_e10b_daaa_c101_d3b154354d57 fill:#6366f1,stroke:#818cf8,color:#fff

Source Code

libs/core/tests/unit_tests/tracers/test_memory_stream.py lines 63–112

async def test_queue_for_streaming_via_sync_call() -> None:
    """Test via async -> sync -> async path."""
    reader_loop = asyncio.get_event_loop()
    channel = _MemoryStream[dict](reader_loop)
    writer = channel.get_send_stream()
    reader = channel.get_receive_stream()

    async def producer() -> None:
        """Produce items with slight delay."""
        tic = time.time()
        for i in range(3):
            await asyncio.sleep(0.2)
            toc = time.time()
            await writer.send(
                {
                    "item": i,
                    "produce_time": toc - tic,
                }
            )
        await writer.aclose()

    def sync_call() -> None:
        """Blocking sync call."""
        asyncio.run(producer())

    async def consumer() -> AsyncIterator[dict]:
        tic = time.time()
        async for item in reader:
            toc = time.time()
            yield {
                "receive_time": toc - tic,
                **item,
            }

    task = asyncio.create_task(asyncio.to_thread(sync_call))
    items = [item async for item in consumer()]
    await task

    assert len(items) == 3

    for item in items:
        delta_time = item["receive_time"] - item["produce_time"]
        # The test verifies that the producer and consumer are running in parallel
        # despite the producer running from another thread via asyncio.to_thread.
        # Cross-thread communication has overhead that varies with system load,
        # so we use a tolerance of 150ms. This still proves parallelism because
        # serial execution would show deltas of 200ms+ (the sleep interval).
        assert math.isclose(delta_time, 0, abs_tol=0.15) is True, (
            f"delta_time: {delta_time}"
        )
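
The cross-thread handoff this test exercises can be illustrated with the standard library alone. The block below is a minimal sketch, not the _MemoryStream implementation: it assumes a stream of this kind can be modeled as an asyncio.Queue owned by the reader's event loop, with the writer scheduling each put through loop.call_soon_threadsafe. The names send and done are hypothetical, and the sleep interval is shortened to 50 ms to keep the example quick.

```python
import asyncio
import time


async def main() -> list:
    # The consumer's loop owns the queue. asyncio.Queue is not thread-safe,
    # so other threads must not call its methods directly.
    reader_loop = asyncio.get_running_loop()
    queue: asyncio.Queue = asyncio.Queue()
    done = object()  # hypothetical sentinel marking the end of the stream
    start = time.monotonic()

    def send(item: object) -> None:
        # Schedule the put on the reader's loop so it runs in the reader's thread.
        reader_loop.call_soon_threadsafe(queue.put_nowait, item)

    async def producer() -> None:
        # Runs on a second event loop inside the worker thread.
        for i in range(3):
            await asyncio.sleep(0.05)
            send({"item": i, "produce_time": time.monotonic() - start})
        send(done)

    def sync_call() -> None:
        # Blocking call: starts a fresh event loop for the producer.
        asyncio.run(producer())

    # async -> sync -> async: the worker thread runs while this loop keeps reading.
    task = asyncio.create_task(asyncio.to_thread(sync_call))
    items = []
    while (item := await queue.get()) is not done:
        items.append({"receive_time": time.monotonic() - start, **item})
    await task
    return items


items = asyncio.run(main())
for item in items:
    print(item["item"], round(item["receive_time"] - item["produce_time"], 4))
```

If the producer and consumer ran one after the other instead of concurrently, the consumer would not see the first item until the producer had finished, and the gap between produce_time and receive_time would grow to at least one sleep interval. That gap is what the tolerance check in the test above is designed to catch.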

Frequently Asked Questions

What does test_queue_for_streaming_via_sync_call() do?
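test_queue_for_streaming_via_sync_call() is an async unit test in the langchain codebase, defined in libs/core/tests/unit_tests/tracers/test_memory_stream.py. It exercises the async -> sync -> async path of _MemoryStream: a producer runs on its own event loop inside a worker thread (asyncio.run called through asyncio.to_thread) and sends three items at 0.2 s intervals, while a consumer on the original loop reads them. The test asserts that all three items arrive and that each is received within 150 ms of being produced, which shows that the producer and consumer run concurrently.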
Where is test_queue_for_streaming_via_sync_call() defined?
test_queue_for_streaming_via_sync_call() is defined in libs/core/tests/unit_tests/tracers/test_memory_stream.py at line 63.
