5 min read

Fabric Data Pipelines: Orchestration Patterns

Data Pipelines in Fabric orchestrate complex data workflows. Today we’ll explore common orchestration patterns and how to combine them into robust, maintainable pipelines.

Pipeline Orchestration Basics

# Pipeline components
pipeline_components = {
    "activities": "Individual tasks (copy, transform, etc.)",
    "dependencies": "Activity execution order",
    "parameters": "Runtime inputs",
    "variables": "Mutable values within pipeline",
    "triggers": "Execution initiators"
}

# Activity dependency conditions
dependency_conditions = [
    "Succeeded",   # Run if previous succeeded
    "Failed",      # Run if previous failed
    "Completed",   # Run regardless of status
    "Skipped"      # Run if previous was skipped
]

Pattern 1: Sequential Pipeline

{
    "name": "SequentialETL",
    "activities": [
        {
            "name": "ExtractData",
            "type": "Copy",
            "dependsOn": []
        },
        {
            "name": "TransformData",
            "type": "TridentNotebook",
            "dependsOn": [
                {"activity": "ExtractData", "dependencyConditions": ["Succeeded"]}
            ]
        },
        {
            "name": "LoadToWarehouse",
            "type": "Copy",
            "dependsOn": [
                {"activity": "TransformData", "dependencyConditions": ["Succeeded"]}
            ]
        }
    ]
}

Pattern 2: Parallel Execution

{
    "name": "ParallelIngestion",
    "activities": [
        {
            "name": "CopySalesData",
            "type": "Copy",
            "dependsOn": []
        },
        {
            "name": "CopyInventoryData",
            "type": "Copy",
            "dependsOn": []
        },
        {
            "name": "CopyCustomerData",
            "type": "Copy",
            "dependsOn": []
        },
        {
            "name": "MergeAllData",
            "type": "TridentNotebook",
            "dependsOn": [
                {"activity": "CopySalesData", "dependencyConditions": ["Succeeded"]},
                {"activity": "CopyInventoryData", "dependencyConditions": ["Succeeded"]},
                {"activity": "CopyCustomerData", "dependencyConditions": ["Succeeded"]}
            ]
        }
    ]
}

Pattern 3: Conditional Execution

# If-Else pattern for different processing paths
conditional_pipeline = {
    "name": "ConditionalProcessing",
    "parameters": {
        "processType": {"type": "String", "defaultValue": "full"}
    },
    "activities": [
        {
            "name": "CheckProcessType",
            "type": "IfCondition",
            "expression": {
                "value": "@equals(pipeline().parameters.processType, 'incremental')",
                "type": "Expression"
            },
            "ifTrueActivities": [
                {
                    "name": "IncrementalLoad",
                    "type": "Copy",
                    "source": {
                        "query": "SELECT * FROM Orders WHERE ModifiedDate > '@{variables('lastRunDate')}'"
                    }
                }
            ],
            "ifFalseActivities": [
                {
                    "name": "FullLoad",
                    "type": "Copy",
                    "source": {
                        "query": "SELECT * FROM Orders"
                    }
                }
            ]
        }
    ]
}
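
The If Condition’s expression uses the pipeline expression language, which you’ll lean on in almost every pattern in this post. Here is a quick, non-exhaustive reference of expressions I reach for most often:

# Common pipeline expressions (Data Factory expression language)
common_expressions = {
    "parameter_access": "@pipeline().parameters.processType",
    "variable_access": "@variables('lastRunDate')",
    "activity_output": "@activity('GetTableConfig').output.value",
    "comparison": "@greaterOrEquals(variables('attempt'), 3)",
    "logic": "@and(equals(pipeline().parameters.processType, 'full'), not(variables('success')))",
    "current_time": "@utcnow()",
    "formatting": "@formatDateTime(utcnow(), 'yyyy-MM-dd')",
    "string_building": "@concat('bronze_', item().table_name)"  # item() is only valid inside a ForEach
}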

Pattern 4: Error Handling with Retry

# Robust error handling pattern
error_handling_pipeline = {
    "name": "RobustPipeline",
    "variables": {
        "attempt": {"type": "Integer", "defaultValue": 0},
        "maxAttempts": {"type": "Integer", "defaultValue": 3},
        "success": {"type": "Boolean", "defaultValue": False}
    },
    "activities": [
        {
            "name": "AttemptProcessing",
            "type": "Until",
            "expression": "@or(variables('success'), greaterOrEquals(variables('attempt'), variables('maxAttempts')))",
            "timeout": "01:00:00",
            "activities": [
                {
                    "name": "IncrementAttempt",
                    "type": "SetVariable",
                    "variable": "attempt",
                    "value": "@add(variables('attempt'), 1)"
                },
                {
                    "name": "MainProcess",
                    "type": "Copy",
                    "dependsOn": [{"activity": "IncrementAttempt", "dependencyConditions": ["Succeeded"]}]
                },
                {
                    "name": "MarkSuccess",
                    "type": "SetVariable",
                    "variable": "success",
                    "value": True,
                    "dependsOn": [{"activity": "MainProcess", "dependencyConditions": ["Succeeded"]}]
                },
                {
                    "name": "WaitBeforeRetry",
                    "type": "Wait",
                    "waitTimeInSeconds": 60,
                    "dependsOn": [{"activity": "MainProcess", "dependencyConditions": ["Failed"]}]
                }
            ]
        },
        {
            "name": "HandleFailure",
            "type": "TridentNotebook",
            "notebook": "send_alert",
            "dependsOn": [{"activity": "AttemptProcessing", "dependencyConditions": ["Completed"]}],
            # Simplified: in the actual pipeline, wrap this notebook in an
            # If Condition activity that evaluates the expression below
            "condition": "@not(variables('success'))"
        }
    ]
}
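
Before reaching for a hand-rolled Until loop, note that individual activities also support a built-in retry policy. The shape below is illustrative; double-check the exact property names in your pipeline’s JSON view before relying on them.

# Simpler alternative: per-activity retry policy (illustrative shape)
activity_with_retry = {
    "name": "MainProcess",
    "type": "Copy",
    "policy": {
        "timeout": "01:00:00",         # give up on a single attempt after 1 hour
        "retry": 3,                    # retry up to 3 times on failure
        "retryIntervalInSeconds": 60   # wait 60 seconds between attempts
    }
}
# The custom Until loop is still useful when you need logic between attempts,
# such as cleanup, alerting, or a backoff that grows with each attempt.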

Pattern 5: Parent-Child Pipelines

# Master pipeline calling child pipelines
master_pipeline = {
    "name": "MasterOrchestrator",
    "parameters": {
        # runDate is referenced by the ExecutePipeline calls below
        "runDate": {"type": "String"}
    },
    "activities": [
        {
            "name": "RunIngestion",
            "type": "ExecutePipeline",
            "pipeline": {
                "referenceName": "IngestionPipeline"
            },
            "parameters": {
                "source": "production",
                "date": "@pipeline().parameters.runDate"
            },
            "waitOnCompletion": True
        },
        {
            "name": "RunTransformation",
            "type": "ExecutePipeline",
            "pipeline": {
                "referenceName": "TransformationPipeline"
            },
            "dependsOn": [{"activity": "RunIngestion", "dependencyConditions": ["Succeeded"]}],
            "waitOnCompletion": True
        },
        {
            "name": "RunReporting",
            "type": "ExecutePipeline",
            "pipeline": {
                "referenceName": "ReportingPipeline"
            },
            "dependsOn": [{"activity": "RunTransformation", "dependencyConditions": ["Succeeded"]}],
            "waitOnCompletion": True
        }
    ]
}
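
For the parameter passing to work, each child pipeline must declare matching parameters. A minimal sketch of what IngestionPipeline could declare (the names mirror the ExecutePipeline call above):

# Child pipeline declaring the parameters the parent passes in
ingestion_pipeline = {
    "name": "IngestionPipeline",
    "parameters": {
        "source": {"type": "String", "defaultValue": "development"},
        "date": {"type": "String"}
    },
    "activities": [
        {
            "name": "CopyFromSource",
            "type": "Copy",
            # Consume the incoming values via the expression language
            "source": {
                "query": "SELECT * FROM Orders WHERE OrderDate = '@{pipeline().parameters.date}'"
            }
        }
    ]
}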

Pattern 6: Dynamic Pipeline with Metadata

# Metadata-driven pipeline
metadata_driven = {
    "name": "MetadataDrivenPipeline",
    "activities": [
        {
            "name": "GetTableConfig",
            "type": "Lookup",
            "source": {
                "type": "LakehouseSource",
                "table": "config.table_metadata"
            },
            "firstRowOnly": False
        },
        {
            "name": "ProcessEachTable",
            "type": "ForEach",
            "items": "@activity('GetTableConfig').output.value",
            "isSequential": False,
            "batchCount": 10,
            "activities": [
                {
                    "name": "CopyTable",
                    "type": "Copy",
                    "source": {
                        "type": "@{item().source_type}",
                        "query": "@{item().source_query}"
                    },
                    "sink": {
                        "type": "LakehouseSink",
                        "tableOption": "@{item().target_table}"
                    }
                }
            ],
            "dependsOn": [{"activity": "GetTableConfig", "dependencyConditions": ["Succeeded"]}]
        }
    ]
}

# Metadata table structure:
"""
CREATE TABLE config.table_metadata (
    table_name STRING,
    source_type STRING,
    source_connection STRING,
    source_query STRING,
    target_table STRING,
    load_type STRING,
    is_active BOOLEAN
)
"""

Scheduling and Triggers

# Schedule trigger
schedule_trigger = {
    "name": "DailySchedule",
    "type": "ScheduleTrigger",
    "properties": {
        "recurrence": {
            "frequency": "Day",
            "interval": 1,
            "startTime": "2023-07-01T06:00:00Z",
            "timeZone": "UTC"
        },
        "pipelines": [
            {
                "pipelineReference": "DailyETLPipeline",
                "parameters": {
                    "runDate": "@trigger().scheduledTime"
                }
            }
        ]
    }
}

# Manual trigger via UI
# Click "Run" in pipeline editor
# Pass parameters at runtime
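
You can also trigger runs programmatically. The sketch below uses the Fabric REST API’s on-demand job endpoint with placeholder IDs and token; verify the exact endpoint and payload shape against the current job scheduler documentation before depending on it.

# Trigger a pipeline run via the Fabric REST API (sketch; verify endpoint in the docs)
import requests

workspace_id = "<workspace-guid>"      # placeholder
pipeline_id = "<pipeline-item-guid>"   # placeholder
token = "<aad-access-token>"           # placeholder, e.g. acquired via azure-identity

url = (
    f"https://api.fabric.microsoft.com/v1/workspaces/{workspace_id}"
    f"/items/{pipeline_id}/jobs/instances?jobType=Pipeline"
)
payload = {"executionData": {"parameters": {"runDate": "2023-07-01"}}}

response = requests.post(url, json=payload, headers={"Authorization": f"Bearer {token}"})
response.raise_for_status()
print("Run accepted:", response.status_code)  # 202 indicates the job was queued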

Monitoring and Debugging

# Key monitoring metrics
monitoring = {
    "pipeline_runs": {
        "status": ["Succeeded", "Failed", "Cancelled", "InProgress"],
        "duration": "Total execution time",
        "start_time": "When pipeline started",
        "end_time": "When pipeline completed"
    },
    "activity_runs": {
        "status": "Individual activity status",
        "duration": "Activity execution time",
        "input": "Activity input data",
        "output": "Activity output/result",
        "error": "Error details if failed"
    }
}

# Debugging tips:
# 1. Use Debug mode for testing
# 2. Check activity outputs
# 3. Review error messages
# 4. Add logging activities (see the sketch below)
# 5. Test with sample data first
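
As an example of tip 4, a small notebook called from a notebook activity can append an audit row per run. The table and parameter names here are purely illustrative, and `spark` is the session Fabric notebooks provide out of the box.

# Illustrative logging cell inside a notebook called by a notebook activity
from datetime import datetime, timezone

# In Fabric these would arrive via a parameter cell, set by the notebook activity
pipeline_name = "pl_sales_daily_ingestion"
run_id = "placeholder-run-id"
status = "Succeeded"

log_row = [(pipeline_name, run_id, status, datetime.now(timezone.utc))]
columns = ["pipeline_name", "run_id", "status", "logged_at"]

(
    spark.createDataFrame(log_row, columns)
        .write.mode("append")
        .saveAsTable("logs.pipeline_runs")   # hypothetical audit table
)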

Pipeline Best Practices

best_practices = {
    "naming": "Use consistent, descriptive names",
    "modularity": "Break large pipelines into smaller ones",
    "parameters": "Use parameters for environment flexibility",
    "error_handling": "Always implement failure paths",
    "logging": "Add activities to log key events",
    "testing": "Use debug mode before scheduling",
    "documentation": "Add descriptions to activities",
    "monitoring": "Set up alerts for failures"
}

# Example: Well-structured pipeline naming
naming_examples = {
    "pipelines": [
        "pl_sales_daily_ingestion",
        "pl_inventory_incremental_load",
        "pl_master_orchestrator"
    ],
    "activities": [
        "act_copy_orders_to_bronze",
        "act_transform_orders_silver",
        "act_check_data_quality"
    ]
}

Tomorrow we’ll explore the Fabric Warehouse and its T-SQL capabilities.

Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.