Back to Blog
7 min read

Data Observability: Monitoring Your Data Pipelines

Data observability extends traditional monitoring to data systems. In 2021, as data pipelines became more complex, observability became essential for reliability. Let’s explore how to implement data observability.

The Five Pillars of Data Observability

  1. Freshness: Is data up to date?
  2. Volume: Is the expected amount of data present?
  3. Schema: Has the schema changed unexpectedly?
  4. Distribution: Are data values within expected ranges?
  5. Lineage: Where did this data come from?

Implementing Data Observability

from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Dict, List, Optional
import pandas as pd
import numpy as np
from scipy import stats

@dataclass
class DatasetMetrics:
    """Snapshot of the metrics collected for one dataset at one observation.

    Instances are built by ``DataObserver.observe`` and appended to the
    observer's per-dataset history, where later observations are compared
    against them to detect anomalies.
    """
    # Caller-supplied dataset identifier; also the key of the observer's history.
    dataset_name: str
    # When the observation was taken (naive datetime expressed in UTC).
    timestamp: datetime
    # Number of rows in the observed DataFrame (``len(df)``).
    row_count: int
    # Number of columns in the observed DataFrame (``len(df.columns)``).
    column_count: int
    # Age in seconds of the newest value found in the frame's datetime
    # columns; -1 when freshness could not be determined.
    freshness_seconds: float
    # Hex digest computed over the (column name, dtype) pairs of the frame.
    schema_hash: str
    # Column name -> statistics dict ('dtype', 'null_count', 'null_pct',
    # 'unique_count', plus dtype-specific entries such as 'mean').
    column_stats: Dict[str, dict]
    # Anomaly dicts found for this observation; each carries at least
    # 'type', 'severity' and 'message'.
    anomalies: List[dict] = field(default_factory=list)

