streaming_aiter.py — langchain Source File
Architecture documentation for streaming_aiter.py, a python file in the langchain codebase. 6 imports, 0 dependents.
Entity Profile
Dependency Diagram
graph LR 338feaf1_ef34_c2bd_c965_d6a9d87eac81["streaming_aiter.py"] a327e534_84f6_5308_58ca_5727d5eda0cf["asyncio"] 338feaf1_ef34_c2bd_c965_d6a9d87eac81 --> a327e534_84f6_5308_58ca_5727d5eda0cf cfe2bde5_180e_e3b0_df2b_55b3ebaca8e7["collections.abc"] 338feaf1_ef34_c2bd_c965_d6a9d87eac81 --> cfe2bde5_180e_e3b0_df2b_55b3ebaca8e7 8e2034b7_ceb8_963f_29fc_2ea6b50ef9b3["typing"] 338feaf1_ef34_c2bd_c965_d6a9d87eac81 --> 8e2034b7_ceb8_963f_29fc_2ea6b50ef9b3 f3bc7443_c889_119d_0744_aacc3620d8d2["langchain_core.callbacks"] 338feaf1_ef34_c2bd_c965_d6a9d87eac81 --> f3bc7443_c889_119d_0744_aacc3620d8d2 ac2a9b92_4484_491e_1b48_ec85e71e1d58["langchain_core.outputs"] 338feaf1_ef34_c2bd_c965_d6a9d87eac81 --> ac2a9b92_4484_491e_1b48_ec85e71e1d58 91721f45_4909_e489_8c1f_084f8bd87145["typing_extensions"] 338feaf1_ef34_c2bd_c965_d6a9d87eac81 --> 91721f45_4909_e489_8c1f_084f8bd87145 style 338feaf1_ef34_c2bd_c965_d6a9d87eac81 fill:#6366f1,stroke:#818cf8,color:#fff
Relationship Graph
Source Code
from __future__ import annotations
import asyncio
from collections.abc import AsyncIterator
from typing import Any, Literal, cast
from langchain_core.callbacks import AsyncCallbackHandler
from langchain_core.outputs import LLMResult
from typing_extensions import override
# TODO: If used by two LLM runs in parallel this won't work as expected
class AsyncIteratorCallbackHandler(AsyncCallbackHandler):
"""Callback handler that returns an async iterator."""
queue: asyncio.Queue[str]
done: asyncio.Event
@property
def always_verbose(self) -> bool:
"""Always verbose."""
return True
def __init__(self) -> None:
"""Instantiate AsyncIteratorCallbackHandler."""
self.queue = asyncio.Queue()
self.done = asyncio.Event()
@override
async def on_llm_start(
self,
serialized: dict[str, Any],
prompts: list[str],
**kwargs: Any,
) -> None:
# If two calls are made in a row, this resets the state
self.done.clear()
@override
async def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
if token is not None and token != "":
self.queue.put_nowait(token)
@override
async def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
self.done.set()
@override
async def on_llm_error(self, error: BaseException, **kwargs: Any) -> None:
self.done.set()
# TODO: implement the other methods
async def aiter(self) -> AsyncIterator[str]:
"""Asynchronous iterator that yields tokens."""
while not self.queue.empty() or not self.done.is_set():
# Wait for the next token in the queue,
# but stop waiting if the done event is set
done, other = await asyncio.wait(
[
# NOTE: If you add other tasks here, update the code below,
# which assumes each set has exactly one task each
asyncio.ensure_future(self.queue.get()),
asyncio.ensure_future(self.done.wait()),
],
return_when=asyncio.FIRST_COMPLETED,
)
# Cancel the other task
if other:
other.pop().cancel()
# Extract the value of the first completed task
token_or_done = cast("str | Literal[True]", done.pop().result())
# If the extracted value is the boolean True, the done event was set
if token_or_done is True:
break
# Otherwise, the extracted value is a token, which we yield
yield token_or_done
Domain
Subdomains
Classes
Dependencies
- asyncio
- collections.abc
- langchain_core.callbacks
- langchain_core.outputs
- typing
- typing_extensions
Source
Frequently Asked Questions
What does streaming_aiter.py do?
streaming_aiter.py is a source file in the langchain codebase, written in python. It belongs to the CoreAbstractions domain, Serialization subdomain.
What does streaming_aiter.py depend on?
streaming_aiter.py imports 6 module(s): asyncio, collections.abc, langchain_core.callbacks, langchain_core.outputs, typing, typing_extensions.
Where is streaming_aiter.py in the architecture?
streaming_aiter.py is located at libs/langchain/langchain_classic/callbacks/streaming_aiter.py (domain: CoreAbstractions, subdomain: Serialization, directory: libs/langchain/langchain_classic/callbacks).
Analyze Your Own Codebase
Get architecture documentation, dependency graphs, and domain analysis for your codebase in minutes.
Try Supermodel Free