test_async_in_async_stream_lambdas() — langchain Function Reference

Architecture documentation for the test_async_in_async_stream_lambdas() function in test_runnable_events_v2.py from the langchain codebase.

Dependency Diagram

graph TD
  ee416508_9e2a_a50d_f6d0_f03000a99329["test_async_in_async_stream_lambdas()"]
  33c02978_2077_5819_7048_bc2a81e80625["test_runnable_events_v2.py"]
  ee416508_9e2a_a50d_f6d0_f03000a99329 -->|defined in| 33c02978_2077_5819_7048_bc2a81e80625
  858b85ba_6d9e_d209_9eb5_2469bad0adca["astream()"]
  ee416508_9e2a_a50d_f6d0_f03000a99329 -->|calls| 858b85ba_6d9e_d209_9eb5_2469bad0adca
  716d2a5e_dc8e_3cae_e044_b56b06bee655["_collect_events()"]
  ee416508_9e2a_a50d_f6d0_f03000a99329 -->|calls| 716d2a5e_dc8e_3cae_e044_b56b06bee655
  style ee416508_9e2a_a50d_f6d0_f03000a99329 fill:#6366f1,stroke:#818cf8,color:#fff

Source Code

libs/core/tests/unit_tests/runnables/test_runnable_events_v2.py lines 2081–2098

async def test_async_in_async_stream_lambdas() -> None:
    """Test invoking nested runnable lambda."""

    async def add_one(x: int) -> int:
        return x + 1

    add_one_ = RunnableLambda(add_one)

    async def add_one_proxy(x: int, config: RunnableConfig) -> int:
        # Use async streaming: astream() returns an async iterator
        streaming = add_one_.astream(x, config)
        results = [result async for result in streaming]
        return results[0]

    add_one_proxy_ = RunnableLambda[int, int](add_one_proxy)

    events = await _collect_events(add_one_proxy_.astream_events(1, version="v2"))
    _assert_events_equal_allow_superset_metadata(events, EXPECTED_EVENTS)
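
The control flow of the test (an async function that drains an inner async stream and returns its first chunk, which is then itself consumed as a stream) can be sketched with the standard library alone. This is a minimal illustration, not langchain code: `add_one_stream`, `proxy_stream`, and `collect` are hypothetical stand-ins for `add_one_.astream()`, `add_one_proxy_.astream_events()`, and `_collect_events()`, and the sketch yields plain integers rather than event dictionaries.

```python
import asyncio
from typing import AsyncIterator, List


async def add_one_stream(x: int) -> AsyncIterator[int]:
    """Hypothetical stand-in for add_one_.astream(x, config): yields a single chunk."""
    yield x + 1


async def add_one_proxy(x: int) -> int:
    # Drain the inner async stream from inside another coroutine,
    # mirroring the list comprehension used in the test.
    results = [result async for result in add_one_stream(x)]
    return results[0]


async def proxy_stream(x: int) -> AsyncIterator[int]:
    """Hypothetical outer stream: emits the proxy's result as one chunk."""
    yield await add_one_proxy(x)


async def collect(stream: AsyncIterator[int]) -> List[int]:
    """Hypothetical analogue of _collect_events: gather every item into a list."""
    return [item async for item in stream]


def run_demo() -> List[int]:
    # asyncio.run drives the outer stream to completion.
    return asyncio.run(collect(proxy_stream(1)))
```

Calling `run_demo()` drives both layers to completion. The real test does the same with `RunnableLambda` objects and compares the collected events against `EXPECTED_EVENTS` instead of comparing plain values.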

Frequently Asked Questions

What does test_async_in_async_stream_lambdas() do?
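test_async_in_async_stream_lambdas() is an async unit test in the langchain codebase, defined in libs/core/tests/unit_tests/runnables/test_runnable_events_v2.py. It wraps an async add_one function in a RunnableLambda, calls it through astream() from inside a second async RunnableLambda (add_one_proxy), collects the events emitted by astream_events(1, version="v2") on the outer runnable, and asserts that they match EXPECTED_EVENTS.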
Where is test_async_in_async_stream_lambdas() defined?
test_async_in_async_stream_lambdas() is defined in libs/core/tests/unit_tests/runnables/test_runnable_events_v2.py at line 2081.
What does test_async_in_async_stream_lambdas() call?
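The dependency graph records two calls from test_async_in_async_stream_lambdas(): _collect_events() and astream(). The source listing also invokes astream_events() and _assert_events_equal_allow_superset_metadata(), which the graph does not show.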
