Azure Databricks Jobs: Scheduled and Triggered Workflows
Databricks Jobs orchestrate notebook, JAR, and Python workloads. Schedule recurring runs with cron expressions, trigger runs on demand through the API, or chain dependent tasks into multi-task workflows.
Creating a Job (UI)
- Jobs → Create Job
- Configure task (notebook, JAR, Python)
- Set cluster configuration
- Define schedule or trigger
Jobs API
import requests

databricks_instance = "https://adb-xxx.azuredatabricks.net"
token = "dapi..."

# Create job
job_spec = {
    "name": "Daily ETL",
    "tasks": [
        {
            "task_key": "extract",
            "notebook_task": {
                "notebook_path": "/ETL/Extract"
            },
            "new_cluster": {
                "spark_version": "10.4.x-scala2.12",
                "node_type_id": "Standard_DS3_v2",
                "num_workers": 2
            }
        },
        {
            "task_key": "transform",
            "depends_on": [{"task_key": "extract"}],
            "notebook_task": {
                "notebook_path": "/ETL/Transform"
            },
            "job_cluster_key": "shared_cluster"
        },
        {
            "task_key": "load",
            "depends_on": [{"task_key": "transform"}],
            "notebook_task": {
                "notebook_path": "/ETL/Load"
            },
            "job_cluster_key": "shared_cluster"
        }
    ],
    "job_clusters": [
        {
            "job_cluster_key": "shared_cluster",
            "new_cluster": {
                "spark_version": "10.4.x-scala2.12",
                "node_type_id": "Standard_DS3_v2",
                "num_workers": 4
            }
        }
    ],
    "schedule": {
        "quartz_cron_expression": "0 0 6 * * ?",
        "timezone_id": "America/New_York"
    }
}

response = requests.post(
    f"{databricks_instance}/api/2.1/jobs/create",
    headers={"Authorization": f"Bearer {token}"},
    json=job_spec
)
job_id = response.json()["job_id"]
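To confirm the job was registered with the settings you expect, you can read it back; a minimal sketch using the Jobs 2.1 get endpoint:
# Read the job back and confirm its stored settings
job = requests.get(
    f"{databricks_instance}/api/2.1/jobs/get",
    headers={"Authorization": f"Bearer {token}"},
    params={"job_id": job_id}
).json()
print(job["settings"]["name"], len(job["settings"]["tasks"]), "tasks")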
Multi-Task Workflows
        ┌─────────┐
        │ Extract │
        └────┬────┘
             │
     ┌───────┴───────┐
     │               │
┌────┴────┐     ┌────┴────┐
│ Trans A │     │ Trans B │
└────┬────┘     └────┬────┘
     │               │
     └───────┬───────┘
             │
        ┌────┴────┐
        │  Load   │
        └─────────┘
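The fan-out/fan-in shape above is expressed entirely through depends_on. A sketch of the tasks list behind that diagram (task keys and notebook paths are illustrative, cluster settings omitted for brevity):
# Trans A and Trans B run in parallel after extract; load waits for both
"tasks": [
    {"task_key": "extract", "notebook_task": {"notebook_path": "/ETL/Extract"}},
    {"task_key": "trans_a", "depends_on": [{"task_key": "extract"}],
     "notebook_task": {"notebook_path": "/ETL/TransformA"}},
    {"task_key": "trans_b", "depends_on": [{"task_key": "extract"}],
     "notebook_task": {"notebook_path": "/ETL/TransformB"}},
    {"task_key": "load",
     "depends_on": [{"task_key": "trans_a"}, {"task_key": "trans_b"}],
     "notebook_task": {"notebook_path": "/ETL/Load"}}
]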
Parameterized Jobs
# Notebook with parameters
dbutils.widgets.text("date", "2020-11-03")
dbutils.widgets.dropdown("environment", "dev", ["dev", "staging", "prod"])
date = dbutils.widgets.get("date")
env = dbutils.widgets.get("environment")
# Use parameters
df = spark.read.parquet(f"/data/{env}/sales/{date}")
# Run with parameters
response = requests.post(
    f"{databricks_instance}/api/2.1/jobs/run-now",
    headers={"Authorization": f"Bearer {token}"},
    json={
        "job_id": job_id,
        "notebook_params": {
            "date": "2020-11-03",
            "environment": "prod"
        }
    }
)
run_id = response.json()["run_id"]
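Defaults can also live in the job definition itself via base_parameters on the notebook task; values passed to run-now override them for that run. A sketch of the extract task carrying defaults:
# Task-level defaults; notebook_params passed to run-now take precedence
"notebook_task": {
    "notebook_path": "/ETL/Extract",
    "base_parameters": {"date": "2020-11-03", "environment": "dev"}
}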
Job Clusters vs. All-Purpose Clusters
# Job cluster (ephemeral, cost-effective)
"new_cluster": {
    "spark_version": "10.4.x-scala2.12",
    "node_type_id": "Standard_DS3_v2",
    "num_workers": 4,
    "spark_conf": {
        "spark.speculation": "true"
    }
}

# Existing all-purpose cluster (for development)
"existing_cluster_id": "0101-123456-abcd"
Alerting
job_spec["email_notifications"] = {
"on_start": ["team@company.com"],
"on_success": ["team@company.com"],
"on_failure": ["alerts@company.com", "oncall@company.com"]
}
job_spec["webhook_notifications"] = {
"on_failure": [
{"id": "slack-webhook-id"}
]
}
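Because these fields were added to job_spec after the job was created, they still need to be pushed to the live job; a sketch using the Jobs 2.1 update endpoint, which merges new_settings into the existing definition:
# Push the notification settings onto the existing job (partial update)
requests.post(
    f"{databricks_instance}/api/2.1/jobs/update",
    headers={"Authorization": f"Bearer {token}"},
    json={
        "job_id": job_id,
        "new_settings": {
            "email_notifications": job_spec["email_notifications"],
            "webhook_notifications": job_spec["webhook_notifications"]
        }
    }
)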
Retry Policies
{
    "task_key": "flaky_task",
    "max_retries": 3,
    "min_retry_interval_millis": 60000,
    "retry_on_timeout": True
}
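Note that retry_on_timeout only has an effect if the task also defines a timeout; timeout_seconds sets that limit per task (the value below is illustrative):
# Cancel the task run after 30 minutes; with retry_on_timeout it is then retried
"timeout_seconds": 1800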
Monitoring Runs
# Get run status
response = requests.get(
    f"{databricks_instance}/api/2.1/jobs/runs/get",
    headers={"Authorization": f"Bearer {token}"},
    params={"run_id": run_id}
)
status = response.json()["state"]["life_cycle_state"]
# Life-cycle states include PENDING, RUNNING, TERMINATED; terminated runs also carry a result_state
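When orchestrating from outside Databricks, this status call is typically polled until the run reaches a terminal state; a minimal sketch (the polling interval is arbitrary):
import time

# Poll until the run reaches a terminal life-cycle state, then inspect the result
while True:
    state = requests.get(
        f"{databricks_instance}/api/2.1/jobs/runs/get",
        headers={"Authorization": f"Bearer {token}"},
        params={"run_id": run_id}
    ).json()["state"]
    if state["life_cycle_state"] in ("TERMINATED", "SKIPPED", "INTERNAL_ERROR"):
        print(state.get("result_state"), state.get("state_message", ""))
        break
    time.sleep(30)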
Databricks Jobs: production data workflows made simple.