Mastering Delta Lake MERGE Operations for Upserts and SCD

This post (“2021-07-02-delta-lake-merge-operations”) shares practical, production-minded guidance on using Delta Lake MERGE for upserts, slowly changing dimensions, and CDC processing.

Basic MERGE Syntax

The MERGE statement combines INSERT, UPDATE, and DELETE operations in a single atomic transaction:

from delta.tables import DeltaTable
from pyspark.sql.functions import *

# Load the target Delta table
target_table = DeltaTable.forPath(spark, "/delta/customers")

# Source data with updates
source_df = spark.read.parquet("/staging/customer_updates")

# Perform MERGE operation
target_table.alias("target").merge(
    source_df.alias("source"),
    "target.customer_id = source.customer_id"
).whenMatchedUpdate(set={
    "name": "source.name",
    "email": "source.email",
    "updated_at": "current_timestamp()"
}).whenNotMatchedInsert(values={
    "customer_id": "source.customer_id",
    "name": "source.name",
    "email": "source.email",
    "created_at": "current_timestamp()",
    "updated_at": "current_timestamp()"
}).execute()
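
As the column list grows, hand-writing the `set` and `values` dictionaries becomes error-prone. The following is a minimal sketch of two helpers that generate them from a list of column names. The helper names (`build_update_set`, `build_insert_values`) are hypothetical and not part of the Delta Lake API; the `created_at` and `updated_at` audit columns are taken from the example above.

```python
from typing import Dict, Iterable


def build_update_set(columns: Iterable[str], source_alias: str = "source") -> Dict[str, str]:
    """Map each column to its source expression and stamp updated_at."""
    mapping = {c: f"{source_alias}.{c}" for c in columns}
    mapping["updated_at"] = "current_timestamp()"
    return mapping


def build_insert_values(key: str, columns: Iterable[str], source_alias: str = "source") -> Dict[str, str]:
    """Same mapping as the update, plus the key column and created_at."""
    mapping = {key: f"{source_alias}.{key}"}
    mapping.update(build_update_set(columns, source_alias))
    mapping["created_at"] = "current_timestamp()"
    return mapping


update_set = build_update_set(["name", "email"])
insert_values = build_insert_values("customer_id", ["name", "email"])
print(update_set)
print(insert_values)
```

The resulting dictionaries can be passed straight to `whenMatchedUpdate(set=update_set)` and `whenNotMatchedInsert(values=insert_values)`.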

Conditional MERGE with Multiple Conditions

Handle complex business logic with conditional updates:

from delta.tables import DeltaTable
from pyspark.sql.functions import col, current_timestamp, lit

target_table = DeltaTable.forPath(spark, "/delta/products")

target_table.alias("t").merge(
    source_df.alias("s"),
    "t.product_id = s.product_id"
).whenMatchedDelete(
    # Matched clauses are evaluated in order, so the delete comes first;
    # otherwise a discontinued product with a price change would be updated instead.
    condition="s.is_discontinued = true"
).whenMatchedUpdate(
    condition="s.price > t.price",
    set={
        "price": "s.price",
        "price_increased_at": "current_timestamp()",
        "price_change_count": "t.price_change_count + 1"
    }
).whenMatchedUpdate(
    condition="s.price < t.price",
    set={
        "price": "s.price",
        "price_decreased_at": "current_timestamp()",
        "discount_flag": "true"
    }
).whenNotMatchedInsert(
    condition="s.is_active = true",
    values={
        "product_id": "s.product_id",
        "name": "s.name",
        "price": "s.price",
        "created_at": "current_timestamp()"
    }
).execute()
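
When a MERGE has several matched clauses, they are evaluated in the order they are declared, and only the first clause whose condition holds is applied to a given row. The plain-Python model below is not Delta code; it only mimics that first-match-wins rule to show why clause order matters. The function name and the row dictionaries are illustrative.

```python
from typing import Any, Callable, Dict, List, Optional, Tuple

Row = Dict[str, Any]
# A clause is (action name, optional predicate over (source row, target row)).
Clause = Tuple[str, Optional[Callable[[Row, Row], bool]]]


def first_matching_action(clauses: List[Clause], s: Row, t: Row) -> Optional[str]:
    """Return the action of the first clause whose condition holds, else None."""
    for action, predicate in clauses:
        if predicate is None or predicate(s, t):
            return action
    return None


price_up = ("update_price_up", lambda s, t: s["price"] > t["price"])
price_down = ("update_price_down", lambda s, t: s["price"] < t["price"])
discontinued = ("delete", lambda s, t: s["is_discontinued"])

# A discontinued product that also arrives with a higher price.
source_row = {"price": 12.0, "is_discontinued": True}
target_row = {"price": 10.0}

delete_last = first_matching_action([price_up, price_down, discontinued], source_row, target_row)
delete_first = first_matching_action([discontinued, price_up, price_down], source_row, target_row)
print(delete_last)   # update_price_up
print(delete_first)  # delete
```

If discontinuation should take precedence over a price change, the delete clause has to be declared before the update clauses.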

Implementing SCD Type 2

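Slowly Changing Dimension (SCD) Type 2 keeps history: when a tracked attribute changes, the current record is closed and a new version is inserted. The function below assumes the target table already has row_hash, is_current, start_date, and end_date columns: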

from pyspark.sql.functions import *
from delta.tables import DeltaTable

# Define the SCD Type 2 merge logic
def merge_scd2(target_path, source_df, key_columns, tracked_columns):
    target_table = DeltaTable.forPath(spark, target_path)

    # Prepare source with hash for change detection
    source_with_hash = source_df.withColumn(
        "row_hash",
        sha2(concat_ws("||", *[col(c) for c in tracked_columns]), 256)
    )

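    # Stage source rows that are new or changed relative to the current target rows.
    # Joining only against is_current rows keeps brand-new keys (no match) in the result.
    current_rows = target_table.toDF().filter(col("is_current") == True)
    staged_updates = source_with_hash.alias("s").join(
        current_rows.alias("t"),
        [col(f"s.{k}") == col(f"t.{k}") for k in key_columns],
        "left"
    ).filter(
        col("t.row_hash").isNull() |
        (col("t.row_hash") != col("s.row_hash"))
    ).select("s.*").persist()

    # Materialize before the merge: the merge changes the target, and a lazy
    # re-evaluation of this join would no longer see the closed rows as current.
    # (persist is best-effort; a staging table is the stricter option.)
    staged_updates.count()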

    # Close existing records
    target_table.alias("t").merge(
        staged_updates.alias("u"),
        " AND ".join([f"t.{k} = u.{k}" for k in key_columns]) +
        " AND t.is_current = true"
    ).whenMatchedUpdate(set={
        "is_current": "false",
        "end_date": "current_timestamp()"
    }).execute()

    # Insert new current records
    new_records = staged_updates.withColumn("is_current", lit(True)) \
        .withColumn("start_date", current_timestamp()) \
        .withColumn("end_date", lit(None).cast("timestamp"))

    new_records.write.format("delta").mode("append").save(target_path)

# Usage
merge_scd2(
    target_path="/delta/dim_customer",
    source_df=customer_updates,
    key_columns=["customer_id"],
    tracked_columns=["name", "email", "address", "phone"]
)
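
One caveat with hashing through `concat_ws`: Spark's `concat_ws` skips NULL inputs, so two rows that differ only in which column is NULL can produce the same hash and a real change goes undetected. The sketch below emulates that behaviour in plain Python with `hashlib` to show the collision and one possible mitigation, replacing NULLs with a sentinel token first. The function names and the `<NULL>` token are illustrative choices, not part of any library.

```python
import hashlib
from typing import Optional, Sequence


def concat_ws_like(sep: str, values: Sequence[Optional[str]]) -> str:
    """Emulate Spark's concat_ws: NULL (None) inputs are skipped."""
    return sep.join(v for v in values if v is not None)


def row_hash(values: Sequence[Optional[str]], null_token: Optional[str] = None) -> str:
    """SHA-256 over the joined values; optionally replace None with a sentinel first."""
    if null_token is not None:
        values = [null_token if v is None else v for v in values]
    return hashlib.sha256(concat_ws_like("||", values).encode("utf-8")).hexdigest()


# Columns: name, email, phone
row_a = ["Ann", None, "555-0100"]   # email is NULL
row_b = ["Ann", "555-0100", None]   # phone is NULL

naive_collides = row_hash(row_a) == row_hash(row_b)
safe_differs = row_hash(row_a, "<NULL>") != row_hash(row_b, "<NULL>")
print(naive_collides)  # True
print(safe_differs)    # True
```

In PySpark the same idea can be expressed by wrapping each tracked column, for example `coalesce(col(c).cast("string"), lit("<NULL>"))`, before passing it to `concat_ws`.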

Change Data Capture (CDC) Processing

Process CDC events from sources like Debezium or Azure Data Factory:

from pyspark.sql.functions import *
from pyspark.sql.window import Window  # needed for Window.partitionBy below
from delta.tables import DeltaTable

def process_cdc_events(target_path, cdc_df):
    """
    Process CDC events with operation types:
    'I' = Insert, 'U' = Update, 'D' = Delete
    """
    target_table = DeltaTable.forPath(spark, target_path)

    # Get the latest event per key
    windowed = cdc_df.withColumn(
        "row_num",
        row_number().over(
            Window.partitionBy("key").orderBy(col("timestamp").desc())
        )
    ).filter(col("row_num") == 1)

    # Separate by operation type
    inserts = windowed.filter(col("operation") == "I")
    updates = windowed.filter(col("operation") == "U")
    deletes = windowed.filter(col("operation") == "D")

    # Merge updates and inserts
    upserts = inserts.union(updates)

    target_table.alias("t").merge(
        upserts.alias("s"),
        "t.id = s.key"
    ).whenMatchedUpdate(set={
        "value": "s.value",
        "updated_at": "s.timestamp"
    }).whenNotMatchedInsert(values={
        "id": "s.key",
        "value": "s.value",
        "created_at": "s.timestamp",
        "updated_at": "s.timestamp"
    }).execute()

    # Handle deletes separately
    target_table.alias("t").merge(
        deletes.alias("d"),
        "t.id = d.key"
    ).whenMatchedDelete().execute()

# Read CDC events from Event Hubs
cdc_events = spark.readStream \
    .format("eventhubs") \
    .options(**eventhub_conf) \
    .load() \
    .select(from_json(col("body").cast("string"), cdc_schema).alias("data")) \
    .select("data.*")

# Process in micro-batches
cdc_events.writeStream \
    .foreachBatch(lambda df, epoch: process_cdc_events("/delta/target", df)) \
    .outputMode("update") \
    .start()
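
Keeping only the latest event per key matters because MERGE fails when more than one source row matches and tries to modify the same target row. To see the deduplicate-then-apply logic in isolation, here is a plain-Python model over dictionaries. It is a sketch rather than Spark code; the event fields (`key`, `operation`, `value`, `timestamp`) follow the example above and the function names are illustrative.

```python
from typing import Dict, Iterable, List, Optional


def latest_event_per_key(events: Iterable[dict]) -> List[dict]:
    """Keep the event with the highest timestamp for each key."""
    latest: Dict[str, dict] = {}
    for event in events:
        current = latest.get(event["key"])
        if current is None or event["timestamp"] > current["timestamp"]:
            latest[event["key"]] = event
    return list(latest.values())


def apply_events(table: Dict[str, Optional[str]], events: Iterable[dict]) -> Dict[str, Optional[str]]:
    """Apply deduplicated events: 'I' and 'U' upsert, 'D' deletes."""
    result = dict(table)
    for event in latest_event_per_key(events):
        if event["operation"] == "D":
            result.pop(event["key"], None)
        else:
            result[event["key"]] = event["value"]
    return result


events = [
    {"key": "a", "operation": "I", "value": "v1", "timestamp": 1},
    {"key": "a", "operation": "U", "value": "v2", "timestamp": 2},
    {"key": "b", "operation": "U", "value": "w1", "timestamp": 1},
    {"key": "b", "operation": "D", "value": None, "timestamp": 3},
    {"key": "c", "operation": "I", "value": "x1", "timestamp": 2},
]
result = apply_events({"b": "w0"}, events)
print(result)  # {'a': 'v2', 'c': 'x1'}
```

If two events for the same key can share a timestamp, ordering by timestamp alone is ambiguous; a sequence number or log offset from the CDC source, when available, makes a more reliable tie-breaker.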

Performance Optimization

Optimize MERGE performance with these techniques:

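# 1. Partition pruning - reduce the source to the partitions you need.
#    Filtering the source alone does not prune the target: also add the
#    partition predicate on the target to the merge condition.
source_df = source_df.filter(col("date") >= "2021-07-01")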

# 2. Z-Order for faster lookups
spark.sql("""
    OPTIMIZE delta.`/delta/customers`
    ZORDER BY (customer_id)
""")

# 3. Use a broadcast hint for small source tables
#    (whether MERGE honors the hint can depend on the Delta/Spark version)
from pyspark.sql.functions import broadcast

target_table.alias("t").merge(
    broadcast(small_source_df).alias("s"),
    "t.id = s.id"
).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()

# 4. Enable optimized writes and auto compaction
#    (availability depends on your Delta Lake / Databricks runtime)
spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", "true")
spark.conf.set("spark.databricks.delta.autoCompact.enabled", "true")
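
For partition pruning to help on the target side, the merge condition itself needs a predicate on the target's partition column. Below is a small sketch of a helper that prepends such a predicate to an existing condition string. It assumes the target is partitioned by a string-valued column such as `date`; the helper name is hypothetical, and values containing quotes or backslashes are rejected rather than escaped.

```python
from typing import Iterable


def with_partition_predicate(
    base_condition: str,
    partition_column: str,
    partition_values: Iterable[str],
    target_alias: str = "t",
) -> str:
    """Prepend 'alias.column IN (...)' so the merge only scans the listed partitions."""
    values = sorted(set(partition_values))
    if not values:
        return base_condition
    for value in values:
        # Keep the sketch simple: refuse values that would need SQL escaping.
        if "'" in value or "\\" in value:
            raise ValueError(f"unsupported partition value: {value!r}")
    quoted = ", ".join(f"'{value}'" for value in values)
    return f"{target_alias}.{partition_column} IN ({quoted}) AND {base_condition}"


condition = with_partition_predicate(
    "t.id = s.id", "date", ["2021-07-02", "2021-07-01", "2021-07-02"]
)
print(condition)
# t.date IN ('2021-07-01', '2021-07-02') AND t.id = s.id
```

The partition values would typically come from the source batch, for example by collecting the distinct values of its `date` column, and the resulting string is passed as the merge condition.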

SQL-Based MERGE

You can also use SQL syntax for MERGE operations:

MERGE INTO delta.`/delta/inventory` AS target
USING staging_inventory AS source
ON target.sku = source.sku AND target.warehouse_id = source.warehouse_id
WHEN MATCHED AND source.quantity = 0 THEN DELETE
WHEN MATCHED THEN UPDATE SET
    target.quantity = source.quantity,
    target.last_updated = current_timestamp()
WHEN NOT MATCHED THEN INSERT (
    sku, warehouse_id, quantity, last_updated
) VALUES (
    source.sku, source.warehouse_id, source.quantity, current_timestamp()
)

Conclusion

Delta Lake MERGE operations are fundamental for building reliable data pipelines. Whether you are implementing simple upserts, complex SCD Type 2 patterns, or processing CDC events, the MERGE command provides a transactional and performant solution.

Remember to optimize your MERGE operations by leveraging partitioning, Z-Ordering, and broadcast joins for smaller source datasets. With these patterns, you can build robust data lakes that handle real-world data update scenarios effectively.

Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.