Back to Blog
7 min read

Working with Azure Activity Logs for Audit and Compliance

Introduction

Azure Activity Logs provide a record of all control plane operations performed on resources in your subscription. From resource creation and modification to role assignments and policy evaluations, Activity Logs are essential for auditing, compliance, and security investigations.

In this post, we will explore how to work with Activity Logs effectively.

Understanding Activity Log Categories

Activity Logs include several categories:

  • Administrative: Resource management operations (create, update, delete)
  • Security: Security Center alerts and recommendations
  • Service Health: Azure service incidents and maintenance
  • Alert: Metric and log alert activations
  • Recommendation: Azure Advisor recommendations
  • Policy: Azure Policy evaluations
  • Autoscale: Autoscale operations
  • Resource Health: Resource availability changes

Querying Activity Logs

Query Activity Logs using Azure CLI:

# Get all activity logs from last 24 hours
az monitor activity-log list \
    --start-time $(date -u -d '24 hours ago' '+%Y-%m-%dT%H:%M:%SZ') \
    --output table

# Filter by operation type
az monitor activity-log list \
    --start-time $(date -u -d '7 days ago' '+%Y-%m-%dT%H:%M:%SZ') \
    --filter "operationName eq 'Microsoft.Authorization/roleAssignments/write'" \
    --output table

# Filter by resource group
az monitor activity-log list \
    --resource-group rg-production \
    --start-time $(date -u -d '24 hours ago' '+%Y-%m-%dT%H:%M:%SZ')

# Get failed operations
az monitor activity-log list \
    --start-time $(date -u -d '24 hours ago' '+%Y-%m-%dT%H:%M:%SZ') \
    --filter "status eq 'Failed'" \
    --output table

Python SDK for Activity Logs

Query and analyze activity logs programmatically:

from azure.mgmt.monitor import MonitorManagementClient
from azure.identity import DefaultAzureCredential
from datetime import datetime, timedelta
import pandas as pd

credential = DefaultAzureCredential()
monitor_client = MonitorManagementClient(credential, subscription_id)

def get_activity_logs(days=7, filter_str=None):
    """Get activity logs with optional filtering."""

    end_time = datetime.utcnow()
    start_time = end_time - timedelta(days=days)

    # Build filter
    time_filter = f"eventTimestamp ge '{start_time.isoformat()}Z' and eventTimestamp le '{end_time.isoformat()}Z'"

    if filter_str:
        full_filter = f"{time_filter} and {filter_str}"
    else:
        full_filter = time_filter

    logs = monitor_client.activity_logs.list(filter=full_filter)

    results = []
    for log in logs:
        results.append({
            "timestamp": log.event_timestamp,
            "operation": log.operation_name.localized_value if log.operation_name else None,
            "status": log.status.value if log.status else None,
            "caller": log.caller,
            "resource_type": log.resource_type.value if log.resource_type else None,
            "resource_id": log.resource_id,
            "category": log.category.value if log.category else None,
            "level": log.level.value if log.level else None,
            "description": log.description
        })

    return pd.DataFrame(results)

# Get all administrative operations
admin_logs = get_activity_logs(
    days=7,
    filter_str="category eq 'Administrative'"
)

print(f"Total administrative operations: {len(admin_logs)}")
print(admin_logs.head())

# Analyze by operation type
operation_counts = admin_logs.groupby("operation").size().sort_values(ascending=False)
print("\nTop operations:")
print(operation_counts.head(10))

Security Audit Queries

Audit security-relevant operations:

def audit_role_assignments(days=30):
    """Audit all role assignment changes."""

    logs = get_activity_logs(
        days=days,
        filter_str="operationName eq 'Microsoft.Authorization/roleAssignments/write' or operationName eq 'Microsoft.Authorization/roleAssignments/delete'"
    )

    print(f"\nRole assignment changes in last {days} days: {len(logs)}")

    for _, log in logs.iterrows():
        action = "Created" if "write" in str(log["operation"]) else "Deleted"
        print(f"{log['timestamp']}: {action} by {log['caller']}")
        print(f"  Resource: {log['resource_id']}")
        print(f"  Status: {log['status']}")
        print()

    return logs

