5 min read
Service Broker in Azure SQL Managed Instance
Service Broker is a native messaging framework in SQL Server that enables reliable, asynchronous messaging within and between databases. Azure SQL Managed Instance supports Service Broker — it is enabled by default on every database, and cross-instance messaging is available between managed instances — making it possible to build message-driven applications directly in your database layer.
Understanding Service Broker
Service Broker provides queuing and messaging functionality within SQL Server, allowing for decoupled, asynchronous communication between services.
Setting Up Service Broker
-- Enable Service Broker on the database (SQL Server only).
-- On Azure SQL Managed Instance the broker is always enabled and the
-- ENABLE_BROKER / DISABLE_BROKER options of ALTER DATABASE are not supported,
-- so the statement is skipped there (EngineEdition 8 = Azure SQL Managed Instance).
-- It is also skipped when the broker is already on, because WITH ROLLBACK IMMEDIATE
-- rolls back open transactions in the database.
-- The ALTER is issued through EXEC so that the batch is accepted even on an
-- engine that does not support the option.
IF CAST(SERVERPROPERTY('EngineEdition') AS INT) <> 8
   AND EXISTS (
       SELECT 1
       FROM sys.databases
       WHERE name = N'MyDatabase'
         AND is_broker_enabled = 0
   )
BEGIN
    EXEC (N'ALTER DATABASE MyDatabase SET ENABLE_BROKER WITH ROLLBACK IMMEDIATE;');
END;

-- Verify Service Broker is enabled (is_broker_enabled should be 1)
SELECT
    name,
    is_broker_enabled,
    service_broker_guid
FROM sys.databases
WHERE name = N'MyDatabase';
-- Message types: Service Broker validates that each body is well-formed XML.
CREATE MESSAGE TYPE OrderRequest
    VALIDATION = WELL_FORMED_XML;

CREATE MESSAGE TYPE OrderResponse
    VALIDATION = WELL_FORMED_XML;

-- Contract: the initiator may only send requests, the target may only send responses.
CREATE CONTRACT OrderProcessingContract
(
    OrderRequest  SENT BY INITIATOR,
    OrderResponse SENT BY TARGET
);

-- Queues: one per side of the conversation.
CREATE QUEUE OrderRequestQueue;

CREATE QUEUE OrderResponseQueue;

-- Services: bind each queue to the contract.
CREATE SERVICE OrderService
    ON QUEUE OrderRequestQueue (OrderProcessingContract);

CREATE SERVICE OrderResponseService
    ON QUEUE OrderResponseQueue (OrderProcessingContract);
Sending Messages
-- Procedure to send an order request.
--
-- Opens a new dialog from OrderResponseService to OrderService, sends one
-- OrderRequest message built from the parameters, and records the send in
-- dbo.MessageLog (table assumed to exist; it is not created in this article).
--
-- Parameters:
--   @OrderID, @CustomerID, @TotalAmount - values copied into the XML body.
-- Result set:
--   One row, one column (ConversationHandle) holding the new dialog handle.
--   The handle is returned as a result set because RETURN accepts only an
--   integer expression; a UNIQUEIDENTIFIER cannot be returned that way.
CREATE PROCEDURE dbo.SendOrderRequest
    @OrderID INT,
    @CustomerID INT,
    @TotalAmount DECIMAL(18,2)
AS
BEGIN
    SET NOCOUNT ON;

    DECLARE @ConversationHandle UNIQUEIDENTIFIER;
    DECLARE @MessageBody XML;

    -- Create message body: <OrderRequest><OrderID/>...<RequestTime/></OrderRequest>
    SET @MessageBody = (
        SELECT
            @OrderID AS OrderID,
            @CustomerID AS CustomerID,
            @TotalAmount AS TotalAmount,
            GETUTCDATE() AS RequestTime
        FOR XML PATH('OrderRequest'), TYPE
    );

    -- Begin conversation (dialog encryption is switched off)
    BEGIN DIALOG CONVERSATION @ConversationHandle
        FROM SERVICE OrderResponseService
        TO SERVICE 'OrderService'
        ON CONTRACT OrderProcessingContract
        WITH ENCRYPTION = OFF;

    -- Send message
    SEND ON CONVERSATION @ConversationHandle
        MESSAGE TYPE OrderRequest (@MessageBody);

    -- Log the sent message
    INSERT INTO dbo.MessageLog (ConversationHandle, MessageType, MessageBody, SentAt)
    VALUES (@ConversationHandle, 'OrderRequest', CAST(@MessageBody AS NVARCHAR(MAX)), GETUTCDATE());

    -- Hand the handle back to the caller (readable with ExecuteScalar)
    SELECT @ConversationHandle AS ConversationHandle;

    RETURN 0;
