Retour aux articles
12 MIN READ

Streaming API Claude : Patterns Temps Réel et SSE

By Learnia AI Research Team

Streaming API Claude : Patterns Temps Réel et SSE

📅 Dernière mise à jour : 10 mars 2026 — Couvre le streaming texte, tool use, extended thinking et les patterns de reconnexion.

🔗 Article pilier : API Claude : Guide Complet


Pourquoi le Streaming est Essentiel

Le streaming transforme fondamentalement l'expérience utilisateur des applications IA :

  • Time To First Byte (TTFB) : le premier token arrive en ~300 ms au lieu d'attendre la réponse complète
  • Feedback progressif : l'utilisateur voit la réponse se construire en temps réel
  • Annulation précoce : possibilité d'interrompre une génération non pertinente
  • Gestion mémoire : pas besoin de stocker la réponse complète en mémoire

Le Protocole Server-Sent Events (SSE)

L'API Claude utilise le protocole Server-Sent Events (SSE) pour le streaming. Contrairement aux WebSockets (bidirectionnels), SSE est un flux unidirectionnel du serveur vers le client via HTTP standard.

Loading diagram…

Chaque événement SSE suit ce format :

event: content_block_delta
data: {"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": "Bonjour"}}

Anatomie des Événements

ÉvénementQuandContenu clé
message_startDébut du messageid, model, usage.input_tokens
content_block_startDébut d'un bloctype (text, tool_use, thinking)
content_block_deltaChaque token/fragmenttext_delta, input_json_delta ou thinking_delta
content_block_stopFin d'un blocIndex du bloc terminé
message_deltaFin du messagestop_reason, usage.output_tokens
message_stopStream terminé

Streaming Texte Basique

Avec le SDK Python (synchrone)

import anthropic

client = anthropic.Anthropic()

# Streaming basique avec context manager
with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[
        {"role": "user", "content": "Explique le protocole SSE en 3 paragraphes."}
    ]
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)

# Récupérer le message final avec les métadonnées
final_message = stream.get_final_message()
print(f"\n\nTokens utilisés : {final_message.usage.input_tokens} in / {final_message.usage.output_tokens} out")

Avec le SDK Python (asynchrone)

import asyncio
import anthropic

async def stream_response():
    client = anthropic.AsyncAnthropic()

    async with client.messages.stream(
        model="claude-sonnet-4-20250514",
        max_tokens=1024,
        messages=[
            {"role": "user", "content": "Quels sont les avantages du streaming ?"}
        ]
    ) as stream:
        async for text in stream.text_stream:
            print(text, end="", flush=True)

    message = await stream.get_final_message()
    print(f"\nTokens: {message.usage.output_tokens}")

asyncio.run(stream_response())

Avec l'API HTTP brute (sans SDK)

import httpx
import json

def stream_raw():
    url = "https://api.anthropic.com/v1/messages"
    headers = {
        "x-api-key": "YOUR_API_KEY",
        "anthropic-version": "2023-06-01",
        "content-type": "application/json",
    }
    payload = {
        "model": "claude-sonnet-4-20250514",
        "max_tokens": 1024,
        "stream": True,
        "messages": [{"role": "user", "content": "Bonjour Claude !"}]
    }

    with httpx.stream("POST", url, json=payload, headers=headers, timeout=60) as response:
        for line in response.iter_lines():
            if line.startswith("data: "):
                data = json.loads(line[6:])
                if data["type"] == "content_block_delta":
                    delta = data["delta"]
                    if delta["type"] == "text_delta":
                        print(delta["text"], end="", flush=True)
                elif data["type"] == "message_stop":
                    print("\n[Stream terminé]")

stream_raw()

Streaming avec Tool Use

Lorsque Claude utilise des outils en streaming, le flux d'événements change. Au lieu de text_delta, vous recevez des blocs tool_use avec des fragments JSON partiels.

🔗 Voir aussi : Claude Tool Use : Guide Complet

Loading diagram…

Gestion du Tool Use en Streaming

import anthropic
import json

client = anthropic.Anthropic()

