Skip to content
Back to Blog
1 min read

Service Broker in Azure SQL Managed Instance

I wrote “Service Broker in Azure SQL Managed Instance” to share practical, production-minded guidance on this topic.

Understanding Service Broker

Service Broker provides queuing and messaging functionality within SQL Server, allowing for decoupled, asynchronous communication between services.

Setting Up Service Broker

-- Enable Service Broker on the database, but only when it is not already on.
-- Azure SQL Managed Instance always runs with Service Broker enabled and does
-- not support ALTER DATABASE ... SET ENABLE_BROKER, so the statement has to be
-- skipped there. On SQL Server the guard also avoids WITH ROLLBACK IMMEDIATE
-- rolling back other sessions' open transactions when nothing needs changing.
IF EXISTS (
    SELECT 1
    FROM sys.databases
    WHERE name = 'MyDatabase'
      AND is_broker_enabled = 0
)
BEGIN
    ALTER DATABASE MyDatabase SET ENABLE_BROKER WITH ROLLBACK IMMEDIATE;
END;

-- Verify Service Broker is enabled (is_broker_enabled should be 1)
SELECT
    name,
    is_broker_enabled,
    service_broker_guid
FROM sys.databases
WHERE name = 'MyDatabase';

-- Message types: requests and responses must both be well-formed XML
CREATE MESSAGE TYPE [OrderRequest]
    VALIDATION = WELL_FORMED_XML;

CREATE MESSAGE TYPE [OrderResponse]
    VALIDATION = WELL_FORMED_XML;

-- Contract: the initiator sends OrderRequest, the target answers with OrderResponse
CREATE CONTRACT [OrderProcessingContract] (
    [OrderRequest]  SENT BY INITIATOR,
    [OrderResponse] SENT BY TARGET
);

-- Queues: one per service
CREATE QUEUE [OrderRequestQueue];
CREATE QUEUE [OrderResponseQueue];

-- Services: each one binds a queue to the order-processing contract
CREATE SERVICE [OrderService]
    ON QUEUE [OrderRequestQueue] ([OrderProcessingContract]);

CREATE SERVICE [OrderResponseService]
    ON QUEUE [OrderResponseQueue] ([OrderProcessingContract]);

Sending Messages

-- Procedure to send order request
--
-- Opens a new dialog from OrderResponseService to OrderService, sends a single
-- OrderRequest message built from the parameters, and records the message in
-- dbo.MessageLog.
--
-- Parameters:
--   @OrderID, @CustomerID, @TotalAmount - values written into the XML body.
--
-- Result set:
--   One row with one column, ConversationHandle (UNIQUEIDENTIFIER): the handle
--   of the dialog that was opened. Callers need it to pick up the matching
--   response from OrderResponseQueue.
CREATE PROCEDURE dbo.SendOrderRequest
    @OrderID INT,
    @CustomerID INT,
    @TotalAmount DECIMAL(18,2)
AS
BEGIN
    SET NOCOUNT ON;

    DECLARE @ConversationHandle UNIQUEIDENTIFIER;
    DECLARE @MessageBody XML;

    -- Create message body
    SET @MessageBody = (
        SELECT
            @OrderID AS OrderID,
            @CustomerID AS CustomerID,
            @TotalAmount AS TotalAmount,
            GETUTCDATE() AS RequestTime
        FOR XML PATH('OrderRequest'), TYPE
    );

    -- Begin conversation
    BEGIN DIALOG CONVERSATION @ConversationHandle
        FROM SERVICE OrderResponseService
        TO SERVICE 'OrderService'
        ON CONTRACT OrderProcessingContract
        WITH ENCRYPTION = OFF;

    -- Send message
    SEND ON CONVERSATION @ConversationHandle
        MESSAGE TYPE OrderRequest (@MessageBody);

    -- Log the sent message
    INSERT INTO dbo.MessageLog (ConversationHandle, MessageType, MessageBody, SentAt)
    VALUES (@ConversationHandle, 'OrderRequest', CAST(@MessageBody AS NVARCHAR(MAX)), GETUTCDATE());

    -- Hand the handle back as a result set. RETURN only accepts an integer
    -- status code, so it cannot carry a UNIQUEIDENTIFIER; a single-value
    -- result set is also what a client reading a scalar expects.
    SELECT @ConversationHandle AS ConversationHandle;
END;
GO

Receiving and Processing Messages

