Back to Blog
6 min read

Building Reliable Queue-Triggered Azure Functions

Introduction

Queue-triggered Azure Functions provide a reliable pattern for processing messages asynchronously. Whether using Azure Storage Queues or Service Bus, queue triggers enable you to build scalable, resilient systems that handle variable loads gracefully.

In this post, we will explore best practices for building queue-triggered functions.

Storage Queue Trigger

Basic queue-triggered function:

using System;
using System.Threading.Tasks;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.Logging;

/// <summary>
/// Storage Queue triggered functions that process order messages.
/// </summary>
public class QueueFunctions
{
    private readonly ILogger<QueueFunctions> _logger;
    private readonly IOrderService _orderService;

    /// <summary>
    /// Stores the injected logger and order service for use by the functions below.
    /// </summary>
    public QueueFunctions(
        ILogger<QueueFunctions> logger,
        IOrderService orderService)
    {
        _orderService = orderService;
        _logger = logger;
    }

    /// <summary>
    /// Looks up the order named in a message from the "orders" queue and processes it.
    /// </summary>
    /// <param name="message">Queue payload bound by the trigger.</param>
    /// <remarks>
    /// A missing order is logged and the invocation ends normally. Any exception is
    /// logged and rethrown so the invocation is reported as failed.
    /// </remarks>
    [Function("ProcessOrder")]
    public async Task ProcessOrder(
        [QueueTrigger("orders", Connection = "StorageConnection")] OrderMessage message)
    {
        var orderId = message.OrderId;

        _logger.LogInformation("Processing order {OrderId}", orderId);

        try
        {
            var existingOrder = await _orderService.GetOrderAsync(orderId);

            if (existingOrder != null)
            {
                await _orderService.ProcessAsync(existingOrder);

                _logger.LogInformation("Order {OrderId} processed successfully", orderId);
            }
            else
            {
                _logger.LogWarning("Order {OrderId} not found", orderId);
            }
        }
        catch (Exception ex)
        {
            // Transient failures are expected to clear on retry, so they are only a
            // warning; everything else is an error. Both are rethrown unchanged.
            if (ex is TransientException)
            {
                _logger.LogWarning(ex, "Transient error processing order {OrderId}", orderId);
            }
            else
            {
                _logger.LogError(ex, "Failed to process order {OrderId}", orderId);
            }

            throw; // Retry; goes to the poison queue after max retries
        }
    }
}

/// <summary>
/// Payload bound by the order queue triggers in this article.
/// </summary>
public class OrderMessage
{
    /// <summary>Identifier used to look up and log the order.</summary>
    public string OrderId { get; set; }
    /// <summary>Identifier of the customer the order belongs to.</summary>
    public string CustomerId { get; set; }
    /// <summary>Creation timestamp carried with the message.</summary>
    public DateTime CreatedAt { get; set; }
    /// <summary>Priority label carried with the message; not read by the code shown here.</summary>
    public string Priority { get; set; }
}

Service Bus Queue Trigger

Using Service Bus for advanced scenarios:

using Azure.Messaging.ServiceBus;

/// <summary>
/// Service Bus triggered functions for the "payments" queue and its dead-letter queue.
/// Every message is settled explicitly through <see cref="ServiceBusMessageActions"/>.
/// </summary>
/// <remarks>
/// NOTE(review): explicit settlement presumably requires auto-complete to be disabled
/// for these triggers — confirm against the host configuration.
/// </remarks>
public class ServiceBusFunctions
{
    // Delivery attempt at which this code dead-letters a payment that keeps throwing.
    private const int MaxDeliveryAttempts = 5;

    private readonly ILogger<ServiceBusFunctions> _logger;
    private readonly IPaymentProcessor _paymentProcessor;

    /// <summary>
    /// Stores the injected logger and payment processor.
    /// </summary>
    public ServiceBusFunctions(
        ILogger<ServiceBusFunctions> logger,
        IPaymentProcessor paymentProcessor)
    {
        _logger = logger;
        _paymentProcessor = paymentProcessor;
    }