tools = [
    {
        "name": "get_weather",
        "description": "Obtient la météo actuelle pour une ville donnée.",
        "input_schema": {
            "type": "object",
            "properties": {
                "city": {"type": "string", "description": "Nom de la ville"},
                "unit": {"type": "string", "enum": ["celsius", "fahrenheit"], "default": "celsius"}
            },
            "required": ["city"]
        }
    }
]

# Streaming avec outils
with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    tools=tools,
    messages=[{"role": "user", "content": "Quel temps fait-il à Paris ?"}]
) as stream:
    current_tool = None
    tool_input_json = ""

    for event in stream:
        if event.type == "content_block_start":
            if hasattr(event.content_block, "type") and event.content_block.type == "tool_use":
                current_tool = event.content_block.name
                tool_input_json = ""
                print(f"\n🔧 Appel outil : {current_tool}")

        elif event.type == "content_block_delta":
            if hasattr(event.delta, "type"):
                if event.delta.type == "text_delta":
                    print(event.delta.text, end="", flush=True)
                elif event.delta.type == "input_json_delta":
                    tool_input_json += event.delta.partial_json

        elif event.type == "content_block_stop":
            if current_tool:
                params = json.loads(tool_input_json)
                print(f"\n   Paramètres : {params}")
                current_tool = None

Accumuler le JSON partiel

Le input_json_delta envoie des fragments de JSON qu'il faut accumuler :

# Fragments reçus séquentiellement :
# {"partial_json": "{\"ci"}
# {"partial_json": "ty\":"}
# {"partial_json": " \"Par"}
# {"partial_json": "is\"}"}

# Après concaténation → {"city": "Paris"}

Streaming avec Extended Thinking

Lorsque vous activez l'extended thinking, Claude émet un nouveau type de bloc : thinking. Les tokens de réflexion arrivent via des thinking_delta avant les tokens de réponse.

🔗 Voir aussi : Extended Thinking Claude : Guide Complet

import anthropic

client = anthropic.Anthropic()

# Streaming avec extended thinking
with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=16000,
    thinking={
        "type": "enabled",
        "budget_tokens": 10000
    },
    messages=[{"role": "user", "content": "Résous ce problème : si f(x) = x³ - 6x² + 11x - 6, trouve toutes les racines."}]
) as stream:
    current_block_type = None

    for event in stream:
        if event.type == "content_block_start":
            block_type = event.content_block.type
            if block_type == "thinking":
                current_block_type = "thinking"
                print("💭 Réflexion en cours...")
            elif block_type == "text":
                current_block_type = "text"
                print("\n📝 Réponse :")

        elif event.type == "content_block_delta":
            if hasattr(event.delta, "type"):
                if event.delta.type == "thinking_delta":
                    # Afficher ou masquer la réflexion selon l'UX souhaitée
                    print(event.delta.thinking, end="", flush=True)
                elif event.delta.type == "text_delta":
                    print(event.delta.text, end="", flush=True)

Flux d'événements avec Thinking

event: message_start
event: content_block_start   → {"type": "thinking", "thinking": ""}
event: content_block_delta   → {"type": "thinking_delta", "thinking": "Analysons..."}
event: content_block_delta   → {"type": "thinking_delta", "thinking": "f(x) = (x-1)..."}
event: content_block_stop
event: content_block_start   → {"type": "text", "text": ""}
event: content_block_delta   → {"type": "text_delta", "text": "Les racines sont..."}
event: content_block_stop
event: message_delta
event: message_stop

Comptage de Tokens en Streaming

Le streaming permet de suivre la consommation de tokens en temps réel :

with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Explique le streaming SSE."}]
) as stream:
    for event in stream:
        pass  # Consommer le stream

    # Accéder aux statistiques finales
    final = stream.get_final_message()
    usage = final.usage

    print(f"Tokens d'entrée  : {usage.input_tokens}")
    print(f"Tokens de sortie : {usage.output_tokens}")

    # Avec extended thinking, les tokens de réflexion sont aussi comptés
    if hasattr(usage, "cache_creation_input_tokens"):
        print(f"Tokens cache créés : {usage.cache_creation_input_tokens}")
    if hasattr(usage, "cache_read_input_tokens"):
        print(f"Tokens cache lus   : {usage.cache_read_input_tokens}")

