4 min read
Building Reliable Messaging with Azure Service Bus
Reliable messaging is fundamental to distributed systems. Azure Service Bus provides enterprise-grade messaging capabilities that help decouple components and ensure message delivery. Here is how to implement common patterns.
Creating Service Bus Resources
# Names shared by every resource created below
RESOURCE_GROUP="rg-messaging"
NAMESPACE="sb-myapp-2020"
TOPIC="events-topic"

# Namespace (Standard SKU, Australia East)
az servicebus namespace create \
  --name "$NAMESPACE" \
  --resource-group "$RESOURCE_GROUP" \
  --sku Standard \
  --location australiaeast

# Queue that receives orders, created with --max-size 1024
az servicebus queue create \
  --name orders-queue \
  --namespace-name "$NAMESPACE" \
  --resource-group "$RESOURCE_GROUP" \
  --max-size 1024

# Topic for events ...
az servicebus topic create \
  --name "$TOPIC" \
  --namespace-name "$NAMESPACE" \
  --resource-group "$RESOURCE_GROUP"

# ... and one subscription on that topic
az servicebus topic subscription create \
  --name email-subscription \
  --topic-name "$TOPIC" \
  --namespace-name "$NAMESPACE" \
  --resource-group "$RESOURCE_GROUP"
.NET SDK Setup
# Add the Azure.Messaging.ServiceBus package reference to the current project
dotnet add package Azure.Messaging.ServiceBus
Sending Messages
using System;
using System.Collections.Generic;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
/// <summary>
/// Sends <see cref="Order"/> messages to the "orders-queue" queue, either one at a time
/// or packed into as few batches as possible.
/// </summary>
public class OrderPublisher : IAsyncDisposable
{
    private readonly ServiceBusClient _client;
    private readonly ServiceBusSender _sender;

    /// <summary>Creates a client for the given connection string and a sender for "orders-queue".</summary>
    /// <param name="connectionString">Service Bus connection string.</param>
    public OrderPublisher(string connectionString)
    {
        _client = new ServiceBusClient(connectionString);
        _sender = _client.CreateSender("orders-queue");
    }

    /// <summary>Serializes <paramref name="order"/> to JSON and sends it as a single message.</summary>
    /// <exception cref="ArgumentNullException"><paramref name="order"/> is null.</exception>
    public async Task SendOrderAsync(Order order)
    {
        await _sender.SendMessageAsync(CreateMessage(order));
    }

    /// <summary>
    /// Sends all <paramref name="orders"/>, starting a new batch whenever the current one is full.
    /// Every message carries the same metadata as one sent through <see cref="SendOrderAsync"/>.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="orders"/> or one of its elements is null.</exception>
    /// <exception cref="InvalidOperationException">A single order does not fit into an empty batch.</exception>
    public async Task SendBatchAsync(IEnumerable<Order> orders)
    {
        if (orders is null)
        {
            throw new ArgumentNullException(nameof(orders));
        }

        // Deliberately not a `using` declaration: the variable is reassigned when a batch
        // fills up, which a using-declared local does not allow. Disposal is done by hand.
        ServiceBusMessageBatch batch = await _sender.CreateMessageBatchAsync();
        try
        {
            foreach (var order in orders)
            {
                var message = CreateMessage(order);
                if (batch.TryAddMessage(message))
                {
                    continue;
                }

                if (batch.Count > 0)
                {
                    // Batch is full: send it, swap in a fresh one, and retry this message.
                    await _sender.SendMessagesAsync(batch);
                    var next = await _sender.CreateMessageBatchAsync();
                    batch.Dispose();
                    batch = next;

                    if (batch.TryAddMessage(message))
                    {
                        continue;
                    }
                }

                // Rejected by an empty batch: fail loudly rather than silently dropping the order.
                throw new InvalidOperationException(
                    $"Order {order.Id} is too large to fit in an empty message batch.");
            }

            if (batch.Count > 0)
            {
                await _sender.SendMessagesAsync(batch);
            }
        }
        finally
        {
            batch.Dispose();
        }
    }

