Skip to content

Latency Optimization

A 4-second LLM response feels slow. A 400ms response feels fast. The techniques in this note target the four main sources of LLM latency: cold starts, time to first token, total generation time, and sequential API calls that could run in parallel.

Learning objectives

  • Measure latency at the component level (TTFT vs total)
  • Implement prompt caching to reduce repeated prefix costs
  • Use async/batch APIs to eliminate sequential bottlenecks
  • Apply streaming to improve perceived latency
  • Select models based on latency-quality tradeoffs

Measuring latency: TTFT vs total

import os
import time
from openai import OpenAI

# Module-level synchronous client shared by the code below; the API key is
# read from the OPENAI_API_KEY environment variable.
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

def measure_latency(model: str, messages: list[dict], n_runs: int = 3) -> dict:
    """Measure time-to-first-token (TTFT) and total latency via streaming.

    Sends ``n_runs`` streaming chat-completion requests through the
    module-level ``client`` and times each one with ``time.perf_counter``.

    Args:
        model: Model name passed to the chat completions call.
        messages: Chat messages, sent unchanged on every run.
        n_runs: Number of timed requests to average over; must be >= 1.

    Returns:
        A dict with ``model``, ``avg_ttft_ms`` and ``avg_total_ms`` (averages
        over all runs, rounded to whole milliseconds) and
        ``tokens_generated``: the number of content-bearing stream chunks
        seen in the last run (a chunk count, used as a proxy for tokens).

    Raises:
        ValueError: If ``n_runs`` is less than 1.
    """
    if n_runs < 1:
        # Fail early with a clear message instead of a ZeroDivisionError
        # when computing the averages below.
        raise ValueError("n_runs must be at least 1")

    ttfts = []
    totals = []
    tokens = 0

    for _ in range(n_runs):
        start = time.perf_counter()
        first_token_time = None
        tokens = 0

        stream = client.chat.completions.create(
            model=model, messages=messages, stream=True, max_tokens=200,
        )
        for chunk in stream:
            # A stream can contain chunks with an empty `choices` list (for
            # example a usage-only chunk); indexing [0] would raise IndexError.
            if not chunk.choices:
                continue
            if chunk.choices[0].delta.content:
                if first_token_time is None:
                    first_token_time = time.perf_counter()
                tokens += 1

        total_time = time.perf_counter() - start
        # A run that produced no content is recorded with a TTFT of 0 ms.
        ttft = (first_token_time - start) * 1000 if first_token_time is not None else 0
        ttfts.append(ttft)
        totals.append(total_time * 1000)

    return {
        "model": model,
        "avg_ttft_ms": round(sum(ttfts) / n_runs, 0),
        "avg_total_ms": round(sum(totals) / n_runs, 0),
        "tokens_generated": tokens,
    }

# One shared prompt so both models are timed on identical input.
messages = [
    {
        "role": "user",
        "content": "Explain how transformers work in 3 sentences.",
    },
]

# Time each model twice and print one aligned summary row per model.
for model in ("gpt-4o-mini", "gpt-4o"):
    result = measure_latency(model, messages, n_runs=2)
    print(f"{result['model']:<20} TTFT: {result['avg_ttft_ms']:>5.0f}ms | Total: {result['avg_total_ms']:>6.0f}ms")

Prompt caching

OpenAI's prompt caching reduces both cost and latency for repeated prompt prefixes (1024 tokens or longer). It is applied automatically, with no code changes — but you need to structure prompts to maximize cache hits.

import os
from openai import OpenAI

# Module-level synchronous client used by ask_question below; the API key is
# read from the OPENAI_API_KEY environment variable.
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

# Long, stable system prompt — will be cached after the first call.
# The five-line persona is repeated 50 times so the prefix is long enough to
# be eligible for prompt caching (1024-token threshold).
LONG_SYSTEM = (
    "You are an expert Python tutor with 15 years of teaching experience.\n"
    "You follow the Socratic method: you ask guiding questions before providing answers.\n"
    "You write clear, well-commented code examples.\n"
    "You explain concepts with real-world analogies.\n"
    "You provide common mistakes and how to avoid them.\n"
) * 50

