Skip to content
Back to Blog
1 min read

SynapseML: Distributed AI at Scale

This post shares practical, production-minded guidance on using SynapseML for distributed machine learning on Apache Spark.

SynapseML Fundamentals

# SynapseML setup in Synapse or Databricks
from synapse.ml.core.spark import FluentAPI
from synapse.ml.featurize import *
from synapse.ml.train import *
from synapse.ml.lightgbm import *
from pyspark.sql import SparkSession

# Spark session for the demo; the SynapseML package coordinates are supplied
# through the spark.jars.packages setting.
spark = (
    SparkSession.builder
    .appName("SynapseML-Demo")
    .config("spark.jars.packages", "com.microsoft.azure:synapseml_2.12:0.11.0")
    .getOrCreate()
)

class SynapseMLPipeline:
    """Distributed ML pipeline with SynapseML.

    Groups feature preparation (Spark ML string indexing and vector assembly)
    with LightGBM classifier and regressor training helpers.
    """

    def __init__(self, spark: SparkSession):
        """Keep a reference to the Spark session.

        Args:
            spark: Session stored on ``self.spark``.
        """
        self.spark = spark

    def create_feature_pipeline(self, df, numeric_cols: list, categorical_cols: list):
        """Fit a feature engineering pipeline on ``df``.

        Every categorical column is indexed into ``<name>_indexed``; the
        numeric columns followed by the indexed columns are then assembled
        into a single ``features`` vector column.

        Args:
            df: DataFrame the pipeline is fitted on.
            numeric_cols: Names of columns passed to the assembler as-is.
            categorical_cols: Names of columns to run through ``StringIndexer``.

        Returns:
            The fitted pipeline (result of ``Pipeline.fit(df)``).
        """
        from pyspark.ml import Pipeline
        from pyspark.ml.feature import StringIndexer, VectorAssembler

        indexed_cols = [f"{name}_indexed" for name in categorical_cols]

        # One indexer per categorical column; handleInvalid="keep" asks the
        # indexer to keep rows with invalid labels instead of failing.
        stages = [
            StringIndexer(
                inputCol=name,
                outputCol=indexed_name,
                handleInvalid="keep",
            )
            for name, indexed_name in zip(categorical_cols, indexed_cols)
        ]

        # NOTE: handleInvalid="skip" makes the assembler drop rows it treats
        # as invalid, so the transformed output can contain fewer rows than
        # the input.
        stages.append(
            VectorAssembler(
                inputCols=list(numeric_cols) + indexed_cols,
                outputCol="features",
                handleInvalid="skip",
            )
        )

        return Pipeline(stages=stages).fit(df)

    def train_lightgbm_classifier(
        self,
        df,
        label_col: str,
        feature_col: str = "features",
        **lgbm_params
    ):
        """Train LightGBM classifier at scale.

        Args:
            df: Training DataFrame containing ``feature_col`` and ``label_col``.
            label_col: Name of the label column.
            feature_col: Name of the assembled feature vector column.
            **lgbm_params: Optional ``LightGBMClassifier`` keyword arguments
                that override the defaults below (e.g. ``numIterations=200``).

        Returns:
            The fitted model returned by ``LightGBMClassifier.fit``.
        """
        from synapse.ml.lightgbm import LightGBMClassifier

        settings = {
            "featuresCol": feature_col,
            "labelCol": label_col,
            "predictionCol": "prediction",
            "probabilityCol": "probability",
            "rawPredictionCol": "rawPrediction",
            "numLeaves": 31,
            "numIterations": 100,
            "learningRate": 0.1,
            "featureFraction": 0.8,
            "baggingFraction": 0.8,
            "baggingFreq": 5,
            "verbosity": -1,
        }
        # Caller-supplied values win over the defaults.
        settings.update(lgbm_params)

        return LightGBMClassifier(**settings).fit(df)

    def train_lightgbm_regressor(
        self,
        df,
        label_col: str,
        feature_col: str = "features",
        **lgbm_params
    ):
        """Train LightGBM regressor at scale.

        Args:
            df: Training DataFrame containing ``feature_col`` and ``label_col``.
            label_col: Name of the label column.
            feature_col: Name of the assembled feature vector column.
            **lgbm_params: Optional ``LightGBMRegressor`` keyword arguments
                that override the defaults below.

        Returns:
            The fitted model returned by ``LightGBMRegressor.fit``.
        """
        from synapse.ml.lightgbm import LightGBMRegressor

        settings = {
            "featuresCol": feature_col,
            "labelCol": label_col,
            "predictionCol": "prediction",
            "numLeaves": 31,
            "numIterations": 100,
            "learningRate": 0.1,
            "featureFraction": 0.8,
        }
        # Caller-supplied values win over the defaults.
        settings.update(lgbm_params)

        return LightGBMRegressor(**settings).fit(df)

