BetaMessageStreamManager Class — anthropic-sdk-python Architecture
Architecture documentation for the BetaMessageStreamManager class in _beta_messages.py from the anthropic-sdk-python codebase.
Entity Profile
Dependency Diagram
graph TD a62f3265_03b5_e749_cad9_1e1f84a9df52["BetaMessageStreamManager"] 1e66b543_4a7a_c49e_b6a6_6343193fb272["_beta_messages.py"] a62f3265_03b5_e749_cad9_1e1f84a9df52 -->|defined in| 1e66b543_4a7a_c49e_b6a6_6343193fb272 b9a413c4_a8a9_3531_cd9d_fa3e1ed14a47["__init__()"] a62f3265_03b5_e749_cad9_1e1f84a9df52 -->|method| b9a413c4_a8a9_3531_cd9d_fa3e1ed14a47 84b5fc60_7df6_aea4_1cc8_7ec14ec061ca["__enter__()"] a62f3265_03b5_e749_cad9_1e1f84a9df52 -->|method| 84b5fc60_7df6_aea4_1cc8_7ec14ec061ca 022770a7_09be_44a2_c8a9_056d4a3a749e["__exit__()"] a62f3265_03b5_e749_cad9_1e1f84a9df52 -->|method| 022770a7_09be_44a2_c8a9_056d4a3a749e
Relationship Graph
Source Code
src/anthropic/lib/streaming/_beta_messages.py lines 150–182
class BetaMessageStreamManager(Generic[ResponseFormatT]):
"""Wrapper over MessageStream that is returned by `.stream()`.
```py
with client.beta.messages.stream(...) as stream:
for chunk in stream:
...
```
"""
def __init__(
self,
api_request: Callable[[], Stream[BetaRawMessageStreamEvent]],
*,
output_format: ResponseFormatT | NotGiven,
) -> None:
self.__stream: BetaMessageStream[ResponseFormatT] | None = None
self.__api_request = api_request
self.__output_format = output_format
def __enter__(self) -> BetaMessageStream[ResponseFormatT]:
raw_stream = self.__api_request()
self.__stream = BetaMessageStream(raw_stream, output_format=self.__output_format)
return self.__stream
def __exit__(
self,
exc_type: type[BaseException] | None,
exc: BaseException | None,
exc_tb: TracebackType | None,
) -> None:
if self.__stream is not None:
self.__stream.close()
Domain
Source
Frequently Asked Questions
What is the BetaMessageStreamManager class?
BetaMessageStreamManager is a class in the anthropic-sdk-python codebase, defined in src/anthropic/lib/streaming/_beta_messages.py.
Where is BetaMessageStreamManager defined?
BetaMessageStreamManager is defined in src/anthropic/lib/streaming/_beta_messages.py at line 150.
Analyze Your Own Codebase
Get architecture documentation, dependency graphs, and domain analysis for your codebase in minutes.
Try Supermodel Free