test_astream_implementation_uses_astream() — langchain Function Reference
Architecture documentation for the test_astream_implementation_uses_astream() function in test_base.py from the langchain codebase.
Entity Profile
Dependency Diagram
graph TD
    de2021c5_a31f_b55f_eb9e_96aa9dd1a608["test_astream_implementation_uses_astream()"]
    8cb88ac4_61d9_baf3_9df4_9b3f5095927e["test_base.py"]
    de2021c5_a31f_b55f_eb9e_96aa9dd1a608 -->|defined in| 8cb88ac4_61d9_baf3_9df4_9b3f5095927e
    360a3b9c_a247_3d48_cdac_c5ef28482267["_astream()"]
    de2021c5_a31f_b55f_eb9e_96aa9dd1a608 -->|calls| 360a3b9c_a247_3d48_cdac_c5ef28482267
    style de2021c5_a31f_b55f_eb9e_96aa9dd1a608 fill:#6366f1,stroke:#818cf8,color:#fff
Relationship Graph
Source Code
libs/core/tests/unit_tests/language_models/chat_models/test_base.py lines 269–309
async def test_astream_implementation_uses_astream() -> None:
    """Test astream uses appropriate implementation."""

    class ModelWithAsyncStream(BaseChatModel):
        def _generate(
            self,
            messages: list[BaseMessage],
            stop: list[str] | None = None,
            run_manager: CallbackManagerForLLMRun | None = None,
            **kwargs: Any,
        ) -> ChatResult:
            """Top Level call."""
            raise NotImplementedError

        @override
        async def _astream(
            self,
            messages: list[BaseMessage],
            stop: list[str] | None = None,
            run_manager: CallbackManagerForLLMRun | None = None,  # type: ignore[override]
            **kwargs: Any,
        ) -> AsyncIterator[ChatGenerationChunk]:
            """Stream the output of the model."""
            yield ChatGenerationChunk(message=AIMessageChunk(content="a"))
            yield ChatGenerationChunk(
                message=AIMessageChunk(content="b", chunk_position="last")
            )

        @property
        def _llm_type(self) -> str:
            return "fake-chat-model"

    model = ModelWithAsyncStream()
    chunks = [chunk async for chunk in model.astream("anything")]
    assert chunks == [
        _any_id_ai_message_chunk(
            content="a",
        ),
        _any_id_ai_message_chunk(content="b", chunk_position="last"),
    ]
    assert len({chunk.id for chunk in chunks}) == 1
Frequently Asked Questions
What does test_astream_implementation_uses_astream() do?
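test_astream_implementation_uses_astream() is an async unit test in the langchain codebase, defined in libs/core/tests/unit_tests/language_models/chat_models/test_base.py. It checks that astream() on a BaseChatModel subclass streams through the subclass's own _astream() implementation. The test model's _generate() raises NotImplementedError, and its _astream() yields two chunks: "a", then "b" marked with chunk_position="last". The test asserts that model.astream("anything") produces exactly those two chunks and that both chunks share a single id.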
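The behavior under test can be illustrated with a simplified, dependency-free sketch. It is not langchain's implementation: SketchBaseModel, its override check, the single-chunk fallback, and the collect() helper are all hypothetical names and choices made for illustration, and plain strings stand in for message chunks. It only shows the general pattern of a public astream() that prefers a subclass's _astream() when one is provided.

```python
import asyncio
from collections.abc import AsyncIterator


class SketchBaseModel:
    """Hypothetical stand-in for a chat model base class (not BaseChatModel)."""

    def _generate(self, prompt: str) -> str:
        raise NotImplementedError

    async def _astream(self, prompt: str) -> AsyncIterator[str]:
        raise NotImplementedError
        yield  # unreachable; makes this method an async generator

    async def astream(self, prompt: str) -> AsyncIterator[str]:
        # Assumed dispatch rule: use the native async stream if overridden.
        if type(self)._astream is not SketchBaseModel._astream:
            async for chunk in self._astream(prompt):
                yield chunk
        else:
            # Assumed fallback: emit the non-streaming result as one chunk.
            yield self._generate(prompt)


class ModelWithAsyncStream(SketchBaseModel):
    """Overrides only _astream; _generate still raises NotImplementedError."""

    async def _astream(self, prompt: str) -> AsyncIterator[str]:
        yield "a"
        yield "b"


async def collect(model: SketchBaseModel, prompt: str) -> list[str]:
    """Gather every streamed chunk into a list, as the test does."""
    return [chunk async for chunk in model.astream(prompt)]


chunks = asyncio.run(collect(ModelWithAsyncStream(), "anything"))
print(chunks)
```

Because _generate() is never reached in this sketch when _astream() is overridden, a NotImplementedError there would signal that the wrong path was taken, which mirrors why the test's _generate() raises.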
Where is test_astream_implementation_uses_astream() defined?
test_astream_implementation_uses_astream() is defined in libs/core/tests/unit_tests/language_models/chat_models/test_base.py at line 269.
What does test_astream_implementation_uses_astream() call?
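The dependency graph records one call from test_astream_implementation_uses_astream(): _astream(). In the test body, that method is exercised through model.astream("anything") rather than invoked by name.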