Back to Blog
4 min read

Mastering Delta Lake MERGE Operations for Upserts and SCD

Introduction

Delta Lake MERGE operations provide a powerful way to handle upserts (update + insert), slowly changing dimensions (SCD), and change data capture (CDC) scenarios. Unlike traditional data lakes, Delta Lake supports ACID transactions, making it possible to perform complex data modifications reliably.

In this post, we will explore various MERGE patterns that are essential for modern data engineering.

Basic MERGE Syntax

The MERGE statement combines INSERT, UPDATE, and DELETE operations in a single atomic transaction:

from delta.tables import DeltaTable
from pyspark.sql.functions import *

# Load the target Delta table
target_table = DeltaTable.forPath(spark, "/delta/customers")

# Source data with updates
source_df = spark.read.parquet("/staging/customer_updates")

# Perform MERGE operation
target_table.alias("target").merge(
    source_df.alias("source"),
    "target.customer_id = source.customer_id"
).whenMatchedUpdate(set={
    "name": "source.name",
    "email": "source.email",
    "updated_at": "current_timestamp()"
}).whenNotMatchedInsert(values={
    "customer_id": "source.customer_id",
    "name": "source.name",
    "email": "source.email",
    "created_at": "current_timestamp()",
    "updated_at": "current_timestamp()"
}).execute()

Conditional MERGE with Multiple Conditions

Handle complex business logic with conditional updates:

from delta.tables import DeltaTable
from pyspark.sql.functions import col, current_timestamp, lit

target_table = DeltaTable.forPath(spark, "/delta/products")

target_table.alias("t").merge(
    source_df.alias("s"),
    "t.product_id = s.product_id"
).whenMatchedUpdate(
    condition="s.price > t.price",
    set={
        "price": "s.price",
        "price_increased_at": "current_timestamp()",
        "price_change_count": "t.price_change_count + 1"
    }
).whenMatchedUpdate(
    condition="s.price < t.price",
    set={
        "price": "s.price",
        "price_decreased_at": "current_timestamp()",
        "discount_flag": "true"
    }
).whenMatchedDelete(
    condition="s.is_discontinued = true"
).whenNotMatchedInsert(
    condition="s.is_active = true",
    values={
        "product_id": "s.product_id",
        "name": "s.name",
        "price": "s.price",
        "created_at": "current_timestamp()"
    }
).execute()

Implementing SCD Type 2

Slowly Changing Dimension Type 2 maintains history by creating new records:

from pyspark.sql.functions import *
from delta.tables import DeltaTable

# Define the SCD Type 2 merge logic
def merge_scd2(target_path, source_df, key_columns, tracked_columns):
    target_table = DeltaTable.forPath(spark, target_path)

    # Prepare source with hash for change detection
    source_with_hash = source_df.withColumn(
        "row_hash",
        sha2(concat_ws("||", *[col(c) for c in tracked_columns]), 256)
    )

    # Create staging view for new records
    staged_updates = source_with_hash.alias("s").join(
        target_table.toDF().alias("t"),
        [col(f"s.{k}") == col(f"t.{k}") for k in key_columns],
        "left"
    ).filter(
        (col("t.is_current") == True) &
        (col("t.row_hash") != col("s.row_hash"))
    ).select("s.*")

    # Close existing records
    target_table.alias("t").merge(
        staged_updates.alias("u"),
        " AND ".join([f"t.{k} = u.{k}" for k in key_columns]) +
        " AND t.is_current = true"
    ).whenMatchedUpdate(set={
        "is_current": "false",
        "end_date": "current_timestamp()"
    }).execute()

    # Insert new current records
    new_records = staged_updates.withColumn("is_current", lit(True)) \
        .withColumn("start_date", current_timestamp()) \
        .withColumn("end_date", lit(None).cast("timestamp"))

    new_records.write.format("delta").mode("append").save(target_path)

# Usage
merge_scd2(
    target_path="/delta/dim_customer",
    source_df=customer_updates,
    key_columns=["customer_id"],
    tracked_columns=["name", "email", "address", "phone"]
)

Change Data Capture (CDC) Processing

Process CDC events from sources like Debezium or Azure Data Factory:

from pyspark.sql.functions import *
from delta.tables import DeltaTable

def process_cdc_events(target_path, cdc_df):
    """
    Process CDC events with operation types:
    'I' = Insert, 'U' = Update, 'D' = Delete
    """
    target_table = DeltaTable.forPath(spark, target_path)

    # Get the latest event per key
    windowed = cdc_df.withColumn(
        "row_num",
        row_number().over(
            Window.partitionBy("key").orderBy(col("timestamp").desc())
        )
    ).filter(col("row_num") == 1)

    # Separate by operation type
    inserts = windowed.filter(col("operation") == "I")
    updates = windowed.filter(col("operation") == "U")
    deletes = windowed.filter(col("operation") == "D")

    # Merge updates and inserts
    upserts = inserts.union(updates)

    target_table.alias("t").merge(
        upserts.alias("s"),
        "t.id = s.key"
    ).whenMatchedUpdate(set={
        "value": "s.value",
        "updated_at": "s.timestamp"
    }).whenNotMatchedInsert(values={
        "id": "s.key",
        "value": "s.value",
        "created_at": "s.timestamp",
        "updated_at": "s.timestamp"
    }).execute()

    # Handle deletes separately
    target_table.alias("t").merge(
        deletes.alias("d"),
        "t.id = d.key"
    ).whenMatchedDelete().execute()

# Read CDC events from Event Hubs
cdc_events = spark.readStream \
    .format("eventhubs") \
    .options(**eventhub_conf) \
    .load() \
    .select(from_json(col("body").cast("string"), cdc_schema).alias("data")) \
    .select("data.*")

# Process in micro-batches
cdc_events.writeStream \
    .foreachBatch(lambda df, epoch: process_cdc_events("/delta/target", df)) \
    .outputMode("update") \
    .start()

Performance Optimization

Optimize MERGE performance with these techniques:

# 1. Partition pruning - filter source to relevant partitions
source_df = source_df.filter(col("date") >= "2021-07-01")

# 2. Z-Order for faster lookups
spark.sql("""
    OPTIMIZE delta.`/delta/customers`
    ZORDER BY (customer_id)
""")

# 3. Use broadcast for small source tables
from pyspark.sql.functions import broadcast

target_table.alias("t").merge(
    broadcast(small_source_df).alias("s"),
    "t.id = s.id"
).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()

# 4. Enable optimized writes
spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", "true")
spark.conf.set("spark.databricks.delta.autoCompact.enabled", "true")

SQL-Based MERGE

You can also use SQL syntax for MERGE operations:

MERGE INTO delta.`/delta/inventory` AS target
USING staging_inventory AS source
ON target.sku = source.sku AND target.warehouse_id = source.warehouse_id
WHEN MATCHED AND source.quantity = 0 THEN DELETE
WHEN MATCHED THEN UPDATE SET
    target.quantity = source.quantity,
    target.last_updated = current_timestamp()
WHEN NOT MATCHED THEN INSERT (
    sku, warehouse_id, quantity, last_updated
) VALUES (
    source.sku, source.warehouse_id, source.quantity, current_timestamp()
)

Conclusion

Delta Lake MERGE operations are fundamental for building reliable data pipelines. Whether you are implementing simple upserts, complex SCD Type 2 patterns, or processing CDC events, the MERGE command provides a transactional and performant solution.

Remember to optimize your MERGE operations by leveraging partitioning, Z-Ordering, and broadcast joins for smaller source datasets. With these patterns, you can build robust data lakes that handle real-world data update scenarios effectively.

Michael John Peña

Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.