Delta Lake Everywhere: The Universal Table Format
Delta Lake has become the de facto standard for lakehouse table formats. Microsoft Fabric’s native support for Delta Lake enables interoperability across platforms while providing ACID transactions, time travel, and schema evolution.
Why Delta Lake?
┌─────────────────────────────────────────────────────────────┐
│ Traditional Data Lake │
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Parquet │ │ JSON │ │ CSV │ │ ORC │ │
│ │ Files │ │ Files │ │ Files │ │ Files │ │
│ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │
│ │ │ │ │ │
│ └─────────────┴─────────────┴─────────────┘ │
│ │ │
│ No Transactions │
│ No Schema Enforcement │
│ No Time Travel │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ Delta Lake │
│ │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ Parquet Files + Delta Log │ │
│ │ │ │
│ │ data/ │ │
│ │ ├── part-00000.parquet │ │
│ │ ├── part-00001.parquet │ │
│ │ └── _delta_log/ │ │
│ │ ├── 00000000000000000000.json │ │
│ │ ├── 00000000000000000001.json │ │
│ │ └── 00000000000000000002.checkpoint.parquet │ │
│ └──────────────────────────────────────────────────────┘ │
│ │ │
│ ACID Transactions │
│ Schema Evolution │
│ Time Travel │
│ Audit History │
└─────────────────────────────────────────────────────────────┘
Delta Lake in Microsoft Fabric
Native Delta Support
# All Fabric lakehouses use Delta Lake natively
# Create Delta table
df = spark.createDataFrame([
(1, "Alice", "Engineering"),
(2, "Bob", "Sales"),
(3, "Carol", "Marketing")
], ["id", "name", "department"])
# Save as Delta table
df.write.format("delta") \
.mode("overwrite") \
.saveAsTable("employees")
# Or save to Files folder
df.write.format("delta") \
.mode("overwrite") \
.save("Files/raw/employees")
# Verify it's Delta
from delta.tables import DeltaTable
print(DeltaTable.isDeltaTable(spark, "Tables/employees")) # True
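Because a Delta table is just Parquet data files plus the _delta_log directory shown in the diagram above, you can peek at the transaction log directly. A small exploratory sketch against the copy written to Files/raw/employees above (the log layout is an internal detail, so treat this as inspection only):

# Each numbered JSON file in _delta_log is one commit: a list of actions
# (add/remove data file, metaData, protocol, commitInfo) defining that version
log_df = spark.read.json("Files/raw/employees/_delta_log/*.json")
log_df.printSchema()

# Show the operation recorded for each commit
log_df.select("commitInfo.operation").where("commitInfo IS NOT NULL").show()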
Schema Evolution
from delta.tables import DeltaTable
from pyspark.sql.functions import lit
from pyspark.sql.types import *
class DeltaSchemaManager:
"""Manage Delta Lake schema evolution."""
def __init__(self, spark, table_path: str):
self.spark = spark
self.table_path = table_path
self.delta_table = DeltaTable.forPath(spark, table_path)
def get_current_schema(self) -> StructType:
"""Get current table schema."""
return self.delta_table.toDF().schema
def add_column(self, column_name: str, data_type: DataType):
"""Add new column to schema."""
# Rewrites the table with the new column; mergeSchema permits the schema change
df_with_new_col = self.delta_table.toDF().withColumn(
column_name,
lit(None).cast(data_type)
)
df_with_new_col.write.format("delta") \
.mode("overwrite") \
.option("mergeSchema", "true") \
.save(self.table_path)
def evolve_schema_on_write(self, new_df):
"""Write data with schema evolution enabled."""
new_df.write.format("delta") \
.mode("append") \
.option("mergeSchema", "true") \
.save(self.table_path)
def overwrite_schema(self, new_df):
"""Completely replace schema (use with caution)."""
new_df.write.format("delta") \
.mode("overwrite") \
.option("overwriteSchema", "true") \
.save(self.table_path)
# Usage
schema_manager = DeltaSchemaManager(spark, "Tables/employees")
# Check current schema
current = schema_manager.get_current_schema()
print("Current columns:", [f.name for f in current.fields])
# Add new data with additional column
new_employees = spark.createDataFrame([
(4, "David", "Finance", "2024-08-14")
], ["id", "name", "department", "hire_date"])
schema_manager.evolve_schema_on_write(new_employees)
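When you control the table directly, a new nullable column can also be added declaratively with SQL, which is a metadata-only change and avoids rewriting data. A brief sketch (office_location is an illustrative column name, not part of the example above):

# Metadata-only schema change: existing rows read the new column as NULL
spark.sql("ALTER TABLE employees ADD COLUMNS (office_location STRING)")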
Time Travel
class DeltaTimeTravel:
"""Time travel operations for Delta tables."""
def __init__(self, spark, table_path: str):
self.spark = spark
self.table_path = table_path
def read_at_version(self, version: int):
"""Read table at specific version."""
return self.spark.read.format("delta") \
.option("versionAsOf", version) \
.load(self.table_path)
def read_at_timestamp(self, timestamp: str):
"""Read table at specific timestamp."""
return self.spark.read.format("delta") \
.option("timestampAsOf", timestamp) \
.load(self.table_path)
def get_history(self, limit: int = 10):
"""Get table version history."""
delta_table = DeltaTable.forPath(self.spark, self.table_path)
return delta_table.history(limit)
def restore_to_version(self, version: int):
"""Restore table to previous version."""
delta_table = DeltaTable.forPath(self.spark, self.table_path)
delta_table.restoreToVersion(version)
def restore_to_timestamp(self, timestamp: str):
"""Restore table to timestamp."""
delta_table = DeltaTable.forPath(self.spark, self.table_path)
delta_table.restoreToTimestamp(timestamp)
def compare_versions(self, version1: int, version2: int):
"""Compare data between two versions."""
df1 = self.read_at_version(version1)
df2 = self.read_at_version(version2)
added = df2.exceptAll(df1)
removed = df1.exceptAll(df2)
return {
"added_rows": added.count(),
"removed_rows": removed.count(),
"added": added,
"removed": removed
}
# Usage
time_travel = DeltaTimeTravel(spark, "Tables/employees")
# View history
history = time_travel.get_history()
display(history.select("version", "timestamp", "operation", "operationParameters"))
# Read previous version
old_data = time_travel.read_at_version(2)
# Compare versions
changes = time_travel.compare_versions(1, 3)
print(f"Added: {changes['added_rows']}, Removed: {changes['removed_rows']}")
# Restore if needed
time_travel.restore_to_version(2)
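The same operations are available through Spark SQL, which can be handy in SQL-first notebooks. A brief sketch against the managed employees table (the timestamp literal is illustrative):

# Version history
spark.sql("DESCRIBE HISTORY employees").show(truncate=False)

# Query an earlier snapshot by version or timestamp
spark.sql("SELECT * FROM employees VERSION AS OF 2").show()
spark.sql("SELECT * FROM employees TIMESTAMP AS OF '2024-08-14 00:00:00'").show()

# Roll the table back
spark.sql("RESTORE TABLE employees TO VERSION AS OF 2")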
ACID Transactions
Concurrent Writes
from delta.tables import DeltaTable
class ConcurrentWriteHandler:
"""Handle concurrent writes with Delta Lake."""
def __init__(self, spark, table_path: str):
self.spark = spark
self.table_path = table_path
def upsert_with_merge(self, updates_df, key_columns: list[str]):
"""Upsert data using MERGE (handles concurrent writes)."""
delta_table = DeltaTable.forPath(self.spark, self.table_path)
# Build merge condition
merge_condition = " AND ".join([
f"target.{col} = updates.{col}" for col in key_columns
])
# Execute merge
delta_table.alias("target").merge(
updates_df.alias("updates"),
merge_condition
).whenMatchedUpdateAll() \
.whenNotMatchedInsertAll() \
.execute()
def conditional_update(
self,
updates_df,
key_columns: list[str],
update_condition: str
):
"""Update only if condition is met."""
delta_table = DeltaTable.forPath(self.spark, self.table_path)
merge_condition = " AND ".join([
f"target.{col} = updates.{col}" for col in key_columns
])
delta_table.alias("target").merge(
updates_df.alias("updates"),
merge_condition
).whenMatchedUpdateAll(
condition=update_condition
).whenNotMatchedInsertAll() \
.execute()
def delete_with_condition(self, condition: str):
"""Delete rows matching condition."""
delta_table = DeltaTable.forPath(self.spark, self.table_path)
delta_table.delete(condition)
def update_column(self, condition: str, column: str, value):
"""Update specific column value."""
delta_table = DeltaTable.forPath(self.spark, self.table_path)
delta_table.update(
condition=condition,
set={column: value}
)
# Usage
handler = ConcurrentWriteHandler(spark, "Tables/orders")
# Upsert new orders
new_orders = spark.createDataFrame([
(1001, "2024-08-14", "customer_1", 150.00, "pending"),
(1002, "2024-08-14", "customer_2", 200.00, "pending")
], ["order_id", "order_date", "customer_id", "amount", "status"])
handler.upsert_with_merge(new_orders, key_columns=["order_id"])
# Conditional update: only update if status is 'pending'
updated_orders = spark.createDataFrame([
(1001, "2024-08-14", "customer_1", 160.00, "confirmed")
], ["order_id", "order_date", "customer_id", "amount", "status"])
handler.conditional_update(
updated_orders,
key_columns=["order_id"],
update_condition="target.status = 'pending'"
)
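ACID guarantees mean concurrent writers cannot corrupt the table, but a losing transaction still fails with a conflict exception and must be retried. A minimal retry sketch, assuming the conflict exception classes exposed by delta-spark's delta.exceptions module:

import time
from delta.exceptions import ConcurrentAppendException, ConcurrentDeleteReadException

# Conflict types treated as retriable for this merge pattern (assumption)
RETRIABLE = (ConcurrentAppendException, ConcurrentDeleteReadException)

def merge_with_retry(handler, updates_df, key_columns, max_attempts=3):
    """Retry a MERGE when a concurrent transaction wins the conflict check."""
    for attempt in range(1, max_attempts + 1):
        try:
            handler.upsert_with_merge(updates_df, key_columns)
            return
        except RETRIABLE:
            if attempt == max_attempts:
                raise
            # The conflicting commit has already landed; back off briefly so the
            # retry merges against the newer table version
            time.sleep(2 ** attempt)

merge_with_retry(handler, new_orders, key_columns=["order_id"])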
Delta Lake Features
Change Data Feed
# Enable Change Data Feed on table
spark.sql("""
ALTER TABLE orders
SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")
# Read changes since version
class DeltaCDC:
"""Change Data Feed operations."""
def __init__(self, spark, table_path: str):
self.spark = spark
self.table_path = table_path
def get_changes_since_version(self, start_version: int):
"""Get changes since specific version."""
return self.spark.read.format("delta") \
.option("readChangeFeed", "true") \
.option("startingVersion", start_version) \
.load(self.table_path)
def get_changes_in_range(
self,
start_version: int,
end_version: int
):
"""Get changes between versions."""
return self.spark.read.format("delta") \
.option("readChangeFeed", "true") \
.option("startingVersion", start_version) \
.option("endingVersion", end_version) \
.load(self.table_path)
def get_changes_since_timestamp(self, timestamp: str):
"""Get changes since timestamp."""
return self.spark.read.format("delta") \
.option("readChangeFeed", "true") \
.option("startingTimestamp", timestamp) \
.load(self.table_path)
def process_changes(self, changes_df):
"""Process change data feed."""
# _change_type can be: insert, update_preimage, update_postimage, delete
inserts = changes_df.filter("_change_type = 'insert'")
updates = changes_df.filter("_change_type = 'update_postimage'")
deletes = changes_df.filter("_change_type = 'delete'")
return {
"inserts": inserts,
"updates": updates,
"deletes": deletes,
"insert_count": inserts.count(),
"update_count": updates.count(),
"delete_count": deletes.count()
}
# Usage
cdc = DeltaCDC(spark, "Tables/orders")
changes = cdc.get_changes_since_version(5)
processed = cdc.process_changes(changes)
print(f"Inserts: {processed['insert_count']}")
print(f"Updates: {processed['update_count']}")
print(f"Deletes: {processed['delete_count']}")
Deletion Vectors
# Deletion Vectors provide faster deletes without rewriting files
# Enable deletion vectors
spark.sql("""
ALTER TABLE large_table
SET TBLPROPERTIES (
'delta.enableDeletionVectors' = 'true'
)
""")
# Deletes now mark rows as deleted without rewriting
# Much faster for tables with many files
# Enable deletion vectors along with general file-size tuning properties
deletion_vector_config = {
"delta.enableDeletionVectors": "true",
"delta.targetFileSize": "134217728", # 128MB
"delta.tuneFileSizesForRewrites": "true"
}
for key, value in deletion_vector_config.items():
spark.sql(f"ALTER TABLE large_table SET TBLPROPERTIES ('{key}' = '{value}')")
Liquid Clustering
# Liquid Clustering is a newer alternative to Z-Ordering and rigid partitioning
# Requires a recent Delta Lake runtime
# Create table with liquid clustering
spark.sql("""
CREATE TABLE sales_clustered (
order_id BIGINT,
customer_id BIGINT,
order_date DATE,
amount DECIMAL(10,2),
region STRING
)
USING DELTA
CLUSTER BY (region, order_date)
""")
# Data is clustered incrementally; OPTIMIZE still applies clustering,
# but no ZORDER BY clause is needed
# Or convert existing table
spark.sql("""
ALTER TABLE existing_table
CLUSTER BY (region, date)
""")
Cross-Platform Compatibility
Reading Delta from Different Platforms
# Databricks
df = spark.read.format("delta").load("abfss://container@storage.dfs.core.windows.net/delta/table")
# Synapse
df = spark.read.format("delta").load("abfss://container@storage.dfs.core.windows.net/delta/table")
# Fabric
df = spark.read.format("delta").table("lakehouse.table")
# Python (deltalake package, the Rust-based delta-rs bindings) - no Spark required
import deltalake
# Remote storage generally needs credentials passed via storage_options
dt = deltalake.DeltaTable("abfss://container@storage.dfs.core.windows.net/delta/table")
df = dt.to_pandas()
# The same library also reads local paths
from deltalake import DeltaTable
dt = DeltaTable("./path/to/delta/table")
print(dt.schema())
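delta-rs can also write Delta tables without Spark, which is useful for lightweight ingestion jobs. A minimal sketch using write_deltalake against a local path (path and values are illustrative):

import pandas as pd
from deltalake import DeltaTable, write_deltalake

# Append a small batch to an existing Delta table without a Spark cluster
batch = pd.DataFrame({"id": [5], "name": ["Eve"], "department": ["Support"]})
write_deltalake("./path/to/delta/table", batch, mode="append")

print(DeltaTable("./path/to/delta/table").version())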
Version Compatibility
# Check and set Delta protocol versions
def check_protocol(table_path: str):
"""Check Delta protocol version."""
delta_table = DeltaTable.forPath(spark, table_path)
detail = delta_table.detail().first()
return {
"min_reader_version": detail.minReaderVersion,
"min_writer_version": detail.minWriterVersion,
"table_features": detail.tableFeatures if hasattr(detail, 'tableFeatures') else None
}
def set_compatible_protocol(table_path: str):
"""Set protocol for maximum compatibility."""
# Reader 1 / writer 2 is the most broadly supported combination.
# Note: Delta protocols can only be upgraded, never downgraded, so set
# this before enabling features that require a higher protocol.
spark.sql(f"""
ALTER TABLE delta.`{table_path}`
SET TBLPROPERTIES (
'delta.minReaderVersion' = '1',
'delta.minWriterVersion' = '2'
)
""")
def upgrade_protocol(table_path: str, reader: int, writer: int):
"""Upgrade protocol version for new features."""
delta_table = DeltaTable.forPath(spark, table_path)
delta_table.upgradeTableProtocol(reader, writer)
# Check compatibility
protocol = check_protocol("Tables/my_table")
print(f"Reader: {protocol['min_reader_version']}, Writer: {protocol['min_writer_version']}")
Best Practices
Table Optimization
class DeltaOptimizer:
"""Best practices for Delta table optimization."""
def __init__(self, spark, table_path: str):
self.spark = spark
self.table_path = table_path
self.delta_table = DeltaTable.forPath(spark, table_path)
def compact_files(self):
"""Compact small files."""
self.delta_table.optimize().executeCompaction()
def zorder_optimize(self, columns: list[str]):
"""Optimize with Z-Order on specified columns."""
self.delta_table.optimize().executeZOrderBy(*columns)
def vacuum(self, retention_hours: int = 168):
"""Remove old files."""
self.delta_table.vacuum(retention_hours)
def analyze_stats(self):
"""Compute statistics for query optimization."""
self.spark.sql(f"ANALYZE TABLE delta.`{self.table_path}` COMPUTE STATISTICS")
def set_auto_optimize(self):
"""Enable automatic optimization."""
self.spark.sql(f"""
ALTER TABLE delta.`{self.table_path}`
SET TBLPROPERTIES (
'delta.autoOptimize.optimizeWrite' = 'true',
'delta.autoOptimize.autoCompact' = 'true'
)
""")
def full_optimization_routine(self, zorder_columns: list[str] | None = None):
"""Run full optimization routine."""
# 1. Compact files
print("Compacting files...")
self.compact_files()
# 2. Z-Order if columns specified
if zorder_columns:
print(f"Z-Ordering on {zorder_columns}...")
self.zorder_optimize(zorder_columns)
# 3. Vacuum old files
print("Vacuuming old files...")
self.vacuum()
# 4. Compute statistics
print("Computing statistics...")
self.analyze_stats()
print("Optimization complete!")
# Usage
optimizer = DeltaOptimizer(spark, "Tables/sales")
optimizer.full_optimization_routine(zorder_columns=["region", "date"])
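The same maintenance operations can be issued as Spark SQL, which is convenient when scheduling them from a pipeline (assuming Tables/sales corresponds to the managed table sales):

# SQL equivalents of the routine above
spark.sql("OPTIMIZE sales ZORDER BY (region, date)")
spark.sql("VACUUM sales RETAIN 168 HOURS")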
Conclusion
Delta Lake provides a reliable, performant foundation for modern data platforms. Its ACID transactions, time travel, and schema evolution capabilities make it ideal for both analytical and operational workloads.
In Microsoft Fabric, Delta Lake is the native format, ensuring compatibility with the broader Delta ecosystem while providing tight integration with Fabric services like Direct Lake and SQL endpoints.
Embrace Delta Lake as your standard table format across all your data platforms for maximum interoperability and data reliability.