Fabric Notebooks: Interactive Data Engineering and Science
Fabric Notebooks provide an interactive environment for data engineering and science workloads. Today, I will cover advanced notebook patterns and best practices.
Notebook Fundamentals
Fabric notebooks support multiple languages and seamlessly integrate with Lakehouses:
# Available languages in Fabric notebooks:
# - PySpark (default)
# - Spark SQL
# - Scala
# - R
# Magic commands for switching languages
# %%pyspark - PySpark (default)
# %%sql     - Spark SQL
# %%spark   - Scala
# %%sparkr  - R
# Notebook context
import sys
print(f"Spark version: {spark.version}")
print(f"Python version: {sys.version}")
print(f"Current user: {mssparkutils.env.getUserName()}")
Working with Lakehouses
# Attach Lakehouse to notebook
# Done in Fabric UI: Add Lakehouse > Select Lakehouse
# Access default Lakehouse
# Tables appear in spark.catalog
spark.catalog.listTables()
# Read from attached Lakehouse
df = spark.read.format("delta").table("my_table")
# Write to attached Lakehouse
df.write.format("delta").mode("overwrite").saveAsTable("output_table")
# Access Files folder
files_df = spark.read.csv("Files/raw/data.csv", header=True)
# Access another Lakehouse with absolute path
other_df = spark.read.format("delta").load(
    "abfss://workspace@onelake.dfs.fabric.microsoft.com/OtherLakehouse.Lakehouse/Tables/table_name"
)
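Partitioned writes follow the same pattern; the snippet below is an illustrative sketch that assumes the DataFrame contains an order_date column (not part of the example above):
# Write a partitioned Delta table to the attached Lakehouse
(
    df.write.format("delta")
    .mode("overwrite")
    .partitionBy("order_date")  # assumes an order_date column exists
    .saveAsTable("output_table_partitioned")
)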
Parameterized Notebooks
# Cell 1: Define parameters (marked as the parameter cell)
# In the Fabric UI: open the cell's "..." (more commands) menu > Toggle parameter cell
# Values supplied by a pipeline or mssparkutils.notebook.run override these defaults
# Parameters with defaults
input_table = "raw_sales"
output_table = "processed_sales"
process_date = "2023-06-06"
batch_size = 10000
# Cell 2: Use parameters
print(f"Processing {input_table} for date {process_date}")
print(f"Output to {output_table}")
print(f"Batch size: {batch_size}")
Running Parameterized Notebooks from a Pipeline
{
    "name": "RunNotebookActivity",
    "type": "NotebookActivity",
    "typeProperties": {
        "notebook": {
            "referenceName": "ProcessSalesNotebook",
            "type": "NotebookReference"
        },
        "parameters": {
            "input_table": {
                "value": "raw_sales",
                "type": "String"
            },
            "output_table": {
                "value": "processed_sales",
                "type": "String"
            },
            "process_date": {
                "value": "@formatDateTime(utcNow(), 'yyyy-MM-dd')",
                "type": "Expression"
            }
        }
    }
}
MSSparkUtils
# Notebook utilities
from notebookutils import mssparkutils
# File system operations
mssparkutils.fs.ls("Files/")
mssparkutils.fs.mkdirs("Files/output/")
mssparkutils.fs.cp("Files/source.csv", "Files/backup/source.csv")
mssparkutils.fs.rm("Files/temp/", recurse=True)
# Read file content
content = mssparkutils.fs.head("Files/config.json", 1000)
print(content)
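To load and parse the whole file rather than a byte preview, the default Lakehouse is also exposed as a local path, so standard Python file APIs work; a minimal sketch assuming the same config.json:
# Read the full file through the default Lakehouse's local path
import json
with open("/lakehouse/default/Files/config.json") as f:
    config = json.load(f)
print(config)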
# Mount external storage (ADLS Gen2)
# Fabric has no linked services; authenticate with an account key or SAS token
storage_key = mssparkutils.credentials.getSecret(
    "https://myvault.vault.azure.net/", "storage-account-key"  # placeholder vault URL and secret name
)
mssparkutils.fs.mount(
    "abfss://container@storage.dfs.core.windows.net/",
    "/mnt/external",
    {"accountKey": storage_key}  # or {"sasToken": "..."}
)
# Access mounted data (Spark reads the local mount path via the file:// scheme)
df = spark.read.csv(f"file://{mssparkutils.fs.getMountPath('/mnt/external')}/data.csv")
# Unmount
mssparkutils.fs.unmount("/mnt/external")
# Credentials
token = mssparkutils.credentials.getToken("storage")  # audience keys: storage, pbi, keyvault, kusto
secret = mssparkutils.credentials.getSecret("https://myvault.vault.azure.net/", "my-secret")  # placeholder values
# Notebook orchestration
result = mssparkutils.notebook.run("./ChildNotebook", 600, {"param1": "value1"})
print(f"Child notebook result: {result}")
# Run multiple notebooks in parallel (DAG format; "args" passes parameters, "dependencies" sets ordering)
mssparkutils.notebook.runMultiple({
    "activities": [
        {"name": "Notebook1", "path": "Notebook1", "args": {"p1": "v1"}},
        {"name": "Notebook2", "path": "Notebook2", "args": {"p2": "v2"}}
    ]
})
Data Visualization
# Built-in visualization
df = spark.read.format("delta").table("sales_summary")
display(df) # Interactive table with chart options
# Matplotlib
import matplotlib.pyplot as plt
pdf = df.toPandas()
plt.figure(figsize=(12, 6))
plt.bar(pdf['category'], pdf['total_sales'])
plt.xlabel('Category')
plt.ylabel('Sales')
plt.title('Sales by Category')
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()
# Seaborn
import seaborn as sns
plt.figure(figsize=(10, 8))
sns.heatmap(
    pdf.pivot_table(values='sales', index='month', columns='category'),
    annot=True,
    fmt='.0f',
    cmap='YlOrRd'
)
plt.title('Sales Heatmap')
plt.show()
# Plotly (interactive)
import plotly.express as px
fig = px.line(pdf, x='date', y='sales', color='category', title='Sales Trend')
fig.show()
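toPandas() pulls the entire DataFrame onto the driver, so for large tables it is safer to aggregate or sample in Spark first; a minimal sketch reusing the sales_summary columns:
# Reduce the data in Spark before converting to pandas for plotting
from pyspark.sql import functions as F
plot_pdf = (
    df.groupBy("category")
    .agg(F.sum("total_sales").alias("total_sales"))
    .toPandas()
)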
Error Handling and Logging
import logging
from datetime import datetime
# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
def process_with_logging(df, process_name):
    """Process data with comprehensive logging"""
    start_time = datetime.now()
    logger.info(f"Starting {process_name}")
    logger.info(f"Input row count: {df.count()}")
    try:
        # Your processing logic
        result_df = df.transform(your_transformation)
        end_time = datetime.now()
        duration = (end_time - start_time).total_seconds()
        logger.info(f"Output row count: {result_df.count()}")
        logger.info(f"{process_name} completed in {duration:.2f} seconds")
        return result_df
    except Exception as e:
        logger.error(f"Error in {process_name}: {str(e)}")
        raise
# Usage
try:
    df = spark.read.format("delta").table("source_table")
    result = process_with_logging(df, "Sales Processing")
    result.write.format("delta").mode("overwrite").saveAsTable("output_table")
except Exception as e:
    logger.error(f"Pipeline failed: {e}")
    # Send alert, update status, etc.
    raise
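Beyond console logs, run metadata can be persisted so it survives the Spark session and can be queried later; a minimal sketch that appends to a hypothetical Delta table named run_audit_log:
# Persist run metadata to a Delta audit table ("run_audit_log" is a hypothetical table name)
from datetime import datetime
from pyspark.sql import Row
def log_run(process_name, status, row_count, duration_seconds):
    audit_row = Row(
        process_name=process_name,
        status=status,
        row_count=row_count,
        duration_seconds=duration_seconds,
        logged_at=datetime.now()
    )
    spark.createDataFrame([audit_row]).write.format("delta").mode("append").saveAsTable("run_audit_log")
log_run("Sales Processing", "succeeded", result.count(), 42.0)  # 42.0 is a placeholder duration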
Testing in Notebooks
# Unit testing patterns in notebooks
def test_transformation():
    """Test data transformation logic"""
    # Create test data
    test_data = [
        ("A", 100, "2023-01-01"),
        ("B", 200, "2023-01-02"),
        ("C", -50, "2023-01-03"),  # Invalid: negative amount
    ]
    test_df = spark.createDataFrame(test_data, ["product", "amount", "date"])
    # Apply transformation
    result_df = clean_sales_data(test_df)
    # Assertions
    assert result_df.count() == 2, "Should filter out negative amounts"
    assert "amount_cleaned" in result_df.columns, "Should have cleaned amount column"
    # Check specific values
    row = result_df.filter("product = 'A'").first()
    assert row["amount"] == 100, "Amount should be preserved"
    print("All tests passed!")
def test_aggregation():
    """Test aggregation logic"""
    test_data = [
        ("Electronics", 100),
        ("Electronics", 200),
        ("Clothing", 150),
    ]
    test_df = spark.createDataFrame(test_data, ["category", "amount"])
    result_df = aggregate_by_category(test_df)
    electronics_total = result_df.filter("category = 'Electronics'").first()["total"]
    assert electronics_total == 300, f"Expected 300, got {electronics_total}"
    print("Aggregation tests passed!")
# Run tests
test_transformation()
test_aggregation()
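As the number of tests grows, a small runner keeps the results in one place instead of stopping at the first assertion failure; a minimal sketch using the two tests defined above:
# Simple test runner: execute each test function and summarize any failures
def run_notebook_tests(tests):
    failures = []
    for test in tests:
        try:
            test()
        except AssertionError as e:
            failures.append(f"{test.__name__}: {e}")
    if failures:
        raise AssertionError("Test failures:\n" + "\n".join(failures))
    print(f"All {len(tests)} test(s) passed")
run_notebook_tests([test_transformation, test_aggregation])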
Notebook Best Practices
best_practices = {
    "organization": [
        "Use clear cell descriptions/markdown",
        "Group related code into sections",
        "Keep notebooks focused on one task",
        "Use meaningful variable names"
    ],
    "performance": [
        "Cache DataFrames used multiple times",
        "Use appropriate number of partitions",
        "Avoid collect() on large datasets",
        "Use broadcast for small lookup tables"
    ],
    "maintainability": [
        "Use parameterized notebooks",
        "Extract reusable functions to libraries",
        "Version control notebooks via Git integration",
        "Document assumptions and dependencies"
    ],
    "debugging": [
        "Use display() for intermediate results",
        "Check row counts at each step",
        "Use explain() to understand query plans",
        "Log key metrics and timings"
    ]
}
# Example: Caching
frequently_used_df = spark.read.format("delta").table("large_dimension")
frequently_used_df.cache()
# Use it multiple times...
frequently_used_df.unpersist() # Clean up when done
# Example: Broadcast
from pyspark.sql.functions import broadcast
small_df = spark.read.format("delta").table("small_lookup") # < 10MB
large_df = spark.read.format("delta").table("large_fact")
result = large_df.join(broadcast(small_df), "key_column")
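To confirm that the broadcast hint is actually applied, inspect the physical plan; a BroadcastHashJoin node should appear in place of a sort-merge join:
# Verify the join strategy in the physical plan
result.explain()  # look for BroadcastHashJoin in the output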
Notebooks are the primary development environment in Fabric. Tomorrow, I will cover Fabric Pipelines for orchestration.