def audit_resource_deletions(days=7):
    """Audit all resource deletions."""

    logs = get_activity_logs(
        days=days,
        filter_str="operationName contains 'delete' and status eq 'Succeeded'"
    )

    deleted_resources = logs[logs["operation"].str.contains("Delete", case=False, na=False)]

    print(f"\nSuccessful deletions in last {days} days: {len(deleted_resources)}")

    # Group by resource type
    by_type = deleted_resources.groupby("resource_type").size().sort_values(ascending=False)
    print("\nDeletions by resource type:")
    print(by_type)

    return deleted_resources

def audit_failed_operations(days=1):
    """Audit failed operations for troubleshooting."""

    logs = get_activity_logs(
        days=days,
        filter_str="status eq 'Failed'"
    )

    print(f"\nFailed operations in last {days} days: {len(logs)}")

    # Group by operation
    by_operation = logs.groupby("operation").size().sort_values(ascending=False)
    print("\nFailed operations by type:")
    print(by_operation.head(10))

    return logs

# Run security audits
role_changes = audit_role_assignments(days=30)
deletions = audit_resource_deletions(days=7)
failures = audit_failed_operations(days=1)

Exporting Activity Logs

Configure long-term export to storage:

# Diagnostic setting for Activity Log
resource "azurerm_monitor_diagnostic_setting" "activity_log_export" {
  name                       = "activity-log-export"
  target_resource_id         = "/subscriptions/${data.azurerm_subscription.current.subscription_id}"
  log_analytics_workspace_id = azurerm_log_analytics_workspace.main.id
  storage_account_id         = azurerm_storage_account.audit.id

  enabled_log {
    category = "Administrative"
  }

  enabled_log {
    category = "Security"
  }

  enabled_log {
    category = "ServiceHealth"
  }

  enabled_log {
    category = "Alert"
  }

  enabled_log {
    category = "Recommendation"
  }

  enabled_log {
    category = "Policy"
  }

  enabled_log {
    category = "Autoscale"
  }

  enabled_log {
    category = "ResourceHealth"
  }
}

# Storage account for compliance retention
resource "azurerm_storage_account" "audit" {
  name                     = "auditlogsstorage"
  resource_group_name      = azurerm_resource_group.compliance.name
  location                 = azurerm_resource_group.compliance.location
  account_tier             = "Standard"
  account_replication_type = "GRS"
  min_tls_version          = "TLS1_2"

  blob_properties {
    versioning_enabled = true

    delete_retention_policy {
      days = 365
    }
  }

  # Immutable storage for compliance
  immutability_policy {
    allow_protected_append_writes = true
    period_since_creation_in_days = 365
    state                         = "Locked"
  }

  tags = {
    Purpose    = "Audit Logs"
    Compliance = "Required"
  }
}

Log Analytics Queries

Query Activity Logs in Log Analytics:

// All resource creations
AzureActivity
| where OperationNameValue contains "write"
| where ActivityStatusValue == "Success"
| where TimeGenerated > ago(7d)
| project TimeGenerated, Caller, OperationNameValue, ResourceGroup, _ResourceId
| order by TimeGenerated desc

// Administrative changes by user
AzureActivity
| where CategoryValue == "Administrative"
| where TimeGenerated > ago(30d)
| summarize OperationCount = count() by Caller
| order by OperationCount desc
| take 20

// Failed operations with error details
AzureActivity
| where ActivityStatusValue == "Failed"
| where TimeGenerated > ago(24h)
| project TimeGenerated, Caller, OperationNameValue, ResourceGroup,
          Properties_d.statusMessage, Properties_d.statusCode
| order by TimeGenerated desc

// Policy evaluation failures
AzureActivity
| where CategoryValue == "Policy"
| where ActivityStatusValue == "Failed"
| where TimeGenerated > ago(7d)
| project TimeGenerated, OperationNameValue, ResourceGroup, Properties_d.message
| order by TimeGenerated desc

// RBAC changes
AzureActivity
| where OperationNameValue has "Microsoft.Authorization/roleAssignments"
| where TimeGenerated > ago(30d)
| extend RoleDefinitionId = tostring(parse_json(tostring(Properties_d.requestbody)).Properties.RoleDefinitionId)
| extend PrincipalId = tostring(parse_json(tostring(Properties_d.requestbody)).Properties.PrincipalId)
| project TimeGenerated, Caller, OperationNameValue, RoleDefinitionId, PrincipalId, ActivityStatusValue
| order by TimeGenerated desc

