test_streaming.py — anthropic-sdk-python Source File

Architecture documentation for test_streaming.py, a Python file in the anthropic-sdk-python codebase. 5 imports, 0 dependents.

Type: File · Language: Python · Domain: AnthropicClient · Subdomain: Authentication · 5 imports · 15 functions

Dependency Diagram

graph LR
  2da79393_445f_321e_d12f_59a347c0de92["test_streaming.py"]
  89ddcdd7_3ae1_4c7b_41bb_9f1e25f16875["typing"]
  2da79393_445f_321e_d12f_59a347c0de92 --> 89ddcdd7_3ae1_4c7b_41bb_9f1e25f16875
  9c26e8a9_1ad2_1174_876a_1fc500ce0eaf["httpx"]
  2da79393_445f_321e_d12f_59a347c0de92 --> 9c26e8a9_1ad2_1174_876a_1fc500ce0eaf
  cde8421b_93c7_41e4_d69d_2a3f1bade2f2["pytest"]
  2da79393_445f_321e_d12f_59a347c0de92 --> cde8421b_93c7_41e4_d69d_2a3f1bade2f2
  d10c5377_2939_0f0b_cc44_8759393f2853["anthropic"]
  2da79393_445f_321e_d12f_59a347c0de92 --> d10c5377_2939_0f0b_cc44_8759393f2853
  4576f658_f93f_2a77_9ff8_4875c797aab6["anthropic._streaming"]
  2da79393_445f_321e_d12f_59a347c0de92 --> 4576f658_f93f_2a77_9ff8_4875c797aab6
  style 2da79393_445f_321e_d12f_59a347c0de92 fill:#6366f1,stroke:#818cf8,color:#fff

Source Code

from __future__ import annotations

from typing import Iterator, AsyncIterator

import httpx
import pytest

from anthropic import Anthropic, AsyncAnthropic
from anthropic._streaming import Stream, AsyncStream, ServerSentEvent


@pytest.mark.asyncio
@pytest.mark.parametrize("sync", [True, False], ids=["sync", "async"])
async def test_basic(sync: bool, client: Anthropic, async_client: AsyncAnthropic) -> None:
    def body() -> Iterator[bytes]:
        yield b"event: completion\n"
        yield b'data: {"foo":true}\n'
        yield b"\n"

    iterator = make_event_iterator(content=body(), sync=sync, client=client, async_client=async_client)

    sse = await iter_next(iterator)
    assert sse.event == "completion"
    assert sse.json() == {"foo": True}

    await assert_empty_iter(iterator)


@pytest.mark.asyncio
@pytest.mark.parametrize("sync", [True, False], ids=["sync", "async"])
async def test_data_missing_event(sync: bool, client: Anthropic, async_client: AsyncAnthropic) -> None:
    def body() -> Iterator[bytes]:
        yield b'data: {"foo":true}\n'
        yield b"\n"

    iterator = make_event_iterator(content=body(), sync=sync, client=client, async_client=async_client)

    sse = await iter_next(iterator)
    assert sse.event is None
    assert sse.json() == {"foo": True}

    await assert_empty_iter(iterator)


@pytest.mark.asyncio
@pytest.mark.parametrize("sync", [True, False], ids=["sync", "async"])
async def test_event_missing_data(sync: bool, client: Anthropic, async_client: AsyncAnthropic) -> None:
    def body() -> Iterator[bytes]:
        yield b"event: ping\n"
        yield b"\n"

    iterator = make_event_iterator(content=body(), sync=sync, client=client, async_client=async_client)

    sse = await iter_next(iterator)
    assert sse.event == "ping"
    assert sse.data == ""

    await assert_empty_iter(iterator)


# ... (197 more lines)
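
The three tests above pin down how an event is assembled from `event:` and `data:` lines and closed by a blank line. A minimal sketch of that behaviour follows. It assumes nothing about the SDK's actual decoder: `SimpleEvent` and `decode_events` are hypothetical names introduced here for illustration, and the rule that an event without a `data:` line is still yielded with empty data is inferred from test_event_missing_data, not taken from the library source.

```python
from __future__ import annotations

import json
from dataclasses import dataclass
from typing import Any, Iterable, Iterator, Optional


@dataclass
class SimpleEvent:
    """Hypothetical stand-in for a parsed event (not the SDK's ServerSentEvent)."""

    event: Optional[str]
    data: str

    def json(self) -> Any:
        # Parse the accumulated data payload as JSON.
        return json.loads(self.data)


def decode_events(chunks: Iterable[bytes]) -> Iterator[SimpleEvent]:
    """Illustrative line-based SSE decoder; an assumption, not the library's code."""
    event: Optional[str] = None
    data_lines: list[str] = []
    seen_field = False
    buffer = b""

    for chunk in chunks:
        buffer += chunk
        # Only complete lines are decoded; a partial line waits for more bytes.
        while b"\n" in buffer:
            raw, buffer = buffer.split(b"\n", 1)
            line = raw.decode("utf-8").rstrip("\r")

            if line == "":
                # A blank line terminates the current event.
                if seen_field:
                    yield SimpleEvent(event=event, data="\n".join(data_lines))
                event, data_lines, seen_field = None, [], False
                continue

            if line.startswith(":"):
                continue  # comment line, ignored

            name, _, value = line.partition(":")
            if value.startswith(" "):
                value = value[1:]  # drop the single optional space after the colon

            if name == "event":
                event = value
                seen_field = True
            elif name == "data":
                data_lines.append(value)
                seen_field = True
```

Feeding the body from test_basic into this sketch, `list(decode_events([b"event: completion\n", b'data: {"foo":true}\n', b"\n"]))` produces a single event whose `event` is `"completion"` and whose `json()` is `{"foo": True}`. The data-only body yields `event is None`, and the event-only body yields `data == ""`.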

Dependencies

  • anthropic
  • anthropic._streaming
  • httpx
  • pytest
  • typing

Frequently Asked Questions

What does test_streaming.py do?
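test_streaming.py is a test module in the anthropic-sdk-python codebase, written in Python. Its pytest cases feed raw server-sent-event bytes through an event iterator in both sync and async modes and assert on each decoded event's name, data, and JSON payload. The analysis places it in the AnthropicClient domain, Authentication subdomain.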
What functions are defined in test_streaming.py?
test_streaming.py defines 15 functions: assert_empty_iter, iter_next, make_event_iterator, test_basic, test_data_json_escaped_double_new_line, test_data_missing_event, test_event_missing_data, test_isinstance_check, test_multi_byte_character_multiple_chunks, test_multiple_data_lines, and 5 more.
What does test_streaming.py depend on?
test_streaming.py imports 5 modules: anthropic, anthropic._streaming, httpx, pytest, typing.
Where is test_streaming.py in the architecture?
test_streaming.py is located at tests/test_streaming.py (domain: AnthropicClient, subdomain: Authentication, directory: tests).