Back to Blog
6 min read

Azure Cosmos DB for Apache Cassandra

Azure Cosmos DB for Apache Cassandra provides a fully managed Cassandra-compatible database service. This allows you to use existing Cassandra drivers and tools while benefiting from Azure’s global distribution and enterprise features.

Getting Started with Cassandra API

Connection Setup

using Cassandra;
using System.Security.Authentication;
using System.Security.Cryptography.X509Certificates;

/// <summary>
/// Owns a TLS-secured cluster and session for an Azure Cosmos DB for Apache Cassandra account.
/// Dispose the instance (for example with <c>using</c>) to shut down the session and cluster.
/// </summary>
public class CassandraConnection : IDisposable
{
    private readonly ICluster _cluster;
    private readonly ISession _session;

    /// <summary>
    /// Builds the cluster (port 10350, TLS 1.2, credentials, token-aware load balancing,
    /// exponential reconnection) and opens a session.
    /// </summary>
    /// <param name="contactPoint">Account host name, e.g. <c>myaccount.cassandra.cosmos.azure.com</c>.</param>
    /// <param name="username">User name passed to the driver's credential provider.</param>
    /// <param name="password">Password passed to the driver's credential provider.</param>
    /// <exception cref="ArgumentException"><paramref name="contactPoint"/> is null, empty or whitespace.</exception>
    public CassandraConnection(string contactPoint, string username, string password)
    {
        if (string.IsNullOrWhiteSpace(contactPoint))
        {
            throw new ArgumentException("A contact point (account host name) is required.", nameof(contactPoint));
        }

        var options = new SSLOptions(SslProtocols.Tls12, true, ValidateServerCertificate);

        // Every node IP the driver resolves is mapped back to the account host name,
        // presumably so TLS host-name validation is done against the certificate's name.
        options.SetHostNameResolver((ipAddress) => contactPoint);

        _cluster = Cluster.Builder()
            .WithCredentials(username, password)
            .WithPort(10350)
            .AddContactPoint(contactPoint)
            .WithSSL(options)
            .WithLoadBalancingPolicy(new TokenAwarePolicy(new DCAwareRoundRobinPolicy()))
            .WithReconnectionPolicy(new ExponentialReconnectionPolicy(1000, 60000))
            .Build();

        try
        {
            _session = _cluster.Connect();
        }
        catch
        {
            // The caller never receives an instance to dispose, so release the cluster here.
            _cluster.Dispose();
            throw;
        }
    }

    /// <summary>
    /// TLS certificate callback: accepts the server certificate only when the platform
    /// reported no policy errors (no name mismatch, no chain errors).
    /// </summary>
    private static bool ValidateServerCertificate(
        object sender,
        X509Certificate certificate,
        X509Chain chain,
        System.Net.Security.SslPolicyErrors sslPolicyErrors)
    {
        return sslPolicyErrors == System.Net.Security.SslPolicyErrors.None;
    }

    /// <summary>Returns the session opened by the constructor.</summary>
    public ISession GetSession() => _session;

    /// <summary>Disposes the session, then the cluster.</summary>
    public void Dispose()
    {
        _session?.Dispose();
        _cluster?.Dispose();
    }
}

Creating Keyspace and Tables

/// <summary>
/// Creates the keyspace, user-defined type and tables used by the samples in this post.
/// </summary>
/// <remarks>
/// CQL cannot bind identifiers (keyspace names) as parameters, so they are interpolated into
/// the statement text. Every name is validated as a plain unquoted identifier first to
/// prevent CQL injection.
/// </remarks>
public class SchemaManager
{
    private readonly ISession _session;

    /// <param name="session">Open driver session used to run the DDL statements.</param>
    /// <exception cref="ArgumentNullException"><paramref name="session"/> is null.</exception>
    public SchemaManager(ISession session)
    {
        _session = session ?? throw new ArgumentNullException(nameof(session));
    }

    /// <summary>Creates <paramref name="keyspaceName"/> if it does not exist yet.</summary>
    /// <exception cref="ArgumentException">The name is not a valid unquoted CQL identifier.</exception>
    public void CreateKeyspace(string keyspaceName)
    {
        EnsureValidIdentifier(keyspaceName, nameof(keyspaceName));

        // NOTE(review): CQL syntax requires a replication map. Cosmos DB manages replication
        // itself, so confirm whether the service honours these values.
        var cql = $@"
            CREATE KEYSPACE IF NOT EXISTS {keyspaceName}
            WITH replication = {{'class': 'SimpleStrategy', 'replication_factor': 1}}";

        _session.Execute(cql);
        Console.WriteLine($"Keyspace '{keyspaceName}' created successfully");
    }