    /// <summary>
    /// Processes one payment message and settles it according to the processor's result:
    /// complete on success, abandon when the result asks for a retry, dead-letter otherwise.
    /// </summary>
    /// <param name="message">The received Service Bus message.</param>
    /// <param name="messageActions">Settlement operations for <paramref name="message"/>.</param>
    [Function("ProcessPayment")]
    public async Task ProcessPayment(
        [ServiceBusTrigger("payments", Connection = "ServiceBusConnection")]
        ServiceBusReceivedMessage message,
        ServiceBusMessageActions messageActions)
    {
        var payment = TryDeserializePayment(message);

        if (payment is null)
        {
            // Redelivering a body that cannot be parsed will never succeed, so it is
            // dead-lettered immediately instead of being retried.
            await messageActions.DeadLetterMessageAsync(message,
                deadLetterReason: "InvalidPayload",
                deadLetterErrorDescription: "Message body could not be deserialized into a PaymentMessage.");
            return;
        }

        _logger.LogInformation(
            "Processing payment {PaymentId}, attempt {DeliveryCount}",
            payment.PaymentId,
            message.DeliveryCount);

        try
        {
            var result = await _paymentProcessor.ProcessAsync(payment);

            if (result.Success)
            {
                await messageActions.CompleteMessageAsync(message);
                _logger.LogInformation("Payment {PaymentId} completed", payment.PaymentId);
            }
            else if (result.ShouldRetry)
            {
                // Abandon hands the message back for another delivery and records why.
                // No delay is requested by this call.
                await messageActions.AbandonMessageAsync(message, new Dictionary<string, object>
                {
                    ["RetryReason"] = result.ErrorMessage
                });
            }
            else
            {
                // Dead letter for manual investigation
                await messageActions.DeadLetterMessageAsync(message,
                    deadLetterReason: "ProcessingFailed",
                    deadLetterErrorDescription: result.ErrorMessage);
            }
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error processing payment {PaymentId}", payment.PaymentId);

            if (message.DeliveryCount >= MaxDeliveryAttempts)
            {
                await messageActions.DeadLetterMessageAsync(message,
                    deadLetterReason: "MaxRetriesExceeded",
                    deadLetterErrorDescription: ex.Message);
            }
            else
            {
                throw; // Let Service Bus handle retry
            }
        }
    }

    /// <summary>
    /// Drains the dead-letter queue of "payments": records each message, notifies support,
    /// then completes it so it is removed from the dead-letter queue.
    /// </summary>
    /// <param name="message">The dead-lettered message.</param>
    /// <param name="messageActions">Settlement operations for <paramref name="message"/>.</param>
    [Function("ProcessDeadLetterPayments")]
    public async Task ProcessDeadLetterPayments(
        [ServiceBusTrigger("payments/$DeadLetterQueue", Connection = "ServiceBusConnection")]
        ServiceBusReceivedMessage message,
        ServiceBusMessageActions messageActions)
    {
        // An unreadable body is a likely reason for a message to be here, so a failed
        // parse must not stop the message from being recorded and completed.
        var payment = TryDeserializePayment(message);

        _logger.LogWarning(
            "Processing dead letter payment {PaymentId}, reason: {Reason}",
            payment?.PaymentId,
            message.DeadLetterReason);

        // Log for investigation
        // NOTE(review): payment is null when the body is unreadable — confirm that
        // LogDeadLetterAsync and NotifySupportAsync tolerate that.
        await LogDeadLetterAsync(payment, message);

        // Notify support team
        await NotifySupportAsync(payment, message);

        // Complete to remove from dead letter queue
        await messageActions.CompleteMessageAsync(message);
    }

    // Parses the message body as a PaymentMessage; returns null (after logging) when the
    // body is not valid JSON for that type.
    private PaymentMessage TryDeserializePayment(ServiceBusReceivedMessage message)
    {
        try
        {
            return message.Body.ToObjectFromJson<PaymentMessage>();
        }
        catch (System.Text.Json.JsonException ex)
        {
            _logger.LogError(ex, "Could not deserialize payment message {MessageId}", message.MessageId);
            return null;
        }
    }
}

Batch Processing

Process multiple messages in batches:

