AI agent logging and monitoring: seeing inside your agent
How to log, trace, and monitor AI agents in production — what to track, which tools to use, and how to debug agents that behave unexpectedly.
The LangSmith documentation shows how tracing, logging, and monitoring integrate with LangGraph agents — covering the observability patterns recommended in this post.
TL;DR: Agent observability isn’t optional — it’s the difference between shipping with confidence and shipping with hope. This guide covers what to log (every LLM call, tool execution, decision, and error), how to structure it as JSON Lines, and how to debug agents by replaying logged runs. You can implement a working setup in one hour.
The first time I deployed an agent to production, it worked perfectly for three days. Then a user asked it a question that triggered a six-minute loop of the same tool call, racking up $12 in API costs before I noticed.
I had no logs. No idea what happened. No way to replay the run.
That experience taught me something fundamental: agent observability is not optional. It’s the difference between shipping with confidence and shipping with hope.
Here’s a complete guide to agent logging and monitoring — what to log, why, and how to build it without spending weeks on tooling.
Key takeaways:
- Log every LLM call (input, output, cost, latency), every tool execution (name, params, result, duration), every decision, and every error
- Structured JSON logging to rotating files covers 90% of debugging needs — you don’t need a fancy platform to start
- Track: cost per session, error rate, latency percentiles, tool success rate, agent loop count
- The most powerful debugging technique: replay a logged agent run step by step
- A working logging setup takes 1 hour to implement — do it before your first production deployment
What to log
Everything. But in a structured way. Here’s the minimum set of events you should log for every agent run:
import json
import logging
from dataclasses import dataclass, asdict
from datetime import datetime
from typing import Optional

@dataclass
class LLMCallEvent:
    event_type: str = "llm_call"
    agent_id: str = ""
    session_id: str = ""
    step: int = 0
    model: str = ""
    input_messages: Optional[list] = None  # The messages sent to the LLM
    output_content: str = ""               # The model's text response
    tool_calls: Optional[list] = None      # [{name, args}] if any
    cost: float = 0.0                      # Cost of this call
    prompt_tokens: int = 0
    completion_tokens: int = 0
    latency_ms: int = 0
    timestamp: str = ""

@dataclass
class ToolExecutionEvent:
    event_type: str = "tool_execution"
    agent_id: str = ""
    session_id: str = ""
    step: int = 0
    tool_name: str = ""
    parameters: Optional[dict] = None
    result: str = ""                       # Truncated to 500 chars
    duration_ms: int = 0
    success: bool = True
    error: str = ""
    timestamp: str = ""

@dataclass
class AgentDecisionEvent:
    event_type: str = "agent_decision"
    agent_id: str = ""
    session_id: str = ""
    step: int = 0
    decision: str = ""                       # What the agent decided
    reasoning: str = ""                      # Why it decided this
    context_snapshot: Optional[dict] = None  # Key context at decision point
    timestamp: str = ""
Every event gets logged as a single JSON line. This is important — JSON Lines format means each line is self-contained, easy to stream, and easy to query with grep or jq.
import json
import logging
import os
from datetime import datetime, timezone

# Structured JSON logging: one JSON object per line (JSON Lines)
class StructuredLogger:
    def __init__(self, log_dir: str = "agent_logs"):
        os.makedirs(log_dir, exist_ok=True)  # FileHandler fails if the directory is missing
        self.logger = logging.getLogger("agent")
        self.logger.setLevel(logging.INFO)
        if not self.logger.handlers:  # avoid duplicate lines if instantiated twice
            handler = logging.FileHandler(os.path.join(log_dir, "agent.log"), mode="a")
            handler.setFormatter(logging.Formatter("%(message)s"))
            self.logger.addHandler(handler)

    def log_event(self, event: dict):
        # datetime.utcnow() is deprecated; use an explicit UTC timestamp
        event["timestamp"] = datetime.now(timezone.utc).isoformat()
        self.logger.info(json.dumps(event, default=str))

    def log_llm_call(self, **kwargs):
        self.log_event({"event_type": "llm_call", **kwargs})

    def log_tool_call(self, **kwargs):
        self.log_event({"event_type": "tool_execution", **kwargs})

    def log_decision(self, **kwargs):
        self.log_event({"event_type": "agent_decision", **kwargs})
Integrating logging into your agent
The cleanest way: wrap your agent loop with a logger that intercepts every significant event.
import json
import time
from uuid import uuid4

class LoggedAgent:
    def __init__(self, agent, logger: StructuredLogger, max_steps: int = 30):
        self.agent = agent
        self.logger = logger
        self.session_id = str(uuid4())
        self.max_steps = max_steps  # hard cap so a looping agent cannot run forever

    async def run(self, user_input: str) -> str:
        self.logger.log_decision(
            agent_id=self.agent.id,
            session_id=self.session_id,
            step=0,
            decision="start",
            reasoning="Agent received user input",
            context_snapshot={"input": user_input[:200]}
        )
        # _logged_llm_call and _execute_tool are helper methods not shown here:
        # the first wraps the LLM client and logs the call, the second runs one tool.
        for step in range(1, self.max_steps + 1):
            # Log the LLM call (intercept by wrapping the LLM client)
            response = await self._logged_llm_call(step, self.agent.messages)
            if response.tool_calls:
                # Log each tool call
                for tc in response.tool_calls:
                    t_start = time.time()
                    try:
                        result = await self._execute_tool(tc)
                        success = True
                        error = ""
                    except Exception as e:
                        result = {"error": str(e)}
                        success = False
                        error = str(e)
                    self.logger.log_tool_call(
                        agent_id=self.agent.id,
                        session_id=self.session_id,
                        step=step,
                        tool_name=tc.name,
                        parameters=tc.args,
                        result=json.dumps(result, default=str)[:500],
                        duration_ms=int((time.time() - t_start) * 1000),
                        success=success,
                        error=error
                    )
            else:
                self.logger.log_decision(
                    agent_id=self.agent.id,
                    session_id=self.session_id,
                    step=step,
                    decision="return_result",
                    reasoning="Model returned text response, no tool calls",
                    context_snapshot={"response_preview": response.content[:200]}
                )
                return response.content
        # Step budget exhausted: record it and stop instead of looping indefinitely
        self.logger.log_decision(
            agent_id=self.agent.id,
            session_id=self.session_id,
            step=self.max_steps,
            decision="abort",
            reasoning="Exceeded max_steps without a final response",
            context_snapshot={"max_steps": self.max_steps}
        )
        raise RuntimeError(f"Agent exceeded {self.max_steps} steps")
Don't log full message histories in every LLM call event — they get huge fast. Log message summaries (first 200 chars of user messages, first 500 chars of assistant responses) and provide a replay_id that lets you reconstruct the full conversation from step-by-step logs.
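One way to build those summaries is sketched below. It assumes messages are OpenAI-style dicts with "role" and "content" keys, and the replay_id format (session ID plus step) is a hypothetical choice, not something prescribed above.

```python
def summarize_messages(messages, user_limit=200, assistant_limit=500):
    """Return truncated previews of chat messages, suitable for a log line."""
    summaries = []
    for msg in messages:
        role = msg.get("role", "")
        content = str(msg.get("content", ""))
        # Assistant responses get a longer preview than other roles
        limit = assistant_limit if role == "assistant" else user_limit
        summaries.append({
            "role": role,
            "preview": content[:limit],
            "length": len(content),  # original length, so truncation is visible
        })
    return summaries


def build_llm_event(session_id, step, messages):
    """Build an llm_call event that carries summaries instead of full history."""
    return {
        "event_type": "llm_call",
        "session_id": session_id,
        "step": step,
        # Hypothetical format: enough to find this step again in the logs
        "replay_id": f"{session_id}:{step}",
        "input_messages": summarize_messages(messages),
    }
```

Recording the original length next to each preview makes it obvious at a glance which messages were cut.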
What metrics to track
Logging individual events is for debugging. Metrics are for understanding system health over time.
Here are the metrics I track for every agent in production:
| Metric | What it measures | Alert threshold |
|---|---|---|
| Cost per session | API cost for one complete agent run | > $0.50 |
| Cost per user | Total cost divided by active users | Weekly trend up |
| Error rate | Failed tool calls / total tool calls | > 5% |
| Latency p50 | Median time per agent run | > 15s |
| Latency p95 | Run time that 95% of agent runs stay under | > 45s |
| Latency p99 | Run time that 99% of agent runs stay under | > 120s |
| Tool success rate | Successful tool executions / total | < 95% |
| Loop iterations | Number of LLM calls per agent run | > 15 |
| Stuck agents | Agents running > 30 iterations | Any |
Computing metrics from logs
import json
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from statistics import median

class MetricsCalculator:
    def __init__(self, log_file: str):
        self.log_file = log_file

    def _load_events(self) -> list:
        # Read the JSON Lines file, skipping blank lines
        with open(self.log_file) as f:
            return [json.loads(line) for line in f if line.strip()]

    def _load_session_events(self, session_id: str) -> list:
        return [e for e in self._load_events() if e.get("session_id") == session_id]

    def _load_time_window(self, hours: int) -> list:
        cutoff = datetime.now(timezone.utc) - timedelta(hours=hours)
        recent = []
        for e in self._load_events():
            ts = datetime.fromisoformat(e["timestamp"])
            if ts.tzinfo is None:  # naive timestamps are treated as UTC
                ts = ts.replace(tzinfo=timezone.utc)
            if ts >= cutoff:
                recent.append(e)
        return recent

    def compute_session_metrics(self, session_id: str) -> dict:
        events = self._load_session_events(session_id)
        llm_calls = [e for e in events if e["event_type"] == "llm_call"]
        tool_calls = [e for e in events if e["event_type"] == "tool_execution"]
        decisions = [e for e in events if e["event_type"] == "agent_decision"]
        total_cost = sum(e.get("cost", 0) for e in llm_calls)
        total_latency = sum(e.get("latency_ms", 0) for e in llm_calls)
        tool_failures = [e for e in tool_calls if not e.get("success", True)]
        return {
            "session_id": session_id,
            "total_steps": len(llm_calls),
            "total_cost": round(total_cost, 4),
            "total_latency_ms": total_latency,
            "tool_calls": len(tool_calls),
            "tool_failures": len(tool_failures),
            "tool_success_rate": round((len(tool_calls) - len(tool_failures)) / len(tool_calls) * 100, 1)
            if tool_calls else 100.0,
            "decisions_made": len(decisions),
        }

    def compute_aggregate_metrics(self, time_window_hours: int = 24) -> dict:
        events = self._load_time_window(time_window_hours)
        # Group by session
        sessions = defaultdict(list)
        for e in events:
            sessions[e.get("session_id")].append(e)
        session_costs = []
        session_latencies = []
        session_steps = []
        for sid, session_events in sessions.items():
            llm_calls = [e for e in session_events if e["event_type"] == "llm_call"]
            total_cost = sum(e.get("cost", 0) for e in llm_calls)
            total_latency = sum(e.get("latency_ms", 0) for e in llm_calls)
            session_costs.append(total_cost)
            session_latencies.append(total_latency)
            session_steps.append(len(llm_calls))
        latencies_sorted = sorted(session_latencies)
        n = len(latencies_sorted)
        return {
            "total_sessions": len(sessions),
            "avg_cost": round(sum(session_costs) / len(session_costs), 4) if session_costs else 0,
            "p50_latency_ms": median(session_latencies) if session_latencies else 0,
            "p95_latency_ms": latencies_sorted[int(n * 0.95)] if n > 0 else 0,
            "p99_latency_ms": latencies_sorted[int(n * 0.99)] if n > 0 else 0,
            "avg_steps": round(sum(session_steps) / len(session_steps), 1) if session_steps else 0,
        }
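The percentile lookups above index the sorted list with int(n * 0.95), which is a quick approximation. If you want a named, testable definition, a nearest-rank percentile is one common choice. The sketch below is an illustration of that definition, not the method used elsewhere in this post, and the function name is my own.

```python
import math


def percentile(values, pct):
    """Nearest-rank percentile: the smallest value with at least pct% of the data at or below it.

    pct is an integer from 1 to 100. Returns 0 for an empty input so it can
    be dropped into a metrics dict without special-casing.
    """
    if not values:
        return 0
    ordered = sorted(values)
    # Rank is 1-based; pct * n is computed first to keep the arithmetic exact
    rank = max(1, math.ceil(pct * len(ordered) / 100))
    return ordered[rank - 1]
```

With small samples the choice of definition matters: for ten sessions, p95 and p99 both resolve to the slowest run, so treat tail percentiles with caution until you have a few hundred sessions.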
Setting up dashboards
You don’t need a complex observability platform to start. Here’s my progression:
Phase 1 — File-based (day 1). JSON Lines to rotating files. Query with jq and grep. This covers 90% of debugging needs.
# Find all sessions whose total LLM cost exceeded $1
jq -rs 'map(select(.event_type == "llm_call")) | group_by(.session_id)[] | select((map(.cost) | add) > 1.0) | .[0].session_id' agent_logs/agent.log
# Get p95 latency for the LLM calls in the last 10,000 log lines
tail -10000 agent_logs/agent.log | jq -s 'map(select(.event_type == "llm_call") | .latency_ms) | sort | .[length * 0.95 | floor]'
Phase 2 — SQLite (week 1). Write logs to SQLite instead of flat files. Query with SQL.
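-- Steps, total cost, and average latency per session over the last 24 hours
SELECT session_id, COUNT(*) AS steps, SUM(cost) AS total_cost, AVG(latency_ms) AS avg_latency
FROM llm_calls
-- Timestamps are ISO 8601 strings with a "T" separator, so build the cutoff in the same format
WHERE timestamp > strftime('%Y-%m-%dT%H:%M:%S', 'now', '-1 day')
GROUP BY session_id
ORDER BY total_cost DESC;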
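To get from flat files to a table you can query like this, a small loader is enough. The sketch below uses only the standard library. The llm_calls column set and the load_llm_calls name are assumptions chosen to match the event fields logged earlier, so adjust them to whatever you store.

```python
import json
import sqlite3


def load_llm_calls(jsonl_path, db_path="agent_logs.db"):
    """Copy llm_call events from a JSON Lines file into a SQLite table."""
    conn = sqlite3.connect(db_path)
    conn.execute(
        """CREATE TABLE IF NOT EXISTS llm_calls (
               session_id TEXT, step INTEGER, model TEXT,
               cost REAL, latency_ms REAL, timestamp TEXT)"""
    )
    rows = []
    with open(jsonl_path) as f:
        for line in f:
            if not line.strip():
                continue  # tolerate blank lines
            event = json.loads(line)
            if event.get("event_type") != "llm_call":
                continue  # other event types would go to their own tables
            rows.append((
                event.get("session_id"), event.get("step"), event.get("model"),
                event.get("cost", 0.0), event.get("latency_ms", 0),
                event.get("timestamp"),
            ))
    with conn:  # commits on success, rolls back on error
        conn.executemany("INSERT INTO llm_calls VALUES (?, ?, ?, ?, ?, ?)", rows)
    return conn
```

Running the loader twice on the same file inserts duplicates, so in practice you would either load each daily file once or add a uniqueness constraint on (session_id, step).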
Phase 3 — Grafana (month 1). Ship logs to a structured logging service (Grafana Loki, Axiom, or similar) and build dashboards. Only do this when you need to share metrics with a team or track trends over weeks.
Debugging patterns
Here’s how I debug agents using logs:
Pattern 1: Replay the run. Take the logged events from a session and replay them step by step. This is the single most powerful debugging technique.
import json

class AgentReplayer:
    def __init__(self, log_file: str):
        self.log_file = log_file

    def _load_session(self, session_id: str) -> list:
        # Events are appended in order, so file order is replay order
        with open(self.log_file) as f:
            events = [json.loads(line) for line in f if line.strip()]
        return [e for e in events if e.get("session_id") == session_id]

    def replay(self, session_id: str):
        events = self._load_session(session_id)
        for event in events:
            event_type = event.get("event_type")
            step = event.get("step", 0)
            if event_type == "llm_call":
                print(f"\n{'='*60}")
                print(f"Step {step}: LLM Call ({event.get('model')})")
                print(f"Cost: ${event.get('cost', 0):.4f} | Latency: {event.get('latency_ms', 0)}ms")
                print(f"Tokens: {event.get('prompt_tokens', 0)} in / {event.get('completion_tokens', 0)} out")
                print(f"\nInput (truncated): {json.dumps(event.get('input_messages', [])[:2])[:300]}")
                print(f"\nOutput: {event.get('output_content', '')[:500]}")
            elif event_type == "tool_execution":
                status = "✓" if event.get("success") else "✗"
                print(f"  {status} Tool: {event.get('tool_name')}({json.dumps(event.get('parameters', {}))})")
                print(f"  Duration: {event.get('duration_ms', 0)}ms")
                if not event.get("success"):
                    print(f"  Error: {event.get('error', '')}")
            elif event_type == "agent_decision":
                print(f"  → Decision: {event.get('decision')}")
            input("Press Enter for next step...")  # Step through manually
Pattern 2: Compare two runs. When the same input produces different outputs, compare the decision paths side by side.
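import json

def load_session(session_id: str, log_file: str = "agent_logs/agent.log") -> list:
    # Collect every logged event for one session from the JSON Lines file
    with open(log_file) as f:
        events = [json.loads(line) for line in f if line.strip()]
    return [e for e in events if e.get("session_id") == session_id]

def compare_runs(session_a: str, session_b: str):
    events_a = load_session(session_a)
    events_b = load_session(session_b)
    decisions_a = [e for e in events_a if e["event_type"] == "agent_decision"]
    decisions_b = [e for e in events_b if e["event_type"] == "agent_decision"]
    for i, (da, db) in enumerate(zip(decisions_a, decisions_b)):
        if da.get("decision") != db.get("decision"):
            print(f"Divergence at step {i}:")
            print(f"  Run A: {da.get('decision')}")
            print(f"  Run B: {db.get('decision')}")
            print(f"  Context A: {da.get('context_snapshot')}")
            print(f"  Context B: {db.get('context_snapshot')}")
    if len(decisions_a) != len(decisions_b):
        # zip() stops at the shorter run, so report a length mismatch explicitly
        print(f"Runs differ in length: {len(decisions_a)} vs {len(decisions_b)} decisions")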
Pattern 3: The stuck agent detector. Monitor for agents that are looping (repeating the same tool call with similar parameters).
from collections import defaultdict

def detect_stuck_agents(events: list, max_iterations: int = 15):
    sessions = defaultdict(list)
    for e in events:
        sessions[e.get("session_id")].append(e)
    stuck = []
    for sid, sess_events in sessions.items():
        tool_calls = [e for e in sess_events if e["event_type"] == "tool_execution"]
        # Only sessions with many tool calls are candidates
        if len(tool_calls) > max_iterations:
            # Heuristic: compares tool names only, not parameters
            tool_names = [t.get("tool_name") for t in tool_calls]
            if len(set(tool_names)) <= 2:  # Using only 1-2 tools repeatedly
                stuck.append({
                    "session_id": sid,
                    "iterations": len(tool_calls),
                    "tools_used": tool_names[:10],
                    "total_cost": sum(t.get("cost", 0) for t in sess_events
                                      if t.get("event_type") == "llm_call")
                })
    return stuck
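The detector above only looks at tool names. To catch the case described in Pattern 3, where the same call repeats with the same parameters, you can compare call signatures. The sketch below uses exact parameter matches rather than "similar" ones, which is a simplifying assumption, and the function names and the max_repeats default are hypothetical.

```python
import json


def longest_repeat_run(tool_events):
    """Length of the longest run of consecutive identical (tool, parameters) calls."""
    longest = 0
    current = 0
    previous = None
    for event in tool_events:
        # sort_keys makes the signature independent of dict key order
        signature = (
            event.get("tool_name"),
            json.dumps(event.get("parameters"), sort_keys=True, default=str),
        )
        current = current + 1 if signature == previous else 1
        longest = max(longest, current)
        previous = signature
    return longest


def is_looping(tool_events, max_repeats=3):
    """True when the same call was made max_repeats or more times in a row."""
    return longest_repeat_run(tool_events) >= max_repeats
```

Because this check works on a running list of events, you can call it inside the agent loop after every tool call and abort early, instead of finding the loop in the logs afterwards.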
A 1-hour logging setup
If you do nothing else, implement this. It takes one hour and covers 90% of debugging needs:
import json
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional

class QuickLogger:
    """Minimal agent logger: about 50 lines, one file, zero dependencies."""
    def __init__(self, log_dir: str = "logs"):
        self.log_dir = Path(log_dir)
        self.log_dir.mkdir(parents=True, exist_ok=True)

    def _log_file(self) -> Path:
        # Resolve the name on every write so a long-running process rolls over daily (UTC)
        return self.log_dir / f"agent-{datetime.now(timezone.utc).strftime('%Y%m%d')}.jsonl"

    def log(self, event: dict):
        event["_timestamp"] = datetime.now(timezone.utc).isoformat()
        with open(self._log_file(), "a") as f:
            f.write(json.dumps(event, default=str) + "\n")

    def llm_call(self, session: str, step: int, model: str, prompt_tokens: int,
                 completion_tokens: int, cost: float, latency_ms: int,
                 response: str = ""):
        self.log({
            "type": "llm_call", "session": session, "step": step,
            "model": model, "prompt_tokens": prompt_tokens,
            "completion_tokens": completion_tokens, "cost": round(cost, 6),
            "latency_ms": latency_ms, "response_preview": response[:200]
        })

    def tool_call(self, session: str, step: int, tool: str, params: dict,
                  duration_ms: int, success: bool, error: str = ""):
        self.log({
            "type": "tool_call", "session": session, "step": step,
            "tool": tool, "params": params, "duration_ms": duration_ms,
            "success": success, "error": error
        })

    def decision(self, session: str, step: int, decision: str, context: Optional[dict] = None):
        self.log({
            "type": "decision", "session": session, "step": step,
            "decision": decision, "context": context
        })

    def error(self, session: str, step: int, error: str, traceback: str = ""):
        self.log({
            "type": "error", "session": session, "step": step,
            "error": error, "traceback": traceback[:500]
        })
Usage:
logger = QuickLogger("agent_logs")
session_id, step = "abc-123", 1  # placeholder values for illustration
response_text = "example model response"  # placeholder
logger.llm_call(session_id, step, "gpt-4o", 1500, 400, 0.015, 1200, response_text)
logger.tool_call(session_id, step, "search_web", {"q": "Bengaluru weather"}, 800, True)
logger.decision(session_id, step, "use_weather_tool", {"confidence": 0.85})
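That’s it. The date in the file name gives you one log file per day. Query with jq, keeping in mind that QuickLogger writes shorter keys (type, session) than the earlier examples (event_type, session_id), so adjust your filters to match. You now know what your agents are doing, which is more than most agent developers can say.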
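If you would rather have the standard library handle rotation and retention, logging.handlers.TimedRotatingFileHandler is one option. The sketch below is an alternative to QuickLogger, not a replacement the post depends on. The function names, the agent.jsonl file name, and the 14-day retention are assumptions.

```python
import json
import logging
from logging.handlers import TimedRotatingFileHandler
from pathlib import Path


def make_rotating_logger(log_dir="agent_logs", keep_days=14, name="agent.rotating"):
    """Return a logger that writes JSON Lines and rotates the file at midnight UTC."""
    Path(log_dir).mkdir(parents=True, exist_ok=True)
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    logger.propagate = False  # keep raw JSON lines out of the root logger's output
    if not logger.handlers:  # avoid duplicate handlers on repeated calls
        handler = TimedRotatingFileHandler(
            str(Path(log_dir) / "agent.jsonl"),
            when="midnight",        # roll over once a day
            backupCount=keep_days,  # older rotated files are deleted
            utc=True,
            encoding="utf-8",
        )
        handler.setFormatter(logging.Formatter("%(message)s"))
        logger.addHandler(handler)
    return logger


def log_event(logger, event):
    """Write one event as a single JSON line."""
    logger.info(json.dumps(event, default=str))
```

The trade-off is that rotated files get a date suffix chosen by the handler, so jq globs need to match agent.jsonl and its rotated siblings.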
Alerting
Logs are for after something goes wrong. Alerts are for catching it as it happens.
class AlertManager:
    # Defaults are starting points; tune them against the thresholds in the metrics table
    def __init__(self, cost_threshold: float = 0.50,
                 error_rate_threshold: float = 0.1,
                 max_loops: int = 20):
        self.cost_threshold = cost_threshold
        self.error_rate_threshold = error_rate_threshold
        self.max_loops = max_loops

    async def check_session(self, session_id: str, events: list):
        alerts = []
        # Cost spike
        total_cost = sum(e.get("cost", 0) for e in events
                         if e.get("event_type") == "llm_call")
        if total_cost > self.cost_threshold:
            alerts.append(f"Cost spike: ${total_cost:.2f} for session {session_id}")
        # Error rate
        tool_events = [e for e in events if e.get("event_type") == "tool_execution"]
        if tool_events:
            failures = [e for e in tool_events if not e.get("success")]
            if len(failures) / len(tool_events) > self.error_rate_threshold:
                alerts.append(f"High error rate: {len(failures)}/{len(tool_events)} tool failures")
        # Stuck loop
        if len(tool_events) > self.max_loops:
            alerts.append(f"Agent stuck: {len(tool_events)} tool calls without resolution")
        return alerts
For production, I route alerts to a Telegram bot. The format is simple:
⚠️ Agent Alert
Session: abc-123
Type: Cost spike
Detail: $1.24 in last 2 minutes
Agent: content-writer-v2
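A message in that format can be built and sent with the standard library alone. The sketch below targets the Telegram Bot API's sendMessage method. The function names are my own, and the bot token and chat ID are values you would supply from your own bot setup.

```python
import json
import urllib.request


def format_alert(session_id, alert_type, detail, agent):
    """Render an alert in the plain-text layout shown above."""
    return "\n".join([
        "⚠️ Agent Alert",
        f"Session: {session_id}",
        f"Type: {alert_type}",
        f"Detail: {detail}",
        f"Agent: {agent}",
    ])


def build_telegram_request(bot_token, chat_id, text):
    """Build (but do not send) a sendMessage request for the Telegram Bot API."""
    url = f"https://api.telegram.org/bot{bot_token}/sendMessage"
    payload = json.dumps({"chat_id": chat_id, "text": text}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def send_alert(bot_token, chat_id, text, timeout=10):
    """Send the alert; returns True on HTTP 200. Raises on network errors."""
    request = build_telegram_request(bot_token, chat_id, text)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.status == 200
```

Keeping the request builder separate from the sender makes the formatting testable without network access, and lets you swap Telegram for another channel later.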
The tools landscape
Here’s what I’ve used and what I’d recommend:
| Tool | Best for | Cost | Setup time |
|---|---|---|---|
| File-based + jq | Solo developers, startups | Free | 1 hour |
| SQLite + Metabase | Small teams | Free | 1 day |
| LangSmith | LangGraph users | Pay per trace | 30 min |
| Grafana Loki + Promtail | Teams with existing Grafana | Free tier | 2 days |
| Axiom | Easy hosted solution | Free tier (50GB) | 1 hour |
| OpenTelemetry | Distributed tracing across services | Free (host it) | 2–3 days |
| Sentry | Error tracking specifically | Free tier | 30 min |
My recommendation: start with file-based logging. When you need more, add SQLite for queryability. When you need team dashboards, add Grafana or Axiom. Don’t over-invest in observability before you have users to observe.
Related: AI agent multi-step workflows: orchestrating complex agent pipelines — how to design workflows that benefit from good observability.
Logging won’t make your agent perfect. But it will make failures visible, debuggable, and — eventually — preventable. And that’s the difference between agent development that feels like guessing and agent development that feels like engineering.