Back to Blog
8 min read

ML Model Deployment Patterns in Microsoft Fabric

Introduction

Deploying machine learning models effectively is crucial for delivering business value from data science investments. Microsoft Fabric provides multiple patterns for model deployment, from batch scoring to real-time predictions. This post covers deployment strategies and best practices.

Deployment Patterns Overview

Pattern Selection Guide

from dataclasses import dataclass
from typing import List, Dict
from enum import Enum

class DeploymentPattern(Enum):
    """Deployment patterns known to the selector; each value is a short string tag."""
    BATCH_SCORING = "batch"  # catalogue infrastructure: Spark notebooks, Pipelines
    REAL_TIME_API = "realtime"  # catalogue infrastructure: Azure ML endpoints, custom APIs
    EMBEDDED_PREDICT = "embedded"  # catalogue infrastructure: PREDICT function in Fabric
    STREAMING = "streaming"  # catalogue infrastructure: Event Streams, Spark Streaming

class LatencyRequirement(Enum):
    """Latency tiers a workload can require; each value is a human-readable range label."""
    SUB_SECOND = "< 1 second"  # listed by the real-time API and streaming patterns
    SECONDS = "1-30 seconds"  # listed by the real-time API, embedded and streaming patterns
    MINUTES = "1-60 minutes"  # listed by the batch and embedded patterns
    HOURS = "hours"  # listed by the batch pattern only

@dataclass
class DeploymentRequirements:
    """Workload requirements passed to ``DeploymentPatternSelector.recommend_pattern``."""
    # Required latency tier; checked for membership in each pattern's "latency" list.
    latency: LatencyRequirement
    # Expected volume per hour; compared against the 10000 and 1000000 thresholds when scoring.
    throughput_per_hour: int
    # Free-form freshness label (the usage example passes "daily").
    # NOTE(review): not read by recommend_pattern as written.
    data_freshness: str
    # Free-form infrastructure preference (the usage example passes "Fabric native").
    # NOTE(review): not read by recommend_pattern as written.
    infrastructure_preference: str

class DeploymentPatternSelector:
    """Score the catalogue of deployment patterns against workload requirements.

    ``self.patterns`` maps every ``DeploymentPattern`` to a descriptive profile:
    best-fit scenarios, the latency tiers it serves, a throughput label, the
    infrastructure it runs on and example use cases.
    """

    def __init__(self):
        batch_profile = {
            "best_for": ["Large scale scoring", "Scheduled predictions", "Historical analysis"],
            "latency": [LatencyRequirement.MINUTES, LatencyRequirement.HOURS],
            "throughput": "Millions+ records",
            "infrastructure": "Spark notebooks, Pipelines",
            "example_use_cases": ["Daily churn predictions", "Monthly scoring", "Bulk recommendations"],
        }
        realtime_profile = {
            "best_for": ["Interactive applications", "User-facing predictions", "Low latency needs"],
            "latency": [LatencyRequirement.SUB_SECOND, LatencyRequirement.SECONDS],
            "throughput": "Thousands requests/minute",
            "infrastructure": "Azure ML endpoints, Custom APIs",
            "example_use_cases": ["Price optimization", "Fraud detection", "Chatbot responses"],
        }
        embedded_profile = {
            "best_for": ["SQL-based analytics", "Power BI integration", "Ad-hoc predictions"],
            "latency": [LatencyRequirement.SECONDS, LatencyRequirement.MINUTES],
            "throughput": "Moderate",
            "infrastructure": "PREDICT function in Fabric",
            "example_use_cases": ["Dashboard predictions", "Query-time scoring", "What-if analysis"],
        }
        streaming_profile = {
            "best_for": ["Real-time data", "Event processing", "Continuous predictions"],
            "latency": [LatencyRequirement.SUB_SECOND, LatencyRequirement.SECONDS],
            "throughput": "Continuous stream",
            "infrastructure": "Event Streams, Spark Streaming",
            "example_use_cases": ["IoT anomaly detection", "Real-time recommendations", "Live monitoring"],
        }

        # Insertion order matters: it is the tie-break order used when scores are equal.
        self.patterns = {
            DeploymentPattern.BATCH_SCORING: batch_profile,
            DeploymentPattern.REAL_TIME_API: realtime_profile,
            DeploymentPattern.EMBEDDED_PREDICT: embedded_profile,
            DeploymentPattern.STREAMING: streaming_profile,
        }

    def recommend_pattern(self, requirements: DeploymentRequirements) -> Dict:
        """Return the highest-scoring catalogue entry for ``requirements``.

        Scoring: 40 points when the required latency tier is listed by the
        pattern, plus 30 points when the hourly throughput is above 1000000
        and the pattern is batch scoring, or below 10000 and the pattern is
        the real-time API or embedded PREDICT.

        Returns:
            A dict with keys ``"pattern"`` (the enum's string value),
            ``"score"`` and ``"details"`` (the pattern's profile). On equal
            scores the entry that comes first in ``self.patterns`` wins.
        """

        def rate(pattern, profile):
            points = 40 if requirements.latency in profile["latency"] else 0

            hourly_volume = requirements.throughput_per_hour
            if hourly_volume > 1000000 and pattern == DeploymentPattern.BATCH_SCORING:
                points += 30
            elif hourly_volume < 10000 and pattern in [
                DeploymentPattern.REAL_TIME_API,
                DeploymentPattern.EMBEDDED_PREDICT,
            ]:
                points += 30
            return points

        candidates = [
            {"pattern": pattern.value, "score": rate(pattern, profile), "details": profile}
            for pattern, profile in self.patterns.items()
        ]

        # sorted() is stable, so equal scores keep catalogue order.
        ranked = sorted(candidates, key=lambda entry: entry["score"], reverse=True)
        return ranked[0]

