test_closed_stream() — langchain Function Reference
Architecture documentation for the test_closed_stream() function in test_memory_stream.py from the langchain codebase.
Dependency Diagram
graph TD
    cf23b634_dc79_0d8d_bbaf_ca0a76aafe24["test_closed_stream()"]
    f5c9b9cd_4c4b_26c8_2419_4e2f783c706c["test_memory_stream.py"]
    cf23b634_dc79_0d8d_bbaf_ca0a76aafe24 -->|defined in| f5c9b9cd_4c4b_26c8_2419_4e2f783c706c
    style cf23b634_dc79_0d8d_bbaf_ca0a76aafe24 fill:#6366f1,stroke:#818cf8,color:#fff
Source Code
libs/core/tests/unit_tests/tracers/test_memory_stream.py lines 133–140
async def test_closed_stream() -> None:
    reader_loop = asyncio.get_event_loop()
    channel = _MemoryStream[str](reader_loop)
    writer = channel.get_send_stream()
    reader = channel.get_receive_stream()
    await writer.aclose()
    assert [chunk async for chunk in reader] == []
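The behavior this test checks (a reader on a stream that was closed before anything was written finishes with no items) can be illustrated without langchain. The sketch below is a hypothetical stand-in, not the implementation of _MemoryStream: the ToyStream class, its sentinel-based close, and the helper coroutines are assumptions made for illustration only.

```python
import asyncio
from typing import AsyncIterator, Generic, List, TypeVar

T = TypeVar("T")

# Sentinel object placed on the queue to mark the end of the stream.
_DONE = object()


class ToyStream(Generic[T]):
    """Hypothetical in-memory stream; NOT langchain's _MemoryStream."""

    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()

    async def send(self, item: T) -> None:
        await self._queue.put(item)

    async def aclose(self) -> None:
        # Closing enqueues the sentinel so a reader knows when to stop.
        await self._queue.put(_DONE)

    async def __aiter__(self) -> AsyncIterator[T]:
        # Yield items until the sentinel is seen, then end the iteration.
        while True:
            item = await self._queue.get()
            if item is _DONE:
                return
            yield item


async def collect_after_close() -> List[str]:
    # Mirrors the test: close first, then read everything.
    stream: ToyStream[str] = ToyStream()
    await stream.aclose()
    return [chunk async for chunk in stream]


async def collect_with_items() -> List[str]:
    # Contrast case: items sent before closing are still delivered.
    stream: ToyStream[str] = ToyStream()
    await stream.send("a")
    await stream.send("b")
    await stream.aclose()
    return [chunk async for chunk in stream]


if __name__ == "__main__":
    print(asyncio.run(collect_after_close()))
    print(asyncio.run(collect_with_items()))
```

In this sketch the reader stops because the close is delivered in-band as a queue item. How _MemoryStream signals closure internally is not shown in the excerpt above, so the sentinel approach should be read as one possible design rather than a description of the library.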
Frequently Asked Questions
What does test_closed_stream() do?
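test_closed_stream() is an async unit test in the langchain codebase, defined in libs/core/tests/unit_tests/tracers/test_memory_stream.py. It creates a _MemoryStream[str] bound to the current event loop, obtains its send and receive streams, closes the send stream without writing anything, and asserts that iterating over the receive stream produces an empty list.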
Where is test_closed_stream() defined?
test_closed_stream() is defined in libs/core/tests/unit_tests/tracers/test_memory_stream.py at line 133.