test_memory_stream.py — langchain Source File

Architecture documentation for test_memory_stream.py, a Python file in the langchain codebase. 5 imports, 0 dependents.

Entity Profile

Dependency Diagram

graph LR
  f5c9b9cd_4c4b_26c8_2419_4e2f783c706c["test_memory_stream.py"]
  a327e534_84f6_5308_58ca_5727d5eda0cf["asyncio"]
  f5c9b9cd_4c4b_26c8_2419_4e2f783c706c --> a327e534_84f6_5308_58ca_5727d5eda0cf
  6d7cdba5_8e52_34b5_6742_57caf6500c80["math"]
  f5c9b9cd_4c4b_26c8_2419_4e2f783c706c --> 6d7cdba5_8e52_34b5_6742_57caf6500c80
  0c1d9a1b_c553_0388_dbc1_58af49567aa2["time"]
  f5c9b9cd_4c4b_26c8_2419_4e2f783c706c --> 0c1d9a1b_c553_0388_dbc1_58af49567aa2
  cfe2bde5_180e_e3b0_df2b_55b3ebaca8e7["collections.abc"]
  f5c9b9cd_4c4b_26c8_2419_4e2f783c706c --> cfe2bde5_180e_e3b0_df2b_55b3ebaca8e7
  dd98e19e_1fed_98d3_ef00_32b32b2e4e5c["langchain_core.tracers.memory_stream"]
  f5c9b9cd_4c4b_26c8_2419_4e2f783c706c --> dd98e19e_1fed_98d3_ef00_32b32b2e4e5c
  style f5c9b9cd_4c4b_26c8_2419_4e2f783c706c fill:#6366f1,stroke:#818cf8,color:#fff

Source Code

import asyncio
import math
import time
from collections.abc import AsyncIterator

from langchain_core.tracers.memory_stream import _MemoryStream


async def test_same_event_loop() -> None:
    """Test that the memory stream works when the same event loop is used.

    This is the easy case.
    """
    reader_loop = asyncio.get_event_loop()
    channel = _MemoryStream[dict](reader_loop)
    writer = channel.get_send_stream()
    reader = channel.get_receive_stream()

    async def producer() -> None:
        """Produce items with slight delay."""
        tic = time.time()
        for i in range(3):
            await asyncio.sleep(0.10)
            toc = time.time()
            await writer.send(
                {
                    "item": i,
                    "produce_time": toc - tic,
                }
            )
        await writer.aclose()

    async def consumer() -> AsyncIterator[dict]:
        tic = time.time()
        async for item in reader:
            toc = time.time()
            yield {
                "receive_time": toc - tic,
                **item,
            }

    producer_task = asyncio.create_task(producer())

    items = [item async for item in consumer()]

    for item in items:
        delta_time = item["receive_time"] - item["produce_time"]
        # Allow a generous 10ms of delay.
        # The test is meant to verify that the producer and consumer run
        # concurrently on the same event loop: each item should be received
        # almost as soon as it is produced.
        # abs_tol is used to allow for some delay in the producer and consumer
        # due to overhead.
        # If the consumer only saw items after the producer had finished,
        # delta_time would be on the order of the producer's total sleep time
        # (0.10 s * 3 items = 300 ms), far beyond the tolerance.
        assert math.isclose(delta_time, 0, abs_tol=0.010) is True, (
            f"delta_time: {delta_time}"
        )

    await producer_task
// ... (81 more lines)
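
The timing check in test_same_event_loop can be tried without langchain installed. The following is a minimal sketch that swaps in a plain asyncio.Queue for _MemoryStream; the name measure_deltas and the sentinel object are hypothetical, and nothing here describes how _MemoryStream is implemented.

```python
import asyncio
import time


async def measure_deltas(n_items: int = 3, delay: float = 0.1) -> list[float]:
    """Return receive_time - produce_time for each item sent through a queue."""
    queue: asyncio.Queue = asyncio.Queue()
    done = object()  # hypothetical end-of-stream sentinel, stands in for aclose()

    async def producer() -> None:
        # Stamp each item with the time elapsed since the producer started.
        tic = time.perf_counter()
        for i in range(n_items):
            await asyncio.sleep(delay)
            await queue.put({"item": i, "produce_time": time.perf_counter() - tic})
        await queue.put(done)

    tic = time.perf_counter()
    task = asyncio.create_task(producer())
    deltas = []
    # Consume items as they arrive; stop when the sentinel is received.
    while (item := await queue.get()) is not done:
        receive_time = time.perf_counter() - tic
        deltas.append(receive_time - item["produce_time"])
    await task
    return deltas
```

Running asyncio.run(measure_deltas()) should yield three small positive deltas when items are delivered as they are produced. If delivery were deferred until the producer finished, the first delta would be roughly (n_items - 1) * delay instead.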

Dependencies

  • asyncio
  • collections.abc
  • langchain_core.tracers.memory_stream
  • math
  • time

Frequently Asked Questions

What does test_memory_stream.py do?
test_memory_stream.py is a Python source file in the langchain codebase that contains unit tests for _MemoryStream from langchain_core.tracers.memory_stream. It belongs to the CoreAbstractions domain, RunnableInterface subdomain.
What functions are defined in test_memory_stream.py?
test_memory_stream.py defines 4 functions: test_closed_stream, test_queue_for_streaming_via_sync_call, test_same_event_loop, test_send_to_closed_stream.
What does test_memory_stream.py depend on?
test_memory_stream.py imports 5 modules: asyncio, collections.abc, langchain_core.tracers.memory_stream, math, time.
Where is test_memory_stream.py in the architecture?
test_memory_stream.py is located at libs/core/tests/unit_tests/tracers/test_memory_stream.py (domain: CoreAbstractions, subdomain: RunnableInterface, directory: libs/core/tests/unit_tests/tracers).
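
The function name test_queue_for_streaming_via_sync_call and the constructor argument reader_loop suggest that the stream is also exercised with a writer outside the reader's event loop, but that test body is not shown in the listing above. As a rough illustration of the general pattern only, the sketch below hands items from a worker thread to a coroutine using the standard library's loop.call_soon_threadsafe. The class ThreadSafeStream and the function collect are hypothetical names, and this is an assumption about the technique, not langchain's code.

```python
import asyncio
import threading


class ThreadSafeStream:
    """Hypothetical stand-in: a queue owned by the reader's event loop."""

    _CLOSED = object()  # sentinel marking the end of the stream

    def __init__(self, reader_loop: asyncio.AbstractEventLoop) -> None:
        self._loop = reader_loop
        self._queue: asyncio.Queue = asyncio.Queue()

    def send_nowait(self, item: object) -> None:
        # Callable from any thread: the put is scheduled on the reader loop.
        self._loop.call_soon_threadsafe(self._queue.put_nowait, item)

    def close(self) -> None:
        self._loop.call_soon_threadsafe(self._queue.put_nowait, self._CLOSED)

    async def __aiter__(self):
        # Yield items until the close sentinel arrives.
        while (item := await self._queue.get()) is not self._CLOSED:
            yield item


async def collect() -> list[int]:
    """Read three items that a worker thread writes into the stream."""
    stream = ThreadSafeStream(asyncio.get_running_loop())

    def worker() -> None:
        for i in range(3):
            stream.send_nowait(i)
        stream.close()

    thread = threading.Thread(target=worker)
    thread.start()
    items = [item async for item in stream]
    thread.join()
    return items
```

Calling asyncio.run(collect()) returns the items in the order the worker sent them, because callbacks scheduled with call_soon_threadsafe run in FIFO order on the reader loop.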