# Usage
# Build a selector with its built-in pattern catalogue.
selector = DeploymentPatternSelector()

# Example workload: minute-level latency at 500000 records per hour.
requirements = DeploymentRequirements(
    latency=LatencyRequirement.MINUTES,
    throughput_per_hour=500000,
    data_freshness="daily",
    infrastructure_preference="Fabric native"
)

# MINUTES is listed by both the batch and embedded patterns, and 500000 falls
# between the 10000 and 1000000 throughput thresholds, so both score 40.
# The sort is stable, so the earlier catalogue entry ("batch") is returned.
recommendation = selector.recommend_pattern(requirements)
print(f"Recommended: {recommendation['pattern']}")
print(f"Best for: {recommendation['details']['best_for']}")

Batch Deployment Pattern

class BatchDeploymentPattern:
    """Generate source text for batch scoring jobs in Fabric.

    Both public methods only build and return strings; nothing is executed
    or deployed by this class.
    """

    @staticmethod
    def _quoted(value) -> str:
        """Render ``value`` as a double-quoted, escaped string literal.

        Plain values come out exactly as ``"value"``; embedded quotes,
        backslashes and newlines are escaped so the generated code still
        parses.
        """
        # Function-scope import so this class needs no new module-level import.
        import json

        return json.dumps(str(value), ensure_ascii=False)

    def generate_batch_pipeline(
        self,
        model_name: str,
        source_table: str,
        predictions_table: str,
        feature_columns: List[str],
        schedule: str = "daily"
    ) -> str:
        """Generate batch scoring pipeline code.

        Args:
            model_name: Registered model name; the generated code loads
                ``models:/<model_name>/Production`` as a Spark UDF.
            source_table: Table the generated code reads with ``spark.table``.
            predictions_table: Delta table the generated code writes to.
            feature_columns: Column names passed to the prediction UDF, in order.
            schedule: Intended run cadence; recorded as a comment in the
                header of the generated script.

        Returns:
            Python source for a ``BatchScoringPipeline`` class plus the call
            that runs it.
        """
        model_literal = self._quoted(model_name)
        source_literal = self._quoted(source_table)
        target_literal = self._quoted(predictions_table)
        schedule_literal = self._quoted(schedule)

        return f'''
# Batch Model Deployment Pipeline
# Intended schedule: {schedule_literal}
import mlflow
from pyspark.sql import SparkSession, functions as F
from datetime import datetime

spark = SparkSession.builder.getOrCreate()

class BatchScoringPipeline:
    def __init__(self, model_name: str, stage: str = "Production"):
        self.model_name = model_name
        self.model_uri = f"models:/{{model_name}}/{{stage}}"
        self.predict_udf = mlflow.pyfunc.spark_udf(spark, self.model_uri)

    def score(self, source_table: str, feature_columns: list) -> "DataFrame":
        """Score data from source table"""
        # Read source data
        df = spark.table(source_table)
        print(f"Scoring {{df.count()}} records...")

        # Apply model
        predictions = df.withColumn(
            "prediction",
            self.predict_udf(*[F.col(c) for c in feature_columns])
        )

        # Add metadata
        predictions = predictions.withColumn("model_name", F.lit(self.model_name))
        predictions = predictions.withColumn("model_uri", F.lit(self.model_uri))
        predictions = predictions.withColumn("scored_at", F.current_timestamp())

        return predictions

    def save_predictions(self, predictions, target_table: str, mode: str = "overwrite"):
        """Save predictions to Delta table"""
        (predictions
            .write
            .format("delta")
            .mode(mode)
            .option("mergeSchema", "true")
            .saveAsTable(target_table)
        )
        print(f"Saved predictions to {{target_table}}")

    def run(self, source_table: str, target_table: str, feature_columns: list):
        """Run complete pipeline"""
        start_time = datetime.now()

        predictions = self.score(source_table, feature_columns)
        self.save_predictions(predictions, target_table)

        duration = (datetime.now() - start_time).total_seconds()
        print(f"Pipeline completed in {{duration:.2f}} seconds")

        return predictions

# Execute pipeline
pipeline = BatchScoringPipeline({model_literal})
predictions = pipeline.run(
    source_table={source_literal},
    target_table={target_literal},
    feature_columns={feature_columns!r}
)
'''

    def generate_scheduled_pipeline(self, schedule_config: Dict) -> str:
        """Generate scheduled pipeline configuration.

        Args:
            schedule_config: Mapping that may contain ``"frequency"``,
                ``"time"`` and ``"timezone"``; missing keys fall back to
                ``"daily"``, ``"06:00"`` and ``"UTC"``.

        Returns:
            Python source that assigns a ``pipeline_config`` dict with the
            schedule and three chained activities.
        """
        frequency = self._quoted(schedule_config.get('frequency', 'daily'))
        run_time = self._quoted(schedule_config.get('time', '06:00'))
        timezone = self._quoted(schedule_config.get('timezone', 'UTC'))

        return f'''
# Scheduled Batch Scoring Configuration
# This would be set up in Fabric Data Factory Pipeline

pipeline_config = {{
    "name": "BatchScoringPipeline",
    "schedule": {{
        "frequency": {frequency},
        "time": {run_time},
        "timezone": {timezone}
    }},
    "activities": [
        {{
            "name": "ScoreNewData",
            "type": "Notebook",
            "notebook_path": "Notebooks/batch_scoring",
            "parameters": {{
                "source_table": "@pipeline().parameters.source_table",
                "target_table": "@pipeline().parameters.target_table"
            }}
        }},
        {{
            "name": "ValidatePredictions",
            "type": "Notebook",
            "notebook_path": "Notebooks/validate_predictions",
            "dependsOn": ["ScoreNewData"]
        }},
        {{
            "name": "NotifyCompletion",
            "type": "Web",
            "url": "https://hooks.example.com/notify",
            "body": {{"status": "completed", "records": "@activity('ScoreNewData').output.count"}},
            "dependsOn": ["ValidatePredictions"]
        }}
    ]
}}
'''

