Back to Blog
8 min read

Autoscale in Microsoft Fabric

Autoscale in Microsoft Fabric automatically adjusts capacity based on demand, ensuring workloads have the resources they need while optimizing costs during low-usage periods.

How Autoscale Works

Autoscale Flow:
┌─────────────────────────────────────────────────────────────┐
│                    Load Monitoring                           │
│                          │                                   │
│                    ┌─────▼─────┐                            │
│                    │  Demand   │                            │
│                    │  Metrics  │                            │
│                    └─────┬─────┘                            │
│                          │                                   │
│         ┌────────────────┼────────────────┐                 │
│         │                │                │                 │
│    ┌────▼────┐     ┌─────▼─────┐    ┌────▼────┐            │
│    │  Low    │     │  Normal   │    │  High   │            │
│    │ Demand  │     │  Demand   │    │ Demand  │            │
│    └────┬────┘     └─────┬─────┘    └────┬────┘            │
│         │                │                │                 │
│    ┌────▼────┐     ┌─────▼─────┐    ┌────▼────┐            │
│    │Scale In │     │No Change  │    │Scale Out│            │
│    │ (Pause) │     │           │    │(Burst)  │            │
│    └─────────┘     └───────────┘    └─────────┘            │
│                                                             │
│                    Base Capacity: F32                       │
│                    Max Scale: F64 (100% burst)              │
└─────────────────────────────────────────────────────────────┘

Configuring Autoscale

Basic Autoscale Settings

class AutoscaleConfig:
    """Configure autoscale settings for Fabric capacity.

    Builds a plain-dict configuration so it can be serialized directly
    for a capacity-management API call.
    """

    def __init__(self, capacity_id: str):
        # Identifier of the Fabric capacity being configured.
        self.capacity_id = capacity_id
        # Effective configuration; empty until configure_autoscale() runs.
        self.config: dict = {}

    def configure_autoscale(
        self,
        enabled: bool = True,
        max_scale_percent: int = 100,
        scale_up_threshold: int = 80,
        scale_down_threshold: int = 30,
        cooldown_minutes: int = 10
    ) -> dict:
        """Configure autoscale parameters.

        Args:
            enabled: Whether autoscale is active.
            max_scale_percent: Extra capacity allowed on top of base
                (100 means the capacity may double during bursts).
            scale_up_threshold: Utilization percent that triggers scale-up.
            scale_down_threshold: Utilization percent that triggers scale-down.
            cooldown_minutes: Minimum wait between scaling actions.

        Returns:
            The effective configuration dict (also stored on ``self.config``).

        Raises:
            ValueError: On out-of-range values or inverted thresholds,
                which would otherwise cause scaling oscillation.
        """
        # Validate up front: silently accepting inverted thresholds would
        # make the capacity thrash between scale-up and scale-down.
        if max_scale_percent < 0:
            raise ValueError("max_scale_percent must be >= 0")
        if cooldown_minutes < 0:
            raise ValueError("cooldown_minutes must be >= 0")
        if not (0 <= scale_down_threshold < scale_up_threshold <= 100):
            raise ValueError(
                "thresholds must satisfy 0 <= scale_down < scale_up <= 100"
            )

        self.config = {
            "enabled": enabled,
            "max_scale_percent": max_scale_percent,  # Up to 100% additional
            "thresholds": {
                "scale_up_percent": scale_up_threshold,
                "scale_down_percent": scale_down_threshold
            },
            "cooldown_minutes": cooldown_minutes,
            "schedule": None  # Can add schedule-based rules
        }

        return self.config

    def add_schedule_rule(
        self,
        name: str,
        start_time: str,
        end_time: str,
        days: list,
        min_capacity_percent: int
    ):
        """Add schedule-based autoscale rule.

        Args:
            name: Human-readable rule name.
            start_time: Window start, "HH:MM".
            end_time: Window end, "HH:MM".
            days: Day abbreviations the rule applies to (e.g. ["Mon"]).
            min_capacity_percent: Minimum capacity to keep during the window.
        """
        # Lazily create the schedule list so rules can be added even
        # before configure_autoscale() has run.
        if self.config.get("schedule") is None:
            self.config["schedule"] = []

        self.config["schedule"].append({
            "name": name,
            "start_time": start_time,
            "end_time": end_time,
            "days": days,
            "min_capacity_percent": min_capacity_percent
        })

    def get_effective_config(self) -> dict:
        """Get the effective configuration dict."""
        return self.config

