Skip to content

Streaming Responses

Waiting 4 seconds for a complete LLM response feels slow. Streaming the same response token-by-token, starting in 400ms, feels fast — even though the total time is the same. Streaming changes perceived latency from "time to last token" to "time to first token," which is typically 10x shorter.

Learning objectives

  • Implement Server-Sent Events (SSE) streaming in FastAPI
  • Handle streaming from both OpenAI and Anthropic APIs
  • Stream LangChain chain outputs
  • Test streaming endpoints with httpx
  • Handle stream errors and cleanup gracefully

SSE streaming with FastAPI

# streaming_app.py
import os
import json
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from openai import OpenAI
import asyncio

app = FastAPI()  # application instance the @app.post route below is registered on
# The API key is read from the OPENAI_API_KEY environment variable;
# os.getenv returns None when that variable is unset.
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

class ChatRequest(BaseModel):
    # Request body accepted by POST /chat/stream.
    # (Plain comments are used instead of a docstring so the generated
    # schema description stays unchanged.)

    # User message, sent to the model as the "user" turn.
    message: str
    # System prompt, sent as the "system" turn; optional for callers.
    system_prompt: str = "You are a helpful assistant."

async def stream_openai_response(message: str, system: str):
    """Yield SSE-formatted chunks of an OpenAI chat completion.

    Every event is a ``data: {json}`` line followed by a blank line. Content
    events carry ``{"token": <text>, "done": false}``. The stream always ends
    with an event whose ``done`` is true: either the normal terminator (empty
    ``token``) or ``{"error": <message>, "done": true}`` if the request or the
    stream failed.

    Args:
        message: User message sent to the model.
        system: System prompt sent ahead of the user message.

    Yields:
        SSE ``data:`` frames as strings.
    """
    # Local import keeps this block self-contained; both helpers ship with
    # FastAPI (re-exported from Starlette).
    from fastapi.concurrency import iterate_in_threadpool, run_in_threadpool

    stream = None
    try:
        # `client` is the synchronous OpenAI client, so sending the request and
        # reading each chunk are blocking calls. Running them in the threadpool
        # keeps the event loop free to serve other requests in the meantime.
        stream = await run_in_threadpool(
            client.chat.completions.create,
            model="gpt-4o-mini",
            messages=[
                {"role": "system", "content": system},
                {"role": "user", "content": message},
            ],
            stream=True,
            max_tokens=500,
        )

        async for chunk in iterate_in_threadpool(stream):
            # A chunk may arrive with an empty `choices` list; indexing [0]
            # unconditionally would raise IndexError and abort the stream.
            if not chunk.choices:
                continue
            delta = chunk.choices[0].delta
            if delta.content:
                # SSE format: "data: {json}\n\n"
                data = json.dumps({"token": delta.content, "done": False})
                yield f"data: {data}\n\n"

        # Signal completion
        yield f"data: {json.dumps({'token': '', 'done': True})}\n\n"

    except Exception as e:
        # Headers are already sent, so the failure has to travel in-band.
        error_data = json.dumps({"error": str(e), "done": True})
        yield f"data: {error_data}\n\n"
    finally:
        # Runs on normal completion, on error, and when the consumer stops
        # early (client disconnect), so the upstream connection is released.
        if stream is not None:
            stream.close()

@app.post("/chat/stream")
async def chat_stream(request: ChatRequest):
    # Streams the model's reply to the posted message as Server-Sent Events.
    # no-cache stops intermediaries from caching the event stream;
    # X-Accel-Buffering: no disables nginx buffering.
    sse_headers = {"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}
    token_events = stream_openai_response(request.message, request.system_prompt)
    return StreamingResponse(
        token_events,
        headers=sse_headers,
        media_type="text/event-stream",
    )

# Run: uvicorn streaming_app:app --reload

Streaming with metadata events

import os
import json
import time
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import OpenAI

app = FastAPI()  # application instance the @app.post route below is registered on
# The API key is read from the OPENAI_API_KEY environment variable;
# os.getenv returns None when that variable is unset.
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

async def stream_with_metadata(message: str):
    """Stream tokens with metadata events at start and end.

    Event sequence (each a ``data: {json}`` line followed by a blank line):

    * ``{"event": "start", "model": ...}`` once, before the request is sent;
    * ``{"event": "token", "token": <text>}`` for every content chunk;
    * ``{"event": "done", "tokens": <n>, "latency_ms": <ms>}`` on success, or
      ``{"event": "error", "message": <text>}`` if the request or stream fails.

    ``tokens`` is the number of content chunks received, and ``latency_ms`` is
    measured from the moment the generator starts running.

    Args:
        message: User message sent to the model.

    Yields:
        SSE ``data:`` frames as strings.
    """
    # Local import keeps this block self-contained; both helpers ship with
    # FastAPI (re-exported from Starlette).
    from fastapi.concurrency import iterate_in_threadpool, run_in_threadpool

    start_time = time.perf_counter()

    # Send start event with request metadata
    yield f"data: {json.dumps({'event': 'start', 'model': 'gpt-4o-mini'})}\n\n"

    token_count = 0
    stream = None
    try:
        # `client` is the synchronous OpenAI client, so sending the request and
        # reading each chunk are blocking calls. Running them in the threadpool
        # keeps the event loop free to serve other requests in the meantime.
        stream = await run_in_threadpool(
            client.chat.completions.create,
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": message}],
            stream=True,
            max_tokens=300,
        )

        async for chunk in iterate_in_threadpool(stream):
            # A chunk may arrive with an empty `choices` list; indexing [0]
            # unconditionally would raise IndexError and abort the stream.
            if not chunk.choices:
                continue
            token = chunk.choices[0].delta.content
            if token:
                token_count += 1
                yield f"data: {json.dumps({'event': 'token', 'token': token})}\n\n"

        # Send completion event with metrics
        latency_ms = (time.perf_counter() - start_time) * 1000
        yield f"data: {json.dumps({'event': 'done', 'tokens': token_count, 'latency_ms': round(latency_ms, 0)})}\n\n"

    except Exception as e:
        # Headers are already sent, so the failure has to travel in-band.
        yield f"data: {json.dumps({'event': 'error', 'message': str(e)})}\n\n"
    finally:
        # Runs on normal completion, on error, and when the consumer stops
        # early (client disconnect), so the upstream connection is released.
        if stream is not None:
            stream.close()

@app.post("/chat/stream-meta")
async def chat_stream_meta(request: dict):
    # Streams start/token/done events for the posted message as SSE.
    # The body is a free-form JSON object; a missing "message" key is
    # treated as an empty message.
    user_message = request.get("message", "")
    sse_headers = {"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}
    return StreamingResponse(
        stream_with_metadata(user_message),
        headers=sse_headers,
        media_type="text/event-stream",
    )

Streaming from Anthropic

import os
import json
import anthropic
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

app = FastAPI()  # application instance the @app.post route below is registered on
# The API key is read from the ANTHROPIC_API_KEY environment variable;
# os.getenv returns None when that variable is unset.
ac_client = anthropic.Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))

class AnthropicRequest(BaseModel):
    # Request body accepted by POST /anthropic/stream.
    # (Plain comments are used instead of a docstring so the generated
    # schema description stays unchanged.)

    # User message, sent to the model as the "user" turn.
    message: str
    # System prompt passed as the `system` argument; optional for callers.
    system: str = "You are a helpful assistant."

async def stream_anthropic(message: str, system: str):
    """Yield SSE-formatted text deltas of an Anthropic message stream.

    Every event is a ``data: {json}`` line followed by a blank line. Content
    events carry ``{"token": <text>, "done": false}``. The stream always ends
    with an event whose ``done`` is true: either the normal terminator (empty
    ``token``) or ``{"error": <message>, "done": true}`` if the request or the
    stream failed.

    Args:
        message: User message sent to the model.
        system: System prompt for the conversation.

    Yields:
        SSE ``data:`` frames as strings.
    """
    # Local imports keep this block self-contained; the concurrency helpers
    # ship with FastAPI (re-exported from Starlette).
    import contextlib

    from fastapi.concurrency import iterate_in_threadpool, run_in_threadpool

    try:
        manager = ac_client.messages.stream(
            model="claude-haiku-4-5-20251001",
            max_tokens=500,
            system=system,
            messages=[{"role": "user", "content": message}],
        )
        # `ac_client` is the synchronous client: entering the stream manager
        # and reading each text delta are blocking calls. ExitStack lets the
        # blocking __enter__ run in the threadpool while still guaranteeing
        # __exit__ (which releases the stream) on success, error, or early
        # close by the consumer.
        with contextlib.ExitStack() as stack:
            stream = await run_in_threadpool(stack.enter_context, manager)
            async for text in iterate_in_threadpool(stream.text_stream):
                yield f"data: {json.dumps({'token': text, 'done': False})}\n\n"
        yield f"data: {json.dumps({'token': '', 'done': True})}\n\n"
    except Exception as e:
        # Headers are already sent, so the failure has to travel in-band;
        # same error shape as the OpenAI token/done stream.
        yield f"data: {json.dumps({'error': str(e), 'done': True})}\n\n"

@app.post("/anthropic/stream")
async def anthropic_stream(request: AnthropicRequest):
    """Stream an Anthropic reply to the posted message as Server-Sent Events."""
    return StreamingResponse(
        stream_anthropic(request.message, request.system),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            # Without this, nginx buffers the response and the client gets
            # everything at once instead of token by token.
            "X-Accel-Buffering": "no",
        },
    )

LangChain streaming

import os
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage
import json

app = FastAPI()  # application instance the @app.post route below is registered on
# Chat model shared by all requests. The API key is read from the
# OPENAI_API_KEY environment variable; os.getenv returns None when unset.
llm = ChatOpenAI(model="gpt-4o-mini", streaming=True, api_key=os.getenv("OPENAI_API_KEY"))

async def stream_langchain(message: str):
    """Yield SSE-formatted chunks of a LangChain chat model response.

    Every event is a ``data: {json}`` line followed by a blank line. Content
    events carry ``{"token": <content>}``. The stream ends with
    ``{"done": true}`` on success, or ``{"error": <message>, "done": true}``
    if the model call fails.

    Args:
        message: User message sent to the model after a fixed system prompt.

    Yields:
        SSE ``data:`` frames as strings.
    """
    try:
        async for chunk in llm.astream([
            SystemMessage(content="You are a helpful assistant."),
            HumanMessage(content=message),
        ]):
            # Skip chunks that carry no content.
            if chunk.content:
                yield f"data: {json.dumps({'token': chunk.content})}\n\n"
        yield f"data: {json.dumps({'done': True})}\n\n"
    except Exception as e:
        # Headers are already sent, so the failure has to travel in-band;
        # without this the stream would just stop with no terminal event.
        yield f"data: {json.dumps({'error': str(e), 'done': True})}\n\n"

@app.post("/langchain/stream")
async def lc_stream(request: dict):
    """Stream a LangChain chat response as Server-Sent Events.

    The body is a JSON object; a missing "message" key is treated as an
    empty message.
    """
    return StreamingResponse(
        stream_langchain(request.get("message", "")),
        media_type="text/event-stream",
        headers={
            # Keep intermediaries from caching the event stream.
            "Cache-Control": "no-cache",
            # Without this, nginx buffers the response and the client gets
            # everything at once instead of token by token.
            "X-Accel-Buffering": "no",
        },
    )

Testing streaming endpoints

import httpx
import json

# Test streaming endpoint
def test_stream(url: str, payload: dict) -> None:
    """Consume a streaming SSE endpoint and print each token as it arrives.

    Understands both event shapes used by the example endpoints: the
    ``token``/``done``/``error`` shape and the ``event``-tagged shape
    (``event: token|done|error``).

    Args:
        url: Full URL of the streaming endpoint.
        payload: JSON body to POST.

    Raises:
        httpx.HTTPStatusError: If the server answers with a 4xx/5xx status.
    """
    total_chars = 0
    with httpx.Client(timeout=30.0) as client:
        with client.stream("POST", url, json=payload) as response:
            # Fail loudly on an error status instead of printing nothing.
            response.raise_for_status()
            for line in response.iter_lines():
                # Ignore blank separators and any non-data SSE fields.
                if not line.startswith("data:"):
                    continue
                data = json.loads(line[len("data:"):])

                # An error event is terminal; report it rather than
                # announcing a successful completion.
                error = data.get("error")
                if error is None and data.get("event") == "error":
                    error = data.get("message")
                if error is not None:
                    print(f"\n\nStream failed: {error}")
                    return

                token = data.get("token")
                if token:
                    print(token, end="", flush=True)
                    total_chars += len(token)

                if data.get("done") or data.get("event") == "done":
                    print(f"\n\nStream complete. Total: {total_chars} chars")
                    return

    # The connection closed without a terminal event (truncated stream).
    print(f"\n\nStream ended without a completion event. Total: {total_chars} chars")

# Run against a local server
# test_stream("http://localhost:8000/chat/stream", {"message": "What is machine learning?"})

Disable nginx/proxy buffering for streaming to work

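By default, nginx and many other reverse proxies buffer a response before forwarding it to the client. This defeats streaming: the client receives the complete response all at once even though the server is emitting it token by token. For nginx, either send the X-Accel-Buffering: no response header (as the /chat/stream endpoint above does), which disables buffering for that response, or set proxy_buffering off in the proxy configuration for the streaming route. Other proxies and load balancers have their own equivalent setting.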


01-fastapi-wrappers | 03-async-patterns