Back to Blog
4 min read

Building Reliable Messaging with Azure Service Bus

Reliable messaging is fundamental to distributed systems. Azure Service Bus provides enterprise-grade messaging capabilities that help decouple components and ensure message delivery. Here is how to implement common patterns.

Creating Service Bus Resources

# Create a Service Bus namespace (Standard SKU, Australia East).
# Assumes the resource group rg-messaging already exists; nothing here creates it.
# NOTE(review): the topic commands below presumably require Standard or higher - confirm tier support.
az servicebus namespace create \
    --resource-group rg-messaging \
    --name sb-myapp-2020 \
    --location australiaeast \
    --sku Standard

# Create a queue named orders-queue in that namespace.
# NOTE(review): --max-size is presumably expressed in megabytes - confirm against the CLI reference.
az servicebus queue create \
    --resource-group rg-messaging \
    --namespace-name sb-myapp-2020 \
    --name orders-queue \
    --max-size 1024

# Create a topic with subscription
# First the topic itself (events-topic)...
az servicebus topic create \
    --resource-group rg-messaging \
    --namespace-name sb-myapp-2020 \
    --name events-topic

# ...then a subscription (email-subscription) attached to that topic.
az servicebus topic subscription create \
    --resource-group rg-messaging \
    --namespace-name sb-myapp-2020 \
    --topic-name events-topic \
    --name email-subscription

.NET SDK Setup

# Add the Azure.Messaging.ServiceBus package reference to the current project.
dotnet add package Azure.Messaging.ServiceBus

Sending Messages

using System;
using System.Collections.Generic;
using System.Text.Json;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

/// <summary>
/// Sends <c>Order</c> payloads, serialized as JSON, to the "orders-queue" queue.
/// </summary>
/// <remarks>
/// The publisher creates its own <see cref="ServiceBusClient"/> and sender, so
/// it must be disposed (for example with <c>await using</c>) when no longer needed.
/// </remarks>
public class OrderPublisher : IAsyncDisposable
{
    private const string OversizedMessageError =
        "A serialized order is too large to fit in an empty Service Bus message batch.";

    private readonly ServiceBusClient _client;
    private readonly ServiceBusSender _sender;

    /// <summary>Creates the client and a sender bound to "orders-queue".</summary>
    /// <param name="connectionString">Service Bus namespace connection string.</param>
    public OrderPublisher(string connectionString)
    {
        _client = new ServiceBusClient(connectionString);
        _sender = _client.CreateSender("orders-queue");
    }

    /// <summary>
    /// Sends one order as a JSON message. The order's Id becomes the message id,
    /// and its Type and Priority are copied into the application properties.
    /// </summary>
    /// <param name="order">The order to publish.</param>
    public async Task SendOrderAsync(Order order)
    {
        var body = JsonSerializer.Serialize(order);
        var message = new ServiceBusMessage(body)
        {
            ContentType = "application/json",
            MessageId = order.Id.ToString(),
            Subject = "NewOrder",
            ApplicationProperties =
            {
                { "OrderType", order.Type },
                { "Priority", order.Priority }
            }
        };

        await _sender.SendMessageAsync(message);
    }

    /// <summary>
    /// Sends the orders in as few batches as possible, starting a new batch
    /// whenever the current one cannot accept another message.
    /// </summary>
    /// <param name="orders">Orders to publish; an empty sequence sends nothing.</param>
    /// <exception cref="ArgumentNullException"><paramref name="orders"/> is null.</exception>
    /// <exception cref="InvalidOperationException">
    /// A single serialized order does not fit even in an empty batch.
    /// </exception>
    public async Task SendBatchAsync(IEnumerable<Order> orders)
    {
        if (orders == null)
        {
            throw new ArgumentNullException(nameof(orders));
        }

        // A plain local plus try/finally is used instead of `using var` because
        // the batch variable is replaced mid-loop, and a using variable cannot
        // be reassigned.
        ServiceBusMessageBatch batch = null;
        try
        {
            batch = await _sender.CreateMessageBatchAsync();

            foreach (var order in orders)
            {
                var message = new ServiceBusMessage(JsonSerializer.Serialize(order));

                if (batch.TryAddMessage(message))
                {
                    continue;
                }

                // Rejected by an empty batch: the message itself is too large,
                // so flushing and retrying could never succeed.
                if (batch.Count == 0)
                {
                    throw new InvalidOperationException(OversizedMessageError);
                }

                // Batch is full: send it, release it, and start a fresh one.
                await _sender.SendMessagesAsync(batch);
                batch.Dispose();
                batch = null; // keeps the finally block from touching the disposed batch if creation fails
                batch = await _sender.CreateMessageBatchAsync();

                if (!batch.TryAddMessage(message))
                {
                    throw new InvalidOperationException(OversizedMessageError);
                }
            }

            if (batch.Count > 0)
            {
                await _sender.SendMessagesAsync(batch);
            }
        }
        finally
        {
            batch?.Dispose();
        }
    }

