Data Lakehouse Patterns: Best of Both Worlds
The data lakehouse emerged as the architecture that combines data lake flexibility with data warehouse reliability. In 2021 the pattern matured, with better tooling and clearer implementation guidance.
The Lakehouse Architecture
A lakehouse provides the following, shown together in a short sketch after this list:
- ACID transactions on object storage
- Schema enforcement and evolution
- Time travel and versioning
- Support for both streaming and batch
- Direct BI tool access
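To make these properties concrete, here is a minimal sketch. It assumes the SparkSession configured in the next section, and the demo path is purely hypothetical: a transactional write to object storage, an ACID UPDATE against the same files, and the commit history that powers time travel.
# Illustrative only: a throwaway Delta table at a hypothetical path
demo_path = "/mnt/datalake/demo/orders"
# Transactional write to object storage (creates version 0)
spark.range(5) \
    .selectExpr("id AS order_id", "CAST(id * 10 AS decimal(18,2)) AS order_amount") \
    .write.format("delta").save(demo_path)
# ACID UPDATE directly on the files; readers never see a partial result
spark.sql(f"UPDATE delta.`{demo_path}` SET order_amount = order_amount * 1.1")
# Every commit lands in the transaction log, which enables time travel
spark.sql(f"DESCRIBE HISTORY delta.`{demo_path}`").show()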
Implementing with Delta Lake
Delta Lake became the de facto lakehouse format:
from delta.tables import DeltaTable
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = SparkSession.builder \
    .appName("Lakehouse") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()
# Bronze layer: Raw ingestion
def ingest_to_bronze(source_path: str, bronze_path: str):
    df = (spark.readStream
        .format("cloudFiles")  # Auto Loader
        .option("cloudFiles.format", "json")
        .option("cloudFiles.schemaLocation", f"{bronze_path}/_schema")
        .option("cloudFiles.inferColumnTypes", "true")
        .load(source_path)
        .withColumn("_ingestion_timestamp", F.current_timestamp())
        .withColumn("_source_file", F.input_file_name())
    )
    return (df.writeStream
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", f"{bronze_path}/_checkpoint")
        .trigger(availableNow=True)
        .start(bronze_path)
    )
# Silver layer: Cleaned and conformed
def process_to_silver(bronze_path: str, silver_path: str):
    bronze_df = spark.read.format("delta").load(bronze_path)
    silver_df = (bronze_df
        .filter(F.col("customer_id").isNotNull())
        .withColumn("email", F.lower(F.trim(F.col("email"))))
        .withColumn("order_date", F.to_date(F.col("order_timestamp")))
        .withColumn("order_amount", F.col("order_amount").cast("decimal(18,2)"))
        .dropDuplicates(["order_id"])
    )
    # Merge for incremental updates
    if DeltaTable.isDeltaTable(spark, silver_path):
        delta_table = DeltaTable.forPath(spark, silver_path)
        (delta_table.alias("target")
            .merge(
                silver_df.alias("source"),
                "target.order_id = source.order_id"
            )
            .whenMatchedUpdateAll()
            .whenNotMatchedInsertAll()
            .execute()
        )
    else:
        silver_df.write.format("delta").mode("overwrite").save(silver_path)
# Gold layer: Business aggregates
def build_gold_aggregates(silver_path: str, gold_path: str):
    silver_df = spark.read.format("delta").load(silver_path)
    # Daily sales summary
    daily_sales = (silver_df
        .groupBy("order_date", "product_category")
        .agg(
            F.sum("order_amount").alias("total_revenue"),
            F.count("order_id").alias("order_count"),
            F.countDistinct("customer_id").alias("unique_customers"),
            F.avg("order_amount").alias("avg_order_value")
        )
    )
    (daily_sales.write
        .format("delta")
        .mode("overwrite")
        .partitionBy("order_date")
        .save(f"{gold_path}/daily_sales")
    )
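A thin driver can run the three layers in sequence. The paths below are placeholders; in practice a scheduler (a Databricks job, Airflow, or similar) would own this loop.
# Hypothetical storage layout -- adjust to your lake
landing_path = "/mnt/datalake/landing/orders"
bronze_path = "/mnt/datalake/bronze/orders"
silver_path = "/mnt/datalake/silver/orders"
gold_path = "/mnt/datalake/gold"
# Run the medallion pipeline end to end
bronze_query = ingest_to_bronze(landing_path, bronze_path)
bronze_query.awaitTermination()  # the availableNow trigger stops once caught up
process_to_silver(bronze_path, silver_path)
build_gold_aggregates(silver_path, gold_path)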
Schema Evolution
Handling schema changes gracefully:
# Enable schema evolution
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")
# Or explicitly during write
(df.write
    .format("delta")
    .mode("append")
    .option("mergeSchema", "true")
    .save(delta_path)
)
# Schema enforcement for critical tables
(df.write
    .format("delta")
    .mode("append")
    .option("mergeSchema", "false")  # Fail on schema mismatch
    .save(delta_path)
)
# View schema history
delta_table = DeltaTable.forPath(spark, delta_path)
history = delta_table.history()
history.select("version", "timestamp", "operation", "operationParameters").show()
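To see what mergeSchema actually does, append a batch carrying a column the table has never seen (a hypothetical discount_code here, reusing the df and delta_path placeholders from above): the table schema is extended, and rows written earlier read back as NULL in the new column.
# The incoming batch has an extra column
new_batch = df.withColumn("discount_code", F.lit("WINTER21"))
(new_batch.write
    .format("delta")
    .mode("append")
    .option("mergeSchema", "true")  # table schema gains discount_code
    .save(delta_path)
)
# Older rows show NULL for the newly added column
spark.read.format("delta").load(delta_path).select("discount_code").show()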
Time Travel Queries
Accessing historical data states:
-- Query data as of a specific version
SELECT * FROM delta.`/mnt/datalake/silver/orders`
VERSION AS OF 10;
-- Query data as of a timestamp
SELECT * FROM delta.`/mnt/datalake/silver/orders`
TIMESTAMP AS OF '2021-12-01 00:00:00';
-- Compare changes between versions
SELECT * FROM delta.`/mnt/datalake/silver/orders` VERSION AS OF 10
WHERE order_id = '12345'
EXCEPT
SELECT * FROM delta.`/mnt/datalake/silver/orders` VERSION AS OF 9
WHERE order_id = '12345';
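The same history is reachable from the DataFrame API, which is convenient inside a pipeline, for example to recompute an aggregate from an earlier snapshot:
# Read a specific version of the silver table
orders_v10 = (spark.read.format("delta")
    .option("versionAsOf", 10)
    .load("/mnt/datalake/silver/orders")
)
# Or read the table as it looked at a point in time
orders_dec1 = (spark.read.format("delta")
    .option("timestampAsOf", "2021-12-01 00:00:00")
    .load("/mnt/datalake/silver/orders")
)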
Optimizing Lakehouse Performance
# Optimize file layout
delta_table = DeltaTable.forPath(spark, delta_path)
# Compact small files
delta_table.optimize().executeCompaction()
# Z-Order for query performance
delta_table.optimize().executeZOrderBy("customer_id", "order_date")
# Vacuum old versions (retain 7 days)
delta_table.vacuum(retentionHours=168)
# Auto-optimize settings
spark.conf.set("spark.databricks.delta.autoOptimize.optimizeWrite", "true")
spark.conf.set("spark.databricks.delta.autoOptimize.autoCompact", "true")
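Depending on your Delta Lake or Databricks runtime version, some of this maintenance may only be exposed as SQL; issuing the commands through spark.sql has the same effect (the path is the silver table used earlier):
# SQL equivalents of the maintenance calls above
spark.sql("OPTIMIZE delta.`/mnt/datalake/silver/orders` ZORDER BY (customer_id, order_date)")
spark.sql("VACUUM delta.`/mnt/datalake/silver/orders` RETAIN 168 HOURS")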
Direct BI Access with Synapse
Connecting Power BI directly to the lakehouse:
-- Synapse serverless SQL pool
CREATE DATABASE lakehouse_gold;
CREATE EXTERNAL DATA SOURCE datalake
WITH (
    LOCATION = 'abfss://gold@mydatalake.dfs.core.windows.net/'
);
-- Create view over Delta table
CREATE OR ALTER VIEW daily_sales_summary AS
SELECT *
FROM OPENROWSET(
    BULK 'daily_sales/',
    DATA_SOURCE = 'datalake',
    FORMAT = 'DELTA'
) AS sales
WHERE order_date >= DATEADD(month, -12, GETDATE());
-- Power BI connects via SQL endpoint
-- No data movement, direct query on lakehouse
Data Quality in the Lakehouse
from great_expectations.dataset import SparkDFDataset
# Simple exception type used to fail the run on bad data
class DataQualityException(Exception):
    pass
def validate_silver_layer(df):
    ge_df = SparkDFDataset(df)
    # Define expectations
    results = []
    results.append(ge_df.expect_column_values_to_not_be_null("customer_id"))
    results.append(ge_df.expect_column_values_to_be_unique("order_id"))
    results.append(ge_df.expect_column_values_to_match_regex(
        "email",
        r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$"
    ))
    results.append(ge_df.expect_column_values_to_be_between(
        "order_amount",
        min_value=0,
        max_value=1000000
    ))
    failures = [r for r in results if not r.success]
    if failures:
        raise DataQualityException(f"Validation failed: {failures}")
    return True
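One simple way to wire this in is to validate the silver table after each incremental load and fail the run before the gold aggregates are rebuilt; a sketch, reusing the paths from the driver above:
# Gate the gold build on silver data quality
process_to_silver(bronze_path, silver_path)
validate_silver_layer(spark.read.format("delta").load(silver_path))  # raises on failure
build_gold_aggregates(silver_path, gold_path)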
Streaming into the Lakehouse
# Streaming ingestion with exactly-once semantics
# (event_schema is the StructType for the JSON payload; see the sketch below)
def stream_events_to_lakehouse(kafka_bootstrap: str, topic: str, lakehouse_path: str):
    stream = (spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", kafka_bootstrap)
        .option("subscribe", topic)
        .option("startingOffsets", "latest")
        .load()
        .select(
            F.col("key").cast("string"),
            F.from_json(F.col("value").cast("string"), event_schema).alias("data"),
            F.col("timestamp").alias("event_time")
        )
        .select("key", "data.*", "event_time")
    )
    return (stream.writeStream
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", f"{lakehouse_path}/_checkpoint")
        .trigger(processingTime="1 minute")
        .start(lakehouse_path)
    )
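The function assumes an event_schema describing the JSON payload. A hypothetical definition and invocation (broker, topic, and path are placeholders) might look like this:
from pyspark.sql.types import StructType, StructField, StringType, DecimalType, TimestampType
# Hypothetical payload schema -- align with your real event contract
event_schema = StructType([
    StructField("order_id", StringType()),
    StructField("customer_id", StringType()),
    StructField("order_amount", DecimalType(18, 2)),
    StructField("order_timestamp", TimestampType()),
])
query = stream_events_to_lakehouse(
    kafka_bootstrap="broker-1:9092",
    topic="orders",
    lakehouse_path="/mnt/datalake/bronze/order_events",
)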
Key Lakehouse Benefits Realized in 2021
- Unified Batch and Streaming: Same storage, same tables
- Direct BI Access: No ETL to data warehouse
- Cost Efficiency: Object storage pricing with warehouse features
- Data Science Ready: Direct access from notebooks
The lakehouse architecture proved its value in 2021. Organizations are consolidating their data warehouses and data lakes into unified lakehouses, reducing complexity and cost.