    /// <summary>Disposes the sender first, then the client that created it.</summary>
    public async ValueTask DisposeAsync()
    {
        await _sender.DisposeAsync();
        await _client.DisposeAsync();
    }

    // Builds the message for one order so single and batched sends produce identical metadata.
    private static ServiceBusMessage CreateMessage(Order order)
    {
        if (order is null)
        {
            throw new ArgumentNullException(nameof(order));
        }

        var body = JsonSerializer.Serialize(order);
        return new ServiceBusMessage(body)
        {
            ContentType = "application/json",
            MessageId = order.Id.ToString(),
            Subject = "NewOrder",
            ApplicationProperties =
            {
                { "OrderType", order.Type },
                { "Priority", order.Priority }
            }
        };
    }
}
Receiving Messages
/// <summary>
/// Receives messages from "orders-queue" with a <see cref="ServiceBusProcessor"/> and
/// settles each one explicitly (auto-complete is turned off).
/// </summary>
public class OrderProcessor : IAsyncDisposable
{
    private readonly ServiceBusClient _client;
    private readonly ServiceBusProcessor _processor;

    /// <summary>Creates the client and processor and registers the message and error handlers.</summary>
    /// <param name="connectionString">Service Bus connection string.</param>
    public OrderProcessor(string connectionString)
    {
        _client = new ServiceBusClient(connectionString);
        _processor = _client.CreateProcessor("orders-queue", new ServiceBusProcessorOptions
        {
            MaxConcurrentCalls = 10,
            AutoCompleteMessages = false,
            PrefetchCount = 20
        });
        _processor.ProcessMessageAsync += ProcessMessageAsync;
        _processor.ProcessErrorAsync += ProcessErrorAsync;
    }

    /// <summary>Starts receiving messages.</summary>
    public async Task StartAsync()
    {
        await _processor.StartProcessingAsync();
    }

    /// <summary>Stops receiving messages.</summary>
    public async Task StopAsync()
    {
        await _processor.StopProcessingAsync();
    }

    /// <summary>Disposes the processor first, then the client that created it.</summary>
    public async ValueTask DisposeAsync()
    {
        await _processor.DisposeAsync();
        await _client.DisposeAsync();
    }

    // Handles one received message: dead-letters unreadable payloads, completes on
    // success, abandons on a processing failure.
    private async Task ProcessMessageAsync(ProcessMessageEventArgs args)
    {
        Order order;
        try
        {
            order = JsonSerializer.Deserialize<Order>(args.Message.Body.ToString());
        }
        catch (JsonException ex)
        {
            // A malformed payload will not parse on a later attempt either, so
            // dead-letter it immediately instead of retrying it.
            Console.WriteLine($"Invalid payload in message {args.Message.MessageId}: {ex.Message}");
            await args.DeadLetterMessageAsync(args.Message, "InvalidPayload", ex.Message);
            return;
        }

        if (order is null)
        {
            // A body of JSON `null` deserializes to null; there is nothing to process.
            Console.WriteLine($"Empty payload in message {args.Message.MessageId}");
            await args.DeadLetterMessageAsync(args.Message, "InvalidPayload", "Message body deserialized to null.");
            return;
        }

        try
        {
            await ProcessOrderAsync(order);
            // Complete the message
            await args.CompleteMessageAsync(args.Message);
        }
        catch (Exception ex)
        {
            // Log the error
            Console.WriteLine($"Error processing order {order.Id}: {ex.Message}");
            // Abandon to retry later or dead-letter if max retries exceeded
            await args.AbandonMessageAsync(args.Message);
        }
    }

    // Logs errors reported by the processor itself.
    private Task ProcessErrorAsync(ProcessErrorEventArgs args)
    {
        Console.WriteLine($"Error: {args.Exception.Message}");
        return Task.CompletedTask;
    }