Distributed Deep Learning

class DistributedDeepLearning:
    """Deep learning at scale with SynapseML.

    Provides image and text classifiers built as "featurizer + LightGBM"
    pipelines, plus a majority-vote ensemble over already fitted models.
    """

    def __init__(self, spark):
        """Keep a reference to the Spark session on ``self.spark``."""
        self.spark = spark

    def train_image_classifier(
        self,
        df,
        image_col: str,
        label_col: str,
        model_name: str = "ResNet50"
    ):
        """Train image classifier using transfer learning.

        Args:
            df: Training DataFrame.
            image_col: Name of the image input column.
            label_col: Name of the label column.
            model_name: Name of the pre-trained model given to the featurizer.

        Returns:
            The fitted two-stage pipeline (featurizer, then LightGBM).
        """
        from synapse.ml.onnx import ImageFeaturizer
        from synapse.ml.lightgbm import LightGBMClassifier
        from pyspark.ml import Pipeline

        # Extract features using pre-trained model
        # NOTE(review): confirm that the ImageFeaturizer shipped with the
        # pinned SynapseML version exposes setModelName/setCutOutputLayers;
        # these setter names may belong to an older featurizer API.
        featurizer = (ImageFeaturizer()
            .setInputCol(image_col)
            .setOutputCol("image_features")
            .setModelName(model_name)
            .setCutOutputLayers(1))

        # Classifier on top
        classifier = LightGBMClassifier(
            featuresCol="image_features",
            labelCol=label_col,
            numIterations=50
        )

        pipeline = Pipeline(stages=[featurizer, classifier])
        return pipeline.fit(df)

    def train_text_classifier(
        self,
        df,
        text_col: str,
        label_col: str
    ):
        """Train text classifier on featurized text.

        Args:
            df: Training DataFrame.
            text_col: Name of the raw text column.
            label_col: Name of the label column.

        Returns:
            The fitted two-stage pipeline (text featurizer, then LightGBM).
        """
        from synapse.ml.featurize.text import TextFeaturizer
        from synapse.ml.lightgbm import LightGBMClassifier
        from pyspark.ml import Pipeline

        # Text featurization: tokenizer, stop-word removal and IDF enabled
        text_featurizer = (TextFeaturizer()
            .setInputCol(text_col)
            .setOutputCol("text_features")
            .setUseTokenizer(True)
            .setUseStopWordsRemover(True)
            .setUseIDF(True)
            .setMinDocFreq(5))

        # Classifier
        classifier = LightGBMClassifier(
            featuresCol="text_features",
            labelCol=label_col,
            numIterations=100
        )

        pipeline = Pipeline(stages=[text_featurizer, classifier])
        return pipeline.fit(df)

    def ensemble_models(self, models: list, df):
        """Create a majority-vote ensemble from multiple fitted models.

        Each model scores ``df``; its ``prediction`` column is kept as
        ``pred_<i>`` and every other column the model added is dropped so the
        next model does not collide with already existing output columns.

        Args:
            models: Fitted models exposing ``transform`` and writing a
                ``prediction`` column.
            df: DataFrame to score.

        Returns:
            ``df`` plus one ``pred_<i>`` column per model and an
            ``ensemble_prediction`` column holding the majority vote.

        Raises:
            ValueError: If ``models`` is empty.
        """
        if not models:
            raise ValueError("ensemble_models requires at least one model")

        predictions = df
        pred_cols = []

        # Get predictions from each model
        for i, model in enumerate(models):
            pred_col = f"pred_{i}"
            known_cols = set(predictions.columns)
            scored = model.transform(predictions)

            # Drop everything this model added except its prediction column.
            extras = [
                name for name in scored.columns
                if name not in known_cols and name != "prediction"
            ]
            if extras:
                scored = scored.drop(*extras)

            predictions = scored.withColumnRenamed("prediction", pred_col)
            pred_cols.append(pred_col)

        # Majority vote for classification
        return predictions.withColumn(
            "ensemble_prediction",
            self._majority_vote(pred_cols)
        )

    def _majority_vote(self, pred_cols: list):
        """Build a column expression holding the row-wise majority vote.

        Args:
            pred_cols: Names of the per-model prediction columns.

        Returns:
            A Spark column produced by a Python UDF applied to the array of
            ``pred_cols``; the UDF is declared to return ``DoubleType``.
        """
        from pyspark.sql.functions import array, udf
        from pyspark.sql.types import DoubleType

        vote = udf(DistributedDeepLearning._most_common_vote, DoubleType())
        return vote(array(*pred_cols))

    @staticmethod
    def _most_common_vote(votes):
        """Return the most frequent value in ``votes``.

        Ties go to the value that appears first in ``votes`` (i.e. the
        earliest model). Returns ``None`` for an empty or missing sequence.
        """
        # Imported here so the function is self-contained when used as a UDF.
        from collections import Counter

        if not votes:
            return None
        return Counter(votes).most_common(1)[0][0]

