Change Data Capture with Delta Live Tables
Change Data Capture (CDC) patterns in Delta Live Tables enable efficient processing of database changes. The APPLY CHANGES API handles the complexity of merging inserts, updates, and deletes into your lakehouse.
Understanding CDC in DLT
CDC feeds typically contain:
- Operation type (insert, update, delete)
- Timestamp of the change
- Complete or partial row data
DLT’s apply_changes function processes these into clean, current-state tables.
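For concreteness, a single CDC event in this shape might look like the following (a hypothetical record; the field names match the examples below):
# A hypothetical CDC event as it lands in cloud storage:
{
    "customer_id": 42,
    "name": "Ada Lovelace",
    "email": "ada@example.com",
    "operation": "UPDATE",
    "operation_timestamp": "2022-03-01T10:15:30.123Z"
}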
Basic CDC Implementation
import dlt
import pyspark.sql.functions as F
from pyspark.sql.functions import col, expr
# Ingest raw CDC data
@dlt.table(
comment="Raw CDC events from source database"
)
def raw_customers_cdc():
return (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", "/schema/customers_cdc")
.load("/data/cdc/customers/")
)
# Define target table (created by apply_changes)
dlt.create_streaming_table("customers")
# Apply changes
dlt.apply_changes(
target="customers",
source="raw_customers_cdc",
keys=["customer_id"],
sequence_by=col("operation_timestamp"),
apply_as_deletes=expr("operation = 'DELETE'"),
except_column_list=["operation", "operation_timestamp", "_rescued_data"]
)
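To make the merge semantics concrete, here is a hypothetical sequence of events for one key and the resulting state of customers (by default, apply_changes stores SCD Type 1, i.e. current state only):
# Hypothetical events for customer_id = 1, ordered by operation_timestamp:
#   {"customer_id": 1, "email": "a@example.com", "operation": "INSERT", "operation_timestamp": "2022-03-01T09:00:00Z"}
#   {"customer_id": 1, "email": "b@example.com", "operation": "UPDATE", "operation_timestamp": "2022-03-01T10:00:00Z"}
# Resulting `customers` row: customer_id = 1, email = "b@example.com".
# A subsequent DELETE event would remove the row entirely.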
CDC Schema Patterns
Operation in Separate Column
# CDC format: {customer_id, name, email, operation, op_timestamp}
dlt.apply_changes(
target="customers",
source="cdc_customers",
keys=["customer_id"],
sequence_by="op_timestamp",
apply_as_deletes=expr("operation = 'DELETE'"),
except_column_list=["operation", "op_timestamp"]
)
Debezium Format
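Debezium wraps each change in an envelope carrying before and after row images. A simplified update event (values hypothetical) looks roughly like:
# {"before": {"customer_id": 42, "name": "Ada", "email": "old@example.com"},
#  "after":  {"customer_id": 42, "name": "Ada", "email": "new@example.com"},
#  "op": "u",            # c = create, u = update, d = delete, r = snapshot read
#  "ts_ms": 1646130930123}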
# Debezium CDC format with before/after
@dlt.table
def debezium_customers_parsed():
return (
dlt.read_stream("raw_debezium_events")
        .select(
            # For delete events, Debezium leaves "after" null, so the key
            # must be taken from the "before" image
            F.coalesce(col("after.customer_id"), col("before.customer_id")).alias("customer_id"),
            col("after.name").alias("name"),
            col("after.email").alias("email"),
            col("op").alias("operation"),
            col("ts_ms").alias("operation_timestamp")
        )
)
dlt.create_streaming_table("customers")
dlt.apply_changes(
target="customers",
source="debezium_customers_parsed",
keys=["customer_id"],
sequence_by="operation_timestamp",
apply_as_deletes=expr("operation = 'd'"),
except_column_list=["operation", "operation_timestamp"]
)
AWS DMS Format
# AWS DMS format with Op column
@dlt.table
def dms_orders_parsed():
return (
dlt.read_stream("raw_dms_events")
.withColumn("operation_type",
F.when(col("Op") == "I", "INSERT")
.when(col("Op") == "U", "UPDATE")
.when(col("Op") == "D", "DELETE")
)
)
dlt.create_streaming_table("orders")
dlt.apply_changes(
target="orders",
source="dms_orders_parsed",
keys=["order_id"],
sequence_by="timestamp",
apply_as_deletes=expr("operation_type = 'DELETE'"),
except_column_list=["Op", "operation_type", "timestamp"]
)
SCD Type 2 (History Tracking)
# Create streaming table with SCD Type 2
dlt.create_streaming_table(
"customers_history",
table_properties={
"pipelines.reset.allowed": "false" # Preserve history on full refresh
}
)
# Apply changes with SCD Type 2
dlt.apply_changes(
target="customers_history",
source="raw_customers_cdc",
keys=["customer_id"],
sequence_by="operation_timestamp",
apply_as_deletes=expr("operation = 'DELETE'"),
except_column_list=["operation", "operation_timestamp"],
stored_as_scd_type=2 # Enable history tracking
)
# DLT automatically adds:
# - __START_AT: When this version became active
# - __END_AT: When this version was superseded (null for current)
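For intuition, a hypothetical history for one customer after an insert and a later update might look like this (timestamps illustrative):
# customer_id | email           | __START_AT          | __END_AT
# 42          | old@example.com | 2022-01-05 09:00:00 | 2022-02-10 14:30:00
# 42          | new@example.com | 2022-02-10 14:30:00 | null  (current version)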
Querying SCD Type 2
# Current records
@dlt.table
def customers_current():
return (
dlt.read("customers_history")
.filter("__END_AT IS NULL")
)
# Point-in-time query
@dlt.table
def customers_as_of_date():
as_of_date = "2022-03-01"
return (
dlt.read("customers_history")
.filter(f"__START_AT <= '{as_of_date}' AND (__END_AT > '{as_of_date}' OR __END_AT IS NULL)")
)
Handling Truncates
# Handle TRUNCATE operations (supported only for SCD Type 1 targets)
dlt.apply_changes(
target="products",
source="products_cdc",
keys=["product_id"],
sequence_by="change_timestamp",
apply_as_deletes=expr("operation = 'DELETE'"),
apply_as_truncates=expr("operation = 'TRUNCATE'"),
except_column_list=["operation", "change_timestamp"]
)
Composite Keys
# Multiple columns as primary key
dlt.apply_changes(
target="order_items",
source="order_items_cdc",
keys=["order_id", "line_number"], # Composite key
sequence_by="change_timestamp",
apply_as_deletes=expr("operation = 'DELETE'"),
except_column_list=["operation", "change_timestamp"]
)
CDC from Multiple Sources
# Merge CDC from multiple databases
@dlt.table
def unified_customers_cdc():
east = (
dlt.read_stream("cdc_customers_east")
.withColumn("source_region", F.lit("EAST"))
)
west = (
dlt.read_stream("cdc_customers_west")
.withColumn("source_region", F.lit("WEST"))
)
    # unionByName matches columns by name rather than position, which is
    # safer if the two CDC feeds ever differ in column order
    return east.unionByName(west)
dlt.create_streaming_table("customers_unified")
dlt.apply_changes(
target="customers_unified",
source="unified_customers_cdc",
keys=["customer_id", "source_region"], # Include region in key
sequence_by="operation_timestamp",
apply_as_deletes=expr("operation = 'DELETE'"),
except_column_list=["operation", "operation_timestamp"]
)
Error Handling in CDC
# Validate CDC data before applying
@dlt.table
@dlt.expect_or_drop("valid_operation", "operation IN ('INSERT', 'UPDATE', 'DELETE')")
@dlt.expect_or_drop("has_timestamp", "operation_timestamp IS NOT NULL")
@dlt.expect_or_drop("has_key", "customer_id IS NOT NULL")
def validated_customers_cdc():
return dlt.read_stream("raw_customers_cdc")
# Apply changes to validated data
dlt.create_streaming_table("customers")
dlt.apply_changes(
target="customers",
source="validated_customers_cdc",
keys=["customer_id"],
sequence_by="operation_timestamp",
apply_as_deletes=expr("operation = 'DELETE'"),
except_column_list=["operation", "operation_timestamp"]
)
Handling Late-Arriving Data
# CDC events may arrive out of order
# sequence_by ensures correct ordering
dlt.apply_changes(
target="orders",
source="orders_cdc",
keys=["order_id"],
sequence_by="change_sequence_number", # Use monotonic sequence
# If an older change arrives after a newer one, it's ignored
apply_as_deletes=expr("operation = 'DELETE'"),
except_column_list=["operation", "change_sequence_number"]
)
# For timestamp-based ordering, ensure timestamps are precise
dlt.apply_changes(
target="events",
source="events_cdc",
keys=["event_id"],
sequence_by="event_timestamp_microseconds", # High precision
...
)
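A concrete (hypothetical) out-of-order scenario shows why the sequencing column matters:
# Two events for order_id = 7 arrive in separate micro-batches, newest first:
#   batch 1: {"order_id": 7, "status": "SHIPPED", "change_sequence_number": 12}
#   batch 2: {"order_id": 7, "status": "PACKING", "change_sequence_number": 11}
# After both batches, the target row keeps status = "SHIPPED": the late event
# has a lower sequence number than the one already applied, so it is discarded.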
Monitoring CDC Processing
# Query the pipeline event log for flow progress metrics. The event log is a
# Delta table stored under the pipeline's storage location at
# <storage-location>/system/events; adjust the path for your pipeline
# (Unity Catalog pipelines can use the event_log() table-valued function instead)
@dlt.table
def cdc_processing_stats():
    return spark.sql("""
        SELECT
            date(timestamp) AS processing_date,
            origin.flow_name AS flow_name,
            count(*) AS update_count,
            sum(details:flow_progress.metrics.num_output_rows::bigint) AS total_rows_processed
        FROM delta.`/pipelines/<pipeline-id>/system/events`
        WHERE event_type = 'flow_progress'
        GROUP BY date(timestamp), origin.flow_name
    """)
Complete CDC Pipeline Example
import dlt
from pyspark.sql.functions import col, expr, current_timestamp
import pyspark.sql.functions as F
# Bronze: Raw CDC ingestion
@dlt.table(
comment="Raw CDC events from ERP system"
)
def bronze_erp_orders_cdc():
return (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", "/schema/erp_orders")
.load("/data/cdc/erp/orders/")
.withColumn("ingestion_timestamp", current_timestamp())
)
# Silver: Validated CDC
@dlt.table
@dlt.expect_or_drop("valid_order_id", "order_id IS NOT NULL")
@dlt.expect_or_drop("valid_timestamp", "change_timestamp IS NOT NULL")
@dlt.expect_or_drop("valid_operation", "operation IN ('I', 'U', 'D')")
def silver_orders_cdc():
return (
dlt.read_stream("bronze_erp_orders_cdc")
.withColumn("operation_type",
F.when(col("operation") == "I", "INSERT")
.when(col("operation") == "U", "UPDATE")
.when(col("operation") == "D", "DELETE")
)
)
# Silver: Current state table
dlt.create_streaming_table(
"silver_orders_current",
comment="Current state of orders from ERP"
)
dlt.apply_changes(
target="silver_orders_current",
source="silver_orders_cdc",
keys=["order_id"],
sequence_by="change_timestamp",
apply_as_deletes=expr("operation_type = 'DELETE'"),
except_column_list=["operation", "operation_type", "change_timestamp", "ingestion_timestamp"]
)
# Silver: History table (SCD Type 2)
dlt.create_streaming_table(
"silver_orders_history",
comment="Historical versions of orders"
)
dlt.apply_changes(
target="silver_orders_history",
source="silver_orders_cdc",
keys=["order_id"],
sequence_by="change_timestamp",
apply_as_deletes=expr("operation_type = 'DELETE'"),
stored_as_scd_type=2,
except_column_list=["operation", "operation_type", "change_timestamp", "ingestion_timestamp"]
)
# Gold: Aggregated view
@dlt.table
def gold_daily_order_stats():
return (
dlt.read("silver_orders_current")
.groupBy("order_date", "region")
.agg(
F.count("*").alias("order_count"),
F.sum("total_amount").alias("total_revenue"),
F.avg("total_amount").alias("avg_order_value")
)
)
Best Practices
- Use monotonic sequences: Prefer database sequence numbers over timestamps when available
- Validate before applying: Use expectations to catch corrupt CDC data
- Handle deletes explicitly: Always specify apply_as_deletes
- Consider SCD Type 2: Use for audit requirements or point-in-time queries
- Monitor lag: Track processing latency to ensure CDC is keeping up
Conclusion
Delta Live Tables transforms CDC processing from complex merge logic into simple declarative statements. The apply_changes API handles:
- Insert/update/delete merging
- Out-of-order event handling
- SCD Type 2 history tracking
- Efficient incremental processing
This enables reliable, scalable CDC pipelines with minimal code.