Agent Observability: Monitoring AI Systems in Production

Observability for AI agents goes beyond traditional monitoring. Let’s explore how to gain visibility into complex, autonomous AI systems.

The Three Pillars of Observability for AI

from dataclasses import dataclass, field
from typing import Dict, Any, List, Optional
from datetime import datetime, timezone
import time

# 1. Metrics - What happened
@dataclass
class AgentMetrics:
    """Core metrics for agent observability"""
    # Throughput
    requests_total: int = 0
    requests_succeeded: int = 0
    requests_failed: int = 0

    # Latency
    latency_samples: List[float] = field(default_factory=list)

    # Token usage
    tokens_input: int = 0
    tokens_output: int = 0

    # Tool usage
    tool_calls_total: int = 0
    tool_calls_by_name: Dict[str, int] = field(default_factory=dict)

    # Errors
    errors_by_type: Dict[str, int] = field(default_factory=dict)

    def record_request(self, success: bool, latency_ms: float,
                      input_tokens: int, output_tokens: int):
        self.requests_total += 1
        if success:
            self.requests_succeeded += 1
        else:
            self.requests_failed += 1

        self.latency_samples.append(latency_ms)
        self.tokens_input += input_tokens
        self.tokens_output += output_tokens

    def record_tool_call(self, tool_name: str):
        self.tool_calls_total += 1
        self.tool_calls_by_name[tool_name] = self.tool_calls_by_name.get(tool_name, 0) + 1

    def record_error(self, error_type: str):
        self.errors_by_type[error_type] = self.errors_by_type.get(error_type, 0) + 1

    def get_summary(self) -> Dict:
        latencies = self.latency_samples[-1000:]  # Last 1000 samples
        return {
            "success_rate": self.requests_succeeded / max(1, self.requests_total),
            "avg_latency_ms": sum(latencies) / len(latencies) if latencies else 0,
            "p99_latency_ms": sorted(latencies)[int(len(latencies) * 0.99)] if latencies else 0,
            "total_tokens": self.tokens_input + self.tokens_output,
            "tool_calls": self.tool_calls_total,
            "error_rate": sum(self.errors_by_type.values()) / max(1, self.requests_total)
        }

# 2. Logs - What was said
@dataclass
class AgentLogEntry:
    """Structured log entry for agents"""
    timestamp: str
    level: str
    agent_id: str
    session_id: str
    message: str
    context: Dict[str, Any] = field(default_factory=dict)

# 3. Traces - What path was taken
@dataclass
class Span:
    """A span in a distributed trace"""
    trace_id: str
    span_id: str
    parent_span_id: Optional[str]
    operation_name: str
    start_time: float
    end_time: Optional[float] = None
    status: str = "running"
    attributes: Dict[str, Any] = field(default_factory=dict)
    events: List[Dict] = field(default_factory=list)

    def end(self, status: str = "ok"):
        self.end_time = time.time()
        self.status = status

    def add_event(self, name: str, attributes: Optional[Dict[str, Any]] = None):
        self.events.append({
            "timestamp": time.time(),
            "name": name,
            "attributes": attributes or {}
        })

    @property
    def duration_ms(self) -> float:
        if self.end_time:
            return (self.end_time - self.start_time) * 1000
        return 0
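
A minimal sketch of how these building blocks are used on their own, before the platform ties them together (the values below are illustrative):

# Illustrative use of the building blocks above (values are made up)
metrics = AgentMetrics()
metrics.record_request(success=True, latency_ms=420.0, input_tokens=850, output_tokens=210)
metrics.record_tool_call("web_search")
print(metrics.get_summary())

span = Span(trace_id="trace-1", span_id="span-1", parent_span_id=None,
            operation_name="demo", start_time=time.time())
span.add_event("tool_call", {"tool": "web_search"})
span.end("ok")
print(f"{span.operation_name} took {span.duration_ms:.2f} ms")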

Observability Platform Integration

from contextlib import contextmanager
import uuid
import threading

class ObservabilityPlatform:
    """Unified observability for AI agents"""

    def __init__(self):
        self.metrics = AgentMetrics()
        self.logs: List[AgentLogEntry] = []
        self.traces: Dict[str, List[Span]] = {}
        self._current_trace = threading.local()

    @contextmanager
    def trace(self, operation_name: str, attributes: Optional[Dict[str, Any]] = None):
        """Create a trace span"""
        trace_id = getattr(self._current_trace, 'trace_id', None)
        parent_span_id = getattr(self._current_trace, 'span_id', None)
        is_root = trace_id is None

        if is_root:
            trace_id = str(uuid.uuid4())
            self._current_trace.trace_id = trace_id
            self.traces[trace_id] = []

        span = Span(
            trace_id=trace_id,
            span_id=str(uuid.uuid4()),
            parent_span_id=parent_span_id,
            operation_name=operation_name,
            start_time=time.time(),
            attributes=attributes or {}
        )

        self.traces[trace_id].append(span)
        old_span_id = getattr(self._current_trace, 'span_id', None)
        self._current_trace.span_id = span.span_id

        try:
            yield span
            span.end("ok")
        except Exception as e:
            span.end("error")
            span.add_event("exception", {"type": type(e).__name__, "message": str(e)})
            raise
        finally:
            self._current_trace.span_id = old_span_id
            if is_root:
                # Root span finished; reset so the next request starts a fresh trace
                self._current_trace.trace_id = None

    def log(self, level: str, message: str, agent_id: str = "",
            session_id: str = "", **context):
        """Log a structured message"""
        entry = AgentLogEntry(
            timestamp=datetime.now(timezone.utc).isoformat(),
            level=level,
            agent_id=agent_id,
            session_id=session_id,
            message=message,
            context=context
        )
        self.logs.append(entry)

    def record_llm_call(self, model: str, input_tokens: int,
                       output_tokens: int, latency_ms: float,
                       success: bool):
        """Record an LLM call"""
        self.metrics.record_request(success, latency_ms, input_tokens, output_tokens)

        # Attach the call to the current span, if one is active
        current_span_id = getattr(self._current_trace, 'span_id', None)
        if current_span_id:
            trace_id = getattr(self._current_trace, 'trace_id', None)
            for span in self.traces.get(trace_id, []):
                if span.span_id == current_span_id:
                    span.add_event("llm_call", {
                        "model": model,
                        "input_tokens": input_tokens,
                        "output_tokens": output_tokens,
                        "latency_ms": latency_ms
                    })
                    break

    def get_trace(self, trace_id: str) -> List[Dict]:
        """Get a complete trace"""
        spans = self.traces.get(trace_id, [])
        return [
            {
                "span_id": s.span_id,
                "parent_span_id": s.parent_span_id,
                "operation": s.operation_name,
                "duration_ms": s.duration_ms,
                "status": s.status,
                "events": s.events
            }
            for s in spans
        ]

# Global instance
observability = ObservabilityPlatform()
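
A quick sketch of using the platform directly; the operation names, token counts, and latencies below are made up, but they show how nested spans, logs, and LLM metrics hang together:

# Illustrative usage of the global platform (names and numbers are examples)
with observability.trace("handle_request", {"channel": "demo"}) as root:
    observability.log("info", "request received", agent_id="agent-1", session_id="sess-1")
    with observability.trace("llm_call"):
        observability.record_llm_call(model="gpt-4o", input_tokens=500,
                                      output_tokens=120, latency_ms=820.0, success=True)

print(observability.get_trace(root.trace_id))
print(observability.metrics.get_summary())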

Instrumented Agent

class InstrumentedAgent:
    """Agent with full observability instrumentation"""

    def __init__(self, agent_id: str, obs: ObservabilityPlatform):
        self.agent_id = agent_id
        self.obs = obs
        self.session_id = str(uuid.uuid4())

    async def process_request(self, user_input: str) -> str:
        """Process a request with full instrumentation"""

        with self.obs.trace("process_request", {"input_length": len(user_input)}) as span:
            self.obs.log("info", "Starting request processing",
                        agent_id=self.agent_id, session_id=self.session_id,
                        input_preview=user_input[:100])

            try:
                # Planning phase
                with self.obs.trace("planning") as plan_span:
                    plan = await self._create_plan(user_input)
                    plan_span.attributes["steps"] = len(plan)

                # Execution phase
                with self.obs.trace("execution") as exec_span:
                    result = await self._execute_plan(plan)
                    exec_span.attributes["tools_used"] = len(result.get("tools", []))

                # Response generation
                with self.obs.trace("response_generation"):
                    response = await self._generate_response(result)

                self.obs.log("info", "Request completed successfully",
                            agent_id=self.agent_id, session_id=self.session_id,
                            response_length=len(response))

                return response

            except Exception as e:
                self.obs.log("error", f"Request failed: {str(e)}",
                            agent_id=self.agent_id, session_id=self.session_id,
                            error_type=type(e).__name__)
                self.obs.metrics.record_error(type(e).__name__)
                raise

    async def _create_plan(self, user_input: str) -> List[Dict]:
        """Create execution plan"""
        start = time.time()

        response = await call_llm("Create a plan for: " + user_input)

        self.obs.record_llm_call(
            model="gpt-4o",
            input_tokens=len(user_input) // 4,
            output_tokens=len(response) // 4,
            latency_ms=(time.time() - start) * 1000,
            success=True
        )

        return parse_plan(response)

    async def _execute_plan(self, plan: List[Dict]) -> Dict:
        """Execute the plan"""
        results = []

        for step in plan:
            with self.obs.trace(f"step_{step['name']}") as step_span:
                if step.get("tool"):
                    self.obs.metrics.record_tool_call(step["tool"])
                    step_span.add_event("tool_call", {"tool": step["tool"]})

                result = await execute_step(step)
                results.append(result)

        return {"results": results, "tools": [s.get("tool") for s in plan if s.get("tool")]}

    async def _generate_response(self, execution_result: Dict) -> str:
        """Generate final response"""
        start = time.time()

        response = await call_llm(f"Generate response from: {execution_result}")

        self.obs.record_llm_call(
            model="gpt-4o",
            input_tokens=100,  # placeholder estimate
            output_tokens=len(response) // 4,
            latency_ms=(time.time() - start) * 1000,
            success=True
        )

        return response
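
Running the agent end to end requires the call_llm, parse_plan, and execute_step helpers referenced above; they are not defined in this post, so the stubs below are hypothetical stand-ins that let the snippet execute:

# Hypothetical stubs for the helpers the agent expects (swap in real implementations)
import asyncio

async def call_llm(prompt: str) -> str:
    return f"stub response for: {prompt[:40]}"

def parse_plan(response: str) -> List[Dict]:
    return [{"name": "lookup", "tool": "web_search"}]

async def execute_step(step: Dict) -> Dict:
    return {"step": step["name"], "ok": True}

agent = InstrumentedAgent("agent-1", observability)
print(asyncio.run(agent.process_request("What's the weather in Sydney?")))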

Dashboard Metrics

class AgentDashboard:
    """Dashboard data for agent monitoring"""

    def __init__(self, obs: ObservabilityPlatform):
        self.obs = obs

    def get_overview(self) -> Dict:
        """Get dashboard overview data"""
        metrics = self.obs.metrics
        summary = metrics.get_summary()

        return {
            "health": self._calculate_health(metrics),
            "throughput": {
                "requests_per_minute": self._calculate_rpm(),
                "tokens_per_minute": self._calculate_tpm()
            },
            "latency": {
                "avg_ms": summary["avg_latency_ms"],
                "p99_ms": summary["p99_latency_ms"]
            },
            "errors": {
                "rate": summary["error_rate"],
                "by_type": metrics.errors_by_type
            },
            "tools": {
                "total_calls": metrics.tool_calls_total,
                "by_name": metrics.tool_calls_by_name
            }
        }

    def _calculate_health(self, metrics: AgentMetrics) -> str:
        summary = metrics.get_summary()

        if summary["error_rate"] > 0.1:
            return "critical"
        if summary["error_rate"] > 0.05:
            return "degraded"
        if summary["avg_latency_ms"] > 5000:
            return "slow"
        return "healthy"

    def _calculate_rpm(self) -> float:
        # Calculate requests per minute from recent activity
        return self.obs.metrics.requests_total / max(1, self._minutes_running())

    def _calculate_tpm(self) -> float:
        total_tokens = self.obs.metrics.tokens_input + self.obs.metrics.tokens_output
        return total_tokens / max(1, self._minutes_running())

    def _minutes_running(self) -> float:
        # This would track actual runtime
        return 10  # Placeholder

    def get_recent_errors(self, limit: int = 10) -> List[Dict]:
        """Get recent error logs"""
        error_logs = [entry for entry in self.obs.logs if entry.level == "error"]
        return [
            {
                "timestamp": entry.timestamp,
                "message": entry.message,
                "agent_id": entry.agent_id,
                "context": entry.context
            }
            for entry in error_logs[-limit:]
        ]

    def get_slow_traces(self, threshold_ms: float = 5000) -> List[Dict]:
        """Get traces exceeding latency threshold"""
        slow_traces = []

        for trace_id, spans in self.obs.traces.items():
            root_span = next((s for s in spans if not s.parent_span_id), None)
            if root_span and root_span.duration_ms > threshold_ms:
                slow_traces.append({
                    "trace_id": trace_id,
                    "duration_ms": root_span.duration_ms,
                    "operation": root_span.operation_name,
                    "span_count": len(spans)
                })

        return sorted(slow_traces, key=lambda t: t["duration_ms"], reverse=True)[:10]
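
Assuming some requests have already flowed through the instrumented agent, reading the dashboard might look like this (the exact values depend on the recorded traffic):

# Illustrative dashboard read-out
dashboard = AgentDashboard(observability)
overview = dashboard.get_overview()
print(overview["health"], overview["latency"], overview["errors"]["rate"])
print(dashboard.get_recent_errors(limit=5))
print(dashboard.get_slow_traces(threshold_ms=2000))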

Observability transforms AI agents from black boxes into systems you can reason about. Combine metrics, logs, and traces to see what your agents are doing, where they slow down, and why they fail in production.

Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.