InfluxDB on Azure: Deploying Time-Series Infrastructure
InfluxDB is a purpose-built time-series database optimized for high write throughput and fast queries. While Azure doesn’t offer a managed InfluxDB service, you can deploy it on Azure infrastructure. Let’s explore deployment options and usage patterns.
Deployment Options
Option 1: Azure Container Instance (Simple)
For development or small workloads:
# Create storage account for persistence
az storage account create \
--name influxstorage \
--resource-group my-rg \
--location eastus \
--sku Standard_LRS
# Create file share
az storage share create \
--name influxdata \
--account-name influxstorage
# Get storage key
STORAGE_KEY=$(az storage account keys list \
--account-name influxstorage \
--resource-group my-rg \
--query '[0].value' -o tsv)
# Deploy InfluxDB container
az container create \
--name influxdb \
--resource-group my-rg \
--image influxdb:2.0 \
--cpu 2 \
--memory 4 \
--ports 8086 \
--dns-name-label my-influxdb \
--azure-file-volume-account-name influxstorage \
--azure-file-volume-account-key $STORAGE_KEY \
--azure-file-volume-share-name influxdata \
--azure-file-volume-mount-path /var/lib/influxdb2 \
--environment-variables \
DOCKER_INFLUXDB_INIT_MODE=setup \
DOCKER_INFLUXDB_INIT_USERNAME=admin \
DOCKER_INFLUXDB_INIT_ORG=myorg \
DOCKER_INFLUXDB_INIT_BUCKET=default \
--secure-environment-variables \
DOCKER_INFLUXDB_INIT_PASSWORD='<strong-password>'
Passing the password through --secure-environment-variables keeps it out of az container show output.
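Once the container is running, you can confirm the public FQDN and hit InfluxDB's health endpoint as a quick sanity check (adjust names to match your deployment):
# Look up the public FQDN assigned by --dns-name-label
az container show \
--name influxdb \
--resource-group my-rg \
--query ipAddress.fqdn -o tsv
# InfluxDB 2.x exposes a health endpoint on port 8086
curl http://my-influxdb.eastus.azurecontainer.io:8086/health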
Option 2: Azure Kubernetes Service (Production)
For production workloads, use AKS with persistent volumes:
# influxdb-deployment.yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: influxdb
spec:
  serviceName: influxdb
  replicas: 1
  selector:
    matchLabels:
      app: influxdb
  template:
    metadata:
      labels:
        app: influxdb
    spec:
      containers:
        - name: influxdb
          image: influxdb:2.0
          ports:
            - containerPort: 8086
          env:
            - name: DOCKER_INFLUXDB_INIT_MODE
              value: setup
            - name: DOCKER_INFLUXDB_INIT_USERNAME
              valueFrom:
                secretKeyRef:
                  name: influxdb-secrets
                  key: username
            - name: DOCKER_INFLUXDB_INIT_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: influxdb-secrets
                  key: password
            - name: DOCKER_INFLUXDB_INIT_ORG
              value: myorg
            - name: DOCKER_INFLUXDB_INIT_BUCKET
              value: default
          volumeMounts:
            - name: influxdb-data
              mountPath: /var/lib/influxdb2
          resources:
            requests:
              cpu: "1"
              memory: "4Gi"
            limits:
              cpu: "4"
              memory: "16Gi"
  volumeClaimTemplates:
    - metadata:
        name: influxdb-data
      spec:
        accessModes: ["ReadWriteOnce"]
        storageClassName: managed-premium
        resources:
          requests:
            storage: 100Gi
---
apiVersion: v1
kind: Service
metadata:
  name: influxdb
spec:
  type: LoadBalancer
  ports:
    - port: 8086
      targetPort: 8086
  selector:
    app: influxdb
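The StatefulSet reads its credentials from a Secret named influxdb-secrets, which the manifest itself doesn't create. A minimal sketch of creating it and applying the manifest (values are placeholders):
# Create the credentials Secret referenced by the StatefulSet
kubectl create secret generic influxdb-secrets \
--from-literal=username=admin \
--from-literal=password='<strong-password>'
# Deploy InfluxDB and its LoadBalancer service
kubectl apply -f influxdb-deployment.yaml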
InfluxDB 2.0 Concepts
InfluxDB 2.0 introduces the following concepts (each appears in the CLI sketch after this list):
- Organizations: Workspace containers
- Buckets: Data containers with retention policies
- Tokens: API authentication
- Flux: Query and scripting language
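The same concepts map directly onto the influx CLI. A brief sketch with illustrative names; the all-access token can be scoped down to specific buckets if you prefer:
# Create a bucket with a 30-day retention period
influx bucket create --name sensor_data_30d --org myorg --retention 30d
# Create an API token for the organization
influx auth create --org myorg --all-access
# Run an ad-hoc Flux query
influx query 'from(bucket: "default") |> range(start: -1h)' --org myorg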
Setting Up with Python
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
from datetime import datetime
# Connection settings
url = "http://my-influxdb.eastus.azurecontainer.io:8086"
token = "your-api-token"
org = "myorg"
bucket = "default"
# Create client
client = InfluxDBClient(url=url, token=token, org=org)
# Write API
write_api = client.write_api(write_options=SYNCHRONOUS)
# Query API
query_api = client.query_api()
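Before writing anything, it's worth verifying the server is reachable; ping() hits the unauthenticated /ping endpoint, so it checks connectivity rather than credentials:
# Quick connectivity check against the deployed instance
if client.ping():
    print("Connected to InfluxDB")
else:
    raise RuntimeError(f"Cannot reach InfluxDB at {url}")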
Writing Data
Using Point Objects
from influxdb_client import Point
from datetime import datetime

def write_sensor_reading(sensor_id: str, temperature: float, humidity: float):
    point = Point("sensor_data") \
        .tag("sensor_id", sensor_id) \
        .tag("location", "datacenter-1") \
        .field("temperature", temperature) \
        .field("humidity", humidity) \
        .time(datetime.utcnow())
    write_api.write(bucket=bucket, record=point)

# Write a single point
write_sensor_reading("sensor-001", 23.5, 45.2)
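If you'd rather not build Point objects, the write API also accepts plain dictionary records; the same reading expressed as a dict:
# Equivalent write using a dictionary record
record = {
    "measurement": "sensor_data",
    "tags": {"sensor_id": "sensor-001", "location": "datacenter-1"},
    "fields": {"temperature": 23.5, "humidity": 45.2},
    "time": datetime.utcnow()
}
write_api.write(bucket=bucket, record=record)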
Batch Writing for Performance
from influxdb_client.client.write_api import WriteOptions
from datetime import datetime, timedelta
import random

# Configure batch writing
write_options = WriteOptions(
    batch_size=5000,
    flush_interval=1000,
    jitter_interval=500,
    retry_interval=5000,
    max_retries=5
)
batch_write_api = client.write_api(write_options=write_options)

def generate_batch_data(num_points: int):
    """Generate a batch of sensor readings."""
    points = []
    base_time = datetime.utcnow()
    for i in range(num_points):
        # Offset each timestamp so points in the same series don't overwrite each other
        point = Point("sensor_data") \
            .tag("sensor_id", f"sensor-{i % 100:03d}") \
            .tag("location", f"zone-{i % 10}") \
            .field("temperature", 20 + random.uniform(-5, 15)) \
            .field("humidity", 40 + random.uniform(-10, 30)) \
            .field("pressure", 1013 + random.uniform(-20, 20)) \
            .time(base_time - timedelta(seconds=i))
        points.append(point)
    return points

# Write 100K points in batches
points = generate_batch_data(100000)
batch_write_api.write(bucket=bucket, record=points)
batch_write_api.close()  # flush any buffered points before exiting
print("Batch write complete")
Line Protocol (Fastest)
import time

def write_line_protocol(lines: list):
    """Write using line protocol for maximum performance."""
    write_api.write(bucket=bucket, record=lines)

# Generate line protocol (timestamps in nanoseconds since the epoch)
lines = [
    f"sensor_data,sensor_id=sensor-001,location=zone-1 temperature=23.5,humidity=45.2 {time.time_ns()}",
    f"sensor_data,sensor_id=sensor-002,location=zone-1 temperature=24.1,humidity=44.8 {time.time_ns()}"
]
write_line_protocol(lines)
Querying with Flux
Flux is InfluxDB’s query language:
def query_last_hour():
    """Get data from the last hour."""
    query = '''
    from(bucket: "default")
      |> range(start: -1h)
      |> filter(fn: (r) => r._measurement == "sensor_data")
      |> filter(fn: (r) => r._field == "temperature")
    '''
    result = query_api.query(query=query)
    for table in result:
        for record in table.records:
            print(f"{record.get_time()}: {record.get_value()}")

def query_aggregated():
    """Get aggregated statistics."""
    query = '''
    from(bucket: "default")
      |> range(start: -24h)
      |> filter(fn: (r) => r._measurement == "sensor_data")
      |> filter(fn: (r) => r._field == "temperature")
      |> aggregateWindow(every: 1h, fn: mean, createEmpty: false)
      |> yield(name: "hourly_mean")
    '''
    return query_api.query(query=query)

def query_to_dataframe():
    """Query and return a pandas DataFrame."""
    query = '''
    from(bucket: "default")
      |> range(start: -7d)
      |> filter(fn: (r) => r._measurement == "sensor_data")
      |> filter(fn: (r) => r.sensor_id == "sensor-001")
      |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
    '''
    return query_api.query_data_frame(query=query)
Advanced Flux Queries
# Anomaly detection: flag points that deviate more than 20% from their 10-point moving average.
# movingAverage() replaces _value, so join the smoothed series back onto the raw data.
anomaly_query = '''
import "math"

raw = from(bucket: "default")
  |> range(start: -24h)
  |> filter(fn: (r) => r._measurement == "sensor_data")
  |> filter(fn: (r) => r._field == "temperature")

smoothed = raw
  |> movingAverage(n: 10)

join(tables: {raw: raw, ma: smoothed}, on: ["_time", "sensor_id"])
  |> map(fn: (r) => ({r with deviation: math.abs(x: r._value_raw - r._value_ma) / r._value_ma}))
  |> filter(fn: (r) => r.deviation > 0.2)
'''

# Downsampling
downsample_query = '''
from(bucket: "default")
  |> range(start: -30d)
  |> filter(fn: (r) => r._measurement == "sensor_data")
  |> aggregateWindow(every: 1d, fn: mean)
  |> to(bucket: "sensor_data_daily", org: "myorg")
'''

# Join multiple measurements
join_query = '''
temperature = from(bucket: "default")
  |> range(start: -1h)
  |> filter(fn: (r) => r._field == "temperature")

humidity = from(bucket: "default")
  |> range(start: -1h)
  |> filter(fn: (r) => r._field == "humidity")

join(tables: {temp: temperature, humid: humidity}, on: ["_time", "sensor_id"])
'''
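These are ordinary Flux strings, so they run through the same query API as before. For instance, running the downsample query once backfills the target bucket (which must already exist); the Tasks section below shows how to schedule it:
# One-off run of the downsample; to() writes the results into sensor_data_daily
query_api.query(query=downsample_query)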
Tasks for Automation
Create scheduled tasks in InfluxDB:
from influxdb_client import TaskCreateRequest

tasks_api = client.tasks_api()

# Create a downsampling task; the schedule comes from the "option task" header in the Flux script
task_flux = '''
option task = {name: "Hourly Downsample", every: 1h}

from(bucket: "default")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "sensor_data")
  |> aggregateWindow(every: 5m, fn: mean)
  |> to(bucket: "sensor_data_5m", org: "myorg")
'''

task = tasks_api.create_task(
    task_create=TaskCreateRequest(flux=task_flux, org=org, status="active")
)
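You can confirm the task was registered through the same API; a quick check, assuming the task name used above:
# List tasks by name and print their schedule and status
for t in tasks_api.find_tasks(name="Hourly Downsample"):
    print(t.id, t.name, t.every, t.status)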
Retention Policies
Manage data lifecycle with bucket retention:
from influxdb_client import BucketRetentionRules

buckets_api = client.buckets_api()

# Create a bucket with 30-day retention
retention = BucketRetentionRules(type="expire", every_seconds=30 * 24 * 60 * 60)
bucket = buckets_api.create_bucket(
    bucket_name="sensor_data_30d",
    retention_rules=retention,
    org=org
)

# Create a bucket with infinite retention (no retention rule)
bucket_archive = buckets_api.create_bucket(
    bucket_name="sensor_data_archive",
    org=org
)
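A quick check that the buckets and their retention rules were created as expected:
# List buckets and their retention rules
for b in buckets_api.find_buckets().buckets:
    print(b.name, b.retention_rules)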
Monitoring InfluxDB
InfluxDB 2.x exposes its operational metrics on a Prometheus-style /metrics endpoint. The queries below assume those metrics have been scraped into a monitoring bucket (for example via a scraper target or Telegraf), so the exact measurement and field names depend on your scrape setup:
# Check write throughput
write_metrics_query = '''
from(bucket: "_monitoring")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "write")
  |> filter(fn: (r) => r._field == "req_bytes")
  |> aggregateWindow(every: 5m, fn: sum)
'''

# Check query performance
query_metrics_query = '''
from(bucket: "_monitoring")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "query")
  |> filter(fn: (r) => r._field == "duration_ms")
  |> aggregateWindow(every: 5m, fn: mean)
'''
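These are plain Flux strings, so they run through the same query API as the earlier examples:
# Execute the monitoring query and print write volume per 5-minute window
tables = query_api.query(query=write_metrics_query)
for table in tables:
    for record in table.records:
        print(record.get_time(), record.get_value())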
Integration with Telegraf
Use the Telegraf agent to collect and ship metrics:
# telegraf.conf
[global_tags]
  environment = "production"
  region = "eastus"

[agent]
  interval = "10s"
  flush_interval = "10s"

[[outputs.influxdb_v2]]
  urls = ["http://influxdb:8086"]
  token = "$INFLUX_TOKEN"
  organization = "myorg"
  bucket = "default"

[[inputs.cpu]]
  percpu = true
  totalcpu = true

[[inputs.mem]]

[[inputs.disk]]
  ignore_fs = ["tmpfs", "devtmpfs"]
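One way to run the agent against either deployment is the official telegraf container, mounting the config and passing the token as an environment variable (path and token are placeholders):
# Run Telegraf with the config above; it reads /etc/telegraf/telegraf.conf by default
docker run -d --name telegraf \
-v $(pwd)/telegraf.conf:/etc/telegraf/telegraf.conf:ro \
-e INFLUX_TOKEN='your-api-token' \
telegraf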