Experiment Tracking in Fabric: Best Practices and Patterns

Effective experiment tracking is crucial for reproducible machine learning. Today we’ll explore advanced patterns and best practices for tracking experiments in Fabric.

Experiment Tracking Framework

# Comprehensive tracking structure
tracking_structure = {
    "experiment_level": {
        "name": "Descriptive project name",
        "tags": "Team, project, dataset version"
    },
    "run_level": {
        "parameters": "All hyperparameters",
        "metrics": "Training and evaluation metrics",
        "artifacts": "Models, plots, data samples",
        "tags": "Run-specific metadata"
    }
}
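
Mapping that structure onto the MLflow API looks roughly like this — a minimal sketch, with the experiment name, tag values, and metric value all illustrative:

import mlflow

# Experiment level: one-time setup shared by every run
mlflow.set_experiment("customer_churn_v2")
mlflow.set_experiment_tag("team", "analytics")
mlflow.set_experiment_tag("dataset_version", "v2")

# Run level: everything specific to a single training attempt
with mlflow.start_run(run_name="rf_baseline"):
    mlflow.log_params({"n_estimators": 100, "max_depth": 10})   # hyperparameters
    mlflow.log_metric("eval_f1", 0.87)                          # placeholder value
    mlflow.set_tag("notes", "baseline run")                     # run-specific metadata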

Creating a Tracking Template

import mlflow
import hashlib
from datetime import datetime

class ExperimentTracker:
    def __init__(self, experiment_name, team="data_science"):
        self.experiment_name = experiment_name
        self.team = team
        mlflow.set_experiment(experiment_name)

        # Set experiment tags
        mlflow.set_experiment_tag("team", team)
        mlflow.set_experiment_tag("created_at", datetime.now().isoformat())

    def start_run(self, run_name, description=None):
        """Start a new tracked run."""
        self.run = mlflow.start_run(run_name=run_name)

        if description:
            mlflow.set_tag("description", description)

        mlflow.set_tag("run_started", datetime.now().isoformat())
        return self

    def log_data_info(self, train_df, test_df, feature_cols, target_col):
        """Log dataset information."""
        data_info = {
            "train_rows": len(train_df),
            "test_rows": len(test_df),
            "n_features": len(feature_cols),
            "target_column": target_col,
            "feature_columns": ",".join(feature_cols[:10])  # First 10
        }
        mlflow.log_params(data_info)

        # Log data hash for reproducibility
        data_hash = hashlib.md5(
            train_df.to_json().encode()
        ).hexdigest()[:8]
        mlflow.log_param("data_hash", data_hash)

    def log_model_params(self, model_type, params):
        """Log model parameters."""
        mlflow.log_param("model_type", model_type)
        mlflow.log_params(params)

    def log_training_metrics(self, metrics, prefix="train"):
        """Log training metrics."""
        prefixed_metrics = {f"{prefix}_{k}": v for k, v in metrics.items()}
        mlflow.log_metrics(prefixed_metrics)

    def log_evaluation_metrics(self, metrics, prefix="eval"):
        """Log evaluation metrics."""
        prefixed_metrics = {f"{prefix}_{k}": v for k, v in metrics.items()}
        mlflow.log_metrics(prefixed_metrics)

    def end_run(self, status="FINISHED"):
        """End the current run."""
        mlflow.set_tag("run_ended", datetime.now().isoformat())
        mlflow.set_tag("status", status)
        mlflow.end_run(status=status)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type:
            self.end_run(status="FAILED")
        else:
            self.end_run(status="FINISHED")
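
The structure at the top also lists artifacts (models, plots, data samples) at the run level, which the class doesn't handle yet. Inside any active run they can be attached directly; a small sketch, where the file names and placeholder data are my own:

import mlflow
import matplotlib.pyplot as plt

with mlflow.start_run(run_name="artifact_demo"):
    # Attach a plot to the run
    fig, ax = plt.subplots()
    ax.hist([0.1, 0.4, 0.35, 0.8])              # placeholder data
    mlflow.log_figure(fig, "score_distribution.png")
    plt.close(fig)

    # Attach a small data sample for reproducibility checks
    train_df.head(100).to_csv("train_sample.csv", index=False)  # assumes train_df exists
    mlflow.log_artifact("train_sample.csv")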

Using the Tracking Template

from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score
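
# Assumes X_train, X_test, y_train, y_test, and feature_columns
# were prepared in an earlier notebook cell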

# Initialize tracker
tracker = ExperimentTracker("customer_churn_v2", team="analytics")

# Start tracked run
with tracker.start_run("rf_baseline", description="Random Forest baseline model"):
    # Log data info
    tracker.log_data_info(
        train_df=X_train,
        test_df=X_test,
        feature_cols=feature_columns,
        target_col="churn"
    )

    # Define and log parameters
    params = {
        "n_estimators": 100,
        "max_depth": 10,
        "min_samples_split": 5
    }
    tracker.log_model_params("RandomForestClassifier", params)

    # Train model
    model = RandomForestClassifier(**params, random_state=42)
    model.fit(X_train, y_train)

    # Evaluate
    y_pred = model.predict(X_test)
    metrics = {
        "accuracy": accuracy_score(y_test, y_pred),
        "f1": f1_score(y_test, y_pred),
        "precision": precision_score(y_test, y_pred),
        "recall": recall_score(y_test, y_pred)
    }
    tracker.log_evaluation_metrics(metrics)

    # Log model
    mlflow.sklearn.log_model(model, "model")
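
Once the run has finished, the logged model can be loaded back by run ID for scoring or comparison — a quick sketch, where the run ID would come from the Fabric run view or mlflow.search_runs:

run_id = "<run_id>"   # placeholder
loaded_model = mlflow.sklearn.load_model(f"runs:/{run_id}/model")
predictions = loaded_model.predict(X_test)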

Tracking Hyperparameter Searches

from sklearn.model_selection import RandomizedSearchCV
from scipy.stats import randint

def track_hyperparameter_search(X_train, y_train, X_test, y_test, n_iter=20):
    """Track hyperparameter search with nested runs."""

    with mlflow.start_run(run_name="hyperparameter_search") as parent:
        # Define search space
        param_distributions = {
            "n_estimators": randint(50, 300),
            "max_depth": randint(3, 20),
            "min_samples_split": randint(2, 20),
            "min_samples_leaf": randint(1, 10)
        }

        mlflow.log_param("search_method", "RandomizedSearchCV")
        mlflow.log_param("n_iterations", n_iter)

        # Perform search
        base_model = RandomForestClassifier(random_state=42)
        search = RandomizedSearchCV(
            base_model,
            param_distributions,
            n_iter=n_iter,
            cv=5,
            scoring='f1',
            random_state=42
        )
        search.fit(X_train, y_train)

        # Log each configuration as nested run
        for i, (params, score) in enumerate(zip(
            search.cv_results_['params'],
            search.cv_results_['mean_test_score']
        )):
            with mlflow.start_run(run_name=f"config_{i}", nested=True):
                mlflow.log_params(params)
                mlflow.log_metric("cv_score", score)

        # Log best results
        mlflow.log_params({"best_" + k: v for k, v in search.best_params_.items()})
        mlflow.log_metric("best_cv_score", search.best_score_)

        # Evaluate on test set
        y_pred = search.best_estimator_.predict(X_test)
        test_f1 = f1_score(y_test, y_pred)
        mlflow.log_metric("test_f1", test_f1)

        # Log best model
        mlflow.sklearn.log_model(search.best_estimator_, "best_model")

        return search.best_estimator_

best_model = track_hyperparameter_search(X_train, y_train, X_test, y_test)
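
The nested runs make every configuration visible in the run list, but it can also help to keep the whole cv_results_ table on the parent run as a single artifact. A small sketch that could sit right after search.fit inside the parent run, assuming pandas is available:

import pandas as pd

# Persist the full search table as one artifact on the parent run
results_df = pd.DataFrame(search.cv_results_)
results_df.to_csv("search_results.csv", index=False)
mlflow.log_artifact("search_results.csv")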

Cross-Validation Tracking

from sklearn.model_selection import cross_validate
import matplotlib.pyplot as plt
import numpy as np

def track_cross_validation(model, X, y, cv=5):
    """Track cross-validation results."""

    with mlflow.start_run(run_name="cross_validation"):
        # Log model parameters
        mlflow.log_params(model.get_params())

        # Perform cross-validation
        cv_results = cross_validate(
            model, X, y,
            cv=cv,
            scoring=['accuracy', 'f1', 'precision', 'recall'],
            return_train_score=True
        )

        # Log mean and std for each metric
        for metric in ['accuracy', 'f1', 'precision', 'recall']:
            test_key = f'test_{metric}'
            train_key = f'train_{metric}'

            mlflow.log_metrics({
                f"cv_{metric}_mean": np.mean(cv_results[test_key]),
                f"cv_{metric}_std": np.std(cv_results[test_key]),
                f"cv_{metric}_train_mean": np.mean(cv_results[train_key])
            })

        # Log fold-by-fold results
        for fold, score in enumerate(cv_results['test_f1']):
            mlflow.log_metric("fold_f1", score, step=fold)

        # Create CV summary plot
        fig, ax = plt.subplots(figsize=(10, 6))
        metrics = ['accuracy', 'f1', 'precision', 'recall']
        means = [np.mean(cv_results[f'test_{m}']) for m in metrics]
        stds = [np.std(cv_results[f'test_{m}']) for m in metrics]

        ax.bar(metrics, means, yerr=stds, capsize=5)
        ax.set_ylabel('Score')
        ax.set_title('Cross-Validation Results')
        ax.set_ylim([0, 1])

        mlflow.log_figure(fig, "cv_results.png")
        plt.close()

        return cv_results

cv_results = track_cross_validation(model, X_train, y_train)
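
The same helper can be reused to compare several candidate models under identical folds, with each call producing its own run in the experiment — a sketch, where the candidate list is illustrative (in practice you would also want to pass the model name through as the run name):

from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.linear_model import LogisticRegression

candidates = {
    "random_forest": RandomForestClassifier(n_estimators=100, random_state=42),
    "gradient_boosting": GradientBoostingClassifier(random_state=42),
    "logistic_regression": LogisticRegression(max_iter=1000),
}

for name, candidate in candidates.items():
    print(f"Cross-validating {name}")
    track_cross_validation(candidate, X_train, y_train)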

Tracking Training Progress

def track_training_progress(model, X_train, y_train, X_val, y_val, epochs=100):
    """Track training progress over epochs (for iterative models)."""

    with mlflow.start_run(run_name="training_progress"):
        train_scores = []
        val_scores = []

        for epoch in range(epochs):
            # Incrementally fit models that expose partial_fit; a full refit
            # of a non-iterative model would just produce a flat curve
            if hasattr(model, "partial_fit"):
                model.partial_fit(X_train, y_train, classes=np.unique(y_train))
            else:
                model.fit(X_train, y_train)

            # Calculate scores
            train_score = accuracy_score(y_train, model.predict(X_train))
            val_score = accuracy_score(y_val, model.predict(X_val))

            train_scores.append(train_score)
            val_scores.append(val_score)

            # Log step metrics
            mlflow.log_metric("train_accuracy", train_score, step=epoch)
            mlflow.log_metric("val_accuracy", val_score, step=epoch)

            # Early stopping check
            if epoch > 10:
                if val_scores[-1] < val_scores[-10]:
                    mlflow.log_param("early_stop_epoch", epoch)
                    break

        # Log training curves
        fig, ax = plt.subplots()
        ax.plot(train_scores, label='Train')
        ax.plot(val_scores, label='Validation')
        ax.set_xlabel('Epoch')
        ax.set_ylabel('Accuracy')
        ax.legend()
        ax.set_title('Training Progress')

        mlflow.log_figure(fig, "training_curves.png")
        plt.close()

        return model
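
A model that supports incremental fitting makes the epoch-by-epoch curve meaningful; for example, with scikit-learn's SGDClassifier (my choice for illustration, not something from the tracker above):

from sklearn.linear_model import SGDClassifier
from sklearn.model_selection import train_test_split

X_tr, X_val, y_tr, y_val = train_test_split(
    X_train, y_train, test_size=0.2, random_state=42
)

sgd = SGDClassifier(random_state=42)
trained = track_training_progress(sgd, X_tr, y_tr, X_val, y_val, epochs=50)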

Comparing Experiments

def compare_experiments(experiment_name, metric="eval_f1"):
    """Generate comparison report for experiment runs."""

    experiment = mlflow.get_experiment_by_name(experiment_name)

    # Get all runs
    runs = mlflow.search_runs(
        experiment_ids=[experiment.experiment_id],
        filter_string="status = 'FINISHED'",
        order_by=[f"metrics.{metric} DESC"]
    )

    # Create comparison report
    comparison_cols = [
        'run_id', 'tags.mlflow.runName',
        'params.model_type', 'params.n_estimators',
        f'metrics.{metric}', 'metrics.eval_accuracy'
    ]

    available_cols = [c for c in comparison_cols if c in runs.columns]
    report = runs[available_cols].head(10)

    print("=" * 60)
    print(f"Experiment: {experiment_name}")
    print(f"Top 10 runs by {metric}")
    print("=" * 60)
    print(report.to_string())

    return report

comparison = compare_experiments("customer_churn_v2")
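
From here, the top run's model can be promoted for reuse — a sketch that registers it under an illustrative name, assuming the comparison returned at least one row and the model was logged under the "model" artifact path:

best_run_id = comparison.iloc[0]["run_id"]
registered = mlflow.register_model(
    model_uri=f"runs:/{best_run_id}/model",
    name="customer_churn_classifier"   # illustrative registry name
)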

Tomorrow we’ll explore Azure OpenAI Code Interpreter.

Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.