Skip to content
Back to Blog
1 min read

Bias Detection and Fairness Metrics for AI Systems

This post shares practical, production-minded guidance on detecting bias in AI systems and measuring fairness with concrete metrics.

Understanding Bias in AI

from dataclasses import dataclass
from typing import List, Dict
from enum import Enum

class BiasType(Enum):
    """Categories of bias, named after where the bias enters the pipeline.

    Each member's value is the lowercase string form of its name; the
    trailing comment on each member gives a short description.
    """

    SELECTION = "selection"         # Biased data collection
    MEASUREMENT = "measurement"     # Biased features or labels
    AGGREGATION = "aggregation"     # Loss of subgroup patterns
    EVALUATION = "evaluation"       # Biased benchmarks
    DEPLOYMENT = "deployment"       # Biased usage context

@dataclass
class BiasSource:
    """Record describing one source of bias and how to deal with it.

    All three text fields are free-form, human-readable strings.
    """

    # Category of bias this record belongs to.
    type: BiasType
    # What the bias is.
    description: str
    # How the bias can be detected.
    detection_method: str
    # How the bias can be mitigated.
    mitigation_strategy: str

# Catalogue of common bias sources. Each row below is
# (bias type, description, detection method, mitigation strategy) and is
# expanded into one BiasSource record, in the order listed.
COMMON_BIAS_SOURCES = [
    BiasSource(
        type=bias_type,
        description=summary,
        detection_method=how_to_detect,
        mitigation_strategy=how_to_mitigate,
    )
    for bias_type, summary, how_to_detect, how_to_mitigate in (
        (
            BiasType.SELECTION,
            "Training data doesn't represent all populations",
            "Compare data distribution with target population",
            "Collect more representative data or use resampling",
        ),
        (
            BiasType.MEASUREMENT,
            "Labels reflect historical bias",
            "Audit labeling process and historical outcomes",
            "Re-label or use fairness constraints during training",
        ),
        (
            BiasType.AGGREGATION,
            "Single model fails different subgroups",
            "Evaluate metrics by subgroup",
            "Use group-specific models or fairness constraints",
        ),
    )
]

Fairness Metrics Implementation

import numpy as np
import pandas as pd
from typing import Dict, List, Tuple
from sklearn.metrics import confusion_matrix

class FairnessMetrics:
    """Calculate group-fairness metrics for a binary classifier.

    Every metric partitions the samples by the distinct values of the
    sensitive feature and compares a per-group statistic across groups.

    Args:
        y_true: Ground-truth labels. Entries equal to 1 are treated as
            positives and entries equal to 0 as negatives.
        y_pred: Hard predictions. Their per-group mean is used as the
            positive-prediction rate, so 0/1 values are expected.
        y_prob: Predicted probability of the positive class. Only
            ``calibration_by_group`` reads it; ``None`` is accepted when
            calibration is not needed.
        sensitive_feature: Group membership of each sample.
        fairness_threshold: Minimum min/max rate ratio for a metric to be
            flagged ``is_fair``. Defaults to 0.8.
    """

    def __init__(
        self,
        y_true: np.ndarray,
        y_pred: np.ndarray,
        y_prob: np.ndarray,
        sensitive_feature: np.ndarray,
        fairness_threshold: float = 0.8
    ):
        # Coerce to arrays so that lists and pandas Series work with the
        # positional boolean-mask indexing used by every metric below.
        self.y_true = np.asarray(y_true)
        self.y_pred = np.asarray(y_pred)
        self.y_prob = None if y_prob is None else np.asarray(y_prob)
        self.sensitive = np.asarray(sensitive_feature)
        self.groups = np.unique(self.sensitive)
        self.fairness_threshold = fairness_threshold

    def _get_group_mask(self, group) -> np.ndarray:
        """Return a boolean mask selecting the samples of ``group``."""
        return self.sensitive == group

    @staticmethod
    def _min_max_ratio(values):
        """Return min/max of ``values``, or 1 when the maximum is not positive."""
        lowest, highest = min(values), max(values)
        return lowest / highest if highest > 0 else 1

    def _rate_summary(self, rates: Dict) -> Dict:
        """Summarise per-group rates as ratio, difference and fairness flag."""
        values = list(rates.values())
        lowest, highest = min(values), max(values)
        ratio = self._min_max_ratio(values)
        return {
            "ratio": ratio,
            "difference": highest - lowest,
            # All-zero rates are identical across groups, hence fair.
            "is_fair": ratio >= self.fairness_threshold if highest > 0 else True
        }

    def _tpr_fpr(self, group) -> Tuple:
        """Return (TPR, FPR) for ``group``; a rate is 0 when undefined."""
        mask = self._get_group_mask(group)
        y_t = self.y_true[mask]
        y_p = self.y_pred[mask]

        positives = y_t == 1
        negatives = y_t == 0
        # A group without positives (negatives) has no defined TPR (FPR);
        # 0 is reported in that case.
        tpr = y_p[positives].mean() if positives.sum() > 0 else 0
        fpr = y_p[negatives].mean() if negatives.sum() > 0 else 0
        return tpr, fpr

    def demographic_parity(self) -> Dict:
        """
        Demographic Parity: P(Y_hat=1|A=a) = P(Y_hat=1|A=b)
        All groups should have equal positive prediction rates.

        Returns:
            Dict with ``group_rates`` (positive-prediction rate per
            group), ``ratio`` (min rate / max rate), ``difference``
            (max rate - min rate) and ``is_fair`` (ratio meets
            ``fairness_threshold``).
        """
        rates = {
            group: self.y_pred[self._get_group_mask(group)].mean()
            for group in self.groups
        }
        return {"group_rates": rates, **self._rate_summary(rates)}

    def equalized_odds(self) -> Dict:
        """
        Equalized Odds: TPR and FPR should be equal across groups.

        Returns:
            Dict with ``group_metrics`` (``{"tpr", "fpr"}`` per group)
            plus min/max ratios and max-min differences of both rates.
        """
        metrics = {}
        for group in self.groups:
            tpr, fpr = self._tpr_fpr(group)
            metrics[group] = {"tpr": tpr, "fpr": fpr}

        tpr_values = [m["tpr"] for m in metrics.values()]
        fpr_values = [m["fpr"] for m in metrics.values()]

        return {
            "group_metrics": metrics,
            "tpr_ratio": self._min_max_ratio(tpr_values),
            "fpr_ratio": self._min_max_ratio(fpr_values),
            "tpr_difference": max(tpr_values) - min(tpr_values),
            "fpr_difference": max(fpr_values) - min(fpr_values)
        }

    def equal_opportunity(self) -> Dict:
        """
        Equal Opportunity: TPR should be equal across groups.
        Focuses only on positive class.

        Returns:
            Dict with ``tpr_by_group``, ``ratio`` (min TPR / max TPR),
            ``difference`` (max TPR - min TPR) and ``is_fair`` (ratio
            meets ``fairness_threshold``).
        """
        tpr_by_group = {group: self._tpr_fpr(group)[0] for group in self.groups}
        return {"tpr_by_group": tpr_by_group, **self._rate_summary(tpr_by_group)}

    def calibration_by_group(self, n_bins: int = 10) -> Dict:
        """
        Calibration: P(Y=1|Y_hat=p, A=a) should equal p for all groups.

        Probabilities are split into ``n_bins`` equal-width bins over
        [0, 1]. The last bin is closed on the right, so a probability of
        exactly 1.0 is counted instead of being dropped.

        Args:
            n_bins: Number of equal-width probability bins.

        Returns:
            Dict mapping each group to a list with one entry per
            non-empty bin: ``expected`` (bin midpoint), ``actual``
            (mean of ``y_true`` in the bin) and ``count``.
        """
        edges = np.linspace(0, 1, n_bins + 1)
        calibration = {}

        for group in self.groups:
            mask = self._get_group_mask(group)
            probs = self.y_prob[mask]
            true = self.y_true[mask]

            # Digitizing against the interior edges only yields indices
            # 0..n_bins-1 and puts p == 1.0 into the last bin.
            bin_indices = np.digitize(probs, edges[1:-1])

            bin_stats = []
            for i in range(n_bins):
                bin_mask = bin_indices == i
                count = int(bin_mask.sum())
                if count > 0:
                    bin_stats.append({
                        "expected": edges[i:i + 2].mean(),
                        "actual": true[bin_mask].mean(),
                        "count": count
                    })

            calibration[group] = bin_stats

        return calibration

    def get_all_metrics(self) -> Dict:
        """Return demographic parity, equalized odds and equal opportunity."""
        return {
            "demographic_parity": self.demographic_parity(),
            "equalized_odds": self.equalized_odds(),
            "equal_opportunity": self.equal_opportunity()
        }

    def generate_report(self) -> str:
        """Generate a human-readable, Markdown-formatted fairness report."""
        metrics = self.get_all_metrics()
        target = self.fairness_threshold

        parts = ["# Fairness Metrics Report\n\n"]

        # Demographic Parity
        dp = metrics["demographic_parity"]
        parts.append("## Demographic Parity\n")
        parts.append(f"- Ratio: {dp['ratio']:.3f} (target: >= {target})\n")
        parts.append(f"- Fair: {'Yes' if dp['is_fair'] else 'No'}\n")
        parts.extend(
            f"- {group}: {rate:.3f}\n" for group, rate in dp["group_rates"].items()
        )
        parts.append("\n")

        # Equal Opportunity
        eo = metrics["equal_opportunity"]
        parts.append("## Equal Opportunity\n")
        parts.append(f"- TPR Ratio: {eo['ratio']:.3f} (target: >= {target})\n")
        parts.append(f"- Fair: {'Yes' if eo['is_fair'] else 'No'}\n")
        parts.extend(
            f"- {group} TPR: {tpr:.3f}\n" for group, tpr in eo["tpr_by_group"].items()
        )
        parts.append("\n")

        # Equalized Odds
        eod = metrics["equalized_odds"]
        parts.append("## Equalized Odds\n")
        parts.append(f"- TPR Ratio: {eod['tpr_ratio']:.3f}\n")
        parts.append(f"- FPR Ratio: {eod['fpr_ratio']:.3f}\n")

        return "".join(parts)

Bias Mitigation Techniques

from sklearn.base import BaseEstimator, TransformerMixin

class BiasmitigationPreprocessor(BaseEstimator, TransformerMixin):
    """Preprocessing techniques for bias mitigation.

    The only method acted on is ``"reweighting"``: ``fit`` learns one
    weight per (group, label) pair and ``transform`` maps those weights
    onto individual samples.

    Args:
        method: Name of the mitigation method. ``fit`` is a no-op for
            any value other than ``"reweighting"``.
    """

    def __init__(self, method: str = "reweighting"):
        self.method = method
        self.weights = None

    def fit(self, X, y, sensitive):
        """Fit the preprocessor and return ``self``.

        Args:
            X: Feature matrix; not read by the reweighting method.
            y: Binary labels (0/1).
            sensitive: Group membership of each sample.
        """
        if self.method == "reweighting":
            self._fit_reweighting(y, sensitive)
        return self

    def _fit_reweighting(self, y, sensitive):
        """Calculate reweighting factors.

        For each (group, label) cell the weight is
        P(group) * P(label) / P(group, label), i.e. the ratio of the
        joint probability expected if group and label were independent
        to the observed one. Cells with no samples get weight 1.0.
        """
        # Arrays give positional semantics regardless of any pandas index.
        y = np.asarray(y)
        sensitive = np.asarray(sensitive)

        # Calculate expected probability under fairness
        p_favorable = (y == 1).mean()

        self.weights = {}
        for group in np.unique(sensitive):
            group_mask = sensitive == group
            p_group = group_mask.mean()

            for label in [0, 1]:
                combined_mask = group_mask & (y == label)

                p_label = p_favorable if label == 1 else 1 - p_favorable
                p_joint_expected = p_group * p_label
                p_joint_observed = combined_mask.mean()

                if p_joint_observed > 0:
                    weight = p_joint_expected / p_joint_observed
                else:
                    weight = 1.0

                self.weights[(group, label)] = weight

    def transform(self, X, y=None, sensitive=None):
        """Return per-sample weights alongside ``X``.

        Args:
            X: Feature matrix, returned unchanged.
            y: Labels of the samples in ``X``.
            sensitive: Group membership of the samples in ``X``.

        Returns:
            ``X`` alone when ``y`` or ``sensitive`` is ``None``;
            otherwise the tuple ``(X, sample_weights)``. Samples whose
            (group, label) pair was not seen during ``fit`` get 1.0.

        Raises:
            RuntimeError: If weights are requested before a successful
                reweighting ``fit``.
        """
        if sensitive is None or y is None:
            return X

        if self.weights is None:
            raise RuntimeError(
                "BiasmitigationPreprocessor has no fitted weights; "
                "call fit() with method='reweighting' before transform()."
            )

        # Positional arrays: indexing a pandas Series with y[i] is
        # label-based and breaks once the rows have been shuffled.
        labels = np.asarray(y)
        groups = np.asarray(sensitive)

        sample_weights = np.ones(len(X))
        for (group, label), weight in self.weights.items():
            sample_weights[(groups == group) & (labels == label)] = weight

        return X, sample_weights

class InProcessingMitigation:
    """Fairness-aware loss terms for use while training a model.

    Args:
        lambda_fairness: Multiplier applied to the fairness penalty.
    """

    def __init__(self, lambda_fairness: float = 1.0):
        self.lambda_fairness = lambda_fairness

    def fairness_loss(
        self,
        y_pred: np.ndarray,
        sensitive: np.ndarray
    ) -> float:
        """Return the scaled demographic-parity gap of the predictions.

        The gap is the largest per-group mean prediction minus the
        smallest, multiplied by ``lambda_fairness``.
        """
        mean_by_group = [
            y_pred[sensitive == member].mean()
            for member in np.unique(sensitive)
        ]
        spread = max(mean_by_group) - min(mean_by_group)
        return self.lambda_fairness * spread

    def combined_loss(
        self,
        y_true: np.ndarray,
        y_pred: np.ndarray,
        sensitive: np.ndarray,
        base_loss_fn
    ) -> float:
        """Return ``base_loss_fn(y_true, y_pred)`` plus the fairness penalty."""
        task_loss = base_loss_fn(y_true, y_pred)
        return task_loss + self.fairness_loss(y_pred, sensitive)

class PostProcessingMitigation:
    """Post-processing techniques for bias mitigation.

    Learns one decision threshold per group so that every group ends up
    with (approximately) the same positive-prediction rate.
    """

    def __init__(self):
        # Maps group value -> probability threshold for a positive prediction.
        self.thresholds = {}

    def calibrate_thresholds(
        self,
        y_prob: np.ndarray,
        y_true: np.ndarray,
        sensitive: np.ndarray,
        target_rate: float = None
    ):
        """Find group-specific thresholds for equal positive rates.

        Args:
            y_prob: Predicted probability of the positive class.
            y_true: Ground-truth labels; only used to derive the default
                ``target_rate``.
            sensitive: Group membership of each sample.
            target_rate: Desired fraction of positive predictions per
                group. Defaults to the mean of ``y_true``.

        For a group of size n, the threshold is the
        ``int(target_rate * n)``-th highest probability, so that
        ``prob >= threshold`` selects exactly that many samples (more
        only when probabilities tie at the threshold).
        """
        y_prob = np.asarray(y_prob)
        sensitive = np.asarray(sensitive)

        if target_rate is None:
            # Use overall positive rate as target
            target_rate = np.asarray(y_true).mean()

        for group in np.unique(sensitive):
            probs = y_prob[sensitive == group]

            # Find threshold that achieves target rate
            sorted_probs = np.sort(probs)[::-1]
            target_count = min(int(target_rate * len(probs)), len(probs))

            if target_count <= 0:
                # No sample should be predicted positive for this group.
                threshold = np.inf
            else:
                # Predictions use ">=", so the k-th highest probability
                # (index k - 1) admits exactly k samples.
                threshold = sorted_probs[target_count - 1]

            self.thresholds[group] = threshold

    def apply_thresholds(
        self,
        y_prob: np.ndarray,
        sensitive: np.ndarray
    ) -> np.ndarray:
        """Apply group-specific thresholds.

        Returns:
            Array of 0/1 predictions. Samples whose group has no
            calibrated threshold are left at 0.
        """
        y_prob = np.asarray(y_prob)
        sensitive = np.asarray(sensitive)
        y_pred = np.zeros(len(y_prob))

        for group, threshold in self.thresholds.items():
            mask = sensitive == group
            y_pred[mask] = (y_prob[mask] >= threshold).astype(int)

        return y_pred

Fairness Dashboard

class FairnessDashboard:
    """Dashboard for fairness monitoring.

    Keeps an in-memory, append-only history of fairness evaluations and
    derives alerts and trends from it.
    """

    # Ratio limits used when the caller supplies none, or only some.
    _DEFAULT_ALERT_THRESHOLDS = {
        "demographic_parity_ratio": 0.8,
        "equal_opportunity_ratio": 0.8
    }

    # Metrics that expose a single "ratio" value and can be alerted on
    # or trended.
    _RATIO_METRICS = ("demographic_parity", "equal_opportunity")

    def __init__(self):
        # One dict per evaluation: {"timestamp", "model_id", "metrics"}.
        self.history = []

    def log_evaluation(
        self,
        model_id: str,
        y_true: np.ndarray,
        y_pred: np.ndarray,
        y_prob: np.ndarray,
        sensitive: np.ndarray
    ):
        """Log a fairness evaluation.

        Computes all metrics via ``FairnessMetrics`` and appends them to
        ``history`` together with ``model_id`` and the current local
        time as an ISO-8601 string.
        """
        # Imported here because the surrounding file never imports
        # datetime at module level.
        from datetime import datetime

        metrics = FairnessMetrics(y_true, y_pred, y_prob, sensitive)
        all_metrics = metrics.get_all_metrics()

        self.history.append({
            "timestamp": datetime.now().isoformat(),
            "model_id": model_id,
            "metrics": all_metrics
        })

    def check_fairness_alerts(self, thresholds: Dict = None) -> List[Dict]:
        """Check the latest evaluation for fairness violations.

        Args:
            thresholds: Optional overrides for the
                ``"demographic_parity_ratio"`` and
                ``"equal_opportunity_ratio"`` limits. Missing keys fall
                back to 0.8.

        Returns:
            One alert dict (``type``, ``severity``, ``value``,
            ``threshold``) per metric whose ratio is below its limit;
            an empty list when nothing has been logged yet.
        """
        # Merge rather than replace so a partial override cannot cause
        # a KeyError for the metric it leaves out.
        limits = {**self._DEFAULT_ALERT_THRESHOLDS, **(thresholds or {})}

        alerts = []
        if not self.history:
            return alerts

        latest = self.history[-1]["metrics"]

        for metric in self._RATIO_METRICS:
            ratio = latest[metric]["ratio"]
            limit = limits[f"{metric}_ratio"]
            if ratio < limit:
                alerts.append({
                    "type": metric,
                    "severity": "high",
                    "value": ratio,
                    "threshold": limit
                })

        return alerts

    def get_trend(self, metric: str, lookback: int = 10) -> List[float]:
        """Get metric trend over time.

        Args:
            metric: ``"demographic_parity"`` or ``"equal_opportunity"``;
                any other name yields an empty list.
            lookback: Maximum number of most recent evaluations to
                include. Values <= 0 yield an empty list.

        Returns:
            The metric's ratio for each selected evaluation, oldest first.
        """
        # history[-0:] would be the whole list, so handle 0 explicitly.
        if lookback <= 0 or metric not in self._RATIO_METRICS:
            return []

        return [
            entry["metrics"][metric]["ratio"]
            for entry in self.history[-lookback:]
        ]

Integration Example

from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier

def train_fair_model(X, y, sensitive, mitigation="reweighting"):
    """Train a random forest with optional fairness mitigation.

    Args:
        X: Feature matrix.
        y: Binary labels (0/1).
        sensitive: Group membership of each sample.
        mitigation: ``"reweighting"`` trains with per-sample weights
            from ``BiasmitigationPreprocessor``;
            ``"threshold_adjustment"`` trains normally and then applies
            group-specific thresholds from ``PostProcessingMitigation``;
            any other value trains without mitigation.

    Returns:
        Tuple ``(model, report)``: the fitted classifier and the
        fairness report computed on the held-out 20% test split.
    """
    # Positional arrays keep the per-sample indexing done by the
    # mitigation helpers independent of any pandas index, which
    # train_test_split shuffles.
    y = np.asarray(y)
    sensitive = np.asarray(sensitive)

    # Split data
    X_train, X_test, y_train, y_test, s_train, s_test = train_test_split(
        X, y, sensitive, test_size=0.2, random_state=42
    )

    # Set only by mitigations that change the final decisions.
    y_pred = None

    if mitigation == "reweighting":
        # Pre-processing mitigation
        preprocessor = BiasmitigationPreprocessor(method="reweighting")
        preprocessor.fit(X_train, y_train, s_train)
        X_train, sample_weights = preprocessor.transform(X_train, y_train, s_train)

        model = RandomForestClassifier(random_state=42)
        model.fit(X_train, y_train, sample_weight=sample_weights)

    elif mitigation == "threshold_adjustment":
        # Post-processing mitigation
        model = RandomForestClassifier(random_state=42)
        model.fit(X_train, y_train)

        y_prob_train = model.predict_proba(X_train)[:, 1]

        post_processor = PostProcessingMitigation()
        post_processor.calibrate_thresholds(y_prob_train, y_train, s_train)

        y_prob_test = model.predict_proba(X_test)[:, 1]
        y_pred = post_processor.apply_thresholds(y_prob_test, s_test)

    else:
        model = RandomForestClassifier(random_state=42)
        model.fit(X_train, y_train)

    # Evaluate fairness
    y_prob = model.predict_proba(X_test)[:, 1]
    if y_pred is None:
        # Keep the threshold-adjusted predictions when they exist;
        # recomputing here would report on the unmitigated decisions.
        y_pred = model.predict(X_test)

    fairness_metrics = FairnessMetrics(y_test, y_pred, y_prob, s_test)
    report = fairness_metrics.generate_report()

    return model, report

Best Practices

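  1. Measure multiple metrics: No single metric captures every notion of fairness
  2. Choose appropriate metrics: Pick the ones that match your use case and the harms you want to avoid
  3. Consider trade-offs: Improving fairness can cost accuracy, and fairness metrics can conflict with one another
  4. Monitor continuously: Bias can emerge over time as data and usage change
  5. Document decisions: Record which fairness criteria you chose and why
  6. Involve stakeholders: Define fairness criteria together with the people affected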

Resources

Michael John Peña

Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.