5 min read
Change Data Capture in Microsoft Fabric
Change Data Capture (CDC) is fundamental for incremental data loading. Microsoft Fabric supports CDC through multiple mechanisms, enabling efficient data synchronization without full table scans.
CDC Fundamentals
CDC captures row-level changes (inserts, updates, deletes) from source systems:
Source Database
│
├── INSERT → CDC Log → Fabric
├── UPDATE → CDC Log → Fabric
└── DELETE → CDC Log → Fabric
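In SQL Server-based sources, each captured change row carries an __$operation code that the examples later in this post rely on. For reference, a small mapping (the codes themselves are standard SQL Server CDC values):

# SQL Server CDC __$operation codes, used throughout the examples below
CDC_OPERATIONS = {
    1: "DELETE",
    2: "INSERT",
    3: "UPDATE (before image)",
    4: "UPDATE (after image)",
}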
CDC Options in Fabric
1. Database Mirroring (Built-in CDC)
The simplest option for supported sources:
# Mirroring handles CDC automatically
# Just enable change tracking on source
# Azure SQL
"""
ALTER DATABASE [YourDB] SET CHANGE_TRACKING = ON;
ALTER TABLE [dbo].[Orders] ENABLE CHANGE_TRACKING;
"""
# The mirroring service polls for changes and applies them
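Once mirroring is running, the replicated tables are queryable through the mirrored database's SQL analytics endpoint, or from Spark via a lakehouse shortcut. A minimal sketch, assuming a shortcut named mirrored_salesdb and illustrative column names:

# Sketch: querying a mirrored table from a Fabric notebook.
# "mirrored_salesdb" and the column names are hypothetical.
recent_orders = spark.sql("""
    SELECT OrderID, Status, ModifiedDate
    FROM mirrored_salesdb.Orders
    WHERE ModifiedDate >= date_sub(current_date(), 7)
""")
recent_orders.show()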
2. Data Factory CDC Connector
For more control over the CDC process:
def create_cdc_copy_activity():
    """Create a CDC-aware copy activity definition for Data Factory."""
    activity = {
        "name": "IncrementalCopy",
        "type": "Copy",
        "inputs": [{
            "referenceName": "SqlChangeTrackingSource",
            "type": "DatasetReference"
        }],
        "outputs": [{
            "referenceName": "LakehouseDelta",
            "type": "DatasetReference"
        }],
        "typeProperties": {
            "source": {
                "type": "SqlServerSource",
                "sqlReaderQuery": """
                    DECLARE @from_lsn binary(10), @to_lsn binary(10);
                    SET @from_lsn = @LastLSN;
                    SET @to_lsn = sys.fn_cdc_get_max_lsn();

                    -- The change function already returns the captured columns,
                    -- so no join back to dbo.Orders is needed (a join would drop deletes,
                    -- because deleted rows no longer exist in the base table).
                    SELECT CT.*
                    FROM cdc.fn_cdc_get_all_changes_dbo_Orders(
                        @from_lsn, @to_lsn, 'all'
                    ) AS CT
                """
            },
            "sink": {
                "type": "DeltaLakeSink",
                "writeBehavior": "upsert",
                "updateCondition": "__$operation IN (2, 4)",  # 2 = insert, 4 = update (after image)
                "deleteCondition": "__$operation = 1"         # 1 = delete
            }
        }
    }
    return activity
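The @LastLSN placeholder has to come from somewhere. A common pattern is a Lookup activity that reads the last processed LSN from a watermark table and feeds it into the copy activity. A minimal sketch, with hypothetical activity, dataset, and table names:

def create_lsn_lookup_activity():
    """Sketch of a Lookup activity that reads the last processed LSN
    from a watermark table (activity, dataset, and table names are illustrative)."""
    return {
        "name": "LookupLastLSN",
        "type": "Lookup",
        "typeProperties": {
            "source": {
                "type": "SqlServerSource",
                "sqlReaderQuery": (
                    "SELECT MAX(last_lsn) AS LastLSN "
                    "FROM dbo.cdc_watermarks WHERE table_name = 'dbo.Orders'"
                )
            },
            "dataset": {
                "referenceName": "SqlChangeTrackingSource",
                "type": "DatasetReference"
            }
        }
    }

# The copy activity can then reference the lookup output, e.g.
# "@activity('LookupLastLSN').output.firstRow.LastLSN", in place of @LastLSN.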
3. Eventstream with CDC Sources
For streaming CDC:
class EventstreamCDC:
    def create_debezium_source(self):
        """Configure a Debezium CDC source for Eventstream."""
        # Debezium SQL Server connector configuration
        connector_config = {
            "name": "sql-server-connector",
            "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
            "database.hostname": "sqlserver.database.windows.net",
            "database.port": "1433",
            "database.user": "cdc_user",
            "database.password": "${secrets:sql-password}",
            "database.dbname": "SalesDB",
            "database.server.name": "salesdb",
            "table.include.list": "dbo.Orders,dbo.Customers",
            "database.history.kafka.bootstrap.servers": "eventhub.servicebus.windows.net:9093",
            "database.history.kafka.topic": "schema-changes"
        }
        return connector_config

    def process_cdc_events(self):
        """Eventstream processing for CDC events."""
        eventstream_config = {
            "name": "CDC-Orders-Stream",
            "source": {
                "type": "Kafka",  # Debezium publishes to a Kafka-compatible endpoint
                "topic": "salesdb.dbo.Orders"
            },
            "transformations": [
                {
                    "name": "parse_debezium",
                    "type": "ParseJson",
                    "extract_fields": ["before", "after", "op", "ts_ms"]
                },
                {
                    "name": "transform_changes",
                    "type": "Expression",
                    "expressions": [
                        "CASE op WHEN 'c' THEN 'INSERT' WHEN 'u' THEN 'UPDATE' WHEN 'd' THEN 'DELETE' END AS operation",
                        "COALESCE(after.OrderID, before.OrderID) AS OrderID",
                        "after.* AS current_values"
                    ]
                }
            ],
            "destinations": [
                {
                    "type": "Lakehouse",
                    "table": "orders_cdc_raw",
                    "mode": "append"
                }
            ]
        }
        return eventstream_config
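For reference, a Debezium change event roughly follows the envelope below; the parse_debezium step above extracts the before, after, op, and ts_ms fields. The values here are illustrative:

# Illustrative Debezium change-event envelope for an UPDATE on dbo.Orders
sample_event = {
    "before": {"OrderID": 1001, "Status": "PENDING"},
    "after":  {"OrderID": 1001, "Status": "SHIPPED"},
    "op": "u",               # c = create, u = update, d = delete, r = snapshot read
    "ts_ms": 1718000000000,  # time the connector processed the event
    "source": {"schema": "dbo", "table": "Orders", "lsn": "0000004d:000012f0:0003"}
}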
Implementing CDC Processing
Delta Lake Merge Pattern
from delta.tables import DeltaTable
from pyspark.sql.functions import *

class CDCProcessor:
    def __init__(self, spark, target_path: str):
        self.spark = spark
        self.target_path = target_path

    def apply_changes(self, changes_df, key_columns: list[str]):
        """Apply CDC changes to a Delta table."""
        # Check if the target table already exists
        if DeltaTable.isDeltaTable(self.spark, self.target_path):
            target = DeltaTable.forPath(self.spark, self.target_path)

            # Build the merge condition on the key columns
            merge_condition = " AND ".join(
                f"target.{c} = changes.{c}"
                for c in key_columns
            )

            # Apply changes based on operation type
            target.alias("target").merge(
                changes_df.alias("changes"),
                merge_condition
            ).whenMatchedUpdate(
                condition="changes.__$operation = 4",  # Update (after image)
                set=self._build_update_set(changes_df.columns, key_columns)
            ).whenMatchedDelete(
                condition="changes.__$operation = 1"   # Delete
            ).whenNotMatchedInsert(
                condition="changes.__$operation IN (2, 4)",  # Insert, or update for a row not yet in the target
                values=self._build_insert_values(changes_df.columns)
            ).execute()
        else:
            # Initial load
            initial_data = changes_df.filter(
                col("__$operation").isin([2, 4])  # Inserts and updates
            ).drop("__$operation", "__$start_lsn")
            initial_data.write.format("delta").save(self.target_path)

    def _build_update_set(self, columns: list, key_columns: list) -> dict:
        """Build the update set, excluding key and CDC metadata columns."""
        update_cols = [
            c for c in columns
            if c not in key_columns and not c.startswith("__$")
        ]
        return {c: f"changes.{c}" for c in update_cols}

    def _build_insert_values(self, columns: list) -> dict:
        """Build insert values, excluding CDC metadata columns."""
        insert_cols = [c for c in columns if not c.startswith("__$")]
        return {c: f"changes.{c}" for c in insert_cols}

# Usage
processor = CDCProcessor(spark, "Tables/orders")

# Read unprocessed CDC changes
changes = spark.read.format("delta").load("Tables/orders_cdc_raw") \
    .filter(col("processed") == False)

# Apply changes
processor.apply_changes(changes, key_columns=["OrderID"])
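One caveat: MERGE requires exactly one source row per key, so if a batch contains several changes for the same OrderID the merge will fail. A common refinement is to keep only the latest change per key before applying it. A sketch, assuming __$start_lsn orders changes within the batch:

from pyspark.sql import Window
from pyspark.sql.functions import col, row_number

# Keep only the most recent change per key so the MERGE sees one row per OrderID
latest_per_key = Window.partitionBy("OrderID").orderBy(col("__$start_lsn").desc())

deduped_changes = (
    changes
    .filter(col("__$operation") != 3)  # drop update "before" images
    .withColumn("rn", row_number().over(latest_per_key))
    .filter(col("rn") == 1)
    .drop("rn")
)

processor.apply_changes(deduped_changes, key_columns=["OrderID"])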
Handling Late-Arriving Data
def handle_late_arrivals(changes_df, watermark_column: str, max_delay_hours: int = 24):
    """Handle out-of-order CDC events."""
    # Add a processing timestamp
    changes_with_ts = changes_df.withColumn(
        "processing_time",
        current_timestamp()
    )

    # Filter out events that are too old
    valid_changes = changes_with_ts.filter(
        col(watermark_column) >= (
            current_timestamp() - expr(f"INTERVAL {max_delay_hours} HOURS")
        )
    )

    # Log dropped events instead of silently discarding them
    dropped = changes_with_ts.exceptAll(valid_changes)
    dropped_count = dropped.count()
    if dropped_count > 0:
        dropped.write.format("delta").mode("append").save("Tables/cdc_dropped_events")
        print(f"Dropped {dropped_count} late events")

    return valid_changes
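For example, assuming the raw change feed carries an event-time column named change_timestamp (the column name is illustrative):

# Keep only changes that arrived within the allowed lateness window
valid_changes = handle_late_arrivals(changes, watermark_column="change_timestamp", max_delay_hours=24)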
CDC State Management
from datetime import datetime

class CDCStateManager:
    """Track CDC processing state for incremental loads."""

    def __init__(self, state_path: str):
        self.state_path = state_path

    def get_last_lsn(self, table_name: str):
        """Get the last processed LSN for a table, or None if never processed."""
        try:
            state = spark.read.format("delta").load(self.state_path) \
                .filter(col("table_name") == table_name) \
                .orderBy(col("updated_at").desc()) \
                .first()
            return state["last_lsn"] if state else None
        except Exception:
            # State table does not exist yet
            return None

    def update_lsn(self, table_name: str, lsn: bytes, rows_processed: int):
        """Record the last processed LSN after a successful run."""
        from pyspark.sql import Row

        state_row = Row(
            table_name=table_name,
            last_lsn=lsn,
            rows_processed=rows_processed,
            updated_at=datetime.utcnow()
        )
        spark.createDataFrame([state_row]).write \
            .format("delta") \
            .mode("append") \
            .save(self.state_path)

    def get_processing_stats(self, table_name: str, days: int = 7) -> dict:
        """Get CDC processing statistics for the last `days` days."""
        stats = spark.read.format("delta").load(self.state_path) \
            .filter(col("table_name") == table_name) \
            .filter(col("updated_at") >= date_sub(current_date(), days)) \
            .agg(
                sum("rows_processed").alias("total_rows"),
                count("*").alias("sync_count"),
                min("updated_at").alias("first_sync"),
                max("updated_at").alias("last_sync")
            ).first()
        return stats.asDict() if stats else {}
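Putting the pieces together, a typical incremental run reads the last LSN, applies only newer changes, and records the new position. A sketch, assuming the raw table stores __$start_lsn as a comparable binary value and reusing the processor from earlier:

from pyspark.sql.functions import col, max as spark_max

state = CDCStateManager("Tables/cdc_state")
last_lsn = state.get_last_lsn("dbo.Orders")

changes = spark.read.format("delta").load("Tables/orders_cdc_raw")
if last_lsn is not None:
    # Only changes newer than the last recorded position
    changes = changes.filter(col("__$start_lsn") > last_lsn)

row_count = changes.count()
if row_count > 0:
    processor.apply_changes(changes, key_columns=["OrderID"])
    new_lsn = changes.agg(spark_max("__$start_lsn").alias("lsn")).first()["lsn"]
    state.update_lsn("dbo.Orders", new_lsn, rows_processed=row_count)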
Best Practices
- Enable CDC early: Easier to set up before data volume grows
- Monitor LSN/watermarks: Track your position in the change stream
- Handle deletes carefully: Soft deletes often preferred for analytics
- Test recovery scenarios: Know how to replay from a specific point
- Consider retention: CDC change tables can grow large; see the cleanup sketch below this list
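On SQL Server sources, how long change data is kept is governed by the CDC cleanup job. A sketch of adjusting it with the standard sys.sp_cdc_change_job procedure (retention is expressed in minutes; 4320 is roughly three days):

# Adjust how long CDC change data is kept on the source
"""
EXEC sys.sp_cdc_change_job
    @job_type = N'cleanup',
    @retention = 4320;  -- ~3 days, in minutes
"""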
Conclusion
CDC in Fabric enables efficient incremental data processing. Whether through built-in mirroring, Data Factory pipelines, Eventstream, or custom Spark merge logic, understanding CDC patterns is essential for modern data platforms.
Choose the approach that matches your latency requirements and operational capabilities.