
Cost Optimization in Microsoft Fabric

Happy Holidays! Today, let's unwrap the gift of cost optimization: strategies for getting maximum value from your Fabric investment.

Understanding Fabric Costs

from dataclasses import dataclass
from typing import List, Dict

@dataclass
class CostCategory:
    category: str
    description: str
    cost_drivers: List[str]
    optimization_strategies: List[str]

fabric_cost_categories = {
    "compute": CostCategory(
        category="Compute (Capacity Units)",
        description="Primary cost driver - Spark and SQL compute",
        cost_drivers=[
            "Number of concurrent jobs",
            "Job duration",
            "Cluster size",
            "Always-on vs on-demand"
        ],
        optimization_strategies=[
            "Right-size capacity SKU",
            "Schedule jobs during off-peak",
            "Optimize query performance",
            "Use reserved capacity for predictable workloads"
        ]
    ),
    "storage": CostCategory(
        category="Storage (OneLake)",
        description="Data storage costs",
        cost_drivers=[
            "Data volume",
            "Data retention",
            "Redundancy requirements",
            "Snapshot history"
        ],
        optimization_strategies=[
            "Implement data lifecycle policies",
            "Compress data appropriately",
            "Clean up unused tables",
            "Manage Delta log retention"
        ]
    ),
    "egress": CostCategory(
        category="Data Egress",
        description="Data leaving Azure/Fabric",
        cost_drivers=[
            "Cross-region queries",
            "External tool access",
            "Data exports"
        ],
        optimization_strategies=[
            "Keep compute and data in same region",
            "Minimize external data access",
            "Use shortcuts instead of copying"
        ]
    )
}
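
If you want a quick inventory of where to focus first, here's a minimal sketch that walks the dictionary above and prints each category's optimization checklist (it assumes the fabric_cost_categories definition from the previous block is in scope):

# Quick sketch: print the optimization checklist for each cost category.
# Assumes the fabric_cost_categories dictionary defined above.
for key, category in fabric_cost_categories.items():
    print(f"\n{category.category} - {category.description}")
    for strategy in category.optimization_strategies:
        print(f"  - {strategy}")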

Cost Optimization Strategies

class CostOptimizer:
    """Fabric cost optimization toolkit."""

    def analyze_capacity_efficiency(self, usage_data: List[Dict]) -> Dict:
        """Analyze capacity efficiency."""
        total_hours = len(usage_data)
        # Count an hour as "utilized" if capacity utilization exceeded 10%
        utilized_hours = sum(1 for u in usage_data if u.get("utilization", 0) > 10)

        efficiency = (utilized_hours / total_hours * 100) if total_hours > 0 else 0

        recommendations = []
        if efficiency < 30:
            recommendations.append("Very low utilization - consider smaller SKU or pay-as-you-go")
        elif efficiency < 50:
            recommendations.append("Moderate utilization - consider scheduling workloads together")

        return {
            "total_hours": total_hours,
            "utilized_hours": utilized_hours,
            "efficiency_percent": efficiency,
            "recommendations": recommendations
        }

    def calculate_reserved_savings(self, current_monthly: float, utilization: float) -> Dict:
        """Calculate potential savings from reserved capacity."""
        # Assumes reserved capacity is roughly half the pay-as-you-go rate;
        # check current published pricing before committing
        reserved_monthly = current_monthly * 0.5

        # Break-even utilization for reserved
        break_even = 0.5  # 50% utilization

        if utilization > break_even:
            savings = current_monthly - reserved_monthly
            recommendation = "Switch to reserved capacity"
        else:
            savings = 0
            recommendation = "Stay with pay-as-you-go"

        return {
            "current_monthly": current_monthly,
            "reserved_monthly": reserved_monthly,
            "current_utilization": utilization,
            "break_even_utilization": break_even,
            "potential_monthly_savings": savings,
            "annual_savings": savings * 12,
            "recommendation": recommendation
        }

    def identify_optimization_opportunities(self, workspace_analysis: Dict) -> List[Dict]:
        """Identify cost optimization opportunities."""
        opportunities = []

        # Check for unused assets
        unused_tables = workspace_analysis.get("tables_not_queried_30d", [])
        if unused_tables:
            storage_gb = sum(t.get("size_gb", 0) for t in unused_tables)
            opportunities.append({
                "type": "Unused Tables",
                "description": f"{len(unused_tables)} tables not queried in 30 days",
                "potential_savings_gb": storage_gb,
                "action": "Review and archive or delete unused tables"
            })

        # Check for unoptimized tables
        unoptimized = workspace_analysis.get("tables_without_vorder", [])
        if unoptimized:
            opportunities.append({
                "type": "Unoptimized Tables",
                "description": f"{len(unoptimized)} tables without V-Order",
                "potential_savings": "Reduced query time = reduced CU consumption",
                "action": "Apply V-Order optimization"
            })

        # Check for inefficient queries
        slow_queries = workspace_analysis.get("queries_over_30s", [])
        if slow_queries:
            opportunities.append({
                "type": "Slow Queries",
                "description": f"{len(slow_queries)} queries running > 30 seconds",
                "potential_savings": "Significant CU reduction with optimization",
                "action": "Review and optimize slow queries"
            })

        return opportunities
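
Here's a quick, hypothetical run of the toolkit - the hourly utilization samples and the $8,000 monthly spend below are made-up inputs for illustration, not benchmarks:

optimizer = CostOptimizer()

# One week of hypothetical hourly utilization samples (values are illustrative)
usage = [{"utilization": 5}] * 100 + [{"utilization": 60}] * 68

print(optimizer.analyze_capacity_efficiency(usage))
# -> efficiency_percent ~40.5, with a "moderate utilization" recommendation

print(optimizer.calculate_reserved_savings(current_monthly=8000.0, utilization=0.65))
# -> reserved_monthly 4000.0, annual_savings 48000.0, "Switch to reserved capacity"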

cost_optimization_strategies = {
    "scheduling": {
        "description": "Schedule workloads efficiently",
        "implementation": """
# 1. Identify off-peak hours (typically nights/weekends)
# 2. Schedule batch jobs during off-peak
# 3. Stagger job start times to avoid peaks

# Example: Pipeline scheduling best practices
scheduling_guidelines = {
    "etl_jobs": "2:00 AM - 6:00 AM (off-peak)",
    "report_refresh": "6:00 AM - 8:00 AM (before business hours)",
    "ad_hoc_analysis": "Business hours (user-driven)",
    "ml_training": "Weekends or nights"
}
""",
        "estimated_savings": "10-30%"
    },
    "right_sizing": {
        "description": "Match capacity to actual needs",
        "implementation": """
# 1. Monitor utilization for 2-4 weeks
# 2. Identify peak and average usage
# 3. Select SKU that handles 80% of peaks

def recommend_sku(peak_cu: float, avg_cu: float) -> str:
    # Account for burst capability
    target_cu = max(peak_cu * 0.8, avg_cu * 1.5)

    skus = [("F2", 2), ("F4", 4), ("F8", 8), ("F16", 16), ("F32", 32)]
    for name, cu in skus:
        if cu >= target_cu:
            return name
    return "F64+"
""",
        "estimated_savings": "20-40%"
    },
    "query_optimization": {
        "description": "Reduce compute through better queries",
        "implementation": """
# Top optimizations:
# 1. Add appropriate filters
# 2. Select only needed columns
# 3. Use partitioning and Z-order
# 4. Implement incremental processing

# Before (expensive)
df = spark.table("large_table")
result = df.groupBy("category").sum("amount")

# After (optimized)
df = spark.table("large_table").filter("date >= '2023-12-01'")
result = df.select("category", "amount").groupBy("category").sum("amount")
""",
        "estimated_savings": "30-50%"
    },
    "storage_lifecycle": {
        "description": "Manage data lifecycle",
        "implementation": """
# Implement retention policies
retention_policy = {
    "bronze_layer": "90 days (raw data)",
    "silver_layer": "1 year (cleaned data)",
    "gold_layer": "3 years (aggregated)",
    "archive": "7 years (compliance)"
}

# Clean up old Delta data file versions (Spark SQL, run from a notebook)
spark.sql("VACUUM table_name RETAIN 168 HOURS")  # keep 7 days of history

# Delete old partitions (Spark SQL)
spark.sql("DELETE FROM table_name WHERE date < '2022-01-01'")
""",
        "estimated_savings": "10-20%"
    }
}
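
To make the right-sizing math concrete, here's a worked example using the recommend_sku helper from the snippet above (the peak and average CU numbers are hypothetical):

# Worked example with hypothetical numbers: peak of 14 CU, average of 6 CU
# target_cu = max(14 * 0.8, 6 * 1.5) = max(11.2, 9.0) = 11.2 -> smallest SKU >= 11.2 is F16
print(recommend_sku(peak_cu=14, avg_cu=6))  # "F16"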

Cost Monitoring Dashboard

class CostDashboard:
    """Generate cost monitoring reports."""

    def generate_cost_report(self, cost_data: Dict) -> str:
        """Generate comprehensive cost report."""
        return f"""
# Fabric Cost Report

## Monthly Summary
- **Total Cost:** ${cost_data.get('total_cost', 0):,.2f}
- **Compute Cost:** ${cost_data.get('compute_cost', 0):,.2f} ({cost_data.get('compute_pct', 0):.1f}%)
- **Storage Cost:** ${cost_data.get('storage_cost', 0):,.2f} ({cost_data.get('storage_pct', 0):.1f}%)

## Trend Analysis
- **Month-over-Month Change:** {cost_data.get('mom_change', 0):+.1f}%
- **Year-over-Year Change:** {cost_data.get('yoy_change', 0):+.1f}%

## Cost by Workspace

| Workspace | Cost | % of Total |
|-----------|------|-----------|
{self._format_workspace_costs(cost_data.get('by_workspace', {}))}

## Optimization Opportunities

{self._format_opportunities(cost_data.get('opportunities', []))}

## Recommendations

1. {cost_data.get('top_recommendation', 'Review utilization patterns')}
2. Consider reserved capacity for predictable workloads
3. Implement query optimization for top consumers
"""

    def _format_workspace_costs(self, workspaces: Dict) -> str:
        """Format workspace costs table."""
        lines = []
        for ws, data in workspaces.items():
            lines.append(f"| {ws} | ${data.get('cost', 0):,.2f} | {data.get('pct', 0):.1f}% |")
        return "\n".join(lines)

    def _format_opportunities(self, opportunities: List[Dict]) -> str:
        """Format optimization opportunities."""
        if not opportunities:
            return "No immediate opportunities identified."

        lines = []
        for i, opp in enumerate(opportunities[:5], 1):
            lines.append(f"{i}. **{opp.get('type', 'Unknown')}**: {opp.get('description', '')}")
        return "\n".join(lines)

Tomorrow, we’ll look at 2024 predictions for AI and data!

Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.