# Usage example: build a configuration and attach schedule-based rules.
autoscale = AutoscaleConfig("capacity-123")

# Baseline autoscale behaviour.
autoscale.configure_autoscale(
    enabled=True,
    max_scale_percent=100,
    scale_up_threshold=80,
    scale_down_threshold=30,
    cooldown_minutes=10,
)

# Keep at least half the capacity warm during the working day.
weekdays = ["Mon", "Tue", "Wed", "Thu", "Fri"]
autoscale.add_schedule_rule(
    name="Business Hours",
    start_time="08:00",
    end_time="18:00",
    days=weekdays,
    min_capacity_percent=50,  # Maintain at least 50% during business hours
)

# Guarantee full capacity for the overnight batch window, every day.
autoscale.add_schedule_rule(
    name="Nightly Batch",
    start_time="02:00",
    end_time="05:00",
    days=weekdays + ["Sat", "Sun"],
    min_capacity_percent=100,  # Full capacity for batch jobs
)

print(autoscale.get_effective_config())

Autoscale via Azure Portal/API

import requests
from azure.identity import DefaultAzureCredential

class FabricCapacityAPI:
    """Interact with Fabric capacity management APIs.

    Talks to the Azure Resource Manager endpoint, authenticating with
    DefaultAzureCredential.
    """

    # Fail fast instead of hanging indefinitely on an unresponsive endpoint;
    # the original calls had no timeout at all.
    REQUEST_TIMEOUT_SECONDS = 30

    def __init__(self, subscription_id: str, resource_group: str):
        self.subscription_id = subscription_id
        self.resource_group = resource_group
        self.credential = DefaultAzureCredential()
        self.base_url = "https://management.azure.com"

    def _get_headers(self) -> dict:
        """Build request headers with a fresh ARM bearer token."""
        token = self.credential.get_token("https://management.azure.com/.default")
        return {
            "Authorization": f"Bearer {token.token}",
            "Content-Type": "application/json"
        }

    def _capacity_url(self, capacity_name: str) -> str:
        """Return the ARM resource URL for one Fabric capacity."""
        return (
            f"{self.base_url}/subscriptions/{self.subscription_id}"
            f"/resourceGroups/{self.resource_group}"
            f"/providers/Microsoft.Fabric/capacities/{capacity_name}"
            f"?api-version=2023-11-01"
        )

    def enable_autoscale(
        self,
        capacity_name: str,
        max_capacity_units: int
    ) -> dict:
        """Enable autoscale on capacity.

        Performs a read-modify-write: fetches the current resource,
        injects the autoscale settings, and PUTs it back.

        Raises:
            requests.HTTPError: If either the GET or the PUT fails.
        """
        url = self._capacity_url(capacity_name)

        # Get current capacity; surface HTTP errors instead of silently
        # treating an error payload as the resource body.
        response = requests.get(
            url,
            headers=self._get_headers(),
            timeout=self.REQUEST_TIMEOUT_SECONDS
        )
        response.raise_for_status()
        current = response.json()

        # Update with autoscale settings
        current["properties"]["administration"] = {
            "autoscale": {
                "enabled": True,
                "maxCapacityUnits": max_capacity_units
            }
        }

        response = requests.put(
            url,
            headers=self._get_headers(),
            json=current,
            timeout=self.REQUEST_TIMEOUT_SECONDS
        )
        response.raise_for_status()

        return response.json()

    def get_autoscale_status(self, capacity_name: str) -> dict:
        """Get current autoscale status.

        Returns:
            Summary dict with sku name, autoscale flag, and capacity state.

        Raises:
            requests.HTTPError: If the GET fails.
        """
        response = requests.get(
            self._capacity_url(capacity_name),
            headers=self._get_headers(),
            timeout=self.REQUEST_TIMEOUT_SECONDS
        )
        response.raise_for_status()
        data = response.json()

        # Chained .get() calls tolerate partially-populated resources.
        autoscale = (
            data.get("properties", {})
            .get("administration", {})
            .get("autoscale", {})
        )
        return {
            "capacity_name": capacity_name,
            "sku": data.get("sku", {}).get("name"),
            "autoscale_enabled": autoscale.get("enabled", False),
            "current_state": data.get("properties", {}).get("state")
        }