    /// <summary>Disposes the sender first, then the client that created it.</summary>
    public async ValueTask DisposeAsync()
    {
        await _sender.DisposeAsync();
        await _client.DisposeAsync();
    }
}

Receiving Messages

/// <summary>
/// Receives JSON-encoded orders from "orders-queue" and settles each message
/// explicitly (auto-complete is turned off).
/// </summary>
/// <remarks>
/// The processor creates its own <see cref="ServiceBusClient"/>, so it must be
/// disposed when no longer needed.
/// </remarks>
public class OrderProcessor : IAsyncDisposable
{
    private const string InvalidPayloadReason = "InvalidPayload";

    private readonly ServiceBusClient _client;
    private readonly ServiceBusProcessor _processor;

    /// <summary>
    /// Creates the client and a processor for "orders-queue" with up to 10
    /// concurrent handler calls and a prefetch count of 20.
    /// </summary>
    /// <param name="connectionString">Service Bus namespace connection string.</param>
    public OrderProcessor(string connectionString)
    {
        _client = new ServiceBusClient(connectionString);

        _processor = _client.CreateProcessor("orders-queue", new ServiceBusProcessorOptions
        {
            MaxConcurrentCalls = 10,
            AutoCompleteMessages = false,
            PrefetchCount = 20
        });

        _processor.ProcessMessageAsync += ProcessMessageAsync;
        _processor.ProcessErrorAsync += ProcessErrorAsync;
    }

    /// <summary>Starts receiving and dispatching messages.</summary>
    public async Task StartAsync()
    {
        await _processor.StartProcessingAsync();
    }

    /// <summary>Stops receiving messages.</summary>
    public async Task StopAsync()
    {
        await _processor.StopProcessingAsync();
    }

    /// <summary>Disposes the processor first, then the client that created it.</summary>
    public async ValueTask DisposeAsync()
    {
        await _processor.DisposeAsync();
        await _client.DisposeAsync();
    }

    /// <summary>
    /// Handles one received message: deserializes the order, processes it, and
    /// completes the message on success or abandons it on failure.
    /// </summary>
    private async Task ProcessMessageAsync(ProcessMessageEventArgs args)
    {
        Order order;
        try
        {
            order = JsonSerializer.Deserialize<Order>(args.Message.Body.ToString());
        }
        catch (JsonException ex)
        {
            // A malformed body will not parse on a later delivery either, so
            // dead-letter it now rather than abandoning it for retry.
            await args.DeadLetterMessageAsync(args.Message, InvalidPayloadReason, ex.Message);
            return;
        }

        if (order == null)
        {
            // A JSON "null" body deserializes to null; there is nothing to process.
            await args.DeadLetterMessageAsync(
                args.Message, InvalidPayloadReason, "Message body deserialized to null.");
            return;
        }

        try
        {
            await ProcessOrderAsync(order);

            // Complete the message
            await args.CompleteMessageAsync(args.Message);
        }
        catch (Exception ex)
        {
            // Log the error
            Console.WriteLine($"Error processing order {order.Id}: {ex.Message}");

            // Abandon to retry later or dead-letter if max retries exceeded
            await args.AbandonMessageAsync(args.Message);
        }
    }