class DataObserver:
    """Profile datasets and flag deviations from their recent history.

    Each call to :meth:`observe` computes metrics for a DataFrame, compares
    them with the observations already recorded under the same dataset name
    and then appends the new snapshot to ``historical_metrics``.

    Args:
        baseline_days: Number of most recent observations used as the
            baseline.  Despite the name this counts observations, not
            calendar days; the two coincide when a dataset is observed once
            per day.
    """

    # Prior observations required before any anomaly is reported.
    MIN_HISTORY = 7
    # Absolute z-score above which a value is anomalous / of high severity.
    Z_THRESHOLD = 3
    Z_THRESHOLD_HIGH = 5

    def __init__(self, baseline_days: int = 30):
        self.baseline_days = baseline_days
        # Dataset name -> observations in arrival order (oldest first).
        self.historical_metrics: Dict[str, List["DatasetMetrics"]] = {}

    def observe(self, df: pd.DataFrame, dataset_name: str) -> "DatasetMetrics":
        """Collect metrics for a dataset, detect anomalies and record them.

        Args:
            df: The data to profile.
            dataset_name: Key under which the history of this dataset is kept.

        Returns:
            The metrics snapshot, with ``anomalies`` already populated.
        """
        # One clock reading for both the timestamp and the freshness value so
        # the two cannot drift apart.
        now = pd.Timestamp.now(tz="UTC")

        column_stats = {
            col: self._calculate_column_stats(df[col]) for col in df.columns
        }

        metrics = DatasetMetrics(
            dataset_name=dataset_name,
            # Stored as a naive UTC datetime, as before.
            timestamp=now.tz_localize(None).to_pydatetime(),
            row_count=len(df),
            column_count=len(df.columns),
            freshness_seconds=self._calculate_freshness(df, now=now),
            schema_hash=self._hash_schema(df),
            column_stats=column_stats
        )

        # Compare against history *before* adding the new snapshot to it.
        metrics.anomalies = self._detect_anomalies(metrics, dataset_name)
        self.historical_metrics.setdefault(dataset_name, []).append(metrics)

        return metrics

    @staticmethod
    def _to_python(value):
        """Unwrap a numpy/pandas scalar into a plain Python number (NaN if missing)."""
        if pd.isna(value):
            return float('nan')
        return value.item() if hasattr(value, 'item') else value

    @staticmethod
    def _to_utc(ts: pd.Timestamp) -> pd.Timestamp:
        """Return ``ts`` as a UTC-aware timestamp; naive input is taken to be UTC."""
        return ts.tz_localize("UTC") if ts.tzinfo is None else ts.tz_convert("UTC")

    def _calculate_column_stats(self, series: pd.Series) -> dict:
        """Calculate statistics for a column.

        Every column gets 'dtype', 'null_count', 'null_pct' and
        'unique_count'.  Integer and float columns of any width (including
        pandas nullable dtypes) add 'mean', 'std', 'min', 'max', 'median',
        'p5' and 'p95'.  Object and string columns add 'top_values' and
        'avg_length' (``None`` when the column holds no strings).
        """
        null_mask = series.isnull()
        stats_dict = {
            'dtype': str(series.dtype),
            'null_count': int(null_mask.sum()),
            # An empty column has no nulls; avoid the NaN that mean() yields.
            'null_pct': float(null_mask.mean()) if len(series) else 0.0,
            'unique_count': int(series.nunique())
        }

        # dtype.kind is 'i'/'u'/'f' for every integer and float dtype; this
        # deliberately excludes bool, complex and datetime columns.
        kind = getattr(series.dtype, 'kind', 'O')
        if kind in ('i', 'u', 'f'):
            to_python = self._to_python
            stats_dict.update({
                'mean': to_python(series.mean()),
                'std': to_python(series.std()),
                'min': to_python(series.min()),
                'max': to_python(series.max()),
                'median': to_python(series.median()),
                'p5': to_python(series.quantile(0.05)),
                'p95': to_python(series.quantile(0.95))
            })
        elif series.dtype == object or isinstance(series.dtype, pd.StringDtype):
            try:
                lengths = series.str.len()
            except AttributeError:
                # The .str accessor refuses object columns holding no strings.
                avg_length = None
            else:
                avg_length = float(lengths.mean()) if lengths.notna().any() else None
            stats_dict.update({
                'top_values': series.value_counts().head(5).to_dict(),
                'avg_length': avg_length
            })

        return stats_dict

    def _hash_schema(self, df: pd.DataFrame) -> str:
        """Create a hash of column names and dtypes for change detection."""
        import hashlib
        schema_str = str([(col, str(dtype)) for col, dtype in df.dtypes.items()])
        # MD5 is used as a cheap fingerprint here, not for security.
        return hashlib.md5(schema_str.encode()).hexdigest()

    def _calculate_freshness(
        self,
        df: pd.DataFrame,
        now: Optional[datetime] = None
    ) -> float:
        """Calculate data freshness in seconds.

        Freshness is the time elapsed between ``now`` and the newest value
        found in any datetime column.  Timezone-aware columns are converted
        to UTC; naive columns are assumed to already be in UTC.

        Args:
            df: The data to inspect.
            now: Reference time; defaults to the current UTC time.

        Returns:
            Age of the newest timestamp in seconds, or -1 when the frame has
            no datetime column or only missing timestamps.
        """
        reference = (
            pd.Timestamp.now(tz="UTC") if now is None
            else self._to_utc(pd.Timestamp(now))
        )

        latest = None
        for _, column in df.items():
            if not pd.api.types.is_datetime64_any_dtype(column.dtype):
                continue
            column_max = column.max()
            if pd.isna(column_max):
                continue
            column_max = self._to_utc(column_max)
            if latest is None or column_max > latest:
                latest = column_max

        if latest is None:
            return -1.0  # Unknown freshness
        return (reference - latest).total_seconds()

    def _detect_anomalies(
        self,
        current: "DatasetMetrics",
        dataset_name: str
    ) -> List[dict]:
        """Detect anomalies compared to the historical baseline.

        Checks volume (row-count z-score), freshness (more than twice the
        historical maximum), schema (hash differs from the previous
        observation) and, per column, mean drift and null-rate spikes.
        Nothing is reported until ``MIN_HISTORY`` observations exist.
        """
        anomalies = []
        history = self.historical_metrics.get(dataset_name, [])

        if len(history) < self.MIN_HISTORY:
            return anomalies

        baseline = history[-self.baseline_days:]

        # Volume anomaly
        historical_counts = [m.row_count for m in baseline]
        if historical_counts:
            mean_count = np.mean(historical_counts)
            std_count = np.std(historical_counts)

            # NOTE(review): a perfectly constant baseline (std == 0) cannot be
            # z-scored, so a sudden change after it is not reported here.
            if std_count > 0:
                z_score = float((current.row_count - mean_count) / std_count)
                if abs(z_score) > self.Z_THRESHOLD:
                    anomalies.append({
                        'type': 'volume',
                        'severity': 'high' if abs(z_score) > self.Z_THRESHOLD_HIGH else 'medium',
                        'message': f'Row count {current.row_count} is {z_score:.1f} std devs from mean {mean_count:.0f}',
                        'z_score': z_score
                    })

        # Freshness anomaly (non-positive values mean "unknown" and are skipped)
        historical_freshness = [
            m.freshness_seconds for m in baseline if m.freshness_seconds > 0
        ]
        if historical_freshness and current.freshness_seconds > 0:
            max_freshness = max(historical_freshness) * 2  # 2x historical max
            if current.freshness_seconds > max_freshness:
                anomalies.append({
                    'type': 'freshness',
                    'severity': 'high',
                    'message': f'Data is {current.freshness_seconds/3600:.1f} hours old, exceeds threshold of {max_freshness/3600:.1f} hours'
                })

        # Schema change relative to the previous observation
        if current.schema_hash != history[-1].schema_hash:
            anomalies.append({
                'type': 'schema',
                'severity': 'medium',
                'message': 'Schema has changed since last observation'
            })

        # Per-column checks run for every column: the null-rate check applies
        # to text columns too, not only to those that have a mean.
        for col, col_stats in current.column_stats.items():
            anomalies.extend(
                self._detect_distribution_anomaly(col, col_stats, history)
            )

        return anomalies

    def _detect_distribution_anomaly(
        self,
        column: str,
        current_stats: dict,
        history: List["DatasetMetrics"]
    ) -> List[dict]:
        """Detect mean-drift and null-rate anomalies for a single column.

        Only observations that actually contain ``column`` form the baseline,
        so a newly added column is not compared against an implicit 0% null
        rate (the schema check already reports the addition).
        """
        anomalies = []
        baseline = [
            m.column_stats[column]
            for m in history[-self.baseline_days:]
            if column in m.column_stats
        ]

        # Mean drift, for columns that have a usable mean
        current_mean = current_stats.get('mean')
        if current_mean is not None and pd.notna(current_mean):
            # Drop missing/NaN means; a single NaN would poison mean and std.
            historical_means = [
                s['mean'] for s in baseline
                if s.get('mean') is not None and pd.notna(s['mean'])
            ]

            if len(historical_means) >= self.MIN_HISTORY:
                mean_of_means = np.mean(historical_means)
                std_of_means = np.std(historical_means)

                if std_of_means > 0:
                    z_score = float((current_mean - mean_of_means) / std_of_means)
                    if abs(z_score) > self.Z_THRESHOLD:
                        anomalies.append({
                            'type': 'distribution',
                            'severity': 'medium',
                            'column': column,
                            'message': f'Column {column} mean {current_stats["mean"]:.2f} deviates from historical mean {mean_of_means:.2f}',
                            'z_score': z_score
                        })

        # Null rate anomaly: more than double the historical max and above 5%
        historical_nulls = [s['null_pct'] for s in baseline if 'null_pct' in s]

        if historical_nulls:
            max_historical_null = max(historical_nulls)
            if current_stats['null_pct'] > max_historical_null * 2 and current_stats['null_pct'] > 0.05:
                anomalies.append({
                    'type': 'null_rate',
                    'severity': 'medium',
                    'column': column,
                    'message': f'Column {column} null rate {current_stats["null_pct"]*100:.1f}% exceeds historical max {max_historical_null*100:.1f}%'
                })

        return anomalies

