accumulate_event() — anthropic-sdk-python Function Reference

Architecture documentation for the accumulate_event() function in _messages.py from the anthropic-sdk-python codebase.

Dependency Diagram

graph TD
  7a9a7646_ea92_5eb4_1131_e001c6adde71["accumulate_event()"]
  738ef0a0_4e16_ebf0_9c79_a17daa58ddef["_messages.py"]
  7a9a7646_ea92_5eb4_1131_e001c6adde71 -->|defined in| 738ef0a0_4e16_ebf0_9c79_a17daa58ddef
  ba051e44_a190_9141_551c_ce3d77cfeefc["__stream__()"]
  ba051e44_a190_9141_551c_ce3d77cfeefc -->|calls| 7a9a7646_ea92_5eb4_1131_e001c6adde71
  cad54414_ccd6_b061_6332_68338c7a97aa["__stream__()"]
  cad54414_ccd6_b061_6332_68338c7a97aa -->|calls| 7a9a7646_ea92_5eb4_1131_e001c6adde71
  style 7a9a7646_ea92_5eb4_1131_e001c6adde71 fill:#6366f1,stroke:#818cf8,color:#fff

Source Code

src/anthropic/lib/streaming/_messages.py lines 433–518

def accumulate_event(
    *,
    event: RawMessageStreamEvent,
    current_snapshot: ParsedMessage[ResponseFormatT] | None,
    output_format: ResponseFormatT | NotGiven = NOT_GIVEN,
) -> ParsedMessage[ResponseFormatT]:
    if not isinstance(cast(Any, event), BaseModel):
        event = cast(  # pyright: ignore[reportUnnecessaryCast]
            RawMessageStreamEvent,
            construct_type_unchecked(
                type_=cast(Type[RawMessageStreamEvent], RawMessageStreamEvent),
                value=event,
            ),
        )
        if not isinstance(cast(Any, event), BaseModel):
            raise TypeError(f"Unexpected event runtime type, after deserialising twice - {event} - {type(event)}")

    if current_snapshot is None:
        if event.type == "message_start":
            return cast(ParsedMessage[ResponseFormatT], ParsedMessage.construct(**cast(Any, event.message.to_dict())))

        raise RuntimeError(f'Unexpected event order, got {event.type} before "message_start"')

    if event.type == "content_block_start":
        # TODO: check index
        current_snapshot.content.append(
            cast(
                Any,  # Pydantic does not support generic unions at runtime
                construct_type(type_=ParsedContentBlock, value=event.content_block.model_dump()),
            ),
        )
    elif event.type == "content_block_delta":
        content = current_snapshot.content[event.index]
        if event.delta.type == "text_delta":
            if content.type == "text":
                content.text += event.delta.text
        elif event.delta.type == "input_json_delta":
            if isinstance(content, TRACKS_TOOL_INPUT):
                from jiter import from_json

                # we need to keep track of the raw JSON string as well so that we can
                # re-parse it for each delta, for now we just store it as an untyped
                # property on the snapshot
                json_buf = cast(bytes, getattr(content, JSON_BUF_PROPERTY, b""))
                json_buf += bytes(event.delta.partial_json, "utf-8")

                if json_buf:
                    content.input = from_json(json_buf, partial_mode=True)

                setattr(content, JSON_BUF_PROPERTY, json_buf)
        elif event.delta.type == "citations_delta":
            if content.type == "text":
                if not content.citations:
                    content.citations = [event.delta.citation]
                else:
                    content.citations.append(event.delta.citation)
        elif event.delta.type == "thinking_delta":
            if content.type == "thinking":
                content.thinking += event.delta.thinking
        elif event.delta.type == "signature_delta":
            if content.type == "thinking":
                content.signature = event.delta.signature
        else:
            # we only want exhaustive checking for linters, not at runtime
            if TYPE_CHECKING:  # type: ignore[unreachable]
                assert_never(event.delta)
    elif event.type == "content_block_stop":
        content_block = current_snapshot.content[event.index]
        if content_block.type == "text" and is_given(output_format):
            content_block.parsed_output = parse_text(content_block.text, output_format)
    elif event.type == "message_delta":
        current_snapshot.stop_reason = event.delta.stop_reason
        current_snapshot.stop_sequence = event.delta.stop_sequence
        current_snapshot.usage.output_tokens = event.usage.output_tokens

        # Update other usage fields if they exist in the event
        if event.usage.input_tokens is not None:
            current_snapshot.usage.input_tokens = event.usage.input_tokens
        if event.usage.cache_creation_input_tokens is not None:
            current_snapshot.usage.cache_creation_input_tokens = event.usage.cache_creation_input_tokens
        if event.usage.cache_read_input_tokens is not None:
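
The input_json_delta branch in the listing keeps the raw JSON bytes on the content block and re-parses the whole buffer after every delta. A minimal sketch of that buffer-and-reparse idea follows, using only the standard library. It is not the SDK's code: the name append_partial_json is hypothetical, and json.loads stands in for jiter's from_json(..., partial_mode=True). As a result this version only produces a value once the buffer holds complete JSON, whereas the partial-mode parser used in the listing can return a value for an incomplete buffer.

```python
import json
from typing import Any, Tuple


def append_partial_json(buf: bytes, last_value: Any, fragment: str) -> Tuple[bytes, Any]:
    """Append one streamed fragment to the raw buffer and re-parse the whole buffer.

    Hypothetical helper for illustration. Returns the new buffer plus the parsed
    value, falling back to the previous value while the JSON is still incomplete.
    """
    buf += fragment.encode("utf-8")
    try:
        value = json.loads(buf)
    except json.JSONDecodeError:
        # Incomplete JSON so far: keep whatever we had before.
        value = last_value
    return buf, value


# Replay three fragments of one tool-input object.
buf, value = b"", {}
history = []
for fragment in ['{"city": "Pa', 'ris", "days"', ": 3}"]:
    buf, value = append_partial_json(buf, value, fragment)
    history.append(value)
```

Keeping the raw buffer next to the parsed value is what makes re-parsing possible: each delta is only a string fragment, so it cannot be merged into an already parsed object.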

Frequently Asked Questions

What does accumulate_event() do?
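accumulate_event() folds one raw streaming event into the running message snapshot and returns a ParsedMessage. It is defined in src/anthropic/lib/streaming/_messages.py. With no snapshot yet, a "message_start" event creates one, and any other event raises a RuntimeError. After that, "content_block_start" appends a content block, "content_block_delta" extends the block at event.index (text, tool-input JSON, citations, thinking, or signature), "content_block_stop" parses a text block into parsed_output when output_format is given, and "message_delta" updates the stop reason, stop sequence, and usage counts.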
Where is accumulate_event() defined?
accumulate_event() is defined in src/anthropic/lib/streaming/_messages.py at line 433.
What calls accumulate_event()?
accumulate_event() is called by two functions, both named __stream__(). They appear as separate nodes in the dependency diagram.
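
The call pattern implied by the signature is a fold: the caller starts with no snapshot and passes each event together with the previous result. The sketch below illustrates that pattern with plain dictionaries instead of the SDK's Pydantic models. The names accumulate and replay are hypothetical, only a few event types are handled, and the bodies of the real __stream__() callers are not shown in this page, so treat this as an assumption about how such a loop could look rather than the SDK's implementation.

```python
from typing import Any, Dict, Iterable, Optional

Event = Dict[str, Any]
Snapshot = Dict[str, Any]


def accumulate(event: Event, snapshot: Optional[Snapshot]) -> Snapshot:
    """Simplified stand-in for accumulate_event(), operating on plain dicts."""
    if snapshot is None:
        if event["type"] == "message_start":
            message = event["message"]
            # Copy so the snapshot does not alias the event payload.
            return {**message, "content": list(message.get("content", []))}
        raise RuntimeError(f'Unexpected event order, got {event["type"]} before "message_start"')

    if event["type"] == "content_block_start":
        snapshot["content"].append(dict(event["content_block"]))
    elif event["type"] == "content_block_delta":
        block = snapshot["content"][event["index"]]
        delta = event["delta"]
        if delta["type"] == "text_delta" and block["type"] == "text":
            block["text"] += delta["text"]
    elif event["type"] == "message_delta":
        snapshot["stop_reason"] = event["delta"].get("stop_reason")
    return snapshot


def replay(events: Iterable[Event]) -> Optional[Snapshot]:
    """Fold a sequence of events into a final snapshot, one event at a time."""
    snapshot: Optional[Snapshot] = None
    for event in events:
        snapshot = accumulate(event, snapshot)
    return snapshot


events = [
    {"type": "message_start", "message": {"role": "assistant", "content": [], "stop_reason": None}},
    {"type": "content_block_start", "index": 0, "content_block": {"type": "text", "text": ""}},
    {"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": "Hello, "}},
    {"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": "world"}},
    {"type": "message_delta", "delta": {"stop_reason": "end_turn"}},
]
final = replay(events)
```

Because each call receives the previous snapshot, a caller can expose the intermediate state after every event rather than waiting for the stream to finish.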
