TestAsyncStream Class — anthropic-sdk-python Architecture

Architecture documentation for the TestAsyncStream class in test_messages.py from the anthropic-sdk-python codebase.

Dependency Diagram

graph TD
  af4d287a_1a7a_7175_0a72_07c984020b6c["TestAsyncStream"]
  2e600497_5282_f76f_a53c_ea19bd3e90f3["test_messages.py"]
  af4d287a_1a7a_7175_0a72_07c984020b6c -->|defined in| 2e600497_5282_f76f_a53c_ea19bd3e90f3
  a53a8817_6e66_31ac_b782_0e5a157e3580["test_stream_with_raw_schema()"]
  af4d287a_1a7a_7175_0a72_07c984020b6c -->|method| a53a8817_6e66_31ac_b782_0e5a157e3580

Source Code

tests/lib/_parse/test_messages.py lines 163–199

class TestAsyncStream:
    async def test_stream_with_raw_schema(self, async_client: AsyncAnthropic, respx_mock: MockRouter) -> None:
        """Test async messages.stream() with raw JSON schema via output_config."""

        async def async_stream_parse(client: AsyncAnthropic) -> Message:
            async with client.messages.stream(
                model="claude-sonnet-4-5",
                messages=[
                    {
                        "role": "user",
                        "content": "Extract order IDs from the following text:\n\nOrder 12345\nOrder 67890",
                    }
                ],
                output_config={
                    "format": {
                        "type": "json_schema",
                        "schema": {
                            "type": "array",
                            "items": {"type": "integer"},
                        },
                    },
                },
                max_tokens=1024,
            ) as stream:
                return await stream.get_final_message()

        response = await make_async_stream_snapshot_request(
            async_stream_parse,
            content_snapshot=external("uuid:b2c4d6e8-f012-4a56-8b90-1234567890ab.json"),
            respx_mock=respx_mock,
            mock_client=async_client,
            path="/v1/messages",
        )

        content_block = response.content[0]
        assert content_block.type == "text"
        assert content_block.text == snapshot("[12345,67890]")
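
The test above asserts on the raw text of the first content block, which is a JSON array matching the schema passed in output_config. As a minimal sketch of how calling code might decode and check that text, the example below uses only the standard library. The helper name parse_order_ids is hypothetical and is not part of the SDK or the test suite; it simply mirrors the schema in the test (an array of integers).

```python
import json
from typing import List


def parse_order_ids(text: str) -> List[int]:
    """Hypothetical helper (not part of the SDK): decode a JSON array of integers."""
    value = json.loads(text)
    if not isinstance(value, list):
        raise ValueError("expected a JSON array")
    for item in value:
        # bool is a subclass of int in Python, so reject it explicitly.
        if isinstance(item, bool) or not isinstance(item, int):
            raise ValueError(f"expected an integer, got {item!r}")
    return value


# Same text the test expects in content_block.text.
order_ids = parse_order_ids("[12345,67890]")
print(order_ids)
```

Checking the decoded value by hand keeps the sketch dependency-free; a real project might prefer a schema validation library instead.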

Frequently Asked Questions

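What is the TestAsyncStream class?
TestAsyncStream is a test class in the anthropic-sdk-python codebase, defined in tests/lib/_parse/test_messages.py. It has one method, test_stream_with_raw_schema(), which tests async messages.stream() with a raw JSON schema passed via output_config.

Where is TestAsyncStream defined?
TestAsyncStream is defined in tests/lib/_parse/test_messages.py, starting at line 163.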