Back to Blog
6 min read

FinOps Practices: Managing Cloud Financial Operations

FinOps - the practice of bringing financial accountability to cloud spending - became essential in 2021. As cloud bills grew, organizations realized they needed discipline around cloud financial operations.

The FinOps Framework

FinOps operates in three phases:

  1. Inform: Visibility and allocation
  2. Optimize: Rates and usage
  3. Operate: Continuous improvement

Building a Cost Allocation Strategy

# Tag-based cost allocation
from azure.mgmt.resource import ResourceManagementClient
from azure.identity import DefaultAzureCredential
import json

class CostAllocationManager:
    REQUIRED_TAGS = ["CostCenter", "Environment", "Owner", "Application"]

    def __init__(self, subscription_id: str):
        self.credential = DefaultAzureCredential()
        self.client = ResourceManagementClient(self.credential, subscription_id)

    def audit_tag_compliance(self) -> dict:
        """Audit resources for required tags"""
        compliance_report = {
            "compliant": [],
            "non_compliant": [],
            "missing_tags": {}
        }

        for resource in self.client.resources.list():
            tags = resource.tags or {}
            missing = [tag for tag in self.REQUIRED_TAGS if tag not in tags]

            if missing:
                compliance_report["non_compliant"].append({
                    "resource_id": resource.id,
                    "resource_type": resource.type,
                    "missing_tags": missing
                })

                for tag in missing:
                    compliance_report["missing_tags"][tag] = \
                        compliance_report["missing_tags"].get(tag, 0) + 1
            else:
                compliance_report["compliant"].append(resource.id)

        compliance_report["compliance_rate"] = \
            len(compliance_report["compliant"]) / \
            (len(compliance_report["compliant"]) + len(compliance_report["non_compliant"]))

        return compliance_report

    def enforce_tags_policy(self):
        """Create Azure Policy for tag enforcement"""
        policy_definition = {
            "mode": "Indexed",
            "policyRule": {
                "if": {
                    "anyOf": [
                        {"field": f"tags['{tag}']", "exists": "false"}
                        for tag in self.REQUIRED_TAGS
                    ]
                },
                "then": {
                    "effect": "deny"
                }
            },
            "parameters": {}
        }

        return policy_definition

# Azure Policy for inherited tags
tag_inheritance_policy = """
{
    "mode": "Indexed",
    "policyRule": {
        "if": {
            "allOf": [
                {
                    "field": "tags['CostCenter']",
                    "exists": "false"
                },
                {
                    "value": "[resourceGroup().tags['CostCenter']]",
                    "notEquals": ""
                }
            ]
        },
        "then": {
            "effect": "modify",
            "details": {
                "roleDefinitionIds": [
                    "/providers/Microsoft.Authorization/roleDefinitions/b24988ac-6180-42a0-ab88-20f7382dd24c"
                ],
                "operations": [
                    {
                        "operation": "add",
                        "field": "tags['CostCenter']",
                        "value": "[resourceGroup().tags['CostCenter']]"
                    }
                ]
            }
        }
    }
}
"""

Showback and Chargeback Reports

from azure.mgmt.costmanagement import CostManagementClient
import pandas as pd
from datetime import datetime, timedelta

