BetaMessageStream Class — anthropic-sdk-python Architecture
Architecture documentation for the BetaMessageStream class in _beta_messages.py from the anthropic-sdk-python codebase.
Entity Profile
Dependency Diagram
graph TD fa27ef0f_31e3_cd5e_ffa3_12becd4b426b["BetaMessageStream"] 1e66b543_4a7a_c49e_b6a6_6343193fb272["_beta_messages.py"] fa27ef0f_31e3_cd5e_ffa3_12becd4b426b -->|defined in| 1e66b543_4a7a_c49e_b6a6_6343193fb272 8354232f_496e_adc7_3a32_200b3f90d24d["__init__()"] fa27ef0f_31e3_cd5e_ffa3_12becd4b426b -->|method| 8354232f_496e_adc7_3a32_200b3f90d24d 5598d7fb_f7d6_d22c_b0c1_662e5ec0cd08["response()"] fa27ef0f_31e3_cd5e_ffa3_12becd4b426b -->|method| 5598d7fb_f7d6_d22c_b0c1_662e5ec0cd08 8dcac9c5_ba6a_bbaa_7e41_580c92805b3c["request_id()"] fa27ef0f_31e3_cd5e_ffa3_12becd4b426b -->|method| 8dcac9c5_ba6a_bbaa_7e41_580c92805b3c c41f9c92_f773_b6f1_1c2a_90182fc24dbf["__next__()"] fa27ef0f_31e3_cd5e_ffa3_12becd4b426b -->|method| c41f9c92_f773_b6f1_1c2a_90182fc24dbf 0657e922_67a2_4a7a_e1f5_5fccd2bf0552["__iter__()"] fa27ef0f_31e3_cd5e_ffa3_12becd4b426b -->|method| 0657e922_67a2_4a7a_e1f5_5fccd2bf0552 b12f636a_5876_4c42_1b3c_3a131b500093["__enter__()"] fa27ef0f_31e3_cd5e_ffa3_12becd4b426b -->|method| b12f636a_5876_4c42_1b3c_3a131b500093 cec6357f_9d99_49da_edeb_dead71ddbfac["__exit__()"] fa27ef0f_31e3_cd5e_ffa3_12becd4b426b -->|method| cec6357f_9d99_49da_edeb_dead71ddbfac ebad253b_be4f_1b54_b2e3_1949e019e1a6["close()"] fa27ef0f_31e3_cd5e_ffa3_12becd4b426b -->|method| ebad253b_be4f_1b54_b2e3_1949e019e1a6 a493aa49_e2c3_381a_28e6_6809172e402a["get_final_message()"] fa27ef0f_31e3_cd5e_ffa3_12becd4b426b -->|method| a493aa49_e2c3_381a_28e6_6809172e402a 024c5edb_9d15_f7bc_2a82_77f22e1d8838["get_final_text()"] fa27ef0f_31e3_cd5e_ffa3_12becd4b426b -->|method| 024c5edb_9d15_f7bc_2a82_77f22e1d8838 9137206e_3937_7fd5_2948_649e0b33112b["until_done()"] fa27ef0f_31e3_cd5e_ffa3_12becd4b426b -->|method| 9137206e_3937_7fd5_2948_649e0b33112b ee335547_faa5_16d6_adb9_34e52cda09ce["current_message_snapshot()"] fa27ef0f_31e3_cd5e_ffa3_12becd4b426b -->|method| ee335547_faa5_16d6_adb9_34e52cda09ce 0e363261_728a_0dbe_df13_19e318699314["__stream__()"] fa27ef0f_31e3_cd5e_ffa3_12becd4b426b -->|method| 0e363261_728a_0dbe_df13_19e318699314
Relationship Graph
Source Code
src/anthropic/lib/streaming/_beta_messages.py lines 36–147
class BetaMessageStream(Generic[ResponseFormatT]):
text_stream: Iterator[str]
"""Iterator over just the text deltas in the stream.
```py
for text in stream.text_stream:
print(text, end="", flush=True)
print()
```
"""
def __init__(
self,
raw_stream: Stream[BetaRawMessageStreamEvent],
output_format: ResponseFormatT | NotGiven,
) -> None:
self._raw_stream = raw_stream
self.text_stream = self.__stream_text__()
self._iterator = self.__stream__()
self.__final_message_snapshot: ParsedBetaMessage[ResponseFormatT] | None = None
self.__output_format = output_format
@property
def response(self) -> httpx.Response:
return self._raw_stream.response
@property
def request_id(self) -> str | None:
return self.response.headers.get("request-id") # type: ignore[no-any-return]
def __next__(self) -> ParsedBetaMessageStreamEvent[ResponseFormatT]:
return self._iterator.__next__()
def __iter__(self) -> Iterator[ParsedBetaMessageStreamEvent[ResponseFormatT]]:
for item in self._iterator:
yield item
def __enter__(self) -> Self:
return self
def __exit__(
self,
exc_type: type[BaseException] | None,
exc: BaseException | None,
exc_tb: TracebackType | None,
) -> None:
self.close()
def close(self) -> None:
"""
Close the response and release the connection.
Automatically called if the response body is read to completion.
"""
self._raw_stream.close()
def get_final_message(self) -> ParsedBetaMessage[ResponseFormatT]:
"""Waits until the stream has been read to completion and returns
the accumulated `Message` object.
"""
self.until_done()
assert self.__final_message_snapshot is not None
return self.__final_message_snapshot
def get_final_text(self) -> str:
"""Returns all `text` content blocks concatenated together.
> [!NOTE]
> Currently the API will only respond with a single content block.
Will raise an error if no `text` content blocks were returned.
"""
message = self.get_final_message()
text_blocks: list[str] = []
for block in message.content:
if block.type == "text":
text_blocks.append(block.text)
if not text_blocks:
raise RuntimeError(
f".get_final_text() can only be called when the API returns a `text` content block.\nThe API returned {','.join([b.type for b in message.content])} content block type(s) that you can access by calling get_final_message().content"
Domain
Source
Frequently Asked Questions
What is the BetaMessageStream class?
BetaMessageStream is a class in the anthropic-sdk-python codebase, defined in src/anthropic/lib/streaming/_beta_messages.py.
Where is BetaMessageStream defined?
BetaMessageStream is defined in src/anthropic/lib/streaming/_beta_messages.py at line 36.
Analyze Your Own Codebase
Get architecture documentation, dependency graphs, and domain analysis for your codebase in minutes.
Try Supermodel Free