
Databricks ML: End-to-End Machine Learning on the Lakehouse

Databricks ML provides a unified platform for the entire machine learning lifecycle, from data preparation through model serving. Built on the lakehouse architecture, it combines the scale of Spark with purpose-built ML tools.

Databricks ML Components

The ML platform includes:

  • Feature Store: Centralized feature management (see the sketch after this list)
  • AutoML: Automated model training and selection
  • MLflow: Experiment tracking and model registry
  • Model Serving: Real-time inference endpoints
  • ML Runtime: Optimized environment with pre-installed libraries
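
Feature Store, AutoML, and Model Serving are not walked through below, so here is a minimal sketch of the first: registering engineered features as a feature table. It assumes the workspace Feature Store client from the databricks-feature-store package; the database and table names are illustrative, and features_df refers to the Spark DataFrame built in the data preparation section later in this post.

from databricks.feature_store import FeatureStoreClient

fs = FeatureStoreClient()

# Register engineered features as a feature table keyed by user_id (names are illustrative)
fs.create_table(
    name="ml_features.user_activity_features",
    primary_keys=["user_id"],
    df=features_df,
    description="Aggregated user activity features for churn prediction"
)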

Setting Up the ML Environment

# ML Runtime includes popular libraries pre-installed:
# - TensorFlow, PyTorch, Keras
# - scikit-learn, XGBoost, LightGBM
# - MLflow, Hyperopt, SHAP
# - pandas, numpy, scipy

# Import commonly used libraries
import mlflow
import mlflow.sklearn
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, f1_score, roc_auc_score
import pandas as pd
import numpy as np

# Set up MLflow experiment
mlflow.set_experiment("/Users/username/customer-churn")

Data Preparation with Spark

# Load and prepare training data
import pyspark.sql.functions as F
from pyspark.sql.functions import col, when, datediff, current_date

# Read from Delta Lake
df = spark.read.table("production.customers.user_activity")

# Feature engineering at scale
features_df = df.groupBy("user_id").agg(
    F.count("*").alias("total_actions"),
    F.sum(when(col("action_type") == "purchase", 1).otherwise(0)).alias("purchase_count"),
    F.avg("session_duration_minutes").alias("avg_session_duration"),
    F.max("activity_date").alias("last_activity_date")
).withColumn(
    "days_since_last_activity",
    datediff(current_date(), col("last_activity_date"))
)

# Add label (churned if no activity in 30 days)
labeled_df = features_df.withColumn(
    "churned",
    when(col("days_since_last_activity") > 30, 1).otherwise(0)
)

# Convert to pandas for sklearn (or use Spark ML for larger datasets)
pdf = labeled_df.toPandas()

Training with MLflow Tracking

# Define features and target
# Note: days_since_last_activity is excluded because the churned label was derived from it (label leakage)
feature_columns = ["total_actions", "purchase_count", "avg_session_duration"]
X = pdf[feature_columns]
y = pdf["churned"]

# Split data
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Train with MLflow tracking
with mlflow.start_run(run_name="random_forest_baseline") as run:
    # Log parameters
    params = {
        "n_estimators": 100,
        "max_depth": 10,
        "min_samples_split": 5,
        "random_state": 42
    }
    mlflow.log_params(params)

    # Train model
    model = RandomForestClassifier(**params)
    model.fit(X_train, y_train)

    # Predict and evaluate
    y_pred = model.predict(X_test)
    y_pred_proba = model.predict_proba(X_test)[:, 1]

    # Log metrics
    metrics = {
        "accuracy": accuracy_score(y_test, y_pred),
        "f1_score": f1_score(y_test, y_pred),
        "roc_auc": roc_auc_score(y_test, y_pred_proba)
    }
    mlflow.log_metrics(metrics)

    # Log feature importance
    importance_df = pd.DataFrame({
        "feature": feature_columns,
        "importance": model.feature_importances_
    }).sort_values("importance", ascending=False)
    mlflow.log_table(importance_df, "feature_importance.json")

    # Log model
    mlflow.sklearn.log_model(
        model,
        "model",
        registered_model_name="customer-churn-classifier"
    )

    print(f"Run ID: {run.info.run_id}")
    print(f"Metrics: {metrics}")

Hyperparameter Tuning with Hyperopt

from hyperopt import fmin, tpe, hp, SparkTrials, STATUS_OK
from hyperopt.pyll import scope

# Define search space
search_space = {
    "n_estimators": scope.int(hp.quniform("n_estimators", 50, 300, 50)),
    "max_depth": scope.int(hp.quniform("max_depth", 3, 15, 1)),
    "min_samples_split": scope.int(hp.quniform("min_samples_split", 2, 20, 1)),
    "min_samples_leaf": scope.int(hp.quniform("min_samples_leaf", 1, 10, 1))
}

def objective(params):
    with mlflow.start_run(nested=True):
        # Log parameters
        mlflow.log_params(params)

        # Train model
        model = RandomForestClassifier(**params, random_state=42, n_jobs=-1)
        model.fit(X_train, y_train)

        # Evaluate
        y_pred_proba = model.predict_proba(X_test)[:, 1]
        roc_auc = roc_auc_score(y_test, y_pred_proba)

        mlflow.log_metric("roc_auc", roc_auc)

        # Hyperopt minimizes, so negate the metric
        return {"loss": -roc_auc, "status": STATUS_OK}

# Run distributed hyperparameter search
with mlflow.start_run(run_name="hyperparameter_tuning"):
    spark_trials = SparkTrials(parallelism=4)

    best_params = fmin(
        fn=objective,
        space=search_space,
        algo=tpe.suggest,
        max_evals=20,
        trials=spark_trials
    )

    print(f"Best parameters: {best_params}")

Using Spark ML for Scale

from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.classification import RandomForestClassifier as SparkRF
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Prepare features
assembler = VectorAssembler(
    inputCols=feature_columns,
    outputCol="features_raw"
)

scaler = StandardScaler(
    inputCol="features_raw",
    outputCol="features"
)

# Create Spark ML model
rf = SparkRF(
    labelCol="churned",
    featuresCol="features",
    numTrees=100,
    maxDepth=10
)

# Build pipeline
pipeline = Pipeline(stages=[assembler, scaler, rf])

# Split data
train_df, test_df = labeled_df.randomSplit([0.8, 0.2], seed=42)

# Train
with mlflow.start_run(run_name="spark_ml_model"):
    model = pipeline.fit(train_df)

    # Evaluate
    predictions = model.transform(test_df)
    evaluator = BinaryClassificationEvaluator(
        labelCol="churned",
        metricName="areaUnderROC"
    )
    roc_auc = evaluator.evaluate(predictions)

    mlflow.log_metric("roc_auc", roc_auc)
    mlflow.spark.log_model(model, "spark_model")

    print(f"Spark ML ROC AUC: {roc_auc}")

Deep Learning with PyTorch

import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset

# Define neural network
class ChurnPredictor(nn.Module):
    def __init__(self, input_dim):
        super().__init__()
        self.layers = nn.Sequential(
            nn.Linear(input_dim, 64),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(32, 1),
            nn.Sigmoid()
        )

    def forward(self, x):
        return self.layers(x)

# Prepare data
X_train_tensor = torch.FloatTensor(X_train.values)
y_train_tensor = torch.FloatTensor(y_train.values)
X_test_tensor = torch.FloatTensor(X_test.values)
y_test_tensor = torch.FloatTensor(y_test.values)

train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)

# Train with MLflow
with mlflow.start_run(run_name="pytorch_model"):
    model = ChurnPredictor(len(feature_columns))
    criterion = nn.BCELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

    epochs = 50
    for epoch in range(epochs):
        model.train()
        total_loss = 0
        for X_batch, y_batch in train_loader:
            optimizer.zero_grad()
            outputs = model(X_batch).squeeze()
            loss = criterion(outputs, y_batch)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()

        # Log epoch metrics
        mlflow.log_metric("train_loss", total_loss / len(train_loader), step=epoch)

    # Evaluate
    model.eval()
    with torch.no_grad():
        y_pred_proba = model(X_test_tensor).squeeze().numpy()
        y_pred = (y_pred_proba > 0.5).astype(int)

    roc_auc = roc_auc_score(y_test, y_pred_proba)
    mlflow.log_metric("roc_auc", roc_auc)
    mlflow.log_metric("accuracy", accuracy_score(y_test, y_pred))

    # Log model
    mlflow.pytorch.log_model(model, "pytorch_model")

Model Registry

from mlflow.tracking import MlflowClient

client = MlflowClient()

# Register a model
model_uri = f"runs:/{run.info.run_id}/model"
model_details = mlflow.register_model(model_uri, "customer-churn-classifier")

# Transition to staging
client.transition_model_version_stage(
    name="customer-churn-classifier",
    version=model_details.version,
    stage="Staging"
)

# Add model description
client.update_model_version(
    name="customer-churn-classifier",
    version=model_details.version,
    description="Random Forest model for customer churn prediction. Trained on 6 months of user activity data."
)

# Promote to production after validation
client.transition_model_version_stage(
    name="customer-churn-classifier",
    version=model_details.version,
    stage="Production"
)

Model Comparison

# Compare multiple runs
experiment = mlflow.get_experiment_by_name("/Users/username/customer-churn")
runs = mlflow.search_runs(
    experiment_ids=[experiment.experiment_id],
    order_by=["metrics.roc_auc DESC"]
)

# Display comparison
comparison_df = runs[["run_id", "params.n_estimators", "params.max_depth",
                      "metrics.roc_auc", "metrics.f1_score", "metrics.accuracy"]]
display(comparison_df)

# Get the best model
best_run = runs.iloc[0]
best_model = mlflow.sklearn.load_model(f"runs:/{best_run['run_id']}/model")

Batch Inference

# Load production model
production_model = mlflow.sklearn.load_model("models:/customer-churn-classifier/Production")

# Score new data
new_customers_df = spark.read.table("production.customers.new_users")
new_features = prepare_features(new_customers_df)  # Same feature engineering as training (helper not shown)

# Apply model as UDF for distributed scoring
predict_udf = mlflow.pyfunc.spark_udf(spark, "models:/customer-churn-classifier/Production")

scored_df = new_features.withColumn(
    "churn_probability",
    predict_udf(*feature_columns)
)

# Save predictions
scored_df.write.mode("overwrite").saveAsTable("analytics.predictions.churn_scores")

Conclusion

Databricks ML provides all the tools needed for production machine learning:

  • Seamless integration with lakehouse data
  • Distributed training with Spark ML
  • Experiment tracking with MLflow
  • Hyperparameter tuning at scale
  • Model registry for versioning and deployment

The unified platform eliminates the friction of moving between data engineering and data science, enabling faster iteration and more reliable models.

Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.