Skip to content
Back to Blog
1 min read

Cost Optimization in Microsoft Fabric

I wrote “Cost Optimization in Microsoft Fabric” to share practical, production-minded guidance on this topic.

Cost optimization isn’t about cutting features — it’s about aligning spend with business value. During holiday slowdowns I help teams identify idle capacities, ineffective autoscale rules, and costly query patterns. These tactics reduce spend immediately without harming throughput.

Understanding Fabric Costs

Understanding Fabric Costs

from dataclasses import dataclass
from typing import List, Dict

@dataclass
class CostCategory:
    """One area of Fabric spend with its drivers and matching remedies.

    Fields:
        category: Display name of the cost area.
        description: Short summary of what the area covers.
        cost_drivers: Factors listed as pushing this cost up.
        optimization_strategies: Actions listed for reducing this cost.
    """

    category: str
    description: str
    cost_drivers: List[str]
    optimization_strategies: List[str]

# Catalogue of Fabric cost areas, keyed by a short identifier.
# Each entry pairs the listed cost drivers with the strategies suggested for them.
fabric_cost_categories = {
    # Capacity-unit consumption
    "compute": CostCategory(
        category="Compute (Capacity Units)",
        description="Primary cost driver - Spark and SQL compute",
        cost_drivers=[
            "Number of concurrent jobs",
            "Job duration",
            "Cluster size",
            "Always-on vs on-demand",
        ],
        optimization_strategies=[
            "Right-size capacity SKU",
            "Schedule jobs during off-peak",
            "Optimize query performance",
            "Use reserved capacity for predictable workloads",
        ],
    ),
    # Data at rest
    "storage": CostCategory(
        category="Storage (OneLake)",
        description="Data storage costs",
        cost_drivers=[
            "Data volume",
            "Data retention",
            "Redundancy requirements",
            "Snapshot history",
        ],
        optimization_strategies=[
            "Implement data lifecycle policies",
            "Compress data appropriately",
            "Clean up unused tables",
            "Manage Delta log retention",
        ],
    ),
    # Data transferred out
    "egress": CostCategory(
        category="Data Egress",
        description="Data leaving Azure/Fabric",
        cost_drivers=[
            "Cross-region queries",
            "External tool access",
            "Data exports",
        ],
        optimization_strategies=[
            "Keep compute and data in same region",
            "Minimize external data access",
            "Use shortcuts instead of copying",
        ],
    ),
}

Cost Optimization Strategies

