Building Data Pipelines with Azure Data Factory

Azure Data Factory (ADF) is Azure's managed service for orchestrating data movement and transformation at scale. As more decisions depend on data, the pipelines that deliver it need to be reliable and repeatable. Here is how to get started.

Creating a Data Factory

# Create a Data Factory
az datafactory create \
    --resource-group rg-data \
    --factory-name adf-mycompany-2020 \
    --location australiaeast

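# Enable Git integration with Azure Repos (optional but recommended)
# {sub} is a placeholder for your subscription ID
az datafactory configure-factory-repo \
    --location australiaeast \
    --factory-resource-id "/subscriptions/{sub}/resourceGroups/rg-data/providers/Microsoft.DataFactory/factories/adf-mycompany-2020" \
    --factory-vsts-configuration \
        account-name="your-azure-devops-org" \
        project-name="DataPlatform" \
        repository-name="data-factory-repo" \
        collaboration-branch="main" \
        root-folder="/adf"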

Understanding Key Concepts

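  • Pipeline: A logical grouping of activities that together perform a unit of work
  • Activity: A single task within a pipeline (copy, transform, etc.)
  • Dataset: A named reference to the data an activity reads or writes, such as a table, file, or folder
  • Linked Service: Connection information for a data store or compute service
  • Trigger: Defines when a pipeline run starts (on a schedule or in response to an event)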

Creating Linked Services

Azure SQL Database

{
    "name": "AzureSqlDatabase",
    "type": "Microsoft.DataFactory/factories/linkedservices",
    "properties": {
        "type": "AzureSqlDatabase",
        "typeProperties": {
            "connectionString": {
                "type": "AzureKeyVaultSecret",
                "store": {
                    "referenceName": "AzureKeyVault",
                    "type": "LinkedServiceReference"
                },
                "secretName": "SqlConnectionString"
            }
        }
    }
}

Azure Blob Storage

{
    "name": "AzureBlobStorage",
    "type": "Microsoft.DataFactory/factories/linkedservices",
    "properties": {
        "type": "AzureBlobStorage",
        "typeProperties": {
            "connectionString": {
                "type": "AzureKeyVaultSecret",
                "store": {
                    "referenceName": "AzureKeyVault",
                    "type": "LinkedServiceReference"
                },
                "secretName": "BlobStorageConnectionString"
            }
        }
    }
}
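
Both linked services keep the connection string out of the factory definition by pointing at a Key Vault secret. As a rough illustration, a small Python check along these lines could flag definitions that embed a connection string inline instead. The function name uses_key_vault and the inline counter-example are hypothetical, and the check only inspects the connectionString property.

```python
def uses_key_vault(linked_service: dict) -> bool:
    """Return True if connectionString is an AzureKeyVaultSecret reference."""
    type_props = linked_service.get("properties", {}).get("typeProperties", {})
    conn = type_props.get("connectionString")
    return isinstance(conn, dict) and conn.get("type") == "AzureKeyVaultSecret"


# Same shape as the AzureSqlDatabase linked service in this post.
SQL_LINKED_SERVICE = {
    "name": "AzureSqlDatabase",
    "properties": {
        "type": "AzureSqlDatabase",
        "typeProperties": {
            "connectionString": {
                "type": "AzureKeyVaultSecret",
                "store": {
                    "referenceName": "AzureKeyVault",
                    "type": "LinkedServiceReference",
                },
                "secretName": "SqlConnectionString",
            }
        },
    },
}

# Hypothetical counter-example: the connection string is pasted inline.
INLINE_LINKED_SERVICE = {
    "name": "InlineExample",
    "properties": {
        "type": "AzureSqlDatabase",
        "typeProperties": {"connectionString": "Server=example;Database=example;"},
    },
}

if __name__ == "__main__":
    for service in (SQL_LINKED_SERVICE, INLINE_LINKED_SERVICE):
        print(service["name"], uses_key_vault(service))
```

A check like this could run in a build step against the JSON files that Git integration writes to the repository.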

Defining Datasets

Source CSV Dataset

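Datasets cannot read pipeline parameters directly, so this dataset declares its own processDate parameter, which the calling activity must supply. The *.csv filter is applied by wildcardFileName in the copy activity rather than in the dataset.

{
    "name": "SourceCsvDataset",
    "properties": {
        "type": "DelimitedText",
        "linkedServiceName": {
            "referenceName": "AzureBlobStorage",
            "type": "LinkedServiceReference"
        },
        "parameters": {
            "processDate": {
                "type": "string"
            }
        },
        "typeProperties": {
            "location": {
                "type": "AzureBlobStorageLocation",
                "container": "raw-data",
                "folderPath": {
                    "value": "@formatDateTime(dataset().processDate, 'yyyy/MM/dd')",
                    "type": "Expression"
                }
            },
            "columnDelimiter": ",",
            "firstRowAsHeader": true
        },
        "schema": []
    }
}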
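
To make the folderPath expression concrete, here is a minimal Python sketch of what the 'yyyy/MM/dd' format produces for a given processDate. The helper name folder_path is hypothetical, and the sketch assumes an ISO 8601 timestamp with whole seconds; it approximates formatDateTime rather than reproducing it.

```python
from datetime import datetime


def folder_path(process_date: str) -> str:
    """Approximate formatDateTime(processDate, 'yyyy/MM/dd') for an ISO 8601 string."""
    # Older Python versions reject a trailing "Z", so rewrite it as an explicit offset.
    parsed = datetime.fromisoformat(process_date.replace("Z", "+00:00"))
    return parsed.strftime("%Y/%m/%d")


if __name__ == "__main__":
    # Files for this run would be read from raw-data/2020/08/01/
    print(folder_path("2020-08-01T06:00:00Z"))
```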

Sink SQL Table Dataset

{
    "name": "SinkSqlDataset",
    "properties": {
        "type": "AzureSqlTable",
        "linkedServiceName": {
            "referenceName": "AzureSqlDatabase",
            "type": "LinkedServiceReference"
        },
        "typeProperties": {
            "schema": "staging",
            "table": "RawData"
        }
    }
}

Building a Pipeline

Copy Data Pipeline

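The input reference passes the pipeline's processDate parameter on to SourceCsvDataset, which needs a matching processDate dataset parameter to build its folder path.

{
    "name": "CopyRawDataPipeline",
    "properties": {
        "activities": [
            {
                "name": "CopyFromBlobToSql",
                "type": "Copy",
                "inputs": [
                    {
                        "referenceName": "SourceCsvDataset",
                        "type": "DatasetReference",
                        "parameters": {
                            "processDate": {
                                "value": "@pipeline().parameters.processDate",
                                "type": "Expression"
                            }
                        }
                    }
                ],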
                "outputs": [
                    {
                        "referenceName": "SinkSqlDataset",
                        "type": "DatasetReference"
                    }
                ],
                "typeProperties": {
                    "source": {
                        "type": "DelimitedTextSource",
                        "storeSettings": {
                            "type": "AzureBlobStorageReadSettings",
                            "recursive": true,
                            "wildcardFileName": "*.csv"
                        }
                    },
                    "sink": {
                        "type": "AzureSqlSink",
                        "preCopyScript": "TRUNCATE TABLE staging.RawData",
                        "writeBehavior": "insert"
                    },
                    "enableStaging": false
                }
            }
        ],
        "parameters": {
            "processDate": {
                "type": "string"
            }
        }
    }
}
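
A pipeline only works if every dataset it references exists in the factory. As a sketch of a pre-deployment check, the Python below collects the dataset names a pipeline definition refers to and reports any that are not in a known set. The function names are hypothetical, and the pipeline dictionary is a trimmed copy of the definition above.

```python
def dataset_references(pipeline: dict) -> set:
    """Collect every dataset name referenced by the pipeline's activities."""
    names = set()
    for activity in pipeline["properties"]["activities"]:
        for ref in activity.get("inputs", []) + activity.get("outputs", []):
            if ref.get("type") == "DatasetReference":
                names.add(ref["referenceName"])
    return names


def missing_datasets(pipeline: dict, known: set) -> set:
    """Return referenced dataset names that are not in the known set."""
    return dataset_references(pipeline) - known


# Trimmed copy of CopyRawDataPipeline: only the fields the check reads.
PIPELINE = {
    "name": "CopyRawDataPipeline",
    "properties": {
        "activities": [
            {
                "name": "CopyFromBlobToSql",
                "type": "Copy",
                "inputs": [
                    {"referenceName": "SourceCsvDataset", "type": "DatasetReference"}
                ],
                "outputs": [
                    {"referenceName": "SinkSqlDataset", "type": "DatasetReference"}
                ],
            }
        ]
    },
}

KNOWN_DATASETS = {"SourceCsvDataset", "SinkSqlDataset"}

if __name__ == "__main__":
    print(sorted(dataset_references(PIPELINE)))
    print(missing_datasets(PIPELINE, KNOWN_DATASETS))
```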

Data Flow for Transformations

Create a mapping data flow for complex transformations:

Source (CSV)
    -> Derived Column (add calculated fields)
    -> Filter (remove invalid records)
    -> Aggregate (summarize by category)
    -> Sink (SQL table)

Data Flow Script

source(output(
    OrderId as string,
    CustomerId as string,
    Amount as decimal(10,2),
    OrderDate as date
),
    allowSchemaDrift: true) ~> SourceData

SourceData derive(
    ProcessedDate = currentDate(),
    AmountWithTax = Amount * 1.1
) ~> DerivedColumns

DerivedColumns filter(Amount > 0 && !isNull(CustomerId)) ~> FilterInvalid

FilterInvalid aggregate(groupBy(CustomerId),
    TotalAmount = sum(AmountWithTax),
    OrderCount = count()
) ~> AggregateByCustomer

AggregateByCustomer sink(
    input(
        CustomerId as string,
        TotalAmount as decimal,
        OrderCount as integer
    )
) ~> SinkToSql
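
For readers who want to sanity-check the logic before running it in a data flow, the same derive, filter, and aggregate steps can be sketched in plain Python. The sample rows are made up for illustration, the function name is hypothetical, and ProcessedDate is left out because the aggregate does not carry it through.

```python
from collections import defaultdict
from decimal import Decimal

TAX_MULTIPLIER = Decimal("1.1")  # same factor as AmountWithTax in the script


def aggregate_by_customer(rows):
    """Filter invalid rows, apply the tax multiplier, and total per customer."""
    totals = defaultdict(lambda: {"TotalAmount": Decimal("0"), "OrderCount": 0})
    for row in rows:
        amount = row["Amount"]
        customer = row["CustomerId"]
        # Filter step: keep positive amounts that have a customer.
        if customer is None or amount <= 0:
            continue
        # Derive and aggregate steps: sum the taxed amount and count orders.
        totals[customer]["TotalAmount"] += amount * TAX_MULTIPLIER
        totals[customer]["OrderCount"] += 1
    return dict(totals)


# Made-up sample rows, shaped like the source projection.
SAMPLE_ROWS = [
    {"OrderId": "1", "CustomerId": "C1", "Amount": Decimal("100.00")},
    {"OrderId": "2", "CustomerId": "C1", "Amount": Decimal("50.00")},
    {"OrderId": "3", "CustomerId": None, "Amount": Decimal("20.00")},
    {"OrderId": "4", "CustomerId": "C2", "Amount": Decimal("-5.00")},
]

if __name__ == "__main__":
    for customer, summary in aggregate_by_customer(SAMPLE_ROWS).items():
        print(customer, summary["TotalAmount"], summary["OrderCount"])
```

Only C1 survives the filter here: one row has no customer and the other has a negative amount.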

Triggers

Schedule Trigger

{
    "name": "DailyTrigger",
    "properties": {
        "type": "ScheduleTrigger",
        "typeProperties": {
            "recurrence": {
                "frequency": "Day",
                "interval": 1,
                "startTime": "2020-08-01T06:00:00",
                "timeZone": "AUS Eastern Standard Time"
            }
        },
        "pipelines": [
            {
                "pipelineReference": {
                    "referenceName": "CopyRawDataPipeline",
                    "type": "PipelineReference"
                },
                "parameters": {
                    "processDate": "@trigger().scheduledTime"
                }
            }
        ]
    }
}
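
One thing worth checking with a schedule in a non-UTC time zone is which calendar date ends up in processDate. The sketch below assumes that scheduledTime is reported in UTC and models AUS Eastern Standard Time as a fixed UTC+10 offset, which ignores daylight saving; both are simplifying assumptions, and the function name is hypothetical.

```python
from datetime import datetime, timedelta, timezone

# Fixed UTC+10 offset standing in for AUS Eastern Standard Time (no daylight saving).
AEST = timezone(timedelta(hours=10))


def scheduled_runs_utc(start_local: datetime, interval_days: int, count: int):
    """Return the first `count` daily run times, converted to UTC."""
    return [
        (start_local + timedelta(days=interval_days * i)).astimezone(timezone.utc)
        for i in range(count)
    ]


if __name__ == "__main__":
    first_local = datetime(2020, 8, 1, 6, 0, tzinfo=AEST)
    for run in scheduled_runs_utc(first_local, interval_days=1, count=3):
        # The yyyy/MM/dd folder is derived from the UTC timestamp.
        print(run.isoformat(), "->", run.strftime("%Y/%m/%d"))
```

Under those assumptions, a 06:00 local run on 1 August corresponds to 20:00 UTC on 31 July, so the folder date would be a day earlier than the local date. If that matters for your layout, converting the timestamp to local time before formatting is one option.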

Event Trigger (Blob Created)

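The scope is the resource ID of the storage account to watch; {sub} and {storage-account} are placeholders. The trigger also needs a pipelines section to start anything, here passing the trigger start time as processDate.

{
    "name": "BlobCreatedTrigger",
    "properties": {
        "type": "BlobEventsTrigger",
        "typeProperties": {
            "scope": "/subscriptions/{sub}/resourceGroups/rg-data/providers/Microsoft.Storage/storageAccounts/{storage-account}",
            "blobPathBeginsWith": "/raw-data/blobs/",
            "blobPathEndsWith": ".csv",
            "events": ["Microsoft.Storage.BlobCreated"]
        },
        "pipelines": [
            {
                "pipelineReference": {
                    "referenceName": "CopyRawDataPipeline",
                    "type": "PipelineReference"
                },
                "parameters": {
                    "processDate": "@trigger().startTime"
                }
            }
        ]
    }
}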
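
The path filters are easier to reason about with a concrete example. The Python sketch below assumes the event path takes the form /container/blobs/blob-name, as the blobPathBeginsWith value above suggests, and treats both filters as plain case-sensitive prefix and suffix matches. The function name and the sample blob names are hypothetical.

```python
def matches_trigger(
    container: str,
    blob_name: str,
    begins_with: str = "/raw-data/blobs/",
    ends_with: str = ".csv",
) -> bool:
    """Approximate the blobPathBeginsWith / blobPathEndsWith filters."""
    # Assumed event path shape: /<container>/blobs/<blob name>
    event_path = f"/{container}/blobs/{blob_name}"
    return event_path.startswith(begins_with) and event_path.endswith(ends_with)


if __name__ == "__main__":
    samples = [
        ("raw-data", "2020/08/01/orders.csv"),
        ("raw-data", "2020/08/01/orders.json"),
        ("archive", "2020/08/01/orders.csv"),
    ]
    for container, blob_name in samples:
        print(container, blob_name, matches_trigger(container, blob_name))
```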

Monitoring and Alerts

# Create an alert for pipeline failures
az monitor metrics alert create \
    --name "ADF-Pipeline-Failure" \
    --resource-group rg-data \
    --scopes "/subscriptions/{sub}/resourceGroups/rg-data/providers/Microsoft.DataFactory/factories/adf-mycompany-2020" \
    --condition "total PipelineFailedRuns > 0" \
    --window-size 5m \
    --action ops-alerts

Azure Data Factory provides a scalable, serverless platform for building enterprise data pipelines without managing infrastructure.

Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.