build_events() — anthropic-sdk-python Function Reference
Architecture documentation for the build_events() function in _messages.py from the anthropic-sdk-python codebase.
Entity Profile
Dependency Diagram
graph TD 65667782_04ce_8584_0e7c_871a0bb8dd1c["build_events()"] 738ef0a0_4e16_ebf0_9c79_a17daa58ddef["_messages.py"] 65667782_04ce_8584_0e7c_871a0bb8dd1c -->|defined in| 738ef0a0_4e16_ebf0_9c79_a17daa58ddef ba051e44_a190_9141_551c_ce3d77cfeefc["__stream__()"] ba051e44_a190_9141_551c_ce3d77cfeefc -->|calls| 65667782_04ce_8584_0e7c_871a0bb8dd1c cad54414_ccd6_b061_6332_68338c7a97aa["__stream__()"] cad54414_ccd6_b061_6332_68338c7a97aa -->|calls| 65667782_04ce_8584_0e7c_871a0bb8dd1c style 65667782_04ce_8584_0e7c_871a0bb8dd1c fill:#6366f1,stroke:#818cf8,color:#fff
Relationship Graph
Source Code
src/anthropic/lib/streaming/_messages.py lines 331–422
def build_events(
*,
event: RawMessageStreamEvent,
message_snapshot: ParsedMessage[ResponseFormatT],
) -> list[ParsedMessageStreamEvent[ResponseFormatT]]:
events_to_fire: list[ParsedMessageStreamEvent[ResponseFormatT]] = []
if event.type == "message_start":
events_to_fire.append(event)
elif event.type == "message_delta":
events_to_fire.append(event)
elif event.type == "message_stop":
events_to_fire.append(
build(ParsedMessageStopEvent[ResponseFormatT], type="message_stop", message=message_snapshot)
)
elif event.type == "content_block_start":
events_to_fire.append(event)
elif event.type == "content_block_delta":
events_to_fire.append(event)
content_block = message_snapshot.content[event.index]
if event.delta.type == "text_delta":
if content_block.type == "text":
events_to_fire.append(
build(
TextEvent,
type="text",
text=event.delta.text,
snapshot=content_block.text,
)
)
elif event.delta.type == "input_json_delta":
if content_block.type == "tool_use":
events_to_fire.append(
build(
InputJsonEvent,
type="input_json",
partial_json=event.delta.partial_json,
snapshot=content_block.input,
)
)
elif event.delta.type == "citations_delta":
if content_block.type == "text":
events_to_fire.append(
build(
CitationEvent,
type="citation",
citation=event.delta.citation,
snapshot=content_block.citations or [],
)
)
elif event.delta.type == "thinking_delta":
if content_block.type == "thinking":
events_to_fire.append(
build(
ThinkingEvent,
type="thinking",
thinking=event.delta.thinking,
snapshot=content_block.thinking,
)
)
elif event.delta.type == "signature_delta":
if content_block.type == "thinking":
events_to_fire.append(
build(
SignatureEvent,
type="signature",
signature=content_block.signature,
)
)
pass
else:
# we only want exhaustive checking for linters, not at runtime
if TYPE_CHECKING: # type: ignore[unreachable]
assert_never(event.delta)
elif event.type == "content_block_stop":
content_block = message_snapshot.content[event.index]
event_to_fire = build(
ParsedContentBlockStopEvent,
type="content_block_stop",
Domain
Subdomains
Defined In
Called By
Source
Frequently Asked Questions
What does build_events() do?
build_events() is a function in the anthropic-sdk-python codebase, defined in src/anthropic/lib/streaming/_messages.py.
Where is build_events() defined?
build_events() is defined in src/anthropic/lib/streaming/_messages.py at line 331.
What calls build_events()?
build_events() is called by 2 function(s): __stream__, __stream__.
Analyze Your Own Codebase
Get architecture documentation, dependency graphs, and domain analysis for your codebase in minutes.
Try Supermodel Free