Back to Blog
5 min read

SQL Database in Microsoft Fabric: Deep Dive

SQL Database in Fabric brings the power of Azure SQL to the unified data platform. Let’s take a deep dive into its capabilities and use cases.

SQL Database Architecture in Fabric

"""
SQL Database in Fabric Architecture:
+------------------+
|  Applications    |
+------------------+
         |
         v
+------------------+
| TDS Endpoint     |  <- T-SQL Queries
+------------------+
         |
         v
+------------------+
| SQL Engine       |  <- Full SQL Server Engine
| (Azure SQL)      |
+------------------+
         |
    Automatic
    Mirroring
         |
         v
+------------------+
| OneLake          |  <- Delta Format
| (Parquet)        |
+------------------+
"""

T-SQL Feature Support

-- Full T-SQL support with some Fabric-specific considerations

-- Standard DDL
CREATE SCHEMA sales;

CREATE TABLE sales.products (
    product_id INT IDENTITY(1,1) PRIMARY KEY,
    name NVARCHAR(200) NOT NULL,
    category NVARCHAR(100),
    price DECIMAL(10, 2) NOT NULL,
    stock_quantity INT DEFAULT 0,
    created_at DATETIME2 DEFAULT SYSUTCDATETIME(),
    updated_at DATETIME2 DEFAULT SYSUTCDATETIME(),

    -- Indexes
    INDEX IX_products_category NONCLUSTERED (category),

    -- Check constraints
    CONSTRAINT CHK_price_positive CHECK (price > 0),
    CONSTRAINT CHK_stock_nonnegative CHECK (stock_quantity >= 0)
);

-- Computed columns
ALTER TABLE sales.products ADD
    is_in_stock AS (CASE WHEN stock_quantity > 0 THEN 1 ELSE 0 END);

-- Triggers (supported)
CREATE TRIGGER trg_products_update
ON sales.products
AFTER UPDATE
AS
BEGIN
    UPDATE sales.products
    SET updated_at = SYSUTCDATETIME()
    FROM sales.products p
    INNER JOIN inserted i ON p.product_id = i.product_id;
END;

-- Stored procedures
CREATE PROCEDURE sales.usp_UpdateStock
    @product_id INT,
    @quantity_change INT
AS
BEGIN
    SET NOCOUNT ON;

    UPDATE sales.products
    SET stock_quantity = stock_quantity + @quantity_change
    WHERE product_id = @product_id;

    SELECT
        product_id,
        name,
        stock_quantity as new_stock
    FROM sales.products
    WHERE product_id = @product_id;
END;

-- User-defined functions
CREATE FUNCTION sales.fn_GetDiscountedPrice(
    @price DECIMAL(10, 2),
    @discount_percent DECIMAL(5, 2)
)
RETURNS DECIMAL(10, 2)
AS
BEGIN
    RETURN @price * (1 - @discount_percent / 100);
END;

-- Views
CREATE VIEW sales.vw_LowStockProducts AS
SELECT
    product_id,
    name,
    category,
    stock_quantity
FROM sales.products
WHERE stock_quantity < 10;

Transactional Operations

import pyodbc
from contextlib import contextmanager

class TransactionalOperations:
    """Handle transactions in Fabric SQL Database"""

    def __init__(self, connection_string: str):
        self.conn_str = connection_string

    @contextmanager
    def transaction(self):
        """Context manager for transactions"""
        conn = pyodbc.connect(self.conn_str)
        conn.autocommit = False

        try:
            yield conn
            conn.commit()
        except Exception as e:
            conn.rollback()
            raise
        finally:
            conn.close()

    def transfer_inventory(self, from_product_id: int,
                          to_product_id: int, quantity: int) -> bool:
        """Transfer inventory between products atomically"""

        with self.transaction() as conn:
            cursor = conn.cursor()

            # Check source has enough stock
            cursor.execute("""
                SELECT stock_quantity
                FROM sales.products WITH (UPDLOCK, ROWLOCK)
                WHERE product_id = ?
            """, (from_product_id,))

            row = cursor.fetchone()
            if not row or row[0] < quantity:
                raise ValueError("Insufficient stock")

            # Deduct from source
            cursor.execute("""
                UPDATE sales.products
                SET stock_quantity = stock_quantity - ?
                WHERE product_id = ?
            """, (quantity, from_product_id))

            # Add to destination
            cursor.execute("""
                UPDATE sales.products
                SET stock_quantity = stock_quantity + ?
                WHERE product_id = ?
            """, (quantity, to_product_id))

            return True

    def process_order(self, order_items: list) -> int:
        """Process an order with multiple items"""

        with self.transaction() as conn:
            cursor = conn.cursor()

            # Create order
            cursor.execute("""
                INSERT INTO sales.orders (customer_id, status)
                OUTPUT INSERTED.order_id
                VALUES (?, 'pending')
            """, (order_items[0]['customer_id'],))

            order_id = cursor.fetchone()[0]

            # Add order items and update stock
            for item in order_items:
                # Check and lock stock
                cursor.execute("""
                    SELECT stock_quantity
                    FROM sales.products WITH (UPDLOCK, ROWLOCK)
                    WHERE product_id = ?
                """, (item['product_id'],))

                stock = cursor.fetchone()[0]
                if stock < item['quantity']:
                    raise ValueError(f"Insufficient stock for product {item['product_id']}")

                # Add order item
                cursor.execute("""
                    INSERT INTO sales.order_items
                    (order_id, product_id, quantity, unit_price)
                    VALUES (?, ?, ?, ?)
                """, (order_id, item['product_id'],
                      item['quantity'], item['unit_price']))

                # Update stock
                cursor.execute("""
                    UPDATE sales.products
                    SET stock_quantity = stock_quantity - ?
                    WHERE product_id = ?
                """, (item['quantity'], item['product_id']))

            # Update order total
            cursor.execute("""
                UPDATE sales.orders
                SET total_amount = (
                    SELECT SUM(quantity * unit_price)
                    FROM sales.order_items
                    WHERE order_id = ?
                ),
                status = 'confirmed'
                WHERE order_id = ?
            """, (order_id, order_id))

            return order_id

