Back to Blog
4 min read

Building Reusable Components in Azure ML

Components in Azure ML are self-contained, reusable pieces of ML logic that can be combined into pipelines. Building well-designed components promotes code reuse and maintainability.

Component Anatomy

from azure.ai.ml import command, Input, Output
from azure.ai.ml.entities import Environment

# Declare the data-cleaning step as a command component with the Python SDK.
# The individual pieces are named first so the final command(...) call reads
# as a short summary of the component.
cleaning_inputs = dict(
    input_data=Input(type="uri_file", description="Raw input data"),
    null_threshold=Input(type="number", default=0.3, description="Max null ratio allowed"),
    remove_duplicates=Input(type="boolean", default=True),
)

cleaning_outputs = dict(
    cleaned_data=Output(type="uri_file", description="Cleaned output data"),
    quality_report=Output(type="uri_file", description="Data quality report"),
)

# Base image plus the conda spec that lives next to the component script.
cleaning_environment = Environment(
    image="mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04:latest",
    conda_file="./components/data_cleaning/conda.yaml",
)

# The ${{...}} placeholders are left for the service to substitute; the
# trailing backslashes join the lines into a single command string.
cleaning_command = """python clean.py \
        --input ${{inputs.input_data}} \
        --null-threshold ${{inputs.null_threshold}} \
        --remove-duplicates ${{inputs.remove_duplicates}} \
        --output ${{outputs.cleaned_data}} \
        --report ${{outputs.quality_report}}"""

data_cleaning_component = command(
    name="data_cleaning",
    version="1.0.0",
    display_name="Data Cleaning Component",
    description="Cleans and validates input data",
    inputs=cleaning_inputs,
    outputs=cleaning_outputs,
    code="./components/data_cleaning",
    command=cleaning_command,
    environment=cleaning_environment,
    is_deterministic=True,
)

# Publish the component to the workspace behind ml_client
ml_client.components.create_or_update(data_cleaning_component)

Component Script

# components/data_cleaning/clean.py
import argparse
import pandas as pd
import json
from pathlib import Path

def clean_data(
    input_path: str,
    output_path: str,
    report_path: str,
    null_threshold: float,
    remove_duplicates: bool
):
    """Clean a CSV file and write the cleaned data plus a JSON quality report.

    Steps, in order:
      1. Drop every column whose null ratio is strictly greater than
         ``null_threshold``.
      2. Optionally drop duplicate rows.
      3. Fill remaining nulls: median for numeric columns, most frequent
         value for everything else.

    Args:
        input_path: Path of the CSV file to read.
        output_path: Path the cleaned CSV is written to (without the index).
        report_path: Path the JSON quality report is written to.
        null_threshold: Maximum tolerated null ratio per column (0.0 - 1.0).
        remove_duplicates: Whether duplicate rows are removed.

    The report contains the original/final row and column counts and an
    ``issues_found`` list describing dropped columns, removed duplicates and
    columns whose nulls could not be filled.
    """
    df = pd.read_csv(input_path)

    quality_report = {
        "original_rows": len(df),
        "original_columns": len(df.columns),
        "issues_found": []
    }

    # Remove columns with too many nulls
    null_ratios = df.isnull().mean()
    cols_to_drop = null_ratios[null_ratios > null_threshold].index.tolist()

    if cols_to_drop:
        df = df.drop(columns=cols_to_drop)
        quality_report["issues_found"].append({
            "type": "high_null_columns",
            "columns": cols_to_drop
        })

    # Remove duplicates
    if remove_duplicates:
        dup_count = int(df.duplicated().sum())
        if dup_count > 0:
            df = df.drop_duplicates()
            quality_report["issues_found"].append({
                "type": "duplicates",
                "count": dup_count
            })

    # Fill remaining nulls
    unfillable_cols = []
    for col in df.columns:
        series = df[col]

        # Nothing to fill. This also covers empty frames, where mode() would
        # otherwise return an empty result and indexing it would raise.
        if not series.isnull().any():
            continue

        # Treat every numeric dtype alike (not just int64/float64); booleans
        # are excluded because a median is meaningless for them.
        is_numeric = (
            pd.api.types.is_numeric_dtype(series)
            and not pd.api.types.is_bool_dtype(series)
        )
        if is_numeric:
            fill_value = series.median()
        else:
            modes = series.mode()
            fill_value = modes.iloc[0] if not modes.empty else None

        # An all-null column has neither a median nor a mode: leave it as it
        # is and surface it in the report instead of crashing.
        if fill_value is None or pd.isna(fill_value):
            unfillable_cols.append(col)
            continue

        df[col] = series.fillna(fill_value)

    if unfillable_cols:
        quality_report["issues_found"].append({
            "type": "unfillable_columns",
            "columns": unfillable_cols
        })

    quality_report["final_rows"] = len(df)
    quality_report["final_columns"] = len(df.columns)

    # Save outputs
    df.to_csv(output_path, index=False)

    with open(report_path, 'w') as f:
        json.dump(quality_report, f, indent=2)

    print(f"Cleaned data: {quality_report['original_rows']} -> {quality_report['final_rows']} rows")

