Home / Class/ MessageStream Class — anthropic-sdk-python Architecture

MessageStream Class — anthropic-sdk-python Architecture

Architecture documentation for the MessageStream class in _messages.py from the anthropic-sdk-python codebase.

Entity Profile

Dependency Diagram

graph TD
  e621e870_ca7f_408e_c8bb_3d408ede66a4["MessageStream"]
  738ef0a0_4e16_ebf0_9c79_a17daa58ddef["_messages.py"]
  e621e870_ca7f_408e_c8bb_3d408ede66a4 -->|defined in| 738ef0a0_4e16_ebf0_9c79_a17daa58ddef
  a0f472c2_f937_ec2c_1348_4b9056c82e3d["__init__()"]
  e621e870_ca7f_408e_c8bb_3d408ede66a4 -->|method| a0f472c2_f937_ec2c_1348_4b9056c82e3d
  056a18bd_ab71_8ac8_b5b1_250b10eac310["response()"]
  e621e870_ca7f_408e_c8bb_3d408ede66a4 -->|method| 056a18bd_ab71_8ac8_b5b1_250b10eac310
  4e401905_dfa7_4197_afed_a62322eef9e6["request_id()"]
  e621e870_ca7f_408e_c8bb_3d408ede66a4 -->|method| 4e401905_dfa7_4197_afed_a62322eef9e6
  5c3a2b33_d013_657e_5a96_174a03fe5e4e["__next__()"]
  e621e870_ca7f_408e_c8bb_3d408ede66a4 -->|method| 5c3a2b33_d013_657e_5a96_174a03fe5e4e
  7d0cdda6_d9e6_0ebb_6ec5_d324350b98f7["__iter__()"]
  e621e870_ca7f_408e_c8bb_3d408ede66a4 -->|method| 7d0cdda6_d9e6_0ebb_6ec5_d324350b98f7
  333dbaf8_f28f_4840_2555_ff42abd8b24d["__enter__()"]
  e621e870_ca7f_408e_c8bb_3d408ede66a4 -->|method| 333dbaf8_f28f_4840_2555_ff42abd8b24d
  89eeec62_b34b_6669_ea96_d4cf2cb99af8["__exit__()"]
  e621e870_ca7f_408e_c8bb_3d408ede66a4 -->|method| 89eeec62_b34b_6669_ea96_d4cf2cb99af8
  804494a0_9cfb_3f57_2cc6_c01a157782f6["close()"]
  e621e870_ca7f_408e_c8bb_3d408ede66a4 -->|method| 804494a0_9cfb_3f57_2cc6_c01a157782f6
  f7d7eda3_f48d_9a94_ee86_5e96ba366fee["get_final_message()"]
  e621e870_ca7f_408e_c8bb_3d408ede66a4 -->|method| f7d7eda3_f48d_9a94_ee86_5e96ba366fee
  1bc68915_5733_1542_a79b_791f2a5ac05e["get_final_text()"]
  e621e870_ca7f_408e_c8bb_3d408ede66a4 -->|method| 1bc68915_5733_1542_a79b_791f2a5ac05e
  a83262c8_25cd_2a5b_a4bd_500bdcf5b6c5["until_done()"]
  e621e870_ca7f_408e_c8bb_3d408ede66a4 -->|method| a83262c8_25cd_2a5b_a4bd_500bdcf5b6c5
  d0f3c7f3_9697_e16c_d229_25799a6b7b46["current_message_snapshot()"]
  e621e870_ca7f_408e_c8bb_3d408ede66a4 -->|method| d0f3c7f3_9697_e16c_d229_25799a6b7b46
  ba051e44_a190_9141_551c_ce3d77cfeefc["__stream__()"]
  e621e870_ca7f_408e_c8bb_3d408ede66a4 -->|method| ba051e44_a190_9141_551c_ce3d77cfeefc

Relationship Graph

Source Code

src/anthropic/lib/streaming/_messages.py lines 33–143

class MessageStream(Generic[ResponseFormatT]):
    text_stream: Iterator[str]
    """Iterator over just the text deltas in the stream.

    ```py
    for text in stream.text_stream:
        print(text, end="", flush=True)
    print()
    ```
    """

    def __init__(
        self,
        raw_stream: Stream[RawMessageStreamEvent],
        output_format: ResponseFormatT | NotGiven,
    ) -> None:
        self._raw_stream = raw_stream
        self.text_stream = self.__stream_text__()
        self._iterator = self.__stream__()
        self.__final_message_snapshot: ParsedMessage[ResponseFormatT] | None = None
        self.__output_format = output_format

    @property
    def response(self) -> httpx.Response:
        return self._raw_stream.response

    @property
    def request_id(self) -> str | None:
        return self.response.headers.get("request-id")  # type: ignore[no-any-return]

    def __next__(self) -> ParsedMessageStreamEvent[ResponseFormatT]:
        return self._iterator.__next__()

    def __iter__(self) -> Iterator[ParsedMessageStreamEvent[ResponseFormatT]]:
        for item in self._iterator:
            yield item

    def __enter__(self) -> Self:
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        self.close()

    def close(self) -> None:
        """
        Close the response and release the connection.

        Automatically called if the response body is read to completion.
        """
        self._raw_stream.close()

    def get_final_message(self) -> ParsedMessage[ResponseFormatT]:
        """Waits until the stream has been read to completion and returns
        the accumulated `Message` object.
        """
        self.until_done()
        assert self.__final_message_snapshot is not None
        return self.__final_message_snapshot

    def get_final_text(self) -> str:
        """Returns all `text` content blocks concatenated together.

        > [!NOTE]
        > Currently the API will only respond with a single content block.

        Will raise an error if no `text` content blocks were returned.
        """
        message = self.get_final_message()
        text_blocks: list[str] = []
        for block in message.content:
            if block.type == "text":
                text_blocks.append(block.text)

        if not text_blocks:
            raise RuntimeError(
                f".get_final_text() can only be called when the API returns a `text` content block.\nThe API returned {','.join([b.type for b in message.content])} content block type(s) that you can access by calling get_final_message().content"

Frequently Asked Questions

What is the MessageStream class?
MessageStream is a class in the anthropic-sdk-python codebase, defined in src/anthropic/lib/streaming/_messages.py.
Where is MessageStream defined?
MessageStream is defined in src/anthropic/lib/streaming/_messages.py at line 33.

Analyze Your Own Codebase

Get architecture documentation, dependency graphs, and domain analysis for your codebase in minutes.

Try Supermodel Free