build_events() — anthropic-sdk-python Function Reference

Architecture documentation for the build_events() function in _beta_messages.py from the anthropic-sdk-python codebase.

Dependency Diagram

graph TD
  3df97661_6baf_7419_93d1_ca34ae93ba17["build_events()"]
  1e66b543_4a7a_c49e_b6a6_6343193fb272["_beta_messages.py"]
  3df97661_6baf_7419_93d1_ca34ae93ba17 -->|defined in| 1e66b543_4a7a_c49e_b6a6_6343193fb272
  0e363261_728a_0dbe_df13_19e318699314["__stream__()"]
  0e363261_728a_0dbe_df13_19e318699314 -->|calls| 3df97661_6baf_7419_93d1_ca34ae93ba17
  c715fa82_5e69_b855_ef0e_47e7f518ade0["__stream__()"]
  c715fa82_5e69_b855_ef0e_47e7f518ade0 -->|calls| 3df97661_6baf_7419_93d1_ca34ae93ba17
  style 3df97661_6baf_7419_93d1_ca34ae93ba17 fill:#6366f1,stroke:#818cf8,color:#fff

Source Code

src/anthropic/lib/streaming/_beta_messages.py lines 336–436

def build_events(
    *,
    event: BetaRawMessageStreamEvent,
    message_snapshot: ParsedBetaMessage[ResponseFormatT],
) -> list[ParsedBetaMessageStreamEvent[ResponseFormatT]]:
    events_to_fire: list[ParsedBetaMessageStreamEvent[ResponseFormatT]] = []

    if event.type == "message_start":
        events_to_fire.append(event)
    elif event.type == "message_delta":
        events_to_fire.append(event)
    elif event.type == "message_stop":
        events_to_fire.append(
            build(ParsedBetaMessageStopEvent[ResponseFormatT], type="message_stop", message=message_snapshot)
        )
    elif event.type == "content_block_start":
        events_to_fire.append(event)
    elif event.type == "content_block_delta":
        events_to_fire.append(event)

        content_block = message_snapshot.content[event.index]
        if event.delta.type == "text_delta":
            if content_block.type == "text":
                events_to_fire.append(
                    build(
                        ParsedBetaTextEvent,
                        type="text",
                        text=event.delta.text,
                        snapshot=content_block.text,
                    )
                )
        elif event.delta.type == "input_json_delta":
            if content_block.type == "tool_use" or content_block.type == "mcp_tool_use":
                events_to_fire.append(
                    build(
                        BetaInputJsonEvent,
                        type="input_json",
                        partial_json=event.delta.partial_json,
                        snapshot=content_block.input,
                    )
                )
        elif event.delta.type == "citations_delta":
            if content_block.type == "text":
                events_to_fire.append(
                    build(
                        BetaCitationEvent,
                        type="citation",
                        citation=event.delta.citation,
                        snapshot=content_block.citations or [],
                    )
                )
        elif event.delta.type == "thinking_delta":
            if content_block.type == "thinking":
                events_to_fire.append(
                    build(
                        BetaThinkingEvent,
                        type="thinking",
                        thinking=event.delta.thinking,
                        snapshot=content_block.thinking,
                    )
                )
        elif event.delta.type == "signature_delta":
            if content_block.type == "thinking":
                events_to_fire.append(
                    build(
                        BetaSignatureEvent,
                        type="signature",
                        signature=content_block.signature,
                    )
                )
            pass
        elif event.delta.type == "compaction_delta":
            if content_block.type == "compaction":
                events_to_fire.append(
                    build(
                        BetaCompactionEvent,
                        type="compaction",
                        content=content_block.content,
                    )
                )
        else:

Frequently Asked Questions

What does build_events() do?
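build_events() converts one raw stream event, together with the current message snapshot, into the list of events to fire. In the excerpt shown, it passes message_start, message_delta, content_block_start and content_block_delta events through unchanged and replaces message_stop with a ParsedBetaMessageStopEvent that carries the snapshot. For a content_block_delta it also appends a derived event (text, input_json, citation, thinking, signature or compaction) when the delta type matches the type of the content block at event.index. It is a function in the anthropic-sdk-python codebase, defined in src/anthropic/lib/streaming/_beta_messages.py.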
Where is build_events() defined?
build_events() is defined in src/anthropic/lib/streaming/_beta_messages.py at line 336.
What calls build_events()?
build_events() is called by two functions, both named __stream__; they appear as separate nodes in the dependency diagram.
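
The pass-through-plus-derived-event pattern in the source excerpt can be illustrated with a minimal, self-contained sketch. Everything here is hypothetical and simplified: RawTextDelta, RawMessageStop, TextEvent, MessageStopEvent, build_events_sketch and stream_sketch are stand-in names, not SDK types, and the assumption that the caller updates the snapshot before building events is inferred from the excerpt passing both the delta text and the accumulated block text.

```python
from dataclasses import dataclass
from typing import Iterator, List, Union


@dataclass
class RawTextDelta:
    """Hypothetical stand-in for a raw content_block_delta carrying text."""
    index: int
    text: str
    type: str = "content_block_delta"


@dataclass
class RawMessageStop:
    """Hypothetical stand-in for a raw message_stop event."""
    type: str = "message_stop"


@dataclass
class TextEvent:
    """Derived event: the new text plus the text accumulated so far."""
    text: str
    snapshot: str
    type: str = "text"


@dataclass
class MessageStopEvent:
    """Derived event that replaces the raw stop event and carries the content."""
    message: List[str]
    type: str = "message_stop"


RawEvent = Union[RawTextDelta, RawMessageStop]
OutEvent = Union[RawTextDelta, TextEvent, MessageStopEvent]


def build_events_sketch(event: RawEvent, snapshot: List[str]) -> List[OutEvent]:
    """Map one raw event to the list of events a consumer should see."""
    out: List[OutEvent] = []
    if isinstance(event, RawTextDelta):
        out.append(event)  # the raw event is passed through first
        out.append(TextEvent(text=event.text, snapshot=snapshot[event.index]))
    elif isinstance(event, RawMessageStop):
        # the raw stop event is replaced rather than passed through
        out.append(MessageStopEvent(message=list(snapshot)))
    return out


def stream_sketch(raw_events: List[RawEvent]) -> Iterator[OutEvent]:
    """Hypothetical caller: update the snapshot, then yield the built events."""
    snapshot: List[str] = [""]  # assumes a single text block at index 0
    for event in raw_events:
        if isinstance(event, RawTextDelta):
            snapshot[event.index] += event.text  # accumulate before building
        yield from build_events_sketch(event, snapshot)


events = list(
    stream_sketch([RawTextDelta(0, "Hel"), RawTextDelta(0, "lo"), RawMessageStop()])
)
print([e.type for e in events])
```

Returning a list rather than a single event lets one raw delta produce both the untouched raw event and a higher-level event carrying the accumulated snapshot, so consumers can subscribe at whichever level they need.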
