Home / Class/ AsyncStream Class — anthropic-sdk-python Architecture

AsyncStream Class — anthropic-sdk-python Architecture

Architecture documentation for the AsyncStream class in _streaming.py from the anthropic-sdk-python codebase.

Entity Profile

Dependency Diagram

graph TD
  03fc0a8b_1c63_1aee_ef30_754aeebc2ff6["AsyncStream"]
  8d141d22_ab1c_b4a1_744c_99e460d07454["_streaming.py"]
  03fc0a8b_1c63_1aee_ef30_754aeebc2ff6 -->|defined in| 8d141d22_ab1c_b4a1_744c_99e460d07454
  7d073374_6230_1c97_123d_c3db76015d4f["__init__()"]
  03fc0a8b_1c63_1aee_ef30_754aeebc2ff6 -->|method| 7d073374_6230_1c97_123d_c3db76015d4f
  afd0e912_e8f7_e23d_622f_378a591abfdc["__anext__()"]
  03fc0a8b_1c63_1aee_ef30_754aeebc2ff6 -->|method| afd0e912_e8f7_e23d_622f_378a591abfdc
  273006cf_b321_986e_9a1a_fe0bfd4f5445["__aiter__()"]
  03fc0a8b_1c63_1aee_ef30_754aeebc2ff6 -->|method| 273006cf_b321_986e_9a1a_fe0bfd4f5445
  d5539735_c422_7406_d09e_758baa2aeb5a["_iter_events()"]
  03fc0a8b_1c63_1aee_ef30_754aeebc2ff6 -->|method| d5539735_c422_7406_d09e_758baa2aeb5a
  25ebcabc_636f_8228_a433_2ed301cf8302["__stream__()"]
  03fc0a8b_1c63_1aee_ef30_754aeebc2ff6 -->|method| 25ebcabc_636f_8228_a433_2ed301cf8302
  6f81eaa0_dd95_edba_114d_c1649dc1c77b["__aenter__()"]
  03fc0a8b_1c63_1aee_ef30_754aeebc2ff6 -->|method| 6f81eaa0_dd95_edba_114d_c1649dc1c77b
  5d382d51_42db_9755_1edc_01ad6f39c93f["__aexit__()"]
  03fc0a8b_1c63_1aee_ef30_754aeebc2ff6 -->|method| 5d382d51_42db_9755_1edc_01ad6f39c93f
  777716cd_b9ad_01f3_69e3_207b522f7bb6["close()"]
  03fc0a8b_1c63_1aee_ef30_754aeebc2ff6 -->|method| 777716cd_b9ad_01f3_69e3_207b522f7bb6

Relationship Graph

Source Code

src/anthropic/_streaming.py lines 161–255

class AsyncStream(Generic[_T], metaclass=_AsyncStreamMeta):
    """Provides the core interface to iterate over an asynchronous stream response."""

    response: httpx.Response

    _decoder: SSEDecoder | SSEBytesDecoder

    def __init__(
        self,
        *,
        cast_to: type[_T],
        response: httpx.Response,
        client: AsyncAnthropic,
    ) -> None:
        self.response = response
        self._cast_to = cast_to
        self._client = client
        self._decoder = client._make_sse_decoder()
        self._iterator = self.__stream__()

    async def __anext__(self) -> _T:
        return await self._iterator.__anext__()

    async def __aiter__(self) -> AsyncIterator[_T]:
        async for item in self._iterator:
            yield item

    async def _iter_events(self) -> AsyncIterator[ServerSentEvent]:
        async for sse in self._decoder.aiter_bytes(self.response.aiter_bytes()):
            yield sse

    async def __stream__(self) -> AsyncIterator[_T]:
        cast_to = cast(Any, self._cast_to)
        response = self.response
        process_data = self._client._process_response_data
        iterator = self._iter_events()

        try:
            async for sse in iterator:
                if sse.event == "completion":
                    yield process_data(data=sse.json(), cast_to=cast_to, response=response)

                if (
                    sse.event == "message_start"
                    or sse.event == "message_delta"
                    or sse.event == "message_stop"
                    or sse.event == "content_block_start"
                    or sse.event == "content_block_delta"
                    or sse.event == "content_block_stop"
                ):
                    data = sse.json()
                    if is_dict(data) and "type" not in data:
                        data["type"] = sse.event

                    yield process_data(data=data, cast_to=cast_to, response=response)

                if sse.event == "ping":
                    continue

                if sse.event == "error":
                    body = sse.data

                    try:
                        body = sse.json()
                        err_msg = f"{body}"
                    except Exception:
                        err_msg = sse.data or f"Error code: {response.status_code}"

                    raise self._client._make_status_error(
                        err_msg,
                        body=body,
                        response=self.response,
                    )
        finally:
            # Ensure the response is closed even if the consumer doesn't read all data
            await response.aclose()

    async def __aenter__(self) -> Self:
        return self

    async def __aexit__(

Frequently Asked Questions

What is the AsyncStream class?
AsyncStream is a class in the anthropic-sdk-python codebase, defined in src/anthropic/_streaming.py.
Where is AsyncStream defined?
AsyncStream is defined in src/anthropic/_streaming.py at line 161.

Analyze Your Own Codebase

Get architecture documentation, dependency graphs, and domain analysis for your codebase in minutes.

Try Supermodel Free