Back to Blog
7 min read

Testing Data Pipelines with AI: From Unit Tests to Integration Testing

Testing data pipelines is challenging - the data is complex, edge cases are numerous, and validation requires domain knowledge. AI can help generate test cases, create synthetic data, and validate outputs. Let’s explore AI-assisted pipeline testing.

The Testing Pyramid for Data Pipelines

                    ┌─────────┐
                    │ E2E     │  ← Full pipeline tests
                   ┌┴─────────┴┐
                   │Integration│  ← Multi-component tests
                  ┌┴───────────┴┐
                  │  Unit Tests  │  ← Single function tests
                 ┌┴─────────────┴┐
                 │ Data Contracts │  ← Schema validation
                └─────────────────┘

AI-Generated Test Cases

from azure.ai.foundry import AIFoundryClient

class PipelineTestGenerator:
    def __init__(self, llm_client: AIFoundryClient):
        self.llm = llm_client

    async def generate_unit_tests(self, function_code: str, function_name: str) -> str:
        """Generate unit tests for a data transformation function."""

        response = await self.llm.chat.complete_async(
            deployment="gpt-4o",
            messages=[{
                "role": "system",
                "content": """You are a data engineering test expert.
                Generate comprehensive pytest tests including:
                - Happy path tests
                - Edge cases (nulls, empty data, type mismatches)
                - Boundary conditions
                - Error handling tests"""
            }, {
                "role": "user",
                "content": f"""Generate pytest tests for this function:

                ```python
                {function_code}
                ```

                Include:
                1. Test fixtures with sample data
                2. Parametrized tests for multiple scenarios
                3. Tests for error conditions
                4. Assertions for both output data and side effects"""
            }]
        )

        return response.choices[0].message.content

    async def generate_integration_tests(self, pipeline_spec: dict) -> str:
        """Generate integration tests for a data pipeline."""

        response = await self.llm.chat.complete_async(
            deployment="gpt-4o",
            messages=[{
                "role": "user",
                "content": f"""Generate integration tests for this pipeline:

                Pipeline specification:
                {json.dumps(pipeline_spec, indent=2)}

                Generate pytest tests that:
                1. Test data flow between components
                2. Verify transformations are applied correctly
                3. Check data integrity across the pipeline
                4. Test with both valid and invalid inputs
                5. Include setup and teardown for test resources

                Use mocks where appropriate for external dependencies."""
            }]
        )

        return response.choices[0].message.content

Generated Test Example

# Generated by AI, refined by human

import pytest
import pandas as pd
from pyspark.sql import SparkSession
from my_pipeline.transformations import clean_customer_data

@pytest.fixture
def spark():
    """Create a local Spark session for testing."""
    return SparkSession.builder \
        .master("local[1]") \
        .appName("test") \
        .getOrCreate()

@pytest.fixture
def sample_customer_data(spark):
    """Sample customer data for testing."""
    data = [
        (1, "John Doe", "john@email.com", "2024-01-15", 1000.50),
        (2, "Jane Smith", "jane@email.com", "2024-01-16", 2500.00),
        (3, None, "invalid-email", "2024-01-17", -100),  # Edge case: null name, invalid email, negative amount
        (4, "Bob Wilson", None, "invalid-date", None),  # Edge case: null email, invalid date, null amount
    ]
    return spark.createDataFrame(data, ["id", "name", "email", "signup_date", "lifetime_value"])

