test_map_stream_iterator_input() — langchain Function Reference
Architecture documentation for the test_map_stream_iterator_input() function in test_runnable.py from the langchain codebase.
Entity Profile
Dependency Diagram
graph TD 35b1d00e_7405_561b_7bea_38bbe18cbbcc["test_map_stream_iterator_input()"] 26df6ad8_0189_51d0_c3c1_6c3248893ff5["test_runnable.py"] 35b1d00e_7405_561b_7bea_38bbe18cbbcc -->|defined in| 26df6ad8_0189_51d0_c3c1_6c3248893ff5 style 35b1d00e_7405_561b_7bea_38bbe18cbbcc fill:#6366f1,stroke:#818cf8,color:#fff
Relationship Graph
Source Code
libs/core/tests/unit_tests/runnables/test_runnable.py lines 3237–3282
def test_map_stream_iterator_input() -> None:
prompt = (
SystemMessagePromptTemplate.from_template("You are a nice assistant.")
+ "{question}"
)
chat_res = "i'm a chatbot"
# sleep to better simulate a real stream
chat = FakeListChatModel(responses=[chat_res], sleep=0.01)
llm_res = "i'm a textbot"
# sleep to better simulate a real stream
llm = FakeStreamingListLLM(responses=[llm_res], sleep=0.01)
chain: Runnable = (
prompt
| llm
| {
"chat": chat.bind(stop=["Thought:"]),
"llm": llm,
"passthrough": RunnablePassthrough(),
}
)
stream = chain.stream({"question": "What is your name?"})
final_value = None
streamed_chunks = []
for chunk in stream:
streamed_chunks.append(chunk)
if final_value is None:
final_value = chunk
else:
final_value += chunk
assert streamed_chunks[0] in [
{"passthrough": "i"},
{"llm": "i"},
{"chat": _any_id_ai_message_chunk(content="i")},
]
assert len(streamed_chunks) == len(chat_res) + len(llm_res) + len(llm_res)
assert all(len(c.keys()) == 1 for c in streamed_chunks)
assert final_value is not None
assert final_value.get("chat").content == "i'm a chatbot"
assert final_value.get("llm") == "i'm a textbot"
assert final_value.get("passthrough") == "i'm a textbot"
Domain
Subdomains
Source
Frequently Asked Questions
What does test_map_stream_iterator_input() do?
test_map_stream_iterator_input() is a function in the langchain codebase, defined in libs/core/tests/unit_tests/runnables/test_runnable.py.
Where is test_map_stream_iterator_input() defined?
test_map_stream_iterator_input() is defined in libs/core/tests/unit_tests/runnables/test_runnable.py at line 3237.
Analyze Your Own Codebase
Get architecture documentation, dependency graphs, and domain analysis for your codebase in minutes.
Try Supermodel Free