1 min read
Batch Inference at Scale with Azure ML Batch Endpoints
I wrote this post to share practical, production-minded guidance on running large-scale batch inference with Azure ML batch endpoints.
When to Use Batch Endpoints
- Processing millions of records
- Scheduled scoring jobs (daily, weekly)
- No real-time latency requirements
- Cost-sensitive workloads
- Large file processing (images, documents)
Creating a Batch Endpoint
from azure.ai.ml import MLClient
from azure.ai.ml.constants import BatchDeploymentOutputAction  # used by the deployment snippet below
from azure.ai.ml.entities import BatchDeployment, BatchEndpoint
from azure.identity import DefaultAzureCredential

# Workspace coordinates: replace the placeholders with your own values.
workspace_settings = {
    "subscription_id": "your-subscription-id",
    "resource_group_name": "myresourcegroup",
    "workspace_name": "myworkspace",
}
credential = DefaultAzureCredential()
ml_client = MLClient(credential=credential, **workspace_settings)

# Declare the endpoint. It is only a named entry point; the model, code and
# compute are attached afterwards by a BatchDeployment.
batch_endpoint = BatchEndpoint(
    name="customer-scoring-batch",
    description="Batch scoring for customer data",
)

# begin_create_or_update() starts a long-running operation; .result() waits
# for it to finish before we report success.
poller = ml_client.batch_endpoints.begin_create_or_update(batch_endpoint)
poller.result()
print(f"Batch endpoint created: {batch_endpoint.name}")
Batch Scoring Script
# batch_score.py
import os
import pandas as pd
import joblib
from pathlib import Path
def init():
    """Load the scoring model into the module-level ``model`` global.

    ``run()`` reads ``model``, so this must be called before any mini-batch is
    scored. The model is read from ``$AZUREML_MODEL_DIR/model.pkl``.

    Raises:
        RuntimeError: if ``AZUREML_MODEL_DIR`` is unset or empty (for example
            when the script is exercised locally outside a deployment).
    """
    global model
    model_dir = os.getenv('AZUREML_MODEL_DIR')
    if not model_dir:
        # os.path.join(None, ...) would otherwise fail with an opaque TypeError.
        raise RuntimeError(
            "AZUREML_MODEL_DIR is not set; cannot locate model.pkl"
        )
    model_path = os.path.join(model_dir, 'model.pkl')
    model = joblib.load(model_path)
    print(f"Model loaded from {model_path}")
def run(mini_batch):
    """Score a mini-batch of CSV files.

    Every column except ``id`` is passed to the model. The predicted class and
    its highest class probability are appended as ``prediction`` and
    ``probability`` columns.

    Args:
        mini_batch: List of file paths to process.

    Returns:
        List of header-less CSV rows, one per scored input row, in the order
        the files and rows were read. The deployment uses the ``append_row``
        output action, which concatenates the returned elements into a single
        output file. Emitting a whole CSV document (header included) per file
        would scatter repeated header lines through that file.
    """
    results = []
    for file_path in mini_batch:
        print(f"Processing: {file_path}")

        df = pd.read_csv(file_path)

        # Everything except the identifier is a model feature.
        features = df.drop(['id'], axis=1, errors='ignore')

        predictions = model.predict(features)
        probabilities = model.predict_proba(features)

        df['prediction'] = predictions
        # Confidence of the predicted class = largest per-class probability.
        df['probability'] = probabilities.max(axis=1)

        # One element per row, no header. Assumes no field contains an
        # embedded newline (splitlines() would split such a quoted value).
        results.extend(df.to_csv(index=False, header=False).splitlines())
    return results
Batch Deployment Configuration
# Import everything this snippet uses so it also works when run on its own.
from azure.ai.ml.constants import BatchDeploymentOutputAction
from azure.ai.ml.entities import (
    BatchDeployment,
    BatchRetrySettings,
    CodeConfiguration,
)

batch_deployment = BatchDeployment(
    name="scoring-v1",
    endpoint_name="customer-scoring-batch",
    model="azureml:customer-churn-model:2",
    code_configuration=CodeConfiguration(
        code="./src/batch",
        scoring_script="batch_score.py",
    ),
    environment="azureml:sklearn-inference-env:1",
    compute="cpu-cluster",
    instance_count=4,
    mini_batch_size=10,  # Files per mini-batch
    # append_row: the elements returned by every run() call are concatenated
    # into the single file named below.
    output_action=BatchDeploymentOutputAction.APPEND_ROW,
    output_file_name="predictions.csv",
    retry_settings=BatchRetrySettings(
        max_retries=3,
        timeout=300,  # seconds allowed for scoring one mini-batch
    ),
    logging_level="info",
    max_concurrency_per_instance=2,
    error_threshold=10,  # Allow up to 10 failed items
)
ml_client.batch_deployments.begin_create_or_update(batch_deployment).result()

# Set as default deployment. Take the names from the deployment object so the
# endpoint and deployment identifiers are defined in exactly one place.
batch_endpoint = ml_client.batch_endpoints.get(batch_deployment.endpoint_name)
batch_endpoint.defaults.deployment_name = batch_deployment.name
ml_client.batch_endpoints.begin_create_or_update(batch_endpoint).result()
Invoking Batch Endpoints
import time

from azure.ai.ml import Input
from azure.ai.ml.constants import AssetTypes

# Invoke with data asset: the endpoint scores every file under this folder.
job = ml_client.batch_endpoints.invoke(
    endpoint_name="customer-scoring-batch",
    input=Input(
        type=AssetTypes.URI_FOLDER,
        path="azureml://datastores/workspaceblobstore/paths/batch-input/",
    ),
)
print(f"Batch job submitted: {job.name}")

# Monitor job progress.
# The deadline stops a stuck job from blocking this script forever. It only
# stops *polling*; the job itself keeps running and can be checked later.
# Tune MAX_WAIT_SECONDS to your expected job duration.
TERMINAL_STATUSES = ('Completed', 'Failed', 'Canceled')
POLL_INTERVAL_SECONDS = 30
MAX_WAIT_SECONDS = 6 * 60 * 60

deadline = time.monotonic() + MAX_WAIT_SECONDS
while True:
    job = ml_client.jobs.get(job.name)
    print(f"Status: {job.status}")
    if job.status in TERMINAL_STATUSES:
        break
    if time.monotonic() >= deadline:
        print(f"Stopped polling after {MAX_WAIT_SECONDS}s; last status: {job.status}")
        break
    time.sleep(POLL_INTERVAL_SECONDS)

# Get output location
# NOTE(review): confirm the output name exposed on the returned job object;
# the scoring output may not be registered under `default` in every SDK version.
if job.status == 'Completed':
    print(f"Output path: {job.outputs.default.path}")
Processing Large Datasets
# batch_score_chunked.py
import os
import pandas as pd
import numpy as np
import joblib
from typing import List
model = None  # populated by init(); read by run()
chunk_size = 10000  # rows per pandas chunk read in run()


def init():
    """Load the scoring model into the module-level ``model`` global.

    The model is read from ``$AZUREML_MODEL_DIR/model.pkl``.

    Raises:
        RuntimeError: if ``AZUREML_MODEL_DIR`` is unset or empty.
    """
    global model
    model_dir = os.getenv('AZUREML_MODEL_DIR')
    if not model_dir:
        # os.path.join(None, ...) would otherwise fail with an opaque TypeError.
        raise RuntimeError(
            "AZUREML_MODEL_DIR is not set; cannot locate model.pkl"
        )
    model_path = os.path.join(model_dir, 'model.pkl')
    model = joblib.load(model_path)
def run(mini_batch: List[str]) -> List[str]:
    """Score each CSV file incrementally, ``chunk_size`` rows at a time.

    Reading with ``chunksize`` means only one chunk of a file is held as a
    DataFrame at once. The formatted output strings still accumulate for the
    whole mini-batch.

    Args:
        mini_batch: CSV paths for this mini-batch. Each file needs an ``id``
            column; every other column is passed to the model.

    Returns:
        One ``"<id>,<prediction>"`` string per input row, in file and row order.
    """
    scored_rows: List[str] = []
    for path in mini_batch:
        for frame in pd.read_csv(path, chunksize=chunk_size):
            row_ids = frame['id'].values
            preds = model.predict(frame.drop(['id'], axis=1))
            scored_rows.extend(
                f"{row_id},{pred}" for row_id, pred in zip(row_ids, preds)
            )
    return scored_rows
Parallel Processing with Partitioned Data
# batch-deployment.yml
$schema: https://azuremlschemas.azureedge.net/latest/batchDeployment.schema.json
name: parallel-scoring
endpoint_name: customer-scoring-batch
model: azureml:customer-churn-model:2
code_configuration:
  code: ./src/batch
  scoring_script: batch_score.py
environment: azureml:sklearn-inference-env:1
compute: cpu-cluster
resources:
  instance_count: 10
# Files per mini-batch.
mini_batch_size: 5
# Parallel scoring workers on each instance (with 10 instances: up to 40 in total).
max_concurrency_per_instance: 4
retry_settings:
  max_retries: 3
  timeout: 600
output_action: append_row
output_file_name: predictions.csv
environment_variables:
  # NOTE(review): batch_score.py (the scoring_script above) does not read
  # BATCH_SIZE. Wire it into the script (e.g. as the chunk size) or drop it.
  BATCH_SIZE: "1000"
Scheduled Batch Jobs
from azure.ai.ml.entities import JobSchedule, RecurrenceTrigger, RecurrencePattern
# Create a schedule for nightly batch scoring
# NOTE(review): this snippet is illustrative and is not expected to work as-is:
#   - JobSchedule's `create_job` appears to expect a Job object (or the name of
#     an existing job), not a callable; a lambda wrapping invoke() is unlikely
#     to be accepted. Confirm against the azure-ai-ml JobSchedule reference.
#   - The schedule object is only constructed here; nothing submits it to the
#     workspace (no ml_client.schedules.begin_create_or_update call).
#   - Azure ML recurrence time zones may expect Windows-style names
#     (e.g. "Pacific Standard Time") rather than IANA IDs. Verify.
schedule = JobSchedule(
    name="nightly-batch-scoring",
    trigger=RecurrenceTrigger(
        frequency="day",
        interval=1,
        schedule=RecurrencePattern(hours=2, minutes=0), # 2:00 AM
        time_zone="America/Los_Angeles"
    ),
    create_job=lambda: ml_client.batch_endpoints.invoke(
        endpoint_name="customer-scoring-batch",
        input=Input(
            type=AssetTypes.URI_FOLDER,
            path="azureml://datastores/workspaceblobstore/paths/daily-data/"
        )
    )
)
# Note: Actual scheduling would use Azure Data Factory or Logic Apps
# for production scenarios
Error Handling and Recovery
# batch_score_robust.py
import os
import logging
import pandas as pd
import joblib
from typing import List
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

model = None  # populated by init(); read by run()


def init():
    """Load the scoring model into the module-level ``model`` global.

    The model is read from ``$AZUREML_MODEL_DIR/model.pkl``.

    Raises:
        RuntimeError: if ``AZUREML_MODEL_DIR`` is unset or empty.
    """
    global model
    model_dir = os.getenv('AZUREML_MODEL_DIR')
    if not model_dir:
        # os.path.join(None, ...) would otherwise fail with an opaque TypeError.
        raise RuntimeError(
            "AZUREML_MODEL_DIR is not set; cannot locate model.pkl"
        )
    model_path = os.path.join(model_dir, 'model.pkl')
    model = joblib.load(model_path)
    logger.info("Model loaded from %s", model_path)
def run(mini_batch: List[str]) -> List[str]:
    """Score CSV files, reporting per-file failures as ``ERROR`` rows.

    For each file:

    1. Check that ``id`` and the feature columns are present.
    2. Replace missing feature values with 0.
    3. Emit one ``"<id>,<prediction>"`` row per input row.

    A file that lacks a required column produces a single
    ``"ERROR,<path>,missing_columns"`` row. A file that raises while being
    processed produces ``"ERROR,<path>,<message>"``. In both cases processing
    continues with the next file. A file contributes either all of its
    prediction rows or exactly one ERROR row, never a mix.

    NOTE(review): because failures are caught here rather than raised, the
    batch runtime presumably treats these files as successful, so
    retry_settings / error_threshold would not apply to them. Confirm this
    best-effort behaviour is intended.
    """
    required_columns = ['feature1', 'feature2', 'feature3']
    results = []
    for file_path in mini_batch:
        try:
            logger.info("Processing: %s", file_path)
            df = pd.read_csv(file_path)

            # Validate data. 'id' is needed to label every output row.
            missing = {'id', *required_columns} - set(df.columns)
            if missing:
                logger.error("Missing columns in %s: %s", file_path, missing)
                results.append(f"ERROR,{file_path},missing_columns")
                continue

            # Fill gaps in the model inputs only, so a NaN id is never
            # silently rewritten to 0.
            features = df[required_columns].fillna(0)
            predictions = model.predict(features)

            # Read ids column-wise: a row-wise df.iloc[idx]['id'] builds a
            # mixed-dtype row Series, turning int ids into floats ("7.0").
            # Build the rows first so a failure cannot leave partial output.
            file_rows = [
                f"{id_val},{pred}" for id_val, pred in zip(df['id'], predictions)
            ]
            results.extend(file_rows)
        except Exception as e:
            logger.exception("Error processing %s: %s", file_path, e)
            results.append(f"ERROR,{file_path},{str(e)}")
    return results
Monitoring Batch Jobs
# Monitor running batch jobs.
# jobs.list() has no documented tag filter, so the original call would not
# restrict the listing to this endpoint. list_jobs() returns only the jobs
# created by invocations of the named batch endpoint.
endpoint_jobs = ml_client.batch_endpoints.list_jobs(endpoint_name="customer-scoring-batch")
for listed_job in endpoint_jobs:
    # Re-fetch the full job record so status and creation_context are
    # available, the same way the polling loop reads them.
    job = ml_client.jobs.get(listed_job.name)
    print(
        f"Job: {job.name}\n"
        f"Status: {job.status}\n"
        f"Started: {job.creation_context.created_at}\n"
    )

# Get detailed job metrics
# NOTE(review): a 'progress' key is not guaranteed in job.properties; the
# 'N/A' fallback covers its absence.
job = ml_client.jobs.get("batch-job-name")
print(f"Progress: {job.properties.get('progress', 'N/A')}")
Best Practices
- Right-size mini-batches: Balance parallelism and overhead
- Handle failures gracefully: Use retry settings and error thresholds
- Monitor memory usage: Process large files in chunks
- Use appropriate compute: Match instance count to data volume
- Validate inputs: Check data quality before processing
- Log extensively: Enable debugging for failed items
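Batch endpoints enable cost-effective, scalable inference for large datasets without the complexity of managing infrastructure.

## Takeaways

- Reach for batch endpoints when throughput and cost matter more than latency.
- Shape the scoring script's `run()` output for the output action you pick. With `append_row`, return plain rows rather than whole CSV documents.
- Process large files in chunks.
- Decide deliberately whether per-file errors should be written as output rows or raised, so that retry settings and the error threshold can act on them.
- Next steps:
  - Start with a modest `mini_batch_size` and `instance_count`.
  - Measure job duration and cost, then tune.
  - Automate recurring runs with an orchestrator such as Azure Data Factory or Logic Apps.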