Home / Function/ test_passthrough_tap_async() — langchain Function Reference

test_passthrough_tap_async() — langchain Function Reference

Architecture documentation for the test_passthrough_tap_async() function in test_runnable.py from the langchain codebase.

Entity Profile

Dependency Diagram

graph TD
  e529913f_ad36_f5f7_eb7e_5fc8b9592de0["test_passthrough_tap_async()"]
  26df6ad8_0189_51d0_c3c1_6c3248893ff5["test_runnable.py"]
  e529913f_ad36_f5f7_eb7e_5fc8b9592de0 -->|defined in| 26df6ad8_0189_51d0_c3c1_6c3248893ff5
  8652094c_ec57_c551_fc44_9566d00cf872["abatch()"]
  e529913f_ad36_f5f7_eb7e_5fc8b9592de0 -->|calls| 8652094c_ec57_c551_fc44_9566d00cf872
  style e529913f_ad36_f5f7_eb7e_5fc8b9592de0 fill:#6366f1,stroke:#818cf8,color:#fff

Relationship Graph

Source Code

libs/core/tests/unit_tests/runnables/test_runnable.py lines 1070–1140

async def test_passthrough_tap_async(mocker: MockerFixture) -> None:
    fake = FakeRunnable()
    mock = mocker.Mock()

    seq = RunnablePassthrough[Any](mock) | fake | RunnablePassthrough[Any](mock)

    assert await seq.ainvoke("hello", my_kwarg="value") == 5
    assert mock.call_args_list == [
        mocker.call("hello", my_kwarg="value"),
        mocker.call(5),
    ]
    mock.reset_mock()

    assert await seq.abatch(["hello", "byebye"], my_kwarg="value") == [5, 6]
    assert len(mock.call_args_list) == 4
    for call in [
        mocker.call("hello", my_kwarg="value"),
        mocker.call("byebye", my_kwarg="value"),
        mocker.call(5),
        mocker.call(6),
    ]:
        assert call in mock.call_args_list
    mock.reset_mock()

    assert await seq.abatch(
        ["hello", "byebye"], my_kwarg="value", return_exceptions=True
    ) == [
        5,
        6,
    ]
    assert len(mock.call_args_list) == 4
    for call in [
        mocker.call("hello", my_kwarg="value"),
        mocker.call("byebye", my_kwarg="value"),
        mocker.call(5),
        mocker.call(6),
    ]:
        assert call in mock.call_args_list
    mock.reset_mock()

    assert sorted(
        [
            a
            async for a in seq.abatch_as_completed(
                ["hello", "byebye"], my_kwarg="value", return_exceptions=True
            )
        ]
    ) == [
        (0, 5),
        (1, 6),
    ]
    assert len(mock.call_args_list) == 4
    for call in [
        mocker.call("hello", my_kwarg="value"),
        mocker.call("byebye", my_kwarg="value"),
        mocker.call(5),
        mocker.call(6),
    ]:
        assert call in mock.call_args_list
    mock.reset_mock()

    assert [
        part
        async for part in seq.astream(
            "hello", {"metadata": {"key": "value"}}, my_kwarg="value"
        )
    ] == [5]
    assert mock.call_args_list == [
        mocker.call("hello", my_kwarg="value"),
        mocker.call(5),
    ]

Domain

Subdomains

Calls

Frequently Asked Questions

What does test_passthrough_tap_async() do?
test_passthrough_tap_async() is a function in the langchain codebase, defined in libs/core/tests/unit_tests/runnables/test_runnable.py.
Where is test_passthrough_tap_async() defined?
test_passthrough_tap_async() is defined in libs/core/tests/unit_tests/runnables/test_runnable.py at line 1070.
What does test_passthrough_tap_async() call?
test_passthrough_tap_async() calls 1 function(s): abatch.

Analyze Your Own Codebase

Get architecture documentation, dependency graphs, and domain analysis for your codebase in minutes.

Try Supermodel Free