Streaming API Claude : Patterns Temps Réel et SSE
By Learnia AI Research Team
Streaming API Claude : Patterns Temps Réel et SSE
📅 Dernière mise à jour : 10 mars 2026 — Couvre le streaming texte, tool use, extended thinking et les patterns de reconnexion.
🔗 Article pilier : API Claude : Guide Complet
Pourquoi le Streaming est Essentiel
Le streaming transforme fondamentalement l'expérience utilisateur des applications IA :
- →Time To First Byte (TTFB) : le premier token arrive en ~300 ms au lieu d'attendre la réponse complète
- →Feedback progressif : l'utilisateur voit la réponse se construire en temps réel
- →Annulation précoce : possibilité d'interrompre une génération non pertinente
- →Gestion mémoire : pas besoin de stocker la réponse complète en mémoire
Le Protocole Server-Sent Events (SSE)
L'API Claude utilise le protocole Server-Sent Events (SSE) pour le streaming. Contrairement aux WebSockets (bidirectionnels), SSE est un flux unidirectionnel du serveur vers le client via HTTP standard.
Chaque événement SSE suit ce format :
event: content_block_delta
data: {"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": "Bonjour"}}
Anatomie des Événements
| Événement | Quand | Contenu clé |
|---|---|---|
message_start | Début du message | id, model, usage.input_tokens |
content_block_start | Début d'un bloc | type (text, tool_use, thinking) |
content_block_delta | Chaque token/fragment | text_delta, input_json_delta ou thinking_delta |
content_block_stop | Fin d'un bloc | Index du bloc terminé |
message_delta | Fin du message | stop_reason, usage.output_tokens |
message_stop | Stream terminé | — |
Streaming Texte Basique
Avec le SDK Python (synchrone)
import anthropic
client = anthropic.Anthropic()
# Streaming basique avec context manager
with client.messages.stream(
model="claude-sonnet-4-20250514",
max_tokens=1024,
messages=[
{"role": "user", "content": "Explique le protocole SSE en 3 paragraphes."}
]
) as stream:
for text in stream.text_stream:
print(text, end="", flush=True)
# Récupérer le message final avec les métadonnées
final_message = stream.get_final_message()
print(f"\n\nTokens utilisés : {final_message.usage.input_tokens} in / {final_message.usage.output_tokens} out")
Avec le SDK Python (asynchrone)
import asyncio
import anthropic
async def stream_response():
client = anthropic.AsyncAnthropic()
async with client.messages.stream(
model="claude-sonnet-4-20250514",
max_tokens=1024,
messages=[
{"role": "user", "content": "Quels sont les avantages du streaming ?"}
]
) as stream:
async for text in stream.text_stream:
print(text, end="", flush=True)
message = await stream.get_final_message()
print(f"\nTokens: {message.usage.output_tokens}")
asyncio.run(stream_response())
Avec l'API HTTP brute (sans SDK)
import httpx
import json
def stream_raw():
url = "https://api.anthropic.com/v1/messages"
headers = {
"x-api-key": "YOUR_API_KEY",
"anthropic-version": "2023-06-01",
"content-type": "application/json",
}
payload = {
"model": "claude-sonnet-4-20250514",
"max_tokens": 1024,
"stream": True,
"messages": [{"role": "user", "content": "Bonjour Claude !"}]
}
with httpx.stream("POST", url, json=payload, headers=headers, timeout=60) as response:
for line in response.iter_lines():
if line.startswith("data: "):
data = json.loads(line[6:])
if data["type"] == "content_block_delta":
delta = data["delta"]
if delta["type"] == "text_delta":
print(delta["text"], end="", flush=True)
elif data["type"] == "message_stop":
print("\n[Stream terminé]")
stream_raw()
Streaming avec Tool Use
Lorsque Claude utilise des outils en streaming, le flux d'événements change. Au lieu de text_delta, vous recevez des blocs tool_use avec des fragments JSON partiels.
🔗 Voir aussi : Claude Tool Use : Guide Complet
Gestion du Tool Use en Streaming
import anthropic
import json
client = anthropic.Anthropic()
tools = [
{
"name": "get_weather",
"description": "Obtient la météo actuelle pour une ville donnée.",
"input_schema": {
"type": "object",
"properties": {
"city": {"type": "string", "description": "Nom de la ville"},
"unit": {"type": "string", "enum": ["celsius", "fahrenheit"], "default": "celsius"}
},
"required": ["city"]
}
}
]
# Streaming avec outils
with client.messages.stream(
model="claude-sonnet-4-20250514",
max_tokens=1024,
tools=tools,
messages=[{"role": "user", "content": "Quel temps fait-il à Paris ?"}]
) as stream:
current_tool = None
tool_input_json = ""
for event in stream:
if event.type == "content_block_start":
if hasattr(event.content_block, "type") and event.content_block.type == "tool_use":
current_tool = event.content_block.name
tool_input_json = ""
print(f"\n🔧 Appel outil : {current_tool}")
elif event.type == "content_block_delta":
if hasattr(event.delta, "type"):
if event.delta.type == "text_delta":
print(event.delta.text, end="", flush=True)
elif event.delta.type == "input_json_delta":
tool_input_json += event.delta.partial_json
elif event.type == "content_block_stop":
if current_tool:
params = json.loads(tool_input_json)
print(f"\n Paramètres : {params}")
current_tool = None
Accumuler le JSON partiel
Le input_json_delta envoie des fragments de JSON qu'il faut accumuler :
# Fragments reçus séquentiellement :
# {"partial_json": "{\"ci"}
# {"partial_json": "ty\":"}
# {"partial_json": " \"Par"}
# {"partial_json": "is\"}"}
# Après concaténation → {"city": "Paris"}
Streaming avec Extended Thinking
Lorsque vous activez l'extended thinking, Claude émet un nouveau type de bloc : thinking. Les tokens de réflexion arrivent via des thinking_delta avant les tokens de réponse.
🔗 Voir aussi : Extended Thinking Claude : Guide Complet
import anthropic
client = anthropic.Anthropic()
# Streaming avec extended thinking
with client.messages.stream(
model="claude-sonnet-4-20250514",
max_tokens=16000,
thinking={
"type": "enabled",
"budget_tokens": 10000
},
messages=[{"role": "user", "content": "Résous ce problème : si f(x) = x³ - 6x² + 11x - 6, trouve toutes les racines."}]
) as stream:
current_block_type = None
for event in stream:
if event.type == "content_block_start":
block_type = event.content_block.type
if block_type == "thinking":
current_block_type = "thinking"
print("💭 Réflexion en cours...")
elif block_type == "text":
current_block_type = "text"
print("\n📝 Réponse :")
elif event.type == "content_block_delta":
if hasattr(event.delta, "type"):
if event.delta.type == "thinking_delta":
# Afficher ou masquer la réflexion selon l'UX souhaitée
print(event.delta.thinking, end="", flush=True)
elif event.delta.type == "text_delta":
print(event.delta.text, end="", flush=True)
Flux d'événements avec Thinking
event: message_start
event: content_block_start → {"type": "thinking", "thinking": ""}
event: content_block_delta → {"type": "thinking_delta", "thinking": "Analysons..."}
event: content_block_delta → {"type": "thinking_delta", "thinking": "f(x) = (x-1)..."}
event: content_block_stop
event: content_block_start → {"type": "text", "text": ""}
event: content_block_delta → {"type": "text_delta", "text": "Les racines sont..."}
event: content_block_stop
event: message_delta
event: message_stop
Comptage de Tokens en Streaming
Le streaming permet de suivre la consommation de tokens en temps réel :
with client.messages.stream(
model="claude-sonnet-4-20250514",
max_tokens=1024,
messages=[{"role": "user", "content": "Explique le streaming SSE."}]
) as stream:
for event in stream:
pass # Consommer le stream
# Accéder aux statistiques finales
final = stream.get_final_message()
usage = final.usage
print(f"Tokens d'entrée : {usage.input_tokens}")
print(f"Tokens de sortie : {usage.output_tokens}")
# Avec extended thinking, les tokens de réflexion sont aussi comptés
if hasattr(usage, "cache_creation_input_tokens"):
print(f"Tokens cache créés : {usage.cache_creation_input_tokens}")
if hasattr(usage, "cache_read_input_tokens"):
print(f"Tokens cache lus : {usage.cache_read_input_tokens}")
L'événement message_delta (avant message_stop) contient le stop_reason et les output_tokens finaux, permettant un suivi de la consommation pendant le stream.
Gestion d'Erreurs et Reconnexion
Types d'erreurs en streaming
| Code | Erreur | Action recommandée |
|---|---|---|
| 429 | overloaded_error | Retry avec backoff exponentiel |
| 529 | api_error (surcharge) | Retry après 30-60 secondes |
| 408 | Timeout de connexion | Reconnecter immédiatement |
| — | Coupure réseau | Retry avec le contexte complet |
Pattern de reconnexion robuste
import anthropic
import time
def stream_with_retry(messages, max_retries=3):
client = anthropic.Anthropic()
accumulated_text = ""
for attempt in range(max_retries):
try:
with client.messages.stream(
model="claude-sonnet-4-20250514",
max_tokens=4096,
messages=messages,
) as stream:
for text in stream.text_stream:
accumulated_text += text
print(text, end="", flush=True)
# Stream terminé avec succès
return accumulated_text
except anthropic.APIStatusError as e:
if e.status_code == 429:
wait = 2 ** attempt * 5 # 5s, 10s, 20s
print(f"\n⏳ Rate limit — retry dans {wait}s...")
time.sleep(wait)
elif e.status_code >= 500:
wait = 2 ** attempt * 10
print(f"\n⚠️ Erreur serveur — retry dans {wait}s...")
time.sleep(wait)
else:
raise # Erreur client, ne pas retenter
except anthropic.APIConnectionError:
wait = 2 ** attempt * 3
print(f"\n🔌 Connexion perdue — retry dans {wait}s...")
time.sleep(wait)
raise Exception("Échec après tous les retries")
Gestion des erreurs côté événement
with client.messages.stream(
model="claude-sonnet-4-20250514",
max_tokens=1024,
messages=messages,
) as stream:
for event in stream:
if event.type == "error":
error = event.error
print(f"Erreur streaming : {error.type} — {error.message}")
if error.type == "overloaded_error":
# Le serveur est surchargé, le stream va se terminer
break
elif event.type == "content_block_delta":
if hasattr(event.delta, "text"):
print(event.delta.text, end="", flush=True)
Helpers du SDK Python
Le SDK offre des helpers de haut niveau pour simplifier le streaming :
Event Handlers personnalisés
import anthropic
client = anthropic.Anthropic()
# Utiliser les événements de haut niveau
with client.messages.stream(
model="claude-sonnet-4-20250514",
max_tokens=1024,
messages=[{"role": "user", "content": "Raconte une courte histoire."}]
) as stream:
# text_stream : itérateur sur les fragments de texte uniquement
for text in stream.text_stream:
print(text, end="", flush=True)
# Collecter tout le texte d'un coup
with client.messages.stream(
model="claude-sonnet-4-20250514",
max_tokens=1024,
messages=[{"role": "user", "content": "Résume le streaming SSE."}]
) as stream:
# get_final_text() attend la fin et retourne le texte complet
full_text = stream.get_final_text()
print(full_text)
# Accéder au message complet avec métadonnées
final_message = stream.get_final_message()
print(f"Stop reason: {final_message.stop_reason}")
print(f"Usage: {final_message.usage}")
Streaming asynchrone avec FastAPI
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import anthropic
app = FastAPI()
client = anthropic.AsyncAnthropic()
@app.post("/chat")
async def chat(user_message: str):
async def generate():
async with client.messages.stream(
model="claude-sonnet-4-20250514",
max_tokens=2048,
messages=[{"role": "user", "content": user_message}]
) as stream:
async for text in stream.text_stream:
yield f"data: {text}\n\n"
yield "data: [DONE]\n\n"
return StreamingResponse(generate(), media_type="text/event-stream")
Pattern : Chat Temps Réel
Voici un pattern complet pour construire un chat temps réel avec streaming :
import anthropic
from dataclasses import dataclass, field
@dataclass
class ChatSession:
client: anthropic.Anthropic = field(default_factory=anthropic.Anthropic)
messages: list = field(default_factory=list)
model: str = "claude-sonnet-4-20250514"
def send(self, user_input: str) -> str:
"""Envoie un message et stream la réponse."""
self.messages.append({"role": "user", "content": user_input})
full_response = ""
with self.client.messages.stream(
model=self.model,
max_tokens=4096,
messages=self.messages,
) as stream:
for text in stream.text_stream:
full_response += text
print(text, end="", flush=True)
print() # Nouvelle ligne après le stream
self.messages.append({"role": "assistant", "content": full_response})
return full_response
# Utilisation
chat = ChatSession()
chat.send("Bonjour ! Qu'est-ce que le streaming SSE ?")
chat.send("Peux-tu donner un exemple en Python ?")
Côté Frontend (JavaScript)
async function streamChat(message, onToken) {
const response = await fetch('/api/chat', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ message }),
});
const reader = response.body.getReader();
const decoder = new TextDecoder();
while (true) {
const { done, value } = await reader.read();
if (done) break;
const chunk = decoder.decode(value, { stream: true });
const lines = chunk.split('\n');
for (const line of lines) {
if (line.startsWith('data: ') && line !== 'data: [DONE]') {
onToken(line.slice(6));
}
}
}
}
// Utilisation avec React
streamChat("Bonjour Claude", (token) => {
setResponse(prev => prev + token);
});
🔗 Voir aussi : Architecture d'Agents IA avec Claude pour les patterns multi-étapes avec streaming.
Bonnes Pratiques
- →
Toujours utiliser un timeout : configurez un timeout côté client pour éviter les connexions suspendues
- →
Buffering intelligent : pour les interfaces web, accumulez quelques tokens avant de mettre à jour le DOM (évite les reflows excessifs)
- →
Annulation propre : fermez le stream avec
stream.close()ou sortez du context manager pour libérer les ressources - →
Monitoring des tokens : suivez
input_tokensetoutput_tokenspour contrôler les coûts en production - →
Streaming conditionnel : n'utilisez le streaming que quand l'UX le justifie — pour les appels batch en arrière-plan, une réponse complète est plus simple
🔗 Voir aussi : Structured Outputs Claude : Guide Complet pour combiner streaming et sorties structurées.
FAQ
Comment activer le streaming avec l'API Claude ?
Ajoutez stream: true à votre requête API ou utilisez client.messages.stream() avec le SDK Python. L'API retourne des événements SSE au lieu d'une réponse JSON unique.
Quels sont les types d'événements SSE retournés par Claude ?
Les principaux événements sont : message_start, content_block_start, content_block_delta (avec text_delta, input_json_delta ou thinking_delta), content_block_stop, message_delta et message_stop.
Le streaming fonctionne-t-il avec le tool use de Claude ?
Oui. Les paramètres d'outil arrivent via des content_block_delta avec input_json_delta contenant du JSON partiel à accumuler.
Comment gérer les erreurs et reconnexions en streaming ?
Le SDK Python gère automatiquement les retries avec backoff exponentiel. Pour un contrôle fin, capturez APIStatusError (429, 5xx) et APIConnectionError, puis retentez avec un délai croissant.
Weekly AI Insights
Tools, techniques & news — curated for AI practitioners. Free, no spam.
Free, no spam. Unsubscribe anytime.
→Related Articles
FAQ
Comment activer le streaming avec l'API Claude ?+
Ajoutez le paramètre stream: true à votre requête API. L'API retourne alors des événements Server-Sent Events (SSE) au lieu d'une réponse JSON unique, permettant un affichage token par token.
Quels sont les types d'événements SSE retournés par Claude ?+
Les principaux événements sont : message_start, content_block_start, content_block_delta (avec text_delta ou input_json_delta), content_block_stop, message_delta et message_stop. Avec extended thinking, on reçoit aussi thinking_delta.
Le streaming fonctionne-t-il avec le tool use de Claude ?+
Oui. Lors du tool use en streaming, Claude émet des content_block_start avec type tool_use, puis des content_block_delta avec input_json_delta contenant le JSON partiel des paramètres de l'outil.
Comment gérer les erreurs et reconnexions en streaming ?+
Implémentez un mécanisme de retry avec backoff exponentiel. Capturez les erreurs de connexion, les overloaded_error (429) et les erreurs réseau. Le SDK Python gère automatiquement les retries avec client.messages.create(stream=True).