AsyncStream Class — anthropic-sdk-python Architecture
Architecture documentation for the AsyncStream class in _streaming.py from the anthropic-sdk-python codebase.
Entity Profile
Dependency Diagram
graph TD
    03fc0a8b_1c63_1aee_ef30_754aeebc2ff6["AsyncStream"]
    8d141d22_ab1c_b4a1_744c_99e460d07454["_streaming.py"]
    03fc0a8b_1c63_1aee_ef30_754aeebc2ff6 -->|defined in| 8d141d22_ab1c_b4a1_744c_99e460d07454
    7d073374_6230_1c97_123d_c3db76015d4f["__init__()"]
    03fc0a8b_1c63_1aee_ef30_754aeebc2ff6 -->|method| 7d073374_6230_1c97_123d_c3db76015d4f
    afd0e912_e8f7_e23d_622f_378a591abfdc["__anext__()"]
    03fc0a8b_1c63_1aee_ef30_754aeebc2ff6 -->|method| afd0e912_e8f7_e23d_622f_378a591abfdc
    273006cf_b321_986e_9a1a_fe0bfd4f5445["__aiter__()"]
    03fc0a8b_1c63_1aee_ef30_754aeebc2ff6 -->|method| 273006cf_b321_986e_9a1a_fe0bfd4f5445
    d5539735_c422_7406_d09e_758baa2aeb5a["_iter_events()"]
    03fc0a8b_1c63_1aee_ef30_754aeebc2ff6 -->|method| d5539735_c422_7406_d09e_758baa2aeb5a
    25ebcabc_636f_8228_a433_2ed301cf8302["__stream__()"]
    03fc0a8b_1c63_1aee_ef30_754aeebc2ff6 -->|method| 25ebcabc_636f_8228_a433_2ed301cf8302
    6f81eaa0_dd95_edba_114d_c1649dc1c77b["__aenter__()"]
    03fc0a8b_1c63_1aee_ef30_754aeebc2ff6 -->|method| 6f81eaa0_dd95_edba_114d_c1649dc1c77b
    5d382d51_42db_9755_1edc_01ad6f39c93f["__aexit__()"]
    03fc0a8b_1c63_1aee_ef30_754aeebc2ff6 -->|method| 5d382d51_42db_9755_1edc_01ad6f39c93f
    777716cd_b9ad_01f3_69e3_207b522f7bb6["close()"]
    03fc0a8b_1c63_1aee_ef30_754aeebc2ff6 -->|method| 777716cd_b9ad_01f3_69e3_207b522f7bb6
Relationship Graph
Source Code
src/anthropic/_streaming.py lines 161–255
class AsyncStream(Generic[_T], metaclass=_AsyncStreamMeta):
"""Provides the core interface to iterate over an asynchronous stream response."""
response: httpx.Response
_decoder: SSEDecoder | SSEBytesDecoder
def __init__(
    self,
    *,
    cast_to: type[_T],
    response: httpx.Response,
    client: AsyncAnthropic,
) -> None:
    """Bind the stream to an HTTP response and prepare its event iterator.

    All arguments are keyword-only.

    Args:
        cast_to: Type passed through to the client's response-data
            processing for every yielded item.
        response: The HTTP response whose body carries the event stream.
        client: The client that supplies the SSE decoder, the data
            processing hook, and the error factory used while streaming.
    """
    self._client = client
    self._cast_to = cast_to
    self.response = response

    # The decoder comes from the client, so the client picks the wire format.
    self._decoder = client._make_sse_decoder()

    # Calling the async generator only creates it; no bytes are read until
    # the stream is actually iterated.
    self._iterator = self.__stream__()
async def __anext__(self) -> _T:
    """Return the next item produced by the underlying stream iterator."""
    next_item = await self._iterator.__anext__()
    return next_item
async def __aiter__(self) -> AsyncIterator[_T]:
    """Yield every item from the underlying stream iterator, in order."""
    source = self._iterator
    async for element in source:
        yield element
async def _iter_events(self) -> AsyncIterator[ServerSentEvent]:
    """Yield the events the decoder produces from the response's byte stream."""
    decode = self._decoder.aiter_bytes
    async for event in decode(self.response.aiter_bytes()):
        yield event
async def __stream__(self) -> AsyncIterator[_T]:
    """Turn decoded server-sent events into processed items.

    Dispatch is on the event name:

    * ``"completion"`` - the JSON payload is processed and yielded.
    * message / content-block events - the JSON payload is processed and
      yielded; when the payload is a dict without a ``"type"`` key, the
      event name is stored under ``"type"`` first.
    * ``"error"`` - the exception built by the client's
      ``_make_status_error`` is raised.
    * anything else (including ``"ping"``) - ignored.

    The response is closed when the generator finishes for any reason.
    """
    target_type = cast(Any, self._cast_to)
    response = self.response
    to_item = self._client._process_response_data
    events = self._iter_events()

    # Event names whose payload is forwarded as a typed message event.
    message_events = (
        "message_start",
        "message_delta",
        "message_stop",
        "content_block_start",
        "content_block_delta",
        "content_block_stop",
    )

    try:
        async for event in events:
            kind = event.event

            if kind == "completion":
                yield to_item(data=event.json(), cast_to=target_type, response=response)

            elif kind in message_events:
                payload = event.json()
                # Fill in the discriminator from the event name if the
                # payload does not carry one itself.
                if is_dict(payload) and "type" not in payload:
                    payload["type"] = kind
                yield to_item(data=payload, cast_to=target_type, response=response)

            elif kind == "error":
                # Prefer the parsed JSON body; fall back to the raw data
                # (or the status code) when the payload cannot be parsed.
                body = event.data
                try:
                    body = event.json()
                    err_msg = f"{body}"
                except Exception:
                    err_msg = event.data or f"Error code: {response.status_code}"

                raise self._client._make_status_error(
                    err_msg,
                    body=body,
                    response=self.response,
                )

            # "ping" and unrecognised events carry nothing to yield.
    finally:
        # Ensure the response is closed even if the consumer doesn't read all data
        await response.aclose()
async def __aenter__(self) -> Self:
    """Enter the ``async with`` block, handing back this stream unchanged."""
    stream = self
    return stream
async def __aexit__(
Domain
Defined In
Source
Frequently Asked Questions
What is the AsyncStream class?
AsyncStream is a class in the anthropic-sdk-python codebase, defined in src/anthropic/_streaming.py.
Where is AsyncStream defined?
AsyncStream is defined in src/anthropic/_streaming.py at line 161.
Analyze Your Own Codebase
Get architecture documentation, dependency graphs, and domain analysis for your codebase in minutes.
Try Supermodel Free