Delta Lake in Microsoft Fabric: ACID Transactions for Data Lakes

Delta Lake is the default table format in Microsoft Fabric, providing ACID transactions, time travel, and schema enforcement. Today, I will explore Delta Lake features and how to leverage them effectively.

What is Delta Lake?

Delta Lake adds reliability to data lakes through:

┌─────────────────────────────────────────────────────┐
│                    Delta Lake                        │
├─────────────────────────────────────────────────────┤
│                                                      │
│  ┌─────────────────────────────────────────────────┐│
│  │              Delta Table                         ││
│  │  ┌───────────────────────────────────────────┐  ││
│  │  │           Transaction Log                 │  ││
│  │  │  (_delta_log/00000.json)                  │  ││
│  │  │  (_delta_log/00001.json)                  │  ││
│  │  │  (_delta_log/...)                         │  ││
│  │  └───────────────────────────────────────────┘  ││
│  │                      │                          ││
│  │                      ▼                          ││
│  │  ┌───────────────────────────────────────────┐  ││
│  │  │           Parquet Files                   │  ││
│  │  │  (part-00000.parquet)                     │  ││
│  │  │  (part-00001.parquet)                     │  ││
│  │  │  (...)                                    │  ││
│  │  └───────────────────────────────────────────┘  ││
│  └─────────────────────────────────────────────────┘│
│                                                      │
│  Features:                                           │
│  - ACID Transactions                                │
│  - Time Travel                                       │
│  - Schema Evolution                                  │
│  - Unified Batch/Streaming                          │
│                                                      │
└─────────────────────────────────────────────────────┘
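
The transaction log shown above is just newline-delimited JSON, so you can inspect it directly once a table exists (for example the orders table created in the next section). A minimal sketch, assuming the table lives under the default Lakehouse; the path is an assumption to adjust for your environment:

# Read the JSON commit files of a Delta table's transaction log
# (assumed path under the default Lakehouse - adjust as needed)
log_path = "Tables/orders/_delta_log/*.json"

# Each commit file holds actions: commitInfo, metaData, protocol, add/remove
log_df = spark.read.json(log_path)

# commitInfo rows describe the operation that produced each commit
log_df.select("commitInfo.operation", "commitInfo.timestamp") \
    .where("commitInfo IS NOT NULL") \
    .show(truncate=False)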

Creating Delta Tables

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DecimalType, DateType

# Define schema
schema = StructType([
    StructField("order_id", StringType(), False),
    StructField("customer_id", StringType(), False),
    StructField("product_id", StringType(), False),
    StructField("quantity", IntegerType(), True),
    StructField("unit_price", DecimalType(18, 2), True),
    StructField("order_date", DateType(), True)
])

# Create empty Delta table with schema
spark.createDataFrame([], schema) \
    .write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable("orders")

# Or create with SQL
spark.sql("""
    CREATE TABLE IF NOT EXISTS orders (
        order_id STRING NOT NULL,
        customer_id STRING NOT NULL,
        product_id STRING NOT NULL,
        quantity INT,
        unit_price DECIMAL(18, 2),
        order_date DATE
    )
    USING DELTA
    PARTITIONED BY (order_date)
""")

ACID Transactions

Atomic Writes

# All-or-nothing write operation: readers never see a partially committed batch
# (df here is any DataFrame of new order rows)
try:
    df.write \
        .format("delta") \
        .mode("append") \
        .saveAsTable("orders")
    print("Transaction committed successfully")
except Exception as e:
    print(f"Transaction failed and rolled back: {e}")

MERGE (Upsert)

# MERGE operation - atomically update or insert
from delta.tables import DeltaTable

# Source data with updates
updates_df = spark.read.format("delta").table("staging_orders")

# Target Delta table (assumed to also carry created_at / updated_at audit columns)
target_table = DeltaTable.forName(spark, "orders")

# Perform merge
target_table.alias("target") \
    .merge(
        updates_df.alias("source"),
        "target.order_id = source.order_id"
    ) \
    .whenMatchedUpdate(set={
        "quantity": "source.quantity",
        "unit_price": "source.unit_price",
        "updated_at": "current_timestamp()"
    }) \
    .whenNotMatchedInsert(values={
        "order_id": "source.order_id",
        "customer_id": "source.customer_id",
        "product_id": "source.product_id",
        "quantity": "source.quantity",
        "unit_price": "source.unit_price",
        "order_date": "source.order_date",
        "created_at": "current_timestamp()",
        "updated_at": "current_timestamp()"
    }) \
    .execute()

print("Merge completed")

Conditional Updates

# Update with conditions
target_table.alias("target") \
    .merge(
        updates_df.alias("source"),
        "target.order_id = source.order_id"
    ) \
    .whenMatchedUpdate(
        condition="source.quantity > target.quantity",  # Only update if quantity increased
        set={
            "quantity": "source.quantity",
            "updated_at": "current_timestamp()"
        }
    ) \
    .whenMatchedDelete(
        condition="source.is_cancelled = true"  # Delete cancelled orders
    ) \
    .whenNotMatchedInsertAll() \
    .execute()

Time Travel

# View table history
orders_table = DeltaTable.forName(spark, "orders")
display(orders_table.history())

# Read specific version
df_v5 = spark.read \
    .format("delta") \
    .option("versionAsOf", 5) \
    .table("orders")

# Read at specific timestamp
df_yesterday = spark.read \
    .format("delta") \
    .option("timestampAsOf", "2023-06-01 10:00:00") \
    .table("orders")

# Compare versions
df_current = spark.read.format("delta").table("orders")
df_previous = spark.read.format("delta").option("versionAsOf", 10).table("orders")

# Find changes
new_orders = df_current.subtract(df_previous)
deleted_orders = df_previous.subtract(df_current)

print(f"New orders: {new_orders.count()}")
print(f"Deleted orders: {deleted_orders.count()}")

Restore to Previous Version

# Restore table to previous version
orders_table = DeltaTable.forName(spark, "orders")

# Restore to version 5
orders_table.restoreToVersion(5)

# Or restore to timestamp
orders_table.restoreToTimestamp("2023-06-01 00:00:00")

Schema Evolution

# Enable automatic schema evolution
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

# Or per-write schema merge
df_with_new_columns.write \
    .format("delta") \
    .mode("append") \
    .option("mergeSchema", "true") \
    .saveAsTable("orders")

# Add column manually
spark.sql("ALTER TABLE orders ADD COLUMN discount DECIMAL(5,2)")

# Widen a column type (e.g., INT to BIGINT) - requires the Delta type widening
# table feature; otherwise rewrite the table with .option("overwriteSchema", "true")
spark.sql("ALTER TABLE orders ALTER COLUMN quantity TYPE BIGINT")

Change Data Capture (CDC)

# Enable Change Data Feed
spark.sql("""
    ALTER TABLE orders
    SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")

# Read changes since version
changes_df = spark.read \
    .format("delta") \
    .option("readChangeFeed", "true") \
    .option("startingVersion", 10) \
    .table("orders")

# Changes include _change_type column:
# - insert
# - update_preimage (before update)
# - update_postimage (after update)
# - delete

# Process only inserts and updates
from pyspark.sql.functions import col

new_and_updated = changes_df.filter(
    col("_change_type").isin(["insert", "update_postimage"])
)

display(new_and_updated)

Optimization

OPTIMIZE and Z-ORDER

# Compact small files
spark.sql("OPTIMIZE orders")

# Z-order for query optimization
# Place related data together for faster filtering
spark.sql("OPTIMIZE orders ZORDER BY (customer_id, product_id)")

# Optimize specific partitions
spark.sql("OPTIMIZE orders WHERE order_date >= '2023-06-01'")

VACUUM

# Remove old files (default 7 days retention)
spark.sql("VACUUM orders")

# Remove files older than 24 hours (use with caution)
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")
spark.sql("VACUUM orders RETAIN 24 HOURS")

Statistics

# Compute statistics for query optimization
spark.sql("ANALYZE TABLE orders COMPUTE STATISTICS FOR ALL COLUMNS")

# View table details
spark.sql("DESCRIBE DETAIL orders").show(truncate=False)

Streaming with Delta

# Read stream from Delta table
stream_df = spark.readStream \
    .format("delta") \
    .table("orders")

# Process stream
from pyspark.sql.functions import col

processed_stream = stream_df \
    .withColumn("total_amount", col("quantity") * col("unit_price"))

# Write stream to another Delta table
query = processed_stream.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "Files/checkpoints/orders_processed") \
    .toTable("orders_processed")

# Using Change Data Feed for streaming
cdf_stream = spark.readStream \
    .format("delta") \
    .option("readChangeFeed", "true") \
    .option("startingVersion", 0) \
    .table("orders")

Best Practices

best_practices = {
    "partitioning": [
        "Partition by date for time-series data",
        "Avoid too many partitions (< 10,000)",
        "Partition column should be low cardinality"
    ],
    "optimization": [
        "Run OPTIMIZE regularly (daily for active tables)",
        "Use Z-ORDER for common filter columns",
        "Set appropriate retention for VACUUM"
    ],
    "schema": [
        "Define NOT NULL constraints where applicable",
        "Use appropriate data types",
        "Document schema changes"
    ],
    "operations": [
        "Use MERGE for upserts instead of delete+insert",
        "Enable Change Data Feed for CDC scenarios",
        "Use time travel for debugging and auditing"
    ]
}
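
To put the optimization practices above on a schedule, a small maintenance loop over your active tables is usually enough. A minimal sketch; the table list and retention window are assumptions to adapt to your workload:

# Hypothetical nightly maintenance routine for a set of active Delta tables
active_tables = ["orders", "orders_processed"]  # assumed table names
retention_hours = 168  # 7 days, matching the default retention window

for table_name in active_tables:
    # Compact small files into larger ones
    spark.sql(f"OPTIMIZE {table_name}")
    # Remove data files no longer referenced by the transaction log
    spark.sql(f"VACUUM {table_name} RETAIN {retention_hours} HOURS")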

Delta Lake provides the reliability foundation for Fabric Lakehouses. Tomorrow, I will cover Shortcuts in OneLake.

Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.