
Batch Processing AI Workloads with Azure Batch and OpenAI

Large-scale AI workloads often require processing millions of items efficiently. Azure Batch provides the infrastructure to run massive parallel AI processing jobs while managing costs and throughput.

Designing a Batch AI Pipeline

Create a scalable batch processing architecture:

from azure.batch import BatchServiceClient
from azure.batch.models import (
    PoolAddParameter, VirtualMachineConfiguration, ImageReference,
    TaskAddParameter, JobAddParameter, ResourceFile, OutputFile
)
from azure.identity import DefaultAzureCredential
import json

class BatchAIProcessor:
    def __init__(self, batch_account_url: str, output_container_url: str):
        self.batch_client = BatchServiceClient(
            credentials=DefaultAzureCredential(),
            batch_url=batch_account_url
        )
        # SAS URL of the blob container that task output files are uploaded to
        self.output_container_url = output_container_url

    def create_processing_pool(self, pool_id: str, node_count: int = 10):
        """Create a pool of VMs for AI processing."""

        pool = PoolAddParameter(
            id=pool_id,
            vm_size="Standard_D4s_v3",
            virtual_machine_configuration=VirtualMachineConfiguration(
                image_reference=ImageReference(
                    publisher="microsoft-azure-batch",
                    offer="ubuntu-server-container",
                    sku="20-04-lts",
                    version="latest"
                ),
                node_agent_sku_id="batch.node.ubuntu 20.04"
            ),
            target_dedicated_nodes=node_count,
            start_task={
                "command_line": "/bin/bash -c 'pip install openai azure-storage-blob'",
                "wait_for_success": True,
                "user_identity": {"auto_user": {"elevation_level": "admin"}}
            }
        )

        self.batch_client.pool.add(pool)

    def submit_processing_job(
        self,
        job_id: str,
        pool_id: str,
        input_files: list[str],
        processing_script: str
    ):
        """Submit a batch job with AI processing tasks."""

        # Create job
        job = JobAddParameter(
            id=job_id,
            pool_info={"pool_id": pool_id}
        )
        self.batch_client.job.add(job)

        # Create tasks for each input file
        tasks = []
        for i, input_file in enumerate(input_files):
            task = TaskAddParameter(
                id=f"task_{i}",
                command_line=f"python process.py --input {input_file}",
                resource_files=[
                    ResourceFile(
                        auto_storage_container_name="scripts",
                        blob_prefix="process.py"
                    ),
                    ResourceFile(
                        auto_storage_container_name="inputs",
                        blob_prefix=input_file
                    )
                ],
                output_files=[
                    OutputFile(
                        file_pattern="output_*.json",
                        destination={"container": {"container_url": self.storage_connection}},
                        upload_options={"upload_condition": "taskSuccess"}
                    )
                ],
                environment_settings=[
                    {"name": "AZURE_OPENAI_ENDPOINT", "value": "$(OPENAI_ENDPOINT)"},
                    {"name": "AZURE_OPENAI_KEY", "value": "$(OPENAI_KEY)"}
                ]
            )
            tasks.append(task)

        self.batch_client.task.add_collection(job_id, tasks)
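
Tying it together, a driver script might look like the sketch below; the account URL, container SAS URL, and blob names are placeholders to replace with your own:

processor = BatchAIProcessor(
    batch_account_url="https://<batch-account>.<region>.batch.azure.com",
    output_container_url="https://<storage-account>.blob.core.windows.net/outputs?<sas-token>"
)

processor.create_processing_pool(pool_id="ai-pool", node_count=20)
processor.submit_processing_job(
    job_id="summarise-2024-06",
    pool_id="ai-pool",
    input_files=["docs_000.json", "docs_001.json", "docs_002.json"],
    processing_script="process.py"
)

Each task then runs the processing script against one input blob, and any output_*.json files it produces are uploaded to the output container when the task succeeds.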

Rate-Limited AI Processing

Handle API rate limits gracefully:

import asyncio
from openai import AsyncAzureOpenAI
from typing import List
import json

class RateLimitedProcessor:
    def __init__(self, client: AsyncAzureOpenAI, requests_per_minute: int = 60):
        self.client = client
        self.rpm = requests_per_minute
        self.semaphore = asyncio.Semaphore(requests_per_minute)

    async def process_batch(self, items: List[dict], prompt_template: str) -> List[dict]:
        """Process items with rate limiting."""

        async def process_item(item: dict) -> dict:
            async with self.semaphore:
                prompt = prompt_template.format(**item)

                # Await the async client so the API call doesn't block the event loop
                response = await self.client.chat.completions.create(
                    model="gpt-4",  # the Azure OpenAI deployment name
                    messages=[{"role": "user", "content": prompt}]
                )

                await asyncio.sleep(60 / self.rpm)  # Rate limit spacing

                return {
                    "input_id": item.get("id"),
                    "result": response.choices[0].message.content
                }

        tasks = [process_item(item) for item in items]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        return [r for r in results if not isinstance(r, Exception)]

    def process_file(self, input_path: str, output_path: str, prompt_template: str):
        """Process a file of items."""

        with open(input_path) as f:
            items = json.load(f)

        results = asyncio.run(self.process_batch(items, prompt_template))

        with open(output_path, 'w') as f:
            json.dump(results, f)
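
Inside each Batch task, the processing script can drive this class directly. A minimal sketch, assuming the endpoint and key arrive via the environment variables set on the task and that the input file is a JSON array of records with an id and a text field (all illustrative names):

import os
from openai import AsyncAzureOpenAI

# Credentials come from the task's environment settings
client = AsyncAzureOpenAI(
    azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
    api_key=os.environ["AZURE_OPENAI_KEY"],
    api_version="2024-02-01",
)

processor = RateLimitedProcessor(client, requests_per_minute=60)
processor.process_file(
    input_path="docs_000.json",
    output_path="output_docs_000.json",
    prompt_template="Summarise the following document: {text}",
)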

Monitoring Batch Progress

Once a job is submitted, poll its tasks to track progress and surface failures. Azure Batch can retry transient failures automatically if you set max_task_retry_count in each task's constraints; anything that still fails after its retries should be collected for inspection or resubmission, as shown below.
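
A minimal sketch of that polling loop, using the Batch task list API; the function name and polling interval are illustrative:

import time
from azure.batch.models import TaskState

def wait_for_tasks(batch_client, job_id: str, poll_seconds: int = 30) -> list[str]:
    """Poll until every task in the job completes; return the ids of failed tasks."""
    while True:
        tasks = list(batch_client.task.list(job_id))
        incomplete = [t for t in tasks if t.state != TaskState.completed]
        if not incomplete:
            break
        print(f"{len(incomplete)}/{len(tasks)} tasks still running...")
        time.sleep(poll_seconds)

    # A non-zero exit code after all retries marks a task that needs attention
    return [
        t.id for t in tasks
        if t.execution_info and t.execution_info.exit_code not in (0, None)
    ]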

Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.