Skip to content
Back to Blog
1 min read

Azure ML Pipelines v2: Modern ML Workflows

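This post gives practical, production-minded guidance on Azure ML Pipelines v2 using the `azure-ai-ml` Python SDK. It covers defining and registering components, composing them into pipelines, conditional and parallel steps, mixing compute targets, scheduling, and debugging pipeline runs.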

Pipeline Basics

from azure.ai.ml import MLClient, Input, Output, dsl, command
from azure.ai.ml.entities import Environment, AmlCompute
from azure.identity import DefaultAzureCredential

# Connect to the workspace; replace the placeholder IDs with your own.
ml_client = MLClient(
    credential=DefaultAzureCredential(),
    subscription_id="your-subscription",
    resource_group_name="your-rg",
    workspace_name="your-workspace",
)

# Define a command component for data preparation. prep.py receives the raw
# file, the test-split fraction, and the two output folders as CLI flags.
# Note: command() builds a Command *node*; the reusable component definition
# is exposed on its `.component` attribute.
data_prep_component = command(
    name="data_preparation",
    display_name="Prepare Training Data",
    description="Clean and prepare data for training",
    inputs={
        "raw_data": Input(type="uri_file"),
        "test_split": Input(type="number", default=0.2),
    },
    outputs={
        "train_data": Output(type="uri_folder"),
        "test_data": Output(type="uri_folder"),
    },
    code="./src/data_prep",
    # Adjacent literals concatenate into the same single command line.
    command=(
        "python prep.py"
        " --raw-data ${{inputs.raw_data}}"
        " --test-split ${{inputs.test_split}}"
        " --train-output ${{outputs.train_data}}"
        " --test-output ${{outputs.test_data}}"
    ),
    environment="AzureML-sklearn-1.0-ubuntu20.04-py38-cpu@latest",
)

# Register the component definition (not the node) so pipelines can later
# load it by name with ml_client.components.get("data_preparation").
ml_client.components.create_or_update(data_prep_component.component)

Building a Pipeline

from azure.ai.ml import dsl, Input, Output

# Fetch the three registered components that training_pipeline composes,
# in the same order as before: data prep, training, evaluation.
# NOTE(review): no version or label is passed, so the SDK picks its default
# (presumably the latest registered version) — pin version= for
# reproducible production runs.
data_prep, train_model, evaluate_model = (
    ml_client.components.get(component_name)
    for component_name in ("data_preparation", "train_model", "evaluate_model")
)

@dsl.pipeline(
    name="training_pipeline",
    description="End-to-end training pipeline",
    compute="cpu-cluster",
)
def training_pipeline(
    raw_data: Input,
    model_name: str,
    learning_rate: float = 0.01,
    test_split: float = 0.2,
):
    """Prepare data, train a model on the train split, and evaluate it on the test split.

    Args:
        raw_data: Raw data passed to the data preparation step.
        model_name: Model name passed to the training step.
        learning_rate: Learning rate passed to the training step.
        test_split: Held-out fraction passed to the data preparation step
            (default 0.2).

    Returns:
        Pipeline outputs ``trained_model`` and ``metrics``.
    """
    # NOTE: the step variable names (prep_step, train_step, eval_step) are
    # presumably used by the DSL as node names — keep them stable if other
    # code looks steps up by name.

    # Data preparation step: produces separate train and test outputs
    prep_step = data_prep(
        raw_data=raw_data,
        test_split=test_split,
    )

    # Training step consumes only the train output
    train_step = train_model(
        train_data=prep_step.outputs.train_data,
        model_name=model_name,
        learning_rate=learning_rate,
    )

    # Evaluation step scores the trained model on the held-out test output
    eval_step = evaluate_model(
        model=train_step.outputs.model,
        test_data=prep_step.outputs.test_data,
    )

    return {
        "trained_model": train_step.outputs.model,
        "metrics": eval_step.outputs.metrics,
    }

# Create a pipeline instance. The data prep component declares raw_data as
# type="uri_file", while Input() defaults to type="uri_folder", so the type
# of this single CSV file is stated explicitly.
pipeline = training_pipeline(
    raw_data=Input(
        type="uri_file",
        path="azureml://datastores/workspaceblobstore/paths/data/raw.csv",
    ),
    model_name="churn_classifier",
    learning_rate=0.001,
)

# Submit the pipeline as a run under the "churn-experiment" experiment
pipeline_job = ml_client.jobs.create_or_update(
    pipeline,
    experiment_name="churn-experiment",
)

# Stream the pipeline run's logs to the console
ml_client.jobs.stream(pipeline_job.name)

Component Definition Files

# component.yaml
# Command component "train_model": runs train.py on the training data folder
# and writes the trained model as an MLflow model output.
$schema: https://azuremlschemas.azureedge.net/latest/commandComponent.schema.json
name: train_model
display_name: Train Model
version: 1
type: command

# Each input is referenced in `command` below as ${{inputs.<name>}}.
inputs:
  train_data:
    type: uri_folder
    description: Training data folder
  model_name:
    type: string
    description: Name of the model
  learning_rate:
    type: number
    # Value used when the caller does not supply a learning rate.
    default: 0.01

# train.py receives this output's location via --model-output.
outputs:
  model:
    type: mlflow_model
    description: Trained model

# Source directory for train.py (presumably resolved relative to this file —
# confirm).
code: ./src
# Environment defined inline from a base image plus a conda spec.
# NOTE(review): the image has no tag, so it presumably resolves to :latest;
# pin a tag for reproducible builds.
environment:
  image: mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04
  conda_file: conda.yaml

# Folded scalar (>-): the lines below are joined with spaces into a single
# command line, without a trailing newline.
command: >-
  python train.py
  --train-data ${{inputs.train_data}}
  --model-name ${{inputs.model_name}}
  --learning-rate ${{inputs.learning_rate}}
  --model-output ${{outputs.model}}

Conditional Execution

from azure.ai.ml.dsl import condition

# Pipeline that always prepares data, then is meant to train either an
# XGBoost or a scikit-learn model depending on model_type.
#
# NOTE(review): as written this probably does not branch at run time:
#   * The `model_type == "..."` comparisons and the `... if ... else ...` in
#     the return are plain Python. They run once while the pipeline graph is
#     built, presumably when model_type is a pipeline-input placeholder
#     rather than the string submitted with the job.
#   * In the azure-ai-ml versions I'm aware of, `condition` is a function
#     called as condition(condition=<bool pipeline input or bool control
#     output>, true_block=..., false_block=...), not a context manager —
#     confirm against the installed SDK.
#   * Returning the output of a step that may be skipped is not expected to
#     work.
#   A likely fix is a bool pipeline input (e.g. use_xgboost) passed to
#   condition(condition=use_xgboost, true_block=xgb_train,
#   false_block=sklearn_train). That changes this function's signature.
# train_xgboost / train_sklearn are not defined in this snippet; they are
# assumed to be loaded components, like data_prep.
@dsl.pipeline(name="conditional_pipeline")
def conditional_pipeline(data: Input, model_type: str):
    # Data prep runs always
    prep_step = data_prep(raw_data=data)

    # Conditional training based on model type
    with condition(model_type == "xgboost"):
        xgb_train = train_xgboost(
            train_data=prep_step.outputs.train_data
        )

    with condition(model_type == "sklearn"):
        sklearn_train = train_sklearn(
            train_data=prep_step.outputs.train_data
        )

    # NOTE(review): this selection is made at graph-build time, not per run.
    return {
        "model": xgb_train.outputs.model if model_type == "xgboost" else sklearn_train.outputs.model
    }

Parallel Processing

from azure.ai.ml.parallel import parallel_run_function, RunFunction

# Define a parallel component that splits input_data into mini-batches and
# runs score.py over them on 4 instances x 2 concurrent workers.
# mini_batch_size="10" is presumably a file count for a uri_folder input.
# error_threshold / mini_batch_error_threshold cap how many failures are
# tolerated before the job fails.
parallel_score = parallel_run_function(
    name="batch_scoring",
    display_name="Parallel Batch Scoring",
    inputs={
        "input_data": Input(type="uri_folder"),
        "model": Input(type="mlflow_model"),
    },
    outputs={
        "scored_data": Output(type="uri_folder"),
    },
    # The input that is partitioned into mini-batches
    input_data="${{inputs.input_data}}",
    mini_batch_size="10",
    instance_count=4,
    max_concurrency_per_instance=2,
    error_threshold=5,
    mini_batch_error_threshold=5,
    logging_level="INFO",
    task=RunFunction(
        code="./src/scoring",
        entry_script="score.py",
        # Only the partitioned input_data reaches the entry script's run()
        # automatically. Without these arguments, score.py has no way to
        # locate the model or the scored_data output folder.
        # NOTE(review): score.py's init() is assumed to parse --model and
        # --scored_data; align the flag names with the script.
        program_arguments=(
            "--model ${{inputs.model}} "
            "--scored_data ${{outputs.scored_data}}"
        ),
        environment="AzureML-sklearn-1.0-ubuntu20.04-py38-cpu@latest",
    ),
)


# Batch-scoring pipeline: one parallel step scoring input_data with model.
# Exposes the step's scored_data folder as the "predictions" pipeline output.
@dsl.pipeline(name="batch_scoring_pipeline")
def batch_scoring_pipeline(input_data: Input, model: Input):
    scoring_step = parallel_score(
        input_data=input_data,
        model=model,
    )

    return {"predictions": scoring_step.outputs.scored_data}

Pipeline with Multiple Compute Targets

# Pipeline whose steps run on different compute targets: data prep and
# evaluation on "cpu-cluster", training on "gpu-cluster". Each step's target
# is chosen by assigning `<step>.compute` after the step is created.
# NOTE(review): the local names (prep_step, train_step, eval_step) are
# presumably used by the DSL as node names, so keep them stable if anything
# looks steps up by name. train_deep_learning and evaluate are not defined in
# this snippet; they are assumed to be loaded components.
@dsl.pipeline(name="multi_compute_pipeline")
def multi_compute_pipeline(raw_data: Input):
    # CPU for data prep
    prep_step = data_prep(raw_data=raw_data)
    prep_step.compute = "cpu-cluster"

    # GPU for training
    train_step = train_deep_learning(
        train_data=prep_step.outputs.train_data
    )
    train_step.compute = "gpu-cluster"

    # CPU for evaluation
    eval_step = evaluate(
        model=train_step.outputs.model,
        test_data=prep_step.outputs.test_data
    )
    eval_step.compute = "cpu-cluster"

    # Only the trained model is exposed as a pipeline output; the evaluation
    # step's outputs are not returned.
    return {"model": train_step.outputs.model}

Pipeline Scheduling

from azure.ai.ml.entities import RecurrenceTrigger, JobSchedule

# Create a schedule that submits training_pipeline every day
# (frequency="day", interval=1), starting 2022-08-17T08:00:00 UTC.
schedule = JobSchedule(
    name="daily_training",
    trigger=RecurrenceTrigger(
        frequency="day",
        interval=1,
        start_time="2022-08-17T08:00:00",
        time_zone="UTC",
    ),
    # NOTE(review): this path is a folder (trailing "/"), and Input() without
    # type= is a uri_folder, while data_prep declares raw_data as uri_file.
    # Confirm the prep step accepts a folder, or point this at a single file.
    create_job=training_pipeline(
        raw_data=Input(path="azureml://datastores/workspaceblobstore/paths/data/daily/"),
        model_name="daily_model",
    ),
)

# begin_create_or_update starts a long-running operation and returns a
# poller. Wait for it so the schedule exists before it is listed below.
ml_client.schedules.begin_create_or_update(schedule).result()

# List schedules in the workspace with their triggers
for existing in ml_client.schedules.list():
    print(f"{existing.name}: {existing.trigger}")

Pipeline Debugging

# Download the pipeline's "trained_model" output (one of the outputs returned
# by training_pipeline) into ./outputs for local inspection.
ml_client.jobs.download(
    name=pipeline_job.name,
    output_name="trained_model",
    download_path="./outputs",
)


def get_step_job(parent_job_name, step_name):
    """Return the child job of a pipeline run whose display name is ``step_name``.

    Step jobs are not addressable as "<pipeline job name>/<step name>". They
    are separate jobs that can be listed via the parent's name.
    NOTE(review): child jobs are presumably given their pipeline node name as
    display_name — confirm for your SDK version.

    Raises:
        LookupError: If the pipeline run has no child job with that name.
    """
    for child in ml_client.jobs.list(parent_job_name=parent_job_name):
        if child.display_name == step_name:
            return child
    raise LookupError(
        f"No step named {step_name!r} in pipeline job {parent_job_name!r}"
    )


# Get the training step's job and show its status
step_job = get_step_job(pipeline_job.name, "train_step")
print(step_job.status)

# Stream logs from that step, using its own job name
ml_client.jobs.stream(step_job.name)

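Azure ML Pipelines v2 provides a modern, Pythonic way to build production-ready ML workflows.

## Takeaways

- Define each step as a component, either in Python with `command()` or in YAML, and register it so pipelines can reuse it by name.
- Compose components with `@dsl.pipeline`. Make sure data inputs have the right type (`uri_file` vs `uri_folder`), and expose values you expect to tune as pipeline parameters.
- Use per-step `compute` for mixed CPU/GPU work, `parallel_run_function` for large batch-scoring jobs, and `JobSchedule` for recurring runs.
- When something goes wrong, download the pipeline's outputs and inspect the individual step jobs of the run, not just the parent pipeline job.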

Michael John Peña

Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.