def str_to_bool(value):
    """Parse a command-line boolean such as "True" / "false" / "1" / "no".

    ``argparse`` with ``type=bool`` calls ``bool()`` on the raw string, and
    any non-empty string - including "False" - is truthy. This parser maps
    the usual spellings explicitly and rejects anything else.

    Raises:
        argparse.ArgumentTypeError: if the text is not a recognised boolean.
    """
    if isinstance(value, bool):
        return value

    text = str(value).strip().lower()
    if text in ("true", "t", "yes", "y", "1"):
        return True
    if text in ("false", "f", "no", "n", "0"):
        return False
    raise argparse.ArgumentTypeError(f"expected a boolean value, got {value!r}")


if __name__ == "__main__":
    # Command-line entry point: parse the arguments the component command
    # passes in and hand them to clean_data.
    parser = argparse.ArgumentParser()
    parser.add_argument("--input", required=True)
    parser.add_argument("--null-threshold", type=float, default=0.3)
    # The flag arrives as text, so it needs a real parser rather than bool().
    parser.add_argument("--remove-duplicates", type=str_to_bool, default=True)
    parser.add_argument("--output", required=True)
    parser.add_argument("--report", required=True)

    args = parser.parse_args()

    clean_data(
        args.input,
        args.output,
        args.report,
        args.null_threshold,
        args.remove_duplicates
    )

Component YAML Definition

# components/data_cleaning/component.yaml
# Declarative definition of the data_cleaning command component.
$schema: https://azuremlschemas.azureedge.net/latest/commandComponent.schema.json
# Identity: name + version together identify one registered component.
name: data_cleaning
version: 1.0.0
display_name: Data Cleaning Component
type: command
description: Cleans and validates input data

# Values the caller supplies; the two scalar inputs carry defaults.
inputs:
  input_data:
    type: uri_file
    description: Raw input data file
  null_threshold:
    type: number
    default: 0.3
    description: Maximum null ratio allowed per column
  remove_duplicates:
    type: boolean
    default: true
    description: Whether to remove duplicate rows

# Files the component produces; referenced in the command via ${{outputs.*}}.
outputs:
  cleaned_data:
    type: uri_file
    description: Cleaned output data
  quality_report:
    type: uri_file
    description: Data quality report in JSON format

# "." is relative to this YAML file, i.e. the directory holding clean.py.
code: .
environment:
  conda_file: conda.yaml
  image: mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04:latest

# Folded scalar (>-): the lines below are joined with spaces into one command.
command: >-
  python clean.py
  --input ${{inputs.input_data}}
  --null-threshold ${{inputs.null_threshold}}
  --remove-duplicates ${{inputs.remove_duplicates}}
  --output ${{outputs.cleaned_data}}
  --report ${{outputs.quality_report}}

