Streaming Machine Learning: Training and Inference on Real-Time Data

Streaming ML enables models to learn from and make predictions on real-time data. Let’s explore how to implement streaming ML pipelines in Microsoft Fabric using real tools and frameworks.

Streaming ML Architecture

┌─────────────────────────────────────────────────────────────┐
│                  Streaming ML Pipeline                       │
├─────────────────────────────────────────────────────────────┤
│                                                              │
│  Data Stream → Feature      → Model        → Predictions    │
│                Engineering    Serving                        │
│                    │             │                           │
│                    ↓             ↓                           │
│              [Feature      [Model          [Monitoring      │
│               Store]        Registry]       & Feedback]     │
│                    │             │              │            │
│                    └─────────────┴──────────────┘            │
│                              │                               │
│                    [Continuous Training]                     │
│                                                              │
└─────────────────────────────────────────────────────────────┘

Online Feature Engineering with Spark Structured Streaming

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

spark = SparkSession.builder \
    .appName("StreamingFeatures") \
    .getOrCreate()
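
# eventhub_config and event_schema are referenced below but were elided;
# a minimal sketch of what they might look like (the connection string is
# a placeholder, and in practice the Event Hubs connector expects it
# encrypted via EventHubsUtils):
eventhub_config = {
    "eventhubs.connectionString": "<event-hub-connection-string>",
}

event_schema = StructType([
    StructField("user_id", StringType(), True),
    StructField("event_type", StringType(), True),
    StructField("page_id", StringType(), True),
    StructField("session_duration", DoubleType(), True),
    StructField("amount", DoubleType(), True),
])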

# Read from EventHub/Kafka stream
activity_stream = spark.readStream \
    .format("eventhubs") \
    .options(**eventhub_config) \
    .load()

# Parse the event data
parsed_stream = activity_stream.select(
    F.from_json(F.col("body").cast("string"), event_schema).alias("data"),
    F.col("enqueuedTime").alias("event_time")
).select("data.*", "event_time")

# Define windowed aggregations for features
windowed_features = parsed_stream \
    .withWatermark("event_time", "10 minutes") \
    .groupBy(
        F.col("user_id"),
        F.window("event_time", "5 minutes")
    ) \
    .agg(
        F.count(F.when(F.col("event_type") == "click", 1)).alias("clicks_last_5min"),
        F.avg("session_duration").alias("avg_session_duration"),
        # exact countDistinct is not supported on streaming DataFrames,
        # so use the HyperLogLog-based approximation instead
        F.approx_count_distinct("page_id").alias("unique_pages_viewed"),
        F.sum(F.when(F.col("event_type") == "purchase", F.col("amount")).otherwise(0)).alias("purchase_amount")
    )

# Add derived features
enriched_features = windowed_features \
    .withColumn("engagement_score",
        F.col("clicks_last_5min") * 0.3 +
        F.col("unique_pages_viewed") * 0.4 +
        F.when(F.col("purchase_amount") > 0, 30).otherwise(0)
    )

# Write to Delta Lake for offline feature store
feature_query = enriched_features.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/checkpoints/user_features") \
    .toTable("feature_store.user_realtime_features")

# Write to Redis for online feature serving
def write_to_redis(batch_df, batch_id):
    import redis
    # A new connection per micro-batch; collect() is acceptable here because
    # each batch holds a small number of aggregated rows
    r = redis.Redis(host='localhost', port=6379)

    for row in batch_df.collect():
        key = f"user_features:{row.user_id}"
        r.hset(key, mapping={
            "clicks_last_5min": row.clicks_last_5min,
            "avg_session_duration": row.avg_session_duration or 0.0,  # avg() may be null
            "unique_pages_viewed": row.unique_pages_viewed,
            "engagement_score": row.engagement_score
        })
        r.expire(key, 3600)  # 1 hour TTL

redis_query = enriched_features.writeStream \
    .foreachBatch(write_to_redis) \
    .outputMode("update") \
    .option("checkpointLocation", "/checkpoints/redis_features") \
    .start()

Online Model Serving with MLflow

import mlflow
from mlflow.tracking import MlflowClient
from fastapi import FastAPI
from pydantic import BaseModel
import redis
import pandas as pd

app = FastAPI()
client = MlflowClient()

# Load model from MLflow registry
model_name = "user_recommender"
model_version = "production"
model = mlflow.pyfunc.load_model(f"models:/{model_name}/{model_version}")

# Redis client for online features
redis_client = redis.Redis(host='localhost', port=6379)

class RecommendationRequest(BaseModel):
    user_id: str
    current_page: str
    device_type: str

class RecommendationResponse(BaseModel):
    recommendations: list
    model_version: str
    latency_ms: float

@app.post("/recommend", response_model=RecommendationResponse)
async def recommend(request: RecommendationRequest):
    import time
    start_time = time.time()

    # Fetch online features from Redis
    features = redis_client.hgetall(f"user_features:{request.user_id}")

    # Prepare feature vector
    feature_vector = {
        "clicks_last_5min": float(features.get(b"clicks_last_5min", 0)),
        "avg_session_duration": float(features.get(b"avg_session_duration", 0)),
        "unique_pages_viewed": float(features.get(b"unique_pages_viewed", 0)),
        "engagement_score": float(features.get(b"engagement_score", 0)),
        "current_page": request.current_page,
        "device_type": request.device_type
    }

    # Run inference (pyfunc models expect tabular input, e.g. a DataFrame)
    predictions = model.predict(pd.DataFrame([feature_vector]))

    latency_ms = (time.time() - start_time) * 1000

    # Log prediction for monitoring (note: calling log_metrics outside an
    # active run makes MLflow start one implicitly; batch these in production)
    mlflow.log_metrics({
        "inference_latency_ms": latency_ms,
        "user_engagement_score": feature_vector["engagement_score"]
    })

    return RecommendationResponse(
        recommendations=predictions[0].tolist(),
        model_version=model_version,
        latency_ms=latency_ms
    )

# Health check
@app.get("/health")
async def health():
    return {"status": "healthy", "model": model_name, "version": model_version}

Continuous Training Pipeline with MLflow and PySpark

import mlflow
from mlflow.models.signature import infer_signature
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.classification import GBTClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from datetime import datetime, timedelta
import schedule
import time

spark = SparkSession.builder.appName("ContinuousTraining").getOrCreate()

def train_model():
    """Train model on recent data window."""
    mlflow.set_experiment("recommender_continuous_training")

    with mlflow.start_run(run_name=f"training_{datetime.now().strftime('%Y%m%d_%H%M')}"):
        # Load training data from last 24 hours
        training_data = spark.sql("""
            SELECT * FROM feature_store.training_data
            WHERE event_time >= current_timestamp() - INTERVAL 24 HOURS
        """)

        # Log data stats
        mlflow.log_param("training_samples", training_data.count())
        mlflow.log_param("training_window", "24h")

        # Feature engineering
        feature_cols = ["clicks_last_5min", "avg_session_duration",
                       "unique_pages_viewed", "engagement_score"]

        assembler = VectorAssembler(inputCols=feature_cols, outputCol="features_raw")
        scaler = StandardScaler(inputCol="features_raw", outputCol="features")

        # Model
        gbt = GBTClassifier(
            labelCol="label",
            featuresCol="features",
            maxIter=100,
            maxDepth=5
        )

        pipeline = Pipeline(stages=[assembler, scaler, gbt])

        # Split data
        train_df, val_df = training_data.randomSplit([0.9, 0.1], seed=42)

        # Train
        model = pipeline.fit(train_df)

        # Evaluate
        predictions = model.transform(val_df)
        evaluator = BinaryClassificationEvaluator(labelCol="label")

        auc = evaluator.evaluate(predictions)
        mlflow.log_metric("validation_auc", auc)

        # Log model
        signature = infer_signature(train_df.select(feature_cols), predictions.select("prediction"))
        mlflow.spark.log_model(model, "model", signature=signature)

        # Check if should promote
        current_production = get_production_metrics()
        if should_promote(auc, current_production):
            promote_model(mlflow.active_run().info.run_id)

        return auc

def get_production_metrics():
    """Get metrics of current production model."""
    client = mlflow.tracking.MlflowClient()
    try:
        production_version = client.get_latest_versions("recommender", stages=["Production"])[0]
        run = client.get_run(production_version.run_id)
        return {"auc": run.data.metrics.get("validation_auc", 0)}
    except Exception:
        # No production model registered yet (or the lookup failed)
        return {"auc": 0}

def should_promote(new_auc: float, current_metrics: dict) -> bool:
    """Decide if new model should be promoted."""
    current_auc = current_metrics.get("auc", 0)
    # Promote if AUC improves by at least 1%
    return new_auc > current_auc * 1.01
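
# Worked example of the rule above: if the production model's AUC is 0.85,
# a challenger must exceed 0.85 * 1.01 = 0.8585 to be promoted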

def promote_model(run_id: str):
    """Promote model to production."""
    client = mlflow.tracking.MlflowClient()

    # Register model
    model_uri = f"runs:/{run_id}/model"
    mv = mlflow.register_model(model_uri, "recommender")

    # Transition to production
    client.transition_model_version_stage(
        name="recommender",
        version=mv.version,
        stage="Production"
    )

    print(f"Promoted model version {mv.version} to Production")

# Schedule training every 6 hours
schedule.every(6).hours.do(train_model)

if __name__ == "__main__":
    while True:
        schedule.run_pending()
        time.sleep(60)

Model Monitoring and Drift Detection

import mlflow
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from scipy import stats
import numpy as np
from datetime import datetime, timedelta

spark = SparkSession.builder.appName("ModelMonitoring").getOrCreate()

class ModelMonitor:
    def __init__(self, model_name: str, reference_data_path: str):
        self.model_name = model_name
        self.reference_data = spark.read.parquet(reference_data_path)
        self.reference_stats = self._compute_stats(self.reference_data)

    def _compute_stats(self, df):
        """Compute statistics for drift detection."""
        # Avoid shadowing scipy's `stats` module with a local name
        feature_stats = {}
        numeric_cols = ["clicks_last_5min", "avg_session_duration",
                        "unique_pages_viewed", "engagement_score"]

        for col in numeric_cols:
            col_data = [row[col] for row in df.select(col).collect()]
            feature_stats[col] = {
                "mean": np.mean(col_data),
                "std": np.std(col_data),
                "percentiles": np.percentile(col_data, [25, 50, 75])
            }
        return feature_stats

    def detect_drift(self, current_data):
        """Detect feature drift using KS test."""
        current_stats = self._compute_stats(current_data)
        drift_results = {}

        for col in self.reference_stats:
            ref_data = [row[col] for row in self.reference_data.select(col).collect()]
            curr_data = [row[col] for row in current_data.select(col).collect()]

            # Kolmogorov-Smirnov test
            ks_stat, p_value = stats.ks_2samp(ref_data, curr_data)

            drift_results[col] = {
                "ks_statistic": ks_stat,
                "p_value": p_value,
                "drift_detected": p_value < 0.05
            }

        return drift_results

    def calculate_psi(self, expected, actual, bins=10):
        """Calculate Population Stability Index."""
        # Bin edges must be derived from the expected (reference) distribution
        # and reused for the actual data, otherwise the comparison is invalid
        bin_edges = np.histogram_bin_edges(expected, bins=bins)
        expected_percents = np.histogram(expected, bins=bin_edges)[0] / len(expected)
        actual_percents = np.histogram(actual, bins=bin_edges)[0] / len(actual)

        # Avoid division by zero
        expected_percents = np.clip(expected_percents, 0.0001, None)
        actual_percents = np.clip(actual_percents, 0.0001, None)

        psi = np.sum((actual_percents - expected_percents) *
                     np.log(actual_percents / expected_percents))
        return psi

    def monitor_predictions(self, predictions_table: str, feedback_table: str):
        """Monitor prediction quality with feedback."""
        # Get predictions from last hour
        predictions = spark.sql(f"""
            SELECT p.*, f.actual_outcome
            FROM {predictions_table} p
            LEFT JOIN {feedback_table} f ON p.request_id = f.request_id
            WHERE p.prediction_time >= current_timestamp() - INTERVAL 1 HOUR
            AND f.actual_outcome IS NOT NULL
        """)

        # Calculate metrics (guard against an empty window)
        total = predictions.count()
        correct = predictions.filter(
            F.col("prediction") == F.col("actual_outcome")
        ).count()
        accuracy = correct / total if total > 0 else 0.0

        # Log to MLflow
        with mlflow.start_run(run_name=f"monitoring_{datetime.now().isoformat()}"):
            mlflow.log_metric("hourly_accuracy", accuracy)

            # Check for drift
            current_features = spark.sql(f"""
                SELECT clicks_last_5min, avg_session_duration,
                       unique_pages_viewed, engagement_score
                FROM {predictions_table}
                WHERE prediction_time >= current_timestamp() - INTERVAL 1 HOUR
            """)

            drift_results = self.detect_drift(current_features)

            for col, result in drift_results.items():
                mlflow.log_metric(f"drift_ks_{col}", result["ks_statistic"])
                if result["drift_detected"]:
                    self._send_alert(f"Drift detected in {col}", "warning")

        return {"accuracy": accuracy, "drift": drift_results}

    def _send_alert(self, message: str, severity: str):
        """Send alert to monitoring system."""
        # Placeholder: wire this up to a Teams webhook, PagerDuty, etc.
        print(f"[{severity.upper()}] {message}")

# Usage
monitor = ModelMonitor(
    model_name="recommender",
    reference_data_path="/reference_data/recommender_baseline"
)

# Run monitoring check
results = monitor.monitor_predictions(
    predictions_table="predictions.recommender_predictions",
    feedback_table="feedback.user_actions"
)
print(f"Accuracy: {results['accuracy']:.4f}")

A/B Testing in Streaming

import hashlib
from datetime import datetime
from pyspark.sql import SparkSession
import mlflow

spark = SparkSession.builder.appName("ABTesting").getOrCreate()

class StreamingABTest:
    def __init__(self, experiment_name: str, variants: list):
        self.experiment_name = experiment_name
        self.variants = {v["name"]: v for v in variants}
        self.total_traffic = sum(v["traffic_percentage"] for v in variants)

    def assign_variant(self, user_id: str) -> str:
        """Deterministically assign user to variant."""
        hash_value = int(hashlib.md5(
            f"{self.experiment_name}:{user_id}".encode()
        ).hexdigest(), 16)

        # Bucket over the total allocated traffic so the assignment still
        # works if the percentages don't sum to exactly 100
        bucket = hash_value % self.total_traffic
        cumulative = 0

        for variant in self.variants.values():
            cumulative += variant["traffic_percentage"]
            if bucket < cumulative:
                return variant["name"]

        return list(self.variants.keys())[0]

    def log_exposure(self, user_id: str, variant: str, request_id: str):
        """Log exposure event to Delta Lake."""
        exposure_df = spark.createDataFrame([{
            "experiment": self.experiment_name,
            "user_id": user_id,
            "variant": variant,
            "request_id": request_id,
            "timestamp": F.current_timestamp()
        }])

        exposure_df.write.format("delta").mode("append").saveAsTable("experiments.exposures")

    def log_outcome(self, request_id: str, converted: bool, revenue: float = 0):
        """Log outcome event."""
        outcome_df = spark.createDataFrame([{
            "request_id": request_id,
            "converted": converted,
            "revenue": revenue,
            "timestamp": F.current_timestamp()
        }])

        outcome_df.write.format("delta").mode("append").saveAsTable("experiments.outcomes")

    def get_results(self):
        """Calculate experiment results."""
        results = spark.sql(f"""
            WITH exposures AS (
                SELECT * FROM experiments.exposures
                WHERE experiment = '{self.experiment_name}'
            ),
            outcomes AS (
                SELECT * FROM experiments.outcomes
            ),
            joined AS (
                SELECT
                    e.variant,
                    e.user_id,
                    COALESCE(o.converted, false) as converted,
                    COALESCE(o.revenue, 0) as revenue
                FROM exposures e
                LEFT JOIN outcomes o ON e.request_id = o.request_id
            )
            SELECT
                variant,
                COUNT(DISTINCT user_id) as users,
                SUM(CASE WHEN converted THEN 1 ELSE 0 END) as conversions,
                AVG(CASE WHEN converted THEN 1.0 ELSE 0.0 END) as conversion_rate,
                AVG(revenue) as avg_revenue
            FROM joined
            GROUP BY variant
        """)

        return results.toPandas()

    def calculate_significance(self, control_rate: float, treatment_rate: float,
                               control_n: int, treatment_n: int) -> dict:
        """Calculate statistical significance."""
        from scipy import stats
        import numpy as np

        # Pooled proportion
        pooled = (control_rate * control_n + treatment_rate * treatment_n) / (control_n + treatment_n)
        se = np.sqrt(pooled * (1 - pooled) * (1/control_n + 1/treatment_n))

        z_score = (treatment_rate - control_rate) / se
        p_value = 2 * (1 - stats.norm.cdf(abs(z_score)))

        return {
            "z_score": z_score,
            "p_value": p_value,
            "significant": p_value < 0.05,
            "lift": (treatment_rate - control_rate) / control_rate if control_rate > 0 else 0
        }

# Usage
ab_test = StreamingABTest(
    experiment_name="recommender_v2_test",
    variants=[
        {"name": "control", "model_version": "v1.5", "traffic_percentage": 50},
        {"name": "treatment", "model_version": "v2.0", "traffic_percentage": 50}
    ]
)

# In recommendation endpoint
def get_recommendations(user_id: str, request_id: str):
    variant = ab_test.assign_variant(user_id)
    model_version = ab_test.variants[variant]["model_version"]

    # Log exposure
    ab_test.log_exposure(user_id, variant, request_id)

    # Get predictions from appropriate model
    model = mlflow.pyfunc.load_model(f"models:/recommender/{model_version}")
    predictions = model.predict(...)

    return predictions, variant

# Get results
results = ab_test.get_results()
print(results)
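
Feeding get_results into calculate_significance closes the loop. A sketch assuming the two variants defined above:

summary = ab_test.get_results().set_index("variant")

sig = ab_test.calculate_significance(
    control_rate=summary.loc["control", "conversion_rate"],
    treatment_rate=summary.loc["treatment", "conversion_rate"],
    control_n=int(summary.loc["control", "users"]),
    treatment_n=int(summary.loc["treatment", "users"])
)

if sig["significant"]:
    print(f"Treatment wins with {sig['lift']:.1%} lift (p={sig['p_value']:.4f})")
else:
    print(f"No significant difference yet (p={sig['p_value']:.4f})")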

Streaming ML enables intelligent real-time systems that continuously learn and adapt. Start with robust feature engineering and monitoring before enabling continuous training.

Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.