class CostOptimizer:
    """Fabric cost optimization toolkit.

    Stateless helpers that turn usage and cost figures into plain-dict
    summaries with human-readable recommendations.
    """

    def analyze_capacity_efficiency(self, usage_data: List[Dict]) -> Dict:
        """Summarize how many sampled periods were meaningfully utilized.

        Args:
            usage_data: One dict per sampled period (each sample is counted
                as one "hour"). A sample's numeric "utilization" value is
                compared against 10; a missing or ``None`` value counts as 0.

        Returns:
            Dict with "total_hours", "utilized_hours", "efficiency_percent"
            (0-100, or 0 when there are no samples) and "recommendations"
            (list of str). With no samples the recommendation list is empty,
            because there is no evidence to base a resize on.
        """
        total_hours = len(usage_data)
        # `or 0` makes an explicit None reading behave like a missing one
        # instead of raising TypeError on the comparison.
        utilized_hours = sum(
            1 for u in usage_data if (u.get("utilization") or 0) > 10
        )

        efficiency = (utilized_hours / total_hours * 100) if total_hours > 0 else 0

        recommendations = []
        # Only recommend when there is data; an empty sample set would
        # otherwise read as "very low utilization".
        if total_hours > 0:
            if efficiency < 30:
                recommendations.append("Very low utilization - consider smaller SKU or pay-as-you-go")
            elif efficiency < 50:
                recommendations.append("Moderate utilization - consider scheduling workloads together")

        return {
            "total_hours": total_hours,
            "utilized_hours": utilized_hours,
            "efficiency_percent": efficiency,
            "recommendations": recommendations
        }

    def calculate_reserved_savings(
        self,
        current_monthly: float,
        utilization: float,
        reserved_discount: float = 0.5,
        break_even: float = 0.5,
    ) -> Dict:
        """Calculate potential savings from reserved capacity.

        Args:
            current_monthly: Current monthly spend.
            utilization: Utilization as a fraction (0.5 means 50%). Note that
                ``analyze_capacity_efficiency`` reports a 0-100 percentage,
                so divide that value by 100 before passing it here.
            reserved_discount: Fraction by which reserved pricing is cheaper.
                The default keeps the original ~50% assumption; check current
                pricing before relying on it.
            break_even: Utilization fraction above which reserved capacity is
                recommended.

        Returns:
            Dict with the inputs echoed back, "reserved_monthly",
            "potential_monthly_savings", "annual_savings" and "recommendation".
            Savings are reported as 0 when utilization does not exceed the
            break-even point.

        Raises:
            ValueError: If ``reserved_discount`` is outside 0-1.
        """
        if not 0 <= reserved_discount <= 1:
            raise ValueError("reserved_discount must be between 0 and 1")

        # Reserved capacity is cheaper but requires commitment
        reserved_monthly = current_monthly * (1 - reserved_discount)

        if utilization > break_even:
            savings = current_monthly - reserved_monthly
            recommendation = "Switch to reserved capacity"
        else:
            savings = 0
            recommendation = "Stay with pay-as-you-go"

        return {
            "current_monthly": current_monthly,
            "reserved_monthly": reserved_monthly,
            "current_utilization": utilization,
            "break_even_utilization": break_even,
            "potential_monthly_savings": savings,
            "annual_savings": savings * 12,
            "recommendation": recommendation
        }

    def identify_optimization_opportunities(self, workspace_analysis: Dict) -> List[Dict]:
        """Identify cost optimization opportunities.

        Args:
            workspace_analysis: Dict that may contain the lists
                "tables_not_queried_30d" (dicts with an optional "size_gb"),
                "tables_without_vorder" and "queries_over_30s". Missing or
                empty lists produce no entry.

        Returns:
            List of opportunity dicts (each has "type", "description" and
            "action"), in the fixed order: unused tables, unoptimized tables,
            slow queries.
        """
        opportunities = []

        # Check for unused assets
        unused_tables = workspace_analysis.get("tables_not_queried_30d", [])
        if unused_tables:
            storage_gb = sum(t.get("size_gb", 0) for t in unused_tables)
            opportunities.append({
                "type": "Unused Tables",
                "description": f"{len(unused_tables)} tables not queried in 30 days",
                "potential_savings_gb": storage_gb,
                "action": "Review and archive or delete unused tables"
            })

        # Check for unoptimized tables
        unoptimized = workspace_analysis.get("tables_without_vorder", [])
        if unoptimized:
            opportunities.append({
                "type": "Unoptimized Tables",
                "description": f"{len(unoptimized)} tables without V-Order",
                "potential_savings": "Reduced query time = reduced CU consumption",
                "action": "Apply V-Order optimization"
            })

        # Check for inefficient queries
        slow_queries = workspace_analysis.get("queries_over_30s", [])
        if slow_queries:
            opportunities.append({
                "type": "Slow Queries",
                "description": f"{len(slow_queries)} queries running > 30 seconds",
                "potential_savings": "Significant CU reduction with optimization",
                "action": "Review and optimize slow queries"
            })

        return opportunities

# Reference table of cost-saving strategies, keyed by a short strategy name.
# Every entry has three string fields:
#   "description"       - one-line summary of the strategy
#   "implementation"    - an illustrative snippet stored as text; nothing in
#                         this table executes it
#   "estimated_savings" - a percentage range given as text
cost_optimization_strategies = {
    # When to run workloads
    "scheduling": {
        "description": "Schedule workloads efficiently",
        "implementation": """
# 1. Identify off-peak hours (typically nights/weekends)
# 2. Schedule batch jobs during off-peak
# 3. Stagger job start times to avoid peaks

# Example: Pipeline scheduling best practices
scheduling_guidelines = {
    "etl_jobs": "2:00 AM - 6:00 AM (off-peak)",
    "report_refresh": "6:00 AM - 8:00 AM (before business hours)",
    "ad_hoc_analysis": "Business hours (user-driven)",
    "ml_training": "Weekends or nights"
}
""",
        "estimated_savings": "10-30%"
    },
    # Choosing a capacity SKU from observed usage
    "right_sizing": {
        "description": "Match capacity to actual needs",
        "implementation": """
# 1. Monitor utilization for 2-4 weeks
# 2. Identify peak and average usage
# 3. Select SKU that handles 80% of peaks

def recommend_sku(peak_cu: float, avg_cu: float) -> str:
    # Account for burst capability
    target_cu = max(peak_cu * 0.8, avg_cu * 1.5)

    skus = [("F2", 2), ("F4", 4), ("F8", 8), ("F16", 16), ("F32", 32)]
    for name, cu in skus:
        if cu >= target_cu:
            return name
    return "F64+"
""",
        "estimated_savings": "20-40%"
    },
    # Cheaper queries: the snippet contrasts a before/after Spark example
    "query_optimization": {
        "description": "Reduce compute through better queries",
        "implementation": """
# Top optimizations:
# 1. Add appropriate filters
# 2. Select only needed columns
# 3. Use partitioning and Z-order
# 4. Implement incremental processing

# Before (expensive)
df = spark.table("large_table")
result = df.groupBy("category").sum("amount")

# After (optimized)
df = spark.table("large_table").filter("date >= '2023-12-01'")
result = df.select("category", "amount").groupBy("category").sum("amount")
""",
        "estimated_savings": "30-50%"
    },
    # Retention and cleanup
    # NOTE(review): this snippet mixes a Python dict with SQL statements that
    # carry `#` comments, so it reads as pseudo-code - confirm before copying.
    "storage_lifecycle": {
        "description": "Manage data lifecycle",
        "implementation": """
# Implement retention policies
retention_policy = {
    "bronze_layer": "90 days (raw data)",
    "silver_layer": "1 year (cleaned data)",
    "gold_layer": "3 years (aggregated)",
    "archive": "7 years (compliance)"
}

# Clean up Delta log files
VACUUM table_name RETAIN 168 HOURS;  # 7 days

# Delete old partitions
DELETE FROM table_name WHERE date < '2022-01-01';
""",
        "estimated_savings": "10-20%"
    }
}

