
Custom Model Deployment on Azure: From Fine-Tuning to Production

Deploying custom models to production requires careful consideration of scaling, monitoring, and cost management. Let’s explore the complete workflow from fine-tuned model to production deployment.

Deployment Options Overview

┌─────────────────────────────────────────────────────────────┐
│                    Deployment Options                        │
├─────────────────┬─────────────────┬─────────────────────────┤
│   Serverless    │    Managed      │      Container          │
├─────────────────┼─────────────────┼─────────────────────────┤
│ Pay-per-token   │ Pay-per-hour    │ Full control            │
│ Auto-scaling    │ Manual scaling  │ Custom scaling          │
│ Zero management │ Some management │ Full management         │
│ Limited config  │ More options    │ Complete flexibility    │
└─────────────────┴─────────────────┴─────────────────────────┘
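
To make the comparison concrete, here's a small, illustrative decision helper that mirrors the table above. The thresholds are assumptions, not Azure guidance; tune them to your own workload.

def pick_deployment_option(
    requests_per_day: int,
    traffic_is_spiky: bool,
    needs_custom_runtime: bool
) -> str:
    """Rough mapping of workload traits to the three options above (illustrative thresholds)."""
    if needs_custom_runtime:
        return "container"   # full control, custom inference logic
    if traffic_is_spiky or requests_per_day < 10_000:
        return "serverless"  # pay-per-token, zero management
    return "managed"         # predictable load, pay-per-hour, SLA-friendly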

Serverless Deployment

Best for: Variable workloads, quick start, minimal ops overhead

from azure.ai.foundry import AIFoundryClient
from azure.ai.foundry.deployments import ServerlessDeployment

client = AIFoundryClient(...)

# Deploy fine-tuned model serverless
deployment = client.deployments.create_serverless(
    ServerlessDeployment(
        name="my-custom-model-serverless",
        model=fine_tuned_model_name,  # name returned by your fine-tuning job
        rate_limits={
            "requests_per_minute": 100,
            "tokens_per_minute": 50000
        }
    )
)

print(f"Endpoint: {deployment.endpoint}")
print(f"API Key: {deployment.api_key}")

Managed Compute Deployment

Best for: Predictable workloads, SLA requirements, cost optimization

from azure.ai.foundry.deployments import ManagedDeployment, ScaleSettings

# Deploy with managed compute
deployment = client.deployments.create_managed(
    ManagedDeployment(
        name="my-custom-model-managed",
        model=fine_tuned_model_name,
        compute={
            "sku": "Standard_NC24ads_A100_v4",
            "instance_count": 2
        },
        scale_settings=ScaleSettings(
            min_instances=1,
            max_instances=5,
            scale_type="manual"  # or "target_utilization"
        ),
        request_settings={
            "max_concurrent_requests": 10,
            "request_timeout_ms": 60000
        }
    )
)

# Wait for deployment
deployment.wait_for_completion()
print(f"Status: {deployment.status}")

Container Deployment with AKS

Best for: Multi-model serving, custom inference logic, hybrid scenarios

# First, export the model
export_job = client.models.export(
    model_name=fine_tuned_model_name,
    format="onnx",  # or "pytorch", "safetensors"
    output_path="azureml://datastores/models/paths/my-model"
)

export_job.wait_for_completion()

# Dockerfile for custom inference server
FROM mcr.microsoft.com/azureml/inference-base:latest

COPY requirements.txt .
RUN pip install -r requirements.txt

COPY model/ /app/model/
COPY inference.py /app/

EXPOSE 8080

CMD ["python", "/app/inference.py"]

# inference.py
from fastapi import FastAPI
from pydantic import BaseModel
from typing import List
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer

app = FastAPI()

# Load model once at startup (on GPU when available)
device = "cuda" if torch.cuda.is_available() else "cpu"
model = AutoModelForCausalLM.from_pretrained("/app/model").to(device).eval()
tokenizer = AutoTokenizer.from_pretrained("/app/model")

class ChatRequest(BaseModel):
    messages: List[dict]
    max_tokens: int = 500
    temperature: float = 0.7

class ChatResponse(BaseModel):
    content: str
    usage: dict

@app.post("/v1/chat/completions")
async def chat_completion(request: ChatRequest) -> ChatResponse:
    # Format messages
    prompt = tokenizer.apply_chat_template(
        request.messages,
        tokenize=False,
        add_generation_prompt=True  # append the assistant turn marker before generating
    )

    # Tokenize and move tensors to the model's device
    inputs = tokenizer(prompt, return_tensors="pt").to(device)

    # Generate
    with torch.no_grad():
        outputs = model.generate(
            **inputs,
            max_new_tokens=request.max_tokens,
            temperature=request.temperature,
            do_sample=True
        )

    # Decode
    response_text = tokenizer.decode(
        outputs[0][inputs.input_ids.shape[1]:],
        skip_special_tokens=True
    )

    return ChatResponse(
        content=response_text,
        usage={
            "prompt_tokens": inputs.input_ids.shape[1],
            "completion_tokens": outputs.shape[1] - inputs.input_ids.shape[1],
            "total_tokens": outputs.shape[1]
        }
    )

@app.get("/health")
async def health():
    return {"status": "healthy"}
# kubernetes/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: custom-model-inference
spec:
  replicas: 3
  selector:
    matchLabels:
      app: custom-model
  template:
    metadata:
      labels:
        app: custom-model
    spec:
      containers:
      - name: inference
        image: myregistry.azurecr.io/custom-model:v1
        resources:
          limits:
            nvidia.com/gpu: 1
            memory: "32Gi"
          requests:
            nvidia.com/gpu: 1
            memory: "16Gi"
        ports:
        - containerPort: 8080
        readinessProbe:
          httpGet:
            path: /health
            port: 8080
          initialDelaySeconds: 30
        livenessProbe:
          httpGet:
            path: /health
            port: 8080
          initialDelaySeconds: 120  # give the model time to load before liveness checks start
---
apiVersion: v1
kind: Service
metadata:
  name: custom-model-service
spec:
  selector:
    app: custom-model
  ports:
  - port: 80
    targetPort: 8080
  type: LoadBalancer

Blue-Green Deployments

from azure.ai.foundry.deployments import TrafficSplit

class BlueGreenDeploymentManager:
    """Manage blue-green deployments for custom models."""

    def __init__(self, client: AIFoundryClient):
        self.client = client

    async def deploy_new_version(
        self,
        endpoint_name: str,
        new_model: str,
        initial_traffic_percent: int = 10
    ):
        """Deploy new model version with traffic splitting."""

        # Get current deployment (blue)
        endpoint = self.client.endpoints.get(endpoint_name)
        blue_deployment = endpoint.deployments[0]

        # Create new deployment (green)
        green_deployment = await self.client.deployments.create_managed(
            ManagedDeployment(
                name=f"{endpoint_name}-green",
                model=new_model,
                compute=blue_deployment.compute
            )
        )

        # Split traffic
        await self.client.endpoints.update_traffic(
            endpoint_name=endpoint_name,
            traffic=TrafficSplit({
                blue_deployment.name: 100 - initial_traffic_percent,
                green_deployment.name: initial_traffic_percent
            })
        )

        return green_deployment

    async def promote_green(self, endpoint_name: str):
        """Promote green to 100% traffic."""

        endpoint = self.client.endpoints.get(endpoint_name)
        green_deployment = next(
            d for d in endpoint.deployments
            if d.name.endswith("-green")
        )

        await self.client.endpoints.update_traffic(
            endpoint_name=endpoint_name,
            traffic=TrafficSplit({green_deployment.name: 100})
        )

    async def rollback(self, endpoint_name: str):
        """Rollback to blue deployment."""

        endpoint = self.client.endpoints.get(endpoint_name)
        blue_deployment = next(
            d for d in endpoint.deployments
            if not d.name.endswith("-green")
        )

        await self.client.endpoints.update_traffic(
            endpoint_name=endpoint_name,
            traffic=TrafficSplit({blue_deployment.name: 100})
        )

# Usage
manager = BlueGreenDeploymentManager(client)

# Deploy new version with 10% traffic
await manager.deploy_new_version(
    endpoint_name="production-endpoint",
    new_model="my-model-v2",
    initial_traffic_percent=10
)

# Monitor metrics, then promote
await manager.promote_green("production-endpoint")
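
The "monitor metrics, then promote" step is worth automating so nobody promotes a bad canary by hand. Here's a minimal sketch of a health gate; get_green_metrics is a hypothetical callable you would back with the monitoring API shown in the next section.

async def promote_if_healthy(
    manager: BlueGreenDeploymentManager,
    endpoint_name: str,
    get_green_metrics,  # hypothetical: returns {"error_rate": float, "latency_p95": float}
    max_error_rate: float = 0.01,
    max_latency_p95_ms: float = 2000
) -> bool:
    """Promote green only if it meets the error and latency thresholds; otherwise roll back."""
    metrics = get_green_metrics()
    healthy = (
        metrics["error_rate"] <= max_error_rate
        and metrics["latency_p95"] <= max_latency_p95_ms
    )
    if healthy:
        await manager.promote_green(endpoint_name)
    else:
        await manager.rollback(endpoint_name)
    return healthy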

Monitoring and Observability

from azure.ai.foundry.monitoring import DeploymentMonitor

monitor = DeploymentMonitor(client)

# Set up monitoring
monitor.configure(
    deployment_name="my-custom-model-managed",
    metrics=[
        "request_count",
        "latency_p50",
        "latency_p95",
        "latency_p99",
        "error_rate",
        "token_usage",
        "gpu_utilization"
    ],
    alerts=[
        {
            "name": "high-latency",
            "condition": "latency_p95 > 2000",
            "action": "email",
            "recipients": ["oncall@company.com"]
        },
        {
            "name": "high-error-rate",
            "condition": "error_rate > 0.01",
            "action": "pagerduty",
            "severity": "critical"
        }
    ]
)

# Get current metrics
metrics = monitor.get_metrics(
    deployment_name="my-custom-model-managed",
    time_range="last_1h"
)

print(f"Requests: {metrics['request_count']}")
print(f"P95 Latency: {metrics['latency_p95']}ms")
print(f"Error Rate: {metrics['error_rate']*100:.2f}%")

Cost Management

class DeploymentCostTracker:
    """Track and optimize deployment costs."""

    def __init__(self, client: AIFoundryClient):
        self.client = client

    def get_cost_breakdown(self, deployment_name: str, days: int = 30) -> dict:
        """Get detailed cost breakdown."""

        usage = self.client.deployments.get_usage(
            deployment_name=deployment_name,
            days=days
        )

        return {
            "compute_cost": usage.compute_hours * usage.compute_rate,
            "token_cost": usage.total_tokens * usage.token_rate,
            "storage_cost": usage.model_storage_gb * 0.02,
            "total": usage.total_cost,
            "cost_per_request": usage.total_cost / usage.request_count,
            "recommendations": self.get_recommendations(usage)
        }

    def get_recommendations(self, usage) -> list:
        recommendations = []

        if usage.avg_gpu_utilization < 0.3:
            recommendations.append(
                "Consider smaller compute SKU - GPU utilization is low"
            )

        if usage.requests_per_hour_variance > 0.8:
            recommendations.append(
                "Consider serverless deployment for variable workloads"
            )

        if usage.avg_batch_size < 2:
            recommendations.append(
                "Enable request batching to improve throughput"
            )

        return recommendations

tracker = DeploymentCostTracker(client)
costs = tracker.get_cost_breakdown("my-custom-model-managed")
print(f"Total cost (30d): ${costs['total']:.2f}")
print(f"Cost per request: ${costs['cost_per_request']:.4f}")

Custom model deployment is a critical skill for production AI systems. Choose the right deployment option based on your workload characteristics, and implement proper monitoring and cost management from day one.

Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.