Apache Spark in Microsoft Fabric: Getting Started
Apache Spark is the data engineering engine in Microsoft Fabric. Today we’ll explore how Spark works in Fabric and how to use it effectively for data transformation.
Spark in Fabric Overview
Fabric provides a managed Spark experience:
# Fabric Spark characteristics
fabric_spark = {
    "runtime": "Spark 3.4 with Delta Lake 2.4",
    "languages": ["Python (PySpark)", "Scala", "Spark SQL", "R"],
    "management": "Fully managed - no cluster configuration",
    "startup": "Starter pools for fast spin-up",
    "integration": "Native OneLake integration"
}
# Key differences from Databricks/Synapse:
# - No cluster creation required
# - Fixed compute configurations in preview
# - Automatic session management
# - Built-in Lakehouse integration
Creating a Spark Notebook
# 1. In your workspace, click "+ New" > "Notebook"
# 2. Name your notebook
# 3. Attach to a Lakehouse (provides default context)
# Once created, you have access to:
# - spark: SparkSession (pre-configured)
# - sc: SparkContext
# - mssparkutils: file system and notebook utilities (Fabric's counterpart to dbutils)
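As a quick sanity check that the managed session is ready, the snippet below (a minimal sketch, assuming a Lakehouse is attached as the default) prints the runtime version and lists the tables the notebook can already see:
# Confirm the pre-configured session and default Lakehouse context
print(spark.version)                    # Spark version of the Fabric runtime
print(spark.catalog.currentDatabase())  # database backed by the attached Lakehouse
for table in spark.catalog.listTables():
    print(table.name)                   # tables available without any setup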
Basic Spark Operations
Reading Data
# Read from Lakehouse Tables
df = spark.read.format("delta").table("sales")
# Read from Lakehouse Files
csv_df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("Files/raw/sales/*.csv")
json_df = spark.read.format("json") \
    .load("Files/raw/events/*.json")
parquet_df = spark.read.format("parquet") \
    .load("Files/raw/data.parquet")
# Read with explicit schema (recommended for production)
from pyspark.sql.types import *
schema = StructType([
    StructField("id", IntegerType(), False),
    StructField("name", StringType(), True),
    StructField("amount", DoubleType(), True),
    StructField("date", DateType(), True)
])
df = spark.read.format("csv") \
    .schema(schema) \
    .option("header", "true") \
    .load("Files/raw/data.csv")
Transformations
from pyspark.sql.functions import *
# Select and filter
result = df.select("id", "name", "amount") \
    .filter(col("amount") > 100) \
    .orderBy(col("amount").desc())
# Add computed columns
df_enriched = df.withColumn(
    "amount_with_tax",
    col("amount") * 1.1
).withColumn(
    "processed_date",
    current_timestamp()
)
# Aggregations
summary = df.groupBy("category") \
    .agg(
        count("*").alias("count"),
        sum("amount").alias("total"),
        avg("amount").alias("average"),
        min("amount").alias("min_amount"),
        max("amount").alias("max_amount")
    )
# Window functions
from pyspark.sql.window import Window
window_spec = Window.partitionBy("customer_id") \
    .orderBy("order_date") \
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
df_with_running_total = df.withColumn(
    "running_total",
    sum("amount").over(window_spec)
)
Joins
# Inner join
result = orders.join(
    customers,
    orders.customer_id == customers.id,
    "inner"
).select(
    orders.order_id,
    customers.name,
    orders.amount
)
# Left join with aliasing
from pyspark.sql.functions import col
result = orders.alias("o").join(
    products.alias("p"),
    col("o.product_id") == col("p.id"),
    "left"
).select(
    col("o.order_id"),
    col("p.product_name"),
    col("o.quantity")
)
# Broadcast join for small tables
from pyspark.sql.functions import broadcast
result = large_orders.join(
    broadcast(small_lookup),
    "lookup_key"
)
Writing Data
# Write to Delta table
df.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable("processed_sales")
# Write modes:
# - "overwrite": Replace entire table
# - "append": Add to existing data
# - "ignore": Skip if table exists
# - "error": Fail if table exists (default)
# Write with partitioning
df.write \
    .format("delta") \
    .mode("overwrite") \
    .partitionBy("year", "month") \
    .saveAsTable("sales_partitioned")
# Write to Files folder (non-Delta)
df.write \
    .format("parquet") \
    .mode("overwrite") \
    .save("Files/processed/sales_output")
Spark SQL
# Register DataFrame as temp view
df.createOrReplaceTempView("temp_sales")
# Run SQL queries
result = spark.sql("""
    SELECT
        customer_id,
        SUM(amount) AS total_amount,
        COUNT(*) AS order_count
    FROM temp_sales
    WHERE order_date >= '2023-01-01'
    GROUP BY customer_id
    HAVING COUNT(*) > 5
    ORDER BY total_amount DESC
""")
# Query Lakehouse tables directly
sales_df = spark.sql("SELECT * FROM sales_lakehouse.sales")
# Create permanent views
spark.sql("""
    CREATE OR REPLACE VIEW sales_summary AS
    SELECT
        DATE_TRUNC('month', order_date) AS month,
        SUM(amount) AS monthly_total
    FROM sales
    GROUP BY DATE_TRUNC('month', order_date)
""")
Spark Configuration
# View current Spark configuration
for key, value in spark.sparkContext.getConf().getAll():
    print(f"{key}: {value}")
# Set session-level configurations
spark.conf.set("spark.sql.shuffle.partitions", "200")
spark.conf.set("spark.sql.adaptive.enabled", "true")
# Delta-specific settings
spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", "true")
spark.conf.set("spark.databricks.delta.autoCompact.enabled", "true")
# Note: Some settings are fixed in Fabric and cannot be changed
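To verify what a session is actually using, settings can be read back with spark.conf.get; a small check against the values set above:
# Read settings back to confirm what the current session uses
print(spark.conf.get("spark.sql.shuffle.partitions"))
print(spark.conf.get("spark.sql.adaptive.enabled"))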
Using mssparkutils
# Fabric notebooks expose mssparkutils, the Fabric counterpart to Databricks' dbutils
# File system utilities
mssparkutils.fs.ls("Files/")
mssparkutils.fs.cp("Files/source.csv", "Files/destination.csv")
mssparkutils.fs.mv("Files/old_name", "Files/new_name")
mssparkutils.fs.rm("Files/to_delete", True)  # True = delete recursively
mssparkutils.fs.mkdirs("Files/new_folder/subfolder")
# Notebook utilities - run another notebook by name
mssparkutils.notebook.run("other_notebook", 300)  # name, timeout in seconds
# Parameter handling
# In the parent notebook:
mssparkutils.notebook.run("child_notebook", 300, {"param1": "value1"})
# In the child notebook, declare the variable in a cell marked as a
# parameter cell; the value passed from the parent overrides the default:
param1 = "default_value"
Performance Tips
# 1. Cache frequently used DataFrames
df.cache()
df.count() # Triggers caching
# Use unpersist when done
df.unpersist()
# 2. Use appropriate partitioning
# Repartition for even distribution (returns a new DataFrame)
df = df.repartition(100)
# Coalesce to reduce partitions without a full shuffle
df = df.coalesce(10)
# 3. Use broadcast for small tables
from pyspark.sql.functions import broadcast
result = big_df.join(broadcast(small_df), "key")
# 4. Push down predicates
# Filter early, select only needed columns
df.filter(col("date") > "2023-01-01") \
    .select("id", "name", "amount")
# 5. Use Delta's optimization features
spark.sql("OPTIMIZE large_table ZORDER BY (customer_id)")
Error Handling
from pyspark.sql.utils import AnalysisException
try:
    df = spark.read.table("non_existent_table")
except AnalysisException as e:
    print(f"Table not found: {e}")
# Validate data before processing
def validate_dataframe(df, required_columns):
    missing = set(required_columns) - set(df.columns)
    if missing:
        raise ValueError(f"Missing columns: {missing}")
    return df
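# Usage: fail fast before processing if expected columns are missing
# (the column list here is illustrative)
validated_df = validate_dataframe(df, ["id", "name", "amount"])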
# Schema validation
expected_schema = StructType([...])
if df.schema != expected_schema:
    print("Schema mismatch detected")
Tomorrow we’ll dive deeper into Fabric notebooks and their unique features.