
DataOps Maturity: Building Reliable Data Operations

DataOps maturity determines how reliably and efficiently organizations can deliver data. Let’s examine how to build mature DataOps practices.

DataOps Maturity Model

Level 1: Manual             Level 2: Automated          Level 3: Orchestrated
───────────────             ──────────────────          ─────────────────────
Manual deployments          CI/CD pipelines             Full GitOps
Ad-hoc testing              Automated testing           Continuous testing
Reactive monitoring         Basic monitoring            Observability
Manual quality checks       Automated quality           Quality gates
Individual practices        Team standards              Organization standards

Level 4: Intelligent        Level 5: Autonomous
────────────────────        ───────────────────
AI-assisted operations      Self-healing systems
Predictive quality          Autonomous optimization
Smart alerting              Auto-remediation
ML-driven insights          Continuous improvement
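
To make the model actionable, you can encode it directly and map assessment scores back to a level name and a next focus area. A minimal sketch; the next-step hints are one reasonable reading of the table above, not a formal standard:

# Illustrative encoding of the maturity model (next-step hints are suggestions)
MATURITY_MODEL = {
    1: ("Manual", "introduce version control and basic CI/CD"),
    2: ("Automated", "standardize testing and monitoring across teams"),
    3: ("Orchestrated", "add predictive quality and smart alerting"),
    4: ("Intelligent", "work toward self-healing and auto-remediation"),
    5: ("Autonomous", "sustain continuous improvement"),
}

def describe_level(level: int) -> str:
    name, next_focus = MATURITY_MODEL[level]
    return f"Level {level} ({name}); next focus: {next_focus}"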

Core DataOps Practices

Practice 1: Version Control Everything

# Project structure for version-controlled data platform
"""
data-platform/
├── .github/
│   └── workflows/
│       ├── ci.yml                 # Continuous integration
│       ├── cd-dev.yml            # Deploy to dev
│       └── cd-prod.yml           # Deploy to production
├── src/
│   ├── notebooks/                 # Fabric notebooks
│   ├── pipelines/                 # Data Factory/Pipeline definitions
│   ├── dataflows/                 # Dataflow definitions
│   └── semantic-models/           # Power BI semantic models
├── tests/
│   ├── unit/                      # Unit tests
│   ├── integration/               # Integration tests
│   └── data-quality/              # Data quality tests
├── config/
│   ├── dev.yml                    # Dev environment config
│   ├── staging.yml                # Staging config
│   └── prod.yml                   # Production config
└── docs/
    ├── runbooks/                  # Operational runbooks
    └── architecture/              # Architecture docs
"""

# Git branching strategy
branching_strategy = {
    "main": "Production-ready code",
    "develop": "Integration branch",
    "feature/*": "New features",
    "fix/*": "Bug fixes",
    "release/*": "Release candidates"
}
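
With per-environment configs under config/, pipeline code stays identical across environments and loads its settings at runtime. A minimal loader sketch, assuming PyYAML and a DATA_ENV environment variable (both are conventions, not requirements):

# Hypothetical loader for the version-controlled config/ directory
import os
from typing import Optional

import yaml  # PyYAML

def load_config(environment: Optional[str] = None) -> dict:
    """Load dev/staging/prod settings committed under config/."""
    env = environment or os.getenv("DATA_ENV", "dev")
    with open(f"config/{env}.yml") as f:
        return yaml.safe_load(f)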

Practice 2: CI/CD for Data

# .github/workflows/ci.yml
name: Data Platform CI

on:
  pull_request:
    branches: [main, develop]

jobs:
  lint:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Lint Python
        run: |
          pip install ruff
          ruff check src/

      - name: Lint SQL
        run: |
          pip install sqlfluff
          sqlfluff lint src/ --dialect tsql  # lints recursively; pick the dialect that matches your SQL

  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Run unit tests
        run: |
          pip install pytest
          pytest tests/unit/

      - name: Run data quality tests
        run: |
          pip install great_expectations
          great_expectations checkpoint run data_quality

  validate:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Validate pipeline definitions
        run: python scripts/validate_pipelines.py

      - name: Validate semantic models
        run: python scripts/validate_semantic_models.py
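
The validate job above calls two repo scripts. A minimal sketch of scripts/validate_pipelines.py, assuming pipeline definitions are checked in as JSON; the required keys are illustrative and depend on your definition format:

# scripts/validate_pipelines.py -- a minimal sketch
import json
import sys
from pathlib import Path

REQUIRED_KEYS = {"name", "activities"}  # assumption: Data Factory-style JSON

def main() -> int:
    errors = []
    for path in Path("src/pipelines").glob("*.json"):
        try:
            definition = json.loads(path.read_text())
        except json.JSONDecodeError as e:
            errors.append(f"{path}: invalid JSON ({e})")
            continue
        if not isinstance(definition, dict):
            errors.append(f"{path}: expected a JSON object")
            continue
        missing = REQUIRED_KEYS - definition.keys()
        if missing:
            errors.append(f"{path}: missing keys {sorted(missing)}")
    for error in errors:
        print(error)
    return 1 if errors else 0

if __name__ == "__main__":
    sys.exit(main())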

# .github/workflows/cd-prod.yml
name: Deploy to Production

on:
  push:
    branches: [main]

jobs:
  deploy:
    runs-on: ubuntu-latest
    environment: production

    steps:
      - uses: actions/checkout@v4

      - name: Azure Login
        uses: azure/login@v1
        with:
          creds: ${{ secrets.AZURE_CREDENTIALS }}

      - name: Deploy to Fabric
        run: |
          python scripts/deploy_fabric.py \
            --workspace ${{ vars.FABRIC_WORKSPACE }} \
            --environment production

      - name: Run smoke tests
        run: pytest tests/smoke/

      - name: Notify team
        if: success()
        run: |
          curl -X POST ${{ secrets.TEAMS_WEBHOOK }} \
            -H 'Content-Type: application/json' \
            -d '{"text": "Production deployment successful"}'

Practice 3: Automated Testing

from datetime import datetime

import pytest
# SparkDFDataset is from the legacy (pre-1.0) Great Expectations API
from great_expectations.dataset import SparkDFDataset

class TestDataQuality:
    """Data quality tests using Great Expectations."""

    def test_customer_data_completeness(self, spark_session):
        """Test customer data has required fields."""
        df = spark_session.read.table("gold.customers")
        ge_df = SparkDFDataset(df)

        # Required fields should not be null
        result = ge_df.expect_column_values_to_not_be_null("customer_id")
        assert result.success

        result = ge_df.expect_column_values_to_not_be_null("email")
        assert result.success

    def test_customer_data_validity(self, spark_session):
        """Test customer data validity."""
        df = spark_session.read.table("gold.customers")
        ge_df = SparkDFDataset(df)

        # Email should match pattern
        result = ge_df.expect_column_values_to_match_regex(
            "email",
            r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$"
        )
        assert result.success

    def test_sales_data_freshness(self, spark_session):
        """Test sales data is fresh."""
        df = spark_session.read.table("gold.sales")

        max_date = df.agg({"sale_date": "max"}).collect()[0][0]
        days_old = (datetime.now() - max_date).days

        assert days_old <= 1, f"Sales data is {days_old} days old"

    def test_referential_integrity(self, spark_session):
        """Test foreign key relationships."""
        sales = spark_session.read.table("gold.sales")
        customers = spark_session.read.table("gold.customers")

        orphan_count = sales.join(
            customers,
            sales.customer_id == customers.customer_id,
            "left_anti"
        ).count()

        assert orphan_count == 0, f"{orphan_count} orphan sales records"

class TestPipelineLogic:
    """Unit tests for pipeline transformation logic."""

    def test_revenue_calculation(self):
        """Test revenue calculation logic."""
        from src.transforms import calculate_revenue

        input_data = [
            {"quantity": 10, "unit_price": 5.0, "discount": 0.1},
            {"quantity": 5, "unit_price": 10.0, "discount": 0.0}
        ]

        result = calculate_revenue(input_data)

        assert result[0]["revenue"] == 45.0  # 10 * 5 * 0.9
        assert result[1]["revenue"] == 50.0  # 5 * 10 * 1.0

Practice 4: Observability

from dataclasses import dataclass
from datetime import datetime
from typing import Optional
import logging

@dataclass
class PipelineMetrics:
    pipeline_name: str
    run_id: str
    start_time: datetime
    end_time: Optional[datetime]
    status: str
    rows_read: int
    rows_written: int
    errors: int

class DataObservability:
    """Comprehensive data observability."""

    def __init__(self):
        self.logger = logging.getLogger("dataops")
        # Placeholder clients; minimal stand-ins are sketched after this class
        self.metrics_client = MetricsClient()
        self.alerter = AlertService()

    def track_pipeline_run(self, metrics: PipelineMetrics):
        """Track pipeline execution metrics."""

        # Log execution
        self.logger.info(
            f"Pipeline {metrics.pipeline_name} completed",
            extra={
                "run_id": metrics.run_id,
                "duration_seconds": (metrics.end_time - metrics.start_time).seconds,
                "rows_processed": metrics.rows_written,
                "status": metrics.status
            }
        )

        # Send metrics
        self.metrics_client.gauge(
            f"pipeline.{metrics.pipeline_name}.duration",
            (metrics.end_time - metrics.start_time).total_seconds()
        )
        self.metrics_client.gauge(
            f"pipeline.{metrics.pipeline_name}.rows",
            metrics.rows_written
        )

        # Alert on failure
        if metrics.status == "failed":
            self.alerter.send(
                severity="high",
                title=f"Pipeline {metrics.pipeline_name} failed",
                details={"run_id": metrics.run_id}
            )

    def track_data_quality(self, table: str, metrics: dict):
        """Track data quality metrics."""

        # Send metrics
        for metric_name, value in metrics.items():
            self.metrics_client.gauge(
                f"data_quality.{table}.{metric_name}",
                value
            )

        # Alert on quality issues
        if metrics.get("null_percentage", 0) > 5:
            self.alerter.send(
                severity="medium",
                title=f"High null percentage in {table}",
                details=metrics
            )

    def create_dashboard(self, workspace: str):
        """Create observability dashboard."""
        dashboard_config = {
            "panels": [
                {
                    "title": "Pipeline Success Rate",
                    "query": "success_count / total_count",
                    "visualization": "gauge"
                },
                {
                    "title": "Data Freshness",
                    "query": "max(last_update_time) by table",
                    "visualization": "table"
                },
                {
                    "title": "Pipeline Duration Trend",
                    "query": "avg(duration) by pipeline over time",
                    "visualization": "line_chart"
                },
                {
                    "title": "Data Quality Score",
                    "query": "avg(quality_score) by table",
                    "visualization": "bar_chart"
                }
            ]
        }
        return dashboard_config
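
MetricsClient and AlertService are placeholders for whatever backs your monitoring stack (Azure Monitor, Datadog, Prometheus, and so on). Minimal stand-ins that make the class above runnable:

# Illustrative stand-ins -- replace with your real metrics/alerting backends
class MetricsClient:
    def gauge(self, name: str, value: float) -> None:
        print(f"METRIC {name}={value}")

class AlertService:
    def send(self, severity: str, title: str, details: dict) -> None:
        print(f"ALERT [{severity}] {title}: {details}")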

Practice 5: Incident Management

class IncidentManagement:
    """Structured incident management for data issues."""

    severity_levels = {
        "critical": {
            "response_time": "15 minutes",
            "examples": ["Production pipeline down", "Data corruption"],
            "notification": ["PagerDuty", "Phone"]
        },
        "high": {
            "response_time": "1 hour",
            "examples": ["Significant data delay", "Quality below threshold"],
            "notification": ["Slack", "Email"]
        },
        "medium": {
            "response_time": "4 hours",
            "examples": ["Non-critical pipeline failure", "Minor quality issues"],
            "notification": ["Slack"]
        },
        "low": {
            "response_time": "1 business day",
            "examples": ["Performance degradation", "Warning thresholds"],
            "notification": ["Email"]
        }
    }

    async def handle_incident(self, incident: dict):
        """Handle data incident."""

        # 1. Classify severity
        severity = self.classify_severity(incident)

        # 2. Create incident ticket
        ticket = await self.create_ticket(incident, severity)

        # 3. Notify appropriate team
        await self.notify(severity, ticket)

        # 4. Begin diagnosis
        diagnosis = await self.auto_diagnose(incident)

        # 5. Execute runbook if available
        if runbook := self.get_runbook(incident["type"]):
            await self.execute_runbook(runbook, incident)

        return ticket
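
classify_severity is left abstract above. One simple approach maps incident attributes onto the severity table; the matching rules here are illustrative:

    # Inside IncidentManagement -- illustrative classification rules
    def classify_severity(self, incident: dict) -> str:
        if incident.get("data_corruption") or incident.get("production_down"):
            return "critical"
        if incident.get("delay_minutes", 0) > 60:
            return "high"
        if incident.get("pipeline_failure"):
            return "medium"
        return "low"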

Maturity Assessment

def assess_dataops_maturity(organization: dict) -> dict:
    """Assess DataOps maturity level."""

    dimensions = {
        "version_control": assess_version_control(organization),
        "ci_cd": assess_ci_cd(organization),
        "testing": assess_testing(organization),
        "monitoring": assess_monitoring(organization),
        "incident_management": assess_incidents(organization),
        "automation": assess_automation(organization)
    }

    overall_score = sum(dimensions.values()) / len(dimensions)

    if overall_score >= 4.5:
        level = 5
    elif overall_score >= 3.5:
        level = 4
    elif overall_score >= 2.5:
        level = 3
    elif overall_score >= 1.5:
        level = 2
    else:
        level = 1

    return {
        "level": level,
        "score": overall_score,
        "dimensions": dimensions,
        "recommendations": generate_recommendations(dimensions)
    }
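
Each assess_* helper scores one dimension from 1 to 5 so the average maps cleanly onto the maturity levels. A sketch of one scorer; the criteria are illustrative:

# Illustrative scorer for the ci_cd dimension (criteria are assumptions)
def assess_ci_cd(organization: dict) -> int:
    if organization.get("gitops_everywhere"):
        return 5
    if organization.get("automated_prod_deploys"):
        return 4
    if organization.get("ci_on_pull_requests"):
        return 3
    if organization.get("some_scripted_deploys"):
        return 2
    return 1  # fully manual deployments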

DataOps maturity directly impacts data reliability and team productivity. Invest in these practices to build robust, reliable data operations.
