test_cancel_astream_events() — langchain Function Reference

Architecture documentation for the test_cancel_astream_events() function in test_runnable_events_v2.py from the langchain codebase.

Dependency Diagram

graph TD
  871ce5fd_04b5_0b98_83c8_1c98e6ed7672["test_cancel_astream_events()"]
  33c02978_2077_5819_7048_bc2a81e80625["test_runnable_events_v2.py"]
  871ce5fd_04b5_0b98_83c8_1c98e6ed7672 -->|defined in| 33c02978_2077_5819_7048_bc2a81e80625
  ff9dc64e_d522_d439_5302_1fca52a68214["reset()"]
  871ce5fd_04b5_0b98_83c8_1c98e6ed7672 -->|calls| ff9dc64e_d522_d439_5302_1fca52a68214
  style 871ce5fd_04b5_0b98_83c8_1c98e6ed7672 fill:#6366f1,stroke:#818cf8,color:#fff

Source Code

libs/core/tests/unit_tests/runnables/test_runnable_events_v2.py lines 2527–2596

async def test_cancel_astream_events() -> None:
    class AwhileMaker:
        def __init__(self) -> None:
            self.reset()

        async def __call__(self, value: Any) -> Any:
            self.started = True
            try:
                await asyncio.sleep(0.5)
            except asyncio.CancelledError:
                self.cancelled = True
                raise
            return value

        def reset(self) -> None:
            self.started = False
            self.cancelled = False

    alittlewhile = AwhileMaker()
    awhile = AwhileMaker()
    anotherwhile = AwhileMaker()

    outer_cancelled = False

    @chain
    async def sequence(value: Any) -> Any:
        try:
            yield await alittlewhile(value)
            yield await awhile(value)
            yield await anotherwhile(value)
        except asyncio.CancelledError:
            nonlocal outer_cancelled
            outer_cancelled = True
            raise

    got_event = False

    async def aconsume(stream: AsyncIterator[Any]) -> None:
        nonlocal got_event
        # here we don't need aclosing as cancelling the task is propagated
        # to the async generator being consumed
        async for chunk in stream:
            if chunk["event"] == "on_chain_stream":
                got_event = True
                assert chunk["data"]["chunk"] == {"value": 1}
                task.cancel()

    thread2: RunnableConfig = {"configurable": {"thread_id": 2}}
    task = asyncio.create_task(
        aconsume(sequence.astream_events({"value": 1}, thread2, version="v2"))
    )

    with pytest.raises(asyncio.CancelledError):
        await task

    # did break
    assert got_event
    # did cancel outer chain
    assert outer_cancelled

    # node "alittlewhile" starts, not cancelled
    assert alittlewhile.started is True
    assert alittlewhile.cancelled is False

    # node "awhile" starts but is cancelled
    assert awhile.started is True
    assert awhile.cancelled is True

    # node "anotherwhile" should never start
    assert anotherwhile.started is False
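
The cancellation pattern the test asserts can be reproduced with plain asyncio, which may make the assertions above easier to follow. The sketch below is not langchain code: `Sleeper`, `sequence`, `consume`, and `run_demo` are hypothetical names, the async generator stands in for the `@chain`-decorated function, and the sleep is shortened to 0.05 seconds. It assumes only the standard library behavior that `Task.cancel()` raises `asyncio.CancelledError` at the task's next suspension point.

```python
import asyncio
from typing import Any, AsyncIterator, Dict, Tuple


class Sleeper:
    """Hypothetical stand-in for AwhileMaker: records start and cancellation."""

    def __init__(self) -> None:
        self.started = False
        self.cancelled = False

    async def __call__(self, value: Any) -> Any:
        self.started = True
        try:
            await asyncio.sleep(0.05)
        except asyncio.CancelledError:
            self.cancelled = True
            raise  # re-raise so the cancellation keeps propagating
        return value


async def run_demo() -> Dict[str, Tuple[bool, bool]]:
    first, second, third = Sleeper(), Sleeper(), Sleeper()

    async def sequence(value: Any) -> AsyncIterator[Any]:
        # Plain async generator standing in for the chained steps.
        yield await first(value)
        yield await second(value)
        yield await third(value)

    async def consume() -> None:
        async for _ in sequence(1):
            # Cancel the consuming task after the first item; the error is
            # raised at the next await, which is inside second's sleep.
            task.cancel()

    task = asyncio.create_task(consume())
    try:
        await task
    except asyncio.CancelledError:
        pass  # expected: the consumer cancelled itself

    return {
        "first": (first.started, first.cancelled),
        "second": (second.started, second.cancelled),
        "third": (third.started, third.cancelled),
    }


if __name__ == "__main__":
    print(asyncio.run(run_demo()))
```

As in the test, the first step finishes normally, the second is interrupted while sleeping, and the third is never entered, because the generator is unwound by the `CancelledError` before it reaches the third `await`.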

Frequently Asked Questions

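What does test_cancel_astream_events() do?
test_cancel_astream_events() is an async unit test defined in libs/core/tests/unit_tests/runnables/test_runnable_events_v2.py. It checks that cancelling the task that consumes sequence.astream_events(..., version="v2") propagates into the running chain. The consumer cancels its own task after the first on_chain_stream event; the test then asserts that the outer chain saw asyncio.CancelledError, that the first step (alittlewhile) started and was not cancelled, that the second step (awhile) started and was cancelled, and that the third step (anotherwhile) never started.

Where is test_cancel_astream_events() defined?
test_cancel_astream_events() is defined in libs/core/tests/unit_tests/runnables/test_runnable_events_v2.py, starting at line 2527.

What does test_cancel_astream_events() call?
The dependency graph records one call: reset(), the AwhileMaker method that sets the started and cancelled flags to False. In the source it is invoked from AwhileMaker.__init__ each time the test constructs a helper.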
