build_events() — anthropic-sdk-python Function Reference
Architecture documentation for the build_events() function in _beta_messages.py from the anthropic-sdk-python codebase.
Entity Profile
Dependency Diagram
graph TD 3df97661_6baf_7419_93d1_ca34ae93ba17["build_events()"] 1e66b543_4a7a_c49e_b6a6_6343193fb272["_beta_messages.py"] 3df97661_6baf_7419_93d1_ca34ae93ba17 -->|defined in| 1e66b543_4a7a_c49e_b6a6_6343193fb272 0e363261_728a_0dbe_df13_19e318699314["__stream__()"] 0e363261_728a_0dbe_df13_19e318699314 -->|calls| 3df97661_6baf_7419_93d1_ca34ae93ba17 c715fa82_5e69_b855_ef0e_47e7f518ade0["__stream__()"] c715fa82_5e69_b855_ef0e_47e7f518ade0 -->|calls| 3df97661_6baf_7419_93d1_ca34ae93ba17 style 3df97661_6baf_7419_93d1_ca34ae93ba17 fill:#6366f1,stroke:#818cf8,color:#fff
Relationship Graph
Source Code
src/anthropic/lib/streaming/_beta_messages.py lines 336–436
def build_events(
*,
event: BetaRawMessageStreamEvent,
message_snapshot: ParsedBetaMessage[ResponseFormatT],
) -> list[ParsedBetaMessageStreamEvent[ResponseFormatT]]:
events_to_fire: list[ParsedBetaMessageStreamEvent[ResponseFormatT]] = []
if event.type == "message_start":
events_to_fire.append(event)
elif event.type == "message_delta":
events_to_fire.append(event)
elif event.type == "message_stop":
events_to_fire.append(
build(ParsedBetaMessageStopEvent[ResponseFormatT], type="message_stop", message=message_snapshot)
)
elif event.type == "content_block_start":
events_to_fire.append(event)
elif event.type == "content_block_delta":
events_to_fire.append(event)
content_block = message_snapshot.content[event.index]
if event.delta.type == "text_delta":
if content_block.type == "text":
events_to_fire.append(
build(
ParsedBetaTextEvent,
type="text",
text=event.delta.text,
snapshot=content_block.text,
)
)
elif event.delta.type == "input_json_delta":
if content_block.type == "tool_use" or content_block.type == "mcp_tool_use":
events_to_fire.append(
build(
BetaInputJsonEvent,
type="input_json",
partial_json=event.delta.partial_json,
snapshot=content_block.input,
)
)
elif event.delta.type == "citations_delta":
if content_block.type == "text":
events_to_fire.append(
build(
BetaCitationEvent,
type="citation",
citation=event.delta.citation,
snapshot=content_block.citations or [],
)
)
elif event.delta.type == "thinking_delta":
if content_block.type == "thinking":
events_to_fire.append(
build(
BetaThinkingEvent,
type="thinking",
thinking=event.delta.thinking,
snapshot=content_block.thinking,
)
)
elif event.delta.type == "signature_delta":
if content_block.type == "thinking":
events_to_fire.append(
build(
BetaSignatureEvent,
type="signature",
signature=content_block.signature,
)
)
pass
elif event.delta.type == "compaction_delta":
if content_block.type == "compaction":
events_to_fire.append(
build(
BetaCompactionEvent,
type="compaction",
content=content_block.content,
)
)
else:
Domain
Subdomains
Called By
Source
Frequently Asked Questions
What does build_events() do?
build_events() is a function in the anthropic-sdk-python codebase, defined in src/anthropic/lib/streaming/_beta_messages.py.
Where is build_events() defined?
build_events() is defined in src/anthropic/lib/streaming/_beta_messages.py at line 336.
What calls build_events()?
build_events() is called by 2 function(s): __stream__, __stream__.
Analyze Your Own Codebase
Get architecture documentation, dependency graphs, and domain analysis for your codebase in minutes.
Try Supermodel Free