Back to Blog
4 min read

Parallel Jobs in Azure ML for Large-Scale Processing

Parallel jobs in Azure ML enable processing large datasets by distributing work across multiple compute nodes. This is essential for batch inference, data processing, and distributed training.

Parallel Run Configuration

from azure.ai.ml import MLClient, Input, Output
from azure.ai.ml.parallel import parallel_run_function, RunFunction
from azure.identity import DefaultAzureCredential

ml_client = MLClient(
    credential=DefaultAzureCredential(),
    subscription_id="your-subscription",
    resource_group_name="your-rg",
    workspace_name="your-workspace"
)

# Define parallel run
batch_scoring = parallel_run_function(
    name="batch_scoring",
    display_name="Parallel Batch Scoring",
    description="Score large datasets in parallel",

    # Input/Output configuration
    inputs={
        "input_data": Input(type="uri_folder"),
        "model": Input(type="mlflow_model")
    },
    outputs={
        "predictions": Output(type="uri_folder")
    },
    input_data="${{inputs.input_data}}",

    # Parallelization settings
    instance_count=4,
    max_concurrency_per_instance=2,
    mini_batch_size="10",  # 10 files per mini-batch

    # Error handling
    error_threshold=10,
    mini_batch_error_threshold=5,
    retry_settings={
        "max_retries": 3,
        "timeout": 60
    },

    # Logging
    logging_level="INFO",
    append_row_to="${{outputs.predictions}}/predictions.csv",

    # Task configuration
    task=RunFunction(
        code="./src/scoring",
        entry_script="score.py",
        environment="AzureML-sklearn-1.0-ubuntu20.04-py38-cpu@latest"
    )
)

# Register the component
ml_client.components.create_or_update(batch_scoring)

Scoring Script for Parallel Run

# src/scoring/score.py
import os
import pandas as pd
import mlflow
from azureml.core import Run

def init():
    """Initialize the model"""
    global model

    # Get model from input
    run = Run.get_context()
    model_path = run.input_datasets.get("model") or os.environ.get("AZUREML_MODEL_DIR")

    model = mlflow.sklearn.load_model(model_path)
    print(f"Model loaded from {model_path}")

def run(mini_batch):
    """Process a mini-batch of files"""
    results = []

    for file_path in mini_batch:
        try:
            # Read file
            df = pd.read_csv(file_path)

            # Make predictions
            predictions = model.predict(df)
            probabilities = model.predict_proba(df)

            # Add results
            df["prediction"] = predictions
            df["probability"] = probabilities.max(axis=1)
            df["source_file"] = os.path.basename(file_path)

            results.append(df)

        except Exception as e:
            print(f"Error processing {file_path}: {e}")
            continue

    if results:
        combined = pd.concat(results)
        return combined
    else:
        return pd.DataFrame()

Using Parallel Job in Pipeline

from azure.ai.ml import dsl, Input

# Load the parallel component
batch_scoring_component = ml_client.components.get("batch_scoring")

@dsl.pipeline(
    name="batch_inference_pipeline",
    description="Run batch inference on large dataset",
    compute="cpu-cluster"
)
def batch_inference_pipeline(
    input_data: Input,
    model: Input
):
    scoring_step = batch_scoring_component(
        input_data=input_data,
        model=model
    )

    return {
        "predictions": scoring_step.outputs.predictions
    }

# Submit pipeline
pipeline_job = ml_client.jobs.create_or_update(
    batch_inference_pipeline(
        input_data=Input(
            path="azureml://datastores/workspaceblobstore/paths/batch-data/",
            type="uri_folder"
        ),
        model=Input(
            path="azureml://models/my-model/versions/1",
            type="mlflow_model"
        )
    ),
    experiment_name="batch-inference"
)

Mini-Batch Processing Strategies

# Strategy 1: File-based mini-batches
parallel_job_file = parallel_run_function(
    name="file_parallel",
    mini_batch_size="10",  # 10 files per batch
    input_data="${{inputs.input_data}}",
    # ...
)