Hyperparameter Tuning at Scale

class DistributedHyperparameterTuning:
    """Distributed hyperparameter optimization.

    Offers grid search with cross-validation and a simple random search, both
    for ``LightGBMClassifier`` scored by area under the ROC curve.
    """

    def __init__(self, spark):
        """Keep a reference to the Spark session on ``self.spark``."""
        self.spark = spark

    def tune_lightgbm(
        self,
        df,
        label_col: str,
        feature_col: str,
        param_grid: dict,
        num_folds: int = 5,
        parallelism: int = 4
    ):
        """Tune LightGBM with cross-validation.

        Args:
            df: Training DataFrame with ``feature_col`` and ``label_col``.
            label_col: Name of the label column.
            feature_col: Name of the feature vector column.
            param_grid: Maps estimator param names (e.g. ``"numLeaves"``) to
                the list of values to try. Names that are not params of the
                estimator are skipped with a warning.
            num_folds: Number of cross-validation folds.
            parallelism: Value passed to ``CrossValidator(parallelism=...)``.

        Returns:
            Dict with ``best_model``, ``best_params``, ``cv_metrics`` and
            ``param_maps`` (the built grid, in the same order as
            ``cv_metrics``).
        """
        import warnings

        from synapse.ml.lightgbm import LightGBMClassifier
        from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
        from pyspark.ml.evaluation import BinaryClassificationEvaluator

        lgbm = LightGBMClassifier(
            featuresCol=feature_col,
            labelCol=label_col
        )

        # Build parameter grid from every key that is a real estimator param
        grid_builder = ParamGridBuilder()
        skipped = []
        for name, values in param_grid.items():
            if lgbm.hasParam(name):
                grid_builder.addGrid(lgbm.getParam(name), values)
            else:
                skipped.append(name)

        if skipped:
            warnings.warn(
                "Ignoring unknown LightGBMClassifier params in param_grid: "
                + ", ".join(sorted(skipped))
            )

        param_maps = grid_builder.build()

        # Evaluator
        evaluator = BinaryClassificationEvaluator(
            labelCol=label_col,
            metricName="areaUnderROC"
        )

        # Cross-validator
        cv = CrossValidator(
            estimator=lgbm,
            estimatorParamMaps=param_maps,
            evaluator=evaluator,
            numFolds=num_folds,
            parallelism=parallelism
        )

        cv_model = cv.fit(df)

        return {
            "best_model": cv_model.bestModel,
            "best_params": cv_model.bestModel.extractParamMap(),
            "cv_metrics": cv_model.avgMetrics,
            "param_maps": param_maps
        }

    def random_search(
        self,
        df,
        label_col: str,
        feature_col: str,
        param_distributions: dict,
        n_iter: int = 20,
        seed=None
    ):
        """Random search hyperparameter tuning.

        Splits ``df`` 80/20 (fixed split seed 42), fits one classifier per
        sampled parameter set on the training part and scores it on the
        validation part. Only the best result so far is retained, so fitted
        models of losing candidates can be garbage-collected.

        Args:
            df: DataFrame with ``feature_col`` and ``label_col``.
            label_col: Name of the label column.
            feature_col: Name of the feature vector column.
            param_distributions: Maps ``LightGBMClassifier`` keyword names to
                the list of candidate values to sample from.
            n_iter: Number of parameter sets to try; must be at least 1.
            seed: Optional seed for parameter sampling. ``None`` uses the
                global ``random`` module state.

        Returns:
            Dict with ``params``, ``score`` and ``model`` of the best
            candidate; on ties the earliest candidate wins.

        Raises:
            ValueError: If ``n_iter`` is smaller than 1.
        """
        import random
        from synapse.ml.lightgbm import LightGBMClassifier
        from pyspark.ml.evaluation import BinaryClassificationEvaluator

        if n_iter < 1:
            raise ValueError("n_iter must be at least 1")

        rng = random if seed is None else random.Random(seed)

        evaluator = BinaryClassificationEvaluator(
            labelCol=label_col,
            metricName="areaUnderROC"
        )

        train_df, val_df = df.randomSplit([0.8, 0.2], seed=42)

        best = None
        for i in range(n_iter):
            params = self._sample_params(param_distributions, rng)

            lgbm = LightGBMClassifier(
                featuresCol=feature_col,
                labelCol=label_col,
                **params
            )

            model = lgbm.fit(train_df)
            score = evaluator.evaluate(model.transform(val_df))

            print(f"Iteration {i+1}/{n_iter}: AUC = {score:.4f}")

            # Strict ">" keeps the first candidate on ties.
            if best is None or score > best["score"]:
                best = {
                    "params": params,
                    "score": score,
                    "model": model
                }

        return best

    @staticmethod
    def _sample_params(param_distributions: dict, rng=None) -> dict:
        """Draw one value per parameter from its list of candidates.

        Args:
            param_distributions: Maps parameter names to candidate lists.
            rng: Object with a ``choice`` method; defaults to the ``random``
                module.

        Returns:
            Dict mapping each parameter name to one sampled value.
        """
        import random

        chooser = random if rng is None else rng
        return {
            name: chooser.choice(candidates)
            for name, candidates in param_distributions.items()
        }