# Usage: enable autoscale on a capacity, then confirm its status.
api = FabricCapacityAPI(
    subscription_id="sub-id",
    resource_group="fabric-rg",
)

# Enable autoscale, bursting up to F128.
api.enable_autoscale(
    capacity_name="my-fabric-capacity",
    max_capacity_units=128,
)

# Check status
status = api.get_autoscale_status("my-fabric-capacity")
print(f"Autoscale enabled: {status['autoscale_enabled']}")

Autoscale Behaviors

Scale Up (Burst)

class AutoscaleAnalyzer:
    """Analyze autoscale behavior and patterns from recorded scale events."""

    def __init__(self):
        # Chronological list of event dicts built by record_scale_event().
        self.scale_events = []

    def record_scale_event(
        self,
        event_type: str,  # "scale_up" or "scale_down"
        trigger_metric: str,
        metric_value: float,
        from_cus: int,
        to_cus: int
    ):
        """Record a scaling event with a timezone-aware UTC timestamp.

        Args:
            event_type: "scale_up" or "scale_down".
            trigger_metric: Metric that triggered the event (e.g. "cpu_percent").
            metric_value: Value of the trigger metric at event time.
            from_cus: Capacity units before the event.
            to_cus: Capacity units after the event.
        """
        # datetime.utcnow() is deprecated (Python 3.12) and returns a naive
        # timestamp; record an aware UTC timestamp instead. Import is kept
        # local to preserve the original module-level import surface.
        from datetime import datetime, timezone

        self.scale_events.append({
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "event_type": event_type,
            "trigger_metric": trigger_metric,
            "metric_value": metric_value,
            "from_cus": from_cus,
            "to_cus": to_cus
        })

    def analyze_scale_patterns(self) -> dict:
        """Analyze scaling patterns across all recorded events.

        Returns:
            Summary dict with event counts, the most common trigger, and
            recommendations; or a message dict when nothing was recorded.
        """
        if not self.scale_events:
            return {"message": "No scale events recorded"}

        scale_ups = [e for e in self.scale_events if e["event_type"] == "scale_up"]
        scale_downs = [e for e in self.scale_events if e["event_type"] == "scale_down"]

        return {
            "total_events": len(self.scale_events),
            "scale_ups": len(scale_ups),
            "scale_downs": len(scale_downs),
            "most_common_trigger": self._most_common_trigger(),
            "recommendations": self._generate_recommendations()
        }

    def _most_common_trigger(self) -> str:
        """Return the metric that triggered the most events ("none" if empty)."""
        triggers = {}
        for event in self.scale_events:
            trigger = event["trigger_metric"]
            triggers[trigger] = triggers.get(trigger, 0) + 1

        return max(triggers, key=triggers.get) if triggers else "none"

    def _generate_recommendations(self) -> list:
        """Derive tuning recommendations from recorded event counts."""
        recommendations = []

        # Frequent scale-ups might indicate base capacity too low
        scale_ups = [e for e in self.scale_events if e["event_type"] == "scale_up"]
        if len(scale_ups) > 10:  # More than 10 in the analysis period
            recommendations.append(
                "Frequent scale-ups detected. Consider increasing base capacity."
            )

        # Rapid oscillation might indicate thresholds too close
        if len(self.scale_events) > 20:
            recommendations.append(
                "High scaling frequency. Consider adjusting thresholds or cooldown period."
            )

        return recommendations

# Simulated analysis
analyzer = AutoscaleAnalyzer()

