Delta Live Tables: Declarative Data Pipelines on Databricks
Delta Live Tables (DLT) is a declarative framework for building reliable data pipelines. Instead of writing procedural ETL code, you declare what your data should look like, and DLT handles the how.
The DLT Approach
Traditional ETL:
# You manage: scheduling, dependencies, retries, state, incremental logic
df = spark.read.parquet("/source")
df_transformed = df.filter(...).groupBy(...).agg(...)
df_transformed.write.mode("append").saveAsTable("target")
Delta Live Tables:
# You declare: what the output should be
@dlt.table
def target():
    return spark.read.parquet("/source").filter(...).groupBy(...).agg(...)
# DLT manages: everything else
Creating Your First Pipeline
Define Tables in a Notebook
import dlt
from pyspark.sql import functions as F
from pyspark.sql.functions import col, current_timestamp

# Bronze layer: Raw ingestion
@dlt.table(
    comment="Raw sales data from source system",
    table_properties={"quality": "bronze"}
)
def raw_sales():
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.schemaLocation", "/schema/raw_sales")
        .load("/data/incoming/sales/")
    )

# Silver layer: Cleaned and standardized
@dlt.table(
    comment="Cleaned sales data with valid records only",
    table_properties={"quality": "silver"}
)
@dlt.expect_or_drop("valid_amount", "amount > 0")
@dlt.expect_or_drop("valid_date", "sale_date IS NOT NULL")
def cleaned_sales():
    return (
        dlt.read_stream("raw_sales")
        .select(
            col("transaction_id"),
            col("customer_id"),
            col("product_id"),
            col("amount").cast("decimal(10,2)"),
            col("sale_date").cast("date"),
            col("region")
        )
        .withColumn("processed_at", current_timestamp())
    )

# Gold layer: Business aggregates
@dlt.table(
    comment="Daily sales summary by region",
    table_properties={"quality": "gold"}
)
def daily_sales_summary():
    return (
        dlt.read("cleaned_sales")
        .groupBy("sale_date", "region")
        .agg(
            F.count("*").alias("transaction_count"),
            F.sum("amount").alias("total_revenue"),
            F.countDistinct("customer_id").alias("unique_customers")
        )
    )
Create the Pipeline
# Via API (assumes workspace_url and an Authorization header dict are already set)
import requests

pipeline_config = {
    "name": "sales-data-pipeline",
    "storage": "abfss://dlt@storage.dfs.core.windows.net/sales",
    "target": "production.sales",
    "libraries": [
        {"notebook": {"path": "/Production/DLT/sales_pipeline"}}
    ],
    "clusters": [
        {
            "label": "default",
            "autoscale": {
                "min_workers": 1,
                "max_workers": 5,
                "mode": "ENHANCED"
            }
        }
    ],
    "continuous": False,   # True for always-on streaming
    "development": False,  # True for dev mode (reuses clusters, no automatic retries)
    "photon": True
}

response = requests.post(
    f"{workspace_url}/api/2.0/pipelines",
    headers=headers,
    json=pipeline_config
)
pipeline_id = response.json()["pipeline_id"]
Data Quality Expectations
DLT provides built-in data quality enforcement:
# Expect and warn (log violations but keep records)
@dlt.table
@dlt.expect("valid_email", "email RLIKE '^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\\.[A-Za-z]{2,}$'")
def customers_with_warnings():
    return dlt.read_stream("raw_customers")

# Expect or drop (remove violating records)
@dlt.table
@dlt.expect_or_drop("positive_quantity", "quantity > 0")
@dlt.expect_or_drop("valid_product", "product_id IS NOT NULL")
def orders_clean():
    return dlt.read_stream("raw_orders")

# Expect or fail (stop pipeline on violation)
@dlt.table
@dlt.expect_or_fail("has_required_fields", "id IS NOT NULL AND timestamp IS NOT NULL")
def critical_data():
    return dlt.read_stream("raw_critical")

# Multiple expectations
@dlt.table
@dlt.expect_all({
    "valid_amount": "amount > 0 AND amount < 1000000",
    "valid_date": "transaction_date <= current_date()",
    "valid_customer": "customer_id IS NOT NULL"
})
def transactions_validated():
    return dlt.read_stream("raw_transactions")

# Expect all or drop
@dlt.table
@dlt.expect_all_or_drop({
    "rule1": "condition1",
    "rule2": "condition2"
})
def strict_table():
    return dlt.read_stream("source")
Views vs Tables
# Table: Materialized, persisted to storage
@dlt.table
def sales():
    return spark.read.table("raw_sales")

# View: Computed on-demand, not persisted
@dlt.view
def sales_view():
    return spark.read.table("raw_sales")
# Use views for:
# - Intermediate transformations
# - Shared logic between tables
# - Data that doesn't need to be persisted
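Views work well as shared intermediate logic. A minimal sketch, assuming hypothetical raw_orders and raw_products tables defined elsewhere in the same pipeline:
# Shared join logic as a view (table names are illustrative)
@dlt.view
def enriched_orders():
    return dlt.read("raw_orders").join(dlt.read("raw_products"), "product_id")

# Two downstream tables reuse the view without materializing it
@dlt.table
def orders_by_region():
    return dlt.read("enriched_orders").groupBy("region").count()

@dlt.table
def orders_by_product():
    return dlt.read("enriched_orders").groupBy("product_id").count()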
Streaming vs Batch
# Streaming table (processes incrementally)
@dlt.table
def streaming_sales():
    return dlt.read_stream("raw_sales")  # read_stream

# Batch table (processes all data each run)
@dlt.table
def batch_summary():
    return dlt.read("cleaned_sales")  # read (not read_stream)

# Mixed: Stream from bronze, aggregate to gold
@dlt.table
def streaming_silver():
    return dlt.read_stream("bronze")  # Streaming

@dlt.table
def batch_gold():
    return (
        dlt.read("streaming_silver")  # Batch read of a streaming table
        .groupBy("date")
        .agg(...)
    )
Change Data Capture (CDC)
Handle CDC feeds with APPLY CHANGES:
from pyspark.sql.functions import expr

# Define the target table
dlt.create_streaming_table("customers")

# Apply changes from the CDC feed
dlt.apply_changes(
    target="customers",
    source="raw_cdc_customers",
    keys=["customer_id"],
    sequence_by="operation_timestamp",
    apply_as_deletes=expr("operation = 'DELETE'"),
    # apply_as_truncates=expr("operation = 'TRUNCATE'"),  # only supported with SCD type 1
    except_column_list=["operation", "operation_timestamp"],
    stored_as_scd_type=2  # track full history; use 1 to overwrite in place
)

# The raw CDC table
@dlt.table
def raw_cdc_customers():
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .load("/data/cdc/customers/")
    )
Pipeline Monitoring
# Get pipeline status
def get_pipeline_status(pipeline_id):
    response = requests.get(
        f"{workspace_url}/api/2.0/pipelines/{pipeline_id}",
        headers=headers
    )
    return response.json()

# List pipeline updates (runs)
def list_updates(pipeline_id):
    response = requests.get(
        f"{workspace_url}/api/2.0/pipelines/{pipeline_id}/updates",
        headers=headers
    )
    return response.json()

# Get update details
def get_update_details(pipeline_id, update_id):
    response = requests.get(
        f"{workspace_url}/api/2.0/pipelines/{pipeline_id}/updates/{update_id}",
        headers=headers
    )
    return response.json()
# Check data quality metrics
def get_quality_metrics(pipeline_id):
    # Query expectation metrics; substitute the event log or system table
    # exposed in your workspace for the table name below
    return spark.sql(f"""
        SELECT
            table_name,
            expectation_name,
            passed_records,
            failed_records,
            failed_records / (passed_records + failed_records) AS failure_rate
        FROM system.live_tables.data_quality_log
        WHERE pipeline_id = '{pipeline_id}'
        ORDER BY timestamp DESC
    """)
Error Handling
# Graceful error handling in transformations
@dlt.table
def robust_processing():
    # `schema` is the expected StructType of the JSON payload, defined elsewhere
    return (
        dlt.read_stream("source")
        # from_json() yields null fields when the payload cannot be parsed
        .withColumn("parsed_json", F.from_json(col("raw_json"), schema))
        # flag records whose required id field did not parse
        .withColumn("has_error", col("parsed_json.id").isNull())
    )
# Quarantine pattern
@dlt.table
@dlt.expect_all_or_drop({"valid": "NOT has_error"})
def valid_records():
    return dlt.read_stream("robust_processing")

@dlt.table
def quarantine_records():
    return (
        dlt.read_stream("robust_processing")
        .filter("has_error")
    )
Performance Optimization
# Optimize table layout
@dlt.table(
    partition_cols=["date"],  # Partition by date
    table_properties={
        "delta.autoOptimize.optimizeWrite": "true",
        "delta.autoOptimize.autoCompact": "true",
        "pipelines.autoOptimize.zOrderCols": "customer_id"
    }
)
def optimized_sales():
    return dlt.read_stream("raw_sales")

# Use appropriate cluster sizing
cluster_config = {
    "autoscale": {
        "min_workers": 2,
        "max_workers": 10,
        "mode": "ENHANCED"  # Better auto-scaling for streaming
    }
}
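To apply these settings, the autoscale block goes inside an entry of the pipeline's clusters list; a minimal sketch reusing the pipeline_config dictionary from earlier:
# Merge the autoscale settings into the pipeline's default cluster definition
pipeline_config["clusters"] = [
    {
        "label": "default",
        **cluster_config  # contributes the "autoscale" block defined above
    }
]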
Development Workflow
# Development mode: Iterate quickly (clusters are reused and failed updates are not retried)
pipeline_config = {
    "name": "dev-pipeline",
    "development": True,  # Dev mode
    ...
}

# Validate pipeline code without processing any data
response = requests.post(
    f"{workspace_url}/api/2.0/pipelines/{pipeline_id}/updates",
    headers=headers,
    json={
        "full_refresh": False,
        "validate_only": True  # Just validate
    }
)

# Run with full refresh (reprocess all data)
response = requests.post(
    f"{workspace_url}/api/2.0/pipelines/{pipeline_id}/updates",
    headers=headers,
    json={"full_refresh": True}
)
Connecting to Workflows
# Trigger DLT from a workflow
workflow_task = {
"task_key": "run_dlt_pipeline",
"pipeline_task": {
"pipeline_id": pipeline_id,
"full_refresh": False
}
}
# Wait for completion and check status
# DLT tasks automatically fail the workflow if the pipeline fails
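When you start an update directly through the API instead, you can poll until it reaches a terminal state. A sketch reusing the get_update_details helper from the monitoring section; the response shape and state names are assumed from the Pipelines API and may vary by version:
import time

def wait_for_update(pipeline_id, update_id, poll_seconds=30):
    # Poll the update until it finishes (terminal states assumed per the Pipelines API)
    terminal_states = {"COMPLETED", "FAILED", "CANCELED"}
    while True:
        update = get_update_details(pipeline_id, update_id).get("update", {})
        state = update.get("state")
        if state in terminal_states:
            return state
        time.sleep(poll_seconds)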
Best Practices
Organize by Medallion Architecture
/DLT/
├── bronze/
│   ├── ingest_sales.py
│   ├── ingest_customers.py
│   └── ingest_products.py
├── silver/
│   ├── clean_sales.py
│   └── clean_customers.py
└── gold/
    ├── sales_summary.py
    └── customer_metrics.py
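With that layout, each layer's source files can be attached to the pipeline as separate libraries, and DLT resolves the table dependency graph across them. A sketch, assuming the paths above exist in your workspace:
# One library entry per source file; DLT infers dependencies between the tables they define
# (workspace notebook paths usually omit the .py extension)
pipeline_config["libraries"] = [
    {"notebook": {"path": "/DLT/bronze/ingest_sales"}},
    {"notebook": {"path": "/DLT/silver/clean_sales"}},
    {"notebook": {"path": "/DLT/gold/sales_summary"}},
]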
Naming Conventions
# Clear, consistent naming
@dlt.table(name="bronze_sales_raw")
def bronze_sales():
    ...

@dlt.table(name="silver_sales_cleaned")
def silver_sales():
    ...

@dlt.table(name="gold_daily_revenue")
def gold_revenue():
    ...
Conclusion
Delta Live Tables transforms data engineering:
- Declarative syntax that reduces boilerplate code
- Built-in data quality enforcement
- Automatic dependency management
- Simplified streaming and CDC handling
- Production-ready monitoring
DLT is particularly powerful for teams that want reliability without the complexity of managing traditional Spark streaming jobs.