make_async_stream_snapshot_request() — anthropic-sdk-python Function Reference

Architecture documentation for the make_async_stream_snapshot_request() function in snapshots.py from the anthropic-sdk-python codebase.

Dependency Diagram

graph TD
  1f3106b7_95f2_a01b_0f7f_67c9f95520cb["make_async_stream_snapshot_request()"]
  e7250a77_f5e0_e4de_3e1b_f2b040399c23["snapshots.py"]
  1f3106b7_95f2_a01b_0f7f_67c9f95520cb -->|defined in| e7250a77_f5e0_e4de_3e1b_f2b040399c23
  style 1f3106b7_95f2_a01b_0f7f_67c9f95520cb fill:#6366f1,stroke:#818cf8,color:#fff

Source Code

tests/lib/snapshots.py lines 140–196

async def make_async_stream_snapshot_request(
    func: Callable[[AsyncAnthropic], Awaitable[_T]],
    *,
    content_snapshot: Any,
    respx_mock: MockRouter,
    mock_client: AsyncAnthropic,
    path: str,
) -> _T:
    live = os.environ.get("ANTHROPIC_LIVE") == "1"
    collected: list[str] = []
    if live:

        async def _on_response(response: httpx.Response) -> None:
            await response.aread()
            collected.append(response.text)

        respx_mock.stop()

        client = AsyncAnthropic(
            http_client=httpx.AsyncClient(
                event_hooks={
                    "response": [_on_response],
                }
            )
        )
    else:
        response_contents = get_snapshot_value(content_snapshot)
        assert is_list(response_contents)

        response_contents = cast(
            List[str],
            response_contents,
        )

        curr = 0

        def get_response(_request: httpx.Request) -> httpx.Response:
            nonlocal curr
            content = response_contents[curr]
            assert isinstance(content, str)

            curr += 1
            return httpx.Response(
                200,
                content=content.encode("utf-8"),
                headers={"content-type": "text/event-stream"},
            )

        respx_mock.post(path).mock(side_effect=get_response)
        client = mock_client

    result = await func(client)
    if not live:
        return result

    assert outsource(collected) == content_snapshot
    return result
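
The record/replay branching in the function above can be illustrated with a minimal, standard-library-only sketch. It is not the SDK's code: `FakeClient`, `run_with_snapshot`, `live_send`, and `two_calls` are hypothetical names, plain strings stand in for HTTP response bodies, and an explicit `live` flag replaces the `ANTHROPIC_LIVE` environment check. It only shows the shape of the pattern: replay stored bodies in order when not live, record and compare when live.

```python
import asyncio
from typing import Awaitable, Callable, List, TypeVar

T = TypeVar("T")


class FakeClient:
    """Hypothetical stand-in for an API client; not part of the SDK."""

    def __init__(self, send: Callable[[str], str]) -> None:
        self._send = send

    async def post(self, path: str) -> str:
        return self._send(path)


async def run_with_snapshot(
    func: Callable[[FakeClient], Awaitable[T]],
    *,
    snapshot: List[str],
    live: bool,
    live_send: Callable[[str], str],
) -> T:
    collected: List[str] = []
    if live:
        # Record mode: forward each request and keep a copy of every body.
        def send(path: str) -> str:
            body = live_send(path)
            collected.append(body)
            return body
    else:
        # Replay mode: hand out the stored bodies in order, one per request.
        curr = 0

        def send(path: str) -> str:
            nonlocal curr
            body = snapshot[curr]
            curr += 1
            return body

    result = await func(FakeClient(send))
    if live:
        # Only record mode compares the captured bodies with the snapshot.
        assert collected == snapshot
    return result


async def two_calls(client: FakeClient) -> List[str]:
    # Two sequential requests, so replay mode consumes two stored bodies.
    return [await client.post("/v1/messages"), await client.post("/v1/messages")]


stored = ["event: a\n\n", "event: b\n\n"]
replayed = asyncio.run(
    run_with_snapshot(two_calls, snapshot=stored, live=False, live_send=lambda p: "")
)
```

As in the original, the replay counter is a closure variable advanced with `nonlocal`, so a test that issues more requests than there are stored bodies fails with an `IndexError` rather than silently reusing a response.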

Frequently Asked Questions

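What does make_async_stream_snapshot_request() do?
make_async_stream_snapshot_request() is an async test helper defined in tests/lib/snapshots.py. It awaits func against one of two clients. When the ANTHROPIC_LIVE environment variable is "1", it stops respx_mock, builds a new AsyncAnthropic client whose httpx response hook records each response body, and, after func returns, asserts that the recorded bodies match content_snapshot. Otherwise it registers a respx route for POST requests to path that replays the strings stored in content_snapshot, one per request, as text/event-stream responses, and runs func against mock_client. In both modes it returns the result of func.

Where is make_async_stream_snapshot_request() defined?
make_async_stream_snapshot_request() is defined in tests/lib/snapshots.py, lines 140–196.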