Real-Time Deployment Pattern

class RealTimeDeploymentPattern:
    """Generate source text for real-time (online endpoint) model serving.

    Every public method only builds and returns a string; nothing is
    deployed or called by this class.
    """

    @staticmethod
    def _quoted(value) -> str:
        """Render ``value`` as a double-quoted, escaped string literal.

        Plain values come out exactly as ``"value"``; embedded quotes,
        backslashes and newlines are escaped so the generated code still
        parses.
        """
        # Function-scope import so this class needs no new module-level import.
        import json

        return json.dumps(str(value), ensure_ascii=False)

    def generate_azure_ml_deployment(
        self,
        model_name: str,
        environment_name: str
    ) -> str:
        """Generate Azure ML endpoint deployment code.

        Args:
            model_name: Registered model name. The endpoint name is derived
                from it by lower-casing, replacing ``_`` with ``-`` and
                appending ``-endpoint``.
            environment_name: Environment reference passed to the deployment.

        Returns:
            Python source that creates the endpoint, creates a ``blue``
            deployment, routes all traffic to it and prints the scoring URI.
        """
        endpoint_literal = self._quoted(f"{model_name.lower().replace('_', '-')}-endpoint")
        description_literal = self._quoted(f"Endpoint for {model_name}")
        model_literal = self._quoted(model_name)
        environment_literal = self._quoted(environment_name)

        return f'''
# Real-Time Deployment to Azure ML Managed Endpoint
from azure.ai.ml import MLClient
from azure.ai.ml.entities import (
    ManagedOnlineEndpoint,
    ManagedOnlineDeployment,
    Model,
    Environment,
    CodeConfiguration
)
from azure.identity import DefaultAzureCredential

# Initialize ML client
credential = DefaultAzureCredential()
ml_client = MLClient(
    credential=credential,
    subscription_id="<subscription-id>",
    resource_group_name="<resource-group>",
    workspace_name="<workspace>"
)

# Create endpoint
endpoint_name = {endpoint_literal}
endpoint = ManagedOnlineEndpoint(
    name=endpoint_name,
    description={description_literal},
    auth_mode="key"
)
ml_client.online_endpoints.begin_create_or_update(endpoint).result()

# Create deployment
deployment = ManagedOnlineDeployment(
    name="blue",
    endpoint_name=endpoint_name,
    model=ml_client.models.get({model_literal}, version="1"),
    environment={environment_literal},
    instance_type="Standard_DS3_v2",
    instance_count=1,
    scoring_script="score.py"
)
ml_client.online_deployments.begin_create_or_update(deployment).result()

# Set traffic
endpoint.traffic = {{"blue": 100}}
ml_client.online_endpoints.begin_create_or_update(endpoint).result()

# Re-fetch the endpoint: the locally built object is not populated with the
# service-assigned scoring URI.
endpoint = ml_client.online_endpoints.get(name=endpoint_name)
print(f"Endpoint deployed: {{endpoint.scoring_uri}}")
'''

    def generate_scoring_script(self) -> str:
        """Generate scoring script for deployment.

        Returns:
            Python source for ``score.py`` with an ``init`` that loads the
            MLflow model from ``AZUREML_MODEL_DIR`` and a ``run`` that scores
            a JSON object or list and returns a JSON string with
            ``"status"`` set to ``"success"`` or ``"error"``.
        """
        return '''
# score.py - Scoring script for Azure ML deployment
import json
import mlflow
import pandas as pd
import os

def init():
    """Initialize model on startup"""
    global model
    model_path = os.path.join(os.getenv("AZUREML_MODEL_DIR"), "model")
    model = mlflow.pyfunc.load_model(model_path)

def run(raw_data):
    """Score incoming data"""
    try:
        # Parse input
        data = json.loads(raw_data)

        if isinstance(data, dict):
            df = pd.DataFrame([data])
        else:
            df = pd.DataFrame(data)

        # Make predictions
        predictions = model.predict(df)

        # Return results
        return json.dumps({
            "predictions": predictions.tolist(),
            "status": "success"
        })

    except Exception as e:
        return json.dumps({
            "error": str(e),
            "status": "error"
        })
'''

    def generate_client_code(self, endpoint_url: str) -> str:
        """Generate client code for calling endpoint.

        Args:
            endpoint_url: Scoring URL embedded in the generated usage example.

        Returns:
            Python source for a ``ModelClient`` class (single and batch
            prediction over HTTP POST, each with a request timeout) followed
            by a usage example.
        """
        url_literal = self._quoted(endpoint_url)

        return f'''
# Client Code for Real-Time Predictions
import requests
import json

class ModelClient:
    def __init__(self, endpoint_url: str, api_key: str, timeout: float = 30.0):
        self.endpoint_url = endpoint_url
        # Without a timeout, requests waits indefinitely on a stalled endpoint
        self.timeout = timeout
        self.headers = {{
            "Authorization": f"Bearer {{api_key}}",
            "Content-Type": "application/json"
        }}

    def predict(self, data: dict) -> dict:
        """Make single prediction"""
        response = requests.post(
            self.endpoint_url,
            headers=self.headers,
            json=data,
            timeout=self.timeout
        )
        response.raise_for_status()
        return response.json()

    def predict_batch(self, data_list: list) -> list:
        """Make batch predictions"""
        response = requests.post(
            self.endpoint_url,
            headers=self.headers,
            json=data_list,
            timeout=self.timeout
        )
        response.raise_for_status()
        return response.json()

# Usage
client = ModelClient(
    endpoint_url={url_literal},
    api_key="your-api-key"
)

# Single prediction
result = client.predict({{
    "feature1": 10.5,
    "feature2": "category_a",
    "feature3": 100
}})
print(f"Prediction: {{result['predictions'][0]}}")

# Batch predictions
batch_data = [
    {{"feature1": 10.5, "feature2": "category_a", "feature3": 100}},
    {{"feature1": 20.3, "feature2": "category_b", "feature3": 200}}
]
results = client.predict_batch(batch_data)
print(f"Batch predictions: {{results['predictions']}}")
'''

