Skip to content
Back to Blog
1 min read

dbt with AI: Accelerating Data Transformation Development

I wrote “dbt with AI: Accelerating Data Transformation Development” to share practical, production-minded guidance on this topic.

AI-Powered dbt Model Generation

From Requirements to Models

from azure.ai.foundry import AIFoundryClient

class DBTModelGenerator:
    """Generate dbt models with an LLM and write them into a dbt project.

    The LLM client only needs to expose ``chat.complete_async(...)`` returning
    an object with ``choices[0].message.content``; that is the only part of
    its API this class touches.
    """

    def __init__(self, llm_client: "AIFoundryClient", project_path: str):
        """Store the LLM client and the root directory of the dbt project."""
        self.llm = llm_client
        self.project_path = project_path

    async def generate_model(self, requirements: str, source_tables: list[dict]) -> dict:
        """Generate a dbt model from requirements.

        Args:
            requirements: Free-text description of what the model should do.
            source_tables: Table descriptions in the shape accepted by
                ``_format_sources``.

        Returns:
            The LLM reply parsed as JSON. The prompt asks for the keys
            ``model_name``, ``description``, ``sql``, ``schema_yml``,
            ``tests`` and ``dependencies``; the reply is not validated here.

        Raises:
            json.JSONDecodeError: If the reply is not valid JSON.
        """
        # Local import (same style as ``os`` in ``save_model``) so that
        # ``json`` is guaranteed to be in scope when the reply is parsed.
        import json

        # Format source information
        sources_context = self._format_sources(source_tables)

        response = await self.llm.chat.complete_async(
            deployment="gpt-4o",
            messages=[{
                "role": "system",
                "content": """You are a dbt expert. Generate dbt models following best practices:
                - Use CTEs for readability
                - Include appropriate tests
                - Write comprehensive documentation
                - Follow naming conventions (stg_, int_, fct_, dim_)
                - Use Jinja for reusability"""
            }, {
                "role": "user",
                "content": f"""Generate a dbt model based on these requirements:

                Requirements:
                {requirements}

                Available source tables:
                {sources_context}

                Return JSON with:
                {{
                    "model_name": "name following conventions",
                    "description": "what this model does",
                    "sql": "the dbt SQL model",
                    "schema_yml": "the schema.yml content for this model",
                    "tests": ["list of recommended tests"],
                    "dependencies": ["upstream models needed"]
                }}"""
            }]
        )

        return json.loads(response.choices[0].message.content)

    def _format_sources(self, sources: list[dict]) -> str:
        """Render source tables as plain text for the prompt.

        Each source must have ``name`` and ``columns``; each column must have
        ``name`` and ``type``, while ``description`` is optional and defaults
        to an empty string. Every table is followed by a blank line.
        """
        lines = []
        for source in sources:
            lines.append(f"Table: {source['name']}")
            lines.append("Columns:")
            lines.extend(
                f"  - {col['name']}: {col['type']} - {col.get('description', '')}"
                for col in source['columns']
            )
            lines.append("")
        return "\n".join(lines)

    def save_model(self, model: dict, layer: str = "marts") -> str:
        """Save generated model to appropriate location.

        Args:
            model: Generated model; ``model_name`` and ``sql`` are read.
            layer: Folder under ``models/`` used when the name has neither
                the ``stg_`` nor the ``int_`` prefix.

        Returns:
            Path of the SQL file that was written.

        Raises:
            ValueError: If ``model_name`` is not a plain file name.
        """
        import os

        model_name = model["model_name"]

        # The name comes from LLM output and is used to build a file path, so
        # refuse anything that could resolve outside the chosen folder.
        if (
            not isinstance(model_name, str)
            or not model_name
            or model_name in {".", ".."}
            or "/" in model_name
            or "\\" in model_name
        ):
            raise ValueError(f"Invalid model name: {model_name!r}")

        # Determine path based on naming convention
        if model_name.startswith("stg_"):
            folder = "staging"
        elif model_name.startswith("int_"):
            folder = "intermediate"
        else:
            folder = layer

        model_path = os.path.join(self.project_path, "models", folder)
        os.makedirs(model_path, exist_ok=True)

        # Save SQL file. The encoding is pinned so non-ASCII SQL is written
        # the same way regardless of the platform's default encoding.
        sql_path = os.path.join(model_path, f"{model_name}.sql")
        with open(sql_path, "w", encoding="utf-8") as f:
            f.write(model["sql"])

        # NOTE: schema.yml is not written here. ``model["schema_yml"]`` still
        # has to be merged into the folder's schema.yml by the caller.

        return sql_path

Example Generated Model

-- models/marts/fct_daily_sales.sql
-- Generated by AI, reviewed by human
--
-- Daily sales aggregated per day, product and customer.
-- Incremental: each run re-reads the most recently loaded day and later, and
-- replaces matching rows via the unique key.

{{
    config(
        materialized='incremental',
        unique_key='sale_date_product_key',
        on_schema_change='sync_all_columns'
    )
}}

with source_sales as (
    select * from {{ ref('stg_sales_transactions') }}
    {% if is_incremental() %}
    -- ">=" rather than ">": sale_date is truncated to midnight, so a strict
    -- comparison would drop transactions stamped exactly at midnight when
    -- the last loaded day is recomputed.
    where transaction_date >= (select max(sale_date) from {{ this }})
    {% endif %}
),

source_products as (
    select * from {{ ref('dim_products') }}
),

source_customers as (
    select * from {{ ref('dim_customers') }}
),

aggregated as (
    select
        date_trunc('day', s.transaction_date) as sale_date,
        s.product_id,
        p.product_key,
        p.category,
        p.subcategory,
        s.customer_id,
        c.customer_key,
        c.segment,
        c.region,
        count(*) as transaction_count,
        sum(s.quantity) as total_quantity,
        sum(s.amount) as total_revenue,
        avg(s.amount) as avg_transaction_value
    from source_sales s
    left join source_products p on s.product_id = p.product_id
    left join source_customers c on s.customer_id = c.customer_id
    group by 1, 2, 3, 4, 5, 6, 7, 8, 9
),

final as (
    select
        -- The key must cover the full grain of the aggregation (day, product,
        -- customer); otherwise the incremental merge would overwrite rows
        -- that belong to different customers. The source ids are used
        -- because the dimension keys can be null after the left joins.
        -- The column name is kept so the unique_key config still matches.
        {{ dbt_utils.generate_surrogate_key(['sale_date', 'product_id', 'customer_id']) }} as sale_date_product_key,
        sale_date,
        product_key,
        category,
        subcategory,
        customer_key,
        segment,
        region,
        transaction_count,
        total_quantity,
        total_revenue,
        avg_transaction_value,
        current_timestamp as loaded_at
    from aggregated
)

select * from final

AI-Generated Documentation

class DBTDocGenerator:
    """Ask an LLM to write documentation for dbt models and columns.

    The LLM client only needs to expose ``chat.complete_async(...)`` returning
    an object with ``choices[0].message.content``.
    """

    def __init__(self, llm_client: "AIFoundryClient"):
        """Store the LLM client used for every request."""
        self.llm = llm_client

    async def generate_model_docs(self, sql_content: str, model_name: str) -> str:
        """Generate documentation for existing dbt model.

        Args:
            sql_content: SQL source of the model.
            model_name: Name of the model, inserted into the prompt.

        Returns:
            The raw text of the LLM reply; it is not parsed or validated.
        """

        response = await self.llm.chat.complete_async(
            deployment="gpt-4o",
            messages=[{
                "role": "user",
                "content": f"""Analyze this dbt model and generate schema.yml documentation:

                Model name: {model_name}
                SQL:
                ```sql
                {sql_content}
                ```

                Generate YAML in this format:
                ```yaml
                version: 2

                models:
                  - name: {model_name}
                    description: |
                      [Detailed description of what this model does]
                    columns:
                      - name: column_name
                        description: [What this column represents]
                        tests:
                          - [appropriate tests]
                ```

                For each column:
                - Write a clear business-friendly description
                - Suggest appropriate tests (not_null, unique, accepted_values, relationships)
                - Note any important business logic"""
            }]
        )

        return response.choices[0].message.content

    async def generate_column_descriptions(self, columns: list[str], context: str) -> dict:
        """Generate descriptions for a list of columns.

        Args:
            columns: Column names to describe.
            context: Free-text background inserted into the prompt.

        Returns:
            The LLM reply parsed as JSON; the prompt asks for a mapping of
            column name to description.

        Raises:
            json.JSONDecodeError: If the reply is not valid JSON.
        """
        # Local import so ``json`` is guaranteed to be in scope when the
        # reply is parsed.
        import json

        response = await self.llm.chat.complete_async(
            deployment="gpt-4o",
            messages=[{
                "role": "user",
                "content": f"""Generate clear, business-friendly descriptions for these columns.

                Context: {context}
                Columns: {columns}

                Return JSON: {{"column_name": "description", ...}}"""
            }]
        )

        return json.loads(response.choices[0].message.content)

AI-Assisted Testing

class DBTTestGenerator:
    """Ask an LLM to propose and write tests for dbt models.

    The LLM client only needs to expose ``chat.complete_async(...)`` returning
    an object with ``choices[0].message.content``.
    """

    def __init__(self, llm_client: "AIFoundryClient"):
        """Store the LLM client used for every request."""
        self.llm = llm_client

    async def suggest_tests(self, model_sql: str, model_name: str) -> dict:
        """Suggest tests for a dbt model.

        Args:
            model_sql: SQL source of the model.
            model_name: Name of the model, inserted into the prompt.

        Returns:
            The LLM reply parsed as JSON. The prompt asks for an object with
            the keys ``schema_tests``, ``data_tests`` and ``freshness_tests``;
            the reply is not validated here.

        Raises:
            json.JSONDecodeError: If the reply is not valid JSON.
        """
        # Local import so ``json`` is guaranteed to be in scope when the
        # reply is parsed.
        import json

        response = await self.llm.chat.complete_async(
            deployment="gpt-4o",
            messages=[{
                "role": "user",
                "content": f"""Analyze this dbt model and suggest comprehensive tests:

                Model: {model_name}
                SQL:
                ```sql
                {model_sql}
                ```

                Suggest tests in these categories:
                1. Schema tests (not_null, unique, accepted_values, relationships)
                2. Data tests (custom SQL tests)
                3. Freshness tests (if applicable)

                Return JSON:
                {{
                    "schema_tests": [
                        {{"column": "col_name", "tests": ["not_null", "unique"]}}
                    ],
                    "data_tests": [
                        {{"name": "test_name", "description": "what it tests", "sql": "SELECT..."}}
                    ],
                    "freshness_tests": [
                        {{"table": "table_name", "warn_after": "12 hours", "error_after": "24 hours"}}
                    ]
                }}"""
            }]
        )

        return json.loads(response.choices[0].message.content)

    async def generate_custom_test(self, requirement: str, model_name: str) -> str:
        """Generate a custom data test.

        Args:
            requirement: Free-text description of the rule to test.
            model_name: Name of the model the test targets.

        Returns:
            The raw text of the LLM reply; it is not parsed or validated.
        """

        response = await self.llm.chat.complete_async(
            deployment="gpt-4o",
            messages=[{
                "role": "user",
                "content": f"""Create a dbt data test for this requirement:

                Model: {model_name}
                Requirement: {requirement}

                Return a dbt test SQL file that returns rows that FAIL the test.
                Include comments explaining the test."""
            }]
        )

        return response.choices[0].message.content

Intelligent Code Review

class DBTReviewer:
    """Ask an LLM to review dbt SQL.

    The LLM client only needs to expose ``chat.complete_async(...)`` returning
    an object with ``choices[0].message.content``.
    """

    def __init__(self, llm_client: "AIFoundryClient"):
        """Store the LLM client used for every request."""
        self.llm = llm_client

    async def review_model(self, sql_content: str) -> dict:
        """Review a dbt model for best practices.

        Args:
            sql_content: SQL source of the model to review.

        Returns:
            The LLM reply parsed as JSON. The prompt asks for the keys
            ``score``, ``issues``, ``positive_aspects`` and
            ``suggested_refactor``; the reply is not validated here.

        Raises:
            json.JSONDecodeError: If the reply is not valid JSON.
        """
        # Local import so ``json`` is guaranteed to be in scope when the
        # reply is parsed.
        import json

        response = await self.llm.chat.complete_async(
            deployment="gpt-4o",
            messages=[{
                "role": "system",
                "content": """You are a senior data engineer reviewing dbt code.
                Check for:
                - Performance issues (full table scans, missing filters on incrementals)
                - Best practice violations
                - Potential bugs
                - Readability improvements
                - Missing tests"""
            }, {
                "role": "user",
                "content": f"""Review this dbt model:

                ```sql
                {sql_content}
                ```

                Return JSON:
                {{
                    "score": 1-10,
                    "issues": [
                        {{"severity": "high|medium|low", "type": "performance|bug|style|best_practice", "description": "issue description", "suggestion": "how to fix", "line": line_number_or_null}}
                    ],
                    "positive_aspects": ["things done well"],
                    "suggested_refactor": "improved code if significant changes needed"
                }}"""
            }]
        )

        return json.loads(response.choices[0].message.content)

Integration with dbt CLI

import subprocess
import os

class DBTAIWorkflow:
    """Run model generation, review, test suggestion and ``dbt compile`` in one flow."""

    def __init__(self, project_path: str, llm_client: AIFoundryClient):
        """Create the generator, doc generator, test generator and reviewer.

        Args:
            project_path: Root of the dbt project; also the working directory
                for the ``dbt`` command.
            llm_client: Client shared by all helper objects.
        """
        self.project_path = project_path
        self.generator = DBTModelGenerator(llm_client, project_path)
        self.doc_generator = DBTDocGenerator(llm_client)
        self.test_generator = DBTTestGenerator(llm_client)
        self.reviewer = DBTReviewer(llm_client)

    async def create_model_workflow(self, requirements: str, sources: list[dict]):
        """Complete workflow for creating a new model.

        Args:
            requirements: Free-text description of the model to build.
            sources: Source table descriptions passed to the generator.

        Returns:
            A dict with the generated ``model``, its ``review``, the suggested
            ``tests`` and ``compiled`` (True when ``dbt compile`` exited 0).
        """
        # Local import: asyncio is only needed to move the blocking dbt call
        # off the event loop.
        import asyncio

        # 1. Generate model
        print("Generating model...")
        model = await self.generator.generate_model(requirements, sources)

        # 2. Save model
        model_path = self.generator.save_model(model)
        print(f"Model saved to: {model_path}")

        # 3. Review generated code
        print("Reviewing generated code...")
        review = await self.reviewer.review_model(model["sql"])
        if review["score"] < 7:
            print(f"Warning: Model scored {review['score']}/10")
            for issue in review["issues"]:
                print(f"  - [{issue['severity']}] {issue['description']}")

        # 4. Generate tests
        print("Generating tests...")
        tests = await self.test_generator.suggest_tests(model["sql"], model["model_name"])

        # 5. Compile and test
        print("Compiling model...")
        # subprocess.run blocks until dbt exits; running it in a worker thread
        # keeps the event loop free for other coroutines in the meantime.
        result = await asyncio.to_thread(
            subprocess.run,
            ["dbt", "compile", "--select", model["model_name"]],
            cwd=self.project_path,
            capture_output=True,
            encoding="utf-8",
            errors="replace",
        )
        compiled = result.returncode == 0

        if not compiled:
            # Report both streams so the error text is shown whichever
            # stream dbt wrote it to.
            details = "\n".join(
                text.strip() for text in (result.stderr, result.stdout) if text.strip()
            )
            print(f"Compilation failed: {details}")

        return {
            "model": model,
            "review": review,
            "tests": tests,
            "compiled": compiled
        }

Best Practices

  1. Human review required: AI-generated models are starting points
  2. Validate logic: Check business rules are correctly implemented
  3. Test thoroughly: AI can miss edge cases
  4. Iterate: Use AI for quick iterations, refine manually
  5. Version control: Track AI-generated vs. human-modified code

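AI accelerates dbt development but doesn’t replace data engineering expertise. Use it to handle boilerplate and documentation while focusing your expertise on business logic and optimization.

Takeaways

Treat every AI-generated model, test, and schema.yml entry as a draft: review it, compile it, and run the tests before merging. A good next step is to try the workflow on a single staging model, compare the result with what you would have written by hand, and only then extend it to intermediate and mart models.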

Michael John Peña

Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.