BetaAsyncMessageStreamManager Class — anthropic-sdk-python Architecture

Architecture documentation for the BetaAsyncMessageStreamManager class in _beta_messages.py from the anthropic-sdk-python codebase.

Dependency Diagram

graph TD
  a1ba5622_543f_3582_7f07_129c43239db4["BetaAsyncMessageStreamManager"]
  1e66b543_4a7a_c49e_b6a6_6343193fb272["_beta_messages.py"]
  a1ba5622_543f_3582_7f07_129c43239db4 -->|defined in| 1e66b543_4a7a_c49e_b6a6_6343193fb272
  34d10ab5_4c1a_0812_97ea_29a0aace302b["__init__()"]
  a1ba5622_543f_3582_7f07_129c43239db4 -->|method| 34d10ab5_4c1a_0812_97ea_29a0aace302b
  8f47e3de_6f22_6c79_e346_730c29f90708["__aenter__()"]
  a1ba5622_543f_3582_7f07_129c43239db4 -->|method| 8f47e3de_6f22_6c79_e346_730c29f90708
  6b148dd1_c821_c1d9_619b_8727da38e1f7["__aexit__()"]
  a1ba5622_543f_3582_7f07_129c43239db4 -->|method| 6b148dd1_c821_c1d9_619b_8727da38e1f7

Source Code

src/anthropic/lib/streaming/_beta_messages.py lines 299–333

class BetaAsyncMessageStreamManager(Generic[ResponseFormatT]):
    """Wrapper over BetaAsyncMessageStream that is returned by `.stream()`
    so that an async context manager can be used without `await`ing the
    original client call.

    ```py
    async with client.beta.messages.stream(...) as stream:
        async for chunk in stream:
            ...
    ```
    """

    def __init__(
        self,
        api_request: Awaitable[AsyncStream[BetaRawMessageStreamEvent]],
        *,
        output_format: ResponseFormatT | NotGiven = NOT_GIVEN,
    ) -> None:
        self.__stream: BetaAsyncMessageStream[ResponseFormatT] | None = None
        self.__api_request = api_request
        self.__output_format = output_format

    async def __aenter__(self) -> BetaAsyncMessageStream[ResponseFormatT]:
        raw_stream = await self.__api_request
        self.__stream = BetaAsyncMessageStream(raw_stream, output_format=self.__output_format)
        return self.__stream

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        if self.__stream is not None:
            await self.__stream.close()
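
The deferred-await pattern used by this class can be tried without the SDK. The following is a minimal sketch, not the library's implementation: `FakeStream`, `FakeStreamManager`, `fake_request`, and `demo` are hypothetical stand-ins invented for illustration. It shows that the request coroutine does not start running until `__aenter__` awaits it, and that `__aexit__` closes the stream when the block ends.

```python
import asyncio
from typing import AsyncIterator, Awaitable, List, Optional, Tuple


class FakeStream:
    """Hypothetical stand-in for BetaAsyncMessageStream (not part of the SDK)."""

    def __init__(self, events: List[str]) -> None:
        self._events = events
        self.closed = False

    def __aiter__(self) -> AsyncIterator[str]:
        return self._iterate()

    async def _iterate(self) -> AsyncIterator[str]:
        for event in self._events:
            yield event

    async def close(self) -> None:
        self.closed = True


class FakeStreamManager:
    """Hypothetical manager mirroring the shape of the class above."""

    def __init__(self, api_request: Awaitable[FakeStream]) -> None:
        self._stream: Optional[FakeStream] = None
        self._api_request = api_request  # stored, not awaited yet

    async def __aenter__(self) -> FakeStream:
        # The request only starts running here.
        self._stream = await self._api_request
        return self._stream

    async def __aexit__(self, exc_type, exc, exc_tb) -> None:
        # Close only if __aenter__ got far enough to create a stream.
        if self._stream is not None:
            await self._stream.close()


async def fake_request(log: List[str]) -> FakeStream:
    """Hypothetical request; records when it actually starts executing."""
    log.append("request started")
    return FakeStream(["start", "delta", "stop"])


async def demo() -> Tuple[List[str], bool]:
    log: List[str] = []
    # Calling the coroutine function creates the awaitable without running it.
    manager = FakeStreamManager(fake_request(log))
    log.append("manager created")
    async with manager as stream:
        async for event in stream:
            log.append(event)
    return log, stream.closed


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Because `__aexit__` returns `None`, an exception raised inside the `async with` body is not suppressed; it propagates after the stream has been closed.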

Frequently Asked Questions

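What is the BetaAsyncMessageStreamManager class?
BetaAsyncMessageStreamManager is an async context manager in the anthropic-sdk-python codebase, defined in src/anthropic/lib/streaming/_beta_messages.py. It wraps BetaAsyncMessageStream and is returned by `.stream()`, so callers can write `async with client.beta.messages.stream(...)` without awaiting the client call first. The stored request is awaited in __aenter__(), and the resulting stream is closed in __aexit__().

Where is BetaAsyncMessageStreamManager defined?
BetaAsyncMessageStreamManager is defined in src/anthropic/lib/streaming/_beta_messages.py, starting at line 299 (the class spans lines 299–333).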
