
Dataflows Gen2: Low-Code Data Transformation in Fabric

Dataflows Gen2 in Microsoft Fabric provide a low-code transformation experience using Power Query. Today, I will explore how to build effective dataflows for data preparation.

Dataflows Gen2 Overview

┌─────────────────────────────────────────────────────┐
│                   Dataflows Gen2                    │
├─────────────────────────────────────────────────────┤
│                                                     │
│  ┌───────────────────────────────────────────────┐  │
│  │              Power Query Editor               │  │
│  │  - Visual transformation                      │  │
│  │  - M language support                         │  │
│  │  - Query folding optimization                 │  │
│  └───────────────────────┬───────────────────────┘  │
│                          │                          │
│  ┌───────────────────────┴───────────────────────┐  │
│  │                 Data Sources                  │  │
│  │  - Databases (SQL, Oracle, etc.)              │  │
│  │  - Files (CSV, Excel, JSON)                   │  │
│  │  - Cloud services (Salesforce, Dynamics)      │  │
│  │  - Web APIs                                   │  │
│  └───────────────────────┬───────────────────────┘  │
│                          │                          │
│  ┌───────────────────────┴───────────────────────┐  │
│  │                 Destinations                  │  │
│  │  - Lakehouse Tables                           │  │
│  │  - Warehouse Tables                           │  │
│  │  - Azure SQL Database                         │  │
│  └───────────────────────────────────────────────┘  │
│                                                     │
└─────────────────────────────────────────────────────┘

Creating Dataflows

M Language Basics

// Basic query structure
let
    // Step 1: Connect to source
    Source = Sql.Database("server.database.windows.net", "SalesDB"),

    // Step 2: Navigate to table
    SalesTable = Source{[Schema="dbo", Item="Sales"]}[Data],

    // Step 3: Transform
    FilteredRows = Table.SelectRows(SalesTable, each [Amount] > 0),

    // Step 4: Add columns
    WithYear = Table.AddColumn(FilteredRows, "Year", each Date.Year([OrderDate])),

    // Step 5: Rename columns to a consistent snake_case convention
    RenamedColumns = Table.RenameColumns(WithYear, {
        {"CustomerID", "customer_id"},
        {"OrderDate", "order_date"},
        {"Amount", "amount"},
        {"Year", "year"}
    }),

    // Step 6: Select final columns
    FinalColumns = Table.SelectColumns(RenamedColumns, {
        "customer_id", "order_date", "year", "amount"
    })
in
    FinalColumns

Common Transformations

// Filtering
FilteredRows = Table.SelectRows(Source, each
    [Status] = "Active" and
    [Amount] > 100 and
    [OrderDate] >= #date(2023, 1, 1)
)

// Removing duplicates
Deduplicated = Table.Distinct(Source, {"CustomerID", "OrderDate"})

// Grouping and aggregation
Grouped = Table.Group(Source, {"Category", "Region"}, {
    {"TotalSales", each List.Sum([Amount]), type number},
    {"OrderCount", each Table.RowCount(_), Int64.Type},
    {"AvgOrderValue", each List.Average([Amount]), type number},
    {"MinDate", each List.Min([OrderDate]), type date},
    {"MaxDate", each List.Max([OrderDate]), type date}
})

// Pivoting
Pivoted = Table.Pivot(Source, List.Distinct(Source[Category]), "Category", "Amount", List.Sum)

// Unpivoting
Unpivoted = Table.UnpivotOtherColumns(Source, {"Product", "Region"}, "Month", "Sales")

// Merging tables (JOIN)
Merged = Table.NestedJoin(
    Sales, {"CustomerID"},
    Customers, {"CustomerID"},
    "CustomerDetails",
    JoinKind.LeftOuter
)
Expanded = Table.ExpandTableColumn(Merged, "CustomerDetails", {"CustomerName", "Segment"})

// Appending tables (UNION)
Combined = Table.Combine({Sales2022, Sales2023})

// Conditional column
WithCategory = Table.AddColumn(Source, "AmountCategory", each
    if [Amount] > 1000 then "High"
    else if [Amount] > 100 then "Medium"
    else "Low",
    type text
)

// Custom column with error handling
WithCalculated = Table.AddColumn(Source, "UnitPrice", each
    try [Amount] / [Quantity] otherwise 0,
    type number
)

Data Type Conversions

// Change column types
TypedTable = Table.TransformColumnTypes(Source, {
    {"OrderDate", type date},
    {"Amount", type number},
    {"Quantity", Int64.Type},
    {"CustomerID", type text},
    {"IsActive", type logical}
})

// Parse dates
WithParsedDate = Table.TransformColumns(Source, {
    {"DateString", each Date.FromText(_, [Culture = "en-US"]), type date}
})

// Parse JSON
WithParsedJSON = Table.TransformColumns(Source, {
    {"JsonColumn", each Json.Document(_)}
})
Expanded = Table.ExpandRecordColumn(WithParsedJSON, "JsonColumn", {"field1", "field2"})

Working with Dates

// Date transformations
let
    Source = SalesTable,

    // Extract date parts
    WithDateParts = Table.AddColumn(Source, "Year", each Date.Year([OrderDate])),
    WithMonth = Table.AddColumn(WithDateParts, "Month", each Date.Month([OrderDate])),
    WithMonthName = Table.AddColumn(WithMonth, "MonthName", each Date.MonthName([OrderDate])),
    WithQuarter = Table.AddColumn(WithMonthName, "Quarter", each Date.QuarterOfYear([OrderDate])),
    WithWeekday = Table.AddColumn(WithQuarter, "Weekday", each Date.DayOfWeekName([OrderDate])),

    // Date calculations
    WithDaysAgo = Table.AddColumn(WithWeekday, "DaysAgo", each
        Duration.Days(Date.From(DateTime.LocalNow()) - [OrderDate])
    ),

    // Fiscal year (July start)
    WithFiscalYear = Table.AddColumn(WithDaysAgo, "FiscalYear", each
        if Date.Month([OrderDate]) >= 7
        then Date.Year([OrderDate]) + 1
        else Date.Year([OrderDate])
    )
in
    WithFiscalYear

// Create date dimension
let
    StartDate = #date(2020, 1, 1),
    EndDate = #date(2025, 12, 31),
    DayCount = Duration.Days(EndDate - StartDate) + 1,
    DateList = List.Dates(StartDate, DayCount, #duration(1, 0, 0, 0)),
    DateTable = Table.FromList(DateList, Splitter.SplitByNothing(), {"Date"}, null, ExtraValues.Error),
    TypedDate = Table.TransformColumnTypes(DateTable, {{"Date", type date}}),

    WithYear = Table.AddColumn(TypedDate, "Year", each Date.Year([Date]), Int64.Type),
    WithMonth = Table.AddColumn(WithYear, "Month", each Date.Month([Date]), Int64.Type),
    WithMonthName = Table.AddColumn(WithMonth, "MonthName", each Date.MonthName([Date]), type text),
    WithQuarter = Table.AddColumn(WithMonthName, "Quarter", each "Q" & Text.From(Date.QuarterOfYear([Date])), type text),
    WithDayOfWeek = Table.AddColumn(WithQuarter, "DayOfWeek", each Date.DayOfWeek([Date], Day.Monday) + 1, Int64.Type),
    WithDayName = Table.AddColumn(WithDayOfWeek, "DayName", each Date.DayOfWeekName([Date]), type text),
    WithIsWeekend = Table.AddColumn(WithDayName, "IsWeekend", each Date.DayOfWeek([Date], Day.Monday) >= 5, type logical),
    WithDateKey = Table.AddColumn(WithIsWeekend, "DateKey", each Date.Year([Date]) * 10000 + Date.Month([Date]) * 100 + Date.Day([Date]), Int64.Type)
in
    WithDateKey

Staging with Lakehouse

// Enable staging for improved performance
// In Dataflow settings, select a Lakehouse for staging

// This enables:
// 1. Query folding to Lakehouse
// 2. Faster incremental refresh
// 3. Parallel query execution

// Example: Large transformation with staging
let
    // This will be staged to Lakehouse
    Source = Sql.Database("server.database.windows.net", "LargeDB"),
    LargeTable = Source{[Schema="dbo", Item="Transactions"]}[Data],

    // Complex transformations run on staged data
    Filtered = Table.SelectRows(LargeTable, each [Year] = 2023),
    Grouped = Table.Group(Filtered, {"Category"}, {{"Total", each List.Sum([Amount])}}),
    Sorted = Table.Sort(Grouped, {{"Total", Order.Descending}})
in
    Sorted

Output Destinations

// Output to Lakehouse table
// Configure in Dataflow > Data destination

// Destination settings:
destination_config = {
    "type": "Lakehouse",
    "lakehouse": "SalesLakehouse",
    "table": "processed_sales",
    "update_method": "Replace",  // or "Append"
    "schema_handling": "Dynamic"  // Automatically update schema
}

// Output to Warehouse
warehouse_config = {
    "type": "Warehouse",
    "warehouse": "SalesWarehouse",
    "table": "fact_sales",
    "update_method": "Replace"
}

Best Practices

dataflow_best_practices = {
    "performance": [
        "Enable staging for large datasets",
        "Use query folding when possible",
        "Reduce columns early in the query",
        "Filter data as early as possible",
        "Avoid calculated columns on large datasets"
    ],
    "maintainability": [
        "Use descriptive step names",
        "Document complex transformations",
        "Organize queries in folders",
        "Use parameters for dynamic values"
    ],
    "error_handling": [
        "Use try/otherwise for error handling",
        "Set appropriate null handling",
        "Validate data types",
        "Handle date parsing errors"
    ]
}
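For the parameters point above, a quick sketch: a dataflow parameter is defined in the Power Query editor (Manage parameters on the Home tab) and then referenced by name, so environment-specific values are not hard-coded into queries. The names here are assumptions:

// ServerName and StartDate are dataflow parameters, not literals
Source = Sql.Database(ServerName, "SalesDB"),
Recent = Table.SelectRows(Source, each [OrderDate] >= StartDate)

Changing the parameter value then updates every query that references it, without editing any M code.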

Dataflows Gen2 provide an accessible way to build data transformations. Tomorrow, I will cover Mirroring in Fabric.


Michael John Peña


Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.