/// <summary>
/// Processes a group of order messages concurrently and reports a per-batch summary.
/// </summary>
/// <remarks>
/// NOTE(review): confirm that the Storage Queue trigger in the extension version in use
/// can bind to <c>QueueMessage[]</c>; the binding may only deliver one message per invocation.
/// </remarks>
public class BatchProcessingFunctions
{
    /// <summary>
    /// Deserializes and processes every message in <paramref name="messages"/> in parallel,
    /// then logs how many succeeded and one error entry per failed message.
    /// </summary>
    /// <param name="messages">Messages bound by the trigger.</param>
    /// <param name="context">Invocation context, used to obtain a logger.</param>
    [Function("ProcessOrderBatch")]
    public async Task ProcessOrderBatch(
        [QueueTrigger("orders-batch", Connection = "StorageConnection")]
        QueueMessage[] messages,
        FunctionContext context)
    {
        var logger = context.GetLogger<BatchProcessingFunctions>();

        logger.LogInformation("Processing batch of {Count} orders", messages.Length);

        var results = await Task.WhenAll(messages.Select(ProcessMessageAsync));

        // Materialize once; the list feeds both the summary counts and the error log.
        var failures = results.Where(r => !r.Success).ToList();

        logger.LogInformation(
            "Batch complete: {Succeeded} succeeded, {Failed} failed",
            results.Length - failures.Count, failures.Count);

        // Handle failures
        // NOTE(review): failures are only logged and the invocation still ends normally;
        // decide whether a failed message should instead fail the invocation so it is retried.
        foreach (var failure in failures)
        {
            logger.LogError(
                failure.Error,
                "Failed to process order in batch (message {MessageId}, order {OrderId})",
                failure.MessageId,
                failure.Order?.OrderId);
        }
    }

    // Processes one message and captures the outcome instead of throwing, so a single bad
    // message does not abort the rest. The message id is kept for the failure log.
    private async Task<(string MessageId, OrderMessage Order, bool Success, Exception Error)> ProcessMessageAsync(
        QueueMessage message)
    {
        OrderMessage order = null;

        try
        {
            // Deserialize returns null for a literal JSON null; treat that as a bad payload.
            order = JsonSerializer.Deserialize<OrderMessage>(message.MessageText)
                ?? throw new JsonException("Message body deserialized to null.");

            await ProcessSingleOrderAsync(order);
            return (message.MessageId, order, true, null);
        }
        catch (Exception ex)
        {
            return (message.MessageId, order, false, ex);
        }
    }
}

// host.json queue settings: batchSize is how many messages the runtime fetches at once and processes in parallel
{
    "version": "2.0",
    "extensions": {
        "queues": {
            "maxPollingInterval": "00:00:02",
            "visibilityTimeout": "00:05:00",
            "maxDequeueCount": 5,
            "batchSize": 16,
            "newBatchThreshold": 8
        }
    }
}

Poison Message Handling

Handle messages that repeatedly fail:

/// <summary>
/// Handles messages that arrive on the "orders-poison" queue: records each one in table
/// storage, raises an alert, and makes one recovery attempt when the payload is readable.
/// </summary>
public class PoisonMessageHandler
{
    private readonly ILogger<PoisonMessageHandler> _logger;
    private readonly IAlertService _alertService;
    private readonly TableClient _poisonLogTable;

    /// <summary>
    /// Stores the injected dependencies. Without this constructor the readonly fields
    /// would never be assigned and the first use would throw.
    /// </summary>
    /// <exception cref="ArgumentNullException">Any argument is null.</exception>
    public PoisonMessageHandler(
        ILogger<PoisonMessageHandler> logger,
        IAlertService alertService,
        TableClient poisonLogTable)
    {
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
        _alertService = alertService ?? throw new ArgumentNullException(nameof(alertService));
        _poisonLogTable = poisonLogTable ?? throw new ArgumentNullException(nameof(poisonLogTable));
    }