Alerting on Data Issues

from enum import Enum
from typing import Callable
import json

class AlertSeverity(Enum):
    """Alert severity levels, declared from least to most severe."""
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"

    @property
    def rank(self) -> int:
        """Position in declaration order (LOW is 0, CRITICAL is 3).

        The member values are strings, which sort alphabetically
        ("critical" < "high" < "low" < "medium"); compare ranks instead.
        """
        return list(type(self)).index(self)


class DataObservabilityAlerts:
    """Route anomalies to handlers according to their severity.

    A handler is registered with a minimum severity and is invoked for every
    anomaly at that severity or above.
    """

    def __init__(self):
        # Severity threshold -> handlers registered at that threshold.
        self.alert_handlers: Dict[AlertSeverity, List[Callable]] = {
            severity: [] for severity in AlertSeverity
        }

    def register_handler(
        self,
        severity: AlertSeverity,
        handler: Callable[[dict], None]
    ):
        """Register an alert handler.

        Args:
            severity: Minimum severity the handler wants to receive.
            handler: Callable invoked with the alert dict.
        """
        self.alert_handlers[severity].append(handler)

    def process_anomalies(
        self,
        metrics: "DatasetMetrics"
    ):
        """Build an alert for each anomaly in ``metrics`` and dispatch it.

        An anomaly without a 'severity' entry is treated as medium.  Each
        handler receives a given alert at most once, even if it was
        registered under several thresholds.

        Raises:
            ValueError: If an anomaly carries an unknown severity string.
        """
        for anomaly in metrics.anomalies:
            severity = AlertSeverity(anomaly.get('severity', 'medium'))

            alert = {
                'dataset': metrics.dataset_name,
                'timestamp': metrics.timestamp.isoformat(),
                'anomaly_type': anomaly['type'],
                'severity': severity.value,
                'message': anomaly['message'],
                'details': anomaly
            }

            # Notify handlers whose threshold is at or below this severity.
            # Ranks are compared because the string values do not sort by
            # severity.
            notified = []
            for threshold in AlertSeverity:
                if threshold.rank > severity.rank:
                    continue
                for handler in self.alert_handlers[threshold]:
                    if handler in notified:
                        continue
                    notified.append(handler)
                    handler(alert)