is_deterministic: true

Feature Engineering Component

# Command component that runs engineer_features.py on a cleaned dataset.
feature_engineering_component = command(
    name="feature_engineering",
    version="1.0.0",
    display_name="Feature Engineering",
    inputs={
        "input_data": Input(type="uri_file"),
        "feature_config": Input(type="uri_file", optional=True),
        "target_column": Input(type="string")
    },
    outputs={
        "feature_data": Output(type="uri_file"),
        "feature_metadata": Output(type="uri_file")
    },
    code="./components/feature_engineering",
    # feature_config is optional, so its flag is wrapped in $[[...]]: the
    # whole fragment is dropped from the command line when the input is not
    # supplied, instead of leaving an unresolved placeholder behind.
    command="""python engineer_features.py \
        --input ${{inputs.input_data}} \
        $[[--config ${{inputs.feature_config}}]] \
        --target ${{inputs.target_column}} \
        --output ${{outputs.feature_data}} \
        --metadata ${{outputs.feature_metadata}}""",
    environment="AzureML-sklearn-1.0-ubuntu20.04-py38-cpu@latest"
)

# components/feature_engineering/engineer_features.py
import pandas as pd
import numpy as np
import json
import argparse

def engineer_features(input_path, config_path, target_column, output_path, metadata_path):
    """Derive extra features from a CSV file and write data plus metadata.

    Args:
        input_path: Path of the CSV file to read.
        config_path: Optional path of a JSON config file. May be ``None`` or
            point to a missing file, in which case defaults are used. The
            only key read is ``include_squared`` (default ``True``).
        target_column: Name of the label column; it is never transformed.
        output_path: Path the feature CSV is written to (without the index).
        metadata_path: Path the JSON metadata is written to. It lists the
            original columns, the engineered columns and the target column.
    """
    # Imported locally because the script's import header does not bring
    # Path into scope.
    from pathlib import Path

    df = pd.read_csv(input_path)

    feature_metadata = {
        "original_features": df.columns.tolist(),
        "engineered_features": [],
        "target_column": target_column
    }

    # Load config if provided
    config = {}
    if config_path and Path(config_path).exists():
        with open(config_path) as f:
            config = json.load(f)
    include_squared = config.get("include_squared", True)

    # Numerical transformations (column list is fixed before new ones are added)
    numerical_cols = df.select_dtypes(include=[np.number]).columns
    for col in numerical_cols:
        if col == target_column:
            continue

        # Log transform for skewed features. log1p is only defined for
        # values greater than -1, so columns reaching below that are skipped
        # rather than filled with NaN / -inf.
        if df[col].skew() > 1 and df[col].min() > -1:
            df[f"{col}_log"] = np.log1p(df[col])
            feature_metadata["engineered_features"].append(f"{col}_log")

        # Squared terms
        if include_squared:
            df[f"{col}_squared"] = df[col] ** 2
            feature_metadata["engineered_features"].append(f"{col}_squared")

    # Categorical encoding. The target is left alone here as well, so a
    # string-valued label column survives for the training step.
    categorical_cols = df.select_dtypes(include=['object']).columns
    for col in categorical_cols:
        if col == target_column:
            continue
        dummies = pd.get_dummies(df[col], prefix=col)
        df = pd.concat([df, dummies], axis=1)
        df = df.drop(columns=[col])
        feature_metadata["engineered_features"].extend(dummies.columns.tolist())

    # Save outputs
    df.to_csv(output_path, index=False)
    with open(metadata_path, 'w') as f:
        json.dump(feature_metadata, f, indent=2)

Model Training Component