Model Interpretation

class ModelInterpretation:
    """Interpret SynapseML models at scale.

    Covers global feature importance, SHAP explanations, single-row
    explanations and partial dependence curves.
    """

    def __init__(self, spark):
        """Keep a reference to the Spark session on ``self.spark``."""
        self.spark = spark

    def get_feature_importance(self, model, feature_names: list) -> dict:
        """Get feature importance from LightGBM model.

        Args:
            model: Object that may expose ``getFeatureImportances``.
            feature_names: Names paired positionally with the importances.

        Returns:
            Dict of name -> "split" importance ordered from most to least
            important, or ``{}`` when the model has no
            ``getFeatureImportances`` method.
        """
        getter = getattr(model, "getFeatureImportances", None)
        if not callable(getter):
            return {}

        ranked = sorted(
            zip(feature_names, getter("split")),
            key=lambda pair: pair[1],
            reverse=True
        )
        return dict(ranked)

    def compute_shap_values(self, model, df, feature_col: str, background_df=None):
        """Compute SHAP values for model explanations.

        Args:
            model: Fitted model to explain.
            df: DataFrame whose rows are explained.
            feature_col: Name of the feature vector column.
            background_df: Optional DataFrame the background sample is taken
                from; defaults to ``df``. At most 100 rows are used.

        Returns:
            ``df`` transformed by the explainer, with a ``shap_values`` column.
        """
        from synapse.ml.explainers import VectorSHAP

        background_source = df if background_df is None else background_df

        shap_explainer = (VectorSHAP()
            .setInputCol(feature_col)
            .setOutputCol("shap_values")
            .setBackgroundData(background_source.limit(100))
            .setModel(model)
            .setNumSamples(1000))

        return shap_explainer.transform(df)

    def local_interpretation(
        self,
        model,
        df,
        row_index: int,
        feature_names: list,
        feature_col: str = "features"
    ):
        """Get local interpretation for a single prediction.

        Only the rows needed to reach ``row_index`` are brought to the driver
        (``limit`` for non-negative indices, ``tail`` for negative ones). Row
        order follows whatever order Spark returns for ``df``.

        Args:
            model: Fitted model to explain.
            df: DataFrame containing the row and ``feature_col``.
            row_index: Position of the row; negative values count from the end.
            feature_names: Names paired positionally with the SHAP output.
            feature_col: Name of the feature vector column.

        Returns:
            Dict with ``prediction``, ``probability`` (list, or ``None`` when
            the model emits no ``probability`` column) and
            ``feature_contributions``.

        Raises:
            IndexError: If ``row_index`` is outside the DataFrame.
        """
        # Get the specific row without collecting the whole DataFrame
        if row_index >= 0:
            row = df.limit(row_index + 1).collect()[row_index]
        else:
            wanted = -row_index
            last_rows = df.tail(wanted)
            if len(last_rows) < wanted:
                raise IndexError("row_index out of range")
            row = last_rows[0]

        # Reuse the source schema so fields holding None do not break
        # schema inference on the one-row DataFrame.
        single_df = self.spark.createDataFrame([row], schema=df.schema)
        scored = model.transform(single_df).collect()[0].asDict()

        # Explain the row against the full dataset as background; using the
        # row itself as background would compare it only with itself.
        shap_df = self.compute_shap_values(
            model, single_df, feature_col, background_df=df
        )
        shap_values = shap_df.select("shap_values").collect()[0][0]

        probability = scored.get("probability")

        # NOTE(review): this pairs feature names positionally with the
        # top-level elements of the "shap_values" cell. Confirm the explainer
        # output layout (it may be one vector per target class, possibly with
        # a leading base value) before relying on these pairs.
        return {
            "prediction": scored["prediction"],
            "probability": (
                None if probability is None else probability.toArray().tolist()
            ),
            "feature_contributions": dict(zip(feature_names, shap_values))
        }

    def partial_dependence(
        self,
        model,
        df,
        feature_col: str,
        feature_name: str,
        num_points: int = 50
    ):
        """Compute partial dependence for a feature.

        For each grid value between the feature's minimum and maximum
        (both included), the column ``feature_name`` is overwritten with that
        value and the mean of the model's ``prediction`` column is recorded.

        NOTE(review): the modified column only affects predictions if
        ``model.transform`` itself derives its feature vector from
        ``feature_name``; an already assembled vector column is not rebuilt
        here. ``feature_col`` is currently unused and kept for interface
        compatibility.

        Args:
            model: Fitted model exposing ``transform``.
            df: DataFrame containing ``feature_name``.
            feature_col: Unused (see note above).
            feature_name: Column whose value is swept over the grid.
            num_points: Number of grid points; must be at least 1.

        Returns:
            List of ``{"feature_value": ..., "avg_prediction": ...}`` dicts,
            one per grid point.

        Raises:
            ValueError: If ``num_points`` is smaller than 1 or the feature
                column has no non-null values.
        """
        from pyspark.sql import functions as F

        # Get feature range by name instead of positional describe() rows
        bounds = df.agg(
            F.min(feature_name).alias("lo"),
            F.max(feature_name).alias("hi")
        ).collect()[0]
        if bounds["lo"] is None or bounds["hi"] is None:
            raise ValueError(
                f"Column {feature_name!r} has no non-null values"
            )

        grid_points = self._grid_points(
            float(bounds["lo"]), float(bounds["hi"]), num_points
        )

        pdp_values = []
        for point in grid_points:
            # Replace feature value for every row, then score
            modified_df = df.withColumn(feature_name, F.lit(point))
            predictions = model.transform(modified_df)
            avg_pred = predictions.agg({"prediction": "avg"}).collect()[0][0]

            pdp_values.append({"feature_value": point, "avg_prediction": avg_pred})

        return pdp_values

    @staticmethod
    def _grid_points(min_val: float, max_val: float, num_points: int) -> list:
        """Return ``num_points`` evenly spaced values from min to max inclusive.

        A single point yields ``[min_val]``.

        Raises:
            ValueError: If ``num_points`` is smaller than 1.
        """
        if num_points < 1:
            raise ValueError("num_points must be at least 1")
        if num_points == 1:
            return [min_val]

        step = (max_val - min_val) / (num_points - 1)
        points = [min_val + i * step for i in range(num_points - 1)]
        # Append the maximum explicitly so rounding cannot miss the endpoint.
        points.append(max_val)
        return points

