BetaMessageStreamManager Class — anthropic-sdk-python Architecture

Architecture documentation for the BetaMessageStreamManager class in _beta_messages.py from the anthropic-sdk-python codebase.

Dependency Diagram

graph TD
  a62f3265_03b5_e749_cad9_1e1f84a9df52["BetaMessageStreamManager"]
  1e66b543_4a7a_c49e_b6a6_6343193fb272["_beta_messages.py"]
  a62f3265_03b5_e749_cad9_1e1f84a9df52 -->|defined in| 1e66b543_4a7a_c49e_b6a6_6343193fb272
  b9a413c4_a8a9_3531_cd9d_fa3e1ed14a47["__init__()"]
  a62f3265_03b5_e749_cad9_1e1f84a9df52 -->|method| b9a413c4_a8a9_3531_cd9d_fa3e1ed14a47
  84b5fc60_7df6_aea4_1cc8_7ec14ec061ca["__enter__()"]
  a62f3265_03b5_e749_cad9_1e1f84a9df52 -->|method| 84b5fc60_7df6_aea4_1cc8_7ec14ec061ca
  022770a7_09be_44a2_c8a9_056d4a3a749e["__exit__()"]
  a62f3265_03b5_e749_cad9_1e1f84a9df52 -->|method| 022770a7_09be_44a2_c8a9_056d4a3a749e

Source Code

src/anthropic/lib/streaming/_beta_messages.py lines 150–182

class BetaMessageStreamManager(Generic[ResponseFormatT]):
    """Wrapper over MessageStream that is returned by `.stream()`.

    ```py
    with client.beta.messages.stream(...) as stream:
        for chunk in stream:
            ...
    ```
    """

    def __init__(
        self,
        api_request: Callable[[], Stream[BetaRawMessageStreamEvent]],
        *,
        output_format: ResponseFormatT | NotGiven,
    ) -> None:
        self.__stream: BetaMessageStream[ResponseFormatT] | None = None
        self.__api_request = api_request
        self.__output_format = output_format

    def __enter__(self) -> BetaMessageStream[ResponseFormatT]:
        raw_stream = self.__api_request()
        self.__stream = BetaMessageStream(raw_stream, output_format=self.__output_format)
        return self.__stream

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        if self.__stream is not None:
            self.__stream.close()
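
The excerpt above defers the API request until `__enter__` and closes the resulting stream in `__exit__`. The following is a minimal, self-contained sketch of that pattern. It does not use the SDK: `FakeStream`, `LazyStreamManager`, and `make_request` are hypothetical stand-ins invented for illustration, so the sketch shows only the shape of the pattern, not the library's behavior.

```python
from __future__ import annotations

from types import TracebackType
from typing import Callable, Iterator


class FakeStream:
    """Hypothetical stand-in for a stream object; not part of the SDK."""

    def __init__(self, events: list[str]) -> None:
        self._events = events
        self.closed = False

    def __iter__(self) -> Iterator[str]:
        return iter(self._events)

    def close(self) -> None:
        self.closed = True


class LazyStreamManager:
    """Hypothetical manager with the same shape as the excerpt:
    store a request callable, call it on enter, close the stream on exit."""

    def __init__(self, api_request: Callable[[], FakeStream]) -> None:
        self._stream: FakeStream | None = None
        self._api_request = api_request

    def __enter__(self) -> FakeStream:
        # The request is only issued here, not in __init__.
        self._stream = self._api_request()
        return self._stream

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        # Returns None, so exceptions raised in the with-body still propagate.
        if self._stream is not None:
            self._stream.close()


calls: list[str] = []


def make_request() -> FakeStream:
    """Hypothetical request function that records when it is called."""
    calls.append("request")
    return FakeStream(["a", "b"])


manager = LazyStreamManager(make_request)
assert calls == []  # constructing the manager sends nothing

with manager as stream:
    received = list(stream)

assert stream.closed  # the stream is closed once the block exits
```

Because `__exit__` only closes the stream and returns `None`, an exception raised inside the `with` block is not suppressed, and the stream is still closed. The `is not None` guard covers the case where the request callable itself raised and no stream was ever created.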

Frequently Asked Questions

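What is the BetaMessageStreamManager class?
BetaMessageStreamManager is a generic context manager class in the anthropic-sdk-python codebase, defined in src/anthropic/lib/streaming/_beta_messages.py. Per its docstring, it is the object returned by `.stream()`: entering it issues the API request and returns a BetaMessageStream, and exiting it closes that stream.

Where is BetaMessageStreamManager defined?
BetaMessageStreamManager is defined in src/anthropic/lib/streaming/_beta_messages.py at line 150.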
