5 min read
Visual ETL with Azure Synapse Data Flows
Azure Synapse Data Flows provide a visual, code-free interface for building data transformation logic. Under the hood, they generate optimized Spark code, giving you the power of distributed computing without writing any code yourself.
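To make the "generates Spark code" point concrete, a simple filter-plus-derived-column data flow (like the transformation examples later in this post) behaves roughly like the hand-written PySpark sketched here. This is an illustration under assumed column names and an assumed storage path, not the code Synapse actually generates; the generated code is never exposed to you.
# Rough PySpark equivalent of a filter + derived-column data flow (illustrative only;
# the storage path and column names are assumptions, not taken from the data flow above)
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

# Read raw sales CSVs from the data lake
sales = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("abfss://raw@mylake.dfs.core.windows.net/sales/")
)

# Filter invalid rows, then derive date parts (the same logic a data flow expresses visually)
cleaned = (
    sales
    .filter((F.col("Year") >= 2020) & (F.col("TotalAmount") > 0) & F.col("CustomerID").isNotNull())
    .withColumn("SalesYear", F.year("OrderDate"))
    .withColumn("SalesMonth", F.month("OrderDate"))
)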
Understanding Data Flows
Data flows in Synapse run on Spark clusters and offer:
- Visual drag-and-drop interface
- 80+ built-in transformations
- Schema drift handling
- Data preview and debugging
- Code generation for advanced scenarios
Creating a Data Flow
Source Configuration
{
  "name": "SalesDataFlow",
  "properties": {
    "type": "MappingDataFlow",
    "typeProperties": {
      "sources": [
        {
          "dataset": {
            "referenceName": "DelimitedText_Sales",
            "type": "DatasetReference"
          },
          "name": "SalesSource",
          "description": "Read sales data from CSV files"
        }
      ]
    }
  }
}
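If you want to inspect or version this definition outside Synapse Studio, the azure-synapse-artifacts SDK can pull it from the workspace. A minimal sketch, assuming a workspace named mysynapse and sign-in via DefaultAzureCredential:
# Retrieve the published data flow definition from the workspace (sketch; the endpoint is assumed)
from azure.identity import DefaultAzureCredential
from azure.synapse.artifacts import ArtifactsClient

client = ArtifactsClient(
    endpoint="https://mysynapse.dev.azuresynapse.net",
    credential=DefaultAzureCredential()
)

data_flow = client.data_flow.get_data_flow("SalesDataFlow")
print(data_flow.name)               # SalesDataFlow
print(data_flow.properties.type)    # expected to be MappingDataFlow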
Transformation Examples
Filter Transformation
Filter rows based on conditions:
// Data flow expression for filtering
filter(
    Year >= 2020 &&
    TotalAmount > 0 &&
    !isNull(CustomerID)
)
Derived Column
Create calculated columns:
// Calculate profit margin
ProfitMargin = (TotalAmount - Cost) / TotalAmount * 100,

// Extract year and month
SalesYear = year(OrderDate),
SalesMonth = month(OrderDate),
SalesQuarter = case(
    month(OrderDate) <= 3, 1,
    month(OrderDate) <= 6, 2,
    month(OrderDate) <= 9, 3,
    4
),

// Customer segment classification
CustomerSegment = case(
    TotalAmount > 10000, 'Premium',
    TotalAmount > 1000, 'Regular',
    'Basic'
)
Aggregate Transformation
Perform aggregations:
// Group by configuration
groupBy(CustomerID, SalesYear, SalesMonth)
// Aggregations
TotalSales = sum(TotalAmount),
OrderCount = count(OrderID),
AvgOrderValue = avg(TotalAmount),
MaxOrderValue = max(TotalAmount),
MinOrderValue = min(TotalAmount),
UniqueProducts = countDistinct(ProductID)
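For readers who think in Spark, the aggregate above corresponds roughly to a groupBy/agg. A sketch that reuses the cleaned DataFrame from the earlier PySpark example (column names assumed):
# Approximate PySpark equivalent of the aggregate transformation (illustrative)
from pyspark.sql import functions as F

monthly = cleaned.groupBy("CustomerID", "SalesYear", "SalesMonth").agg(
    F.sum("TotalAmount").alias("TotalSales"),
    F.count("OrderID").alias("OrderCount"),
    F.avg("TotalAmount").alias("AvgOrderValue"),
    F.max("TotalAmount").alias("MaxOrderValue"),
    F.min("TotalAmount").alias("MinOrderValue"),
    F.countDistinct("ProductID").alias("UniqueProducts"),
)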
Join Transformation
Join multiple sources:
// Left outer join on customer
join(
    CustomerDim,
    SalesSource@CustomerID == CustomerDim@CustomerKey,
    joinType: 'left'
)

// Lookup join for product data
lookup(
    ProductDim,
    SalesSource@ProductID == ProductDim@ProductKey,
    multiple: false,
    pickup: 'any'
)
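Under the hood, both the join and the lookup become Spark joins; a lookup against a small dimension typically ends up as a broadcast hash join. A sketch continuing from the earlier PySpark example, with assumed paths for the dimension tables:
# Approximate PySpark equivalent of the join and broadcast lookup (illustrative)
from pyspark.sql.functions import broadcast

# Dimension tables (paths are assumptions for the sketch)
customer_dim = spark.read.parquet("abfss://curated@mylake.dfs.core.windows.net/dim_customer/")
product_dim = spark.read.parquet("abfss://curated@mylake.dfs.core.windows.net/dim_product/")

# Left outer join on customer
joined = cleaned.join(
    customer_dim,
    cleaned["CustomerID"] == customer_dim["CustomerKey"],
    "left",
)

# A lookup against a small dimension behaves like a broadcast hash join
enriched = joined.join(
    broadcast(product_dim),
    joined["ProductID"] == product_dim["ProductKey"],
    "left",
)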
Pivot Transformation
Pivot rows to columns:
// Pivot sales by month
pivot(
    groupBy(CustomerID, ProductCategory),
    pivotBy(SalesMonth),
    SalesAmount = sum(TotalAmount)
)
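The same reshaping in PySpark is a groupBy/pivot/agg, shown here as a sketch continuing from the earlier example (ProductCategory and SalesMonth are assumed columns):
# Approximate PySpark equivalent of the pivot transformation (illustrative)
from pyspark.sql import functions as F

pivoted = (
    cleaned
    .groupBy("CustomerID", "ProductCategory")
    .pivot("SalesMonth")
    .agg(F.sum("TotalAmount").alias("SalesAmount"))
)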
Unpivot Transformation
Convert columns to rows:
// Unpivot monthly columns back to rows
unpivot(
    output(
        Month as string,
        SalesValue as decimal
    ),
    skipDuplicateMapInputs: false,
    skipDuplicateMapOutputs: false,
    Jan, Feb, Mar, Apr, May, Jun, Jul, Aug, Sep, Oct, Nov, Dec
)
Complete Data Flow Example
Here’s a complete sales data transformation pipeline:
// Source: Read raw sales data
source(
    output(
        TransactionID as string,
        OrderDate as timestamp,
        CustomerID as string,
        ProductID as string,
        Quantity as integer,
        UnitPrice as decimal(18,2),
        Discount as decimal(5,2)
    ),
    allowSchemaDrift: true,
    format: 'delimited',
    columnDelimiter: ',',
    escapeChar: '\\',
    quoteChar: '"',
    columnNamesAsHeader: true
) ~> SalesSource

// Derived: Calculate amounts and date parts
SalesSource derive(
    TotalAmount = Quantity * UnitPrice * (1 - Discount/100),
    NetAmount = Quantity * UnitPrice,
    DiscountAmount = Quantity * UnitPrice * (Discount/100),
    OrderYear = year(OrderDate),
    OrderMonth = month(OrderDate),
    OrderQuarter = ceil(month(OrderDate) / 3.0),
    DayOfWeek = dayOfWeek(OrderDate)
) ~> CalculateAmounts

// Filter: Remove invalid records
CalculateAmounts filter(
    !isNull(CustomerID) &&
    !isNull(ProductID) &&
    TotalAmount > 0
) ~> ValidRecords

// Lookup: Enrich with customer data
ValidRecords, CustomerDim lookup(
    CustomerID == CustomerKey,
    multiple: false,
    pickup: 'any',
    broadcast: 'auto'
) ~> EnrichCustomer

// Lookup: Enrich with product data
EnrichCustomer, ProductDim lookup(
    ProductID == ProductKey,
    multiple: false,
    pickup: 'any',
    broadcast: 'auto'
) ~> EnrichProduct

// Select: Choose final columns
EnrichProduct select(
    TransactionID,
    OrderDate,
    OrderYear,
    OrderMonth,
    OrderQuarter,
    CustomerID,
    CustomerName,
    CustomerSegment,
    CustomerRegion,
    ProductID,
    ProductName,
    Category,
    SubCategory,
    Quantity,
    UnitPrice,
    Discount,
    TotalAmount,
    NetAmount,
    DiscountAmount
) ~> SelectColumns

// Aggregate: Create summary
SelectColumns aggregate(
    groupBy(CustomerID, CustomerName, CustomerSegment, OrderYear, OrderMonth),
    MonthlySales = sum(TotalAmount),
    MonthlyOrders = count(TransactionID),
    AvgOrderValue = avg(TotalAmount),
    TotalQuantity = sum(Quantity),
    UniqueProducts = countDistinct(ProductID)
) ~> MonthlySummary

// Sink: Write to data lake
SelectColumns sink(
    input(
        // schema matches SelectColumns output
    ),
    allowSchemaDrift: true,
    format: 'parquet',
    partitionBy('key', 0, OrderYear, OrderMonth)
) ~> FactSalesSink

// Sink: Write summary to SQL
MonthlySummary sink(
    input(
        // schema matches MonthlySummary output
    ),
    allowSchemaDrift: true,
    truncate: false,
    skipDuplicateMapInputs: true,
    skipDuplicateMapOutputs: true
) ~> SummarySink
Pipeline Integration
Embed data flow in a Synapse pipeline:
{
  "name": "SalesETLPipeline",
  "properties": {
    "activities": [
      {
        "name": "CopyRawData",
        "type": "Copy",
        "dependsOn": [],
        "typeProperties": {
          "source": {
            "type": "BlobSource"
          },
          "sink": {
            "type": "BlobSink"
          }
        }
      },
      {
        "name": "TransformSalesData",
        "type": "ExecuteDataFlow",
        "dependsOn": [
          {
            "activity": "CopyRawData",
            "dependencyConditions": ["Succeeded"]
          }
        ],
        "typeProperties": {
          "dataFlow": {
            "referenceName": "SalesDataFlow",
            "type": "DataFlowReference"
          },
          "compute": {
            "coreCount": 8,
            "computeType": "General"
          },
          "traceLevel": "Fine",
          "runConcurrently": true,
          "continueOnError": false
        }
      },
      {
        "name": "UpdateMetadata",
        "type": "SqlPoolStoredProcedure",
        "dependsOn": [
          {
            "activity": "TransformSalesData",
            "dependencyConditions": ["Succeeded"]
          }
        ],
        "typeProperties": {
          "storedProcedureName": "dbo.UpdateLoadMetadata"
        }
      }
    ],
    "parameters": {
      "LoadDate": {
        "type": "string",
        "defaultValue": "@utcnow()"
      }
    }
  }
}
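Once published, the pipeline can be triggered on demand with the same azure-synapse-artifacts SDK used in the monitoring section below. A minimal sketch, assuming the mysynapse workspace endpoint; the LoadDate parameter is supplied at run time here rather than relying on the default value:
# Trigger the pipeline and capture the run ID used by the monitoring calls below
# (sketch; the workspace endpoint is an assumption)
from datetime import datetime, timezone

from azure.identity import DefaultAzureCredential
from azure.synapse.artifacts import ArtifactsClient

client = ArtifactsClient(
    endpoint="https://mysynapse.dev.azuresynapse.net",
    credential=DefaultAzureCredential()
)

run = client.pipeline.create_pipeline_run(
    "SalesETLPipeline",
    parameters={"LoadDate": datetime.now(timezone.utc).strftime("%Y-%m-%d")}
)
print(f"Started pipeline run: {run.run_id}")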
Performance Optimization
Partition Settings
// Optimize partitioning for large datasets
optimizedPartitioning = true,
partitionOption = 'Hash',
partitionColumn = 'CustomerID',
numberOfPartitions = 50
Caching Strategy
// Cache dimension lookups by broadcasting the smaller stream to every worker
lookup(
    DimProduct,
    broadcast: 'fixed'   // 'fixed' always broadcasts this stream; 'auto' lets the optimizer decide
)
Debug Settings
{
  "debugSettings": {
    "sourceSettings": [
      {
        "sourceName": "SalesSource",
        "rowLimit": 1000
      }
    ],
    "parameters": {
      "LoadDate": "2021-03-01"
    }
  }
}
Monitoring Data Flows
# Monitor data flow execution using the Synapse Artifacts SDK
from datetime import datetime, timedelta, timezone

from azure.identity import DefaultAzureCredential
from azure.synapse.artifacts import ArtifactsClient
from azure.synapse.artifacts.models import RunFilterParameters

credential = DefaultAzureCredential()
client = ArtifactsClient(
    endpoint="https://mysynapse.dev.azuresynapse.net",
    credential=credential
)

# Get pipeline run status
run = client.pipeline_run.get_pipeline_run(run_id="<run-id>")
print(f"Status: {run.status}")
print(f"Duration: {run.duration_in_ms}ms")

# Get activity runs for the pipeline run (the API requires a last-updated time window)
activities = client.pipeline_run.query_activity_runs(
    pipeline_name="SalesETLPipeline",
    run_id="<run-id>",
    filter_parameters=RunFilterParameters(
        last_updated_after=datetime.now(timezone.utc) - timedelta(days=1),
        last_updated_before=datetime.now(timezone.utc)
    )
)

for activity in activities.value:
    if activity.activity_type == "ExecuteDataFlow":
        print(f"Data Flow: {activity.activity_name}")
        print(f"Status: {activity.status}")
        # Data flow run metrics (rows read/written per source and sink) live under output.runStatus
        print(f"Metrics: {activity.output.get('runStatus', {}).get('metrics', {})}")
Conclusion
Azure Synapse Data Flows democratize data engineering by providing:
- Visual development with no Spark expertise required
- Powerful transformations for complex ETL scenarios
- Scalable execution on managed Spark clusters
- Integration with Synapse pipelines and monitoring
Data flows are ideal for:
- Business analysts building transformation logic
- Rapid prototyping of ETL processes
- Teams transitioning from SSIS or similar tools
- Scenarios requiring visual documentation of data lineage