class FinOpsReporter:
    def __init__(self, subscription_id: str):
        self.credential = DefaultAzureCredential()
        self.client = CostManagementClient(self.credential)
        self.scope = f"/subscriptions/{subscription_id}"

    def generate_showback_report(self, month: str) -> pd.DataFrame:
        """Generate showback report by cost center"""
        start_date = f"{month}-01"
        end_date = (datetime.strptime(start_date, "%Y-%m-%d") +
                    timedelta(days=32)).replace(day=1) - timedelta(days=1)

        query = {
            "type": "ActualCost",
            "timeframe": "Custom",
            "timePeriod": {
                "from": start_date,
                "to": end_date.strftime("%Y-%m-%d")
            },
            "dataset": {
                "granularity": "None",
                "aggregation": {
                    "totalCost": {"name": "Cost", "function": "Sum"}
                },
                "grouping": [
                    {"type": "Tag", "name": "CostCenter"},
                    {"type": "Tag", "name": "Application"},
                    {"type": "Tag", "name": "Environment"}
                ]
            }
        }

        result = self.client.query.usage(scope=self.scope, parameters=query)

        columns = [col.name for col in result.columns]
        data = [row for row in result.rows]
        df = pd.DataFrame(data, columns=columns)

        # Apply shared costs allocation
        df = self._allocate_shared_costs(df)

        return df

    def _allocate_shared_costs(self, df: pd.DataFrame) -> pd.DataFrame:
        """Allocate shared infrastructure costs"""
        # Identify shared costs (no CostCenter tag or tagged as "Shared")
        shared_mask = (df["CostCenter"].isna()) | (df["CostCenter"] == "Shared")
        shared_costs = df[shared_mask]["Cost"].sum()

        # Get cost centers and their proportional spend
        cost_center_totals = df[~shared_mask].groupby("CostCenter")["Cost"].sum()
        total_tagged = cost_center_totals.sum()

        if total_tagged > 0:
            # Proportional allocation
            allocations = (cost_center_totals / total_tagged) * shared_costs
            df.loc[~shared_mask, "AllocatedSharedCosts"] = \
                df[~shared_mask]["CostCenter"].map(allocations / cost_center_totals * df["Cost"])

        df["TotalCost"] = df["Cost"] + df.get("AllocatedSharedCosts", 0)

        return df

    def generate_anomaly_report(self, threshold_pct: float = 20) -> list:
        """Detect cost anomalies"""
        # Get last 60 days of daily costs
        query = {
            "type": "ActualCost",
            "timeframe": "Custom",
            "timePeriod": {
                "from": (datetime.utcnow() - timedelta(days=60)).strftime("%Y-%m-%d"),
                "to": datetime.utcnow().strftime("%Y-%m-%d")
            },
            "dataset": {
                "granularity": "Daily",
                "aggregation": {
                    "totalCost": {"name": "Cost", "function": "Sum"}
                },
                "grouping": [
                    {"type": "Dimension", "name": "ServiceName"}
                ]
            }
        }

        result = self.client.query.usage(scope=self.scope, parameters=query)
        df = pd.DataFrame([row for row in result.rows],
                         columns=[col.name for col in result.columns])

        anomalies = []
        for service in df["ServiceName"].unique():
            service_data = df[df["ServiceName"] == service].sort_values("UsageDate")

            if len(service_data) < 14:
                continue

            # Calculate rolling average and std
            service_data["rolling_avg"] = service_data["Cost"].rolling(14).mean()
            service_data["rolling_std"] = service_data["Cost"].rolling(14).std()

            latest = service_data.iloc[-1]
            if latest["rolling_avg"] > 0:
                deviation_pct = (latest["Cost"] - latest["rolling_avg"]) / latest["rolling_avg"] * 100

                if abs(deviation_pct) > threshold_pct:
                    anomalies.append({
                        "service": service,
                        "date": latest["UsageDate"],
                        "cost": latest["Cost"],
                        "expected": latest["rolling_avg"],
                        "deviation_pct": deviation_pct
                    })

        return sorted(anomalies, key=lambda x: abs(x["deviation_pct"]), reverse=True)

Unit Economics Tracking

class UnitEconomicsTracker:
    """Track cost per business unit"""

    def __init__(self, cost_client, metrics_client):
        self.cost_client = cost_client
        self.metrics_client = metrics_client

    def calculate_unit_costs(self, period: str) -> dict:
        """Calculate cost per unit metrics"""

        # Get total infrastructure costs
        infra_costs = self._get_infrastructure_costs(period)

        # Get business metrics from Application Insights or custom source
        business_metrics = self._get_business_metrics(period)

        unit_costs = {
            "period": period,
            "total_cost": infra_costs["total"],
            "metrics": {}
        }

        # Cost per transaction
        if business_metrics.get("total_transactions"):
            unit_costs["metrics"]["cost_per_transaction"] = \
                infra_costs["total"] / business_metrics["total_transactions"]

        # Cost per API call
        if business_metrics.get("total_api_calls"):
            unit_costs["metrics"]["cost_per_api_call"] = \
                infra_costs["api_services"] / business_metrics["total_api_calls"]

        # Cost per active user
        if business_metrics.get("active_users"):
            unit_costs["metrics"]["cost_per_active_user"] = \
                infra_costs["total"] / business_metrics["active_users"]

        # Cost per GB processed
        if business_metrics.get("data_processed_gb"):
            unit_costs["metrics"]["cost_per_gb_processed"] = \
                infra_costs["data_services"] / business_metrics["data_processed_gb"]

        return unit_costs

    def trend_analysis(self, periods: list) -> pd.DataFrame:
        """Analyze unit cost trends"""
        trends = []
        for period in periods:
            unit_costs = self.calculate_unit_costs(period)
            trends.append({
                "period": period,
                **unit_costs["metrics"]
            })

        df = pd.DataFrame(trends)

        # Calculate month-over-month changes
        for col in df.columns:
            if col != "period":
                df[f"{col}_mom_change"] = df[col].pct_change() * 100

        return df