// Resource health changes
AzureActivity
| where CategoryValue == "ResourceHealth"
| where TimeGenerated > ago(7d)
| project TimeGenerated, _ResourceId, OperationNameValue, Properties_d
| order by TimeGenerated desc

Creating Alerts on Activity Logs

Set up alerts for critical operations:

def create_activity_log_alert(name, operation_name, severity=2):
    """Create an alert for specific activity log operations."""

    alert = monitor_client.activity_log_alerts.create_or_update(
        resource_group_name="rg-monitoring",
        activity_log_alert_name=name,
        activity_log_alert={
            "location": "global",
            "scopes": [f"/subscriptions/{subscription_id}"],
            "enabled": True,
            "condition": {
                "allOf": [
                    {
                        "field": "category",
                        "equals": "Administrative"
                    },
                    {
                        "field": "operationName",
                        "equals": operation_name
                    },
                    {
                        "field": "status",
                        "equals": "Succeeded"
                    }
                ]
            },
            "actions": {
                "actionGroups": [{
                    "actionGroupId": f"/subscriptions/{subscription_id}/resourceGroups/rg-monitoring/providers/Microsoft.Insights/actionGroups/security-team"
                }]
            },
            "description": f"Alert when {operation_name} occurs"
        }
    )

    return alert

# Create alerts for security-sensitive operations
alerts_to_create = [
    ("role-assignment-alert", "Microsoft.Authorization/roleAssignments/write"),
    ("policy-assignment-alert", "Microsoft.Authorization/policyAssignments/write"),
    ("keyvault-secret-delete", "Microsoft.KeyVault/vaults/secrets/delete"),
    ("nsg-rule-write", "Microsoft.Network/networkSecurityGroups/securityRules/write"),
    ("diagnostic-setting-delete", "Microsoft.Insights/diagnosticSettings/delete")
]

for name, operation in alerts_to_create:
    create_activity_log_alert(name, operation)
    print(f"Created alert: {name}")

Compliance Reporting

Generate compliance reports from Activity Logs:

def generate_compliance_report(days=30):
    """Generate a compliance report from Activity Logs."""

    logs_df = get_activity_logs(days=days)

    report = {
        "period": f"Last {days} days",
        "generated_at": datetime.utcnow().isoformat(),
        "summary": {
            "total_operations": len(logs_df),
            "successful": len(logs_df[logs_df["status"] == "Succeeded"]),
            "failed": len(logs_df[logs_df["status"] == "Failed"]),
            "unique_callers": logs_df["caller"].nunique()
        },
        "by_category": logs_df.groupby("category").size().to_dict(),
        "security_events": {},
        "high_risk_operations": []
    }

    # Security-relevant operations
    security_operations = [
        "Microsoft.Authorization/roleAssignments",
        "Microsoft.Authorization/policyAssignments",
        "Microsoft.KeyVault",
        "Microsoft.Network/networkSecurityGroups"
    ]

    for op in security_operations:
        op_logs = logs_df[logs_df["operation"].str.contains(op, case=False, na=False)]
        report["security_events"][op] = {
            "count": len(op_logs),
            "successful": len(op_logs[op_logs["status"] == "Succeeded"]),
            "callers": op_logs["caller"].unique().tolist()
        }

    # High-risk operations (deletions, permission changes)
    high_risk = logs_df[
        (logs_df["operation"].str.contains("delete", case=False, na=False)) |
        (logs_df["operation"].str.contains("roleAssignment", case=False, na=False))
    ]

    for _, row in high_risk.iterrows():
        report["high_risk_operations"].append({
            "timestamp": row["timestamp"].isoformat() if pd.notna(row["timestamp"]) else None,
            "operation": row["operation"],
            "caller": row["caller"],
            "resource": row["resource_id"],
            "status": row["status"]
        })

    return report

# Generate and save report
report = generate_compliance_report(days=30)

import json
with open("compliance_report.json", "w") as f:
    json.dump(report, f, indent=2, default=str)

print(f"Report generated: {report['summary']}")

Conclusion

Azure Activity Logs are fundamental for governance, compliance, and security in Azure. They provide a complete audit trail of control plane operations, enabling you to track who did what, when, and to which resources.

Best practices include exporting logs to long-term storage for compliance, setting up alerts on security-sensitive operations, and regularly reviewing logs for suspicious activity. Combined with Log Analytics queries, Activity Logs give you the visibility needed to maintain secure and compliant Azure environments.

Michael John Peña

Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.