Data Transformation with Azure Data Factory Mapping Data Flows
Mapping Data Flows in Azure Data Factory provide a visual, code-free way to design data transformation logic. Under the hood, these flows run on fully managed Apache Spark clusters, giving you the power of distributed computing without writing a single line of Spark code.
Understanding Mapping Data Flows
Mapping Data Flows offer:
- Visual designer - Drag-and-drop transformation logic
- Spark-powered - Automatic Spark code generation
- Schema flexibility - Handle schema drift dynamically
- Integration - Seamlessly integrate with ADF pipelines
- Debug mode - Interactive data preview during development
Creating a Data Flow
Let’s build a data flow that processes sales data:
Source Configuration
{
  "name": "source_sales",
  "type": "Source",
  "dataset": {
    "referenceName": "ds_blob_sales_raw",
    "type": "DatasetReference"
  },
  "typeProperties": {
    "allowSchemaDrift": true,
    "validateSchema": false,
    "inferDriftedColumnTypes": true,
    "samplingMethod": "none"
  }
}
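If you manage factory artifacts from code, the referenced dataset can also be created with the Python management SDK. The sketch below is minimal and illustrative: the linked service name ls_blob_storage and the container/folder values are assumptions, not values defined elsewhere in this article.

# Minimal sketch (assumptions noted above): create ds_blob_sales_raw via the SDK.
from azure.identity import DefaultAzureCredential
from azure.mgmt.datafactory import DataFactoryManagementClient
from azure.mgmt.datafactory.models import (
    AzureBlobStorageLocation,
    DatasetResource,
    DelimitedTextDataset,
    LinkedServiceReference,
)

client = DataFactoryManagementClient(DefaultAzureCredential(), "subscription-id")

dataset = DatasetResource(
    properties=DelimitedTextDataset(
        # "ls_blob_storage" is an assumed, pre-existing linked service.
        linked_service_name=LinkedServiceReference(
            type="LinkedServiceReference", reference_name="ls_blob_storage"
        ),
        location=AzureBlobStorageLocation(container="raw", folder_path="sales"),
        column_delimiter=",",
        first_row_as_header=True,
    )
)

client.datasets.create_or_update(
    resource_group_name="rg-datafactory",
    factory_name="adf-demo",
    dataset_name="ds_blob_sales_raw",
    dataset=dataset,
)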
Data Flow Script
Data flows can also be defined using Data Flow Script:
source(output(
OrderId as integer,
CustomerId as string,
ProductId as string,
Quantity as integer,
UnitPrice as decimal(10,2),
OrderDate as timestamp,
ShipCountry as string
),
allowSchemaDrift: true,
validateSchema: false) ~> SalesSource
SalesSource derive(
TotalAmount = Quantity * UnitPrice,
OrderYear = year(OrderDate),
OrderMonth = month(OrderDate),
OrderQuarter = quarter(OrderDate)
) ~> CalculatedColumns
CalculatedColumns filter(
TotalAmount > 0 && !isNull(CustomerId)
) ~> FilterValidRecords
FilterValidRecords aggregate(
groupBy(
CustomerId,
OrderYear,
OrderMonth
),
TotalRevenue = sum(TotalAmount),
OrderCount = count(),
AvgOrderValue = avg(TotalAmount),
MaxOrderValue = max(TotalAmount)
) ~> AggregateByCustomer
AggregateByCustomer window(
over(CustomerId),
asc(OrderYear, true),
asc(OrderMonth, true),
PrevMonthRevenue = lag(TotalRevenue, 1),
RevenueGrowth = iif(isNull(lag(TotalRevenue, 1)), 0,
(TotalRevenue - lag(TotalRevenue, 1)) / lag(TotalRevenue, 1) * 100)
) ~> WindowFunctions
WindowFunctions select(
mapColumn(
CustomerId,
Year = OrderYear,
Month = OrderMonth,
Revenue = TotalRevenue,
Orders = OrderCount,
AvgOrder = AvgOrderValue,
MaxOrder = MaxOrderValue,
MoMGrowth = RevenueGrowth
)
) ~> SelectFinalColumns
SelectFinalColumns sink(
allowSchemaDrift: true,
validateSchema: false,
format: 'parquet',
partitionBy('key', 0, Year, Month)
) ~> SinkToDataLake
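The same script can be registered programmatically. Here is a hedged sketch using azure-mgmt-datafactory; the data flow and dataset names mirror the ones used in this article, and the script string is left truncated with "..." so you would paste in the full script shown above.

# Minimal sketch: register the data flow defined by the script above.
from azure.identity import DefaultAzureCredential
from azure.mgmt.datafactory import DataFactoryManagementClient
from azure.mgmt.datafactory.models import (
    DataFlowResource,
    DataFlowSink,
    DataFlowSource,
    DatasetReference,
    MappingDataFlow,
)

client = DataFactoryManagementClient(DefaultAzureCredential(), "subscription-id")

# Paste the full Data Flow Script here; "..." is a placeholder.
script = "source(output(...), allowSchemaDrift: true, validateSchema: false) ~> SalesSource\n..."

data_flow = DataFlowResource(
    properties=MappingDataFlow(
        sources=[DataFlowSource(
            name="SalesSource",
            dataset=DatasetReference(type="DatasetReference",
                                     reference_name="ds_blob_sales_raw"))],
        sinks=[DataFlowSink(
            name="SinkToDataLake",
            dataset=DatasetReference(type="DatasetReference",
                                     reference_name="ds_sales_curated"))],
        script=script,
    )
)

client.data_flows.create_or_update(
    resource_group_name="rg-datafactory",
    factory_name="adf-demo",
    data_flow_name="df_sales_transform",
    data_flow=data_flow,
)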
Common Transformation Patterns
Deduplication
source ~> Source1
Source1 aggregate(
groupBy(
CustomerId,
OrderId
),
each(match(true()), $$ = first($$))
) ~> Deduplicate
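For intuition only, the keep-first-per-key behavior of that aggregate is roughly a pandas groupby-first (data flows run this on Spark, not pandas):

# Illustrative only: keep the first row per (CustomerId, OrderId).
import pandas as pd

orders = pd.DataFrame({
    "CustomerId": ["C1", "C1", "C2"],
    "OrderId": [1, 1, 2],
    "Quantity": [5, 5, 3],
})
deduped = orders.groupby(["CustomerId", "OrderId"], as_index=False).first()
print(deduped)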
Pivot Data
source ~> Source1
Source1 pivot(
groupBy(CustomerId),
pivotBy(Category),
TotalSales = sum(Amount)
) ~> PivotByCategory
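The pivot transformation behaves much like a pandas pivot_table, shown here purely for intuition:

# Illustrative only: one column per Category, summing Amount per CustomerId.
import pandas as pd

sales = pd.DataFrame({
    "CustomerId": ["C1", "C1", "C2"],
    "Category": ["Books", "Games", "Books"],
    "Amount": [20.0, 35.0, 15.0],
})
pivoted = sales.pivot_table(index="CustomerId", columns="Category",
                            values="Amount", aggfunc="sum").reset_index()
print(pivoted)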
Unpivot Data
source ~> Source1
Source1 unpivot(
output(
MetricName as string,
MetricValue as double
),
ungroupBy(ProductId),
lateral: true,
ignoreNullPivots: false
) (
Revenue as 'Revenue',
Cost as 'Cost',
Margin as 'Margin'
) ~> UnpivotMetrics
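Unpivot is the reverse operation: wide metric columns become (MetricName, MetricValue) rows keyed by ProductId, roughly a pandas melt (again, illustrative only):

# Illustrative only: Revenue/Cost/Margin columns become metric rows.
import pandas as pd

wide = pd.DataFrame({
    "ProductId": ["P1", "P2"],
    "Revenue": [100.0, 80.0],
    "Cost": [60.0, 50.0],
    "Margin": [40.0, 30.0],
})
long = wide.melt(id_vars="ProductId",
                 value_vars=["Revenue", "Cost", "Margin"],
                 var_name="MetricName", value_name="MetricValue")
print(long)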
Slowly Changing Dimension Type 2
// Source: Current records
currentRecords derive(
HashKey = md5(columns()),
EffectiveDate = currentTimestamp(),
ExpirationDate = toTimestamp('9999-12-31'),
IsCurrent = true
) ~> AddSCDColumns
// Lookup existing records
AddSCDColumns, existingDimension lookup(
AddSCDColumns@BusinessKey == existingDimension@BusinessKey,
multiple: false,
pickup: 'any'
) ~> LookupExisting
// Split based on change detection
LookupExisting split(
isNull(existingDimension@BusinessKey),
AddSCDColumns@HashKey != existingDimension@HashKey,
disjoint: true
) ~> SplitForSCD@(NewRecords, ChangedRecords, UnchangedRecords)
// New records go directly to insert
SplitForSCD@NewRecords sink() ~> InsertNew
// Changed records: expire old, insert new
SplitForSCD@ChangedRecords derive(
ExpirationDate = currentTimestamp(),
IsCurrent = false
) ~> ExpireOld
ExpireOld sink(
updateable: true
) ~> UpdateExpired
SplitForSCD@ChangedRecords sink() ~> InsertChanged
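To make the branching concrete, here is a compact pandas sketch of the same detect-change, expire-old, insert-new logic. It is illustrative only; the column names (BusinessKey, HashKey, IsCurrent) follow the script above, while the sample values are invented.

# Illustrative only: SCD Type 2 expire-and-insert in pandas.
import hashlib
from datetime import datetime, timezone

import pandas as pd

def md5_of(df, cols):
    # Comparable in spirit to md5(columns()) in the data flow script.
    return df[cols].astype(str).agg("|".join, axis=1).map(
        lambda s: hashlib.md5(s.encode()).hexdigest())

incoming = pd.DataFrame({"BusinessKey": [1, 2], "City": ["Oslo", "Bergen"]})
existing = pd.DataFrame({"BusinessKey": [1], "City": ["Stavanger"], "IsCurrent": [True]})
incoming["HashKey"] = md5_of(incoming, ["City"])
existing["HashKey"] = md5_of(existing, ["City"])

# Lookup existing rows by business key and detect changes via the hash.
merged = incoming.merge(existing[["BusinessKey", "HashKey"]],
                        on="BusinessKey", how="left", suffixes=("", "_existing"))
new_rows = merged[merged["HashKey_existing"].isna()]
changed = merged[merged["HashKey_existing"].notna()
                 & (merged["HashKey"] != merged["HashKey_existing"])]

now = datetime.now(timezone.utc)
# Expire the current versions of changed keys, then insert the new versions.
mask = existing["BusinessKey"].isin(changed["BusinessKey"])
existing.loc[mask, "IsCurrent"] = False
existing.loc[mask, "ExpirationDate"] = now

inserts = pd.concat([new_rows, changed])[["BusinessKey", "City", "HashKey"]]
inserts = inserts.assign(EffectiveDate=now, IsCurrent=True)
dimension = pd.concat([existing, inserts], ignore_index=True)
print(dimension)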
ARM Template for Data Flow
{
  "$schema": "https://schema.management.azure.com/schemas/2019-04-01/deploymentTemplate.json#",
  "contentVersion": "1.0.0.0",
  "parameters": {
    "factoryName": {
      "type": "string"
    }
  },
  "resources": [
    {
      "type": "Microsoft.DataFactory/factories/dataflows",
      "apiVersion": "2018-06-01",
      "name": "[concat(parameters('factoryName'), '/df_sales_transform')]",
      "properties": {
        "type": "MappingDataFlow",
        "typeProperties": {
          "sources": [
            {
              "dataset": {
                "referenceName": "ds_sales_raw",
                "type": "DatasetReference"
              },
              "name": "SalesSource"
            }
          ],
          "sinks": [
            {
              "dataset": {
                "referenceName": "ds_sales_curated",
                "type": "DatasetReference"
              },
              "name": "SalesSink"
            }
          ],
          "transformations": [
            {
              "name": "DeriveColumns",
              "description": "Calculate derived columns"
            },
            {
              "name": "FilterRecords",
              "description": "Remove invalid records"
            },
            {
              "name": "AggregateData",
              "description": "Aggregate by dimensions"
            }
          ],
          "script": "source(output(...)) ~> SalesSource\nSalesSource derive(...) ~> DeriveColumns\nDeriveColumns filter(...) ~> FilterRecords\nFilterRecords aggregate(...) ~> AggregateData\nAggregateData sink(...) ~> SalesSink"
        }
      }
    }
  ]
}
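To deploy the template from Python rather than the portal or Azure CLI, the azure-mgmt-resource package can submit it as a resource group deployment. This is a hedged sketch: the template file name and deployment name are illustrative.

# Minimal sketch: deploy the ARM template above as a resource group deployment.
import json

from azure.identity import DefaultAzureCredential
from azure.mgmt.resource import ResourceManagementClient
from azure.mgmt.resource.resources.models import Deployment, DeploymentProperties

client = ResourceManagementClient(DefaultAzureCredential(), "subscription-id")

# "df_sales_transform.json" is an assumed local file containing the template above.
with open("df_sales_transform.json") as f:
    template = json.load(f)

deployment = Deployment(
    properties=DeploymentProperties(
        mode="Incremental",
        template=template,
        parameters={"factoryName": {"value": "adf-demo"}},
    )
)

poller = client.deployments.begin_create_or_update(
    resource_group_name="rg-datafactory",
    deployment_name="deploy-df-sales-transform",
    parameters=deployment,
)
print(poller.result().properties.provisioning_state)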
Pipeline Integration
Integrate data flows into ADF pipelines:
{
  "name": "pipeline_sales_etl",
  "properties": {
    "activities": [
      {
        "name": "Run Sales Data Flow",
        "type": "ExecuteDataFlow",
        "dependsOn": [],
        "policy": {
          "timeout": "1.00:00:00",
          "retry": 3,
          "retryIntervalInSeconds": 30
        },
        "typeProperties": {
          "dataflow": {
            "referenceName": "df_sales_transform",
            "type": "DataFlowReference"
          },
          "compute": {
            "coreCount": 8,
            "computeType": "General"
          },
          "traceLevel": "Fine",
          "runConcurrently": true,
          "continueOnError": false
        }
      },
      {
        "name": "Log Success",
        "type": "SqlServerStoredProcedure",
        "dependsOn": [
          {
            "activity": "Run Sales Data Flow",
            "dependencyConditions": ["Succeeded"]
          }
        ],
        "typeProperties": {
          "storedProcedureName": "sp_LogPipelineExecution",
          "storedProcedureParameters": {
            "Status": {"value": "Success"},
            "RowsProcessed": {
              "value": "@activity('Run Sales Data Flow').output.runStatus.metrics.SalesSink.rowsWritten",
              "type": "Int32"
            }
          }
        }
      }
    ]
  }
}
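Once published, the pipeline can be triggered from Python, and the returned run ID feeds directly into the monitoring snippet later in this article:

# Minimal sketch: trigger the pipeline and capture the run ID.
from azure.identity import DefaultAzureCredential
from azure.mgmt.datafactory import DataFactoryManagementClient

client = DataFactoryManagementClient(DefaultAzureCredential(), "subscription-id")

run_response = client.pipelines.create_run(
    resource_group_name="rg-datafactory",
    factory_name="adf-demo",
    pipeline_name="pipeline_sales_etl",
)
print(f"Started pipeline run: {run_response.run_id}")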
Performance Optimization
Partitioning Strategies
// Hash partitioning
source ~> Source1
Source1 sink(
partitionBy('hash', 10, CustomerId)
) ~> Sink1
// Round-robin partitioning
source ~> Source1
Source1 sink(
partitionBy('roundRobin', 8)
) ~> Sink1
// Key-based partitioning
source ~> Source1
Source1 sink(
partitionBy('key', 0, Year, Month)
) ~> Sink1
Caching for Lookups
// Small dimension table - broadcast join
smallDimension cache(
cacheMode: 'single'
) ~> CachedDimension
largeFactTable, CachedDimension lookup(
FactKey == DimKey,
broadcast: 'auto'
) ~> JoinWithCache
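Because data flows execute on Spark, a cached or broadcast lookup corresponds to a Spark broadcast join under the hood: the small dimension is shipped to every executor so the large fact table never has to shuffle. For intuition, roughly what the engine does (illustrative PySpark, not something you author in ADF):

# Illustrative only: the Spark equivalent of a broadcast lookup.
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast

spark = SparkSession.builder.appName("broadcast-demo").getOrCreate()

fact = spark.createDataFrame([(1, "C1"), (2, "C2")], ["FactKey", "CustomerId"])
dim = spark.createDataFrame([(1, "Gold"), (2, "Silver")], ["DimKey", "Tier"])

# The hint ships the small dimension to every executor, avoiding a shuffle.
joined = fact.join(broadcast(dim), fact.FactKey == dim.DimKey, "left")
joined.show()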
Monitoring Data Flow Execution
from azure.identity import DefaultAzureCredential
from azure.mgmt.datafactory import DataFactoryManagementClient
from azure.mgmt.datafactory.models import RunFilterParameters
from datetime import datetime, timezone
import pandas as pd

def get_dataflow_run_metrics(subscription_id, resource_group, factory_name, run_id):
    """Get detailed metrics for a data flow run."""
    credential = DefaultAzureCredential()
    client = DataFactoryManagementClient(credential, subscription_id)
    # Get the pipeline run (provides the time window for the activity query)
    run = client.pipeline_runs.get(
        resource_group_name=resource_group,
        factory_name=factory_name,
        run_id=run_id
    )
    # Get activity runs within that window
    activities = client.activity_runs.query_by_pipeline_run(
        resource_group_name=resource_group,
        factory_name=factory_name,
        run_id=run_id,
        filter_parameters=RunFilterParameters(
            last_updated_after=run.run_start,
            last_updated_before=run.run_end or datetime.now(timezone.utc)
        )
    )
    metrics = []
    for activity in activities.value:
        if activity.activity_type == "ExecuteDataFlow":
            output = activity.output
            if output and "runStatus" in output:
                run_status = output["runStatus"]
                # Metric entries are keyed by the source/sink names defined in the
                # data flow (SalesSource / SalesSink in this article)
                metrics.append({
                    "activity": activity.activity_name,
                    "status": activity.status,
                    "duration": activity.duration_in_ms,
                    "rowsRead": run_status.get("metrics", {}).get("SalesSource", {}).get("rowsRead"),
                    "rowsWritten": run_status.get("metrics", {}).get("SalesSink", {}).get("rowsWritten"),
                    "profile": run_status.get("profile", {})
                })
    return pd.DataFrame(metrics)

# Usage
metrics_df = get_dataflow_run_metrics(
    "subscription-id",
    "rg-datafactory",
    "adf-demo",
    "run-id-12345"
)
print(metrics_df)
Best Practices
- Enable data flow debug during development for quick iterations
- Use appropriate cluster size based on data volume
- Partition large datasets for optimal parallel processing
- Cache small lookup tables to avoid shuffle operations
- Monitor execution metrics to identify bottlenecks
- Use parameterization for reusable data flows
Conclusion
Mapping Data Flows in Azure Data Factory democratize data transformation by providing a visual interface backed by Spark’s distributed computing power. Whether you’re performing simple transformations or complex slowly changing dimension logic, data flows can handle it without requiring Spark expertise.
Start with simple transformations and gradually add complexity as you become comfortable with the visual designer and data flow script syntax.