# Alert handlers
def slack_alert_handler(alert: dict):
    """Send an alert to Slack through an incoming webhook.

    The webhook URL is read from the ``SLACK_WEBHOOK_URL`` environment
    variable; when it is unset or empty the alert is silently dropped.

    Args:
        alert: Alert dict with 'dataset', 'message', 'anomaly_type' and
            'severity' entries.
    """
    # Imported locally because the surrounding module does not import them.
    import os
    import time

    webhook_url = os.environ.get('SLACK_WEBHOOK_URL')
    if not webhook_url:
        return

    # Deferred until we know a request will actually be made.
    import requests

    color = {
        'low': '#36a64f',
        'medium': '#ffcc00',
        'high': '#ff6600',
        'critical': '#ff0000'
    }.get(alert['severity'], '#808080')

    payload = {
        'attachments': [{
            'color': color,
            'title': f"Data Alert: {alert['dataset']}",
            'text': alert['message'],
            'fields': [
                {'title': 'Type', 'value': alert['anomaly_type'], 'short': True},
                {'title': 'Severity', 'value': alert['severity'], 'short': True}
            ],
            # Real Unix epoch seconds.  A naive utcnow().timestamp() is
            # interpreted as local time and is off by the UTC offset.
            'ts': int(time.time())
        }]
    }

    # Without a timeout a stalled connection would block the pipeline.
    requests.post(webhook_url, json=payload, timeout=10)


