
Synapse Data Engineering in Microsoft Fabric: Spark at Scale

Synapse Data Engineering in Microsoft Fabric provides Apache Spark capabilities for large-scale data processing. Today, I will explore how to use Spark notebooks and jobs for data engineering workloads in Fabric.

Data Engineering in Fabric

The Data Engineering workload in Fabric includes:

  • Notebooks: Interactive Spark development
  • Spark Job Definitions: Production Spark jobs
  • Lakehouse: Storage for Spark workloads
  • Environment: Spark configuration and libraries
┌─────────────────────────────────────────────────────┐
│              Synapse Data Engineering               │
├─────────────────────────────────────────────────────┤
│  ┌───────────┐  ┌───────────┐  ┌───────────┐        │
│  │ Notebooks │  │ Spark Job │  │Environment│        │
│  │           │  │Definitions│  │           │        │
│  └───────────┘  └───────────┘  └───────────┘        │
│                       │                             │
│                       ▼                             │
│  ┌─────────────────────────────────────────┐        │
│  │            Apache Spark Pool            │        │
│  │   (Managed, auto-scaling, auto-pause)   │        │
│  └─────────────────────────────────────────┘        │
│                       │                             │
│                       ▼                             │
│  ┌─────────────────────────────────────────┐        │
│  │                Lakehouse                │        │
│  └─────────────────────────────────────────┘        │
└─────────────────────────────────────────────────────┘

Fabric Notebooks

Creating Your First Notebook

# Cell 1: Verify Spark session
print(f"Spark version: {spark.version}")
print(f"Default parallelism: {spark.sparkContext.defaultParallelism}")

# In Fabric, Spark sessions start automatically
# No cluster configuration needed
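
To see what the attached Lakehouse exposes before reading anything, the mssparkutils helper preinstalled in Fabric notebooks can list folders (Files/raw below is just an example path):

# Optional: browse the attached Lakehouse from the notebook
# mssparkutils is available by default in Fabric notebooks
for item in mssparkutils.fs.ls("Files/raw"):
    print(item.name, item.size)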

Reading Data from Lakehouse

# Cell 2: Read from attached Lakehouse

# Read a Delta table (Tables folder)
customers_df = spark.read.format("delta").table("customers")
display(customers_df.limit(10))

# Read files (Files folder)
raw_df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv("Files/raw/sales_data.csv")

# Read from another Lakehouse using absolute path
other_lakehouse_df = spark.read.format("delta").load(
    "abfss://workspace@onelake.dfs.fabric.microsoft.com/OtherLakehouse.Lakehouse/Tables/products"
)
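
inferSchema is convenient for exploration, but it costs an extra pass over the file and can guess types incorrectly. For repeatable loads, an explicit schema is safer; a sketch assuming the column names used elsewhere in this post:

# Explicit schema instead of inferSchema
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

sales_schema = StructType([
    StructField("order_id", StringType(), False),
    StructField("customer_id", StringType(), True),
    StructField("customer_name", StringType(), True),
    StructField("email", StringType(), True),
    StructField("amount", DoubleType(), True),
    StructField("sale_date", StringType(), True),  # parsed to timestamp during cleaning
])

raw_df = spark.read \
    .option("header", "true") \
    .schema(sales_schema) \
    .csv("Files/raw/sales_data.csv")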

Data Transformation Patterns

# Cell 3: Common transformation patterns
from pyspark.sql.functions import (
    col, lit, when, coalesce,
    year, month, dayofmonth,
    sum as spark_sum, avg, count, countDistinct,
    row_number, dense_rank,
    explode, split, trim, lower, upper,
    to_timestamp, date_format, datediff
)
from pyspark.sql.window import Window

# Read source data
sales_df = spark.read.format("delta").table("raw_sales")

# Data cleaning
cleaned_df = sales_df \
    .withColumn("customer_name", trim(col("customer_name"))) \
    .withColumn("email", lower(col("email"))) \
    .withColumn("amount", col("amount").cast("double")) \
    .withColumn("sale_date", to_timestamp(col("sale_date"), "yyyy-MM-dd")) \
    .filter(col("amount").isNotNull()) \
    .filter(col("amount") > 0) \
    .dropDuplicates(["order_id"])

# Add derived columns
enriched_df = cleaned_df \
    .withColumn("year", year(col("sale_date"))) \
    .withColumn("month", month(col("sale_date"))) \
    .withColumn("day", dayofmonth(col("sale_date"))) \
    .withColumn("amount_category",
        when(col("amount") > 1000, "high")
        .when(col("amount") > 100, "medium")
        .otherwise("low"))

display(enriched_df.limit(10))
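
Before moving on, it is worth knowing how many rows the cleaning step discarded; a minimal check using the DataFrames defined above:

# Sanity check: how much did cleaning remove?
raw_count = sales_df.count()
clean_count = cleaned_df.count()
print(f"Dropped {raw_count - clean_count:,} of {raw_count:,} rows during cleaning")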

Window Functions

# Cell 4: Window functions for analytics
from pyspark.sql.window import Window

# Define window specs
customer_window = Window.partitionBy("customer_id").orderBy(col("sale_date").desc())
monthly_window = Window.partitionBy("year", "month").orderBy(col("amount").desc())

# Add analytics columns
analytics_df = enriched_df \
    .withColumn("customer_rank", row_number().over(customer_window)) \
    .withColumn("monthly_rank", dense_rank().over(monthly_window)) \
    .withColumn("customer_running_total",
        spark_sum("amount").over(
            Window.partitionBy("customer_id")
                  .orderBy("sale_date")
                  .rowsBetween(Window.unboundedPreceding, Window.currentRow)
        ))

# Get latest order per customer
latest_orders = analytics_df.filter(col("customer_rank") == 1)
display(latest_orders.select("customer_id", "sale_date", "amount", "customer_running_total"))
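
Another window pattern that comes up constantly is comparing each order with the customer's previous one, for example to measure days between purchases. A sketch using lag over the same data:

# Days since the customer's previous order
from pyspark.sql.functions import lag

order_history_window = Window.partitionBy("customer_id").orderBy("sale_date")

repeat_purchase_df = analytics_df.withColumn(
    "days_since_previous_order",
    datediff(col("sale_date"), lag("sale_date", 1).over(order_history_window))
)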

Aggregations

# Cell 5: Build aggregation tables
# Monthly summary
monthly_summary = enriched_df \
    .groupBy("year", "month") \
    .agg(
        spark_sum("amount").alias("total_sales"),
        avg("amount").alias("avg_order_value"),
        count("order_id").alias("order_count"),
        countDistinct("customer_id").alias("unique_customers")
    ) \
    .orderBy("year", "month")

# Write to Lakehouse
monthly_summary.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable("monthly_sales_summary")

# Customer summary
customer_summary = enriched_df \
    .groupBy("customer_id", "customer_name") \
    .agg(
        spark_sum("amount").alias("lifetime_value"),
        count("order_id").alias("total_orders"),
        avg("amount").alias("avg_order_value")
    ) \
    .orderBy(col("lifetime_value").desc())

customer_summary.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable("customer_summary")

Incremental Processing

# Cell 6: Incremental data processing with Delta Lake
from delta.tables import DeltaTable
from pyspark.sql.functions import current_timestamp

# Check if target table exists
if spark.catalog.tableExists("sales_facts"):
    # Perform incremental merge
    target = DeltaTable.forName(spark, "sales_facts")

    target.alias("target") \
        .merge(
            enriched_df.alias("source"),
            "target.order_id = source.order_id"
        ) \
        .whenMatchedUpdate(set={
            "amount": "source.amount",
            "customer_name": "source.customer_name",
            "updated_at": "current_timestamp()"
        }) \
        .whenNotMatchedInsert(values={
            "order_id": "source.order_id",
            "customer_id": "source.customer_id",
            "customer_name": "source.customer_name",
            "amount": "source.amount",
            "sale_date": "source.sale_date",
            "year": "source.year",
            "month": "source.month",
            "created_at": "current_timestamp()",
            "updated_at": "current_timestamp()"
        }) \
        .execute()

    print("Incremental merge completed")
else:
    # Initial load
    enriched_df \
        .withColumn("created_at", current_timestamp()) \
        .withColumn("updated_at", current_timestamp()) \
        .write \
        .format("delta") \
        .mode("overwrite") \
        .saveAsTable("sales_facts")

    print("Initial load completed")

Spark Job Definitions

For production workloads, use Spark Job Definitions:

# main.py - Spark Job Definition
import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, year, month, sum as spark_sum

def main():
    spark = SparkSession.builder.getOrCreate()

    # Get parameters
    source_table = sys.argv[1] if len(sys.argv) > 1 else "raw_sales"
    target_table = sys.argv[2] if len(sys.argv) > 2 else "sales_summary"

    # Read source
    df = spark.read.format("delta").table(source_table)

    # Transform
    summary = df \
        .withColumn("year", year(col("sale_date"))) \
        .withColumn("month", month(col("sale_date"))) \
        .groupBy("year", "month") \
        .agg(spark_sum("amount").alias("total_sales"))

    # Write
    summary.write \
        .format("delta") \
        .mode("overwrite") \
        .saveAsTable(target_table)

    print(f"Job completed. Wrote {summary.count()} rows to {target_table}")

if __name__ == "__main__":
    main()
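
The positional arguments above come from the command-line arguments configured on the Spark Job Definition (or passed from the pipeline activity that invokes it). Once a job has more than a couple of parameters, named flags are easier to maintain; a minimal sketch with argparse (the flag names here are my own):

# main.py variant with named parameters
import argparse

def parse_args():
    parser = argparse.ArgumentParser(description="Sales summary Spark job")
    parser.add_argument("--source-table", default="raw_sales")
    parser.add_argument("--target-table", default="sales_summary")
    return parser.parse_args()

# Command-line arguments would then look like:
#   --source-table raw_sales --target-table sales_summary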

Environment Configuration

# environment.yml
name: data-engineering-env
dependencies:
  - pip:
    - pandas==2.0.0
    - numpy==1.24.0
    - scikit-learn==1.2.0
    - great-expectations==0.16.0

# Spark properties (configured in the Environment item's Spark compute
# settings, alongside the environment.yml library list above)
spark_config:
  spark.sql.shuffle.partitions: 200
  spark.sql.adaptive.enabled: true
  spark.sql.adaptive.coalescePartitions.enabled: true
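
The same Spark properties can also be overridden per session from a notebook, which is handy for experimenting before changing the shared Environment:

# Session-level overrides (apply only to the current Spark session)
spark.conf.set("spark.sql.shuffle.partitions", "64")
print(spark.conf.get("spark.sql.adaptive.enabled"))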

Performance Optimization

# Cell 7: Performance best practices

# 1. Partition strategy for large tables
large_df = spark.read.format("delta").table("sales_facts")
large_df.write \
    .format("delta") \
    .partitionBy("year", "month") \
    .mode("overwrite") \
    .saveAsTable("partitioned_sales")

# 2. Z-ordering for query optimization
spark.sql("OPTIMIZE partitioned_sales ZORDER BY (customer_id)")

# 3. Cache frequently used DataFrames
frequently_used_df = spark.read.format("delta").table("dimension_products")
frequently_used_df.cache()

# 4. Broadcast small tables for joins
from pyspark.sql.functions import broadcast

small_df = spark.read.format("delta").table("dim_regions")  # < 10MB
large_df = spark.read.format("delta").table("sales_facts")  # Large

result = large_df.join(broadcast(small_df), "region_id")

# 5. Use Delta table stats
spark.sql("ANALYZE TABLE sales_facts COMPUTE STATISTICS FOR ALL COLUMNS")

Synapse Data Engineering in Fabric provides a managed Spark experience that removes infrastructure concerns while delivering the full power of Apache Spark. Tomorrow, I will cover Synapse Data Science for machine learning workloads.


Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.