test_async_in_async_stream_lambdas() — langchain Function Reference

Architecture documentation for the test_async_in_async_stream_lambdas() function in test_runnable_events_v1.py from the langchain codebase.

Dependency Diagram

graph TD
  42c7bb2c_e81f_aadf_bd51_21cc88d5eb32["test_async_in_async_stream_lambdas()"]
  8ff41f3c_f250_f8de_8094_4f24860a10e0["test_runnable_events_v1.py"]
  42c7bb2c_e81f_aadf_bd51_21cc88d5eb32 -->|defined in| 8ff41f3c_f250_f8de_8094_4f24860a10e0
  aac0453e_34cd_4dce_c7fa_f176ab20140b["_collect_events()"]
  42c7bb2c_e81f_aadf_bd51_21cc88d5eb32 -->|calls| aac0453e_34cd_4dce_c7fa_f176ab20140b
  6ebe7fde_6e8f_dae5_d42f_9cea181617f5["_assert_events_equal_allow_superset_metadata()"]
  42c7bb2c_e81f_aadf_bd51_21cc88d5eb32 -->|calls| 6ebe7fde_6e8f_dae5_d42f_9cea181617f5
  style 42c7bb2c_e81f_aadf_bd51_21cc88d5eb32 fill:#6366f1,stroke:#818cf8,color:#fff

Source Code

libs/core/tests/unit_tests/runnables/test_runnable_events_v1.py lines 2117–2134

async def test_async_in_async_stream_lambdas() -> None:
    """Test invoking nested runnable lambda."""

    async def add_one(x: int) -> int:
        return x + 1

    add_one_ = RunnableLambda(add_one)

    async def add_one_proxy(x: int, config: RunnableConfig) -> int:
        # Use async streaming (astream) from inside the async lambda
        streaming = add_one_.astream(x, config)
        results = [result async for result in streaming]
        return results[0]

    add_one_proxy_ = RunnableLambda[int, int](add_one_proxy)

    events = await _collect_events(add_one_proxy_.astream_events(1, version="v1"))
    _assert_events_equal_allow_superset_metadata(events, EXPECTED_EVENTS)
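The nesting pattern in this test (one coroutine draining an async stream produced by another) can be illustrated without langchain. The following is a minimal sketch using only the standard library: `collect` is a hypothetical stand-in for `_collect_events`, whose real implementation is not shown in this excerpt and may do more, and `add_one_stream` is an assumed plain async generator standing in for `add_one_.astream(x, config)`. It emits no events and is not the library's implementation.

```python
import asyncio
from collections.abc import AsyncIterator
from typing import TypeVar

T = TypeVar("T")


async def collect(stream: AsyncIterator[T]) -> list[T]:
    """Drain an async iterator into a list.

    Hypothetical stand-in for _collect_events; the real helper may differ.
    """
    return [item async for item in stream]


async def add_one_stream(x: int) -> AsyncIterator[int]:
    """Async generator standing in for add_one_.astream(x, config)."""
    yield x + 1


async def add_one_proxy(x: int) -> int:
    """Consume the inner stream from inside another coroutine,
    then return the first (and only) chunk, as the test's proxy does."""
    results = await collect(add_one_stream(x))
    return results[0]


# Run the outer coroutine to completion on a fresh event loop.
result = asyncio.run(add_one_proxy(1))
```

In the real test the outer call is `astream_events(1, version="v1")`, so the value under test is the list of emitted events rather than the integer result.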

Frequently Asked Questions

What does test_async_in_async_stream_lambdas() do?
test_async_in_async_stream_lambdas() is an async unit test in the langchain codebase. It wraps an async add_one function in a RunnableLambda, consumes it through astream() from inside a second async lambda (add_one_proxy), and checks that the events emitted by astream_events(1, version="v1") match EXPECTED_EVENTS, which is defined outside the excerpt shown above.

Where is test_async_in_async_stream_lambdas() defined?
test_async_in_async_stream_lambdas() is defined in libs/core/tests/unit_tests/runnables/test_runnable_events_v1.py at line 2117.

What does test_async_in_async_stream_lambdas() call?
test_async_in_async_stream_lambdas() calls two functions: _assert_events_equal_allow_superset_metadata and _collect_events.
