1 min read
File Compaction Strategies in Delta Lake
This post shares practical, production-minded guidance on file compaction in Delta Lake: why small files hurt, how to compact them, and how to keep them from coming back.
The Small File Problem
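Every data file a query touches adds open, read, and metadata overhead, so the same data spread across thousands of small files scans far more slowly than when it is stored in a few large files.
Before Compaction: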
┌──────────────────────────────────────────────────────────────┐
│ Delta Table │
│ ┌────┐┌────┐┌────┐┌────┐┌────┐┌────┐┌────┐┌────┐┌────┐ │
│ │1MB ││2MB ││500K││1MB ││3MB ││800K││1MB ││2MB ││500K│... │
│ └────┘└────┘└────┘└────┘└────┘└────┘└────┘└────┘└────┘ │
│ 1000+ small files │
│ Query: Read 1000 files │
│ Time: Very slow │
└──────────────────────────────────────────────────────────────┘
After Compaction:
┌──────────────────────────────────────────────────────────────┐
│ Delta Table │
│ ┌──────────────┐┌──────────────┐┌──────────────┐ │
│ │ 128MB ││ 128MB ││ 95MB │ │
│ └──────────────┘└──────────────┘└──────────────┘ │
│ 3 optimized files │
│ Query: Read 3 files │
│ Time: Fast │
└──────────────────────────────────────────────────────────────┘
Basic Compaction with OPTIMIZE
Simple Compaction
from delta.tables import DeltaTable
# Bin-pack the table's small files through the Python API
delta_table = DeltaTable.forPath(spark, "Tables/sales")
delta_table.optimize().executeCompaction()

# Alternative: the same compaction expressed in Spark SQL
# (running both, as this script does, makes the second pass mostly a no-op)
spark.sql("OPTIMIZE sales")

# Inspect the table to confirm the file count dropped
detail = spark.sql("DESCRIBE DETAIL sales").first()
print(f"Files after compaction: {detail.numFiles}")
Compaction with Z-Order
# Compacting with Z-Order co-locates rows with similar order_date/region
# values in the same files, so filters on those columns skip more data
delta_table.optimize().executeZOrderBy("order_date", "region")

# The equivalent statement in Spark SQL
spark.sql("""
OPTIMIZE sales
ZORDER BY (order_date, region)
""")
Compaction Strategies
Strategy 1: Scheduled Compaction
from datetime import datetime, timedelta
class ScheduledCompactor:
    """Schedule compaction based on data patterns.

    For each table, decide from DESCRIBE DETAIL statistics whether OPTIMIZE
    is worthwhile, run it, and record every run in ``compaction_log``.
    """

    # Average data-file size below which a multi-file table counts as
    # fragmented enough to compact.
    SMALL_FILE_BYTES = 64 * 1024 * 1024  # 64MB

    def __init__(self, spark):
        self.spark = spark
        # One dict per compaction performed: table, timestamp, files_before, action.
        self.compaction_log = []

    def _describe(self, table_path: str):
        """Return the DESCRIBE DETAIL row for the Delta table at ``table_path``."""
        return self.spark.sql(f"DESCRIBE DETAIL delta.`{table_path}`").first()

    def needs_compaction(self, table_path: str, threshold_files: int = 100) -> bool:
        """Return True if the table has more than ``threshold_files`` data files."""
        return self._describe(table_path).numFiles > threshold_files

    def get_file_stats(self, table_path: str) -> dict:
        """Get file statistics for a compaction decision.

        Returns a dict with ``num_files``, ``total_size_bytes``,
        ``avg_file_size_mb`` and ``needs_compaction``.

        A table with zero or one data file is never flagged. There is nothing
        to merge it with, and flagging it would make OPTIMIZE run on every tiny
        table at every scheduled run.
        """
        detail = self._describe(table_path)
        num_files = detail.numFiles
        # Guard against empty tables (numFiles == 0).
        avg_file_size = detail.sizeInBytes / num_files if num_files > 0 else 0
        return {
            "num_files": num_files,
            "total_size_bytes": detail.sizeInBytes,
            "avg_file_size_mb": avg_file_size / (1024 * 1024),
            "needs_compaction": num_files > 1 and avg_file_size < self.SMALL_FILE_BYTES,
        }

    def compact_if_needed(
        self,
        table_path: str,
        min_files: int = 50,
        zorder_columns: list[str] = None
    ):
        """Compact the table if it has many files or small average file size.

        Args:
            table_path: Path of the Delta table.
            min_files: Compact when the table has at least this many files.
            zorder_columns: If given, Z-Order by these columns while compacting.

        Returns:
            True if OPTIMIZE was run (and logged), False otherwise.
        """
        from datetime import timezone

        stats = self.get_file_stats(table_path)
        if not (stats["num_files"] >= min_files or stats["needs_compaction"]):
            return False

        optimizer = DeltaTable.forPath(self.spark, table_path).optimize()
        if zorder_columns:
            optimizer.executeZOrderBy(*zorder_columns)
        else:
            optimizer.executeCompaction()

        self.compaction_log.append({
            "table": table_path,
            # Timezone-aware UTC; datetime.utcnow() is deprecated since Python 3.12.
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "files_before": stats["num_files"],
            "action": "compacted"
        })
        return True

    def compact_all_tables(self, tables_config: list[dict]):
        """Compact multiple tables and print whether each was compacted or skipped.

        Each config dict needs ``path``; ``min_files`` (default 50) and
        ``zorder_columns`` are optional.
        """
        for config in tables_config:
            compacted = self.compact_if_needed(
                table_path=config["path"],
                min_files=config.get("min_files", 50),
                zorder_columns=config.get("zorder_columns")
            )
            status = "compacted" if compacted else "skipped"
            print(f"{config['path']}: {status}")
# Usage: one config entry per managed table.
# "min_files" and "zorder_columns" are optional per table.
compactor = ScheduledCompactor(spark)

tables = [
    {"path": "Tables/sales", "min_files": 100, "zorder_columns": ["order_date", "region"]},
    {"path": "Tables/customers", "min_files": 50, "zorder_columns": ["customer_id"]},
    {"path": "Tables/events", "min_files": 200},  # High-volume table
]

# Compact every table whose statistics call for it
compactor.compact_all_tables(tables)
Strategy 2: Partition-Aware Compaction
class PartitionCompactor:
    """Compact specific partitions instead of the entire table."""

    def __init__(self, spark):
        self.spark = spark

    def get_partition_stats(
        self,
        table_path: str,
        partition_column: str,
        min_files: int = 10,
    ) -> list:
        """Get data-file statistics per partition.

        Only partitions backed by more than ``min_files`` distinct data files
        are returned, busiest first. Each Row carries the partition column plus:

        - ``file_count``: number of distinct data files in the partition
        - ``row_count``: number of rows in the partition
        - ``total_size``: same value as ``row_count``, kept for backward
          compatibility (it is not a byte size)

        Note: ``input_file_name()`` is evaluated for every row, so this scans
        the whole table. Avoid running it frequently on very large tables.
        """
        # COUNT(DISTINCT file_path) counts data files; a plain COUNT(*) here
        # would count rows instead.
        partitions = self.spark.sql(f"""
            SELECT
                {partition_column},
                COUNT(DISTINCT file_path) AS file_count,
                COUNT(*) AS row_count,
                COUNT(*) AS total_size
            FROM (
                SELECT
                    input_file_name() AS file_path,
                    {partition_column}
                FROM delta.`{table_path}`
            )
            GROUP BY {partition_column}
            HAVING COUNT(DISTINCT file_path) > {int(min_files)}
            ORDER BY file_count DESC
        """)
        return partitions.collect()

    def compact_partition(
        self,
        table_path: str,
        partition_filter: str,
        zorder_columns: list[str] = None
    ):
        """Compact (optionally Z-Order) the partitions matching ``partition_filter``.

        ``partition_filter`` is a SQL predicate passed to the OPTIMIZE builder's
        ``where``. Delta only accepts predicates on partition columns there.
        """
        optimizer = DeltaTable.forPath(self.spark, table_path).optimize().where(partition_filter)
        if zorder_columns:
            optimizer.executeZOrderBy(*zorder_columns)
        else:
            optimizer.executeCompaction()

    def compact_recent_partitions(
        self,
        table_path: str,
        partition_column: str,
        days_back: int = 7,
        zorder_columns: list[str] = None,
    ):
        """Compact one date partition per day for the last ``days_back`` days.

        Today is included. Partition values are compared as ISO dates
        (``YYYY-MM-DD``). Each day is a separate OPTIMIZE call.
        """
        from datetime import date, timedelta

        today = date.today()
        for i in range(days_back):
            partition_date = today - timedelta(days=i)
            partition_filter = f"{partition_column} = '{partition_date}'"
            print(f"Compacting partition: {partition_filter}")
            self.compact_partition(table_path, partition_filter, zorder_columns)

    def compact_hot_partitions(
        self,
        table_path: str,
        partition_column: str,
        file_threshold: int = 50,
        zorder_columns: list[str] = None,
    ) -> list:
        """Compact only partitions holding more than ``file_threshold`` data files.

        Hot partitions are found with ``get_partition_stats``, which scans the
        table. All of them are then compacted in a single OPTIMIZE call.

        Returns:
            The partition values that were compacted (empty list if none).
        """
        hot_rows = self.get_partition_stats(table_path, partition_column, min_files=file_threshold)
        values = [row[partition_column] for row in hot_rows]
        if not values:
            return []

        non_null = [v for v in values if v is not None]
        clauses = []
        if non_null:
            literals = ", ".join(self._sql_literal(v) for v in non_null)
            clauses.append(f"{partition_column} IN ({literals})")
        if len(non_null) < len(values):
            # NULL never matches IN (...); it needs its own predicate.
            clauses.append(f"{partition_column} IS NULL")

        self.compact_partition(table_path, " OR ".join(clauses), zorder_columns)
        return values

    @staticmethod
    def _sql_literal(value) -> str:
        """Render a partition value as a quoted Spark SQL string literal."""
        # Spark SQL string literals escape quotes and backslashes with a backslash.
        text = str(value).replace("\\", "\\\\").replace("'", "\\'")
        return f"'{text}'"
# Usage
compactor = PartitionCompactor(spark)

# Re-compact the last week of daily event partitions, one OPTIMIZE per day
compactor.compact_recent_partitions(table_path="Tables/events", partition_column="event_date", days_back=7)

# Compact and Z-Order a single sales partition
compactor.compact_partition(
    table_path="Tables/sales",
    partition_filter="order_date = '2024-08-01'",
    zorder_columns=["customer_id"],
)
Strategy 3: Auto-Compaction
# Enable auto-compaction (and optimized writes) on an existing table
spark.sql("""
ALTER TABLE sales
SET TBLPROPERTIES (
'delta.autoOptimize.autoCompact' = 'true',
'delta.autoOptimize.optimizeWrite' = 'true'
)
""")
# Configure auto-compact thresholds for this Spark session
auto_compact_config = {
    # Minimum number of small files before auto-compact triggers
    "spark.databricks.delta.autoCompact.minNumFiles": "50",
    # Target size, in bytes, of the files auto-compaction produces
    "spark.databricks.delta.autoCompact.maxFileSize": "134217728",  # 128MB
    # Default value of delta.autoOptimize.autoCompact for tables CREATED in
    # this session; existing tables keep their own table property (set above)
    "spark.databricks.delta.properties.defaults.autoOptimize.autoCompact": "true",
}
for key, value in auto_compact_config.items():
    spark.conf.set(key, value)
Optimized Write
Enable Optimized Write
# Optimized write bins data into optimal file sizes during write,
# preventing small files from being created in the first place.

# Enable globally for this session.
spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", "true")
# binSize is interpreted in MB when given as a bare number, so a raw byte
# count such as "134217728" would request a ~128TB bin. An explicit unit
# suffix is unambiguous.
spark.conf.set("spark.databricks.delta.optimizeWrite.binSize", "128m")  # 128MB

# Enable per-table
spark.sql("""
ALTER TABLE sales
SET TBLPROPERTIES ('delta.autoOptimize.optimizeWrite' = 'true')
""")

# Write with optimized settings (per-write option)
df.write.format("delta") \
    .option("optimizeWrite", "true") \
    .mode("append") \
    .saveAsTable("sales")
Adaptive Shuffle for Better File Distribution
# Let adaptive query execution (AQE) coalesce small shuffle partitions
# toward ~128MB each, which in turn yields better-sized output files
for conf_key, conf_value in (
    ("spark.sql.adaptive.enabled", "true"),
    ("spark.sql.adaptive.coalescePartitions.enabled", "true"),
    ("spark.sql.adaptive.advisoryPartitionSizeInBytes", "134217728"),
):
    spark.conf.set(conf_key, conf_value)
Vacuum Old Files
Cleaning Up After Compaction
class TableMaintenance:
    """Run OPTIMIZE (optionally Z-Ordered) followed by VACUUM on Delta tables."""

    def __init__(self, spark):
        self.spark = spark

    def _describe_detail(self, table_path):
        """Return the single DESCRIBE DETAIL row for the Delta table at ``table_path``."""
        return self.spark.sql(f"DESCRIBE DETAIL delta.`{table_path}`").first()

    def full_maintenance(
        self,
        table_path: str,
        zorder_columns: list[str] = None,
        vacuum_hours: int = 168
    ):
        """Compact then vacuum one table, printing progress along the way.

        Args:
            table_path: Path of the Delta table.
            zorder_columns: Z-Order by these columns; plain compaction if empty/None.
            vacuum_hours: Retention passed to ``DeltaTable.vacuum``.

        Returns:
            Dict with ``files_before``, ``files_after``, ``files_reduced`` and
            ``size_change_gb`` (taken from DESCRIBE DETAIL before and after).
        """
        print(f"Starting maintenance for {table_path}")

        # Snapshot the table state before touching it
        before = self._describe_detail(table_path)
        print(f"Before: {before.numFiles} files, {before.sizeInBytes/(1024**3):.2f}GB")

        print("Running compaction...")
        delta_table = DeltaTable.forPath(self.spark, table_path)
        optimizer = delta_table.optimize()
        if not zorder_columns:
            optimizer.executeCompaction()
        else:
            optimizer.executeZOrderBy(*zorder_columns)

        # Only files older than the retention window are removed, so the files
        # just replaced by OPTIMIZE stay on storage until the window has passed
        print(f"Vacuuming files older than {vacuum_hours} hours...")
        delta_table.vacuum(vacuum_hours)

        after = self._describe_detail(table_path)
        print(f"After: {after.numFiles} files, {after.sizeInBytes/(1024**3):.2f}GB")

        bytes_per_gb = 1024**3
        return {
            "files_before": before.numFiles,
            "files_after": after.numFiles,
            "files_reduced": before.numFiles - after.numFiles,
            "size_change_gb": (after.sizeInBytes - before.sizeInBytes) / bytes_per_gb,
        }

    def maintenance_all_tables(self, tables: list[str]):
        """Run ``full_maintenance`` with default settings on each table path.

        Returns a dict mapping each table path to its result.
        """
        return {table: self.full_maintenance(table) for table in tables}
# Usage: compact with Z-Order on the common filter columns, then vacuum
# files older than 7 days (7 * 24 = 168 hours)
maintenance = TableMaintenance(spark)
result = maintenance.full_maintenance(
    "Tables/sales",
    zorder_columns=["order_date", "region"],
    vacuum_hours=7 * 24,
)
print(f"Reduced files from {result['files_before']} to {result['files_after']}")
Monitoring Compaction Needs
Compaction Dashboard Query
-- Query to identify tables needing compaction.
-- Spark SQL does not allow DESCRIBE DETAIL inside a subquery, so first collect
-- the detail rows into a temp view, e.g. from PySpark:
--   from pyspark.sql.functions import lit
--   details = (
--       spark.sql("DESCRIBE DETAIL sales").withColumn("table_name", lit("sales"))
--       .unionByName(spark.sql("DESCRIBE DETAIL customers").withColumn("table_name", lit("customers")))
--   )
--   details.createOrReplaceTempView("table_details")
WITH row_counts AS (
    SELECT 'sales' AS table_name, COUNT(*) AS row_count FROM sales
    UNION ALL
    SELECT 'customers' AS table_name, COUNT(*) AS row_count FROM customers
),
table_stats AS (
    SELECT
        r.table_name,
        r.row_count,
        d.numFiles AS file_count,
        d.sizeInBytes AS size_bytes
    FROM row_counts r
    JOIN table_details d ON d.table_name = r.table_name
)
SELECT
    table_name,
    row_count,
    file_count,
    size_bytes / (1024*1024*1024) AS size_gb,
    -- NULLIF avoids dividing by zero for tables with no data files
    size_bytes / NULLIF(file_count, 0) / (1024*1024) AS avg_file_size_mb,
    CASE
        -- zero or one file: nothing to merge
        WHEN file_count <= 1 THEN 'OK'
        WHEN size_bytes / file_count < 64*1024*1024 THEN 'COMPACT NOW'
        WHEN size_bytes / file_count < 128*1024*1024 THEN 'CONSIDER COMPACTION'
        ELSE 'OK'
    END AS recommendation
FROM table_stats
ORDER BY avg_file_size_mb;
Python Monitoring
class CompactionMonitor:
    """Monitor tables for compaction needs."""

    def __init__(self, spark):
        self.spark = spark

    def analyze_all_tables(self, table_paths: list[str]) -> list[dict]:
        """Analyze each table and return its compaction status, most urgent first.

        Each result holds ``table``, ``files``, ``size_gb``, ``avg_file_mb``,
        ``needs_compaction`` and ``priority``. A table that cannot be analyzed
        yields ``{"table": ..., "error": ...}`` instead, so one bad path does
        not abort the whole report.
        """
        results = []
        for path in table_paths:
            try:
                detail = self.spark.sql(f"DESCRIBE DETAIL delta.`{path}`").first()
                num_files = detail.numFiles
                # An empty table has no files to average over.
                if num_files > 0:
                    avg_size_mb = (detail.sizeInBytes / num_files) / (1024 * 1024)
                else:
                    avg_size_mb = 0.0
                results.append({
                    "table": path,
                    "files": num_files,
                    "size_gb": detail.sizeInBytes / (1024**3),
                    "avg_file_mb": avg_size_mb,
                    # Zero or one data file cannot be compacted any further.
                    "needs_compaction": num_files > 1 and (avg_size_mb < 64 or num_files > 500),
                    "priority": self._calculate_priority(num_files, avg_size_mb)
                })
            except Exception as e:
                # Best-effort report: record the failure and move on.
                results.append({
                    "table": path,
                    "error": str(e)
                })
        return sorted(results, key=lambda x: x.get("priority", 0), reverse=True)

    def _calculate_priority(self, file_count: int, avg_size_mb: float) -> int:
        """Calculate compaction priority (higher = more urgent, 0-100).

        Up to 50 points come from the file count and up to 50 from how small
        the average file is. Tables with at most one file score 0.
        """
        if file_count <= 1:
            return 0

        priority = 0
        if file_count > 1000:
            priority += 50
        elif file_count > 500:
            priority += 30
        elif file_count > 100:
            priority += 10

        if avg_size_mb < 16:
            priority += 50
        elif avg_size_mb < 32:
            priority += 30
        elif avg_size_mb < 64:
            priority += 10

        return priority

    def generate_report(self, table_paths: list[str]) -> str:
        """Generate a Markdown-style compaction status report, most urgent first."""
        from datetime import timezone

        results = self.analyze_all_tables(table_paths)
        # Timezone-aware UTC; datetime.utcnow() is deprecated since Python 3.12.
        lines = [
            "# Compaction Status Report\n\n",
            f"Generated: {datetime.now(timezone.utc).isoformat()}\n\n",
        ]
        for result in results:
            if "error" in result:
                lines.append(f"- {result['table']}: ERROR - {result['error']}\n")
            else:
                status = "NEEDS COMPACTION" if result["needs_compaction"] else "OK"
                lines.append(f"- {result['table']}: {status}\n")
                lines.append(f" Files: {result['files']}, Avg Size: {result['avg_file_mb']:.1f}MB\n")
        return "".join(lines)
# Usage: build and print a status report covering every managed table
monitor = CompactionMonitor(spark)
tables = [
    "Tables/sales",
    "Tables/customers",
    "Tables/events",
]
report = monitor.generate_report(tables)
print(report)
Best Practices
- Schedule regular compaction: Daily for high-volume tables
- Use partition-aware compaction: Avoid rewriting entire large tables
- Enable auto-compaction: For streaming or frequent write tables
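- Set vacuum retention: Balance storage vs. time travel needs. VACUUM rejects retention periods shorter than the table's deleted-file retention (7 days / 168 hours by default) unless `spark.databricks.delta.retentionDurationCheck.enabled` is set to false.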
- Monitor file statistics: Track avg file size over time
Conclusion
File compaction is essential for maintaining Delta Lake performance. Combine scheduled compaction with auto-compact features to keep your tables optimized. Monitor file statistics regularly and adjust thresholds based on your query patterns.
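Aim for data files of roughly 128–256 MB for good query performance in Microsoft Fabric and other analytics engines. The 64 MB and 128 MB thresholds used in the examples above are starting points to tune for your workload.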