Stream Class — anthropic-sdk-python Architecture

Architecture documentation for the Stream class in _streaming.py from the anthropic-sdk-python codebase.

Dependency Diagram

graph TD
  3c15a36b_3b82_93a8_fe82_3d955b1934ca["Stream"]
  8d141d22_ab1c_b4a1_744c_99e460d07454["_streaming.py"]
  3c15a36b_3b82_93a8_fe82_3d955b1934ca -->|defined in| 8d141d22_ab1c_b4a1_744c_99e460d07454
  48cdbfe4_6f1c_66fa_fd88_70c5081e2a3b["__init__()"]
  3c15a36b_3b82_93a8_fe82_3d955b1934ca -->|method| 48cdbfe4_6f1c_66fa_fd88_70c5081e2a3b
  a4220068_fb96_192d_ace6_6b9448f462fd["__next__()"]
  3c15a36b_3b82_93a8_fe82_3d955b1934ca -->|method| a4220068_fb96_192d_ace6_6b9448f462fd
  1b49bd56_a40d_07ca_f38f_9c2784da9052["__iter__()"]
  3c15a36b_3b82_93a8_fe82_3d955b1934ca -->|method| 1b49bd56_a40d_07ca_f38f_9c2784da9052
  945cdcce_6566_7ff1_9ea3_601246f05c72["_iter_events()"]
  3c15a36b_3b82_93a8_fe82_3d955b1934ca -->|method| 945cdcce_6566_7ff1_9ea3_601246f05c72
  e1c1dc57_89ae_cea1_a4a6_084c63e1e1fa["__stream__()"]
  3c15a36b_3b82_93a8_fe82_3d955b1934ca -->|method| e1c1dc57_89ae_cea1_a4a6_084c63e1e1fa
  09bc7889_45b5_7fe5_97ff_6e5a7d124eb8["__enter__()"]
  3c15a36b_3b82_93a8_fe82_3d955b1934ca -->|method| 09bc7889_45b5_7fe5_97ff_6e5a7d124eb8
  969e9f26_ae95_4fc0_cc9f_144e58839977["__exit__()"]
  3c15a36b_3b82_93a8_fe82_3d955b1934ca -->|method| 969e9f26_ae95_4fc0_cc9f_144e58839977
  ae6ec5e9_59f7_69b5_d385_025941be2db1["close()"]
  3c15a36b_3b82_93a8_fe82_3d955b1934ca -->|method| ae6ec5e9_59f7_69b5_d385_025941be2db1
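
The methods in the diagram combine Python's iterator protocol (`__iter__`, `__next__`) with the context-manager protocol (`__enter__`, `__exit__`, `close`). The sketch below illustrates how such a combination can behave. `MiniStream` is a hypothetical stand-in, not the SDK's class, and it assumes that `__exit__` simply calls `close()`, which the source excerpt on this page does not show.

```python
from typing import Iterator, List


class MiniStream:
    """Hypothetical stand-in for illustration; not the SDK's Stream."""

    def __init__(self, items: List[str]) -> None:
        self.closed = False
        self._items = items
        # Created once, so every consumer shares the same underlying generator.
        self._iterator = self._generate()

    def _generate(self) -> Iterator[str]:
        for item in self._items:
            yield item

    def __next__(self) -> str:
        return self._iterator.__next__()

    def __iter__(self) -> Iterator[str]:
        for item in self._iterator:
            yield item

    def __enter__(self) -> "MiniStream":
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        # Assumption: leaving the `with` block closes the stream.
        self.close()

    def close(self) -> None:
        self._iterator.close()
        self.closed = True


with MiniStream(["a", "b", "c"]) as stream:
    first = next(stream)   # consumes one item through __next__
    rest = list(stream)    # __iter__ resumes where __next__ left off
```

Because the generator is built once in `__init__`, a second pass over the same object does not restart from the beginning; it continues from wherever the previous consumer stopped.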

Source Code

src/anthropic/_streaming.py lines 44–137

class Stream(Generic[_T], metaclass=_SyncStreamMeta):
    """Provides the core interface to iterate over a synchronous stream response."""

    response: httpx.Response

    _decoder: SSEBytesDecoder

    def __init__(
        self,
        *,
        cast_to: type[_T],
        response: httpx.Response,
        client: Anthropic,
    ) -> None:
        self.response = response
        self._cast_to = cast_to
        self._client = client
        self._decoder = client._make_sse_decoder()
        self._iterator = self.__stream__()

    def __next__(self) -> _T:
        return self._iterator.__next__()

    def __iter__(self) -> Iterator[_T]:
        for item in self._iterator:
            yield item

    def _iter_events(self) -> Iterator[ServerSentEvent]:
        yield from self._decoder.iter_bytes(self.response.iter_bytes())

    def __stream__(self) -> Iterator[_T]:
        cast_to = cast(Any, self._cast_to)
        response = self.response
        process_data = self._client._process_response_data
        iterator = self._iter_events()

        try:
            for sse in iterator:
                if sse.event == "completion":
                    yield process_data(data=sse.json(), cast_to=cast_to, response=response)

                if (
                    sse.event == "message_start"
                    or sse.event == "message_delta"
                    or sse.event == "message_stop"
                    or sse.event == "content_block_start"
                    or sse.event == "content_block_delta"
                    or sse.event == "content_block_stop"
                ):
                    data = sse.json()
                    if is_dict(data) and "type" not in data:
                        data["type"] = sse.event

                    yield process_data(data=data, cast_to=cast_to, response=response)

                if sse.event == "ping":
                    continue

                if sse.event == "error":
                    body = sse.data

                    try:
                        body = sse.json()
                        err_msg = f"{body}"
                    except Exception:
                        err_msg = sse.data or f"Error code: {response.status_code}"

                    raise self._client._make_status_error(
                        err_msg,
                        body=body,
                        response=self.response,
                    )
        finally:
            # Ensure the response is closed even if the consumer doesn't read all data
            response.close()

    def __enter__(self) -> Self:
        return self

    def __exit__(
        self,
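
The dispatch loop in `__stream__` can be illustrated with a minimal, self-contained sketch. Everything here is a hypothetical stand-in: `FakeSSE`, `FakeResponse`, `StreamError`, and `dispatch` are not part of the SDK. The sketch yields plain dictionaries instead of calling `_process_response_data`, raises a simple exception instead of calling `_make_status_error`, and leaves out the `"completion"` branch.

```python
import json
from dataclasses import dataclass
from typing import Any, Dict, Iterable, Iterator, Optional


@dataclass
class FakeSSE:
    """Hypothetical stand-in for a decoded server-sent event."""
    event: Optional[str]
    data: str

    def json(self) -> Any:
        return json.loads(self.data)


class FakeResponse:
    """Hypothetical stand-in for the HTTP response; only tracks closing."""

    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        self.closed = True


class StreamError(Exception):
    """Hypothetical error type used in place of the SDK's status errors."""


MESSAGE_EVENTS = {
    "message_start", "message_delta", "message_stop",
    "content_block_start", "content_block_delta", "content_block_stop",
}


def dispatch(events: Iterable[FakeSSE], response: FakeResponse) -> Iterator[Dict[str, Any]]:
    try:
        for sse in events:
            if sse.event in MESSAGE_EVENTS:
                data = sse.json()
                # Backfill the "type" field from the event name when absent.
                if isinstance(data, dict) and "type" not in data:
                    data["type"] = sse.event
                yield data
            elif sse.event == "ping":
                continue  # keep-alive events carry no payload for the consumer
            elif sse.event == "error":
                raise StreamError(sse.data)
    finally:
        # Runs on exhaustion, on error, and when the consumer stops early.
        response.close()


response = FakeResponse()
events = [
    FakeSSE("ping", ""),
    FakeSSE("message_start", '{"id": 1}'),
    FakeSSE("content_block_delta", '{"type": "x"}'),
]
results = list(dispatch(events, response))
```

The `try`/`finally` around the loop is the part worth noting: because the function is a generator, closing it early (or letting it be garbage-collected) still reaches the `finally` block, so the response is released even when the consumer does not read every event.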

Frequently Asked Questions

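What is the Stream class?
Stream is a generic class in the anthropic-sdk-python codebase, defined in src/anthropic/_streaming.py. Per its docstring, it provides the core interface for iterating over a synchronous stream response.

Where is Stream defined?
Stream is defined in src/anthropic/_streaming.py, starting at line 44.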
