CosmosDB Change Feed Patterns for Event-Driven Architectures
The Cosmos DB Change Feed is a powerful feature that enables event-driven architectures by providing a sorted list of documents in the order they were modified. Today, I will explore various patterns for building reactive systems using Change Feed.
Understanding Change Feed
Change Feed captures every insert and update operation in a Cosmos DB container (deletes are not surfaced in the default latest-version mode, so soft-delete markers are commonly used). Key characteristics:
- Sorted by modification time within each logical partition
- Guaranteed at-least-once delivery
- Supports parallel processing across partitions
- Persistent cursor via continuation tokens
Basic Change Feed Processing
Using Azure Functions
The simplest way to process Change Feed is with Azure Functions:
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.Azure.Documents;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.Extensions.Logging;
/// <summary>
/// Azure Functions Change Feed trigger for the "orders" container.
/// Dispatches each changed order by its Status. Change Feed delivers
/// at-least-once, so every handler below must be idempotent.
/// </summary>
public static class OrderProcessor
{
    [FunctionName("ProcessOrderChanges")]
    public static async Task Run(
        [CosmosDBTrigger(
            databaseName: "ecommerce",
            containerName: "orders",
            Connection = "CosmosDBConnection",
            LeaseContainerName = "leases",
            CreateLeaseContainerIfNotExists = true,
            StartFromBeginning = false,
            MaxItemsPerInvocation = 100)]
        IReadOnlyList<Order> orders,
        ILogger log)
    {
        // The trigger can fire with an empty batch; bail out cheaply.
        if (orders == null || orders.Count == 0) return;

        foreach (var order in orders)
        {
            log.LogInformation($"Processing order {order.Id}, Status: {order.Status}");
            switch (order.Status)
            {
                case "Created":
                    await ProcessNewOrder(order);
                    break;
                case "Paid":
                    await InitiateShipment(order);
                    break;
                case "Shipped":
                    await NotifyCustomer(order);
                    break;
                case "Cancelled":
                    await ProcessRefund(order);
                    break;
                default:
                    // Surface unexpected statuses instead of dropping them silently;
                    // a new status added upstream should be visible in the logs.
                    log.LogWarning($"Unknown status '{order.Status}' for order {order.Id}; skipping");
                    break;
            }
        }
    }

    private static async Task ProcessNewOrder(Order order)
    {
        // Validate inventory, calculate totals
    }

    private static async Task InitiateShipment(Order order)
    {
        // Create shipping label, notify warehouse
    }

    private static async Task NotifyCustomer(Order order)
    {
        // Send email/SMS notification
    }

    private static async Task ProcessRefund(Order order)
    {
        // Initiate refund, restore inventory
    }
}
/// <summary>
/// Order document as stored in the "orders" container and delivered by the
/// Change Feed trigger. Property-name casing in the stored JSON depends on
/// the binding's serializer settings — TODO confirm against the container.
/// </summary>
public class Order
{
/// <summary>Cosmos document id; used as the order identifier.</summary>
public string Id { get; set; }
/// <summary>Owning customer's id.</summary>
public string CustomerId { get; set; }
/// <summary>Lifecycle state; the dispatcher matches "Created", "Paid", "Shipped", "Cancelled".</summary>
public string Status { get; set; }
/// <summary>Order total. NOTE(review): currency unit not shown here — confirm with callers.</summary>
public decimal TotalAmount { get; set; }
/// <summary>Line items on the order.</summary>
public List<OrderItem> Items { get; set; }
/// <summary>When the order was created. Presumably UTC — verify against writers.</summary>
public DateTime CreatedAt { get; set; }
/// <summary>Last modification time; used as the event timestamp by the processor.</summary>
public DateTime ModifiedAt { get; set; }
}
Using Change Feed Processor
For more control, use the Change Feed Processor library:
using Microsoft.Azure.Cosmos;
/// <summary>
/// Hosted service that runs a Change Feed Processor over the "orders"
/// container and republishes each changed order as an OrderChangedEvent.
/// </summary>
public class ChangeFeedService : IHostedService
{
    private readonly CosmosClient _cosmosClient;
    private ChangeFeedProcessor _processor;
    private readonly ILogger<ChangeFeedService> _logger;
    private readonly IServiceProvider _serviceProvider;