def ask_question(question: str) -> dict:
    """Ask one question behind the long static system prompt and report cache usage.

    The request places ``LONG_SYSTEM`` first (the stable, cacheable prefix)
    and the question last (the short, dynamic part).

    Args:
        question: User question sent as the final message.

    Returns:
        A dict with ``answer`` (first 100 characters of the reply, or ``""``
        when the reply has no text content), ``prompt_tokens``,
        ``cached_tokens`` and ``cache_hit_pct`` (cached share of the prompt
        tokens, formatted as a whole-number percentage string).
    """
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": LONG_SYSTEM},  # Long static prefix
            {"role": "user", "content": question},        # Short dynamic part
        ],
    )
    usage = response.usage
    details = getattr(usage, "prompt_tokens_details", None)
    # Any of these fields may be missing or None; treat both as 0 so the
    # percentage below never divides None or by zero.
    cached_tokens = getattr(details, "cached_tokens", 0) or 0
    prompt_tokens = getattr(usage, "prompt_tokens", 0) or 0

    # `content` is None when the message carries no text; slicing None would
    # raise TypeError.
    answer = response.choices[0].message.content or ""

    return {
        "answer": answer[:100],
        "prompt_tokens": prompt_tokens,
        "cached_tokens": cached_tokens,
        "cache_hit_pct": f"{cached_tokens / prompt_tokens * 100:.0f}%" if prompt_tokens else "0%",
    }

# First call — no cache: the long system prompt has not been seen yet, so the
# reported cached-token count is expected to be 0.
r1 = ask_question("What is a list comprehension?")
print(f"Call 1 — Cached: {r1['cached_tokens']}/{r1['prompt_tokens']} ({r1['cache_hit_pct']})")

# Second call — cache hit on the system prompt: same prefix, different
# question, so most prompt tokens are expected to be reported as cached.
r2 = ask_question("What is a generator?")
print(f"Call 2 — Cached: {r2['cached_tokens']}/{r2['prompt_tokens']} ({r2['cache_hit_pct']})")

Cache design rules: 1. Put the longest, most stable content at the beginning of the prompt (the system prompt). 2. Put the dynamic parts (user input, current date, session-specific data) at the end, because the cache only matches an exact prefix. 3. On OpenAI, cached tokens are discounted by 50% (they cost 0.5× the input price).


Async for parallel LLM calls

import os
import asyncio
import time
from openai import AsyncOpenAI

# Module-level asynchronous client used by the coroutines below; the API key
# is read from the OPENAI_API_KEY environment variable.
aclient = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))

async def summarize_async(text: str, topic: str) -> str:
    """Request a one-sentence summary of ``text``, framed around ``topic``.

    Args:
        text: The document text, sent as the user message.
        topic: Subject name interpolated into the system instruction.

    Returns:
        The message content of the first choice in the response.
    """
    instruction = f"Summarize this text about {topic} in one sentence."
    conversation = [
        {"role": "system", "content": instruction},
        {"role": "user", "content": text},
    ]
    completion = await aclient.chat.completions.create(
        model="gpt-4o-mini",
        messages=conversation,
        max_tokens=100,
    )
    first_choice = completion.choices[0]
    return first_choice.message.content

async def sequential_summarize(documents: list[dict]) -> list[str]:
    """Summarize documents one at a time, awaiting each before starting the next.

    Args:
        documents: Dicts with ``"text"`` and ``"topic"`` keys.

    Returns:
        The summaries, in the same order as ``documents``.
    """
    # Each `await` completes before the next iteration begins, so the
    # requests are issued strictly one after another.
    return [
        await summarize_async(entry["text"], entry["topic"])
        for entry in documents
    ]