class TestCleanCustomerData:
    """Tests for the clean_customer_data transformation."""

    def test_removes_null_names(self, spark, sample_customer_data):
        """Records with null names should be filtered out."""
        result = clean_customer_data(sample_customer_data)
        assert result.filter("name IS NULL").count() == 0

    def test_validates_email_format(self, spark, sample_customer_data):
        """Invalid email formats should be flagged or fixed."""
        result = clean_customer_data(sample_customer_data)
        invalid_emails = result.filter("email NOT LIKE '%@%.%'").count()
        assert invalid_emails == 0 or result.filter("email_valid = False").count() > 0

    def test_handles_negative_values(self, spark, sample_customer_data):
        """Negative lifetime values should be set to 0 or flagged."""
        result = clean_customer_data(sample_customer_data)
        negative_values = result.filter("lifetime_value < 0").count()
        assert negative_values == 0

    def test_date_parsing(self, spark, sample_customer_data):
        """Invalid dates should be handled gracefully."""
        result = clean_customer_data(sample_customer_data)
        # Should not throw an exception
        assert result.count() > 0

    @pytest.mark.parametrize("input_name,expected_output", [
        ("  John Doe  ", "John Doe"),  # Trim whitespace
        ("JOHN DOE", "John Doe"),      # Title case
        ("john doe", "John Doe"),      # Title case
    ])
    def test_name_normalization(self, spark, input_name, expected_output):
        """Names should be normalized to title case and trimmed."""
        df = spark.createDataFrame([(1, input_name, "test@test.com", "2024-01-01", 100.0)],
                                   ["id", "name", "email", "signup_date", "lifetime_value"])
        result = clean_customer_data(df)
        actual_name = result.collect()[0]["name"]
        assert actual_name == expected_output

    def test_empty_dataframe(self, spark):
        """Should handle empty input gracefully."""
        empty_df = spark.createDataFrame([], "id INT, name STRING, email STRING, signup_date STRING, lifetime_value DOUBLE")
        result = clean_customer_data(empty_df)
        assert result.count() == 0

    def test_output_schema(self, spark, sample_customer_data):
        """Output schema should match expected structure."""
        result = clean_customer_data(sample_customer_data)
        expected_columns = {"id", "name", "email", "signup_date", "lifetime_value", "processed_at"}
        assert set(result.columns) == expected_columns or expected_columns.issubset(set(result.columns))

Synthetic Test Data Generation

class TestDataGenerator:
    def __init__(self, llm_client: AIFoundryClient):
        self.llm = llm_client

    async def generate_synthetic_data(self, schema: dict, num_rows: int, context: str) -> pd.DataFrame:
        """Generate synthetic test data that's realistic."""

        response = await self.llm.chat.complete_async(
            deployment="gpt-4o",
            messages=[{
                "role": "user",
                "content": f"""Generate {num_rows} rows of realistic synthetic test data.

                Schema:
                {json.dumps(schema, indent=2)}

                Context: {context}

                Requirements:
                - Data should be realistic and follow business rules
                - Include some edge cases (nulls, boundary values)
                - Mix of typical and unusual values
                - Maintain referential integrity where applicable

                Return as JSON array of objects."""
            }]
        )

        data = json.loads(response.choices[0].message.content)
        return pd.DataFrame(data)

    async def generate_edge_cases(self, schema: dict, context: str) -> list[dict]:
        """Generate specific edge case test data."""

        response = await self.llm.chat.complete_async(
            deployment="gpt-4o",
            messages=[{
                "role": "user",
                "content": f"""Generate edge case test data for this schema:

                Schema:
                {json.dumps(schema, indent=2)}

                Context: {context}

                Generate test cases for:
                1. Null values in each nullable column
                2. Boundary values (min/max)
                3. Empty strings
                4. Special characters
                5. Unicode characters
                6. Extremely long values
                7. Type edge cases (0, -1, MAX_INT, etc.)
                8. Date edge cases (leap years, timezone boundaries)

                Return JSON array where each object is a test case with:
                {{"description": "what this tests", "data": {{...row data...}}}}"""
            }]
        )

        return json.loads(response.choices[0].message.content)

Contract Testing

