
Edge AI Improvements: Deploying Intelligence at the Data Source

Edge AI brings machine learning to where data is generated. For data platforms, this means preprocessing, filtering, and enriching data before it reaches the cloud. Let’s explore the latest improvements in edge AI deployment.

Edge AI Architecture

Data Sources           Edge Layer              Cloud Layer
    │                      │                       │
    ▼                      ▼                       ▼
┌─────────┐          ┌──────────┐           ┌──────────┐
│ Sensors │─────────►│ Edge AI  │──────────►│ Cloud AI │
│ Cameras │          │ - Filter │           │ - Train  │
│ IoT     │          │ - Enrich │           │ - Store  │
│ Devices │          │ - Detect │           │ - Analyze│
└─────────┘          └──────────┘           └──────────┘

                     ┌─────┴─────┐
                     │ Local     │
                     │ Storage   │
                     │ & Action  │
                     └───────────┘

Azure IoT Edge with AI

# Azure IoT Edge module with AI inference

import asyncio
import json
from azure.iot.device.aio import IoTHubModuleClient
import onnxruntime as ort
import numpy as np

class EdgeAIModule:
    def __init__(self):
        self.client = None
        self.model = None

    async def initialize(self):
        """Initialize IoT Edge module and load AI model."""
        self.client = IoTHubModuleClient.create_from_edge_environment()
        await self.client.connect()

        # Load ONNX model
        self.model = ort.InferenceSession(
            "model.onnx",
            providers=['CPUExecutionProvider']  # Or GPU if available
        )

        # Set up message handler
        self.client.on_message_received = self.handle_message

    async def handle_message(self, message):
        """Process incoming sensor data with AI."""
        data = json.loads(message.data.decode())

        # Prepare input for model
        features = np.array([
            data['temperature'],
            data['pressure'],
            data['vibration']
        ]).astype(np.float32).reshape(1, -1)

        # Run inference
        result = self.model.run(None, {"input": features})
        prediction = result[0][0]

        # Decision: send to cloud or handle locally
        if prediction > 0.8:  # Anomaly detected
            # Send alert to cloud
            alert = {
                "device_id": data['device_id'],
                "timestamp": data['timestamp'],
                "anomaly_score": float(prediction),
                "raw_data": data
            }
            await self.client.send_message_to_output(
                json.dumps(alert),
                "anomaly_output"
            )
        else:
            # Store locally, batch send later
            await self.store_locally(data, prediction)

    async def store_locally(self, data, score):
        """Store normal readings locally for batch upload."""
        # Append to local storage
        with open("/data/readings.jsonl", "a") as f:
            f.write(json.dumps({**data, "score": float(score)}) + "\n")

async def main():
    module = EdgeAIModule()
    await module.initialize()
    await asyncio.Event().wait()  # Run forever

if __name__ == "__main__":
    asyncio.run(main())
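
The store_locally path above is only half of the local-first pattern; buffered readings still need to reach the cloud eventually. Below is a minimal sketch of a periodic batch uploader that drains the local file when connectivity allows. The file path, interval, and the batch_output route name are illustrative assumptions, not part of the Azure SDK.

import asyncio
import json
import os

async def batch_upload_loop(client, path="/data/readings.jsonl", interval_s=300):
    """Periodically drain locally buffered readings and send them upstream.

    `client` is the connected IoTHubModuleClient from EdgeAIModule; the path,
    interval, and output route name are illustrative defaults.
    """
    while True:
        await asyncio.sleep(interval_s)
        if not os.path.exists(path):
            continue
        with open(path) as f:
            readings = [json.loads(line) for line in f if line.strip()]
        if not readings:
            continue
        try:
            await client.send_message_to_output(json.dumps(readings), "batch_output")
            os.remove(path)  # Only clear the buffer after a successful send
        except Exception:
            pass  # Keep the buffer and retry on the next cycle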

Model Optimization for Edge

import os
import time

import numpy as np
import onnxruntime as ort
from onnxruntime.quantization import quantize_dynamic, QuantType
from onnxruntime.transformers import optimizer

class EdgeModelOptimizer:
    def __init__(self, model_path: str):
        self.model_path = model_path

    def quantize(self, output_path: str):
        """Quantize model for smaller size and faster inference."""
        quantize_dynamic(
            model_input=self.model_path,
            model_output=output_path,
            weight_type=QuantType.QInt8
        )
        print(f"Quantized model saved to {output_path}")

        # Size comparison
        original_size = os.path.getsize(self.model_path)
        quantized_size = os.path.getsize(output_path)
        print(f"Size reduction: {original_size/1024:.1f}KB -> {quantized_size/1024:.1f}KB")
        print(f"Compression: {quantized_size/original_size*100:.1f}%")

    def optimize_for_edge(self, output_path: str):
        """Optimize the model graph for edge deployment."""
        # Graph optimizations (num_heads=0 and hidden_size=0 let the
        # optimizer infer them from the graph)
        optimized = optimizer.optimize_model(
            self.model_path,
            model_type='bert' if 'bert' in self.model_path else 'gpt2',
            num_heads=0,
            hidden_size=0,
            optimization_options=None,
            opt_level=99  # Maximum graph optimization level
        )

        optimized.save_model_to_file(output_path)

    def benchmark(self, test_input: np.ndarray, iterations: int = 100):
        """Benchmark model inference speed."""

        session = ort.InferenceSession(self.model_path)

        # Warm up
        for _ in range(10):
            session.run(None, {"input": test_input})

        # Benchmark
        start = time.time()
        for _ in range(iterations):
            session.run(None, {"input": test_input})
        elapsed = time.time() - start

        print(f"Average inference time: {elapsed/iterations*1000:.2f}ms")
        print(f"Throughput: {iterations/elapsed:.1f} inferences/second")

Edge AI with Azure Arc

# Kubernetes manifest for Edge AI deployment

apiVersion: apps/v1
kind: Deployment
metadata:
  name: edge-ai-inference
  namespace: edge-ai
spec:
  replicas: 2
  selector:
    matchLabels:
      app: edge-ai
  template:
    metadata:
      labels:
        app: edge-ai
    spec:
      containers:
      - name: inference
        image: myacr.azurecr.io/edge-ai:latest
        resources:
          limits:
            memory: "512Mi"
            cpu: "500m"
            # Remove this line if the edge node has no GPU (the pod will not schedule otherwise)
            nvidia.com/gpu: 1
        volumeMounts:
        - name: model-storage
          mountPath: /models
        env:
        - name: MODEL_PATH
          value: /models/optimized_model.onnx
        - name: BATCH_SIZE
          value: "32"
      volumes:
      - name: model-storage
        persistentVolumeClaim:
          claimName: model-pvc
---
apiVersion: v1
kind: Service
metadata:
  name: edge-ai-service
spec:
  selector:
    app: edge-ai
  ports:
  - port: 8080
    targetPort: 8080
  type: ClusterIP
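
Inside the edge-ai container, a small HTTP service can expose the ONNX model on port 8080 to match the Service above. The sketch below assumes FastAPI and a /score route; only the MODEL_PATH and BATCH_SIZE environment variables come from the manifest, everything else is illustrative.

import os
from typing import List

import numpy as np
import onnxruntime as ort
from fastapi import FastAPI
from pydantic import BaseModel

MODEL_PATH = os.environ.get("MODEL_PATH", "/models/optimized_model.onnx")
BATCH_SIZE = int(os.environ.get("BATCH_SIZE", "32"))

app = FastAPI()
session = ort.InferenceSession(MODEL_PATH, providers=["CPUExecutionProvider"])

class ScoreRequest(BaseModel):
    features: List[List[float]]  # one row of features per item

@app.post("/score")
def score(req: ScoreRequest):
    # Cap the batch at BATCH_SIZE and run a single ONNX inference call
    batch = np.asarray(req.features, dtype=np.float32)[:BATCH_SIZE]
    outputs = session.run(None, {"input": batch})[0]
    return {"predictions": outputs.tolist()}

Run it with something like uvicorn app:app --host 0.0.0.0 --port 8080 so the ClusterIP Service can route traffic to it.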

Federated Learning at the Edge

import flwr as fl
import torch

class EdgeFederatedClient(fl.client.NumPyClient):
    """Federated learning client for edge devices."""

    def __init__(self, model, train_data, test_data, device_id):
        # model is assumed to expose training_step/evaluate (Lightning-style)
        self.model = model
        self.train_data = train_data
        self.test_data = test_data
        self.device_id = device_id

    def get_parameters(self, config):
        """Return model parameters."""
        return [val.cpu().numpy() for val in self.model.parameters()]

    def set_parameters(self, parameters):
        """Set model parameters from server."""
        params_dict = zip(self.model.parameters(), parameters)
        for param, new_param in params_dict:
            param.data = torch.tensor(new_param, dtype=param.dtype, device=param.device)

    def fit(self, parameters, config):
        """Train model on local data."""
        self.set_parameters(parameters)

        # Local training
        self.model.train()
        optimizer = torch.optim.SGD(self.model.parameters(), lr=0.01)

        for epoch in range(config.get("local_epochs", 1)):
            for batch in self.train_data:
                optimizer.zero_grad()
                loss = self.model.training_step(batch)
                loss.backward()
                optimizer.step()

        return self.get_parameters(config), len(self.train_data), {}

    def evaluate(self, parameters, config):
        """Evaluate model on local test data."""
        self.set_parameters(parameters)
        loss, accuracy = self.model.evaluate(self.test_data)
        return float(loss), len(self.test_data), {"accuracy": float(accuracy)}

# Start federated client
def start_edge_client(server_address: str):
    model = create_model()                 # application-specific model factory
    train_data = load_local_data()         # application-specific training data
    test_data = load_local_test_data()     # application-specific held-out data

    client = EdgeFederatedClient(model, train_data, test_data, device_id="edge-001")

    fl.client.start_numpy_client(
        server_address=server_address,
        client=client
    )
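
The edge clients need a coordinating server to aggregate their updates, typically in the cloud layer. A minimal sketch using Flower's built-in FedAvg strategy is below; the listen address and round count are placeholders.

import flwr as fl

def start_federated_server(num_rounds: int = 5):
    """Aggregate edge client updates with federated averaging."""
    strategy = fl.server.strategy.FedAvg(
        min_fit_clients=2,        # wait for at least two edge devices per round
        min_available_clients=2,
    )
    fl.server.start_server(
        server_address="0.0.0.0:8080",
        config=fl.server.ServerConfig(num_rounds=num_rounds),
        strategy=strategy,
    )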

Real-Time Edge Inference

import asyncio
from collections import deque

import numpy as np
import onnxruntime as ort

class RealTimeEdgeInference:
    def __init__(self, model_path: str, batch_size: int = 16):
        self.model = ort.InferenceSession(model_path)
        self.batch_size = batch_size
        self.queue = deque(maxlen=1000)
        self.results = {}

    async def ingest(self, data_id: str, features: np.ndarray):
        """Ingest data for batch processing."""
        self.queue.append((data_id, features))

        # Process when batch is ready
        if len(self.queue) >= self.batch_size:
            await self._process_batch()

    async def _process_batch(self):
        """Process a batch of inputs."""
        batch_ids = []
        batch_features = []

        for _ in range(min(self.batch_size, len(self.queue))):
            data_id, features = self.queue.popleft()
            batch_ids.append(data_id)
            batch_features.append(features)

        # Batch inference
        batch_input = np.vstack(batch_features).astype(np.float32)
        predictions = self.model.run(None, {"input": batch_input})[0]

        # Store results
        for data_id, pred in zip(batch_ids, predictions):
            self.results[data_id] = pred

    async def get_result(self, data_id: str, timeout: float = 1.0) -> np.ndarray:
        """Get inference result for a specific input."""
        start = asyncio.get_event_loop().time()

        while data_id not in self.results:
            if asyncio.get_event_loop().time() - start > timeout:
                raise TimeoutError(f"Result not ready for {data_id}")
            await asyncio.sleep(0.01)

        return self.results.pop(data_id)
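
Because _process_batch only fires when the queue reaches batch_size, a periodic flush keeps tail latency bounded for partial batches. The loop below is a usage sketch; the model path, feature width, and flush interval are placeholders.

async def run_inference_service():
    engine = RealTimeEdgeInference("optimized_model.onnx", batch_size=16)

    async def flush_partial_batches():
        # Drain whatever is queued every 50 ms so stragglers are not stuck
        while True:
            await asyncio.sleep(0.05)
            if engine.queue:
                await engine._process_batch()  # call the internal batch step directly

    asyncio.create_task(flush_partial_batches())

    # Feed one reading and wait for its prediction
    await engine.ingest("sensor-42", np.random.rand(1, 3).astype(np.float32))
    prediction = await engine.get_result("sensor-42", timeout=1.0)
    print(prediction)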

Best Practices

  1. Optimize models: Quantize and prune for edge deployment
  2. Handle offline: Design for intermittent connectivity
  3. Local-first: Process locally, sync to cloud periodically
  4. Monitor health: Track edge device performance
  5. Update safely: Implement safe model update mechanisms (a minimal sketch follows this list)
  6. Security: Encrypt models and secure inference endpoints
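
For the safe-update point, one common pattern is to stage the new model, smoke-test it, and only then swap it in atomically so a bad download never replaces a working model. A minimal sketch, with the paths and sample input as assumptions:

import os

import numpy as np
import onnxruntime as ort

def safe_model_update(staged_path: str, live_path: str, sample: np.ndarray) -> bool:
    """Swap in a newly downloaded model only if it passes a smoke test."""
    try:
        # Validate the staged model before it can affect production inference
        session = ort.InferenceSession(staged_path, providers=["CPUExecutionProvider"])
        session.run(None, {"input": sample})
    except Exception:
        os.remove(staged_path)   # Discard the broken download, keep the old model
        return False

    os.replace(staged_path, live_path)  # Atomic replacement on POSIX filesystems
    return True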

Edge AI reduces latency, bandwidth, and cloud costs while improving privacy. Design your data architecture to leverage edge intelligence where it makes sense.

Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.