-- Activation procedure for processing orders
--
-- Reads OrderRequestQueue one message per transaction. An OrderRequest is
-- inserted into dbo.ProcessedOrders and answered with an OrderResponse
-- ('Success', or 'Error' plus the error text when the insert fails), after
-- which the dialog is ended. EndDialog and Error messages just end the dialog.
-- The loop stops once a RECEIVE has waited 5 seconds without a message.
CREATE PROCEDURE dbo.ProcessOrderRequest
AS
BEGIN
    SET NOCOUNT ON;

    DECLARE @DialogHandle UNIQUEIDENTIFIER,
            @MsgTypeName  NVARCHAR(256),
            @Payload      XML,
            @OrderID      INT,
            @CustomerID   INT,
            @TotalAmount  DECIMAL(18,2),
            @ReplyBody    XML;

    WHILE 1 = 1
    BEGIN
        BEGIN TRANSACTION;

        WAITFOR (
            RECEIVE TOP(1)
                @DialogHandle = conversation_handle,
                @MsgTypeName  = message_type_name,
                @Payload      = CAST(message_body AS XML)
            FROM OrderRequestQueue
        ), TIMEOUT 5000;

        -- Nothing arrived within the timeout: release the transaction and stop
        IF @@ROWCOUNT = 0
        BEGIN
            ROLLBACK TRANSACTION;
            BREAK;
        END

        IF @MsgTypeName = N'OrderRequest'
        BEGIN
            -- Pull the order fields out of the XML body
            SELECT
                @OrderID     = @Payload.value('(/OrderRequest/OrderID)[1]', 'INT'),
                @CustomerID  = @Payload.value('(/OrderRequest/CustomerID)[1]', 'INT'),
                @TotalAmount = @Payload.value('(/OrderRequest/TotalAmount)[1]', 'DECIMAL(18,2)');

            BEGIN TRY
                -- Record the order
                INSERT INTO dbo.ProcessedOrders (OrderID, CustomerID, TotalAmount, ProcessedAt)
                VALUES (@OrderID, @CustomerID, @TotalAmount, GETUTCDATE());

                -- Reply: success
                SET @ReplyBody = (
                    SELECT
                        @OrderID AS OrderID,
                        'Success' AS Status,
                        GETUTCDATE() AS ProcessedAt
                    FOR XML PATH('OrderResponse'), TYPE
                );

                SEND ON CONVERSATION @DialogHandle
                    MESSAGE TYPE OrderResponse (@ReplyBody);
            END TRY
            BEGIN CATCH
                -- Reply: failure, carrying the error text back to the sender
                SET @ReplyBody = (
                    SELECT
                        @OrderID AS OrderID,
                        'Error' AS Status,
                        ERROR_MESSAGE() AS ErrorMessage
                    FOR XML PATH('OrderResponse'), TYPE
                );

                SEND ON CONVERSATION @DialogHandle
                    MESSAGE TYPE OrderResponse (@ReplyBody);
            END CATCH

            -- The request has been answered either way; close this side
            END CONVERSATION @DialogHandle;
        END
        ELSE IF @MsgTypeName IN (
            N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog',
            N'http://schemas.microsoft.com/SQL/ServiceBroker/Error'
        )
        BEGIN
            -- Control messages: nothing to process, just close this side
            END CONVERSATION @DialogHandle;
        END

        COMMIT TRANSACTION;
    END
END;
GO

-- Enable activation on the queue
-- Registers dbo.ProcessOrderRequest as the activation procedure of
-- OrderRequestQueue and switches activation on.
ALTER QUEUE OrderRequestQueue
WITH ACTIVATION (
    STATUS = ON,
    PROCEDURE_NAME = dbo.ProcessOrderRequest,
    -- Upper bound on concurrently running copies of the procedure
    MAX_QUEUE_READERS = 5,
    EXECUTE AS SELF
);

C# Integration

using System;
using System.Data;
using System.Globalization;
using System.Threading;
using System.Threading.Tasks;
using System.Xml;
using System.Xml.Linq;
using Microsoft.Data.SqlClient;

/// <summary>
/// Client for the order-processing Service Broker objects: sends order
/// requests through <c>dbo.SendOrderRequest</c>, waits for the matching
/// response on <c>OrderResponseQueue</c>, and can act as a reader of
/// <c>OrderRequestQueue</c>.
/// </summary>
public class ServiceBrokerClient
{
    private const string OrderRequestMessageType = "OrderRequest";
    private const string OrderResponseMessageType = "OrderResponse";
    private const string EndDialogMessageType = "http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog";
    private const string ErrorMessageType = "http://schemas.microsoft.com/SQL/ServiceBroker/Error";