    // Placeholder for the real order handling.
    private async Task ProcessOrderAsync(Order order)
    {
        // Process the order
        await Task.Delay(100); // Simulate work
    }
}
Publish-Subscribe with Topics
/// <summary>Publishes JSON-serialized events to the "events-topic" topic.</summary>
public class EventPublisher
{
    private readonly ServiceBusSender _sender;

    /// <summary>Creates a sender for "events-topic" from the supplied client.</summary>
    public EventPublisher(ServiceBusClient client)
    {
        var topicSender = client.CreateSender("events-topic");
        _sender = topicSender;
    }

    /// <summary>
    /// Serializes <paramref name="eventData"/> to JSON and sends it with
    /// <paramref name="eventType"/> as the message subject.
    /// </summary>
    public async Task PublishEventAsync<T>(T eventData, string eventType)
    {
        var payload = JsonSerializer.Serialize(eventData);

        var envelope = new ServiceBusMessage(payload);
        envelope.ContentType = "application/json";
        envelope.Subject = eventType;

        await _sender.SendMessageAsync(envelope);
    }
}
/// <summary>
/// Receives events from one subscription of "events-topic" and dispatches them
/// by message subject.
/// </summary>
public class EventSubscriber
{
    private readonly ServiceBusProcessor _processor;

    /// <summary>Creates a processor for the given subscription and registers its handlers.</summary>
    /// <param name="client">Client used to create the processor.</param>
    /// <param name="subscriptionName">Subscription on "events-topic" to read from.</param>
    public EventSubscriber(ServiceBusClient client, string subscriptionName)
    {
        _processor = client.CreateProcessor("events-topic", subscriptionName);
        _processor.ProcessMessageAsync += HandleMessageAsync;
        _processor.ProcessErrorAsync += HandleErrorAsync;
    }

    /// <summary>Starts receiving events; nothing is delivered until this is called.</summary>
    public async Task StartAsync()
    {
        await _processor.StartProcessingAsync();
    }

    /// <summary>Stops receiving events.</summary>
    public async Task StopAsync()
    {
        await _processor.StopProcessingAsync();
    }

    // Routes a message to the handler for its subject, then completes it.
    private async Task HandleMessageAsync(ProcessMessageEventArgs args)
    {
        var eventType = args.Message.Subject;
        var body = args.Message.Body.ToString();
        switch (eventType)
        {
            case "OrderCreated":
                await HandleOrderCreatedAsync(body);
                break;
            case "OrderShipped":
                await HandleOrderShippedAsync(body);
                break;
            default:
                // Unknown subjects are still completed below; log them so they are not lost silently.
                Console.WriteLine($"Unhandled event type '{eventType}' in message {args.Message.MessageId}");
                break;
        }
        await args.CompleteMessageAsync(args.Message);
    }

    // Logs errors reported by the processor itself.
    private Task HandleErrorAsync(ProcessErrorEventArgs args)
    {
        Console.WriteLine($"Error: {args.Exception.Message}");
        return Task.CompletedTask;
    }

    // Placeholder: handle an "OrderCreated" event whose JSON payload is in body.
    private Task HandleOrderCreatedAsync(string body)
    {
        return Task.CompletedTask;
    }

    // Placeholder: handle an "OrderShipped" event whose JSON payload is in body.
    private Task HandleOrderShippedAsync(string body)
    {
        return Task.CompletedTask;
    }
}
Message Filters
Create subscription filters:
# Create the two subscriptions the rules below attach to
az servicebus topic subscription create \
  --resource-group rg-messaging \
  --namespace-name sb-myapp-2020 \
  --topic-name events-topic \
  --name priority-subscription
az servicebus topic subscription create \
  --resource-group rg-messaging \
  --namespace-name sb-myapp-2020 \
  --topic-name events-topic \
  --name region-subscription

# A new subscription starts with a match-all rule named $Default. A message is
# delivered when ANY rule matches, so remove it or the filters below have no effect.
# (Single quotes stop the shell from expanding $Default.)
az servicebus topic subscription rule delete \
  --resource-group rg-messaging \
  --namespace-name sb-myapp-2020 \
  --topic-name events-topic \
  --subscription-name priority-subscription \
  --name '$Default'
