Home / Class/ AsyncMessageStream Class — anthropic-sdk-python Architecture

AsyncMessageStream Class — anthropic-sdk-python Architecture

Architecture documentation for the AsyncMessageStream class in _messages.py from the anthropic-sdk-python codebase.

Entity Profile

Dependency Diagram

graph TD
  4f4cec09_94d5_eeb6_5c17_29d2af98f435["AsyncMessageStream"]
  738ef0a0_4e16_ebf0_9c79_a17daa58ddef["_messages.py"]
  4f4cec09_94d5_eeb6_5c17_29d2af98f435 -->|defined in| 738ef0a0_4e16_ebf0_9c79_a17daa58ddef
  754c230b_6e9f_717f_1978_6847ac85c286["__init__()"]
  4f4cec09_94d5_eeb6_5c17_29d2af98f435 -->|method| 754c230b_6e9f_717f_1978_6847ac85c286
  80677f35_aaf0_f21d_6974_d95a4ce88428["response()"]
  4f4cec09_94d5_eeb6_5c17_29d2af98f435 -->|method| 80677f35_aaf0_f21d_6974_d95a4ce88428
  ee6e87d7_84ee_9d1a_3584_a6da382909cc["request_id()"]
  4f4cec09_94d5_eeb6_5c17_29d2af98f435 -->|method| ee6e87d7_84ee_9d1a_3584_a6da382909cc
  8fde9560_3f00_1b05_985a_51078c06f3f0["__anext__()"]
  4f4cec09_94d5_eeb6_5c17_29d2af98f435 -->|method| 8fde9560_3f00_1b05_985a_51078c06f3f0
  b48e8b24_4aa7_3750_67a3_6980a67acce4["__aiter__()"]
  4f4cec09_94d5_eeb6_5c17_29d2af98f435 -->|method| b48e8b24_4aa7_3750_67a3_6980a67acce4
  745f3f36_f1e5_5142_f03f_e54b2b813c71["__aenter__()"]
  4f4cec09_94d5_eeb6_5c17_29d2af98f435 -->|method| 745f3f36_f1e5_5142_f03f_e54b2b813c71
  eecd94c8_5a69_9c0c_e45f_89578a735590["__aexit__()"]
  4f4cec09_94d5_eeb6_5c17_29d2af98f435 -->|method| eecd94c8_5a69_9c0c_e45f_89578a735590
  ff2b34fc_bdb4_76f9_4c6c_9b33a7133ead["close()"]
  4f4cec09_94d5_eeb6_5c17_29d2af98f435 -->|method| ff2b34fc_bdb4_76f9_4c6c_9b33a7133ead
  f4a46199_2749_e343_7f26_2f26657ae526["get_final_message()"]
  4f4cec09_94d5_eeb6_5c17_29d2af98f435 -->|method| f4a46199_2749_e343_7f26_2f26657ae526
  85eb3149_498e_f553_00e0_c28342315541["get_final_text()"]
  4f4cec09_94d5_eeb6_5c17_29d2af98f435 -->|method| 85eb3149_498e_f553_00e0_c28342315541
  adb888ca_b9bd_2363_ade2_ad24ef72c39d["until_done()"]
  4f4cec09_94d5_eeb6_5c17_29d2af98f435 -->|method| adb888ca_b9bd_2363_ade2_ad24ef72c39d
  87ca2bf0_9479_f587_d383_e82c3965efad["current_message_snapshot()"]
  4f4cec09_94d5_eeb6_5c17_29d2af98f435 -->|method| 87ca2bf0_9479_f587_d383_e82c3965efad
  cad54414_ccd6_b061_6332_68338c7a97aa["__stream__()"]
  4f4cec09_94d5_eeb6_5c17_29d2af98f435 -->|method| cad54414_ccd6_b061_6332_68338c7a97aa

Relationship Graph

Source Code

src/anthropic/lib/streaming/_messages.py lines 181–291

class AsyncMessageStream(Generic[ResponseFormatT]):
    text_stream: AsyncIterator[str]
    """Async iterator over just the text deltas in the stream.

    ```py
    async for text in stream.text_stream:
        print(text, end="", flush=True)
    print()
    ```
    """

    def __init__(
        self,
        raw_stream: AsyncStream[RawMessageStreamEvent],
        output_format: ResponseFormatT | NotGiven,
    ) -> None:
        self._raw_stream = raw_stream
        self.text_stream = self.__stream_text__()
        self._iterator = self.__stream__()
        self.__final_message_snapshot: ParsedMessage[ResponseFormatT] | None = None
        self.__output_format = output_format

    @property
    def response(self) -> httpx.Response:
        return self._raw_stream.response

    @property
    def request_id(self) -> str | None:
        return self.response.headers.get("request-id")  # type: ignore[no-any-return]

    async def __anext__(self) -> ParsedMessageStreamEvent[ResponseFormatT]:
        return await self._iterator.__anext__()

    async def __aiter__(self) -> AsyncIterator[ParsedMessageStreamEvent[ResponseFormatT]]:
        async for item in self._iterator:
            yield item

    async def __aenter__(self) -> Self:
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        await self.close()

    async def close(self) -> None:
        """
        Close the response and release the connection.

        Automatically called if the response body is read to completion.
        """
        await self._raw_stream.close()

    async def get_final_message(self) -> ParsedMessage[ResponseFormatT]:
        """Waits until the stream has been read to completion and returns
        the accumulated `Message` object.
        """
        await self.until_done()
        assert self.__final_message_snapshot is not None
        return self.__final_message_snapshot

    async def get_final_text(self) -> str:
        """Returns all `text` content blocks concatenated together.

        > [!NOTE]
        > Currently the API will only respond with a single content block.

        Will raise an error if no `text` content blocks were returned.
        """
        message = await self.get_final_message()
        text_blocks: list[str] = []
        for block in message.content:
            if block.type == "text":
                text_blocks.append(block.text)

        if not text_blocks:
            raise RuntimeError(
                f".get_final_text() can only be called when the API returns a `text` content block.\nThe API returned {','.join([b.type for b in message.content])} content block type(s) that you can access by calling get_final_message().content"

Frequently Asked Questions

What is the AsyncMessageStream class?
AsyncMessageStream is a class in the anthropic-sdk-python codebase, defined in src/anthropic/lib/streaming/_messages.py.
Where is AsyncMessageStream defined?
AsyncMessageStream is defined in src/anthropic/lib/streaming/_messages.py at line 181.

Analyze Your Own Codebase

Get architecture documentation, dependency graphs, and domain analysis for your codebase in minutes.

Try Supermodel Free