    // Seconds granted to a command on top of its WAITFOR timeout, so the
    // client-side command timeout never fires before the server-side one.
    private const int CommandTimeoutMarginSeconds = 15;

    private readonly string _connectionString;

    /// <summary>Creates a client that opens its connections with <paramref name="connectionString"/>.</summary>
    public ServiceBrokerClient(string connectionString)
    {
        _connectionString = connectionString;
    }

    /// <summary>
    /// Calls <c>dbo.SendOrderRequest</c> and returns the conversation handle it reports.
    /// </summary>
    /// <returns>The handle of the dialog opened for this order.</returns>
    /// <exception cref="InvalidOperationException">
    /// The procedure did not return a <see cref="Guid"/> as the first column of its first row.
    /// </exception>
    public async Task<Guid> SendOrderAsync(int orderId, int customerId, decimal totalAmount)
    {
        using var connection = new SqlConnection(_connectionString);
        await connection.OpenAsync();

        using var command = new SqlCommand("dbo.SendOrderRequest", connection)
        {
            CommandType = CommandType.StoredProcedure
        };

        command.Parameters.AddWithValue("@OrderID", orderId);
        command.Parameters.AddWithValue("@CustomerID", customerId);
        command.Parameters.AddWithValue("@TotalAmount", totalAmount);

        // ExecuteScalarAsync yields null when the procedure produces no result
        // set; a bare (Guid) cast would then fail with an unhelpful exception.
        var result = await command.ExecuteScalarAsync();
        if (result is Guid conversationHandle)
        {
            return conversationHandle;
        }

        throw new InvalidOperationException(
            "dbo.SendOrderRequest did not return a conversation handle as the first column of its result set.");
    }

    /// <summary>
    /// Waits up to <paramref name="timeout"/> for the next message on
    /// <c>OrderResponseQueue</c> that belongs to <paramref name="conversationHandle"/>.
    /// When a message is received, the conversation is also ended on this side.
    /// </summary>
    /// <returns>
    /// The parsed response for an OrderResponse message; a response with
    /// <c>Status = "Error"</c> and the raw error XML for a Service Broker error
    /// message; <c>null</c> when the wait timed out or the message carried
    /// nothing to report (for example an end-of-dialog notification).
    /// </returns>
    public async Task<OrderResponse> WaitForResponseAsync(
        Guid conversationHandle,
        TimeSpan timeout)
    {
        using var connection = new SqlConnection(_connectionString);
        await connection.OpenAsync();

        // RECEIVE and END CONVERSATION commit together; on a timeout the
        // transaction is simply disposed without having changed anything.
        using var transaction = connection.BeginTransaction();

        const string query = @"
            WAITFOR (
                RECEIVE TOP(1)
                    conversation_handle,
                    message_type_name,
                    CAST(message_body AS XML) AS message_body
                FROM OrderResponseQueue
                WHERE conversation_handle = @ConversationHandle
            ), TIMEOUT @TimeoutMs";

        string messageType;
        XmlDocument body = null;

        using (var command = new SqlCommand(query, connection, transaction))
        {
            command.Parameters.AddWithValue("@ConversationHandle", conversationHandle);
            command.Parameters.AddWithValue("@TimeoutMs", (int)timeout.TotalMilliseconds);

            // The default command timeout (30 s) would abort a longer WAITFOR.
            // A negative timeout means "wait indefinitely", hence no limit.
            command.CommandTimeout = timeout < TimeSpan.Zero
                ? 0
                : (int)Math.Ceiling(timeout.TotalSeconds) + CommandTimeoutMarginSeconds;

            using var reader = await command.ExecuteReaderAsync();

            if (!await reader.ReadAsync())
            {
                return null;
            }

            messageType = reader.GetString(1);

            // Only these two message types carry a body worth parsing; control
            // messages such as EndDialog have none.
            if (messageType == OrderResponseMessageType || messageType == ErrorMessageType)
            {
                body = LoadBody(reader, 2);
            }
        }

        // The target ends its side right after replying; ending this side as
        // well lets the conversation endpoint be cleaned up instead of
        // lingering in sys.conversation_endpoints.
        await EndConversationAsync(connection, transaction, conversationHandle);
        transaction.Commit();

        if (messageType == OrderResponseMessageType)
        {
            return new OrderResponse
            {
                OrderID = int.Parse(ReadText(body, "//OrderID") ?? "0", CultureInfo.InvariantCulture),
                Status = ReadText(body, "//Status"),
                ErrorMessage = ReadText(body, "//ErrorMessage")
            };
        }

        if (messageType == ErrorMessageType)
        {
            return new OrderResponse
            {
                Status = "Error",
                ErrorMessage = body.OuterXml
            };
        }

        return null;
    }