END;
GO
Receiving and Processing Messages
-- Activation procedure for processing orders.
--
-- Drains OrderRequestQueue one message per transaction until a RECEIVE waits
-- 5 seconds without getting a message, then exits.
--   * OrderRequest      -> insert into dbo.ProcessedOrders, reply with an
--                          OrderResponse (Status = Success or Error), end the dialog.
--   * EndDialog / Error -> end this side of the dialog.
--   * anything else     -> consumed and committed without further action.
--
-- Failure handling: all per-order work, including XML extraction, runs inside
-- TRY. If the error leaves the transaction uncommittable (XACT_STATE() <> 1) no
-- SEND is possible, so the transaction is rolled back and the dialog is ended
-- WITH ERROR in a fresh transaction; ending the conversation also removes its
-- messages from the queue. This keeps a bad message from being rolled back
-- repeatedly, which would trigger poison-message handling on the queue.
CREATE PROCEDURE dbo.ProcessOrderRequest
AS
BEGIN
    SET NOCOUNT ON;

    DECLARE @ConversationHandle UNIQUEIDENTIFIER;
    DECLARE @MessageType NVARCHAR(256);
    DECLARE @MessageBody XML;
    DECLARE @OrderID INT;
    DECLARE @CustomerID INT;
    DECLARE @TotalAmount DECIMAL(18,2);
    DECLARE @ResponseBody XML;
    DECLARE @ErrorNumber INT;
    DECLARE @ErrorMessage NVARCHAR(4000);
    DECLARE @ErrorDescription NVARCHAR(3000);

    WHILE 1 = 1
    BEGIN
        -- Start every iteration from a clean slate so nothing leaks from the
        -- previous message into this one.
        SELECT
            @ConversationHandle = NULL,
            @MessageType = NULL,
            @MessageBody = NULL,
            @OrderID = NULL,
            @CustomerID = NULL,
            @TotalAmount = NULL,
            @ResponseBody = NULL,
            @ErrorNumber = NULL,
            @ErrorMessage = NULL,
            @ErrorDescription = NULL;

        BEGIN TRANSACTION;

        WAITFOR (
            RECEIVE TOP(1)
                @ConversationHandle = conversation_handle,
                @MessageType = message_type_name,
                @MessageBody = CAST(message_body AS XML)
            FROM OrderRequestQueue
        ), TIMEOUT 5000;

        -- Must be checked immediately after the RECEIVE: no row means the wait timed out.
        IF @@ROWCOUNT = 0
        BEGIN
            ROLLBACK TRANSACTION;
            BREAK;
        END

        IF @MessageType = N'OrderRequest'
        BEGIN
            BEGIN TRY
                -- Extract order details
                SET @OrderID = @MessageBody.value('(/OrderRequest/OrderID)[1]', 'INT');
                SET @CustomerID = @MessageBody.value('(/OrderRequest/CustomerID)[1]', 'INT');
                SET @TotalAmount = @MessageBody.value('(/OrderRequest/TotalAmount)[1]', 'DECIMAL(18,2)');

                -- Insert into orders table
                INSERT INTO dbo.ProcessedOrders (OrderID, CustomerID, TotalAmount, ProcessedAt)
                VALUES (@OrderID, @CustomerID, @TotalAmount, GETUTCDATE());

                -- Send success response
                SET @ResponseBody = (
                    SELECT
                        @OrderID AS OrderID,
                        'Success' AS Status,
                        GETUTCDATE() AS ProcessedAt
                    FOR XML PATH('OrderResponse'), TYPE
                );

                SEND ON CONVERSATION @ConversationHandle
                    MESSAGE TYPE OrderResponse (@ResponseBody);
            END TRY
            BEGIN CATCH
                SET @ErrorNumber = ERROR_NUMBER();
                SET @ErrorMessage = ERROR_MESSAGE();
                -- END CONVERSATION ... DESCRIPTION is limited to NVARCHAR(3000)
                SET @ErrorDescription = LEFT(@ErrorMessage, 3000);

                IF XACT_STATE() <> 1
                BEGIN
                    -- Transaction is doomed or gone: nothing can be written in it.
                    IF @@TRANCOUNT > 0
                        ROLLBACK TRANSACTION;

                    BEGIN TRANSACTION;

                    END CONVERSATION @ConversationHandle
                        WITH ERROR = @ErrorNumber DESCRIPTION = @ErrorDescription;

                    COMMIT TRANSACTION;
                    CONTINUE;
                END

                -- Transaction is still committable: send error response
                SET @ResponseBody = (
                    SELECT
                        @OrderID AS OrderID,
                        'Error' AS Status,
                        @ErrorMessage AS ErrorMessage
                    FOR XML PATH('OrderResponse'), TYPE
                );

                SEND ON CONVERSATION @ConversationHandle
                    MESSAGE TYPE OrderResponse (@ResponseBody);
            END CATCH

            -- End the conversation: the response is the last message of the dialog
            END CONVERSATION @ConversationHandle;
        END
        ELSE IF @MessageType = N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog'
        BEGIN
            END CONVERSATION @ConversationHandle;
        END
        ELSE IF @MessageType = N'http://schemas.microsoft.com/SQL/ServiceBroker/Error'
        BEGIN
            END CONVERSATION @ConversationHandle;
        END

        COMMIT TRANSACTION;
    END