FinOps Automation

# Azure DevOps pipeline for FinOps automation
trigger:
  schedules:
    - cron: "0 6 * * 1"  # Every Monday at 6 AM
      displayName: Weekly FinOps Report
      branches:
        include:
          - main

stages:
  - stage: GenerateReports
    jobs:
      - job: CostReports
        steps:
          - task: AzureCLI@2
            displayName: Generate Showback Report
            inputs:
              azureSubscription: 'finops-service-connection'
              scriptType: 'bash'
              scriptLocation: 'inlineScript'
              inlineScript: |
                python scripts/generate_showback.py \
                  --subscription $(SUBSCRIPTION_ID) \
                  --output reports/showback.csv

          - task: AzureCLI@2
            displayName: Detect Anomalies
            inputs:
              azureSubscription: 'finops-service-connection'
              scriptType: 'bash'
              scriptLocation: 'inlineScript'
              inlineScript: |
                python scripts/detect_anomalies.py \
                  --subscription $(SUBSCRIPTION_ID) \
                  --threshold 20 \
                  --output reports/anomalies.json

          - task: AzureCLI@2
            displayName: Check Tag Compliance
            inputs:
              azureSubscription: 'finops-service-connection'
              scriptType: 'bash'
              scriptLocation: 'inlineScript'
              inlineScript: |
                python scripts/audit_tags.py \
                  --subscription $(SUBSCRIPTION_ID) \
                  --output reports/tag_compliance.json

  - stage: Notify
    dependsOn: GenerateReports
    jobs:
      - job: SendReports
        steps:
          - task: PowerShell@2
            displayName: Send Email Reports
            inputs:
              targetType: 'inline'
              script: |
                $report = Get-Content reports/showback.csv | ConvertFrom-Csv

                # Build email content
                $body = @"
                Weekly FinOps Report
                ====================

                Total Spend: $($report | Measure-Object Cost -Sum | Select -Expand Sum)

                Top Cost Centers:
                $(($report | Group-Object CostCenter |
                    Sort-Object { ($_.Group | Measure-Object Cost -Sum).Sum } -Descending |
                    Select-Object -First 5 |
                    ForEach-Object { "- $($_.Name): $$(($_.Group | Measure-Object Cost -Sum).Sum)" }) -join "`n")
                "@

                # Send via Logic App or SendGrid

FinOps Maturity Assessment

CapabilityCrawlWalkRun
Cost VisibilityBasic reportsShowbackReal-time dashboards
AllocationManual taggingTag policiesAutomated allocation
OptimizationAd-hocMonthly reviewsContinuous automation
ForecastingNoneTrend-basedML-powered
GovernanceReactiveBudgets & alertsPreventive policies
CultureCentralizedSharedEmbedded

Key FinOps Principles

  1. Teams Need to Collaborate: Finance, Engineering, and Business together
  2. Everyone Takes Ownership: Engineers understand cost impact
  3. Reports Should Be Accessible: Real-time, not monthly
  4. Decisions Are Data-Driven: Unit economics guide architecture
  5. Take Advantage of the Cloud: Variable cost is a feature, not a bug

FinOps in 2021 grew from a niche practice to an organizational capability. The tools exist; success requires culture change and executive support.

Resources

Michael John Pena

Michael John Pena

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.