class DataContractTester:
    def __init__(self, llm_client: AIFoundryClient):
        self.llm = llm_client

    async def generate_contract(self, sample_data: pd.DataFrame, context: str) -> dict:
        """Generate a data contract from sample data."""

        stats = sample_data.describe(include='all').to_dict()

        response = await self.llm.chat.complete_async(
            deployment="gpt-4o",
            messages=[{
                "role": "user",
                "content": f"""Generate a data contract from this sample:

                Statistics:
                {json.dumps(stats, indent=2, default=str)}

                Sample (first 5 rows):
                {sample_data.head().to_dict(orient='records')}

                Context: {context}

                Return a data contract JSON:
                {{
                    "schema": [
                        {{"column": "name", "type": "string|int|float|datetime|bool", "nullable": true|false, "description": "..."}}
                    ],
                    "constraints": [
                        {{"type": "not_null|unique|range|pattern|enum", "column": "col", "parameters": {{}}}}
                    ],
                    "freshness": {{"max_age_hours": 24}},
                    "volume": {{"min_rows": 1000, "max_rows": 1000000}}
                }}"""
            }]
        )

        return json.loads(response.choices[0].message.content)

    def validate_contract(self, df: pd.DataFrame, contract: dict) -> dict:
        """Validate data against a contract."""

        violations = []

        # Check schema
        for col_spec in contract["schema"]:
            col = col_spec["column"]

            if col not in df.columns:
                violations.append({"type": "missing_column", "column": col})
                continue

            if not col_spec["nullable"] and df[col].isnull().any():
                violations.append({
                    "type": "null_violation",
                    "column": col,
                    "null_count": int(df[col].isnull().sum())
                })

        # Check constraints
        for constraint in contract["constraints"]:
            violation = self._check_constraint(df, constraint)
            if violation:
                violations.append(violation)

        # Check volume
        if "volume" in contract:
            if len(df) < contract["volume"].get("min_rows", 0):
                violations.append({
                    "type": "volume_violation",
                    "expected_min": contract["volume"]["min_rows"],
                    "actual": len(df)
                })

        return {
            "valid": len(violations) == 0,
            "violations": violations
        }

Output Validation with AI

class OutputValidator:
    def __init__(self, llm_client: AIFoundryClient):
        self.llm = llm_client

    async def validate_transformation_output(
        self,
        input_data: pd.DataFrame,
        output_data: pd.DataFrame,
        transformation_description: str
    ) -> dict:
        """Validate that transformation output is correct."""

        response = await self.llm.chat.complete_async(
            deployment="gpt-4o",
            messages=[{
                "role": "user",
                "content": f"""Validate this data transformation output:

                Transformation: {transformation_description}

                Input (sample):
                {input_data.head(10).to_markdown()}

                Output (sample):
                {output_data.head(10).to_markdown()}

                Check:
                1. Is the transformation applied correctly?
                2. Are there any data integrity issues?
                3. Are all expected columns present?
                4. Do the values look reasonable?

                Return JSON:
                {{
                    "valid": true|false,
                    "issues": [
                        {{"description": "issue", "severity": "error|warning", "example": "..."}}
                    ],
                    "suggestions": ["improvements"]
                }}"""
            }]
        )

        return json.loads(response.choices[0].message.content)

    async def compare_expected_vs_actual(
        self,
        expected: pd.DataFrame,
        actual: pd.DataFrame,
        context: str
    ) -> dict:
        """Compare expected and actual outputs with intelligent diff."""

        # Find differences
        expected_set = set(expected.to_records(index=False).tolist())
        actual_set = set(actual.to_records(index=False).tolist())

        missing = expected_set - actual_set
        unexpected = actual_set - expected_set

        if not missing and not unexpected:
            return {"match": True}

        # Use AI to explain differences
        response = await self.llm.chat.complete_async(
            deployment="gpt-4o",
            messages=[{
                "role": "user",
                "content": f"""Analyze the differences between expected and actual data:

                Context: {context}

                Expected columns: {list(expected.columns)}
                Actual columns: {list(actual.columns)}

                Missing from actual (sample): {list(missing)[:5]}
                Unexpected in actual (sample): {list(unexpected)[:5]}

                Expected row count: {len(expected)}
                Actual row count: {len(actual)}

                Explain:
                1. What's different?
                2. Why might this have happened?
                3. Is this a bug or expected behavior?"""
            }]
        )

        return {
            "match": False,
            "missing_count": len(missing),
            "unexpected_count": len(unexpected),
            "analysis": response.choices[0].message.content
        }

Best Practices

  1. Generate tests early: Use AI to create tests before implementation
  2. Include edge cases: AI is good at thinking of unusual scenarios
  3. Validate with domain knowledge: AI can miss business-specific rules
  4. Version test data: Keep synthetic data reproducible
  5. Continuous testing: Run tests on every pipeline change

AI-assisted testing catches more bugs earlier. Combine generated tests with manual review to ensure comprehensive coverage.

Michael John Peña

Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.