
Implementing Feature Stores with Microsoft Fabric OneLake

Feature stores bridge the gap between data engineering and machine learning. Microsoft Fabric’s OneLake provides a unified foundation for building feature stores that serve both training and inference workloads.

Why Feature Stores Matter

Without a feature store, teams often duplicate feature engineering work across projects. Features computed for training can also drift from those computed at inference time, causing training-serving skew. A centralized feature store addresses both problems.

Building on OneLake

Leverage Fabric’s lakehouse architecture for a scalable feature store:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, count, datediff, current_date, row_number
from pyspark.sql.window import Window
from delta.tables import DeltaTable
from datetime import datetime, timedelta

class FabricFeatureStore:
    def __init__(self, spark: SparkSession, catalog: str = "feature_store"):
        self.spark = spark
        self.catalog = catalog

    def register_feature_group(
        self,
        name: str,
        primary_key: list[str],
        event_time_column: str,
        description: str
    ):
        """Register a new feature group in the catalog."""
        self.spark.sql(f"""
            CREATE TABLE IF NOT EXISTS {self.catalog}.feature_registry (
                feature_group STRING,
                primary_key ARRAY<STRING>,
                event_time_column STRING,
                description STRING,
                created_at TIMESTAMP,
                version INT
            ) USING DELTA
        """)

        # Build the ARRAY literal explicitly; interpolating a Python tuple
        # produces invalid SQL for single-element keys
        primary_key_sql = ", ".join(f"'{k}'" for k in primary_key)
        self.spark.sql(f"""
            INSERT INTO {self.catalog}.feature_registry
            VALUES ('{name}', array({primary_key_sql}), '{event_time_column}',
                    '{description}', current_timestamp(), 1)
        """)

    def compute_and_store_features(
        self,
        feature_group: str,
        source_table: str,
        feature_definitions: dict
    ):
        """Compute features from source data and store in feature store."""
        source_df = self.spark.table(source_table)

        # Apply feature transformations
        features_df = source_df.select(
            *[col(pk) for pk in feature_definitions["primary_key"]],
            col(feature_definitions["event_time"]),
            *[
                expr.alias(name)
                for name, expr in feature_definitions["features"].items()
            ]
        )

        # Write to feature store with Delta Lake
        features_df.write \
            .format("delta") \
            .mode("overwrite") \
            .option("overwriteSchema", "true") \
            .saveAsTable(f"{self.catalog}.{feature_group}")

        return features_df

    def get_training_data(
        self,
        feature_groups: list[str],
        entity_df,
        point_in_time_column: str
    ):
        """Join features for training with point-in-time correctness."""
        result_df = entity_df

        for fg in feature_groups:
            features_df = self.spark.table(f"{self.catalog}.{fg}")
            registry = self.spark.table(f"{self.catalog}.feature_registry") \
                .filter(col("feature_group") == fg).first()

            # Point-in-time join: keep only feature rows observed at or before
            # the label timestamp, then take the most recent row per entity
            # to prevent data leakage and duplicate training rows
            joined_df = result_df.join(
                features_df,
                on=list(registry.primary_key),
                how="left"
            ).filter(
                col(registry.event_time_column) <= col(point_in_time_column)
            )

            window = Window.partitionBy(
                *registry.primary_key, point_in_time_column
            ).orderBy(col(registry.event_time_column).desc())

            result_df = joined_df.withColumn("_row_num", row_number().over(window)) \
                .filter(col("_row_num") == 1) \
                .drop("_row_num")

        return result_df

    def get_online_features(self, feature_group: str, entity_keys: dict):
        """Retrieve latest features for online inference."""
        features_df = self.spark.table(f"{self.catalog}.{feature_group}")
        registry = self.spark.table(f"{self.catalog}.feature_registry") \
            .filter(col("feature_group") == feature_group).first()

        filter_condition = " AND ".join([
            f"{k} = '{v}'" for k, v in entity_keys.items()
        ])

        # Order by the registered event time column to return the freshest row
        row = features_df.filter(filter_condition) \
            .orderBy(col(registry.event_time_column).desc()) \
            .limit(1) \
            .first()

        return row.asDict() if row else None

# Example: Customer features
feature_store = FabricFeatureStore(spark)
feature_store.register_feature_group(
    name="customer_features",
    primary_key=["customer_id"],
    event_time_column="computed_at",
    description="Customer behavior features for churn prediction"
)
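
Once the feature group is registered, you can populate it and assemble a training set. The sketch below is illustrative: the source table sales.customer_transactions, the labels table ml.churn_labels, and the column names are hypothetical placeholders for your own lakehouse tables and feature expressions.

# Compute and persist customer features
# (assumes the source table already carries a computed_at timestamp column)
feature_store.compute_and_store_features(
    feature_group="customer_features",
    source_table="sales.customer_transactions",
    feature_definitions={
        "primary_key": ["customer_id"],
        "event_time": "computed_at",
        "features": {
            "days_since_last_order": datediff(current_date(), col("last_order_date")),
            "lifetime_order_count": col("order_count"),
        },
    },
)

# Build a point-in-time-correct training set from a labels dataframe
labels_df = spark.table("ml.churn_labels")  # expects customer_id + label_date
training_df = feature_store.get_training_data(
    feature_groups=["customer_features"],
    entity_df=labels_df,
    point_in_time_column="label_date",
)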

Feature Freshness

Configure Fabric Data Factory pipelines to refresh features on schedules that match business requirements: critical features may need hourly updates, while others can refresh daily.
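
One way to keep features fresh without full rewrites is an incremental upsert that a pipeline or scheduled Fabric notebook runs on each trigger. Here is a minimal sketch, assuming a hypothetical staging table of newly computed feature rows; the table names and helper are illustrative, not part of the class above.

from delta.tables import DeltaTable

def refresh_feature_group(spark, catalog: str, feature_group: str,
                          staging_table: str, primary_key: list[str]):
    """Upsert newly computed feature rows into an existing feature group."""
    updates_df = spark.table(staging_table)
    target = DeltaTable.forName(spark, f"{catalog}.{feature_group}")

    # Match on the entity key(s); update existing rows, insert new ones
    merge_condition = " AND ".join(
        f"target.{k} = updates.{k}" for k in primary_key
    )

    target.alias("target") \
        .merge(updates_df.alias("updates"), merge_condition) \
        .whenMatchedUpdateAll() \
        .whenNotMatchedInsertAll() \
        .execute()

# Example: hourly refresh of customer features from a staging table
refresh_feature_group(
    spark,
    catalog="feature_store",
    feature_group="customer_features",
    staging_table="staging.customer_features_latest",
    primary_key=["customer_id"],
)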

A well-designed feature store accelerates ML development by enabling feature reuse across teams and ensuring consistency between training and production environments.

Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.