Working with Azure Activity Logs for Audit and Compliance
Introduction
Azure Activity Logs provide a record of all control plane operations performed on resources in your subscription. From resource creation and modification to role assignments and policy evaluations, Activity Logs are essential for auditing, compliance, and security investigations.
In this post, we will explore how to work with Activity Logs effectively.
Understanding Activity Log Categories
Activity Logs include several categories:
- Administrative: Resource management operations (create, update, delete)
- Security: Security Center alerts and recommendations
- Service Health: Azure service incidents and maintenance
- Alert: Metric and log alert activations
- Recommendation: Azure Advisor recommendations
- Policy: Azure Policy evaluations
- Autoscale: Autoscale operations
- Resource Health: Resource availability changes
Querying Activity Logs
Query Activity Logs using Azure CLI:
# Get all activity logs from last 24 hours
az monitor activity-log list \
--start-time $(date -u -d '24 hours ago' '+%Y-%m-%dT%H:%M:%SZ') \
--output table
# Filter by operation type
az monitor activity-log list \
--start-time $(date -u -d '7 days ago' '+%Y-%m-%dT%H:%M:%SZ') \
--filter "operationName eq 'Microsoft.Authorization/roleAssignments/write'" \
--output table
# Filter by resource group
az monitor activity-log list \
--resource-group rg-production \
--start-time $(date -u -d '24 hours ago' '+%Y-%m-%dT%H:%M:%SZ')
# Get failed operations
az monitor activity-log list \
--start-time $(date -u -d '24 hours ago' '+%Y-%m-%dT%H:%M:%SZ') \
--filter "status eq 'Failed'" \
--output table
Python SDK for Activity Logs
Query and analyze activity logs programmatically:
from azure.mgmt.monitor import MonitorManagementClient
from azure.identity import DefaultAzureCredential
from datetime import datetime, timedelta
import pandas as pd
credential = DefaultAzureCredential()
monitor_client = MonitorManagementClient(credential, subscription_id)
def get_activity_logs(days=7, filter_str=None):
"""Get activity logs with optional filtering."""
end_time = datetime.utcnow()
start_time = end_time - timedelta(days=days)
# Build filter
time_filter = f"eventTimestamp ge '{start_time.isoformat()}Z' and eventTimestamp le '{end_time.isoformat()}Z'"
if filter_str:
full_filter = f"{time_filter} and {filter_str}"
else:
full_filter = time_filter
logs = monitor_client.activity_logs.list(filter=full_filter)
results = []
for log in logs:
results.append({
"timestamp": log.event_timestamp,
"operation": log.operation_name.localized_value if log.operation_name else None,
"status": log.status.value if log.status else None,
"caller": log.caller,
"resource_type": log.resource_type.value if log.resource_type else None,
"resource_id": log.resource_id,
"category": log.category.value if log.category else None,
"level": log.level.value if log.level else None,
"description": log.description
})
return pd.DataFrame(results)
# Get all administrative operations
admin_logs = get_activity_logs(
days=7,
filter_str="category eq 'Administrative'"
)
print(f"Total administrative operations: {len(admin_logs)}")
print(admin_logs.head())
# Analyze by operation type
operation_counts = admin_logs.groupby("operation").size().sort_values(ascending=False)
print("\nTop operations:")
print(operation_counts.head(10))
Security Audit Queries
Audit security-relevant operations:
def audit_role_assignments(days=30):
"""Audit all role assignment changes."""
logs = get_activity_logs(
days=days,
filter_str="operationName eq 'Microsoft.Authorization/roleAssignments/write' or operationName eq 'Microsoft.Authorization/roleAssignments/delete'"
)
print(f"\nRole assignment changes in last {days} days: {len(logs)}")
for _, log in logs.iterrows():
action = "Created" if "write" in str(log["operation"]) else "Deleted"
print(f"{log['timestamp']}: {action} by {log['caller']}")
print(f" Resource: {log['resource_id']}")
print(f" Status: {log['status']}")
print()
return logs
def audit_resource_deletions(days=7):
"""Audit all resource deletions."""
logs = get_activity_logs(
days=days,
filter_str="operationName contains 'delete' and status eq 'Succeeded'"
)
deleted_resources = logs[logs["operation"].str.contains("Delete", case=False, na=False)]
print(f"\nSuccessful deletions in last {days} days: {len(deleted_resources)}")
# Group by resource type
by_type = deleted_resources.groupby("resource_type").size().sort_values(ascending=False)
print("\nDeletions by resource type:")
print(by_type)
return deleted_resources
def audit_failed_operations(days=1):
"""Audit failed operations for troubleshooting."""
logs = get_activity_logs(
days=days,
filter_str="status eq 'Failed'"
)
print(f"\nFailed operations in last {days} days: {len(logs)}")
# Group by operation
by_operation = logs.groupby("operation").size().sort_values(ascending=False)
print("\nFailed operations by type:")
print(by_operation.head(10))
return logs
# Run security audits
role_changes = audit_role_assignments(days=30)
deletions = audit_resource_deletions(days=7)
failures = audit_failed_operations(days=1)
Exporting Activity Logs
Configure long-term export to storage:
# Diagnostic setting for Activity Log
resource "azurerm_monitor_diagnostic_setting" "activity_log_export" {
name = "activity-log-export"
target_resource_id = "/subscriptions/${data.azurerm_subscription.current.subscription_id}"
log_analytics_workspace_id = azurerm_log_analytics_workspace.main.id
storage_account_id = azurerm_storage_account.audit.id
enabled_log {
category = "Administrative"
}
enabled_log {
category = "Security"
}
enabled_log {
category = "ServiceHealth"
}
enabled_log {
category = "Alert"
}
enabled_log {
category = "Recommendation"
}
enabled_log {
category = "Policy"
}
enabled_log {
category = "Autoscale"
}
enabled_log {
category = "ResourceHealth"
}
}
# Storage account for compliance retention
resource "azurerm_storage_account" "audit" {
name = "auditlogsstorage"
resource_group_name = azurerm_resource_group.compliance.name
location = azurerm_resource_group.compliance.location
account_tier = "Standard"
account_replication_type = "GRS"
min_tls_version = "TLS1_2"
blob_properties {
versioning_enabled = true
delete_retention_policy {
days = 365
}
}
# Immutable storage for compliance
immutability_policy {
allow_protected_append_writes = true
period_since_creation_in_days = 365
state = "Locked"
}
tags = {
Purpose = "Audit Logs"
Compliance = "Required"
}
}
Log Analytics Queries
Query Activity Logs in Log Analytics:
// All resource creations
AzureActivity
| where OperationNameValue contains "write"
| where ActivityStatusValue == "Success"
| where TimeGenerated > ago(7d)
| project TimeGenerated, Caller, OperationNameValue, ResourceGroup, _ResourceId
| order by TimeGenerated desc
// Administrative changes by user
AzureActivity
| where CategoryValue == "Administrative"
| where TimeGenerated > ago(30d)
| summarize OperationCount = count() by Caller
| order by OperationCount desc
| take 20
// Failed operations with error details
AzureActivity
| where ActivityStatusValue == "Failed"
| where TimeGenerated > ago(24h)
| project TimeGenerated, Caller, OperationNameValue, ResourceGroup,
Properties_d.statusMessage, Properties_d.statusCode
| order by TimeGenerated desc
// Policy evaluation failures
AzureActivity
| where CategoryValue == "Policy"
| where ActivityStatusValue == "Failed"
| where TimeGenerated > ago(7d)
| project TimeGenerated, OperationNameValue, ResourceGroup, Properties_d.message
| order by TimeGenerated desc
// RBAC changes
AzureActivity
| where OperationNameValue has "Microsoft.Authorization/roleAssignments"
| where TimeGenerated > ago(30d)
| extend RoleDefinitionId = tostring(parse_json(tostring(Properties_d.requestbody)).Properties.RoleDefinitionId)
| extend PrincipalId = tostring(parse_json(tostring(Properties_d.requestbody)).Properties.PrincipalId)
| project TimeGenerated, Caller, OperationNameValue, RoleDefinitionId, PrincipalId, ActivityStatusValue
| order by TimeGenerated desc
// Resource health changes
AzureActivity
| where CategoryValue == "ResourceHealth"
| where TimeGenerated > ago(7d)
| project TimeGenerated, _ResourceId, OperationNameValue, Properties_d
| order by TimeGenerated desc
Creating Alerts on Activity Logs
Set up alerts for critical operations:
def create_activity_log_alert(name, operation_name, severity=2):
"""Create an alert for specific activity log operations."""
alert = monitor_client.activity_log_alerts.create_or_update(
resource_group_name="rg-monitoring",
activity_log_alert_name=name,
activity_log_alert={
"location": "global",
"scopes": [f"/subscriptions/{subscription_id}"],
"enabled": True,
"condition": {
"allOf": [
{
"field": "category",
"equals": "Administrative"
},
{
"field": "operationName",
"equals": operation_name
},
{
"field": "status",
"equals": "Succeeded"
}
]
},
"actions": {
"actionGroups": [{
"actionGroupId": f"/subscriptions/{subscription_id}/resourceGroups/rg-monitoring/providers/Microsoft.Insights/actionGroups/security-team"
}]
},
"description": f"Alert when {operation_name} occurs"
}
)
return alert
# Create alerts for security-sensitive operations
alerts_to_create = [
("role-assignment-alert", "Microsoft.Authorization/roleAssignments/write"),
("policy-assignment-alert", "Microsoft.Authorization/policyAssignments/write"),
("keyvault-secret-delete", "Microsoft.KeyVault/vaults/secrets/delete"),
("nsg-rule-write", "Microsoft.Network/networkSecurityGroups/securityRules/write"),
("diagnostic-setting-delete", "Microsoft.Insights/diagnosticSettings/delete")
]
for name, operation in alerts_to_create:
create_activity_log_alert(name, operation)
print(f"Created alert: {name}")
Compliance Reporting
Generate compliance reports from Activity Logs:
def generate_compliance_report(days=30):
"""Generate a compliance report from Activity Logs."""
logs_df = get_activity_logs(days=days)
report = {
"period": f"Last {days} days",
"generated_at": datetime.utcnow().isoformat(),
"summary": {
"total_operations": len(logs_df),
"successful": len(logs_df[logs_df["status"] == "Succeeded"]),
"failed": len(logs_df[logs_df["status"] == "Failed"]),
"unique_callers": logs_df["caller"].nunique()
},
"by_category": logs_df.groupby("category").size().to_dict(),
"security_events": {},
"high_risk_operations": []
}
# Security-relevant operations
security_operations = [
"Microsoft.Authorization/roleAssignments",
"Microsoft.Authorization/policyAssignments",
"Microsoft.KeyVault",
"Microsoft.Network/networkSecurityGroups"
]
for op in security_operations:
op_logs = logs_df[logs_df["operation"].str.contains(op, case=False, na=False)]
report["security_events"][op] = {
"count": len(op_logs),
"successful": len(op_logs[op_logs["status"] == "Succeeded"]),
"callers": op_logs["caller"].unique().tolist()
}
# High-risk operations (deletions, permission changes)
high_risk = logs_df[
(logs_df["operation"].str.contains("delete", case=False, na=False)) |
(logs_df["operation"].str.contains("roleAssignment", case=False, na=False))
]
for _, row in high_risk.iterrows():
report["high_risk_operations"].append({
"timestamp": row["timestamp"].isoformat() if pd.notna(row["timestamp"]) else None,
"operation": row["operation"],
"caller": row["caller"],
"resource": row["resource_id"],
"status": row["status"]
})
return report
# Generate and save report
report = generate_compliance_report(days=30)
import json
with open("compliance_report.json", "w") as f:
json.dump(report, f, indent=2, default=str)
print(f"Report generated: {report['summary']}")
Conclusion
Azure Activity Logs are fundamental for governance, compliance, and security in Azure. They provide a complete audit trail of control plane operations, enabling you to track who did what, when, and to which resources.
Best practices include exporting logs to long-term storage for compliance, setting up alerts on security-sensitive operations, and regularly reviewing logs for suspicious activity. Combined with Log Analytics queries, Activity Logs give you the visibility needed to maintain secure and compliant Azure environments.