Streaming Deployment Pattern

class StreamingDeploymentPattern:
    """Generate source text for streaming model scoring in Fabric.

    Both methods only build and return strings; no stream is started here.
    """

    def generate_spark_streaming_scorer(
        self,
        model_uri: str,
        input_topic: str,
        output_topic: str
    ) -> str:
        """Generate Spark Structured Streaming scoring code.

        Args:
            model_uri: MLflow model URI loaded as a Spark UDF in the
                generated code.
            input_topic: Accepted for interface compatibility; the template
                reads from Event Hubs via a connection-string placeholder and
                does not reference this value.
            output_topic: Accepted for interface compatibility; the template
                writes to the Delta table ``streaming_predictions`` and does
                not reference this value.

        Returns:
            Python source that parses JSON events, applies the model to
            ``feature1``..``feature3`` and appends the results to a Delta
            table.
        """
        return f'''
# Streaming Model Deployment with Spark Structured Streaming
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
import mlflow

spark = SparkSession.builder.getOrCreate()

# Load model as UDF
predict_udf = mlflow.pyfunc.spark_udf(spark, model_uri="{model_uri}")

# Define input schema
input_schema = StructType([
    StructField("id", StringType(), True),
    StructField("feature1", DoubleType(), True),
    StructField("feature2", DoubleType(), True),
    StructField("feature3", DoubleType(), True),
    StructField("timestamp", StringType(), True)
])

# Read from Event Hub/Kafka
stream_df = (spark
    .readStream
    .format("eventhubs")  # or "kafka"
    .option("eventhubs.connectionString", "<connection-string>")
    .option("eventhubs.consumerGroup", "$Default")
    .load()
    .select(F.from_json(F.col("body").cast("string"), input_schema).alias("data"))
    .select("data.*")
)

# Apply model predictions
predictions_df = stream_df.withColumn(
    "prediction",
    predict_udf(
        F.col("feature1"),
        F.col("feature2"),
        F.col("feature3")
    )
).withColumn("scored_at", F.current_timestamp())

# Write to output
# DataStreamWriter starts a table sink with toTable(); it has no table() method
query = (predictions_df
    .writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/checkpoints/streaming_predictions")
    .toTable("streaming_predictions")
)

# Monitor stream
query.awaitTermination()
'''

    def generate_eventstream_integration(self) -> str:
        """Generate Fabric Eventstream integration code.

        Returns:
            Python source containing setup notes as comments, a sample KQL
            query string and a ``StreamingQueryListener`` subclass that
            prints query lifecycle events. The generated code references a
            ``spark`` session that it does not create itself.
        """
        return '''
# Fabric Eventstream Integration for Real-Time Scoring
# This pattern uses Fabric's Eventstream for streaming data

# 1. Create Eventstream in Fabric UI
# 2. Add source (Event Hub, Kafka, custom app)
# 3. Add transformation node for feature engineering
# 4. Add destination to KQL Database or Lakehouse

# For custom processing, use KQL to apply model:
kql_query = """
events
| extend prediction = predict_linear_regression(
    pack_array(feature1, feature2, feature3),
    model_coefficients
)
| project timestamp, id, feature1, feature2, feature3, prediction
"""

# Or process in Spark with streaming
from pyspark.sql.streaming import StreamingQueryListener

class PredictionMonitor(StreamingQueryListener):
    def onQueryStarted(self, event):
        print(f"Query started: {event.id}")

    def onQueryProgress(self, event):
        print(f"Processed {event.progress.numInputRows} rows")

    def onQueryTerminated(self, event):
        print(f"Query terminated: {event.id}")

spark.streams.addListener(PredictionMonitor())
'''

