Home / Function/ test_prompt_async() — langchain Function Reference

test_prompt_async() — langchain Function Reference

Architecture documentation for the test_prompt_async() function in test_runnable.py from the langchain codebase.

Entity Profile

Dependency Diagram

graph TD
  266fc5e9_dd16_6f4a_e9b8_5a2332b308b5["test_prompt_async()"]
  26df6ad8_0189_51d0_c3c1_6c3248893ff5["test_runnable.py"]
  266fc5e9_dd16_6f4a_e9b8_5a2332b308b5 -->|defined in| 26df6ad8_0189_51d0_c3c1_6c3248893ff5
  8652094c_ec57_c551_fc44_9566d00cf872["abatch()"]
  266fc5e9_dd16_6f4a_e9b8_5a2332b308b5 -->|calls| 8652094c_ec57_c551_fc44_9566d00cf872
  style 266fc5e9_dd16_6f4a_e9b8_5a2332b308b5 fill:#6366f1,stroke:#818cf8,color:#fff

Relationship Graph

Source Code

libs/core/tests/unit_tests/runnables/test_runnable.py lines 1485–1618

async def test_prompt_async() -> None:
    prompt = ChatPromptTemplate.from_messages(
        messages=[
            SystemMessage(content="You are a nice assistant."),
            HumanMessagePromptTemplate.from_template("{question}"),
        ]
    )
    expected = ChatPromptValue(
        messages=[
            SystemMessage(content="You are a nice assistant."),
            HumanMessage(content="What is your name?"),
        ]
    )

    assert await prompt.ainvoke({"question": "What is your name?"}) == expected

    assert await prompt.abatch(
        [
            {"question": "What is your name?"},
            {"question": "What is your favorite color?"},
        ]
    ) == [
        expected,
        ChatPromptValue(
            messages=[
                SystemMessage(content="You are a nice assistant."),
                HumanMessage(content="What is your favorite color?"),
            ]
        ),
    ]

    assert [
        part async for part in prompt.astream({"question": "What is your name?"})
    ] == [expected]

    stream_log = [
        part async for part in prompt.astream_log({"question": "What is your name?"})
    ]

    assert len(stream_log[0].ops) == 1
    assert stream_log[0].ops[0]["op"] == "replace"
    assert stream_log[0].ops[0]["path"] == ""
    assert stream_log[0].ops[0]["value"]["logs"] == {}
    assert stream_log[0].ops[0]["value"]["final_output"] is None
    assert stream_log[0].ops[0]["value"]["streamed_output"] == []
    assert isinstance(stream_log[0].ops[0]["value"]["id"], str)

    assert stream_log[1:] == [
        RunLogPatch(
            {"op": "add", "path": "/streamed_output/-", "value": expected},
            {
                "op": "replace",
                "path": "/final_output",
                "value": ChatPromptValue(
                    messages=[
                        SystemMessage(content="You are a nice assistant."),
                        HumanMessage(content="What is your name?"),
                    ]
                ),
            },
        ),
    ]

    stream_log_state = [
        part
        async for part in prompt.astream_log(
            {"question": "What is your name?"}, diff=False
        )
    ]

    # remove random id
    stream_log[0].ops[0]["value"]["id"] = "00000000-0000-0000-0000-000000000000"
    stream_log_state[-1].ops[0]["value"]["id"] = "00000000-0000-0000-0000-000000000000"
    stream_log_state[-1].state["id"] = "00000000-0000-0000-0000-000000000000"

    # assert output with diff=False matches output with diff=True
    assert stream_log_state[-1].ops == [op for chunk in stream_log for op in chunk.ops]
    assert stream_log_state[-1] == RunLog(
        *[op for chunk in stream_log for op in chunk.ops],
        state={
            "final_output": ChatPromptValue(

Domain

Subdomains

Calls

Frequently Asked Questions

What does test_prompt_async() do?
test_prompt_async() is a function in the langchain codebase, defined in libs/core/tests/unit_tests/runnables/test_runnable.py.
Where is test_prompt_async() defined?
test_prompt_async() is defined in libs/core/tests/unit_tests/runnables/test_runnable.py at line 1485.
What does test_prompt_async() call?
test_prompt_async() calls 1 function(s): abatch.

Analyze Your Own Codebase

Get architecture documentation, dependency graphs, and domain analysis for your codebase in minutes.

Try Supermodel Free