    public ChangeFeedService(
        CosmosClient cosmosClient,
        ILogger<ChangeFeedService> logger,
        IServiceProvider serviceProvider)
    {
        _cosmosClient = cosmosClient;
        _logger = logger;
        _serviceProvider = serviceProvider;
    }

    public async Task StartAsync(CancellationToken cancellationToken)
    {
        var database = _cosmosClient.GetDatabase("ecommerce");
        var sourceContainer = database.GetContainer("orders");
        var leaseContainer = database.GetContainer("leases");

        _processor = sourceContainer
            .GetChangeFeedProcessorBuilder<Order>(
                processorName: "OrderProcessor",
                onChangesDelegate: HandleChangesAsync)
            // One lease-owning instance per host; scale out by adding hosts
            // with distinct instance names.
            .WithInstanceName(Environment.MachineName)
            .WithLeaseContainer(leaseContainer)
            .WithStartTime(DateTime.UtcNow.AddDays(-1)) // Start from yesterday
            .WithMaxItems(50)
            .WithPollInterval(TimeSpan.FromSeconds(5))
            .WithLeaseConfiguration(
                acquireInterval: TimeSpan.FromSeconds(10),
                expirationInterval: TimeSpan.FromSeconds(60),
                renewInterval: TimeSpan.FromSeconds(20))
            .Build();

        await _processor.StartAsync();
        _logger.LogInformation("Change Feed Processor started");
    }

    public async Task StopAsync(CancellationToken cancellationToken)
    {
        // StartAsync may have thrown before the processor was built; guard
        // against a NullReferenceException during host shutdown.
        if (_processor != null)
        {
            await _processor.StopAsync();
            _logger.LogInformation("Change Feed Processor stopped");
        }
    }

    /// <summary>
    /// Publishes one event per changed order. Per-item failures are logged and
    /// swallowed so one bad document does not stall the whole batch — wire a
    /// retry/dead-letter sink here for production use.
    /// </summary>
    private async Task HandleChangesAsync(
        ChangeFeedProcessorContext context,
        IReadOnlyCollection<Order> changes,
        CancellationToken cancellationToken)
    {
        _logger.LogInformation(
            $"Processing {changes.Count} changes from partition {context.LeaseToken}");

        using var scope = _serviceProvider.CreateScope();
        var eventPublisher = scope.ServiceProvider.GetRequiredService<IEventPublisher>();

        var tasks = changes.Select(async order =>
        {
            // Honor processor shutdown: an OperationCanceledException here
            // prevents the checkpoint, so the batch is redelivered.
            cancellationToken.ThrowIfCancellationRequested();
            try
            {
                var @event = new OrderChangedEvent
                {
                    OrderId = order.Id,
                    Status = order.Status,
                    CustomerId = order.CustomerId,
                    Timestamp = order.ModifiedAt
                };
                await eventPublisher.PublishAsync(@event);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, $"Failed to process order {order.Id}");
                // Implement retry or dead-letter logic
            }
        });
        await Task.WhenAll(tasks);
    }
}
Event Sourcing Pattern
Use Change Feed to implement event sourcing:
/// <summary>
/// Append-only event store over a Cosmos container (partitioned by aggregate
/// id), with optimistic concurrency and snapshot-accelerated rehydration.
/// </summary>
public class EventStore
{
    private readonly Container _eventsContainer;
    private readonly Container _snapshotsContainer;

