Home / Class / TestSyncMessages Class — anthropic-sdk-python Architecture

TestSyncMessages Class — anthropic-sdk-python Architecture

Architecture documentation for the TestSyncMessages class in test_messages.py from the anthropic-sdk-python codebase.

Entity Profile

Dependency Diagram

graph TD
  dee3798b_de54_17a3_eda7_2fcb6484288c["TestSyncMessages"]
  3c15a36b_3b82_93a8_fe82_3d955b1934ca["Stream"]
  dee3798b_de54_17a3_eda7_2fcb6484288c -->|extends| 3c15a36b_3b82_93a8_fe82_3d955b1934ca
  8cc8cd20_a242_314f_b59d_2d554c624e7c["test_messages.py"]
  dee3798b_de54_17a3_eda7_2fcb6484288c -->|defined in| 8cc8cd20_a242_314f_b59d_2d554c624e7c
  ce440b6f_3c48_2cbe_7590_84a80c8bb509["test_basic_response()"]
  dee3798b_de54_17a3_eda7_2fcb6484288c -->|method| ce440b6f_3c48_2cbe_7590_84a80c8bb509
  bb206031_6ed6_75bb_2973_4e4a072b1031["test_context_manager()"]
  dee3798b_de54_17a3_eda7_2fcb6484288c -->|method| bb206031_6ed6_75bb_2973_4e4a072b1031
  2eb8e093_0b72_7084_6802_ae8ae503a89a["test_deprecated_model_warning_stream()"]
  dee3798b_de54_17a3_eda7_2fcb6484288c -->|method| 2eb8e093_0b72_7084_6802_ae8ae503a89a
  ee7a56e8_2e83_4351_5d7c_8d9b27fba3d2["test_tool_use()"]
  dee3798b_de54_17a3_eda7_2fcb6484288c -->|method| ee7a56e8_2e83_4351_5d7c_8d9b27fba3d2

Relationship Graph

Source Code

tests/lib/streaming/test_messages.py lines 108–186

class TestSyncMessages:
    """Exercise ``sync_client.messages.stream()`` against respx-mocked ``/v1/messages`` replies."""

    @pytest.mark.respx(base_url=base_url)
    def test_basic_response(self, respx_mock: MockRouter) -> None:
        """Stream the basic fixture and validate the collected events and final message."""
        route = respx_mock.post("/v1/messages")
        route.mock(return_value=httpx.Response(200, content=get_response("basic_response.txt")))

        stream_manager = sync_client.messages.stream(
            max_tokens=1024,
            messages=[{"role": "user", "content": "Say hello there!"}],
            model="claude-3-opus-latest",
        )
        with stream_manager as stream:
            # The isinstance check against Stream is expected to emit a DeprecationWarning.
            with pytest.warns(DeprecationWarning):
                is_stream_instance = isinstance(cast(Any, stream), Stream)
            assert is_stream_instance

            # Drain the stream first, then fetch the final message for comparison.
            events = list(stream)
            assert_basic_response(events, stream.get_final_message())

    @pytest.mark.respx(base_url=base_url)
    def test_context_manager(self, respx_mock: MockRouter) -> None:
        """Check that leaving the ``with`` block closes the underlying response."""
        route = respx_mock.post("/v1/messages")
        route.mock(return_value=httpx.Response(200, content=get_response("basic_response.txt")))

        stream_manager = sync_client.messages.stream(
            max_tokens=1024,
            messages=[{"role": "user", "content": "Say hello there!"}],
            model="claude-3-opus-latest",
        )
        with stream_manager as stream:
            assert not stream.response.is_closed

        # response should be closed even if the body isn't read
        assert stream.response.is_closed

    @pytest.mark.respx(base_url=base_url)
    def test_deprecated_model_warning_stream(self, respx_mock: MockRouter) -> None:
        """Check that each entry of ``DEPRECATED_MODELS`` produces a matching DeprecationWarning."""
        for deprecated_model in DEPRECATED_MODELS:
            # The route is (re-)registered on every iteration, once per model.
            route = respx_mock.post("/v1/messages")
            route.mock(return_value=httpx.Response(200, content=get_response("basic_response.txt")))

            expected_warning = f"The model '{deprecated_model}' is deprecated"
            # The stream is created inside pytest.warns so a warning raised at call time is captured too.
            with pytest.warns(DeprecationWarning, match=expected_warning), sync_client.messages.stream(
                max_tokens=1024,
                messages=[{"role": "user", "content": "Hello"}],
                model=deprecated_model,
            ) as stream:
                # Consume the stream to ensure the warning is triggered
                stream.until_done()

    @pytest.mark.respx(base_url=base_url)
    def test_tool_use(self, respx_mock: MockRouter) -> None:
        """Stream the tool-use fixture and validate the collected events and final message."""
        route = respx_mock.post("/v1/messages")
        route.mock(return_value=httpx.Response(200, content=get_response("tool_use_response.txt")))

        stream_manager = sync_client.messages.stream(
            max_tokens=1024,
            messages=[{"role": "user", "content": "Say hello there!"}],
            model="claude-sonnet-4-20250514",
        )
        with stream_manager as stream:
            # The isinstance check against Stream is expected to emit a DeprecationWarning.
            with pytest.warns(DeprecationWarning):
                is_stream_instance = isinstance(cast(Any, stream), Stream)
            assert is_stream_instance

            # Drain the stream first, then fetch the final message for comparison.
            events = list(stream)
            assert_tool_use_response(events, stream.get_final_message())

Extends

Frequently Asked Questions

What is the TestSyncMessages class?
TestSyncMessages is a class in the anthropic-sdk-python codebase, defined in tests/lib/streaming/test_messages.py.
Where is TestSyncMessages defined?
TestSyncMessages is defined in tests/lib/streaming/test_messages.py at line 108.
What does TestSyncMessages extend?
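TestSyncMessages does not extend any class; it is declared as `class TestSyncMessages:` with no base classes. Stream appears in its tests only as the target of `isinstance` checks on the stream object.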

Analyze Your Own Codebase

Get architecture documentation, dependency graphs, and domain analysis for your codebase in minutes.

Try Supermodel Free