Streaming API Claude : Patterns Temps Réel et SSE
By Dorian Laurenceau
📅 Dernière révision : 24 avril 2026. Mise à jour avec les retours et observations d'avril 2026.
🔗 Article pilier : API Claude : Guide Complet
Pourquoi le Streaming est Essentiel
Le streaming transforme fondamentalement l'expérience utilisateur des applications IA :
- →Time To First Byte (TTFB) : le premier token arrive en ~300 ms au lieu d'attendre la réponse complète
- →Feedback progressif : l'utilisateur voit la réponse se construire en temps réel
- →Annulation précoce : possibilité d'interrompre une génération non pertinente
- →Gestion mémoire : pas besoin de stocker la réponse complète en mémoire
Le streaming en production : les gotchas dont personne ne vous prévient
Streamer la sortie d'un LLM paraît trivial en démo et devient étonnamment épineux à l'échelle. Les threads sur r/webdev, r/nextjs, r/devops et r/ExperiencedDevs sont où les ingénieurs échangent les pièges.
Ce qui aide vraiment les utilisateurs :
- →La latence perçue. Le premier token en 400ms compte plus que le temps total. La recherche Google sur la performance perçue s'applique directement.
- →L'annulation précoce. Les utilisateurs qui réalisent qu'ils ont posé la mauvaise question peuvent stopper le stream en cours de génération, économisant tokens et argent.
- →Le rendu progressif. Markdown, blocs de code et tables peuvent être rendus progressivement. Les utilisateurs voient la structure se former, pas juste un mur de caractères.
Ce qui casse discrètement en production :
- →Buffering de proxy. Cloudflare et NGINX bufferent tous deux les réponses par défaut, ce qui casse le streaming SSE. Positionnez
proxy_buffering offet lecache-control: no-transformde Cloudflare. - →Framing HTTP/2. Certaines infrastructures re-frament les réponses HTTP/2 de façons qui batchent les chunks. Testez contre la production, pas juste localhost.
- →Cold starts serverless. Vercel Edge Functions, Cloudflare Workers et AWS Lambda Response Streaming ont tous des quirks avec les streams longs.
- →La reconnexion est votre responsabilité. SSE ne reprend pas automatiquement. Trackez le dernier event id et rejouez depuis là si l'utilisateur a un output dépendant du contexte.
- →Comptabilité de tokens à l'annulation. Si un utilisateur annule en cours, vous devez compter les tokens déjà générés. Les stream events d'Anthropic l'exposent ; ne supposez pas un coût zéro à l'annulation.
- →Événements d'erreur en cours de stream. Un stream peut démarrer avec succès, puis erreur à mi-chemin. Ne supposez pas qu'une réponse 200 signifie que tout l'output est valide.
Ce que les équipes de production font vraiment :
- →Utiliser les helpers de streaming du SDK. Les SDKs officiels Python et TypeScript gèrent parsing, agrégation et cas d'erreur. Les parsers écrits à la main ratent les edge cases.
- →Instrumenter time-to-first-token, tokens/sec et taux de complétion. Langfuse, Helicone, LangSmith et Datadog LLM observability supportent tous ça.
- →Fallback gracieux vers non-streaming. Si le streaming échoue (proxy, réseau, CDN), fallback vers non-streaming avec un indicateur « toujours en réflexion ».
- →Budgeter les streams. Plafond max tokens par stream, plafond streams concurrents par utilisateur, timeouts durs. Les streams emballés sont un vrai vecteur DoS.
- →Pour les UIs de chat, utilisez quelque chose comme Vercel AI SDK ou les helpers de streaming LangChain plutôt que de construire de zéro.
Quand le streaming ne vaut pas la peine :
- →Workloads batch. Pas d'utilisateur qui attend ; utilisez message batches pour 50% de remise.
- →Outputs courts et déterministes. Si la réponse entière fait 50 tokens, les utilisateurs ne perçoivent pas le stream.
- →Pipelines qui parsent un output structuré. Streamer du JSON est possible mais fragile ; non-streaming avec validation d'output structuré est habituellement plus sûr.
Le cadrage honnête : le streaming est une victoire UX pour les outils de chat et de coding ; c'est un puzzle d'infrastructure dès que vous scalez. Utilisez les helpers SDK, désactivez le buffering de proxy, instrumentez tout et gardez un fallback non-streaming. Le premier incident de production sur le streaming est presque toujours le proxy.
Le Protocole Server-Sent Events (SSE)
L'API Claude utilise le protocole Server-Sent Events (SSE) pour le streaming. Contrairement aux WebSockets (bidirectionnels), SSE est un flux unidirectionnel du serveur vers le client via HTTP standard.
Chaque événement SSE suit ce format :
event: content_block_delta
data: {"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": "Bonjour"}}
Anatomie des Événements
| Événement | Quand | Contenu clé |
|---|---|---|
message_start | Début du message | id, model, usage.input_tokens |
content_block_start | Début d'un bloc | type (text, tool_use, thinking) |
content_block_delta | Chaque token/fragment | text_delta, input_json_delta ou thinking_delta |
content_block_stop | Fin d'un bloc | Index du bloc terminé |
message_delta | Fin du message | stop_reason, usage.output_tokens |
message_stop | Stream terminé | , |
Streaming Texte Basique
Avec le SDK Python (synchrone)
import anthropic
client = anthropic.Anthropic()
# Streaming basique avec context manager
with client.messages.stream(
model="claude-sonnet-4-20250514",
max_tokens=1024,
messages=[
{"role": "user", "content": "Explique le protocole SSE en 3 paragraphes."}
]
) as stream:
for text in stream.text_stream:
print(text, end="", flush=True)
# Récupérer le message final avec les métadonnées
final_message = stream.get_final_message()
print(f"\n\nTokens utilisés : {final_message.usage.input_tokens} in / {final_message.usage.output_tokens} out")
Avec le SDK Python (asynchrone)
import asyncio
import anthropic
async def stream_response():
client = anthropic.AsyncAnthropic()
async with client.messages.stream(
model="claude-sonnet-4-20250514",
max_tokens=1024,
messages=[
{"role": "user", "content": "Quels sont les avantages du streaming ?"}
]
) as stream:
async for text in stream.text_stream:
print(text, end="", flush=True)
message = await stream.get_final_message()
print(f"\nTokens: {message.usage.output_tokens}")
asyncio.run(stream_response())
Avec l'API HTTP brute (sans SDK)
import httpx
import json
def stream_raw():
url = "https://api.anthropic.com/v1/messages"
headers = {
"x-api-key": "YOUR_API_KEY",
"anthropic-version": "2023-06-01",
"content-type": "application/json",
}
payload = {
"model": "claude-sonnet-4-20250514",
"max_tokens": 1024,
"stream": True,
"messages": [{"role": "user", "content": "Bonjour Claude !"}]
}
with httpx.stream("POST", url, json=payload, headers=headers, timeout=60) as response:
for line in response.iter_lines():
if line.startswith("data: "):
data = json.loads(line[6:])
if data["type"] == "content_block_delta":
delta = data["delta"]
if delta["type"] == "text_delta":
print(delta["text"], end="", flush=True)
elif data["type"] == "message_stop":
print("\n[Stream terminé]")
stream_raw()
Streaming avec Tool Use
Lorsque Claude utilise des outils en streaming, le flux d'événements change. Au lieu de text_delta, vous recevez des blocs tool_use avec des fragments JSON partiels.
🔗 Voir aussi : Claude Tool Use : Guide Complet
Gestion du Tool Use en Streaming
import anthropic
import json
client = anthropic.Anthropic()
tools = [
{
"name": "get_weather",
"description": "Obtient la météo actuelle pour une ville donnée.",
"input_schema": {
"type": "object",
"properties": {
"city": {"type": "string", "description": "Nom de la ville"},
"unit": {"type": "string", "enum": ["celsius", "fahrenheit"], "default": "celsius"}
},
"required": ["city"]
}
}
]
# Streaming avec outils
with client.messages.stream(
model="claude-sonnet-4-20250514",
max_tokens=1024,
tools=tools,
messages=[{"role": "user", "content": "Quel temps fait-il à Paris ?"}]
) as stream:
current_tool = None
tool_input_json = ""
for event in stream:
if event.type == "content_block_start":
if hasattr(event.content_block, "type") and event.content_block.type == "tool_use":
current_tool = event.content_block.name
tool_input_json = ""
print(f"\n🔧 Appel outil : {current_tool}")
elif event.type == "content_block_delta":
if hasattr(event.delta, "type"):
if event.delta.type == "text_delta":
print(event.delta.text, end="", flush=True)
elif event.delta.type == "input_json_delta":
tool_input_json += event.delta.partial_json
elif event.type == "content_block_stop":
if current_tool:
params = json.loads(tool_input_json)
print(f"\n Paramètres : {params}")
current_tool = None
Accumuler le JSON partiel
Le input_json_delta envoie des fragments de JSON qu'il faut accumuler :
# Fragments reçus séquentiellement :
# {"partial_json": "{\"ci"}
# {"partial_json": "ty\":"}
# {"partial_json": " \"Par"}
# {"partial_json": "is\"}"}
# Après concaténation → {"city": "Paris"}
Streaming avec Extended Thinking
Lorsque vous activez l'extended thinking, Claude émet un nouveau type de bloc : thinking. Les tokens de réflexion arrivent via des thinking_delta avant les tokens de réponse.
🔗 Voir aussi : Extended Thinking Claude : Guide Complet
import anthropic
client = anthropic.Anthropic()
# Streaming avec extended thinking
with client.messages.stream(
model="claude-sonnet-4-20250514",
max_tokens=16000,
thinking={
"type": "enabled",
"budget_tokens": 10000
},
messages=[{"role": "user", "content": "Résous ce problème : si f(x) = x³ - 6x² + 11x - 6, trouve toutes les racines."}]
) as stream:
current_block_type = None
for event in stream:
if event.type == "content_block_start":
block_type = event.content_block.type
if block_type == "thinking":
current_block_type = "thinking"
print("💭 Réflexion en cours...")
elif block_type == "text":
current_block_type = "text"
print("\n📝 Réponse :")
elif event.type == "content_block_delta":
if hasattr(event.delta, "type"):
if event.delta.type == "thinking_delta":
# Afficher ou masquer la réflexion selon l'UX souhaitée
print(event.delta.thinking, end="", flush=True)
elif event.delta.type == "text_delta":
print(event.delta.text, end="", flush=True)
Flux d'événements avec Thinking
event: message_start
event: content_block_start → {"type": "thinking", "thinking": ""}
event: content_block_delta → {"type": "thinking_delta", "thinking": "Analysons..."}
event: content_block_delta → {"type": "thinking_delta", "thinking": "f(x) = (x-1)..."}
event: content_block_stop
event: content_block_start → {"type": "text", "text": ""}
event: content_block_delta → {"type": "text_delta", "text": "Les racines sont..."}
event: content_block_stop
event: message_delta
event: message_stop
Comptage de Tokens en Streaming
Le streaming permet de suivre la consommation de tokens en temps réel :
with client.messages.stream(
model="claude-sonnet-4-20250514",
max_tokens=1024,
messages=[{"role": "user", "content": "Explique le streaming SSE."}]
) as stream:
for event in stream:
pass # Consommer le stream
# Accéder aux statistiques finales
final = stream.get_final_message()
usage = final.usage
print(f"Tokens d'entrée : {usage.input_tokens}")
print(f"Tokens de sortie : {usage.output_tokens}")
# Avec extended thinking, les tokens de réflexion sont aussi comptés
if hasattr(usage, "cache_creation_input_tokens"):
print(f"Tokens cache créés : {usage.cache_creation_input_tokens}")
if hasattr(usage, "cache_read_input_tokens"):
print(f"Tokens cache lus : {usage.cache_read_input_tokens}")
L'événement message_delta (avant message_stop) contient le stop_reason et les output_tokens finaux, permettant un suivi de la consommation pendant le stream.
Gestion d'Erreurs et Reconnexion
Types d'erreurs en streaming
| Code | Erreur | Action recommandée |
|---|---|---|
| 429 | overloaded_error | Retry avec backoff exponentiel |
| 529 | api_error (surcharge) | Retry après 30-60 secondes |
| 408 | Timeout de connexion | Reconnecter immédiatement |
| , | Coupure réseau | Retry avec le contexte complet |
Pattern de reconnexion robuste
import anthropic
import time
def stream_with_retry(messages, max_retries=3):
client = anthropic.Anthropic()
accumulated_text = ""
for attempt in range(max_retries):
try:
with client.messages.stream(
model="claude-sonnet-4-20250514",
max_tokens=4096,
messages=messages,
) as stream:
for text in stream.text_stream:
accumulated_text += text
print(text, end="", flush=True)
# Stream terminé avec succès
return accumulated_text
except anthropic.APIStatusError as e:
if e.status_code == 429:
wait = 2 ** attempt * 5 # 5s, 10s, 20s
print(f"\n⏳ Rate limit — retry dans {wait}s...")
time.sleep(wait)
elif e.status_code >= 500:
wait = 2 ** attempt * 10
print(f"\n⚠️ Erreur serveur — retry dans {wait}s...")
time.sleep(wait)
else:
raise # Erreur client, ne pas retenter
except anthropic.APIConnectionError:
wait = 2 ** attempt * 3
print(f"\n🔌 Connexion perdue — retry dans {wait}s...")
time.sleep(wait)
raise Exception("Échec après tous les retries")
Gestion des erreurs côté événement
with client.messages.stream(
model="claude-sonnet-4-20250514",
max_tokens=1024,
messages=messages,
) as stream:
for event in stream:
if event.type == "error":
error = event.error
print(f"Erreur streaming : {error.type} — {error.message}")
if error.type == "overloaded_error":
# Le serveur est surchargé, le stream va se terminer
break
elif event.type == "content_block_delta":
if hasattr(event.delta, "text"):
print(event.delta.text, end="", flush=True)
Helpers du SDK Python
Le SDK offre des helpers de haut niveau pour simplifier le streaming :
Event Handlers personnalisés
import anthropic
client = anthropic.Anthropic()
# Utiliser les événements de haut niveau
with client.messages.stream(
model="claude-sonnet-4-20250514",
max_tokens=1024,
messages=[{"role": "user", "content": "Raconte une courte histoire."}]
) as stream:
# text_stream : itérateur sur les fragments de texte uniquement
for text in stream.text_stream:
print(text, end="", flush=True)
# Collecter tout le texte d'un coup
with client.messages.stream(
model="claude-sonnet-4-20250514",
max_tokens=1024,
messages=[{"role": "user", "content": "Résume le streaming SSE."}]
) as stream:
# get_final_text() attend la fin et retourne le texte complet
full_text = stream.get_final_text()
print(full_text)
# Accéder au message complet avec métadonnées
final_message = stream.get_final_message()
print(f"Stop reason: {final_message.stop_reason}")
print(f"Usage: {final_message.usage}")
Streaming asynchrone avec FastAPI
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import anthropic
app = FastAPI()
client = anthropic.AsyncAnthropic()
@app.post("/chat")
async def chat(user_message: str):
async def generate():
async with client.messages.stream(
model="claude-sonnet-4-20250514",
max_tokens=2048,
messages=[{"role": "user", "content": user_message}]
) as stream:
async for text in stream.text_stream:
yield f"data: {text}\n\n"
yield "data: [DONE]\n\n"
return StreamingResponse(generate(), media_type="text/event-stream")
Pattern : Chat Temps Réel
Voici un pattern complet pour construire un chat temps réel avec streaming :
import anthropic
from dataclasses import dataclass, field
@dataclass
class ChatSession:
client: anthropic.Anthropic = field(default_factory=anthropic.Anthropic)
messages: list = field(default_factory=list)
model: str = "claude-sonnet-4-20250514"
def send(self, user_input: str) -> str:
"""Envoie un message et stream la réponse."""
self.messages.append({"role": "user", "content": user_input})
full_response = ""
with self.client.messages.stream(
model=self.model,
max_tokens=4096,
messages=self.messages,
) as stream:
for text in stream.text_stream:
full_response += text
print(text, end="", flush=True)
print() # Nouvelle ligne après le stream
self.messages.append({"role": "assistant", "content": full_response})
return full_response
# Utilisation
chat = ChatSession()
chat.send("Bonjour ! Qu'est-ce que le streaming SSE ?")
chat.send("Peux-tu donner un exemple en Python ?")
Côté Frontend (JavaScript)
async function streamChat(message, onToken) {
const response = await fetch('/api/chat', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ message }),
});
const reader = response.body.getReader();
const decoder = new TextDecoder();
while (true) {
const { done, value } = await reader.read();
if (done) break;
const chunk = decoder.decode(value, { stream: true });
const lines = chunk.split('\n');
for (const line of lines) {
if (line.startsWith('data: ') && line !== 'data: [DONE]') {
onToken(line.slice(6));
}
}
}
}
// Utilisation avec React
streamChat("Bonjour Claude", (token) => {
setResponse(prev => prev + token);
});
🔗 Voir aussi : Architecture d'Agents IA avec Claude pour les patterns multi-étapes avec streaming.
Conseils pratiques
- →
Toujours utiliser un timeout : configurez un timeout côté client pour éviter les connexions suspendues
- →
Buffering intelligent : pour les interfaces web, accumulez quelques tokens avant de mettre à jour le DOM (évite les reflows excessifs)
- →
Annulation propre : fermez le stream avec
stream.close()ou sortez du context manager pour libérer les ressources - →
Monitoring des tokens : suivez
input_tokensetoutput_tokenspour contrôler les coûts en production - →
Streaming conditionnel : n'utilisez le streaming que quand l'UX le justifie, pour les appels batch en arrière-plan, une réponse complète est plus simple
🔗 Voir aussi : Structured Outputs Claude : Guide Complet pour combiner streaming et sorties structurées.
FAQ
Comment activer le streaming avec l'API Claude ?
Ajoutez stream: true à votre requête API ou utilisez client.messages.stream() avec le SDK Python. L'API retourne des événements SSE au lieu d'une réponse JSON unique.
Quels sont les types d'événements SSE retournés par Claude ?
Les principaux événements sont : message_start, content_block_start, content_block_delta (avec text_delta, input_json_delta ou thinking_delta), content_block_stop, message_delta et message_stop.
Le streaming fonctionne-t-il avec le tool use de Claude ?
Oui. Les paramètres d'outil arrivent via des content_block_delta avec input_json_delta contenant du JSON partiel à accumuler.
Comment gérer les erreurs et reconnexions en streaming ?
Le SDK Python gère automatiquement les retries avec backoff exponentiel. Pour un contrôle fin, capturez APIStatusError (429, 5xx) et APIConnectionError, puis retentez avec un délai croissant.
Dorian Laurenceau
Full-Stack Developer & Learning DesignerFull-stack web developer and learning designer. I spent 4 years as a freelance full-stack developer and 4 years teaching React, JavaScript, HTML/CSS and WordPress to adult learners. Today I design learning paths in web development and AI, grounded in learning science. I founded learn-prompting.fr to make AI practical and accessible, and built the Bluff app to gamify political transparency.
Weekly AI Insights
Tools, techniques & news — curated for AI practitioners. Free, no spam.
Free, no spam. Unsubscribe anytime.
→Related Articles
FAQ
Comment activer le streaming avec l'API Claude ?+
Ajoutez le paramètre stream: true à votre requête API. L'API retourne alors des événements Server-Sent Events (SSE) au lieu d'une réponse JSON unique, permettant un affichage token par token.
Quels sont les types d'événements SSE retournés par Claude ?+
Les principaux événements sont : message_start, content_block_start, content_block_delta (avec text_delta ou input_json_delta), content_block_stop, message_delta et message_stop. Avec extended thinking, on reçoit aussi thinking_delta.
Le streaming fonctionne-t-il avec le tool use de Claude ?+
Oui. Lors du tool use en streaming, Claude émet des content_block_start avec type tool_use, puis des content_block_delta avec input_json_delta contenant le JSON partiel des paramètres de l'outil.
Comment gérer les erreurs et reconnexions en streaming ?+
Implémentez un mécanisme de retry avec backoff exponentiel. Capturez les erreurs de connexion, les overloaded_error (429) et les erreurs réseau. Le SDK Python gère automatiquement les retries avec client.messages.create(stream=True).