InfluxDB on Azure: Deploying Time-Series Infrastructure

InfluxDB is a purpose-built time-series database optimized for high write throughput and fast queries. While Azure doesn’t offer a managed InfluxDB service, you can run it yourself on Azure infrastructure. This post covers two deployment options, then walks through writing, querying, and operating InfluxDB 2.0 from Python.

Deployment Options

Option 1: Azure Container Instance (Simple)

For development or small workloads:

# Create storage account for persistence
az storage account create \
    --name influxstorage \
    --resource-group my-rg \
    --location eastus \
    --sku Standard_LRS

# Create file share
az storage share create \
    --name influxdata \
    --account-name influxstorage

# Get storage key
STORAGE_KEY=$(az storage account keys list \
    --account-name influxstorage \
    --resource-group my-rg \
    --query '[0].value' -o tsv)

# Deploy InfluxDB container
az container create \
    --name influxdb \
    --resource-group my-rg \
    --image influxdb:2.0 \
    --cpu 2 \
    --memory 4 \
    --ports 8086 \
    --dns-name-label my-influxdb \
    --azure-file-volume-account-name influxstorage \
    --azure-file-volume-account-key "$STORAGE_KEY" \
    --azure-file-volume-share-name influxdata \
    --azure-file-volume-mount-path /var/lib/influxdb2 \
    --environment-variables \
        DOCKER_INFLUXDB_INIT_MODE=setup \
        DOCKER_INFLUXDB_INIT_USERNAME=admin \
        DOCKER_INFLUXDB_INIT_PASSWORD='<strong-password>' \
        DOCKER_INFLUXDB_INIT_ORG=myorg \
        DOCKER_INFLUXDB_INIT_BUCKET=default
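
Once the container is running, it helps to confirm that InfluxDB is actually serving requests before pointing clients at it. The sketch below checks the `/health` endpoint that InfluxDB 2.x exposes. The hostname is an assumption derived from the `--dns-name-label` and region used above, and the helper names are hypothetical.

```python
import json
import urllib.request

# Assumed hostname: <dns-name-label>.<region>.azurecontainer.io, from the command above
BASE_URL = "http://my-influxdb.eastus.azurecontainer.io:8086"


def health_url(base_url: str) -> str:
    """Build the URL of the InfluxDB 2.x health endpoint."""
    return base_url.rstrip("/") + "/health"


def is_healthy(payload: str) -> bool:
    """Return True when the health response body reports status "pass"."""
    return json.loads(payload).get("status") == "pass"


def check_health(base_url: str = BASE_URL, timeout: float = 5.0) -> bool:
    """Fetch /health and report whether the instance is ready."""
    with urllib.request.urlopen(health_url(base_url), timeout=timeout) as resp:
        return is_healthy(resp.read().decode("utf-8"))
```

Calling `check_health()` after `az container create` returns gives a quick yes/no answer; the container may need a short while to finish its initial setup before the check passes.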

Option 2: Azure Kubernetes Service (Production)

For production workloads, use AKS with persistent volumes:

# influxdb-deployment.yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: influxdb
spec:
  serviceName: influxdb
  replicas: 1
  selector:
    matchLabels:
      app: influxdb
  template:
    metadata:
      labels:
        app: influxdb
    spec:
      containers:
      - name: influxdb
        image: influxdb:2.0
        ports:
        - containerPort: 8086
        env:
        - name: DOCKER_INFLUXDB_INIT_MODE
          value: setup
        - name: DOCKER_INFLUXDB_INIT_USERNAME
          valueFrom:
            secretKeyRef:
              name: influxdb-secrets
              key: username
        - name: DOCKER_INFLUXDB_INIT_PASSWORD
          valueFrom:
            secretKeyRef:
              name: influxdb-secrets
              key: password
        - name: DOCKER_INFLUXDB_INIT_ORG
          value: myorg
        - name: DOCKER_INFLUXDB_INIT_BUCKET
          value: default
        volumeMounts:
        - name: influxdb-data
          mountPath: /var/lib/influxdb2
        resources:
          requests:
            cpu: "1"
            memory: "4Gi"
          limits:
            cpu: "4"
            memory: "16Gi"
  volumeClaimTemplates:
  - metadata:
      name: influxdb-data
    spec:
      accessModes: ["ReadWriteOnce"]
      storageClassName: managed-premium
      resources:
        requests:
          storage: 100Gi
---
apiVersion: v1
kind: Service
metadata:
  name: influxdb
spec:
  type: LoadBalancer
  ports:
  - port: 8086
    targetPort: 8086
  selector:
    app: influxdb
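
The StatefulSet above reads its credentials from a Secret named `influxdb-secrets` with the keys `username` and `password`, which has to exist before the pod can start. One possible way to produce it is sketched below: the script builds the Secret manifest as JSON, which `kubectl apply -f -` accepts. The function name and the credentials are placeholders, not part of the original setup.

```python
import base64
import json


def build_secret_manifest(username: str, password: str) -> dict:
    """Build a Secret named influxdb-secrets with base64-encoded values."""
    def b64(value: str) -> str:
        # Kubernetes expects values under `data` to be base64-encoded
        return base64.b64encode(value.encode("utf-8")).decode("ascii")

    return {
        "apiVersion": "v1",
        "kind": "Secret",
        "metadata": {"name": "influxdb-secrets"},
        "type": "Opaque",
        "data": {"username": b64(username), "password": b64(password)},
    }


if __name__ == "__main__":
    # Placeholder credentials; pipe the output to `kubectl apply -f -`
    print(json.dumps(build_secret_manifest("admin", "<strong-password>"), indent=2))
```

Generating the manifest at deploy time keeps the password out of the YAML file that lives in source control.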

InfluxDB 2.0 Concepts

InfluxDB 2.0 introduces:

  • Organizations: Workspaces that group users, buckets, tasks, and dashboards
  • Buckets: Named data stores, each with its own retention period
  • Tokens: API credentials scoped to specific permissions
  • Flux: Functional query and scripting language
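
To see how organizations, buckets, and tokens fit together, it can help to look at a raw write request. The sketch below builds (without sending) a POST to the `/api/v2/write` endpoint: the organization and bucket travel as query parameters and the token goes in the `Authorization` header. The base URL, token, and function name are illustrative assumptions.

```python
import urllib.parse
import urllib.request


def build_write_request(base_url: str, org: str, bucket: str, token: str,
                        line: str) -> urllib.request.Request:
    """Build (but do not send) a POST to the InfluxDB 2.x write endpoint."""
    query = urllib.parse.urlencode({"org": org, "bucket": bucket, "precision": "ns"})
    return urllib.request.Request(
        url=f"{base_url}/api/v2/write?{query}",
        data=line.encode("utf-8"),  # request body is line protocol text
        headers={
            "Authorization": f"Token {token}",
            "Content-Type": "text/plain; charset=utf-8",
        },
        method="POST",
    )


# Placeholder values for illustration only
req = build_write_request(
    "http://localhost:8086", "myorg", "default", "your-api-token",
    "sensor_data,sensor_id=sensor-001 temperature=23.5",
)
print(req.get_method(), req.full_url)
```

The Python client used in the rest of this post wraps this same endpoint, so the `org`, `bucket`, and `token` values passed to it play the same roles.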

Setting Up with Python

from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
from datetime import datetime

# Connection settings
url = "http://my-influxdb.eastus.azurecontainer.io:8086"
token = "your-api-token"
org = "myorg"
bucket = "default"

# Create client
client = InfluxDBClient(url=url, token=token, org=org)

# Write API
write_api = client.write_api(write_options=SYNCHRONOUS)

# Query API
query_api = client.query_api()

Writing Data

Using Point Objects

from influxdb_client import Point
from datetime import datetime, timezone

def write_sensor_reading(sensor_id: str, temperature: float, humidity: float):
    # Use a timezone-aware UTC timestamp; datetime.utcnow() is deprecated since Python 3.12
    point = Point("sensor_data") \
        .tag("sensor_id", sensor_id) \
        .tag("location", "datacenter-1") \
        .field("temperature", temperature) \
        .field("humidity", humidity) \
        .time(datetime.now(timezone.utc))

    write_api.write(bucket=bucket, record=point)

# Write single point
write_sensor_reading("sensor-001", 23.5, 45.2)

Batch Writing for Performance

from influxdb_client.client.write_api import WriteOptions
from datetime import datetime, timedelta, timezone
import random

# Configure batch writing (intervals are in milliseconds)
write_options = WriteOptions(
    batch_size=5000,
    flush_interval=1000,
    jitter_interval=500,
    retry_interval=5000,
    max_retries=5
)

batch_write_api = client.write_api(write_options=write_options)

def generate_batch_data(num_points: int):
    """Generate batch of sensor readings"""
    points = []
    base_time = datetime.now(timezone.utc)

    for i in range(num_points):
        # Give every point its own timestamp: points that share a measurement,
        # tag set, and timestamp overwrite each other in InfluxDB.
        point = Point("sensor_data") \
            .tag("sensor_id", f"sensor-{i % 100:03d}") \
            .tag("location", f"zone-{i % 10}") \
            .field("temperature", 20 + random.uniform(-5, 15)) \
            .field("humidity", 40 + random.uniform(-10, 30)) \
            .field("pressure", 1013 + random.uniform(-20, 20)) \
            .time(base_time - timedelta(milliseconds=i))
        points.append(point)

    return points

# Write 100K points in batches
points = generate_batch_data(100000)
batch_write_api.write(bucket=bucket, record=points)
# close() flushes any points still buffered and disposes of the batching buffer
batch_write_api.close()
print("Batch write complete")

Line Protocol (Fastest)

import time

def write_line_protocol(lines: list):
    """Write using line protocol for maximum performance"""
    write_api.write(bucket=bucket, record=lines)

# Generate line protocol with a nanosecond timestamp.
# time.time_ns() avoids the local-time offset that datetime.utcnow().timestamp() introduces.
now_ns = time.time_ns()
lines = [
    f"sensor_data,sensor_id=sensor-001,location=zone-1 temperature=23.5,humidity=45.2 {now_ns}",
    f"sensor_data,sensor_id=sensor-002,location=zone-1 temperature=24.1,humidity=44.8 {now_ns}"
]

write_line_protocol(lines)

Querying with Flux

Flux is InfluxDB’s query language:

def query_last_hour():
    """Get data from last hour"""
    query = '''
    from(bucket: "default")
        |> range(start: -1h)
        |> filter(fn: (r) => r._measurement == "sensor_data")
        |> filter(fn: (r) => r._field == "temperature")
    '''

    result = query_api.query(query=query)

    for table in result:
        for record in table.records:
            print(f"{record.get_time()}: {record.get_value()}")

def query_aggregated():
    """Get aggregated statistics"""
    query = '''
    from(bucket: "default")
        |> range(start: -24h)
        |> filter(fn: (r) => r._measurement == "sensor_data")
        |> filter(fn: (r) => r._field == "temperature")
        |> aggregateWindow(every: 1h, fn: mean, createEmpty: false)
        |> yield(name: "hourly_mean")
    '''

    result = query_api.query(query=query)
    return result

def query_to_dataframe():
    """Query and return pandas DataFrame"""
    query = '''
    from(bucket: "default")
        |> range(start: -7d)
        |> filter(fn: (r) => r._measurement == "sensor_data")
        |> filter(fn: (r) => r.sensor_id == "sensor-001")
        |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
    '''

    df = query_api.query_data_frame(query=query)
    return df
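
For readers new to Flux, the `aggregateWindow(every: 1h, fn: mean, createEmpty: false)` step used above can be approximated in plain Python. This is a simplified sketch for intuition, not how InfluxDB implements it: it groups readings into hour-aligned windows, averages each one, and stamps the result with the window's end time, which is what `aggregateWindow` does by default. The function name is hypothetical.

```python
from collections import defaultdict
from datetime import datetime, timedelta, timezone


def hourly_mean(readings):
    """Average (timestamp, value) pairs per 1h window, keyed by window end time."""
    windows = defaultdict(list)
    for ts, value in readings:
        window_start = ts.replace(minute=0, second=0, microsecond=0)
        windows[window_start + timedelta(hours=1)].append(value)
    # Windows without readings never appear, matching createEmpty: false
    return {stop: sum(vals) / len(vals) for stop, vals in sorted(windows.items())}


t0 = datetime(2021, 1, 1, 10, 0, tzinfo=timezone.utc)
sample = [(t0, 20.0), (t0 + timedelta(minutes=30), 22.0), (t0 + timedelta(hours=2), 30.0)]
for stop, mean in hourly_mean(sample).items():
    print(stop.isoformat(), mean)
```

Because results are stamped with the window end, the mean for 10:00 to 11:00 shows up at 11:00, which is worth keeping in mind when lining up aggregated data with raw readings.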

Advanced Flux Queries

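# Anomaly detection: flag readings more than 20% away from their 10-point moving average.
# movingAverage() overwrites _value, so the raw series is joined back in for comparison.
anomaly_query = '''
import "math"

raw = from(bucket: "default")
    |> range(start: -24h)
    |> filter(fn: (r) => r._measurement == "sensor_data")
    |> filter(fn: (r) => r._field == "temperature")

smoothed = raw
    |> movingAverage(n: 10)

join(tables: {raw: raw, avg: smoothed}, on: ["_time", "sensor_id", "location"])
    |> map(fn: (r) => ({r with
        deviation: math.abs(x: r._value_raw - r._value_avg) / r._value_avg
    }))
    |> filter(fn: (r) => r.deviation > 0.2)
'''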

# Downsampling
downsample_query = '''
from(bucket: "default")
    |> range(start: -30d)
    |> filter(fn: (r) => r._measurement == "sensor_data")
    |> aggregateWindow(every: 1d, fn: mean)
    |> to(bucket: "sensor_data_daily", org: "myorg")
'''

# Join two fields of the same measurement on time and sensor
join_query = '''
temperature = from(bucket: "default")
    |> range(start: -1h)
    |> filter(fn: (r) => r._field == "temperature")

humidity = from(bucket: "default")
    |> range(start: -1h)
    |> filter(fn: (r) => r._field == "humidity")

join(tables: {temp: temperature, humid: humidity}, on: ["_time", "sensor_id"])
'''
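
The anomaly-detection idea above (compare each reading against a moving average and flag large relative deviations) can also be prototyped client-side before committing to a Flux query. The sketch below is a slight variation: it compares each reading with the mean of the `n` readings before it, whereas Flux's `movingAverage` includes the current point in the average. The function name and defaults are illustrative.

```python
from collections import deque


def find_anomalies(values, n=10, threshold=0.2):
    """Yield (index, value, moving_avg) for readings that deviate from the mean
    of the previous n readings by more than `threshold` (relative)."""
    window = deque(maxlen=n)
    for i, value in enumerate(values):
        if len(window) == n:  # only judge a reading once a full window exists
            avg = sum(window) / n
            if avg != 0 and abs(value - avg) / abs(avg) > threshold:
                yield i, value, avg
        window.append(value)


readings = [20.0] * 10 + [30.0, 20.0]
for index, value, avg in find_anomalies(readings):
    print(f"reading {index}: {value} vs moving average {avg}")
```

Excluding the current point from the average keeps a single spike from dampening its own deviation, at the cost of needing `n` prior readings before anything can be flagged.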

Tasks for Automation

Create scheduled tasks in InfluxDB:

tasks_api = client.tasks_api()

# create_task_every() expects an Organization object, so look it up by name
organization = client.organizations_api().find_organizations(org=org)[0]

# Create a downsampling task. The client adds the `option task = {...}`
# line itself from `name` and `every`, so the Flux body should not declare it.
task_flux = '''
from(bucket: "default")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "sensor_data")
    |> aggregateWindow(every: 5m, fn: mean)
    |> to(bucket: "sensor_data_5m", org: "myorg")
'''

task = tasks_api.create_task_every(
    name="Hourly Downsample",
    flux=task_flux,
    every="1h",
    organization=organization
)

Retention Policies

Manage data lifecycle with bucket retention:

from influxdb_client import BucketsApi

buckets_api = client.buckets_api()

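# Create bucket with 30-day retention
# (assigned to bucket_30d so it does not overwrite the `bucket` name used by the write examples)
bucket_30d = buckets_api.create_bucket(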
    bucket_name="sensor_data_30d",
    retention_rules=[{
        "type": "expire",
        "everySeconds": 30 * 24 * 60 * 60  # 30 days
    }],
    org=org
)

# Create bucket with infinite retention
bucket_archive = buckets_api.create_bucket(
    bucket_name="sensor_data_archive",
    retention_rules=[],  # No retention = infinite
    org=org
)
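
Retention periods are passed to the API in seconds, which is easy to get wrong by hand. The helper below is a small sketch that converts a `timedelta` into an expire rule in the same dictionary shape used above. The function name is hypothetical, and the one-hour minimum it enforces is an assumption based on InfluxDB's documented minimum retention period.

```python
from datetime import timedelta


def expire_rule(period: timedelta) -> dict:
    """Build an expire retention rule dict from a timedelta."""
    seconds = int(period.total_seconds())
    # Assumption: InfluxDB rejects retention periods shorter than one hour
    if seconds < 3600:
        raise ValueError("retention period must be at least one hour")
    return {"type": "expire", "everySeconds": seconds}


print(expire_rule(timedelta(days=30)))
print(expire_rule(timedelta(weeks=52)))
```

The result can be passed in the `retention_rules` list in place of the hand-written dictionary.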

Monitoring InfluxDB

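Query InfluxDB’s internal metrics. InfluxDB 2.x serves these in Prometheus format at its /metrics endpoint, so the queries below assume the metrics have already been collected into a bucket; adjust the bucket, measurement, and field names to match your setup: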

# Check write performance
write_metrics_query = '''
from(bucket: "_monitoring")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "write")
    |> filter(fn: (r) => r._field == "req_bytes")
    |> aggregateWindow(every: 5m, fn: sum)
'''

# Check query performance
query_metrics_query = '''
from(bucket: "_monitoring")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "query")
    |> filter(fn: (r) => r._field == "duration_ms")
    |> aggregateWindow(every: 5m, fn: mean)
'''

Integration with Telegraf

Use the Telegraf agent for data collection:

# telegraf.conf
[global_tags]
  environment = "production"
  region = "eastus"

[agent]
  interval = "10s"
  flush_interval = "10s"

[[outputs.influxdb_v2]]
  urls = ["http://influxdb:8086"]
  token = "$INFLUX_TOKEN"
  organization = "myorg"
  bucket = "default"

[[inputs.cpu]]
  percpu = true
  totalcpu = true

[[inputs.mem]]

[[inputs.disk]]
  ignore_fs = ["tmpfs", "devtmpfs"]

Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.