    /// <summary>Logs errors raised by the processor.</summary>
    private Task ProcessErrorAsync(ProcessErrorEventArgs args)
    {
        Console.WriteLine($"Error: {args.Exception.Message}");
        return Task.CompletedTask;
    }

    /// <summary>Placeholder for the real order handling; currently only waits 100 ms.</summary>
    private async Task ProcessOrderAsync(Order order)
    {
        // Process the order
        await Task.Delay(100); // Simulate work
    }
}

Publish-Subscribe with Topics

/// <summary>
/// Publishes JSON-serialized events to the "events-topic" topic using a sender
/// created from a caller-supplied client.
/// </summary>
public class EventPublisher
{
    private readonly ServiceBusSender _sender;

    /// <summary>Creates a sender for "events-topic" from the given client.</summary>
    /// <param name="client">Client used to create the topic sender.</param>
    public EventPublisher(ServiceBusClient client) =>
        _sender = client.CreateSender("events-topic");

    /// <summary>
    /// Serializes <paramref name="eventData"/> to JSON and sends it with
    /// <paramref name="eventType"/> as the message subject.
    /// </summary>
    /// <typeparam name="T">Type of the event payload.</typeparam>
    /// <param name="eventData">Payload to serialize into the message body.</param>
    /// <param name="eventType">Value stored in the message's Subject.</param>
    public async Task PublishEventAsync<T>(T eventData, string eventType)
    {
        var json = JsonSerializer.Serialize(eventData);

        var outgoing = new ServiceBusMessage(json);
        outgoing.ContentType = "application/json";
        outgoing.Subject = eventType;

        await _sender.SendMessageAsync(outgoing);
    }
}

/// <summary>
/// Consumes messages from one subscription of "events-topic" and dispatches
/// them by message subject.
/// </summary>
public class EventSubscriber
{
    private readonly ServiceBusProcessor _processor;

    /// <summary>Creates a processor for the given subscription and wires up its handlers.</summary>
    /// <param name="client">Client used to create the processor.</param>
    /// <param name="subscriptionName">Name of the subscription on "events-topic".</param>
    public EventSubscriber(ServiceBusClient client, string subscriptionName)
    {
        // HandleMessageAsync completes every message itself, so auto-complete
        // is switched off to leave a single settlement path.
        _processor = client.CreateProcessor("events-topic", subscriptionName, new ServiceBusProcessorOptions
        {
            AutoCompleteMessages = false
        });
        _processor.ProcessMessageAsync += HandleMessageAsync;
        _processor.ProcessErrorAsync += HandleErrorAsync;
    }

    /// <summary>Starts receiving and dispatching messages.</summary>
    public async Task StartAsync()
    {
        await _processor.StartProcessingAsync();
    }

    /// <summary>Stops receiving messages.</summary>
    public async Task StopAsync()
    {
        await _processor.StopProcessingAsync();
    }

    /// <summary>
    /// Routes a message to the handler matching its subject, then completes it.
    /// </summary>
    private async Task HandleMessageAsync(ProcessMessageEventArgs args)
    {
        var eventType = args.Message.Subject;
        var body = args.Message.Body.ToString();

        switch (eventType)
        {
            case "OrderCreated":
                await HandleOrderCreatedAsync(body);
                break;
            case "OrderShipped":
                await HandleOrderShippedAsync(body);
                break;
            default:
                // Unrecognized events are still completed below; log them so
                // they do not disappear without a trace.
                Console.WriteLine($"Ignoring unrecognized event type: {eventType}");
                break;
        }

        await args.CompleteMessageAsync(args.Message);
    }

    /// <summary>Logs errors raised by the processor.</summary>
    private Task HandleErrorAsync(ProcessErrorEventArgs args)
    {
        Console.WriteLine($"Error: {args.Exception.Message}");
        return Task.CompletedTask;
    }

    /// <summary>Placeholder for handling an "OrderCreated" event body.</summary>
    private Task HandleOrderCreatedAsync(string body)
    {
        // Application-specific handling goes here.
        return Task.CompletedTask;
    }

