5 min read
Azure Data Factory Until Activity: Implementing Loops and Polling Patterns
The Until Activity in Azure Data Factory executes activities repeatedly until a specified condition is met. It’s perfect for polling scenarios, waiting for external processes, or implementing retry logic with custom conditions.
Basic Until Structure
{
  "name": "WaitForFileReady",
  "type": "Until",
  "typeProperties": {
    "timeout": "0.01:00:00",
    "expression": {
      "type": "Expression",
      "value": "@equals(variables('fileReady'), true)"
    },
    "activities": [
      {
        "name": "CheckFile",
        "type": "GetMetadata",
        "typeProperties": {
          "fieldList": [
            "exists"
          ],
          "dataset": {
            "type": "DatasetReference",
            "referenceName": "SourceFileDataset"
          }
        }
      },
      {
        "name": "SetFileReadyVariable",
        "type": "SetVariable",
        "typeProperties": {
          "variableName": "fileReady",
          "value": {
            "type": "Expression",
            "value": "@activity('CheckFile').output.exists"
          }
        },
        "dependsOn": [
          {
            "activity": "CheckFile",
            "dependencyConditions": [
              "Succeeded"
            ]
          }
        ]
      },
      {
        "name": "WaitBeforeRetry",
        "type": "Wait",
        "typeProperties": {
          "waitTimeInSeconds": 60
        },
        "dependsOn": [
          {
            "activity": "SetFileReadyVariable",
            "dependencyConditions": [
              "Succeeded"
            ]
          }
        ]
      }
    ]
  }
}
Polling External API
{
  "name": "PollExternalJob",
  "properties": {
    "description": "Starts an external job, polls its status until it is Completed or Failed (2-hour Until timeout), then fails the pipeline if the job reported Failed.",
    "parameters": {
      "jobParams": { "type": "Object" }
    },
    "variables": {
      "jobStatus": { "type": "String", "defaultValue": "Running" },
      "pollCount": { "type": "String", "defaultValue": "0" },
      "nextPollCount": { "type": "String", "defaultValue": "0" }
    },
    "activities": [
      {
        "name": "StartExternalJob",
        "type": "WebActivity",
        "typeProperties": {
          "url": "https://api.external-service.com/jobs",
          "method": "POST",
          "body": {
            "jobType": "DataProcessing",
            "parameters": "@pipeline().parameters.jobParams"
          }
        }
      },
      {
        "name": "WaitForJobCompletion",
        "type": "Until",
        "dependsOn": [
          {
            "activity": "StartExternalJob",
            "dependencyConditions": ["Succeeded"]
          }
        ],
        "typeProperties": {
          "expression": {
            "value": "@or(equals(variables('jobStatus'), 'Completed'), equals(variables('jobStatus'), 'Failed'))",
            "type": "Expression"
          },
          "timeout": "0.02:00:00",
          "activities": [
            {
              "name": "CheckJobStatus",
              "type": "WebActivity",
              "typeProperties": {
                "url": {
                  "value": "@concat('https://api.external-service.com/jobs/', activity('StartExternalJob').output.jobId, '/status')",
                  "type": "Expression"
                },
                "method": "GET"
              }
            },
            {
              "name": "UpdateJobStatus",
              "type": "SetVariable",
              "dependsOn": [
                {
                  "activity": "CheckJobStatus",
                  "dependencyConditions": ["Succeeded"]
                }
              ],
              "typeProperties": {
                "variableName": "jobStatus",
                "value": {
                  "value": "@activity('CheckJobStatus').output.status",
                  "type": "Expression"
                }
              }
            },
            {
              "name": "ComputeNextPollCount",
              "description": "Set Variable cannot reference the variable it assigns, so the incremented count is staged in nextPollCount first.",
              "type": "SetVariable",
              "dependsOn": [
                {
                  "activity": "UpdateJobStatus",
                  "dependencyConditions": ["Succeeded"]
                }
              ],
              "typeProperties": {
                "variableName": "nextPollCount",
                "value": {
                  "value": "@string(add(int(variables('pollCount')), 1))",
                  "type": "Expression"
                }
              }
            },
            {
              "name": "IncrementPollCount",
              "description": "Copies the staged value back into pollCount.",
              "type": "SetVariable",
              "dependsOn": [
                {
                  "activity": "ComputeNextPollCount",
                  "dependencyConditions": ["Succeeded"]
                }
              ],
              "typeProperties": {
                "variableName": "pollCount",
                "value": {
                  "value": "@variables('nextPollCount')",
                  "type": "Expression"
                }
              }
            },
            {
              "name": "WaitBetweenPolls",
              "type": "Wait",
              "dependsOn": [
                {
                  "activity": "IncrementPollCount",
                  "dependencyConditions": ["Succeeded"]
                }
              ],
              "typeProperties": {
                "waitTimeInSeconds": 30
              }
            }
          ]
        }
      },
      {
        "name": "CheckFinalStatus",
        "type": "IfCondition",
        "dependsOn": [
          {
            "activity": "WaitForJobCompletion",
            "dependencyConditions": ["Succeeded"]
          }
        ],
        "typeProperties": {
          "expression": {
            "value": "@equals(variables('jobStatus'), 'Failed')",
            "type": "Expression"
          },
          "ifTrueActivities": [
            {
              "name": "FailPipeline",
              "type": "Fail",
              "typeProperties": {
                "message": "External job failed",
                "errorCode": "500"
              }
            }
          ]
        }
      }
    ]
  }
}
Retry Pattern with Exponential Backoff
{
  "name": "RetryWithBackoff",
  "properties": {
    "description": "Retries a web call until it succeeds or more than 5 retries have failed, doubling the wait between attempts (starting at 5 seconds). Fails the pipeline when the retries are exhausted.",
    "variables": {
      "success": { "type": "Bool", "defaultValue": false },
      "retryCount": { "type": "String", "defaultValue": "0" },
      "nextRetryCount": { "type": "String", "defaultValue": "0" },
      "waitSeconds": { "type": "String", "defaultValue": "5" },
      "nextWaitSeconds": { "type": "String", "defaultValue": "5" }
    },
    "activities": [
      {
        "name": "RetryLoop",
        "type": "Until",
        "typeProperties": {
          "expression": {
            "value": "@or(variables('success'), greater(int(variables('retryCount')), 5))",
            "type": "Expression"
          },
          "timeout": "0.00:30:00",
          "activities": [
            {
              "name": "AttemptOperation",
              "type": "WebActivity",
              "typeProperties": {
                "url": "https://api.service.com/operation",
                "method": "POST"
              }
            },
            {
              "name": "SetSuccess",
              "type": "SetVariable",
              "dependsOn": [
                {
                  "activity": "AttemptOperation",
                  "dependencyConditions": ["Succeeded"]
                }
              ],
              "typeProperties": {
                "variableName": "success",
                "value": true
              }
            },
            {
              "name": "ComputeNextRetryCount",
              "description": "Set Variable cannot reference the variable it assigns, so the incremented count is staged in nextRetryCount first.",
              "type": "SetVariable",
              "dependsOn": [
                {
                  "activity": "AttemptOperation",
                  "dependencyConditions": ["Failed"]
                }
              ],
              "typeProperties": {
                "variableName": "nextRetryCount",
                "value": {
                  "value": "@string(add(int(variables('retryCount')), 1))",
                  "type": "Expression"
                }
              }
            },
            {
              "name": "IncrementRetry",
              "description": "Copies the staged value back into retryCount.",
              "type": "SetVariable",
              "dependsOn": [
                {
                  "activity": "ComputeNextRetryCount",
                  "dependencyConditions": ["Succeeded"]
                }
              ],
              "typeProperties": {
                "variableName": "retryCount",
                "value": {
                  "value": "@variables('nextRetryCount')",
                  "type": "Expression"
                }
              }
            },
            {
              "name": "WaitBeforeRetry",
              "description": "Waits for the current backoff delay before the next attempt.",
              "type": "Wait",
              "dependsOn": [
                {
                  "activity": "IncrementRetry",
                  "dependencyConditions": ["Succeeded"]
                }
              ],
              "typeProperties": {
                "waitTimeInSeconds": {
                  "value": "@int(variables('waitSeconds'))",
                  "type": "Expression"
                }
              }
            },
            {
              "name": "ComputeNextBackoff",
              "description": "Doubles the delay for the following retry; staged in nextWaitSeconds to avoid a self-referencing Set Variable.",
              "type": "SetVariable",
              "dependsOn": [
                {
                  "activity": "WaitBeforeRetry",
                  "dependencyConditions": ["Succeeded"]
                }
              ],
              "typeProperties": {
                "variableName": "nextWaitSeconds",
                "value": {
                  "value": "@string(mul(int(variables('waitSeconds')), 2))",
                  "type": "Expression"
                }
              }
            },
            {
              "name": "CalculateBackoff",
              "description": "Copies the staged delay back into waitSeconds.",
              "type": "SetVariable",
              "dependsOn": [
                {
                  "activity": "ComputeNextBackoff",
                  "dependencyConditions": ["Succeeded"]
                }
              ],
              "typeProperties": {
                "variableName": "waitSeconds",
                "value": {
                  "value": "@variables('nextWaitSeconds')",
                  "type": "Expression"
                }
              }
            }
          ]
        }
      },
      {
        "name": "CheckRetryOutcome",
        "description": "The loop also exits when the retry limit is exceeded; surface that case as a pipeline failure instead of a silent success.",
        "type": "IfCondition",
        "dependsOn": [
          {
            "activity": "RetryLoop",
            "dependencyConditions": ["Succeeded"]
          }
        ],
        "typeProperties": {
          "expression": {
            "value": "@not(variables('success'))",
            "type": "Expression"
          },
          "ifTrueActivities": [
            {
              "name": "FailAfterRetries",
              "type": "Fail",
              "typeProperties": {
                "message": "Operation did not succeed within the allowed number of retries",
                "errorCode": "RetriesExhausted"
              }
            }
          ]
        }
      }
    ]
  }
}
Waiting for Database Availability
{
  "name": "WaitForDatabase",
  "type": "Until",
  "typeProperties": {
    "timeout": "0.00:30:00",
    "expression": {
      "type": "Expression",
      "value": "@variables('databaseReady')"
    },
    "activities": [
      {
        "name": "TestConnection",
        "type": "Lookup",
        "policy": {
          "timeout": "0.00:00:30"
        },
        "typeProperties": {
          "dataset": {
            "type": "DatasetReference",
            "referenceName": "TargetDatabase"
          },
          "source": {
            "type": "AzureSqlSource",
            "sqlReaderQuery": "SELECT 1 as IsAlive"
          }
        }
      },
      {
        "name": "SetDatabaseReady",
        "type": "SetVariable",
        "typeProperties": {
          "variableName": "databaseReady",
          "value": true
        },
        "dependsOn": [
          {
            "activity": "TestConnection",
            "dependencyConditions": [
              "Succeeded"
            ]
          }
        ]
      },
      {
        "name": "HandleConnectionFailure",
        "type": "Wait",
        "typeProperties": {
          "waitTimeInSeconds": 10
        },
        "dependsOn": [
          {
            "activity": "TestConnection",
            "dependencyConditions": [
              "Failed"
            ]
          }
        ]
      }
    ]
  }
}
Processing Items Until Empty
{
  "name": "ProcessQueueUntilEmpty",
  "properties": {
    "description": "Repeatedly reads up to 100 pending queue rows and hands each batch to a child pipeline until a read returns no rows. The child pipeline must move processed rows out of the 'Pending' status, otherwise the same rows are read again until the Until timeout.",
    "variables": {
      "hasMoreItems": { "type": "Bool", "defaultValue": true }
    },
    "activities": [
      {
        "name": "ProcessUntilEmpty",
        "type": "Until",
        "typeProperties": {
          "expression": {
            "value": "@equals(variables('hasMoreItems'), false)",
            "type": "Expression"
          },
          "timeout": "0.12:00:00",
          "activities": [
            {
              "name": "GetBatch",
              "type": "Lookup",
              "typeProperties": {
                "source": {
                  "type": "AzureSqlSource",
                  "sqlReaderQuery": "SELECT TOP 100 * FROM ProcessingQueue WHERE Status = 'Pending' ORDER BY CreatedDate"
                },
                "dataset": {
                  "referenceName": "ProcessingQueueDataset",
                  "type": "DatasetReference"
                },
                "firstRowOnly": false
              }
            },
            {
              "name": "CheckIfMoreItems",
              "type": "SetVariable",
              "dependsOn": [
                {
                  "activity": "GetBatch",
                  "dependencyConditions": ["Succeeded"]
                }
              ],
              "typeProperties": {
                "variableName": "hasMoreItems",
                "value": {
                  "value": "@greater(activity('GetBatch').output.count, 0)",
                  "type": "Expression"
                }
              }
            },
            {
              "name": "ProcessBatch",
              "description": "A ForEach cannot be nested directly inside an Until, so the per-item loop lives in the child pipeline ProcessBatchPipeline (a ForEach over its 'items' parameter with batchCount 10 that runs ProcessItemPipeline with itemId = item().Id). waitOnCompletion keeps the next GetBatch from running before this batch is finished.",
              "type": "ExecutePipeline",
              "dependsOn": [
                {
                  "activity": "CheckIfMoreItems",
                  "dependencyConditions": ["Succeeded"]
                }
              ],
              "typeProperties": {
                "pipeline": {
                  "referenceName": "ProcessBatchPipeline",
                  "type": "PipelineReference"
                },
                "waitOnCompletion": true,
                "parameters": {
                  "items": {
                    "value": "@activity('GetBatch').output.value",
                    "type": "Expression"
                  }
                }
              }
            }
          ]
        }
      }
    ]
  }
}
Monitoring Until Activity
# Python - Monitor Until activity iterations
def monitor_until_activity(client, resource_group, factory_name, run_id,
                           until_activity_name, poll_interval_seconds=10,
                           max_wait_seconds=None):
    """Poll a pipeline run until the named Until activity finishes.

    Each poll queries the activity runs of ``run_id`` and records every run
    whose activity name contains ``until_activity_name``. Runs are keyed by
    ``activity_run_id`` so a run seen in several polls is reported once, with
    the state from the most recent poll.

    Args:
        client: Object exposing ``activity_runs.query_by_pipeline_run``.
        resource_group: Resource group name passed through to the query.
        factory_name: Data factory name passed through to the query.
        run_id: Pipeline run identifier passed through to the query.
        until_activity_name: Name of the Until activity to watch.
        poll_interval_seconds: Seconds to sleep between polls (default 10).
        max_wait_seconds: Optional upper bound on total polling time. ``None``
            (the default) polls until a terminal status is observed.

    Returns:
        dict with ``total_iterations`` (number of distinct matching runs),
        ``final_status`` (status of the activity whose name equals
        ``until_activity_name`` in the last poll, or ``'Unknown'`` if it was
        not present) and ``iterations`` (one dict per matching run).
    """
    # 'Succeeded'/'Failed' were the original exit conditions; the other two
    # are included so a cancelled or timed-out run does not poll forever.
    # NOTE(review): confirm the exact status strings the service reports.
    terminal_statuses = ('Succeeded', 'Failed', 'Cancelled', 'TimedOut')

    start_time = datetime.utcnow()
    deadline = None
    if max_wait_seconds is not None:
        deadline = time.monotonic() + max_wait_seconds

    # activity_run_id -> latest observed run; insertion order = first seen.
    observed_runs = {}
    main_activity = None

    while True:
        activity_runs = client.activity_runs.query_by_pipeline_run(
            resource_group,
            factory_name,
            run_id,
            {
                'lastUpdatedAfter': start_time - timedelta(hours=1),
                'lastUpdatedBefore': datetime.utcnow()
            }
        )
        current_runs = activity_runs.value or []

        for activity in current_runs:
            if until_activity_name in activity.activity_name:
                # Overwrite rather than append: the same run is returned by
                # every poll, and its status/end time change as it progresses.
                observed_runs[activity.activity_run_id] = activity

        # Check if Until completed
        main_activity = next(
            (a for a in current_runs
             if a.activity_name == until_activity_name),
            None
        )
        if main_activity and main_activity.status in terminal_statuses:
            break
        if deadline is not None and time.monotonic() >= deadline:
            break

        time.sleep(poll_interval_seconds)

    iterations = [
        {
            'iteration': index,
            'status': activity.status,
            'start_time': activity.activity_run_start,
            'end_time': activity.activity_run_end,
            'duration_ms': activity.duration_in_ms
        }
        for index, activity in enumerate(observed_runs.values(), start=1)
    ]

    return {
        'total_iterations': len(iterations),
        'final_status': main_activity.status if main_activity else 'Unknown',
        'iterations': iterations
    }
Best Practices
- Always set timeout: Prevent infinite loops
- Include wait activities: Avoid tight polling loops
- Log iteration status: Track progress within the loop
- Handle failure states: Don’t just wait for success
- Use variables wisely: Track state between iterations
The Until Activity enables sophisticated control flow patterns that go beyond simple linear pipelines, allowing you to build resilient integrations that wait for and respond to external events.