Home / Class / TestAsyncMessages Class — anthropic-sdk-python Architecture

TestAsyncMessages Class — anthropic-sdk-python Architecture

Architecture documentation for the TestAsyncMessages class in test_beta_messages.py from the anthropic-sdk-python codebase.

Entity Profile

Dependency Diagram

graph TD
  3b785fa6_7fe8_27f9_a4b3_d1b931c9e188["TestAsyncMessages"]
  8e652b46_6f9a_daae_5db8_1aa0b9386626["BetaAsyncMessageStream"]
  3b785fa6_7fe8_27f9_a4b3_d1b931c9e188 -->|uses| 8e652b46_6f9a_daae_5db8_1aa0b9386626
  239f7d64_6d26_d418_04e3_d586fe367986["test_beta_messages.py"]
  3b785fa6_7fe8_27f9_a4b3_d1b931c9e188 -->|defined in| 239f7d64_6d26_d418_04e3_d586fe367986
  71579650_ff21_691c_21c9_e39db60bb3bc["test_basic_response()"]
  3b785fa6_7fe8_27f9_a4b3_d1b931c9e188 -->|method| 71579650_ff21_691c_21c9_e39db60bb3bc
  d7d85fbf_ea7f_b02a_9f96_2044dd6ec764["test_context_manager()"]
  3b785fa6_7fe8_27f9_a4b3_d1b931c9e188 -->|method| d7d85fbf_ea7f_b02a_9f96_2044dd6ec764
  f7e860e3_4079_9840_ae67_41cb6c3ab5c9["test_deprecated_model_warning_stream()"]
  3b785fa6_7fe8_27f9_a4b3_d1b931c9e188 -->|method| f7e860e3_4079_9840_ae67_41cb6c3ab5c9
  2d02c760_5abf_bf03_495b_f7d4b3fe39ab["test_tool_use()"]
  3b785fa6_7fe8_27f9_a4b3_d1b931c9e188 -->|method| 2d02c760_5abf_bf03_495b_f7d4b3fe39ab
  f7761867_036e_f45b_09c1_0edd0d7dfaaa["test_incomplete_response()"]
  3b785fa6_7fe8_27f9_a4b3_d1b931c9e188 -->|method| f7761867_036e_f45b_09c1_0edd0d7dfaaa

Relationship Graph

Source Code

tests/lib/streaming/test_beta_messages.py lines 267–372

class TestAsyncMessages:
    """Async streaming tests for ``async_client.beta.messages.stream``.

    Each test registers a mocked ``POST /v1/messages`` route on the respx
    router, backed by a canned fixture file, and then drives the stream
    through its async context manager.
    """

    @pytest.mark.asyncio
    @pytest.mark.respx(base_url=base_url)
    async def test_basic_response(self, respx_mock: MockRouter) -> None:
        """Stream the basic fixture and check the events and final message."""
        route = respx_mock.post("/v1/messages")
        payload = to_async_iter(get_response("basic_response.txt"))
        route.mock(return_value=httpx.Response(200, content=payload))

        async with async_client.beta.messages.stream(
            max_tokens=1024,
            messages=[{"role": "user", "content": "Say hello there!"}],
            model="claude-opus-4-0",
        ) as stream:
            assert isinstance(cast(Any, stream), BetaAsyncMessageStream)

            # Collect every event first, then request the final message,
            # in the same order the arguments were originally evaluated.
            events = [event async for event in stream]
            final_message = await stream.get_final_message()
            assert_basic_response(events, final_message)

    @pytest.mark.asyncio
    @pytest.mark.respx(base_url=base_url)
    async def test_context_manager(self, respx_mock: MockRouter) -> None:
        """Check the response is open inside the context and closed after it."""
        route = respx_mock.post("/v1/messages")
        payload = to_async_iter(get_response("basic_response.txt"))
        route.mock(return_value=httpx.Response(200, content=payload))

        async with async_client.beta.messages.stream(
            max_tokens=1024,
            messages=[{"role": "user", "content": "Say hello there!"}],
            model="claude-3-opus-latest",
        ) as stream:
            assert not stream.response.is_closed

        # response should be closed even if the body isn't read
        assert stream.response.is_closed

    @pytest.mark.asyncio
    @pytest.mark.respx(base_url=base_url)
    async def test_deprecated_model_warning_stream(self, respx_mock: MockRouter) -> None:
        """Check that streaming with each deprecated model emits a DeprecationWarning."""
        for deprecated_model in DEPRECATED_MODELS:
            # The route is (re-)registered on every iteration, one per model.
            route = respx_mock.post("/v1/messages")
            payload = to_async_iter(get_response("basic_response.txt"))
            route.mock(return_value=httpx.Response(200, content=payload))

            expected_warning = f"The model '{deprecated_model}' is deprecated"
            with pytest.warns(DeprecationWarning, match=expected_warning):
                async with async_client.beta.messages.stream(
                    max_tokens=1024,
                    messages=[{"role": "user", "content": "Hello"}],
                    model=deprecated_model,
                ) as stream:
                    # Consume the stream to ensure the warning is triggered
                    await stream.get_final_message()

    @pytest.mark.asyncio
    @pytest.mark.respx(base_url=base_url)
    async def test_tool_use(self, respx_mock: MockRouter) -> None:
        """Stream the tool-use fixture and check the events and final message."""
        route = respx_mock.post("/v1/messages")
        payload = to_async_iter(get_response("tool_use_response.txt"))
        route.mock(return_value=httpx.Response(200, content=payload))

        async with async_client.beta.messages.stream(
            max_tokens=1024,
            messages=[{"role": "user", "content": "Say hello there!"}],
            model="claude-sonnet-4-20250514",
        ) as stream:
            assert isinstance(cast(Any, stream), BetaAsyncMessageStream)

            # Collect every event first, then request the final message,
            # in the same order the arguments were originally evaluated.
            events = [event async for event in stream]
            final_message = await stream.get_final_message()
            assert_tool_use_response(events, final_message)

Frequently Asked Questions

What is the TestAsyncMessages class?
TestAsyncMessages is a class in the anthropic-sdk-python codebase, defined in tests/lib/streaming/test_beta_messages.py.
Where is TestAsyncMessages defined?
TestAsyncMessages is defined in tests/lib/streaming/test_beta_messages.py at line 267.
What does TestAsyncMessages extend?
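TestAsyncMessages does not extend any class; it is declared without a base class. It references BetaAsyncMessageStream only in isinstance assertions that check the type of the stream returned by async_client.beta.messages.stream().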

Analyze Your Own Codebase

Get architecture documentation, dependency graphs, and domain analysis for your codebase in minutes.

Try Supermodel Free