Azure Data Factory ForEach Activity: Parallel Processing Patterns
The ForEach Activity in Azure Data Factory enables iteration over collections, allowing you to process multiple items in parallel or sequentially. It’s essential for scenarios like loading multiple tables, processing file batches, or executing dynamic workflows.
Basic ForEach Structure
{
  "name": "ForEachTable",
  "type": "ForEach",
  "typeProperties": {
    "items": {
      "value": "@pipeline().parameters.tableList",
      "type": "Expression"
    },
    "isSequential": false,
    "batchCount": 20,
    "activities": [
      {
        "name": "CopyTable",
        "type": "Copy",
        "typeProperties": {
          "source": {
            "type": "AzureSqlSource",
            "sqlReaderQuery": {
              "value": "@concat('SELECT * FROM ', item().schemaName, '.', item().tableName)",
              "type": "Expression"
            }
          },
          "sink": {
            "type": "ParquetSink"
          }
        }
      }
    ]
  }
}
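For context, a tableList parameter matching the item().schemaName and item().tableName references above could be declared on the pipeline like this (the sample values are hypothetical):
{
  "parameters": {
    "tableList": {
      "type": "Array",
      "defaultValue": [
        { "schemaName": "dbo", "tableName": "Customers" },
        { "schemaName": "dbo", "tableName": "Orders" }
      ]
    }
  }
}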
Parallel vs Sequential Processing
Set isSequential to false (with a batchCount) to run iterations concurrently, or to true when items must be processed in order. The pipeline below loads independent tables in parallel first, then makes a sequential pass over dependent ones:
{
  "name": "ParallelProcessing",
  "properties": {
    "activities": [
      {
        "name": "ParallelForEach",
        "type": "ForEach",
        "typeProperties": {
          "items": {
            "value": "@activity('GetTables').output.value",
            "type": "Expression"
          },
          "isSequential": false,
          "batchCount": 50,
          "activities": [
            {
              "name": "ProcessInParallel",
              "type": "ExecutePipeline",
              "typeProperties": {
                "pipeline": {
                  "referenceName": "CopyTablePipeline",
                  "type": "PipelineReference"
                },
                "parameters": {
                  "tableName": "@item().tableName"
                }
              }
            }
          ]
        }
      },
      {
        "name": "SequentialForEach",
        "type": "ForEach",
        "dependsOn": [
          {
            "activity": "ParallelForEach",
            "dependencyConditions": ["Succeeded"]
          }
        ],
        "typeProperties": {
          "items": {
            "value": "@activity('GetDependentTables').output.value",
            "type": "Expression"
          },
          "isSequential": true,
          "activities": [
            {
              "name": "ProcessSequentially",
              "type": "ExecutePipeline",
              "typeProperties": {
                "pipeline": {
                  "referenceName": "ProcessDependentTable",
                  "type": "PipelineReference"
                }
              }
            }
          ]
        }
      }
    ]
  }
}
ForEach with Lookup
A common dynamic pattern pairs ForEach with a Lookup activity: the Lookup reads driver rows from a configuration table, and the ForEach fans out over whatever it returns:
{
  "name": "DynamicForEachPipeline",
  "properties": {
    "activities": [
      {
        "name": "GetSourceTables",
        "type": "Lookup",
        "typeProperties": {
          "source": {
            "type": "AzureSqlSource",
            "sqlReaderQuery": "SELECT SchemaName, TableName, WatermarkColumn, TargetFolder FROM ETL.SourceConfig WHERE IsEnabled = 1"
          },
          "dataset": {
            "referenceName": "ConfigDataset",
            "type": "DatasetReference"
          },
          "firstRowOnly": false
        }
      },
      {
        "name": "ProcessEachTable",
        "type": "ForEach",
        "dependsOn": [
          {
            "activity": "GetSourceTables",
            "dependencyConditions": ["Succeeded"]
          }
        ],
        "typeProperties": {
          "items": {
            "value": "@activity('GetSourceTables').output.value",
            "type": "Expression"
          },
          "isSequential": false,
          "batchCount": 10,
          "activities": [
            {
              "name": "CopyToDataLake",
              "type": "Copy",
              "inputs": [
                {
                  "referenceName": "SourceSqlDataset",
                  "type": "DatasetReference",
                  "parameters": {
                    "schemaName": "@item().SchemaName",
                    "tableName": "@item().TableName"
                  }
                }
              ],
              "outputs": [
                {
                  "referenceName": "DataLakeParquetDataset",
                  "type": "DatasetReference",
                  "parameters": {
                    "folderPath": {
                      "value": "@concat(item().TargetFolder, '/', formatDateTime(utcnow(), 'yyyy/MM/dd'))",
                      "type": "Expression"
                    }
                  }
                }
              ],
              "typeProperties": {
                "source": {
                  "type": "AzureSqlSource"
                },
                "sink": {
                  "type": "ParquetSink"
                }
              }
            }
          ]
        }
      }
    ]
  }
}
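Because firstRowOnly is false, the Lookup returns every row under output.value, which is exactly what the ForEach items expression consumes. The output looks roughly like this (the rows are illustrative):
{
  "count": 2,
  "value": [
    { "SchemaName": "dbo", "TableName": "Customers", "WatermarkColumn": "ModifiedDate", "TargetFolder": "raw/customers" },
    { "SchemaName": "dbo", "TableName": "Orders", "WatermarkColumn": "ModifiedDate", "TargetFolder": "raw/orders" }
  ]
}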
Nested ForEach Loops
Azure Data Factory does not allow a ForEach activity directly inside another ForEach. To nest loops, have the outer ForEach call a child pipeline that contains the inner loop; this also gives the inner iterations clean access to the outer item via pipeline parameters:
{
  "name": "NestedForEachPipeline",
  "properties": {
    "activities": [
      {
        "name": "GetDatabases",
        "type": "Lookup",
        "typeProperties": {
          "source": {
            "type": "AzureSqlSource",
            "sqlReaderQuery": "SELECT DatabaseName, ServerName FROM ETL.Databases WHERE IsEnabled = 1"
          },
          "firstRowOnly": false
        }
      },
      {
        "name": "ForEachDatabase",
        "type": "ForEach",
        "dependsOn": [
          {
            "activity": "GetDatabases",
            "dependencyConditions": ["Succeeded"]
          }
        ],
        "typeProperties": {
          "items": {
            "value": "@activity('GetDatabases').output.value",
            "type": "Expression"
          },
          "isSequential": false,
          "batchCount": 5,
          "activities": [
            {
              "name": "ProcessDatabase",
              "type": "ExecutePipeline",
              "typeProperties": {
                "pipeline": {
                  "referenceName": "ProcessDatabaseTables",
                  "type": "PipelineReference"
                },
                "parameters": {
                  "serverName": "@item().ServerName",
                  "databaseName": "@item().DatabaseName"
                }
              }
            }
          ]
        }
      }
    ]
  }
}
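The child pipeline (called ProcessDatabaseTables above; the name is this article's choice) receives the database context as parameters, looks up that database's tables, and runs the inner loop. A sketch:
{
  "name": "ProcessDatabaseTables",
  "properties": {
    "parameters": {
      "serverName": { "type": "String" },
      "databaseName": { "type": "String" }
    },
    "activities": [
      {
        "name": "GetTablesForDatabase",
        "type": "Lookup",
        "typeProperties": {
          "source": {
            "type": "AzureSqlSource",
            "sqlReaderQuery": {
              "value": "@concat('SELECT TableName FROM ETL.Tables WHERE DatabaseName = ''', pipeline().parameters.databaseName, '''')",
              "type": "Expression"
            }
          },
          "firstRowOnly": false
        }
      },
      {
        "name": "ForEachTable",
        "type": "ForEach",
        "dependsOn": [
          {
            "activity": "GetTablesForDatabase",
            "dependencyConditions": ["Succeeded"]
          }
        ],
        "typeProperties": {
          "items": {
            "value": "@activity('GetTablesForDatabase').output.value",
            "type": "Expression"
          },
          "isSequential": false,
          "batchCount": 10,
          "activities": [
            {
              "name": "ProcessTable",
              "type": "ExecutePipeline",
              "typeProperties": {
                "pipeline": {
                  "referenceName": "CopyTablePipeline",
                  "type": "PipelineReference"
                },
                "parameters": {
                  "serverName": "@pipeline().parameters.serverName",
                  "databaseName": "@pipeline().parameters.databaseName",
                  "tableName": "@item().TableName"
                }
              }
            }
          ]
        }
      }
    ]
  }
}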
Error Handling in ForEach
Handle failures per item inside the loop so a single bad table still leaves a record of what happened. Success and failure dependency paths off the copy activity log each outcome through a stored procedure:
{
  "name": "ForEachWithErrorHandling",
  "type": "ForEach",
  "typeProperties": {
    "items": {
      "value": "@activity('GetItems').output.value",
      "type": "Expression"
    },
    "isSequential": false,
    "batchCount": 10,
    "activities": [
      {
        "name": "TryCopy",
        "type": "Copy",
        "typeProperties": {
          "source": { "type": "AzureSqlSource" },
          "sink": { "type": "ParquetSink" }
        }
      },
      {
        "name": "LogSuccess",
        "type": "SqlServerStoredProcedure",
        "dependsOn": [
          {
            "activity": "TryCopy",
            "dependencyConditions": ["Succeeded"]
          }
        ],
        "typeProperties": {
          "storedProcedureName": "sp_LogCopyResult",
          "storedProcedureParameters": {
            "TableName": { "value": "@item().TableName" },
            "Status": { "value": "Success" },
            "RowsCopied": { "value": "@activity('TryCopy').output.rowsCopied" }
          }
        }
      },
      {
        "name": "LogFailure",
        "type": "SqlServerStoredProcedure",
        "dependsOn": [
          {
            "activity": "TryCopy",
            "dependencyConditions": ["Failed"]
          }
        ],
        "typeProperties": {
          "storedProcedureName": "sp_LogCopyResult",
          "storedProcedureParameters": {
            "TableName": { "value": "@item().TableName" },
            "Status": { "value": "Failed" },
            "ErrorMessage": { "value": "@activity('TryCopy').error.message" }
          }
        }
      }
    ]
  }
}
Batching Large Collections
When the collection is too large to hand to ForEach row by row, compute the number of batches with ceiling division, iterate over range(), and page through the source with OFFSET/FETCH:
{
  "name": "BatchProcessingPipeline",
  "properties": {
    "parameters": {
      "batchSize": { "type": "Int", "defaultValue": 100 }
    },
    "variables": {
      "batchNumber": { "type": "String" }
    },
    "activities": [
      {
        "name": "GetTotalCount",
        "type": "Lookup",
        "typeProperties": {
          "source": {
            "type": "AzureSqlSource",
            "sqlReaderQuery": "SELECT COUNT(*) as TotalCount FROM ETL.ItemsToProcess"
          },
          "firstRowOnly": true
        }
      },
      {
        "name": "CalculateBatches",
        "type": "SetVariable",
        "dependsOn": [
          {
            "activity": "GetTotalCount",
            "dependencyConditions": ["Succeeded"]
          }
        ],
        "typeProperties": {
          "variableName": "batchNumber",
          "value": {
            "value": "@string(div(add(activity('GetTotalCount').output.firstRow.TotalCount, sub(pipeline().parameters.batchSize, 1)), pipeline().parameters.batchSize))",
            "type": "Expression"
          }
        }
      },
      {
        "name": "ForEachBatch",
        "type": "ForEach",
        "dependsOn": [
          {
            "activity": "CalculateBatches",
            "dependencyConditions": ["Succeeded"]
          }
        ],
        "typeProperties": {
          "items": {
            "value": "@range(0, int(variables('batchNumber')))",
            "type": "Expression"
          },
          "isSequential": false,
          "batchCount": 5,
          "activities": [
            {
              "name": "ProcessBatch",
              "type": "Copy",
              "typeProperties": {
                "source": {
                  "type": "AzureSqlSource",
                  "sqlReaderQuery": {
                    "value": "@concat('SELECT * FROM ETL.ItemsToProcess ORDER BY Id OFFSET ', string(mul(item(), pipeline().parameters.batchSize)), ' ROWS FETCH NEXT ', string(pipeline().parameters.batchSize), ' ROWS ONLY')",
                    "type": "Expression"
                  }
                },
                "sink": { "type": "ParquetSink" }
              }
            }
          ]
        }
      }
    ]
  }
}
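To make the arithmetic concrete with hypothetical numbers: 250 rows and the default batchSize of 100 give div(add(250, 99), 100) = 3 batches, so ForEachBatch iterates over [0, 1, 2], and iteration 1 issues SELECT * FROM ETL.ItemsToProcess ORDER BY Id OFFSET 100 ROWS FETCH NEXT 100 ROWS ONLY.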
Collecting ForEach Results
ForEach does not aggregate the outputs of its iterations, but each iteration's activity run can be queried afterwards through the monitoring API:
# Python - Process ForEach activity outputs
from datetime import datetime, timedelta

from azure.mgmt.datafactory import DataFactoryManagementClient
from azure.mgmt.datafactory.models import RunFilterParameters


def collect_foreach_results(adf_client: DataFactoryManagementClient,
                            resource_group: str, factory_name: str,
                            run_id: str, inner_activity_name: str):
    """
    Collect results from all ForEach iterations.
    Note: ForEach doesn't directly expose iteration outputs; each iteration
    surfaces as a separate run of the activity inside the loop. For durable
    results, log them to a table/blob during execution.
    """
    activity_runs = adf_client.activity_runs.query_by_pipeline_run(
        resource_group,
        factory_name,
        run_id,
        RunFilterParameters(
            last_updated_after=datetime.utcnow() - timedelta(days=1),
            last_updated_before=datetime.utcnow()
        )
    )
    results = []
    for activity in activity_runs.value:
        # Each iteration is a separate run of the inner activity
        # (e.g. 'CopyTable'), so filter on that activity's name.
        if activity.activity_name == inner_activity_name:
            results.append({
                'run_id': activity.activity_run_id,
                'status': activity.status,
                'duration_ms': activity.duration_in_ms,
                'output': activity.output
            })
    return results
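A minimal sketch of wiring it up, assuming the azure-identity and azure-mgmt-datafactory packages and placeholder names for the subscription, resource group, factory, and run:
from azure.identity import DefaultAzureCredential

# Placeholder identifiers - substitute your own.
adf_client = DataFactoryManagementClient(DefaultAzureCredential(), "<subscription-id>")
results = collect_foreach_results(
    adf_client, "<resource-group>", "<factory-name>",
    "<pipeline-run-id>", "CopyTable"
)
failed = [r for r in results if r['status'] == 'Failed']
print(f"{len(results)} iterations, {len(failed)} failed")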
Best Practices
- Set an appropriate batchCount: Balance parallelism with resource limits (ForEach defaults to 20 concurrent iterations and caps at 50)
- Use sequential for dependencies: When order matters
- Implement error handling: Log failures within the loop
- Avoid deeply nested loops: Use child pipelines instead
- Monitor iteration performance: Identify slow items
The ForEach Activity is the key to building scalable, efficient data pipelines that can process thousands of items in parallel while maintaining control and visibility over each iteration.