Back to Blog
2 min read

Microsoft Fabric Lakehouse: Medallion Architecture Implementation

The medallion architecture organizes data into bronze, silver, and gold layers, progressively refining raw data into business-ready assets. Microsoft Fabric Lakehouse provides the ideal platform for implementing this pattern.

Understanding the Layers

Bronze holds raw ingested data. Silver contains cleansed and conformed data. Gold presents business-level aggregates ready for consumption.

Implementing Bronze Layer

from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp, input_file_name, lit
from delta.tables import DeltaTable

# getOrCreate() reuses the session the runtime (e.g. a Fabric notebook)
# already provisioned instead of starting a new one.
spark = SparkSession.builder.getOrCreate()

def ingest_to_bronze(
    source_path: str,
    bronze_table: str,
    source_system: str,
) -> int:
    """Ingest raw JSON files into the bronze layer with lineage metadata.

    Args:
        source_path: Folder containing the raw JSON files
            (e.g. an ``abfss://`` path).
        bronze_table: Delta table name to append into.
        source_system: Identifier of the upstream system, stamped on
            every ingested row.

    Returns:
        Number of records ingested in this run.
    """
    # Bronze keeps the source schema untouched — read as-is.
    df = spark.read.format("json").load(source_path)

    # Lineage columns use a "_" prefix to avoid clashing with source fields.
    df_with_metadata = (
        df
        .withColumn("_ingestion_timestamp", current_timestamp())
        .withColumn("_source_file", input_file_name())
        .withColumn("_source_system", lit(source_system))
    )

    # BUG FIX: previously count() ran after the write with no cache, which
    # re-executed the source read as a second job — if new files landed in
    # the folder in between, the reported count differed from what was
    # actually written. Cache once so write and count share one dataset.
    df_with_metadata.cache()
    try:
        # Append preserves full history; mergeSchema tolerates schema
        # drift in the raw feed.
        df_with_metadata.write \
            .format("delta") \
            .mode("append") \
            .option("mergeSchema", "true") \
            .saveAsTable(bronze_table)
        return df_with_metadata.count()
    finally:
        df_with_metadata.unpersist()

# Example: ingest one month of POS sales (Sept 2025) into the bronze layer.
records = ingest_to_bronze(
    source_path="abfss://raw@storage.dfs.core.windows.net/sales/2025/09/",
    bronze_table="bronze.sales_raw",
    source_system="pos_system"
)
print(f"Ingested {records} records to bronze")

Implementing Silver Layer

from pyspark.sql.functions import col, when, trim, upper, to_date

def transform_to_silver(bronze_table: str, silver_table: str) -> None:
    """Cleanse bronze data and upsert it into the silver layer.

    Args:
        bronze_table: Source bronze table name.
        silver_table: Target silver table name; created on first run,
            merged into (upsert on ``order_id``) thereafter.
    """
    bronze_df = spark.table(bronze_table)

    # Cleansing rules: require the business key, normalize names, parse
    # dates, fix the amount type, and conform status values to a closed
    # uppercase vocabulary. Duplicates collapse to one row per order_id.
    silver_df = (
        bronze_df
        .filter(col("order_id").isNotNull())
        .withColumn("customer_name", trim(upper(col("customer_name"))))
        .withColumn("order_date", to_date(col("order_date_str"), "yyyy-MM-dd"))
        .withColumn("amount", col("amount").cast("decimal(18,2)"))
        .withColumn(
            "status",
            when(col("status").isin("complete", "completed"), "COMPLETED")
            .when(col("status").isin("cancel", "cancelled"), "CANCELLED")
            .otherwise(upper(col("status")))
        )
        .drop("order_date_str")
        .dropDuplicates(["order_id"])
    )

    # BUG FIX: existence was previously checked with
    # DeltaTable.isDeltaTable(spark, f"Tables/{silver_table}") — a relative
    # path built from a table *name* (e.g. "silver.sales_cleansed"), which
    # does not resolve to the table's storage location. Check the catalog
    # by name instead, matching the DeltaTable.forName lookup below.
    if spark.catalog.tableExists(silver_table):
        # Upsert: update existing orders in place, insert new ones.
        delta_table = DeltaTable.forName(spark, silver_table)
        delta_table.alias("target").merge(
            silver_df.alias("source"),
            "target.order_id = source.order_id"
        ).whenMatchedUpdateAll() \
         .whenNotMatchedInsertAll() \
         .execute()
    else:
        # First run: materialize the table directly.
        silver_df.write.format("delta").saveAsTable(silver_table)

# Example: promote raw sales into the cleansed silver table.
transform_to_silver("bronze.sales_raw", "silver.sales_cleansed")

Implementing Gold Layer

def build_gold_aggregates(silver_table: str, gold_table: str) -> None:
    """Build business-level daily aggregates for the gold layer.

    Args:
        silver_table: Cleansed silver table to aggregate from.
        gold_table: Target gold table; rebuilt in full on every run.
    """
    # BUG FIX: count/sum/avg/countDistinct were used without being
    # imported (bare ``sum`` would hit the Python builtin and fail on a
    # Column). Import the functions module and qualify every call.
    from pyspark.sql import functions as F

    silver_df = spark.table(silver_table)

    # Only completed orders count toward revenue KPIs.
    gold_df = (
        silver_df
        .filter(F.col("status") == "COMPLETED")
        .groupBy("order_date", "region", "product_category")
        .agg(
            F.count("order_id").alias("order_count"),
            F.sum("amount").alias("total_revenue"),
            F.avg("amount").alias("avg_order_value"),
            F.countDistinct("customer_id").alias("unique_customers"),
        )
    )

    # Gold is a full rebuild each run — overwrite is intentional.
    gold_df.write \
        .format("delta") \
        .mode("overwrite") \
        .saveAsTable(gold_table)

The medallion architecture provides clear data lineage, enables incremental processing, and separates concerns between data engineering and analytics teams.

Michael John Peña

Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.