test_passthrough_tap_async() — langchain Function Reference
Architecture documentation for the test_passthrough_tap_async() function in test_runnable.py from the langchain codebase.
Dependency Diagram
graph TD
    e529913f_ad36_f5f7_eb7e_5fc8b9592de0["test_passthrough_tap_async()"]
    26df6ad8_0189_51d0_c3c1_6c3248893ff5["test_runnable.py"]
    e529913f_ad36_f5f7_eb7e_5fc8b9592de0 -->|defined in| 26df6ad8_0189_51d0_c3c1_6c3248893ff5
    8652094c_ec57_c551_fc44_9566d00cf872["abatch()"]
    e529913f_ad36_f5f7_eb7e_5fc8b9592de0 -->|calls| 8652094c_ec57_c551_fc44_9566d00cf872
    style e529913f_ad36_f5f7_eb7e_5fc8b9592de0 fill:#6366f1,stroke:#818cf8,color:#fff
Source Code
libs/core/tests/unit_tests/runnables/test_runnable.py lines 1070–1140
async def test_passthrough_tap_async(mocker: MockerFixture) -> None:
    fake = FakeRunnable()
    mock = mocker.Mock()

    seq = RunnablePassthrough[Any](mock) | fake | RunnablePassthrough[Any](mock)

    assert await seq.ainvoke("hello", my_kwarg="value") == 5
    assert mock.call_args_list == [
        mocker.call("hello", my_kwarg="value"),
        mocker.call(5),
    ]
    mock.reset_mock()

    assert await seq.abatch(["hello", "byebye"], my_kwarg="value") == [5, 6]
    assert len(mock.call_args_list) == 4
    for call in [
        mocker.call("hello", my_kwarg="value"),
        mocker.call("byebye", my_kwarg="value"),
        mocker.call(5),
        mocker.call(6),
    ]:
        assert call in mock.call_args_list
    mock.reset_mock()

    assert await seq.abatch(
        ["hello", "byebye"], my_kwarg="value", return_exceptions=True
    ) == [
        5,
        6,
    ]
    assert len(mock.call_args_list) == 4
    for call in [
        mocker.call("hello", my_kwarg="value"),
        mocker.call("byebye", my_kwarg="value"),
        mocker.call(5),
        mocker.call(6),
    ]:
        assert call in mock.call_args_list
    mock.reset_mock()

    assert sorted(
        [
            a
            async for a in seq.abatch_as_completed(
                ["hello", "byebye"], my_kwarg="value", return_exceptions=True
            )
        ]
    ) == [
        (0, 5),
        (1, 6),
    ]
    assert len(mock.call_args_list) == 4
    for call in [
        mocker.call("hello", my_kwarg="value"),
        mocker.call("byebye", my_kwarg="value"),
        mocker.call(5),
        mocker.call(6),
    ]:
        assert call in mock.call_args_list
    mock.reset_mock()

    assert [
        part
        async for part in seq.astream(
            "hello", {"metadata": {"key": "value"}}, my_kwarg="value"
        )
    ] == [5]
    assert mock.call_args_list == [
        mocker.call("hello", my_kwarg="value"),
        mocker.call(5),
    ]
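The "tap" behaviour the test asserts can be illustrated without langchain. The following is a minimal sketch using only the standard library. The names Tap, Length, and run_sequence are hypothetical stand-ins, not langchain classes. Length assumes the fake step maps a string to its length, which is what the expected values suggest ("hello" gives 5, "byebye" gives 6). The rule that only the first step receives the extra keyword arguments is also an assumption, inferred from the recorded calls in the test.

```python
import asyncio
from typing import Any, Callable
from unittest.mock import Mock, call


class Tap:
    """Hypothetical pass-through step that reports each value to a callback."""

    def __init__(self, func: Callable[..., Any]) -> None:
        self.func = func

    async def ainvoke(self, value: Any, **kwargs: Any) -> Any:
        self.func(value, **kwargs)  # observe the value; the callback's result is ignored
        return value  # hand the input on unchanged


class Length:
    """Hypothetical middle step (assumption): maps a string to its length."""

    async def ainvoke(self, value: str, **kwargs: Any) -> int:
        return len(value)


async def run_sequence(steps: list, value: Any, **kwargs: Any) -> Any:
    # Assumption inferred from the test's assertions: only the first step
    # receives the extra keyword arguments.
    for index, step in enumerate(steps):
        value = await step.ainvoke(value, **(kwargs if index == 0 else {}))
    return value


mock = Mock()
steps = [Tap(mock), Length(), Tap(mock)]
result = asyncio.run(run_sequence(steps, "hello", my_kwarg="value"))

# The taps do not alter the result; they only record what flowed past them.
assert result == 5
assert mock.call_args_list == [call("hello", my_kwarg="value"), call(5)]
```

The same mock is attached before and after the middle step, so its call list shows both the original input and the intermediate result, which is the shape of the assertions in the test above.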
Frequently Asked Questions
What does test_passthrough_tap_async() do?
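test_passthrough_tap_async() is an async unit test in the langchain codebase, defined in libs/core/tests/unit_tests/runnables/test_runnable.py. It builds the sequence RunnablePassthrough(mock) | FakeRunnable() | RunnablePassthrough(mock) and runs it through ainvoke, abatch (with and without return_exceptions=True), abatch_as_completed, and astream. For each method it asserts that the passthrough steps leave the output unchanged and that the mock is called with the original input (together with my_kwarg="value") and with the intermediate result.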
Where is test_passthrough_tap_async() defined?
test_passthrough_tap_async() is defined in libs/core/tests/unit_tests/runnables/test_runnable.py at line 1070.
What does test_passthrough_tap_async() call?
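The dependency graph records 1 call: abatch(). In the source shown above, the test also awaits ainvoke() and iterates over abatch_as_completed() and astream() on the same sequence.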
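The test sorts the output of abatch_as_completed() before comparing it, because results that arrive in completion order need not match input order. A minimal sketch of that idea with plain asyncio follows. batch_as_completed and slow_len are hypothetical helpers written for illustration, not langchain functions, and the delays are arbitrary.

```python
import asyncio
from typing import Any, AsyncIterator, Awaitable, Callable


async def batch_as_completed(
    func: Callable[[Any], Awaitable[Any]], inputs: list
) -> AsyncIterator[tuple]:
    """Hypothetical helper: yield (index, output) pairs as each input finishes."""

    async def run_one(index: int, item: Any) -> tuple:
        return index, await func(item)

    tasks = [asyncio.create_task(run_one(i, item)) for i, item in enumerate(inputs)]
    for finished in asyncio.as_completed(tasks):
        yield await finished


async def slow_len(text: str) -> int:
    # Arbitrary delays so that the second input is likely to finish first.
    delays = {"hello": 0.05, "byebye": 0.01}
    await asyncio.sleep(delays[text])
    return len(text)


async def main() -> tuple:
    raw = [pair async for pair in batch_as_completed(slow_len, ["hello", "byebye"])]
    # Sorting by index restores input order regardless of completion order.
    return raw, sorted(raw)


raw_pairs, sorted_pairs = asyncio.run(main())
assert sorted_pairs == [(0, 5), (1, 6)]
```

Carrying the input index alongside each output is what makes the sort possible; without it, a caller could not tell which result belongs to which input.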