streaming_aiter_final_only.py — langchain Source File
Architecture documentation for streaming_aiter_final_only.py, a python file in the langchain codebase. 4 imports, 0 dependents.
Entity Profile
Dependency Diagram
graph LR fcc2a7ee_c767_a096_5582_71f59dbb2f09["streaming_aiter_final_only.py"] 8e2034b7_ceb8_963f_29fc_2ea6b50ef9b3["typing"] fcc2a7ee_c767_a096_5582_71f59dbb2f09 --> 8e2034b7_ceb8_963f_29fc_2ea6b50ef9b3 ac2a9b92_4484_491e_1b48_ec85e71e1d58["langchain_core.outputs"] fcc2a7ee_c767_a096_5582_71f59dbb2f09 --> ac2a9b92_4484_491e_1b48_ec85e71e1d58 91721f45_4909_e489_8c1f_084f8bd87145["typing_extensions"] fcc2a7ee_c767_a096_5582_71f59dbb2f09 --> 91721f45_4909_e489_8c1f_084f8bd87145 135152d7_3f69_b654_06f0_0712e5613a76["langchain_classic.callbacks.streaming_aiter"] fcc2a7ee_c767_a096_5582_71f59dbb2f09 --> 135152d7_3f69_b654_06f0_0712e5613a76 style fcc2a7ee_c767_a096_5582_71f59dbb2f09 fill:#6366f1,stroke:#818cf8,color:#fff
Relationship Graph
Source Code
from __future__ import annotations
from typing import Any
from langchain_core.outputs import LLMResult
from typing_extensions import override
from langchain_classic.callbacks.streaming_aiter import AsyncIteratorCallbackHandler
DEFAULT_ANSWER_PREFIX_TOKENS = ["Final", "Answer", ":"]
class AsyncFinalIteratorCallbackHandler(AsyncIteratorCallbackHandler):
"""Callback handler that returns an async iterator.
Only the final output of the agent will be iterated.
"""
def append_to_last_tokens(self, token: str) -> None:
"""Append token to the last tokens."""
self.last_tokens.append(token)
self.last_tokens_stripped.append(token.strip())
if len(self.last_tokens) > len(self.answer_prefix_tokens):
self.last_tokens.pop(0)
self.last_tokens_stripped.pop(0)
def check_if_answer_reached(self) -> bool:
"""Check if the answer has been reached."""
if self.strip_tokens:
return self.last_tokens_stripped == self.answer_prefix_tokens_stripped
return self.last_tokens == self.answer_prefix_tokens
def __init__(
self,
*,
answer_prefix_tokens: list[str] | None = None,
strip_tokens: bool = True,
stream_prefix: bool = False,
) -> None:
"""Instantiate AsyncFinalIteratorCallbackHandler.
Args:
answer_prefix_tokens: Token sequence that prefixes the answer.
Default is ["Final", "Answer", ":"]
strip_tokens: Ignore white spaces and new lines when comparing
answer_prefix_tokens to last tokens? (to determine if answer has been
reached)
stream_prefix: Should answer prefix itself also be streamed?
"""
super().__init__()
if answer_prefix_tokens is None:
self.answer_prefix_tokens = DEFAULT_ANSWER_PREFIX_TOKENS
else:
self.answer_prefix_tokens = answer_prefix_tokens
if strip_tokens:
self.answer_prefix_tokens_stripped = [
token.strip() for token in self.answer_prefix_tokens
]
else:
self.answer_prefix_tokens_stripped = self.answer_prefix_tokens
self.last_tokens = [""] * len(self.answer_prefix_tokens)
self.last_tokens_stripped = [""] * len(self.answer_prefix_tokens)
self.strip_tokens = strip_tokens
self.stream_prefix = stream_prefix
self.answer_reached = False
@override
async def on_llm_start(
self,
serialized: dict[str, Any],
prompts: list[str],
**kwargs: Any,
) -> None:
# If two calls are made in a row, this resets the state
self.done.clear()
self.answer_reached = False
@override
async def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
if self.answer_reached:
self.done.set()
@override
async def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
# Remember the last n tokens, where n = len(answer_prefix_tokens)
self.append_to_last_tokens(token)
# Check if the last n tokens match the answer_prefix_tokens list ...
if self.check_if_answer_reached():
self.answer_reached = True
if self.stream_prefix:
for t in self.last_tokens:
self.queue.put_nowait(t)
return
# If yes, then put tokens from now on
if self.answer_reached:
self.queue.put_nowait(token)
Domain
Subdomains
Dependencies
- langchain_classic.callbacks.streaming_aiter
- langchain_core.outputs
- typing
- typing_extensions
Source
Frequently Asked Questions
What does streaming_aiter_final_only.py do?
streaming_aiter_final_only.py is a source file in the langchain codebase, written in python. It belongs to the CoreAbstractions domain, RunnableInterface subdomain.
What does streaming_aiter_final_only.py depend on?
streaming_aiter_final_only.py imports 4 module(s): langchain_classic.callbacks.streaming_aiter, langchain_core.outputs, typing, typing_extensions.
Where is streaming_aiter_final_only.py in the architecture?
streaming_aiter_final_only.py is located at libs/langchain/langchain_classic/callbacks/streaming_aiter_final_only.py (domain: CoreAbstractions, subdomain: RunnableInterface, directory: libs/langchain/langchain_classic/callbacks).
Analyze Your Own Codebase
Get architecture documentation, dependency graphs, and domain analysis for your codebase in minutes.
Try Supermodel Free