END;
GO
-- Turn on internal activation for the request queue: Service Broker starts
-- dbo.ProcessOrderRequest when messages arrive, running at most five copies
-- at a time, under the identity of the user executing this ALTER (SELF).
ALTER QUEUE OrderRequestQueue
    WITH ACTIVATION
    (
        STATUS            = ON,
        PROCEDURE_NAME    = dbo.ProcessOrderRequest,
        MAX_QUEUE_READERS = 5,
        EXECUTE AS SELF
    );
C# Integration
using System;
using System.Data;
using System.Globalization;
using System.Threading;
using System.Threading.Tasks;
using System.Xml;
using System.Xml.Linq;
using Microsoft.Data.SqlClient;
/// <summary>
/// Client-side helper for the order-processing Service Broker dialog:
/// sends requests through <c>dbo.SendOrderRequest</c>, waits for the reply on
/// <c>OrderResponseQueue</c>, and can act as an external reader of
/// <c>OrderRequestQueue</c>.
/// </summary>
public class ServiceBrokerClient
{
    private const string OrderRequestMessageType = "OrderRequest";
    private const string EndDialogMessageType = "http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog";
    private const string ErrorMessageType = "http://schemas.microsoft.com/SQL/ServiceBroker/Error";

    // Seconds added to the WAITFOR timeout when sizing SqlCommand.CommandTimeout,
    // so the client never gives up before the server-side wait has finished.
    private const int CommandTimeoutSlackSeconds = 30;

    private readonly string _connectionString;

    /// <summary>Creates a client that opens a new connection per operation.</summary>
    /// <param name="connectionString">Connection string of the database hosting the broker objects.</param>
    public ServiceBrokerClient(string connectionString)
    {
        _connectionString = connectionString;
    }

    /// <summary>
    /// Calls <c>dbo.SendOrderRequest</c> and returns the conversation handle of the new dialog.
    /// </summary>
    /// <returns>The conversation handle to pass to <see cref="WaitForResponseAsync"/>.</returns>
    /// <exception cref="InvalidOperationException">
    /// The procedure did not return the handle as the first column of a result set.
    /// </exception>
    public async Task<Guid> SendOrderAsync(int orderId, int customerId, decimal totalAmount)
    {
        using var connection = new SqlConnection(_connectionString);
        await connection.OpenAsync();

        using var command = new SqlCommand("dbo.SendOrderRequest", connection)
        {
            CommandType = CommandType.StoredProcedure
        };
        command.Parameters.AddWithValue("@OrderID", orderId);
        command.Parameters.AddWithValue("@CustomerID", customerId);
        command.Parameters.AddWithValue("@TotalAmount", totalAmount);

        // ExecuteScalar yields null when the procedure produces no result set;
        // fail with a clear message instead of an invalid cast.
        var result = await command.ExecuteScalarAsync();
        if (result is Guid conversationHandle)
        {
            return conversationHandle;
        }

        throw new InvalidOperationException(
            "dbo.SendOrderRequest did not return a conversation handle; " +
            "the procedure must SELECT the handle as its result set.");
    }

