Skip to content
Back to Blog
1 min read

Model Monitoring in Production with Azure ML

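This post collects practical, production-minded guidance for monitoring models deployed with Azure ML. It covers managed monitoring schedules, custom drift detection in Python, performance tracking once labels arrive, and alerting.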

Why Model Monitoring Matters

Production models can degrade due to:

  • Data drift: Input data distribution changes
  • Concept drift: Relationship between features and target changes
  • Feature drift: Individual feature distributions change
  • Prediction drift: Model output distribution changes

Setting Up Model Monitoring

from azure.ai.ml import MLClient
from azure.ai.ml.entities import (
    AlertNotification,
    DataDriftSignal,
    MonitorDefinition,
    MonitoringTarget,
    MonitorSchedule,
    PredictionDriftSignal,
    RecurrenceTrigger,
    ServerlessSparkCompute,
)
from azure.identity import DefaultAzureCredential

# Connect to the workspace (replace the placeholder IDs with your own).
ml_client = MLClient(
    credential=DefaultAzureCredential(),
    subscription_id="your-subscription",
    resource_group_name="your-rg",
    workspace_name="your-workspace",
)

# Create monitoring definition: which deployment to watch, which signals to
# compute (on serverless Spark), and who gets notified.
monitor_definition = MonitorDefinition(
    compute=ServerlessSparkCompute(instance_type="Standard_E4s_v3", runtime_version="3.2"),
    # MonitorDefinition takes a MonitoringTarget entity rather than a plain dict.
    monitoring_target=MonitoringTarget(
        ml_task="classification",  # set to your model's task, e.g. "regression"
        endpoint_deployment_id="/subscriptions/.../endpoints/my-endpoint/deployments/blue",
    ),
    # NOTE(review): recent azure-ai-ml releases model production/reference data and
    # thresholds as entities (ProductionData, ReferenceData, DataDriftMetricThreshold,
    # PredictionDriftMetricThreshold) rather than plain dicts — confirm against the
    # SDK version you have installed before relying on the dict form below.
    monitoring_signals={
        "data_drift": DataDriftSignal(
            production_data={
                "input_data": {
                    "path": "azureml://datastores/workspaceblobstore/paths/production-data/",
                    "type": "uri_folder"
                }
            },
            reference_data={
                "input_data": {
                    "path": "azureml://datastores/workspaceblobstore/paths/training-data/",
                    "type": "uri_folder"
                }
            },
            features=["feature1", "feature2", "feature3"],
            metric_thresholds={
                "numerical_features": {
                    "jensen_shannon_distance": 0.1,
                    "normalized_wasserstein_distance": 0.1
                },
                "categorical_features": {
                    "pearsons_chi_squared_test": 0.1
                }
            }
        ),
        "prediction_drift": PredictionDriftSignal(
            production_data={
                "input_data": {
                    "path": "azureml://datastores/workspaceblobstore/paths/predictions/",
                    "type": "uri_folder"
                }
            },
            reference_data={
                "input_data": {
                    "path": "azureml://datastores/workspaceblobstore/paths/baseline-predictions/",
                    "type": "uri_folder"
                }
            }
        )
    },
    # Alert recipients are an AlertNotification entity, not a plain dict.
    alert_notification=AlertNotification(emails=["ml-alerts@company.com"]),
)

# Create monitoring schedule: run the monitor once a day.
# RecurrenceTrigger must be imported above; it was previously used without an import.
monitor_schedule = MonitorSchedule(
    name="production-model-monitor",
    trigger=RecurrenceTrigger(frequency="day", interval=1),
    create_monitor=monitor_definition,
)

# .result() waits for the create/update operation to finish.
ml_client.schedules.begin_create_or_update(monitor_schedule).result()

Custom Monitoring with Python

import numpy as np
import pandas as pd
from scipy import stats
from dataclasses import dataclass
from typing import Dict, List, Optional

@dataclass
class DriftResult:
    """Outcome of a drift check for a single feature."""

    feature: str  # column name the check ran on
    drift_detected: bool  # True when the test crossed its threshold
    metric_value: float  # test statistic computed for the feature
    threshold: float  # cut-off the decision used (KS statistic or p-value, see method)
    method: str  # name of the statistical test used


class DataDriftDetector:
    """Compare production data against a reference (e.g. training) sample.

    Numerical columns are checked with the two-sample Kolmogorov-Smirnov
    statistic; every other column (including booleans) is checked with
    Pearson's chi-squared test of homogeneity on its category counts.

    Args:
        reference_data: Baseline sample; its columns define what is checked.
        threshold: KS statistic above which a numerical column is flagged.
        p_value_threshold: Chi-squared p-value below which a categorical
            column is flagged.
    """

    def __init__(self, reference_data: pd.DataFrame, threshold: float = 0.1,
                 p_value_threshold: float = 0.05):
        self.reference_data = reference_data
        self.threshold = threshold
        self.p_value_threshold = p_value_threshold

    def detect_drift(self, production_data: pd.DataFrame) -> List[DriftResult]:
        """Run a drift check for every reference column.

        Args:
            production_data: Recent sample; must contain every reference column.

        Returns:
            One DriftResult per reference column, in column order.
        """
        results = []

        for column in self.reference_data.columns:
            values = self.reference_data[column]
            # Any numeric dtype (int32, float32, ...) gets the KS test, not only
            # int64/float64; booleans are better treated as categories.
            is_numeric = (
                pd.api.types.is_numeric_dtype(values)
                and not pd.api.types.is_bool_dtype(values)
            )
            if is_numeric:
                result = self._detect_numerical_drift(column, production_data)
            else:
                result = self._detect_categorical_drift(column, production_data)

            results.append(result)

        return results

    def _detect_numerical_drift(self, column: str, production_data: pd.DataFrame) -> DriftResult:
        """Detect drift in a numerical feature using the KS statistic (NaNs ignored)."""
        ref_values = self.reference_data[column].dropna().to_numpy(dtype=float)
        prod_values = production_data[column].dropna().to_numpy(dtype=float)

        statistic, _ = stats.ks_2samp(ref_values, prod_values)

        return DriftResult(
            feature=column,
            drift_detected=bool(statistic > self.threshold),
            metric_value=float(statistic),
            threshold=self.threshold,
            method="kolmogorov_smirnov"
        )

    def _detect_categorical_drift(self, column: str, production_data: pd.DataFrame) -> DriftResult:
        """Detect drift in a categorical feature with a chi-squared homogeneity test.

        The two samples may differ in size and in the categories they contain,
        so the counts are compared as a 2 x K contingency table rather than
        treating raw reference counts as expected production counts.
        """
        # One row per category seen in either sample (NaN excluded by value_counts);
        # a category absent from one sample gets a count of 0 there.
        counts = pd.concat(
            [
                self.reference_data[column].value_counts(),
                production_data[column].value_counts(),
            ],
            axis=1,
        ).fillna(0)
        # Categories with zero count in both samples (e.g. unused categorical
        # levels) would make an expected frequency zero; drop them.
        counts = counts[counts.sum(axis=1) > 0]

        statistic, p_value, _, _ = stats.chi2_contingency(
            counts.to_numpy().T, correction=False
        )

        return DriftResult(
            feature=column,
            drift_detected=bool(p_value < self.p_value_threshold),
            metric_value=float(statistic),
            threshold=self.p_value_threshold,
            method="chi_squared"
        )

# Usage: compare a production extract against the training data it was built on.
reference_df = pd.read_csv("training_data.csv")
production_df = pd.read_csv("production_data.csv")

detector = DataDriftDetector(reference_df)
drift_results = detector.detect_drift(production_df)

for result in drift_results:
    if result.drift_detected:
        status = "DRIFT"
    else:
        status = "OK"
    detail = f"{result.method}: {result.metric_value:.4f}"
    print(f"{result.feature}: {status} ({detail})")

Prediction Monitoring

class PredictionMonitor:
    """Detect drift in a model's output distribution relative to a baseline.

    Args:
        baseline_predictions: Predictions from a reference period (e.g. validation).
    """

    def __init__(self, baseline_predictions: np.ndarray):
        self.baseline_predictions = baseline_predictions
        self.baseline_distribution = self._compute_distribution(baseline_predictions)

    def _compute_distribution(self, predictions: np.ndarray) -> Dict:
        """Return summary statistics (mean, std, min, max, quartiles) of predictions."""
        return {
            "mean": np.mean(predictions),
            "std": np.std(predictions),
            "min": np.min(predictions),
            "max": np.max(predictions),
            "percentiles": {
                "p25": np.percentile(predictions, 25),
                "p50": np.percentile(predictions, 50),
                "p75": np.percentile(predictions, 75)
            }
        }

    def check_prediction_drift(self, current_predictions: np.ndarray, threshold: float = 0.1,
                               psi_threshold: float = 0.2) -> Dict:
        """Compare current predictions with the baseline.

        Args:
            current_predictions: Predictions from the period being checked.
            threshold: KS statistic above which drift is reported.
            psi_threshold: PSI above which drift is reported.

        Returns:
            Dict with the KS statistic and p-value, the PSI, a ``drift_detected``
            flag, and summary statistics for both periods.
        """
        current_distribution = self._compute_distribution(current_predictions)

        # KS test
        ks_stat, p_value = stats.ks_2samp(self.baseline_predictions, current_predictions)

        # Calculate PSI
        psi = self._calculate_psi(self.baseline_predictions, current_predictions)

        return {
            "ks_statistic": ks_stat,
            "ks_p_value": p_value,
            "psi": psi,
            "drift_detected": bool(ks_stat > threshold or psi > psi_threshold),
            "baseline_stats": self.baseline_distribution,
            "current_stats": current_distribution
        }

    def _calculate_psi(self, expected: np.ndarray, actual: np.ndarray, bins: int = 10) -> float:
        """Calculate the Population Stability Index of ``actual`` against ``expected``.

        Bins are equal-width over the range of ``expected``.
        """
        expected_hist, bin_edges = np.histogram(expected, bins=bins)
        # np.histogram drops values outside [bin_edges[0], bin_edges[-1]]; clip so
        # predictions beyond the baseline range count in the outermost bins
        # instead of silently vanishing (which would understate drift).
        clipped_actual = np.clip(actual, bin_edges[0], bin_edges[-1])
        actual_hist, _ = np.histogram(clipped_actual, bins=bin_edges)

        expected_pct = expected_hist / len(expected)
        actual_pct = actual_hist / len(actual)

        # Avoid log(0) / division by zero for empty bins
        expected_pct = np.where(expected_pct == 0, 0.0001, expected_pct)
        actual_pct = np.where(actual_pct == 0, 0.0001, actual_pct)

        psi = np.sum((actual_pct - expected_pct) * np.log(actual_pct / expected_pct))
        return float(psi)

# Usage: toy prediction scores from a baseline window and a current window.
baseline_preds = np.array([0.3, 0.5, 0.7, 0.4, 0.6])
current_preds = np.array([0.5, 0.6, 0.8, 0.5, 0.7])

monitor = PredictionMonitor(baseline_preds)
drift_result = monitor.check_prediction_drift(current_preds)
for line in (
    f"Drift detected: {drift_result['drift_detected']}",
    f"PSI: {drift_result['psi']:.4f}",
):
    print(line)

Performance Monitoring

class ModelPerformanceMonitor:
    """Compare live classification metrics against recorded baseline values.

    Args:
        baseline_metrics: Metric name -> baseline value; recognised names are
            "accuracy", "precision", "recall" and "f1".
        degradation_threshold_pct: Relative drop, in percent of the baseline
            value, above which a metric is reported as degraded.
    """

    def __init__(self, baseline_metrics: Dict, degradation_threshold_pct: float = 5.0):
        self.baseline_metrics = baseline_metrics
        self.degradation_threshold_pct = degradation_threshold_pct
        self.alerts = []  # not populated by this class; kept for callers that use it

    def evaluate(self, y_true: np.ndarray, y_pred: np.ndarray) -> Dict:
        """Score predictions and report metrics that fell below their baseline.

        Args:
            y_true: Ground-truth labels.
            y_pred: Predicted labels for the same samples.

        Returns:
            Dict with ``current_metrics``, ``degradations`` (metric -> baseline,
            current and percentage drop) and an ``alert`` flag.
        """
        from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

        current_metrics = {
            "accuracy": accuracy_score(y_true, y_pred),
            "precision": precision_score(y_true, y_pred, average='weighted'),
            "recall": recall_score(y_true, y_pred, average='weighted'),
            "f1": f1_score(y_true, y_pred, average='weighted')
        }

        # Compare with baseline
        degradations = {}
        for metric, current_value in current_metrics.items():
            baseline_value = self.baseline_metrics.get(metric)
            # A relative drop is undefined without a positive baseline (a 0
            # baseline used to raise ZeroDivisionError), and metrics absent
            # from the baseline have nothing to compare against.
            if baseline_value is None or baseline_value <= 0:
                continue
            degradation = (baseline_value - current_value) / baseline_value * 100

            if degradation > self.degradation_threshold_pct:
                degradations[metric] = {
                    "baseline": baseline_value,
                    "current": current_value,
                    "degradation_pct": degradation
                }

        return {
            "current_metrics": current_metrics,
            "degradations": degradations,
            "alert": len(degradations) > 0
        }

# Usage
baseline = {"accuracy": 0.92, "precision": 0.90, "recall": 0.88, "f1": 0.89}
perf_monitor = ModelPerformanceMonitor(baseline)

# When ground truth becomes available, pair it with the predictions served for
# the same requests. Illustrative labels so the snippet runs end to end:
y_true = np.array([1, 0, 1, 1, 0, 1, 0, 0, 1, 1])
y_pred = np.array([1, 0, 0, 1, 0, 1, 1, 0, 1, 0])

result = perf_monitor.evaluate(y_true, y_pred)
if result["alert"]:
    print("ALERT: Performance degradation detected!")
    for metric, details in result["degradations"].items():
        print(f"  {metric}: {details['current']:.3f} (was {details['baseline']:.3f})")

Alerting Integration

from azure.communication.email import EmailClient

class MonitoringAlertManager:
    """Send drift alerts by email through Azure Communication Services.

    Args:
        email_connection_string: Connection string for the email resource.
        sender_email: Address used as the sender of every alert.
    """

    def __init__(self, email_connection_string, sender_email):
        self.email_client = EmailClient.from_connection_string(email_connection_string)
        self.sender_email = sender_email

    def send_drift_alert(self, drift_results: "List[DriftResult]", recipients: List[str]):
        """Email a summary of the features flagged as drifted.

        Does nothing (returns None) when no feature drifted or when there are
        no recipients; otherwise returns the result of the send operation.
        """
        drifted_features = [r for r in drift_results if r.drift_detected]

        # Nothing to report, or nobody to send it to.
        if not drifted_features or not recipients:
            return None

        subject = f"[ML Alert] Data Drift Detected - {len(drifted_features)} features"
        body = self._format_drift_alert(drifted_features)

        return self._send_email(recipients, subject, body)

    def _format_drift_alert(self, drifted_features: "List[DriftResult]") -> str:
        """Format the plain-text email body listing each drifted feature."""
        lines = ["Data drift has been detected in the following features:\n"]

        for result in drifted_features:
            lines.append(f"- {result.feature}:")
            lines.append(f"    Method: {result.method}")
            lines.append(f"    Metric: {result.metric_value:.4f}")
            lines.append(f"    Threshold: {result.threshold}")

        lines.append("\nPlease investigate and consider retraining the model.")
        return "\n".join(lines)

    def _send_email(self, recipients, subject, body):
        """Send a plain-text email and wait for the send operation to complete."""
        message = {
            "senderAddress": self.sender_email,
            "recipients": {
                "to": [{"address": r} for r in recipients]
            },
            "content": {
                "subject": subject,
                "plainText": body
            }
        }
        # begin_send returns a poller; waiting on it surfaces send failures
        # instead of discarding them.
        poller = self.email_client.begin_send(message)
        return poller.result()

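Comprehensive monitoring ensures your ML models maintain their performance and reliability in production.

## Takeaways

- Monitor inputs, predictions, and (once ground truth arrives) performance metrics: each catches a different failure mode.
- Start with Azure ML's managed monitoring schedule, and add custom checks where you need specific tests or thresholds.
- Tune drift thresholds against your own data, and route alerts to a channel someone actually watches.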

Michael John Peña

Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.