LangGraph Interrupt Patterns Beyond the Basics: Conditional Approval, Batch Review, and Timeout Handling

2026-05-20 · 8 min read · Igor Bobriakov

The interrupt-everything pattern has a failure mode that is easy to miss until it has already happened. When every tool call that touches an external system triggers an interrupt — API writes, database updates, file outputs — reviewers see every action. Within weeks, they stop reading them. The interrupt gate becomes a confirmation dialog. Approvals fire faster than any human can evaluate the payload. The audit log records the approvals; it does not record that the approvals were never actually reviewed.

This is not a failure of the LangGraph interrupt primitive. It is a failure of interrupt placement. Every action treated identically produces a review queue where high-risk and low-risk actions compete for the same attention budget, and the volume drives reviewers toward the path of least resistance: approve without reading.

The human approval gates post documents this dynamic in detail — specifically the conditions under which interrupt gates generate the appearance of oversight rather than the substance of it. The current article addresses the engineering fix: three patterns that calibrate interrupt placement to actual action risk rather than action type.

The three patterns are: conditional approval (interrupt based on risk score, not action category), batch review (consolidate multiple low-stakes actions into a single review event), and timeout handling (degrade gracefully when humans do not respond within the SLA). All three require understanding the LangGraph checkpoint lifecycle beyond what the official documentation covers.

If you are new to LangGraph interrupts, read our HITL Engineering Patterns guide first. If you want the broader LangGraph architecture context, Mastering LangGraph covers state machines, conditional edges, and graph compilation.

The Interrupt Tax

Every interrupt in a LangGraph pipeline has two cost components.

The first is mechanical: a checkpoint write, a halted graph run, and a state deserialization on resume. The exact overhead depends on backend, deployment shape, and checkpointer configuration. In most production designs, this overhead is measurable but is not the bottleneck.

The second cost is human latency. A pipeline that generates dozens of interrupt events per run can consume minutes of reviewer time for a process that could otherwise run unattended. Scale that pattern across concurrent pipelines and you need dedicated reviewer capacity for a task that interrupt calibration can largely eliminate.

The interrupt tax is not inherent to the interrupt primitive. It is a function of interrupt placement. The fix is calibrating where interrupts fire, not removing them.

Principle: An interrupt gate should fire when the cost of a wrong autonomous decision exceeds the cost of human review time. For irreversible high-impact actions, that threshold is always crossed. For reversible low-impact actions at high volume, that threshold is rarely crossed. Placing all actions in the same interrupt policy is the design error.
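The principle can be made concrete as an expected-cost comparison. The sketch below is illustrative only: `ActionCost`, `should_interrupt`, the error probabilities, and the reviewer hourly rate are all assumptions introduced here, not figures from a real pipeline.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ActionCost:
    """Hypothetical cost model for one pending action (values in dollars)."""
    p_wrong: float         # estimated probability the autonomous decision is wrong
    cost_if_wrong: float   # damage if a wrong action executes unreviewed
    review_minutes: float  # expected reviewer time to evaluate the payload


def should_interrupt(cost: ActionCost, reviewer_rate_per_hour: float = 90.0) -> bool:
    """Interrupt only when the expected loss of acting alone exceeds the review cost."""
    expected_loss = cost.p_wrong * cost.cost_if_wrong
    review_cost = (cost.review_minutes / 60.0) * reviewer_rate_per_hour
    return expected_loss > review_cost


# Irreversible payment: 0.02 * 50_000 = 1000 expected loss vs. 4.50 review cost
payment = ActionCost(p_wrong=0.02, cost_if_wrong=50_000, review_minutes=3)
# Reversible tag update: 0.02 * 5 = 0.10 expected loss vs. 0.75 review cost
tag_update = ActionCost(p_wrong=0.02, cost_if_wrong=5, review_minutes=0.5)

print(should_interrupt(payment))     # True
print(should_interrupt(tag_update))  # False
```

In practice the probabilities are rarely known with this precision. The value of writing the comparison down is that it forces the team to state why a given class of action sits behind a gate.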

Pattern 1: Conditional Approval

The conditional approval pattern adds a risk-scoring step before the interrupt decision. The graph evaluates the pending action against a set of risk criteria and routes to either an auto-approve path or an interrupt gate. Only actions above the risk threshold pause for human review.

The risk function should be deterministic and fast — it runs on every action. It should not call an LLM. Use rule-based scoring against action properties: amount, record count, target system classification, reversibility flag.

from __future__ import annotations

import time
from enum import Enum
from typing import Any, Literal

from langgraph.graph import StateGraph, END
from langgraph.checkpoint.postgres import PostgresSaver
from langgraph.types import interrupt
from pydantic import BaseModel, Field


class RiskLevel(str, Enum):
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"


class PendingAction(BaseModel):
    action_id: str
    action_type: str
    target_system: str
    payload: dict[str, Any]
    reversible: bool
    estimated_record_count: int = 0
    estimated_dollar_value: float = 0.0


class ApprovalDecision(BaseModel):
    action_id: str
    approved: bool
    reviewer_id: str | None = None
    rejection_reason: str | None = None


class AgentState(BaseModel):
    pending_action: PendingAction | None = None
    risk_level: RiskLevel | None = None
    approval_decision: ApprovalDecision | None = None
    interrupt_timestamp: float | None = None
    timeout_seconds: int = 300  # 5-minute default
    result: dict[str, Any] | None = None
    error: str | None = None


def score_risk(action: PendingAction) -> RiskLevel:
    """
    Deterministic risk scoring. No LLM calls.
    Returns RiskLevel based on action properties.
    """
    score = 0
    # Irreversible actions carry base risk
    if not action.reversible:
        score += 3
    # Dollar value thresholds
    if action.estimated_dollar_value > 10_000:
        score += 4
    elif action.estimated_dollar_value > 1_000:
        score += 2
    # Record count thresholds
    if action.estimated_record_count > 1_000:
        score += 3
    elif action.estimated_record_count > 100:
        score += 1
    # External production systems
    if action.target_system in {"production_db", "payment_processor", "email_sender"}:
        score += 2
    if score >= 6:
        return RiskLevel.HIGH
    elif score >= 3:
        return RiskLevel.MEDIUM
    return RiskLevel.LOW


def assess_risk_node(state: AgentState) -> AgentState:
    """Score the pending action and attach risk level to state."""
    if state.pending_action is None:
        return state
    risk = score_risk(state.pending_action)
    return state.model_copy(update={"risk_level": risk})


def route_by_risk(state: AgentState) -> Literal["auto_approve", "interrupt_gate"]:
    """
    Conditional edge function.
    LOW risk -> auto_approve
    MEDIUM/HIGH risk -> interrupt_gate
    """
    if state.risk_level == RiskLevel.LOW:
        return "auto_approve"
    return "interrupt_gate"


def auto_approve_node(state: AgentState) -> AgentState:
    """Auto-approve low-risk actions without human review."""
    decision = ApprovalDecision(
        action_id=state.pending_action.action_id,
        approved=True,
        reviewer_id="system:auto_approve",
    )
    return state.model_copy(update={"approval_decision": decision})


def interrupt_gate_node(state: AgentState) -> AgentState:
    """
    Pause for human review. The interrupt() call saves a checkpoint and
    halts execution. Execution resumes when ainvoke() is called with the
    same thread_id and a Command(resume=...) payload.
    """
    review_payload = {
        "action": state.pending_action.model_dump(),
        "risk_level": state.risk_level,
        "instructions": "Approve or reject the pending action.",
    }
    # Timestamp used by the timeout handler. Caveat: on resume LangGraph
    # re-runs this node from the top and persists only the returned state,
    # so keep a value already set upstream instead of overwriting it. Stamp
    # it in a preceding node if the paused checkpoint must carry it.
    requested_at = state.interrupt_timestamp or time.time()
    state = state.model_copy(update={"interrupt_timestamp": requested_at})
    # interrupt() pauses execution and returns the human's response on resume
    human_response = interrupt(review_payload)
    decision = ApprovalDecision(
        action_id=state.pending_action.action_id,
        approved=human_response.get("approved", False),
        reviewer_id=human_response.get("reviewer_id"),
        rejection_reason=human_response.get("rejection_reason"),
    )
    return state.model_copy(update={"approval_decision": decision})


def execute_action_node(state: AgentState) -> AgentState:
    """Execute the action if approved, skip if rejected."""
    if not state.approval_decision or not state.approval_decision.approved:
        return state.model_copy(update={
            "result": {"status": "rejected", "action_id": state.pending_action.action_id}
        })
    # Actual execution logic goes here
    result = {"status": "executed", "action_id": state.pending_action.action_id}
    return state.model_copy(update={"result": result})


def build_conditional_approval_graph(checkpointer) -> Any:
    graph = StateGraph(AgentState)
    graph.add_node("assess_risk", assess_risk_node)
    graph.add_node("auto_approve", auto_approve_node)
    graph.add_node("interrupt_gate", interrupt_gate_node)
    graph.add_node("execute_action", execute_action_node)
    graph.set_entry_point("assess_risk")
    graph.add_conditional_edges("assess_risk", route_by_risk)
    graph.add_edge("auto_approve", "execute_action")
    graph.add_edge("interrupt_gate", "execute_action")
    graph.add_edge("execute_action", END)
    return graph.compile(checkpointer=checkpointer)

The key design decision here is the score_risk function. It is pure Python, has no side effects, and does no I/O, so it adds negligible latency per action. The criteria live in code and are covered by unit tests. When risk policy changes (say, raising the lower dollar threshold from $1,000 to $2,500), it is a one-line change with a clear audit trail in version control.
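One way to keep such policy changes small and reviewable is to lift the numeric thresholds into a single frozen object. The sketch below is an assumption-laden variant, not the article's implementation: `RiskPolicy`, `score`, and the flattened argument list are hypothetical names introduced here, while the additive rules mirror score_risk.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class RiskPolicy:
    """Hypothetical container for the numeric thresholds used in scoring."""
    dollar_low: float = 1_000
    dollar_high: float = 10_000
    records_low: int = 100
    records_high: int = 1_000
    medium_score: int = 3
    high_score: int = 6


def score(dollars: float, records: int, reversible: bool,
          production_target: bool, policy: RiskPolicy = RiskPolicy()) -> str:
    """Same additive rules as score_risk, with thresholds read from the policy."""
    points = 0 if reversible else 3
    if dollars > policy.dollar_high:
        points += 4
    elif dollars > policy.dollar_low:
        points += 2
    if records > policy.records_high:
        points += 3
    elif records > policy.records_low:
        points += 1
    if production_target:
        points += 2
    if points >= policy.high_score:
        return "high"
    if points >= policy.medium_score:
        return "medium"
    return "low"


default = RiskPolicy()
relaxed = replace(default, dollar_low=2_500)  # the whole policy change

# A reversible $1,500 update of 150 records on a non-production target
print(score(1_500, 150, True, False, default))  # medium
print(score(1_500, 150, True, False, relaxed))  # low
```

The same pair of calls doubles as a regression test: any threshold change that moves an action across a risk boundary shows up as a failing assertion rather than a surprise in the review queue.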

Pattern 2: Batch Review

The batch review pattern is the right tool when a pipeline generates many low-to-medium risk actions in a short window — document tagging, bulk record updates, campaign sends. Instead of interrupting for each action individually, the graph collects pending actions into a batch and triggers a single interrupt when the batch is full or a time window closes.

Warning: Batch review is only appropriate for actions that are logically independent: rejecting one should not affect the validity of the others. If your actions have ordering dependencies (action B requires action A to succeed first), batch review can produce inconsistent state when partial rejections occur. Map your dependency graph before choosing this pattern.

from __future__ import annotations

import time
from typing import Any, Literal

from langgraph.graph import StateGraph, END
from langgraph.types import interrupt
from pydantic import BaseModel, Field

# PendingAction and ApprovalDecision are the models defined in Pattern 1.


class BatchState(BaseModel):
    pending_actions: list[PendingAction] = Field(default_factory=list)
    batch_max_size: int = 10
    batch_window_seconds: float = 60.0
    batch_open_since: float | None = None
    reviewed_actions: list[ApprovalDecision] = Field(default_factory=list)
    executed_results: list[dict[str, Any]] = Field(default_factory=list)


def add_to_batch_node(state: BatchState) -> BatchState:
    """
    Node called for each incoming action.
    Opens the batch window on first action; records the timestamp.
    """
    if state.batch_open_since is None:
        return state.model_copy(update={"batch_open_since": time.time()})
    return state


def route_batch(state: BatchState) -> Literal["collect", "batch_review"]:
    """
    Continue collecting if batch is under capacity and window is open.
    Trigger review when batch is full or time window has expired.
    """
    if len(state.pending_actions) < state.batch_max_size:
        if state.batch_open_since is not None:
            elapsed = time.time() - state.batch_open_since
            if elapsed < state.batch_window_seconds:
                return "collect"
    return "batch_review"


def batch_review_node(state: BatchState) -> BatchState:
    """
    Single interrupt for all pending actions.
    Human reviewer sees the full batch and returns a decision per action_id.
    """
    review_payload = {
        "batch_size": len(state.pending_actions),
        "actions": [a.model_dump() for a in state.pending_actions],
        "instructions": (
            "Review the batch below. For each action_id, provide "
            "{'approved': true/false, 'reviewer_id': '<your_id>'}."
        ),
    }
    # Returns dict keyed by action_id: {'<action_id>': {...decision...}}
    human_response: dict[str, Any] = interrupt(review_payload)
    decisions = []
    for action in state.pending_actions:
        # An action missing from the response is treated as rejected
        action_decision = human_response.get(action.action_id, {})
        decisions.append(ApprovalDecision(
            action_id=action.action_id,
            approved=action_decision.get("approved", False),
            reviewer_id=action_decision.get("reviewer_id"),
            rejection_reason=action_decision.get("rejection_reason"),
        ))
    return state.model_copy(update={
        "reviewed_actions": decisions,
        "pending_actions": [],
        "batch_open_since": None,
    })


def execute_batch_node(state: BatchState) -> BatchState:
    """Execute approved actions, record results."""
    results = []
    for decision in state.reviewed_actions:
        if decision.approved:
            results.append({"status": "executed", "action_id": decision.action_id})
        else:
            results.append({
                "status": "rejected",
                "action_id": decision.action_id,
                "reason": decision.rejection_reason,
            })
    return state.model_copy(update={"executed_results": results, "reviewed_actions": []})

The reviewer interface for batch review matters. Presenting 10 actions as a wall of JSON creates the same rubber-stamp failure mode as 10 individual interrupts. Build a structured review UI that shows action type, target, estimated impact, and reversibility in a scannable table. The interrupt payload should be designed for the review interface, not for Python consumption.
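As a rough illustration of a scannable payload, the sketch below turns the batch's action dicts into fixed-width rows. `render_batch_table` and its column choice are assumptions made for this example; the keys match the PendingAction field names, and a real review UI would more likely render HTML or Slack blocks than plain text.

```python
from typing import Any


def render_batch_table(actions: list[dict[str, Any]]) -> str:
    """Format batch actions as fixed-width rows a reviewer can scan."""
    columns = [
        ("action_id", "ID"),
        ("action_type", "Type"),
        ("target_system", "Target"),
        ("estimated_record_count", "Records"),
        ("reversible", "Reversible"),
    ]
    rows = [[header for _, header in columns]]
    for action in actions:
        row = []
        for key, _ in columns:
            value = action.get(key, "")
            if isinstance(value, bool):
                # Upper-case NO makes irreversible actions stand out
                value = "yes" if value else "NO"
            row.append(str(value))
        rows.append(row)
    # Pad each column to its widest cell
    widths = [max(len(r[i]) for r in rows) for i in range(len(columns))]
    lines = ["  ".join(c.ljust(w) for c, w in zip(r, widths)).rstrip() for r in rows]
    return "\n".join(lines)


actions = [
    {"action_id": "a1", "action_type": "tag_document", "target_system": "dms",
     "estimated_record_count": 12, "reversible": True},
    {"action_id": "a2", "action_type": "bulk_update", "target_system": "crm",
     "estimated_record_count": 240, "reversible": False},
]
print(render_batch_table(actions))
```

The raw payload field is deliberately left out of the table. A reviewer who needs it can drill into a single action, and the default view stays short enough to read.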

Decision Gate: Choosing the Right Pattern

| Pattern | Trigger condition | Human review events per 100 actions | Best for | Main risk |
| --- | --- | --- | --- | --- |
| Every-action interrupt | Always | 100 | Low-volume, high-stakes pipelines (regulatory filings, financial transactions) | Reviewer fatigue, rubber-stamping |
| Conditional approval | Risk score above threshold | 5–30 (depends on threshold) | Mixed-risk pipelines where action properties determine impact | Risk function missing edge cases |
| Batch review | Batch full or window expired | 1–10 (depends on batch size) | High-volume pipelines with logically independent actions | Partial rejection causing inconsistent state |
| Timeout with default | Human does not respond within TTL | Fallback only | Any pipeline that cannot stall indefinitely | Wrong default choice for risk level |
| Async callback | Human approval arrives via webhook/Slack | Decoupled from pipeline | Pipelines where human latency is hours, not seconds | Resume ordering if multiple callbacks arrive |

Pattern 3: Timeout Handling

LangGraph v0.2.x has no built-in interrupt TTL. A paused workflow waits indefinitely unless something external resumes it. For pipelines with SLA requirements, this is a production failure waiting to happen.

The correct architecture separates the timeout detection from the graph. The graph stores a deadline in state. An external scheduler (APScheduler is sufficient for most cases; use Temporal or Celery beat for higher reliability requirements) polls for expired interrupts and resumes them with a timeout signal.

from __future__ import annotations

import time
from typing import Any, Literal

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from langgraph.types import Command, interrupt
from pydantic import BaseModel

# PendingAction, ApprovalDecision, and RiskLevel are the models defined in Pattern 1.


class TimeoutAwareState(BaseModel):
    pending_action: PendingAction | None = None
    risk_level: RiskLevel | None = None
    approval_decision: ApprovalDecision | None = None
    interrupt_timestamp: float | None = None
    interrupt_deadline: float | None = None  # Unix timestamp
    timeout_occurred: bool = False
    result: dict[str, Any] | None = None


def interrupt_gate_with_timeout(state: TimeoutAwareState) -> TimeoutAwareState:
    """
    Interrupt gate that stores a deadline in state.
    The external scheduler checks this deadline and resumes with a
    timeout signal if the deadline passes without human response.
    """
    timeout_seconds = 300  # 5 minutes; adjust per risk level
    # Caveat: on resume LangGraph re-runs this node from the top and persists
    # only the returned state. Reuse values set by an upstream node when they
    # exist; otherwise the deadline reaches the scheduler only through the
    # interrupt payload below (or through your own table).
    now = state.interrupt_timestamp or time.time()
    deadline = state.interrupt_deadline or now + timeout_seconds
    state = state.model_copy(update={
        "interrupt_timestamp": now,
        "interrupt_deadline": deadline,
    })
    review_payload = {
        "action": state.pending_action.model_dump() if state.pending_action else {},
        "risk_level": state.risk_level,
        "deadline_unix": state.interrupt_deadline,
        "instructions": "Approve or reject before the deadline.",
    }
    human_response = interrupt(review_payload)
    # Distinguish timeout signal from genuine human response
    is_timeout = human_response.get("_signal") == "timeout"
    if is_timeout:
        # Apply default policy based on risk level
        auto_approve = state.risk_level == RiskLevel.LOW
        decision = ApprovalDecision(
            action_id=state.pending_action.action_id if state.pending_action else "",
            approved=auto_approve,
            reviewer_id="system:timeout",
            rejection_reason=None if auto_approve else "Timeout: no reviewer response within SLA",
        )
        return state.model_copy(update={
            "approval_decision": decision,
            "timeout_occurred": True,
        })
    # Genuine human response
    decision = ApprovalDecision(
        action_id=state.pending_action.action_id if state.pending_action else "",
        approved=human_response.get("approved", False),
        reviewer_id=human_response.get("reviewer_id"),
        rejection_reason=human_response.get("rejection_reason"),
    )
    return state.model_copy(update={"approval_decision": decision})


async def check_expired_interrupts(graph, checkpointer, db_conn) -> None:
    """
    Scheduler job: query for stale pending approvals and resume with timeout signal.
    Run this on a 60-second interval.
    """
    now = time.time()
    # Query your persistence layer for interrupted threads past their deadline.
    # Implementation depends on your checkpointer backend. The table and column
    # names below are illustrative; adapt them to the schema you actually have.
    stale_threads = await db_conn.fetch(
        """
        SELECT thread_id, checkpoint_id
        FROM langgraph_checkpoints
        WHERE status = 'interrupted'
          AND (metadata->>'interrupt_deadline')::float < $1
        """,
        now,
    )
    for row in stale_threads:
        thread_id = row["thread_id"]
        try:
            await graph.ainvoke(
                # Command(resume=...) resumes the interrupted graph; a plain
                # dict would be treated as fresh input and restart the run
                Command(resume={"_signal": "timeout"}),
                config={"configurable": {"thread_id": thread_id}},
            )
        except Exception as exc:
            # Log and continue so one failure does not block the others
            print(f"Timeout resume failed for {thread_id}: {exc}")


def setup_timeout_scheduler(graph, checkpointer, db_conn) -> AsyncIOScheduler:
    scheduler = AsyncIOScheduler()
    scheduler.add_job(
        check_expired_interrupts,
        "interval",
        seconds=60,
        args=[graph, checkpointer, db_conn],
        id="interrupt_timeout_check",
        replace_existing=True,
    )
    return scheduler

The timeout default policy deserves explicit design attention. Auto-approving on timeout for a LOW risk action is usually correct — the action was low-risk to begin with, and a stalled pipeline has its own costs. Auto-rejecting on timeout for HIGH risk actions is almost always correct — if no human reviewed a high-risk action within the SLA, the safest move is to stop and escalate. MEDIUM risk is the genuinely hard case. Document the decision and make it configurable per action type.
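One possible shape for a configurable timeout policy is a lookup keyed by risk level with per-action-type overrides. Everything in this sketch is hypothetical: the `TimeoutAction` enum, the override table, and in particular the choice to escalate MEDIUM by default are placeholders for decisions each team has to make and document itself.

```python
from enum import Enum


class TimeoutAction(str, Enum):
    APPROVE = "approve"
    REJECT = "reject"
    ESCALATE = "escalate"


# Hypothetical defaults per risk level; MEDIUM escalates rather than guessing.
DEFAULT_BY_RISK = {
    "low": TimeoutAction.APPROVE,
    "medium": TimeoutAction.ESCALATE,
    "high": TimeoutAction.REJECT,
}

# Hypothetical per-action-type overrides, keyed by (action_type, risk_level).
OVERRIDES = {
    ("campaign_send", "medium"): TimeoutAction.REJECT,
}


def timeout_default(action_type: str, risk_level: str) -> TimeoutAction:
    """Resolve the timeout behaviour; unknown risk levels fail closed."""
    override = OVERRIDES.get((action_type, risk_level))
    if override is not None:
        return override
    return DEFAULT_BY_RISK.get(risk_level, TimeoutAction.REJECT)


print(timeout_default("tag_document", "low").value)      # approve
print(timeout_default("bulk_update", "medium").value)    # escalate
print(timeout_default("campaign_send", "medium").value)  # reject
```

Keeping the table as data rather than as branches inside the interrupt node means the policy can be reviewed, diffed, and tested on its own.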

When a timeout fires and the graph auto-rejects, that event also surfaces as a signal that reviewer capacity is mismatched to interrupt volume. Log every timeout occurrence with its risk level and action type. Repeated timeouts on MEDIUM risk actions indicate either threshold miscalibration or a staffing constraint that no amount of graph tuning will fix. See Recovery Patterns for Production AI Agent Failures for how timeout escalation fits into a broader agent failure recovery architecture.

Async Approval Workflows

For pipelines where human response takes minutes to hours — Slack approvals, email notifications, mobile push — the synchronous request-response model does not apply. The graph pauses at the interrupt, the API layer returns immediately, and the approval arrives later via webhook.

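The state checkpoint is the resume key. The thread_id and checkpoint_id pair is what makes this work: the graph's state is fully serialized, and calling ainvoke() with a Command(resume=...) payload and the same thread_id resumes at the interrupted node. LangGraph re-executes that node from its first line, with interrupt() returning the resume value, so any code placed before the interrupt() call must be safe to run twice.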

The practical requirement is a durable checkpointer. In-memory checkpointers (MemorySaver) do not survive process restarts, which means any deployment, crash, or scale-down event during a pending approval loses the workflow. Use PostgresSaver or RedisSaver for any async approval workflow in production.

See the HITL Engineering Patterns guide for the full async approval API implementation, including the FastAPI webhook handler and Redis pub/sub pattern for decoupling human latency from thread pool consumption.

Composing Patterns in a Single Graph

Real production pipelines often need multiple patterns on different action types. The composition pattern uses the risk score to route not just to interrupt-or-not, but to which interrupt pattern applies.

For example: reversible low-value writes can auto-approve; moderate record-count updates can route to batch review; high-value or irreversible actions can require individual conditional approval with a short timeout; production payment actions can require individual approval with dual-reviewer escalation before auto-reject.

This routing logic lives in the conditional edge functions, not inside the nodes. Nodes handle the single responsibility of their pattern. Edges handle the routing. This makes the policy testable: write unit tests for each edge function with representative state objects and verify the routing without running the full graph.
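A composed edge function along the lines of the example above might look like the following sketch. `RoutingState`, `route_by_pattern`, and the node names `dual_review_gate` and `batch_review` are assumptions introduced for illustration; the point is that the routing can be exercised with plain objects and no graph runtime.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class RoutingState:
    """Hypothetical slice of graph state that the edge function reads."""
    risk_level: str              # "low" | "medium" | "high"
    reversible: bool
    target_system: str
    estimated_record_count: int


def route_by_pattern(state: RoutingState) -> str:
    """Pick the interrupt pattern; the returned string names the next node."""
    if state.target_system == "payment_processor":
        return "dual_review_gate"
    if state.risk_level == "high" or not state.reversible:
        return "interrupt_gate"
    if state.risk_level == "medium" and state.estimated_record_count > 100:
        return "batch_review"
    if state.risk_level == "low":
        return "auto_approve"
    # Anything unclassified falls back to individual review
    return "interrupt_gate"


# Table-driven checks: representative states and the route each should take
cases = [
    (RoutingState("low", True, "crm", 5), "auto_approve"),
    (RoutingState("medium", True, "crm", 250), "batch_review"),
    (RoutingState("high", True, "crm", 5), "interrupt_gate"),
    (RoutingState("low", False, "crm", 5), "interrupt_gate"),
    (RoutingState("medium", True, "payment_processor", 1), "dual_review_gate"),
]
for state, expected in cases:
    assert route_by_pattern(state) == expected, (state, expected)
print("all routes verified")
```

Ordering matters in this function: the most restrictive conditions are checked first, so a low-scored but irreversible action cannot slip through to auto-approve.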

The permission escalation post covers the authority boundary design that this routing logic enforces — specifically how to prevent scope creep as agents accumulate more tool access over time.

The state management and checkpointing guide covers the persistence layer decisions that underpin all three patterns in this article.

Production Deployment Checklist

  • Risk scoring function is deterministic, has no LLM calls, and is covered by unit tests with at least 10 representative action types covering LOW, MEDIUM, and HIGH classifications
  • interrupt_timestamp and interrupt_deadline are written to state by a node that completes before the interrupt() call: updates made inside the interrupting node are not persisted until it returns, and the node re-runs from the top on resume
  • Timeout scheduler runs in a separate process from the graph, so that a single process crash does not take down both the graph and the scheduler
  • Durable checkpointer (PostgresSaver or RedisSaver) is configured before any interrupt-enabled graph goes to production — MemorySaver is a development tool only
  • Batch review payloads are designed for the reviewer interface, not for Python consumption — structured tables showing action type, target, estimated impact, and reversibility
  • Timeout default policy is documented per action type and per risk level — auto-approve / auto-reject / escalate must be explicit decisions, not defaults-by-omission
  • Interrupt volume is instrumented: track interrupts-per-run, auto-approve rate, batch size distribution, and timeout frequency in your observability stack
  • Reviewer response time is logged per interrupt event — sustained near-instant review times can signal rubber-stamping, not efficient review

Measuring the Right Things

The operational signal that your interrupt policy is miscalibrated is not the number of interrupts per run. It is reviewer response time distribution.

If median review time is collapsing toward zero and holding there, reviewers are not reading the payloads. Your threshold is too low or your batch size is too large. If review queues regularly exceed your SLA and timeout events are frequent, your threshold is too high or you have insufficient reviewer capacity.
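The two signals can be checked mechanically. The sketch below assumes hypothetical cut-offs (a 5-second floor for plausible review time and a 10% timeout rate ceiling) that would need tuning against your own payload complexity and SLA; `diagnose_policy` is a name introduced here.

```python
def diagnose_policy(
    median_review_seconds: float,
    timeout_rate: float,
    min_plausible_review_seconds: float = 5.0,  # assumed floor for a real read
    max_timeout_rate: float = 0.10,             # assumed acceptable timeout share
) -> list[str]:
    """Return the miscalibration signals present in the given review metrics."""
    findings = []
    if median_review_seconds < min_plausible_review_seconds:
        findings.append("rubber_stamping_suspected")
    if timeout_rate > max_timeout_rate:
        findings.append("review_capacity_exceeded")
    return findings


print(diagnose_policy(median_review_seconds=1.2, timeout_rate=0.02))
# ['rubber_stamping_suspected']
print(diagnose_policy(median_review_seconds=45.0, timeout_rate=0.30))
# ['review_capacity_exceeded']
print(diagnose_policy(median_review_seconds=45.0, timeout_rate=0.02))
# []
```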

Instrument at the state level: log interrupt_timestamp and the timestamp of the resume payload. The delta is your human latency. Aggregate by risk level, action type, and reviewer. This data drives threshold calibration over time.
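A minimal aggregation over logged review events could look like this. The event shape is an assumption: `resume_timestamp` and the dict layout are hypothetical, standing in for whatever your logging pipeline actually records alongside interrupt_timestamp.

```python
from collections import defaultdict
from statistics import median


def median_latency_by(events: list[dict], key: str) -> dict[str, float]:
    """Group review events by `key`; return median seconds from interrupt to resume."""
    buckets: dict[str, list[float]] = defaultdict(list)
    for event in events:
        latency = event["resume_timestamp"] - event["interrupt_timestamp"]
        buckets[event[key]].append(latency)
    return {group: median(values) for group, values in buckets.items()}


events = [
    {"risk_level": "medium", "reviewer_id": "r1",
     "interrupt_timestamp": 1000.0, "resume_timestamp": 1004.0},
    {"risk_level": "medium", "reviewer_id": "r2",
     "interrupt_timestamp": 2000.0, "resume_timestamp": 2006.0},
    {"risk_level": "medium", "reviewer_id": "r1",
     "interrupt_timestamp": 3000.0, "resume_timestamp": 3050.0},
    {"risk_level": "high", "reviewer_id": "r2",
     "interrupt_timestamp": 4000.0, "resume_timestamp": 4120.0},
    {"risk_level": "high", "reviewer_id": "r2",
     "interrupt_timestamp": 5000.0, "resume_timestamp": 5180.0},
]

print(median_latency_by(events, "risk_level"))   # {'medium': 6.0, 'high': 150.0}
print(median_latency_by(events, "reviewer_id"))  # {'r1': 27.0, 'r2': 120.0}
```

The median is used rather than the mean because a single approval that sat overnight would otherwise mask a cluster of near-instant ones.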

The interrupt tax is not a fixed cost. It is a function of policy quality. Well-calibrated policies with conditional approval and batch review substantially reduce human review volume compared to interrupt-everything, while maintaining meaningful oversight on the actions where it matters.

Frequently Asked Questions

When should a LangGraph node use conditional interrupt instead of always interrupting?

Use conditional interrupt when the action's risk can be quantified at runtime — for example, when a tool call involves a dollar amount, a record count, or an external system write. Define a risk score or threshold function that runs before the interrupt decision. Actions below the threshold proceed automatically; actions above it pause for review. This pattern substantially reduces human review volume compared to interrupting every action, while preserving oversight where it matters. The risk function itself should be unit-tested separately from the graph logic.

How does batch review work in LangGraph when multiple actions queue up simultaneously?

Batch review collects pending interrupt payloads into a shared state list rather than interrupting immediately for each action. A coordinator node checks whether the batch is full (by count or by time window) before triggering the interrupt. The human reviewer sees all pending actions in one interface and can approve or reject each individually. On resume, the graph processes approved actions and discards rejected ones before continuing. This is most effective in high-volume pipelines where individual review would create a throughput bottleneck — document processing, bulk data operations, or multi-step campaign actions.

What is the correct way to implement timeout handling for LangGraph interrupts?

LangGraph has no built-in interrupt TTL as of v0.2.x, so timeout logic must live at the orchestration layer. The recommended pattern: store the interrupt timestamp and a configured timeout duration in the state object at the moment the interrupt fires. A background scheduler polls for stale pending approvals by comparing current time against the stored deadline. When a timeout is detected, the scheduler resumes the graph with a synthetic timeout signal payload rather than a human approval. The graph's routing logic then handles that signal — typically auto-approving low-risk actions and auto-rejecting high-risk ones.

How do you measure the throughput cost of adding interrupt gates to a LangGraph pipeline?

The interrupt tax has two components: backend-dependent checkpoint overhead and human latency. For synchronous pipelines, checkpoint overhead is usually not the limiting factor. The real cost is human latency, which makes interrupt-heavy designs bottleneck on reviewer capacity rather than compute. Profile your pipeline by logging time-in-interrupt vs. time-in-execution per run. When a disproportionate share of wall-clock time is waiting for approvals on low-risk actions, your interrupt placement needs recalibration — either raise thresholds, batch actions, or introduce async callback patterns.

The decision rule

Interrupt policy design is one of the more consequential decisions in a production agent architecture. Getting it wrong means either no real oversight or a pipeline that stalls on reviewer capacity. Before committing to a HITL pattern, map authority boundaries, threshold calibration, reviewer capacity, and checkpointer behavior. The Enterprise Agentic Assessment Kit gives teams a starting structure for that oversight map.

About the author

Igor Bobriakov

AI Architect. Author of Production-Ready AI Agents. 15 years deploying production AI platforms and agentic systems for enterprise clients and deep-tech startups.