Skip to content
Back to Blog
2 min read

Azure Cosmos DB API for MongoDB: Patterns and Best Practices

This post shares practical, production-minded guidance on using the Azure Cosmos DB API for MongoDB: how to connect, run common operations, and keep request costs under control.

Understanding Cosmos DB’s MongoDB API

The MongoDB API for Cosmos DB lets you use familiar MongoDB drivers and tools while benefiting from:

  • Global distribution with multi-region writes
  • Automatic indexing
  • Elastic scale with RU-based throughput
  • 99.999% availability SLA
  • Comprehensive compliance certifications

Connecting to Cosmos DB with MongoDB Protocol

// Node.js connection using the native MongoDB driver
const { MongoClient } = require('mongodb');

// Connection string is read from the COSMOS_DB_CONNECTION_STRING environment variable
// (it is undefined when that variable is not set), keeping the account key out of source.
const connectionString = process.env.COSMOS_DB_CONNECTION_STRING;
// Expected format:
// Format: mongodb://<account>:<key>@<account>.mongo.cosmos.azure.com:10255/?ssl=true&replicaSet=globaldb&retrywrites=false&maxIdleTimeMS=120000&appName=@<account>@

/**
 * Opens a MongoDB-protocol connection using the module-level `connectionString`,
 * verifies it with a `ping` command and returns handles for the `ecommerce`
 * database and its `products` collection.
 *
 * The caller owns the returned client and is responsible for closing it.
 *
 * @returns {Promise<{client: object, database: object, collection: object}>}
 *     The connected client plus the database and collection handles.
 * @throws Rethrows whatever error `connect()` or the ping command produced.
 */
async function connectToCosmosDB() {
    const client = new MongoClient(connectionString, {
        useNewUrlParser: true,
        useUnifiedTopology: true,
        // Mirrors `retrywrites=false` in the connection string format above.
        retryWrites: false
    });

    try {
        await client.connect();
        console.log('Connected to Azure Cosmos DB (MongoDB API)');

        const database = client.db('ecommerce');
        const collection = database.collection('products');

        // Test the connection
        const pingResult = await database.command({ ping: 1 });
        console.log('Ping result:', pingResult);

        return { client, database, collection };
    } catch (error) {
        console.error('Connection error:', error);

        // The caller never receives the client on failure, so it must be released
        // here; otherwise a half-open client (e.g. connect succeeded, ping failed)
        // is leaked. A failure while closing is logged but must not mask the
        // original error.
        await client.close().catch((closeError) => {
            console.error('Failed to close client after connection error:', closeError);
        });

        throw error;
    }
}

CRUD Operations

// Create documents
/**
 * Inserts a single product document.
 *
 * Only the whitelisted fields (name, category, price, inventory, tags) are
 * copied from `product`; any other properties on it are not stored.
 *
 * @param {object} collection - Collection handle exposing `insertOne`.
 * @param {object} product - Source object to read the product fields from.
 * @returns {Promise<object>} The result returned by `insertOne`.
 */
async function createProduct(collection, product) {
    // Read the clock once so a newly created document always has
    // createdAt equal to updatedAt (two separate reads can differ).
    const now = new Date();
    const { name, category, price, inventory, tags } = product;

    const result = await collection.insertOne({
        name,
        category,
        price,
        inventory,
        tags,
        createdAt: now,
        updatedAt: now
    });

    console.log(`Product created with ID: ${result.insertedId}`);
    return result;
}

// Bulk insert - use ordered: false for better performance
/**
 * Inserts many products with a single unordered `bulkWrite`.
 *
 * Every document in the batch receives the same createdAt/updatedAt value.
 * `ordered: false` is passed so the inserts are not required to run in sequence.
 *
 * @param {object} collection - Collection handle exposing `bulkWrite`.
 * @param {object[]} products - Product objects; each is shallow-copied into its document.
 * @returns {Promise<object>} The `bulkWrite` result, or `{ insertedCount: 0 }`
 *     when `products` is empty (no request is sent in that case).
 */
