
Photon Engine: Accelerating Spark Workloads on Databricks

Photon is Databricks’ next-generation query engine that dramatically accelerates Apache Spark workloads. Written in C++, it provides native vectorized execution for SQL and DataFrame operations.

What is Photon?

Photon is a native execution engine that:

  • Replaces Spark’s JVM-based execution for supported operations
  • Uses vectorized processing (SIMD) for CPU efficiency
  • Operates directly on columnar data in memory
  • Provides 2-8x performance improvements for typical workloads
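
You can sanity-check whether a running cluster actually has Photon active from a notebook. A minimal sketch, reading the spark.databricks.photon.enabled flag that also appears in the cluster configuration later in this post:

# Quick check from a notebook (sketch): read the Photon flag on the current session
photon_enabled = spark.conf.get("spark.databricks.photon.enabled", "false")
print(f"Photon enabled: {photon_enabled}")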

How Photon Works

Traditional Spark:
┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│  Catalyst   │ -> │  Tungsten   │ -> │    JVM      │
│  Optimizer  │    │  Codegen    │    │  Execution  │
└─────────────┘    └─────────────┘    └─────────────┘

With Photon:
┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│  Catalyst   │ -> │   Photon    │ -> │   Native    │
│  Optimizer  │    │   Codegen   │    │  C++ Engine │
└─────────────┘    └─────────────┘    └─────────────┘

Enabling Photon

Cluster Configuration

# Create a Photon-enabled cluster via the Clusters API
# (assumes `workspace_url` and auth `headers` are already defined)
import requests

cluster_config = {
    "cluster_name": "photon-analytics",
    "spark_version": "11.3.x-photon-scala2.12",  # Photon runtime
    "node_type_id": "Standard_D4ds_v5",
    "num_workers": 4,
    "runtime_engine": "PHOTON",
    "spark_conf": {
        "spark.databricks.photon.enabled": "true",
        "spark.databricks.photon.parquetWriter.enabled": "true"
    }
}

response = requests.post(
    f"{workspace_url}/api/2.0/clusters/create",
    headers=headers,
    json=cluster_config
)

SQL Warehouse

# Photon is automatically enabled for Pro and Serverless warehouses
warehouse_config = {
    "name": "photon-warehouse",
    "enable_photon": True,
    "warehouse_type": "PRO"
}
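
If you prefer to create the warehouse programmatically, a hedged sketch of submitting this config through the SQL Warehouses REST API is below. The /api/2.0/sql/warehouses endpoint is assumed from the standard Databricks API, and additional fields such as cluster_size may be required in your workspace.

# Sketch: create the warehouse via the REST API (endpoint assumed; verify against
# your workspace's API docs). Reuses workspace_url and headers from the cluster example.
import requests

response = requests.post(
    f"{workspace_url}/api/2.0/sql/warehouses",
    headers=headers,
    json=warehouse_config
)
print(response.status_code)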

Photon-Optimized Operations

Supported Operations

Photon excels at:

-- Aggregations
SELECT region, SUM(sales), AVG(price), COUNT(DISTINCT customer_id)
FROM transactions
GROUP BY region;

-- Joins
SELECT t.*, c.customer_name
FROM transactions t
JOIN customers c ON t.customer_id = c.customer_id;

-- Filters and projections
SELECT product_id, quantity, price * quantity as total
FROM order_items
WHERE quantity > 0 AND price < 1000;

-- Window functions
SELECT
    customer_id,
    order_date,
    amount,
    SUM(amount) OVER (PARTITION BY customer_id ORDER BY order_date) as running_total
FROM orders;

-- Sorting
SELECT * FROM large_table ORDER BY timestamp DESC;

Performance Comparison

# Benchmark Photon vs. traditional Spark
import time

def benchmark_query(spark, query, num_runs=5):
    """Run a query multiple times and return timing statistics."""
    times = []
    for _ in range(num_runs):
        start = time.time()
        spark.sql(query).collect()
        elapsed = time.time() - start
        times.append(elapsed)

    return {
        "min": min(times),
        "max": max(times),
        "avg": sum(times) / len(times)
    }

# Complex aggregation benchmark
aggregation_query = """
SELECT
    region,
    product_category,
    YEAR(transaction_date) as year,
    MONTH(transaction_date) as month,
    COUNT(*) as transaction_count,
    SUM(amount) as total_revenue,
    AVG(amount) as avg_transaction,
    PERCENTILE(amount, 0.5) as median_amount
FROM production.sales.transactions
WHERE transaction_date >= '2021-01-01'
GROUP BY region, product_category, YEAR(transaction_date), MONTH(transaction_date)
"""

# Results typically show 2-5x improvement with Photon
result = benchmark_query(spark, aggregation_query)
print(f"Average query time: {result['avg']:.2f}s")

Monitoring Photon Execution

Query Plans

# Check if Photon is being used
df = spark.sql("""
    SELECT region, SUM(amount) FROM transactions GROUP BY region
""")

# Explain the query plan
df.explain(mode="extended")

# Look for "PhotonGroupingAgg", "PhotonScan", etc. in the plan

Spark UI Metrics

# Inspect job execution programmatically; Photon-specific metrics appear in the Spark UI
def get_photon_metrics(spark_session):
    """Get job-level execution info via the Spark status tracker."""
    sc = spark_session.sparkContext
    status_tracker = sc.statusTracker()

    # Get active jobs
    job_ids = status_tracker.getActiveJobIds()

    metrics = []
    for job_id in job_ids:
        job_info = status_tracker.getJobInfo(job_id)
        if job_info:
            metrics.append({
                "job_id": job_id,
                "status": str(job_info.status),
                "stage_ids": job_info.stageIds
            })

    return metrics

# In Spark UI, look for:
# - "Photon" operators in the query plan
# - "photon" metrics in stage details
# - CPU time vs elapsed time (Photon has better CPU utilization)

Optimizing for Photon

Data Format Optimization

# Photon works best with Delta Lake and Parquet
# Optimize column encoding

# Write optimized Delta table
df.write \
    .format("delta") \
    .option("optimizeWrite", "true") \
    .option("autoCompact", "true") \
    .saveAsTable("optimized_table")

# Use appropriate data types
# - Use INT instead of STRING for numeric data
# - Use DATE instead of STRING for dates
# - Avoid deeply nested structures
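
To illustrate the data type guidance above, here is a minimal sketch that casts string-typed columns to native types before writing; the source DataFrame (raw_df) and column names are hypothetical:

# Sketch: cast string columns to native types so Photon can process them efficiently
# (raw_df and the column names are hypothetical)
from pyspark.sql import functions as F

typed_df = (
    raw_df
    .withColumn("quantity", F.col("quantity").cast("int"))
    .withColumn("transaction_date", F.to_date("transaction_date", "yyyy-MM-dd"))
    .withColumn("amount", F.col("amount").cast("decimal(18,2)"))
)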

Query Optimization for Photon

-- Photon handles predicate pushdown well
-- Place filters early
SELECT * FROM transactions
WHERE transaction_date = '2022-03-01'  -- Pushed to scan
AND amount > 100;

-- Use column pruning
SELECT id, name, amount  -- Only needed columns
FROM transactions
WHERE region = 'US';

-- Leverage Delta Lake statistics
OPTIMIZE transactions
ZORDER BY (region, transaction_date);

-- This allows Photon to skip files efficiently

Memory Configuration

# Photon uses off-heap memory
spark_conf = {
    # Photon memory settings
    "spark.databricks.photon.memoryPressure.enabled": "true",
    "spark.databricks.photon.spillToDisk.enabled": "true",

    # General memory settings that affect Photon
    "spark.memory.offHeap.enabled": "true",
    "spark.memory.offHeap.size": "4g"
}
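
These settings can be applied at cluster creation time; a minimal sketch that merges them into the spark_conf block of the cluster payload from the earlier example:

# Sketch: merge the memory settings into the earlier cluster payload before creating it
cluster_config["spark_conf"].update(spark_conf)

response = requests.post(
    f"{workspace_url}/api/2.0/clusters/create",
    headers=headers,
    json=cluster_config
)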

When Photon Falls Back

Photon falls back to Spark for unsupported operations:

-- Operations that may not use Photon:
-- 1. Complex UDFs
SELECT my_python_udf(column) FROM table;  -- Falls back

-- 2. Certain data types
SELECT parse_complex_json(json_column) FROM table;  -- May fall back

-- 3. Unsupported functions
-- Check documentation for specific function support

-- Check if Photon was used
EXPLAIN SELECT * FROM table WHERE condition;
-- Look for "Photon" in the plan output
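
In PySpark the same trade-off shows up when comparing a Python UDF with an equivalent built-in expression. A sketch with a hypothetical discount rule: the UDF version forces rows out of the native engine, while the built-in version keeps the expression eligible for Photon.

# Sketch: Python UDF (falls back) vs. built-in expressions (Photon-eligible);
# the discount logic is hypothetical
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType

# Falls back: opaque Python function
discount_udf = F.udf(lambda amount: amount * 0.9 if amount > 100 else amount, DoubleType())
df_udf = spark.table("transactions").withColumn("discounted", discount_udf("amount"))

# Photon-eligible: the same logic expressed with built-in functions
df_native = spark.table("transactions").withColumn(
    "discounted",
    F.when(F.col("amount") > 100, F.col("amount") * 0.9).otherwise(F.col("amount"))
)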

Photon for ETL Workloads

# Photon accelerates ETL operations
from pyspark.sql import functions as F

# Large-scale transformation
transformed_df = (
    spark.read.table("raw.events")
    .filter(F.col("event_date") >= "2022-01-01")
    .groupBy("user_id", "event_type")
    .agg(
        F.count("*").alias("event_count"),
        F.sum("value").alias("total_value"),
        F.first("device_type").alias("primary_device")
    )
    .withColumn("processed_date", F.current_date())
)

# Write back - Photon accelerates the read and transform
# Write uses Photon parquet writer when enabled
transformed_df.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable("analytics.user_events_summary")

Cost-Performance Analysis

def analyze_photon_roi(time_range_days=30):
    """Analyze Photon ROI based on recent query performance.

    Relies on a get_query_history() helper (sketched below) that returns query
    records with 'used_photon', 'duration_ms', and 'dbus' fields.
    """
    # Get query history
    queries = get_query_history(time_range_days)

    photon_queries = [q for q in queries if q.get('used_photon', False)]
    non_photon_queries = [q for q in queries if not q.get('used_photon', False)]

    if not photon_queries or not non_photon_queries:
        return None

    # Calculate average speedup
    photon_avg_time = sum(q['duration_ms'] for q in photon_queries) / len(photon_queries)
    spark_avg_time = sum(q['duration_ms'] for q in non_photon_queries) / len(non_photon_queries)

    speedup = spark_avg_time / photon_avg_time if photon_avg_time > 0 else 0

    # Estimate DBU savings (a rough estimate that assumes a comparable DBU
    # rate per unit time; Photon typically finishes faster)
    total_dbus = sum(q['dbus'] for q in photon_queries)
    estimated_non_photon_dbus = total_dbus * speedup

    return {
        "queries_analyzed": len(photon_queries) + len(non_photon_queries),
        "average_speedup": speedup,
        "photon_avg_duration_ms": photon_avg_time,
        "spark_avg_duration_ms": spark_avg_time,
        "estimated_dbu_savings": estimated_non_photon_dbus - total_dbus
    }
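
The get_query_history helper above is intentionally abstract. One way to implement it is against the SQL query history REST API; the sketch below assumes the /api/2.0/sql/history/queries endpoint and response shape, and the 'used_photon' and 'dbus' fields expected by analyze_photon_roi would still need to be derived from your own metadata.

# Sketch: possible get_query_history implementation using the query history API
# (endpoint, parameters, and response fields are assumptions - verify before use)
import time
import requests

def get_query_history(time_range_days=30):
    start_ms = int((time.time() - time_range_days * 86400) * 1000)
    response = requests.get(
        f"{workspace_url}/api/2.0/sql/history/queries",
        headers=headers,
        params={"max_results": 1000},
    )
    response.raise_for_status()
    queries = response.json().get("res", [])
    return [q for q in queries if q.get("query_start_time_ms", 0) >= start_ms]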

Best Practices

Cluster Sizing with Photon

# Photon is more CPU-efficient, so consider:
# - Use compute-optimized instances (e.g., D-series on Azure)
# - You may need fewer worker nodes due to efficiency gains
# - Monitor memory usage - Photon has different (off-heap heavy) memory patterns

recommended_configs = {
    "small_workload": {
        "node_type": "Standard_D4ds_v5",
        "workers": 2,
        "with_photon": True
    },
    "medium_workload": {
        "node_type": "Standard_D8ds_v5",
        "workers": 4,
        "with_photon": True
    },
    "large_workload": {
        "node_type": "Standard_D16ds_v5",
        "workers": 8,
        "with_photon": True
    }
}

Query Patterns

-- DO: Use Photon-friendly patterns
-- Simple aggregations
SELECT category, SUM(sales) FROM products GROUP BY category;

-- Standard joins
SELECT * FROM orders o JOIN customers c ON o.cust_id = c.id;

-- DON'T: Avoid patterns that force fallback
-- Complex Python UDFs
-- Heavily nested structures
-- Unsupported data types

Conclusion

Photon represents a significant advancement in query execution for Databricks. By leveraging native code execution and vectorized processing, it delivers substantial performance improvements without requiring query changes.

Key takeaways:

  • Enable Photon on all production workloads (default for Pro/Serverless)
  • Use Delta Lake for optimal Photon performance
  • Monitor query plans to ensure Photon is being utilized
  • Optimize table layout with ZORDER for best file pruning
  • Consider Photon’s efficiency when sizing clusters

The performance gains often translate directly to cost savings, making Photon a clear win for most Spark workloads.


Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.