accumulate_event() — anthropic-sdk-python Function Reference
Architecture documentation for the accumulate_event() function in _beta_messages.py from the anthropic-sdk-python codebase.
Dependency Diagram
graph TD
    a902fa3e_1643_6ad3_983d_fb2f08bc9b41["accumulate_event()"]
    1e66b543_4a7a_c49e_b6a6_6343193fb272["_beta_messages.py"]
    a902fa3e_1643_6ad3_983d_fb2f08bc9b41 -->|defined in| 1e66b543_4a7a_c49e_b6a6_6343193fb272
    0e363261_728a_0dbe_df13_19e318699314["__stream__()"]
    0e363261_728a_0dbe_df13_19e318699314 -->|calls| a902fa3e_1643_6ad3_983d_fb2f08bc9b41
    c715fa82_5e69_b855_ef0e_47e7f518ade0["__stream__()"]
    c715fa82_5e69_b855_ef0e_47e7f518ade0 -->|calls| a902fa3e_1643_6ad3_983d_fb2f08bc9b41
    style a902fa3e_1643_6ad3_983d_fb2f08bc9b41 fill:#6366f1,stroke:#818cf8,color:#fff
Source Code
src/anthropic/lib/streaming/_beta_messages.py lines 448–555
def accumulate_event(
    *,
    event: BetaRawMessageStreamEvent,
    current_snapshot: ParsedBetaMessage[ResponseFormatT] | None,
    request_headers: httpx.Headers,
    output_format: ResponseFormatT | NotGiven = NOT_GIVEN,
) -> ParsedBetaMessage[ResponseFormatT]:
    if not isinstance(cast(Any, event), BaseModel):
        event = cast(  # pyright: ignore[reportUnnecessaryCast]
            BetaRawMessageStreamEvent,
            construct_type_unchecked(
                type_=cast(Type[BetaRawMessageStreamEvent], BetaRawMessageStreamEvent),
                value=event,
            ),
        )
        if not isinstance(cast(Any, event), BaseModel):
            raise TypeError(
                f"Unexpected event runtime type, after deserialising twice - {event} - {builtins.type(event)}"
            )

    if current_snapshot is None:
        if event.type == "message_start":
            return cast(
                ParsedBetaMessage[ResponseFormatT], ParsedBetaMessage.construct(**cast(Any, event.message.to_dict()))
            )

        raise RuntimeError(f'Unexpected event order, got {event.type} before "message_start"')

    if event.type == "content_block_start":
        # TODO: check index
        current_snapshot.content.append(
            cast(
                Any,  # Pydantic does not support generic unions at runtime
                construct_type(type_=ParsedBetaContentBlock, value=event.content_block.to_dict()),
            ),
        )
    elif event.type == "content_block_delta":
        content = current_snapshot.content[event.index]
        if event.delta.type == "text_delta":
            if content.type == "text":
                content.text += event.delta.text
        elif event.delta.type == "input_json_delta":
            if isinstance(content, TRACKS_TOOL_INPUT):
                from jiter import from_json

                # we need to keep track of the raw JSON string as well so that we can
                # re-parse it for each delta, for now we just store it as an untyped
                # property on the snapshot
                json_buf = cast(bytes, getattr(content, JSON_BUF_PROPERTY, b""))
                json_buf += bytes(event.delta.partial_json, "utf-8")

                if json_buf:
                    try:
                        anthropic_beta = request_headers.get("anthropic-beta", "") if request_headers else ""
                        if "fine-grained-tool-streaming-2025-05-14" in anthropic_beta:
                            content.input = from_json(json_buf, partial_mode="trailing-strings")
                        else:
                            content.input = from_json(json_buf, partial_mode=True)
                    except ValueError as e:
                        raise ValueError(
                            f"Unable to parse tool parameter JSON from model. Please retry your request or adjust your prompt. Error: {e}. JSON: {json_buf.decode('utf-8')}"
                        ) from e

                setattr(content, JSON_BUF_PROPERTY, json_buf)
        elif event.delta.type == "citations_delta":
            if content.type == "text":
                if not content.citations:
                    content.citations = [event.delta.citation]
                else:
                    content.citations.append(event.delta.citation)
        elif event.delta.type == "thinking_delta":
            if content.type == "thinking":
                content.thinking += event.delta.thinking
        elif event.delta.type == "signature_delta":
            if content.type == "thinking":
                content.signature = event.delta.signature
        elif event.delta.type == "compaction_delta":
            if content.type == "compaction":
                content.content = event.delta.content
        else:
            # (the listing is cut off here; the remaining lines are not shown)
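
The input_json_delta branch above keeps the raw bytes received so far and re-parses the whole buffer on every delta. The sketch below illustrates that buffering idea with the standard library only. It is not the SDK's implementation: the listing uses jiter's from_json with partial_mode to get a usable value from incomplete JSON, whereas json.loads cannot parse partial documents, so this version keeps the last value that parsed cleanly. ToolInputBuffer and its attribute names are hypothetical.

```python
import json
from typing import Any, Optional


class ToolInputBuffer:
    """Hypothetical helper: accumulates partial JSON fragments for one tool call."""

    def __init__(self) -> None:
        self.raw = b""  # every fragment received so far, as UTF-8 bytes
        self.value: Optional[Any] = None  # last successfully parsed value

    def feed(self, partial_json: str) -> Optional[Any]:
        # Append the new fragment, then re-parse the whole buffer from the start.
        self.raw += partial_json.encode("utf-8")
        if not self.raw:
            return self.value
        try:
            self.value = json.loads(self.raw)
        except json.JSONDecodeError:
            # The buffer is still an incomplete document; keep the previous value.
            pass
        return self.value


buf = ToolInputBuffer()
results = [buf.feed(chunk) for chunk in ['{"city": "Par', 'is", "days"', ": 3}"]]
print(results)
```

With a partial-mode parser, as in the listing, the intermediate results would presumably be partially filled objects rather than None, which is the reason the raw buffer is kept alongside the parsed value.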
Frequently Asked Questions
What does accumulate_event() do?
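accumulate_event() is a function in the anthropic-sdk-python codebase, defined in src/anthropic/lib/streaming/_beta_messages.py. It folds a single BetaRawMessageStreamEvent into the running ParsedBetaMessage snapshot. When there is no snapshot yet, a "message_start" event creates one and any other event raises a RuntimeError. Otherwise, a "content_block_start" event appends a new content block, and a "content_block_delta" event updates the block at event.index: text and thinking deltas are appended, citations are collected into a list, signature and compaction deltas overwrite the stored value, and tool input JSON is buffered and re-parsed on every delta.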
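
The accumulation pattern in the source listing can be sketched with plain dictionaries. This is a simplified illustration, not the SDK's code: the real function operates on Pydantic models and handles more delta types, while this version covers text blocks only. The accumulate name, the dict-based event shapes, and returning the snapshot after each update (mirroring the return annotation in the listing) are assumptions made for the example.

```python
from typing import Any, Dict, Optional

Snapshot = Dict[str, Any]


def accumulate(event: Dict[str, Any], snapshot: Optional[Snapshot]) -> Snapshot:
    """Fold one stream event into the snapshot (simplified; text blocks only)."""
    if snapshot is None:
        if event["type"] == "message_start":
            # Copy so that later mutation does not touch the event itself.
            message = dict(event["message"])
            message["content"] = list(message.get("content", []))
            return message
        raise RuntimeError(f'Unexpected event order, got {event["type"]} before "message_start"')

    if event["type"] == "content_block_start":
        snapshot["content"].append(dict(event["content_block"]))
    elif event["type"] == "content_block_delta":
        block = snapshot["content"][event["index"]]
        delta = event["delta"]
        if delta["type"] == "text_delta" and block["type"] == "text":
            block["text"] += delta["text"]
    return snapshot


events = [
    {"type": "message_start", "message": {"id": "msg_1", "role": "assistant", "content": []}},
    {"type": "content_block_start", "index": 0, "content_block": {"type": "text", "text": ""}},
    {"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": "Hel"}},
    {"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": "lo"}},
]

snapshot: Optional[Snapshot] = None
for ev in events:
    snapshot = accumulate(ev, snapshot)
print(snapshot)
```

The caller owns the snapshot and passes it back in on every event, which keeps the accumulation step itself free of hidden state.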
Where is accumulate_event() defined?
accumulate_event() is defined in src/anthropic/lib/streaming/_beta_messages.py at line 448.
What calls accumulate_event()?
accumulate_event() is called by two distinct functions, both named __stream__().