# Stateful Graphs
State is what makes LangGraph graphs more than just function pipelines. The state object flows through every node, accumulates information, and determines routing decisions. Understanding how state is defined, updated, and persisted unlocks the full power of the framework.
## Learning objectives
- Define state with `TypedDict` and understand the merge semantics
- Use `Annotated` with reducers to control how fields accumulate
- Implement checkpointing to persist and resume graph state
- Build a graph that tracks intermediate results across nodes
- Read state at any point in the graph execution
## State design principles
from typing import TypedDict, Annotated, Optional
import operator
# ── Simple state: last write wins ─────────────────────────────────
class SimpleState(TypedDict):
    """Minimal graph state in which no field declares a reducer.

    Without a reducer annotation a field follows "last write wins": when
    several nodes return a value for the same key, the most recent value
    replaces the earlier one.
    """

    query: str  # If two nodes write "query", the last one wins
    result: str
    step: int
# ── Accumulating state: reducer controls merge ────────────────────
from langchain_core.messages import BaseMessage
class MessageState(TypedDict):
    """State that accumulates messages while keeping plain metadata."""

    # operator.add is the reducer: the list a node returns is concatenated
    # onto the list already stored, so each new message is appended.
    messages: Annotated[list[BaseMessage], operator.add]
    # No reducer declared: last write wins.
    metadata: dict
# ── Custom reducers ───────────────────────────────────────────────
def keep_unique(existing: list, new: list) -> list:
    """Reducer that deduplicates list items.

    Args:
        existing: Items already stored in the state field.
        new: Items contributed by the node that just ran.

    Returns:
        A new list holding every distinct item from ``existing`` followed by
        ``new``, in first-seen order. Neither input list is modified.

    Raises:
        TypeError: If an item is unhashable, because deduplication relies on
            dictionary keys.
    """
    # dict.fromkeys keeps the first occurrence of each key in insertion
    # order, which gives an order-preserving "unique" in a single pass.
    return list(dict.fromkeys(existing + new))
class ResearchState(TypedDict):
    """Research state that mixes custom, built-in and default merge rules."""

    question: str
    # keep_unique merges the stored and incoming lists and drops repeats.
    search_results: Annotated[list[str], keep_unique]  # Deduplicates
    # operator.add concatenates, so repeated sources are kept.
    sources: Annotated[list[str], operator.add]  # Appends all
    answer: str
    quality_score: float
## A complete stateful research graph
import os
from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, START, END
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage
import operator
# Shared chat model used by every node below. The key is read from the
# OPENAI_API_KEY environment variable; os.getenv returns None when it is unset.
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.0, api_key=os.getenv("OPENAI_API_KEY"))
class ResearchState(TypedDict):
    """State shared by the nodes of the research graph.

    The two list fields use ``operator.add`` as their reducer, so values
    returned by a node are appended to what is already stored. All other
    fields are overwritten by the latest write.
    """

    question: str
    search_queries: Annotated[list[str], operator.add]
    findings: Annotated[list[str], operator.add]
    answer: str
    quality: str  # "sufficient" or "insufficient"
    iteration: int
def query_generator(state: ResearchState) -> dict:
    """Generate search queries from the question.

    Args:
        state: Current graph state; ``question`` is read, and ``iteration``
            is read if present.

    Returns:
        A state update with up to three new ``search_queries`` and the
        ``iteration`` counter increased by one (starting from 0 when the
        key is absent).
    """
    resp = llm.invoke([
        SystemMessage(content="Generate 3 specific search queries for this research question. One per line."),
        HumanMessage(content=state["question"]),
    ])
    # One query per line: drop blank lines and keep at most the first three.
    queries = [
        line
        for raw in resp.content.strip().split("\n")
        if (line := raw.strip())
    ][:3]
    return {"search_queries": queries, "iteration": state.get("iteration", 0) + 1}
def researcher(state: ResearchState) -> dict:
    """Search for each query and collect findings.

    ``search_queries`` grows across retry iterations, and the returned
    findings are appended to the ones already in the state. Queries that
    already have a finding are therefore skipped; researching them again
    would add duplicate findings on every retry.

    Args:
        state: Current graph state; ``search_queries`` is read, and
            ``findings`` is read if present.

    Returns:
        A state update whose ``findings`` list holds one entry, formatted
        as ``"[<query>]: <text>"``, for each query not yet researched.
    """
    # Simulated search
    knowledge = {
        "python": "Python was created by Guido van Rossum in 1991. It emphasizes readability.",
        "performance": "Python is slower than C/C++ for CPU-bound tasks but fast enough for I/O tasks.",
        "use cases": "Python is used for data science, web dev, automation, and ML.",
        "popularity": "Python is the #1 language on TIOBE as of 2024.",
    }
    # Everything researched so far: earlier iterations plus this call.
    covered = list(state.get("findings") or [])
    findings = []
    for query in state["search_queries"]:
        tag = f"[{query}]: "
        if any(entry.startswith(tag) for entry in covered):
            continue
        lowered = query.lower()
        # The first knowledge key contained in the query wins.
        text = next(
            (val for key, val in knowledge.items() if key in lowered),
            "No specific data found.",
        )
        entry = f"{tag}{text}"
        findings.append(entry)
        covered.append(entry)
    return {"findings": findings}
def synthesizer(state: ResearchState) -> dict:
    """Synthesize findings into an answer.

    Args:
        state: Current graph state; ``question`` and ``findings`` are read.

    Returns:
        A state update whose ``answer`` is the model's reply.
    """
    # One finding per line keeps the prompt readable for the model.
    findings_text = "\n".join(state["findings"])
    resp = llm.invoke([
        SystemMessage(content="Synthesize research findings into a comprehensive answer."),
        HumanMessage(content=f"Question: {state['question']}\n\nFindings:\n{findings_text}"),
    ])
    return {"answer": resp.content}
def quality_checker(state: ResearchState) -> dict:
    """Evaluate if the answer is sufficient.

    Args:
        state: Current graph state; ``question`` and ``answer`` are read.

    Returns:
        A state update whose ``quality`` is ``"sufficient"`` or
        ``"insufficient"``. Any reply that does not clearly say
        "sufficient" is treated as ``"insufficient"``.
    """
    resp = llm.invoke([
        SystemMessage(content="Evaluate if this answer adequately addresses the question. Reply 'sufficient' or 'insufficient'."),
        HumanMessage(content=f"Q: {state['question']}\nA: {state['answer']}"),
    ])
    verdict = resp.content.lower()
    # "insufficient" contains "sufficient" as a substring, so a bare
    # membership test would accept every reply. The negative verdict has
    # to be ruled out explicitly.
    is_sufficient = "sufficient" in verdict and "insufficient" not in verdict
    quality = "sufficient" if is_sufficient else "insufficient"
    return {"quality": quality}
def route_quality(state: ResearchState) -> str:
    """Route based on quality and iteration count.

    Args:
        state: Current graph state; ``quality`` is read, and ``iteration``
            is read if present (treated as 0 otherwise).

    Returns:
        ``"done"`` when the answer is sufficient or the iteration cap has
        been reached, otherwise ``"retry"``.
    """
    max_iterations = 3  # Cap that stops the retry loop from running forever
    if state["quality"] == "sufficient" or state.get("iteration", 0) >= max_iterations:
        return "done"
    return "retry"
# Build graph: register the four node functions under the names the edges use.
graph = StateGraph(ResearchState)
graph.add_node("query_gen", query_generator)
graph.add_node("research", researcher)
graph.add_node("synthesize", synthesizer)
graph.add_node("quality_check", quality_checker)
# Linear path: START -> query_gen -> research -> synthesize -> quality_check.
graph.add_edge(START, "query_gen")
graph.add_edge("query_gen", "research")
graph.add_edge("research", "synthesize")
graph.add_edge("synthesize", "quality_check")
# After quality_check, route_quality returns "done" or "retry"; the mapping
# below turns that label into the next node.
graph.add_conditional_edges(
"quality_check",
route_quality,
{"done": END, "retry": "query_gen"}, # "retry" loops back to query generation
)
app = graph.compile()
# Run once, seeding every state field with an empty starting value.
result = app.invoke({
"question": "What are Python's strengths and use cases?",
"search_queries": [],
"findings": [],
"answer": "",
"quality": "",
"iteration": 0,
})
# Report the final state returned by the run.
print(f"Iterations: {result['iteration']}")
print(f"Queries used: {len(result['search_queries'])}")
print(f"Quality: {result['quality']}")
print(f"\nAnswer:\n{result['answer']}")
## Checkpointing: persist and resume
LangGraph's checkpointing saves state after each node, enabling pause/resume and human-in-the-loop:
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, START, END
from typing import TypedDict
class SimpleState(TypedDict):
    """Single-field state used by the checkpointing example."""

    value: int
def increment(state: SimpleState) -> dict:
    """Return a state update that raises ``value`` by one.

    Args:
        state: Current graph state; ``value`` is read.

    Returns:
        A dict with the new ``value``; the input state is not modified.
    """
    return {"value": state["value"] + 1}
# One-node graph: START -> increment -> END.
graph = StateGraph(SimpleState)
graph.add_node("increment", increment)
graph.add_edge(START, "increment")
graph.add_edge("increment", END)
# Compile with a checkpointer so state is saved per thread
checkpointer = MemorySaver()
app = graph.compile(checkpointer=checkpointer)
# Each thread_id is an independent conversation/session
config = {"configurable": {"thread_id": "session-001"}}
# First invocation
result1 = app.invoke({"value": 0}, config=config)
print(f"After first run: {result1['value']}") # → 1
# Second invocation on the same thread_id.
# NOTE(review): the input passes result1's value back in explicitly, so this
# call alone does not show that the value came from the checkpoint — confirm
# with the get_state call below.
result2 = app.invoke({"value": result1["value"]}, config=config)
print(f"After second run: {result2['value']}") # → 2
# Read back the state stored for this thread
state_snapshot = app.get_state(config)
print(f"Current state: {state_snapshot.values}")
## Streaming state updates
# Stream shows each node's output as it completes. Each chunk is handled as
# a mapping from node name to the update that node returned.
for chunk in app.stream({"value": 0}, config=config):
    for node_name, state_update in chunk.items():
        print(f"Node '{node_name}' updated state: {state_update}")
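Tip: use checkpointing for any agent that might need to pause
Even if you don't use human-in-the-loop today, compiling with a checkpointer costs almost nothing to set up and lets you inspect intermediate state, resume failed runs, and add human review later without restructuring your graph. Note that `MemorySaver` keeps checkpoints in process memory only, so switch to a database-backed checkpointer when state must survive a restart.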