
SynapseML: Distributed AI at Scale

SynapseML (formerly MMLSpark) brings scalable machine learning to Apache Spark. Train models on billions of rows, integrate with Cognitive Services, and deploy AI at enterprise scale.
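
The Cognitive Services integration mentioned above is exposed as ordinary Spark transformers. A minimal sketch, assuming the synapse.ml.cognitive module that ships with SynapseML 0.11 and placeholder key, region, and column names:

# Hedged sketch: sentiment scoring with a Cognitive Services transformer.
# The key, region, and column names are placeholders.
from synapse.ml.cognitive import TextSentiment

sentiment = (TextSentiment()
    .setSubscriptionKey("<cognitive-services-key>")   # placeholder secret
    .setLocation("australiaeast")                     # placeholder region
    .setTextCol("review_text")                        # assumed input column
    .setOutputCol("sentiment")
    .setErrorCol("sentiment_error"))

# reviews_df is any Spark DataFrame with a "review_text" column
scored_reviews = sentiment.transform(reviews_df)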

SynapseML Fundamentals

# SynapseML setup in Synapse or Databricks
from synapse.ml.core.spark import FluentAPI
from synapse.ml.featurize import *
from synapse.ml.train import *
from synapse.ml.lightgbm import *
from pyspark.sql import SparkSession

# Configure Spark for SynapseML
spark = SparkSession.builder \
    .appName("SynapseML-Demo") \
    .config("spark.jars.packages", "com.microsoft.azure:synapseml_2.12:0.11.0") \
    .getOrCreate()

class SynapseMLPipeline:
    """Distributed ML pipeline with SynapseML."""

    def __init__(self, spark: SparkSession):
        self.spark = spark

    def create_feature_pipeline(self, df, numeric_cols: list, categorical_cols: list):
        """Create feature engineering pipeline."""
        from pyspark.ml import Pipeline
        from pyspark.ml.feature import VectorAssembler, StringIndexer

        stages = []

        # Index categorical columns
        for col in categorical_cols:
            indexer = StringIndexer(
                inputCol=col,
                outputCol=f"{col}_indexed",
                handleInvalid="keep"
            )
            stages.append(indexer)

        # Assemble features
        feature_cols = numeric_cols + [f"{c}_indexed" for c in categorical_cols]
        assembler = VectorAssembler(
            inputCols=feature_cols,
            outputCol="features",
            handleInvalid="skip"
        )
        stages.append(assembler)

        pipeline = Pipeline(stages=stages)
        return pipeline.fit(df)

    def train_lightgbm_classifier(
        self,
        df,
        label_col: str,
        feature_col: str = "features"
    ):
        """Train LightGBM classifier at scale."""
        from synapse.ml.lightgbm import LightGBMClassifier

        lgbm = LightGBMClassifier(
            featuresCol=feature_col,
            labelCol=label_col,
            predictionCol="prediction",
            probabilityCol="probability",
            rawPredictionCol="rawPrediction",
            numLeaves=31,
            numIterations=100,
            learningRate=0.1,
            featureFraction=0.8,
            baggingFraction=0.8,
            baggingFreq=5,
            verbosity=-1
        )

        return lgbm.fit(df)

    def train_lightgbm_regressor(
        self,
        df,
        label_col: str,
        feature_col: str = "features"
    ):
        """Train LightGBM regressor at scale."""
        from synapse.ml.lightgbm import LightGBMRegressor

        lgbm = LightGBMRegressor(
            featuresCol=feature_col,
            labelCol=label_col,
            predictionCol="prediction",
            numLeaves=31,
            numIterations=100,
            learningRate=0.1,
            featureFraction=0.8
        )

        return lgbm.fit(df)

Distributed Deep Learning

class DistributedDeepLearning:
    """Deep learning at scale with SynapseML."""

    def __init__(self, spark):
        self.spark = spark

    def train_image_classifier(
        self,
        df,
        image_col: str,
        label_col: str,
        model_name: str = "ResNet50"
    ):
        """Train image classifier using transfer learning."""
        from synapse.ml.onnx import ImageFeaturizer
        from synapse.ml.lightgbm import LightGBMClassifier
        from pyspark.ml import Pipeline

        # Extract features using pre-trained model
        featurizer = (ImageFeaturizer()
            .setInputCol(image_col)
            .setOutputCol("image_features")
            .setModelName(model_name)
            .setCutOutputLayers(1))

        # Classifier on top
        classifier = LightGBMClassifier(
            featuresCol="image_features",
            labelCol=label_col,
            numIterations=50
        )

        pipeline = Pipeline(stages=[featurizer, classifier])
        return pipeline.fit(df)

    def train_text_classifier(
        self,
        df,
        text_col: str,
        label_col: str
    ):
        """Train text classifier with embeddings."""
        from synapse.ml.featurize.text import TextFeaturizer
        from synapse.ml.lightgbm import LightGBMClassifier
        from pyspark.ml import Pipeline

        # Text featurization
        text_featurizer = (TextFeaturizer()
            .setInputCol(text_col)
            .setOutputCol("text_features")
            .setUseTokenizer(True)
            .setUseStopWordsRemover(True)
            .setUseIDF(True)
            .setMinDocFreq(5))

        # Classifier
        classifier = LightGBMClassifier(
            featuresCol="text_features",
            labelCol=label_col,
            numIterations=100
        )

        pipeline = Pipeline(stages=[text_featurizer, classifier])
        return pipeline.fit(df)

    def ensemble_models(self, models: list, df):
        """Create a majority-vote ensemble from multiple classifiers."""
        from pyspark.sql import functions as F

        predictions = df

        # Collect each model's prediction, renaming and dropping output
        # columns so the next model's transform does not collide with them
        for i, model in enumerate(models):
            predictions = (model.transform(predictions)
                .withColumnRenamed("prediction", f"pred_{i}")
                .drop("probability", "rawPrediction"))

        # Majority vote for binary classification: average the 0/1
        # predictions and round to the nearest class label
        pred_cols = [f"pred_{i}" for i in range(len(models))]
        vote = sum(F.col(c) for c in pred_cols) / len(pred_cols)
        predictions = predictions.withColumn(
            "ensemble_prediction",
            F.round(vote).cast("double")
        )

        return predictions
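
The ensemble helper expects models that were all trained on the same assembled feature column, so each transform only adds prediction columns. An illustrative call, assuming a train_features DataFrame with a "features" vector and a "churn" label:

# Illustrative usage; train_features and the "churn" label are assumptions.
helper = SynapseMLPipeline(spark)
dl = DistributedDeepLearning(spark)

model_a = helper.train_lightgbm_classifier(train_features, label_col="churn")
model_b = helper.train_lightgbm_classifier(train_features.sample(fraction=0.8, seed=1), label_col="churn")

ensembled = dl.ensemble_models([model_a, model_b], train_features)
ensembled.select("churn", "ensemble_prediction").show(5)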

Hyperparameter Tuning at Scale

class DistributedHyperparameterTuning:
    """Distributed hyperparameter optimization."""

    def __init__(self, spark):
        self.spark = spark

    def tune_lightgbm(
        self,
        df,
        label_col: str,
        feature_col: str,
        param_grid: dict,
        num_folds: int = 5
    ):
        """Tune LightGBM with cross-validation."""
        from synapse.ml.lightgbm import LightGBMClassifier
        from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
        from pyspark.ml.evaluation import BinaryClassificationEvaluator

        lgbm = LightGBMClassifier(
            featuresCol=feature_col,
            labelCol=label_col
        )

        # Build parameter grid
        grid_builder = ParamGridBuilder()

        if "numLeaves" in param_grid:
            grid_builder.addGrid(lgbm.numLeaves, param_grid["numLeaves"])
        if "numIterations" in param_grid:
            grid_builder.addGrid(lgbm.numIterations, param_grid["numIterations"])
        if "learningRate" in param_grid:
            grid_builder.addGrid(lgbm.learningRate, param_grid["learningRate"])

        grid = grid_builder.build()

        # Evaluator
        evaluator = BinaryClassificationEvaluator(
            labelCol=label_col,
            metricName="areaUnderROC"
        )

        # Cross-validator
        cv = CrossValidator(
            estimator=lgbm,
            estimatorParamMaps=grid,
            evaluator=evaluator,
            numFolds=num_folds,
            parallelism=4  # Parallel model training
        )

        cv_model = cv.fit(df)

        return {
            "best_model": cv_model.bestModel,
            "best_params": cv_model.bestModel.extractParamMap(),
            "cv_metrics": cv_model.avgMetrics
        }

    def random_search(
        self,
        df,
        label_col: str,
        feature_col: str,
        param_distributions: dict,
        n_iter: int = 20
    ):
        """Random search hyperparameter tuning."""
        import random
        from synapse.ml.lightgbm import LightGBMClassifier
        from pyspark.ml.evaluation import BinaryClassificationEvaluator

        evaluator = BinaryClassificationEvaluator(
            labelCol=label_col,
            metricName="areaUnderROC"
        )

        results = []
        train_df, val_df = df.randomSplit([0.8, 0.2], seed=42)

        for i in range(n_iter):
            # Sample parameters
            params = {
                k: random.choice(v) for k, v in param_distributions.items()
            }

            lgbm = LightGBMClassifier(
                featuresCol=feature_col,
                labelCol=label_col,
                **params
            )

            model = lgbm.fit(train_df)
            predictions = model.transform(val_df)
            score = evaluator.evaluate(predictions)

            results.append({
                "params": params,
                "score": score,
                "model": model
            })

            print(f"Iteration {i+1}/{n_iter}: AUC = {score:.4f}")

        # Return best
        best = max(results, key=lambda x: x["score"])
        return best
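
A quick usage sketch for the random search; the parameter ranges and column names are illustrative assumptions:

# Illustrative usage; train_features and the "churn" label are assumptions.
tuner = DistributedHyperparameterTuning(spark)
best = tuner.random_search(
    train_features,
    label_col="churn",
    feature_col="features",
    param_distributions={
        "numLeaves": [15, 31, 63, 127],
        "numIterations": [50, 100, 200],
        "learningRate": [0.05, 0.1, 0.2],
        "featureFraction": [0.6, 0.8, 1.0]
    },
    n_iter=20
)
print(f"Best AUC: {best['score']:.4f} with {best['params']}")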

Model Interpretation

class ModelInterpretation:
    """Interpret SynapseML models at scale."""

    def __init__(self, spark):
        self.spark = spark

    def get_feature_importance(self, model, feature_names: list) -> dict:
        """Get feature importance from LightGBM model."""

        # LightGBM models have native feature importance
        if hasattr(model, "getFeatureImportances"):
            importances = model.getFeatureImportances("split")

            return dict(sorted(
                zip(feature_names, importances),
                key=lambda x: x[1],
                reverse=True
            ))

        return {}

    def compute_shap_values(self, model, df, feature_col: str):
        """Compute SHAP values for model explanations."""
        from synapse.ml.explainers import VectorSHAP

        shap_explainer = (VectorSHAP()
            .setInputCol(feature_col)
            .setOutputCol("shap_values")
            .setBackgroundData(df.limit(100))
            .setModel(model)
            .setNumSamples(1000))

        return shap_explainer.transform(df)

    def local_interpretation(self, model, df, row_index: int, feature_names: list):
        """Get local interpretation for a single prediction."""

        # Get the specific row
        row = df.collect()[row_index]

        # Get prediction
        single_df = self.spark.createDataFrame([row])
        prediction = model.transform(single_df).collect()[0]

        # Compute SHAP for this instance
        shap_df = self.compute_shap_values(model, single_df, "features")
        shap_values = shap_df.select("shap_values").collect()[0][0]

        return {
            "prediction": prediction["prediction"],
            "probability": prediction["probability"].toArray().tolist(),
            "feature_contributions": dict(zip(feature_names, shap_values))
        }

    def partial_dependence(
        self,
        model,
        df,
        feature_col: str,
        feature_name: str,
        num_points: int = 50
    ):
        """Compute partial dependence for a feature."""
        from pyspark.sql.functions import lit, col

        # Get feature range
        feature_stats = df.select(feature_name).describe().collect()
        min_val = float(feature_stats[3][1])  # min
        max_val = float(feature_stats[4][1])  # max

        # Generate grid points
        step = (max_val - min_val) / num_points
        grid_points = [min_val + i * step for i in range(num_points)]

        pdp_values = []

        for point in grid_points:
            # Replace feature value
            modified_df = df.withColumn(feature_name, lit(point))

            # Score through the fitted pipeline (re-assembles features) and predict
            predictions = model.transform(modified_df)
            avg_pred = predictions.select("prediction").agg({"prediction": "avg"}).collect()[0][0]

            pdp_values.append({"feature_value": point, "avg_prediction": avg_pred})

        return pdp_values
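
Tying the interpretation helpers to a trained model looks roughly like this; the model variable and feature names are assumptions that mirror the end-to-end example below:

# Illustrative usage; lgbm_model and the feature names are assumptions.
interpreter = ModelInterpretation(spark)

feature_names = [
    "age", "tenure", "monthly_charges", "total_charges",
    "contract_type_indexed", "payment_method_indexed", "internet_service_indexed"
]

# Global view: which features does the booster split on most often?
importance = interpreter.get_feature_importance(lgbm_model, feature_names)
for name, score in list(importance.items())[:5]:
    print(f"{name}: {score}")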

End-to-End ML Pipeline

class EndToEndMLPipeline:
    """Complete ML pipeline with SynapseML."""

    def __init__(self, spark):
        self.spark = spark
        self.pipeline_helper = SynapseMLPipeline(spark)
        self.tuner = DistributedHyperparameterTuning(spark)

    def run_classification_pipeline(
        self,
        df,
        label_col: str,
        numeric_cols: list,
        categorical_cols: list,
        tune_hyperparams: bool = True
    ):
        """Run complete classification pipeline."""

        # Split data
        train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)

        # Feature engineering
        feature_pipeline = self.pipeline_helper.create_feature_pipeline(
            train_df, numeric_cols, categorical_cols
        )
        train_features = feature_pipeline.transform(train_df)
        test_features = feature_pipeline.transform(test_df)

        # Hyperparameter tuning or default training
        if tune_hyperparams:
            result = self.tuner.tune_lightgbm(
                train_features,
                label_col,
                "features",
                param_grid={
                    "numLeaves": [15, 31, 63],
                    "numIterations": [50, 100, 200],
                    "learningRate": [0.05, 0.1, 0.2]
                }
            )
            model = result["best_model"]
        else:
            model = self.pipeline_helper.train_lightgbm_classifier(
                train_features, label_col
            )

        # Evaluate
        predictions = model.transform(test_features)
        metrics = self._evaluate_classification(predictions, label_col)

        return {
            "feature_pipeline": feature_pipeline,
            "model": model,
            "metrics": metrics,
            "predictions": predictions
        }

    def _evaluate_classification(self, predictions, label_col: str) -> dict:
        """Evaluate classification model."""
        from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

        binary_eval = BinaryClassificationEvaluator(labelCol=label_col)
        multi_eval = MulticlassClassificationEvaluator(labelCol=label_col)

        return {
            "auc": binary_eval.evaluate(predictions, {binary_eval.metricName: "areaUnderROC"}),
            "accuracy": multi_eval.evaluate(predictions, {multi_eval.metricName: "accuracy"}),
            "f1": multi_eval.evaluate(predictions, {multi_eval.metricName: "f1"})
        }

# Usage
pipeline = EndToEndMLPipeline(spark)

# Load data
df = spark.table("silver.customer_features")

# Run pipeline
result = pipeline.run_classification_pipeline(
    df,
    label_col="churn",
    numeric_cols=["age", "tenure", "monthly_charges", "total_charges"],
    categorical_cols=["contract_type", "payment_method", "internet_service"],
    tune_hyperparams=True
)

print(f"Model AUC: {result['metrics']['auc']:.4f}")

SynapseML democratizes distributed machine learning. Train on massive datasets, tune hyperparameters in parallel, and deploy models at scale - all with familiar APIs.

Michael John Pena

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.