test_with_config_async() — langchain Function Reference

Architecture documentation for the test_with_config_async() function in test_runnable.py from the langchain codebase.

Dependency Diagram

graph TD
  23b79a55_6bbb_71a7_5d94_7cea0078049a["test_with_config_async()"]
  26df6ad8_0189_51d0_c3c1_6c3248893ff5["test_runnable.py"]
  23b79a55_6bbb_71a7_5d94_7cea0078049a -->|defined in| 26df6ad8_0189_51d0_c3c1_6c3248893ff5
  8652094c_ec57_c551_fc44_9566d00cf872["abatch()"]
  23b79a55_6bbb_71a7_5d94_7cea0078049a -->|calls| 8652094c_ec57_c551_fc44_9566d00cf872
  style 23b79a55_6bbb_71a7_5d94_7cea0078049a fill:#6366f1,stroke:#818cf8,color:#fff

Source Code

libs/core/tests/unit_tests/runnables/test_runnable.py lines 1279–1374

async def test_with_config_async(mocker: MockerFixture) -> None:
    fake = FakeRunnable()
    spy = mocker.spy(fake, "invoke")

    handler = ConsoleCallbackHandler()
    assert (
        await fake.with_config(metadata={"a": "b"}).ainvoke(
            "hello", config={"callbacks": [handler]}
        )
        == 5
    )
    assert spy.call_args_list == [
        mocker.call(
            "hello",
            {
                "callbacks": [handler],
                "metadata": {"a": "b"},
                "configurable": {},
                "tags": [],
            },
        ),
    ]
    spy.reset_mock()

    assert [
        part async for part in fake.with_config(metadata={"a": "b"}).astream("hello")
    ] == [5]
    assert spy.call_args_list == [
        mocker.call("hello", {"metadata": {"a": "b"}, "tags": [], "configurable": {}}),
    ]
    spy.reset_mock()

    assert await fake.with_config(recursion_limit=5, tags=["c"]).abatch(
        ["hello", "wooorld"], {"metadata": {"key": "value"}}
    ) == [
        5,
        7,
    ]
    assert sorted(spy.call_args_list) == [
        mocker.call(
            "hello",
            {
                "metadata": {"key": "value"},
                "tags": ["c"],
                "callbacks": None,
                "recursion_limit": 5,
                "configurable": {},
            },
        ),
        mocker.call(
            "wooorld",
            {
                "metadata": {"key": "value"},
                "tags": ["c"],
                "callbacks": None,
                "recursion_limit": 5,
                "configurable": {},
            },
        ),
    ]
    spy.reset_mock()

    assert sorted(
        [
            c
            async for c in fake.with_config(
                recursion_limit=5, tags=["c"]
            ).abatch_as_completed(["hello", "wooorld"], {"metadata": {"key": "value"}})
        ]
    ) == [
        (0, 5),
        (1, 7),
    ]
    assert len(spy.call_args_list) == 2
    first_call = next(call for call in spy.call_args_list if call.args[0] == "hello")
    assert first_call == mocker.call(
        "hello",
        {
            "metadata": {"key": "value"},
            "tags": ["c"],
            "callbacks": None,

Frequently Asked Questions

What does test_with_config_async() do?
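test_with_config_async() is an async unit test in the langchain codebase, defined in libs/core/tests/unit_tests/runnables/test_runnable.py. It checks that configuration bound with with_config() (metadata, tags, recursion_limit) is merged with the config passed at call time and reaches the underlying invoke() call when a FakeRunnable is run through ainvoke(), astream(), abatch(), and abatch_as_completed().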
Where is test_with_config_async() defined?
test_with_config_async() is defined in libs/core/tests/unit_tests/runnables/test_runnable.py at line 1279.
What does test_with_config_async() call?
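The dependency diagram records one call from test_with_config_async(): abatch(). The source listing also exercises ainvoke(), astream(), and abatch_as_completed().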
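
How does the spy-and-merge pattern in this test work?
The sketch below imitates the pattern using only the Python standard library, so it can be run without langchain installed. Everything in it is an assumption made for illustration: merge_configs, LengthRunnable, and BoundRunnable are hypothetical stand-ins, not langchain classes, and the merge rules are a simplified guess that omits the "callbacks" and "configurable" keys seen in the real assertions. The fake returns the length of its input, which is consistent with the values 5 and 7 asserted in the test.

```python
import asyncio
from typing import Optional
from unittest import mock


def merge_configs(*configs: Optional[dict]) -> dict:
    """Hypothetical merge: metadata dicts are combined, tags are
    concatenated, and any other key takes the last value given."""
    merged: dict = {"metadata": {}, "tags": []}
    for config in configs:
        for key, value in (config or {}).items():
            if key == "metadata":
                merged["metadata"] = {**merged["metadata"], **value}
            elif key == "tags":
                merged["tags"] = merged["tags"] + list(value)
            else:
                merged[key] = value
    return merged


class LengthRunnable:
    """Hypothetical fake: returns the length of its input."""

    def invoke(self, value: str, config: Optional[dict] = None) -> int:
        return len(value)


class BoundRunnable:
    """Hypothetical wrapper that remembers config given up front."""

    def __init__(self, inner: LengthRunnable, **bound: object) -> None:
        self.inner = inner
        self.bound = bound

    async def ainvoke(self, value: str, config: Optional[dict] = None) -> int:
        # Bound config first, call-time config second.
        return self.inner.invoke(value, merge_configs(self.bound, config))

    async def abatch(self, values: list, config: Optional[dict] = None) -> list:
        # gather() returns results in input order.
        return list(await asyncio.gather(*(self.ainvoke(v, config) for v in values)))


async def demo() -> tuple:
    fake = LengthRunnable()
    # wraps= keeps the real behaviour while recording every call,
    # similar in spirit to mocker.spy in the test.
    with mock.patch.object(fake, "invoke", wraps=fake.invoke) as spy:
        bound = BoundRunnable(fake, recursion_limit=5, tags=["c"])
        results = await bound.abatch(
            ["hello", "wooorld"], {"metadata": {"key": "value"}}
        )
        return results, list(spy.call_args_list)


results, calls = asyncio.run(demo())
```

As in the test, the assertions that matter are on the recorded calls rather than on the return values: each call should carry both the bound keys and the call-time metadata.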
