Home / Function/ test_streaming_call_sync_events() — anthropic-sdk-python Function Reference

test_streaming_call_sync_events() — anthropic-sdk-python Function Reference

Architecture documentation for the test_streaming_call_sync_events() function in test_runners.py from the anthropic-sdk-python codebase.

Entity Profile

Dependency Diagram

graph TD
  e9798232_e45a_0fe8_021e_639088536658["test_streaming_call_sync_events()"]
  6847e1ae_03a4_3e19_19ff_ea808e74d94c["TestSyncRunTools"]
  e9798232_e45a_0fe8_021e_639088536658 -->|defined in| 6847e1ae_03a4_3e19_19ff_ea808e74d94c
  be8a7f3a_5951_1f73_7331_3ad05c999ecf["_get_weather()"]
  e9798232_e45a_0fe8_021e_639088536658 -->|calls| be8a7f3a_5951_1f73_7331_3ad05c999ecf
  style e9798232_e45a_0fe8_021e_639088536658 fill:#6366f1,stroke:#818cf8,color:#fff

Relationship Graph

Source Code

tests/lib/tools/test_runners.py lines 356–402

    def test_streaming_call_sync_events(self, client: Anthropic, respx_mock: MockRouter) -> None:
        @beta_tool
        def get_weather(location: str, units: Literal["c", "f"]) -> BetaFunctionToolResultType:
            """Lookup the weather for a given city in either celsius or fahrenheit

            Args:
                location: The city and state, e.g. San Francisco, CA
                units: Unit for the output, either 'c' for celsius or 'f' for fahrenheit
            Returns:
                A dictionary containing the location, temperature, and weather condition.
            """
            return json.dumps(_get_weather(location, units))

        def accumulate_events(client: Anthropic) -> List[str]:
            events: list[str] = []
            runner = client.beta.messages.tool_runner(
                max_tokens=1024,
                model="claude-haiku-4-5",
                tools=[get_weather],
                messages=[{"role": "user", "content": "What is the weather in SF?"}],
                stream=True,
            )

            for stream in runner:
                for event in stream:
                    events.append(event.type)
            return events

        events = make_stream_snapshot_request(
            accumulate_events,
            content_snapshot=external("uuid:9cb114c8-69bd-4111-841b-edee30333afd.json"),
            path="/v1/messages",
            mock_client=client,
            respx_mock=respx_mock,
        )
        assert set(events) == snapshot(
            {
                "content_block_delta",
                "content_block_start",
                "content_block_stop",
                "input_json",
                "message_delta",
                "message_start",
                "message_stop",
                "text",
            }
        )

Subdomains

Frequently Asked Questions

What does test_streaming_call_sync_events() do?
test_streaming_call_sync_events() is a function in the anthropic-sdk-python codebase, defined in tests/lib/tools/test_runners.py.
Where is test_streaming_call_sync_events() defined?
test_streaming_call_sync_events() is defined in tests/lib/tools/test_runners.py at line 356.
What does test_streaming_call_sync_events() call?
test_streaming_call_sync_events() calls 1 function(s): _get_weather.

Analyze Your Own Codebase

Get architecture documentation, dependency graphs, and domain analysis for your codebase in minutes.

Try Supermodel Free