Parameterized Pipelines in Azure Data Factory: Building Reusable Data Workflows
Parameterized pipelines in Azure Data Factory enable you to build flexible, reusable data workflows. Instead of creating separate pipelines for each data source or destination, you can create a single parameterized pipeline that handles multiple scenarios.
Defining Pipeline Parameters
{
"name": "ParameterizedCopyPipeline",
"properties": {
"parameters": {
"sourceContainer": {
"type": "String",
"defaultValue": "rawdata"
},
"sourceFolder": {
"type": "String"
},
"sourceFileName": {
"type": "String",
"defaultValue": "*.csv"
},
"sinkSchema": {
"type": "String",
"defaultValue": "staging"
},
"sinkTable": {
"type": "String"
},
"loadDate": {
"type": "String",
"defaultValue": "@utcnow('yyyy-MM-dd')"
},
"isFullLoad": {
"type": "Bool",
"defaultValue": false
}
},
"activities": [
{
"name": "CopyData",
"type": "Copy",
"typeProperties": {
"source": {
"type": "DelimitedTextSource",
"storeSettings": {
"type": "AzureBlobStorageReadSettings",
"recursive": true,
"wildcardFolderPath": {
"value": "@pipeline().parameters.sourceFolder",
"type": "Expression"
},
"wildcardFileName": {
"value": "@pipeline().parameters.sourceFileName",
"type": "Expression"
}
}
},
"sink": {
"type": "AzureSqlSink",
"preCopyScript": {
"value": "@if(pipeline().parameters.isFullLoad, concat('TRUNCATE TABLE ', pipeline().parameters.sinkSchema, '.', pipeline().parameters.sinkTable), '')",
"type": "Expression"
}
}
}
}
]
}
}
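One caveat: pipeline parameter default values are static, so the @utcnow('yyyy-MM-dd') default above is stored as a literal string rather than evaluated. Dynamic values such as the load date are normally supplied at run time by whatever invokes the pipeline. A minimal sketch of the parameter block an Execute Pipeline activity (or a trigger) could pass, assuming the caller wants the current UTC date:
"parameters": {
  "loadDate": {
    "value": "@utcnow('yyyy-MM-dd')",
    "type": "Expression"
  }
}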
Parameterized Datasets
{
"name": "GenericBlobDataset",
"properties": {
"linkedServiceName": {
"referenceName": "AzureBlobStorage",
"type": "LinkedServiceReference"
},
"parameters": {
"containerName": { "type": "String" },
"folderPath": { "type": "String" },
"fileName": { "type": "String" }
},
"type": "DelimitedText",
"typeProperties": {
"location": {
"type": "AzureBlobStorageLocation",
"container": {
"value": "@dataset().containerName",
"type": "Expression"
},
"folderPath": {
"value": "@dataset().folderPath",
"type": "Expression"
},
"fileName": {
"value": "@dataset().fileName",
"type": "Expression"
}
},
"columnDelimiter": ",",
"firstRowAsHeader": true
}
}
}
{
"name": "GenericSqlTableDataset",
"properties": {
"linkedServiceName": {
"referenceName": "AzureSqlDatabase",
"type": "LinkedServiceReference"
},
"parameters": {
"schemaName": { "type": "String" },
"tableName": { "type": "String" }
},
"type": "AzureSqlTable",
"typeProperties": {
"schema": {
"value": "@dataset().schemaName",
"type": "Expression"
},
"table": {
"value": "@dataset().tableName",
"type": "Expression"
}
}
}
}
Using Parameters in Copy Activity
{
"name": "CopyWithParameterizedDatasets",
"type": "Copy",
"inputs": [
{
"referenceName": "GenericBlobDataset",
"type": "DatasetReference",
"parameters": {
"containerName": {
"value": "@pipeline().parameters.sourceContainer",
"type": "Expression"
},
"folderPath": {
"value": "@pipeline().parameters.sourceFolder",
"type": "Expression"
},
"fileName": {
"value": "@pipeline().parameters.sourceFileName",
"type": "Expression"
}
}
}
],
"outputs": [
{
"referenceName": "GenericSqlTableDataset",
"type": "DatasetReference",
"parameters": {
"schemaName": {
"value": "@pipeline().parameters.sinkSchema",
"type": "Expression"
},
"tableName": {
"value": "@pipeline().parameters.sinkTable",
"type": "Expression"
}
}
}
]
}
Metadata-Driven Pipeline
{
"name": "MetadataDrivenPipeline",
"properties": {
"parameters": {
"configTableName": {
"type": "String",
"defaultValue": "ETL.PipelineConfig"
}
},
"activities": [
{
"name": "GetPipelineConfig",
"type": "Lookup",
"typeProperties": {
"source": {
"type": "AzureSqlSource",
"sqlReaderQuery": {
"value": "SELECT * FROM @{pipeline().parameters.configTableName} WHERE IsEnabled = 1",
"type": "Expression"
}
},
"dataset": {
"referenceName": "ConfigDataset",
"type": "DatasetReference"
},
"firstRowOnly": false
}
},
{
"name": "ForEachTable",
"type": "ForEach",
"dependsOn": [
{
"activity": "GetPipelineConfig",
"dependencyConditions": ["Succeeded"]
}
],
"typeProperties": {
"items": {
"value": "@activity('GetPipelineConfig').output.value",
"type": "Expression"
},
"isSequential": false,
"batchCount": 10,
"activities": [
{
"name": "ExecuteCopyPipeline",
"type": "ExecutePipeline",
"typeProperties": {
"pipeline": {
"referenceName": "ParameterizedCopyPipeline",
"type": "PipelineReference"
},
"parameters": {
"sourceContainer": {
"value": "@item().SourceContainer",
"type": "Expression"
},
"sourceFolder": {
"value": "@item().SourceFolder",
"type": "Expression"
},
"sinkSchema": {
"value": "@item().SinkSchema",
"type": "Expression"
},
"sinkTable": {
"value": "@item().SinkTable",
"type": "Expression"
}
},
"waitOnCompletion": true
}
}
]
}
}
]
}
}
Configuration Table Schema
-- SQL Server configuration table
CREATE TABLE ETL.PipelineConfig (
ConfigID INT IDENTITY(1,1) PRIMARY KEY,
PipelineName NVARCHAR(100) NOT NULL,
SourceSystem NVARCHAR(50) NOT NULL,
SourceContainer NVARCHAR(100) NOT NULL,
SourceFolder NVARCHAR(500) NOT NULL,
SourceFilePattern NVARCHAR(100) DEFAULT '*.csv',
SinkSchema NVARCHAR(50) NOT NULL,
SinkTable NVARCHAR(100) NOT NULL,
LoadType NVARCHAR(20) DEFAULT 'Incremental', -- Full, Incremental
WatermarkColumn NVARCHAR(100) NULL,
IsEnabled BIT DEFAULT 1,
LastLoadDate DATETIME NULL,
CreatedDate DATETIME DEFAULT GETUTCDATE(),
ModifiedDate DATETIME DEFAULT GETUTCDATE()
);
-- Sample configuration data
INSERT INTO ETL.PipelineConfig
(PipelineName, SourceSystem, SourceContainer, SourceFolder, SinkSchema, SinkTable, LoadType, WatermarkColumn)
VALUES
('LoadCustomers', 'CRM', 'crm-exports', 'customers/daily', 'staging', 'Customers', 'Incremental', 'ModifiedDate'),
('LoadOrders', 'ERP', 'erp-data', 'orders/2021', 'staging', 'Orders', 'Incremental', 'OrderDate'),
('LoadProducts', 'MDM', 'master-data', 'products', 'staging', 'Products', 'Full', NULL);
Parameterized Linked Services
{
"name": "ParameterizedSqlLinkedService",
"properties": {
"type": "AzureSqlDatabase",
"parameters": {
"serverName": { "type": "String" },
"databaseName": { "type": "String" }
},
"typeProperties": {
"connectionString": {
"value": "Server=tcp:@{linkedService().serverName}.database.windows.net,1433;Database=@{linkedService().databaseName};Authentication=Active Directory Managed Identity;",
"type": "Expression"
}
}
}
}
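A dataset built on this linked service supplies values for serverName and databaseName through the linked service reference, typically forwarding its own dataset parameters. A minimal sketch (the dataset name and parameter names here are illustrative):
{
  "name": "GenericRemoteSqlTableDataset",
  "properties": {
    "linkedServiceName": {
      "referenceName": "ParameterizedSqlLinkedService",
      "type": "LinkedServiceReference",
      "parameters": {
        "serverName": { "value": "@dataset().serverName", "type": "Expression" },
        "databaseName": { "value": "@dataset().databaseName", "type": "Expression" }
      }
    },
    "parameters": {
      "serverName": { "type": "String" },
      "databaseName": { "type": "String" },
      "schemaName": { "type": "String" },
      "tableName": { "type": "String" }
    },
    "type": "AzureSqlTable",
    "typeProperties": {
      "schema": { "value": "@dataset().schemaName", "type": "Expression" },
      "table": { "value": "@dataset().tableName", "type": "Expression" }
    }
  }
}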
Triggering Parameterized Pipelines
# Python - Trigger parameterized pipeline
from azure.identity import DefaultAzureCredential
from azure.mgmt.datafactory import DataFactoryManagementClient


class ADFPipelineTrigger:
    """Thin wrapper around the ADF management SDK for starting pipeline runs."""

    def __init__(self, subscription_id, resource_group, factory_name):
        self.credential = DefaultAzureCredential()
        self.client = DataFactoryManagementClient(
            self.credential, subscription_id
        )
        self.resource_group = resource_group
        self.factory_name = factory_name

    def trigger_with_parameters(self, pipeline_name, parameters):
        """Trigger a single pipeline run with the given parameters."""
        run_response = self.client.pipelines.create_run(
            self.resource_group,
            self.factory_name,
            pipeline_name,
            parameters=parameters
        )
        # The service generates the run ID and returns it in the response
        return run_response.run_id

    def trigger_multiple_loads(self, pipeline_name, configs):
        """Trigger one pipeline run per configuration entry."""
        run_ids = []
        for config in configs:
            parameters = {
                'sourceContainer': config['source_container'],
                'sourceFolder': config['source_folder'],
                'sinkSchema': config['sink_schema'],
                'sinkTable': config['sink_table'],
                'isFullLoad': config.get('is_full_load', False)
            }
            run_id = self.trigger_with_parameters(pipeline_name, parameters)
            run_ids.append({
                'config': config,
                'run_id': run_id
            })
        return run_ids


# Usage (subscription_id, resource_group, and factory_name are supplied by the caller)
trigger = ADFPipelineTrigger(subscription_id, resource_group, factory_name)

configs = [
    {
        'source_container': 'sales',
        'source_folder': 'orders/2021/08',
        'sink_schema': 'staging',
        'sink_table': 'Orders',
        'is_full_load': False
    },
    {
        'source_container': 'crm',
        'source_folder': 'customers',
        'sink_schema': 'staging',
        'sink_table': 'Customers',
        'is_full_load': True
    }
]

runs = trigger.trigger_multiple_loads('ParameterizedCopyPipeline', configs)
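Parameter values can also be bound when a trigger starts the pipeline. A minimal schedule trigger sketch (the trigger name, recurrence, and parameter values are illustrative):
{
  "name": "DailyOrdersTrigger",
  "properties": {
    "type": "ScheduleTrigger",
    "typeProperties": {
      "recurrence": {
        "frequency": "Day",
        "interval": 1,
        "startTime": "2021-09-01T02:00:00Z",
        "timeZone": "UTC"
      }
    },
    "pipelines": [
      {
        "pipelineReference": {
          "referenceName": "ParameterizedCopyPipeline",
          "type": "PipelineReference"
        },
        "parameters": {
          "sourceContainer": "sales",
          "sourceFolder": "orders/daily",
          "sinkSchema": "staging",
          "sinkTable": "Orders",
          "isFullLoad": false
        }
      }
    ]
  }
}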
Best Practices
- Use defaults wisely: Provide sensible defaults for optional parameters
- Validate parameters: Add validation activities where needed (see the sketch after this list)
- Document parameters: Use descriptions for clarity
- Keep datasets generic: Parameterize at the dataset level
- Use metadata tables: Store configuration externally
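For the validation point above, one lightweight pattern is an If Condition that fails fast when a required parameter is empty; a minimal sketch using a Fail activity (activity names are illustrative):
{
  "name": "ValidateSinkTable",
  "type": "IfCondition",
  "typeProperties": {
    "expression": {
      "value": "@empty(pipeline().parameters.sinkTable)",
      "type": "Expression"
    },
    "ifTrueActivities": [
      {
        "name": "FailOnMissingSinkTable",
        "type": "Fail",
        "typeProperties": {
          "message": "Required parameter 'sinkTable' is empty.",
          "errorCode": "400"
        }
      }
    ]
  }
}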
Parameterized pipelines dramatically reduce maintenance overhead and enable rapid deployment of new data integration scenarios without code changes.