
PostgreSQL Hyperscale (Citus) on Azure: Distributed Database Patterns

Azure Database for PostgreSQL Hyperscale (Citus) brings horizontal scaling to PostgreSQL. It distributes data across multiple nodes, so you can handle workloads that a single server can't. Let's explore how to design and operate Hyperscale effectively.

Understanding Hyperscale Architecture

Hyperscale uses the Citus extension to create a distributed database:

                  Coordinator Node
                  (Metadata + Routing)
                         |
         +---------------+---------------+
         |               |               |
    Worker Node 1   Worker Node 2   Worker Node 3
    (Data Shard 1)  (Data Shard 2)  (Data Shard 3)
  • Coordinator: Routes queries, stores metadata
  • Workers: Store data shards, execute queries
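
You can check this topology yourself from the coordinator using the Citus metadata. A quick sketch, assuming a recent Citus release (older versions name the function master_get_active_worker_nodes):

-- List the active worker nodes registered with the coordinator
SELECT * FROM citus_get_active_worker_nodes();

-- Node metadata kept by the coordinator
SELECT nodename, nodeport, noderole, isactive
FROM pg_dist_node;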

Creating a Hyperscale Server Group

# Create Hyperscale (Citus) server group
az postgres server-group create \
    --name my-hyperscale \
    --resource-group my-rg \
    --location eastus \
    --admin-user citus \
    --admin-password '<strong-password>' \
    --coordinator-vcores 4 \
    --coordinator-storage-size 512 \
    --node-count 3 \
    --node-vcores 4 \
    --node-storage-size 512 \
    --postgresql-version 13

Table Distribution Strategies

The key to Hyperscale performance is choosing the right distribution strategy:

Distributed Tables

For large tables that need to scale horizontally:

-- Create a distributed table
CREATE TABLE events (
    tenant_id INT,
    event_id BIGSERIAL,
    event_type VARCHAR(50),
    event_data JSONB,
    created_at TIMESTAMP DEFAULT NOW(),
    PRIMARY KEY (tenant_id, event_id)
);

-- Distribute by tenant_id (co-location key)
SELECT create_distributed_table('events', 'tenant_id');

-- Data is automatically sharded across workers
INSERT INTO events (tenant_id, event_type, event_data)
VALUES (1, 'click', '{"page": "/home"}');
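
By default, create_distributed_table splits a table into 32 shards; if you want a different count, set citus.shard_count before distributing. A small sketch, also showing how to check where the shards ended up via the citus_shards view:

-- Pick the shard count before calling create_distributed_table (default is 32)
SET citus.shard_count = 64;

-- Verify where the shards of a distributed table landed
SELECT shardid, nodename, nodeport
FROM citus_shards
WHERE table_name = 'events'
LIMIT 5;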

Reference Tables

For small lookup tables needed on every node:

-- Create reference table (replicated to all nodes)
CREATE TABLE countries (
    code VARCHAR(2) PRIMARY KEY,
    name VARCHAR(100)
);

SELECT create_reference_table('countries');

-- Now JOINs with distributed tables are efficient
SELECT e.*, c.name as country_name
FROM events e
JOIN countries c ON e.event_data->>'country_code' = c.code
WHERE e.tenant_id = 1;

Local Tables

For coordinator-only data:

-- Regular PostgreSQL table on coordinator only
CREATE TABLE system_settings (
    key VARCHAR(100) PRIMARY KEY,
    value TEXT
);
-- No distribution function called - stays local
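
To confirm how each table ended up classified, query the citus_tables view (available in recent Citus releases; plain coordinator tables such as system_settings that were never handed to Citus won't appear here):

-- Distributed and reference tables known to Citus, with their distribution columns
SELECT table_name, citus_table_type, distribution_column, shard_count
FROM citus_tables
ORDER BY table_name;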

Co-location for Efficient Joins

Tables distributed by the same column are co-located:

-- All tenant data co-located for efficient JOINs
CREATE TABLE tenants (
    tenant_id INT PRIMARY KEY,
    name VARCHAR(100),
    plan VARCHAR(20)
);
SELECT create_distributed_table('tenants', 'tenant_id');

CREATE TABLE users (
    user_id SERIAL,
    tenant_id INT,
    email VARCHAR(255),
    PRIMARY KEY (tenant_id, user_id)
);
SELECT create_distributed_table('users', 'tenant_id');

CREATE TABLE orders (
    order_id SERIAL,
    tenant_id INT,
    user_id INT,
    total DECIMAL(10,2),
    PRIMARY KEY (tenant_id, order_id)
);
SELECT create_distributed_table('orders', 'tenant_id');

-- This JOIN executes entirely on each worker (no network shuffle)
SELECT t.name, COUNT(o.order_id) as order_count, SUM(o.total) as revenue
FROM tenants t
JOIN orders o ON t.tenant_id = o.tenant_id
WHERE t.tenant_id = 1
GROUP BY t.name;
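
You can verify that the three tables really share a co-location group: tables with the same colocationid have their shards placed together, which is what lets the join above run entirely on one worker. A quick check against the Citus metadata:

-- Tables with the same colocationid are co-located
SELECT logicalrelid AS table_name, colocationid
FROM pg_dist_partition
WHERE logicalrelid IN ('tenants'::regclass, 'users'::regclass, 'orders'::regclass);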

Query Patterns

Single-Tenant Queries (Fast)

Queries filtering by distribution column hit one shard:

-- Routes to single worker
SELECT * FROM events
WHERE tenant_id = 1
AND created_at > NOW() - INTERVAL '1 day'
ORDER BY created_at DESC
LIMIT 100;
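
To confirm the query is routed to a single shard, EXPLAIN it from the coordinator; Citus prints its distributed plan, including the number of shard tasks (exact output varies by Citus version):

-- A router query should show a Citus custom scan with "Task Count: 1"
EXPLAIN
SELECT * FROM events
WHERE tenant_id = 1
AND created_at > NOW() - INTERVAL '1 day';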

Cross-Tenant Aggregations

Queries across all shards run in parallel:

-- Runs on all workers in parallel, aggregates on coordinator
SELECT
    DATE_TRUNC('day', created_at) as day,
    COUNT(*) as event_count
FROM events
WHERE created_at > NOW() - INTERVAL '30 days'
GROUP BY DATE_TRUNC('day', created_at)
ORDER BY day;
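
EXPLAIN tells a different story here: one task per shard, with partial aggregation pushed down to the workers and merged on the coordinator (again, exact plan text varies by version):

-- A multi-shard query shows a task count equal to the shard count (32 by default)
EXPLAIN
SELECT DATE_TRUNC('day', created_at) as day, COUNT(*) as event_count
FROM events
WHERE created_at > NOW() - INTERVAL '30 days'
GROUP BY DATE_TRUNC('day', created_at);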

Real-Time Analytics

Hyperscale excels at real-time rollups:

-- Create rollup table
CREATE TABLE events_daily (
    tenant_id INT,
    day DATE,
    event_type VARCHAR(50),
    event_count BIGINT,
    PRIMARY KEY (tenant_id, day, event_type)
);
SELECT create_distributed_table('events_daily', 'tenant_id');

-- Idempotent rollup (safe to run every few minutes): recompute yesterday's
-- and today's counts and overwrite, so repeated runs don't double count
INSERT INTO events_daily (tenant_id, day, event_type, event_count)
SELECT
    tenant_id,
    DATE_TRUNC('day', created_at)::DATE as day,
    event_type,
    COUNT(*) as event_count
FROM events
WHERE created_at >= DATE_TRUNC('day', NOW() - INTERVAL '1 day')
GROUP BY tenant_id, DATE_TRUNC('day', created_at), event_type
ON CONFLICT (tenant_id, day, event_type)
DO UPDATE SET event_count = EXCLUDED.event_count;
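
To keep the rollup fresh without an external scheduler, you can use the pg_cron extension, which Hyperscale supports. A minimal sketch, assuming pg_cron has been enabled on the coordinator:

-- Refresh the rollup every 5 minutes
SELECT cron.schedule('*/5 * * * *', $$
    INSERT INTO events_daily (tenant_id, day, event_type, event_count)
    SELECT tenant_id, DATE_TRUNC('day', created_at)::DATE, event_type, COUNT(*)
    FROM events
    WHERE created_at >= DATE_TRUNC('day', NOW() - INTERVAL '1 day')
    GROUP BY 1, 2, 3
    ON CONFLICT (tenant_id, day, event_type)
    DO UPDATE SET event_count = EXCLUDED.event_count
$$);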

Python Application Example

import json
import psycopg2
from psycopg2.pool import ThreadedConnectionPool
from contextlib import contextmanager

# Connection pool to coordinator
pool = ThreadedConnectionPool(
    minconn=5,
    maxconn=20,
    host="my-hyperscale-c.postgres.database.azure.com",
    port=5432,
    database="citus",
    user="citus",
    password="<password>",
    sslmode="require"
)

@contextmanager
def get_connection():
    conn = pool.getconn()
    try:
        yield conn
    finally:
        pool.putconn(conn)

def get_tenant_events(tenant_id: int, limit: int = 100):
    """Fetch events for a single tenant - hits one shard"""
    with get_connection() as conn:
        with conn.cursor() as cur:
            cur.execute("""
                SELECT event_id, event_type, event_data, created_at
                FROM events
                WHERE tenant_id = %s
                ORDER BY created_at DESC
                LIMIT %s
            """, (tenant_id, limit))
            return cur.fetchall()

def get_global_stats():
    """Aggregate across all tenants - parallel execution"""
    with get_connection() as conn:
        with conn.cursor() as cur:
            cur.execute("""
                SELECT
                    event_type,
                    COUNT(*) as count,
                    COUNT(DISTINCT tenant_id) as tenants
                FROM events
                WHERE created_at > NOW() - INTERVAL '1 day'
                GROUP BY event_type
                ORDER BY count DESC
            """)
            return cur.fetchall()

def insert_event(tenant_id: int, event_type: str, event_data: dict):
    """Insert event - routes to appropriate shard"""
    with get_connection() as conn:
        with conn.cursor() as cur:
            cur.execute("""
                INSERT INTO events (tenant_id, event_type, event_data)
                VALUES (%s, %s, %s)
                RETURNING event_id
            """, (tenant_id, event_type, json.dumps(event_data)))
            conn.commit()
            return cur.fetchone()[0]

Scaling Operations

Adding Workers

Scale out by adding more workers:

# Add workers to existing server group
az postgres server-group update \
    --name my-hyperscale \
    --resource-group my-rg \
    --node-count 5

-- Rebalance shards across the new workers (run this SQL on the coordinator)
SELECT rebalance_table_shards();
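
The rebalance runs online, moving shards while the cluster keeps serving queries; you can watch its progress from another session (function name as in recent Citus versions):

-- Monitor an in-progress shard rebalance
SELECT * FROM get_rebalance_progress();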

Monitoring Shard Distribution

-- Check shard distribution
SELECT
    nodename,
    count(*) as shard_count,
    pg_size_pretty(sum(shard_size)) as total_size
FROM citus_shards
GROUP BY nodename;

-- Check for skewed shards
SELECT
    table_name,
    shardid,
    shard_size,
    nodename
FROM citus_shards
WHERE table_name = 'events'
ORDER BY shard_size DESC
LIMIT 10;
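
Citus also provides size functions that total a distributed table across all workers, which is useful because plain pg_total_relation_size on the coordinator only sees the empty parent table:

-- Total size of a distributed table across every shard and worker
SELECT pg_size_pretty(citus_total_relation_size('events'));

-- Table data only, excluding indexes
SELECT pg_size_pretty(citus_table_size('events'));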

High Availability

Hyperscale supports HA for coordinator and workers:

# Enable HA (Preview)
az postgres server-group update \
    --name my-hyperscale \
    --resource-group my-rg \
    --coordinator-enable-public-ip-access false \
    --enable-ha true

Performance Tuning

Key parameters to tune (on the managed service these are usually applied as server parameters through the Azure portal or CLI rather than ALTER SYSTEM):

-- On coordinator
ALTER SYSTEM SET citus.max_adaptive_executor_pool_size = 16;
ALTER SYSTEM SET citus.executor_slow_start_interval = 10;

-- Parallel query settings
ALTER SYSTEM SET max_parallel_workers_per_gather = 4;
ALTER SYSTEM SET parallel_tuple_cost = 0.001;

-- Memory settings (per node)
ALTER SYSTEM SET shared_buffers = '4GB';
ALTER SYSTEM SET work_mem = '256MB';

SELECT pg_reload_conf();

Use Cases

Multi-Tenant SaaS

  • Each tenant’s data on same shard
  • Tenant isolation with distribution key
  • Easy per-tenant queries

Real-Time Analytics

  • High-throughput event ingestion
  • Parallel aggregation queries
  • Time-series data with rollups

High-Volume OLTP

  • Distribute load across workers
  • Scale reads and writes independently
  • Handle millions of operations/second

Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.