
Building Data Lakehouses: Medallion Architecture Best Practices

The medallion architecture organizes data into bronze, silver, and gold layers, progressively refining raw data into analytics-ready assets. Implemented well in Microsoft Fabric, the pattern gives you explicit quality gates between layers and a pipeline that stays maintainable as sources grow.

Understanding the Layers

Each layer serves a specific purpose in the data refinement pipeline. Bronze captures raw data, silver cleans and conforms it, and gold delivers business-ready datasets.
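
It helps to make that convention explicit in code. A minimal sketch, assuming Fabric Lakehouse-style Tables/ paths (the exact layout is a convention, not a requirement):

# A minimal sketch of a layer-path convention; the exact layout is an
# assumption, not a Fabric requirement.
LAYER_PATHS = {
    "bronze": "Tables/bronze/{name}",   # raw, append-only
    "silver": "Tables/silver/{name}",   # cleansed and conformed
    "gold":   "Tables/gold/{name}",     # business-ready aggregates
}

def table_path(layer: str, name: str) -> str:
    """Resolve the Delta table path for an entity in a given layer."""
    return LAYER_PATHS[layer].format(name=name)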

Bronze Layer: Raw Ingestion

Capture data with minimal transformation, preserving the original state:

import uuid

from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp, input_file_name, lit

class BronzeIngestion:
    def __init__(self, spark: SparkSession, source_name: str):
        self.spark = spark
        self.source_name = source_name
        self.bronze_path = f"Tables/bronze/{source_name}"

    def ingest_batch(self, source_path: str, file_format: str = "json"):
        """Ingest raw data with metadata preservation."""

        raw_df = self.spark.read \
            .format(file_format) \
            .option("inferSchema", "true") \
            .load(source_path)

        # Add ingestion metadata
        enriched_df = raw_df \
            .withColumn("_ingestion_timestamp", current_timestamp()) \
            .withColumn("_source_file", input_file_name()) \
            .withColumn("_source_system", lit(self.source_name)) \
            .withColumn("_batch_id", lit(self.generate_batch_id()))

        # Append to bronze table
        enriched_df.write \
            .format("delta") \
            .mode("append") \
            .option("mergeSchema", "true") \
            .save(self.bronze_path)

        # Note: count() re-evaluates the source read; cache enriched_df if cost matters
        return enriched_df.count()

    def generate_batch_id(self) -> str:
        """Create a unique identifier for this ingestion batch."""
        return str(uuid.uuid4())

    def ingest_stream(self, source_config: dict):
        """Stream data continuously to bronze layer."""

        stream_df = self.spark.readStream \
            .format(source_config["format"]) \
            .options(**source_config["options"]) \
            .load()

        # Add streaming metadata
        enriched_df = stream_df \
            .withColumn("_ingestion_timestamp", current_timestamp()) \
            .withColumn("_source_system", lit(self.source_name))

        query = enriched_df.writeStream \
            .format("delta") \
            .outputMode("append") \
            .option("checkpointLocation", f"/checkpoints/bronze/{self.source_name}") \
            .trigger(processingTime="1 minute") \
            .start(self.bronze_path)

        return query
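
To make the flow concrete, here is how a notebook might drive the batch path. The source name and landing path are illustrative assumptions:

# Hypothetical usage; "crm" and the landing path are assumptions.
spark = SparkSession.builder.getOrCreate()

ingestion = BronzeIngestion(spark, source_name="crm")
rows = ingestion.ingest_batch("Files/landing/crm/customers/", file_format="json")
print(f"Ingested {rows} raw rows into {ingestion.bronze_path}")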

Silver Layer: Cleansed Data

Apply data quality rules and business transformations:

from delta.tables import DeltaTable
from pyspark.sql.functions import col, current_timestamp, lower, to_date, trim

class SilverTransformation:
    def __init__(self, spark: SparkSession):
        self.spark = spark

    def transform_customers(self):
        """Transform customer data from bronze to silver."""

        bronze_df = self.spark.table("bronze.customers")

        # Get only new/updated records since last run
        last_processed = self.get_watermark("silver.customers")
        incremental_df = bronze_df.filter(
            col("_ingestion_timestamp") > last_processed
        )

        # Apply cleaning rules
        cleaned_df = incremental_df \
            .dropDuplicates(["customer_id"]) \
            .filter(col("email").isNotNull()) \
            .withColumn("email", lower(trim(col("email")))) \
            .withColumn("phone", self.standardize_phone(col("phone"))) \
            .withColumn("created_date", to_date(col("created_date"))) \
            .withColumn("_silver_timestamp", current_timestamp())

        # Validate data quality
        valid_df = cleaned_df.filter(
            col("email").rlike(r'^[\w\.-]+@[\w\.-]+\.\w+$')
        )

        # Quarantine invalid records
        invalid_df = cleaned_df.subtract(valid_df)
        self.quarantine_records(invalid_df, "customers")

        # Merge into silver table
        silver_table = DeltaTable.forName(self.spark, "silver.customers")

        silver_table.alias("target").merge(
            valid_df.alias("source"),
            "target.customer_id = source.customer_id"
        ).whenMatchedUpdateAll() \
         .whenNotMatchedInsertAll() \
         .execute()

        self.update_watermark("silver.customers")
        return valid_df.count()
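
The class above leans on four helpers (get_watermark, standardize_phone, quarantine_records, update_watermark) that are left implicit. A minimal sketch of each follows; the meta.watermarks table, the quarantine schema, and the digits-only phone format are all assumptions:

from pyspark.sql import Column, DataFrame
from pyspark.sql.functions import col, current_timestamp, regexp_replace

class SilverHelpers:
    """Mixin-style sketch; SilverTransformation could inherit these methods."""

    def standardize_phone(self, phone: Column) -> Column:
        # Assumption: digits-only is the desired canonical phone format.
        return regexp_replace(phone, r"[^0-9]", "")

    def get_watermark(self, table_name: str):
        # Assumption: a meta.watermarks Delta table tracks last processed times.
        row = (self.spark.table("meta.watermarks")
               .filter(col("table_name") == table_name)
               .select("last_processed")
               .first())
        # The string default is cast to a timestamp by Spark on comparison.
        return row["last_processed"] if row else "1970-01-01 00:00:00"

    def update_watermark(self, table_name: str):
        # Record "now" as the new high-water mark for the table.
        self.spark.sql(f"""
            MERGE INTO meta.watermarks AS t
            USING (SELECT '{table_name}' AS table_name,
                          current_timestamp() AS last_processed) AS s
            ON t.table_name = s.table_name
            WHEN MATCHED THEN UPDATE SET t.last_processed = s.last_processed
            WHEN NOT MATCHED THEN INSERT *
        """)

    def quarantine_records(self, df: DataFrame, entity: str):
        # Append rejected rows to a quarantine table for later inspection.
        (df.withColumn("_quarantined_at", current_timestamp())
           .write.format("delta")
           .mode("append")
           .saveAsTable(f"quarantine.{entity}"))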

Gold Layer: Business Aggregates

Create analytics-ready datasets optimized for consumption:

# Note: importing sum/max/min shadows the Python builtins in this module.
from pyspark.sql.functions import (avg, col, count, current_date,
                                   current_timestamp, datediff, max, min, sum)

class GoldAggregation:
    def __init__(self, spark: SparkSession):
        self.spark = spark

    def build_customer_360(self):
        """Build comprehensive customer view for analytics."""

        customers = self.spark.table("silver.customers")
        orders = self.spark.table("silver.orders")
        interactions = self.spark.table("silver.customer_interactions")

        # Customer metrics
        customer_metrics = orders \
            .groupBy("customer_id") \
            .agg(
                count("order_id").alias("total_orders"),
                sum("amount").alias("lifetime_value"),
                avg("amount").alias("avg_order_value"),
                max("order_date").alias("last_order_date"),
                min("order_date").alias("first_order_date")
            )

        # Build 360 view
        customer_360 = customers \
            .join(customer_metrics, "customer_id", "left") \
            .withColumn("customer_tenure_days",
                datediff(current_date(), col("first_order_date"))) \
            .withColumn("days_since_last_order",
                datediff(current_date(), col("last_order_date"))) \
            .withColumn("_gold_timestamp", current_timestamp())

        customer_360.write \
            .format("delta") \
            .mode("overwrite") \
            .saveAsTable("gold.customer_360")
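
Tying it together, a scheduled pipeline or notebook can run the three layers in order; the source name and path carry over from the earlier sketch:

# Hypothetical end-to-end refresh; source name and path are assumptions.
spark = SparkSession.builder.getOrCreate()

BronzeIngestion(spark, source_name="crm").ingest_batch("Files/landing/crm/customers/")
SilverTransformation(spark).transform_customers()
GoldAggregation(spark).build_customer_360()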

The medallion architecture provides clear data lineage and quality gates. Each layer can be tested and validated independently, making the entire pipeline more maintainable and reliable.

Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.