AsyncIteratorCallbackHandler Class — langchain Architecture

Architecture documentation for the AsyncIteratorCallbackHandler class in streaming_aiter.py from the langchain codebase.

Dependency Diagram

graph TD
  026a405e_b843_1fc9_0384_15895a6c1500["AsyncIteratorCallbackHandler"]
  64513429_d001_87b1_5f19_cc17a3f3577b["AsyncCallbackHandler"]
  026a405e_b843_1fc9_0384_15895a6c1500 -->|extends| 64513429_d001_87b1_5f19_cc17a3f3577b
  a7bda14c_8cf5_7edc_4727_0a9a8280f95d["streaming_aiter.py"]
  026a405e_b843_1fc9_0384_15895a6c1500 -->|defined in| a7bda14c_8cf5_7edc_4727_0a9a8280f95d
  733f2503_3e1c_cbac_0d9a_268718ba1da5["always_verbose()"]
  026a405e_b843_1fc9_0384_15895a6c1500 -->|method| 733f2503_3e1c_cbac_0d9a_268718ba1da5
  60788243_4aa4_55db_4523_8fd5aec70e1a["__init__()"]
  026a405e_b843_1fc9_0384_15895a6c1500 -->|method| 60788243_4aa4_55db_4523_8fd5aec70e1a
  aa36850e_7d93_31b8_2615_527851110493["on_llm_start()"]
  026a405e_b843_1fc9_0384_15895a6c1500 -->|method| aa36850e_7d93_31b8_2615_527851110493
  46c8df54_8402_8bb7_a034_763228861a6b["on_llm_new_token()"]
  026a405e_b843_1fc9_0384_15895a6c1500 -->|method| 46c8df54_8402_8bb7_a034_763228861a6b
  bf1d5b01_1909_6766_da85_933040d83f52["on_llm_end()"]
  026a405e_b843_1fc9_0384_15895a6c1500 -->|method| bf1d5b01_1909_6766_da85_933040d83f52
  de7423e5_067f_f7df_de93_f346713d00bd["on_llm_error()"]
  026a405e_b843_1fc9_0384_15895a6c1500 -->|method| de7423e5_067f_f7df_de93_f346713d00bd
  5a2284c4_e445_1a0c_e097_263832dc1143["aiter()"]
  026a405e_b843_1fc9_0384_15895a6c1500 -->|method| 5a2284c4_e445_1a0c_e097_263832dc1143

Source Code

libs/langchain/langchain_classic/callbacks/streaming_aiter.py lines 14–83

class AsyncIteratorCallbackHandler(AsyncCallbackHandler):
    """Callback handler that returns an async iterator."""

    queue: asyncio.Queue[str]

    done: asyncio.Event

    @property
    def always_verbose(self) -> bool:
        """Always verbose."""
        return True

    def __init__(self) -> None:
        """Instantiate AsyncIteratorCallbackHandler."""
        self.queue = asyncio.Queue()
        self.done = asyncio.Event()

    @override
    async def on_llm_start(
        self,
        serialized: dict[str, Any],
        prompts: list[str],
        **kwargs: Any,
    ) -> None:
        # If two calls are made in a row, this resets the state
        self.done.clear()

    @override
    async def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
        if token is not None and token != "":
            self.queue.put_nowait(token)

    @override
    async def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
        self.done.set()

    @override
    async def on_llm_error(self, error: BaseException, **kwargs: Any) -> None:
        self.done.set()

    # TODO: implement the other methods

    async def aiter(self) -> AsyncIterator[str]:
        """Asynchronous iterator that yields tokens."""
        while not self.queue.empty() or not self.done.is_set():
            # Wait for the next token in the queue,
            # but stop waiting if the done event is set
            done, other = await asyncio.wait(
                [
                    # NOTE: If you add other tasks here, update the code below,
                    # which assumes each set has exactly one task each
                    asyncio.ensure_future(self.queue.get()),
                    asyncio.ensure_future(self.done.wait()),
                ],
                return_when=asyncio.FIRST_COMPLETED,
            )

            # Cancel the other task
            if other:
                other.pop().cancel()

            # Extract the value of the first completed task
            token_or_done = cast("str | Literal[True]", done.pop().result())

            # If the extracted value is the boolean True, the done event was set
            if token_or_done is True:
                break

            # Otherwise, the extracted value is a token, which we yield
            yield token_or_done
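
Usage Sketch

The loop in aiter() races a queue read against the done event, which is easier to follow when a producer and a consumer run side by side. The sketch below does not import langchain. MiniIteratorHandler, fake_llm, and collect are hypothetical names introduced only for illustration, and the stand-in class copies just the queue/event logic from the excerpt above, so treat it as an approximation of how the real handler might be driven rather than as the library's API.

```python
import asyncio
from typing import AsyncIterator, List


class MiniIteratorHandler:
    """Hypothetical stand-in that mirrors the queue/event logic of the excerpt."""

    def __init__(self) -> None:
        self.queue = asyncio.Queue()  # holds str tokens
        self.done = asyncio.Event()

    async def on_llm_new_token(self, token: str) -> None:
        # Empty or missing tokens are skipped, as in the excerpt.
        if token is not None and token != "":
            self.queue.put_nowait(token)

    async def on_llm_end(self) -> None:
        self.done.set()

    async def aiter(self) -> AsyncIterator[str]:
        while not self.queue.empty() or not self.done.is_set():
            # Race "next token" against "generation finished".
            get_task = asyncio.ensure_future(self.queue.get())
            done_task = asyncio.ensure_future(self.done.wait())
            finished, pending = await asyncio.wait(
                [get_task, done_task], return_when=asyncio.FIRST_COMPLETED
            )
            for task in pending:
                task.cancel()
            result = finished.pop().result()
            if result is True:  # Event.wait() returns True once the event is set
                break
            yield result


async def fake_llm(handler: MiniIteratorHandler, tokens: List[str]) -> None:
    """Assumed producer: plays the role of the model invoking the callbacks."""
    for token in tokens:
        await handler.on_llm_new_token(token)
        # Short pause so the consumer drains each token before the next event.
        await asyncio.sleep(0.05)
    await handler.on_llm_end()


async def collect(tokens: List[str]) -> List[str]:
    """Run producer and consumer concurrently and return the streamed tokens."""
    handler = MiniIteratorHandler()
    producer = asyncio.create_task(fake_llm(handler, tokens))
    received = [token async for token in handler.aiter()]
    await producer
    return received


if __name__ == "__main__":
    print(asyncio.run(collect(["Hello", "", " ", "world"])))
```

The producer is started as a task before the consumer begins iterating, because aiter() only finishes once the done event is set. With the real class, the same shape would presumably apply: pass the handler in the callbacks of a streaming LLM call, schedule that call as a task, and iterate over handler.aiter() while it runs.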

Frequently Asked Questions

What is the AsyncIteratorCallbackHandler class?
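AsyncIteratorCallbackHandler is an asynchronous callback handler in the langchain codebase, defined in libs/langchain/langchain_classic/callbacks/streaming_aiter.py. It places each non-empty LLM token on an asyncio queue and exposes the tokens through its aiter() async iterator, which stops once on_llm_end or on_llm_error sets the done event.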
Where is AsyncIteratorCallbackHandler defined?
AsyncIteratorCallbackHandler is defined in libs/langchain/langchain_classic/callbacks/streaming_aiter.py at line 14.
What does AsyncIteratorCallbackHandler extend?
AsyncIteratorCallbackHandler extends AsyncCallbackHandler.