    /// <summary>
    /// Waits for the next message on <c>OrderResponseQueue</c> that belongs to
    /// <paramref name="conversationHandle"/> and ends the initiator side of the dialog.
    /// </summary>
    /// <param name="conversationHandle">Handle returned by <see cref="SendOrderAsync"/>.</param>
    /// <param name="timeout">Maximum server-side wait (passed to WAITFOR ... TIMEOUT in milliseconds).</param>
    /// <returns>
    /// The parsed response; a response with <c>Status = "Error"</c> when a Service Broker
    /// error message arrived instead; or <c>null</c> when the wait timed out or the
    /// target ended the dialog without replying.
    /// </returns>
    public async Task<OrderResponse> WaitForResponseAsync(
        Guid conversationHandle,
        TimeSpan timeout)
    {
        var timeoutMs = (int)Math.Min(timeout.TotalMilliseconds, int.MaxValue);

        using var connection = new SqlConnection(_connectionString);
        await connection.OpenAsync();

        // RECEIVE and END CONVERSATION share one transaction so a failure between
        // the two puts the message back instead of losing it.
        using var transaction = connection.BeginTransaction();

        const string query = @"
WAITFOR (
    RECEIVE TOP(1)
        conversation_handle,
        message_type_name,
        CAST(message_body AS XML) AS message_body
    FROM OrderResponseQueue
    WHERE conversation_handle = @ConversationHandle
), TIMEOUT @TimeoutMs";

        string messageType;
        XmlDocument body;

        using (var command = new SqlCommand(query, connection, transaction))
        {
            command.Parameters.AddWithValue("@ConversationHandle", conversationHandle);
            command.Parameters.AddWithValue("@TimeoutMs", timeoutMs);

            // The default CommandTimeout (30 s) would abort waits longer than that.
            // A negative wait is treated as "no client-side limit" (0).
            command.CommandTimeout = timeoutMs < 0
                ? 0
                : timeoutMs / 1000 + CommandTimeoutSlackSeconds;

            using var reader = await command.ExecuteReaderAsync();
            if (!await reader.ReadAsync())
            {
                return null; // timed out; disposing the transaction rolls back (nothing was received)
            }

            messageType = reader.GetString(1);
            // EndDialog carries no payload worth parsing.
            body = messageType == EndDialogMessageType ? null : LoadBody(reader, 2);
        }

        // The reply (or an EndDialog/Error) is the last message of this dialog:
        // close the initiator endpoint so it does not linger in sys.conversation_endpoints.
        await EndConversationAsync(connection, transaction, conversationHandle, CancellationToken.None);
        transaction.Commit();

        if (messageType == EndDialogMessageType)
        {
            return null;
        }

        if (messageType == ErrorMessageType)
        {
            // Broker error bodies are namespaced; match the Description element by local name.
            return new OrderResponse
            {
                Status = "Error",
                ErrorMessage = NodeText(body, "//*[local-name()='Description']")
            };
        }

        return new OrderResponse
        {
            OrderID = int.Parse(NodeText(body, "//OrderID") ?? "0", CultureInfo.InvariantCulture),
            Status = NodeText(body, "//Status"),
            ErrorMessage = NodeText(body, "//ErrorMessage")
        };
    }

