Back to Blog
5 min read

Fabric CI/CD: Building Deployment Pipelines

Fabric CI/CD: Building Deployment Pipelines

Implementing CI/CD for Microsoft Fabric ensures reliable, repeatable deployments of analytics artifacts. This guide covers building end-to-end deployment pipelines.

Fabric Deployment Pipeline Concepts

from dataclasses import dataclass
from typing import List, Dict, Optional
from enum import Enum

class DeploymentStage(Enum):
    """Lifecycle stages of a Fabric deployment pipeline, in promotion order."""
    DEVELOPMENT = "development"
    TEST = "test"
    PRODUCTION = "production"

@dataclass
class FabricDeploymentPipeline:
    """Describes a Fabric deployment pipeline and which workspace backs each stage."""
    name: str  # display name of the pipeline
    stages: List[DeploymentStage]  # ordered stages the pipeline promotes through
    workspace_mappings: Dict[DeploymentStage, str]  # stage -> workspace_id

@dataclass
class DeploymentRule:
    """Policy controlling how items of one type are promoted between stages."""
    item_type: str  # kind of Fabric item this rule applies to
    auto_deploy: bool  # deploy without a manual trigger
    require_approval: bool  # gate the deployment behind an approval
    conditions: List[str]  # conditions evaluated before deploying

Fabric Deployment API Client

import requests
from azure.identity import DefaultAzureCredential
from typing import List, Optional
import time

class FabricDeploymentClient:
    """Client for the Fabric Deployment Pipelines API.

    Wraps the REST endpoints under /v1/deploymentPipelines, authenticating
    with DefaultAzureCredential. Every call now raises requests.HTTPError on
    a non-2xx response — the original called .json() unconditionally, which
    either hid API failures or surfaced them as confusing JSON decode errors.
    """

    def __init__(self):
        self.credential = DefaultAzureCredential()
        self.base_url = "https://api.fabric.microsoft.com/v1"

    def _get_headers(self) -> dict:
        """Return auth headers; a fresh token is requested on every call."""
        token = self.credential.get_token(
            "https://api.fabric.microsoft.com/.default"
        )
        return {
            "Authorization": f"Bearer {token.token}",
            "Content-Type": "application/json"
        }

    def _request_json(self, method: str, url: str, **kwargs) -> dict:
        """Send an authenticated request and return the parsed JSON body.

        Raises:
            requests.HTTPError: if the API responds with a non-2xx status.
        """
        response = requests.request(method, url, headers=self._get_headers(), **kwargs)
        response.raise_for_status()
        return response.json()

    def list_pipelines(self) -> List[dict]:
        """List all deployment pipelines visible to the credential."""
        url = f"{self.base_url}/deploymentPipelines"
        return self._request_json("GET", url).get("value", [])

    def get_pipeline(self, pipeline_id: str) -> dict:
        """Get details of a single deployment pipeline."""
        url = f"{self.base_url}/deploymentPipelines/{pipeline_id}"
        return self._request_json("GET", url)

    def get_pipeline_stages(self, pipeline_id: str) -> List[dict]:
        """Get the stages in a pipeline."""
        url = f"{self.base_url}/deploymentPipelines/{pipeline_id}/stages"
        return self._request_json("GET", url).get("value", [])

    def deploy(
        self,
        pipeline_id: str,
        source_stage: int,
        target_stage: int,
        items: Optional[List[dict]] = None,
        options: Optional[dict] = None
    ) -> dict:
        """Deploy items between stages.

        Args:
            pipeline_id: Deployment pipeline id.
            source_stage: Order number of the source stage.
            target_stage: Order number of the target stage.
            items: Optional subset of items to deploy; omitted means the
                API's default item selection.
            options: Optional deployment options passed through verbatim.

        Returns:
            The operation JSON returned by the service.
        """
        url = f"{self.base_url}/deploymentPipelines/{pipeline_id}/deploy"

        payload = {
            "sourceStageOrder": source_stage,
            "targetStageOrder": target_stage,
            # A target earlier than the source marks a backward deployment.
            "isBackwardDeployment": target_stage < source_stage
        }

        if items:
            payload["items"] = items

        if options:
            payload["options"] = options

        return self._request_json("POST", url, json=payload)

    def deploy_all(
        self,
        pipeline_id: str,
        source_stage: int,
        target_stage: int
    ) -> dict:
        """Deploy all items between stages, allowing create and overwrite."""
        url = f"{self.base_url}/deploymentPipelines/{pipeline_id}/deployAll"

        payload = {
            "sourceStageOrder": source_stage,
            "targetStageOrder": target_stage,
            "options": {
                "allowCreateNewArtifact": True,
                "allowOverwriteArtifact": True
            }
        }

        return self._request_json("POST", url, json=payload)

    def get_deployment_operation(
        self,
        pipeline_id: str,
        operation_id: str
    ) -> dict:
        """Get the current status of a long-running deployment operation."""
        url = f"{self.base_url}/deploymentPipelines/{pipeline_id}/operations/{operation_id}"
        return self._request_json("GET", url)

    def wait_for_deployment(
        self,
        pipeline_id: str,
        operation_id: str,
        timeout_seconds: int = 300,
        poll_interval_seconds: int = 10
    ) -> dict:
        """Poll until the operation reaches a terminal state or the timeout expires.

        Args:
            pipeline_id: Deployment pipeline id.
            operation_id: Operation returned by deploy/deploy_all.
            timeout_seconds: Maximum time to wait before raising.
            poll_interval_seconds: Seconds between status checks (new parameter;
                defaults to the previous hard-coded 10s, so existing callers
                are unaffected).

        Returns:
            The final operation JSON.

        Raises:
            TimeoutError: if no terminal state is reached within the timeout.
        """
        deadline = time.time() + timeout_seconds

        while time.time() < deadline:
            operation = self.get_deployment_operation(pipeline_id, operation_id)
            # NOTE(review): only these two statuses are treated as terminal —
            # confirm the API cannot report other terminal values.
            if operation.get("status") in ("Succeeded", "Failed"):
                return operation

            time.sleep(poll_interval_seconds)

        raise TimeoutError(f"Deployment did not complete within {timeout_seconds}s")