L'événement message_delta (avant message_stop) contient le stop_reason et les output_tokens finaux, permettant un suivi de la consommation pendant le stream.


Gestion d'Erreurs et Reconnexion

Types d'erreurs en streaming

CodeErreurAction recommandée
429overloaded_errorRetry avec backoff exponentiel
529api_error (surcharge)Retry après 30-60 secondes
408Timeout de connexionReconnecter immédiatement
Coupure réseauRetry avec le contexte complet

Pattern de reconnexion robuste

import anthropic
import time

def stream_with_retry(messages, max_retries=3):
    client = anthropic.Anthropic()
    accumulated_text = ""

    for attempt in range(max_retries):
        try:
            with client.messages.stream(
                model="claude-sonnet-4-20250514",
                max_tokens=4096,
                messages=messages,
            ) as stream:
                for text in stream.text_stream:
                    accumulated_text += text
                    print(text, end="", flush=True)

            # Stream terminé avec succès
            return accumulated_text

        except anthropic.APIStatusError as e:
            if e.status_code == 429:
                wait = 2 ** attempt * 5  # 5s, 10s, 20s
                print(f"\n⏳ Rate limit — retry dans {wait}s...")
                time.sleep(wait)
            elif e.status_code >= 500:
                wait = 2 ** attempt * 10
                print(f"\n⚠️ Erreur serveur — retry dans {wait}s...")
                time.sleep(wait)
            else:
                raise  # Erreur client, ne pas retenter

        except anthropic.APIConnectionError:
            wait = 2 ** attempt * 3
            print(f"\n🔌 Connexion perdue — retry dans {wait}s...")
            time.sleep(wait)

    raise Exception("Échec après tous les retries")

Gestion des erreurs côté événement

with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=messages,
) as stream:
    for event in stream:
        if event.type == "error":
            error = event.error
            print(f"Erreur streaming : {error.type} — {error.message}")
            if error.type == "overloaded_error":
                # Le serveur est surchargé, le stream va se terminer
                break
        elif event.type == "content_block_delta":
            if hasattr(event.delta, "text"):
                print(event.delta.text, end="", flush=True)

Helpers du SDK Python

Le SDK offre des helpers de haut niveau pour simplifier le streaming :

Event Handlers personnalisés

import anthropic

client = anthropic.Anthropic()

# Utiliser les événements de haut niveau
with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Raconte une courte histoire."}]
) as stream:
    # text_stream : itérateur sur les fragments de texte uniquement
    for text in stream.text_stream:
        print(text, end="", flush=True)

# Collecter tout le texte d'un coup
with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Résume le streaming SSE."}]
) as stream:
    # get_final_text() attend la fin et retourne le texte complet
    full_text = stream.get_final_text()
    print(full_text)

# Accéder au message complet avec métadonnées
final_message = stream.get_final_message()
print(f"Stop reason: {final_message.stop_reason}")
print(f"Usage: {final_message.usage}")

Streaming asynchrone avec FastAPI

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import anthropic

app = FastAPI()
client = anthropic.AsyncAnthropic()

@app.post("/chat")
async def chat(user_message: str):
    async def generate():
        async with client.messages.stream(
            model="claude-sonnet-4-20250514",
            max_tokens=2048,
            messages=[{"role": "user", "content": user_message}]
        ) as stream:
            async for text in stream.text_stream:
                yield f"data: {text}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(generate(), media_type="text/event-stream")

Pattern : Chat Temps Réel

Voici un pattern complet pour construire un chat temps réel avec streaming :

import anthropic
from dataclasses import dataclass, field

@dataclass
class ChatSession:
    client: anthropic.Anthropic = field(default_factory=anthropic.Anthropic)
    messages: list = field(default_factory=list)
    model: str = "claude-sonnet-4-20250514"

    def send(self, user_input: str) -> str:
        """Envoie un message et stream la réponse."""
        self.messages.append({"role": "user", "content": user_input})

        full_response = ""
        with self.client.messages.stream(
            model=self.model,
            max_tokens=4096,
            messages=self.messages,
        ) as stream:
            for text in stream.text_stream:
                full_response += text
                print(text, end="", flush=True)

        print()  # Nouvelle ligne après le stream
        self.messages.append({"role": "assistant", "content": full_response})
        return full_response