# Record a burst up through two tiers, followed by a scale-back.
for recorded in (
    ("scale_up", "cpu_percent", 85, 32, 48),
    ("scale_up", "queue_length", 15, 48, 64),
    ("scale_down", "cpu_percent", 25, 64, 48),
):
    analyzer.record_scale_event(*recorded)

patterns = analyzer.analyze_scale_patterns()
print(f"Scale events: {patterns['total_events']}")
print(f"Most common trigger: {patterns['most_common_trigger']}")

Smoothing and Bursting

class SmoothingConfig:
    """Configure smoothing behavior for capacity consumption."""

    def __init__(self, base_capacity_cus: int):
        # Provisioned base capacity in capacity units (CUs).
        self.base_cus = base_capacity_cus
        # Window over which consumption is averaged before throttling applies.
        self.smoothing_window_minutes = 5

    def explain_smoothing(self) -> dict:
        """Explain how smoothing works, as a structured dict."""
        steps = [
            "1. Capacity consumption is measured every few seconds",
            "2. Consumption is averaged over a smoothing window (e.g., 5 min)",
            "3. Throttling only occurs if average exceeds capacity",
            "4. Brief spikes above capacity are absorbed",
        ]
        worked_example = {
            "scenario": "5-minute window, 64 CU capacity",
            "minute_1": "100 CU consumed",
            "minute_2": "30 CU consumed",
            "minute_3": "40 CU consumed",
            "minute_4": "50 CU consumed",
            "minute_5": "80 CU consumed",
            "average": "60 CU (no throttling)",
            "without_smoothing": "Minute 1 would throttle",
        }
        return {
            "concept": "Smoothing averages consumption over a time window",
            "benefit": "Prevents throttling from brief spikes",
            "how_it_works": steps,
            "example": worked_example,
        }

    def calculate_burst_capacity(self, max_scale_percent: int) -> dict:
        """Calculate burst capacity available on top of the base CUs."""
        # Burst is expressed as a fraction of base capacity.
        extra_cus = self.base_cus * (max_scale_percent / 100)
        return {
            "base_capacity_cus": self.base_cus,
            "burst_capacity_cus": extra_cus,
            "total_available_cus": self.base_cus + extra_cus,
            "burst_cost_multiplier": 1.0,  # Burst uses same rate
            "billing_note": "Burst consumption billed at same rate as base",
        }

# Usage: explain smoothing, then show the burst headroom for 100% scale.
smoothing = SmoothingConfig(base_capacity_cus=64)

explanation = smoothing.explain_smoothing()
print("How Smoothing Works:")
print("\n".join(f"  {step}" for step in explanation["how_it_works"]))

burst = smoothing.calculate_burst_capacity(max_scale_percent=100)
print(
    f"\nBurst Capacity: {burst['base_capacity_cus']} base"
    f" + {burst['burst_capacity_cus']} burst"
    f" = {burst['total_available_cus']} total CUs"
)

Monitoring Autoscale

Tracking Autoscale Metrics