    /// <summary>
    /// Records a poison message, alerts on it, and attempts recovery if it can be parsed.
    /// </summary>
    /// <param name="message">The poison queue message.</param>
    /// <param name="context">Invocation context (not used by this method).</param>
    [Function("ProcessPoisonOrders")]
    public async Task ProcessPoisonOrders(
        [QueueTrigger("orders-poison", Connection = "StorageConnection")]
        QueueMessage message,
        FunctionContext context)
    {
        _logger.LogWarning("Processing poison message: {MessageId}", message.MessageId);

        // Parse the original message; an unreadable body is still logged and alerted on.
        OrderMessage order;
        try
        {
            order = JsonSerializer.Deserialize<OrderMessage>(message.MessageText);
        }
        catch (JsonException ex)
        {
            _logger.LogError(ex, "Could not deserialize poison message {MessageId}", message.MessageId);
            order = null;
        }

        // Log to table storage for investigation.
        // Upsert rather than add: if a later step throws and this invocation is retried,
        // the row for this MessageId already exists and an add would fail on the conflict.
        await _poisonLogTable.UpsertEntityAsync(new PoisonMessageLog
        {
            // Invariant culture keeps the partition key format independent of host locale.
            PartitionKey = DateTime.UtcNow.ToString(
                "yyyy-MM-dd", System.Globalization.CultureInfo.InvariantCulture),
            RowKey = message.MessageId,
            QueueName = "orders",
            MessageContent = message.MessageText,
            InsertedTime = message.InsertedOn,
            DequeueCount = message.DequeueCount,
            OrderId = order?.OrderId
        });

        // Send alert
        await _alertService.SendAlertAsync(new Alert
        {
            Severity = AlertSeverity.High,
            Title = "Poison Message in Orders Queue",
            Description = $"Order {order?.OrderId ?? "unknown"} failed after max retries",
            Metadata = new Dictionary<string, string>
            {
                ["MessageId"] = message.MessageId,
                ["DequeueCount"] = message.DequeueCount.ToString()
            }
        });

        // Optionally attempt manual recovery. A failure here is logged and swallowed on
        // purpose: the message has already been recorded and alerted on.
        if (order != null)
        {
            try
            {
                await AttemptManualRecoveryAsync(order);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Manual recovery failed for order {OrderId}", order.OrderId);
            }
        }
    }
}

Scaling Configuration

Configure function scaling for queues:

// host.json
{
    "version": "2.0",
    "extensions": {
        "queues": {
            "maxPollingInterval": "00:00:02",
            "visibilityTimeout": "00:05:00",
            "batchSize": 16,
            "maxDequeueCount": 5,
            "newBatchThreshold": 8,
            "messageEncoding": "base64"
        },
        "serviceBus": {
            "prefetchCount": 100,
            "autoCompleteMessages": false,
            "maxConcurrentCalls": 16,
            "maxAutoLockRenewalDuration": "00:25:00",
            "sessionIdleTimeout": "00:00:30",
            "maxConcurrentSessions": 16,
            "maxConcurrentCallsPerSession": 1,
            "maxMessageBatchSize": 100,
            "clientRetryOptions": {
                "tryTimeout": "00:01:00"
            }
        }
    },
    "functionTimeout": "00:10:00"
}

Output Bindings

Chain queue processing with output bindings:

/// <summary>
/// Two queue-triggered steps chained through output bindings:
/// "new-orders" → "validated-orders" → "completed-orders".
/// </summary>
public class ChainedProcessing
{
    /// <summary>
    /// Validates an order from "new-orders" and returns it for the "validated-orders" queue.
    /// </summary>
    /// <param name="order">The incoming order message.</param>
    /// <param name="context">Invocation context, used to obtain a logger.</param>
    /// <returns>The validated order, or null when validation fails.</returns>
    [Function("ValidateOrder")]
    [QueueOutput("validated-orders", Connection = "StorageConnection")]
    public async Task<ValidatedOrder> ValidateOrder(
        [QueueTrigger("new-orders", Connection = "StorageConnection")]
        OrderMessage order,
        FunctionContext context)
    {
        var log = context.GetLogger<ChainedProcessing>();

        log.LogInformation("Validating order {OrderId}", order.OrderId);

        var outcome = await ValidateAsync(order);

        if (outcome.IsValid)
        {
            // The return value is what the output binding receives.
            return new ValidatedOrder
            {
                OrderId = order.OrderId,
                CustomerId = order.CustomerId,
                ValidatedAt = DateTime.UtcNow,
                ValidationScore = outcome.Score
            };
        }

        log.LogWarning("Order {OrderId} failed validation: {Errors}",
            order.OrderId, string.Join(", ", outcome.Errors));

        // Nothing in this method writes to an error queue; a failed order is only logged.
        // NOTE(review): presumably a null return makes the output binding emit no message — confirm.
        return null;
    }