# Utilisation
chat = ChatSession()
chat.send("Bonjour ! Qu'est-ce que le streaming SSE ?")
chat.send("Peux-tu donner un exemple en Python ?")

Côté Frontend (JavaScript)

async function streamChat(message, onToken) {
  const response = await fetch('/api/chat', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ message }),
  });

  const reader = response.body.getReader();
  const decoder = new TextDecoder();

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    const chunk = decoder.decode(value, { stream: true });
    const lines = chunk.split('\n');

    for (const line of lines) {
      if (line.startsWith('data: ') && line !== 'data: [DONE]') {
        onToken(line.slice(6));
      }
    }
  }
}

// Utilisation avec React
streamChat("Bonjour Claude", (token) => {
  setResponse(prev => prev + token);
});

🔗 Voir aussi : Architecture d'Agents IA avec Claude pour les patterns multi-étapes avec streaming.


Bonnes Pratiques

  1. Toujours utiliser un timeout : configurez un timeout côté client pour éviter les connexions suspendues

  2. Buffering intelligent : pour les interfaces web, accumulez quelques tokens avant de mettre à jour le DOM (évite les reflows excessifs)

  3. Annulation propre : fermez le stream avec stream.close() ou sortez du context manager pour libérer les ressources

  4. Monitoring des tokens : suivez input_tokens et output_tokens pour contrôler les coûts en production

  5. Streaming conditionnel : n'utilisez le streaming que quand l'UX le justifie — pour les appels batch en arrière-plan, une réponse complète est plus simple

🔗 Voir aussi : Structured Outputs Claude : Guide Complet pour combiner streaming et sorties structurées.


FAQ

Comment activer le streaming avec l'API Claude ? Ajoutez stream: true à votre requête API ou utilisez client.messages.stream() avec le SDK Python. L'API retourne des événements SSE au lieu d'une réponse JSON unique.

Quels sont les types d'événements SSE retournés par Claude ? Les principaux événements sont : message_start, content_block_start, content_block_delta (avec text_delta, input_json_delta ou thinking_delta), content_block_stop, message_delta et message_stop.

Le streaming fonctionne-t-il avec le tool use de Claude ? Oui. Les paramètres d'outil arrivent via des content_block_delta avec input_json_delta contenant du JSON partiel à accumuler.

Comment gérer les erreurs et reconnexions en streaming ? Le SDK Python gère automatiquement les retries avec backoff exponentiel. Pour un contrôle fin, capturez APIStatusError (429, 5xx) et APIConnectionError, puis retentez avec un délai croissant.


Newsletter

Weekly AI Insights

Tools, techniques & news — curated for AI practitioners. Free, no spam.

Free, no spam. Unsubscribe anytime.

FAQ

Comment activer le streaming avec l'API Claude ?+

Ajoutez le paramètre stream: true à votre requête API. L'API retourne alors des événements Server-Sent Events (SSE) au lieu d'une réponse JSON unique, permettant un affichage token par token.

Quels sont les types d'événements SSE retournés par Claude ?+

Les principaux événements sont : message_start, content_block_start, content_block_delta (avec text_delta ou input_json_delta), content_block_stop, message_delta et message_stop. Avec extended thinking, on reçoit aussi thinking_delta.

Le streaming fonctionne-t-il avec le tool use de Claude ?+

Oui. Lors du tool use en streaming, Claude émet des content_block_start avec type tool_use, puis des content_block_delta avec input_json_delta contenant le JSON partiel des paramètres de l'outil.

Comment gérer les erreurs et reconnexions en streaming ?+

Implémentez un mécanisme de retry avec backoff exponentiel. Capturez les erreurs de connexion, les overloaded_error (429) et les erreurs réseau. Le SDK Python gère automatiquement les retries avec client.messages.create(stream=True).