Microsoft Fabric Data Engineering: Best Practices After 6 Months in Production
We’ve been running production data engineering workloads in Microsoft Fabric for six months now. Here’s what we’ve learned about building reliable, performant pipelines in this new paradigm.
Notebook Development Patterns
Environment Management
Fabric environments let you package Python dependencies and Spark configurations. Always create a dedicated environment for production workloads:
# environment.yml
name: production-data-eng
dependencies:
  - delta-spark==2.4.0
  - great-expectations==0.18.0
  - azure-identity==1.15.0
variables:
  ENVIRONMENT: production
  LOG_LEVEL: INFO
sparkConfiguration:
  spark.sql.shuffle.partitions: 200
  spark.databricks.delta.optimizeWrite.enabled: true
  spark.databricks.delta.autoCompact.enabled: true
Modular Notebook Design
Don’t put everything in one notebook. Keep shared functions in a utilities notebook and pull them in with the %run magic so the code stays modular and testable:
# utilities/transformations (shared utilities notebook)
def standardize_dates(df, date_columns):
    """Standardize date columns to a consistent format."""
    from pyspark.sql.functions import col, to_date, coalesce
    for col_name in date_columns:
        df = df.withColumn(
            col_name,
            coalesce(
                to_date(col(col_name), "yyyy-MM-dd"),
                to_date(col(col_name), "MM/dd/yyyy"),
                to_date(col(col_name), "dd-MM-yyyy")
            )
        )
    return df
def add_audit_columns(df):
    """Add standard audit columns."""
    from pyspark.sql.functions import current_timestamp, lit
    return df \
        .withColumn("_ingested_at", current_timestamp()) \
        .withColumn("_source_system", lit("fabric_pipeline"))
# Main notebook
%run utilities/transformations
# Now use the functions
df = spark.read.format("delta").load("Tables/raw_sales")
df = standardize_dates(df, ["order_date", "ship_date"])
df = add_audit_columns(df)
Delta Lake Optimization
Table Maintenance
Delta tables need maintenance. Schedule regular optimization:
from delta.tables import DeltaTable

def optimize_table(table_name, z_order_columns=None):
    """Run OPTIMIZE (optionally with Z-ORDER), then VACUUM old files."""
    delta_table = DeltaTable.forName(spark, table_name)
    if z_order_columns:
        delta_table.optimize().executeZOrderBy(*z_order_columns)
    else:
        delta_table.optimize().executeCompaction()
    # Vacuum old files (retain 7 days for time travel)
    delta_table.vacuum(168)  # retention in hours

# Schedule weekly
optimize_table("sales_curated", z_order_columns=["customer_id", "order_date"])
Partitioning Strategy
Partition by columns you frequently filter on:
# Good: partition by date columns for time-range queries
df.write \
    .format("delta") \
    .partitionBy("year", "month") \
    .mode("overwrite") \
    .saveAsTable("sales_partitioned")

# Partition pruning: only the January 2024 partition is read
spark.sql("""
    SELECT * FROM sales_partitioned
    WHERE year = 2024 AND month = 1
""")
Avoid over-partitioning: if individual partitions hold well under 1 GB of data, move to a coarser partitioning scheme or skip partitioning entirely; the sketch below shows one way to check.
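A quick way to check is DESCRIBE DETAIL, which reports a Delta table's total size and file count; combined with a distinct count over the partition columns, it gives a rough per-partition size. A minimal sketch, assuming the sales_partitioned table from the example above:

# DESCRIBE DETAIL returns one row with sizeInBytes and numFiles for the table
detail = spark.sql("DESCRIBE DETAIL sales_partitioned").collect()[0]
size_gb = detail["sizeInBytes"] / (1024 ** 3)
num_files = detail["numFiles"]
print(f"Table size: {size_gb:.2f} GB across {num_files} files")

# Distinct partition values; many tiny partitions is the signal to consolidate
partitions = spark.sql(
    "SELECT COUNT(DISTINCT year, month) AS n FROM sales_partitioned"
).collect()[0]["n"]
print(f"{partitions} partitions, ~{size_gb / max(partitions, 1):.2f} GB per partition")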
Pipeline Orchestration
Use Data Pipelines, Not Just Notebooks
While notebooks can call other notebooks, Data Pipelines provide:
- Retry logic
- Alerting
- Parameter passing
- Dependency management
- Monitoring dashboard
A simplified definition for a daily ingestion pipeline looks like this:
{
  "name": "DailyIngestion",
  "activities": [
    {
      "name": "Extract",
      "type": "Notebook",
      "notebookReference": "notebooks/extract_source_data",
      "parameters": {
        "execution_date": "@pipeline().parameters.execution_date"
      },
      "retryPolicy": {
        "count": 3,
        "intervalInSeconds": 300
      }
    },
    {
      "name": "Transform",
      "type": "Notebook",
      "dependsOn": ["Extract"],
      "notebookReference": "notebooks/transform_data",
      "parameters": {
        "execution_date": "@pipeline().parameters.execution_date"
      }
    }
  ]
}
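On the notebook side, the execution_date value is injected through the notebook's parameters cell: mark one cell as the parameters cell in the Fabric notebook UI, and the values it defines act as defaults that the pipeline's base parameters override at run time. A minimal sketch (the incremental filter and column name are our own illustration):

# Parameters cell: mark this cell as the parameters cell in the Fabric notebook UI.
# The default below is used for interactive runs; the pipeline's execution_date
# parameter replaces it when the notebook runs as a pipeline activity.
execution_date = "2024-01-01"

# Later cells can use it for incremental processing (column name assumed)
daily_sales = spark.read.format("delta").load("Tables/raw_sales") \
    .filter(f"order_date = '{execution_date}'")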
Idempotent Pipelines
Design pipelines that can be safely re-run:
def upsert_to_lakehouse(source_df, target_table, key_columns):
    """Merge source into target using Delta MERGE."""
    from delta.tables import DeltaTable
    delta_table = DeltaTable.forName(spark, target_table)
    merge_condition = " AND ".join([f"target.{k} = source.{k}" for k in key_columns])
    delta_table.alias("target") \
        .merge(source_df.alias("source"), merge_condition) \
        .whenMatchedUpdateAll() \
        .whenNotMatchedInsertAll() \
        .execute()

# This can be run multiple times safely
upsert_to_lakehouse(customer_df, "dim_customer", ["customer_id"])
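One caveat: DeltaTable.forName raises if the target table doesn't exist yet, so the first run of a new pipeline needs a bootstrap path. A minimal sketch of one way to handle it (the wrapper name is ours, not a Fabric or Delta API):

from delta.tables import DeltaTable

def upsert_or_create(source_df, target_table, key_columns):
    """Create the target on the first run, otherwise MERGE into it."""
    if not spark.catalog.tableExists(target_table):
        # Bootstrap: materialize the source as the initial table
        source_df.write.format("delta").saveAsTable(target_table)
        return
    delta_table = DeltaTable.forName(spark, target_table)
    merge_condition = " AND ".join(f"target.{k} = source.{k}" for k in key_columns)
    delta_table.alias("target") \
        .merge(source_df.alias("source"), merge_condition) \
        .whenMatchedUpdateAll() \
        .whenNotMatchedInsertAll() \
        .execute()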
Error Handling and Monitoring
Structured Logging
Use consistent logging for pipeline monitoring:
import logging
from datetime import datetime

class PipelineLogger:
    def __init__(self, pipeline_name):
        self.pipeline_name = pipeline_name
        self.run_id = datetime.now().strftime("%Y%m%d_%H%M%S")
        self.logger = logging.getLogger(pipeline_name)

    def log_step(self, step_name, status, row_count=None, details=None):
        log_record = {
            "pipeline": self.pipeline_name,
            "run_id": self.run_id,
            "step": step_name,
            "status": status,
            "row_count": row_count,
            "details": details,
            "timestamp": datetime.utcnow().isoformat()
        }
        # Write to a logging table; the explicit schema keeps None values
        # (row_count, details) from breaking schema inference
        log_schema = ("pipeline string, run_id string, step string, status string, "
                      "row_count long, details string, timestamp string")
        spark.createDataFrame([log_record], schema=log_schema).write \
            .format("delta") \
            .mode("append") \
            .saveAsTable("_pipeline_logs")
        self.logger.info(str(log_record))
# Usage
logger = PipelineLogger("daily_sales_pipeline")
logger.log_step("extract", "started")
df = extract_sales_data()
logger.log_step("extract", "completed", row_count=df.count())
Data Quality Checks
Integrate quality checks into your pipelines:
from pyspark.sql.functions import col

def validate_dataframe(df, table_name, checks):
    """Run data quality checks and fail the pipeline if critical issues are found."""
    results = []
    for check in checks:
        if check["type"] == "not_null":
            null_count = df.filter(col(check["column"]).isNull()).count()
            passed = null_count == 0
            results.append({
                "check": f"not_null_{check['column']}",
                "passed": passed,
                "critical": check.get("critical", True),
                "details": f"{null_count} nulls found"
            })
        elif check["type"] == "unique":
            total = df.count()
            distinct = df.select(check["column"]).distinct().count()
            passed = total == distinct
            results.append({
                "check": f"unique_{check['column']}",
                "passed": passed,
                "critical": check.get("critical", True),
                "details": f"{total - distinct} duplicates found"
            })
    # Fail on critical check failures
    critical_failures = [r for r in results if not r["passed"] and r["critical"]]
    if critical_failures:
        raise ValueError(f"Data quality checks failed for {table_name}: {critical_failures}")
    return results

# Define checks
checks = [
    {"type": "not_null", "column": "customer_id", "critical": True},
    {"type": "unique", "column": "order_id", "critical": True},
    {"type": "not_null", "column": "email", "critical": False}
]
validate_dataframe(sales_df, "sales_curated", checks)
Performance Optimization
Broadcast Joins
For small dimension tables, use broadcast joins:
from pyspark.sql.functions import broadcast

# Small dimension table (< 10 MB)
dim_product = spark.table("dim_product")
# Large fact table
fact_sales = spark.table("fact_sales")

# Broadcast the small table to every executor
result = fact_sales.join(
    broadcast(dim_product),
    "product_id"
)
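It is worth confirming the broadcast actually happens: Spark auto-broadcasts tables below spark.sql.autoBroadcastJoinThreshold (10 MB by default), and the physical plan shows which join strategy was chosen.

# A BroadcastHashJoin node in the plan confirms the hint took effect
result.explain()

# The auto-broadcast threshold in bytes; smaller tables are broadcast even without a hint
print(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))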
Caching Strategy
Cache intermediate results used multiple times:
# Cache expensive intermediate result
enriched_df = enrich_with_customer_data(raw_df).cache()
# Use cached result multiple times
daily_summary = enriched_df.groupBy("date").agg(...)
monthly_summary = enriched_df.groupBy("month").agg(...)
customer_summary = enriched_df.groupBy("customer_id").agg(...)
# Unpersist when done
enriched_df.unpersist()
Capacity Management
Monitor CU Consumption
Track which workloads consume the most capacity:
# Query capacity metrics via Fabric REST APIs
import requests
from azure.identity import DefaultAzureCredential
credential = DefaultAzureCredential()
token = credential.get_token("https://api.fabric.microsoft.com/.default")
headers = {"Authorization": f"Bearer {token.token}"}
# Get capacity metrics via Admin API
workspace_id = "your-workspace-id"
capacity_id = "your-capacity-id"
# Use Fabric Capacity Metrics app or REST API
url = f"https://api.fabric.microsoft.com/v1/admin/capacities/{capacity_id}/workloads"
response = requests.get(url, headers=headers)
metrics = response.json()
# Alternative: Use the Fabric Capacity Metrics Power BI app
# which provides detailed CU consumption analysis
print("For detailed metrics, use the Fabric Capacity Metrics app")
Workload Isolation
For critical workloads, consider separate workspaces with dedicated capacities. Don’t let ad-hoc exploration throttle production pipelines.
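If you go this route, moving a workspace onto its own capacity can be scripted; a hedged sketch against the Fabric core REST API's assignToCapacity endpoint (IDs are placeholders, and this assumes you have permission to manage the capacity):

import requests
from azure.identity import DefaultAzureCredential

# Same token pattern as the capacity metrics example above
credential = DefaultAzureCredential()
token = credential.get_token("https://api.fabric.microsoft.com/.default")
headers = {"Authorization": f"Bearer {token.token}"}

# Placeholders: the production workspace and its dedicated capacity
workspace_id = "your-production-workspace-id"
dedicated_capacity_id = "your-dedicated-capacity-id"

# Assign the workspace to the dedicated capacity
response = requests.post(
    f"https://api.fabric.microsoft.com/v1/workspaces/{workspace_id}/assignToCapacity",
    headers=headers,
    json={"capacityId": dedicated_capacity_id},
)
response.raise_for_status()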
Conclusion
Microsoft Fabric Data Engineering is maturing rapidly. The patterns above reflect what works in production today. Key takeaways:
- Modularize: Small, testable notebooks with shared utilities
- Optimize Delta: Regular maintenance, appropriate partitioning
- Pipeline-first: Use Data Pipelines for orchestration, not notebook chaining
- Quality gates: Fail fast on data quality issues
- Monitor everything: Capacity, performance, data quality metrics
The platform is evolving monthly. Stay current with Fabric updates and adjust patterns as new features land.