Skip to content
Back to Blog
1 min read

Agent Observability: Monitoring AI Systems in Production

I wrote this post to share practical, production-minded guidance on observing AI agents: what to measure, what to log, and how to trace each request end to end so you can see what your agents are actually doing in production.

The Three Pillars of Observability for AI Agents

import contextvars
import time
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Dict, Any, List, Optional

# 1. Metrics - What happened
@dataclass
class AgentMetrics:
    """Core metrics for agent observability.

    Counters accumulate for the lifetime of the instance. Latency samples are
    kept in a sliding window of the most recent ``latency_window`` values
    (a positive int), so a long-running process does not grow memory without
    bound; summaries are computed over that window.
    """
    # Throughput
    requests_total: int = 0
    requests_succeeded: int = 0
    requests_failed: int = 0

    # Latency (only the most recent ``latency_window`` samples are retained)
    latency_samples: List[float] = field(default_factory=list)

    # Token usage
    tokens_input: int = 0
    tokens_output: int = 0

    # Tool usage
    tool_calls_total: int = 0
    tool_calls_by_name: Dict[str, int] = field(default_factory=dict)

    # Errors
    errors_by_type: Dict[str, int] = field(default_factory=dict)

    # Maximum number of latency samples kept and summarised. Declared last so
    # existing positional construction keeps working.
    latency_window: int = 1000

    def record_request(self, success: bool, latency_ms: float,
                       input_tokens: int, output_tokens: int):
        """Count one request, its outcome, its latency and its token usage."""
        self.requests_total += 1
        if success:
            self.requests_succeeded += 1
        else:
            self.requests_failed += 1

        self.latency_samples.append(latency_ms)
        # Drop the oldest samples once the window is exceeded, so memory stays bounded
        overflow = len(self.latency_samples) - self.latency_window
        if overflow > 0:
            del self.latency_samples[:overflow]

        self.tokens_input += input_tokens
        self.tokens_output += output_tokens

    def record_tool_call(self, tool_name: str):
        """Count one invocation of *tool_name*."""
        self.tool_calls_total += 1
        self.tool_calls_by_name[tool_name] = self.tool_calls_by_name.get(tool_name, 0) + 1

    def record_error(self, error_type: str):
        """Count one error of the given type (e.g. an exception class name)."""
        self.errors_by_type[error_type] = self.errors_by_type.get(error_type, 0) + 1

    def get_summary(self) -> Dict:
        """Return rates, latency statistics and totals.

        Latency figures cover the most recent ``latency_window`` samples and
        are 0 when no samples exist. ``p99_latency_ms`` uses the nearest-rank
        method: the smallest sample such that at least 99% of samples are
        less than or equal to it.
        """
        latencies = self.latency_samples[-self.latency_window:]
        count = len(latencies)
        if count:
            avg_latency = sum(latencies) / count
            # ceil(0.99 * count) in exact integer arithmetic, converted to a 0-based index
            p99_index = (99 * count + 99) // 100 - 1
            p99_latency = sorted(latencies)[p99_index]
        else:
            avg_latency = 0
            p99_latency = 0

        return {
            "success_rate": self.requests_succeeded / max(1, self.requests_total),
            "avg_latency_ms": avg_latency,
            "p99_latency_ms": p99_latency,
            "total_tokens": self.tokens_input + self.tokens_output,
            "tool_calls": self.tool_calls_total,
            "error_rate": sum(self.errors_by_type.values()) / max(1, self.requests_total)
        }

# 2. Logs - What was said
@dataclass
class AgentLogEntry:
    """A single structured log record produced on behalf of an agent.

    All fields except ``context`` are required; ``context`` holds any extra
    key/value pairs and defaults to a fresh, empty dict for every entry.
    """
    # Creation time, stored as a string
    timestamp: str
    # Severity label, for example "info" or "error"
    level: str
    # Which agent and which session produced the record
    agent_id: str
    session_id: str
    # Human-readable description of what happened
    message: str
    # Free-form structured details attached to the record
    context: Dict[str, Any] = field(default_factory=dict)

# 3. Traces - What path was taken
@dataclass
class Span:
    """A single timed operation within a distributed trace.

    ``start_time`` is expected to be a ``time.time()`` reading, the same clock
    ``end()`` uses. A span whose ``end_time`` is still ``None`` is running.
    """
    trace_id: str
    span_id: str
    parent_span_id: Optional[str]  # None for the root span of a trace
    operation_name: str
    start_time: float
    end_time: Optional[float] = None
    status: str = "running"
    attributes: Dict[str, Any] = field(default_factory=dict)
    events: List[Dict] = field(default_factory=list)

    def end(self, status: str = "ok"):
        """Stamp the end time (``time.time()``) and set the final status."""
        self.end_time = time.time()
        self.status = status

    def add_event(self, name: str, attributes: Optional[Dict] = None):
        """Append a timestamped event named *name* with optional attributes."""
        self.events.append({
            "timestamp": time.time(),
            "name": name,
            "attributes": attributes or {}
        })

    @property
    def duration_ms(self) -> float:
        """Elapsed time in milliseconds, or 0 while the span is still running."""
        # Explicit None check: a legitimate end_time of 0.0 must not read as "running"
        if self.end_time is None:
            return 0
        return (self.end_time - self.start_time) * 1000

Observability Platform Integration

from contextlib import contextmanager
import uuid
import threading

class ObservabilityPlatform:
    """Unified in-memory observability (metrics, logs, traces) for AI agents.

    The active trace/span is tracked in a ``contextvars.ContextVar`` rather
    than ``threading.local``. Concurrent asyncio tasks on the same thread each
    get their own copy of the context, so interleaved requests cannot corrupt
    each other's parent/child span links.
    """

    def __init__(self):
        self.metrics = AgentMetrics()
        self.logs: List[AgentLogEntry] = []
        # trace_id -> spans of that trace, in the order they were started
        self.traces: Dict[str, List[Span]] = {}
        # (trace_id, span_id) of the innermost active span, or None outside any trace.
        # NOTE: created per instance; the contextvars docs recommend module-level
        # variables, which is fine as long as only a few platforms are created.
        self._trace_context: contextvars.ContextVar = contextvars.ContextVar(
            f"observability_trace_{id(self)}", default=None
        )

    @contextmanager
    def trace(self, operation_name: str, attributes: Optional[Dict] = None):
        """Open a span for the duration of the ``with`` block and yield it.

        The first span opened in a context starts a new trace. Spans opened
        inside it become its children and share its trace_id. On exit the span
        is ended with status "ok". If the block raised, the status is "error"
        and an "exception" event is added, and the exception propagates.
        """
        current = self._trace_context.get()
        if current is None:
            trace_id, parent_span_id = str(uuid.uuid4()), None
            self.traces[trace_id] = []
        else:
            trace_id, parent_span_id = current

        span = Span(
            trace_id=trace_id,
            span_id=str(uuid.uuid4()),
            parent_span_id=parent_span_id,
            operation_name=operation_name,
            start_time=time.time(),
            attributes=attributes or {}
        )

        self.traces[trace_id].append(span)
        token = self._trace_context.set((trace_id, span.span_id))

        try:
            yield span
            span.end("ok")
        except Exception as e:
            span.end("error")
            span.add_event("exception", {"type": type(e).__name__, "message": str(e)})
            raise
        finally:
            # Restore the enclosing span. After the root span this clears the
            # context, so the next top-level operation starts a fresh trace.
            self._trace_context.reset(token)

    def log(self, level: str, message: str, agent_id: str = "",
            session_id: str = "", **context):
        """Append a structured log entry; extra keyword args become its context."""
        entry = AgentLogEntry(
            # Naive UTC ISO-8601 string (same format datetime.utcnow() gave,
            # without its deprecation warning)
            timestamp=datetime.now(timezone.utc).replace(tzinfo=None).isoformat(),
            level=level,
            agent_id=agent_id,
            session_id=session_id,
            message=message,
            context=context
        )
        self.logs.append(entry)

    def record_llm_call(self, model: str, input_tokens: int,
                        output_tokens: int, latency_ms: float,
                        success: bool):
        """Record an LLM call in the metrics and, if a span is active, as an
        "llm_call" event on that span."""
        self.metrics.record_request(success, latency_ms, input_tokens, output_tokens)

        current = self._trace_context.get()
        if current is None:
            return
        trace_id, span_id = current
        # The active span is usually the most recently started, so search from the end
        span = next(
            (s for s in reversed(self.traces.get(trace_id, [])) if s.span_id == span_id),
            None,
        )
        if span is not None:
            span.add_event("llm_call", {
                "model": model,
                "input_tokens": input_tokens,
                "output_tokens": output_tokens,
                "latency_ms": latency_ms,
                "success": success
            })

    def get_trace(self, trace_id: str) -> List[Dict]:
        """Return every span of *trace_id* as a plain dict ([] if unknown)."""
        spans = self.traces.get(trace_id, [])
        return [
            {
                "span_id": s.span_id,
                "parent_span_id": s.parent_span_id,
                "operation": s.operation_name,
                "duration_ms": s.duration_ms,
                "status": s.status,
                "attributes": s.attributes,
                "events": s.events
            }
            for s in spans
        ]

# Global instance: a module-level default ObservabilityPlatform, created once
# when this module is first imported.
observability = ObservabilityPlatform()

Instrumented Agent

class InstrumentedAgent:
    """Agent with full observability instrumentation.

    Each request produces a root "process_request" span with child spans for
    planning, each executed step, and response generation. Every LLM call is
    timed and reported through ``ObservabilityPlatform.record_llm_call``,
    including calls that fail.
    """

    def __init__(self, agent_id: str, obs: ObservabilityPlatform, model: str = "gpt-4o"):
        self.agent_id = agent_id
        self.obs = obs
        self.session_id = str(uuid.uuid4())
        # Model label reported with each LLM call. Assumed to match what
        # call_llm actually uses; TODO confirm.
        self.model = model

    async def process_request(self, user_input: str) -> str:
        """Process a request with full instrumentation.

        Logs start/completion, records the error type and re-raises on failure.
        """
        with self.obs.trace("process_request", {"input_length": len(user_input)}):
            self.obs.log("info", "Starting request processing",
                         agent_id=self.agent_id, session_id=self.session_id,
                         input_preview=user_input[:100])

            try:
                # Planning phase
                with self.obs.trace("planning") as plan_span:
                    plan = await self._create_plan(user_input)
                    plan_span.attributes["steps"] = len(plan)

                # Execution phase
                with self.obs.trace("execution") as exec_span:
                    result = await self._execute_plan(plan)
                    exec_span.attributes["tools_used"] = len(result.get("tools", []))

                # Response generation
                with self.obs.trace("response_generation"):
                    response = await self._generate_response(result)

                self.obs.log("info", "Request completed successfully",
                             agent_id=self.agent_id, session_id=self.session_id,
                             response_length=len(response))

                return response

            except Exception as e:
                self.obs.log("error", f"Request failed: {str(e)}",
                             agent_id=self.agent_id, session_id=self.session_id,
                             error_type=type(e).__name__)
                self.obs.metrics.record_error(type(e).__name__)
                raise

    async def _create_plan(self, user_input: str) -> List[Dict]:
        """Ask the LLM for a plan and parse it into a list of step dicts."""
        response = await self._call_llm_instrumented("Create a plan for: " + user_input)
        return parse_plan(response)

    async def _execute_plan(self, plan: List[Dict]) -> Dict:
        """Execute each plan step in its own span.

        Returns a dict with the per-step "results" and the "tools" the plan named.
        """
        results = []

        for step in plan:
            with self.obs.trace(f"step_{step['name']}") as step_span:
                if step.get("tool"):
                    self.obs.metrics.record_tool_call(step["tool"])
                    step_span.add_event("tool_call", {"tool": step["tool"]})

                result = await execute_step(step)
                results.append(result)

        return {"results": results, "tools": [s.get("tool") for s in plan if s.get("tool")]}

    async def _generate_response(self, execution_result: Dict) -> str:
        """Ask the LLM to turn the execution result into the final response."""
        return await self._call_llm_instrumented(f"Generate response from: {execution_result}")

    async def _call_llm_instrumented(self, prompt: str) -> str:
        """Call the LLM with *prompt*, recording latency, estimated tokens and outcome.

        Token counts are estimated as ``len(text) // 4``, a rough
        characters-per-token heuristic rather than a real tokenizer. A failed
        call is recorded with ``success=False`` and zero output tokens, and
        the exception is then re-raised.
        """
        input_tokens = len(prompt) // 4
        start = time.perf_counter()  # monotonic clock: immune to wall-clock adjustments
        try:
            response = await call_llm(prompt)
        except Exception:
            self.obs.record_llm_call(
                model=self.model,
                input_tokens=input_tokens,
                output_tokens=0,
                latency_ms=(time.perf_counter() - start) * 1000,
                success=False
            )
            raise

        self.obs.record_llm_call(
            model=self.model,
            input_tokens=input_tokens,
            output_tokens=len(response) // 4,
            latency_ms=(time.perf_counter() - start) * 1000,
            success=True
        )
        return response

Dashboard Metrics

class AgentDashboard:
    """Dashboard data for agent monitoring.

    Reads the metrics, logs and traces held by an ObservabilityPlatform and
    shapes them into plain dicts for display.
    """

    def __init__(self, obs: ObservabilityPlatform, started_at: Optional[float] = None):
        """Create a dashboard over *obs*.

        Args:
            obs: Platform whose data is reported.
            started_at: ``time.monotonic()`` reading that marks when
                monitoring began (e.g. when *obs* was created). Defaults to
                the moment this dashboard is constructed.
        """
        self.obs = obs
        self._started_at = time.monotonic() if started_at is None else started_at

    def get_overview(self) -> Dict:
        """Get dashboard overview data.

        Returns a dict with "health", "throughput", "latency", "errors" and
        "tools" sections. The per-type and per-name counters are copies, so
        callers cannot mutate the live metrics.
        """
        metrics = self.obs.metrics
        # Take one snapshot: all figures agree, and the latency window is sorted once
        summary = metrics.get_summary()

        return {
            "health": self._health_from_summary(summary),
            "throughput": {
                "requests_per_minute": self._calculate_rpm(),
                "tokens_per_minute": self._calculate_tpm()
            },
            "latency": {
                "avg_ms": summary["avg_latency_ms"],
                "p99_ms": summary["p99_latency_ms"]
            },
            "errors": {
                "rate": summary["error_rate"],
                "by_type": dict(metrics.errors_by_type)
            },
            "tools": {
                "total_calls": metrics.tool_calls_total,
                "by_name": dict(metrics.tool_calls_by_name)
            }
        }

    def _calculate_health(self, metrics: AgentMetrics) -> str:
        """Classify *metrics* as "critical", "degraded", "slow" or "healthy"."""
        return self._health_from_summary(metrics.get_summary())

    @staticmethod
    def _health_from_summary(summary: Dict) -> str:
        """Map a metrics summary to a health label; error rate takes precedence over latency."""
        if summary["error_rate"] > 0.1:
            return "critical"
        if summary["error_rate"] > 0.05:
            return "degraded"
        if summary["avg_latency_ms"] > 5000:
            return "slow"
        return "healthy"

    def _calculate_rpm(self) -> float:
        """Average requests per minute since ``started_at`` (denominator at least 1 minute)."""
        return self.obs.metrics.requests_total / max(1, self._minutes_running())

    def _calculate_tpm(self) -> float:
        """Average tokens per minute since ``started_at`` (denominator at least 1 minute)."""
        total_tokens = self.obs.metrics.tokens_input + self.obs.metrics.tokens_output
        return total_tokens / max(1, self._minutes_running())

    def _minutes_running(self) -> float:
        """Minutes elapsed since ``started_at``, measured on the monotonic clock."""
        return (time.monotonic() - self._started_at) / 60

    def get_recent_errors(self, limit: int = 10) -> List[Dict]:
        """Get up to *limit* of the most recent error logs, oldest first."""
        # Guard explicitly: error_logs[-0:] would return every entry, not none
        if limit <= 0:
            return []
        error_logs = [entry for entry in self.obs.logs if entry.level == "error"]
        return [
            {
                "timestamp": entry.timestamp,
                "message": entry.message,
                "agent_id": entry.agent_id,
                "context": entry.context
            }
            for entry in error_logs[-limit:]
        ]

    def get_slow_traces(self, threshold_ms: float = 5000, limit: int = 10) -> List[Dict]:
        """Get up to *limit* traces whose root span exceeded *threshold_ms*, slowest first."""
        slow_traces = []

        for trace_id, spans in self.obs.traces.items():
            # The root span is the one without a parent
            root_span = next((s for s in spans if not s.parent_span_id), None)
            if root_span and root_span.duration_ms > threshold_ms:
                slow_traces.append({
                    "trace_id": trace_id,
                    "duration_ms": root_span.duration_ms,
                    "operation": root_span.operation_name,
                    "span_count": len(spans)
                })

        return sorted(slow_traces, key=lambda t: t["duration_ms"], reverse=True)[:max(0, limit)]

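Observability transforms AI agents from black boxes into understandable systems. Combine metrics, logs, and traces to gain complete visibility into agent behavior.

## Takeaways

- Record metrics for every LLM call (success rate, latency percentiles, token usage, tool calls, errors) so regressions show up quickly.
- Emit structured logs that carry agent and session IDs, so failures can be tied back to the requests that caused them.
- Wrap each request and each phase (planning, execution, response generation) in trace spans so slow or failing steps stand out.
- Next step: export these metrics, logs, and traces to a dedicated monitoring backend instead of keeping them in process memory.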

Michael John Peña

Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.