
Microsoft Fabric Capacity Planning: Right-Sizing Your Data Platform

Capacity planning in Microsoft Fabric requires understanding the relationship between Capacity Units (CUs), workload types, and usage patterns. Getting this right ensures optimal performance without overspending.

Understanding Fabric Capacity Units

Fabric capacities are measured in CUs, which are consumed differently by various workloads. Data Engineering, Data Science, and Real-Time Analytics each have distinct consumption patterns:

from dataclasses import dataclass
from typing import List
from enum import Enum

class WorkloadType(Enum):
    DATA_ENGINEERING = "data_engineering"
    DATA_SCIENCE = "data_science"
    REAL_TIME_ANALYTICS = "real_time_analytics"
    DATA_WAREHOUSE = "data_warehouse"
    POWER_BI = "power_bi"

@dataclass
class CapacityRequirement:
    workload: WorkloadType
    peak_cu_per_hour: float
    average_cu_per_hour: float
    burst_duration_hours: float
    concurrent_users: int

def calculate_required_capacity(requirements: List[CapacityRequirement]) -> dict:
    """Calculate total capacity needed across workloads."""

    # Sum peak requirements with 20% headroom
    total_peak = sum(r.peak_cu_per_hour for r in requirements) * 1.2

    # Calculate weighted average for baseline
    total_average = sum(r.average_cu_per_hour for r in requirements)

    # Determine recommended SKU
    fabric_skus = {
        "F2": 2, "F4": 4, "F8": 8, "F16": 16,
        "F32": 32, "F64": 64, "F128": 128,
        "F256": 256, "F512": 512, "F1024": 1024
    }

    # Pick the smallest SKU that covers the padded peak
    recommended_sku = None
    for sku, capacity in fabric_skus.items():
        if capacity >= total_peak:
            recommended_sku = sku
            break

    # If even F1024 can't cover the peak, report no utilization figure
    # rather than dividing by a meaningless fallback
    utilization = (
        (total_peak / fabric_skus[recommended_sku]) * 100
        if recommended_sku else None
    )

    return {
        "peak_requirement": total_peak,
        "average_requirement": total_average,
        "recommended_sku": recommended_sku,
        "utilization_at_peak": utilization
    }
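To sanity-check the SKU ladder math, here is a worked example with hypothetical peak figures (the workload numbers below are illustrative, not benchmarks):

```python
# Hypothetical peak CU-per-hour figures for three workloads
peaks = {"data_engineering": 18.0, "power_bi": 9.5, "data_warehouse": 12.0}

# Same 20% headroom applied in calculate_required_capacity
total_peak = sum(peaks.values()) * 1.2  # 39.5 * 1.2 = 47.4 CUs

# Fabric SKU ladder, smallest to largest
skus = [("F2", 2), ("F4", 4), ("F8", 8), ("F16", 16), ("F32", 32),
        ("F64", 64), ("F128", 128), ("F256", 256), ("F512", 512), ("F1024", 1024)]

# Smallest SKU whose CU count covers the padded peak
recommended = next(name for name, cus in skus if cus >= total_peak)
print(recommended)  # F64
```

Note how the 20% headroom pushes the requirement past F32 (32 CUs) and up to F64, which then runs at roughly 74% utilization at peak.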

Monitoring Capacity Utilization

Use the Fabric Capacity Metrics app to track actual consumption:

import requests
from datetime import datetime, timedelta, timezone

class FabricCapacityMonitor:
    def __init__(self, workspace_id: str, access_token: str):
        self.workspace_id = workspace_id
        self.headers = {"Authorization": f"Bearer {access_token}"}
        self.base_url = "https://api.fabric.microsoft.com/v1"

    def get_capacity_metrics(self, days_back: int = 7) -> dict:
        """Retrieve capacity utilization metrics."""

        # Use timezone-aware timestamps; datetime.utcnow() is deprecated
        end_date = datetime.now(timezone.utc)
        start_date = end_date - timedelta(days=days_back)

        response = requests.get(
            f"{self.base_url}/capacities/{self.workspace_id}/metrics",
            headers=self.headers,
            params={
                "startDateTime": start_date.isoformat(),
                "endDateTime": end_date.isoformat(),
                "granularity": "hourly"
            }
        )
        # Fail fast on auth or permission errors instead of parsing an error body
        response.raise_for_status()

        values = response.json().get("values", [])
        if not values:
            # No data points in the window; avoid max()/division on an empty list
            return {"peak_utilization": 0.0, "average_utilization": 0.0, "throttling_events": 0}

        return {
            "peak_utilization": max(m["utilizationPercent"] for m in values),
            "average_utilization": sum(m["utilizationPercent"] for m in values) / len(values),
            "throttling_events": sum(1 for m in values if m["throttled"])
        }
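The summary returned by get_capacity_metrics can feed a simple scale-up check. A minimal sketch, with illustrative threshold values (80% peak utilization, zero throttling tolerance):

```python
def needs_scale_up(metrics: dict, peak_limit: float = 80.0,
                   throttle_limit: int = 0) -> bool:
    """Flag a capacity whose peak utilization or throttling exceeds tolerance."""
    return (metrics["peak_utilization"] > peak_limit
            or metrics["throttling_events"] > throttle_limit)

# A capacity peaking at 91.5% with 3 throttling events should be flagged
sample = {"peak_utilization": 91.5, "average_utilization": 44.0, "throttling_events": 3}
print(needs_scale_up(sample))  # True
```

In practice you would run this check on a schedule and alert or resize rather than print, but the decision logic stays this simple.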

Implementing Auto-Pause for Cost Savings

For development and test environments, implement auto-pause policies to reduce costs during inactive periods. Monitor utilization patterns and adjust capacity reservations quarterly based on actual usage data.
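A pause decision can be as simple as checking whether recent hourly utilization has stayed below an idle threshold. A minimal sketch (the 5% threshold and two-hour window are assumptions to tune for your environment; the actual pause call would go through the Fabric capacity management API):

```python
from typing import List

def should_pause(hourly_utilization: List[float],
                 idle_threshold: float = 5.0,
                 idle_hours_required: int = 2) -> bool:
    """Pause when the last N hourly utilization samples (%) are all below threshold."""
    if len(hourly_utilization) < idle_hours_required:
        return False  # not enough history to make a safe call
    recent = hourly_utilization[-idle_hours_required:]
    return all(u < idle_threshold for u in recent)

# A dev capacity idle for the last two hours qualifies for auto-pause
print(should_pause([42.0, 18.0, 3.1, 0.4]))  # True
print(should_pause([42.0, 18.0, 3.1, 9.4]))  # False
```

Requiring consecutive idle hours, rather than a single quiet sample, avoids pausing a capacity that is merely between scheduled jobs.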

Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.