SSEDecoder Class — anthropic-sdk-python Architecture

Architecture documentation for the SSEDecoder class in _streaming.py from the anthropic-sdk-python codebase.

Dependency Diagram

graph TD
  f51bd194_1989_e99a_c3a5_5c461e19050e["SSEDecoder"]
  8d141d22_ab1c_b4a1_744c_99e460d07454["_streaming.py"]
  f51bd194_1989_e99a_c3a5_5c461e19050e -->|defined in| 8d141d22_ab1c_b4a1_744c_99e460d07454
  301683bd_1e47_6f1e_9a66_283c09fb8cb1["__init__()"]
  f51bd194_1989_e99a_c3a5_5c461e19050e -->|method| 301683bd_1e47_6f1e_9a66_283c09fb8cb1
  9f55bd15_7a95_ba28_c590_203ea0be62b2["iter_bytes()"]
  f51bd194_1989_e99a_c3a5_5c461e19050e -->|method| 9f55bd15_7a95_ba28_c590_203ea0be62b2
  1ee4fd43_58e1_ba10_7366_ee4c34d3cf45["_iter_chunks()"]
  f51bd194_1989_e99a_c3a5_5c461e19050e -->|method| 1ee4fd43_58e1_ba10_7366_ee4c34d3cf45
  484b3879_bf87_4601_d355_eb6e18237685["aiter_bytes()"]
  f51bd194_1989_e99a_c3a5_5c461e19050e -->|method| 484b3879_bf87_4601_d355_eb6e18237685
  b961b18a_baa9_de62_39dc_b7271a4ee103["_aiter_chunks()"]
  f51bd194_1989_e99a_c3a5_5c461e19050e -->|method| b961b18a_baa9_de62_39dc_b7271a4ee103
  3f6e6755_7dcb_24fa_f03c_7576f9854d91["decode()"]
  f51bd194_1989_e99a_c3a5_5c461e19050e -->|method| 3f6e6755_7dcb_24fa_f03c_7576f9854d91

Source Code

src/anthropic/_streaming.py lines 299–401

class SSEDecoder:
    _data: list[str]
    _event: str | None
    _retry: int | None
    _last_event_id: str | None

    def __init__(self) -> None:
        self._event = None
        self._data = []
        self._last_event_id = None
        self._retry = None

    def iter_bytes(self, iterator: Iterator[bytes]) -> Iterator[ServerSentEvent]:
        """Given an iterator that yields raw binary data, iterate over it & yield every event encountered"""
        for chunk in self._iter_chunks(iterator):
            # Split before decoding so splitlines() only uses \r and \n
            for raw_line in chunk.splitlines():
                line = raw_line.decode("utf-8")
                sse = self.decode(line)
                if sse:
                    yield sse

    def _iter_chunks(self, iterator: Iterator[bytes]) -> Iterator[bytes]:
        """Given an iterator that yields raw binary data, iterate over it and yield individual SSE chunks"""
        data = b""
        for chunk in iterator:
            for line in chunk.splitlines(keepends=True):
                data += line
                if data.endswith((b"\r\r", b"\n\n", b"\r\n\r\n")):
                    yield data
                    data = b""
        if data:
            yield data

    async def aiter_bytes(self, iterator: AsyncIterator[bytes]) -> AsyncIterator[ServerSentEvent]:
        """Given an iterator that yields raw binary data, iterate over it & yield every event encountered"""
        async for chunk in self._aiter_chunks(iterator):
            # Split before decoding so splitlines() only uses \r and \n
            for raw_line in chunk.splitlines():
                line = raw_line.decode("utf-8")
                sse = self.decode(line)
                if sse:
                    yield sse

    async def _aiter_chunks(self, iterator: AsyncIterator[bytes]) -> AsyncIterator[bytes]:
        """Given an iterator that yields raw binary data, iterate over it and yield individual SSE chunks"""
        data = b""
        async for chunk in iterator:
            for line in chunk.splitlines(keepends=True):
                data += line
                if data.endswith((b"\r\r", b"\n\n", b"\r\n\r\n")):
                    yield data
                    data = b""
        if data:
            yield data

    def decode(self, line: str) -> ServerSentEvent | None:
        # See: https://html.spec.whatwg.org/multipage/server-sent-events.html#event-stream-interpretation  # noqa: E501

        if not line:
            if not self._event and not self._data and not self._last_event_id and self._retry is None:
                return None

            sse = ServerSentEvent(
                event=self._event,
                data="\n".join(self._data),
                id=self._last_event_id,
                retry=self._retry,
            )

            # NOTE: as per the SSE spec, do not reset last_event_id.
            self._event = None
            self._data = []
            self._retry = None

            return sse

        if line.startswith(":"):
            return None

        fieldname, _, value = line.partition(":")
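
The listing stops at `line.partition(":")`, so the field handling that follows it is not shown here. As a rough sketch only, the class below pairs the logic visible above with the field rules from the WHATWG event-stream interpretation linked in `decode()`. `MiniSSEDecoder` and `Event` are hypothetical names introduced for illustration, and every branch after the partition is an assumption drawn from the spec, not the SDK's actual code.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class Event:
    """Hypothetical stand-in for ServerSentEvent."""
    event: str | None
    data: str
    id: str | None
    retry: int | None


class MiniSSEDecoder:
    """Illustrative line-level decoder; not the SDK implementation."""

    def __init__(self) -> None:
        self._event: str | None = None
        self._data: list[str] = []
        self._last_event_id: str | None = None
        self._retry: int | None = None

    def decode(self, line: str) -> Event | None:
        if not line:
            # A blank line dispatches whatever has been buffered so far.
            if not self._event and not self._data and not self._last_event_id and self._retry is None:
                return None
            sse = Event(self._event, "\n".join(self._data), self._last_event_id, self._retry)
            # The last event ID is deliberately kept across events.
            self._event, self._data, self._retry = None, [], None
            return sse

        if line.startswith(":"):
            return None  # comment line

        fieldname, _, value = line.partition(":")

        # Assumption (per the spec): drop a single leading space from the value.
        if value.startswith(" "):
            value = value[1:]

        if fieldname == "event":
            self._event = value
        elif fieldname == "data":
            self._data.append(value)
        elif fieldname == "id":
            # Assumption (per the spec): IDs containing NULL are ignored.
            if "\0" not in value:
                self._last_event_id = value
        elif fieldname == "retry":
            # Assumption (per the spec): only ASCII digits are accepted.
            if value.isascii() and value.isdigit():
                self._retry = int(value)
        # Any other field name is ignored.
        return None
```

Feeding the lines `": keep-alive"`, `"event: message_start"`, `"data: hello"`, `"data: world"`, `"id: 7"` and `""` to this sketch yields one event on the final blank line, with the two `data` values joined by a newline. Because the ID is not reset, a later event without an `id` field still carries `"7"`.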

Frequently Asked Questions

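What is the SSEDecoder class?
SSEDecoder is a class in the anthropic-sdk-python codebase, defined in src/anthropic/_streaming.py. It turns a raw byte stream of server-sent events into ServerSentEvent objects, through iter_bytes() for synchronous iterators and aiter_bytes() for asynchronous ones.

Where is SSEDecoder defined?
SSEDecoder is defined in src/anthropic/_streaming.py at line 299.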
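
The buffering done by `_iter_chunks()` in the source listing can be tried in isolation. The function below is a standalone copy of that loop under a hypothetical name, `iter_sse_chunks`, meant only to show how an event split across arbitrary network chunk boundaries is reassembled before any line is decoded.

```python
from typing import Iterable, Iterator


def iter_sse_chunks(stream: Iterable[bytes]) -> Iterator[bytes]:
    """Hypothetical standalone copy of the _iter_chunks() loop."""
    data = b""
    for chunk in stream:
        # keepends=True preserves the terminators so the blank-line
        # check below can see them.
        for line in chunk.splitlines(keepends=True):
            data += line
            # An SSE event ends at a blank line: two consecutive terminators.
            if data.endswith((b"\r\r", b"\n\n", b"\r\n\r\n")):
                yield data
                data = b""
    # Flush whatever is left when the stream ends without a blank line.
    if data:
        yield data


# Network chunks that cut through the middle of a field and of the terminator.
stream = [b"event: pi", b"ng\ndata: {}\n", b"\ndata: tail"]
chunks = list(iter_sse_chunks(stream))
```

Here `chunks` holds two items: the complete `event: ping` block ending in a blank line, and the trailing `data: tail` bytes that are flushed when the stream ends.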