    /// <summary>
    /// Appends one event for <paramref name="aggregateId"/> at version
    /// expectedVersion + 1. Throws ConcurrencyException if another writer
    /// appended first.
    /// </summary>
    public async Task AppendEventAsync<TEvent>(
        string aggregateId,
        TEvent @event,
        int expectedVersion) where TEvent : DomainEvent
    {
        var newVersion = expectedVersion + 1;
        var eventDocument = new EventDocument
        {
            // Deterministic id: (aggregateId, version) is unique per document,
            // so a concurrent writer that races past the version query below
            // fails with 409 Conflict instead of silently inserting a
            // duplicate version under a random Guid id.
            Id = $"{aggregateId}:{newVersion}",
            AggregateId = aggregateId,
            EventType = typeof(TEvent).Name,
            EventData = JsonSerializer.Serialize(@event),
            Version = newVersion,
            Timestamp = DateTime.UtcNow,
            PartitionKey = aggregateId
        };

        // Fast-path concurrency check. NOTE: this query-then-create pair is
        // not atomic — the deterministic id above is the real guard.
        var query = new QueryDefinition(
            "SELECT VALUE MAX(c.Version) FROM c WHERE c.AggregateId = @aggregateId")
            .WithParameter("@aggregateId", aggregateId);
        var iterator = _eventsContainer.GetItemQueryIterator<int?>(query);
        var response = await iterator.ReadNextAsync();
        var currentVersion = response.Resource.FirstOrDefault() ?? 0;
        if (currentVersion != expectedVersion)
        {
            throw new ConcurrencyException(
                $"Expected version {expectedVersion}, but current is {currentVersion}");
        }

        try
        {
            await _eventsContainer.CreateItemAsync(
                eventDocument,
                new PartitionKey(aggregateId));
        }
        catch (CosmosException ex) when (ex.StatusCode == System.Net.HttpStatusCode.Conflict)
        {
            // Lost the race after the version check passed.
            throw new ConcurrencyException(
                $"Version {newVersion} already exists for aggregate {aggregateId}");
        }
    }

    /// <summary>
    /// Rebuilds an aggregate from its latest snapshot (if any) plus all
    /// subsequent events, applied in version order.
    /// </summary>
    public async Task<TAggregate> RehydrateAsync<TAggregate>(string aggregateId)
        where TAggregate : AggregateRoot, new()
    {
        // Try to load from snapshot first
        var snapshot = await LoadSnapshotAsync<TAggregate>(aggregateId);
        var aggregate = snapshot?.Aggregate ?? new TAggregate();
        var fromVersion = snapshot?.Version ?? 0;

        // Load events since snapshot
        var query = new QueryDefinition(
            "SELECT * FROM c WHERE c.AggregateId = @aggregateId AND c.Version > @version ORDER BY c.Version")
            .WithParameter("@aggregateId", aggregateId)
            .WithParameter("@version", fromVersion);

        var events = new List<DomainEvent>();
        var iterator = _eventsContainer.GetItemQueryIterator<EventDocument>(query);
        while (iterator.HasMoreResults)
        {
            var response = await iterator.ReadNextAsync();
            foreach (var doc in response.Resource)
            {
                // Type.GetType only probes the core library and the calling
                // assembly unless the name is assembly-qualified — TODO confirm
                // the event types live in this assembly. Fail loudly rather
                // than NullReferenceException inside Deserialize.
                var eventType = Type.GetType($"MyApp.Events.{doc.EventType}")
                    ?? throw new InvalidOperationException(
                        $"Unknown event type '{doc.EventType}' for aggregate {aggregateId}");
                var @event = (DomainEvent)JsonSerializer.Deserialize(
                    doc.EventData, eventType);
                events.Add(@event);
            }
        }

        // Apply events to aggregate
        foreach (var @event in events)
        {
            aggregate.Apply(@event);
        }
        return aggregate;
    }
}
// Change Feed processor for projections: folds event documents into the
// denormalized read model.
public class ProjectionProcessor
{
    private readonly Container _readModelContainer;

    /// <summary>
    /// Applies a batch of event documents to the read model, in order.
    /// Event types without a projection are ignored.
    /// </summary>
    public async Task ProcessEventsAsync(
        IReadOnlyCollection<EventDocument> events)
    {
        foreach (var eventDoc in events)
        {
            var projection = eventDoc.EventType switch
            {
                "OrderCreated" => ProjectOrderCreated(eventDoc),
                "OrderItemAdded" => ProjectOrderItemAdded(eventDoc),
                "OrderShipped" => ProjectOrderShipped(eventDoc),
                _ => Task.CompletedTask,
            };
            await projection;
        }
    }