# Strategy 2: Row-based mini-batches (for tabular data)
parallel_job_rows = parallel_run_function(
    name="row_parallel",
    mini_batch_size="1000",  # 1000 rows per batch
    input_data="${{inputs.input_data}}",
    mini_batch_row_size=1000,
    # ...
)

# Strategy 3: Memory-based mini-batches
parallel_job_memory = parallel_run_function(
    name="memory_parallel",
    mini_batch_size="100MB",  # 100MB per batch
    input_data="${{inputs.input_data}}",
    # ...
)

Advanced Error Handling

# src/scoring/score_with_error_handling.py
import os
import logging
import traceback
from typing import List

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class BatchProcessor:
    def __init__(self):
        self.model = None
        self.failed_files = []
        self.processed_count = 0

    def init(self):
        """Initialize resources"""
        import mlflow
        self.model = mlflow.sklearn.load_model(os.environ.get("AZUREML_MODEL_DIR"))

    def run(self, mini_batch: List[str]):
        """Process mini-batch with comprehensive error handling"""
        results = []

        for file_path in mini_batch:
            try:
                result = self._process_file(file_path)
                results.append(result)
                self.processed_count += 1

            except FileNotFoundError:
                logger.error(f"File not found: {file_path}")
                self.failed_files.append((file_path, "not_found"))

            except pd.errors.EmptyDataError:
                logger.error(f"Empty file: {file_path}")
                self.failed_files.append((file_path, "empty"))

            except Exception as e:
                logger.error(f"Error processing {file_path}: {e}")
                logger.error(traceback.format_exc())
                self.failed_files.append((file_path, str(e)))

        logger.info(f"Processed {len(results)}/{len(mini_batch)} files in this batch")
        return results

    def _process_file(self, file_path: str):
        """Process a single file"""
        df = pd.read_csv(file_path)

        if df.empty:
            raise pd.errors.EmptyDataError("DataFrame is empty")

        # Validate schema
        required_columns = ["feature1", "feature2", "feature3"]
        missing = set(required_columns) - set(df.columns)
        if missing:
            raise ValueError(f"Missing columns: {missing}")

        # Make predictions
        predictions = self.model.predict(df[required_columns])

        return {
            "file": os.path.basename(file_path),
            "predictions": predictions.tolist(),
            "count": len(predictions)
        }

# Module-level functions for parallel run
processor = BatchProcessor()

def init():
    processor.init()

def run(mini_batch):
    return processor.run(mini_batch)

Monitoring Parallel Jobs

# Monitor job progress
job = ml_client.jobs.get(pipeline_job.name)

while job.status not in ["Completed", "Failed", "Canceled"]:
    print(f"Status: {job.status}")

    # Get child job details
    child_jobs = list(ml_client.jobs.list(parent_job_name=job.name))
    for child in child_jobs:
        print(f"  {child.name}: {child.status}")

    time.sleep(30)
    job = ml_client.jobs.get(pipeline_job.name)

print(f"Final status: {job.status}")

# Get logs
logs = ml_client.jobs.download(
    name=pipeline_job.name,
    download_path="./logs",
    output_name="logs"
)

Scaling Considerations

# Configure for maximum throughput
high_throughput_job = parallel_run_function(
    name="high_throughput_scoring",

    # Scale out with more instances
    instance_count=10,

    # Maximize parallelism per instance
    max_concurrency_per_instance=8,

    # Larger mini-batches for efficiency
    mini_batch_size="50",

    # Relaxed error thresholds for large jobs
    error_threshold=100,
    mini_batch_error_threshold=10,

    # Longer timeout for complex processing
    retry_settings={
        "max_retries": 3,
        "timeout": 300
    },

    # Use larger compute
    compute="Standard_DS4_v2",

    task=RunFunction(
        code="./src/scoring",
        entry_script="score.py",
        environment="AzureML-sklearn-1.0-ubuntu20.04-py38-cpu@latest"
    )
)

Parallel jobs enable processing datasets of any size by distributing work across your compute cluster.

Michael John Peña

Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.