    /// <summary>
    /// Processes an order from "validated-orders" and returns the completion record for
    /// the "completed-orders" queue.
    /// </summary>
    /// <param name="order">The validated order message.</param>
    /// <param name="context">Invocation context, used to obtain a logger.</param>
    /// <returns>The completed order carrying the confirmation number.</returns>
    [Function("ProcessValidatedOrder")]
    [QueueOutput("completed-orders", Connection = "StorageConnection")]
    public async Task<CompletedOrder> ProcessValidatedOrder(
        [QueueTrigger("validated-orders", Connection = "StorageConnection")]
        ValidatedOrder order,
        FunctionContext context)
    {
        var log = context.GetLogger<ChainedProcessing>();

        log.LogInformation("Processing validated order {OrderId}", order.OrderId);

        var processed = await ProcessAsync(order);

        var completed = new CompletedOrder
        {
            OrderId = order.OrderId,
            ProcessedAt = DateTime.UtcNow,
            ConfirmationNumber = processed.ConfirmationNumber
        };

        return completed;
    }
}

Monitoring Queue Functions

Monitor queue depth and processing:

from azure.storage.queue import QueueServiceClient
from azure.mgmt.monitor import MonitorManagementClient

def monitor_queue_health(storage_connection, queue_name, alert_threshold=1000):
    """Report a queue's approximate depth and alert when it is backing up.

    Args:
        storage_connection: Storage account connection string.
        queue_name: Name of the queue to inspect.
        alert_threshold: Depth above which an alert is sent. Defaults to 1000,
            the value that was previously hard-coded.

    Returns:
        The approximate message count reported by the queue properties.
    """

    queue_service = QueueServiceClient.from_connection_string(storage_connection)
    queue_client = queue_service.get_queue_client(queue_name)

    message_count = queue_client.get_queue_properties().approximate_message_count

    print(f"Queue {queue_name}: {message_count} messages")

    # Alert if queue is backing up. Guard against a missing count so the
    # comparison cannot raise TypeError on None.
    if message_count is not None and message_count > alert_threshold:
        send_alert(f"Queue {queue_name} has {message_count} messages")

    return message_count

def get_function_metrics(function_app_name, function_name, resource_group="rg-functions"):
    """Print recent execution metrics for a function app.

    Args:
        function_app_name: Name of the function app (Microsoft.Web/sites resource).
        function_name: Name of the function; accepted for interface compatibility
            but not used to filter the query.
        resource_group: Resource group containing the app. Defaults to
            "rg-functions", the value that was previously hard-coded.

    Relies on module-level ``credential`` and ``subscription_id``, which must be
    defined by the surrounding script.
    """

    monitor_client = MonitorManagementClient(credential, subscription_id)

    resource_id = (
        f"/subscriptions/{subscription_id}/resourceGroups/{resource_group}"
        f"/providers/Microsoft.Web/sites/{function_app_name}"
    )

    # NOTE(review): confirm the metrics API accepts a bare duration for timespan;
    # it may require an explicit "start/end" interval.
    metrics = monitor_client.metrics.list(
        resource_uri=resource_id,
        metricnames="FunctionExecutionCount,FunctionExecutionUnits",
        timespan="PT1H",
        interval="PT5M",
        aggregation="Total,Average"
    )

    for metric in metrics.value:
        print(f"{metric.name.value}:")
        for ts in metric.timeseries:
            # Only the five most recent data points are shown.
            for data in ts.data[-5:]:
                # A total of 0 is a real value; fall back to the average only
                # when no total was returned at all.
                value = data.total if data.total is not None else data.average
                print(f"  {data.time_stamp}: {value}")

Conclusion

Queue-triggered Azure Functions provide a reliable foundation for asynchronous processing. By properly handling retries, poison messages, and scaling, you can build systems that gracefully handle failures and variable loads.

Key practices include implementing idempotent processing, using appropriate visibility timeouts, monitoring queue depth, and having a strategy for poison messages. Whether using Storage Queues for simple scenarios or Service Bus for advanced features like sessions and dead lettering, queue triggers enable scalable, resilient architectures.

Michael John Peña

Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.