    // Seeds a fresh read-model row from an OrderCreated event.
    private async Task ProjectOrderCreated(EventDocument eventDoc)
    {
        var created = JsonSerializer.Deserialize<OrderCreated>(eventDoc.EventData);

        var readModel = new OrderReadModel
        {
            Id = created.OrderId,
            CustomerId = created.CustomerId,
            Status = "Created",
            TotalAmount = 0,
            Items = new List<OrderItemReadModel>(),
            CreatedAt = created.Timestamp
        };

        await _readModelContainer.UpsertItemAsync(
            readModel, new PartitionKey(readModel.CustomerId));
    }
}
Materialized View Pattern
Create denormalized views for efficient reads:
/// <summary>
/// Builds a denormalized product-catalog view (partitioned by category) from
/// the Change Feeds of the "products" and "categories" containers.
/// </summary>
public class MaterializedViewBuilder
{
private readonly Container _productsContainer;
private readonly Container _categoriesContainer;
private readonly Container _productCatalogView;
// Process changes from products container
// NOTE(review): if a product's CategoryId changes, this upserts a NEW entry
// in the new category's partition but does not delete the old entry from the
// previous partition — stale rows can accumulate; confirm cleanup happens elsewhere.
public async Task ProcessProductChangesAsync(
IReadOnlyCollection<Product> products)
{
foreach (var product in products)
{
// Fetch related category
// NOTE(review): ReadItemAsync throws CosmosException (404) when the category
// is missing; one bad product would fail the whole batch — verify intended.
var category = await _categoriesContainer.ReadItemAsync<Category>(
product.CategoryId,
new PartitionKey(product.CategoryId));
// Build denormalized view
var catalogEntry = new ProductCatalogEntry
{
Id = product.Id,
ProductId = product.Id,
Name = product.Name,
Description = product.Description,
Price = product.Price,
ImageUrl = product.ImageUrl,
CategoryId = category.Resource.Id,
CategoryName = category.Resource.Name,
CategoryPath = category.Resource.Path,
Tags = product.Tags,
Attributes = product.Attributes,
InStock = product.InventoryCount > 0,
InventoryCount = product.InventoryCount,
LastUpdated = DateTime.UtcNow,
// Partition by category for efficient category browsing
PartitionKey = category.Resource.Id
};
await _productCatalogView.UpsertItemAsync(
catalogEntry,
new PartitionKey(catalogEntry.PartitionKey));
}
}
// Also process category changes to update all related products
// Fan-out update: re-stamps CategoryName/CategoryPath on every catalog entry
// in the changed category. Queries the VIEW container, so entries live in the
// single partition keyed by the category id.
public async Task ProcessCategoryChangesAsync(
IReadOnlyCollection<Category> categories)
{
foreach (var category in categories)
{
// Find all products in this category
var query = new QueryDefinition(
"SELECT * FROM c WHERE c.CategoryId = @categoryId")
.WithParameter("@categoryId", category.Id);
var iterator = _productCatalogView.GetItemQueryIterator<ProductCatalogEntry>(query);
while (iterator.HasMoreResults)
{
var batch = await iterator.ReadNextAsync();
// Upserts within one page run concurrently; pages are processed serially.
var updates = batch.Resource.Select(entry =>
{
entry.CategoryName = category.Name;
entry.CategoryPath = category.Path;
entry.LastUpdated = DateTime.UtcNow;
return _productCatalogView.UpsertItemAsync(
entry,
new PartitionKey(entry.PartitionKey));
});
await Task.WhenAll(updates);
}
}
}
}
Cross-Region Replication
Replicate data across regions using Change Feed:
from azure.cosmos import CosmosClient
from azure.cosmos.aio import CosmosClient as AsyncCosmosClient
import asyncio
class CrossRegionReplicator:
    """Replicates documents between Cosmos accounts via the Change Feed.

    The source client is the synchronous SDK (reads block the event loop);
    the target client is asynchronous so writes can be batched concurrently.
    """

    def __init__(self, source_connection, target_connection):
        self.source_client = CosmosClient.from_connection_string(source_connection)
        self.target_client = AsyncCosmosClient.from_connection_string(target_connection)

    async def replicate_changes(self, database_name, container_name):
        """Continuously poll the source change feed and upsert into the target."""
        source_container = self.source_client \
            .get_database_client(database_name) \
            .get_container_client(container_name)
        target_container = self.target_client \
            .get_database_client(database_name) \
            .get_container_client(container_name)

        continuation_token = None
        while True:
            response = source_container.query_items_change_feed(
                is_start_from_beginning=continuation_token is None,
                continuation=continuation_token,
                max_item_count=100,
            )
            changes = list(response)
            if changes:
                # Batch upsert to target; collect per-item failures instead of
                # silently counting them as replicated.
                results = await asyncio.gather(
                    *(target_container.upsert_item(item) for item in changes),
                    return_exceptions=True,
                )
                failures = [r for r in results if isinstance(r, Exception)]
                if failures:
                    print(f"{len(failures)} of {len(changes)} upserts failed")
                print(f"Replicated {len(changes) - len(failures)} items")
            # The sync SDK's pager does not expose `.continuation_token`; the
            # change-feed continuation is returned in the last response headers
            # under 'etag' — TODO confirm against the installed azure-cosmos version.
            continuation_token = \
                source_container.client_connection.last_response_headers.get('etag')
            if not changes:
                await asyncio.sleep(5)  # Poll interval

    async def handle_conflicts(self, database_name, container_name):
        """
        Handle conflicts when using multi-master replication.
        """
        conflicts_container = self.source_client \
            .get_database_client(database_name) \
            .get_container_client("conflicts")
        for conflict in conflicts_container.query_items(
            query="SELECT * FROM c",
            enable_cross_partition_query=True,
        ):
            # Custom conflict resolution
            if conflict["operationType"] == "create":
                # Keep the item with the latest timestamp
                await self.resolve_by_timestamp(conflict)
            elif conflict["operationType"] == "replace":
                # Merge changes
                await self.merge_documents(conflict)
            # Delete processed conflict. The sync client's delete_item is not
            # awaitable — the original `await` here raised TypeError at runtime.
            conflicts_container.delete_item(
                conflict["id"],
                partition_key=conflict["partitionKey"],
            )

    async def resolve_by_timestamp(self, conflict):
        """Last-writer-wins: keep whichever side has the larger _ts."""
        current = conflict["content"]
        conflicting = conflict["conflictingResource"]
        winner = current if current["_ts"] > conflicting["_ts"] else conflicting
        target_container = self.target_client \
            .get_database_client(conflict["databaseName"]) \
            .get_container_client(conflict["containerName"])
        await target_container.upsert_item(winner)
