Data Quality with Delta Live Tables Expectations
Delta Live Tables expectations provide declarative data quality enforcement. They validate data as it flows through your pipeline, so that only records satisfying your quality rules reach downstream consumers.
Understanding Expectations
Expectations are data quality rules expressed as SQL boolean expressions:
import dlt
from pyspark.sql import functions as F   # used in the examples below
from pyspark.sql.functions import col    # used in the examples below

# Basic expectation syntax
@dlt.table
@dlt.expect("constraint_name", "sql_boolean_expression")
def my_table():
    return source_data  # placeholder for your source DataFrame
The expression must evaluate to true for valid records.
Types of Expectations
Expect (Warn)
Log violations but keep all records:
@dlt.table
@dlt.expect("valid_email", "email RLIKE '^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}$'")
@dlt.expect("valid_age", "age BETWEEN 0 AND 150")
def customers_all():
    return dlt.read_stream("raw_customers")
# Use when:
# - You want visibility into data issues
# - Downstream processes can handle invalid data
# - You're in development/discovery phase
Expect or Drop
Remove records that fail validation:
@dlt.table
@dlt.expect_or_drop("positive_amount", "amount > 0")
@dlt.expect_or_drop("valid_date", "order_date IS NOT NULL")
@dlt.expect_or_drop("known_customer", "customer_id IS NOT NULL")
def orders_clean():
    return dlt.read_stream("raw_orders")
# Use when:
# - Invalid records should not propagate
# - You have a quarantine process for dropped records
# - Data quality is critical for downstream analytics
Expect or Fail
Stop the pipeline on any violation:
@dlt.table
@dlt.expect_or_fail("schema_valid", "id IS NOT NULL AND timestamp IS NOT NULL")
@dlt.expect_or_fail("no_duplicates", "row_count = 1") # With appropriate windowing
def critical_events():
return dlt.read_stream("raw_events")
# Use when:
# - Data issues require immediate human intervention
# - Regulatory requirements demand data integrity
# - Downstream systems cannot handle any invalid data
Combining Multiple Expectations
# Multiple individual expectations
@dlt.table
@dlt.expect("rule1", "condition1")
@dlt.expect("rule2", "condition2")
@dlt.expect_or_drop("rule3", "condition3")
def combined_rules():
    return source
# Expect all (every rule in the set uses the same enforcement behavior)
@dlt.table
@dlt.expect_all({
"valid_id": "id IS NOT NULL",
"valid_amount": "amount > 0",
"valid_date": "date <= current_date()"
})
def with_expect_all():
return source
# Expect all or drop
@dlt.table
@dlt.expect_all_or_drop({
"required_fields": "customer_id IS NOT NULL AND product_id IS NOT NULL",
"valid_quantity": "quantity > 0 AND quantity < 10000",
"valid_price": "unit_price > 0"
})
def orders_strict():
return dlt.read_stream("raw_orders")
# Expect all or fail
@dlt.table
@dlt.expect_all_or_fail({
"pk_not_null": "id IS NOT NULL",
"timestamp_valid": "event_time IS NOT NULL"
})
def critical_table():
return source
Complex Validation Rules
Referential Integrity
# Create a reference table
@dlt.table
def valid_products():
    return spark.table("production.master.products").select("product_id")

# Validate the foreign key with a stream-static left join
@dlt.table
def orders_with_valid_products():
    # Mark reference rows so the validity flag reflects the lookup result,
    # not the order's own (never-null) join key
    products = dlt.read("valid_products").withColumn("product_exists", F.lit(True))
    orders = dlt.read_stream("raw_orders")
    return (
        orders
        .join(products, "product_id", "left")
        .withColumn("has_valid_product", col("product_exists").isNotNull())
        .drop("product_exists")
    )

@dlt.table
@dlt.expect_or_drop("valid_product_ref", "has_valid_product")
def orders_validated():
    return dlt.read_stream("orders_with_valid_products")
Statistical Validations
@dlt.table
def orders_with_stats():
    # Batch reads here: joining a streaming aggregation back onto the same
    # stream is not supported, so this table is recomputed on each update
    orders = dlt.read("raw_orders")
    # Calculate per-product statistics
    stats = (
        orders
        .groupBy("product_id")
        .agg(
            F.avg("amount").alias("avg_amount"),
            F.stddev("amount").alias("stddev_amount")
        )
    )
    return (
        orders
        .join(stats, "product_id")
        .withColumn(
            "z_score",
            (col("amount") - col("avg_amount")) / col("stddev_amount")
        )
    )

@dlt.table
@dlt.expect_or_drop("not_outlier", "abs(z_score) < 3")
def orders_no_outliers():
    return dlt.read("orders_with_stats")
Cross-Field Validations
@dlt.table
@dlt.expect_or_drop("valid_date_range", "end_date >= start_date")
@dlt.expect_or_drop("consistent_totals", "abs(quantity * unit_price - total_amount) < 0.01")
@dlt.expect_or_drop("valid_discount", "discount_percent BETWEEN 0 AND 100")
def invoices_validated():
    return dlt.read_stream("raw_invoices")
Quarantine Pattern
Capture dropped records for analysis:
# Source data
@dlt.view
def raw_orders_view():
    return spark.readStream.table("bronze.raw_orders")
# Add validation flags
@dlt.view
def orders_with_validation():
    return (
        dlt.read_stream("raw_orders_view")
        .withColumn("is_valid_amount", col("amount") > 0)
        .withColumn("is_valid_customer", col("customer_id").isNotNull())
        .withColumn("is_valid_date", col("order_date").isNotNull())
        .withColumn(
            "is_valid",
            col("is_valid_amount") &
            col("is_valid_customer") &
            col("is_valid_date")
        )
    )
# Valid records
@dlt.table(
comment="Valid orders that passed all quality checks"
)
@dlt.expect_or_drop("all_valid", "is_valid")
def silver_orders():
return (
dlt.read_stream("orders_with_validation")
.drop("is_valid_amount", "is_valid_customer", "is_valid_date", "is_valid")
)
# Quarantine invalid records
@dlt.table(
comment="Orders that failed quality checks - for investigation"
)
def quarantine_orders():
return (
dlt.read_stream("orders_with_validation")
.filter("NOT is_valid")
.withColumn("quarantine_reason",
F.concat_ws(", ",
F.when(~col("is_valid_amount"), F.lit("invalid_amount")),
F.when(~col("is_valid_customer"), F.lit("missing_customer")),
F.when(~col("is_valid_date"), F.lit("missing_date"))
)
)
.withColumn("quarantine_timestamp", F.current_timestamp())
)
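Once the pipeline has run, the quarantine table can be queried like any other table. A minimal sketch for investigating failure reasons (assuming the pipeline publishes to a schema named silver; substitute your own catalog and schema):
from pyspark.sql import functions as F

# Summarize why records were quarantined
# "silver.quarantine_orders" is a placeholder for your pipeline's target table
quarantine_summary = (
    spark.table("silver.quarantine_orders")
    .groupBy("quarantine_reason")
    .count()
    .orderBy(F.desc("count"))
)
display(quarantine_summary)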
Monitoring Data Quality
Query Quality Metrics
# DLT automatically logs quality metrics to system tables
quality_metrics = spark.sql("""
SELECT
date(timestamp) as date,
flow_name,
expectation_name,
passed_records,
failed_records,
failed_records / (passed_records + failed_records) as failure_rate
FROM system.live_tables.data_quality_log
WHERE pipeline_id = 'your-pipeline-id'
ORDER BY timestamp DESC
""")
display(quality_metrics)
Quality Dashboard
# Create quality summary view
@dlt.table
def data_quality_summary():
return spark.sql("""
SELECT
flow_name as table_name,
expectation_name as quality_rule,
SUM(passed_records) as total_passed,
SUM(failed_records) as total_failed,
SUM(failed_records) / SUM(passed_records + failed_records) as overall_failure_rate,
MAX(timestamp) as last_updated
FROM system.live_tables.data_quality_log
WHERE pipeline_id = 'your-pipeline-id'
GROUP BY flow_name, expectation_name
""")
Alerting on Quality Issues
def check_quality_thresholds():
    """Alert when quality drops below threshold"""
    thresholds = {
        "silver_orders": 0.01,     # Max 1% failure rate
        "silver_customers": 0.005  # Max 0.5% failure rate
    }
    alerts = []
    for table, threshold in thresholds.items():
        row = spark.sql(f"""
            SELECT
                failed_records / (passed_records + failed_records) as rate
            FROM system.live_tables.data_quality_log
            WHERE flow_name = '{table}'
              AND timestamp > current_timestamp() - INTERVAL 1 HOUR
            ORDER BY timestamp DESC
            LIMIT 1
        """).first()
        if row is None or row["rate"] is None:
            continue  # no metrics recorded in the last hour
        if row["rate"] > threshold:
            alerts.append({
                "table": table,
                "failure_rate": row["rate"],
                "threshold": threshold
            })
    if alerts:
        send_alert(alerts)
    return alerts
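send_alert is referenced but not defined above. A minimal sketch using only the Python standard library and a hypothetical webhook endpoint:
import json
import urllib.request

# Hypothetical webhook endpoint - replace with your Slack/Teams/incident URL
ALERT_WEBHOOK_URL = "https://example.com/hooks/data-quality"

def send_alert(alerts):
    """POST quality alerts as JSON to a webhook."""
    payload = json.dumps({"alerts": alerts}).encode("utf-8")
    request = urllib.request.Request(
        ALERT_WEBHOOK_URL,
        data=payload,
        headers={"Content-Type": "application/json"},
    )
    with urllib.request.urlopen(request) as response:
        return response.status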
Advanced Patterns
Dynamic Expectations
# Load rules from configuration
import functools

def get_quality_rules(table_name):
    return spark.table("config.quality_rules").filter(
        f"table_name = '{table_name}'"
    ).collect()

# Apply rules inside the table function.
# Note: the rules are loaded when the pipeline graph is built, and each one
# becomes a boolean validation column on the streaming DataFrame.
@dlt.table
def dynamically_validated():
    df = dlt.read_stream("source")
    rules = get_quality_rules("orders")
    for rule in rules:
        df = df.withColumn(
            f"valid_{rule['rule_name']}",
            F.expr(rule['condition'])
        )
    # Combine the per-rule flags into a single validity column
    validation_cols = [c for c in df.columns if c.startswith("valid_")]
    all_valid = functools.reduce(
        lambda a, b: a & b,
        [col(c) for c in validation_cols],
        F.lit(True)
    )
    return df.withColumn("all_valid", all_valid).filter("all_valid")
Versioned Quality Rules
# Track rule changes over time
@dlt.table
def quality_rule_history():
return spark.sql("""
SELECT
rule_id,
rule_name,
condition,
effective_from,
effective_to,
version
FROM config.quality_rules_history
WHERE current_date() BETWEEN effective_from AND COALESCE(effective_to, '9999-12-31')
""")
Best Practices
- Start with warnings: Use @dlt.expect during development to understand data quality issues
- Graduate to drops: Move to @dlt.expect_or_drop once rules are validated
- Fail selectively: Use @dlt.expect_or_fail only for critical business rules
- Always quarantine: Capture dropped records for root cause analysis
- Monitor continuously: Set up alerts for quality degradation
- Document rules: Use descriptive constraint names that explain the business logic (see the sketch after this list)
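A brief sketch of self-documenting expectations, with hypothetical table and column names: the constraint names state the business rule, and the table comment describes the table's contract.
import dlt

@dlt.table(
    comment="Shipped orders; rows violating total or lead-time rules are dropped"
)
@dlt.expect_or_drop("order_total_matches_line_items", "abs(order_total - line_item_total) < 0.01")
@dlt.expect_or_drop("ship_date_not_before_order_date", "ship_date >= order_date")
def silver_shipped_orders():
    return dlt.read_stream("raw_shipped_orders")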
Conclusion
DLT expectations transform data quality from an afterthought to a first-class concern:
- Declarative syntax makes rules easy to understand
- Automatic enforcement prevents bad data propagation
- Built-in metrics enable quality monitoring
- Multiple enforcement levels provide flexibility
By embedding quality rules directly in your pipeline definitions, you ensure data quality is maintained consistently across all processing runs.