Claude API Streaming: Real-Time Patterns and SSE
By Learnia AI Research Team
Last updated: March 10, 2026. Covers text streaming, tool use, extended thinking, and reconnection patterns.
Pillar article: Claude API: Complete Guide
Why Streaming Matters
Streaming fundamentally transforms the user experience of AI applications:
- Time To First Byte (TTFB): the first token arrives in ~300 ms instead of waiting for the complete response
- Progressive feedback: users see the response being built in real time
- Early cancellation: ability to interrupt an irrelevant generation
- Memory management: no need to hold the complete response in memory
The Server-Sent Events (SSE) Protocol
The Claude API uses the Server-Sent Events (SSE) protocol for streaming. Unlike WebSockets (bidirectional), SSE is a unidirectional flow from server to client over standard HTTP.
Each SSE event follows this format:
```
event: content_block_delta
data: {"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": "Hello"}}
```
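As a sketch, parsing one such raw event into its name and JSON payload might look like this (the helper name is an illustration, not part of any SDK):

```python
import json

def parse_sse_event(raw: str):
    """Parse a single SSE event block into (event_name, data_dict)."""
    event_name, data = None, None
    for line in raw.strip().splitlines():
        if line.startswith("event: "):
            event_name = line[len("event: "):]
        elif line.startswith("data: "):
            data = json.loads(line[len("data: "):])
    return event_name, data

raw = (
    "event: content_block_delta\n"
    'data: {"type": "content_block_delta", "index": 0, '
    '"delta": {"type": "text_delta", "text": "Hello"}}\n'
)
name, data = parse_sse_event(raw)
print(name, data["delta"]["text"])  # content_block_delta Hello
```

In practice the SDK (or an SSE client library) does this parsing for you; the point is only that each event is a pair of `event:` and `data:` lines, with the payload as JSON.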
Event Anatomy
| Event | When | Key Content |
|---|---|---|
| message_start | Message begins | id, model, usage.input_tokens |
| content_block_start | Block begins | type (text, tool_use, thinking) |
| content_block_delta | Each token/fragment | text_delta, input_json_delta or thinking_delta |
| content_block_stop | Block ends | Index of completed block |
| message_delta | Message ends | stop_reason, usage.output_tokens |
| message_stop | Stream finished | – |
Basic Text Streaming
With the Python SDK (synchronous)
```python
import anthropic

client = anthropic.Anthropic()

# Basic streaming with context manager
with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[
        {"role": "user", "content": "Explain the SSE protocol in 3 paragraphs."}
    ]
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)

    # Retrieve the final message with metadata
    final_message = stream.get_final_message()

print(f"\n\nTokens used: {final_message.usage.input_tokens} in / {final_message.usage.output_tokens} out")
```
With the Python SDK (asynchronous)
```python
import asyncio
import anthropic

async def stream_response():
    client = anthropic.AsyncAnthropic()
    async with client.messages.stream(
        model="claude-sonnet-4-20250514",
        max_tokens=1024,
        messages=[
            {"role": "user", "content": "What are the advantages of streaming?"}
        ]
    ) as stream:
        async for text in stream.text_stream:
            print(text, end="", flush=True)
        message = await stream.get_final_message()
        print(f"\nTokens: {message.usage.output_tokens}")

asyncio.run(stream_response())
```
With the Raw HTTP API (no SDK)
```python
import httpx
import json

def stream_raw():
    url = "https://api.anthropic.com/v1/messages"
    headers = {
        "x-api-key": "YOUR_API_KEY",
        "anthropic-version": "2023-06-01",
        "content-type": "application/json",
    }
    payload = {
        "model": "claude-sonnet-4-20250514",
        "max_tokens": 1024,
        "stream": True,
        "messages": [{"role": "user", "content": "Hello Claude!"}]
    }
    with httpx.stream("POST", url, json=payload, headers=headers, timeout=60) as response:
        for line in response.iter_lines():
            if line.startswith("data: "):
                data = json.loads(line[6:])
                if data["type"] == "content_block_delta":
                    delta = data["delta"]
                    if delta["type"] == "text_delta":
                        print(delta["text"], end="", flush=True)
                elif data["type"] == "message_stop":
                    print("\n[Stream finished]")

stream_raw()
```
Streaming with Tool Use
When Claude uses tools during streaming, the event flow changes. Instead of text_delta, you receive tool_use blocks with partial JSON fragments.
See also: Claude Tool Use: Complete Guide
Handling Tool Use in Streaming
```python
import anthropic
import json

client = anthropic.Anthropic()

tools = [
    {
        "name": "get_weather",
        "description": "Gets the current weather for a given city.",
        "input_schema": {
            "type": "object",
            "properties": {
                "city": {"type": "string", "description": "City name"},
                "unit": {"type": "string", "enum": ["celsius", "fahrenheit"], "default": "celsius"}
            },
            "required": ["city"]
        }
    }
]

# Streaming with tools
with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    tools=tools,
    messages=[{"role": "user", "content": "What's the weather in Paris?"}]
) as stream:
    current_tool = None
    tool_input_json = ""
    for event in stream:
        if event.type == "content_block_start":
            if hasattr(event.content_block, "type") and event.content_block.type == "tool_use":
                current_tool = event.content_block.name
                tool_input_json = ""
                print(f"\nTool call: {current_tool}")
        elif event.type == "content_block_delta":
            if hasattr(event.delta, "type"):
                if event.delta.type == "text_delta":
                    print(event.delta.text, end="", flush=True)
                elif event.delta.type == "input_json_delta":
                    tool_input_json += event.delta.partial_json
        elif event.type == "content_block_stop":
            if current_tool:
                params = json.loads(tool_input_json)
                print(f"  Parameters: {params}")
                current_tool = None
```
Accumulating Partial JSON
The input_json_delta sends JSON fragments that must be accumulated:
```python
# Fragments received sequentially:
# {"partial_json": "{\"ci"}
# {"partial_json": "ty\":"}
# {"partial_json": " \"Par"}
# {"partial_json": "is\"}"}
# After concatenation → {"city": "Paris"}
```
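The accumulate-then-parse step can be sketched in a few lines (the fragment list mirrors the example above; never call json.loads on an individual fragment, only on the full buffer once the block stops):

```python
import json

# Partial JSON fragments as delivered by successive input_json_delta events
fragments = ['{"ci', 'ty":', ' "Par', 'is"}']

# Concatenate everything first; individual fragments are not valid JSON
buffer = "".join(fragments)
params = json.loads(buffer)
print(params)  # {'city': 'Paris'}
```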
Streaming with Extended Thinking
When you enable extended thinking, Claude emits a new block type: thinking. Thinking tokens arrive via thinking_delta before the response tokens.
See also: Claude Extended Thinking: Complete Guide
```python
import anthropic

client = anthropic.Anthropic()

# Streaming with extended thinking
with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=16000,
    thinking={
        "type": "enabled",
        "budget_tokens": 10000
    },
    messages=[{"role": "user", "content": "Solve this: if f(x) = x³ - 6x² + 11x - 6, find all roots."}]
) as stream:
    current_block_type = None
    for event in stream:
        if event.type == "content_block_start":
            block_type = event.content_block.type
            if block_type == "thinking":
                current_block_type = "thinking"
                print("Thinking in progress...")
            elif block_type == "text":
                current_block_type = "text"
                print("\nResponse:")
        elif event.type == "content_block_delta":
            if hasattr(event.delta, "type"):
                if event.delta.type == "thinking_delta":
                    # Show or hide thinking based on desired UX
                    print(event.delta.thinking, end="", flush=True)
                elif event.delta.type == "text_delta":
                    print(event.delta.text, end="", flush=True)
```
Event Flow with Thinking
```
event: message_start
event: content_block_start → {"type": "thinking", "thinking": ""}
event: content_block_delta → {"type": "thinking_delta", "thinking": "Let's analyze..."}
event: content_block_delta → {"type": "thinking_delta", "thinking": "f(x) = (x-1)..."}
event: content_block_stop
event: content_block_start → {"type": "text", "text": ""}
event: content_block_delta → {"type": "text_delta", "text": "The roots are..."}
event: content_block_stop
event: message_delta
event: message_stop
```
Token Counting During Streaming
Streaming lets you track token consumption in real time:
```python
with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Explain SSE streaming."}]
) as stream:
    for event in stream:
        pass  # Consume the stream

    # Access final statistics
    final = stream.get_final_message()
    usage = final.usage
    print(f"Input tokens : {usage.input_tokens}")
    print(f"Output tokens: {usage.output_tokens}")

    # With prompt caching, cache token counts are also reported
    if hasattr(usage, "cache_creation_input_tokens"):
        print(f"Cache creation tokens: {usage.cache_creation_input_tokens}")
    if hasattr(usage, "cache_read_input_tokens"):
        print(f"Cache read tokens    : {usage.cache_read_input_tokens}")
```
The message_delta event (before message_stop) contains the stop_reason and final output_tokens, enabling consumption tracking during the stream.
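As a sketch with hand-built event payloads (not real API traffic), pulling stop_reason and output_tokens out of the message_delta event looks like this:

```python
import json

# Hand-built SSE data payloads standing in for a real stream
raw_events = [
    '{"type": "message_start", "message": {"usage": {"input_tokens": 12}}}',
    '{"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": "Hi"}}',
    '{"type": "message_delta", "delta": {"stop_reason": "end_turn"}, "usage": {"output_tokens": 42}}',
    '{"type": "message_stop"}',
]

stop_reason = None
output_tokens = None
for raw in raw_events:
    data = json.loads(raw)
    if data["type"] == "message_delta":
        # stop_reason lives under "delta", token counts under "usage"
        stop_reason = data["delta"]["stop_reason"]
        output_tokens = data["usage"]["output_tokens"]

print(stop_reason, output_tokens)  # end_turn 42
```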
Error Handling and Reconnection
Streaming Error Types
| Code | Error | Recommended Action |
|---|---|---|
| 429 | rate_limit_error | Retry with exponential backoff |
| 529 | overloaded_error | Retry after 30-60 seconds |
| 408 | Connection timeout | Reconnect immediately |
| – | Network interruption | Retry with full context |
Robust Reconnection Pattern
```python
import anthropic
import time

def stream_with_retry(messages, max_retries=3):
    client = anthropic.Anthropic()
    for attempt in range(max_retries):
        accumulated_text = ""  # Reset on each attempt to avoid duplicated text
        try:
            with client.messages.stream(
                model="claude-sonnet-4-20250514",
                max_tokens=4096,
                messages=messages,
            ) as stream:
                for text in stream.text_stream:
                    accumulated_text += text
                    print(text, end="", flush=True)
            # Stream completed successfully
            return accumulated_text
        except anthropic.APIStatusError as e:
            if e.status_code == 429:
                wait = 2 ** attempt * 5  # 5s, 10s, 20s
                print(f"\nRate limit, retrying in {wait}s...")
                time.sleep(wait)
            elif e.status_code >= 500:
                wait = 2 ** attempt * 10
                print(f"\nServer error, retrying in {wait}s...")
                time.sleep(wait)
            else:
                raise  # Client error, don't retry
        except anthropic.APIConnectionError:
            wait = 2 ** attempt * 3
            print(f"\nConnection lost, retrying in {wait}s...")
            time.sleep(wait)
    raise Exception("Failed after all retries")
```
Handling Event-Level Errors
```python
with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=messages,
) as stream:
    for event in stream:
        if event.type == "error":
            error = event.error
            print(f"Stream error: {error.type} - {error.message}")
            if error.type == "overloaded_error":
                # Server is overloaded, stream will terminate
                break
        elif event.type == "content_block_delta":
            if hasattr(event.delta, "text"):
                print(event.delta.text, end="", flush=True)
```
Python SDK Helpers
The SDK offers high-level helpers to simplify streaming:
Custom Event Handlers
```python
import anthropic

client = anthropic.Anthropic()

# Use high-level events
with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Tell a short story."}]
) as stream:
    # text_stream: iterator over text fragments only
    for text in stream.text_stream:
        print(text, end="", flush=True)

# Collect all text at once
with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Summarize SSE streaming."}]
) as stream:
    # get_final_text() waits for completion and returns full text
    full_text = stream.get_final_text()
    print(full_text)

    # Access the complete message with metadata
    final_message = stream.get_final_message()
    print(f"Stop reason: {final_message.stop_reason}")
    print(f"Usage: {final_message.usage}")
```
Async Streaming with FastAPI
```python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import anthropic

app = FastAPI()
client = anthropic.AsyncAnthropic()

@app.post("/chat")
async def chat(user_message: str):
    async def generate():
        async with client.messages.stream(
            model="claude-sonnet-4-20250514",
            max_tokens=2048,
            messages=[{"role": "user", "content": user_message}]
        ) as stream:
            async for text in stream.text_stream:
                yield f"data: {text}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(generate(), media_type="text/event-stream")
```
Pattern: Real-Time Chat
Here's a complete pattern for building a real-time chat with streaming:
```python
import anthropic
from dataclasses import dataclass, field

@dataclass
class ChatSession:
    client: anthropic.Anthropic = field(default_factory=anthropic.Anthropic)
    messages: list = field(default_factory=list)
    model: str = "claude-sonnet-4-20250514"

    def send(self, user_input: str) -> str:
        """Send a message and stream the response."""
        self.messages.append({"role": "user", "content": user_input})
        full_response = ""
        with self.client.messages.stream(
            model=self.model,
            max_tokens=4096,
            messages=self.messages,
        ) as stream:
            for text in stream.text_stream:
                full_response += text
                print(text, end="", flush=True)
        print()  # New line after stream
        self.messages.append({"role": "assistant", "content": full_response})
        return full_response

# Usage
chat = ChatSession()
chat.send("Hello! What is SSE streaming?")
chat.send("Can you give a Python example?")
```
Frontend Side (JavaScript)
```javascript
async function streamChat(message, onToken) {
  const response = await fetch('/api/chat', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ message }),
  });
  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    const chunk = decoder.decode(value, { stream: true });
    const lines = chunk.split('\n');
    for (const line of lines) {
      if (line.startsWith('data: ') && line !== 'data: [DONE]') {
        onToken(line.slice(6));
      }
    }
  }
}

// Usage with React
streamChat("Hello Claude", (token) => {
  setResponse(prev => prev + token);
});
```
See also: AI Agent Architecture Patterns with Claude for multi-step patterns with streaming.
Best Practices
- Always set a timeout: configure a client-side timeout to avoid hanging connections
- Smart buffering: for web interfaces, accumulate a few tokens before updating the DOM (prevents excessive reflows)
- Clean cancellation: close the stream with stream.close() or exit the context manager to release resources
- Token monitoring: track input_tokens and output_tokens to control costs in production
- Conditional streaming: only use streaming when the UX justifies it; for background batch calls, a complete response is simpler
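The smart-buffering advice above can be sketched as a small helper that flushes accumulated tokens in batches; the class name and batch size of 3 are arbitrary illustrations, and the flushed list stands in for whatever UI update you would actually perform:

```python
class TokenBuffer:
    """Accumulate streamed tokens and flush them in batches,
    reducing the number of UI updates (e.g. DOM writes)."""

    def __init__(self, flush_every=3):
        self.flush_every = flush_every
        self.pending = []
        self.flushed = []  # stands in for UI updates

    def add(self, token):
        self.pending.append(token)
        if len(self.pending) >= self.flush_every:
            self.flush()

    def flush(self):
        if self.pending:
            self.flushed.append("".join(self.pending))
            self.pending = []

buf = TokenBuffer(flush_every=3)
for token in ["Hel", "lo", " ", "wor", "ld"]:
    buf.add(token)
buf.flush()  # flush the remainder at stream end
print(buf.flushed)  # ['Hello ', 'world']
```

A time-based flush (e.g. every 50 ms) works just as well; the trade-off is perceived latency versus render churn.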
See also: Claude Structured Outputs: Complete Guide for combining streaming with structured outputs.
FAQ
How do I enable streaming with the Claude API?
Add stream: true to your API request or use client.messages.stream() with the Python SDK. The API returns SSE events instead of a single JSON response.
What SSE event types does Claude return?
The main events are: message_start, content_block_start, content_block_delta (with text_delta, input_json_delta or thinking_delta), content_block_stop, message_delta and message_stop.
Does streaming work with Claude's tool use?
Yes. Tool parameters arrive via content_block_delta with input_json_delta containing partial JSON to accumulate.
How do I handle errors and reconnections during streaming?
The Python SDK handles retries automatically with exponential backoff. For fine-grained control, catch APIStatusError (429, 5xx) and APIConnectionError, then retry with increasing delay.