test_responses_stream() — langchain Function Reference
Architecture documentation for the test_responses_stream() function in test_responses_stream.py from the langchain codebase.
Dependency Diagram
graph TD
    6cc7bda0_8ed6_abb9_3210_5e81f80ffce6["test_responses_stream()"]
    b7a76314_9619_4110_6766_cbbf07c950ca["test_responses_stream.py"]
    6cc7bda0_8ed6_abb9_3210_5e81f80ffce6 -->|defined in| b7a76314_9619_4110_6766_cbbf07c950ca
    d49a1af2_033f_28bc_0b8e_448a6f87b588["_strip_none()"]
    6cc7bda0_8ed6_abb9_3210_5e81f80ffce6 -->|calls| d49a1af2_033f_28bc_0b8e_448a6f87b588
    style 6cc7bda0_8ed6_abb9_3210_5e81f80ffce6 fill:#6366f1,stroke:#818cf8,color:#fff
Source Code
libs/partners/openai/tests/unit_tests/chat_models/test_responses_stream.py lines 720–758
from typing import Any
from unittest.mock import MagicMock, patch

from langchain_core.messages import AIMessageChunk, BaseMessageChunk
from langchain_openai import ChatOpenAI

# `MockSyncContextManager`, `responses_stream`, and `_strip_none` are helpers
# defined elsewhere in this test module; `output_version` and `expected_content`
# are supplied by pytest (e.g. via a parametrize decorator) outside the
# excerpted lines.
def test_responses_stream(output_version: str, expected_content: list[dict]) -> None:
    llm = ChatOpenAI(
        model="o4-mini", use_responses_api=True, output_version=output_version
    )
    mock_client = MagicMock()

    def mock_create(*args: Any, **kwargs: Any) -> MockSyncContextManager:
        # Replay the canned `responses_stream` events instead of hitting the API.
        return MockSyncContextManager(responses_stream)

    mock_client.responses.create = mock_create

    full: BaseMessageChunk | None = None
    chunks = []
    with patch.object(llm, "root_client", mock_client):
        for chunk in llm.stream("test"):
            assert isinstance(chunk, AIMessageChunk)
            # Fold the stream into one aggregated message via chunk addition.
            full = chunk if full is None else full + chunk
            chunks.append(chunk)

    assert isinstance(full, AIMessageChunk)
    assert full.content == expected_content
    assert full.additional_kwargs == {}
    assert full.id == "resp_123"

    # Test reconstruction: the aggregated message should round-trip through
    # _get_request_payload() back into the original response output items.
    payload = llm._get_request_payload([full])
    completed = [
        item
        for item in responses_stream
        if item.type == "response.completed"  # type: ignore[attr-defined]
    ]
    assert len(completed) == 1
    response = completed[0].response  # type: ignore[attr-defined]
    assert len(response.output) == len(payload["input"])
    for idx, item in enumerate(response.output):
        dumped = _strip_none(item.model_dump())
        _ = dumped.pop("status", None)
        assert dumped == payload["input"][idx]
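The excerpt references three names it does not define: MockSyncContextManager, responses_stream, and _strip_none. A minimal sketch of what the context-manager shim could look like, assuming it simply wraps the canned event list so the mocked responses.create(...) can be used in a with block and iterated like a live stream (the class name comes from the excerpt; the body here is an assumption, not the file's actual implementation):

from typing import Any, Iterator

class MockSyncContextManager:
    # Hypothetical shim: wraps a list of canned Responses API events so the
    # mocked `responses.create(...)` behaves like a streaming context manager.
    def __init__(self, events: list[Any]) -> None:
        self._events = events

    def __enter__(self) -> Iterator[Any]:
        # Iterating the managed value replays the canned events in order.
        return iter(self._events)

    def __exit__(self, *exc_info: Any) -> bool:
        # Nothing to clean up; do not suppress exceptions.
        return False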
Frequently Asked Questions
What does test_responses_stream() do?
test_responses_stream() is a unit test in the langchain codebase, defined in libs/partners/openai/tests/unit_tests/chat_models/test_responses_stream.py. It streams a mocked OpenAI Responses API call through ChatOpenAI, asserts each yielded chunk is an AIMessageChunk, aggregates the chunks into a single message, verifies the aggregated content, additional_kwargs, and id, and then checks that the message round-trips through _get_request_payload() back into the original response output items.
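The aggregation step relies on AIMessageChunk supporting the + operator, which concatenates streamed content. A small self-contained illustration (the strings here are made up; only langchain_core's chunk-merging behavior is being demonstrated):

from langchain_core.messages import AIMessageChunk

# `+` on chunks concatenates content and merges metadata; this is what the
# test relies on when it folds the stream into `full`.
a = AIMessageChunk(content="Hello, ")
b = AIMessageChunk(content="world!")
merged = a + b
assert merged.content == "Hello, world!"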
Where is test_responses_stream() defined?
test_responses_stream() is defined in libs/partners/openai/tests/unit_tests/chat_models/test_responses_stream.py at line 720.
What does test_responses_stream() call?
test_responses_stream() calls one function directly: _strip_none(), which it uses to normalize dumped response output items before comparing them to the request payload.
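The excerpt does not show _strip_none itself. A plausible sketch, assuming it recursively removes None-valued entries from a model_dump() result before comparison (an assumption based on how the test uses it, not the file's actual code):

from typing import Any

def _strip_none(obj: Any) -> Any:
    # Hypothetical helper: recursively drop None values from nested
    # dicts/lists so dumped models compare cleanly against request payloads.
    if isinstance(obj, dict):
        return {k: _strip_none(v) for k, v in obj.items() if v is not None}
    if isinstance(obj, list):
        return [_strip_none(v) for v in obj]
    return obj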