    /// <summary>
    /// Reads <c>OrderRequestQueue</c> until cancellation is requested. Each
    /// OrderRequest is passed to <paramref name="processor"/> and answered with
    /// the response it returns; receive, reply and end-of-conversation share one
    /// transaction per message. End-of-dialog and error messages end the
    /// conversation without invoking the processor.
    /// </summary>
    /// <remarks>
    /// An exception from <paramref name="processor"/> propagates to the caller;
    /// the transaction is then disposed uncommitted, so the message is not consumed.
    /// </remarks>
    public async Task ProcessMessagesAsync(
        Func<OrderRequest, Task<OrderResponse>> processor,
        CancellationToken cancellationToken)
    {
        const string query = @"
                WAITFOR (
                    RECEIVE TOP(1)
                        conversation_handle,
                        message_type_name,
                        CAST(message_body AS XML) AS message_body
                    FROM OrderRequestQueue
                ), TIMEOUT 5000";

        while (!cancellationToken.IsCancellationRequested)
        {
            using var connection = new SqlConnection(_connectionString);
            await connection.OpenAsync(cancellationToken);

            using var transaction = connection.BeginTransaction();

            var conversationHandle = Guid.Empty;
            string messageType = null;
            OrderRequest request = null;

            // The reader must be closed before another command can run on the
            // same connection, so everything needed is copied out first.
            using (var receiveCmd = new SqlCommand(query, connection, transaction))
            using (var reader = await receiveCmd.ExecuteReaderAsync(cancellationToken))
            {
                if (await reader.ReadAsync(cancellationToken))
                {
                    conversationHandle = reader.GetGuid(0);
                    messageType = reader.GetString(1);

                    if (messageType == OrderRequestMessageType)
                    {
                        var doc = LoadBody(reader, 2);

                        // The body is machine-generated XML, so numbers are
                        // parsed with the invariant culture rather than the
                        // culture of the current thread.
                        request = new OrderRequest
                        {
                            OrderID = int.Parse(ReadText(doc, "//OrderID") ?? "0", CultureInfo.InvariantCulture),
                            CustomerID = int.Parse(ReadText(doc, "//CustomerID") ?? "0", CultureInfo.InvariantCulture),
                            TotalAmount = decimal.Parse(ReadText(doc, "//TotalAmount") ?? "0", CultureInfo.InvariantCulture)
                        };
                    }
                }
            }

            if (request != null)
            {
                var response = await processor(request);

                // Send response (this also ends the conversation)
                await SendResponseAsync(connection, transaction, conversationHandle, response, cancellationToken);
            }
            else if (messageType == EndDialogMessageType || messageType == ErrorMessageType)
            {
                // Control messages still need this side of the dialog closed.
                await EndConversationAsync(connection, transaction, conversationHandle, cancellationToken);
            }

            transaction.Commit();
        }
    }

    /// <summary>
    /// Sends <paramref name="response"/> as an OrderResponse message on the
    /// conversation and then ends the conversation, inside the given transaction.
    /// </summary>
    private async Task SendResponseAsync(
        SqlConnection connection,
        SqlTransaction transaction,
        Guid conversationHandle,
        OrderResponse response,
        CancellationToken cancellationToken = default)
    {
        // XElement escapes markup characters in the values. Interpolating them
        // into a string would produce malformed XML whenever a status or error
        // message contains '<' or '&'.
        var responseXml = new XElement("OrderResponse",
            new XElement("OrderID", response.OrderID),
            new XElement("Status", response.Status),
            new XElement("ErrorMessage", response.ErrorMessage))
            .ToString(SaveOptions.DisableFormatting);

        var sendQuery = @"
            SEND ON CONVERSATION @ConversationHandle
            MESSAGE TYPE OrderResponse (@MessageBody);
            END CONVERSATION @ConversationHandle";

        using var sendCmd = new SqlCommand(sendQuery, connection, transaction);
        sendCmd.Parameters.AddWithValue("@ConversationHandle", conversationHandle);

        // Typed as xml, like the bodies the T-SQL procedures send, so readers
        // can cast message_body straight back to XML.
        sendCmd.Parameters.Add("@MessageBody", SqlDbType.Xml).Value = responseXml;

        await sendCmd.ExecuteNonQueryAsync(cancellationToken);
    }

