test_map_astream() — langchain Function Reference
Architecture documentation for the test_map_astream() function in test_runnable.py from the langchain codebase.
Dependency Diagram
graph TD
    4ae5c9d4_35a8_52a8_d1f8_f5ecffd86584["test_map_astream()"]
    26df6ad8_0189_51d0_c3c1_6c3248893ff5["test_runnable.py"]
    f59d5b6a_111b_6895_b338_7e3d29e63896["invoke()"]
    4ae5c9d4_35a8_52a8_d1f8_f5ecffd86584 -->|defined in| 26df6ad8_0189_51d0_c3c1_6c3248893ff5
    4ae5c9d4_35a8_52a8_d1f8_f5ecffd86584 -->|calls| f59d5b6a_111b_6895_b338_7e3d29e63896
    style 4ae5c9d4_35a8_52a8_d1f8_f5ecffd86584 fill:#6366f1,stroke:#818cf8,color:#fff
Source Code
libs/core/tests/unit_tests/runnables/test_runnable.py lines 3285–3411
async def test_map_astream() -> None:
    prompt = (
        SystemMessagePromptTemplate.from_template("You are a nice assistant.")
        + "{question}"
    )

    chat_res = "i'm a chatbot"
    # sleep to better simulate a real stream
    chat = FakeListChatModel(responses=[chat_res], sleep=0.01)

    llm_res = "i'm a textbot"
    # sleep to better simulate a real stream
    llm = FakeStreamingListLLM(responses=[llm_res], sleep=0.01)

    chain: Runnable = prompt | {
        "chat": chat.bind(stop=["Thought:"]),
        "llm": llm,
        "passthrough": RunnablePassthrough(),
    }

    stream = chain.astream({"question": "What is your name?"})

    final_value = None
    streamed_chunks = []
    async for chunk in stream:
        streamed_chunks.append(chunk)
        if final_value is None:
            final_value = chunk
        else:
            final_value += chunk

    assert streamed_chunks[0] in [
        {"passthrough": prompt.invoke({"question": "What is your name?"})},
        {"llm": "i"},
        {"chat": _any_id_ai_message_chunk(content="i")},
    ]
    assert len(streamed_chunks) == len(chat_res) + len(llm_res) + 1
    assert all(len(c.keys()) == 1 for c in streamed_chunks)
    assert final_value is not None
    assert final_value.get("chat").content == "i'm a chatbot"
    final_value["chat"].id = AnyStr()
    assert final_value.get("llm") == "i'm a textbot"
    assert final_value.get("passthrough") == prompt.invoke(
        {"question": "What is your name?"}
    )

    # Test astream_log state accumulation
    final_state = None
    streamed_ops = []
    async for chunk in chain.astream_log({"question": "What is your name?"}):
        streamed_ops.extend(chunk.ops)
        if final_state is None:
            final_state = chunk
        else:
            final_state += chunk
    final_state = cast("RunLog", final_state)

    assert final_state.state["final_output"] == final_value
    assert len(final_state.state["streamed_output"]) == len(streamed_chunks)
    assert isinstance(final_state.state["id"], str)
    assert len(final_state.ops) == len(streamed_ops)
    assert len(final_state.state["logs"]) == 5
    assert (
        final_state.state["logs"]["ChatPromptTemplate"]["name"] == "ChatPromptTemplate"
    )
    assert final_state.state["logs"]["ChatPromptTemplate"][
        "final_output"
    ] == prompt.invoke({"question": "What is your name?"})
    assert (
        final_state.state["logs"]["RunnableParallel<chat,llm,passthrough>"]["name"]
        == "RunnableParallel<chat,llm,passthrough>"
    )
    assert sorted(final_state.state["logs"]) == [
        "ChatPromptTemplate",
        "FakeListChatModel",
        "FakeStreamingListLLM",
        "RunnableParallel<chat,llm,passthrough>",
        "RunnablePassthrough",
    ]
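The core streaming behavior this test asserts — each branch of the parallel map emits single-key chunks, and chunks accumulate with += into the final output — can be sketched without langchain at all. The following is a minimal pure-asyncio illustration, not langchain's implementation: stream_chars, merge, and add_chunks are hypothetical helpers, and add_chunks only approximates langchain's addable-dict accumulation for string values.

```python
import asyncio
from typing import AsyncIterator

async def stream_chars(key: str, text: str, delay: float) -> AsyncIterator[dict]:
    # Yield one single-key dict per character, mimicking one streamed branch.
    for ch in text:
        await asyncio.sleep(delay)
        yield {key: ch}

async def merge(*streams: AsyncIterator[dict]) -> AsyncIterator[dict]:
    # Interleave chunks from all branches as they become available.
    queue: asyncio.Queue = asyncio.Queue()
    done = object()

    async def pump(stream: AsyncIterator[dict]) -> None:
        async for chunk in stream:
            await queue.put(chunk)
        await queue.put(done)

    tasks = [asyncio.create_task(pump(s)) for s in streams]
    remaining = len(tasks)
    while remaining:
        item = await queue.get()
        if item is done:
            remaining -= 1
        else:
            yield item

def add_chunks(acc: dict, chunk: dict) -> dict:
    # Per-key accumulation: concatenate the value onto any existing value.
    out = dict(acc)
    for k, v in chunk.items():
        out[k] = out.get(k, "") + v
    return out

async def main() -> dict:
    final: dict = {}
    chunks = []
    async for chunk in merge(
        stream_chars("chat", "i'm a chatbot", 0.001),
        stream_chars("llm", "i'm a textbot", 0.001),
    ):
        chunks.append(chunk)
        final = add_chunks(final, chunk)
    # Mirrors the test's invariant: every streamed chunk has exactly one key.
    assert all(len(c) == 1 for c in chunks)
    return final

result = asyncio.run(main())
print(result)
```

As in the test, the per-branch chunks arrive interleaved in a nondeterministic order, but the accumulated result is the same regardless of interleaving.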
Frequently Asked Questions
What does test_map_astream() do?
test_map_astream() is an async unit test in the langchain codebase, defined in libs/core/tests/unit_tests/runnables/test_runnable.py. It verifies that astream() over a RunnableParallel (a prompt piped into a dict of runnables) yields exactly one single-key chunk per branch update, that those chunks accumulate via += into the complete final output, and that astream_log() produces ops that accumulate into a matching RunLog state.
Where is test_map_astream() defined?
test_map_astream() is defined in libs/core/tests/unit_tests/runnables/test_runnable.py at line 3285.
What does test_map_astream() call?
test_map_astream() directly calls one function: invoke(), used in its assertions via prompt.invoke().
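The astream_log() half of the test accumulates patch ops into a final state via +=. The pattern can be sketched with a minimal jsonpatch-style accumulator; MiniRunLog and its op format are illustrative assumptions, not langchain's RunLog API, which tracks richer state.

```python
from dataclasses import dataclass, field

@dataclass
class MiniRunLog:
    # Holds a list of jsonpatch-style ops; state is rebuilt from all ops.
    ops: list = field(default_factory=list)

    def __add__(self, other: "MiniRunLog") -> "MiniRunLog":
        # Accumulation concatenates op lists, as the test does with +=.
        return MiniRunLog(ops=self.ops + other.ops)

    @property
    def state(self) -> dict:
        # Replay every op against an empty dict to materialize the state.
        out: dict = {}
        for op in self.ops:
            keys = [k for k in op["path"].split("/") if k]
            target = out
            for k in keys[:-1]:
                target = target.setdefault(k, {})
            if op["op"] in ("add", "replace"):
                target[keys[-1]] = op["value"]
        return out

# Hypothetical patches, shaped like those a streamed run might emit.
patches = [
    MiniRunLog(ops=[{"op": "add", "path": "/streamed_output", "value": []}]),
    MiniRunLog(ops=[{"op": "replace", "path": "/streamed_output",
                     "value": [{"llm": "i"}]}]),
    MiniRunLog(ops=[{"op": "add", "path": "/final_output",
                     "value": {"llm": "i'm a textbot"}}]),
]

final = None
for p in patches:
    final = p if final is None else final + p

print(final.state["final_output"])
```

This mirrors the test's bookkeeping: len(final.ops) equals the total number of streamed ops, and the replayed state's final_output matches the accumulated streaming result.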