Data Flows Gen2: Visual Data Transformation at Scale

Data Flows Gen2 in Azure Data Factory and Synapse Analytics provide a visual, code-free environment for transforming data at scale. Let's explore the latest capabilities.

Data Flow Architecture

Data flows run on auto-scaling Spark clusters managed by Azure. A mapping data flow is defined as a JSON resource that declares its sources, sinks, transformations, and generated script lines:

{
  "name": "SalesTransformationFlow",
  "properties": {
    "type": "MappingDataFlow",
    "typeProperties": {
      "sources": [
        {
          "name": "SalesSource",
          "dataset": {
            "referenceName": "AzureDataLakeSales",
            "type": "DatasetReference"
          }
        }
      ],
      "sinks": [
        {
          "name": "SalesSink",
          "dataset": {
            "referenceName": "SynapseWarehouse",
            "type": "DatasetReference"
          }
        }
      ],
      "transformations": [],
      "scriptLines": []
    }
  }
}
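
If you manage data flow definitions as code, the same resource can be deployed programmatically. The following is a minimal sketch using the azure-mgmt-datafactory Python SDK; the subscription, resource group, and factory names are placeholders.

# Minimal sketch: deploy the data flow definition above with the
# azure-mgmt-datafactory SDK (subscription and resource names are placeholders).
from azure.identity import DefaultAzureCredential
from azure.mgmt.datafactory import DataFactoryManagementClient
from azure.mgmt.datafactory.models import (
    DataFlowResource, MappingDataFlow,
    DataFlowSource, DataFlowSink, DatasetReference,
)

client = DataFactoryManagementClient(DefaultAzureCredential(), "<subscription-id>")

flow = DataFlowResource(
    properties=MappingDataFlow(
        sources=[DataFlowSource(name="SalesSource",
                                dataset=DatasetReference(reference_name="AzureDataLakeSales"))],
        sinks=[DataFlowSink(name="SalesSink",
                            dataset=DatasetReference(reference_name="SynapseWarehouse"))],
        transformations=[],
        script_lines=[],
    )
)

client.data_flows.create_or_update(
    resource_group_name="my-rg",
    factory_name="my-adf",
    data_flow_name="SalesTransformationFlow",
    data_flow=flow,
)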

Source Configuration

{
  "source": {
    "name": "SalesData",
    "type": "source",
    "dataset": {
      "type": "DelimitedText",
      "linkedService": "AzureDataLakeStorage",
      "location": {
        "type": "AzureBlobFSLocation",
        "fileName": "*.csv",
        "folderPath": "raw/sales",
        "fileSystem": "data"
      }
    },
    "settings": {
      "format": {
        "type": "csv",
        "columnDelimiter": ",",
        "rowDelimiter": "\n",
        "quoteChar": "\"",
        "escapeChar": "\\",
        "firstRowAsHeader": true
      },
      "wildcardPaths": ["raw/sales/**/*.csv"],
      "partitionRootPath": "raw/sales"
    }
  }
}
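
Because data flows execute on Spark, a source definition like this effectively compiles down to a Spark read. Roughly equivalent PySpark, with a placeholder storage account, just to make the mapping concrete:

# Rough PySpark equivalent of the source above; the storage account is a placeholder.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

sales_raw = (
    spark.read
    .option("header", "true")   # firstRowAsHeader
    .option("quote", '"')
    .option("escape", "\\")
    .csv("abfss://data@<account>.dfs.core.windows.net/raw/sales/*/*.csv")  # wildcardPaths
)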

Transformation Examples

Derived Column

// Data flow script for derived columns
source(output(
    OrderID as string,
    CustomerID as string,
    Amount as decimal(10,2),
    OrderDate as string
)) ~> SalesSource

// Quarter inlines month() because columns defined in the same derive
// can't reference each other
SalesSource derive(
    Year = year(toDate(OrderDate, 'yyyy-MM-dd')),
    Month = month(toDate(OrderDate, 'yyyy-MM-dd')),
    Quarter = case(
        month(toDate(OrderDate, 'yyyy-MM-dd')) <= 3, 'Q1',
        month(toDate(OrderDate, 'yyyy-MM-dd')) <= 6, 'Q2',
        month(toDate(OrderDate, 'yyyy-MM-dd')) <= 9, 'Q3',
        'Q4'
    ),
    AmountWithTax = Amount * 1.1,
    IsHighValue = Amount > 1000
) ~> DerivedColumns
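
For intuition, here is roughly what the same derived columns look like in PySpark (illustrative only; the data flow script above is what the service actually generates and runs):

# Rough PySpark equivalent of the derived-column transformation above.
from pyspark.sql import functions as F

derived = (
    sales_raw
    .withColumn("Amount", F.col("Amount").cast("decimal(10,2)"))
    .withColumn("OrderDate", F.to_date("OrderDate", "yyyy-MM-dd"))
    .withColumn("Year", F.year("OrderDate"))
    .withColumn("Month", F.month("OrderDate"))
    .withColumn("Quarter", F.concat(F.lit("Q"), F.quarter("OrderDate").cast("string")))
    .withColumn("AmountWithTax", F.col("Amount") * 1.1)
    .withColumn("IsHighValue", F.col("Amount") > 1000)
)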

Aggregation

// Aggregate transformation
DerivedColumns aggregate(
    groupBy(CustomerID, Year, Month),
    TotalAmount = sum(Amount),
    OrderCount = count(),
    AvgOrderValue = avg(Amount),
    MaxOrder = max(Amount),
    MinOrder = min(Amount)
) ~> SalesAggregation
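
The equivalent PySpark aggregation, continuing the sketch:

# Rough PySpark equivalent of the aggregate transformation above.
monthly = (
    derived
    .groupBy("CustomerID", "Year", "Month")
    .agg(
        F.sum("Amount").alias("TotalAmount"),
        F.count("*").alias("OrderCount"),
        F.avg("Amount").alias("AvgOrderValue"),
        F.max("Amount").alias("MaxOrder"),
        F.min("Amount").alias("MinOrder"),
    )
)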

Lookup and Join

// Join with dimension table
SalesAggregation, CustomerDimension join(
    SalesAggregation@CustomerID == CustomerDimension@CustomerKey,
    joinType: 'left',
    matchType: 'exact',
    ignoreSpaces: false,
    broadcast: 'auto'
) ~> JoinedData
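
In Spark terms this is a left join, where broadcast: 'auto' roughly corresponds to letting Spark decide whether to broadcast the smaller dimension side. A sketch, assuming a customer_dim DataFrame stands in for the CustomerDimension stream:

# Rough PySpark sketch of the join; customer_dim is a placeholder DataFrame
# for the CustomerDimension source. broadcast() forces the hint explicitly.
joined = monthly.join(
    F.broadcast(customer_dim),
    monthly["CustomerID"] == customer_dim["CustomerKey"],
    how="left",
)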

Conditional Split

// Split data based on conditions
JoinedData split(
    TotalAmount > 10000,
    TotalAmount > 1000,
    disjoint: false
) ~> SplitByValue@(HighValue, MediumValue, LowValue)
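
With disjoint: false, the split routes each row to the first stream whose condition it matches. A rough PySpark sketch of the same routing:

# Rough PySpark sketch of the conditional split: later filters exclude the
# earlier conditions so each row lands in exactly one DataFrame.
high_value   = joined.filter(F.col("TotalAmount") > 10000)
medium_value = joined.filter((F.col("TotalAmount") <= 10000) & (F.col("TotalAmount") > 1000))
low_value    = joined.filter(F.col("TotalAmount") <= 1000)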

Slowly Changing Dimension (SCD Type 2)

// SCD Type 2: look up the current dimension (ExistingCustomerDim source),
// then mark each row for insert (new key) or update (changed attributes)
CustomerSource, ExistingCustomerDim lookup(
    CustomerSource@CustomerKey == ExistingCustomerDim@CustomerKey,
    multiple: false,
    pickup: 'any',
    broadcast: 'auto'
) ~> LookupExisting

LookupExisting alterRow(
    insertIf(isNull(ExistingCustomerDim@CustomerKey)),
    updateIf(!isNull(ExistingCustomerDim@CustomerKey) &&
             CustomerSource@CustomerName != ExistingCustomerDim@CustomerName)
) ~> DetermineAction

DetermineAction sink(
    allowSchemaDrift: false,
    validateSchema: true,
    deletable: false,
    insertable: true,
    updateable: true,
    upsertable: true,
    keys: ['CustomerKey'],
    skipDuplicateMapInputs: true,
    skipDuplicateMapOutputs: true,
    saveOrder: 1,
    mapColumn(
        CustomerKey,
        CustomerName,
        Email,
        EffectiveDate = currentDate(),
        ExpirationDate = toDate('9999-12-31'),
        IsCurrent = true()
    )
) ~> CustomerDimSink
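
The same SCD Type 2 decision expressed in PySpark, as a sketch (customer_source and existing_dim are placeholder DataFrames for the staged rows and the current dimension table):

# Rough PySpark sketch of the SCD Type 2 routing above.
compared = customer_source.alias("src").join(
    existing_dim.filter("IsCurrent = true").alias("dim"),
    on="CustomerKey",
    how="left",
)

inserts = compared.filter(F.col("dim.CustomerName").isNull())      # new keys
updates = compared.filter(
    F.col("dim.CustomerName").isNotNull()
    & (F.col("src.CustomerName") != F.col("dim.CustomerName"))      # changed attributes
)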

Performance Optimization

Partitioning Settings

{
  "transformation": {
    "name": "OptimizedPartitioning",
    "partition": {
      "type": "hash",
      "columns": ["CustomerID"],
      "numberOfPartitions": 200
    }
  }
}
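
In Spark terms, hash partitioning on CustomerID into 200 partitions is essentially a repartition:

# Rough PySpark equivalent of the hash-partitioning setting above.
optimized = joined.repartition(200, "CustomerID")

Too few partitions underuse the cluster, while too many add scheduling overhead, so tune the count against the core count of your Azure integration runtime and the size of the data.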

Debug Settings

{
  "debug": {
    "sourceSettings": {
      "rowLimit": 1000,
      "sourceSampling": {
        "samplingMethod": "Top",
        "count": 1000
      }
    },
    "parameters": {
      "environment": "dev",
      "dateFilter": "2022-01-01"
    }
  }
}
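
Debug settings keep interactive runs cheap by limiting how much of the source is scanned. Conceptually this is the same as sampling the source before applying transformations:

# Rough PySpark analogue of the debug row limit above.
debug_sample = sales_raw.limit(1000)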

Integration with Pipeline

{
  "name": "ExecuteDataFlow",
  "type": "ExecuteDataFlow",
  "dependsOn": [],
  "policy": {
    "timeout": "1.00:00:00",
    "retry": 3,
    "retryIntervalInSeconds": 30
  },
  "typeProperties": {
    "dataFlow": {
      "referenceName": "SalesTransformationFlow",
      "type": "DataFlowReference"
    },
    "compute": {
      "computeType": "MemoryOptimized",
      "coreCount": 16
    },
    "traceLevel": "Fine",
    "staging": {
      "linkedService": {
        "referenceName": "StagingStorage",
        "type": "LinkedServiceReference"
      },
      "folderPath": "staging"
    }
  }
}
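
The pipeline that wraps this activity can also be triggered from code. A minimal sketch, reusing the management client from the first Python snippet and assuming a pipeline named SalesPipeline:

# Minimal sketch: trigger the wrapping pipeline via the SDK; "SalesPipeline"
# and the resource names are placeholders.
run = client.pipelines.create_run(
    resource_group_name="my-rg",
    factory_name="my-adf",
    pipeline_name="SalesPipeline",
    parameters={"environment": "dev"},
)
print(run.run_id)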

Monitoring and Debugging

// Monitor data flow execution
ADFActivityRun
| where ActivityType == "ExecuteDataFlow"
| extend
    DataFlowName = parse_json(Output).dataFlowName,
    Duration = datetime_diff('second', End, Start),
    RowsRead = parse_json(Output).runStatus.metrics.source1.rowsRead,
    RowsWritten = parse_json(Output).runStatus.metrics.sink1.rowsWritten
| project
    TimeGenerated,
    DataFlowName,
    Duration,
    RowsRead,
    RowsWritten,
    Status
| order by TimeGenerated desc
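
The same run can also be inspected programmatically, complementing the KQL query above. A sketch using the management client and the run_id returned when the pipeline was triggered:

# Minimal sketch: query activity runs for a pipeline run via the SDK.
from datetime import datetime, timedelta
from azure.mgmt.datafactory.models import RunFilterParameters

filters = RunFilterParameters(
    last_updated_after=datetime.utcnow() - timedelta(days=1),
    last_updated_before=datetime.utcnow(),
)
activity_runs = client.activity_runs.query_by_pipeline_run(
    resource_group_name="my-rg",
    factory_name="my-adf",
    run_id=run.run_id,
    filter_parameters=filters,
)
for activity in activity_runs.value:
    print(activity.activity_name, activity.status, activity.duration_in_ms)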

Data Flows Gen2 democratize data transformation, enabling both developers and data analysts to build scalable ETL solutions.

Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.