def pagerduty_alert_handler(alert: dict):
    """Trigger a PagerDuty event for high and critical alerts.

    Alerts of any other severity are ignored.  The routing key is read from
    the ``PAGERDUTY_ROUTING_KEY`` environment variable; when it is unset or
    empty no event is sent.

    Args:
        alert: Alert dict with 'dataset', 'message', 'anomaly_type' and
            'severity' entries.
    """
    if alert['severity'] not in ['high', 'critical']:
        return

    # Imported locally because the surrounding module does not import it.
    import os

    routing_key = os.environ.get('PAGERDUTY_ROUTING_KEY')
    if not routing_key:
        # An event without a routing key cannot be delivered anywhere.
        return

    import requests

    payload = {
        'routing_key': routing_key,
        'event_action': 'trigger',
        # One open incident per dataset and anomaly type.
        'dedup_key': f"{alert['dataset']}_{alert['anomaly_type']}",
        'payload': {
            'summary': f"Data issue in {alert['dataset']}: {alert['message']}",
            'severity': 'critical' if alert['severity'] == 'critical' else 'error',
            'source': 'data-observability',
            'custom_details': alert
        }
    }

    # Without a timeout a stalled connection would block the pipeline.
    requests.post(
        'https://events.pagerduty.com/v2/enqueue',
        json=payload,
        timeout=10
    )


# Setup
# Slack hears about medium-level issues; PagerDuty is wired in at the high level.
alerts = DataObservabilityAlerts()
alerts.register_handler(
    handler=slack_alert_handler,
    severity=AlertSeverity.MEDIUM,
)
alerts.register_handler(
    handler=pagerduty_alert_handler,
    severity=AlertSeverity.HIGH,
)

Data Lineage Tracking

from dataclasses import dataclass
from typing import Set, Optional
import networkx as nx

@dataclass
class DataAsset:
    """A node in the lineage graph: one table, file, view or similar asset."""
    # Identifier of the asset; used as the graph node and registry key.
    name: str
    type: str  # 'table', 'file', 'view', etc.; 'report' marks a critical asset
    # Where the asset lives -- presumably a path, URI or qualified table
    # name; the format is not constrained here.
    location: str
    # Optional schema description; its structure is not defined here.
    schema: Optional[dict] = None

@dataclass
class LineageEdge:
    """A directed lineage relationship: ``source`` feeds into ``target``."""
    # Name of the upstream asset (graph node the edge starts from).
    source: str
    # Name of the downstream asset (graph node the edge points to).
    target: str
    # Description of how the target is derived; stored as an edge attribute.
    transformation: str
    # Optional name of the job performing the transformation; stored as an
    # edge attribute.
    job_name: Optional[str] = None

