Home / Class/ AWSEventStreamDecoder Class — anthropic-sdk-python Architecture

AWSEventStreamDecoder Class — anthropic-sdk-python Architecture

Architecture documentation for the AWSEventStreamDecoder class in _stream_decoder.py from the anthropic-sdk-python codebase.

Entity Profile

Dependency Diagram

graph TD
  582aef90_700e_ea04_755a_159b23a192d2["AWSEventStreamDecoder"]
  91a12d3e_6b2a_90bd_28df_074cc1d1ad53["_stream_decoder.py"]
  582aef90_700e_ea04_755a_159b23a192d2 -->|defined in| 91a12d3e_6b2a_90bd_28df_074cc1d1ad53
  e02089ea_7771_8e73_0416_0324f6289312["__init__()"]
  582aef90_700e_ea04_755a_159b23a192d2 -->|method| e02089ea_7771_8e73_0416_0324f6289312
  b59cc883_d079_ab3c_26db_78fb47b9bde5["iter_bytes()"]
  582aef90_700e_ea04_755a_159b23a192d2 -->|method| b59cc883_d079_ab3c_26db_78fb47b9bde5
  6c6b9845_9f62_9ee7_a980_1bf61dfd29ec["aiter_bytes()"]
  582aef90_700e_ea04_755a_159b23a192d2 -->|method| 6c6b9845_9f62_9ee7_a980_1bf61dfd29ec
  21cf148e_e403_27d0_043f_c464fcb60144["_parse_message_from_event()"]
  582aef90_700e_ea04_755a_159b23a192d2 -->|method| 21cf148e_e403_27d0_043f_c464fcb60144

Relationship Graph

Source Code

src/anthropic/lib/bedrock/_stream_decoder.py lines 24–64

class AWSEventStreamDecoder:
    def __init__(self) -> None:
        from botocore.parsers import EventStreamJSONParser

        self.parser = EventStreamJSONParser()

    def iter_bytes(self, iterator: Iterator[bytes]) -> Iterator[ServerSentEvent]:
        """Given an iterator that yields lines, iterate over it & yield every event encountered"""
        from botocore.eventstream import EventStreamBuffer

        event_stream_buffer = EventStreamBuffer()
        for chunk in iterator:
            event_stream_buffer.add_data(chunk)
            for event in event_stream_buffer:
                message = self._parse_message_from_event(event)
                if message:
                    yield ServerSentEvent(data=message, event="completion")

    async def aiter_bytes(self, iterator: AsyncIterator[bytes]) -> AsyncIterator[ServerSentEvent]:
        """Given an async iterator that yields lines, iterate over it & yield every event encountered"""
        from botocore.eventstream import EventStreamBuffer

        event_stream_buffer = EventStreamBuffer()
        async for chunk in iterator:
            event_stream_buffer.add_data(chunk)
            for event in event_stream_buffer:
                message = self._parse_message_from_event(event)
                if message:
                    yield ServerSentEvent(data=message, event="completion")

    def _parse_message_from_event(self, event: EventStreamMessage) -> str | None:
        response_dict = event.to_response_dict()
        parsed_response = self.parser.parse(response_dict, get_response_stream_shape())
        if response_dict["status_code"] != 200:
            raise ValueError(f"Bad response code, expected 200: {response_dict}")

        chunk = parsed_response.get("chunk")
        if not chunk:
            return None

        return chunk.get("bytes").decode()  # type: ignore[no-any-return]

Frequently Asked Questions

What is the AWSEventStreamDecoder class?
AWSEventStreamDecoder is a class in the anthropic-sdk-python codebase, defined in src/anthropic/lib/bedrock/_stream_decoder.py.
Where is AWSEventStreamDecoder defined?
AWSEventStreamDecoder is defined in src/anthropic/lib/bedrock/_stream_decoder.py at line 24.

Analyze Your Own Codebase

Get architecture documentation, dependency graphs, and domain analysis for your codebase in minutes.

Try Supermodel Free