Model Versioning and A/B Testing

class ModelVersioningAndTesting:
    """Model versioning and A/B testing patterns.

    The class only builds source text; it does not load or run any model.
    """

    def generate_ab_testing_code(
        self,
        model_a_uri: str,
        model_b_uri: str,
        traffic_split: float = 0.5
    ) -> str:
        """Generate A/B testing code for models.

        Args:
            model_a_uri: MLflow URI of model A, embedded in the generated usage.
            model_b_uri: MLflow URI of model B, embedded in the generated usage.
            traffic_split: Value embedded as ``split``; in the generated code
                a row is assigned to group "A" when ``F.rand() < split`` and
                to "B" otherwise. The value is interpolated as given, with no
                range check.

        Returns:
            Python source defining ``ABTestingScorer``, a usage example and a
            per-group aggregation (count, mean and standard deviation of the
            prediction). The generated code references ``spark`` and ``df``,
            which it does not define itself.
        """
        return f'''
# A/B Testing for Model Deployment
import mlflow
from pyspark.sql import functions as F
import random

class ABTestingScorer:
    def __init__(self, model_a_uri: str, model_b_uri: str, split: float = 0.5):
        self.model_a = mlflow.pyfunc.spark_udf(spark, model_a_uri)
        self.model_b = mlflow.pyfunc.spark_udf(spark, model_b_uri)
        self.split = split

    def score_with_ab_test(self, df, feature_columns):
        """Score data with A/B test assignment"""
        # Assign to A or B group
        df = df.withColumn(
            "ab_group",
            F.when(F.rand() < self.split, "A").otherwise("B")
        )

        # Score with appropriate model
        feature_cols = [F.col(c) for c in feature_columns]

        df = df.withColumn(
            "prediction",
            F.when(
                F.col("ab_group") == "A",
                self.model_a(*feature_cols)
            ).otherwise(
                self.model_b(*feature_cols)
            )
        )

        df = df.withColumn("scored_at", F.current_timestamp())

        return df

# Usage
ab_scorer = ABTestingScorer(
    model_a_uri="{model_a_uri}",
    model_b_uri="{model_b_uri}",
    split={traffic_split}
)

predictions = ab_scorer.score_with_ab_test(
    df,
    feature_columns=["feature1", "feature2", "feature3"]
)

# Analyze A/B test results
ab_results = (predictions
    .groupBy("ab_group")
    .agg(
        F.count("*").alias("count"),
        F.avg("prediction").alias("avg_prediction"),
        F.stddev("prediction").alias("std_prediction")
    )
)
ab_results.show()
'''

# Usage
versioning = ModelVersioningAndTesting()

# Build (but do not run) an A/B scoring script comparing versions 1 and 2 of
# churn_model, with split=0.5 embedded in the generated code.
ab_code = versioning.generate_ab_testing_code(
    model_a_uri="models:/churn_model/1",
    model_b_uri="models:/churn_model/2",
    traffic_split=0.5
)
# The result is plain source text, so it is printed for the reader to copy.
print(ab_code)

Conclusion

Microsoft Fabric offers flexible model deployment patterns to meet various business requirements. By understanding the trade-offs between batch, real-time, embedded, and streaming deployment patterns, you can choose the right approach for your use case. Combining these patterns with proper versioning and A/B testing enables robust MLOps practices that deliver reliable ML-powered insights.

Michael John Peña

Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.