AsyncAnthropicVertex Class — anthropic-sdk-python Architecture

Architecture documentation for the AsyncAnthropicVertex class in _client.py from the anthropic-sdk-python codebase.

Dependency Diagram

graph TD
  a885dabd_0206_9ba5_949f_5990703d0a04["AsyncAnthropicVertex"]
  3224f719_8046_78c8_59e1_47301a46ddd4["AsyncAPIClient"]
  a885dabd_0206_9ba5_949f_5990703d0a04 -->|extends| 3224f719_8046_78c8_59e1_47301a46ddd4
  f30eca5a_2fa0_68c1_64aa_b52331a2dcc8["NotGiven"]
  a885dabd_0206_9ba5_949f_5990703d0a04 -->|uses| f30eca5a_2fa0_68c1_64aa_b52331a2dcc8
  7bcf2a66_49f2_0382_7180_1ee6501f1674["_client.py"]
  a885dabd_0206_9ba5_949f_5990703d0a04 -->|defined in| 7bcf2a66_49f2_0382_7180_1ee6501f1674
  23fb5ff0_baf2_ed98_da86_830a4603c471["__init__()"]
  a885dabd_0206_9ba5_949f_5990703d0a04 -->|method| 23fb5ff0_baf2_ed98_da86_830a4603c471
  4910cb96_8c10_3d2e_086c_79ddcc421a05["_prepare_options()"]
  a885dabd_0206_9ba5_949f_5990703d0a04 -->|method| 4910cb96_8c10_3d2e_086c_79ddcc421a05
  562c226e_d12f_aba5_59c1_72cf16b01d1f["_prepare_request()"]
  a885dabd_0206_9ba5_949f_5990703d0a04 -->|method| 562c226e_d12f_aba5_59c1_72cf16b01d1f
  d2aca394_7ad5_92b5_38ec_6c25f071ab97["_ensure_access_token()"]
  a885dabd_0206_9ba5_949f_5990703d0a04 -->|method| d2aca394_7ad5_92b5_38ec_6c25f071ab97
  36d0becb_82f8_5e48_ffe9_ad9bc847f367["copy()"]
  a885dabd_0206_9ba5_949f_5990703d0a04 -->|method| 36d0becb_82f8_5e48_ffe9_ad9bc847f367

Source Code

src/anthropic/lib/vertex/_client.py lines 235–377

class AsyncAnthropicVertex(BaseVertexClient[httpx.AsyncClient, AsyncStream[Any]], AsyncAPIClient):
    messages: AsyncMessages
    beta: AsyncBeta

    def __init__(
        self,
        *,
        region: str | NotGiven = NOT_GIVEN,
        project_id: str | NotGiven = NOT_GIVEN,
        access_token: str | None = None,
        credentials: GoogleCredentials | None = None,
        base_url: str | httpx.URL | None = None,
        timeout: float | httpx.Timeout | None | NotGiven = NOT_GIVEN,
        max_retries: int = DEFAULT_MAX_RETRIES,
        default_headers: Mapping[str, str] | None = None,
        default_query: Mapping[str, object] | None = None,
        # Configure a custom httpx client. See the [httpx documentation](https://www.python-httpx.org/api/#client) for more details.
        http_client: httpx.AsyncClient | None = None,
        _strict_response_validation: bool = False,
    ) -> None:
        if not is_given(region):
            region = os.environ.get("CLOUD_ML_REGION", NOT_GIVEN)
        if not is_given(region):
            raise ValueError(
                "No region was given. The client should be instantiated with the `region` argument or the `CLOUD_ML_REGION` environment variable should be set."
            )

        if base_url is None:
            base_url = os.environ.get("ANTHROPIC_VERTEX_BASE_URL")
            if base_url is None:
                if region == "global":
                    base_url = "https://aiplatform.googleapis.com/v1"
                else:
                    base_url = f"https://{region}-aiplatform.googleapis.com/v1"

        super().__init__(
            version=__version__,
            base_url=base_url,
            timeout=timeout,
            max_retries=max_retries,
            custom_headers=default_headers,
            custom_query=default_query,
            http_client=http_client,
            _strict_response_validation=_strict_response_validation,
        )

        if is_given(project_id):
            self.project_id = project_id

        self.region = region
        self.access_token = access_token
        self.credentials = credentials

        self.messages = AsyncMessages(self)
        self.beta = AsyncBeta(self)

    @override
    async def _prepare_options(self, options: FinalRequestOptions) -> FinalRequestOptions:
        return _prepare_options(options, project_id=self.project_id, region=self.region)

    @override
    async def _prepare_request(self, request: httpx.Request) -> None:
        if request.headers.get("Authorization"):
            # already authenticated, nothing for us to do
            return

        request.headers["Authorization"] = f"Bearer {await self._ensure_access_token()}"

    async def _ensure_access_token(self) -> str:
        if self.access_token is not None:
            return self.access_token

        if not self.credentials:
            self.credentials, project_id = await asyncify(load_auth)(project_id=self.project_id)
            if not self.project_id:
                self.project_id = project_id

        if self.credentials.expired or not self.credentials.token:
            await asyncify(refresh_auth)(self.credentials)

        if not self.credentials.token:
            # (excerpt truncated here; the rest of lines 235–377, including copy(), is not shown)
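
The listing stops partway through `_ensure_access_token()`, so the token flow may be easier to follow in a small standalone sketch. The version below mirrors the visible steps using only the standard library. `StubCredentials`, `stub_load_auth`, `stub_refresh_auth`, and `TokenResolver` are hypothetical stand-ins, not SDK or google-auth names. `asyncio.to_thread` is used here in place of the SDK's `asyncify` helper. What happens after the final token check is not shown in the excerpt, so raising an error at that point is an assumption.

```python
import asyncio
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class StubCredentials:
    """Hypothetical stand-in for Google credentials (not the real class)."""

    token: Optional[str] = None
    expired: bool = False


def stub_load_auth() -> StubCredentials:
    # Stand-in for the blocking `load_auth` helper: returns credentials with no token yet.
    return StubCredentials()


def stub_refresh_auth(credentials: StubCredentials) -> None:
    # Stand-in for the blocking `refresh_auth` helper: fills in a token in place.
    credentials.token = "refreshed-token"
    credentials.expired = False


class TokenResolver:
    """Simplified model of the token logic visible in the excerpt."""

    def __init__(
        self,
        access_token: Optional[str] = None,
        credentials: Optional[StubCredentials] = None,
    ) -> None:
        self.access_token = access_token
        self.credentials = credentials
        self.refresh_count = 0  # illustration only, not part of the SDK

    async def ensure_access_token(self) -> str:
        # 1. An explicitly supplied token always wins.
        if self.access_token is not None:
            return self.access_token
        # 2. Load credentials lazily, off the event loop thread.
        if not self.credentials:
            self.credentials = await asyncio.to_thread(stub_load_auth)
        # 3. Refresh only when the token is expired or missing.
        if self.credentials.expired or not self.credentials.token:
            await asyncio.to_thread(stub_refresh_auth, self.credentials)
            self.refresh_count += 1
        # 4. Assumption: the excerpt is cut off here; this sketch raises.
        if not self.credentials.token:
            raise RuntimeError("could not obtain an access token")
        return self.credentials.token

    async def prepare_headers(self, headers: Dict[str, str]) -> Dict[str, str]:
        # Mirrors `_prepare_request`: leave an existing Authorization header alone.
        if headers.get("Authorization"):
            return headers
        headers["Authorization"] = f"Bearer {await self.ensure_access_token()}"
        return headers


async def main() -> None:
    resolver = TokenResolver()
    print(await resolver.prepare_headers({}))


if __name__ == "__main__":
    asyncio.run(main())
```

In this sketch the second call to `ensure_access_token()` reuses the cached credentials and does not refresh again, which matches the `expired or not token` condition in the listing.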

Frequently Asked Questions

What is the AsyncAnthropicVertex class?
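AsyncAnthropicVertex is the asynchronous client class in the anthropic-sdk-python codebase for sending requests to Google Cloud's aiplatform.googleapis.com endpoints. It exposes the messages and beta resources and adds a Bearer token to any request that does not already carry an Authorization header. It is defined in src/anthropic/lib/vertex/_client.py.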
Where is AsyncAnthropicVertex defined?
AsyncAnthropicVertex is defined in src/anthropic/lib/vertex/_client.py at line 235.
What does AsyncAnthropicVertex extend?
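AsyncAnthropicVertex extends BaseVertexClient[httpx.AsyncClient, AsyncStream[Any]] and AsyncAPIClient. NotGiven is not a base class; it appears only as a sentinel type in the constructor's parameter annotations.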
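
A related question is how the client chooses its endpoint. The constructor's region and base-URL resolution can be sketched as a standalone function, shown below as a minimal illustration rather than the SDK's own code. `resolve_vertex_base_url` and its `env` parameter are hypothetical names, `None` stands in for the SDK's `NOT_GIVEN` sentinel, and the region strings in the usage lines are only examples.

```python
import os
from typing import Mapping, Optional


def resolve_vertex_base_url(
    region: Optional[str] = None,
    base_url: Optional[str] = None,
    env: Optional[Mapping[str, str]] = None,
) -> str:
    """Hypothetical helper mirroring the endpoint selection in `__init__`."""
    # `env` is injectable so the function can be exercised without touching os.environ.
    env = os.environ if env is None else env

    # The region falls back to CLOUD_ML_REGION and is required either way.
    if region is None:
        region = env.get("CLOUD_ML_REGION")
    if region is None:
        raise ValueError(
            "No region was given. Pass `region` or set the "
            "`CLOUD_ML_REGION` environment variable."
        )

    # An explicit base_url wins, then ANTHROPIC_VERTEX_BASE_URL, then a derived URL.
    if base_url is None:
        base_url = env.get("ANTHROPIC_VERTEX_BASE_URL")
    if base_url is None:
        if region == "global":
            base_url = "https://aiplatform.googleapis.com/v1"
        else:
            base_url = f"https://{region}-aiplatform.googleapis.com/v1"
    return base_url


if __name__ == "__main__":
    print(resolve_vertex_base_url("global", env={}))
    print(resolve_vertex_base_url("us-east5", env={}))
```

As in the constructor, a missing region raises `ValueError` even when a base URL is supplied, because the region check runs first.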
