test_cancel_astream_events() — langchain Function Reference
Architecture documentation for the test_cancel_astream_events() function in test_runnable_events_v2.py from the langchain codebase.
Entity Profile
Dependency Diagram
graph TD 871ce5fd_04b5_0b98_83c8_1c98e6ed7672["test_cancel_astream_events()"] 33c02978_2077_5819_7048_bc2a81e80625["test_runnable_events_v2.py"] 871ce5fd_04b5_0b98_83c8_1c98e6ed7672 -->|defined in| 33c02978_2077_5819_7048_bc2a81e80625 ff9dc64e_d522_d439_5302_1fca52a68214["reset()"] 871ce5fd_04b5_0b98_83c8_1c98e6ed7672 -->|calls| ff9dc64e_d522_d439_5302_1fca52a68214 style 871ce5fd_04b5_0b98_83c8_1c98e6ed7672 fill:#6366f1,stroke:#818cf8,color:#fff
Relationship Graph
Source Code
libs/core/tests/unit_tests/runnables/test_runnable_events_v2.py lines 2527–2596
async def test_cancel_astream_events() -> None:
class AwhileMaker:
def __init__(self) -> None:
self.reset()
async def __call__(self, value: Any) -> Any:
self.started = True
try:
await asyncio.sleep(0.5)
except asyncio.CancelledError:
self.cancelled = True
raise
return value
def reset(self) -> None:
self.started = False
self.cancelled = False
alittlewhile = AwhileMaker()
awhile = AwhileMaker()
anotherwhile = AwhileMaker()
outer_cancelled = False
@chain
async def sequence(value: Any) -> Any:
try:
yield await alittlewhile(value)
yield await awhile(value)
yield await anotherwhile(value)
except asyncio.CancelledError:
nonlocal outer_cancelled
outer_cancelled = True
raise
got_event = False
async def aconsume(stream: AsyncIterator[Any]) -> None:
nonlocal got_event
# here we don't need aclosing as cancelling the task is propagated
# to the async generator being consumed
async for chunk in stream:
if chunk["event"] == "on_chain_stream":
got_event = True
assert chunk["data"]["chunk"] == {"value": 1}
task.cancel()
thread2: RunnableConfig = {"configurable": {"thread_id": 2}}
task = asyncio.create_task(
aconsume(sequence.astream_events({"value": 1}, thread2, version="v2"))
)
with pytest.raises(asyncio.CancelledError):
await task
# did break
assert got_event
# did cancel outer chain
assert outer_cancelled
# node "alittlewhile" starts, not cancelled
assert alittlewhile.started is True
assert alittlewhile.cancelled is False
# node "awhile" starts but is cancelled
assert awhile.started is True
assert awhile.cancelled is True
# node "anotherwhile" should never start
assert anotherwhile.started is False
Domain
Subdomains
Calls
Source
Frequently Asked Questions
What does test_cancel_astream_events() do?
test_cancel_astream_events() is a function in the langchain codebase, defined in libs/core/tests/unit_tests/runnables/test_runnable_events_v2.py.
Where is test_cancel_astream_events() defined?
test_cancel_astream_events() is defined in libs/core/tests/unit_tests/runnables/test_runnable_events_v2.py at line 2527.
What does test_cancel_astream_events() call?
test_cancel_astream_events() calls 1 function(s): reset.
Analyze Your Own Codebase
Get architecture documentation, dependency graphs, and domain analysis for your codebase in minutes.
Try Supermodel Free