async def parallel_summarize(documents: list[dict]) -> list[str]:
    """Summarize all documents concurrently with ``asyncio.gather``.

    Args:
        documents: Dicts with ``"text"`` and ``"topic"`` keys.

    Returns:
        The gathered summaries, one per document.
    """
    # One coroutine per document; gather schedules them all at once.
    pending = (
        summarize_async(entry["text"], entry["topic"]) for entry in documents
    )
    summaries = await asyncio.gather(*pending)
    return summaries

# Sample documents for the benchmark, written as (text, topic) pairs and
# expanded into the dict shape the summarizers read.
_DOC_FIELDS = ("text", "topic")
DOCS = [
    dict(zip(_DOC_FIELDS, pair))
    for pair in (
        ("Machine learning is a subset of AI...", "ML"),
        ("Neural networks are inspired by the brain...", "neural networks"),
        ("Transformers use attention mechanisms...", "transformers"),
        ("RAG combines retrieval with generation...", "RAG"),
    )
]

async def benchmark():
    """Time the sequential and parallel summarizers on ``DOCS`` and print both.

    Runs the sequential variant first, then the parallel one, and prints each
    wall-clock duration in milliseconds followed by their ratio.
    """

    async def _elapsed_ms(summarizer):
        # Wall-clock duration of one full pass over DOCS, in milliseconds.
        began = time.perf_counter()
        await summarizer(DOCS)
        return (time.perf_counter() - began) * 1000

    sequential_ms = await _elapsed_ms(sequential_summarize)
    parallel_ms = await _elapsed_ms(parallel_summarize)

    print(f"Sequential: {sequential_ms:.0f}ms")
    print(f"Parallel:   {parallel_ms:.0f}ms")
    print(f"Speedup:    {sequential_ms / parallel_ms:.1f}x")

# Script entry point: run the benchmark coroutine to completion.
asyncio.run(benchmark())

Batch API for offline workloads

For workloads that don't need real-time responses (nightly eval runs, bulk extraction), the OpenAI Batch API offers a 50% cost reduction:

import os
import json
from openai import OpenAI

# Module-level synchronous client used for the file upload and batch creation
# below; the API key is read from the OPENAI_API_KEY environment variable.
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

def _classification_request(index: int) -> dict:
    """Build one batch request that asks for a sentiment label for example ``index``."""
    prompt = f"Classify: 'Example text {index}'. Reply: positive/negative/neutral"
    return {
        "custom_id": f"request-{index}",
        "method": "POST",
        "url": "/v1/chat/completions",
        "body": {
            "model": "gpt-4o-mini",
            "messages": [{"role": "user", "content": prompt}],
            "max_tokens": 10,
        },
    }


# Prepare batch requests: five classification calls, ids request-0..request-4
requests = [_classification_request(i) for i in range(5)]

# Write to JSONL: one JSON-encoded request per line
with open("batch_requests.jsonl", "w") as jsonl_out:
    jsonl_out.writelines(json.dumps(entry) + "\n" for entry in requests)

# Upload and create batch
# Step 1: upload the JSONL file (opened in binary mode) with purpose="batch".
with open("batch_requests.jsonl", "rb") as f:
    batch_file = client.files.create(file=f, purpose="batch")

# Step 2: create the batch job from the uploaded file's id. The endpoint here
# matches the "url" field written into every request line.
batch = client.batches.create(
    input_file_id=batch_file.id,
    endpoint="/v1/chat/completions",
    completion_window="24h",
)
print(f"Batch ID: {batch.id} | Status: {batch.status}")
# Check status with: client.batches.retrieve(batch.id)
# Results available when status = "completed"

Parallel async calls are the single highest-impact latency optimization

If your pipeline makes 5 sequential LLM calls that don't depend on each other, switching to asyncio.gather() cuts latency by 70–80% with zero change to output quality. Do this before anything else.


03-cost-tracking | 05-observability