Usage Reporting in Microsoft Fabric
Comprehensive usage reporting provides visibility into how your organization consumes Microsoft Fabric resources. Let’s explore how to build effective usage reports and dashboards.
Usage Data Sources
Usage Data Flow:
┌─────────────────────────────────────────────────────────────┐
│ Microsoft Fabric │
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Capacity │ │ Activity │ │ Audit │ │ Admin │ │
│ │ Metrics │ │ Logs │ │ Logs │ │ API │ │
│ └────┬─────┘ └────┬─────┘ └────┬─────┘ └────┬─────┘ │
│ │ │ │ │ │
│ └─────────────┴─────────────┴─────────────┘ │
│ │ │
│ ┌─────▼─────┐ │
│ │ OneLake │ │
│ │ Lakehouse │ │
│ └─────┬─────┘ │
│ │ │
│ ┌─────▼─────┐ │
│ │ Reports │ │
│ │ Dashboards│ │
│ └───────────┘ │
└─────────────────────────────────────────────────────────────┘
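All four sources funnel into a Lakehouse before any reporting happens. Here is a minimal sketch of that landing step, assuming a Fabric notebook where a `spark` session is pre-configured; the `usage_metrics` table name is illustrative:
# Land collected usage records in a Lakehouse Delta table.
# Assumes a Fabric notebook (where `spark` is pre-configured);
# the `usage_metrics` table name is illustrative.
from datetime import datetime, timezone

records = [
    {"capacity_id": "capacity-123", "metric": "total_cu_hours", "value": 1500.0},
    {"capacity_id": "capacity-123", "metric": "peak_cu_usage", "value": 95.0},
]
collected_at = datetime.now(timezone.utc).isoformat()

df = spark.createDataFrame(
    [(r["capacity_id"], r["metric"], r["value"], collected_at) for r in records],
    schema="capacity_id string, metric string, value double, collected_at string",
)
df.write.mode("append").saveAsTable("usage_metrics")  # Delta table in the Lakehouse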
Collecting Usage Data
Capacity Metrics Collection
from datetime import datetime, timedelta
import requests
class UsageDataCollector:
"""Collect usage data from Fabric APIs."""
def __init__(self, tenant_id: str, client_id: str, client_secret: str):
self.tenant_id = tenant_id
self.client_id = client_id
self.client_secret = client_secret
self.base_url = "https://api.fabric.microsoft.com/v1"
def _get_token(self) -> str:
"""Get access token."""
# OAuth token acquisition
token_url = f"https://login.microsoftonline.com/{self.tenant_id}/oauth2/v2.0/token"
        response = requests.post(token_url, data={
            "grant_type": "client_credentials",
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            "scope": "https://api.fabric.microsoft.com/.default"
        })
        response.raise_for_status()  # fail loudly instead of silently returning None
        return response.json()["access_token"]
def _get_headers(self) -> dict:
return {
"Authorization": f"Bearer {self._get_token()}",
"Content-Type": "application/json"
}
def get_capacity_usage(
self,
capacity_id: str,
start_date: str,
end_date: str
) -> dict:
"""Get capacity usage metrics."""
# This would call the actual Fabric admin API
# Simulated response
return {
"capacity_id": capacity_id,
"period": {"start": start_date, "end": end_date},
"metrics": {
"total_cu_hours": 1500,
"peak_cu_usage": 95,
"average_cu_usage": 45,
"operations_count": 25000
},
"by_workload": {
"spark": {"cu_hours": 800, "operations": 500},
"warehouse": {"cu_hours": 400, "operations": 15000},
"dataflow": {"cu_hours": 200, "operations": 200},
"power_bi": {"cu_hours": 100, "operations": 9300}
}
}
def get_workspace_activity(self, workspace_id: str, days: int = 30) -> list:
"""Get workspace activity logs."""
# Simulated activity data
return [
{
"timestamp": "2024-08-30T10:00:00Z",
"operation": "RefreshDataset",
"user": "user@company.com",
"duration_seconds": 120,
"status": "Success"
},
{
"timestamp": "2024-08-30T09:30:00Z",
"operation": "ExecuteQuery",
"user": "analyst@company.com",
"duration_seconds": 5,
"status": "Success"
}
]
def get_storage_usage(self, workspace_id: str) -> dict:
"""Get OneLake storage usage."""
return {
"workspace_id": workspace_id,
"total_size_gb": 150.5,
"by_item_type": {
"lakehouse": 80.2,
"warehouse": 45.3,
"semantic_model": 15.0,
"other": 10.0
}
}
# Usage
collector = UsageDataCollector(
tenant_id="tenant-id",
client_id="client-id",
client_secret="secret"
)
# Collect capacity usage
capacity_usage = collector.get_capacity_usage(
capacity_id="capacity-123",
start_date="2024-08-01",
end_date="2024-08-31"
)
print(f"Total CU-hours: {capacity_usage['metrics']['total_cu_hours']}")
Activity Log Processing
class ActivityLogProcessor:
"""Process and aggregate activity logs."""
def __init__(self):
self.activity_log = []
def ingest_activities(self, activities: list):
"""Ingest raw activity data."""
self.activity_log.extend(activities)
def aggregate_by_user(self) -> dict:
"""Aggregate activities by user."""
user_stats = {}
for activity in self.activity_log:
user = activity.get("user", "unknown")
if user not in user_stats:
user_stats[user] = {
"operation_count": 0,
"total_duration_seconds": 0,
"operations": {}
}
user_stats[user]["operation_count"] += 1
user_stats[user]["total_duration_seconds"] += activity.get("duration_seconds", 0)
op = activity.get("operation", "unknown")
if op not in user_stats[user]["operations"]:
user_stats[user]["operations"][op] = 0
user_stats[user]["operations"][op] += 1
return user_stats
def aggregate_by_operation(self) -> dict:
"""Aggregate activities by operation type."""
op_stats = {}
for activity in self.activity_log:
op = activity.get("operation", "unknown")
if op not in op_stats:
op_stats[op] = {
"count": 0,
"total_duration_seconds": 0,
"success_count": 0,
"failure_count": 0
}
op_stats[op]["count"] += 1
op_stats[op]["total_duration_seconds"] += activity.get("duration_seconds", 0)
if activity.get("status") == "Success":
op_stats[op]["success_count"] += 1
else:
op_stats[op]["failure_count"] += 1
# Calculate averages and success rates
for op, stats in op_stats.items():
stats["avg_duration_seconds"] = stats["total_duration_seconds"] / stats["count"]
stats["success_rate"] = stats["success_count"] / stats["count"] * 100
return op_stats
def get_hourly_distribution(self) -> dict:
"""Get activity distribution by hour."""
hourly = {str(h).zfill(2): 0 for h in range(24)}
for activity in self.activity_log:
timestamp = activity.get("timestamp", "")
if timestamp:
hour = timestamp[11:13] # Extract hour from ISO timestamp
if hour in hourly:
hourly[hour] += 1
return hourly
# Usage
processor = ActivityLogProcessor()
# Ingest sample activities
processor.ingest_activities([
{"timestamp": "2024-08-30T10:00:00Z", "operation": "RefreshDataset", "user": "user1@company.com", "duration_seconds": 120, "status": "Success"},
{"timestamp": "2024-08-30T10:30:00Z", "operation": "ExecuteQuery", "user": "user2@company.com", "duration_seconds": 5, "status": "Success"},
{"timestamp": "2024-08-30T11:00:00Z", "operation": "RefreshDataset", "user": "user1@company.com", "duration_seconds": 150, "status": "Success"},
{"timestamp": "2024-08-30T14:00:00Z", "operation": "ExecuteQuery", "user": "user3@company.com", "duration_seconds": 3, "status": "Failure"}
])
# Get aggregations
user_stats = processor.aggregate_by_user()
print("Activity by User:")
for user, stats in user_stats.items():
print(f" {user}: {stats['operation_count']} operations")
op_stats = processor.aggregate_by_operation()
print("\nActivity by Operation:")
for op, stats in op_stats.items():
print(f" {op}: {stats['count']} calls, {stats['success_rate']:.1f}% success")
Building Usage Reports
Executive Dashboard
class ExecutiveUsageReport:
"""Generate executive-level usage reports."""
def __init__(self):
self.data = {}
def load_data(
self,
capacity_usage: dict,
cost_data: dict,
user_activity: dict
):
"""Load all data sources."""
self.data["capacity"] = capacity_usage
self.data["cost"] = cost_data
self.data["activity"] = user_activity
def generate_kpis(self) -> dict:
"""Generate key performance indicators."""
capacity = self.data.get("capacity", {})
cost = self.data.get("cost", {})
activity = self.data.get("activity", {})
return {
"utilization": {
"average_percent": capacity.get("metrics", {}).get("average_cu_usage", 0),
"peak_percent": capacity.get("metrics", {}).get("peak_cu_usage", 0),
"trend": "up" # Would calculate from historical data
},
"cost": {
"mtd": cost.get("mtd_cost", 0),
"budget": cost.get("budget", 0),
"variance_percent": ((cost.get("mtd_cost", 0) - cost.get("budget", 0)) / cost.get("budget", 1)) * 100 if cost.get("budget") else 0
},
"adoption": {
"active_users": len(activity),
"total_operations": sum(u.get("operation_count", 0) for u in activity.values())
}
}
def generate_trends(self, historical_data: list) -> dict:
"""Generate trend analysis."""
if not historical_data:
return {}
# Calculate month-over-month changes
current = historical_data[-1] if historical_data else {}
previous = historical_data[-2] if len(historical_data) > 1 else {}
def calc_change(curr, prev):
if prev and prev > 0:
return ((curr - prev) / prev) * 100
return 0
return {
"cu_hours_change": calc_change(
current.get("cu_hours", 0),
previous.get("cu_hours", 0)
),
"cost_change": calc_change(
current.get("cost", 0),
previous.get("cost", 0)
),
"users_change": calc_change(
current.get("active_users", 0),
previous.get("active_users", 0)
)
}
def to_markdown(self) -> str:
"""Generate markdown report."""
kpis = self.generate_kpis()
report = """
# Fabric Usage Report
## Key Metrics
| Metric | Value | Status |
|--------|-------|--------|
| Avg Utilization | {util_avg}% | {util_status} |
| Peak Utilization | {util_peak}% | - |
| MTD Cost | ${cost_mtd:,.2f} | {cost_status} |
| Active Users | {users} | - |
## Utilization Trend
Current average utilization is {util_avg}%, with peak reaching {util_peak}%.
## Cost Status
Month-to-date spending is ${cost_mtd:,.2f} against a budget of ${budget:,.2f}.
Variance: {variance:.1f}%
""".format(
util_avg=kpis["utilization"]["average_percent"],
util_peak=kpis["utilization"]["peak_percent"],
util_status="Good" if kpis["utilization"]["average_percent"] > 50 else "Review",
cost_mtd=kpis["cost"]["mtd"],
budget=kpis["cost"]["budget"],
variance=kpis["cost"]["variance_percent"],
cost_status="On Track" if kpis["cost"]["variance_percent"] <= 10 else "Over Budget",
users=kpis["adoption"]["active_users"]
)
return report
# Usage
report = ExecutiveUsageReport()
report.load_data(
capacity_usage={
"metrics": {"average_cu_usage": 65, "peak_cu_usage": 92}
},
cost_data={
"mtd_cost": 8500,
"budget": 10000
},
user_activity={
"user1": {"operation_count": 100},
"user2": {"operation_count": 50},
"user3": {"operation_count": 25}
}
)
print(report.to_markdown())
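`generate_trends` works off whatever history you retain. With two illustrative monthly snapshots:
# Month-over-month trends from illustrative historical snapshots
history = [
    {"cu_hours": 1300, "cost": 7800, "active_users": 42},  # previous month
    {"cu_hours": 1500, "cost": 8500, "active_users": 48},  # current month
]
for metric, change in report.generate_trends(history).items():
    print(f"{metric}: {change:+.1f}%")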
Detailed Usage Report
class DetailedUsageReport:
"""Generate detailed usage reports for teams."""
def __init__(self):
self.usage_data = {}
def add_workspace_usage(
self,
workspace_name: str,
team: str,
cu_hours: float,
storage_gb: float,
query_count: int,
refresh_count: int
):
"""Add usage data for a workspace."""
if team not in self.usage_data:
self.usage_data[team] = {"workspaces": []}
self.usage_data[team]["workspaces"].append({
"name": workspace_name,
"cu_hours": cu_hours,
"storage_gb": storage_gb,
"query_count": query_count,
"refresh_count": refresh_count
})
def generate_team_report(self, team: str) -> dict:
"""Generate detailed report for a team."""
if team not in self.usage_data:
return {"error": "Team not found"}
workspaces = self.usage_data[team]["workspaces"]
total_cu = sum(w["cu_hours"] for w in workspaces)
total_storage = sum(w["storage_gb"] for w in workspaces)
total_queries = sum(w["query_count"] for w in workspaces)
total_refreshes = sum(w["refresh_count"] for w in workspaces)
# Sort workspaces by CU usage
sorted_workspaces = sorted(
workspaces,
key=lambda x: x["cu_hours"],
reverse=True
)
return {
"team": team,
"summary": {
"total_cu_hours": total_cu,
"total_storage_gb": total_storage,
"total_queries": total_queries,
"total_refreshes": total_refreshes,
"workspace_count": len(workspaces)
},
"workspaces": sorted_workspaces,
"recommendations": self._generate_recommendations(sorted_workspaces)
}
def _generate_recommendations(self, workspaces: list) -> list:
"""Generate optimization recommendations."""
recommendations = []
for ws in workspaces:
# Low query count but high CU
if ws["cu_hours"] > 100 and ws["query_count"] < 10:
recommendations.append({
"workspace": ws["name"],
"issue": "High compute with low query activity",
"recommendation": "Review if refreshes are necessary"
})
# High storage
if ws["storage_gb"] > 50:
recommendations.append({
"workspace": ws["name"],
"issue": f"Large storage footprint ({ws['storage_gb']:.1f} GB)",
"recommendation": "Consider data lifecycle policies"
})
return recommendations
def compare_teams(self) -> dict:
"""Compare usage across teams."""
team_summaries = {}
for team, data in self.usage_data.items():
workspaces = data["workspaces"]
team_summaries[team] = {
"cu_hours": sum(w["cu_hours"] for w in workspaces),
"storage_gb": sum(w["storage_gb"] for w in workspaces),
"workspace_count": len(workspaces)
}
# Rank teams
total_cu = sum(t["cu_hours"] for t in team_summaries.values())
for team in team_summaries:
team_summaries[team]["cu_percent"] = (
team_summaries[team]["cu_hours"] / total_cu * 100
if total_cu > 0 else 0
)
return team_summaries
# Usage
detailed = DetailedUsageReport()
# Add workspace data
detailed.add_workspace_usage("Sales Dashboard", "Sales", 300, 25, 5000, 60)
detailed.add_workspace_usage("Sales ETL", "Sales", 450, 80, 100, 30)
detailed.add_workspace_usage("Marketing Analytics", "Marketing", 200, 15, 2000, 30)
detailed.add_workspace_usage("Finance Reports", "Finance", 100, 10, 1000, 15)
# Generate team report
sales_report = detailed.generate_team_report("Sales")
print(f"Sales Team Summary:")
print(f" Total CU-hours: {sales_report['summary']['total_cu_hours']}")
print(f" Total Storage: {sales_report['summary']['total_storage_gb']:.1f} GB")
# Compare teams
comparison = detailed.compare_teams()
print("\nTeam Comparison:")
for team, stats in comparison.items():
print(f" {team}: {stats['cu_hours']} CU-hours ({stats['cu_percent']:.1f}%)")
Automating Reports
class ReportScheduler:
"""Schedule and distribute usage reports."""
def __init__(self):
self.schedules = []
def add_schedule(
self,
report_name: str,
frequency: str, # "daily", "weekly", "monthly"
recipients: list,
report_type: str
):
"""Add a report schedule."""
self.schedules.append({
"name": report_name,
"frequency": frequency,
"recipients": recipients,
"report_type": report_type,
"last_run": None
})
def should_run(self, schedule: dict, current_date: datetime) -> bool:
"""Check if report should run."""
if schedule["last_run"] is None:
return True
last_run = schedule["last_run"]
frequency = schedule["frequency"]
if frequency == "daily":
return (current_date - last_run).days >= 1
elif frequency == "weekly":
return (current_date - last_run).days >= 7
elif frequency == "monthly":
return (current_date - last_run).days >= 30
return False
def run_due_reports(self, current_date: datetime) -> list:
"""Run all due reports."""
results = []
for schedule in self.schedules:
if self.should_run(schedule, current_date):
result = {
"report_name": schedule["name"],
"generated_at": current_date.isoformat(),
"recipients": schedule["recipients"],
"status": "generated"
}
schedule["last_run"] = current_date
results.append(result)
return results
# Usage
scheduler = ReportScheduler()
scheduler.add_schedule(
"Executive Summary",
"weekly",
["cto@company.com", "cfo@company.com"],
"executive"
)
scheduler.add_schedule(
"Team Usage Detail",
"daily",
["data-team@company.com"],
"detailed"
)
# Run due reports
results = scheduler.run_due_reports(datetime.now())
print(f"Reports generated: {len(results)}")
Best Practices
- Collect comprehensively: Gather data from all sources
- Aggregate appropriately: Different audiences need different views
- Automate distribution: Regular reports without manual effort
- Include trends: Show direction, not just current state
- Add recommendations: Make reports actionable
Conclusion
Effective usage reporting provides the visibility needed for informed decisions about Microsoft Fabric investments. Build reports that serve different audiences: executives need KPIs, teams need details, and finance needs cost allocation.
Automate report generation and distribution to ensure stakeholders always have current information without manual overhead.