End-to-End ML Pipeline

class EndToEndMLPipeline:
    """Complete ML pipeline with SynapseML.

    Chains the train/test split, feature preparation, model fitting (tuned or
    default) and evaluation into a single call.
    """

    def __init__(self, spark):
        """Store the session and create the feature and tuning helpers."""
        self.spark = spark
        self.pipeline_helper = SynapseMLPipeline(spark)
        self.tuner = DistributedHyperparameterTuning(spark)

    def run_classification_pipeline(
        self,
        df,
        label_col: str,
        numeric_cols: list,
        categorical_cols: list,
        tune_hyperparams: bool = True
    ):
        """Run complete classification pipeline.

        Returns a dict holding the fitted feature pipeline, the model, the
        evaluation metrics and the scored test set.
        """
        # 80/20 split with a fixed seed
        splits = df.randomSplit([0.8, 0.2], seed=42)
        train_df, test_df = splits

        # Fit the feature pipeline on the training part only, then apply it
        # to both parts
        helper = self.pipeline_helper
        feature_pipeline = helper.create_feature_pipeline(
            train_df, numeric_cols, categorical_cols
        )
        train_features = feature_pipeline.transform(train_df)
        test_features = feature_pipeline.transform(test_df)

        # Default training unless tuning was requested
        if not tune_hyperparams:
            model = helper.train_lightgbm_classifier(train_features, label_col)
        else:
            search_space = {
                "numLeaves": [15, 31, 63],
                "numIterations": [50, 100, 200],
                "learningRate": [0.05, 0.1, 0.2]
            }
            tuning_result = self.tuner.tune_lightgbm(
                train_features,
                label_col,
                "features",
                param_grid=search_space
            )
            model = tuning_result["best_model"]

        # Score the held-out part and compute metrics on it
        predictions = model.transform(test_features)

        outcome = {"feature_pipeline": feature_pipeline, "model": model}
        outcome["metrics"] = self._evaluate_classification(predictions, label_col)
        outcome["predictions"] = predictions
        return outcome

    def _evaluate_classification(self, predictions, label_col: str) -> dict:
        """Evaluate classification model.

        Returns a dict with ``auc``, ``accuracy`` and ``f1`` computed on
        ``predictions``.
        """
        from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

        auc_evaluator = BinaryClassificationEvaluator(labelCol=label_col)
        class_evaluator = MulticlassClassificationEvaluator(labelCol=label_col)

        def score(evaluator, metric):
            # Override the evaluator's metric for this single evaluation
            return evaluator.evaluate(predictions, {evaluator.metricName: metric})

        metrics = {}
        metrics["auc"] = score(auc_evaluator, "areaUnderROC")
        metrics["accuracy"] = score(class_evaluator, "accuracy")
        metrics["f1"] = score(class_evaluator, "f1")
        return metrics

# Usage: build the end-to-end helper on the Spark session
pipeline = EndToEndMLPipeline(spark)

# Read the input table
df = spark.table("silver.customer_features")

# Columns fed to the feature pipeline
numeric_features = ["age", "tenure", "monthly_charges", "total_charges"]
categorical_features = ["contract_type", "payment_method", "internet_service"]

# Train with hyperparameter tuning switched on
result = pipeline.run_classification_pipeline(
    df,
    label_col="churn",
    numeric_cols=numeric_features,
    categorical_cols=categorical_features,
    tune_hyperparams=True,
)

print(f"Model AUC: {result['metrics']['auc']:.4f}")

SynapseML democratizes distributed machine learning. Train on massive datasets, tune hyperparameters in parallel, and deploy models at scale - all with familiar APIs.

## Takeaways

Add a concise, personal takeaway and recommended next steps here.

Michael John Pena

Michael John Pena

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.