Back to Blog
8 min read

Delta Lake Everywhere: The Universal Table Format

Delta Lake has become the de facto standard for lakehouse table formats. Microsoft Fabric’s native support for Delta Lake enables interoperability across platforms while providing ACID transactions, time travel, and schema evolution.

Why Delta Lake?

┌─────────────────────────────────────────────────────────────┐
│                    Traditional Data Lake                     │
│                                                              │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐    │
│  │ Parquet  │  │  JSON    │  │   CSV    │  │   ORC    │    │
│  │  Files   │  │  Files   │  │  Files   │  │  Files   │    │
│  └──────────┘  └──────────┘  └──────────┘  └──────────┘    │
│       │             │             │             │           │
│       └─────────────┴─────────────┴─────────────┘           │
│                           │                                  │
│                    No Transactions                          │
│                    No Schema Enforcement                    │
│                    No Time Travel                           │
└─────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────┐
│                      Delta Lake                              │
│                                                              │
│  ┌──────────────────────────────────────────────────────┐   │
│  │              Parquet Files + Delta Log                │   │
│  │                                                       │   │
│  │   data/                                              │   │
│  │   ├── part-00000.parquet                             │   │
│  │   ├── part-00001.parquet                             │   │
│  │   └── _delta_log/                                    │   │
│  │       ├── 00000000000000000000.json                  │   │
│  │       ├── 00000000000000000001.json                  │   │
│  │       └── 00000000000000000002.checkpoint.parquet    │   │
│  └──────────────────────────────────────────────────────┘   │
│                           │                                  │
│                    ACID Transactions                        │
│                    Schema Evolution                         │
│                    Time Travel                              │
│                    Audit History                            │
└─────────────────────────────────────────────────────────────┘

Delta Lake in Microsoft Fabric

Native Delta Support

# All Fabric lakehouses use Delta Lake natively

# Create Delta table
df = spark.createDataFrame([
    (1, "Alice", "Engineering"),
    (2, "Bob", "Sales"),
    (3, "Carol", "Marketing")
], ["id", "name", "department"])

# Save as Delta table
df.write.format("delta") \
    .mode("overwrite") \
    .saveAsTable("employees")

# Or save to Files folder
df.write.format("delta") \
    .mode("overwrite") \
    .save("Files/raw/employees")

# Verify it's Delta
from delta.tables import DeltaTable

print(DeltaTable.isDeltaTable(spark, "Tables/employees"))  # True

Schema Evolution

from delta.tables import DeltaTable
from pyspark.sql.types import *

class DeltaSchemaManager:
    """Manage Delta Lake schema evolution."""

    def __init__(self, spark, table_path: str):
        self.spark = spark
        self.table_path = table_path
        self.delta_table = DeltaTable.forPath(spark, table_path)

    def get_current_schema(self) -> StructType:
        """Get current table schema."""
        return self.delta_table.toDF().schema

    def add_column(self, column_name: str, data_type: DataType):
        """Add new column to schema."""

        # Delta handles this automatically with mergeSchema
        df_with_new_col = self.delta_table.toDF().withColumn(
            column_name,
            lit(None).cast(data_type)
        )

        df_with_new_col.write.format("delta") \
            .mode("overwrite") \
            .option("mergeSchema", "true") \
            .save(self.table_path)

    def evolve_schema_on_write(self, new_df):
        """Write data with schema evolution enabled."""

        new_df.write.format("delta") \
            .mode("append") \
            .option("mergeSchema", "true") \
            .save(self.table_path)

    def overwrite_schema(self, new_df):
        """Completely replace schema (use with caution)."""

        new_df.write.format("delta") \
            .mode("overwrite") \
            .option("overwriteSchema", "true") \
            .save(self.table_path)

# Usage
schema_manager = DeltaSchemaManager(spark, "Tables/employees")

# Check current schema
current = schema_manager.get_current_schema()
print("Current columns:", [f.name for f in current.fields])

# Add new data with additional column
new_employees = spark.createDataFrame([
    (4, "David", "Finance", "2024-08-14")
], ["id", "name", "department", "hire_date"])

