AsyncAnthropicBedrock Class — anthropic-sdk-python Architecture

Architecture documentation for the AsyncAnthropicBedrock class, defined in src/anthropic/lib/bedrock/_client.py in the anthropic-sdk-python codebase.

Dependency Diagram

graph TD
  a96b76ba_8027_bd6b_8747_23d68b1a6cbe["AsyncAnthropicBedrock"]
  3224f719_8046_78c8_59e1_47301a46ddd4["AsyncAPIClient"]
  a96b76ba_8027_bd6b_8747_23d68b1a6cbe -->|extends| 3224f719_8046_78c8_59e1_47301a46ddd4
  f30eca5a_2fa0_68c1_64aa_b52331a2dcc8["NotGiven"]
  a96b76ba_8027_bd6b_8747_23d68b1a6cbe -->|uses| f30eca5a_2fa0_68c1_64aa_b52331a2dcc8
  cbdd42fd_75f9_7cc6_3f3b_b8d939e4c157["_client.py"]
  a96b76ba_8027_bd6b_8747_23d68b1a6cbe -->|defined in| cbdd42fd_75f9_7cc6_3f3b_b8d939e4c157
  86791f8b_40ce_c991_4689_7abd6c6ba63d["__init__()"]
  a96b76ba_8027_bd6b_8747_23d68b1a6cbe -->|method| 86791f8b_40ce_c991_4689_7abd6c6ba63d
  0d36693f_5731_b92c_fa08_72a98cba305b["_make_sse_decoder()"]
  a96b76ba_8027_bd6b_8747_23d68b1a6cbe -->|method| 0d36693f_5731_b92c_fa08_72a98cba305b
  87dc8d50_1d41_3e7b_956e_840605cf6288["_prepare_options()"]
  a96b76ba_8027_bd6b_8747_23d68b1a6cbe -->|method| 87dc8d50_1d41_3e7b_956e_840605cf6288
  69e337f6_5da7_c1c3_8d13_9a20707778ba["_prepare_request()"]
  a96b76ba_8027_bd6b_8747_23d68b1a6cbe -->|method| 69e337f6_5da7_c1c3_8d13_9a20707778ba
  371ed11a_c027_e563_426e_e3d8865425fd["copy()"]
  a96b76ba_8027_bd6b_8747_23d68b1a6cbe -->|method| 371ed11a_c027_e563_426e_e3d8865425fd

Source Code

src/anthropic/lib/bedrock/_client.py lines 273–412

class AsyncAnthropicBedrock(BaseBedrockClient[httpx.AsyncClient, AsyncStream[Any]], AsyncAPIClient):
    messages: AsyncMessages
    completions: AsyncCompletions
    beta: AsyncBeta

    def __init__(
        self,
        aws_secret_key: str | None = None,
        aws_access_key: str | None = None,
        aws_region: str | None = None,
        aws_profile: str | None = None,
        aws_session_token: str | None = None,
        base_url: str | httpx.URL | None = None,
        timeout: float | httpx.Timeout | None | NotGiven = NOT_GIVEN,
        max_retries: int = DEFAULT_MAX_RETRIES,
        default_headers: Mapping[str, str] | None = None,
        default_query: Mapping[str, object] | None = None,
        # Configure a custom httpx client. See the [httpx documentation](https://www.python-httpx.org/api/#client) for more details.
        http_client: httpx.AsyncClient | None = None,
        # Enable or disable schema validation for data returned by the API.
        # When enabled an error APIResponseValidationError is raised
        # if the API responds with invalid data for the expected schema.
        #
        # This parameter may be removed or changed in the future.
        # If you rely on this feature, please open a GitHub issue
        # outlining your use-case to help us decide if it should be
        # part of our public interface in the future.
        _strict_response_validation: bool = False,
    ) -> None:
        self.aws_secret_key = aws_secret_key

        self.aws_access_key = aws_access_key

        self.aws_region = _infer_region() if aws_region is None else aws_region
        self.aws_profile = aws_profile

        self.aws_session_token = aws_session_token

        if base_url is None:
            base_url = os.environ.get("ANTHROPIC_BEDROCK_BASE_URL")
        if base_url is None:
            base_url = f"https://bedrock-runtime.{self.aws_region}.amazonaws.com"

        super().__init__(
            version=__version__,
            base_url=base_url,
            timeout=timeout,
            max_retries=max_retries,
            custom_headers=default_headers,
            custom_query=default_query,
            http_client=http_client,
            _strict_response_validation=_strict_response_validation,
        )

        self.messages = AsyncMessages(self)
        self.completions = AsyncCompletions(self)
        self.beta = AsyncBeta(self)

    @override
    def _make_sse_decoder(self) -> AWSEventStreamDecoder:
        return AWSEventStreamDecoder()

    @override
    async def _prepare_options(self, options: FinalRequestOptions) -> FinalRequestOptions:
        return _prepare_options(options)

    @override
    async def _prepare_request(self, request: httpx.Request) -> None:
        from ._auth import get_auth_headers

        data = request.read().decode()

        headers = get_auth_headers(
            method=request.method,
            url=str(request.url),
            headers=request.headers,
            aws_access_key=self.aws_access_key,
            aws_secret_key=self.aws_secret_key,
            aws_session_token=self.aws_session_token,
            region=self.aws_region or "us-east-1",
            profile=self.aws_profile,
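
The base URL resolution performed in `__init__` above can be sketched in isolation. This is a minimal illustration, not the SDK's code: `resolve_base_url` is a hypothetical helper name, and only the precedence order (explicit argument, then the `ANTHROPIC_BEDROCK_BASE_URL` environment variable, then a region-derived endpoint) is taken from the listing.

```python
import os
from typing import Optional


def resolve_base_url(base_url: Optional[str], aws_region: str) -> str:
    """Hypothetical helper mirroring the precedence used in __init__."""
    # 1. An explicitly passed base_url always wins.
    if base_url is None:
        # 2. Otherwise fall back to the environment variable, if set.
        base_url = os.environ.get("ANTHROPIC_BEDROCK_BASE_URL")
    if base_url is None:
        # 3. Otherwise derive the Bedrock runtime endpoint from the region.
        base_url = f"https://bedrock-runtime.{aws_region}.amazonaws.com"
    return base_url
```

Because the environment variable is consulted only when no argument is given, a caller can always override a globally configured endpoint for a single client instance.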

Frequently Asked Questions

What is the AsyncAnthropicBedrock class?
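AsyncAnthropicBedrock is the asynchronous client class for calling the API through Amazon Bedrock. It is defined in src/anthropic/lib/bedrock/_client.py in the anthropic-sdk-python codebase and exposes the messages, completions, and beta resources.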
Where is AsyncAnthropicBedrock defined?
AsyncAnthropicBedrock is defined in src/anthropic/lib/bedrock/_client.py at line 273.
What does AsyncAnthropicBedrock extend?
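AsyncAnthropicBedrock extends BaseBedrockClient[httpx.AsyncClient, AsyncStream[Any]] and AsyncAPIClient. NotGiven is not a base class; it appears only in the type annotation of the timeout parameter.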
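
A minimal usage sketch follows, assuming the `anthropic` package is installed with its Bedrock dependencies and that AWS credentials can be resolved outside the code (for example from the environment or a profile). The model ID and the prompt are placeholders, not values from the source; `aws_region` and the `messages` resource are taken from the listing above.

```python
import asyncio


async def main() -> None:
    # Imported lazily so this module still loads where the optional
    # Bedrock dependencies are not installed.
    from anthropic import AsyncAnthropicBedrock

    # aws_region is a constructor parameter shown in the listing. The key
    # and token parameters are left unset here (assumption: credentials
    # are resolved from the environment or an AWS profile).
    client = AsyncAnthropicBedrock(aws_region="us-east-1")

    message = await client.messages.create(
        model="anthropic.claude-3-5-sonnet-20240620-v1:0",  # placeholder model ID
        max_tokens=256,
        messages=[{"role": "user", "content": "Hello from Bedrock"}],
    )
    print(message.content)


def run() -> None:
    """Entry point: issues a real request, so it needs valid credentials."""
    asyncio.run(main())
```

Calling `run()` sends one request; nothing is executed at import time, so the sketch can be loaded and inspected without network access.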
