Data Lakehouse Architecture with Microsoft Fabric: Medallion Pattern Implementation
The medallion architecture (bronze, silver, gold) has become the standard pattern for organizing data lakehouses. Microsoft Fabric’s unified platform makes implementing it straightforward. Here’s a practical implementation guide.
Setting Up the Lakehouse Structure
Organize your Fabric workspace with clear layer separation:
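The snippets below assume three schemas, bronze, silver, and gold, in a schema-enabled lakehouse; if you prefer one lakehouse per layer instead, adjust the table references accordingly. A minimal one-time setup sketch:

# One-time setup: create the medallion schemas
# (the spark session is preconfigured in Fabric notebooks)
for layer in ("bronze", "silver", "gold"):
    spark.sql(f"CREATE SCHEMA IF NOT EXISTS {layer}")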
# Bronze Layer: Raw data ingestion
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from delta.tables import DeltaTable

spark = SparkSession.builder.getOrCreate()

# Ingest raw data, preserving the source structure
def ingest_to_bronze(source_path: str, table_name: str):
    df = spark.read.format("parquet").load(source_path)

    # Add ingestion metadata
    df_with_metadata = df.withColumn(
        "_ingestion_timestamp", F.current_timestamp()
    ).withColumn(
        "_source_file", F.input_file_name()
    )

    # Write to the bronze layer, allowing schema evolution
    df_with_metadata.write \
        .format("delta") \
        .mode("append") \
        .option("mergeSchema", "true") \
        .saveAsTable(f"bronze.{table_name}")
# Silver Layer: Cleaned and conformed data
def process_to_silver(bronze_table: str, silver_table: str):
    bronze_df = spark.read.table(f"bronze.{bronze_table}")

    # Deduplicate, drop missing keys, and standardize types
    silver_df = bronze_df \
        .dropDuplicates(["id"]) \
        .filter(F.col("id").isNotNull()) \
        .withColumn("processed_date", F.to_date("date_string", "yyyy-MM-dd")) \
        .withColumn("amount_decimal", F.col("amount").cast("decimal(18,2)")) \
        .drop("_source_file")

    # Merge for incremental updates
    if spark.catalog.tableExists(f"silver.{silver_table}"):
        delta_table = DeltaTable.forName(spark, f"silver.{silver_table}")
        delta_table.alias("target").merge(
            silver_df.alias("source"),
            "target.id = source.id"
        ).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
    else:
        silver_df.write.format("delta").saveAsTable(f"silver.{silver_table}")
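Because the merge matches on the business key, re-running the load with the same source rows updates matches rather than duplicating them. A usage sketch with illustrative table names:

# Conform the raw feed into the silver layer
process_to_silver(bronze_table="transactions", silver_table="transactions")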
Gold Layer: Business Aggregations
Create curated datasets optimized for specific use cases:
def create_gold_aggregation():
    silver_df = spark.read.table("silver.transactions")

    # Business-level aggregations per customer
    customer_metrics = silver_df.groupBy("customer_id").agg(
        F.sum("amount_decimal").alias("total_spend"),
        F.count("*").alias("transaction_count"),
        F.avg("amount_decimal").alias("avg_transaction"),
        F.max("processed_date").alias("last_transaction_date")
    ).withColumn(
        "customer_segment",
        F.when(F.col("total_spend") > 10000, "Premium")
        .when(F.col("total_spend") > 1000, "Standard")
        .otherwise("Basic")
    )

    customer_metrics.write \
        .format("delta") \
        .mode("overwrite") \
        .saveAsTable("gold.customer_metrics")
Data Quality Enforcement
Use Delta Lake constraints (CHECK and NOT NULL) to enforce data quality at each layer, so that writes violating a rule fail instead of propagating bad records downstream. Fabric’s data quality monitoring adds visibility into quality metrics across your entire lakehouse.
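As a sketch of what constraint enforcement could look like on the silver table from this guide (the constraint name is illustrative):

# Reject bad rows at write time: any write that violates the CHECK
# constraint fails instead of silently landing in the table
spark.sql("""
    ALTER TABLE silver.transactions
    ADD CONSTRAINT valid_transaction
    CHECK (id IS NOT NULL AND amount_decimal >= 0)
""")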