Skip to content
Back to Blog
1 min read

Fabric Databases Preview: Relational Databases in Microsoft Fabric

This post shares practical, production-minded guidance on Fabric Databases, the preview of relational databases in Microsoft Fabric.

Understanding Fabric Databases

Fabric Databases bring operational database workloads into the unified Fabric platform, with automatic integration to OneLake.

# Architecture Overview
"""
Fabric Databases:
- SQL Database in Fabric (based on Azure SQL)
- PostgreSQL in Fabric (coming)
- Automatic mirroring to OneLake (Delta format)
- Unified governance with Purview
- Integrated with Fabric analytics
"""

Creating a Fabric SQL Database

from azure.identity import DefaultAzureCredential
import requests

# Acquire a Microsoft Entra ID bearer token for the Fabric REST API.
credential = DefaultAzureCredential()
token = credential.get_token("https://api.fabric.microsoft.com/.default")

headers = {
    "Authorization": f"Bearer {token.token}",
    "Content-Type": "application/json"
}

# Create SQL Database in Fabric
workspace_id = "your-workspace-id"

# NOTE(review): the keys under "properties" are illustrative for the preview;
# confirm them against the current Fabric REST API reference before relying on them.
database_config = {
    "displayName": "SalesDatabase",
    "description": "Sales operational database",
    "type": "SQLDatabase",
    "properties": {
        # Performance tier
        "sku": "S0",  # Standard tier
        # Auto-mirroring to OneLake
        "autoMirror": True,
        # Collation
        "collation": "SQL_Latin1_General_CP1_CI_AS"
    }
}

response = requests.post(
    f"https://api.fabric.microsoft.com/v1/workspaces/{workspace_id}/items",
    headers=headers,
    json=database_config,
    # Without a timeout a stalled connection would block this script forever.
    timeout=60,
)

# Surface 4xx/5xx responses as an HTTPError instead of failing later with a
# confusing KeyError on the missing "id" field.
response.raise_for_status()

if response.status_code == 202:
    # The item API may accept the request and provision asynchronously; in that
    # case there is no item body yet, only a Location header to poll.
    database = None
    print(f"Database creation accepted; poll: {response.headers.get('Location')}")
else:
    database = response.json()
    print(f"Created database: {database['id']}")

Connecting and Working with Data

import pyodbc
from azure.identity import DefaultAzureCredential

# Connection options
class FabricDatabaseConnection:
    """Connect to Fabric SQL Database using a Microsoft Entra ID access token.

    Each call to :meth:`get_connection` requests a token from
    ``DefaultAzureCredential`` and opens a new ODBC connection.
    :meth:`execute_query` is a convenience wrapper that opens a connection,
    runs one statement, commits, and always closes the connection again.
    """

    # ODBC pre-connect attribute used to hand an access token to the
    # Microsoft ODBC Driver for SQL Server (SQL_COPT_SS_ACCESS_TOKEN).
    _SQL_COPT_SS_ACCESS_TOKEN = 1256

    def __init__(self, workspace_name: str, database_name: str):
        """Store the target server/database and create the credential.

        Args:
            workspace_name: Used as the host prefix of the server name.
            database_name: Name of the database to connect to.
        """
        self.server = f"{workspace_name}.database.fabric.microsoft.com"
        self.database = database_name
        self.credential = DefaultAzureCredential()

    @staticmethod
    def _encode_access_token(token: str) -> bytes:
        """Pack *token* into the structure the ODBC driver expects.

        The driver reads a 4-byte little-endian length followed by the token
        encoded as UTF-16-LE; passing plain UTF-8 bytes makes the login fail.
        """
        import struct  # local import keeps this block self-contained

        raw = token.encode("utf-16-le")
        return struct.pack(f"<I{len(raw)}s", len(raw), raw)

    def get_connection(self) -> "pyodbc.Connection":
        """Get database connection with AAD auth.

        Returns:
            A new, open pyodbc connection. The caller owns it and must close it.
        """
        # NOTE(review): confirm this token scope and the server host name in
        # __init__ against the connection details shown for the database in
        # the Fabric portal.
        token = self.credential.get_token(
            "https://database.fabric.microsoft.com/.default"
        )

        conn_str = (
            f"Driver={{ODBC Driver 18 for SQL Server}};"
            f"Server={self.server};"
            f"Database={self.database};"
            f"Encrypt=yes;"
        )

        return pyodbc.connect(
            conn_str,
            attrs_before={
                self._SQL_COPT_SS_ACCESS_TOKEN: self._encode_access_token(token.token)
            },
        )

    def execute_query(self, query: str, params: tuple = None):
        """Execute a query and return results.

        Args:
            query: SQL text to execute.
            params: Optional values for ``?`` placeholders in *query*.

        Returns:
            A list of dicts (column name -> value) when the statement produces
            a result set, otherwise the cursor's ``rowcount``.
        """
        conn = self.get_connection()
        try:
            cursor = conn.cursor()

            if params:
                cursor.execute(query, params)
            else:
                cursor.execute(query)

            if cursor.description:
                columns = [col[0] for col in cursor.description]
                results = [dict(zip(columns, row)) for row in cursor.fetchall()]
                # Commit here too, so data-modifying statements that also
                # return rows (e.g. INSERT ... OUTPUT) are not rolled back.
                conn.commit()
                return results

            affected = cursor.rowcount
            conn.commit()
            return affected
        finally:
            # Always release the connection; closing without a commit rolls
            # back any work left pending by a failed statement.
            conn.close()

# Usage
# Usage: a handle for the "SalesDatabase" database in the "my-workspace" workspace.
db = FabricDatabaseConnection("my-workspace", "SalesDatabase")

# Create tables. customers goes first because orders declares a foreign key
# that references customers(customer_id).
for ddl in (
    """
    CREATE TABLE customers (
        customer_id INT PRIMARY KEY IDENTITY(1,1),
        name NVARCHAR(100) NOT NULL,
        email NVARCHAR(255),
        created_at DATETIME2 DEFAULT GETUTCDATE()
    )
""",
    """
    CREATE TABLE orders (
        order_id INT PRIMARY KEY IDENTITY(1,1),
        customer_id INT REFERENCES customers(customer_id),
        amount DECIMAL(10, 2) NOT NULL,
        status NVARCHAR(20) DEFAULT 'pending',
        order_date DATETIME2 DEFAULT GETUTCDATE()
    )
""",
):
    db.execute_query(ddl)

# Insert data: values travel as parameters, never spliced into the SQL text.
insert_customer_sql = "INSERT INTO customers (name, email) VALUES (?, ?)"
first_customer = ("John Doe", "john@example.com")
db.execute_query(insert_customer_sql, first_customer)

# Query data: execute_query returns one dict per row, keyed by column name.
customers = db.execute_query("SELECT * FROM customers")

Auto-Mirroring to OneLake

# Data automatically mirrors to OneLake in Delta format
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Access mirrored data in Lakehouse
# Path: abfss://workspace@onelake.dfs.fabric.microsoft.com/database/Tables/
onelake_root = "abfss://my-workspace@onelake.dfs.fabric.microsoft.com/"

# Read mirrored customer data from the database's Delta tables.
customers_path = onelake_root + "SalesDatabase.Database/Tables/dbo/customers"
customers_df = spark.read.format("delta").load(customers_path)

customers_df.show()

# Load the analytical segments table from the Lakehouse.
segments_path = onelake_root + "AnalyticsLakehouse.Lakehouse/Tables/customer_segments"
analytics_df = spark.read.format("delta").load(segments_path)

# Combine operational and analytical data: join on the shared customer_id
# column, then keep only the columns of interest.
output_columns = ["customer_id", "name", "email", "segment", "lifetime_value"]
joined = customers_df.join(analytics_df, on="customer_id")
enriched = joined.select(*output_columns)

Real-Time Sync Configuration

# Configure real-time sync between database and OneLake
# Sync definition for the SalesDatabase tables: which tables to sync, how each
# is synced, and the schedule.
# NOTE(review): this dict is illustrative and is not submitted anywhere in this
# file; confirm the key names against the API you send it to.
sync_config = {
    "displayName": "CustomerSync",
    "sourceDatabase": "SalesDatabase",
    "tables": [
        {
            "schema": "dbo",
            "table": "customers",
            "syncMode": "incremental",  # or "full"
            "changeTracking": True,
            # The customers table is created with created_at as its only
            # timestamp column; it has no updated_at column.
            "watermarkColumn": "created_at"
        },
        {
            "schema": "dbo",
            "table": "orders",
            "syncMode": "incremental",
            "changeTracking": True,
            "watermarkColumn": "order_date"
        }
    ],
    "schedule": {
        "type": "continuous",  # or "scheduled"
        "latencySeconds": 30
    }
}

# Mirroring happens automatically with ~30 second latency
# Changes are captured via SQL Server Change Tracking

Querying Across Operational and Analytical Data

# T-SQL cross-database queries
# Reference T-SQL using three-part names; this string is kept for illustration
# and is not executed below.
cross_db_query = """
    -- Query operational data
    SELECT
        o.order_id,
        c.name as customer_name,
        o.amount,
        o.order_date
    FROM SalesDatabase.dbo.orders o
    JOIN SalesDatabase.dbo.customers c ON o.customer_id = c.customer_id
    WHERE o.order_date >= DATEADD(day, -30, GETUTCDATE())

    -- Join with analytical insights from Lakehouse
    -- Using external tables or views
"""

# Create external table pointing to Lakehouse
# NOTE(review): the OneLakehouse data source and DeltaFormat file format are
# not created anywhere in this file; they must already exist (and external
# tables must be supported by the target) for this statement to succeed.
db.execute_query("""
    CREATE EXTERNAL TABLE customer_insights (
        customer_id INT,
        segment NVARCHAR(50),
        lifetime_value DECIMAL(10, 2),
        churn_risk FLOAT
    )
    WITH (
        LOCATION = 'Tables/customer_insights',
        DATA_SOURCE = OneLakehouse,
        FILE_FORMAT = DeltaFormat
    )
""")

# Now query across operational and analytical data.
# - The churn filter is a plain row predicate, so it belongs in WHERE, where it
#   removes rows before they are grouped and summed.
# - customer_id is part of the grouping key so that two different customers who
#   share the same name and email are not merged into one total.
results = db.execute_query("""
    SELECT
        c.name,
        c.email,
        i.segment,
        i.churn_risk,
        SUM(o.amount) as total_orders
    FROM customers c
    JOIN customer_insights i ON c.customer_id = i.customer_id
    JOIN orders o ON c.customer_id = o.customer_id
    WHERE i.churn_risk > 0.7
    GROUP BY c.customer_id, c.name, c.email, i.segment, i.churn_risk
""")

Performance Optimization

# Index recommendations
# Every statement below is sent on its own: execute_query only reads the first
# result set of a batch, so a statement placed after a SELECT (or a SELECT
# placed after a non-query statement) would have its outcome silently dropped.

# Index recommendations
index_usage = db.execute_query("""
    -- Fabric provides automatic index recommendations
    SELECT * FROM sys.dm_db_index_usage_stats
""")

db.execute_query("""
    -- Create recommended indexes
    CREATE NONCLUSTERED INDEX IX_orders_customer_date
    ON orders (customer_id, order_date)
    INCLUDE (amount, status)
""")

# Query Store for performance insights
# NOTE(review): ALTER DATABASE generally cannot run inside a user transaction;
# if the connection uses pyodbc's default autocommit=False these ALTER
# statements may be rejected. Confirm against the connection settings.
db.execute_query("""
    -- Enable Query Store (on by default)
    ALTER DATABASE CURRENT SET QUERY_STORE = ON;
""")

top_queries = db.execute_query("""
    -- Review top resource-consuming queries
    SELECT TOP 10
        qt.query_sql_text,
        rs.avg_duration,
        rs.avg_cpu_time,
        rs.count_executions
    FROM sys.query_store_query_text qt
    JOIN sys.query_store_query q ON qt.query_text_id = q.query_text_id
    JOIN sys.query_store_plan p ON q.query_id = p.query_id
    JOIN sys.query_store_runtime_stats rs ON p.plan_id = rs.plan_id
    ORDER BY rs.avg_cpu_time DESC
""")

# Automatic tuning
# "AUTOMATIC_TUNING = ON" is not accepted by T-SQL; individual options are
# switched on inside parentheses.
db.execute_query("""
    -- Fabric enables automatic tuning
    ALTER DATABASE CURRENT SET AUTOMATIC_TUNING (FORCE_LAST_GOOD_PLAN = ON);
""")

Migration from Azure SQL

class FabricMigration:
    """Helper for migrating from Azure SQL to Fabric Database.

    Reads from a pyodbc connection to the source database and writes through a
    ``FabricDatabaseConnection`` to the target.
    """

    def __init__(self, source_conn: str, fabric_conn: "FabricDatabaseConnection"):
        """Open the source connection and keep a handle to the target.

        Args:
            source_conn: ODBC connection string for the source database.
            fabric_conn: Connection helper for the target database.
        """
        self.source = pyodbc.connect(source_conn)
        self.fabric = fabric_conn

    def migrate_schema(self, tables: list):
        """Migrate table schemas.

        Args:
            tables: Names of the tables whose DDL should be recreated.

        Raises:
            NotImplementedError: Until ``_get_table_schema`` is implemented.
        """
        for table in tables:
            # Get schema from source
            schema = self._get_table_schema(table)

            # Create in Fabric
            self.fabric.execute_query(schema)

    def migrate_data(self, table: str, batch_size: int = 10000):
        """Migrate data in batches.

        The source table is read with a single query and streamed with
        ``fetchmany``. Paging with OFFSET/FETCH over an unordered query does
        not guarantee a stable row order between pages, so rows could be
        skipped or copied twice.

        Args:
            table: Table name, used verbatim in the SQL text. It must come
                from a trusted source because it is not escaped.
            batch_size: Maximum number of rows inserted per batch.

        Raises:
            ValueError: If *batch_size* is less than 1.
        """
        if batch_size < 1:
            raise ValueError("batch_size must be a positive integer")

        cursor = self.source.cursor()
        try:
            # The count is only used for progress reporting.
            cursor.execute(f"SELECT COUNT(*) FROM {table}")
            total_rows = cursor.fetchone()[0]

            cursor.execute(f"SELECT * FROM {table}")
            columns = [col[0] for col in cursor.description]

            migrated = 0
            while True:
                rows = cursor.fetchmany(batch_size)
                if not rows:
                    break

                # Insert into Fabric
                self._bulk_insert(table, columns, rows)

                migrated += len(rows)
                print(f"Migrated {migrated}/{total_rows} rows")
        finally:
            cursor.close()

    def _get_table_schema(self, table: str) -> str:
        """Get CREATE TABLE statement.

        Raises:
            NotImplementedError: Always; DDL generation is not written yet.
                Failing here is clearer than passing ``None`` on as a query.
        """
        raise NotImplementedError(
            f"DDL generation is not implemented (requested table: {table})"
        )

    def _bulk_insert(self, table: str, columns: list, rows: list):
        """Bulk insert data.

        Args:
            table: Target table name, used verbatim (see ``migrate_data``).
            columns: Column names in the same order as the values in *rows*.
            rows: Sequence of row value sequences; an empty batch is a no-op.
        """
        if not rows:
            return

        # Bracket-quote column names so reserved words and names containing
        # spaces stay valid; a literal "]" is escaped by doubling it.
        # NOTE(review): inserting explicit values into an IDENTITY column
        # usually needs SET IDENTITY_INSERT ... ON on the target; confirm.
        placeholders = ", ".join("?" for _ in columns)
        cols = ", ".join("[" + str(name).replace("]", "]]") + "]" for name in columns)
        query = f"INSERT INTO {table} ({cols}) VALUES ({placeholders})"

        conn = self.fabric.get_connection()
        try:
            cursor = conn.cursor()
            cursor.executemany(query, rows)
            conn.commit()
        finally:
            # A new connection is opened per batch, so release it every time.
            conn.close()

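Fabric Databases bring operational workloads into the unified Fabric platform. The automatic mirroring to OneLake enables seamless integration between transactional and analytical workloads.

## Takeaways

- SQL Database in Fabric is based on Azure SQL, so existing T-SQL skills and ODBC tooling carry over.
- Operational tables are mirrored to OneLake in Delta format, so Spark can join them with Lakehouse data without a separate ETL pipeline.
- Next steps: create a database in a test workspace, connect with Microsoft Entra ID authentication, and confirm that the mirrored tables appear in OneLake before planning a migration from Azure SQL.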

Michael John Peña

Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.