MLOps Maturity: Productionizing Machine Learning

MLOps maturity determines how successfully organizations can deploy and maintain ML models in production. Let’s examine the practices that separate successful ML operations from struggling ones.

MLOps Maturity Levels

Level 0: No MLOps
├── Notebooks only
├── Manual model deployment
├── No version control
└── No monitoring

Level 1: DevOps for ML
├── Version control for code
├── Basic CI/CD
├── Manual testing
└── Basic monitoring

Level 2: ML Pipeline Automation
├── Automated training pipelines
├── Feature stores
├── Experiment tracking
└── Model registry

Level 3: Full MLOps
├── Automated retraining
├── A/B testing
├── Model monitoring
└── Automated rollback

Level 4: Autonomous ML
├── Self-healing models
├── AutoML integration
├── Automated feature engineering
└── Continuous optimization

Core MLOps Components

Component 1: Experiment Tracking

import mlflow
import mlflow.sklearn
from mlflow.tracking import MlflowClient

class ExperimentTracker:
    """Track ML experiments systematically."""

    def __init__(self, experiment_name: str):
        self.experiment_name = experiment_name
        mlflow.set_experiment(experiment_name)
        self.client = MlflowClient()

    def run_experiment(
        self,
        model_fn,
        params: dict,
        train_data,
        test_data,
        tags: dict = None
    ):
        """Run and track an experiment."""

        with mlflow.start_run(tags=tags) as run:
            # Log parameters
            mlflow.log_params(params)

            # Train model
            model = model_fn(**params)
            model.fit(train_data.X, train_data.y)

            # Evaluate
            predictions = model.predict(test_data.X)
            metrics = self.calculate_metrics(test_data.y, predictions)

            # Log metrics
            mlflow.log_metrics(metrics)

            # Log model
            mlflow.sklearn.log_model(
                model,
                "model",
                registered_model_name=f"{self.experiment_name}_model"
            )

            # Additional artifacts (plots, confusion matrices, etc.) can be
            # logged here with mlflow.log_artifact(...)

            return {
                "run_id": run.info.run_id,
                "metrics": metrics,
                "model_uri": f"runs:/{run.info.run_id}/model"
            }

    def calculate_metrics(self, y_true, y_pred) -> dict:
        from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

        return {
            "accuracy": accuracy_score(y_true, y_pred),
            "precision": precision_score(y_true, y_pred, average='weighted'),
            "recall": recall_score(y_true, y_pred, average='weighted'),
            "f1": f1_score(y_true, y_pred, average='weighted')
        }

    def compare_runs(self, metric: str = "f1", top_n: int = 5) -> list:
        """Compare experiment runs."""
        experiment = mlflow.get_experiment_by_name(self.experiment_name)
        runs = self.client.search_runs(
            experiment_ids=[experiment.experiment_id],
            order_by=[f"metrics.{metric} DESC"],
            max_results=top_n
        )

        return [
            {
                "run_id": run.info.run_id,
                "params": run.data.params,
                "metrics": run.data.metrics
            }
            for run in runs
        ]
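
A quick usage sketch to tie it together. The Dataset container, the RandomForestClassifier choice, and the synthetic data are illustrative assumptions, not part of the tracker:

from collections import namedtuple
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split

# Simple container matching the train_data.X / train_data.y access used above
Dataset = namedtuple("Dataset", ["X", "y"])

X, y = make_classification(n_samples=1000, n_features=20, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

tracker = ExperimentTracker("churn_prediction")
result = tracker.run_experiment(
    model_fn=RandomForestClassifier,              # any sklearn-style estimator constructor
    params={"n_estimators": 200, "max_depth": 8},
    train_data=Dataset(X_train, y_train),
    test_data=Dataset(X_test, y_test),
    tags={"owner": "data-team"}
)
print(result["metrics"], result["model_uri"])

# Once a few runs have accumulated, rank them by F1
best_runs = tracker.compare_runs(metric="f1", top_n=3)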

Component 2: Feature Store

from feast import Entity, FeatureStore, FeatureView, Field, FileSource
from feast.types import Float64, Int64, String
from datetime import timedelta

class MLFeatureStore:
    """Centralized feature store for ML."""

    def __init__(self, repo_path: str):
        self.store = FeatureStore(repo_path=repo_path)

    def define_features(self):
        """Define feature views."""

        # Customer entity
        customer = Entity(
            name="customer",
            join_keys=["customer_id"]
        )

        # Customer features (newer Feast API: Field + schema)
        customer_features = FeatureView(
            name="customer_features",
            entities=[customer],
            ttl=timedelta(days=1),
            schema=[
                Field(name="total_purchases", dtype=Float64),
                Field(name="avg_order_value", dtype=Float64),
                Field(name="days_since_last_purchase", dtype=Int64),
                Field(name="purchase_frequency", dtype=Float64),
                Field(name="customer_segment", dtype=String)
            ],
            source=FileSource(
                path="data/customer_features.parquet",
                timestamp_field="event_timestamp"
            )
        )

        return [customer, customer_features]

    def get_training_data(
        self,
        entity_df,
        feature_refs: list,
        label_column: str = None
    ):
        """Get training dataset from feature store."""

        training_df = self.store.get_historical_features(
            entity_df=entity_df,
            features=feature_refs
        ).to_df()

        if label_column:
            # Join with labels on the entity key used above
            training_df = training_df.merge(
                entity_df[["customer_id", label_column]],
                on="customer_id"
            )

        return training_df

    def get_online_features(self, entity_dict: dict, feature_refs: list) -> dict:
        """Get features for online inference."""
        response = self.store.get_online_features(
            features=feature_refs,
            entity_rows=[entity_dict]
        )
        return response.to_dict()
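
A brief usage sketch, assuming the feature repository has already been applied and materialized with the Feast CLI; the entity dataframe, customer IDs, and the "churned" label column are illustrative:

import pandas as pd

fs = MLFeatureStore(repo_path="feature_repo")

feature_refs = [
    "customer_features:total_purchases",
    "customer_features:avg_order_value",
    "customer_features:days_since_last_purchase",
]

# Point-in-time correct training set
entity_df = pd.DataFrame({
    "customer_id": [1001, 1002],
    "event_timestamp": pd.to_datetime(["2024-01-01", "2024-01-02"]),
    "churned": [0, 1],
})
train_df = fs.get_training_data(entity_df, feature_refs, label_column="churned")

# Low-latency lookup at inference time
online_features = fs.get_online_features({"customer_id": 1001}, feature_refs)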

Component 3: Model Registry

import mlflow
from mlflow.tracking import MlflowClient
from mlflow.entities.model_registry import ModelVersion

class ModelRegistry:
    """Manage model lifecycle."""

    def __init__(self):
        self.client = MlflowClient()

    def register_model(self, run_id: str, model_name: str) -> ModelVersion:
        """Register a model from an experiment run."""
        model_uri = f"runs:/{run_id}/model"

        result = mlflow.register_model(
            model_uri=model_uri,
            name=model_name
        )

        return result

    def promote_model(
        self,
        model_name: str,
        version: int,
        stage: str,
        archive_existing: bool = True
    ):
        """Promote model to a stage (Staging, Production)."""

        if archive_existing:
            # Archive current models in target stage
            current_models = self.client.get_latest_versions(
                model_name, stages=[stage]
            )
            for model in current_models:
                self.client.transition_model_version_stage(
                    name=model_name,
                    version=model.version,
                    stage="Archived"
                )

        # Promote new model
        self.client.transition_model_version_stage(
            name=model_name,
            version=version,
            stage=stage
        )

    def get_production_model(self, model_name: str):
        """Get current production model."""
        models = self.client.get_latest_versions(
            model_name, stages=["Production"]
        )
        if models:
            return mlflow.pyfunc.load_model(
                f"models:/{model_name}/Production"
            )
        return None

    def compare_models(
        self,
        model_name: str,
        version_a: int,
        version_b: int,
        test_data
    ) -> dict:
        """Compare two model versions."""
        model_a = mlflow.pyfunc.load_model(
            f"models:/{model_name}/{version_a}"
        )
        model_b = mlflow.pyfunc.load_model(
            f"models:/{model_name}/{version_b}"
        )

        predictions_a = model_a.predict(test_data.X)
        predictions_b = model_b.predict(test_data.X)

        metrics_a = self.calculate_metrics(test_data.y, predictions_a)
        metrics_b = self.calculate_metrics(test_data.y, predictions_b)

        return {
            "version_a": {"version": version_a, "metrics": metrics_a},
            "version_b": {"version": version_b, "metrics": metrics_b},
            "winner": version_a if metrics_a["f1"] > metrics_b["f1"] else version_b
        }

    def calculate_metrics(self, y_true, y_pred) -> dict:
        """Same weighted metrics used by the experiment tracker."""
        from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

        return {
            "accuracy": accuracy_score(y_true, y_pred),
            "precision": precision_score(y_true, y_pred, average='weighted'),
            "recall": recall_score(y_true, y_pred, average='weighted'),
            "f1": f1_score(y_true, y_pred, average='weighted')
        }
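
Tying the registry to the tracker above (the run_id comes from the earlier run_experiment result; the "churn_model" name and X_test data are illustrative):

registry = ModelRegistry()

# Register the tracked run and move it through the lifecycle stages
version = registry.register_model(run_id=result["run_id"], model_name="churn_model")
registry.promote_model("churn_model", version.version, stage="Staging")
registry.promote_model("churn_model", version.version, stage="Production")

# Later, load whatever is currently serving and score with it
prod_model = registry.get_production_model("churn_model")
if prod_model is not None:
    predictions = prod_model.predict(X_test)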

Component 4: Model Monitoring

from dataclasses import dataclass
import numpy as np

@dataclass
class DriftMetrics:
    feature_name: str
    drift_score: float
    is_drifted: bool
    baseline_stats: dict
    current_stats: dict

class ModelMonitor:
    """Monitor model performance and drift."""

    def __init__(self, model_name: str, baseline_data):
        self.model_name = model_name
        self.baseline_stats = self.calculate_stats(baseline_data)

    def detect_data_drift(
        self,
        current_data,
        threshold: float = 0.1
    ) -> list[DriftMetrics]:
        """Detect drift in input features."""
        drift_results = []

        for column in current_data.columns:
            baseline = self.baseline_stats[column]
            current = self.calculate_column_stats(current_data[column])

            # Calculate PSI (Population Stability Index)
            psi = self.calculate_psi(baseline["distribution"], current["distribution"])

            drift_results.append(DriftMetrics(
                feature_name=column,
                drift_score=psi,
                is_drifted=psi > threshold,
                baseline_stats=baseline,
                current_stats=current
            ))

        return drift_results

    def detect_prediction_drift(
        self,
        predictions,
        threshold: float = 0.1
    ) -> dict:
        """Detect drift in model predictions."""
        baseline_pred_dist = self.baseline_stats["predictions"]["distribution"]
        current_pred_dist = self.get_distribution(predictions)

        psi = self.calculate_psi(baseline_pred_dist, current_pred_dist)

        return {
            "drift_score": psi,
            "is_drifted": psi > threshold,
            "baseline_mean": self.baseline_stats["predictions"]["mean"],
            "current_mean": np.mean(predictions)
        }

    def monitor_performance(
        self,
        predictions,
        actuals,
        alert_threshold: float = 0.05
    ) -> dict:
        """Monitor model performance vs baseline."""
        current_metrics = self.calculate_metrics(actuals, predictions)
        baseline_metrics = self.baseline_stats["metrics"]

        degradation = {}
        alerts = []

        for metric, value in current_metrics.items():
            baseline_value = baseline_metrics[metric]
            change = (baseline_value - value) / baseline_value

            degradation[metric] = {
                "baseline": baseline_value,
                "current": value,
                "change_percent": change * 100
            }

            if change > alert_threshold:
                alerts.append({
                    "metric": metric,
                    "severity": "high" if change > 0.1 else "medium",
                    "message": f"{metric} degraded by {change*100:.1f}%"
                })

        return {
            "degradation": degradation,
            "alerts": alerts,
            "needs_retraining": len([a for a in alerts if a["severity"] == "high"]) > 0
        }

    def calculate_psi(self, baseline_dist: list, current_dist: list) -> float:
        """Population Stability Index over two aligned lists of bin proportions."""
        psi = 0
        for b, c in zip(baseline_dist, current_dist):
            if b > 0 and c > 0:
                psi += (c - b) * np.log(c / b)
        return psi
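
The monitor relies on a few helpers the snippet doesn't show (calculate_stats, calculate_column_stats, get_distribution, plus the same calculate_metrics used earlier). One possible histogram-based sketch of the distribution helpers; the key assumption is that bin edges derived from the baseline are reused when profiling later batches, so calculate_psi compares like with like:

import numpy as np
import pandas as pd

def get_distribution(values, bin_edges) -> list:
    """Bin values into proportions using fixed edges so baseline and
    current samples are directly comparable in calculate_psi."""
    counts, _ = np.histogram(np.asarray(values, dtype=float), bins=bin_edges)
    total = counts.sum()
    return (counts / total).tolist() if total else [0.0] * (len(bin_edges) - 1)

def calculate_column_stats(column: pd.Series, bin_edges=None) -> dict:
    """Summary stats for one feature; pass the baseline's bin_edges when
    profiling a new batch of the same feature."""
    clean = column.dropna()
    if bin_edges is None:
        bin_edges = np.histogram_bin_edges(clean, bins=10)
    return {
        "mean": float(clean.mean()),
        "std": float(clean.std()),
        "bin_edges": list(bin_edges),
        "distribution": get_distribution(clean, bin_edges),
    }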

Component 5: Automated Retraining

class AutomatedRetraining:
    """Automate model retraining based on drift and performance triggers.

    The get_* helpers (recent data, labels, training entities, params,
    test data) are pipeline-specific hooks left to the reader.
    """

    def __init__(
        self,
        model_name: str,
        feature_store: MLFeatureStore,
        registry: ModelRegistry,
        monitor: ModelMonitor
    ):
        self.model_name = model_name
        self.feature_store = feature_store
        self.registry = registry
        self.monitor = monitor

    async def check_and_retrain(self):
        """Check if retraining is needed and execute."""

        # Get recent data
        recent_data = self.get_recent_data()

        # Check for drift
        drift_results = self.monitor.detect_data_drift(recent_data)
        significant_drift = [d for d in drift_results if d.is_drifted]

        # Check performance
        if labels_available := self.get_recent_labels():
            current_model = self.registry.get_production_model(self.model_name)
            predictions = current_model.predict(recent_data)
            performance = self.monitor.monitor_performance(predictions, labels_available)
        else:
            performance = {"needs_retraining": False}

        # Decide if retraining needed
        should_retrain = (
            len(significant_drift) > 2 or
            performance.get("needs_retraining", False)
        )

        if should_retrain:
            new_model = await self.retrain()
            await self.validate_and_deploy(new_model)

        return {
            "drift_detected": len(significant_drift),
            "performance_degraded": performance.get("needs_retraining", False),
            "retrained": should_retrain
        }

    async def retrain(self):
        """Execute retraining pipeline."""
        # Get training data from feature store
        training_data = self.feature_store.get_training_data(
            entity_df=self.get_training_entities(),
            feature_refs=self.get_feature_refs(),
            label_column="label"
        )

        # Train new model
        tracker = ExperimentTracker(f"{self.model_name}_retrain")
        result = tracker.run_experiment(
            model_fn=self.get_model_fn(),
            params=self.get_best_params(),
            train_data=training_data,
            test_data=self.get_test_data(),
            tags={"trigger": "automated_retraining"}
        )

        return result

    async def validate_and_deploy(self, new_model_result: dict):
        """Validate new model and deploy if better."""
        # Register new model
        new_version = self.registry.register_model(
            run_id=new_model_result["run_id"],
            model_name=self.model_name
        )

        # Compare with the current production version
        production_versions = self.registry.client.get_latest_versions(
            self.model_name, stages=["Production"]
        )
        current_version = production_versions[0]
        comparison = self.registry.compare_models(
            self.model_name,
            new_version.version,
            current_version.version,
            self.get_test_data()
        )

        # Deploy if better
        if comparison["winner"] == new_version.version:
            self.registry.promote_model(
                self.model_name,
                new_version.version,
                "Production"
            )
            return {"deployed": True, "version": new_version.version}

        return {"deployed": False, "reason": "New model not better"}

MLOps is essential for sustainable ML in production. Start with experiment tracking and gradually add components as your ML practice matures.

Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.