Change Feed Estimator
Monitor Change Feed processing lag:
/// <summary>
/// Background service that runs a Change Feed Estimator against the
/// "OrderProcessor" leases and alerts/emits metrics on processing lag.
/// </summary>
public class ChangeFeedMonitor : BackgroundService
{
    private readonly CosmosClient _cosmosClient;
    private readonly ILogger<ChangeFeedMonitor> _logger;
    // TrackMetric is an instance method — the original code called it
    // statically on the TelemetryClient type, which does not compile.
    private readonly TelemetryClient _telemetryClient;

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        var database = _cosmosClient.GetDatabase("ecommerce");
        var sourceContainer = database.GetContainer("orders");
        var leaseContainer = database.GetContainer("leases");

        var estimator = sourceContainer
            .GetChangeFeedEstimatorBuilder(
                "OrderProcessor-Estimator",
                HandleEstimationAsync)
            .WithLeaseContainer(leaseContainer)
            .Build();

        await estimator.StartAsync();
        try
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                await Task.Delay(TimeSpan.FromMinutes(1), stoppingToken);
            }
        }
        catch (OperationCanceledException)
        {
            // Task.Delay throws on host shutdown; expected — fall through.
        }
        finally
        {
            // Always stop the estimator, even on the cancellation path
            // (the original skipped StopAsync when the delay threw).
            await estimator.StopAsync();
        }
    }

    private async Task HandleEstimationAsync(
        long estimatedPendingChanges,
        CancellationToken cancellationToken)
    {
        _logger.LogInformation(
            $"Estimated pending changes: {estimatedPendingChanges}");

        // Alert if lag is too high
        if (estimatedPendingChanges > 10000)
        {
            await SendAlertAsync(
                $"Change Feed lag critical: {estimatedPendingChanges} pending changes");
        }

        // Emit metric for monitoring
        _telemetryClient.TrackMetric("ChangeFeed.PendingChanges", estimatedPendingChanges);
    }
}
Best Practices
- Partition Key Design: Choose partition keys that distribute load evenly
- Idempotent Processing: Design handlers to handle duplicate deliveries
- Error Handling: Implement dead-letter patterns for failed processing
- Monitoring: Use the Change Feed Estimator to track lag
- Lease Management: Monitor lease container for partition distribution
- Batch Processing: Process changes in batches for efficiency
Cosmos DB Change Feed enables powerful event-driven architectures. Combined with Azure Functions or the Change Feed Processor, you can build reactive systems that respond to data changes in real-time.