test_runnable_gen_async() — langchain Function Reference

Architecture documentation for the test_runnable_gen_async() function in test_runnable.py from the langchain codebase.

Dependency Diagram

graph TD
  d5e5acb0_898f_3f00_2dcb_202ac33c9fc9["test_runnable_gen_async()"]
  26df6ad8_0189_51d0_c3c1_6c3248893ff5["test_runnable.py"]
  d5e5acb0_898f_3f00_2dcb_202ac33c9fc9 -->|defined in| 26df6ad8_0189_51d0_c3c1_6c3248893ff5
  8652094c_ec57_c551_fc44_9566d00cf872["abatch()"]
  d5e5acb0_898f_3f00_2dcb_202ac33c9fc9 -->|calls| 8652094c_ec57_c551_fc44_9566d00cf872
  style d5e5acb0_898f_3f00_2dcb_202ac33c9fc9 fill:#6366f1,stroke:#818cf8,color:#fff

Source Code

libs/core/tests/unit_tests/runnables/test_runnable.py lines 4883–4912

# Imports this excerpt relies on; in test_runnable.py they sit at module level.
import asyncio
from typing import Any, AsyncIterator

import pytest
from langchain_core.runnables import RunnableGenerator


async def test_runnable_gen_async() -> None:
    """Test that a generator can be used as a runnable."""

    async def agen(_: AsyncIterator[Any]) -> AsyncIterator[int]:
        yield 1
        yield 2
        yield 3

    arunnable = RunnableGenerator(agen)

    # ainvoke adds the streamed chunks together: 1 + 2 + 3 == 6.
    assert await arunnable.ainvoke(None) == 6
    assert [p async for p in arunnable.astream(None)] == [1, 2, 3]
    assert await arunnable.abatch([None, None]) == [6, 6]

    # An object whose __call__ is an async generator is accepted as well.
    class AsyncGen:
        async def __call__(self, _: AsyncIterator[Any]) -> AsyncIterator[int]:
            yield 1
            yield 2
            yield 3

    arunnablecallable = RunnableGenerator(AsyncGen())
    assert await arunnablecallable.ainvoke(None) == 6
    assert [p async for p in arunnablecallable.astream(None)] == [1, 2, 3]
    assert await arunnablecallable.abatch([None, None]) == [6, 6]
    # Only an async generator was supplied, so the sync entry points must
    # raise NotImplementedError; each one runs in a worker thread.
    with pytest.raises(NotImplementedError):
        await asyncio.to_thread(arunnablecallable.invoke, None)
    with pytest.raises(NotImplementedError):
        await asyncio.to_thread(arunnablecallable.stream, None)
    with pytest.raises(NotImplementedError):
        await asyncio.to_thread(arunnablecallable.batch, [None, None])
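The assertions above imply two behaviours: ainvoke() returns the chunks from astream() combined with `+`, and the synchronous methods fail when only an async generator was supplied. The following is a minimal standard-library sketch of that behaviour, not langchain's implementation. `SketchGenerator` and `_is_async_gen` are hypothetical names introduced for illustration, and the real RunnableGenerator internals may differ.

```python
import asyncio
import inspect
from typing import Any, AsyncIterator, Callable


def _is_async_gen(fn: Callable[..., Any]) -> bool:
    # Accept plain async generator functions and objects whose
    # __call__ is an async generator function.
    return inspect.isasyncgenfunction(fn) or inspect.isasyncgenfunction(
        getattr(fn, "__call__", None)
    )


class SketchGenerator:
    """Hypothetical stand-in for RunnableGenerator; not the langchain class."""

    def __init__(self, transform: Callable[..., Any]) -> None:
        # Keep only the code path that matches the generator kind supplied.
        self._atransform = transform if _is_async_gen(transform) else None
        self._transform = None if self._atransform else transform

    async def astream(self, value: Any) -> AsyncIterator[Any]:
        if self._atransform is None:
            raise NotImplementedError("no async generator was supplied")

        async def single_input() -> AsyncIterator[Any]:
            yield value

        # Forward every chunk the wrapped generator produces.
        async for chunk in self._atransform(single_input()):
            yield chunk

    async def ainvoke(self, value: Any) -> Any:
        # Fold the streamed chunks together with "+".
        final = None
        async for chunk in self.astream(value):
            final = chunk if final is None else final + chunk
        return final

    def invoke(self, value: Any) -> Any:
        if self._transform is None:
            raise NotImplementedError("no sync generator was supplied")
        final = None
        for chunk in self._transform(iter([value])):
            final = chunk if final is None else final + chunk
        return final


async def agen(_: AsyncIterator[Any]) -> AsyncIterator[int]:
    yield 1
    yield 2
    yield 3


async def demo() -> tuple:
    sketch = SketchGenerator(agen)
    total = await sketch.ainvoke(None)
    chunks = [c async for c in sketch.astream(None)]
    return total, chunks


total, chunks = asyncio.run(demo())
```

In this sketch, `total` is the sum of the three chunks and `chunks` preserves their order, while `SketchGenerator(agen).invoke(None)` raises NotImplementedError because no synchronous generator exists to run.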

Frequently Asked Questions

What does test_runnable_gen_async() do?
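test_runnable_gen_async() is an async unit test defined in libs/core/tests/unit_tests/runnables/test_runnable.py. It checks that a RunnableGenerator built from an async generator, or from an object whose __call__ is an async generator, returns 6 from ainvoke(), streams [1, 2, 3] from astream(), and returns [6, 6] from abatch([None, None]). For the callable-object variant it also checks that the synchronous invoke(), stream(), and batch() methods raise NotImplementedError.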
Where is test_runnable_gen_async() defined?
test_runnable_gen_async() is defined in libs/core/tests/unit_tests/runnables/test_runnable.py at line 4883.
What does test_runnable_gen_async() call?
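The dependency graph records one call from test_runnable_gen_async(): abatch(). The source also uses ainvoke(), astream(), and the synchronous invoke(), stream(), and batch(), which the graph does not list.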
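To show why abatch([None, None]) yields [6, 6] in the test, here is a rough sketch of a batch call as concurrent single invocations whose results come back in input order. It assumes abatch() behaves like a gather over ainvoke(), which this page does not confirm. `abatch_sketch` and `sum_of_chunks` are hypothetical helpers, not langchain functions.

```python
import asyncio
from typing import Any, Awaitable, Callable, List


async def abatch_sketch(
    ainvoke: Callable[[Any], Awaitable[Any]], inputs: List[Any]
) -> List[Any]:
    # Run one ainvoke-style coroutine per input concurrently;
    # asyncio.gather returns the results in input order.
    return list(await asyncio.gather(*(ainvoke(item) for item in inputs)))


async def sum_of_chunks(_: Any) -> int:
    # Hypothetical stand-in for arunnable.ainvoke: combines the chunks 1, 2, 3.
    return 1 + 2 + 3


results = asyncio.run(abatch_sketch(sum_of_chunks, [None, None]))
```

Each input produces the same combined value, so `results` has one entry per input.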
