Home / Function/ decode() — anthropic-sdk-python Function Reference

decode() — anthropic-sdk-python Function Reference

Architecture documentation for the decode() function in _streaming.py from the anthropic-sdk-python codebase.

Entity Profile

Dependency Diagram

graph TD
  3f6e6755_7dcb_24fa_f03c_7576f9854d91["decode()"]
  f51bd194_1989_e99a_c3a5_5c461e19050e["SSEDecoder"]
  3f6e6755_7dcb_24fa_f03c_7576f9854d91 -->|defined in| f51bd194_1989_e99a_c3a5_5c461e19050e
  9f55bd15_7a95_ba28_c590_203ea0be62b2["iter_bytes()"]
  9f55bd15_7a95_ba28_c590_203ea0be62b2 -->|calls| 3f6e6755_7dcb_24fa_f03c_7576f9854d91
  484b3879_bf87_4601_d355_eb6e18237685["aiter_bytes()"]
  484b3879_bf87_4601_d355_eb6e18237685 -->|calls| 3f6e6755_7dcb_24fa_f03c_7576f9854d91
  style 3f6e6755_7dcb_24fa_f03c_7576f9854d91 fill:#6366f1,stroke:#818cf8,color:#fff

Relationship Graph

Source Code

src/anthropic/_streaming.py lines 355–401

    def decode(self, line: str) -> ServerSentEvent | None:
        # See: https://html.spec.whatwg.org/multipage/server-sent-events.html#event-stream-interpretation  # noqa: E501

        if not line:
            if not self._event and not self._data and not self._last_event_id and self._retry is None:
                return None

            sse = ServerSentEvent(
                event=self._event,
                data="\n".join(self._data),
                id=self._last_event_id,
                retry=self._retry,
            )

            # NOTE: as per the SSE spec, do not reset last_event_id.
            self._event = None
            self._data = []
            self._retry = None

            return sse

        if line.startswith(":"):
            return None

        fieldname, _, value = line.partition(":")

        if value.startswith(" "):
            value = value[1:]

        if fieldname == "event":
            self._event = value
        elif fieldname == "data":
            self._data.append(value)
        elif fieldname == "id":
            if "\0" in value:
                pass
            else:
                self._last_event_id = value
        elif fieldname == "retry":
            try:
                self._retry = int(value)
            except (TypeError, ValueError):
                pass
        else:
            pass  # Field is ignored.

        return None

Subdomains

Frequently Asked Questions

What does decode() do?
decode() is a function in the anthropic-sdk-python codebase, defined in src/anthropic/_streaming.py.
Where is decode() defined?
decode() is defined in src/anthropic/_streaming.py at line 355.
What calls decode()?
decode() is called by 2 function(s): aiter_bytes, iter_bytes.

Analyze Your Own Codebase

Get architecture documentation, dependency graphs, and domain analysis for your codebase in minutes.

Try Supermodel Free