TestSyncMessages Class — anthropic-sdk-python Architecture
Architecture documentation for the TestSyncMessages class in test_beta_messages.py from the anthropic-sdk-python codebase.
Entity Profile
Dependency Diagram
graph TD
    9bc201ea_a885_b5a7_1802_da159bef751a["TestSyncMessages"]
    fa27ef0f_31e3_cd5e_ffa3_12becd4b426b["BetaMessageStream"]
    9bc201ea_a885_b5a7_1802_da159bef751a -->|uses| fa27ef0f_31e3_cd5e_ffa3_12becd4b426b
    239f7d64_6d26_d418_04e3_d586fe367986["test_beta_messages.py"]
    9bc201ea_a885_b5a7_1802_da159bef751a -->|defined in| 239f7d64_6d26_d418_04e3_d586fe367986
    da1cbf37_7a91_5789_d4cd_1ad1fab89ef7["test_basic_response()"]
    9bc201ea_a885_b5a7_1802_da159bef751a -->|method| da1cbf37_7a91_5789_d4cd_1ad1fab89ef7
    8f450ef4_3bea_5a50_6741_63552939eb9f["test_tool_use()"]
    9bc201ea_a885_b5a7_1802_da159bef751a -->|method| 8f450ef4_3bea_5a50_6741_63552939eb9f
    78655d86_e01f_f848_1839_a877c36f94a5["test_context_manager()"]
    9bc201ea_a885_b5a7_1802_da159bef751a -->|method| 78655d86_e01f_f848_1839_a877c36f94a5
    c576415a_9a32_814a_a7c5_0eae9ec6bc91["test_deprecated_model_warning_stream()"]
    9bc201ea_a885_b5a7_1802_da159bef751a -->|method| c576415a_9a32_814a_a7c5_0eae9ec6bc91
Relationship Graph
Source Code
tests/lib/streaming/test_beta_messages.py lines 188–264
class TestSyncMessages:
    """Streaming tests for ``sync_client.beta.messages.stream`` against respx-mocked responses."""

    @pytest.mark.respx(base_url=base_url)
    def test_basic_response(self, respx_mock: MockRouter) -> None:
        """Stream the ``basic_response.txt`` fixture and check its events and final message."""
        route = respx_mock.post("/v1/messages")
        route.mock(return_value=httpx.Response(200, content=get_response("basic_response.txt")))

        with sync_client.beta.messages.stream(
            max_tokens=1024,
            messages=[{"role": "user", "content": "Say hello there!"}],
            model="claude-3-opus-latest",
        ) as message_stream:
            assert isinstance(cast(Any, message_stream), BetaMessageStream)

            # Drain the events first, then ask the stream for its final message.
            events = list(message_stream)
            final_message = message_stream.get_final_message()
            assert_basic_response(events, final_message)

    @pytest.mark.respx(base_url=base_url)
    def test_tool_use(self, respx_mock: MockRouter) -> None:
        """Stream the ``tool_use_response.txt`` fixture and check its events and final message."""
        route = respx_mock.post("/v1/messages")
        route.mock(return_value=httpx.Response(200, content=get_response("tool_use_response.txt")))

        with sync_client.beta.messages.stream(
            max_tokens=1024,
            messages=[{"role": "user", "content": "Say hello there!"}],
            model="claude-sonnet-4-20250514",
        ) as message_stream:
            assert isinstance(cast(Any, message_stream), BetaMessageStream)

            # Drain the events first, then ask the stream for its final message.
            events = list(message_stream)
            final_message = message_stream.get_final_message()
            assert_tool_use_response(events, final_message)

    @pytest.mark.respx(base_url=base_url)
    def test_context_manager(self, respx_mock: MockRouter) -> None:
        """Check the response is open inside the ``with`` block and closed once it exits."""
        route = respx_mock.post("/v1/messages")
        route.mock(return_value=httpx.Response(200, content=get_response("basic_response.txt")))

        with sync_client.beta.messages.stream(
            max_tokens=1024,
            messages=[{"role": "user", "content": "Say hello there!"}],
            model="claude-3-opus-latest",
        ) as message_stream:
            assert not message_stream.response.is_closed

        # Leaving the block must close the response even though the body was never read.
        assert message_stream.response.is_closed

    @pytest.mark.respx(base_url=base_url)
    def test_deprecated_model_warning_stream(self, respx_mock: MockRouter) -> None:
        """Check that streaming with each entry of ``DEPRECATED_MODELS`` emits a ``DeprecationWarning``."""
        for deprecated_model in DEPRECATED_MODELS:
            # The mock is (re-)registered per model, inside the loop.
            route = respx_mock.post("/v1/messages")
            route.mock(return_value=httpx.Response(200, content=get_response("basic_response.txt")))

            with pytest.warns(DeprecationWarning, match=f"The model '{deprecated_model}' is deprecated"):
                with sync_client.beta.messages.stream(
                    max_tokens=1024,
                    messages=[{"role": "user", "content": "Hello"}],
                    model=deprecated_model,
                ) as message_stream:
                    # Run the stream to completion so the warning has a chance to fire.
                    message_stream.until_done()
Domain
Defined In
Extends
Source
Frequently Asked Questions
What is the TestSyncMessages class?
TestSyncMessages is a class in the anthropic-sdk-python codebase, defined in tests/lib/streaming/test_beta_messages.py.
Where is TestSyncMessages defined?
TestSyncMessages is defined in tests/lib/streaming/test_beta_messages.py at line 188.
What does TestSyncMessages extend?
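TestSyncMessages does not extend any class; it is declared as a plain `class TestSyncMessages:`. Its tests assert that the stream returned by `sync_client.beta.messages.stream(...)` is a BetaMessageStream instance.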
Analyze Your Own Codebase
Get architecture documentation, dependency graphs, and domain analysis for your codebase in minutes.
Try Supermodel Free