async function bulkInsertProducts(collection, products) {
    // The driver rejects an empty batch, so short-circuit instead of turning
    // "nothing to insert" into an error.
    if (products.length === 0) {
        console.log('Inserted 0 products');
        return { insertedCount: 0 };
    }

    // One clock read for the whole batch keeps createdAt and updatedAt identical.
    const now = new Date();
    const operations = products.map((product) => ({
        insertOne: {
            document: {
                ...product,
                createdAt: now,
                updatedAt: now
            }
        }
    }));

    const result = await collection.bulkWrite(operations, { ordered: false });
    console.log(`Inserted ${result.insertedCount} products`);
    return result;
}

// Read with aggregation pipeline
/**
 * Lists the products of one category, cheapest first, with a derived price band.
 *
 * Each returned document carries name, price, inventory and `priceRange`:
 * 'budget' below 50, 'mid-range' below 200, otherwise 'premium'.
 *
 * @param {object} collection - Collection handle exposing `aggregate`.
 * @param {*} category - Value matched against the `category` field.
 * @returns {Promise<object[]>} The aggregation output as an array.
 */
async function getProductsByCategory(collection, category) {
    // Branches are evaluated in order, so the 200 check only sees prices >= 50.
    const priceBand = {
        $switch: {
            branches: [
                { case: { $lt: ['$price', 50] }, then: 'budget' },
                { case: { $lt: ['$price', 200] }, then: 'mid-range' }
            ],
            default: 'premium'
        }
    };

    const matchStage = { $match: { category } };
    const sortStage = { $sort: { price: 1 } };
    const projectStage = {
        $project: { name: 1, price: 1, inventory: 1, priceRange: priceBand }
    };

    const cursor = collection.aggregate([matchStage, sortStage, projectStage]);
    const documents = await cursor.toArray();
    return documents;
}

// Update with operators
/**
 * Adjusts a product's inventory and records the change in its history array.
 *
 * In one `updateOne` call this increments `inventory` by `quantityChange`,
 * sets `updatedAt`, and appends `{ change, timestamp }` to `inventoryHistory`.
 *
 * @param {object} collection - Collection handle exposing `updateOne`.
 * @param {*} productId - Value matched against `_id` exactly as given.
 * @param {number} quantityChange - Amount added to `inventory` (negative to decrement).
 * @returns {Promise<object>} The result returned by `updateOne`.
 */
async function updateProductInventory(collection, productId, quantityChange) {
    // Single clock read: `updatedAt` and the history entry's `timestamp`
    // describe the same event and must not drift apart.
    const now = new Date();

    const result = await collection.updateOne(
        { _id: productId },
        {
            $inc: { inventory: quantityChange },
            $set: { updatedAt: now },
            $push: {
                inventoryHistory: {
                    change: quantityChange,
                    timestamp: now
                }
            }
        }
    );

    return result;
}

// Delete with conditions
/**
 * Removes every product that is both out of stock and flagged as discontinued.
 *
 * @param {object} collection - Collection handle exposing `deleteMany`.
 * @returns {Promise<object>} The result returned by `deleteMany`.
 */
async function deleteDiscontinuedProducts(collection) {
    const isOutOfStock = { inventory: 0 };
    const isDiscontinued = { discontinued: true };

    // Both conditions must hold for a document to be deleted.
    const filter = { $and: [isOutOfStock, isDiscontinued] };

    const outcome = await collection.deleteMany(filter);
    const { deletedCount } = outcome;

    console.log(`Deleted ${deletedCount} discontinued products`);
    return outcome;
}

Advanced Queries

// Text search (requires text index)
/**
 * Creates a weighted text index over `name`, `description` and `tags`.
 *
 * Weights: name 10, description 5, tags 1.
 * NOTE(review): confirm that the target Cosmos DB account type supports text
 * indexes before relying on this.
 *
 * @param {object} collection - Collection handle exposing `createIndex`.
 * @returns {Promise<void>}
 */
async function setupTextSearch(collection) {
    const indexedFields = { name: 'text', description: 'text', tags: 'text' };
    const indexOptions = {
        weights: { name: 10, description: 5, tags: 1 }
    };

    await collection.createIndex(indexedFields, indexOptions);
}

