test_fine_grained_tool_streaming() — langchain Function Reference
Architecture documentation for the test_fine_grained_tool_streaming() function in test_chat_models.py from the langchain codebase.
Dependency Diagram
graph TD
    53446e15_6f27_2173_786c_8ea7f8c0b466["test_fine_grained_tool_streaming()"]
    f27640dd_3870_5548_d153_f9504ae1021f["test_chat_models.py"]
    53446e15_6f27_2173_786c_8ea7f8c0b466 -->|defined in| f27640dd_3870_5548_d153_f9504ae1021f
    style 53446e15_6f27_2173_786c_8ea7f8c0b466 fill:#6366f1,stroke:#818cf8,color:#fff
Source Code
libs/partners/anthropic/tests/integration_tests/test_chat_models.py lines 2308–2409
def test_fine_grained_tool_streaming() -> None:
    """Test fine-grained tool streaming reduces latency for tool parameter streaming.

    Fine-grained tool streaming enables Claude to stream tool parameter values.

    https://platform.claude.com/docs/en/agents-and-tools/tool-use/fine-grained-tool-streaming
    """
    llm = ChatAnthropic(
        model=MODEL_NAME,  # type: ignore[call-arg]
        temperature=0,
        betas=["fine-grained-tool-streaming-2025-05-14"],
    )

    # Define a tool that requires a longer text parameter
    tool_definition = {
        "name": "write_document",
        "description": "Write a document with the given content",
        "input_schema": {
            "type": "object",
            "properties": {
                "title": {"type": "string", "description": "Document title"},
                "content": {
                    "type": "string",
                    "description": "The full document content",
                },
            },
            "required": ["title", "content"],
        },
    }

    llm_with_tools = llm.bind_tools([tool_definition])
    query = (
        "Write a document about the benefits of streaming APIs. "
        "Include at least 3 paragraphs."
    )

    # Test streaming with fine-grained tool streaming
    first = True
    chunks: list[BaseMessage | BaseMessageChunk] = []
    tool_call_chunks = []

    for chunk in llm_with_tools.stream(query):
        chunks.append(chunk)
        if first:
            gathered = chunk
            first = False
        else:
            gathered = gathered + chunk  # type: ignore[assignment]

        # Collect tool call chunks
        tool_call_chunks.extend(
            [
                block
                for block in chunk.content_blocks
                if block["type"] == "tool_call_chunk"
            ]
        )

    # Verify we got chunks
    assert len(chunks) > 1

    # Verify final message has tool call
    assert isinstance(gathered, AIMessageChunk)
    assert isinstance(gathered.tool_calls, list)
    assert len(gathered.tool_calls) >= 1

    # Find the write_document tool call
    write_doc_call = None
    for tool_call in gathered.tool_calls:
        if tool_call["name"] == "write_document":
            write_doc_call = tool_call
            break

    assert write_doc_call is not None, "write_document tool call not found"
    assert isinstance(write_doc_call["args"], dict)
    assert "title" in write_doc_call["args"]
    assert "content" in write_doc_call["args"]
    assert (
        len(write_doc_call["args"]["content"]) > 100
    )  # Should have substantial content
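
The test relies on streamed chunks being summed into a single message whose tool call arguments end up as a parsed dict. The sketch below is a standard-library illustration of that idea only: it does not call LangChain or the Anthropic API. The fragment dicts are hand-written, their field names (`index`, `name`, `args`) are assumed to resemble the `tool_call_chunk` blocks collected in the test, and `merge_tool_call_chunks` is a hypothetical helper, not a library function.

```python
import json

# Hypothetical stand-ins for streamed tool-call fragments. These dicts are
# hand-written for illustration; they were not produced by any API call.
fragments = [
    {"type": "tool_call_chunk", "index": 0, "name": "write_document", "args": ""},
    {"type": "tool_call_chunk", "index": 0, "name": None, "args": '{"title": "Stream'},
    {"type": "tool_call_chunk", "index": 0, "name": None, "args": 'ing APIs", "content": "Strea'},
    {"type": "tool_call_chunk", "index": 0, "name": None, "args": 'ming lowers latency."}'},
]


def merge_tool_call_chunks(chunks):
    """Join partial-JSON argument fragments per index, then parse each result."""
    merged = {}
    for chunk in chunks:
        slot = merged.setdefault(chunk["index"], {"name": None, "args": ""})
        # The tool name is assumed to arrive once; later fragments carry None.
        if chunk.get("name"):
            slot["name"] = chunk["name"]
        # Argument text arrives as raw string pieces that only form valid
        # JSON once every piece has been concatenated.
        slot["args"] += chunk.get("args") or ""
    return [
        {"name": slot["name"], "args": json.loads(slot["args"])}
        for _, slot in sorted(merged.items())
    ]


tool_calls = merge_tool_call_chunks(fragments)

# Same lookup as the for/break loop in the test, written with next().
write_doc_call = next(
    (call for call in tool_calls if call["name"] == "write_document"), None
)
print(write_doc_call)
```

Individual fragments are not valid JSON on their own, which is why the test asserts on the aggregated `gathered.tool_calls` rather than on any single chunk.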
Frequently Asked Questions
What does test_fine_grained_tool_streaming() do?
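test_fine_grained_tool_streaming() is an integration test for ChatAnthropic in the langchain codebase, defined in libs/partners/anthropic/tests/integration_tests/test_chat_models.py. It creates a model with the fine-grained-tool-streaming-2025-05-14 beta enabled, binds a write_document tool, streams a response to a document-writing prompt, and asserts that the aggregated message contains a write_document tool call whose args include a title and a content string longer than 100 characters.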
Where is test_fine_grained_tool_streaming() defined?
test_fine_grained_tool_streaming() is defined in libs/partners/anthropic/tests/integration_tests/test_chat_models.py at line 2308.