schema_manager.evolve_schema_on_write(new_employees)

Time Travel

class DeltaTimeTravel:
    """Time travel operations for Delta tables."""

    def __init__(self, spark, table_path: str):
        self.spark = spark
        self.table_path = table_path

    def read_at_version(self, version: int):
        """Read table at specific version."""

        return self.spark.read.format("delta") \
            .option("versionAsOf", version) \
            .load(self.table_path)

    def read_at_timestamp(self, timestamp: str):
        """Read table at specific timestamp."""

        return self.spark.read.format("delta") \
            .option("timestampAsOf", timestamp) \
            .load(self.table_path)

    def get_history(self, limit: int = 10):
        """Get table version history."""

        delta_table = DeltaTable.forPath(self.spark, self.table_path)
        return delta_table.history(limit)

    def restore_to_version(self, version: int):
        """Restore table to previous version."""

        delta_table = DeltaTable.forPath(self.spark, self.table_path)
        delta_table.restoreToVersion(version)

    def restore_to_timestamp(self, timestamp: str):
        """Restore table to timestamp."""

        delta_table = DeltaTable.forPath(self.spark, self.table_path)
        delta_table.restoreToTimestamp(timestamp)

    def compare_versions(self, version1: int, version2: int):
        """Compare data between two versions."""

        df1 = self.read_at_version(version1)
        df2 = self.read_at_version(version2)

        added = df2.exceptAll(df1)
        removed = df1.exceptAll(df2)

        return {
            "added_rows": added.count(),
            "removed_rows": removed.count(),
            "added": added,
            "removed": removed
        }

# Usage
time_travel = DeltaTimeTravel(spark, "Tables/employees")

# View history
history = time_travel.get_history()
display(history.select("version", "timestamp", "operation", "operationParameters"))

# Read previous version
old_data = time_travel.read_at_version(2)

# Compare versions
changes = time_travel.compare_versions(1, 3)
print(f"Added: {changes['added_rows']}, Removed: {changes['removed_rows']}")

# Restore if needed
time_travel.restore_to_version(2)

ACID Transactions

Concurrent Writes

from delta.tables import DeltaTable

class ConcurrentWriteHandler:
    """Handle concurrent writes with Delta Lake."""

    def __init__(self, spark, table_path: str):
        self.spark = spark
        self.table_path = table_path

    def upsert_with_merge(self, updates_df, key_columns: list[str]):
        """Upsert data using MERGE (handles concurrent writes)."""

        delta_table = DeltaTable.forPath(self.spark, self.table_path)

        # Build merge condition
        merge_condition = " AND ".join([
            f"target.{col} = updates.{col}" for col in key_columns
        ])

        # Execute merge
        delta_table.alias("target").merge(
            updates_df.alias("updates"),
            merge_condition
        ).whenMatchedUpdateAll() \
         .whenNotMatchedInsertAll() \
         .execute()

    def conditional_update(
        self,
        updates_df,
        key_columns: list[str],
        update_condition: str
    ):
        """Update only if condition is met."""

        delta_table = DeltaTable.forPath(self.spark, self.table_path)

        merge_condition = " AND ".join([
            f"target.{col} = updates.{col}" for col in key_columns
        ])

        delta_table.alias("target").merge(
            updates_df.alias("updates"),
            merge_condition
        ).whenMatchedUpdate(
            condition=update_condition,
            set={"*": "updates.*"}
        ).whenNotMatchedInsertAll() \
         .execute()

    def delete_with_condition(self, condition: str):
        """Delete rows matching condition."""

        delta_table = DeltaTable.forPath(self.spark, self.table_path)
        delta_table.delete(condition)

    def update_column(self, condition: str, column: str, value):
        """Update specific column value."""

        delta_table = DeltaTable.forPath(self.spark, self.table_path)
        delta_table.update(
            condition=condition,
            set={column: value}
        )

# Usage
handler = ConcurrentWriteHandler(spark, "Tables/orders")

# Upsert new orders
new_orders = spark.createDataFrame([
    (1001, "2024-08-14", "customer_1", 150.00, "pending"),
    (1002, "2024-08-14", "customer_2", 200.00, "pending")
], ["order_id", "order_date", "customer_id", "amount", "status"])