    /// <summary>Creates <c>{keyspace}.users</c>, keyed by <c>user_id</c>, if it does not exist.</summary>
    /// <exception cref="ArgumentException">The keyspace is not a valid unquoted CQL identifier.</exception>
    public void CreateUserTable(string keyspace)
    {
        EnsureValidIdentifier(keyspace, nameof(keyspace));

        var cql = $@"
            CREATE TABLE IF NOT EXISTS {keyspace}.users (
                user_id UUID PRIMARY KEY,
                email TEXT,
                first_name TEXT,
                last_name TEXT,
                created_at TIMESTAMP,
                profile MAP<TEXT, TEXT>
            )";

        _session.Execute(cql);
        Console.WriteLine("Users table created successfully");
    }

    /// <summary>
    /// Creates <c>{keyspace}.sensor_readings</c>: partitioned by <c>device_id</c>, with rows in
    /// each partition clustered by <c>reading_time</c>, newest first.
    /// </summary>
    /// <exception cref="ArgumentException">The keyspace is not a valid unquoted CQL identifier.</exception>
    public void CreateTimeSeriesTable(string keyspace)
    {
        EnsureValidIdentifier(keyspace, nameof(keyspace));

        var cql = $@"
            CREATE TABLE IF NOT EXISTS {keyspace}.sensor_readings (
                device_id UUID,
                reading_time TIMESTAMP,
                temperature DOUBLE,
                humidity DOUBLE,
                pressure DOUBLE,
                PRIMARY KEY ((device_id), reading_time)
            ) WITH CLUSTERING ORDER BY (reading_time DESC)";

        _session.Execute(cql);
        Console.WriteLine("Sensor readings table created successfully");
    }

    /// <summary>
    /// Creates the <c>order_item</c> user-defined type, then <c>{keyspace}.orders_by_customer</c>
    /// whose <c>items</c> column is a list of that type.
    /// </summary>
    /// <exception cref="ArgumentException">The keyspace is not a valid unquoted CQL identifier.</exception>
    public void CreateOrdersTable(string keyspace)
    {
        EnsureValidIdentifier(keyspace, nameof(keyspace));

        // The UDT must exist before the table that references it.
        var udtCql = $@"
            CREATE TYPE IF NOT EXISTS {keyspace}.order_item (
                product_id UUID,
                product_name TEXT,
                quantity INT,
                unit_price DECIMAL
            )";

        var tableCql = $@"
            CREATE TABLE IF NOT EXISTS {keyspace}.orders_by_customer (
                customer_id UUID,
                order_date DATE,
                order_id UUID,
                total_amount DECIMAL,
                status TEXT,
                items LIST<FROZEN<order_item>>,
                PRIMARY KEY ((customer_id), order_date, order_id)
            ) WITH CLUSTERING ORDER BY (order_date DESC, order_id ASC)";

        _session.Execute(udtCql);
        _session.Execute(tableCql);
        Console.WriteLine("Orders table created successfully");
    }

    /// <summary>
    /// Throws unless <paramref name="name"/> is an ASCII letter followed by ASCII letters,
    /// digits or underscores. Such a name is safe to splice into CQL text.
    /// </summary>
    private static void EnsureValidIdentifier(string name, string paramName)
    {
        var valid = !string.IsNullOrEmpty(name) && IsAsciiLetter(name[0]);
        for (var i = 1; valid && i < name.Length; i++)
        {
            var c = name[i];
            valid = IsAsciiLetter(c) || (c >= '0' && c <= '9') || c == '_';
        }

        if (!valid)
        {
            throw new ArgumentException($"'{name}' is not a valid unquoted CQL identifier.", paramName);
        }
    }

    /// <summary>True for 'a'-'z' and 'A'-'Z' only.</summary>
    private static bool IsAsciiLetter(char c) => (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z');
}

CRUD Operations

/// <summary>
/// CRUD access to the <c>users</c> table through statements prepared once at construction.
/// </summary>
/// <remarks>
/// The keyspace name is interpolated into the CQL text because identifiers cannot be bound.
/// It must therefore come from trusted configuration, never from user input.
/// </remarks>
public class UserRepository
{
    private readonly ISession _session;
    private readonly PreparedStatement _insertUser;
    private readonly PreparedStatement _getUserById;
    private readonly PreparedStatement _updateUser;
    private readonly PreparedStatement _deleteUser;