    /// <summary>
    /// Reads <c>OrderRequestQueue</c> in a loop until cancellation, passing each
    /// OrderRequest to <paramref name="processor"/> and sending its result back.
    /// </summary>
    /// <remarks>
    /// Each message is handled in its own connection and transaction. An exception
    /// thrown by <paramref name="processor"/> (other than cancellation) is reported to
    /// the sender as an <c>OrderResponse</c> with <c>Status = "Error"</c> rather than
    /// rolling the message back, because repeated rollbacks of the same message make
    /// Service Broker disable the queue. EndDialog and Error messages end this side of
    /// the dialog. Note that this reader competes with any activation procedure
    /// configured on the same queue.
    /// </remarks>
    public async Task ProcessMessagesAsync(
        Func<OrderRequest, Task<OrderResponse>> processor,
        CancellationToken cancellationToken)
    {
        const string query = @"
WAITFOR (
    RECEIVE TOP(1)
        conversation_handle,
        message_type_name,
        CAST(message_body AS XML) AS message_body
    FROM OrderRequestQueue
), TIMEOUT 5000";

        while (!cancellationToken.IsCancellationRequested)
        {
            using var connection = new SqlConnection(_connectionString);
            await connection.OpenAsync(cancellationToken);
            using var transaction = connection.BeginTransaction();

            var received = false;
            var conversationHandle = Guid.Empty;
            string messageType = null;
            XmlDocument body = null;

            // The reader must be closed before another command can run on this connection.
            using (var receiveCmd = new SqlCommand(query, connection, transaction))
            using (var reader = await receiveCmd.ExecuteReaderAsync(cancellationToken))
            {
                if (await reader.ReadAsync(cancellationToken))
                {
                    received = true;
                    conversationHandle = reader.GetGuid(0);
                    messageType = reader.GetString(1);
                    if (messageType == OrderRequestMessageType)
                    {
                        body = LoadBody(reader, 2);
                    }
                }
            }

            if (received && messageType == OrderRequestMessageType)
            {
                var request = new OrderRequest
                {
                    OrderID = int.Parse(NodeText(body, "//OrderID") ?? "0", CultureInfo.InvariantCulture),
                    CustomerID = int.Parse(NodeText(body, "//CustomerID") ?? "0", CultureInfo.InvariantCulture),
                    // XML carries '.' as the decimal separator regardless of the thread culture.
                    TotalAmount = decimal.Parse(NodeText(body, "//TotalAmount") ?? "0", CultureInfo.InvariantCulture)
                };

                OrderResponse response;
                try
                {
                    response = await processor(request);
                }
                catch (Exception ex) when (!(ex is OperationCanceledException))
                {
                    response = new OrderResponse
                    {
                        OrderID = request.OrderID,
                        Status = "Error",
                        ErrorMessage = ex.Message
                    };
                }

                if (response == null)
                {
                    response = new OrderResponse
                    {
                        OrderID = request.OrderID,
                        Status = "Error",
                        ErrorMessage = "Processor returned no response."
                    };
                }

                // Send response (also ends the target side of the dialog)
                await SendResponseAsync(connection, transaction, conversationHandle, response, cancellationToken);
            }
            else if (received && (messageType == EndDialogMessageType || messageType == ErrorMessageType))
            {
                await EndConversationAsync(connection, transaction, conversationHandle, cancellationToken);
            }

            transaction.Commit();
        }
    }

    /// <summary>
    /// Sends <paramref name="response"/> as an OrderResponse message and ends the
    /// conversation, inside the caller's transaction.
    /// </summary>
    private async Task SendResponseAsync(
        SqlConnection connection,
        SqlTransaction transaction,
        Guid conversationHandle,
        OrderResponse response,
        CancellationToken cancellationToken = default)
    {
        // XElement escapes '<', '&' etc., so arbitrary error text still yields
        // well-formed XML (the message type is validated as WELL_FORMED_XML).
        var responseXml = new XElement("OrderResponse",
            new XElement("OrderID", response.OrderID),
            new XElement("Status", response.Status),
            new XElement("ErrorMessage", response.ErrorMessage))
            .ToString(SaveOptions.DisableFormatting);

        const string sendQuery = @"
SEND ON CONVERSATION @ConversationHandle
    MESSAGE TYPE OrderResponse (@MessageBody);
END CONVERSATION @ConversationHandle";

        using var sendCmd = new SqlCommand(sendQuery, connection, transaction);
        sendCmd.Parameters.Add("@ConversationHandle", SqlDbType.UniqueIdentifier).Value = conversationHandle;
        sendCmd.Parameters.Add("@MessageBody", SqlDbType.NVarChar, -1).Value = responseXml;
        await sendCmd.ExecuteNonQueryAsync(cancellationToken);
    }

