4 min read
Building Data Pipelines with Azure Data Factory
Azure Data Factory (ADF) is Azure's managed service for orchestrating data movement and transformation at scale. As organizations lean more heavily on data for decision making, robust, repeatable pipelines become essential. Here is how to get started.
Creating a Data Factory
# The az datafactory commands ship in a CLI extension
az extension add --name datafactory

# Create a Data Factory
az datafactory create \
  --resource-group rg-data \
  --factory-name adf-mycompany-2020 \
  --location australiaeast

# Enable Git integration with Azure DevOps (optional but recommended)
az datafactory configure-factory-repo \
  --location australiaeast \
  --factory-resource-id "/subscriptions/{sub}/resourceGroups/rg-data/providers/Microsoft.DataFactory/factories/adf-mycompany-2020" \
  --factory-vsts-configuration \
    account-name="your-azure-devops-org" \
    project-name="DataPlatform" \
    repository-name="data-factory-repo" \
    collaboration-branch="main" \
    root-folder="/adf"
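Once the deployment finishes, you can confirm the factory exists straight from the CLI:
# Confirm the factory was created
az datafactory show \
  --resource-group rg-data \
  --factory-name adf-mycompany-2020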
Understanding Key Concepts
- Pipeline: A logical grouping of activities that together perform a unit of work
- Activity: A single step in a pipeline (copy data, run a data flow, execute a stored procedure, and so on)
- Dataset: A named view of the data an activity reads from or writes to (a folder, file, or table)
- Linked Service: A connection definition for a data store or compute environment, much like a connection string
- Trigger: Defines when a pipeline run is kicked off (schedule, tumbling window, or storage event)
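Everything in the rest of this post is one of these five object types, and each can be listed with the same CLI extension used above (a quick sanity-check sketch against the factory created earlier):
# List what is currently deployed in the factory
az datafactory pipeline list --resource-group rg-data --factory-name adf-mycompany-2020 -o table
az datafactory dataset list --resource-group rg-data --factory-name adf-mycompany-2020 -o table
az datafactory linked-service list --resource-group rg-data --factory-name adf-mycompany-2020 -o table
az datafactory trigger list --resource-group rg-data --factory-name adf-mycompany-2020 -o table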
Creating Linked Services
Both definitions below resolve their connection strings from Key Vault at runtime, so they assume a Key Vault linked service named AzureKeyVault already exists in the factory.
Azure SQL Database
{
"name": "AzureSqlDatabase",
"type": "Microsoft.DataFactory/factories/linkedservices",
"properties": {
"type": "AzureSqlDatabase",
"typeProperties": {
"connectionString": {
"type": "AzureKeyVaultSecret",
"store": {
"referenceName": "AzureKeyVault",
"type": "LinkedServiceReference"
},
"secretName": "SqlConnectionString"
}
}
}
}
Azure Blob Storage
{
"name": "AzureBlobStorage",
"type": "Microsoft.DataFactory/factories/linkedservices",
"properties": {
"type": "AzureBlobStorage",
"typeProperties": {
"connectionString": {
"type": "AzureKeyVaultSecret",
"store": {
"referenceName": "AzureKeyVault",
"type": "LinkedServiceReference"
},
"secretName": "BlobStorageConnectionString"
}
}
}
}
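If you prefer deploying from source control rather than clicking through the portal, each linked service can be created from a local JSON file. The file names below are placeholders, and --properties expects just the properties object from the definitions above, not the outer name/type wrapper:
# Deploy the linked services from local JSON files
az datafactory linked-service create \
  --resource-group rg-data \
  --factory-name adf-mycompany-2020 \
  --name AzureSqlDatabase \
  --properties @azure-sql-database.json

az datafactory linked-service create \
  --resource-group rg-data \
  --factory-name adf-mycompany-2020 \
  --name AzureBlobStorage \
  --properties @azure-blob-storage.json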
Defining Datasets
Source CSV Dataset
A dataset can only reference its own parameters, not pipeline parameters, so the dataset declares a processDate parameter and the pipeline passes its value through when referencing the dataset. The file name is omitted because the copy activity below selects files with a wildcard.
{
    "name": "SourceCsvDataset",
    "properties": {
        "type": "DelimitedText",
        "linkedServiceName": {
            "referenceName": "AzureBlobStorage",
            "type": "LinkedServiceReference"
        },
        "parameters": {
            "processDate": {
                "type": "string"
            }
        },
        "typeProperties": {
            "location": {
                "type": "AzureBlobStorageLocation",
                "container": "raw-data",
                "folderPath": {
                    "value": "@formatDateTime(dataset().processDate, 'yyyy/MM/dd')",
                    "type": "Expression"
                }
            },
            "columnDelimiter": ",",
            "firstRowAsHeader": true
        },
        "schema": []
    }
}
Sink SQL Table Dataset
{
"name": "SinkSqlDataset",
"properties": {
"type": "AzureSqlTable",
"linkedServiceName": {
"referenceName": "AzureSqlDatabase",
"type": "LinkedServiceReference"
},
"typeProperties": {
"schema": "staging",
"table": "RawData"
}
}
}
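Datasets deploy the same way as linked services; again the file name is a placeholder holding the properties object of the definition above, and SinkSqlDataset follows the identical pattern:
# Deploy the source dataset (repeat for SinkSqlDataset)
az datafactory dataset create \
  --resource-group rg-data \
  --factory-name adf-mycompany-2020 \
  --name SourceCsvDataset \
  --properties @source-csv-dataset.json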
Building a Pipeline
Copy Data Pipeline
{
    "name": "CopyRawDataPipeline",
    "properties": {
        "activities": [
            {
                "name": "CopyFromBlobToSql",
                "type": "Copy",
                "inputs": [
                    {
                        "referenceName": "SourceCsvDataset",
                        "type": "DatasetReference",
                        "parameters": {
                            "processDate": {
                                "value": "@pipeline().parameters.processDate",
                                "type": "Expression"
                            }
                        }
                    }
                ],
                "outputs": [
                    {
                        "referenceName": "SinkSqlDataset",
                        "type": "DatasetReference"
                    }
                ],
                "typeProperties": {
                    "source": {
                        "type": "DelimitedTextSource",
                        "storeSettings": {
                            "type": "AzureBlobStorageReadSettings",
                            "recursive": true,
                            "wildcardFileName": "*.csv"
                        }
                    },
                    "sink": {
                        "type": "AzureSqlSink",
                        "preCopyScript": "TRUNCATE TABLE staging.RawData",
                        "writeBehavior": "insert"
                    },
                    "enableStaging": false
                }
            }
        ],
        "parameters": {
            "processDate": {
                "type": "string"
            }
        }
    }
}
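The pipeline can be deployed and exercised end to end from the CLI. This is a sketch: the file name, date value, and run ID are placeholders, and --parameters takes a JSON string keyed by the pipeline's parameter names:
# Deploy the pipeline definition (properties object saved locally)
az datafactory pipeline create \
  --resource-group rg-data \
  --factory-name adf-mycompany-2020 \
  --name CopyRawDataPipeline \
  --pipeline @copy-raw-data-pipeline.json

# Kick off an ad-hoc run for a specific date
az datafactory pipeline create-run \
  --resource-group rg-data \
  --factory-name adf-mycompany-2020 \
  --name CopyRawDataPipeline \
  --parameters '{"processDate": "2020-08-01"}'

# Check the status of the run ID returned by create-run
az datafactory pipeline-run show \
  --resource-group rg-data \
  --factory-name adf-mycompany-2020 \
  --run-id <run-id>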
Data Flow for Transformations
Create a mapping data flow for complex transformations:
Source (CSV)
-> Derived Column (add calculated fields)
-> Filter (remove invalid records)
-> Aggregate (summarize by customer)
-> Sink (SQL table)
Data Flow Script
source(output(
OrderId as string,
CustomerId as string,
Amount as decimal(10,2),
OrderDate as date
),
allowSchemaDrift: true) ~> SourceData
SourceData derive(
ProcessedDate = currentDate(),
AmountWithTax = Amount * 1.1
) ~> DerivedColumns
DerivedColumns filter(Amount > 0 && !isNull(CustomerId)) ~> FilterInvalid
FilterInvalid aggregate(groupBy(CustomerId),
TotalAmount = sum(AmountWithTax),
OrderCount = count()
) ~> AggregateByCustomer
AggregateByCustomer sink(
input(
CustomerId as string,
TotalAmount as decimal,
OrderCount as integer
)
) ~> SinkToSql
Triggers
Schedule Trigger
{
"name": "DailyTrigger",
"properties": {
"type": "ScheduleTrigger",
"typeProperties": {
"recurrence": {
"frequency": "Day",
"interval": 1,
"startTime": "2020-08-01T06:00:00Z",
"timeZone": "AUS Eastern Standard Time"
}
},
"pipelines": [
{
"pipelineReference": {
"referenceName": "CopyRawDataPipeline",
"type": "PipelineReference"
},
"parameters": {
"processDate": "@trigger().scheduledTime"
}
}
]
}
}
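Triggers are created in a stopped state, so after deploying the definition (the file name here is a placeholder) it has to be started explicitly before any runs are scheduled:
# Deploy and start the schedule trigger
az datafactory trigger create \
  --resource-group rg-data \
  --factory-name adf-mycompany-2020 \
  --name DailyTrigger \
  --properties @daily-trigger.json

az datafactory trigger start \
  --resource-group rg-data \
  --factory-name adf-mycompany-2020 \
  --name DailyTrigger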
Event Trigger (Blob Created)
{
    "name": "BlobCreatedTrigger",
    "properties": {
        "type": "BlobEventsTrigger",
        "typeProperties": {
            "blobPathBeginsWith": "/raw-data/blobs/",
            "blobPathEndsWith": ".csv",
            "events": ["Microsoft.Storage.BlobCreated"],
            "scope": "/subscriptions/{sub}/resourceGroups/rg-data/providers/Microsoft.Storage/storageAccounts/{storage-account}"
        }
    }
}
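Blob event triggers are delivered via Azure Event Grid, so the Microsoft.EventGrid resource provider must be registered on the subscription that owns the storage account, and the trigger still needs to be attached to a pipeline and started just like the schedule trigger:
# One-time setup: register the Event Grid resource provider
az provider register --namespace Microsoft.EventGrid
az provider show --namespace Microsoft.EventGrid --query registrationState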
Monitoring and Alerts
# Create an alert for pipeline failures
az monitor metrics alert create \
  --name "ADF-Pipeline-Failure" \
  --resource-group rg-data \
  --scopes "/subscriptions/{sub}/resourceGroups/rg-data/providers/Microsoft.DataFactory/factories/adf-mycompany-2020" \
  --condition "total PipelineFailedRuns > 0" \
  --window-size 5m \
  --action ops-alerts
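Alongside metric alerts, run history can be queried directly from the CLI, which is handy for spot-checking failures from a terminal or a script (the time window below is illustrative):
# List pipeline runs updated within a time window
az datafactory pipeline-run query-by-factory \
  --resource-group rg-data \
  --factory-name adf-mycompany-2020 \
  --last-updated-after 2020-08-01T00:00:00Z \
  --last-updated-before 2020-08-02T00:00:00Z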
Azure Data Factory provides a scalable, serverless platform for building enterprise data pipelines without managing infrastructure.