streaming_aiter.py — langchain Source File

Architecture documentation for streaming_aiter.py, a Python file in the langchain codebase. 6 imports, 0 dependents.

Dependency Diagram

graph LR
  338feaf1_ef34_c2bd_c965_d6a9d87eac81["streaming_aiter.py"]
  a327e534_84f6_5308_58ca_5727d5eda0cf["asyncio"]
  338feaf1_ef34_c2bd_c965_d6a9d87eac81 --> a327e534_84f6_5308_58ca_5727d5eda0cf
  cfe2bde5_180e_e3b0_df2b_55b3ebaca8e7["collections.abc"]
  338feaf1_ef34_c2bd_c965_d6a9d87eac81 --> cfe2bde5_180e_e3b0_df2b_55b3ebaca8e7
  8e2034b7_ceb8_963f_29fc_2ea6b50ef9b3["typing"]
  338feaf1_ef34_c2bd_c965_d6a9d87eac81 --> 8e2034b7_ceb8_963f_29fc_2ea6b50ef9b3
  f3bc7443_c889_119d_0744_aacc3620d8d2["langchain_core.callbacks"]
  338feaf1_ef34_c2bd_c965_d6a9d87eac81 --> f3bc7443_c889_119d_0744_aacc3620d8d2
  ac2a9b92_4484_491e_1b48_ec85e71e1d58["langchain_core.outputs"]
  338feaf1_ef34_c2bd_c965_d6a9d87eac81 --> ac2a9b92_4484_491e_1b48_ec85e71e1d58
  91721f45_4909_e489_8c1f_084f8bd87145["typing_extensions"]
  338feaf1_ef34_c2bd_c965_d6a9d87eac81 --> 91721f45_4909_e489_8c1f_084f8bd87145
  style 338feaf1_ef34_c2bd_c965_d6a9d87eac81 fill:#6366f1,stroke:#818cf8,color:#fff

Source Code

from __future__ import annotations

import asyncio
from collections.abc import AsyncIterator
from typing import Any, Literal, cast

from langchain_core.callbacks import AsyncCallbackHandler
from langchain_core.outputs import LLMResult
from typing_extensions import override

# TODO: If used by two LLM runs in parallel this won't work as expected


class AsyncIteratorCallbackHandler(AsyncCallbackHandler):
    """Callback handler that returns an async iterator."""

    queue: asyncio.Queue[str]

    done: asyncio.Event

    @property
    def always_verbose(self) -> bool:
        """Always verbose."""
        return True

    def __init__(self) -> None:
        """Instantiate AsyncIteratorCallbackHandler."""
        self.queue = asyncio.Queue()
        self.done = asyncio.Event()

    @override
    async def on_llm_start(
        self,
        serialized: dict[str, Any],
        prompts: list[str],
        **kwargs: Any,
    ) -> None:
        # If two calls are made in a row, this resets the state
        self.done.clear()

    @override
    async def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
        if token is not None and token != "":
            self.queue.put_nowait(token)

    @override
    async def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
        self.done.set()

    @override
    async def on_llm_error(self, error: BaseException, **kwargs: Any) -> None:
        self.done.set()

    # TODO: implement the other methods

    async def aiter(self) -> AsyncIterator[str]:
        """Asynchronous iterator that yields tokens."""
        while not self.queue.empty() or not self.done.is_set():
            # Wait for the next token in the queue,
            # but stop waiting if the done event is set
            done, other = await asyncio.wait(
                [
                    # NOTE: If you add other tasks here, update the code below,
                    # which assumes each set has exactly one task each
                    asyncio.ensure_future(self.queue.get()),
                    asyncio.ensure_future(self.done.wait()),
                ],
                return_when=asyncio.FIRST_COMPLETED,
            )

            # Cancel the other task
            if other:
                other.pop().cancel()

            # Extract the value of the first completed task
            token_or_done = cast("str | Literal[True]", done.pop().result())

            # If the extracted value is the boolean True, the done event was set
            if token_or_done is True:
                break

            # Otherwise, the extracted value is a token, which we yield
            yield token_or_done
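
The queue-plus-event design in the listing above can be tried without langchain installed. The following is a minimal sketch, not the library's implementation: `TokenStream`, `push`, `finish`, and `produce` are hypothetical names standing in for the handler and its `on_llm_new_token` / `on_llm_end` callbacks. Unlike the original, it checks the queue task explicitly instead of popping from the completed set, and it lets the loop condition decide when to stop.

```python
from __future__ import annotations

import asyncio
from collections.abc import AsyncIterator


class TokenStream:
    """Hypothetical stand-in for the handler: one queue, one done event."""

    def __init__(self) -> None:
        self.queue: asyncio.Queue[str] = asyncio.Queue()
        self.done = asyncio.Event()

    def push(self, token: str) -> None:
        # Plays the role of on_llm_new_token: empty tokens are skipped.
        if token:
            self.queue.put_nowait(token)

    def finish(self) -> None:
        # Plays the role of on_llm_end / on_llm_error.
        self.done.set()

    async def aiter(self) -> AsyncIterator[str]:
        while not self.queue.empty() or not self.done.is_set():
            # Race "next token" against "stream finished".
            get_task = asyncio.create_task(self.queue.get())
            done_task = asyncio.create_task(self.done.wait())
            finished, pending = await asyncio.wait(
                [get_task, done_task], return_when=asyncio.FIRST_COMPLETED
            )
            for task in pending:
                task.cancel()
            if get_task in finished:
                yield get_task.result()
            # Otherwise only the done event fired; the loop condition
            # exits once the queue is empty.


async def produce(stream: TokenStream) -> None:
    # Simulated model output; the empty string should be dropped.
    for token in ["Hello", "", ", ", "world"]:
        stream.push(token)
        await asyncio.sleep(0)  # give the consumer a chance to run
    stream.finish()


async def main() -> list[str]:
    stream = TokenStream()
    producer = asyncio.create_task(produce(stream))
    tokens = [token async for token in stream.aiter()]
    await producer
    return tokens


tokens = asyncio.run(main())
print(tokens)  # ['Hello', ', ', 'world']
```

Because a single queue and a single event are shared, two producers feeding the same instance would interleave their tokens, and the first to finish would set the event for both. This is consistent with the TODO at the top of the source file about parallel LLM runs.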

Dependencies

  • asyncio
  • collections.abc
  • langchain_core.callbacks
  • langchain_core.outputs
  • typing
  • typing_extensions
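
A dependency list like the one above can be recovered from the import statements alone. The sketch below shows one way to do that with the standard-library `ast` module. It is an illustration only and not necessarily how this page was generated. The helper name `imported_modules` is hypothetical, and dropping `__future__` is an assumption made so the result matches the six modules listed.

```python
import ast

# Import block copied from the source listing on this page.
SOURCE = '''
from __future__ import annotations

import asyncio
from collections.abc import AsyncIterator
from typing import Any, Literal, cast

from langchain_core.callbacks import AsyncCallbackHandler
from langchain_core.outputs import LLMResult
from typing_extensions import override
'''


def imported_modules(source: str) -> list[str]:
    """Return the sorted, de-duplicated module names imported by `source`."""
    modules: set[str] = set()
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.Import):
            modules.update(alias.name for alias in node.names)
        elif isinstance(node, ast.ImportFrom) and node.module:
            modules.add(node.module)
    # Assumption: __future__ is a compiler directive, not a dependency.
    modules.discard("__future__")
    return sorted(modules)


modules = imported_modules(SOURCE)
print(len(modules), modules)
```

Parsing with `ast` never executes or imports the file, so the sketch runs even when langchain_core and typing_extensions are not installed.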

Frequently Asked Questions

What does streaming_aiter.py do?
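streaming_aiter.py is a Python source file in the langchain codebase. It defines AsyncIteratorCallbackHandler, an AsyncCallbackHandler subclass that queues LLM tokens as they arrive and exposes them through the async iterator method aiter. It belongs to the CoreAbstractions domain, Serialization subdomain.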
What does streaming_aiter.py depend on?
streaming_aiter.py imports 6 modules: asyncio, collections.abc, langchain_core.callbacks, langchain_core.outputs, typing, typing_extensions.
Where is streaming_aiter.py in the architecture?
streaming_aiter.py is located at libs/langchain/langchain_classic/callbacks/streaming_aiter.py (domain: CoreAbstractions, subdomain: Serialization, directory: libs/langchain/langchain_classic/callbacks).