# Usage example: enumerate pipelines, then promote everything from dev to test.
client = FabricDeploymentClient()

# Show every deployment pipeline the credential can see.
for pipeline in client.list_pipelines():
    print(f"Pipeline: {pipeline['displayName']} (ID: {pipeline['id']})")

# Stage order 0 is development, 1 is test.
result = client.deploy_all(
    pipeline_id="your-pipeline-id",
    source_stage=0,
    target_stage=1
)
print(f"Deployment started: {result.get('id')}")

Azure DevOps Pipeline

# azure-pipelines-fabric.yml
# Azure DevOps multi-stage pipeline: validate Fabric items, then promote them
# through the Fabric deployment pipeline by calling scripts/deploy_fabric.py
# with numeric stage orders (0 = dev, 1 = test, 2 = prod).

# Run on pushes to main (production path) and develop (dev/test path).
trigger:
  branches:
    include:
      - main
      - develop

variables:
  # Variable group holding shared deployment secrets/settings.
  - group: fabric-deployment-vars
  # The Fabric deployment pipeline id targeted by every deploy step.
  - name: pipelineId
    value: 'your-fabric-pipeline-id'

stages:
  # Stage 1: install tooling and validate item definitions before any deploy.
  - stage: Build
    displayName: 'Build and Validate'
    jobs:
      - job: Validate
        pool:
          vmImage: 'ubuntu-latest'
        steps:
          - task: UsePythonVersion@0
            inputs:
              versionSpec: '3.10'

          - script: |
              pip install azure-identity requests
            displayName: 'Install dependencies'

          - script: |
              python scripts/validate_fabric_items.py
            displayName: 'Validate Fabric Items'

  # Stage 2: only the develop branch refreshes the development stage.
  - stage: DeployDev
    displayName: 'Deploy to Development'
    dependsOn: Build
    condition: eq(variables['Build.SourceBranch'], 'refs/heads/develop')
    jobs:
      - deployment: DeployDev
        # 'environment' lets Azure DevOps enforce approvals/checks per target.
        environment: 'fabric-dev'
        pool:
          vmImage: 'ubuntu-latest'
        strategy:
          runOnce:
            deploy:
              steps:
                # source == target == 0 with --sync-git: refresh dev from Git
                # rather than promoting between stages.
                - script: |
                    python scripts/deploy_fabric.py \
                      --pipeline-id $(pipelineId) \
                      --source-stage 0 \
                      --target-stage 0 \
                      --sync-git
                  displayName: 'Sync Development'

  # Stage 3: promote dev (0) -> test (1).
  # NOTE(review): DeployDev only runs on develop; on main it is skipped.
  # Confirm DeployTest's default condition still lets it run on main builds,
  # otherwise DeployProd can never be reached from main.
  - stage: DeployTest
    displayName: 'Deploy to Test'
    dependsOn: DeployDev
    jobs:
      - deployment: DeployTest
        environment: 'fabric-test'
        pool:
          vmImage: 'ubuntu-latest'
        strategy:
          runOnce:
            deploy:
              steps:
                - script: |
                    python scripts/deploy_fabric.py \
                      --pipeline-id $(pipelineId) \
                      --source-stage 0 \
                      --target-stage 1
                  displayName: 'Deploy to Test Stage'

  # Stage 4: promote test (1) -> prod (2), main branch only.
  - stage: DeployProd
    displayName: 'Deploy to Production'
    dependsOn: DeployTest
    condition: eq(variables['Build.SourceBranch'], 'refs/heads/main')
    jobs:
      - deployment: DeployProd
        environment: 'fabric-prod'
        pool:
          vmImage: 'ubuntu-latest'
        strategy:
          runOnce:
            deploy:
              steps:
                - script: |
                    python scripts/deploy_fabric.py \
                      --pipeline-id $(pipelineId) \
                      --source-stage 1 \
                      --target-stage 2
                  displayName: 'Deploy to Production Stage'

