Home / Class/ BetaAsyncMessageStream Class — anthropic-sdk-python Architecture

BetaAsyncMessageStream Class — anthropic-sdk-python Architecture

Architecture documentation for the BetaAsyncMessageStream class in _beta_messages.py from the anthropic-sdk-python codebase.

Entity Profile

Dependency Diagram

graph TD
  8e652b46_6f9a_daae_5db8_1aa0b9386626["BetaAsyncMessageStream"]
  1e66b543_4a7a_c49e_b6a6_6343193fb272["_beta_messages.py"]
  8e652b46_6f9a_daae_5db8_1aa0b9386626 -->|defined in| 1e66b543_4a7a_c49e_b6a6_6343193fb272
  bdc25a5b_5970_5914_7e8c_0053757cd026["__init__()"]
  8e652b46_6f9a_daae_5db8_1aa0b9386626 -->|method| bdc25a5b_5970_5914_7e8c_0053757cd026
  58f8077a_d7f5_02d9_77c0_ec620599179d["response()"]
  8e652b46_6f9a_daae_5db8_1aa0b9386626 -->|method| 58f8077a_d7f5_02d9_77c0_ec620599179d
  0fe10f23_05f2_5442_b822_7854ab013c92["request_id()"]
  8e652b46_6f9a_daae_5db8_1aa0b9386626 -->|method| 0fe10f23_05f2_5442_b822_7854ab013c92
  3dc19492_a042_ea5f_fb0d_ed2f05be7ce0["__anext__()"]
  8e652b46_6f9a_daae_5db8_1aa0b9386626 -->|method| 3dc19492_a042_ea5f_fb0d_ed2f05be7ce0
  0cfb575d_6c72_1660_f747_9d4a215eca27["__aiter__()"]
  8e652b46_6f9a_daae_5db8_1aa0b9386626 -->|method| 0cfb575d_6c72_1660_f747_9d4a215eca27
  15910c01_e90c_5f88_e641_92e15cddef0e["__aenter__()"]
  8e652b46_6f9a_daae_5db8_1aa0b9386626 -->|method| 15910c01_e90c_5f88_e641_92e15cddef0e
  1044cd7b_89c9_dbc5_8621_552b5d8e3bc9["__aexit__()"]
  8e652b46_6f9a_daae_5db8_1aa0b9386626 -->|method| 1044cd7b_89c9_dbc5_8621_552b5d8e3bc9
  5bff697f_abb4_8e35_16b7_0a9a63f52fac["close()"]
  8e652b46_6f9a_daae_5db8_1aa0b9386626 -->|method| 5bff697f_abb4_8e35_16b7_0a9a63f52fac
  55b6abea_0990_ff98_54bd_a0fa130e0892["get_final_message()"]
  8e652b46_6f9a_daae_5db8_1aa0b9386626 -->|method| 55b6abea_0990_ff98_54bd_a0fa130e0892
  f9a35b6c_3f14_3846_ea2f_b2a1210dd93b["get_final_text()"]
  8e652b46_6f9a_daae_5db8_1aa0b9386626 -->|method| f9a35b6c_3f14_3846_ea2f_b2a1210dd93b
  1b280af0_fc5d_c3ea_3f9e_afe795826544["until_done()"]
  8e652b46_6f9a_daae_5db8_1aa0b9386626 -->|method| 1b280af0_fc5d_c3ea_3f9e_afe795826544
  e7965791_ad1d_2e7d_4140_e2bc3b728085["current_message_snapshot()"]
  8e652b46_6f9a_daae_5db8_1aa0b9386626 -->|method| e7965791_ad1d_2e7d_4140_e2bc3b728085
  c715fa82_5e69_b855_ef0e_47e7f518ade0["__stream__()"]
  8e652b46_6f9a_daae_5db8_1aa0b9386626 -->|method| c715fa82_5e69_b855_ef0e_47e7f518ade0

Relationship Graph

Source Code

src/anthropic/lib/streaming/_beta_messages.py lines 185–296

class BetaAsyncMessageStream(Generic[ResponseFormatT]):
    text_stream: AsyncIterator[str]
    """Async iterator over just the text deltas in the stream.

    ```py
    async for text in stream.text_stream:
        print(text, end="", flush=True)
    print()
    ```
    """

    def __init__(
        self,
        raw_stream: AsyncStream[BetaRawMessageStreamEvent],
        output_format: ResponseFormatT | NotGiven,
    ) -> None:
        self._raw_stream = raw_stream
        self.text_stream = self.__stream_text__()
        self._iterator = self.__stream__()
        self.__final_message_snapshot: ParsedBetaMessage[ResponseFormatT] | None = None
        self.__output_format = output_format

    @property
    def response(self) -> httpx.Response:
        return self._raw_stream.response

    @property
    def request_id(self) -> str | None:
        return self.response.headers.get("request-id")  # type: ignore[no-any-return]

    async def __anext__(self) -> ParsedBetaMessageStreamEvent[ResponseFormatT]:
        return await self._iterator.__anext__()

    async def __aiter__(self) -> AsyncIterator[ParsedBetaMessageStreamEvent[ResponseFormatT]]:
        async for item in self._iterator:
            yield item

    async def __aenter__(self) -> Self:
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        await self.close()

    async def close(self) -> None:
        """
        Close the response and release the connection.

        Automatically called if the response body is read to completion.
        """
        await self._raw_stream.close()

    async def get_final_message(self) -> ParsedBetaMessage[ResponseFormatT]:
        """Waits until the stream has been read to completion and returns
        the accumulated `Message` object.
        """
        await self.until_done()
        assert self.__final_message_snapshot is not None
        return self.__final_message_snapshot

    async def get_final_text(self) -> str:
        """Returns all `text` content blocks concatenated together.

        > [!NOTE]
        > Currently the API will only respond with a single content block.

        Will raise an error if no `text` content blocks were returned.
        """
        message = await self.get_final_message()
        text_blocks: list[str] = []
        for block in message.content:
            if block.type == "text":
                text_blocks.append(block.text)

        if not text_blocks:
            raise RuntimeError(
                f".get_final_text() can only be called when the API returns a `text` content block.\nThe API returned {','.join([b.type for b in message.content])} content block type(s) that you can access by calling get_final_message().content"

Frequently Asked Questions

What is the BetaAsyncMessageStream class?
BetaAsyncMessageStream is a class in the anthropic-sdk-python codebase, defined in src/anthropic/lib/streaming/_beta_messages.py.
Where is BetaAsyncMessageStream defined?
BetaAsyncMessageStream is defined in src/anthropic/lib/streaming/_beta_messages.py at line 185.

Analyze Your Own Codebase

Get architecture documentation, dependency graphs, and domain analysis for your codebase in minutes.

Try Supermodel Free