# Command component that runs train.py and emits an MLflow model plus metrics.
training_component = command(
    name="model_training",
    version="1.0.0",
    display_name="Model Training",
    inputs={
        "train_data": Input(type="uri_file"),
        "validation_data": Input(type="uri_file", optional=True),
        "target_column": Input(type="string"),
        "model_type": Input(type="string", default="random_forest"),
        "hyperparameters": Input(type="uri_file", optional=True)
    },
    outputs={
        "model": Output(type="mlflow_model"),
        "metrics": Output(type="uri_file")
    },
    code="./components/training",
    # validation_data and hyperparameters are optional, so each flag is
    # wrapped in $[[...]]: the fragment is removed from the command line when
    # the input is not supplied, instead of leaving an unresolved placeholder.
    command="""python train.py \
        --train-data ${{inputs.train_data}} \
        $[[--validation-data ${{inputs.validation_data}}]] \
        --target ${{inputs.target_column}} \
        --model-type ${{inputs.model_type}} \
        $[[--hyperparameters ${{inputs.hyperparameters}}]] \
        --model-output ${{outputs.model}} \
        --metrics-output ${{outputs.metrics}}""",
    environment="AzureML-sklearn-1.0-ubuntu20.04-py38-cpu@latest"
)

Component Versioning

# Publish a second version under the same component name
data_cleaning_v2 = command(
    display_name="Data Cleaning Component v2",
    version="2.0.0",  # New version
    name="data_cleaning",
    # ... updated implementation
)
ml_client.components.create_or_update(data_cleaning_v2)

# Fetch one explicit version, then fetch again without pinning a version
v1_component = ml_client.components.get(name="data_cleaning", version="1.0.0")
latest_component = ml_client.components.get(name="data_cleaning")  # Gets latest

# Walk every registered version and show when it was created
all_versions = ml_client.components.list(name="data_cleaning")
for component in all_versions:
    print(f"Version: {component.version}, Created: {component.creation_context.created_at}")

Using Components in Pipelines

from azure.ai.ml import dsl

# Fetch the registered components the pipeline is assembled from.
# Only data_cleaning is pinned to an explicit version here.
data_cleaning = ml_client.components.get(name="data_cleaning", version="1.0.0")
feature_engineering = ml_client.components.get(name="feature_engineering")
model_training = ml_client.components.get(name="model_training")

# Pipeline definition: cleaning -> feature engineering -> training.
# Every step runs on the "cpu-cluster" compute target named in the decorator.
@dsl.pipeline(
    name="full_ml_pipeline",
    compute="cpu-cluster"
)
def ml_pipeline(raw_data: Input, target_column: str):
    # raw_data: input handed to the cleaning step.
    # target_column: label column name, forwarded to both the feature
    # engineering and the training step.
    # NOTE(review): the DSL may derive node names from the local variable
    # names below - confirm before renaming them.

    # Step 1: clean the raw data with explicit threshold / dedup settings
    clean_step = data_cleaning(
        input_data=raw_data,
        null_threshold=0.3,
        remove_duplicates=True
    )

    # Step 2: build features from the cleaned output of step 1
    # (the optional feature_config input is not supplied here)
    feature_step = feature_engineering(
        input_data=clean_step.outputs.cleaned_data,
        target_column=target_column
    )

    # Step 3: train on the feature data produced by step 2
    train_step = model_training(
        train_data=feature_step.outputs.feature_data,
        target_column=target_column,
        model_type="random_forest"
    )

    # Expose the trained model as the pipeline-level output "model"
    return {"model": train_step.outputs.model}

# Submit pipeline
# The raw data is a single CSV file, so the input type is stated explicitly:
# an Input built without a type is treated as a folder, which does not match
# the uri_file input declared by the data_cleaning component.
pipeline_job = ml_client.jobs.create_or_update(
    ml_pipeline(
        raw_data=Input(
            type="uri_file",
            path="azureml://datastores/workspaceblobstore/paths/data.csv"
        ),
        target_column="target"
    ),
    experiment_name="component-pipeline"
)

Well-designed components enable team collaboration and accelerate ML development through reuse.

Michael John Peña

Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.