Deployment Script

#!/usr/bin/env python3
# scripts/deploy_fabric.py

import argparse
import sys
import requests
from azure.identity import DefaultAzureCredential

# Use the FabricDeploymentClient class defined above
# or create a simplified version here:

class FabricDeployClient:
    """Simplified deployment client for scripts.

    Implements the three operations main() relies on: deploy (selected
    items), deploy_all, and wait_for_deployment. The original version only
    defined deploy_all, so running the script with --items, or waiting for
    completion, crashed with AttributeError. All calls raise
    requests.HTTPError on a non-2xx response instead of silently parsing
    error bodies.
    """

    def __init__(self):
        self.credential = DefaultAzureCredential()
        self.base_url = "https://api.fabric.microsoft.com/v1"

    def _get_headers(self):
        """Build auth headers with a freshly acquired bearer token."""
        token = self.credential.get_token("https://api.fabric.microsoft.com/.default")
        return {"Authorization": f"Bearer {token.token}", "Content-Type": "application/json"}

    def deploy(self, pipeline_id, source_stage, target_stage, items=None, options=None):
        """Deploy specific items between stages; returns the operation JSON."""
        url = f"{self.base_url}/deploymentPipelines/{pipeline_id}/deploy"
        payload = {"sourceStageOrder": source_stage, "targetStageOrder": target_stage}
        if items:
            payload["items"] = items
        if options:
            payload["options"] = options
        response = requests.post(url, headers=self._get_headers(), json=payload)
        response.raise_for_status()
        return response.json()

    def deploy_all(self, pipeline_id, source_stage, target_stage):
        """Deploy every item between stages; returns the operation JSON."""
        url = f"{self.base_url}/deploymentPipelines/{pipeline_id}/deployAll"
        payload = {"sourceStageOrder": source_stage, "targetStageOrder": target_stage}
        response = requests.post(url, headers=self._get_headers(), json=payload)
        response.raise_for_status()
        return response.json()

    def get_operation(self, pipeline_id, operation_id):
        """Fetch the current state of a deployment operation."""
        url = f"{self.base_url}/deploymentPipelines/{pipeline_id}/operations/{operation_id}"
        response = requests.get(url, headers=self._get_headers())
        response.raise_for_status()
        return response.json()

    def wait_for_deployment(self, pipeline_id, operation_id, timeout_seconds=300):
        """Poll the operation until a terminal state or the timeout.

        Raises:
            TimeoutError: if the operation does not finish in time.
        """
        import time  # local import: the script's top-level imports do not include time
        deadline = time.time() + timeout_seconds
        while time.time() < deadline:
            operation = self.get_operation(pipeline_id, operation_id)
            if operation.get("status") in ("Succeeded", "Failed"):
                return operation
            time.sleep(10)
        raise TimeoutError(f"Deployment did not complete within {timeout_seconds}s")