    /// <summary>Ends this side of the conversation inside the caller's transaction.</summary>
    private static async Task EndConversationAsync(
        SqlConnection connection,
        SqlTransaction transaction,
        Guid conversationHandle,
        CancellationToken cancellationToken)
    {
        using var endCmd = new SqlCommand("END CONVERSATION @ConversationHandle", connection, transaction);
        endCmd.Parameters.Add("@ConversationHandle", SqlDbType.UniqueIdentifier).Value = conversationHandle;
        await endCmd.ExecuteNonQueryAsync(cancellationToken);
    }

    /// <summary>
    /// Loads the XML column at <paramref name="ordinal"/> into a document;
    /// a NULL column yields an empty document.
    /// </summary>
    private static XmlDocument LoadBody(SqlDataReader reader, int ordinal)
    {
        var doc = new XmlDocument();
        if (!reader.IsDBNull(ordinal))
        {
            using var xmlReader = reader.GetSqlXml(ordinal).CreateReader();
            doc.Load(xmlReader);
        }

        return doc;
    }

    /// <summary>Returns the inner text of the first node matching <paramref name="xpath"/>, or null.</summary>
    private static string NodeText(XmlDocument doc, string xpath)
    {
        return doc?.SelectSingleNode(xpath)?.InnerText;
    }
}
/// <summary>
/// Payload of an OrderRequest message, as carried in the
/// <c>&lt;OrderRequest&gt;</c> XML body.
/// </summary>
public class OrderRequest
{
    /// <summary>Value of the <c>OrderID</c> element.</summary>
    public int OrderID { get; set; }

    /// <summary>Value of the <c>CustomerID</c> element.</summary>
    public int CustomerID { get; set; }

    /// <summary>Value of the <c>TotalAmount</c> element.</summary>
    public decimal TotalAmount { get; set; }
}
/// <summary>
/// Payload of an OrderResponse message, as carried in the
/// <c>&lt;OrderResponse&gt;</c> XML body.
/// </summary>
public class OrderResponse
{
    /// <summary>Value of the <c>OrderID</c> element.</summary>
    public int OrderID { get; set; }

    /// <summary>Value of the <c>Status</c> element (the SQL handler sends "Success" or "Error").</summary>
    public string Status { get; set; }

    /// <summary>Value of the <c>ErrorMessage</c> element, when one is present.</summary>
    public string ErrorMessage { get; set; }
}
Monitoring Service Broker
-- Queue status: activation, receive and enqueue switches for every queue
-- in the current database, plus the activation procedure bound to it.
SELECT
    sq.name AS queue_name,
    sq.is_activation_enabled,
    sq.is_receive_enabled,
    sq.is_enqueue_enabled,
    sq.activation_procedure
FROM sys.service_queues AS sq;
-- Message counts: one row per queue with the number of messages it holds.
-- Messages live in an internal table per queue (internal_type 201 = queue messages).
-- sys.partitions has a row for every index on that table, so only the base
-- rowset (heap = 0 or clustered index = 1) is counted; otherwise a queue is
-- listed once per index.
SELECT
    q.name AS queue_name,
    SUM(p.rows) AS message_count
FROM sys.service_queues AS q
INNER JOIN sys.internal_tables AS it
    ON it.parent_object_id = q.object_id
INNER JOIN sys.partitions AS p
    ON p.object_id = it.object_id
WHERE it.internal_type = 201
  AND p.index_id IN (0, 1)
GROUP BY
    q.object_id,
    q.name
ORDER BY
    q.name;
-- Conversation endpoints: every dialog endpoint in the current database with
-- its state, the local service it belongs to, and its lifetime value.
SELECT
    ep.conversation_handle,
    ep.state_desc,
    svc.name AS service_name,
    ep.lifetime
FROM sys.conversation_endpoints AS ep
INNER JOIN sys.services AS svc
    ON svc.service_id = ep.service_id;
Service Broker provides reliable messaging capabilities directly within Azure SQL Managed Instance for event-driven architectures.