Canary Deployments for ML Models

Canary deployment gradually shifts traffic to a new model version, allowing you to detect issues before full rollout. This pattern minimizes the blast radius of problematic deployments.

Understanding Canary Deployment

Canary deployment differs from blue-green, where all traffic switches over at once:

  • Traffic shifts gradually (e.g., 5% -> 25% -> 50% -> 100%)
  • Issues affect only a subset of users
  • Automatic rollback based on metrics
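
Under the hood, an Azure ML managed online endpoint represents the split as a mapping of deployment names to traffic percentages, and that mapping is all the rest of this post manipulates. A minimal sketch, assuming the ml_client created in the next section and that blue and canary deployments already exist on the endpoint:

# Sketch only: endpoint.traffic maps deployment names to traffic percentages.
endpoint = ml_client.online_endpoints.get("production-endpoint")
endpoint.traffic = {"blue": 95, "canary": 5}  # first canary stage
ml_client.online_endpoints.begin_create_or_update(endpoint).result()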

Setting Up Canary Deployment

from azure.ai.ml import MLClient
from azure.ai.ml.entities import ManagedOnlineEndpoint, ManagedOnlineDeployment, CodeConfiguration
from azure.identity import DefaultAzureCredential
import time

ml_client = MLClient(
    credential=DefaultAzureCredential(),
    subscription_id="your-subscription",
    resource_group_name="your-rg",
    workspace_name="your-workspace"
)

# Assume blue (stable) is already deployed with 100% traffic
# Deploy canary (new version)
canary_deployment = ManagedOnlineDeployment(
    name="canary",
    endpoint_name="production-endpoint",
    model="azureml:my-model:2",
    environment="AzureML-sklearn-1.0-ubuntu20.04-py38-cpu@latest",
    code_configuration=CodeConfiguration(
        code="./src",
        scoring_script="score.py"
    ),
    instance_type="Standard_DS2_v2",
    instance_count=1
)

ml_client.online_deployments.begin_create_or_update(canary_deployment).result()
print("Canary deployment created")

Gradual Traffic Shift

class CanaryDeploymentManager:
    def __init__(self, ml_client, endpoint_name, stable_name, canary_name):
        self.ml_client = ml_client
        self.endpoint_name = endpoint_name
        self.stable_name = stable_name
        self.canary_name = canary_name

    def set_traffic(self, canary_percentage):
        """Set traffic split between stable and canary"""
        endpoint = self.ml_client.online_endpoints.get(self.endpoint_name)

        stable_percentage = 100 - canary_percentage
        endpoint.traffic = {
            self.stable_name: stable_percentage,
            self.canary_name: canary_percentage
        }

        self.ml_client.online_endpoints.begin_create_or_update(endpoint).result()
        print(f"Traffic: {self.stable_name}={stable_percentage}%, {self.canary_name}={canary_percentage}%")

    def get_current_traffic(self):
        """Get current traffic allocation"""
        endpoint = self.ml_client.online_endpoints.get(self.endpoint_name)
        return endpoint.traffic

    def rollback(self):
        """Rollback all traffic to stable"""
        self.set_traffic(0)
        print("Rolled back to stable deployment")

    def promote(self):
        """Promote canary to 100%"""
        self.set_traffic(100)
        print("Canary promoted to 100%")

# Initialize manager
canary_manager = CanaryDeploymentManager(
    ml_client,
    "production-endpoint",
    "blue",
    "canary"
)

# Start with 5% canary traffic
canary_manager.set_traffic(5)

Monitoring Canary Health

from datetime import timedelta
from azure.monitor.query import MetricsQueryClient, LogsQueryClient

class CanaryHealthMonitor:
    def __init__(self, endpoint_name, metrics_client, logs_client):
        self.endpoint_name = endpoint_name
        self.metrics_client = metrics_client
        self.logs_client = logs_client
        self.workspace_id = "your-log-analytics-workspace-id"

    def get_metrics(self, deployment_name, duration_minutes=15):
        """Get metrics for a specific deployment"""
        resource_uri = f"/subscriptions/.../providers/Microsoft.MachineLearningServices/workspaces/.../onlineEndpoints/{self.endpoint_name}/deployments/{deployment_name}"

        response = self.metrics_client.query_resource(
            resource_uri=resource_uri,
            metric_names=["RequestLatencyP99", "RequestsPerMinute", "CPUUtilization"],
            timespan=timedelta(minutes=duration_minutes)
        )

        metrics = {}
        for metric in response.metrics:
            values = [dp.average for ts in metric.timeseries for dp in ts.data if dp.average is not None]
            metrics[metric.name] = sum(values) / len(values) if values else 0

        return metrics

    def get_error_rate(self, deployment_name, duration_minutes=15):
        """Get error rate from logs"""
        query = f"""
        AzureMLOnlineEndpoint
        | where TimeGenerated > ago({duration_minutes}m)
        | where EndpointName == '{self.endpoint_name}'
        | where DeploymentName == '{deployment_name}'
        | summarize
            TotalRequests = count(),
            Errors = countif(ResultCode >= 400)
        | extend ErrorRate = Errors * 100.0 / TotalRequests
        """

        response = self.logs_client.query_workspace(
            workspace_id=self.workspace_id,
            query=query,
            timespan=timedelta(minutes=duration_minutes)
        )

        for row in response.tables[0].rows:
            return {
                "total_requests": row[0],
                "errors": row[1],
                "error_rate": row[2]
            }

        return {"total_requests": 0, "errors": 0, "error_rate": 0}

    def compare_deployments(self, stable_name, canary_name):
        """Compare metrics between stable and canary"""
        stable_metrics = self.get_metrics(stable_name)
        canary_metrics = self.get_metrics(canary_name)

        stable_errors = self.get_error_rate(stable_name)
        canary_errors = self.get_error_rate(canary_name)

        comparison = {
            "stable": {**stable_metrics, **stable_errors},
            "canary": {**canary_metrics, **canary_errors},
            "differences": {}
        }

        for metric in stable_metrics:
            if stable_metrics[metric] > 0:
                diff = (canary_metrics.get(metric, 0) - stable_metrics[metric]) / stable_metrics[metric] * 100
                comparison["differences"][metric] = diff

        return comparison
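
The automated rollout in the next section needs an instance of this monitor. A minimal wiring sketch, assuming the same DefaultAzureCredential has query access to Azure Monitor:

# Wire up the Azure Monitor query clients and the health monitor used below.
credential = DefaultAzureCredential()
metrics_client = MetricsQueryClient(credential)
logs_client = LogsQueryClient(credential)

health_monitor = CanaryHealthMonitor(
    endpoint_name="production-endpoint",
    metrics_client=metrics_client,
    logs_client=logs_client
)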

Automated Canary Progression

class AutomatedCanaryRollout:
    def __init__(self, manager, monitor, thresholds):
        self.manager = manager
        self.monitor = monitor
        self.thresholds = thresholds  # e.g., {"error_rate": 5, "latency_increase": 20}

    def check_health(self):
        """Check if canary is healthy"""
        comparison = self.monitor.compare_deployments(
            self.manager.stable_name,
            self.manager.canary_name
        )

        issues = []

        # Check error rate
        if comparison["canary"]["error_rate"] > self.thresholds["error_rate"]:
            issues.append(f"Error rate too high: {comparison['canary']['error_rate']:.2f}%")

        # Check latency
        latency_diff = comparison["differences"].get("RequestLatencyP99", 0)
        if latency_diff > self.thresholds["latency_increase"]:
            issues.append(f"Latency increased by {latency_diff:.1f}%")

        return len(issues) == 0, issues

    def run_rollout(self, stages=[5, 25, 50, 75, 100], wait_minutes=15):
        """Run automated canary rollout"""
        for stage in stages:
            print(f"\n=== Stage: {stage}% canary traffic ===")
            self.manager.set_traffic(stage)

            # Wait for metrics to accumulate
            print(f"Waiting {wait_minutes} minutes for metrics...")
            time.sleep(wait_minutes * 60)

            # Check health
            healthy, issues = self.check_health()

            if not healthy:
                print(f"UNHEALTHY! Issues: {issues}")
                self.manager.rollback()
                return False, f"Rollback at {stage}%: {issues}"

            print(f"HEALTHY - proceeding to next stage")

        print("\nCanary rollout completed successfully!")
        return True, "Rollout complete"

# Configure thresholds
thresholds = {
    "error_rate": 5,  # Max 5% error rate
    "latency_increase": 20  # Max 20% latency increase
}

# Run automated rollout
rollout = AutomatedCanaryRollout(canary_manager, health_monitor, thresholds)
success, message = rollout.run_rollout(
    stages=[5, 25, 50, 100],
    wait_minutes=10
)
print(f"Rollout {'succeeded' if success else 'failed'}: {message}")

Shadow Testing (Pre-Canary)

class ShadowTester:
    """Run shadow traffic before canary deployment"""

    def __init__(self, stable_endpoint, shadow_endpoint):
        self.stable_endpoint = stable_endpoint
        self.shadow_endpoint = shadow_endpoint
        self.results = []

    async def shadow_request(self, request_data):
        """Send request to both endpoints and compare"""
        import aiohttp

        async with aiohttp.ClientSession() as session:
            # Send to stable (actual response)
            stable_response = await self._send_request(session, self.stable_endpoint, request_data)

            # Send to shadow (comparison only)
            shadow_response = await self._send_request(session, self.shadow_endpoint, request_data)

            # Compare
            comparison = self._compare_responses(stable_response, shadow_response)
            self.results.append(comparison)

            # Return stable response to user
            return stable_response
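
    async def _send_request(self, session, endpoint, request_data):
        # The post references _send_request without defining it; this is a hypothetical
        # sketch that assumes each endpoint is a dict with "url" and "key".
        # Adjust the auth header and response parsing to match your scoring service.
        import time
        start = time.perf_counter()
        async with session.post(
            endpoint["url"],
            json=request_data,
            headers={"Authorization": f"Bearer {endpoint['key']}"}
        ) as resp:
            body = await resp.json()
        predictions = body.get("predictions") if isinstance(body, dict) else body
        return {
            "predictions": predictions,
            "latency": (time.perf_counter() - start) * 1000  # milliseconds
        }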

    def _compare_responses(self, stable, shadow):
        """Compare responses from stable and shadow"""
        return {
            "predictions_match": stable.get("predictions") == shadow.get("predictions"),
            "stable_latency": stable.get("latency"),
            "shadow_latency": shadow.get("latency"),
            "stable_predictions": stable.get("predictions"),
            "shadow_predictions": shadow.get("predictions")
        }

    def get_summary(self):
        """Get summary of shadow testing"""
        if not self.results:
            return None

        match_rate = sum(r["predictions_match"] for r in self.results) / len(self.results)
        avg_stable_latency = sum(r["stable_latency"] for r in self.results) / len(self.results)
        avg_shadow_latency = sum(r["shadow_latency"] for r in self.results) / len(self.results)

        return {
            "total_requests": len(self.results),
            "match_rate": match_rate,
            "avg_stable_latency": avg_stable_latency,
            "avg_shadow_latency": avg_shadow_latency
        }
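
A possible driver for the shadow tester, assuming sample_requests is a list of recorded scoring payloads and the endpoint objects match the _send_request sketch above:

import asyncio

async def run_shadow_test(sample_requests):
    # stable_endpoint and shadow_endpoint are placeholders; see _send_request above.
    tester = ShadowTester(stable_endpoint, shadow_endpoint)
    for payload in sample_requests:
        await tester.shadow_request(payload)
    return tester.get_summary()

summary = asyncio.run(run_shadow_test(sample_requests))
print(summary)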

Canary deployment provides a controlled, observable approach to rolling out new model versions with minimal risk.

Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.