_stream_decoder.py — anthropic-sdk-python Source File
Architecture documentation for _stream_decoder.py, a python file in the anthropic-sdk-python codebase. 7 imports, 2 dependents.
Entity Profile
Dependency Diagram
graph LR 91a12d3e_6b2a_90bd_28df_074cc1d1ad53["_stream_decoder.py"] da7ce3bd_4934_ff39_8877_5ba81a168358["_utils"] 91a12d3e_6b2a_90bd_28df_074cc1d1ad53 --> da7ce3bd_4934_ff39_8877_5ba81a168358 597ca033_023a_d6d2_0dc5_e7dacc60a056["_streaming"] 91a12d3e_6b2a_90bd_28df_074cc1d1ad53 --> 597ca033_023a_d6d2_0dc5_e7dacc60a056 89ddcdd7_3ae1_4c7b_41bb_9f1e25f16875["typing"] 91a12d3e_6b2a_90bd_28df_074cc1d1ad53 --> 89ddcdd7_3ae1_4c7b_41bb_9f1e25f16875 8d3edde3_6108_07a1_1819_62277f20ee9e["botocore.model"] 91a12d3e_6b2a_90bd_28df_074cc1d1ad53 --> 8d3edde3_6108_07a1_1819_62277f20ee9e 9afd82dd_2375_6a0a_1b78_afc46006d48c["botocore.eventstream"] 91a12d3e_6b2a_90bd_28df_074cc1d1ad53 --> 9afd82dd_2375_6a0a_1b78_afc46006d48c da224897_45aa_a14e_81ca_39e8e3a2eec6["botocore.loaders"] 91a12d3e_6b2a_90bd_28df_074cc1d1ad53 --> da224897_45aa_a14e_81ca_39e8e3a2eec6 96387cf0_aea3_6014_63fa_94d98085ba2e["botocore.parsers"] 91a12d3e_6b2a_90bd_28df_074cc1d1ad53 --> 96387cf0_aea3_6014_63fa_94d98085ba2e cbdd42fd_75f9_7cc6_3f3b_b8d939e4c157["_client.py"] cbdd42fd_75f9_7cc6_3f3b_b8d939e4c157 --> 91a12d3e_6b2a_90bd_28df_074cc1d1ad53 43a93153_bd96_06f2_9418_72daf6b4ca3a["_stream.py"] 43a93153_bd96_06f2_9418_72daf6b4ca3a --> 91a12d3e_6b2a_90bd_28df_074cc1d1ad53 style 91a12d3e_6b2a_90bd_28df_074cc1d1ad53 fill:#6366f1,stroke:#818cf8,color:#fff
Relationship Graph
Source Code
from __future__ import annotations
from typing import TYPE_CHECKING, Iterator, AsyncIterator
from ..._utils import lru_cache
from ..._streaming import ServerSentEvent
if TYPE_CHECKING:
from botocore.model import Shape
from botocore.eventstream import EventStreamMessage
@lru_cache(maxsize=None)
def get_response_stream_shape() -> Shape:
from botocore.model import ServiceModel
from botocore.loaders import Loader
loader = Loader()
bedrock_service_dict = loader.load_service_model("bedrock-runtime", "service-2")
bedrock_service_model = ServiceModel(bedrock_service_dict)
return bedrock_service_model.shape_for("ResponseStream")
class AWSEventStreamDecoder:
def __init__(self) -> None:
from botocore.parsers import EventStreamJSONParser
self.parser = EventStreamJSONParser()
def iter_bytes(self, iterator: Iterator[bytes]) -> Iterator[ServerSentEvent]:
"""Given an iterator that yields lines, iterate over it & yield every event encountered"""
from botocore.eventstream import EventStreamBuffer
event_stream_buffer = EventStreamBuffer()
for chunk in iterator:
event_stream_buffer.add_data(chunk)
for event in event_stream_buffer:
message = self._parse_message_from_event(event)
if message:
yield ServerSentEvent(data=message, event="completion")
async def aiter_bytes(self, iterator: AsyncIterator[bytes]) -> AsyncIterator[ServerSentEvent]:
"""Given an async iterator that yields lines, iterate over it & yield every event encountered"""
from botocore.eventstream import EventStreamBuffer
event_stream_buffer = EventStreamBuffer()
async for chunk in iterator:
event_stream_buffer.add_data(chunk)
for event in event_stream_buffer:
message = self._parse_message_from_event(event)
if message:
yield ServerSentEvent(data=message, event="completion")
def _parse_message_from_event(self, event: EventStreamMessage) -> str | None:
response_dict = event.to_response_dict()
parsed_response = self.parser.parse(response_dict, get_response_stream_shape())
if response_dict["status_code"] != 200:
raise ValueError(f"Bad response code, expected 200: {response_dict}")
chunk = parsed_response.get("chunk")
if not chunk:
return None
return chunk.get("bytes").decode() # type: ignore[no-any-return]
Domain
Subdomains
Functions
Classes
Dependencies
- _streaming
- _utils
- botocore.eventstream
- botocore.loaders
- botocore.model
- botocore.parsers
- typing
Source
Frequently Asked Questions
What does _stream_decoder.py do?
_stream_decoder.py is a source file in the anthropic-sdk-python codebase, written in python. It belongs to the AnthropicClient domain, AsyncAPI subdomain.
What functions are defined in _stream_decoder.py?
_stream_decoder.py defines 2 function(s): botocore, get_response_stream_shape.
What does _stream_decoder.py depend on?
_stream_decoder.py imports 7 module(s): _streaming, _utils, botocore.eventstream, botocore.loaders, botocore.model, botocore.parsers, typing.
What files import _stream_decoder.py?
_stream_decoder.py is imported by 2 file(s): _client.py, _stream.py.
Where is _stream_decoder.py in the architecture?
_stream_decoder.py is located at src/anthropic/lib/bedrock/_stream_decoder.py (domain: AnthropicClient, subdomain: AsyncAPI, directory: src/anthropic/lib/bedrock).
Analyze Your Own Codebase
Get architecture documentation, dependency graphs, and domain analysis for your codebase in minutes.
Try Supermodel Free