Querying Mirrored Data

-- Fabric SQL Database automatically mirrors to OneLake
-- Query operational data directly
SELECT TOP 100
    p.name,
    p.category,
    SUM(oi.quantity) as total_sold,
    SUM(oi.quantity * oi.unit_price) as revenue
FROM sales.products p
JOIN sales.order_items oi ON p.product_id = oi.product_id
JOIN sales.orders o ON oi.order_id = o.order_id
WHERE o.order_date >= DATEADD(month, -1, GETUTCDATE())
GROUP BY p.name, p.category
ORDER BY revenue DESC;

-- For heavy analytical queries, use the mirrored data in Lakehouse
-- This offloads analytics from the operational database

Change Data Capture

-- Enable CDC for tracking changes
EXEC sys.sp_cdc_enable_db;

-- Enable CDC on specific tables
EXEC sys.sp_cdc_enable_table
    @source_schema = N'sales',
    @source_name = N'products',
    @role_name = NULL,
    @supports_net_changes = 1;

-- Query changes
DECLARE @from_lsn binary(10), @to_lsn binary(10);
SET @from_lsn = sys.fn_cdc_get_min_lsn('sales_products');
SET @to_lsn = sys.fn_cdc_get_max_lsn();

SELECT *
FROM cdc.fn_cdc_get_all_changes_sales_products(
    @from_lsn, @to_lsn, 'all'
);

-- CDC changes are also reflected in OneLake mirroring

Integration with Fabric Analytics

# Combine SQL Database with Fabric Analytics
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Read mirrored data from OneLake
products_df = spark.read.format("delta").load(
    "abfss://workspace@onelake.dfs.fabric.microsoft.com/"
    "SalesDB.Database/Tables/sales/products"
)

orders_df = spark.read.format("delta").load(
    "abfss://workspace@onelake.dfs.fabric.microsoft.com/"
    "SalesDB.Database/Tables/sales/orders"
)

# Perform analytical queries that would be expensive in OLTP
sales_analysis = orders_df.join(
    products_df,
    orders_df.product_id == products_df.product_id
).groupBy(
    "category",
    spark.sql("date_trunc('month', order_date)").alias("month")
).agg(
    spark.sql("sum(quantity)").alias("units_sold"),
    spark.sql("sum(quantity * unit_price)").alias("revenue"),
    spark.sql("count(distinct customer_id)").alias("unique_customers")
)

# Write insights back to Lakehouse (not to operational DB)
sales_analysis.write.format("delta").mode("overwrite").saveAsTable(
    "analytics.monthly_sales_summary"
)

Security and Access Control

-- Row-level security
CREATE FUNCTION sales.fn_SecurityPredicate(@region NVARCHAR(50))
RETURNS TABLE
WITH SCHEMABINDING
AS
RETURN SELECT 1 AS result
WHERE @region = USER_NAME()
   OR IS_MEMBER('sales_admin') = 1;

CREATE SECURITY POLICY sales.RegionPolicy
ADD FILTER PREDICATE sales.fn_SecurityPredicate(region)
    ON sales.orders;

-- Column-level encryption
-- Sensitive data can be encrypted using Always Encrypted
-- This works with Fabric's integrated security model

-- Roles and permissions
CREATE ROLE sales_reader;
GRANT SELECT ON SCHEMA::sales TO sales_reader;

CREATE ROLE sales_writer;
GRANT SELECT, INSERT, UPDATE ON SCHEMA::sales TO sales_writer;

-- Add users to roles
ALTER ROLE sales_reader ADD MEMBER [user@domain.com];

SQL Database in Fabric brings enterprise-grade transactional capabilities to the unified data platform, with seamless integration to analytical workloads through automatic mirroring.

Michael John Peña

Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.