/**
 * Runs a `$text` search and returns up to 20 matches ordered by text score.
 *
 * @param {object} collection - Collection handle exposing `find`; a text index
 *     is needed for `$text` queries.
 * @param {string} searchTerm - Search string passed to `$search`.
 * @returns {Promise<object[]>} Matching documents, each with a projected `score`.
 */
async function searchProducts(collection, searchTerm) {
    return await collection.find(
        { $text: { $search: searchTerm } },
        // The second argument of `find` is an options object, so the score
        // projection has to be nested under `projection` (as `efficientQuery`
        // does); passed bare it is treated as unknown options and not applied.
        { projection: { score: { $meta: 'textScore' } } }
    )
    .sort({ score: { $meta: 'textScore' } })
    .limit(20)
    .toArray();
}

// Geospatial queries (requires 2dsphere index)
/**
 * Creates a `2dsphere` index on the `location` field.
 *
 * @param {object} collection - Collection handle exposing `createIndex`.
 * @returns {Promise<void>}
 */
async function setupGeoIndex(collection) {
    const geoIndexSpec = { location: '2dsphere' };
    await collection.createIndex(geoIndexSpec);
}

/**
 * Finds documents whose `location` is near a point, using a `$near` query
 * with a GeoJSON Point and a `$maxDistance` bound.
 *
 * @param {object} collection - Collection handle exposing `find`.
 * @param {number} longitude - Longitude of the reference point.
 * @param {number} latitude - Latitude of the reference point.
 * @param {number} maxDistanceMeters - Value passed as `$maxDistance`.
 * @returns {Promise<object[]>} The matching documents.
 */
async function findNearbyStores(collection, longitude, latitude, maxDistanceMeters) {
    // GeoJSON coordinate order is [longitude, latitude].
    const referencePoint = {
        type: 'Point',
        coordinates: [longitude, latitude]
    };

    const nearFilter = {
        location: {
            $near: {
                $geometry: referencePoint,
                $maxDistance: maxDistanceMeters
            }
        }
    };

    const cursor = collection.find(nearFilter);
    const stores = await cursor.toArray();
    return stores;
}

// Complex aggregation for analytics
/**
 * Builds per-month, per-category sales figures for one calendar year.
 *
 * Pipeline: keep orders whose `orderDate` falls in [Jan 1 of `year`, Jan 1 of
 * `year + 1`) in UTC, unwind `items`, group by (month, category), sort, then
 * regroup by month and finally order the months ascending.
 *
 * Note that the first grouping runs after `$unwind`, so `orderCount` counts
 * unwound line items and `avgOrderValue` averages price * quantity per line
 * item, not per order.
 *
 * @param {object} ordersCollection - Collection handle exposing `aggregate`.
 * @param {number} [year=2022] - Calendar year to report on.
 * @returns {Promise<object[]>} One document per month:
 *     `{ _id: month, categories: [...], monthlyTotal }`.
 * @throws {TypeError} If `year` is not an integer.
 */
async function getSalesAnalytics(ordersCollection, year = 2022) {
    if (!Number.isInteger(year)) {
        throw new TypeError(`year must be an integer, got ${String(year)}`);
    }

    // Date.UTC keeps the window on UTC midnight regardless of the host time zone.
    const periodStart = new Date(Date.UTC(year, 0, 1));
    const periodEnd = new Date(Date.UTC(year + 1, 0, 1));

    const pipeline = [
        {
            $match: {
                orderDate: {
                    $gte: periodStart,
                    $lt: periodEnd
                }
            }
        },
        {
            $unwind: '$items'
        },
        {
            $group: {
                _id: {
                    month: { $month: '$orderDate' },
                    category: '$items.category'
                },
                totalSales: { $sum: { $multiply: ['$items.price', '$items.quantity'] } },
                orderCount: { $sum: 1 },
                avgOrderValue: { $avg: { $multiply: ['$items.price', '$items.quantity'] } }
            }
        },
        {
            $sort: { '_id.month': 1, totalSales: -1 }
        },
        {
            $group: {
                _id: '$_id.month',
                categories: {
                    $push: {
                        category: '$_id.category',
                        totalSales: '$totalSales',
                        orderCount: '$orderCount',
                        avgOrderValue: '$avgOrderValue'
                    }
                },
                monthlyTotal: { $sum: '$totalSales' }
            }
        },
        {
            // The earlier sort is applied before this regrouping; sort again so
            // the months are returned in calendar order.
            $sort: { _id: 1 }
        }
    ];

    return await ordersCollection.aggregate(pipeline).toArray();
}