class AutoscaleMonitor:
    """Monitor autoscale performance and efficiency."""

    def __init__(self, capacity_id: str):
        # Capacity this monitor reports on.
        self.capacity_id = capacity_id

    def get_autoscale_metrics(self, hours: int = 24) -> dict:
        """Get autoscale-related metrics for the trailing window."""
        # This would connect to monitoring APIs; the values below are a
        # simulated response.
        usage = {
            "avg_consumed_cus": 52,
            "peak_consumed_cus": 95,
            "burst_usage_percent": 15,  # % of time in burst
            "scale_events": 8,
            "throttled_requests": 0
        }
        costs = {
            "base_cost_estimate": 150.00,
            "burst_cost_estimate": 22.50,
            "total_cost": 172.50,
            "cost_without_autoscale": 300.00,  # If provisioned for peak
            "savings_percent": 42.5
        }
        return {
            "period_hours": hours,
            "base_capacity_cus": 64,
            "metrics": usage,
            "cost_analysis": costs
        }

    def generate_autoscale_report(self, metrics: dict) -> str:
        """Generate autoscale performance report as markdown text."""
        usage = metrics['metrics']
        costs = metrics['cost_analysis']
        base = metrics['base_capacity_cus']

        report = f"""
# Autoscale Performance Report

## Capacity Utilization
- Base Capacity: {base} CUs
- Average Consumption: {usage['avg_consumed_cus']} CUs ({usage['avg_consumed_cus']/base*100:.1f}%)
- Peak Consumption: {usage['peak_consumed_cus']} CUs
- Time in Burst: {usage['burst_usage_percent']}%

## Scaling Activity
- Scale Events: {usage['scale_events']}
- Throttled Requests: {usage['throttled_requests']}

## Cost Analysis
- Base Cost: ${costs['base_cost_estimate']:.2f}
- Burst Cost: ${costs['burst_cost_estimate']:.2f}
- Total Cost: ${costs['total_cost']:.2f}
- Cost if Fixed at Peak: ${costs['cost_without_autoscale']:.2f}
- Savings: {costs['savings_percent']:.1f}%

## Recommendations
"""
        # Collect data-driven recommendation lines, then append in one go.
        tips = []
        if usage['burst_usage_percent'] > 50:
            tips.append("- Consider increasing base capacity (frequent bursting)\n")
        if usage['avg_consumed_cus'] < base * 0.3:
            tips.append("- Consider reducing base capacity (low average utilization)\n")
        if usage['throttled_requests'] > 0:
            tips.append("- Review workload patterns (throttling occurred)\n")

        return report + "".join(tips)

# Usage: pull 24 hours of metrics and print the rendered report.
monitor = AutoscaleMonitor("capacity-123")

daily_metrics = monitor.get_autoscale_metrics(hours=24)
print(monitor.generate_autoscale_report(daily_metrics))

Best Practices

Autoscale Optimization

# Reference catalogue of autoscale best practices. Each entry pairs a short
# description with concrete guidance and the benefit it yields; consumed as
# plain data (e.g. for rendering documentation or checklists).
autoscale_best_practices: dict = {
    "right_size_base": {
        "description": "Set base capacity to handle typical load",
        "guidance": "Base should handle 70-80% of daily workload",
        "benefit": "Minimizes burst costs while ensuring performance"
    },
    "appropriate_burst_limit": {
        "description": "Set burst limit based on peak requirements",
        "guidance": "Allow burst to 100% for most workloads",
        "benefit": "Handles peaks without over-provisioning"
    },
    "tune_thresholds": {
        "description": "Adjust scale triggers based on workload",
        "guidance": "Scale up at 80%, scale down at 30%",
        "benefit": "Prevents thrashing while remaining responsive"
    },
    "use_schedules": {
        "description": "Pre-warm capacity for known peaks",
        "guidance": "Schedule higher minimum before batch jobs",
        "benefit": "Faster response to predictable demand"
    },
    "monitor_regularly": {
        "description": "Review autoscale patterns weekly",
        "guidance": "Adjust based on actual usage patterns",
        "benefit": "Continuous optimization"
    }
}

def get_autoscale_checklist() -> list:
    """Return the autoscale configuration checklist as a list of items."""
    # Kept as an immutable tuple internally; callers get a fresh list.
    checklist_items = (
        "Base capacity sized for typical workload",
        "Burst limit allows for peak handling",
        "Scale thresholds prevent oscillation",
        "Cooldown period prevents rapid scaling",
        "Schedules configured for predictable peaks",
        "Monitoring alerts set for throttling",
        "Cost alerts set for unexpected burst usage",
    )
    return list(checklist_items)

# Print checklist
print("Autoscale Configuration Checklist:")
for checklist_entry in get_autoscale_checklist():
    print(f"  [ ] {checklist_entry}")

Conclusion

Autoscale in Microsoft Fabric provides the flexibility to handle variable workloads while optimizing costs. Configure base capacity for typical load, allow burst for peaks, and use schedules for predictable patterns.

Regular monitoring ensures your autoscale configuration remains optimal as workload patterns evolve.

Michael John Peña

Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.