TestAsyncAnthropicVertex Class — anthropic-sdk-python Architecture
Architecture documentation for the TestAsyncAnthropicVertex class in test_vertex.py from the anthropic-sdk-python codebase.
Entity Profile
Dependency Diagram
graph TD d3290b43_b727_3d0c_38d7_e3383a13bfe4["TestAsyncAnthropicVertex"] db230024_6f86_b90f_ba9d_ea89f56ffac7["test_vertex.py"] d3290b43_b727_3d0c_38d7_e3383a13bfe4 -->|defined in| db230024_6f86_b90f_ba9d_ea89f56ffac7 c3f91a84_efa8_0579_b9be_ecfdfac60301["test_messages_retries()"] d3290b43_b727_3d0c_38d7_e3383a13bfe4 -->|method| c3f91a84_efa8_0579_b9be_ecfdfac60301 023907af_6254_ff3e_a515_0bffc237a256["test_copy()"] d3290b43_b727_3d0c_38d7_e3383a13bfe4 -->|method| 023907af_6254_ff3e_a515_0bffc237a256 ed20a314_0099_f36e_47d5_fa1f05027a2b["test_with_options()"] d3290b43_b727_3d0c_38d7_e3383a13bfe4 -->|method| ed20a314_0099_f36e_47d5_fa1f05027a2b fd95a881_7ffd_acab_32bc_0fdf84d51c8b["test_copy_default_options()"] d3290b43_b727_3d0c_38d7_e3383a13bfe4 -->|method| fd95a881_7ffd_acab_32bc_0fdf84d51c8b 5b63ae3a_4e38_ac0e_37dc_713b4b9db841["test_copy_default_headers()"] d3290b43_b727_3d0c_38d7_e3383a13bfe4 -->|method| 5b63ae3a_4e38_ac0e_37dc_713b4b9db841 c53be97f_83fd_3bb1_d4f0_30a91ddc1919["test_global_region_base_url()"] d3290b43_b727_3d0c_38d7_e3383a13bfe4 -->|method| c53be97f_83fd_3bb1_d4f0_30a91ddc1919 2e5d30bf_5561_8fc6_5d93_02783d668329["test_regional_base_url()"] d3290b43_b727_3d0c_38d7_e3383a13bfe4 -->|method| 2e5d30bf_5561_8fc6_5d93_02783d668329 85e9c113_3b40_67c3_aedb_4effef7d0a15["test_env_var_base_url_override()"] d3290b43_b727_3d0c_38d7_e3383a13bfe4 -->|method| 85e9c113_3b40_67c3_aedb_4effef7d0a15
Relationship Graph
Source Code
tests/lib/test_vertex.py lines 149–278
class TestAsyncAnthropicVertex:
client = AsyncAnthropicVertex(region="region", project_id="project", access_token="my-access-token")
@pytest.mark.respx()
@pytest.mark.asyncio()
async def test_messages_retries(self, respx_mock: MockRouter) -> None:
request_url = "https://region-aiplatform.googleapis.com/v1/projects/project/locations/region/publishers/anthropic/models/claude-3-sonnet@20240229:rawPredict"
respx_mock.post(request_url).mock(
side_effect=[
httpx.Response(500, json={"error": "server error"}, headers={"retry-after-ms": "10"}),
httpx.Response(200, json={"foo": "bar"}),
]
)
await self.client.with_options(timeout=0.2).messages.create(
max_tokens=1024,
messages=[
{
"role": "user",
"content": "Say hello there!",
}
],
model="claude-3-sonnet@20240229",
)
calls = cast("list[MockRequestCall]", respx_mock.calls)
assert len(calls) == 2
assert calls[0].request.url == request_url
assert calls[1].request.url == request_url
def test_copy(self) -> None:
copied = self.client.copy()
assert id(copied) != id(self.client)
copied = self.client.copy(region="another-region", project_id="another-project")
assert copied.region == "another-region"
assert self.client.region == "region"
assert copied.project_id == "another-project"
assert self.client.project_id == "project"
def test_with_options(self) -> None:
copied = self.client.with_options(region="another-region", project_id="another-project")
assert copied.region == "another-region"
assert self.client.region == "region"
assert copied.project_id == "another-project"
assert self.client.project_id == "project"
def test_copy_default_options(self) -> None:
# options that have a default are overridden correctly
copied = self.client.copy(max_retries=7)
assert copied.max_retries == 7
assert self.client.max_retries == 2
copied2 = copied.copy(max_retries=6)
assert copied2.max_retries == 6
assert copied.max_retries == 7
# timeout
assert isinstance(self.client.timeout, httpx.Timeout)
copied = self.client.copy(timeout=None)
assert copied.timeout is None
assert isinstance(self.client.timeout, httpx.Timeout)
def test_copy_default_headers(self) -> None:
client = AsyncAnthropicVertex(
base_url=base_url,
region="region",
project_id="project",
_strict_response_validation=True,
default_headers={"X-Foo": "bar"},
)
assert client.default_headers["X-Foo"] == "bar"
# does not override the already given value when not specified
copied = client.copy()
assert copied.default_headers["X-Foo"] == "bar"
# merges already given headers
copied = client.copy(default_headers={"X-Bar": "stainless"})
Defined In
Source
Frequently Asked Questions
What is the TestAsyncAnthropicVertex class?
TestAsyncAnthropicVertex is a class in the anthropic-sdk-python codebase, defined in tests/lib/test_vertex.py.
Where is TestAsyncAnthropicVertex defined?
TestAsyncAnthropicVertex is defined in tests/lib/test_vertex.py at line 149.
Analyze Your Own Codebase
Get architecture documentation, dependency graphs, and domain analysis for your codebase in minutes.
Try Supermodel Free