
Fabric Databases Preview: Relational Databases in Microsoft Fabric

Microsoft Fabric now includes relational database capabilities in preview. Let’s explore what this means and how to use it.

Understanding Fabric Databases

Fabric Databases bring operational database workloads into the unified Fabric platform, with automatic integration to OneLake.

Architecture Overview

At a glance, Fabric Databases include:

- SQL Database in Fabric (based on Azure SQL Database)
- PostgreSQL in Fabric (coming)
- Automatic mirroring to OneLake in Delta format
- Unified governance with Microsoft Purview
- Integration with Fabric analytics workloads

Creating a Fabric SQL Database

from azure.identity import DefaultAzureCredential
import requests

credential = DefaultAzureCredential()
token = credential.get_token("https://api.fabric.microsoft.com/.default")

headers = {
    "Authorization": f"Bearer {token.token}",
    "Content-Type": "application/json"
}

# Create SQL Database in Fabric
workspace_id = "your-workspace-id"

database_config = {
    "displayName": "SalesDatabase",
    "description": "Sales operational database",
    "type": "SQLDatabase",
    "properties": {
        # Performance tier
        "sku": "S0",  # Standard tier
        # Auto-mirroring to OneLake
        "autoMirror": True,
        # Collation
        "collation": "SQL_Latin1_General_CP1_CI_AS"
    }
}

response = requests.post(
    f"https://api.fabric.microsoft.com/v1/workspaces/{workspace_id}/items",
    headers=headers,
    json=database_config
)
response.raise_for_status()

database = response.json()
print(f"Created database: {database['id']}")

Connecting and Working with Data

import struct

import pyodbc
from azure.identity import DefaultAzureCredential

# Connection options
class FabricDatabaseConnection:
    """Connect to Fabric SQL Database"""

    def __init__(self, workspace_name: str, database_name: str):
        self.server = f"{workspace_name}.database.fabric.microsoft.com"
        self.database = database_name
        self.credential = DefaultAzureCredential()

    def get_connection(self) -> pyodbc.Connection:
        """Get database connection with Microsoft Entra ID (AAD) auth"""
        token = self.credential.get_token(
            "https://database.fabric.microsoft.com/.default"
        )

        conn_str = (
            f"Driver={{ODBC Driver 18 for SQL Server}};"
            f"Server={self.server};"
            f"Database={self.database};"
            f"Encrypt=yes;"
        )

        # SQL_COPT_SS_ACCESS_TOKEN (1256) expects a length-prefixed
        # UTF-16-LE byte structure, not the raw UTF-8 token string
        token_bytes = token.token.encode("utf-16-le")
        token_struct = struct.pack(
            f"<I{len(token_bytes)}s", len(token_bytes), token_bytes
        )

        conn = pyodbc.connect(
            conn_str,
            attrs_before={1256: token_struct}
        )

        return conn

    def execute_query(self, query: str, params: tuple = None):
        """Execute a query and return results"""
        conn = self.get_connection()
        try:
            cursor = conn.cursor()

            if params:
                cursor.execute(query, params)
            else:
                cursor.execute(query)

            # SELECTs return rows as dictionaries; other statements are committed
            if cursor.description:
                columns = [col[0] for col in cursor.description]
                return [dict(zip(columns, row)) for row in cursor.fetchall()]

            conn.commit()
            return cursor.rowcount
        finally:
            conn.close()

# Usage
db = FabricDatabaseConnection("my-workspace", "SalesDatabase")

# Create tables
db.execute_query("""
    CREATE TABLE customers (
        customer_id INT IDENTITY(1,1) PRIMARY KEY,
        name NVARCHAR(100) NOT NULL,
        email NVARCHAR(255),
        created_at DATETIME2 DEFAULT GETUTCDATE()
    )
""")

db.execute_query("""
    CREATE TABLE orders (
        order_id INT IDENTITY(1,1) PRIMARY KEY,
        customer_id INT REFERENCES customers(customer_id),
        amount DECIMAL(10, 2) NOT NULL,
        status NVARCHAR(20) DEFAULT 'pending',
        order_date DATETIME2 DEFAULT GETUTCDATE()
    )
""")

# Insert data
db.execute_query(
    "INSERT INTO customers (name, email) VALUES (?, ?)",
    ("John Doe", "john@example.com")
)

# Query data
customers = db.execute_query("SELECT * FROM customers")

Auto-Mirroring to OneLake

# Data automatically mirrors to OneLake in Delta format
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Access mirrored data in Lakehouse
# Path: abfss://workspace@onelake.dfs.fabric.microsoft.com/database/Tables/

# Read mirrored customer data
customers_df = spark.read.format("delta").load(
    "abfss://my-workspace@onelake.dfs.fabric.microsoft.com/"
    "SalesDatabase.Database/Tables/dbo/customers"
)

customers_df.show()

# Join with analytical data
analytics_df = spark.read.format("delta").load(
    "abfss://my-workspace@onelake.dfs.fabric.microsoft.com/"
    "AnalyticsLakehouse.Lakehouse/Tables/customer_segments"
)

# Combine operational and analytical data
enriched = customers_df.join(
    analytics_df,
    "customer_id"
).select(
    "customer_id",
    "name",
    "email",
    "segment",
    "lifetime_value"
)
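
To make the combined view available for reporting, you can write it back to a Lakehouse table. A minimal sketch, assuming the notebook has a default Lakehouse attached; the table name is illustrative.

# Persist the enriched result as a Delta table in the attached Lakehouse
enriched.write.format("delta").mode("overwrite").saveAsTable("enriched_customers")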

Real-Time Sync Configuration

# Table-level sync settings for mirroring to OneLake (illustrative; mirroring itself is managed by Fabric)
sync_config = {
    "displayName": "CustomerSync",
    "sourceDatabase": "SalesDatabase",
    "tables": [
        {
            "schema": "dbo",
            "table": "customers",
            "syncMode": "incremental",  # or "full"
            "changeTracking": True,
            "watermarkColumn": "updated_at"
        },
        {
            "schema": "dbo",
            "table": "orders",
            "syncMode": "incremental",
            "changeTracking": True,
            "watermarkColumn": "order_date"
        }
    ],
    "schedule": {
        "type": "continuous",  # or "scheduled"
        "latencySeconds": 30
    }
}

# Mirroring happens automatically with ~30 second latency
# Changes are captured via SQL Server Change Tracking
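
Assuming the source tables use standard SQL Server change tracking, as noted above, you can inspect which tables are enabled directly from the database. A quick sketch using the connection helper defined earlier; the catalog views are standard SQL Server ones, and whether the preview exposes them for mirroring is an assumption.

# List tables with change tracking enabled
tracking = db.execute_query("""
    SELECT t.name AS table_name, ctt.is_track_columns_updated_on
    FROM sys.change_tracking_tables ctt
    JOIN sys.tables t ON ctt.object_id = t.object_id
""")
print(tracking)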

Querying Across Operational and Analytical Data

# T-SQL cross-database queries
cross_db_query = """
    -- Query operational data
    SELECT
        o.order_id,
        c.name as customer_name,
        o.amount,
        o.order_date
    FROM SalesDatabase.dbo.orders o
    JOIN SalesDatabase.dbo.customers c ON o.customer_id = c.customer_id
    WHERE o.order_date >= DATEADD(day, -30, GETUTCDATE())

    -- To join with analytical insights from the Lakehouse,
    -- use external tables or views (created below)
"""

# Create an external table pointing to the Lakehouse
# (assumes the OneLakehouse data source and DeltaFormat file format are already defined)
db.execute_query("""
    CREATE EXTERNAL TABLE customer_insights (
        customer_id INT,
        segment NVARCHAR(50),
        lifetime_value DECIMAL(10, 2),
        churn_risk FLOAT
    )
    WITH (
        LOCATION = 'Tables/customer_insights',
        DATA_SOURCE = OneLakehouse,
        FILE_FORMAT = DeltaFormat
    )
""")

# Now query across operational and analytical data
results = db.execute_query("""
    SELECT
        c.name,
        c.email,
        i.segment,
        i.churn_risk,
        SUM(o.amount) AS total_order_amount
    FROM customers c
    JOIN customer_insights i ON c.customer_id = i.customer_id
    JOIN orders o ON c.customer_id = o.customer_id
    WHERE i.churn_risk > 0.7
    GROUP BY c.name, c.email, i.segment, i.churn_risk
""")

Performance Optimization

# Index usage and recommendations
# (Fabric surfaces automatic index recommendations; the DMV below shows current usage)
index_usage = db.execute_query("""
    SELECT * FROM sys.dm_db_index_usage_stats
""")

# Create a recommended covering index in its own batch so it is committed
db.execute_query("""
    CREATE NONCLUSTERED INDEX IX_orders_customer_date
    ON orders (customer_id, order_date)
    INCLUDE (amount, status)
""")

# Query Store for performance insights (enabled by default)
db.execute_query("ALTER DATABASE CURRENT SET QUERY_STORE = ON")

# Review top resource-consuming queries
top_queries = db.execute_query("""
    SELECT TOP 10
        qt.query_sql_text,
        rs.avg_duration,
        rs.avg_cpu_time,
        rs.count_executions
    FROM sys.query_store_query_text qt
    JOIN sys.query_store_query q ON qt.query_text_id = q.query_text_id
    JOIN sys.query_store_plan p ON q.query_id = p.query_id
    JOIN sys.query_store_runtime_stats rs ON p.plan_id = rs.plan_id
    ORDER BY rs.avg_cpu_time DESC
""")

# Automatic tuning (enabled by Fabric; FORCE_LAST_GOOD_PLAN is the key option)
db.execute_query("""
    ALTER DATABASE CURRENT SET AUTOMATIC_TUNING (FORCE_LAST_GOOD_PLAN = ON);
""")

Migration from Azure SQL

class FabricMigration:
    """Helper for migrating from Azure SQL to Fabric Database"""

    def __init__(self, source_conn: str, fabric_conn: FabricDatabaseConnection):
        self.source = pyodbc.connect(source_conn)
        self.fabric = fabric_conn

    def migrate_schema(self, tables: list):
        """Migrate table schemas"""
        for table in tables:
            # Get schema from source
            schema = self._get_table_schema(table)

            # Create in Fabric
            self.fabric.execute_query(schema)

    def migrate_data(self, table: str, key_column: str, batch_size: int = 10000):
        """Migrate data in batches, paging by a stable key column"""
        cursor = self.source.cursor()
        cursor.execute(f"SELECT COUNT(*) FROM {table}")
        total_rows = cursor.fetchone()[0]

        offset = 0
        while offset < total_rows:
            # Page on a stable key so batches don't skip or duplicate rows
            cursor.execute(f"""
                SELECT * FROM {table}
                ORDER BY {key_column}
                OFFSET {offset} ROWS
                FETCH NEXT {batch_size} ROWS ONLY
            """)

            rows = cursor.fetchall()
            columns = [col[0] for col in cursor.description]

            # Insert into Fabric
            self._bulk_insert(table, columns, rows)

            offset += batch_size
            print(f"Migrated {min(offset, total_rows)}/{total_rows} rows")

    def _get_table_schema(self, table: str) -> str:
        """Get CREATE TABLE statement"""
        # Implementation to generate DDL
        pass

    def _bulk_insert(self, table: str, columns: list, rows: list):
        """Bulk insert data"""
        placeholders = ", ".join(["?" for _ in columns])
        cols = ", ".join(columns)
        query = f"INSERT INTO {table} ({cols}) VALUES ({placeholders})"

        conn = self.fabric.get_connection()
        cursor = conn.cursor()
        cursor.fast_executemany = True  # send the batch as a parameter array
        cursor.executemany(query, rows)
        conn.commit()
        conn.close()
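
A hypothetical usage sketch follows; the source connection string, table names, and key columns are placeholders.

# Example migration run (connection string and names are placeholders;
# migrate_schema assumes _get_table_schema has been implemented)
source_conn = (
    "Driver={ODBC Driver 18 for SQL Server};"
    "Server=legacy-server.database.windows.net;"
    "Database=LegacySales;"
    "Authentication=ActiveDirectoryInteractive;"
)

migration = FabricMigration(source_conn, db)
migration.migrate_schema(["customers", "orders"])
migration.migrate_data("customers", key_column="customer_id")
migration.migrate_data("orders", key_column="order_id")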

Fabric Databases bring operational workloads into the unified Fabric platform. The automatic mirroring to OneLake enables seamless integration between transactional and analytical workloads.

Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.