Python Integration

from pymongo import MongoClient
from datetime import datetime
import os

# Connection string taken from the COSMOS_DB_CONNECTION_STRING environment
# variable; os.environ.get returns None when the variable is not set.
connection_string = os.environ.get('COSMOS_DB_CONNECTION_STRING')

class ProductRepository:
    """Thin data-access wrapper around the ``products`` collection.

    The client is built from the module-level ``connection_string`` and bound
    to the ``ecommerce`` database. The instance can be used as a context
    manager so the client is closed even when an operation raises.
    """

    def __init__(self):
        self.client = MongoClient(connection_string)
        self.db = self.client['ecommerce']
        self.products = self.db['products']

    def __enter__(self):
        """Return the repository itself for use in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the client on exit; exceptions are not suppressed."""
        self.close()
        return False

    def create_product(self, product_data):
        """Insert a product and return its id as a string.

        ``product_data`` is copied before timestamps are added, so the
        caller's dict is left untouched. Any ``created_at`` / ``updated_at``
        keys already present are overwritten in the stored document.

        Args:
            product_data: Mapping of product fields to store.

        Returns:
            ``str(result.inserted_id)`` for the inserted document.
        """
        # Local import: the file's import block only brings in `datetime`.
        from datetime import timezone

        # One timezone-aware clock read; datetime.utcnow() is deprecated and
        # two separate reads could give the two fields different values.
        now = datetime.now(timezone.utc)
        document = {**product_data, 'created_at': now, 'updated_at': now}
        result = self.products.insert_one(document)
        return str(result.inserted_id)

    def find_products(self, filters=None, sort_by=None, limit=100):
        """Return matching products as a list.

        Args:
            filters: Query document; ``None`` or empty means match everything.
            sort_by: Sort specification passed to ``cursor.sort``; skipped
                when falsy.
            limit: Value passed to ``cursor.limit``.

        Returns:
            A list of the documents yielded by the cursor.
        """
        cursor = self.products.find(filters or {})

        if sort_by:
            cursor = cursor.sort(sort_by)

        return list(cursor.limit(limit))

    def aggregate_by_category(self):
        """Return per-category statistics, largest category first.

        Each result has ``_id`` (the category), ``count``, ``avg_price`` and
        ``total_inventory``, sorted by ``count`` descending.
        """
        pipeline = [
            {
                '$group': {
                    '_id': '$category',
                    'count': {'$sum': 1},
                    'avg_price': {'$avg': '$price'},
                    'total_inventory': {'$sum': '$inventory'}
                }
            },
            {'$sort': {'count': -1}}
        ]
        return list(self.products.aggregate(pipeline))

    def close(self):
        """Close the underlying client."""
        self.client.close()


# Usage
if __name__ == '__main__':
    repo = ProductRepository()

    # try/finally guarantees the client is closed even if an operation raises.
    try:
        # Create a product
        product_id = repo.create_product({
            'name': 'Wireless Mouse',
            'category': 'Electronics',
            'price': 29.99,
            'inventory': 150
        })
        print(f'Created product: {product_id}')

        # Get category stats
        stats = repo.aggregate_by_category()
        for stat in stats:
            print(f"Category: {stat['_id']}, Count: {stat['count']}, Avg Price: ${stat['avg_price']:.2f}")
    finally:
        repo.close()

Request Units (RU) Considerations

Unlike native MongoDB, Cosmos DB uses Request Units (RU) for throughput:

// Check RU consumption for operations
/**
 * Runs `operation`, then issues `getLastRequestStatistics` and logs the
 * reported `RequestCharge`.
 *
 * NOTE(review): the statistics command is sent as a separate request after the
 * operation; confirm it reports on the intended operation when other requests
 * are in flight.
 *
 * @param {object} collection - Collection the operation ran against. Used as
 *     the fallback source of the database handle.
 * @param {Function} operation - Zero-argument async function to measure.
 * @param {object} [database] - Database handle exposing `command`. When
 *     omitted, falls back to `collection.s.db`, which is driver-internal
 *     state; pass the handle explicitly to avoid depending on it.
 * @returns {Promise<*>} Whatever `operation` resolved to.
 */
async function measureOperationCost(collection, operation, database) {
    const result = await operation();

    // Resolve the handle only after the operation has run, so the legacy
    // two-argument call behaves exactly as before.
    const db = database != null ? database : collection.s.db;

    // Access the command result to see RU charge
    const lastOp = await db.command({ getLastRequestStatistics: 1 });
    console.log(`Operation consumed ${lastOp.RequestCharge} RUs`);

    return result;
}

// Optimize queries to reduce RU consumption
/**
 * Fetches at most 50 products of a category, returning only `name` and `price`.
 *
 * @param {object} collection - Collection handle exposing `find`.
 * @param {*} category - Value matched against the `category` field.
 * @returns {Promise<object[]>} The projected documents.
 */
async function efficientQuery(collection, category) {
    const filter = { category };

    // Projection trims the payload to the needed fields; `_id` is excluded explicitly.
    const findOptions = {
        projection: { name: 1, price: 1, _id: 0 }
    };

    // Cap the result size so the query cannot return an unbounded set.
    const cursor = collection.find(filter, findOptions).limit(50);
    const rows = await cursor.toArray();
    return rows;
}

Indexing Best Practices

// Cosmos DB automatically indexes all fields, but custom indexes improve performance
/**
 * Creates three named indexes, one after another:
 *   - `category_price_idx`: compound index on category (asc) and price (desc)
 *   - `sku_unique_idx`: unique index on sku
 *   - `ttl_idx`: TTL index on expiresAt with expireAfterSeconds 0
 *
 * NOTE(review): confirm that the target Cosmos DB account type accepts a TTL
 * index on a custom field and a unique index on a non-empty collection.
 *
 * @param {object} collection - Collection handle exposing `createIndex`.
 * @returns {Promise<void>}
 */
async function setupOptimalIndexes(collection) {
    const indexDefinitions = [
        // Compound index for common query patterns
        [{ category: 1, price: -1 }, { name: 'category_price_idx' }],
        // Unique index for business keys
        [{ sku: 1 }, { unique: true, name: 'sku_unique_idx' }],
        // TTL index for automatic document expiration
        [{ expiresAt: 1 }, { expireAfterSeconds: 0, name: 'ttl_idx' }]
    ];

    // Sequential on purpose: each index is requested only after the previous
    // call has completed, and a failure stops the remaining ones.
    for (const [keys, options] of indexDefinitions) {
        await collection.createIndex(keys, options);
    }
}

Benefits of Cosmos DB MongoDB API

  1. Familiar MongoDB experience - Use existing drivers, tools, and code
  2. Global distribution - Multi-region reads and writes with automatic failover
  3. Elastic scale - Adjust throughput without downtime
  4. Comprehensive SLAs - 99.999% availability guarantee
  5. Azure integration - Private endpoints, AAD authentication, Azure Monitor

When to Choose MongoDB API

The MongoDB API is ideal when:

  • Migrating existing MongoDB applications to Azure
  • Team has strong MongoDB expertise
  • Using MongoDB-specific features (aggregation pipeline, geospatial queries)
  • Need global distribution with MongoDB compatibility

Azure Cosmos DB’s MongoDB API provides a powerful option for teams wanting MongoDB compatibility with Azure’s enterprise-grade platform.

Resources

Michael John Peña

Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.