Skip to content
Back to Blog
1 min read

PostgreSQL in Microsoft Fabric: What to Expect

I wrote “PostgreSQL in Microsoft Fabric: What to Expect” to share practical, production-minded guidance on this topic.

PostgreSQL in Fabric Overview

"""
PostgreSQL in Fabric (Announced for future release):
- Managed PostgreSQL in Fabric workspace
- Automatic mirroring to OneLake
- Integration with Fabric analytics
- Unified governance
- Familiar PostgreSQL experience
"""

# Anticipated feature set, compiled from the public announcements.
# Keys: the engine version, the PostgreSQL extensions expected to be
# available, and the Fabric-side integration points.
EXPECTED_FEATURES = dict(
    engine="PostgreSQL 16+",
    extensions=[
        "PostGIS",
        "pgvector",
        "pg_stat_statements",
        "uuid-ossp",
        "hstore",
    ],
    fabric_integration=[
        "OneLake mirroring",
        "Shortcut support",
        "Unified security",
        "Copilot integration",
    ],
)

Preparing for PostgreSQL in Fabric

# Current: Azure Database for PostgreSQL
# Future: PostgreSQL in Fabric

import psycopg2
from azure.identity import DefaultAzureCredential

class PostgreSQLConnection:
    """Connection helper for PostgreSQL.

    Opens a fresh psycopg2 connection for every call, authenticating either
    with an Azure AD access token (``use_aad=True``) or with a user name and
    password.
    """

    # Token scope requested from the Azure credential for AAD logins.
    AAD_SCOPE = "https://ossrdbms-aad.database.windows.net/.default"
    # Placeholder login used for AAD connections when no user is supplied.
    DEFAULT_AAD_USER = "aad_user@tenant"

    def __init__(self, host: str, database: str, use_aad: bool = True,
                 user: str = None, password: str = None):
        """Store connection settings.

        Args:
            host: Server host name.
            database: Database name.
            use_aad: Authenticate with an AAD token instead of a password.
            user: Default login name used by ``get_connection``/``execute``.
            password: Default password for non-AAD connections.
        """
        self.host = host
        self.database = database
        self.use_aad = use_aad
        self.user = user
        self.password = password
        # The credential is only needed (and only created) for AAD logins.
        self.credential = DefaultAzureCredential() if use_aad else None

    def get_connection(self, user: str = None, password: str = None):
        """Get PostgreSQL connection.

        Args:
            user: Login name; falls back to the one given to the constructor.
            password: Password for non-AAD logins; falls back to the one
                given to the constructor. Ignored for AAD logins, where the
                access token is sent as the password.

        Returns:
            A new psycopg2 connection. The caller is responsible for
            closing it.
        """
        if user is None:
            user = self.user
        if password is None:
            password = self.password

        if self.use_aad:
            # AAD authentication (expected in Fabric)
            token = self.credential.get_token(self.AAD_SCOPE)
            return psycopg2.connect(
                host=self.host,
                database=self.database,
                user=user or self.DEFAULT_AAD_USER,
                password=token.token,
                sslmode="require"
            )

        return psycopg2.connect(
            host=self.host,
            database=self.database,
            user=user,
            password=password
        )

    def execute(self, query: str, params: tuple = None):
        """Execute query and return results.

        Args:
            query: SQL text, optionally with psycopg2 placeholders.
            params: Values bound to the placeholders.

        Returns:
            A list of ``{column: value}`` dicts when the statement produces
            rows, otherwise the affected row count.
        """
        conn = self.get_connection()
        try:
            cursor = conn.cursor()
            try:
                cursor.execute(query, params)

                if cursor.description:
                    columns = [col[0] for col in cursor.description]
                    result = [dict(zip(columns, row)) for row in cursor.fetchall()]
                else:
                    result = cursor.rowcount

                # Commit on both paths: statements such as
                # INSERT ... RETURNING produce rows *and* modify data, and
                # closing the connection without a commit would discard them.
                conn.commit()
                return result
            finally:
                cursor.close()
        finally:
            conn.close()

PostgreSQL Patterns for Fabric

-- Schema design optimized for Fabric mirroring
-- Dedicated schema that holds the events table defined below.
CREATE SCHEMA analytics;

-- Use native PostgreSQL types that map well to Delta
-- One row per event. event_id is the primary key and is generated
-- server-side by gen_random_uuid() when the client does not supply one.
CREATE TABLE analytics.events (
    event_id UUID DEFAULT gen_random_uuid() PRIMARY KEY,
    -- Defaults to the insert time when not provided
    event_time TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
    -- The only mandatory column besides the key
    event_type VARCHAR(50) NOT NULL,
    user_id UUID,
    -- Free-form event attributes stored as binary JSON
    properties JSONB,
    -- Mirroring-friendly timestamp
    -- Both default to the insert time; updated_at is refreshed on UPDATE
    -- only if a trigger (or the application) sets it.
    created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP
);

-- Indexing for OLTP performance
-- Single-column indexes for lookups by user, by time, and by event type.
CREATE INDEX idx_events_user ON analytics.events(user_id);
CREATE INDEX idx_events_time ON analytics.events(event_time);
CREATE INDEX idx_events_type ON analytics.events(event_type);

-- JSONB index for property queries
-- GIN index over the whole properties document.
CREATE INDEX idx_events_properties ON analytics.events USING GIN (properties);

-- Trigger for updated_at (important for change tracking)
-- Trigger function: overwrites updated_at on the row being written with the
-- current transaction timestamp and returns the modified row.
CREATE OR REPLACE FUNCTION update_updated_at()
RETURNS TRIGGER AS $$
BEGIN
    NEW.updated_at = CURRENT_TIMESTAMP;
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

-- Runs the function above once per row, before every UPDATE on
-- analytics.events, so updated_at changes whenever a row changes.
CREATE TRIGGER events_update_timestamp
    BEFORE UPDATE ON analytics.events
    FOR EACH ROW
    EXECUTE FUNCTION update_updated_at();

Vector Search with pgvector

-- pgvector for AI/ML workloads
-- Expected to be supported in Fabric PostgreSQL

CREATE EXTENSION IF NOT EXISTS vector;

-- The table below lives in the "embeddings" schema, which must exist
-- before CREATE TABLE can reference it.
CREATE SCHEMA IF NOT EXISTS embeddings;

-- One row per document: the raw text, its embedding vector, and optional
-- JSONB metadata.
CREATE TABLE embeddings.documents (
    id SERIAL PRIMARY KEY,
    content TEXT NOT NULL,
    embedding vector(1536),  -- OpenAI embedding dimension
    metadata JSONB,
    created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP
);

-- Create vector index
-- IVFFlat index using the cosine operator class, so it serves queries that
-- order by the cosine-distance operator (<=>).
-- NOTE(review): pgvector recommends building IVFFlat indexes after the table
-- has data; consider creating this index after the initial load.
CREATE INDEX ON embeddings.documents
USING ivfflat (embedding vector_cosine_ops)
WITH (lists = 100);

-- Similarity search function
-- Returns the match_count rows of embeddings.documents closest to
-- query_embedding under the <=> operator, nearest first. The reported
-- similarity is 1 minus that distance.
CREATE OR REPLACE FUNCTION search_similar_documents(
    query_embedding vector(1536),
    match_count INT DEFAULT 10
)
RETURNS TABLE (
    id INT,
    content TEXT,
    similarity FLOAT
)
LANGUAGE SQL STABLE AS $$
    SELECT
        doc.id,
        doc.content,
        1 - (doc.embedding <=> query_embedding) AS similarity
    FROM embeddings.documents AS doc
    ORDER BY doc.embedding <=> query_embedding
    LIMIT match_count;
$$;

Python Integration with pgvector

import psycopg2
from pgvector.psycopg2 import register_vector
import numpy as np
from openai import OpenAI

class VectorStore:
    """Vector store using PostgreSQL pgvector.

    Wraps an existing psycopg2 connection and an OpenAI client: documents are
    embedded with the OpenAI embeddings API and stored in / searched from the
    ``embeddings.documents`` table.
    """

    def __init__(self, db_conn):
        """Bind to ``db_conn`` and register the pgvector adapters on it."""
        self.conn = db_conn
        register_vector(self.conn)
        self.openai = OpenAI()

    def embed_text(self, text: str) -> list:
        """Get embedding for text.

        Returns:
            The embedding of ``text`` as returned by the embeddings API.
        """
        response = self.openai.embeddings.create(
            model="text-embedding-3-small",
            input=text
        )
        return response.data[0].embedding

    def add_document(self, content: str, metadata: dict = None):
        """Add document to vector store.

        Args:
            content: Document text; it is embedded and stored verbatim.
            metadata: Optional JSON-serialisable dict stored in the JSONB
                ``metadata`` column.

        Returns:
            The ``id`` of the inserted row.
        """
        import json

        # A numpy array is what the pgvector adapter registered in __init__
        # knows how to send as a ``vector``; a plain list would be sent as a
        # PostgreSQL array instead.
        embedding = np.array(self.embed_text(content))
        # psycopg2 cannot adapt a dict on its own; send JSON text, which the
        # server coerces to JSONB on insert.
        metadata_json = None if metadata is None else json.dumps(metadata)

        cursor = self.conn.cursor()
        try:
            cursor.execute("""
            INSERT INTO embeddings.documents (content, embedding, metadata)
            VALUES (%s, %s, %s)
            RETURNING id
        """, (content, embedding, metadata_json))
            new_id = cursor.fetchone()[0]
            self.conn.commit()
        except Exception:
            # Leave the shared connection usable for the next call.
            self.conn.rollback()
            raise
        finally:
            cursor.close()

        return new_id

    def search(self, query: str, limit: int = 10) -> list:
        """Search for similar documents.

        Args:
            query: Free text; it is embedded and compared with stored rows.
            limit: Maximum number of rows to return.

        Returns:
            A list of ``{"id", "content", "similarity"}`` dicts, most
            similar first.
        """
        # Same reason as in add_document: the ``<=>`` operator needs a
        # ``vector`` operand, which the adapter produces from numpy arrays.
        query_embedding = np.array(self.embed_text(query))

        cursor = self.conn.cursor()
        try:
            cursor.execute("""
            SELECT id, content, 1 - (embedding <=> %s) as similarity
            FROM embeddings.documents
            ORDER BY embedding <=> %s
            LIMIT %s
        """, (query_embedding, query_embedding, limit))
            rows = cursor.fetchall()
        except Exception:
            # A failed statement aborts the open transaction; reset it.
            self.conn.rollback()
            raise
        finally:
            cursor.close()

        columns = ['id', 'content', 'similarity']
        return [dict(zip(columns, row)) for row in rows]

Migration Planning

class FabricPostgreSQLMigration:
    """Plan migration to Fabric PostgreSQL.

    ``assess_database`` inspects a live database through an open connection;
    ``generate_migration_plan`` returns the ordered list of migration steps.
    """

    # Tables whose total size exceeds this many bytes (100 GiB) trigger the
    # partitioning recommendation.
    LARGE_TABLE_BYTES = 100 * 1024 ** 3

    def __init__(self):
        self.compatibility_checks = []
        self.recommendations = []

    def assess_database(self, conn) -> dict:
        """Assess database for Fabric compatibility.

        Args:
            conn: Open DB-API connection to the database being assessed.

        Returns:
            A dict with the installed ``extensions``, the 20 largest
            ``large_tables`` as ``(schema, table, size_bytes)`` rows, a
            ``compatibility`` verdict, and a list of ``recommendations``.
        """
        cursor = conn.cursor()
        try:
            # Check extensions
            cursor.execute("SELECT extname FROM pg_extension")
            extensions = [row[0] for row in cursor.fetchall()]

            # Check table sizes. quote_ident keeps mixed-case or otherwise
            # special table names intact when the text is resolved to a
            # relation; unquoted, such names fail to resolve.
            cursor.execute("""
            SELECT schemaname, tablename,
                   pg_total_relation_size(
                       quote_ident(schemaname) || '.' || quote_ident(tablename)
                   ) as size
            FROM pg_tables
            WHERE schemaname NOT IN ('pg_catalog', 'information_schema')
            ORDER BY size DESC
            LIMIT 20
        """)
            large_tables = cursor.fetchall()
        finally:
            cursor.close()

        # Check for unsupported features (speculation)
        # These may change when Fabric PostgreSQL is released
        recommendations = []

        if "timescaledb" in extensions:
            recommendations.append(
                "TimescaleDB may require migration to native partitioning"
            )

        if any(size > self.LARGE_TABLE_BYTES for _, _, size in large_tables):
            recommendations.append(
                "Large tables may benefit from partitioning before migration"
            )

        return {
            "extensions": extensions,
            "large_tables": large_tables,
            "compatibility": "likely_compatible",
            "recommendations": recommendations,
        }

    def generate_migration_plan(self, assessment: dict) -> list:
        """Generate migration steps.

        Args:
            assessment: Result of ``assess_database``. Currently unused: the
                plan is the same fixed sequence for every database.

        Returns:
            A list of ``{"step", "action", "details"}`` dicts, numbered
            from 1 in execution order.
        """
        ordered_steps = (
            ("Audit and document current database",
             "Export schema, extensions, and configurations"),
            ("Review Fabric PostgreSQL feature support",
             "Verify all required extensions and features"),
            ("Create Fabric workspace and database",
             "Set up Fabric environment"),
            ("Migrate schema",
             "Create tables, indexes, functions"),
            ("Migrate data",
             "Use pg_dump/pg_restore or streaming replication"),
            ("Verify mirroring",
             "Confirm data appears in OneLake"),
            ("Update application connections",
             "Point applications to Fabric endpoint"),
            ("Validate and cutover",
             "Run validation queries and switch traffic"),
        )

        return [
            {"step": number, "action": action, "details": details}
            for number, (action, details) in enumerate(ordered_steps, start=1)
        ]

Expected OneLake Integration

# Expected: PostgreSQL data mirrored to OneLake
# Similar to SQL Database in Fabric

# In PySpark (expected pattern)
# The example is kept inside a string because it only runs in a Spark
# environment with access to the mirrored OneLake tables.
"""
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Read mirrored PostgreSQL data
events_df = spark.read.format("delta").load(
    "abfss://workspace@onelake.dfs.fabric.microsoft.com/"
    "PostgresDB.Database/Tables/analytics/events"
)

# JSONB columns expected to be stored as JSON strings
# May need parsing in Spark
from pyspark.sql.functions import from_json, schema_of_json

# Representative JSON document used only to infer the schema of the
# properties column (illustrative shape; replace with a real sample)
sample_json = '{"page": "/home", "duration_ms": 120}'

# Parse JSONB properties
events_with_props = events_df.withColumn(
    "parsed_properties",
    from_json("properties", schema_of_json(sample_json))
)
"""

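PostgreSQL in Fabric will bring familiar PostgreSQL capabilities to the unified data platform. Start preparing now by designing schemas that work well with automatic mirroring.

Takeaways

- Design tables with mirroring in mind: give every table a primary key, use native PostgreSQL types, and maintain an updated_at column for change tracking.
- Audit the extensions your databases rely on today, and check them against the supported list once PostgreSQL in Fabric is released.
- Treat the code in this post as expected patterns, not confirmed behavior; revisit it when the service ships.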

Michael John Peña

Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.