Fabric Data Factory: Data Integration Reimagined

Data Factory in Microsoft Fabric delivers a modernized, SaaS-based data integration experience built around data pipelines, Dataflow Gen2, and the Copy activity. Today we’ll explore how Data Factory works in Fabric and how it differs from Azure Data Factory.

Data Factory in Fabric Overview

# Fabric Data Factory components
data_factory_components = {
    "data_pipelines": "Orchestration and workflow",
    "dataflow_gen2": "Power Query-based transformations",
    "copy_activity": "Data movement between sources"
}

# Key differences from Azure Data Factory:
differences = {
    "deployment": "SaaS (no ARM resources)",
    "storage": "Native OneLake integration",
    "compute": "Fabric capacity (CU-based)",
    "linked_services": "Simplified connection management",
    "monitoring": "Unified Fabric monitoring"
}

Creating a Data Pipeline

Basic Pipeline Structure

# In your workspace:
# 1. Click "+ New" > "Data pipeline"
# 2. Name your pipeline (e.g., "daily_sales_ingestion")

# Pipeline components:
pipeline_structure = {
    "activities": [
        "Copy data",
        "Dataflow",
        "Notebook",
        "Stored procedure",
        "Wait",
        "Set variable",
        "If condition",
        "ForEach",
        "Until"
    ],
    "parameters": "Runtime values passed to pipeline",
    "variables": "Pipeline-scoped mutable values",
    "triggers": "Schedule or event-based execution"
}

Simple Copy Pipeline Example

{
    "name": "CopySalesData",
    "activities": [
        {
            "name": "CopyFromSQL",
            "type": "Copy",
            "inputs": [
                {
                    "source": {
                        "type": "AzureSqlSource",
                        "query": "SELECT * FROM Sales.Orders WHERE ModifiedDate > @{pipeline().parameters.lastRunDate}"
                    }
                }
            ],
            "outputs": [
                {
                    "sink": {
                        "type": "LakehouseSink",
                        "tableOption": "Tables/sales_raw"
                    }
                }
            ]
        }
    ],
    "parameters": {
        "lastRunDate": {
            "type": "String",
            "defaultValue": "2023-01-01"
        }
    }
}
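
When the pipeline is run, the lastRunDate parameter can be overridden with a dynamic expression instead of the static default. A small sketch; the one-day lookback window is an assumption for illustration:

# Example parameter override when running the pipeline:
run_parameters = {
    "lastRunDate": "@{formatDateTime(addDays(utcNow(), -1), 'yyyy-MM-dd')}"  # yesterday, as a date string
}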

Data Sources and Connections

# Fabric supports numerous data sources
supported_sources = {
    "cloud_databases": [
        "Azure SQL Database",
        "Azure Synapse Analytics",
        "Azure Cosmos DB",
        "Amazon RDS",
        "Google BigQuery"
    ],
    "cloud_storage": [
        "Azure Blob Storage",
        "Azure Data Lake Gen2",
        "Amazon S3",
        "Google Cloud Storage"
    ],
    "on_premises": [
        "SQL Server",
        "Oracle",
        "MySQL",
        "PostgreSQL",
        "File system"
    ],
    "saas": [
        "Salesforce",
        "Dynamics 365",
        "SharePoint",
        "SAP"
    ],
    "fabric_native": [
        "Lakehouse",
        "Warehouse",
        "KQL Database"
    ]
}

# Creating a connection:
# 1. In pipeline, add Copy activity
# 2. Select source > New connection
# 3. Choose connector type
# 4. Enter credentials
# 5. Test connection
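
Conceptually, a connection bundles a connector type, an endpoint, and credentials. A rough sketch in the same style as the snippets above; the field names are illustrative, not the actual connection schema:

# Illustrative shape of a connection (not the real schema):
connection_example = {
    "connector": "Azure SQL Database",
    "server": "myserver.database.windows.net",      # illustrative endpoint
    "database": "SalesDB",                           # illustrative database
    "authentication": "OAuth2 / service principal / basic",
    "gateway": "required for on-premises sources"    # via the on-premises data gateway
}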

Pipeline Parameters and Variables

# Parameters: Input values at runtime
# Define in pipeline settings

pipeline_parameters = {
    "source_table": "Sales.Orders",
    "start_date": "2023-07-01",
    "end_date": "2023-07-31",
    "incremental": True
}

# Access in expressions:
# @{pipeline().parameters.source_table}
# @{pipeline().parameters.start_date}

# Variables: Mutable values within pipeline
pipeline_variables = {
    "record_count": 0,
    "status": "running",
    "error_message": ""
}

# Set variable activity:
# @{activity('CopyData').output.rowsCopied}
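
For example, a Set Variable activity that captures the row count from a Copy activity could be defined like this (shown as a dict in the same style as the other snippets; typeProperties mirrors the pipeline JSON shape):

set_variable_activity = {
    "name": "CaptureRowCount",
    "type": "SetVariable",
    "typeProperties": {
        "variableName": "record_count",
        "value": "@{activity('CopyData').output.rowsCopied}"
    }
}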

Control Flow Activities

If Condition

{
    "name": "CheckRecordCount",
    "type": "IfCondition",
    "expression": {
        "value": "@greater(activity('CountRecords').output.firstRow.count, 0)",
        "type": "Expression"
    },
    "ifTrueActivities": [
        {
            "name": "ProcessData",
            "type": "Copy"
        }
    ],
    "ifFalseActivities": [
        {
            "name": "LogNoData",
            "type": "SetVariable"
        }
    ]
}

ForEach Loop

{
    "name": "ProcessEachTable",
    "type": "ForEach",
    "items": {
        "value": "@pipeline().parameters.tableList",
        "type": "Expression"
    },
    "activities": [
        {
            "name": "CopyTable",
            "type": "Copy",
            "inputs": {
                "tableName": "@{item().name}"
            }
        }
    ],
    "isSequential": false,
    "batchCount": 5
}
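
The tableList parameter referenced above is expected to be an array whose items expose the property used inside the loop (item().name). An illustrative value:

# Illustrative value for the tableList parameter:
table_list = [
    {"name": "Sales.Orders"},
    {"name": "Sales.Customers"},
    {"name": "Sales.Products"}
]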

Until Loop

{
    "name": "WaitForCompletion",
    "type": "Until",
    "expression": {
        "value": "@equals(variables('status'), 'completed')",
        "type": "Expression"
    },
    "timeout": "01:00:00",
    "activities": [
        {
            "name": "CheckStatus",
            "type": "Lookup"
        },
        {
            "name": "UpdateStatus",
            "type": "SetVariable"
        },
        {
            "name": "WaitInterval",
            "type": "Wait",
            "waitTimeInSeconds": 60
        }
    ]
}
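
Inside the loop, UpdateStatus would typically copy the lookup result into the status variable that the Until expression checks. A sketch, assuming CheckStatus returns a firstRow with a status column:

update_status_activity = {
    "name": "UpdateStatus",
    "type": "SetVariable",
    "typeProperties": {
        "variableName": "status",
        "value": "@{activity('CheckStatus').output.firstRow.status}"
    }
}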

Integrating Notebooks

# Call Fabric notebooks from pipelines
notebook_activity = {
    "name": "RunTransformNotebook",
    "type": "TridentNotebook",  # Fabric notebook type
    "notebook": {
        "workspaceId": "@{pipeline().parameters.workspaceId}",
        "notebookId": "transform_notebook"
    },
    "parameters": {
        "input_table": "@{pipeline().parameters.source}",
        "output_table": "@{pipeline().parameters.target}",
        "process_date": "@{utcNow()}"
    }
}

# The notebook receives these values through its parameter cell:
# the values passed by the pipeline override the cell's defaults
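
A minimal sketch of the notebook side, assuming the standard parameter-cell behavior (the variable names must match the parameter names passed from the pipeline):

# In the Fabric notebook, mark one cell as a parameter cell
# (cell toolbar > "Toggle parameter cell"). The defaults below are
# overridden by the values passed from the pipeline activity.
input_table = "sales_raw"       # overridden by the "input_table" parameter
output_table = "sales_clean"    # overridden by the "output_table" parameter
process_date = "1900-01-01"     # overridden by the "process_date" parameter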

Error Handling

# Implement error handling patterns

# 1. Activity-level retry
activity_retry = {
    "name": "CopyWithRetry",
    "type": "Copy",
    "policy": {
        "retry": 3,
        "retryIntervalInSeconds": 30,
        "timeout": "00:30:00"
    }
}

# 2. Try-catch pattern using If Condition
# Check activity status in subsequent activities
# @{equals(activity('PreviousActivity').Status, 'Succeeded')}

# 3. Error handling activity
error_handler = {
    "name": "LogError",
    "type": "SetVariable",
    "dependsOn": [
        {
            "activity": "MainProcess",
            "dependencyConditions": ["Failed"]
        }
    ],
    "variables": {
        "error_message": "@{activity('MainProcess').Error.message}"
    }
}
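
The success path works the same way: a downstream activity that should only run when MainProcess succeeds uses the Succeeded dependency condition.

success_handler = {
    "name": "LogSuccess",
    "type": "SetVariable",
    "dependsOn": [
        {
            "activity": "MainProcess",
            "dependencyConditions": ["Succeeded"]
        }
    ],
    "variables": {
        "status": "completed"
    }
}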

Scheduling Pipelines

# Schedule-based triggers
schedule_trigger = {
    "name": "DailyTrigger",
    "type": "ScheduleTrigger",
    "recurrence": {
        "frequency": "Day",
        "interval": 1,
        "startTime": "2023-07-01T06:00:00Z",
        "timeZone": "UTC"
    },
    "pipelines": [
        {
            "pipelineReference": "DailySalesIngestion",
            "parameters": {
                "runDate": "@trigger().scheduledTime"
            }
        }
    ]
}

# Event-based triggers (preview in Fabric)
# Trigger on file arrival, etc.

Monitoring and Debugging

# Access run history:
# 1. Open pipeline
# 2. Click "View run history"
# 3. Select a run to see details

# Key monitoring views:
monitoring_views = {
    "gantt_view": "Activity timeline and duration",
    "list_view": "Activity status and details",
    "pipeline_json": "Full pipeline definition",
    "activity_output": "Output from each activity"
}

# Debug mode:
# 1. Click "Debug" to run pipeline
# 2. View real-time execution
# 3. Check activity inputs/outputs
# 4. Fix issues and re-run

# Common troubleshooting:
# - Check connection credentials
# - Verify source data exists
# - Review expression syntax (see the quick reference below)
# - Check capacity availability
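
A quick reference for the two expression forms, since mixing them up is a common cause of failed runs (the values below are illustrative):

# @<expression>   -> the whole value is the expression result (keeps its type)
# @{<expression>} -> string interpolation: the result is embedded in a larger string
whole_value = "@pipeline().parameters.incremental"                      # evaluates to a boolean
interpolated = "Run window starts @{pipeline().parameters.start_date}"  # produces a string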

Best Practices

# 1. Use parameters for flexibility
# 2. Implement error handling
# 3. Log key metrics
# 4. Use meaningful names
# 5. Keep pipelines focused

best_practices = {
    "naming": "Use snake_case, be descriptive",
    "modularity": "Break large pipelines into smaller ones",
    "parameters": "Externalize environment-specific values",
    "logging": "Add activities to log start/end/errors",
    "documentation": "Add descriptions to activities"
}
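
As one way to apply the logging practice above, a stored-procedure activity can write run metadata to an audit table. The procedure name, parameters, and activity type string below are assumptions for illustration:

log_activity = {
    "name": "LogPipelineStart",
    "type": "SqlServerStoredProcedure",   # activity type name may differ in your workspace
    "typeProperties": {
        "storedProcedureName": "audit.usp_log_pipeline_run",   # hypothetical procedure
        "storedProcedureParameters": {
            "pipeline_name": "@{pipeline().Pipeline}",
            "run_id": "@{pipeline().RunId}",
            "status": "started"
        }
    }
}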

Tomorrow we’ll dive deeper into Copy Activity patterns.

Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.