Databricks Notebook Workflows for Data Pipelines
Notebook workflows in Databricks enable you to build modular, reusable data pipelines by chaining notebooks together. Let’s explore patterns for building robust production workflows.
Basic Notebook Chaining
Using dbutils.notebook.run
# main_workflow.py
# Orchestrator notebook

# Run notebooks sequentially
result1 = dbutils.notebook.run(
    "/ETL/extract_data",
    timeout_seconds=3600,
    arguments={"date": "2021-10-29", "source": "sales"}
)
print(f"Extract completed: {result1}")

result2 = dbutils.notebook.run(
    "/ETL/transform_data",
    timeout_seconds=3600,
    arguments={"date": "2021-10-29"}
)
print(f"Transform completed: {result2}")

result3 = dbutils.notebook.run(
    "/ETL/load_data",
    timeout_seconds=3600,
    arguments={"date": "2021-10-29", "target": "warehouse"}
)
print(f"Load completed: {result3}")
Child Notebook with Return Value
# /ETL/extract_data.py
# Child notebook
# Get parameters
date = dbutils.widgets.get("date")
source = dbutils.widgets.get("source")
# Process data
records_count = spark.read.parquet(f"/data/raw/{source}/{date}").count()
# Return result
dbutils.notebook.exit(f"Extracted {records_count} records")
Parallel Execution
Using ThreadPoolExecutor
from concurrent.futures import ThreadPoolExecutor, as_completed

def run_notebook(notebook_path, params):
    return dbutils.notebook.run(
        notebook_path,
        timeout_seconds=3600,
        arguments=params
    )

# Define parallel tasks
notebooks = [
    ("/ETL/process_sales", {"region": "east"}),
    ("/ETL/process_sales", {"region": "west"}),
    ("/ETL/process_sales", {"region": "north"}),
    ("/ETL/process_sales", {"region": "south"}),
]

# Execute in parallel
results = {}
with ThreadPoolExecutor(max_workers=4) as executor:
    futures = {
        executor.submit(run_notebook, nb, params): (nb, params)
        for nb, params in notebooks
    }
    for future in as_completed(futures):
        nb, params = futures[future]
        try:
            result = future.result()
            results[params["region"]] = result
            print(f"Completed {params['region']}: {result}")
        except Exception as e:
            print(f"Failed {params['region']}: {e}")
            raise

print(f"All regions processed: {results}")
Error Handling
Try-Catch Pattern
# workflow_with_error_handling.py
import json

def safe_run_notebook(path, timeout, args):
    try:
        result = dbutils.notebook.run(path, timeout, args)
        return {"status": "success", "result": result}
    except Exception as e:
        return {"status": "failed", "error": str(e)}

# Run with error handling
steps = [
    ("/ETL/step1_extract", {"source": "raw"}),
    ("/ETL/step2_transform", {}),
    ("/ETL/step3_load", {"target": "warehouse"}),
]

results = []
for path, args in steps:
    print(f"Running {path}...")
    result = safe_run_notebook(path, 3600, args)
    results.append({"notebook": path, **result})
    if result["status"] == "failed":
        print(f"Pipeline failed at {path}")
        # Send alert
        dbutils.notebook.run("/Utilities/send_alert", 300, {
            "message": f"Pipeline failed: {result['error']}",
            "severity": "critical"
        })
        break

# Store results
dbutils.notebook.exit(json.dumps(results))
Retry Logic
import time

def run_with_retry(notebook_path, timeout, args, max_retries=3, delay=60):
    for attempt in range(max_retries):
        try:
            result = dbutils.notebook.run(notebook_path, timeout, args)
            return result
        except Exception as e:
            if attempt < max_retries - 1:
                print(f"Attempt {attempt + 1} failed, retrying in {delay}s...")
                time.sleep(delay)
                delay *= 2  # Exponential backoff
            else:
                raise Exception(f"Failed after {max_retries} attempts: {e}")

# Usage
result = run_with_retry(
    "/ETL/unreliable_source",
    timeout=3600,
    args={"date": "2021-10-29"},
    max_retries=3
)
Passing Data Between Notebooks
Using Temporary Tables
# notebook_1.py - Create temp view and stage data
from pyspark.sql.functions import col

df = spark.read.parquet("/data/raw/sales")
df_transformed = df.filter(col("amount") > 100)
df_transformed.createOrReplaceTempView("processed_sales")

# Temp views are session-scoped, so also save to a temp location for the next notebook
df_transformed.write.mode("overwrite").parquet("/tmp/workflow/processed_sales")
dbutils.notebook.exit("success")

# notebook_2.py - Read the staged data
df = spark.read.parquet("/tmp/workflow/processed_sales")
# Continue processing...
Using Delta Tables
# notebook_1.py - Write to Delta
df.write.format("delta").mode("overwrite").saveAsTable("workflow_temp.step1_output")
dbutils.notebook.exit("success")
# notebook_2.py - Read from Delta
df = spark.table("workflow_temp.step1_output")
# Process and save next step
df_processed.write.format("delta").mode("overwrite").saveAsTable("workflow_temp.step2_output")
Workflow Patterns
DAG-Style Pipeline
# dag_workflow.py
from concurrent.futures import ThreadPoolExecutor, wait

class WorkflowDAG:
    def __init__(self):
        self.results = {}

    def run_task(self, task_id, notebook_path, args, dependencies=[]):
        # Wait for dependencies
        for dep in dependencies:
            if dep not in self.results:
                raise Exception(f"Dependency {dep} not completed")
            if self.results[dep]["status"] != "success":
                raise Exception(f"Dependency {dep} failed")
        # Run notebook
        result = dbutils.notebook.run(notebook_path, 3600, args)
        self.results[task_id] = {"status": "success", "result": result}
        return result

# Define DAG
dag = WorkflowDAG()

# Level 0: No dependencies
with ThreadPoolExecutor(max_workers=2) as executor:
    futures = [
        executor.submit(dag.run_task, "extract_sales", "/ETL/extract_sales", {}),
        executor.submit(dag.run_task, "extract_inventory", "/ETL/extract_inventory", {}),
    ]
    wait(futures)

# Level 1: Depends on extracts
dag.run_task("transform", "/ETL/transform", {}, ["extract_sales", "extract_inventory"])

# Level 2: Depends on transform
dag.run_task("load", "/ETL/load", {}, ["transform"])

print(f"DAG completed: {dag.results}")
Factory Pattern
# workflow_factory.py
class NotebookWorkflow:
    def __init__(self, name, base_path):
        self.name = name
        self.base_path = base_path
        self.steps = []

    def add_step(self, name, notebook, args=None, timeout=3600):
        self.steps.append({
            "name": name,
            "notebook": f"{self.base_path}/{notebook}",
            "args": args or {},
            "timeout": timeout
        })
        return self

    def run(self):
        results = []
        for step in self.steps:
            print(f"Running step: {step['name']}")
            result = dbutils.notebook.run(
                step["notebook"],
                step["timeout"],
                step["args"]
            )
            results.append({"step": step["name"], "result": result})
        return results

# Usage
workflow = (NotebookWorkflow("daily_etl", "/Production/ETL")
    .add_step("extract", "extract", {"date": "2021-10-29"})
    .add_step("transform", "transform")
    .add_step("load", "load", {"target": "warehouse"})
    .add_step("validate", "validate"))

results = workflow.run()
Widget-Based Parameterization
Define Widgets
# Create widgets for interactive use
dbutils.widgets.text("date", "", "Processing Date")
dbutils.widgets.dropdown("environment", "dev", ["dev", "staging", "prod"], "Environment")
dbutils.widgets.multiselect("regions", "all", ["all", "east", "west", "north", "south"], "Regions")
# Get widget values
date = dbutils.widgets.get("date")
env = dbutils.widgets.get("environment")
regions = dbutils.widgets.get("regions").split(",")
print(f"Processing {date} for {env} environment, regions: {regions}")
Pass Widgets to Child Notebooks
# Main notebook
date = dbutils.widgets.get("date")
env = dbutils.widgets.get("environment")
# Pass to child notebooks
dbutils.notebook.run("/ETL/process", 3600, {
"date": date,
"environment": env
})
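The arguments dictionary sets the child notebook's widget values, so declaring the same widgets with sensible defaults lets the child run standalone for debugging as well as inside the workflow. A minimal sketch of the child side (the defaults are illustrative):

# /ETL/process.py - child notebook
# Defaults apply when the notebook runs interactively;
# values passed via dbutils.notebook.run override them
dbutils.widgets.text("date", "2021-10-29", "Processing Date")
dbutils.widgets.text("environment", "dev", "Environment")
date = dbutils.widgets.get("date")
env = dbutils.widgets.get("environment")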
Monitoring and Logging
Workflow Logging
import datetime
import json
import time

class WorkflowLogger:
    def __init__(self, workflow_name):
        self.workflow_name = workflow_name
        self.start_time = datetime.datetime.now()
        self.logs = []

    def log(self, step, status, message="", duration=None):
        entry = {
            "timestamp": datetime.datetime.now().isoformat(),
            "workflow": self.workflow_name,
            "step": step,
            "status": status,
            "message": message,
            "duration_seconds": duration
        }
        self.logs.append(entry)
        print(json.dumps(entry))

    def save_log(self):
        log_path = f"/logs/workflows/{self.workflow_name}/{self.start_time.strftime('%Y%m%d_%H%M%S')}.json"
        dbutils.fs.put(log_path, json.dumps(self.logs, indent=2), overwrite=True)
        return log_path

# Usage
logger = WorkflowLogger("daily_etl")

start = time.time()
result = dbutils.notebook.run("/ETL/extract", 3600, {})
duration = time.time() - start
logger.log("extract", "success", result, duration)

# Save logs
log_path = logger.save_log()
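Because save_log writes pretty-printed JSON arrays to DBFS, run history can be read back with Spark for reporting. A sketch, assuming the /logs/workflows layout above:

# Analyze past runs; the multiLine option handles the pretty-printed JSON arrays
logs_df = (spark.read
    .option("multiLine", True)
    .json("/logs/workflows/daily_etl/*.json"))
logs_df.groupBy("step", "status").count().show()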
Best Practices
- Keep notebooks focused - Each notebook should do one thing well
- Use meaningful return values - Return useful information from notebooks
- Implement timeouts - Set appropriate timeout values
- Handle failures gracefully - Implement retry and alerting (see the sketch after this list)
- Log extensively - Track workflow progress and results
- Parameterize everything - Make notebooks reusable
- Use Delta for intermediate data - ACID guarantees between steps
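Several of these practices compose naturally. A short sketch that combines the retry helper and the alert notebook from the earlier sections into a single step runner:

# Combine retry and alerting (run_with_retry and /Utilities/send_alert are defined above)
def run_step_safely(path, args, timeout=3600):
    try:
        return run_with_retry(path, timeout, args, max_retries=3)
    except Exception as e:
        dbutils.notebook.run("/Utilities/send_alert", 300, {
            "message": f"Step {path} failed: {e}",
            "severity": "critical"
        })
        raise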
Conclusion
Notebook workflows provide a flexible way to orchestrate data pipelines in Databricks. By combining sequential and parallel execution with proper error handling, you can build robust production workflows.
Tomorrow, we’ll explore Git integration in Databricks for version control.