AI agent multi-step workflows: building complex pipelines
How to design and build multi-step agent workflows — sequential chains, parallel execution, conditional branching, and human-in-the-loop checkpoints.
Research from the ReAct paper (Yao et al., 2022) shows that interleaving reasoning steps with tool use — the core of multi-step workflows — significantly outperforms single-step LLM calls on complex tasks.
LangGraph offers a graph-based architecture for multi-step agent workflows: sequential chains, parallel execution, and conditional branching all map to graph nodes and edges.
TL;DR: Four workflow patterns power every multi-step agent: sequential chains, parallel fan-out, conditional branching, and loop with human-in-the-loop checkpoints. State management is the hardest part — persist state between steps so workflows survive failures. Multi-step agents cost 3-10x more than single-step agents, so budget accordingly.
A single LLM call is not an agent workflow. It’s a completion. The real power — and the real complexity — starts when you chain multiple steps together: fetch data, analyse it, make decisions, take actions, verify results.
I’ve built multi-step workflows for document processing, content generation, customer support triage, and code review agents. The patterns repeat across all of them. Here’s what I’ve learned about orchestrating complex agent pipelines.
Key takeaways:
- Four core patterns: sequential, parallel fan-out, conditional branching, and loop with HITL checkpoints
- State management is the hardest part — persist state between steps so workflows survive failures
- Error handling at each step: retry → fallback → flag for human review
- LangGraph works well for complex state machines; a custom engine works better for cost-sensitive or tightly integrated workflows
- Multi-step workflows cost 3-10x more than single-step agents — budget accordingly
The four workflow patterns
Every multi-step agent workflow is a combination of these four patterns. Master these, and you can orchestrate anything.
1. Sequential chain
The simplest pattern: step A feeds into step B, which feeds into step C. Each step depends on the previous one.
class SequentialWorkflow:
    def __init__(self, steps: list):
        self.steps = steps  # List of (name, handler) tuples

    async def run(self, initial_input: dict) -> dict:
        context = initial_input
        for step_name, handler in self.steps:
            print(f"  → Running step: {step_name}")
            try:
                result = await handler(context)
                context[step_name] = result
                context["last_step"] = step_name
            except Exception as e:
                return {
                    "success": False,
                    "error": f"Step '{step_name}' failed: {str(e)}",
                    "context": context
                }
        return {"success": True, "context": context}
When to use: Any workflow where each step builds on the previous one. Document processing (extract → classify → redact → store), content generation (research → outline → draft → review), data pipelines.
Watch out for: Long chains where an error in step 2 wastes the work of step 1. Always check whether earlier steps can be rolled back or compensated.
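One way to handle the rollback concern is to pair each step with an optional compensating action and run those in reverse order when a later step fails. The following is a minimal sketch under that assumption, not the implementation used in the workflows described here: `run_with_compensation`, the `(name, handler, compensate)` tuple shape, and the demo handlers are hypothetical names introduced for illustration.

```python
import asyncio


async def run_with_compensation(steps, context):
    """Run steps in order; on failure, undo completed steps in reverse.

    steps: list of (name, handler, compensate) tuples, where compensate
    may be None for steps with nothing to undo (hypothetical shape).
    """
    completed = []  # (name, compensate) for each step that succeeded
    for name, handler, compensate in steps:
        try:
            context[name] = await handler(context)
            completed.append((name, compensate))
        except Exception as exc:
            undone = []
            for done_name, undo in reversed(completed):
                if undo is not None:
                    await undo(context)
                    undone.append(done_name)
            return {"success": False, "failed_step": name,
                    "error": str(exc), "compensated": undone}
    return {"success": True, "context": context}


# --- Demo with hypothetical handlers ---
async def create_draft(ctx):
    ctx["drafts"] = ["draft-1"]
    return "draft-1"


async def delete_draft(ctx):
    ctx["drafts"].clear()  # Compensating action for create_draft


async def send_email(ctx):
    raise RuntimeError("SMTP unavailable")  # Simulated late failure


def demo_compensation():
    ctx = {}
    steps = [
        ("create_draft", create_draft, delete_draft),
        ("send_email", send_email, None),
    ]
    result = asyncio.run(run_with_compensation(steps, ctx))
    return result, ctx
```

In the demo, the failed email step triggers `delete_draft`, so the earlier step's side effect does not linger. Steps whose effects cannot be undone would pass `None` and need manual review instead.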
2. Parallel fan-out
One agent analyses the input, determines sub-tasks, and spawns multiple worker agents that run in parallel. The results are collected and merged.
import asyncio

class ParallelFanOutWorkflow:
    def __init__(self, planner, workers: dict, merger):
        self.planner = planner  # Determines sub-tasks
        self.workers = workers  # Maps sub-task type to a worker agent
        self.merger = merger    # Combines results

    async def run(self, task: str) -> dict:
        # Step 1: Plan, i.e. break the task into sub-tasks
        sub_tasks = await self.planner.plan(task)
        print(f"  → Generated {len(sub_tasks)} sub-tasks")

        # Step 2: Execute all sub-tasks in parallel
        async def execute_worker(sub_task):
            worker = self.workers[sub_task.type]  # Look up the worker by type
            return await worker.run(sub_task)

        results = await asyncio.gather(
            *[execute_worker(st) for st in sub_tasks],
            return_exceptions=True  # Failures come back as exception objects
        )

        # Step 3: Merge results
        successful = [r for r in results if not isinstance(r, Exception)]
        failed = [r for r in results if isinstance(r, Exception)]
        final = await self.merger.merge(successful)
        return {
            "success": len(failed) == 0,
            "result": final,
            "stats": {"total": len(sub_tasks), "succeeded": len(successful), "failed": len(failed)}
        }
When to use: Research agents that search multiple sources, code review agents that analyse multiple files, content agents that generate multiple variations. Any task that can be decomposed into independent sub-tasks.
Watch out for: Cost explosion. If each worker makes multiple LLM calls, a 10-worker fan-out can generate 30-50 LLM calls per workflow run. Set budget limits per worker.
Always set a timeout for parallel workers. One stuck worker should not block the entire workflow. I use asyncio.wait_for(worker.run(task), timeout=30) per worker.
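The per-worker timeout can be combined with `asyncio.gather(..., return_exceptions=True)` so that a stuck worker surfaces as a timeout result rather than stalling the run. A minimal sketch follows; `run_workers_with_timeout` and the demo workers are hypothetical, and the short timeout is only there to keep the example fast.

```python
import asyncio


async def run_workers_with_timeout(workers, timeout: float):
    """Run worker coroutine functions concurrently, each with its own timeout."""
    async def guarded(worker):
        # wait_for cancels the worker and raises TimeoutError on expiry
        return await asyncio.wait_for(worker(), timeout=timeout)

    results = await asyncio.gather(
        *(guarded(w) for w in workers),
        return_exceptions=True,  # Collect failures instead of raising
    )
    succeeded = [r for r in results if not isinstance(r, Exception)]
    timed_out = sum(isinstance(r, asyncio.TimeoutError) for r in results)
    return succeeded, timed_out


async def fast_worker():
    await asyncio.sleep(0.01)
    return "done"


async def stuck_worker():
    await asyncio.sleep(10)  # Simulates a hung call
    return "never returned"


def demo_timeouts():
    return asyncio.run(
        run_workers_with_timeout(
            [fast_worker, stuck_worker, fast_worker], timeout=0.2
        )
    )
```

Here the two fast workers return normally while the stuck one is cancelled after the timeout, so the merge step can proceed with partial results.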
3. Conditional branching
The agent evaluates a condition and routes to different paths based on the result. If-else logic for agents.
class ConditionalBranchingWorkflow:
    def __init__(self, router, branches: dict):
        self.router = router      # Evaluates conditions
        self.branches = branches  # {"condition_name": handler}

    async def run(self, context: dict) -> dict:
        # Evaluate routing condition
        decision = await self.router.evaluate(context)
        print(f"  → Routing decision: {decision}")

        # Execute the matching branch
        handler = self.branches.get(decision)
        if not handler:
            return {"success": False, "error": f"No handler for decision: {decision}"}
        result = await handler.run(context)
        return {"success": True, "decision": decision, "result": result}
When to use: Support ticket triage (route to billing, technical, or account team), content moderation (allow, flag, or reject), dynamic workflow routing where the next step depends on data quality or content type.
Real example from a document processing pipeline I built:
async def route_document(context):
    """Router: decides which branch to take based on document type and confidence."""
    doc_type = context.get("classification", {}).get("type")
    confidence = context.get("classification", {}).get("confidence", 0)
    if confidence < 0.6:
        return "manual_review"       # Low confidence: a human needs to look
    elif doc_type == "invoice":
        return "invoice_processing"  # Standard invoice path
    elif doc_type == "contract":
        return "contract_review"     # Contract needs legal review
    else:
        return "general_processing"  # Everything else
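To show how a router like this plugs into a dispatch table, here is a small self-contained sketch. It uses a simplified copy of the routing rules, and the branch handlers (`handle_invoice` and the others) are placeholders invented for the example rather than part of the pipeline described above.

```python
import asyncio


def route(context: dict) -> str:
    """Simplified router mirroring the confidence and type rules above."""
    cls = context.get("classification", {})
    if cls.get("confidence", 0) < 0.6:
        return "manual_review"
    by_type = {"invoice": "invoice_processing", "contract": "contract_review"}
    return by_type.get(cls.get("type"), "general_processing")


# Placeholder branch handlers (hypothetical)
async def handle_invoice(ctx):
    return "invoice fields extracted"


async def handle_contract(ctx):
    return "sent to legal review"


async def handle_general(ctx):
    return "stored as-is"


async def handle_manual(ctx):
    return "queued for a human"


BRANCHES = {
    "invoice_processing": handle_invoice,
    "contract_review": handle_contract,
    "general_processing": handle_general,
    "manual_review": handle_manual,
}


async def dispatch(context: dict):
    """Route the document, then run the handler for the chosen branch."""
    decision = route(context)
    handler = BRANCHES[decision]
    return decision, await handler(context)
```

Keeping the router a pure function of the context makes the routing rules easy to unit-test separately from the handlers.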
4. Loop with human-in-the-loop
The agent runs autonomously until it reaches a checkpoint that requires human approval. It pauses, waits for input, then continues based on the human’s decision.
class HumanInTheLoopWorkflow:
    def __init__(self, agent, checkpoints: list):
        self.agent = agent
        self.checkpoints = checkpoints  # Steps that need human approval

    async def run(self, task: str, notify_human, wait_for_approval):
        context = {"task": task, "step": 0}
        while True:
            context["step"] += 1
            # Run the agent for one step
            result = await self.agent.step(context)

            # Check if this step needs human approval
            if result.get("checkpoint"):
                # Notify human and wait
                await notify_human({
                    "step": context["step"],
                    "summary": result.get("summary"),
                    "decision_needed": result.get("decision_point")
                })
                # This blocks until the human responds
                approval = await wait_for_approval()
                if approval.get("action") == "approve":
                    context["human_feedback"] = approval.get("notes", "")
                    continue
                elif approval.get("action") == "reject":
                    return {"success": False, "reason": "Rejected by human", "context": context}
                elif approval.get("action") == "modify":
                    context["modifications"] = approval.get("changes", {})
                    continue
            else:
                context["result"] = result
                return {"success": True, "context": context}
When to use: Any workflow where mistakes have significant cost. Content publishing (review before publish), financial operations (approve before executing payments), code deployment (approve before merging), email campaigns (review before sending to 10K subscribers).
How I implement notifications: For production systems, I use Telegram bot notifications with inline buttons (Approve / Reject / Modify). For internal tools, a simple Slack message with threaded replies works. The key is making the human response asynchronous. The class above awaits the response in-process for simplicity, but in production the agent shouldn’t block waiting for a response; it should save state and resume when the human responds.
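One way to make the human response asynchronous is to split the workflow into two entry points: one that runs until a checkpoint and saves state, and one that a webhook or bot callback invokes later with the decision. The sketch below assumes an in-memory dict as the store and uses hypothetical names (`start_workflow`, `resume_workflow`, `STORE`); a real system would persist to a database.

```python
import json

STORE = {}  # Stand-in for a database table: workflow_id -> JSON state


def save_state(workflow_id: str, state: dict) -> None:
    STORE[workflow_id] = json.dumps(state)


def load_state(workflow_id: str) -> dict:
    return json.loads(STORE[workflow_id])


def start_workflow(workflow_id: str, task: str) -> dict:
    """Run up to the checkpoint, persist, and return without waiting."""
    state = {
        "task": task,
        "draft": f"Draft for: {task}",  # Placeholder for real agent output
        "status": "awaiting_human",
    }
    save_state(workflow_id, state)
    # The notification (Telegram, Slack, ...) would be sent here
    return state


def resume_workflow(workflow_id: str, approval: dict) -> dict:
    """Called later, e.g. from a webhook, when the human responds."""
    state = load_state(workflow_id)
    if state["status"] != "awaiting_human":
        raise ValueError(f"Workflow {workflow_id} is not awaiting approval")
    action = approval.get("action")
    if action == "approve":
        state["status"] = "completed"
    elif action == "reject":
        state["status"] = "failed"
    elif action == "modify":
        state["modifications"] = approval.get("changes", {})
        state["status"] = "running"  # Hand back to the agent loop
    else:
        raise ValueError(f"Unknown action: {action!r}")
    state["human_feedback"] = approval.get("notes", "")
    save_state(workflow_id, state)
    return state
```

Because nothing is held in memory between the two calls, the process can restart (or the human can take a day to answer) without losing the workflow.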
Building with LangGraph vs building custom
I’ve used both approaches extensively. Here’s my framework for deciding:
Use LangGraph when:
- Your workflow has complex state transitions (many possible states, conditional edges)
- You need built-in persistence (checkpointing, save/restore)
- Your team has existing LangChain experience
- The workflow has 5+ distinct stages
Build custom when:
- You need per-step cost tracking (LangGraph doesn’t have built-in budget management)
- Your workflow integrates with existing systems (queues, databases, monitoring)
- Error recovery requirements are specific (not just “retry 3 times”)
- You want to control which model each step uses
Here’s a custom state machine that I use for most production workflows:
import json
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum

class WorkflowStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    AWAITING_HUMAN = "awaiting_human"
    COMPLETED = "completed"
    FAILED = "failed"

@dataclass
class WorkflowState:
    workflow_id: str
    status: WorkflowStatus
    current_step: str = ""
    step_history: list = field(default_factory=list)
    data: dict = field(default_factory=dict)
    errors: list = field(default_factory=list)
    total_cost: float = 0.0
    total_steps: int = 0

class WorkflowEngine:
    def __init__(self, persistence=None):
        self.persistence = persistence  # Optional DB/S3 persistence

    async def run(self, workflow_id: str, steps: dict, initial_data: dict):
        """steps: {"step_name": {"handler": callable, "next": str or callable}}

        Execution starts at the first key in `steps`.
        """
        state = WorkflowState(
            workflow_id=workflow_id,
            status=WorkflowStatus.RUNNING,
            data=initial_data
        )
        current = next(iter(steps), None)  # Start at the first step defined
        while current and state.status == WorkflowStatus.RUNNING:
            step_def = steps.get(current)
            if not step_def:
                state.status = WorkflowStatus.FAILED
                state.errors.append({"step": current, "error": f"Unknown step: {current}"})
                break
            state.current_step = current
            await self._persist(state)
            try:
                result = await step_def["handler"](state.data)
                state.step_history.append({
                    "step": current,
                    "result": result.get("summary", "completed"),
                    "cost": result.get("cost", 0),
                    "timestamp": datetime.now(timezone.utc).isoformat()
                })
                state.total_cost += result.get("cost", 0)
                state.total_steps += 1

                # Check for human-in-the-loop checkpoint
                if result.get("awaiting_human"):
                    state.status = WorkflowStatus.AWAITING_HUMAN
                    await self._persist(state)
                    # Workflow paused; it resumes when the human responds
                    return {"status": "awaiting_human", "state": state}

                # Determine next step (a callable "next" enables branching)
                next_step = step_def.get("next")
                if callable(next_step):
                    current = next_step(result)
                else:
                    current = next_step
            except Exception as e:
                state.errors.append({"step": current, "error": str(e)})
                # Retry while this step's failure count is within its retry allowance
                retry = step_def.get("retry", 0)
                failures = [err for err in state.errors if err.get("step") == current]
                if len(failures) <= retry:
                    continue  # Retry the same step
                if step_def.get("fallback"):
                    current = step_def["fallback"]
                else:
                    state.status = WorkflowStatus.FAILED
                    break

        if state.status == WorkflowStatus.RUNNING:
            state.status = WorkflowStatus.COMPLETED
        await self._persist(state)
        return {"status": state.status.value, "state": state}

    async def _persist(self, state: WorkflowState):
        if self.persistence:
            await self.persistence.save(state)
Error handling across steps
The hardest problem in multi-step workflows: what happens when step 2 fails after step 1 succeeded?
You have three options:
1. Rollback. Undo the effects of earlier steps. This works when steps have clear compensation actions (e.g., if email step fails, delete the draft). It’s hard when steps have side effects that can’t be undone.
2. Compensate. Execute a compensating action instead of rolling back. If a later step fails after an API call has already created a resource, archive that resource instead of deleting it.
3. Flag for manual review. The safest option. Save the state, mark the workflow as needing human attention, and let a human decide what to do.
I use a combination: automatic retry for transient errors (3 retries with exponential backoff), compensation for known failure modes, and manual review for everything else.
import asyncio

class TemporaryError(Exception):
    """Transient failure worth retrying (timeouts, rate limits)."""

class PermanentError(Exception):
    """Failure that retrying will not fix (bad input, auth errors)."""

async def execute_with_recovery(step_name: str, handler, context: dict, retries=3):
    last_error = None
    for attempt in range(retries):
        try:
            return await handler(context)
        except TemporaryError as e:
            last_error = e
            if attempt == retries - 1:
                break  # No point sleeping after the final attempt
            wait = 2 ** attempt  # Exponential backoff: 1s, 2s, 4s, ...
            print(f"  → Step {step_name} temporary failure (attempt {attempt+1}/{retries}), retrying in {wait}s")
            await asyncio.sleep(wait)
        except PermanentError as e:
            print(f"  → Step {step_name} permanent failure: {e}")
            raise
    # All attempts exhausted: flag for manual review
    print(f"  → Step {step_name} failed after {retries} attempts, executing fallback")
    return {
        "success": False,
        "error": str(last_error),
        "fallback": "Flagged for manual review",
        "context": context
    }
State management
State is the backbone of any multi-step workflow. Every step reads from it and writes to it. Getting state management right is the difference between a workflow you can debug and one you can’t.
What to include in state:
@dataclass
class WorkflowState:
    # Identity
    workflow_id: str
    workflow_type: str
    # Progress
    status: str            # pending, running, awaiting_human, completed, failed
    current_step: str
    completed_steps: list
    # Data
    input: dict            # Original input
    intermediate: dict     # Step outputs, keyed by step name
    final_output: dict     # Final result
    # Costs
    total_cost: float
    step_costs: dict       # Per-step cost breakdown
    # Errors
    errors: list           # Structured error log
    retry_count: int
    # Control
    max_steps: int = 50
    max_cost: float = 10.0
Persistence: I save state to a database (SQLite for simple workflows, Postgres for production) after every step. This means if the server crashes mid-workflow, we can resume from the last checkpoint.
# Save checkpoint after each step (needs: from dataclasses import asdict)
await db.execute(
    """INSERT INTO workflow_checkpoints (workflow_id, step, state) VALUES (?, ?, ?)
    ON CONFLICT(workflow_id) DO UPDATE SET step = excluded.step, state = excluded.state""",
    [state.workflow_id, state.current_step, json.dumps(asdict(state))]
)
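As a concrete illustration of the checkpoint-and-resume idea, here is a minimal sketch using Python's standard `sqlite3` module. The table layout, the `open_store` / `save_checkpoint` / `load_checkpoint` helpers, and the simplified state dict are assumptions for the example. The upsert relies on `workflow_id` being a primary key (or otherwise unique) and on SQLite 3.24 or later.

```python
import json
import sqlite3


def open_store(path: str = ":memory:") -> sqlite3.Connection:
    """Open the database and create the checkpoint table if needed."""
    conn = sqlite3.connect(path)
    conn.execute(
        """CREATE TABLE IF NOT EXISTS workflow_checkpoints (
               workflow_id TEXT PRIMARY KEY,
               step TEXT NOT NULL,
               state TEXT NOT NULL
           )"""
    )
    return conn


def save_checkpoint(conn, workflow_id: str, step: str, state: dict) -> None:
    """Insert the checkpoint, or overwrite the previous one for this workflow."""
    conn.execute(
        """INSERT INTO workflow_checkpoints (workflow_id, step, state)
           VALUES (?, ?, ?)
           ON CONFLICT(workflow_id) DO UPDATE
           SET step = excluded.step, state = excluded.state""",
        (workflow_id, step, json.dumps(state)),
    )
    conn.commit()


def load_checkpoint(conn, workflow_id: str):
    """Return (step, state) for the workflow, or None if it has no checkpoint."""
    row = conn.execute(
        "SELECT step, state FROM workflow_checkpoints WHERE workflow_id = ?",
        (workflow_id,),
    ).fetchone()
    if row is None:
        return None
    step, state_json = row
    return step, json.loads(state_json)
```

On restart, a worker would call `load_checkpoint` and continue from the returned step. Keeping one row per workflow keeps the table small; an append-only history table is an alternative if you want a full audit trail.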
Cost implications
Multi-step workflows are expensive. Here’s a real example from a content creation agent I built:
| Step | Model | Calls | Cost per call | Total |
|---|---|---|---|---|
| Research brief | gpt-4o | 1 | $0.03 | $0.03 |
| Search execution | gpt-4o | 3 (parallel) | $0.02 | $0.06 |
| Outline generation | gpt-4o | 1 | $0.04 | $0.04 |
| Draft section 1 | gpt-4o | 1 | $0.06 | $0.06 |
| Draft section 2 | gpt-4o | 1 | $0.05 | $0.05 |
| Draft section 3 | gpt-4o | 1 | $0.07 | $0.07 |
| Review and polish | gpt-4o | 1 | $0.03 | $0.03 |
| Total | | 9 | | $0.34 |
A single article costs $0.34 in API calls. Generate 100 articles: $34. That’s manageable.
But add retries (each retry reruns the step), add branching (some paths are longer than others), add human review loops (resume generates more calls), and the effective cost can be 3-5x the base estimate.
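The arithmetic behind these numbers is easy to script, which helps when estimating a budget. The sketch below recomputes the table's base cost and applies an overhead multiplier; the `STEPS` list simply transcribes the table, and `estimate_cost` is a hypothetical helper rather than part of any library.

```python
# (step, calls, cost per call in USD), transcribed from the table above
STEPS = [
    ("Research brief", 1, 0.03),
    ("Search execution", 3, 0.02),
    ("Outline generation", 1, 0.04),
    ("Draft section 1", 1, 0.06),
    ("Draft section 2", 1, 0.05),
    ("Draft section 3", 1, 0.07),
    ("Review and polish", 1, 0.03),
]


def estimate_cost(steps, articles: int = 1, overhead: float = 1.0) -> float:
    """Base cost per run, scaled by article count and an overhead multiplier."""
    base = sum(calls * cost for _, calls, cost in steps)
    return round(base * articles * overhead, 2)


total_calls = sum(calls for _, calls, _ in STEPS)  # 9 calls per article
low = estimate_cost(STEPS, articles=100, overhead=3.0)   # Lower bound of 3-5x
high = estimate_cost(STEPS, articles=100, overhead=5.0)  # Upper bound of 3-5x
```

With the 3-5x overhead applied, the 100-article batch lands somewhere between roughly $102 and $170 rather than the $34 base figure.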
I set cost limits per workflow:
class BudgetAwareWorkflow:
    def __init__(self, max_cost_per_run=2.0):
        self.max_cost = max_cost_per_run
        self.running_cost = 0.0

    async def step(self, handler, context):
        # Refuse to start a new step once the budget is spent
        if self.running_cost >= self.max_cost:
            return {"error": "Budget exceeded", "cost": self.running_cost}
        result = await handler(context)
        self.running_cost += result.get("cost", 0)
        return result
Putting it all together
Here’s a real multi-step agent I built for content generation. It uses all four patterns together: sequential, parallel, conditional, and a human-in-the-loop checkpoint:
# Content Generation Agent Workflow
# Pattern: Sequential + Parallel + Conditional + HITL
# The handlers, Database, and slugify are assumed to be defined elsewhere.
async def content_workflow(topic: str, publish: bool = False):
    workflow = WorkflowEngine(persistence=Database())
    steps = {
        "research": {
            "handler": research_topic,
            "next": "outline"
        },
        "outline": {
            "handler": generate_outline,
            "next": "write_sections"
        },
        "write_sections": {
            # Parallel fan-out: write each section independently
            "handler": parallel_write_sections,
            "next": "review"
        },
        "review": {
            # Conditional: if quality check fails, loop back
            # (a missing score is treated as 0, so it also triggers a rewrite)
            "handler": quality_check,
            "next": lambda r: "rewrite" if r.get("quality", 0) < 0.8 else "human_review"
        },
        "rewrite": {
            "handler": rewrite_section,
            "next": "review",  # Loop back for re-check
            "retry": 2         # Up to 2 retries if the handler raises; does not cap the review loop
        },
        "human_review": {
            # HITL checkpoint
            "handler": request_human_approval,
            "next": lambda r: "publish" if r.get("approved") else "rejected"
        },
        "publish": {
            "handler": publish_article if publish else save_draft,
            "next": None  # End
        },
        "rejected": {
            "handler": notify_rejection,
            "next": None
        }
    }
    return await workflow.run(f"content-{slugify(topic)}", steps, {"topic": topic})
The key insight: all four patterns compose naturally. A sequential workflow can have parallel steps within it. A parallel fan-out can have conditional branches per worker. A HITL checkpoint can appear at any point.
Master the patterns individually, then compose them freely. That’s how you build production agent workflows that handle real complexity.