test_streaming.py — anthropic-sdk-python Source File
Architecture documentation for test_streaming.py, a Python file in the anthropic-sdk-python codebase. 5 imports, 0 dependents.
Entity Profile
Dependency Diagram
graph LR
    2da79393_445f_321e_d12f_59a347c0de92["test_streaming.py"]
    89ddcdd7_3ae1_4c7b_41bb_9f1e25f16875["typing"]
    2da79393_445f_321e_d12f_59a347c0de92 --> 89ddcdd7_3ae1_4c7b_41bb_9f1e25f16875
    9c26e8a9_1ad2_1174_876a_1fc500ce0eaf["httpx"]
    2da79393_445f_321e_d12f_59a347c0de92 --> 9c26e8a9_1ad2_1174_876a_1fc500ce0eaf
    cde8421b_93c7_41e4_d69d_2a3f1bade2f2["pytest"]
    2da79393_445f_321e_d12f_59a347c0de92 --> cde8421b_93c7_41e4_d69d_2a3f1bade2f2
    d10c5377_2939_0f0b_cc44_8759393f2853["anthropic"]
    2da79393_445f_321e_d12f_59a347c0de92 --> d10c5377_2939_0f0b_cc44_8759393f2853
    4576f658_f93f_2a77_9ff8_4875c797aab6["anthropic._streaming"]
    2da79393_445f_321e_d12f_59a347c0de92 --> 4576f658_f93f_2a77_9ff8_4875c797aab6
    style 2da79393_445f_321e_d12f_59a347c0de92 fill:#6366f1,stroke:#818cf8,color:#fff
Source Code
from __future__ import annotations

from typing import Iterator, AsyncIterator

import httpx
import pytest

from anthropic import Anthropic, AsyncAnthropic
from anthropic._streaming import Stream, AsyncStream, ServerSentEvent


@pytest.mark.asyncio
@pytest.mark.parametrize("sync", [True, False], ids=["sync", "async"])
async def test_basic(sync: bool, client: Anthropic, async_client: AsyncAnthropic) -> None:
    def body() -> Iterator[bytes]:
        yield b"event: completion\n"
        yield b'data: {"foo":true}\n'
        yield b"\n"

    iterator = make_event_iterator(content=body(), sync=sync, client=client, async_client=async_client)

    sse = await iter_next(iterator)
    assert sse.event == "completion"
    assert sse.json() == {"foo": True}

    await assert_empty_iter(iterator)


@pytest.mark.asyncio
@pytest.mark.parametrize("sync", [True, False], ids=["sync", "async"])
async def test_data_missing_event(sync: bool, client: Anthropic, async_client: AsyncAnthropic) -> None:
    def body() -> Iterator[bytes]:
        yield b'data: {"foo":true}\n'
        yield b"\n"

    iterator = make_event_iterator(content=body(), sync=sync, client=client, async_client=async_client)

    sse = await iter_next(iterator)
    assert sse.event is None
    assert sse.json() == {"foo": True}

    await assert_empty_iter(iterator)


@pytest.mark.asyncio
@pytest.mark.parametrize("sync", [True, False], ids=["sync", "async"])
async def test_event_missing_data(sync: bool, client: Anthropic, async_client: AsyncAnthropic) -> None:
    def body() -> Iterator[bytes]:
        yield b"event: ping\n"
        yield b"\n"

    iterator = make_event_iterator(content=body(), sync=sync, client=client, async_client=async_client)

    sse = await iter_next(iterator)
    assert sse.event == "ping"
    assert sse.data == ""

    await assert_empty_iter(iterator)

// ... (197 more lines)
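The three tests shown above pin down a small contract: field lines accumulate until a blank line, a missing `event:` line leaves the event name as `None`, and a missing `data:` line leaves the data as an empty string. The following is a minimal standalone sketch of a parser with that behaviour. `SimpleEvent` and `parse_sse` are hypothetical names introduced here for illustration and are not part of `anthropic._streaming`; the sketch also assumes every chunk holds exactly one complete line, which the real decoder need not assume.

```python
from __future__ import annotations

import json
from dataclasses import dataclass
from typing import Any, Iterable, Iterator, Optional


@dataclass
class SimpleEvent:
    """Hypothetical stand-in for a parsed event; not the SDK's ServerSentEvent."""

    event: Optional[str]
    data: str

    def json(self) -> Any:
        return json.loads(self.data)


def parse_sse(lines: Iterable[bytes]) -> Iterator[SimpleEvent]:
    """Yield one event per blank-line-terminated block of field lines.

    Assumption: each item in `lines` is exactly one complete line.
    """
    event: Optional[str] = None
    data: list[str] = []
    seen_field = False

    for raw in lines:
        line = raw.decode("utf-8").rstrip("\n")
        if line == "":
            # A blank line dispatches whatever has been collected so far.
            if seen_field:
                yield SimpleEvent(event=event, data="\n".join(data))
            event, data, seen_field = None, [], False
            continue
        if line.startswith(":"):
            continue  # comment line, ignored
        field, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # drop the single optional space after the colon
        if field == "event":
            event = value
            seen_field = True
        elif field == "data":
            data.append(value)
            seen_field = True


# The same three payloads used by the tests above.
basic = list(parse_sse([b"event: completion\n", b'data: {"foo":true}\n', b"\n"]))
no_event = list(parse_sse([b'data: {"foo":true}\n', b"\n"]))
no_data = list(parse_sse([b"event: ping\n", b"\n"]))
```

Dispatching an event that has a name but no data mirrors what `test_event_missing_data` asserts; a parser that follows the browser EventSource rules strictly would drop such an event instead.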
Domain
- AnthropicClient
Subdomains
- Authentication
Functions
- assert_empty_iter()
- iter_next()
- make_event_iterator()
- test_basic()
- test_data_json_escaped_double_new_line()
- test_data_missing_event()
- test_event_missing_data()
- test_isinstance_check()
- test_multi_byte_character_multiple_chunks()
- test_multiple_data_lines()
- test_multiple_data_lines_with_empty_line()
- test_multiple_events()
- test_multiple_events_with_data()
- test_special_new_line_character()
- to_aiter()
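The name test_multi_byte_character_multiple_chunks suggests a case where a UTF-8 character is split across two network chunks; its body is in the elided part of the file, so the details are not shown here. As a general illustration of why that case needs care, the sketch below decodes chunks with the standard library's incremental decoder, which buffers an incomplete byte sequence until the rest arrives. `decode_chunks` is a hypothetical helper, not an SDK function, and nothing here claims to be how the SDK handles the case.

```python
import codecs
from typing import Iterable, Iterator


def decode_chunks(chunks: Iterable[bytes]) -> Iterator[str]:
    """Decode UTF-8 chunks that may split a character at a chunk boundary."""
    decoder = codecs.getincrementaldecoder("utf-8")()
    for chunk in chunks:
        text = decoder.decode(chunk)  # incomplete trailing bytes are buffered
        if text:
            yield text
    tail = decoder.decode(b"", final=True)  # raises if bytes are left dangling
    if tail:
        yield tail


# "é" encodes to two bytes (0xC3 0xA9); split the payload between them.
encoded = "data: é\n".encode("utf-8")
split_at = encoded.index(b"\xc3") + 1
pieces = [encoded[:split_at], encoded[split_at:]]

decoded = "".join(decode_chunks(pieces))
```

Calling `pieces[0].decode("utf-8")` directly raises `UnicodeDecodeError`, because the first chunk ends in the middle of a character.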
Dependencies
- anthropic
- anthropic._streaming
- httpx
- pytest
- typing
Source
- tests/test_streaming.py
Frequently Asked Questions
What does test_streaming.py do?
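test_streaming.py is a pytest test module in the anthropic-sdk-python codebase, written in Python. Its tests feed server-sent event (SSE) byte streams to an event iterator and assert on each parsed event's name and data, running every case in both a sync and an async variant. The file is classified under the AnthropicClient domain, Authentication subdomain.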
What functions are defined in test_streaming.py?
test_streaming.py defines 15 functions: assert_empty_iter, iter_next, make_event_iterator, test_basic, test_data_json_escaped_double_new_line, test_data_missing_event, test_event_missing_data, test_isinstance_check, test_multi_byte_character_multiple_chunks, test_multiple_data_lines, test_multiple_data_lines_with_empty_line, test_multiple_events, test_multiple_events_with_data, test_special_new_line_character, and to_aiter.
What does test_streaming.py depend on?
test_streaming.py imports 5 modules: anthropic, anthropic._streaming, httpx, pytest, typing.
Where is test_streaming.py in the architecture?
test_streaming.py is located at tests/test_streaming.py (domain: AnthropicClient, subdomain: Authentication, directory: tests).