az servicebus topic subscription rule delete \
  --resource-group rg-messaging \
  --namespace-name sb-myapp-2020 \
  --topic-name events-topic \
  --subscription-name region-subscription \
  --name '$Default'

# SQL filter: matches messages whose "Priority" application property equals 'High'
az servicebus topic subscription rule create \
  --resource-group rg-messaging \
  --namespace-name sb-myapp-2020 \
  --topic-name events-topic \
  --subscription-name priority-subscription \
  --name priority-filter \
  --filter-sql-expression "Priority = 'High'"

# Correlation filter: subject (label) must be OrderCreated AND the "Region"
# application property must be APAC. Subject and label are the same system
# property, so it is given once via --label; the publisher must set "Region".
# NOTE(review): the custom-property option name differs between CLI versions;
# confirm with `az servicebus topic subscription rule create --help`.
az servicebus topic subscription rule create \
  --resource-group rg-messaging \
  --namespace-name sb-myapp-2020 \
  --topic-name events-topic \
  --subscription-name region-subscription \
  --name region-filter \
  --filter-type CorrelationFilter \
  --label OrderCreated \
  --correlation-filter Region=APAC
Scheduled Messages
/// <summary>
/// Serializes <paramref name="order"/> to JSON and schedules it to be enqueued at
/// <paramref name="scheduledTime"/>.
/// </summary>
/// <param name="order">Order to send.</param>
/// <param name="scheduledTime">Time at which the message should be enqueued.</param>
/// <returns>
/// The sequence number of the scheduled message. Keep it to cancel the message later
/// with <c>_sender.CancelScheduledMessageAsync(sequenceNumber)</c>.
/// </returns>
public async Task<long> ScheduleMessageAsync(Order order, DateTimeOffset scheduledTime)
{
    var body = JsonSerializer.Serialize(order);

    // The enqueue time is passed to ScheduleMessageAsync below, so it is not
    // also set on the message itself.
    var message = new ServiceBusMessage(body);

    return await _sender.ScheduleMessageAsync(message, scheduledTime);
}
Dead Letter Queue Processing
/// <summary>
/// Drains the dead-letter sub-queue of "orders-queue", logging each message
/// before completing (removing) it.
/// </summary>
public class DeadLetterProcessor : IAsyncDisposable
{
    private readonly ServiceBusReceiver _receiver;

    /// <summary>Creates a receiver bound to the dead-letter sub-queue of "orders-queue".</summary>
    public DeadLetterProcessor(ServiceBusClient client)
    {
        _receiver = client.CreateReceiver("orders-queue",
            new ServiceBusReceiverOptions
            {
                SubQueue = SubQueue.DeadLetter
            });
    }

    /// <summary>
    /// Receives dead-lettered messages one at a time until a 5-second receive returns
    /// nothing or <paramref name="cancellationToken"/> is cancelled.
    /// </summary>
    /// <param name="cancellationToken">Optional token to stop the drain loop early.</param>
    public async Task ProcessDeadLettersAsync(CancellationToken cancellationToken = default)
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            var message = await _receiver.ReceiveMessageAsync(TimeSpan.FromSeconds(5), cancellationToken);
            if (message is null)
            {
                // Nothing arrived within the wait time: treat the sub-queue as drained.
                break;
            }

            Console.WriteLine($"Dead letter: {message.MessageId}");
            Console.WriteLine($"Reason: {message.DeadLetterReason}");
            Console.WriteLine($"Description: {message.DeadLetterErrorDescription}");

            // Process or log the failed message
            // The token is not passed here so a message that has already been
            // logged is always settled, even if cancellation is requested meanwhile.
            await _receiver.CompleteMessageAsync(message);
        }
    }

    /// <summary>Disposes the receiver this instance created; the client belongs to the caller.</summary>
    public async ValueTask DisposeAsync()
    {
        await _receiver.DisposeAsync();
    }
}
Azure Service Bus provides the reliability and features needed for enterprise messaging scenarios.