_stream_decoder.py — anthropic-sdk-python Source File

Architecture documentation for _stream_decoder.py, a Python file in the anthropic-sdk-python codebase. 7 imports, 2 dependents.

File: Python, AnthropicClient domain, AsyncAPI subdomain, 7 imports, 2 dependents, 1 function, 1 class

Entity Profile

Dependency Diagram

graph LR
  91a12d3e_6b2a_90bd_28df_074cc1d1ad53["_stream_decoder.py"]
  da7ce3bd_4934_ff39_8877_5ba81a168358["_utils"]
  91a12d3e_6b2a_90bd_28df_074cc1d1ad53 --> da7ce3bd_4934_ff39_8877_5ba81a168358
  597ca033_023a_d6d2_0dc5_e7dacc60a056["_streaming"]
  91a12d3e_6b2a_90bd_28df_074cc1d1ad53 --> 597ca033_023a_d6d2_0dc5_e7dacc60a056
  89ddcdd7_3ae1_4c7b_41bb_9f1e25f16875["typing"]
  91a12d3e_6b2a_90bd_28df_074cc1d1ad53 --> 89ddcdd7_3ae1_4c7b_41bb_9f1e25f16875
  8d3edde3_6108_07a1_1819_62277f20ee9e["botocore.model"]
  91a12d3e_6b2a_90bd_28df_074cc1d1ad53 --> 8d3edde3_6108_07a1_1819_62277f20ee9e
  9afd82dd_2375_6a0a_1b78_afc46006d48c["botocore.eventstream"]
  91a12d3e_6b2a_90bd_28df_074cc1d1ad53 --> 9afd82dd_2375_6a0a_1b78_afc46006d48c
  da224897_45aa_a14e_81ca_39e8e3a2eec6["botocore.loaders"]
  91a12d3e_6b2a_90bd_28df_074cc1d1ad53 --> da224897_45aa_a14e_81ca_39e8e3a2eec6
  96387cf0_aea3_6014_63fa_94d98085ba2e["botocore.parsers"]
  91a12d3e_6b2a_90bd_28df_074cc1d1ad53 --> 96387cf0_aea3_6014_63fa_94d98085ba2e
  cbdd42fd_75f9_7cc6_3f3b_b8d939e4c157["_client.py"]
  cbdd42fd_75f9_7cc6_3f3b_b8d939e4c157 --> 91a12d3e_6b2a_90bd_28df_074cc1d1ad53
  43a93153_bd96_06f2_9418_72daf6b4ca3a["_stream.py"]
  43a93153_bd96_06f2_9418_72daf6b4ca3a --> 91a12d3e_6b2a_90bd_28df_074cc1d1ad53
  style 91a12d3e_6b2a_90bd_28df_074cc1d1ad53 fill:#6366f1,stroke:#818cf8,color:#fff

Source Code

from __future__ import annotations

from typing import TYPE_CHECKING, Iterator, AsyncIterator

from ..._utils import lru_cache
from ..._streaming import ServerSentEvent

if TYPE_CHECKING:
    from botocore.model import Shape
    from botocore.eventstream import EventStreamMessage


@lru_cache(maxsize=None)
def get_response_stream_shape() -> Shape:
    from botocore.model import ServiceModel
    from botocore.loaders import Loader

    loader = Loader()
    bedrock_service_dict = loader.load_service_model("bedrock-runtime", "service-2")
    bedrock_service_model = ServiceModel(bedrock_service_dict)
    return bedrock_service_model.shape_for("ResponseStream")


class AWSEventStreamDecoder:
    def __init__(self) -> None:
        from botocore.parsers import EventStreamJSONParser

        self.parser = EventStreamJSONParser()

    def iter_bytes(self, iterator: Iterator[bytes]) -> Iterator[ServerSentEvent]:
        """Given an iterator that yields raw byte chunks, iterate over it & yield every event encountered"""
        from botocore.eventstream import EventStreamBuffer

        event_stream_buffer = EventStreamBuffer()
        for chunk in iterator:
            event_stream_buffer.add_data(chunk)
            for event in event_stream_buffer:
                message = self._parse_message_from_event(event)
                if message:
                    yield ServerSentEvent(data=message, event="completion")

    async def aiter_bytes(self, iterator: AsyncIterator[bytes]) -> AsyncIterator[ServerSentEvent]:
        """Given an async iterator that yields raw byte chunks, iterate over it & yield every event encountered"""
        from botocore.eventstream import EventStreamBuffer

        event_stream_buffer = EventStreamBuffer()
        async for chunk in iterator:
            event_stream_buffer.add_data(chunk)
            for event in event_stream_buffer:
                message = self._parse_message_from_event(event)
                if message:
                    yield ServerSentEvent(data=message, event="completion")

    def _parse_message_from_event(self, event: EventStreamMessage) -> str | None:
        response_dict = event.to_response_dict()
        parsed_response = self.parser.parse(response_dict, get_response_stream_shape())
        if response_dict["status_code"] != 200:
            raise ValueError(f"Bad response code, expected 200: {response_dict}")

        chunk = parsed_response.get("chunk")
        if not chunk:
            return None

        return chunk.get("bytes").decode()  # type: ignore[no-any-return]
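
The buffer-and-drain loop shared by iter_bytes and aiter_bytes can be sketched without botocore. In the sketch below, ToyFrameBuffer, frame, iter_messages, and aiter_messages are hypothetical names, and the 4-byte length prefix is a toy framing chosen for illustration; it is not the AWS event stream wire format. It only aims to show why chunks are added to a buffer first: a message may be split across chunk boundaries, so complete messages are drained after each chunk while partial data stays buffered.

```python
import asyncio
import struct
from typing import AsyncIterator, Iterator, List


class ToyFrameBuffer:
    """Hypothetical stand-in for botocore's EventStreamBuffer.

    Frames are a 4-byte big-endian length followed by the payload. This is
    NOT the AWS event stream format; it only mimics the add/drain usage.
    """

    def __init__(self) -> None:
        self._data = b""

    def add_data(self, chunk: bytes) -> None:
        self._data += chunk

    def __iter__(self) -> Iterator[bytes]:
        # Yield every complete frame; leave a partial frame buffered.
        while len(self._data) >= 4:
            (length,) = struct.unpack(">I", self._data[:4])
            if len(self._data) < 4 + length:
                break
            payload = self._data[4 : 4 + length]
            self._data = self._data[4 + length :]
            yield payload


def frame(payload: bytes) -> bytes:
    """Encode one payload in the toy framing."""
    return struct.pack(">I", len(payload)) + payload


def iter_messages(chunks: Iterator[bytes]) -> Iterator[str]:
    buffer = ToyFrameBuffer()
    for chunk in chunks:
        buffer.add_data(chunk)
        for payload in buffer:
            if payload:  # skip empty frames, like events without a chunk
                yield payload.decode()


async def aiter_messages(chunks: AsyncIterator[bytes]) -> AsyncIterator[str]:
    buffer = ToyFrameBuffer()
    async for chunk in chunks:
        buffer.add_data(chunk)
        for payload in buffer:
            if payload:
                yield payload.decode()


# Two payload frames and one empty frame, split at arbitrary chunk boundaries.
wire = frame(b'{"a": 1}') + frame(b"") + frame(b'{"b": 2}')
chunks = [wire[:3], wire[3:15], wire[15:]]
sync_result = list(iter_messages(iter(chunks)))


async def _collect() -> List[str]:
    async def source() -> AsyncIterator[bytes]:
        for chunk in chunks:
            yield chunk

    return [message async for message in aiter_messages(source())]


async_result = asyncio.run(_collect())
```

The synchronous and asynchronous versions differ only in how they pull chunks from the source, which matches the near-identical bodies of iter_bytes and aiter_bytes in the listing.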

Dependencies

  • _streaming
  • _utils
  • botocore.eventstream
  • botocore.loaders
  • botocore.model
  • botocore.parsers
  • typing
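
In the source listing, every botocore import sits either under TYPE_CHECKING or inside a function body, and get_response_stream_shape is wrapped in lru_cache. A minimal sketch of that pattern follows, using the standard library's functools.lru_cache as a stand-in for the SDK's _utils.lru_cache (assumed here to behave the same way) and fractions as a stand-in for an optional dependency. The names get_expensive_value and calls are hypothetical.

```python
from functools import lru_cache
from typing import TYPE_CHECKING, List

if TYPE_CHECKING:
    # Seen only by type checkers; never imported at runtime.
    from fractions import Fraction  # stand-in for an optional dependency

calls: List[str] = []  # records how often the function body actually runs


@lru_cache(maxsize=None)
def get_expensive_value() -> "Fraction":
    # Deferred import: the dependency loads on first call, not at module import.
    from fractions import Fraction

    calls.append("load")
    return Fraction(1, 3)


first = get_expensive_value()
second = get_expensive_value()  # served from the cache; the body does not run again
```

With this layout, importing the module does not require the dependency to be installed, and the expensive load runs once per process.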

Frequently Asked Questions

What does _stream_decoder.py do?
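_stream_decoder.py is a Python source file in the anthropic-sdk-python codebase, in the AnthropicClient domain and AsyncAPI subdomain. It defines AWSEventStreamDecoder, which buffers raw byte chunks with botocore's EventStreamBuffer, parses each event against the bedrock-runtime ResponseStream shape, and yields each decoded chunk payload as a ServerSentEvent with event="completion".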
What functions are defined in _stream_decoder.py?
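_stream_decoder.py defines one module-level function, get_response_stream_shape, and one class, AWSEventStreamDecoder, with the methods __init__, iter_bytes, aiter_bytes, and _parse_message_from_event.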
What does _stream_decoder.py depend on?
_stream_decoder.py imports 7 module(s): _streaming, _utils, botocore.eventstream, botocore.loaders, botocore.model, botocore.parsers, typing.
What files import _stream_decoder.py?
_stream_decoder.py is imported by 2 file(s): _client.py, _stream.py.
Where is _stream_decoder.py in the architecture?
_stream_decoder.py is located at src/anthropic/lib/bedrock/_stream_decoder.py (domain: AnthropicClient, subdomain: AsyncAPI, directory: src/anthropic/lib/bedrock).
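
The checks that _parse_message_from_event applies after parsing can be sketched on plain dictionaries, without botocore. The function name extract_chunk_text and the sample payload are hypothetical; the dictionaries stand in for the output of event.to_response_dict() and the parser result.

```python
from typing import Any, Dict, Optional


def extract_chunk_text(response: Dict[str, Any], parsed: Dict[str, Any]) -> Optional[str]:
    """Mirror the decoder's post-parse checks on plain dictionaries."""
    # Reject any event whose status code is not 200.
    if response["status_code"] != 200:
        raise ValueError(f"Bad response code, expected 200: {response}")

    # Events without a "chunk" member carry no payload and are skipped.
    chunk = parsed.get("chunk")
    if not chunk:
        return None

    return chunk["bytes"].decode()


# Hypothetical sample payload, for illustration only.
ok = extract_chunk_text({"status_code": 200}, {"chunk": {"bytes": b'{"type": "ping"}'}})
skipped = extract_chunk_text({"status_code": 200}, {})

try:
    extract_chunk_text({"status_code": 400}, {})
    failed = False
except ValueError:
    failed = True
```

A None result corresponds to the events that iter_bytes and aiter_bytes silently drop, since both only yield when the message is truthy.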