    /// <summary>Placeholder for handling an "OrderShipped" event body.</summary>
    private Task HandleOrderShippedAsync(string body)
    {
        // Application-specific handling goes here.
        return Task.CompletedTask;
    }
}

Message Filters

Create subscription filters:

# SQL filter
# Adds a rule named priority-filter to priority-subscription using the SQL
# expression Priority = 'High'.
# NOTE(review): priority-subscription is not created by any command in this article; create it first.
# NOTE(review): a subscription presumably keeps its default match-all rule until it is removed - confirm
# whether that rule must be deleted for this filter to take effect.
az servicebus topic subscription rule create \
    --resource-group rg-messaging \
    --namespace-name sb-myapp-2020 \
    --topic-name events-topic \
    --subscription-name priority-subscription \
    --name priority-filter \
    --filter-sql-expression "Priority = 'High'"

# Correlation filter
# Adds a rule named region-filter to region-subscription with the key=value pairs
# subject=OrderCreated and label=APAC.
# NOTE(review): region-subscription is likewise not created in this article.
# NOTE(review): verify whether these pairs match custom application properties or the built-in
# subject/label message property - confirm against the CLI reference.
az servicebus topic subscription rule create \
    --resource-group rg-messaging \
    --namespace-name sb-myapp-2020 \
    --topic-name events-topic \
    --subscription-name region-subscription \
    --name region-filter \
    --correlation-filter subject=OrderCreated label=APAC

Scheduled Messages

/// <summary>
/// Schedules a JSON-serialized order to be enqueued at <paramref name="scheduledTime"/>.
/// </summary>
/// <param name="order">The order to serialize into the message body.</param>
/// <param name="scheduledTime">When the message should become available.</param>
/// <returns>
/// The sequence number of the scheduled message. Pass it to
/// <c>_sender.CancelScheduledMessageAsync</c> to cancel the message before it is enqueued.
/// </returns>
public async Task<long> ScheduleMessageAsync(Order order, DateTimeOffset scheduledTime)
{
    var message = new ServiceBusMessage(JsonSerializer.Serialize(order));

    // The enqueue time is passed to ScheduleMessageAsync itself, so it is not
    // also set on the message. The sequence number is returned rather than
    // discarded, because without it the caller cannot cancel.
    return await _sender.ScheduleMessageAsync(message, scheduledTime);
}

Dead Letter Queue Processing

/// <summary>
/// Drains the dead-letter sub-queue of "orders-queue", printing each message's
/// id, dead-letter reason and description before completing it.
/// </summary>
public class DeadLetterProcessor
{
    private readonly ServiceBusReceiver _receiver;

    /// <summary>Creates a receiver targeting the dead-letter sub-queue of "orders-queue".</summary>
    /// <param name="client">Client used to create the receiver.</param>
    public DeadLetterProcessor(ServiceBusClient client)
    {
        var deadLetterOptions = new ServiceBusReceiverOptions();
        deadLetterOptions.SubQueue = SubQueue.DeadLetter;

        _receiver = client.CreateReceiver("orders-queue", deadLetterOptions);
    }

    /// <summary>
    /// Receives dead-lettered messages one at a time, waiting up to five
    /// seconds for each, and returns once a receive yields no message.
    /// </summary>
    public async Task ProcessDeadLettersAsync()
    {
        ServiceBusReceivedMessage deadLetter;

        // A null result means nothing arrived within the wait window; stop then.
        while ((deadLetter = await _receiver.ReceiveMessageAsync(TimeSpan.FromSeconds(5))) != null)
        {
            Console.WriteLine($"Dead letter: {deadLetter.MessageId}");
            Console.WriteLine($"Reason: {deadLetter.DeadLetterReason}");
            Console.WriteLine($"Description: {deadLetter.DeadLetterErrorDescription}");

            // Completing removes the message from the dead-letter sub-queue.
            await _receiver.CompleteMessageAsync(deadLetter);
        }
    }
}

Azure Service Bus provides the reliability and features needed for enterprise messaging scenarios.

Michael John Peña

Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.