make_async_stream_snapshot_request() — anthropic-sdk-python Function Reference
Architecture documentation for the make_async_stream_snapshot_request() function in snapshots.py from the anthropic-sdk-python codebase.
Entity Profile
Dependency Diagram
graph TD 1f3106b7_95f2_a01b_0f7f_67c9f95520cb["make_async_stream_snapshot_request()"] e7250a77_f5e0_e4de_3e1b_f2b040399c23["snapshots.py"] 1f3106b7_95f2_a01b_0f7f_67c9f95520cb -->|defined in| e7250a77_f5e0_e4de_3e1b_f2b040399c23 style 1f3106b7_95f2_a01b_0f7f_67c9f95520cb fill:#6366f1,stroke:#818cf8,color:#fff
Relationship Graph
Source Code
tests/lib/snapshots.py lines 140–196
async def make_async_stream_snapshot_request(
func: Callable[[AsyncAnthropic], Awaitable[_T]],
*,
content_snapshot: Any,
respx_mock: MockRouter,
mock_client: AsyncAnthropic,
path: str,
) -> _T:
live = os.environ.get("ANTHROPIC_LIVE") == "1"
collected: list[str] = []
if live:
async def _on_response(response: httpx.Response) -> None:
await response.aread()
collected.append(response.text)
respx_mock.stop()
client = AsyncAnthropic(
http_client=httpx.AsyncClient(
event_hooks={
"response": [_on_response],
}
)
)
else:
response_contents = get_snapshot_value(content_snapshot)
assert is_list(response_contents)
response_contents = cast(
List[str],
response_contents,
)
curr = 0
def get_response(_request: httpx.Request) -> httpx.Response:
nonlocal curr
content = response_contents[curr]
assert isinstance(content, str)
curr += 1
return httpx.Response(
200,
content=content.encode("utf-8"),
headers={"content-type": "text/event-stream"},
)
respx_mock.post(path).mock(side_effect=get_response)
client = mock_client
result = await func(client)
if not live:
return result
assert outsource(collected) == content_snapshot
return result
Domain
Subdomains
Defined In
Source
Frequently Asked Questions
What does make_async_stream_snapshot_request() do?
make_async_stream_snapshot_request() is a function in the anthropic-sdk-python codebase, defined in tests/lib/snapshots.py.
Where is make_async_stream_snapshot_request() defined?
make_async_stream_snapshot_request() is defined in tests/lib/snapshots.py at line 140.
Analyze Your Own Codebase
Get architecture documentation, dependency graphs, and domain analysis for your codebase in minutes.
Try Supermodel Free