
Claude API Streaming: Real-Time Patterns and SSE

By Learnia AI Research Team

📅 Last updated: March 10, 2026. Covers text streaming, tool use, extended thinking, and reconnection patterns.

🔗 Pillar article: Claude API: Complete Guide


Why Streaming Matters

Streaming fundamentally transforms the user experience of AI applications:

  • Time to first token (TTFT): the first token arrives in roughly 300 ms instead of after the complete response
  • Progressive feedback: users see the response being built in real time
  • Early cancellation: the ability to interrupt an irrelevant generation
  • Memory management: no need to hold the complete response in memory

The Server-Sent Events (SSE) Protocol

The Claude API uses the Server-Sent Events (SSE) protocol for streaming. Unlike WebSockets (bidirectional), SSE is a unidirectional flow from server to client over standard HTTP.


Each SSE event follows this format:

event: content_block_delta
data: {"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": "Hello"}}
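To make the wire format concrete, here is a minimal parser sketch for a single event/data pair (the official SDKs handle this for you):

```python
import json

def parse_sse_event(raw: str) -> tuple[str, dict]:
    """Split one SSE event into its event name and parsed JSON payload."""
    event_name, payload = "", {}
    for line in raw.strip().splitlines():
        if line.startswith("event: "):
            event_name = line[len("event: "):]
        elif line.startswith("data: "):
            payload = json.loads(line[len("data: "):])
    return event_name, payload

raw = (
    "event: content_block_delta\n"
    'data: {"type": "content_block_delta", "index": 0, '
    '"delta": {"type": "text_delta", "text": "Hello"}}\n'
)
name, data = parse_sse_event(raw)
print(name, "->", data["delta"]["text"])  # content_block_delta -> Hello
```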

Event Anatomy

| Event | When | Key content |
| --- | --- | --- |
| message_start | Message begins | id, model, usage.input_tokens |
| content_block_start | Block begins | type (text, tool_use, thinking) |
| content_block_delta | Each token/fragment | text_delta, input_json_delta, or thinking_delta |
| content_block_stop | Block ends | Index of the completed block |
| message_delta | Message ends | stop_reason, usage.output_tokens |
| message_stop | Stream finished | (none) |

Basic Text Streaming

With the Python SDK (synchronous)

import anthropic

client = anthropic.Anthropic()

# Basic streaming with context manager
with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[
        {"role": "user", "content": "Explain the SSE protocol in 3 paragraphs."}
    ]
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)

    # Retrieve the final message with metadata (while the stream is still open)
    final_message = stream.get_final_message()

print(f"\n\nTokens used: {final_message.usage.input_tokens} in / {final_message.usage.output_tokens} out")

With the Python SDK (asynchronous)

import asyncio
import anthropic

async def stream_response():
    client = anthropic.AsyncAnthropic()

    async with client.messages.stream(
        model="claude-sonnet-4-20250514",
        max_tokens=1024,
        messages=[
            {"role": "user", "content": "What are the advantages of streaming?"}
        ]
    ) as stream:
        async for text in stream.text_stream:
            print(text, end="", flush=True)

        # Retrieve the final message while the stream is still open
        message = await stream.get_final_message()

    print(f"\nTokens: {message.usage.output_tokens}")

asyncio.run(stream_response())

With the Raw HTTP API (no SDK)

import httpx
import json

def stream_raw():
    url = "https://api.anthropic.com/v1/messages"
    headers = {
        "x-api-key": "YOUR_API_KEY",
        "anthropic-version": "2023-06-01",
        "content-type": "application/json",
    }
    payload = {
        "model": "claude-sonnet-4-20250514",
        "max_tokens": 1024,
        "stream": True,
        "messages": [{"role": "user", "content": "Hello Claude!"}]
    }

    with httpx.stream("POST", url, json=payload, headers=headers, timeout=60) as response:
        for line in response.iter_lines():
            if line.startswith("data: "):
                data = json.loads(line[6:])
                if data["type"] == "content_block_delta":
                    delta = data["delta"]
                    if delta["type"] == "text_delta":
                        print(delta["text"], end="", flush=True)
                elif data["type"] == "message_stop":
                    print("\n[Stream finished]")

stream_raw()

Streaming with Tool Use

When Claude uses tools during streaming, the event flow changes. Instead of text_delta, you receive tool_use blocks with partial JSON fragments.

🔗 See also: Claude Tool Use: Complete Guide


Handling Tool Use in Streaming

import anthropic
import json

client = anthropic.Anthropic()

tools = [
    {
        "name": "get_weather",
        "description": "Gets the current weather for a given city.",
        "input_schema": {
            "type": "object",
            "properties": {
                "city": {"type": "string", "description": "City name"},
                "unit": {"type": "string", "enum": ["celsius", "fahrenheit"], "default": "celsius"}
            },
            "required": ["city"]
        }
    }
]

# Streaming with tools
with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    tools=tools,
    messages=[{"role": "user", "content": "What's the weather in Paris?"}]
) as stream:
    current_tool = None
    tool_input_json = ""

    for event in stream:
        if event.type == "content_block_start":
            if hasattr(event.content_block, "type") and event.content_block.type == "tool_use":
                current_tool = event.content_block.name
                tool_input_json = ""
                print(f"\n🔧 Tool call: {current_tool}")

        elif event.type == "content_block_delta":
            if hasattr(event.delta, "type"):
                if event.delta.type == "text_delta":
                    print(event.delta.text, end="", flush=True)
                elif event.delta.type == "input_json_delta":
                    tool_input_json += event.delta.partial_json

        elif event.type == "content_block_stop":
            if current_tool:
                # Guard against an empty buffer (tool called with no arguments)
                params = json.loads(tool_input_json) if tool_input_json else {}
                print(f"\n   Parameters: {params}")
                current_tool = None

Accumulating Partial JSON

The input_json_delta sends JSON fragments that must be accumulated:

# Fragments received sequentially:
# {"partial_json": "{\"ci"}
# {"partial_json": "ty\":"}
# {"partial_json": " \"Par"}
# {"partial_json": "is\"}"}

# After concatenation → {"city": "Paris"}
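The accumulation logic itself is simple to sketch; the key point is to parse only after content_block_stop, since intermediate buffers are not valid JSON:

```python
import json

# The four fragments from the example above, received in order
fragments = ['{"ci', 'ty":', ' "Par', 'is"}']

buffer = ""
for fragment in fragments:
    buffer += fragment  # intermediate states are NOT valid JSON, so don't parse yet

# Parse only once content_block_stop signals the block is complete
params = json.loads(buffer)
print(params)  # {'city': 'Paris'}
```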

Streaming with Extended Thinking

When you enable extended thinking, Claude emits a new block type: thinking. Thinking tokens arrive via thinking_delta before the response tokens.

🔗 See also: Claude Extended Thinking: Complete Guide

import anthropic

client = anthropic.Anthropic()

# Streaming with extended thinking
with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=16000,
    thinking={
        "type": "enabled",
        "budget_tokens": 10000
    },
    messages=[{"role": "user", "content": "Solve this: if f(x) = x³ - 6x² + 11x - 6, find all roots."}]
) as stream:
    current_block_type = None

    for event in stream:
        if event.type == "content_block_start":
            block_type = event.content_block.type
            if block_type == "thinking":
                current_block_type = "thinking"
                print("💭 Thinking in progress...")
            elif block_type == "text":
                current_block_type = "text"
                print("\n📝 Response:")

        elif event.type == "content_block_delta":
            if hasattr(event.delta, "type"):
                if event.delta.type == "thinking_delta":
                    # Show or hide thinking based on desired UX
                    print(event.delta.thinking, end="", flush=True)
                elif event.delta.type == "text_delta":
                    print(event.delta.text, end="", flush=True)

Event Flow with Thinking

event: message_start
event: content_block_start   → {"type": "thinking", "thinking": ""}
event: content_block_delta   → {"type": "thinking_delta", "thinking": "Let's analyze..."}
event: content_block_delta   → {"type": "thinking_delta", "thinking": "f(x) = (x-1)..."}
event: content_block_stop
event: content_block_start   → {"type": "text", "text": ""}
event: content_block_delta   → {"type": "text_delta", "text": "The roots are..."}
event: content_block_stop
event: message_delta
event: message_stop

Token Counting During Streaming

Streaming lets you track token consumption in real time:

with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Explain SSE streaming."}]
) as stream:
    for event in stream:
        pass  # Consume the stream

    # Access final statistics
    final = stream.get_final_message()
    usage = final.usage

    print(f"Input tokens  : {usage.input_tokens}")
    print(f"Output tokens : {usage.output_tokens}")

    # With prompt caching, cache token counts also appear in usage
    if hasattr(usage, "cache_creation_input_tokens"):
        print(f"Cache creation tokens: {usage.cache_creation_input_tokens}")
    if hasattr(usage, "cache_read_input_tokens"):
        print(f"Cache read tokens    : {usage.cache_read_input_tokens}")

The message_delta event (before message_stop) contains the stop_reason and final output_tokens, enabling consumption tracking during the stream.
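As a sketch of that tracking logic against the raw event shapes shown earlier (simulated dict events here, no live API call; the exact field layout is an assumption matching the raw HTTP examples above):

```python
def track_usage(events):
    """Pull the final output-token count and stop reason from raw stream events."""
    output_tokens = 0
    stop_reason = None
    for ev in events:
        if ev["type"] == "message_delta":
            # message_delta carries the final usage and the stop_reason
            output_tokens = ev["usage"]["output_tokens"]
            stop_reason = ev["delta"]["stop_reason"]
    return output_tokens, stop_reason

# Simulated tail of a stream
events = [
    {"type": "content_block_stop", "index": 0},
    {"type": "message_delta",
     "delta": {"stop_reason": "end_turn", "stop_sequence": None},
     "usage": {"output_tokens": 42}},
    {"type": "message_stop"},
]
print(track_usage(events))  # (42, 'end_turn')
```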


Error Handling and Reconnection

Streaming Error Types

| Code | Error | Recommended action |
| --- | --- | --- |
| 429 | rate_limit_error | Retry with exponential backoff |
| 529 | overloaded_error | Retry after 30-60 seconds |
| 408 | Connection timeout | Reconnect immediately |
| n/a | Network interruption | Retry with full context |

Robust Reconnection Pattern

import anthropic
import time

def stream_with_retry(messages, max_retries=3):
    client = anthropic.Anthropic()
    accumulated_text = ""

    for attempt in range(max_retries):
        try:
            with client.messages.stream(
                model="claude-sonnet-4-20250514",
                max_tokens=4096,
                messages=messages,
            ) as stream:
                for text in stream.text_stream:
                    accumulated_text += text
                    print(text, end="", flush=True)

            # Stream completed successfully
            return accumulated_text

        except anthropic.APIStatusError as e:
            if e.status_code == 429:
                wait = 2 ** attempt * 5  # 5s, 10s, 20s
                print(f"\nโณ Rate limit โ€” retrying in {wait}s...")
                time.sleep(wait)
            elif e.status_code >= 500:
                wait = 2 ** attempt * 10
                print(f"\n⚠️ Server error - retrying in {wait}s...")
                time.sleep(wait)
            else:
                raise  # Client error, don't retry

        except anthropic.APIConnectionError:
            wait = 2 ** attempt * 3
            print(f"\n🔌 Connection lost - retrying in {wait}s...")
            time.sleep(wait)

    raise RuntimeError("Stream failed after all retries")

Handling Event-Level Errors

with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=messages,
) as stream:
    for event in stream:
        if event.type == "error":
            error = event.error
            print(f"Stream error: {error.type} - {error.message}")
            if error.type == "overloaded_error":
                # Server is overloaded, stream will terminate
                break
        elif event.type == "content_block_delta":
            if hasattr(event.delta, "text"):
                print(event.delta.text, end="", flush=True)

Python SDK Helpers

The SDK offers high-level helpers to simplify streaming:

Custom Event Handlers

import anthropic

client = anthropic.Anthropic()

# Use high-level events
with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Tell a short story."}]
) as stream:
    # text_stream: iterator over text fragments only
    for text in stream.text_stream:
        print(text, end="", flush=True)

# Collect all text at once
with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Summarize SSE streaming."}]
) as stream:
    # get_final_text() waits for completion and returns the full text
    full_text = stream.get_final_text()
    print(full_text)

    # Access the complete message with metadata (while the stream is still open)
    final_message = stream.get_final_message()

print(f"Stop reason: {final_message.stop_reason}")
print(f"Usage: {final_message.usage}")

Async Streaming with FastAPI

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import anthropic

app = FastAPI()
client = anthropic.AsyncAnthropic()

@app.post("/chat")
async def chat(user_message: str):
    async def generate():
        async with client.messages.stream(
            model="claude-sonnet-4-20250514",
            max_tokens=2048,
            messages=[{"role": "user", "content": user_message}]
        ) as stream:
            async for text in stream.text_stream:
                # Caution: tokens can contain newlines, which would break this
                # naive SSE framing; JSON-encode each chunk in production
                yield f"data: {text}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(generate(), media_type="text/event-stream")
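On the consuming side, the data: framing this endpoint emits can be decoded with a small helper. A sketch using a simulated chunk rather than a running server (the /chat endpoint and its framing come from the example above):

```python
def extract_tokens(chunk: str) -> list[str]:
    """Pull token payloads out of the 'data: ...' framing emitted by /chat."""
    tokens = []
    for line in chunk.split("\n"):
        # Keep payload lines, skip blanks and the [DONE] sentinel
        if line.startswith("data: ") and line != "data: [DONE]":
            tokens.append(line[len("data: "):])
    return tokens

# A simulated chunk as the endpoint above would emit it
chunk = "data: Hel\n\ndata: lo!\n\ndata: [DONE]\n\n"
print(extract_tokens(chunk))  # ['Hel', 'lo!']
```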

Pattern: Real-Time Chat

Here's a complete pattern for building a real-time chat with streaming:

import anthropic
from dataclasses import dataclass, field

@dataclass
class ChatSession:
    client: anthropic.Anthropic = field(default_factory=anthropic.Anthropic)
    messages: list = field(default_factory=list)
    model: str = "claude-sonnet-4-20250514"

    def send(self, user_input: str) -> str:
        """Send a message and stream the response."""
        self.messages.append({"role": "user", "content": user_input})

        full_response = ""
        with self.client.messages.stream(
            model=self.model,
            max_tokens=4096,
            messages=self.messages,
        ) as stream:
            for text in stream.text_stream:
                full_response += text
                print(text, end="", flush=True)

        print()  # New line after stream
        self.messages.append({"role": "assistant", "content": full_response})
        return full_response

# Usage
chat = ChatSession()
chat.send("Hello! What is SSE streaming?")
chat.send("Can you give a Python example?")

Frontend Side (JavaScript)

async function streamChat(message, onToken) {
  const response = await fetch('/api/chat', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ message }),
  });

  const reader = response.body.getReader();
  const decoder = new TextDecoder();

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    const chunk = decoder.decode(value, { stream: true });
    const lines = chunk.split('\n');

    for (const line of lines) {
      if (line.startsWith('data: ') && line !== 'data: [DONE]') {
        onToken(line.slice(6));
      }
    }
  }
}

// Usage with React
streamChat("Hello Claude", (token) => {
  setResponse(prev => prev + token);
});

🔗 See also: AI Agent Architecture Patterns with Claude for multi-step patterns with streaming.


Best Practices

  1. Always set a timeout: configure a client-side timeout to avoid hanging connections.

  2. Smart buffering: for web interfaces, accumulate a few tokens before updating the DOM (prevents excessive reflows).

  3. Clean cancellation: close the stream with stream.close() or exit the context manager to release resources.

  4. Token monitoring: track input_tokens and output_tokens to control costs in production.

  5. Conditional streaming: only use streaming when the UX justifies it; for background batch calls, a complete response is simpler.
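The smart-buffering practice can be sketched as a simple batching generator. This count-based version is illustrative only; production code might also flush on a timer:

```python
def buffered_stream(tokens, batch_size=8):
    """Yield joined batches of tokens so the UI updates once per batch, not per token."""
    buffer = []
    for token in tokens:
        buffer.append(token)
        if len(buffer) >= batch_size:
            yield "".join(buffer)
            buffer.clear()
    if buffer:  # flush the tail
        yield "".join(buffer)

# Simulate a stream of single-character tokens
tokens = list("Streaming responses token by token")
batches = list(buffered_stream(tokens, batch_size=10))
print(len(batches))      # 4 UI updates instead of 34
print("".join(batches))  # original text preserved
```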

🔗 See also: Claude Structured Outputs: Complete Guide for combining streaming with structured outputs.


FAQ

How do I enable streaming with the Claude API? Add stream: true to your API request or use client.messages.stream() with the Python SDK. The API returns SSE events instead of a single JSON response.

What SSE event types does Claude return? The main events are: message_start, content_block_start, content_block_delta (with text_delta, input_json_delta or thinking_delta), content_block_stop, message_delta and message_stop.

Does streaming work with Claude's tool use? Yes. Tool parameters arrive via content_block_delta with input_json_delta containing partial JSON to accumulate.

How do I handle errors and reconnections during streaming? The Python SDK handles retries automatically with exponential backoff. For fine-grained control, catch APIStatusError (429, 5xx) and APIConnectionError, then retry with increasing delay.


