test_stream_log_retriever() — langchain Function Reference

Architecture documentation for the test_stream_log_retriever() function in test_runnable.py from the langchain codebase.

Dependency Diagram

graph TD
  52abe324_7bea_8d67_c37a_ca03d1570883["test_stream_log_retriever()"]
  26df6ad8_0189_51d0_c3c1_6c3248893ff5["test_runnable.py"]
  52abe324_7bea_8d67_c37a_ca03d1570883 -->|defined in| 26df6ad8_0189_51d0_c3c1_6c3248893ff5
  style 52abe324_7bea_8d67_c37a_ca03d1570883 fill:#6366f1,stroke:#818cf8,color:#fff

Source Code

libs/core/tests/unit_tests/runnables/test_runnable.py lines 2468–2504

async def test_stream_log_retriever() -> None:
    prompt = (
        SystemMessagePromptTemplate.from_template("You are a nice assistant.")
        + "{documents}"
        + "{question}"
    )
    llm = FakeListLLM(responses=["foo", "bar"])

    chain: Runnable = (
        {"documents": FakeRetriever(), "question": itemgetter("question")}
        | prompt
        | {"one": llm, "two": llm}
    )

    stream_log = [
        part async for part in chain.astream_log({"question": "What is your name?"})
    ]

    # Remove IDs from logs
    for part in stream_log:
        for op in part.ops:
            if (
                isinstance(op["value"], dict)
                and "id" in op["value"]
                and not isinstance(op["value"]["id"], list)  # serialized lc id
            ):
                del op["value"]["id"]

    assert sorted(cast("RunLog", add(stream_log)).state["logs"]) == [
        "ChatPromptTemplate",
        "FakeListLLM",
        "FakeListLLM:2",
        "FakeRetriever",
        "RunnableLambda",
        "RunnableParallel<documents,question>",
        "RunnableParallel<one,two>",
    ]
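The ID-removal loop in the test can be tried in isolation. The following is a minimal sketch using only the standard library; the `strip_run_ids` helper and the sample ops are hypothetical stand-ins shaped like patch operations, not real `astream_log` output. Unlike the test, it reads the value with `.get()`, so an op without a `"value"` key would be skipped rather than raise.

```python
from typing import Any


def strip_run_ids(ops: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Delete non-list "id" entries from dict-valued ops, in place."""
    for op in ops:
        value = op.get("value")
        if (
            isinstance(value, dict)
            and "id" in value
            and not isinstance(value["id"], list)  # keep list ids (serialized lc id)
        ):
            del value["id"]
    return ops


# Hypothetical ops for illustration only.
sample_ops = [
    {"op": "add", "path": "/logs/FakeRetriever", "value": {"id": "run-123", "name": "FakeRetriever"}},
    {"op": "add", "path": "/logs/Example/final_output", "value": {"id": ["a", "b"], "kwargs": {}}},
    {"op": "add", "path": "/streamed_output/-", "value": "foo"},
]
cleaned = strip_run_ids(sample_ops)
```

Only the first op loses its `"id"`: the second keeps its list-valued id, and the third is left alone because its value is not a dict.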

Frequently Asked Questions

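What does test_stream_log_retriever() do?
test_stream_log_retriever() is an async unit test in libs/core/tests/unit_tests/runnables/test_runnable.py. It builds a chain that feeds FakeRetriever output and the input question into a prompt, then fans the prompt out to two branches ("one" and "two") that share one FakeListLLM. It streams the run with astream_log(), removes non-list id values from the patch operations, and asserts that the sorted keys of state["logs"] in the combined run log match the seven expected entries.
Where is test_stream_log_retriever() defined?
test_stream_log_retriever() is defined in libs/core/tests/unit_tests/runnables/test_runnable.py, lines 2468–2504.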
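The asserted log keys include both "FakeListLLM" and "FakeListLLM:2", which suggests that a repeated run name gets a numeric suffix. The sketch below illustrates that naming pattern under that assumption; `assign_log_keys` is a hypothetical helper, not langchain code, and the order of run names is assumed for the example.

```python
from collections import defaultdict


def assign_log_keys(run_names: list[str]) -> list[str]:
    """Give each run a unique key; repeats get a ":<count>" suffix."""
    counts: defaultdict[str, int] = defaultdict(int)
    keys: list[str] = []
    for name in run_names:
        counts[name] += 1
        # First occurrence keeps the bare name; later ones are numbered.
        keys.append(name if counts[name] == 1 else f"{name}:{counts[name]}")
    return keys


# Run names taken from the test's assertion; the ordering here is assumed.
names = [
    "RunnableParallel<documents,question>",
    "FakeRetriever",
    "RunnableLambda",
    "ChatPromptTemplate",
    "RunnableParallel<one,two>",
    "FakeListLLM",
    "FakeListLLM",
]
keys = sorted(assign_log_keys(names))
```

Sorting the resulting keys reproduces the list the test asserts against, which is why the test can compare against a fixed, ordered list even though the two LLM branches run in parallel.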