handler.upsert_with_merge(new_orders, key_columns=["order_id"])

# Conditional update: only update if status is 'pending'
updated_orders = spark.createDataFrame([
    (1001, "2024-08-14", "customer_1", 160.00, "confirmed")
], ["order_id", "order_date", "customer_id", "amount", "status"])

handler.conditional_update(
    updated_orders,
    key_columns=["order_id"],
    update_condition="target.status = 'pending'"
)

Delta Lake Features

Change Data Feed

# Enable Change Data Feed on table
spark.sql("""
    ALTER TABLE orders
    SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")

# Read changes since version
class DeltaCDC:
    """Change Data Feed operations."""

    def __init__(self, spark, table_path: str):
        self.spark = spark
        self.table_path = table_path

    def get_changes_since_version(self, start_version: int):
        """Get changes since specific version."""

        return self.spark.read.format("delta") \
            .option("readChangeFeed", "true") \
            .option("startingVersion", start_version) \
            .load(self.table_path)

    def get_changes_in_range(
        self,
        start_version: int,
        end_version: int
    ):
        """Get changes between versions."""

        return self.spark.read.format("delta") \
            .option("readChangeFeed", "true") \
            .option("startingVersion", start_version) \
            .option("endingVersion", end_version) \
            .load(self.table_path)

    def get_changes_since_timestamp(self, timestamp: str):
        """Get changes since timestamp."""

        return self.spark.read.format("delta") \
            .option("readChangeFeed", "true") \
            .option("startingTimestamp", timestamp) \
            .load(self.table_path)

    def process_changes(self, changes_df):
        """Process change data feed."""

        # _change_type can be: insert, update_preimage, update_postimage, delete
        inserts = changes_df.filter("_change_type = 'insert'")
        updates = changes_df.filter("_change_type = 'update_postimage'")
        deletes = changes_df.filter("_change_type = 'delete'")

        return {
            "inserts": inserts,
            "updates": updates,
            "deletes": deletes,
            "insert_count": inserts.count(),
            "update_count": updates.count(),
            "delete_count": deletes.count()
        }

# Usage
cdc = DeltaCDC(spark, "Tables/orders")

changes = cdc.get_changes_since_version(5)
processed = cdc.process_changes(changes)

print(f"Inserts: {processed['insert_count']}")
print(f"Updates: {processed['update_count']}")
print(f"Deletes: {processed['delete_count']}")

Deletion Vectors

# Deletion Vectors provide faster deletes without rewriting files

# Enable deletion vectors
spark.sql("""
    ALTER TABLE large_table
    SET TBLPROPERTIES (
        'delta.enableDeletionVectors' = 'true'
    )
""")

# Deletes now mark rows as deleted without rewriting
# Much faster for tables with many files

# Configure deletion vector behavior
deletion_vector_config = {
    "delta.enableDeletionVectors": "true",
    "delta.targetFileSize": "134217728",  # 128MB
    "delta.tuneFileSizesForRewrites": "true"
}

for key, value in deletion_vector_config.items():
    spark.sql(f"ALTER TABLE large_table SET TBLPROPERTIES ('{key}' = '{value}')")

Liquid Clustering

# Liquid Clustering replaces Z-Order for better performance
# Available in newer Delta Lake versions

# Create table with liquid clustering
spark.sql("""
    CREATE TABLE sales_clustered (
        order_id BIGINT,
        customer_id BIGINT,
        order_date DATE,
        amount DECIMAL(10,2),
        region STRING
    )
    USING DELTA
    CLUSTER BY (region, order_date)
""")

# Clustering happens automatically during writes
# No need for manual OPTIMIZE ZORDER

# Or convert existing table
spark.sql("""
    ALTER TABLE existing_table
    CLUSTER BY (region, date)
""")

Cross-Platform Compatibility

Reading Delta from Different Platforms

# Databricks
df = spark.read.format("delta").load("abfss://container@storage.dfs.core.windows.net/delta/table")

# Synapse
df = spark.read.format("delta").load("abfss://container@storage.dfs.core.windows.net/delta/table")

# Fabric
df = spark.read.format("delta").table("lakehouse.table")

# Python (deltalake library)
import deltalake

dt = deltalake.DeltaTable("abfss://container@storage.dfs.core.windows.net/delta/table")
df = dt.to_pandas()

# Rust/Python (delta-rs)
from deltalake import DeltaTable

dt = DeltaTable("./path/to/delta/table")
print(dt.schema())

Version Compatibility

# Check and set Delta protocol versions

def check_protocol(table_path: str):
    """Check Delta protocol version."""

    delta_table = DeltaTable.forPath(spark, table_path)
    detail = delta_table.detail().first()

    return {
        "min_reader_version": detail.minReaderVersion,
        "min_writer_version": detail.minWriterVersion,
        "table_features": detail.tableFeatures if hasattr(detail, 'tableFeatures') else None
    }

def set_compatible_protocol(table_path: str):
    """Set protocol for maximum compatibility."""

    # Version 1/2 is most compatible
    spark.sql(f"""
        ALTER TABLE delta.`{table_path}`
        SET TBLPROPERTIES (
            'delta.minReaderVersion' = '1',
            'delta.minWriterVersion' = '2'
        )
    """)

def upgrade_protocol(table_path: str, reader: int, writer: int):
    """Upgrade protocol version for new features."""

    delta_table = DeltaTable.forPath(spark, table_path)
    delta_table.upgradeTableProtocol(reader, writer)

# Check compatibility
protocol = check_protocol("Tables/my_table")
print(f"Reader: {protocol['min_reader_version']}, Writer: {protocol['min_writer_version']}")

Best Practices

Table Optimization

class DeltaOptimizer:
    """Best practices for Delta table optimization."""

    def __init__(self, spark, table_path: str):
        self.spark = spark
        self.table_path = table_path
        self.delta_table = DeltaTable.forPath(spark, table_path)

    def compact_files(self):
        """Compact small files."""
        self.delta_table.optimize().executeCompaction()

    def zorder_optimize(self, columns: list[str]):
        """Optimize with Z-Order on specified columns."""
        self.delta_table.optimize().executeZOrderBy(*columns)

    def vacuum(self, retention_hours: int = 168):
        """Remove old files."""
        self.delta_table.vacuum(retention_hours)

    def analyze_stats(self):
        """Compute statistics for query optimization."""
        self.spark.sql(f"ANALYZE TABLE delta.`{self.table_path}` COMPUTE STATISTICS")

    def set_auto_optimize(self):
        """Enable automatic optimization."""
        self.spark.sql(f"""
            ALTER TABLE delta.`{self.table_path}`
            SET TBLPROPERTIES (
                'delta.autoOptimize.optimizeWrite' = 'true',
                'delta.autoOptimize.autoCompact' = 'true'
            )
        """)

    def full_optimization_routine(self, zorder_columns: list[str] = None):
        """Run full optimization routine."""

        # 1. Compact files
        print("Compacting files...")
        self.compact_files()

        # 2. Z-Order if columns specified
        if zorder_columns:
            print(f"Z-Ordering on {zorder_columns}...")
            self.zorder_optimize(zorder_columns)

        # 3. Vacuum old files
        print("Vacuuming old files...")
        self.vacuum()

        # 4. Compute statistics
        print("Computing statistics...")
        self.analyze_stats()

        print("Optimization complete!")

# Usage
optimizer = DeltaOptimizer(spark, "Tables/sales")
optimizer.full_optimization_routine(zorder_columns=["region", "date"])

Conclusion

Delta Lake provides a reliable, performant foundation for modern data platforms. Its ACID transactions, time travel, and schema evolution capabilities make it ideal for both analytical and operational workloads.

In Microsoft Fabric, Delta Lake is the native format, ensuring compatibility with the broader Delta ecosystem while providing tight integration with Fabric services like Direct Lake and SQL endpoints.

Embrace Delta Lake as your standard table format across all your data platforms for maximum interoperability and data reliability.

Michael John Peña

Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.