    /// <summary>Ends this side of the conversation inside the given transaction.</summary>
    private static async Task EndConversationAsync(
        SqlConnection connection,
        SqlTransaction transaction,
        Guid conversationHandle,
        CancellationToken cancellationToken = default)
    {
        using var endCmd = new SqlCommand(
            "END CONVERSATION @ConversationHandle", connection, transaction);
        endCmd.Parameters.AddWithValue("@ConversationHandle", conversationHandle);

        await endCmd.ExecuteNonQueryAsync(cancellationToken);
    }

    /// <summary>
    /// Loads the XML column at <paramref name="ordinal"/> of the current row;
    /// returns an empty document when the column is NULL.
    /// </summary>
    private static XmlDocument LoadBody(SqlDataReader reader, int ordinal)
    {
        var doc = new XmlDocument();

        if (!reader.IsDBNull(ordinal))
        {
            using var xmlReader = reader.GetSqlXml(ordinal).CreateReader();
            doc.Load(xmlReader);
        }

        return doc;
    }

    /// <summary>Text of the first node matching <paramref name="xpath"/>, or <c>null</c> when there is none.</summary>
    private static string ReadText(XmlDocument doc, string xpath)
    {
        return doc.SelectSingleNode(xpath)?.InnerText;
    }
}

/// <summary>
/// Order details carried by an OrderRequest message.
/// </summary>
public class OrderRequest
{
    /// <summary>Value of the message's <c>OrderID</c> element.</summary>
    public int OrderID { get; set; }

    /// <summary>Value of the message's <c>CustomerID</c> element.</summary>
    public int CustomerID { get; set; }

    /// <summary>Value of the message's <c>TotalAmount</c> element.</summary>
    public decimal TotalAmount { get; set; }
}

/// <summary>
/// Outcome carried by an OrderResponse message.
/// </summary>
public class OrderResponse
{
    /// <summary>Value of the message's <c>OrderID</c> element.</summary>
    public int OrderID { get; set; }

    /// <summary>
    /// Value of the message's <c>Status</c> element; the T-SQL processor sends
    /// <c>Success</c> or <c>Error</c>.
    /// </summary>
    public string Status { get; set; }

    /// <summary>Value of the message's <c>ErrorMessage</c> element, when present.</summary>
    public string ErrorMessage { get; set; }
}

Monitoring Service Broker

-- Queue status: activation / receive / enqueue flags and the activation
-- procedure of every queue in the current database
SELECT
    sq.name AS queue_name,
    sq.is_activation_enabled,
    sq.is_receive_enabled,
    sq.is_enqueue_enabled,
    sq.activation_procedure
FROM sys.service_queues AS sq;

-- Message counts
-- Each queue keeps its messages in an internal table (internal_type 201).
-- sys.partitions holds one row per index of that table, so the join is
-- restricted to the heap / clustered index (index_id 0 or 1); without the
-- filter a queue is listed once for every index on its internal table.
SELECT
    q.name AS queue_name,
    p.rows AS message_count
FROM sys.service_queues AS q
INNER JOIN sys.internal_tables AS it
    ON it.parent_object_id = q.object_id
INNER JOIN sys.partitions AS p
    ON p.object_id = it.object_id
WHERE it.internal_type = 201
  AND p.index_id IN (0, 1);

-- Conversation endpoints: state and lifetime of every conversation,
-- together with the name of the service it belongs to
SELECT
    ep.conversation_handle,
    ep.state_desc,
    svc.name AS service_name,
    ep.lifetime
FROM sys.conversation_endpoints AS ep
INNER JOIN sys.services AS svc
    ON svc.service_id = ep.service_id;

Service Broker provides reliable messaging capabilities directly within Azure SQL Managed Instance for event-driven architectures.

## Takeaways

Add a concise, personal takeaway and recommended next steps here.

Michael John Peña

Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.