Implementing the Saga Pattern with Azure Services

The Saga pattern is how you handle distributed transactions (operations that span multiple microservices and need consistency guarantees) without two-phase commit. Two-phase commit holds locks in every participant until the coordinator decides, so throughput drops as services are added, and a coordinator failure can leave participants blocked. A saga instead breaks the operation into a sequence of local transactions, each paired with a compensating transaction that undoes it if a later step fails. The two variants have different trade-offs: orchestration (a central coordinator, which on Azure naturally means Durable Functions) is easier to reason about and debug; choreography (services react to each other's events via Service Bus) is more loosely coupled but harder to trace when things go wrong. I default to orchestration for most implementations because the observability of a Durable Functions orchestration, with its built-in instance history and the monitoring view in the Azure Portal, is worth the coupling trade-off.
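To make the mechanism concrete before any Azure services are involved, here is a minimal in-memory sketch (C# top-level statements): each step is a local action paired with its compensation, the compensations of completed steps go on a stack, and a failure pops and runs them in reverse order. The `Step` and `RunSaga` helpers are hypothetical and nothing here is durable; it only shows the control flow.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, in-memory only: each step pairs a local transaction with the
// compensating action that undoes it. Step names mirror the order flow in this post.
var log = new List<string>();

(string Name, Action Execute, Action Compensate) Step(string name, string undo, bool fails = false) =>
    (name,
     () =>
     {
         if (fails) throw new InvalidOperationException($"{name} failed");
         log.Add(name);
     },
     () => log.Add(undo));

bool RunSaga(IEnumerable<(string Name, Action Execute, Action Compensate)> steps)
{
    var completed = new Stack<Action>(); // compensations for committed steps, newest on top
    try
    {
        foreach (var step in steps)
        {
            step.Execute();
            completed.Push(step.Compensate); // pushed only after the step succeeded
        }
        return true;
    }
    catch (InvalidOperationException)
    {
        // Undo in reverse order; the failed step committed nothing, so it has no entry.
        while (completed.Count > 0) completed.Pop()();
        return false;
    }
}

var succeeded = RunSaga(new[]
{
    Step("CreateOrder", "CancelOrder"),
    Step("ReserveInventory", "ReleaseInventory"),
    Step("ProcessPayment", "RefundPayment", fails: true), // simulated payment failure
    Step("ScheduleShipping", "CancelShipment"),
});

Console.WriteLine($"{succeeded}: {string.Join(" -> ", log)}");
// False: CreateOrder -> ReserveInventory -> ReleaseInventory -> CancelOrder
```

ScheduleShipping never runs, and RefundPayment is not called because the payment step never committed.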

Saga Approaches

Orchestration vs Choreography

Orchestration (Centralized):
┌───────────────────────────────────────────────────┐
│                 Saga Orchestrator                 │
│             (Azure Durable Functions)             │
└────┬─────────────┬─────────────┬─────────────┬────┘
     │             │             │             │
     ▼             ▼             ▼             ▼
┌─────────┐   ┌─────────┐   ┌─────────┐   ┌─────────┐
│  Order  │   │Inventory│   │ Payment │   │Shipping │
│ Service │   │ Service │   │ Service │   │ Service │
└─────────┘   └─────────┘   └─────────┘   └─────────┘

Choreography (Decentralized):
┌─────────┐    ┌─────────┐    ┌─────────┐    ┌─────────┐
│  Order  │───►│Inventory│───►│ Payment │───►│Shipping │
│ Service │◄───│ Service │◄───│ Service │◄───│ Service │
└────┬────┘    └────┬────┘    └────┬────┘    └────┬────┘
     │              │              │              │
     └──────────────┴──────┬───────┴──────────────┘
                           │
                   Azure Service Bus

Orchestration with Durable Functions

Saga Orchestrator

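using Microsoft.Azure.Functions.Worker;
using Microsoft.DurableTask;
using Microsoft.Extensions.Logging;

public class OrderSagaOrchestrator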
{
    [Function("OrderSaga")]
    public static async Task<SagaResult> RunOrchestrator(
        [OrchestrationTrigger] TaskOrchestrationContext context)
    {
        var input = context.GetInput<CreateOrderSagaInput>();
        var logger = context.CreateReplaySafeLogger<OrderSagaOrchestrator>();

        var sagaState = new SagaState();

        try
        {
            // Step 1: Create Order
            logger.LogInformation("Creating order...");
            var orderId = await context.CallActivityAsync<Guid>(
                "CreateOrder",
                input.OrderDetails);
            sagaState.OrderId = orderId;
            sagaState.Steps.Add(SagaStep.OrderCreated);

            // Step 2: Reserve Inventory
            logger.LogInformation("Reserving inventory...");
            var inventoryReservation = await context.CallActivityAsync<InventoryReservation>(
                "ReserveInventory",
                new ReserveInventoryInput(orderId, input.OrderDetails.Items));
            sagaState.InventoryReservationId = inventoryReservation.Id;
            sagaState.Steps.Add(SagaStep.InventoryReserved);

            // Step 3: Process Payment
            logger.LogInformation("Processing payment...");
            var paymentResult = await context.CallActivityAsync<PaymentResult>(
                "ProcessPayment",
                new ProcessPaymentInput(orderId, input.OrderDetails.TotalAmount, input.PaymentInfo));
            sagaState.PaymentTransactionId = paymentResult.TransactionId;
            sagaState.Steps.Add(SagaStep.PaymentProcessed);

            // Step 4: Confirm Order
            logger.LogInformation("Confirming order...");
            await context.CallActivityAsync(
                "ConfirmOrder",
                orderId);
            sagaState.Steps.Add(SagaStep.OrderConfirmed);

            // Step 5: Schedule Shipping
            logger.LogInformation("Scheduling shipping...");
            var shipmentId = await context.CallActivityAsync<Guid>(
                "ScheduleShipping",
                new ScheduleShippingInput(orderId, input.ShippingAddress));
            sagaState.ShipmentId = shipmentId;
            sagaState.Steps.Add(SagaStep.ShippingScheduled);

            return SagaResult.Success(sagaState);
        }
        catch (Exception ex)
        {
            logger.LogError(ex, "Saga failed, starting compensation...");

            // Run compensating transactions
            await CompensateAsync(context, sagaState, logger);

            return SagaResult.Failed(sagaState, ex.Message);
        }
    }

    private static async Task CompensateAsync(
        TaskOrchestrationContext context,
        SagaState state,
        ILogger logger)
    {
        // Compensate in reverse order
        var steps = state.Steps.ToList();
        steps.Reverse();

        foreach (var step in steps)
        {
            try
            {
                switch (step)
                {
                    case SagaStep.ShippingScheduled:
                        logger.LogInformation("Cancelling shipment...");
                        await context.CallActivityAsync(
                            "CancelShipment",
                            state.ShipmentId);
                        break;

                    case SagaStep.OrderConfirmed:
                        logger.LogInformation("Reverting order confirmation...");
                        await context.CallActivityAsync(
                            "RevertOrderConfirmation",
                            state.OrderId);
                        break;

                    case SagaStep.PaymentProcessed:
                        logger.LogInformation("Refunding payment...");
                        await context.CallActivityAsync(
                            "RefundPayment",
                            state.PaymentTransactionId);
                        break;

                    case SagaStep.InventoryReserved:
                        logger.LogInformation("Releasing inventory...");
                        await context.CallActivityAsync(
                            "ReleaseInventory",
                            state.InventoryReservationId);
                        break;

                    case SagaStep.OrderCreated:
                        logger.LogInformation("Cancelling order...");
                        await context.CallActivityAsync(
                            "CancelOrder",
                            state.OrderId);
                        break;
                }
            }
            catch (Exception ex)
            {
                logger.LogError(ex, "Compensation failed for step {Step}", step);
                // Log for manual intervention
            }
        }
    }
}
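One detail the orchestrator relies on implicitly: Durable Functions does not keep the orchestrator in memory between awaits. It replays the function from the start and answers each already-completed `CallActivityAsync` from the instance history, which is why `sagaState` can be an ordinary local variable and why logging goes through `CreateReplaySafeLogger`. The toy model below (hypothetical helpers, not the framework's actual implementation) shows the effect: the orchestrator body runs three times, yet each activity executes once and each log line is written once.

```csharp
using System;
using System.Collections.Generic;

// Toy model of orchestrator replay, not the Durable Task Framework's real code:
// completed activity results are kept in 'history', and every episode re-runs the
// orchestrator from the top, answering calls from history where it can.
var history = new List<string>();   // recorded activity results, in call order
var executed = new List<string>();  // activities that actually ran
var logged = new List<string>();    // lines a replay-safe logger would emit

string RunOrchestrator()
{
    var position = 0;

    // Replay-safe logging: only log once execution has moved past the history.
    void Log(string message)
    {
        if (position >= history.Count) logged.Add(message);
    }

    string CallActivity(string name)
    {
        if (position < history.Count)
            return history[position++]; // replaying: result comes from history

        // New work: run the activity, record its result, and end this episode,
        // much as the real orchestrator stops at an await history cannot satisfy.
        executed.Add(name);
        history.Add($"{name}:ok");
        throw new OperationCanceledException("episode suspended");
    }

    // Same shape as the saga: local state is rebuilt on every replay.
    Log("Creating order...");
    var order = CallActivity("CreateOrder");
    Log("Reserving inventory...");
    var reservation = CallActivity("ReserveInventory");
    return $"{order}, {reservation}";
}

var episodes = 0;
var outcome = "";
while (outcome.Length == 0)
{
    episodes++;
    try { outcome = RunOrchestrator(); }
    catch (OperationCanceledException) { /* suspended; the next episode replays */ }
}

Console.WriteLine($"episodes={episodes} executed={string.Join(",", executed)} logged={logged.Count}");
// episodes=3 executed=CreateOrder,ReserveInventory logged=2
```

The same reasoning is why orchestrator code must be deterministic: anything that differs between replays (such as `DateTime.UtcNow` or `Guid.NewGuid()`) would diverge from the recorded history.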

Activity Functions

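using Microsoft.Azure.Functions.Worker;

// Durable Functions guarantees at-least-once execution for activities, so each
// of these, and especially the compensations, should be safe to run twice.
public class SagaActivities
{
    private readonly IOrderService _orderService;
    private readonly IInventoryService _inventoryService;
    private readonly IPaymentService _paymentService;
    private readonly IShippingService _shippingService;

    public SagaActivities(
        IOrderService orderService,
        IInventoryService inventoryService,
        IPaymentService paymentService,
        IShippingService shippingService)
    {
        _orderService = orderService;
        _inventoryService = inventoryService;
        _paymentService = paymentService;
        _shippingService = shippingService;
    }
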
    [Function("CreateOrder")]
    public async Task<Guid> CreateOrder(
        [ActivityTrigger] OrderDetails orderDetails)
    {
        var order = await _orderService.CreateAsync(orderDetails);
        return order.Id;
    }

    [Function("ReserveInventory")]
    public async Task<InventoryReservation> ReserveInventory(
        [ActivityTrigger] ReserveInventoryInput input)
    {
        return await _inventoryService.ReserveAsync(input.OrderId, input.Items);
    }

    [Function("ReleaseInventory")]
    public async Task ReleaseInventory(
        [ActivityTrigger] Guid reservationId)
    {
        await _inventoryService.ReleaseReservationAsync(reservationId);
    }

    [Function("ProcessPayment")]
    public async Task<PaymentResult> ProcessPayment(
        [ActivityTrigger] ProcessPaymentInput input)
    {
        return await _paymentService.ProcessAsync(
            input.OrderId,
            input.Amount,
            input.PaymentInfo);
    }

    [Function("RefundPayment")]
    public async Task RefundPayment(
        [ActivityTrigger] string transactionId)
    {
        await _paymentService.RefundAsync(transactionId);
    }

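    [Function("ConfirmOrder")]
    public async Task ConfirmOrder(
        [ActivityTrigger] Guid orderId)
    {
        await _orderService.ConfirmAsync(orderId);
    }

    // Compensation for ConfirmOrder. The orchestrator calls this activity by name,
    // so it must exist; RevertConfirmationAsync is a hypothetical IOrderService method.
    [Function("RevertOrderConfirmation")]
    public async Task RevertOrderConfirmation(
        [ActivityTrigger] Guid orderId)
    {
        await _orderService.RevertConfirmationAsync(orderId);
    }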

    [Function("CancelOrder")]
    public async Task CancelOrder(
        [ActivityTrigger] Guid orderId)
    {
        await _orderService.CancelAsync(orderId);
    }

    [Function("ScheduleShipping")]
    public async Task<Guid> ScheduleShipping(
        [ActivityTrigger] ScheduleShippingInput input)
    {
        return await _shippingService.ScheduleAsync(input.OrderId, input.Address);
    }

    [Function("CancelShipment")]
    public async Task CancelShipment(
        [ActivityTrigger] Guid shipmentId)
    {
        await _shippingService.CancelAsync(shipmentId);
    }
}
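Because activities are guaranteed to run at least once, an activity can run again if a worker crashes after doing its work but before the result is recorded. Compensations such as `RefundPayment` should therefore be idempotent. A minimal sketch, assuming a dedupe record keyed by transaction ID; `processedRefunds` and `refundsIssued` are hypothetical in-memory stand-ins for a durable table and the payment provider. In a real system the dedupe record and the refund must commit together (or the provider must accept an idempotency key), because this sketch ignores a crash between the two.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-ins: 'refundsIssued' plays the payment provider, and
// 'processedRefunds' plays a durable dedupe table keyed by transaction ID.
var refundsIssued = new List<string>();
var processedRefunds = new HashSet<string>();

void RefundPayment(string transactionId)
{
    // HashSet.Add returns false if the key is already present: a repeat delivery.
    if (!processedRefunds.Add(transactionId))
    {
        Console.WriteLine($"Refund for {transactionId} already issued; skipping");
        return;
    }
    refundsIssued.Add(transactionId);
}

// The same activity input delivered twice, e.g. after a worker restart.
RefundPayment("txn-123");
RefundPayment("txn-123");

Console.WriteLine(refundsIssued.Count); // 1
```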

Choreography with Service Bus

Event-Driven Saga

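using Microsoft.Azure.Functions.Worker;

// Order Service - Starts the saga
public class OrderService
{
    private readonly IEventPublisher _eventPublisher;
    private readonly IOrderRepository _orderRepository;

    public OrderService(IEventPublisher eventPublisher, IOrderRepository orderRepository)
    {
        _eventPublisher = eventPublisher;
        _orderRepository = orderRepository;
    }
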
    public async Task<Order> CreateOrderAsync(CreateOrderRequest request)
    {
        var order = Order.Create(request.CustomerId, request.Items);
        order.SetStatus(OrderStatus.Pending);

        await _orderRepository.SaveAsync(order);

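        // Publish the event that starts the saga. Saving and publishing are two
        // separate writes: if the publish fails after the save, the order stays
        // Pending with no saga running.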
        await _eventPublisher.PublishAsync(new OrderCreatedEvent
        {
            OrderId = order.Id,
            CustomerId = order.CustomerId,
            Items = order.Items.Select(i => new OrderItemDto
            {
                ProductId = i.ProductId,
                Quantity = i.Quantity,
                UnitPrice = i.UnitPrice
            }).ToList(),
            TotalAmount = order.TotalAmount,
            CorrelationId = Guid.NewGuid().ToString()
        });

        return order;
    }
}

// Inventory Service - Handles order created event
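public class InventoryEventHandler
{
    private readonly IInventoryService _inventoryService;
    private readonly IEventPublisher _eventPublisher;

    public InventoryEventHandler(IInventoryService inventoryService, IEventPublisher eventPublisher)
    {
        _inventoryService = inventoryService;
        _eventPublisher = eventPublisher;
    }
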
    [Function("HandleOrderCreated")]
    public async Task HandleOrderCreated(
        [ServiceBusTrigger("order-events", "inventory-handler")]
        OrderCreatedEvent @event)
    {
        try
        {
            var reservation = await _inventoryService.ReserveAsync(
                @event.OrderId,
                @event.Items);

            await _eventPublisher.PublishAsync(new InventoryReservedEvent
            {
                OrderId = @event.OrderId,
                ReservationId = reservation.Id,
                CorrelationId = @event.CorrelationId
            });
        }
        catch (InsufficientInventoryException ex)
        {
            await _eventPublisher.PublishAsync(new InventoryReservationFailedEvent
            {
                OrderId = @event.OrderId,
                Reason = ex.Message,
                CorrelationId = @event.CorrelationId
            });
        }
    }

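    // payment-events also carries PaymentProcessedEvent, so the inventory-compensation
    // subscription needs a filter (e.g. a correlation filter on the message subject)
    // that only passes PaymentFailedEvent; otherwise this handler would also
    // receive successful payments.
    [Function("HandlePaymentFailed")]
    public async Task HandlePaymentFailed(
        [ServiceBusTrigger("payment-events", "inventory-compensation")]
        PaymentFailedEvent @event)
    {
        // Compensating transaction: release the reservation by its ID, as in the
        // orchestration version. Assumes PaymentFailedEvent carries ReservationId.
        await _inventoryService.ReleaseReservationAsync(@event.ReservationId);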

        await _eventPublisher.PublishAsync(new InventoryReleasedEvent
        {
            OrderId = @event.OrderId,
            CorrelationId = @event.CorrelationId
        });
    }
}

// Payment Service - Handles inventory reserved event
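public class PaymentEventHandler
{
    private readonly IOrderService _orderService;
    private readonly IPaymentService _paymentService;
    private readonly IEventPublisher _eventPublisher;

    public PaymentEventHandler(
        IOrderService orderService,
        IPaymentService paymentService,
        IEventPublisher eventPublisher)
    {
        _orderService = orderService;
        _paymentService = paymentService;
        _eventPublisher = eventPublisher;
    }
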
    [Function("HandleInventoryReserved")]
    public async Task HandleInventoryReserved(
        [ServiceBusTrigger("inventory-events", "payment-handler")]
        InventoryReservedEvent @event)
    {
        try
        {
            var order = await _orderService.GetByIdAsync(@event.OrderId);
            var result = await _paymentService.ProcessAsync(order);

            await _eventPublisher.PublishAsync(new PaymentProcessedEvent
            {
                OrderId = @event.OrderId,
                TransactionId = result.TransactionId,
                CorrelationId = @event.CorrelationId
            });
        }
        catch (PaymentFailedException ex)
        {
            await _eventPublisher.PublishAsync(new PaymentFailedEvent
            {
                OrderId = @event.OrderId,
                ReservationId = @event.ReservationId, // lets inventory release the right reservation
                Reason = ex.Message,
                CorrelationId = @event.CorrelationId
            });
        }
    }
}

// Order Service - Final handler
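public class OrderCompletionHandler
{
    private readonly IOrderService _orderService;

    public OrderCompletionHandler(IOrderService orderService) => _orderService = orderService;

    // payment-events carries both payment outcomes, so the order-completion
    // subscription needs a filter that only passes PaymentProcessedEvent.
    [Function("HandlePaymentProcessed")]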
    public async Task HandlePaymentProcessed(
        [ServiceBusTrigger("payment-events", "order-completion")]
        PaymentProcessedEvent @event)
    {
        await _orderService.ConfirmAsync(@event.OrderId);
    }

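    // None of the handlers above publishes SagaFailedEvent yet: the failure paths
    // (InventoryReservationFailedEvent, and InventoryReleasedEvent after a payment
    // failure) would need to emit it to the saga-events topic for this to run.
    [Function("HandleSagaFailed")]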
    public async Task HandleSagaFailed(
        [ServiceBusTrigger("saga-events", "order-cancellation")]
        SagaFailedEvent @event)
    {
        await _orderService.CancelAsync(@event.OrderId, @event.Reason);
    }
}
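The handlers only work if each subscription sees the right event types, since `payment-events` and `inventory-events` each carry more than one. The in-memory sketch below models topics with subject-filtered subscriptions and traces a payment failure through the chain. Everything here is hypothetical (no Service Bus SDK is used), and for brevity the order service cancels on `InventoryReleased` rather than on a separate `SagaFailedEvent`.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory stand-in for Service Bus topics. Each subscription filters
// on the message subject (the event type), as a correlation filter would, so each
// handler only receives the event type it was written for.
var subscriptions = new List<(string Topic, string Subject, Action<(string OrderId, string ReservationId)> Handle)>();
var trace = new List<string>();
var released = new List<string>();
var paymentSucceeds = false; // set to true to see the happy path instead

void Subscribe(string topic, string subject, Action<(string OrderId, string ReservationId)> handle) =>
    subscriptions.Add((topic, subject, handle));

void Publish(string topic, string subject, (string OrderId, string ReservationId) message)
{
    trace.Add(subject);
    foreach (var s in subscriptions)
        if (s.Topic == topic && s.Subject == subject) s.Handle(message);
}

// Inventory service: reserve on OrderCreated, compensate on PaymentFailed.
Subscribe("order-events", "OrderCreated", m =>
    Publish("inventory-events", "InventoryReserved", (m.OrderId, "res-1")));
Subscribe("payment-events", "PaymentFailed", m =>
{
    released.Add(m.ReservationId); // compensating transaction
    Publish("inventory-events", "InventoryReleased", m);
});

// Payment service: charge once inventory is reserved.
Subscribe("inventory-events", "InventoryReserved", m =>
    Publish("payment-events", paymentSucceeds ? "PaymentProcessed" : "PaymentFailed", m));

// Order service: confirm on success, cancel once inventory has been released.
Subscribe("payment-events", "PaymentProcessed", m => trace.Add($"Confirm {m.OrderId}"));
Subscribe("inventory-events", "InventoryReleased", m => trace.Add($"Cancel {m.OrderId}"));

Publish("order-events", "OrderCreated", ("order-42", ""));
Console.WriteLine(string.Join(" -> ", trace));
// OrderCreated -> InventoryReserved -> PaymentFailed -> InventoryReleased -> Cancel order-42
```

Because the PaymentProcessed subscription filters on subject, the failed payment never reaches the confirmation handler, even though both events share the `payment-events` topic.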

Saga State Management

Saga State Store

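using System.Net;
using Microsoft.Azure.Cosmos;

// Assumes the container's serializer writes camelCase property names ("id",
// "status", "updatedAt"), e.g. CosmosSerializationOptions with
// CosmosPropertyNamingPolicy.CamelCase, and writes SagaStatus as a string
// (e.g. a StringEnumConverter on the property). The query below depends on both.
public class SagaStateStore
{
    private readonly Container _container;

    public SagaStateStore(Container container) => _container = container;

    public async Task<SagaState> GetOrCreateAsync(string sagaId)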
    {
        try
        {
            var response = await _container.ReadItemAsync<SagaState>(
                sagaId,
                new PartitionKey(sagaId));
            return response.Resource;
        }
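        catch (CosmosException ex) when (ex.StatusCode == HttpStatusCode.NotFound)
        {
            var now = DateTime.UtcNow;
            var state = new SagaState
            {
                Id = sagaId,
                Status = SagaStatus.Started,
                StartedAt = now,
                UpdatedAt = now // so the stuck-saga query has a timestamp to compare
            };
            try
            {
                await _container.CreateItemAsync(state, new PartitionKey(sagaId));
                return state;
            }
            catch (CosmosException conflict) when (conflict.StatusCode == HttpStatusCode.Conflict)
            {
                // Another instance created it between our read and create; use theirs.
                var existing = await _container.ReadItemAsync<SagaState>(
                    sagaId,
                    new PartitionKey(sagaId));
                return existing.Resource;
            }
        }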
    }

    public async Task UpdateAsync(SagaState state)
    {
        state.UpdatedAt = DateTime.UtcNow;
        await _container.UpsertItemAsync(state, new PartitionKey(state.Id));
    }

    public async Task<IEnumerable<SagaState>> GetStuckSagasAsync(TimeSpan timeout)
    {
        var cutoff = DateTime.UtcNow.Subtract(timeout);

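        // Include Started as well as InProgress: a saga that never got past its
        // first step would otherwise never be picked up.
        var query = new QueryDefinition(
            "SELECT * FROM c WHERE c.status IN (@started, @inProgress) AND c.updatedAt < @cutoff")
            .WithParameter("@started", SagaStatus.Started.ToString())
            .WithParameter("@inProgress", SagaStatus.InProgress.ToString())
            .WithParameter("@cutoff", cutoff);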

        var results = new List<SagaState>();
        using var iterator = _container.GetItemQueryIterator<SagaState>(query);

        while (iterator.HasMoreResults)
        {
            var response = await iterator.ReadNextAsync();
            results.AddRange(response);
        }

        return results;
    }
}
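`UpdateAsync` uses `UpsertItemAsync`, so the last writer wins. In the choreography variant, two handlers can update the same saga document at nearly the same time, and one update is silently lost. Cosmos DB supports optimistic concurrency through ETags: pass the ETag you read to `ReplaceItemAsync` via `ItemRequestOptions.IfMatchEtag`, and the write fails with 412 Precondition Failed if the document changed in between. The in-memory analogue below uses an integer version in place of the ETag; `TryReplace` and the status values `Compensating` and `Completed` are hypothetical.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory analogue of a container with ETags. The integer version
// stands in for the ETag Cosmos DB returns with every read.
var store = new Dictionary<string, (string Status, int ETag)>();

// Returns false where the real conditional replace would fail with a 412.
bool TryReplace(string id, string newStatus, int expectedETag)
{
    if (store[id].ETag != expectedETag) return false; // someone else wrote first
    store[id] = (newStatus, expectedETag + 1);
    return true;
}

store["saga-1"] = ("InProgress", 1);

// Two handlers read the same version of the saga, then both try to write.
var readByA = store["saga-1"];
var readByB = store["saga-1"];
var firstWrite = TryReplace("saga-1", "Compensating", readByA.ETag);
var secondWrite = TryReplace("saga-1", "Completed", readByB.ETag);

var finalStatus = store["saga-1"].Status;
Console.WriteLine($"{firstWrite} {secondWrite} {finalStatus}"); // True False Compensating
```

On a lost race, the handler should re-read the document and re-apply its change to the fresh state rather than overwrite it.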

Saga Recovery Service

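using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

// BackgroundService with PeriodicTimer (.NET 6+) instead of System.Threading.Timer
// with an async void callback: runs cannot overlap, failures stay observable, and
// the loop stops cleanly when the host shuts down.
public class SagaRecoveryService : BackgroundService
{
    private static readonly TimeSpan ScanInterval = TimeSpan.FromMinutes(5);
    private static readonly TimeSpan StuckAfter = TimeSpan.FromMinutes(15);

    private readonly ISagaStateStore _stateStore;
    private readonly ISagaOrchestrator _orchestrator;
    private readonly ILogger<SagaRecoveryService> _logger;

    public SagaRecoveryService(
        ISagaStateStore stateStore,
        ISagaOrchestrator orchestrator,
        ILogger<SagaRecoveryService> logger)
    {
        _stateStore = stateStore;
        _orchestrator = orchestrator;
        _logger = logger;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        using var timer = new PeriodicTimer(ScanInterval);

        // The next tick is awaited only after the previous scan has finished.
        while (await timer.WaitForNextTickAsync(stoppingToken))
        {
            await RecoverStuckSagasAsync();
        }
    }

    private async Task RecoverStuckSagasAsync()
    {
        try
        {
            var stuckSagas = await _stateStore.GetStuckSagasAsync(StuckAfter);

            foreach (var saga in stuckSagas)
            {
                _logger.LogWarning("Recovering stuck saga {SagaId}", saga.Id);

                try
                {
                    await _orchestrator.CompensateAsync(saga);
                }
                catch (Exception ex)
                {
                    // One failed recovery should not stop the rest of the batch.
                    _logger.LogError(ex, "Failed to recover saga {SagaId}", saga.Id);
                }
            }
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error in saga recovery service");
        }
    }
}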
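The recovery loop is only as good as its rule for deciding which sagas are stuck. Pulling that rule out as a pure function with the current time passed in lets you test it without waiting 15 minutes. A sketch mirroring the `GetStuckSagasAsync` query, with hypothetical snapshot tuples; counting `Started` as unfinished (not only `InProgress`) is an assumption, made so that a saga that never progressed past its first step is still caught.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// The selection rule behind the recovery pass, as a pure function with the clock
// passed in. Snapshots are hypothetical (Id, Status, UpdatedAt) tuples.
var unfinished = new HashSet<string> { "Started", "InProgress" };

List<string> FindStuckSagas(
    IEnumerable<(string Id, string Status, DateTime UpdatedAt)> sagas,
    DateTime asOf,
    TimeSpan timeout) =>
    sagas
        .Where(s => unfinished.Contains(s.Status) && s.UpdatedAt < asOf - timeout)
        .Select(s => s.Id)
        .ToList();

var clock = new DateTime(2024, 1, 1, 12, 0, 0, DateTimeKind.Utc);
var snapshots = new[]
{
    ("saga-1", "InProgress", clock.AddMinutes(-20)), // stuck
    ("saga-2", "InProgress", clock.AddMinutes(-5)),  // still within the timeout
    ("saga-3", "Completed", clock.AddHours(-3)),     // old, but finished
    ("saga-4", "Started", clock.AddMinutes(-16)),    // never got past its first step
};

var stuck = FindStuckSagas(snapshots, clock, TimeSpan.FromMinutes(15));
Console.WriteLine(string.Join(", ", stuck)); // saga-1, saga-4
```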

Monitoring and Observability

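using Microsoft.ApplicationInsights;

// If these methods are called from an orchestrator function, guard the calls with
// !context.IsReplaying (or make them from activities); otherwise every replay of
// the orchestration emits the events again.
public class SagaTelemetry
{
    private readonly TelemetryClient _telemetryClient;

    public SagaTelemetry(TelemetryClient telemetryClient) => _telemetryClient = telemetryClient;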

    public void TrackSagaStarted(string sagaId, string sagaType)
    {
        _telemetryClient.TrackEvent("SagaStarted", new Dictionary<string, string>
        {
            ["SagaId"] = sagaId,
            ["SagaType"] = sagaType
        });
    }

    public void TrackSagaCompleted(string sagaId, TimeSpan duration, bool success)
    {
        _telemetryClient.TrackEvent("SagaCompleted", new Dictionary<string, string>
        {
            ["SagaId"] = sagaId,
            ["Success"] = success.ToString()
        }, new Dictionary<string, double>
        {
            ["DurationMs"] = duration.TotalMilliseconds
        });
    }

    public void TrackCompensation(string sagaId, string step, bool success)
    {
        _telemetryClient.TrackEvent("SagaCompensation", new Dictionary<string, string>
        {
            ["SagaId"] = sagaId,
            ["Step"] = step,
            ["Success"] = success.ToString()
        });
    }
}

Conclusion

The Saga pattern keeps data eventually consistent across services without distributed locks, by pairing each local transaction with a compensating one. Azure Durable Functions provide orchestration with durable state and built-in retry policies for activity calls. For event-driven architectures, Service Bus topics and subscriptions enable reliable choreography. Choose orchestration for complex flows that need centralized control, and choreography for loosely coupled, event-driven systems.

Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.