Home / Function/ test_same_event_loop() — langchain Function Reference

test_same_event_loop() — langchain Function Reference

Architecture documentation for the test_same_event_loop() function in test_memory_stream.py from the langchain codebase.

Entity Profile

Dependency Diagram

graph TD
  835610f6_be96_8a56_3c18_4d383ea3ebe8["test_same_event_loop()"]
  f5c9b9cd_4c4b_26c8_2419_4e2f783c706c["test_memory_stream.py"]
  835610f6_be96_8a56_3c18_4d383ea3ebe8 -->|defined in| f5c9b9cd_4c4b_26c8_2419_4e2f783c706c
  style 835610f6_be96_8a56_3c18_4d383ea3ebe8 fill:#6366f1,stroke:#818cf8,color:#fff

Relationship Graph

Source Code

libs/core/tests/unit_tests/tracers/test_memory_stream.py lines 9–60

async def test_same_event_loop() -> None:
    """Test that the memory stream works when the same event loop is used.

    This is the easy case.
    """
    reader_loop = asyncio.get_event_loop()
    channel = _MemoryStream[dict](reader_loop)
    writer = channel.get_send_stream()
    reader = channel.get_receive_stream()

    async def producer() -> None:
        """Produce items with slight delay."""
        tic = time.time()
        for i in range(3):
            await asyncio.sleep(0.10)
            toc = time.time()
            await writer.send(
                {
                    "item": i,
                    "produce_time": toc - tic,
                }
            )
        await writer.aclose()

    async def consumer() -> AsyncIterator[dict]:
        tic = time.time()
        async for item in reader:
            toc = time.time()
            yield {
                "receive_time": toc - tic,
                **item,
            }

    producer_task = asyncio.create_task(producer())

    items = [item async for item in consumer()]

    for item in items:
        delta_time = item["receive_time"] - item["produce_time"]
        # Allow a generous 10ms of delay
        # The test is meant to verify that the producer and consumer are running in
        # parallel despite the fact that the producer is running from another thread.
        # abs_tol is used to allow for some delay in the producer and consumer
        # due to overhead.
        # To verify that the producer and consumer are running in parallel, we
        # expect the delta_time to be smaller than the sleep delay in the producer
        # * # of items = 30 ms
        assert math.isclose(delta_time, 0, abs_tol=0.010) is True, (
            f"delta_time: {delta_time}"
        )

    await producer_task

Subdomains

Frequently Asked Questions

What does test_same_event_loop() do?
test_same_event_loop() is a function in the langchain codebase, defined in libs/core/tests/unit_tests/tracers/test_memory_stream.py.
Where is test_same_event_loop() defined?
test_same_event_loop() is defined in libs/core/tests/unit_tests/tracers/test_memory_stream.py at line 9.

Analyze Your Own Codebase

Get architecture documentation, dependency graphs, and domain analysis for your codebase in minutes.

Try Supermodel Free