
Real-Time Analytics with Cosmos DB Analytical Store

Cosmos DB Analytical Store enables Hybrid Transactional/Analytical Processing (HTAP) by automatically syncing your operational data to a column store optimized for analytical queries, eliminating the need for complex ETL pipelines while providing near-real-time analytics.

Understanding HTAP Architecture

Traditional architectures require ETL pipelines to move data from operational databases into analytical systems. Cosmos DB Analytical Store changes this by:

  1. Automatically syncing data to a column-oriented store
  2. Isolating analytical queries so they consume no request units and don't affect transactional performance
  3. Syncing changes in near real time (typically under two minutes)
  4. Allowing direct querying from Azure Synapse Analytics
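To see why the column-oriented copy matters, here is a minimal pure-Python illustration (not Cosmos DB internals): the same orders laid out row-wise, as the transactional store holds them, and column-wise, as the analytical store holds them. An aggregate like SUM(total) only needs to touch a single column in the columnar layout.

```python
# Illustrative only: row store vs column store for an aggregate query.

# Row-oriented: one record per order (the shape of the transactional store)
rows = [
    {"orderId": "o1", "customerId": "c1", "total": 120.0},
    {"orderId": "o2", "customerId": "c2", "total": 75.5},
    {"orderId": "o3", "customerId": "c1", "total": 30.0},
]

# Column-oriented: one list per field (the shape of the analytical store)
columns = {
    "orderId": [r["orderId"] for r in rows],
    "customerId": [r["customerId"] for r in rows],
    "total": [r["total"] for r in rows],
}

# An analytical query like SUM(total) scans a single contiguous column
# instead of reading every full document
revenue = sum(columns["total"])
print(revenue)  # 225.5
```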

Enabling Analytical Store

# Create Cosmos DB account with analytical storage
az cosmosdb create \
    --name mycosmosdb-htap \
    --resource-group myResourceGroup \
    --locations regionName=eastus \
    --enable-analytical-storage true

# Create database
az cosmosdb sql database create \
    --account-name mycosmosdb-htap \
    --resource-group myResourceGroup \
    --name salesdb

# Create container with analytical store enabled
az cosmosdb sql container create \
    --account-name mycosmosdb-htap \
    --resource-group myResourceGroup \
    --database-name salesdb \
    --name orders \
    --partition-key-path "/customerId" \
    --analytical-storage-ttl -1
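The container above partitions on /customerId, so every document written to it must carry that property. A sketch of an order document in that shape (field names mirror the queries later in the post; the values and the SDK write call are omitted as assumptions):

```python
import json
import uuid
from datetime import datetime, timezone

# Sketch of a document for the "orders" container; "customerId" must be
# present because it is the partition key (/customerId).
order = {
    "id": str(uuid.uuid4()),
    "customerId": "customer-001",
    "orderDate": datetime(2024, 5, 1, tzinfo=timezone.utc).isoformat(),
    "status": "Completed",
    "total": 149.90,
    "items": [
        {"productId": "p-42", "productName": "Widget", "quantity": 2,
         "unitPrice": 74.95, "category": "Hardware"},
    ],
}

# Documents in this shape sync to the analytical store automatically;
# with --analytical-storage-ttl -1 they are retained there indefinitely.
print(json.dumps(order, indent=2))
```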

Querying with Azure Synapse

# Create Synapse workspace
az synapse workspace create \
    --name mysynapseworkspace \
    --resource-group myResourceGroup \
    --storage-account mystorageaccount \
    --file-system synapse-fs \
    --sql-admin-login-user sqladmin \
    --sql-admin-login-password 'ComplexPassword123!' \
    --location eastus

# Create Synapse Link for Cosmos DB (in Azure Portal or Synapse Studio)

Synapse Serverless SQL Queries

-- Query Cosmos DB analytical store directly
SELECT TOP 100
    JSON_VALUE(doc, '$.orderId') as OrderId,
    JSON_VALUE(doc, '$.customerId') as CustomerId,
    CAST(JSON_VALUE(doc, '$.orderDate') as datetime2) as OrderDate,
    CAST(JSON_VALUE(doc, '$.total') as decimal(10,2)) as Total,
    JSON_VALUE(doc, '$.status') as Status
FROM OPENROWSET(
    'CosmosDB',
    'Account=mycosmosdb-htap;Database=salesdb;Key=<your-key>',
    orders
) WITH (doc varchar(max)) AS rows
WHERE JSON_VALUE(doc, '$.status') = 'Completed'
ORDER BY CAST(JSON_VALUE(doc, '$.orderDate') as datetime2) DESC;

-- Aggregation query
SELECT
    JSON_VALUE(doc, '$.customerId') as CustomerId,
    COUNT(*) as OrderCount,
    SUM(CAST(JSON_VALUE(doc, '$.total') as decimal(10,2))) as TotalSpent,
    AVG(CAST(JSON_VALUE(doc, '$.total') as decimal(10,2))) as AvgOrderValue
FROM OPENROWSET(
    'CosmosDB',
    'Account=mycosmosdb-htap;Database=salesdb;Key=<your-key>',
    orders
) WITH (doc varchar(max)) AS rows
GROUP BY JSON_VALUE(doc, '$.customerId')
HAVING COUNT(*) > 5
ORDER BY TotalSpent DESC;

-- Time-series analysis
SELECT
    DATEPART(year, CAST(JSON_VALUE(doc, '$.orderDate') as datetime2)) as Year,
    DATEPART(month, CAST(JSON_VALUE(doc, '$.orderDate') as datetime2)) as Month,
    COUNT(*) as OrderCount,
    SUM(CAST(JSON_VALUE(doc, '$.total') as decimal(10,2))) as Revenue
FROM OPENROWSET(
    'CosmosDB',
    'Account=mycosmosdb-htap;Database=salesdb;Key=<your-key>',
    orders
) WITH (doc varchar(max)) AS rows
GROUP BY
    DATEPART(year, CAST(JSON_VALUE(doc, '$.orderDate') as datetime2)),
    DATEPART(month, CAST(JSON_VALUE(doc, '$.orderDate') as datetime2))
ORDER BY Year, Month;

Synapse Spark Queries

# Read from Cosmos DB analytical store using Spark
from pyspark.sql.functions import col, sum, avg, count, to_date, when

# When reading through a Synapse linked service (as below), the connection
# details are resolved from the linked service. Setting them explicitly is
# only needed when connecting by account key instead:
# spark.conf.set("spark.cosmos.accountEndpoint", "https://mycosmosdb-htap.documents.azure.com:443/")
# spark.conf.set("spark.cosmos.accountKey", "<your-key>")
# spark.conf.set("spark.cosmos.database", "salesdb")
# spark.conf.set("spark.cosmos.container", "orders")

# Read from analytical store
df = spark.read \
    .format("cosmos.olap") \
    .option("spark.synapse.linkedService", "CosmosDbAnalyticalStore") \
    .option("spark.cosmos.container", "orders") \
    .load()

# Display schema
df.printSchema()

# Basic analytics
daily_sales = df \
    .withColumn("order_date", to_date(col("orderDate"))) \
    .groupBy("order_date") \
    .agg(
        count("*").alias("order_count"),
        sum("total").alias("daily_revenue"),
        avg("total").alias("avg_order_value")
    ) \
    .orderBy("order_date")

display(daily_sales)

# Customer segmentation
customer_segments = df \
    .groupBy("customerId") \
    .agg(
        count("*").alias("order_count"),
        sum("total").alias("total_spent"),
        avg("total").alias("avg_order_value")
    ) \
    .withColumn("segment",
        when(col("total_spent") > 10000, "Premium")
        .when(col("total_spent") > 1000, "Regular")
        .otherwise("New")
    )

display(customer_segments)

# Write results to Delta Lake
daily_sales.write \
    .format("delta") \
    .mode("overwrite") \
    .save("/analytics/daily_sales")
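The when()/otherwise() chain in the segmentation job above is just a conditional on total spend; the same rule in plain Python (thresholds copied from the Spark code) makes the boundaries easy to unit-test:

```python
def segment(total_spent: float) -> str:
    # Mirrors the when()/otherwise() chain in the Spark segmentation job
    if total_spent > 10000:
        return "Premium"
    if total_spent > 1000:
        return "Regular"
    return "New"

print(segment(15000), segment(5000), segment(200))  # Premium Regular New
```

Note that a customer at exactly 1000 falls into "New": the Spark `when` conditions use strict greater-than, so the Python version does too.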

Real-World Example: E-Commerce Analytics

Operational Data Model

// Order.cs - Operational model
public class Order
{
    [JsonPropertyName("id")]
    public string Id { get; set; } = Guid.NewGuid().ToString();

    [JsonPropertyName("customerId")]
    public string CustomerId { get; set; }

    [JsonPropertyName("orderDate")]
    public DateTime OrderDate { get; set; }

    [JsonPropertyName("status")]
    public string Status { get; set; }

    [JsonPropertyName("items")]
    public List<OrderItem> Items { get; set; } = new();

    [JsonPropertyName("total")]
    public decimal Total { get; set; }

    [JsonPropertyName("shippingAddress")]
    public Address ShippingAddress { get; set; }

    [JsonPropertyName("paymentMethod")]
    public string PaymentMethod { get; set; }
}

public class OrderItem
{
    [JsonPropertyName("productId")]
    public string ProductId { get; set; }

    [JsonPropertyName("productName")]
    public string ProductName { get; set; }

    [JsonPropertyName("quantity")]
    public int Quantity { get; set; }

    [JsonPropertyName("unitPrice")]
    public decimal UnitPrice { get; set; }

    [JsonPropertyName("category")]
    public string Category { get; set; }
}

Analytical Queries

-- Product performance analysis
SELECT
    JSON_VALUE(item.value, '$.category') as Category,
    JSON_VALUE(item.value, '$.productName') as ProductName,
    SUM(CAST(JSON_VALUE(item.value, '$.quantity') as int)) as TotalQuantity,
    SUM(CAST(JSON_VALUE(item.value, '$.quantity') as int) *
        CAST(JSON_VALUE(item.value, '$.unitPrice') as decimal(10,2))) as Revenue
FROM OPENROWSET(
    'CosmosDB',
    'Account=mycosmosdb-htap;Database=salesdb;Key=<key>',
    orders
) WITH (doc varchar(max)) AS rows
CROSS APPLY OPENJSON(doc, '$.items') as item
WHERE CAST(JSON_VALUE(doc, '$.orderDate') as datetime2) > DATEADD(month, -3, GETUTCDATE())
GROUP BY
    JSON_VALUE(item.value, '$.category'),
    JSON_VALUE(item.value, '$.productName')
ORDER BY Revenue DESC;

-- Customer lifetime value
WITH CustomerOrders AS (
    SELECT
        JSON_VALUE(doc, '$.customerId') as CustomerId,
        CAST(JSON_VALUE(doc, '$.total') as decimal(10,2)) as OrderTotal,
        CAST(JSON_VALUE(doc, '$.orderDate') as datetime2) as OrderDate
    FROM OPENROWSET(
        'CosmosDB',
        'Account=mycosmosdb-htap;Database=salesdb;Key=<key>',
        orders
    ) WITH (doc varchar(max)) AS rows
)
SELECT
    CustomerId,
    COUNT(*) as OrderCount,
    SUM(OrderTotal) as LifetimeValue,
    MIN(OrderDate) as FirstOrder,
    MAX(OrderDate) as LastOrder,
    DATEDIFF(day, MIN(OrderDate), MAX(OrderDate)) as CustomerTenureDays
FROM CustomerOrders
GROUP BY CustomerId
ORDER BY LifetimeValue DESC;

-- Funnel analysis by payment method
SELECT
    JSON_VALUE(doc, '$.paymentMethod') as PaymentMethod,
    JSON_VALUE(doc, '$.status') as Status,
    COUNT(*) as OrderCount,
    SUM(CAST(JSON_VALUE(doc, '$.total') as decimal(10,2))) as TotalValue
FROM OPENROWSET(
    'CosmosDB',
    'Account=mycosmosdb-htap;Database=salesdb;Key=<key>',
    orders
) WITH (doc varchar(max)) AS rows
GROUP BY
    JSON_VALUE(doc, '$.paymentMethod'),
    JSON_VALUE(doc, '$.status')
ORDER BY PaymentMethod, Status;
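The customer lifetime value rollup above is easy to sanity-check in plain Python against a handful of orders; this sketch is independent of Synapse and only mirrors the aggregation logic (count, sum, first/last order, tenure):

```python
from collections import defaultdict
from datetime import date

# Sample orders: (customerId, total, orderDate)
orders = [
    ("c1", 100.0, date(2024, 1, 10)),
    ("c1", 50.0, date(2024, 3, 1)),
    ("c2", 200.0, date(2024, 2, 5)),
]

stats = defaultdict(lambda: {"count": 0, "ltv": 0.0, "first": None, "last": None})
for cid, total, d in orders:
    s = stats[cid]
    s["count"] += 1
    s["ltv"] += total
    s["first"] = d if s["first"] is None or d < s["first"] else s["first"]
    s["last"] = d if s["last"] is None or d > s["last"] else s["last"]

# Same output columns as the SQL query, ordered by lifetime value descending
for cid, s in sorted(stats.items(), key=lambda kv: -kv[1]["ltv"]):
    tenure_days = (s["last"] - s["first"]).days
    print(cid, s["count"], s["ltv"], tenure_days)
```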

Best Practices

1. Schema Design for Analytics

// Flatten nested objects for easier querying
public class AnalyticsOptimizedOrder
{
    public string Id { get; set; }
    public string CustomerId { get; set; }
    public DateTime OrderDate { get; set; }
    public decimal Total { get; set; }

    // Denormalized for analytics
    public string CustomerEmail { get; set; }
    public string CustomerSegment { get; set; }
    public string ShippingCity { get; set; }
    public string ShippingCountry { get; set; }

    // Pre-computed metrics
    public int ItemCount { get; set; }
    public decimal AvgItemPrice { get; set; }
}
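A hypothetical flattening step that produces this analytics-optimized shape from the nested Order document might look like the following; the customer profile lookup and the field names are illustrative assumptions, not part of the Cosmos DB API:

```python
# Hypothetical flattener: denormalizes customer fields into the order and
# pre-computes item metrics before writing to the analytics container.
def flatten_order(order: dict, customer: dict) -> dict:
    items = order.get("items", [])
    item_count = sum(i["quantity"] for i in items)
    total = order["total"]
    return {
        "id": order["id"],
        "customerId": order["customerId"],
        "orderDate": order["orderDate"],
        "total": total,
        # Denormalized from the customer profile (assumed lookup)
        "customerEmail": customer["email"],
        "customerSegment": customer["segment"],
        "shippingCity": order["shippingAddress"]["city"],
        "shippingCountry": order["shippingAddress"]["country"],
        # Pre-computed metrics
        "itemCount": item_count,
        "avgItemPrice": round(total / item_count, 2) if item_count else 0.0,
    }

order = {
    "id": "o1", "customerId": "c1", "orderDate": "2024-05-01", "total": 150.0,
    "shippingAddress": {"city": "Sydney", "country": "AU"},
    "items": [{"quantity": 3}],
}
customer = {"email": "a@example.com", "segment": "Regular"}
flat = flatten_order(order, customer)
print(flat["itemCount"], flat["avgItemPrice"])  # 3 50.0
```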

2. Analytical TTL Configuration

# Set analytical TTL independent of transactional TTL
az cosmosdb sql container update \
    --account-name mycosmosdb-htap \
    --resource-group myResourceGroup \
    --database-name salesdb \
    --name orders \
    --analytical-storage-ttl 7776000  # 90 days in seconds
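Analytical TTL is given in seconds, so it is easy to get the arithmetic wrong; a tiny helper (a sketch, not part of the az CLI) makes the conversion explicit:

```python
# Analytical TTL is expressed in seconds; -1 means retain indefinitely.
def analytical_ttl_seconds(days: int) -> int:
    return days * 24 * 60 * 60

print(analytical_ttl_seconds(90))  # 7776000, the value used above
```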

3. Monitoring Sync Latency

# Monitor analytical store sync in Spark
sync_status = spark.read \
    .format("cosmos.olap") \
    .option("spark.synapse.linkedService", "CosmosDbAnalyticalStore") \
    .option("spark.cosmos.container", "orders") \
    .load()

# Check the _ts field for sync verification
from pyspark.sql.functions import max as spark_max, current_timestamp

latest_sync = sync_status.select(
    spark_max("_ts").alias("latest_document_ts"),
    current_timestamp().alias("current_time")
)

display(latest_sync)

Conclusion

Cosmos DB Analytical Store with Synapse Link provides:

  • Zero-ETL architecture for real-time analytics
  • No impact on transactional performance
  • Cost-effective analytical storage
  • Seamless integration with Synapse SQL and Spark

This HTAP capability is transformative for organizations needing real-time insights without the complexity of traditional data warehousing pipelines.

Michael John Pena

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.