class LineageTracker:
    """Track data lineage across the pipeline.

    Assets are nodes of a directed graph and every lineage edge points from
    the upstream asset to the asset derived from it.
    """

    def __init__(self):
        self.graph = nx.DiGraph()
        # Asset name -> registered asset.  Nodes created implicitly by
        # add_lineage() have no entry here.
        self.assets: Dict[str, DataAsset] = {}

    def register_asset(self, asset: DataAsset):
        """Register a data asset and store its fields as node attributes."""
        self.assets[asset.name] = asset
        self.graph.add_node(asset.name, **asset.__dict__)

    def add_lineage(self, edge: LineageEdge):
        """Add a lineage relationship from ``edge.source`` to ``edge.target``.

        Endpoints that were never registered are created as bare nodes.
        """
        self.graph.add_edge(
            edge.source,
            edge.target,
            transformation=edge.transformation,
            job_name=edge.job_name
        )

    def _walk(self, asset_name: str, depth: int, neighbors) -> Set[str]:
        """Collect the nodes reachable from ``asset_name`` within ``depth`` hops.

        ``neighbors`` maps a node to the nodes one hop away in the chosen
        direction.  The start node is never part of the result, and each node
        is expanded at most once.
        """
        seen = {asset_name}
        frontier = {asset_name}
        for _ in range(depth):
            frontier = {
                neighbor
                for node in frontier
                for neighbor in neighbors(node)
            } - seen
            if not frontier:
                break
            seen |= frontier
        seen.discard(asset_name)
        return seen

    def get_upstream(self, asset_name: str, depth: int = -1) -> Set[str]:
        """Get upstream dependencies.

        Args:
            asset_name: Asset whose inputs are wanted.
            depth: Maximum number of hops; -1 (the default) means unlimited.

        Returns:
            Names of the upstream assets, excluding ``asset_name`` itself.
        """
        if depth == -1:
            return nx.ancestors(self.graph, asset_name)
        return self._walk(asset_name, depth, self.graph.predecessors)

    def get_downstream(self, asset_name: str, depth: int = -1) -> Set[str]:
        """Get downstream dependents.

        Args:
            asset_name: Asset whose consumers are wanted.
            depth: Maximum number of hops; -1 (the default) means unlimited.

        Returns:
            Names of the downstream assets, excluding ``asset_name`` itself.
        """
        if depth == -1:
            return nx.descendants(self.graph, asset_name)
        return self._walk(asset_name, depth, self.graph.successors)

    def impact_analysis(self, asset_name: str) -> dict:
        """Analyze the impact of an issue with an asset.

        Returns:
            Dict with 'affected_asset', 'downstream_count',
            'downstream_assets' (sorted by name for stable output) and
            'critical_paths' (paths leading to assets of type 'report').
        """
        downstream = self.get_downstream(asset_name)

        return {
            'affected_asset': asset_name,
            'downstream_count': len(downstream),
            'downstream_assets': sorted(downstream),
            'critical_paths': self._find_critical_paths(asset_name, downstream)
        }

    def _find_critical_paths(
        self,
        source: str,
        targets: Set[str]
    ) -> List[List[str]]:
        """Find every simple path from ``source`` to each critical target.

        A target is critical when it is a registered asset of type 'report'.
        """
        critical_paths = []
        for target in sorted(targets):
            asset = self.assets.get(target)
            if asset is None or asset.type != 'report':
                continue
            # all_simple_paths yields nothing when no path exists, so there
            # is no "no path" exception to guard against.
            critical_paths.extend(
                nx.all_simple_paths(self.graph, source, target)
            )
        return critical_paths

    def to_json(self) -> str:
        """Export lineage as JSON with 'nodes' and 'edges' lists.

        Values that are not JSON-serializable are converted with ``str``.
        """
        data = {
            'nodes': [
                {'id': node, **self.graph.nodes[node]}
                for node in self.graph.nodes
            ],
            'edges': [
                {
                    'source': u,
                    'target': v,
                    **self.graph.edges[u, v]
                }
                for u, v in self.graph.edges
            ]
        }
        return json.dumps(data, indent=2, default=str)

Key Data Observability Practices

  1. Monitor Proactively: Don’t wait for users to report issues
  2. Establish Baselines: Know what “normal” looks like
  3. Track Lineage: Understand data flow for impact analysis
  4. Alert Intelligently: Reduce noise, focus on actionable issues
  5. Measure SLOs: Define and track data reliability targets

In 2021, data observability became essential as organizations grew more dependent on data. Tools emerged to make it practical; discipline is what makes it effective.

Resources

Michael John Pena

Michael John Pena

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.