5 min read
Fabric Databases Preview: Relational Databases in Microsoft Fabric
Microsoft Fabric now includes relational database capabilities in preview. Let’s explore what this means and how to use it.
Understanding Fabric Databases
Fabric Databases bring operational database workloads into the unified Fabric platform, with automatic integration to OneLake.
# Architecture Overview
"""
Fabric Databases:
- SQL Database in Fabric (based on Azure SQL)
- PostgreSQL in Fabric (coming)
- Automatic mirroring to OneLake (Delta format)
- Unified governance with Purview
- Integrated with Fabric analytics
"""
Creating a Fabric SQL Database
from azure.identity import DefaultAzureCredential
import requests
credential = DefaultAzureCredential()
token = credential.get_token("https://api.fabric.microsoft.com/.default")
headers = {
"Authorization": f"Bearer {token.token}",
"Content-Type": "application/json"
}
# Create SQL Database in Fabric
workspace_id = "your-workspace-id"
database_config = {
"displayName": "SalesDatabase",
"description": "Sales operational database",
"type": "SQLDatabase",
"properties": {
# Performance tier
"sku": "S0", # Standard tier
# Auto-mirroring to OneLake
"autoMirror": True,
# Collation
"collation": "SQL_Latin1_General_CP1_CI_AS"
}
}
response = requests.post(
f"https://api.fabric.microsoft.com/v1/workspaces/{workspace_id}/items",
headers=headers,
json=database_config
)
database = response.json()
print(f"Created database: {database['id']}")
Connecting and Working with Data
import struct
import pyodbc
from azure.identity import DefaultAzureCredential
# Connection options
class FabricDatabaseConnection:
"""Connect to Fabric SQL Database"""
def __init__(self, workspace_name: str, database_name: str):
self.server = f"{workspace_name}.database.fabric.microsoft.com"
self.database = database_name
self.credential = DefaultAzureCredential()
    def get_connection(self) -> pyodbc.Connection:
        """Get database connection using a Microsoft Entra ID (AAD) access token"""
        token = self.credential.get_token(
            "https://database.fabric.microsoft.com/.default"
        )
        conn_str = (
            f"Driver={{ODBC Driver 18 for SQL Server}};"
            f"Server={self.server};"
            f"Database={self.database};"
            f"Encrypt=yes;"
        )
        # The ODBC driver expects the token as UTF-16-LE bytes prefixed with a
        # 4-byte length, passed via SQL_COPT_SS_ACCESS_TOKEN (attribute 1256)
        token_bytes = token.token.encode("utf-16-le")
        token_struct = struct.pack(f"<I{len(token_bytes)}s", len(token_bytes), token_bytes)
        conn = pyodbc.connect(conn_str, attrs_before={1256: token_struct})
        return conn
    def execute_query(self, query: str, params: tuple = None):
        """Execute a query; return rows for SELECTs, the affected row count otherwise"""
        conn = self.get_connection()
        try:
            cursor = conn.cursor()
            if params:
                cursor.execute(query, params)
            else:
                cursor.execute(query)
            if cursor.description:
                columns = [col[0] for col in cursor.description]
                return [dict(zip(columns, row)) for row in cursor.fetchall()]
            conn.commit()
            return cursor.rowcount
        finally:
            conn.close()
# Usage
db = FabricDatabaseConnection("my-workspace", "SalesDatabase")
# Create tables
db.execute_query("""
CREATE TABLE customers (
    customer_id INT IDENTITY(1,1) PRIMARY KEY,
name NVARCHAR(100) NOT NULL,
email NVARCHAR(255),
created_at DATETIME2 DEFAULT GETUTCDATE()
)
""")
db.execute_query("""
CREATE TABLE orders (
    order_id INT IDENTITY(1,1) PRIMARY KEY,
customer_id INT REFERENCES customers(customer_id),
amount DECIMAL(10, 2) NOT NULL,
status NVARCHAR(20) DEFAULT 'pending',
order_date DATETIME2 DEFAULT GETUTCDATE()
)
""")
# Insert data
db.execute_query(
"INSERT INTO customers (name, email) VALUES (?, ?)",
("John Doe", "john@example.com")
)
# Query data
customers = db.execute_query("SELECT * FROM customers")
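Writes go through the same helper; statements that return no result set are committed and the affected row count comes back:
# Update a row; execute_query commits and returns cursor.rowcount for non-SELECTs
updated = db.execute_query(
    "UPDATE customers SET email = ? WHERE name = ?",
    ("john.doe@example.com", "John Doe")
)
print(f"Rows updated: {updated}")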
Auto-Mirroring to OneLake
# Data automatically mirrors to OneLake in Delta format
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
# Access mirrored data in Lakehouse
# Path: abfss://workspace@onelake.dfs.fabric.microsoft.com/database/Tables/
# Read mirrored customer data
customers_df = spark.read.format("delta").load(
"abfss://my-workspace@onelake.dfs.fabric.microsoft.com/"
"SalesDatabase.Database/Tables/dbo/customers"
)
customers_df.show()
# Join with analytical data
analytics_df = spark.read.format("delta").load(
"abfss://my-workspace@onelake.dfs.fabric.microsoft.com/"
"AnalyticsLakehouse.Lakehouse/Tables/customer_segments"
)
# Combine operational and analytical data
enriched = customers_df.join(
analytics_df,
"customer_id"
).select(
"customer_id",
"name",
"email",
"segment",
"lifetime_value"
)
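The combined result can be persisted for reporting. A minimal sketch, assuming the notebook has a default Lakehouse attached (the table name enriched_customers is illustrative):
# Persist the combined view as a managed Delta table in the attached Lakehouse
enriched.write.format("delta").mode("overwrite").saveAsTable("enriched_customers")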
Real-Time Sync Configuration
# Configure real-time sync between database and OneLake
sync_config = {
"displayName": "CustomerSync",
"sourceDatabase": "SalesDatabase",
"tables": [
{
"schema": "dbo",
"table": "customers",
"syncMode": "incremental", # or "full"
"changeTracking": True,
"watermarkColumn": "updated_at"
},
{
"schema": "dbo",
"table": "orders",
"syncMode": "incremental",
"changeTracking": True,
"watermarkColumn": "order_date"
}
],
"schedule": {
"type": "continuous", # or "scheduled"
"latencySeconds": 30
}
}
# Mirroring happens automatically with ~30 second latency
# Changes are captured via SQL Server Change Tracking
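Because capture relies on change tracking, the standard SQL Server catalog views can confirm which source tables are tracked (a quick sanity check using the connection helper from earlier):
# List tables with change tracking enabled
tracked_tables = db.execute_query("""
    SELECT t.name AS table_name, ct.min_valid_version
    FROM sys.change_tracking_tables ct
    JOIN sys.tables t ON ct.object_id = t.object_id
""")
print(tracked_tables)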
Querying Across Operational and Analytical Data
# T-SQL cross-database queries
cross_db_query = """
-- Query operational data
SELECT
o.order_id,
c.name as customer_name,
o.amount,
o.order_date
FROM SalesDatabase.dbo.orders o
JOIN SalesDatabase.dbo.customers c ON o.customer_id = c.customer_id
WHERE o.order_date >= DATEADD(day, -30, GETUTCDATE())
-- Join with analytical insights from Lakehouse
-- Using external tables or views
"""
# Create an external table pointing to the Lakehouse
# (assumes an external data source named OneLakehouse and a file format named
#  DeltaFormat were created first with CREATE EXTERNAL DATA SOURCE / FILE FORMAT)
db.execute_query("""
CREATE EXTERNAL TABLE customer_insights (
customer_id INT,
segment NVARCHAR(50),
lifetime_value DECIMAL(10, 2),
churn_risk FLOAT
)
WITH (
LOCATION = 'Tables/customer_insights',
DATA_SOURCE = OneLakehouse,
FILE_FORMAT = DeltaFormat
)
""")
# Now query across operational and analytical data
results = db.execute_query("""
    SELECT
        c.name,
        c.email,
        i.segment,
        i.churn_risk,
        SUM(o.amount) AS total_order_amount
    FROM customers c
    JOIN customer_insights i ON c.customer_id = i.customer_id
    JOIN orders o ON c.customer_id = o.customer_id
    WHERE i.churn_risk > 0.7
    GROUP BY c.name, c.email, i.segment, i.churn_risk
""")
Performance Optimization
# Index tuning
# Review current index usage to spot hot or unused indexes
index_usage = db.execute_query(
    "SELECT * FROM sys.dm_db_index_usage_stats WHERE database_id = DB_ID()"
)
# Create a covering index for the common "orders by customer and date" lookup
db.execute_query("""
    CREATE NONCLUSTERED INDEX IX_orders_customer_date
    ON orders (customer_id, order_date)
    INCLUDE (amount, status)
""")
# Query Store for performance insights
top_queries = db.execute_query("""
    -- Query Store is on by default; enable explicitly if needed:
    -- ALTER DATABASE CURRENT SET QUERY_STORE = ON;
    SELECT TOP 10
        qt.query_sql_text,
        rs.avg_duration,
        rs.avg_cpu_time,
        rs.count_executions
    FROM sys.query_store_query_text qt
    JOIN sys.query_store_query q ON qt.query_text_id = q.query_text_id
    JOIN sys.query_store_plan p ON q.query_id = p.query_id
    JOIN sys.query_store_runtime_stats rs ON p.plan_id = rs.plan_id
    ORDER BY rs.avg_cpu_time DESC
""")
# Automatic tuning
db.execute_query("""
    -- Enable plan-regression correction (FORCE_LAST_GOOD_PLAN)
    ALTER DATABASE CURRENT SET AUTOMATIC_TUNING (FORCE_LAST_GOOD_PLAN = ON);
""")
Migration from Azure SQL
class FabricMigration:
"""Helper for migrating from Azure SQL to Fabric Database"""
def __init__(self, source_conn: str, fabric_conn: FabricDatabaseConnection):
self.source = pyodbc.connect(source_conn)
self.fabric = fabric_conn
def migrate_schema(self, tables: list):
"""Migrate table schemas"""
for table in tables:
# Get schema from source
schema = self._get_table_schema(table)
# Create in Fabric
self.fabric.execute_query(schema)
def migrate_data(self, table: str, batch_size: int = 10000):
"""Migrate data in batches"""
cursor = self.source.cursor()
cursor.execute(f"SELECT COUNT(*) FROM {table}")
total_rows = cursor.fetchone()[0]
offset = 0
while offset < total_rows:
cursor.execute(f"""
SELECT * FROM {table}
ORDER BY (SELECT NULL)
OFFSET {offset} ROWS
FETCH NEXT {batch_size} ROWS ONLY
""")
rows = cursor.fetchall()
columns = [col[0] for col in cursor.description]
# Insert into Fabric
self._bulk_insert(table, columns, rows)
offset += batch_size
print(f"Migrated {min(offset, total_rows)}/{total_rows} rows")
def _get_table_schema(self, table: str) -> str:
"""Get CREATE TABLE statement"""
# Implementation to generate DDL
pass
    def _bulk_insert(self, table: str, columns: list, rows: list):
        """Bulk insert data"""
        placeholders = ", ".join(["?" for _ in columns])
        cols = ", ".join(columns)
        query = f"INSERT INTO {table} ({cols}) VALUES ({placeholders})"
        conn = self.fabric.get_connection()
        cursor = conn.cursor()
        cursor.fast_executemany = True  # send each batch in a single round trip
        cursor.executemany(query, rows)
        conn.commit()
        conn.close()
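A minimal usage sketch, assuming _get_table_schema has been filled in; the source connection string is illustrative, and tables with IDENTITY columns would additionally need SET IDENTITY_INSERT handling:
# Illustrative source connection string (Azure AD default auth via ODBC Driver 18)
source_conn_str = (
    "Driver={ODBC Driver 18 for SQL Server};"
    "Server=myserver.database.windows.net;"
    "Database=SalesDB;"
    "Authentication=ActiveDirectoryDefault;"
    "Encrypt=yes;"
)
migration = FabricMigration(source_conn_str, db)
migration.migrate_schema(["customers", "orders"])
for table in ["customers", "orders"]:
    migration.migrate_data(table, batch_size=5000)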
Fabric Databases bring operational workloads into the unified Fabric platform. The automatic mirroring to OneLake enables seamless integration between transactional and analytical workloads.