    /// <param name="session">Open driver session.</param>
    /// <param name="keyspace">Keyspace that contains the <c>users</c> table.</param>
    /// <exception cref="ArgumentNullException"><paramref name="session"/> is null.</exception>
    public UserRepository(ISession session, string keyspace)
    {
        _session = session ?? throw new ArgumentNullException(nameof(session));

        // Prepare each statement once; later executions only bind values.
        _insertUser = _session.Prepare($@"
            INSERT INTO {keyspace}.users
            (user_id, email, first_name, last_name, created_at, profile)
            VALUES (?, ?, ?, ?, ?, ?)");

        _getUserById = _session.Prepare($@"
            SELECT * FROM {keyspace}.users WHERE user_id = ?");

        _updateUser = _session.Prepare($@"
            UPDATE {keyspace}.users
            SET first_name = ?, last_name = ?, profile = ?
            WHERE user_id = ?");

        _deleteUser = _session.Prepare($@"
            DELETE FROM {keyspace}.users WHERE user_id = ?");
    }

    /// <summary>
    /// Inserts <paramref name="user"/> under a newly generated id, with <c>created_at</c> set to
    /// the current UTC time. <see cref="User.UserId"/> and <see cref="User.CreatedAt"/> on the
    /// argument are ignored.
    /// </summary>
    /// <returns>The generated user id.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="user"/> is null.</exception>
    public async Task<Guid> CreateUserAsync(User user)
    {
        if (user == null)
        {
            throw new ArgumentNullException(nameof(user));
        }

        var userId = Guid.NewGuid();

        var statement = _insertUser.Bind(
            userId,
            user.Email,
            user.FirstName,
            user.LastName,
            DateTime.UtcNow,
            user.Profile);

        await _session.ExecuteAsync(statement);
        return userId;
    }

    /// <summary>Loads the user with the given id.</summary>
    /// <returns>The user, or <c>null</c> when no row matches.</returns>
    public async Task<User> GetUserByIdAsync(Guid userId)
    {
        var statement = _getUserById.Bind(userId);
        var result = await _session.ExecuteAsync(statement);
        var row = result.FirstOrDefault();

        if (row == null)
        {
            return null;
        }

        return new User
        {
            UserId = row.GetValue<Guid>("user_id"),
            Email = row.GetValue<string>("email"),
            FirstName = row.GetValue<string>("first_name"),
            LastName = row.GetValue<string>("last_name"),
            CreatedAt = row.GetValue<DateTime>("created_at"),
            Profile = row.GetValue<IDictionary<string, string>>("profile")
        };
    }

    /// <summary>
    /// Overwrites first name, last name and profile for <see cref="User.UserId"/>.
    /// NOTE: CQL UPDATE has upsert semantics, so an unknown id creates a new row.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="user"/> is null.</exception>
    public async Task UpdateUserAsync(User user)
    {
        if (user == null)
        {
            throw new ArgumentNullException(nameof(user));
        }

        var statement = _updateUser.Bind(
            user.FirstName,
            user.LastName,
            user.Profile,
            user.UserId);

        await _session.ExecuteAsync(statement);
    }

    /// <summary>Deletes the row for <paramref name="userId"/>.</summary>
    public async Task DeleteUserAsync(Guid userId)
    {
        var statement = _deleteUser.Bind(userId);
        await _session.ExecuteAsync(statement);
    }
}

/// <summary>
/// In-memory form of one row of the <c>users</c> table; each property mirrors a column.
/// </summary>
public class User
{
    /// <summary>Primary key (<c>user_id</c>).</summary>
    public Guid UserId { get; set; }

    /// <summary>The <c>email</c> column.</summary>
    public string Email { get; set; }

    /// <summary>The <c>first_name</c> column.</summary>
    public string FirstName { get; set; }

    /// <summary>The <c>last_name</c> column.</summary>
    public string LastName { get; set; }

    /// <summary>The <c>created_at</c> timestamp column.</summary>
    public DateTime CreatedAt { get; set; }

    /// <summary>The <c>profile</c> map column (free-form text key/value pairs).</summary>
    public IDictionary<string, string> Profile { get; set; }
}

Time Series Data Operations

/// <summary>
/// Writes and queries rows of the <c>sensor_readings</c> time-series table.
/// </summary>
/// <remarks>
/// The keyspace name is interpolated into the CQL text because identifiers cannot be bound.
/// It must therefore come from trusted configuration, never from user input.
/// </remarks>
public class SensorDataRepository
{
    private readonly ISession _session;
    private readonly string _insertCql;
    private readonly string _selectRangeCql;

    /// <param name="session">Open driver session.</param>
    /// <param name="keyspace">Keyspace that contains <c>sensor_readings</c>.</param>
    /// <exception cref="ArgumentNullException"><paramref name="session"/> is null.</exception>
    public SensorDataRepository(ISession session, string keyspace)
    {
        _session = session ?? throw new ArgumentNullException(nameof(session));

        // Build the statement text once, not on every call / batch iteration.
        _insertCql = $@"
            INSERT INTO {keyspace}.sensor_readings
            (device_id, reading_time, temperature, humidity, pressure)
            VALUES (?, ?, ?, ?, ?)";

        _selectRangeCql = $@"
            SELECT * FROM {keyspace}.sensor_readings
            WHERE device_id = ? AND reading_time >= ? AND reading_time <= ?
            LIMIT ?";
    }

    /// <summary>Inserts a single reading.</summary>
    /// <exception cref="ArgumentNullException"><paramref name="reading"/> is null.</exception>
    public async Task InsertReadingAsync(SensorReading reading)
    {
        if (reading == null)
        {
            throw new ArgumentNullException(nameof(reading));
        }

        await _session.ExecuteAsync(CreateInsertStatement(reading));
    }

    /// <summary>
    /// Inserts all <paramref name="readings"/> in one batch. Does nothing when the sequence is empty.
    /// </summary>
    /// <remarks>
    /// NOTE(review): this uses the driver's default batch type. If readings span many devices
    /// (partitions), consider per-partition batches or concurrent single inserts instead.
    /// </remarks>
    /// <exception cref="ArgumentNullException"><paramref name="readings"/> is null.</exception>
    public async Task BatchInsertReadingsAsync(IEnumerable<SensorReading> readings)
    {
        if (readings == null)
        {
            throw new ArgumentNullException(nameof(readings));
        }

        var statements = readings.Select(CreateInsertStatement).ToList();
        if (statements.Count == 0)
        {
            // Nothing to write: skip the round trip instead of sending an empty batch.
            return;
        }

        var batch = new BatchStatement();
        foreach (var statement in statements)
        {
            batch.Add(statement);
        }

        await _session.ExecuteAsync(batch);
    }

    /// <summary>
    /// Returns up to <paramref name="limit"/> readings for <paramref name="deviceId"/> with
    /// <c>startTime &lt;= reading_time &lt;= endTime</c>, in the table's clustering order.
    /// </summary>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="limit"/> is not positive (CQL requires LIMIT &gt; 0).</exception>
    public async Task<List<SensorReading>> GetReadingsAsync(
        Guid deviceId,
        DateTime startTime,
        DateTime endTime,
        int limit = 1000)
    {
        if (limit <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(limit), limit, "Limit must be greater than zero.");
        }

        var statement = new SimpleStatement(_selectRangeCql, deviceId, startTime, endTime, limit);
        var result = await _session.ExecuteAsync(statement);

        return result.Select(ToSensorReading).ToList();
    }

    /// <summary>Binds one reading's values to the shared INSERT text.</summary>
    private SimpleStatement CreateInsertStatement(SensorReading reading) =>
        new SimpleStatement(_insertCql,
            reading.DeviceId,
            reading.ReadingTime,
            reading.Temperature,
            reading.Humidity,
            reading.Pressure);

    /// <summary>Maps a <c>sensor_readings</c> row to a <see cref="SensorReading"/>.</summary>
    private static SensorReading ToSensorReading(Row row) => new SensorReading
    {
        DeviceId = row.GetValue<Guid>("device_id"),
        ReadingTime = row.GetValue<DateTime>("reading_time"),
        Temperature = row.GetValue<double>("temperature"),
        Humidity = row.GetValue<double>("humidity"),
        Pressure = row.GetValue<double>("pressure")
    };
}

/// <summary>
/// One measurement from a device; each property mirrors a column of <c>sensor_readings</c>.
/// </summary>
public class SensorReading
{
    /// <summary>Partition key (<c>device_id</c>).</summary>
    public Guid DeviceId { get; set; }

    /// <summary>Clustering key (<c>reading_time</c>).</summary>
    public DateTime ReadingTime { get; set; }

    /// <summary>The <c>temperature</c> column.</summary>
    public double Temperature { get; set; }

    /// <summary>The <c>humidity</c> column.</summary>
    public double Humidity { get; set; }

    /// <summary>The <c>pressure</c> column.</summary>
    public double Pressure { get; set; }
}

Python Client Example

from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
from ssl import SSLContext, PROTOCOL_TLS_CLIENT, CERT_REQUIRED
import uuid
from datetime import datetime

class CosmosDBCassandra:
    """Minimal Python client for Azure Cosmos DB for Apache Cassandra.

    Opens a TLS connection on port 10350 with username/password authentication and
    offers helpers for a ``users`` table in the keyspace chosen by ``create_keyspace``.
    """

    def __init__(self, contact_point, username, password):
        """Connect to the account.

        Args:
            contact_point: Account host name, e.g. ``myaccount.cassandra.cosmos.azure.com``.
            username: User name for ``PlainTextAuthProvider``.
            password: Password for ``PlainTextAuthProvider``.
        """
        ssl_context = SSLContext(PROTOCOL_TLS_CLIENT)
        ssl_context.verify_mode = CERT_REQUIRED
        ssl_context.load_default_certs()

        auth_provider = PlainTextAuthProvider(username, password)

        self.cluster = Cluster(
            [contact_point],
            port=10350,
            auth_provider=auth_provider,
            ssl_context=ssl_context
        )
        self.session = self.cluster.connect()

    def create_keyspace(self, keyspace_name):
        """Create ``keyspace_name`` if missing and make it the session's default keyspace.

        The name is spliced into the CQL text (identifiers cannot be bound as
        parameters), so it is validated as a plain unquoted identifier first.

        Raises:
            ValueError: if ``keyspace_name`` is not ``[A-Za-z][A-Za-z0-9_]*``.
        """
        import re

        if not isinstance(keyspace_name, str) or not re.fullmatch(r"[A-Za-z][A-Za-z0-9_]*", keyspace_name):
            raise ValueError(f"invalid keyspace name: {keyspace_name!r}")

        self.session.execute(f"""
            CREATE KEYSPACE IF NOT EXISTS {keyspace_name}
            WITH replication = {{'class': 'SimpleStrategy', 'replication_factor': 1}}
        """)
        self.session.set_keyspace(keyspace_name)

    def insert_user(self, email, first_name, last_name):
        """Insert a user with a fresh UUID and return that id.

        Requires ``create_keyspace`` to have been called (the table name is unqualified),
        and a ``users`` table to exist in that keyspace.
        """
        from datetime import timezone

        user_id = uuid.uuid4()
        # Timezone-aware UTC; datetime.utcnow() is deprecated and returns a naive value.
        created_at = datetime.now(timezone.utc)
        self.session.execute("""
            INSERT INTO users (user_id, email, first_name, last_name, created_at)
            VALUES (%s, %s, %s, %s, %s)
        """, (user_id, email, first_name, last_name, created_at))
        return user_id

    def get_user(self, user_id):
        """Return the ``users`` row for ``user_id``, or ``None`` if there is none."""
        result = self.session.execute(
            "SELECT * FROM users WHERE user_id = %s", (user_id,))
        return result.one()

    def close(self):
        """Shut down the cluster and all of its connections."""
        self.cluster.shutdown()


# Usage
if __name__ == '__main__':
    # Placeholder account values: replace them, and keep real keys out of source control.
    db = CosmosDBCassandra(
        'myaccount.cassandra.cosmos.azure.com',
        'myaccount',
        'mypassword'
    )

    # Always release the driver's connections, even if a step below raises.
    try:
        db.create_keyspace('myapp')
        # NOTE: this sample never creates the users table; it must already exist in 'myapp'.
        user_id = db.insert_user('john@example.com', 'John', 'Doe')
        user = db.get_user(user_id)
        if user is None:
            print(f"User {user_id} not found")
        else:
            print(f"User: {user.first_name} {user.last_name}")
    finally:
        db.close()

Key Features

  1. Wire protocol compatibility - Use existing Cassandra drivers
  2. Global distribution - Multi-region writes and reads
  3. Secondary indexes - Create indexes on non-key columns for flexible queries (unlike the API for NoSQL, columns are not indexed automatically)
  4. Tunable consistency - Five consistency levels available
  5. Serverless option - Pay only for what you use

Azure Cosmos DB for Apache Cassandra bridges the gap between familiar Cassandra development and cloud-native scalability.

Michael John Peña

Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.