def main():
    """CLI entry point: trigger a Fabric deployment and block until it finishes.

    Exits 0 when the deployment succeeds, 1 on failure or any unexpected error.
    """
    parser = argparse.ArgumentParser(description='Deploy Fabric items')
    parser.add_argument('--pipeline-id', required=True)
    parser.add_argument('--source-stage', type=int, required=True)
    parser.add_argument('--target-stage', type=int, required=True)
    parser.add_argument('--sync-git', action='store_true')
    parser.add_argument('--items', nargs='*', help='Specific items to deploy')
    args = parser.parse_args()

    client = FabricDeployClient()

    try:
        print(f"Starting deployment from stage {args.source_stage} to {args.target_stage}")

        # Shared arguments for either deployment flavor.
        target = {
            "pipeline_id": args.pipeline_id,
            "source_stage": args.source_stage,
            "target_stage": args.target_stage,
        }

        # Selective deployment when --items was given, full promotion otherwise.
        if args.items:
            result = client.deploy(items=[{"name": item} for item in args.items], **target)
        else:
            result = client.deploy_all(**target)

        operation_id = result.get("id")
        print(f"Deployment operation: {operation_id}")

        # Block until the service reports a terminal state (up to 10 minutes).
        final_status = client.wait_for_deployment(
            args.pipeline_id,
            operation_id,
            timeout_seconds=600
        )

        succeeded = final_status.get("status") == "Succeeded"
        if succeeded:
            print("Deployment succeeded!")
        else:
            print(f"Deployment failed: {final_status.get('error')}")
        sys.exit(0 if succeeded else 1)

    except Exception as e:
        print(f"Error: {e}")
        sys.exit(1)

if __name__ == "__main__":
    main()

Testing Deployments

class FabricDeploymentTests:
    """Post-deployment smoke tests for a Fabric workspace."""

    def __init__(self, workspace_id: str):
        self.workspace_id = workspace_id
        self.client = FabricDeploymentClient()

    def run_all_tests(self) -> dict:
        """Execute every check, collecting names of passes and details of failures.

        Returns:
            dict with "passed" (list of test names) and "failed"
            (list of {"test", "error"} dicts).
        """
        outcome = {"passed": [], "failed": []}

        checks = (
            self.test_semantic_models,
            self.test_reports,
            self.test_dataflows,
            self.test_notebooks,
        )

        for check in checks:
            try:
                check()
            except AssertionError as err:
                outcome["failed"].append({
                    "test": check.__name__,
                    "error": str(err),
                })
            else:
                outcome["passed"].append(check.__name__)

        return outcome

    def test_semantic_models(self):
        """Verify semantic models are refreshable (placeholder)."""
        # Check that semantic models can connect to data sources.
        pass

    def test_reports(self):
        """Verify reports load correctly (placeholder)."""
        # Check report definitions are valid.
        pass

    def test_dataflows(self):
        """Verify dataflows are configured (placeholder)."""
        # Check dataflow connections.
        pass

    def test_notebooks(self):
        """Verify notebooks are valid (placeholder)."""
        # Check notebook syntax.
        pass

Conclusion

CI/CD for Fabric enables reliable, automated deployments across environments. Combine deployment pipelines with Git integration and automated testing for a complete DevOps workflow.

Michael John Peña

Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.