Home / Class/ BetaMessageStream Class — anthropic-sdk-python Architecture

BetaMessageStream Class — anthropic-sdk-python Architecture

Architecture documentation for the BetaMessageStream class in _beta_messages.py from the anthropic-sdk-python codebase.

Entity Profile

Dependency Diagram

graph TD
  fa27ef0f_31e3_cd5e_ffa3_12becd4b426b["BetaMessageStream"]
  1e66b543_4a7a_c49e_b6a6_6343193fb272["_beta_messages.py"]
  fa27ef0f_31e3_cd5e_ffa3_12becd4b426b -->|defined in| 1e66b543_4a7a_c49e_b6a6_6343193fb272
  8354232f_496e_adc7_3a32_200b3f90d24d["__init__()"]
  fa27ef0f_31e3_cd5e_ffa3_12becd4b426b -->|method| 8354232f_496e_adc7_3a32_200b3f90d24d
  5598d7fb_f7d6_d22c_b0c1_662e5ec0cd08["response()"]
  fa27ef0f_31e3_cd5e_ffa3_12becd4b426b -->|method| 5598d7fb_f7d6_d22c_b0c1_662e5ec0cd08
  8dcac9c5_ba6a_bbaa_7e41_580c92805b3c["request_id()"]
  fa27ef0f_31e3_cd5e_ffa3_12becd4b426b -->|method| 8dcac9c5_ba6a_bbaa_7e41_580c92805b3c
  c41f9c92_f773_b6f1_1c2a_90182fc24dbf["__next__()"]
  fa27ef0f_31e3_cd5e_ffa3_12becd4b426b -->|method| c41f9c92_f773_b6f1_1c2a_90182fc24dbf
  0657e922_67a2_4a7a_e1f5_5fccd2bf0552["__iter__()"]
  fa27ef0f_31e3_cd5e_ffa3_12becd4b426b -->|method| 0657e922_67a2_4a7a_e1f5_5fccd2bf0552
  b12f636a_5876_4c42_1b3c_3a131b500093["__enter__()"]
  fa27ef0f_31e3_cd5e_ffa3_12becd4b426b -->|method| b12f636a_5876_4c42_1b3c_3a131b500093
  cec6357f_9d99_49da_edeb_dead71ddbfac["__exit__()"]
  fa27ef0f_31e3_cd5e_ffa3_12becd4b426b -->|method| cec6357f_9d99_49da_edeb_dead71ddbfac
  ebad253b_be4f_1b54_b2e3_1949e019e1a6["close()"]
  fa27ef0f_31e3_cd5e_ffa3_12becd4b426b -->|method| ebad253b_be4f_1b54_b2e3_1949e019e1a6
  a493aa49_e2c3_381a_28e6_6809172e402a["get_final_message()"]
  fa27ef0f_31e3_cd5e_ffa3_12becd4b426b -->|method| a493aa49_e2c3_381a_28e6_6809172e402a
  024c5edb_9d15_f7bc_2a82_77f22e1d8838["get_final_text()"]
  fa27ef0f_31e3_cd5e_ffa3_12becd4b426b -->|method| 024c5edb_9d15_f7bc_2a82_77f22e1d8838
  9137206e_3937_7fd5_2948_649e0b33112b["until_done()"]
  fa27ef0f_31e3_cd5e_ffa3_12becd4b426b -->|method| 9137206e_3937_7fd5_2948_649e0b33112b
  ee335547_faa5_16d6_adb9_34e52cda09ce["current_message_snapshot()"]
  fa27ef0f_31e3_cd5e_ffa3_12becd4b426b -->|method| ee335547_faa5_16d6_adb9_34e52cda09ce
  0e363261_728a_0dbe_df13_19e318699314["__stream__()"]
  fa27ef0f_31e3_cd5e_ffa3_12becd4b426b -->|method| 0e363261_728a_0dbe_df13_19e318699314

Relationship Graph

Source Code

src/anthropic/lib/streaming/_beta_messages.py lines 36–147

class BetaMessageStream(Generic[ResponseFormatT]):
    text_stream: Iterator[str]
    """Iterator over just the text deltas in the stream.

    ```py
    for text in stream.text_stream:
        print(text, end="", flush=True)
    print()
    ```
    """

    def __init__(
        self,
        raw_stream: Stream[BetaRawMessageStreamEvent],
        output_format: ResponseFormatT | NotGiven,
    ) -> None:
        self._raw_stream = raw_stream
        self.text_stream = self.__stream_text__()
        self._iterator = self.__stream__()
        self.__final_message_snapshot: ParsedBetaMessage[ResponseFormatT] | None = None
        self.__output_format = output_format

    @property
    def response(self) -> httpx.Response:
        return self._raw_stream.response

    @property
    def request_id(self) -> str | None:
        return self.response.headers.get("request-id")  # type: ignore[no-any-return]

    def __next__(self) -> ParsedBetaMessageStreamEvent[ResponseFormatT]:
        return self._iterator.__next__()

    def __iter__(self) -> Iterator[ParsedBetaMessageStreamEvent[ResponseFormatT]]:
        for item in self._iterator:
            yield item

    def __enter__(self) -> Self:
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        self.close()

    def close(self) -> None:
        """
        Close the response and release the connection.

        Automatically called if the response body is read to completion.
        """
        self._raw_stream.close()

    def get_final_message(self) -> ParsedBetaMessage[ResponseFormatT]:
        """Waits until the stream has been read to completion and returns
        the accumulated `Message` object.
        """
        self.until_done()
        assert self.__final_message_snapshot is not None
        return self.__final_message_snapshot

    def get_final_text(self) -> str:
        """Returns all `text` content blocks concatenated together.

        > [!NOTE]
        > Currently the API will only respond with a single content block.

        Will raise an error if no `text` content blocks were returned.
        """
        message = self.get_final_message()
        text_blocks: list[str] = []
        for block in message.content:
            if block.type == "text":
                text_blocks.append(block.text)

        if not text_blocks:
            raise RuntimeError(
                f".get_final_text() can only be called when the API returns a `text` content block.\nThe API returned {','.join([b.type for b in message.content])} content block type(s) that you can access by calling get_final_message().content"

Frequently Asked Questions

What is the BetaMessageStream class?
BetaMessageStream is a class in the anthropic-sdk-python codebase, defined in src/anthropic/lib/streaming/_beta_messages.py.
Where is BetaMessageStream defined?
BetaMessageStream is defined in src/anthropic/lib/streaming/_beta_messages.py at line 36.

Analyze Your Own Codebase

Get architecture documentation, dependency graphs, and domain analysis for your codebase in minutes.

Try Supermodel Free