Cost Monitoring Dashboard

class CostDashboard:
    """Render Fabric cost figures as Markdown reports."""

    def generate_cost_report(self, cost_data: Dict) -> str:
        """Build the full Markdown cost report.

        Args:
            cost_data: Report inputs. Numeric keys read here are
                'total_cost', 'compute_cost', 'compute_pct', 'storage_cost',
                'storage_pct', 'mom_change' and 'yoy_change' (each defaults
                to 0). 'by_workspace' maps workspace name to a dict with
                'cost' and 'pct'; 'opportunities' is a list of dicts;
                'top_recommendation' is the text of the first recommendation.

        Returns:
            The report text, beginning and ending with a newline.
        """
        total_cost = cost_data.get('total_cost', 0)
        compute_cost = cost_data.get('compute_cost', 0)
        compute_pct = cost_data.get('compute_pct', 0)
        storage_cost = cost_data.get('storage_cost', 0)
        storage_pct = cost_data.get('storage_pct', 0)
        mom_change = cost_data.get('mom_change', 0)
        yoy_change = cost_data.get('yoy_change', 0)
        workspace_rows = self._format_workspace_costs(cost_data.get('by_workspace', {}))
        opportunity_block = self._format_opportunities(cost_data.get('opportunities', []))
        top_recommendation = cost_data.get('top_recommendation', 'Review utilization patterns')

        # The empty first and last items give the leading and trailing newline.
        report_lines = [
            "",
            "# Fabric Cost Report",
            "",
            "## Monthly Summary",
            f"- **Total Cost:** ${total_cost:,.2f}",
            f"- **Compute Cost:** ${compute_cost:,.2f} ({compute_pct:.1f}%)",
            f"- **Storage Cost:** ${storage_cost:,.2f} ({storage_pct:.1f}%)",
            "",
            "## Trend Analysis",
            f"- **Month-over-Month Change:** {mom_change:+.1f}%",
            f"- **Year-over-Year Change:** {yoy_change:+.1f}%",
            "",
            "## Cost by Workspace",
            "",
            "| Workspace | Cost | % of Total |",
            "|-----------|------|-----------|",
            workspace_rows,
            "",
            "## Optimization Opportunities",
            "",
            opportunity_block,
            "",
            "## Recommendations",
            "",
            f"1. {top_recommendation}",
            "2. Consider reserved capacity for predictable workloads",
            "3. Implement query optimization for top consumers",
            "",
        ]
        return "\n".join(report_lines)

    def _format_workspace_costs(self, workspaces: Dict) -> str:
        """Return one Markdown table row per workspace, newline-separated."""
        return "\n".join(
            f"| {name} | ${info.get('cost', 0):,.2f} | {info.get('pct', 0):.1f}% |"
            for name, info in workspaces.items()
        )

    def _format_opportunities(self, opportunities: List[Dict]) -> str:
        """Return a numbered list of at most the first five opportunities."""
        if not opportunities:
            return "No immediate opportunities identified."

        top_five = opportunities[:5]
        return "\n".join(
            f"{rank}. **{item.get('type', 'Unknown')}**: {item.get('description', '')}"
            for rank, item in enumerate(top_five, 1)
        )

Tomorrow, we’ll look at 2024 predictions for AI and data!

## Takeaways

Add a concise, personal takeaway and recommended next steps here.

Michael John Peña

Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.