streaming_aiter_final_only.py — langchain Source File

Architecture documentation for streaming_aiter_final_only.py, a Python file in the langchain codebase. 4 imports, 0 dependents.

Entity Profile

Dependency Diagram

graph LR
  streaming_aiter_final_only["streaming_aiter_final_only.py"]
  typing["typing"]
  streaming_aiter_final_only --> typing
  langchain_core_outputs["langchain_core.outputs"]
  streaming_aiter_final_only --> langchain_core_outputs
  typing_extensions["typing_extensions"]
  streaming_aiter_final_only --> typing_extensions
  streaming_aiter["langchain_classic.callbacks.streaming_aiter"]
  streaming_aiter_final_only --> streaming_aiter
  style streaming_aiter_final_only fill:#6366f1,stroke:#818cf8,color:#fff

Source Code

from __future__ import annotations

from typing import Any

from langchain_core.outputs import LLMResult
from typing_extensions import override

from langchain_classic.callbacks.streaming_aiter import AsyncIteratorCallbackHandler

DEFAULT_ANSWER_PREFIX_TOKENS = ["Final", "Answer", ":"]


class AsyncFinalIteratorCallbackHandler(AsyncIteratorCallbackHandler):
    """Callback handler that returns an async iterator.

    Only the final output of the agent will be iterated.
    """

    def append_to_last_tokens(self, token: str) -> None:
        """Append token to the last tokens."""
        self.last_tokens.append(token)
        self.last_tokens_stripped.append(token.strip())
        if len(self.last_tokens) > len(self.answer_prefix_tokens):
            self.last_tokens.pop(0)
            self.last_tokens_stripped.pop(0)

    def check_if_answer_reached(self) -> bool:
        """Check if the answer has been reached."""
        if self.strip_tokens:
            return self.last_tokens_stripped == self.answer_prefix_tokens_stripped
        return self.last_tokens == self.answer_prefix_tokens

    def __init__(
        self,
        *,
        answer_prefix_tokens: list[str] | None = None,
        strip_tokens: bool = True,
        stream_prefix: bool = False,
    ) -> None:
        """Instantiate AsyncFinalIteratorCallbackHandler.

        Args:
            answer_prefix_tokens: Token sequence that prefixes the answer.
                Defaults to ["Final", "Answer", ":"].
            strip_tokens: Whether to ignore whitespace and newlines when
                comparing answer_prefix_tokens to the most recent tokens
                (to determine whether the answer has been reached).
            stream_prefix: Whether to also stream the answer prefix itself.
        """
        super().__init__()
        if answer_prefix_tokens is None:
            self.answer_prefix_tokens = DEFAULT_ANSWER_PREFIX_TOKENS
        else:
            self.answer_prefix_tokens = answer_prefix_tokens
        if strip_tokens:
            self.answer_prefix_tokens_stripped = [
                token.strip() for token in self.answer_prefix_tokens
            ]
        else:
            self.answer_prefix_tokens_stripped = self.answer_prefix_tokens
        self.last_tokens = [""] * len(self.answer_prefix_tokens)
        self.last_tokens_stripped = [""] * len(self.answer_prefix_tokens)
        self.strip_tokens = strip_tokens
        self.stream_prefix = stream_prefix
        self.answer_reached = False

    @override
    async def on_llm_start(
        self,
        serialized: dict[str, Any],
        prompts: list[str],
        **kwargs: Any,
    ) -> None:
        # If two calls are made in a row, this resets the state
        self.done.clear()
        self.answer_reached = False

    @override
    async def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
        if self.answer_reached:
            self.done.set()

    @override
    async def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
        # Remember the last n tokens, where n = len(answer_prefix_tokens)
        self.append_to_last_tokens(token)

        # Check if the last n tokens match the answer_prefix_tokens list
        if self.check_if_answer_reached():
            self.answer_reached = True
            if self.stream_prefix:
                for t in self.last_tokens:
                    self.queue.put_nowait(t)
            return

        # Once the answer prefix has been seen, stream every subsequent token
        if self.answer_reached:
            self.queue.put_nowait(token)
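
Prefix Matching Example

The handler keeps a sliding window of the most recent len(answer_prefix_tokens) tokens; once the window matches the prefix, every subsequent token is pushed onto the queue. A minimal sketch that drives the handler by hand; the token split is hypothetical, since a real model may tokenize "Final Answer:" differently:

import asyncio

from langchain_classic.callbacks.streaming_aiter_final_only import (
    AsyncFinalIteratorCallbackHandler,
)


async def demo() -> None:
    handler = AsyncFinalIteratorCallbackHandler()
    # Tokens before the prefix only update the comparison window; none are queued.
    for token in ["Thought", ":", "done", "Final", "Answer", ":"]:
        await handler.on_llm_new_token(token)
    assert handler.answer_reached  # window now equals ["Final", "Answer", ":"]
    await handler.on_llm_new_token("42")
    print(handler.queue.get_nowait())  # prints "42"


asyncio.run(demo())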

Subdomains

  • RunnableInterface

Dependencies

  • langchain_classic.callbacks.streaming_aiter
  • langchain_core.outputs
  • typing
  • typing_extensions
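
Usage Example

In practice the handler is attached to a streaming LLM inside an agent and consumed through the aiter() iterator inherited from AsyncIteratorCallbackHandler. Note that on_llm_end only sets the done event once answer_reached is true, so intermediate LLM calls (tool selection, scratchpad reasoning) do not end the stream. A hedged sketch; agent_executor and its arun signature are stand-ins for whatever runs your agent:

import asyncio

from langchain_classic.callbacks.streaming_aiter_final_only import (
    AsyncFinalIteratorCallbackHandler,
)


async def stream_final_answer(agent_executor, question: str) -> None:
    # `agent_executor` is a hypothetical stand-in for any agent whose LLM streams.
    handler = AsyncFinalIteratorCallbackHandler()
    # Run the agent concurrently; the handler fills its queue as tokens arrive.
    task = asyncio.create_task(
        agent_executor.arun(question, callbacks=[handler])
    )
    # aiter() yields queued tokens until on_llm_end sets the done event.
    async for token in handler.aiter():
        print(token, end="", flush=True)
    await task

Because stream_prefix defaults to False, the consumer sees only the tokens that follow the "Final Answer:" prefix, never the prefix itself.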

Frequently Asked Questions

What does streaming_aiter_final_only.py do?
streaming_aiter_final_only.py is a Python source file in the langchain codebase. It defines AsyncFinalIteratorCallbackHandler, a callback handler that exposes an async iterator over only the final output of an agent. It belongs to the CoreAbstractions domain, RunnableInterface subdomain.
What does streaming_aiter_final_only.py depend on?
streaming_aiter_final_only.py imports 4 module(s): langchain_classic.callbacks.streaming_aiter, langchain_core.outputs, typing, typing_extensions.
Where is streaming_aiter_final_only.py in the architecture?
streaming_aiter_final_only.py is located at libs/langchain/langchain_classic/callbacks/streaming_aiter_final_only.py (domain: CoreAbstractions, subdomain: RunnableInterface, directory: libs/langchain/langchain_classic/callbacks).
