Back to Blog
6 min read

dbt with AI: Accelerating Data Transformation Development

dbt (data build tool) has revolutionized data transformation. Adding AI to the workflow accelerates model development, improves documentation, and catches errors early. Let’s explore how to integrate AI into your dbt workflow.

AI-Powered dbt Model Generation

From Requirements to Models

import json

from azure.ai.foundry import AIFoundryClient

class DBTModelGenerator:
    """Drafts dbt model files (SQL + schema YAML) from plain-text requirements.

    The LLM output is a starting point only and still needs human review
    before it is merged into the project.
    """

    def __init__(self, llm_client: AIFoundryClient, project_path: str):
        self.llm = llm_client  # chat-completion client
        self.project_path = project_path  # root directory of the dbt project

    async def generate_model(self, requirements: str, source_tables: list[dict]) -> dict:
        """Generate a dbt model from requirements.

        Args:
            requirements: Free-text description of the desired transformation.
            source_tables: Dicts with 'name' and 'columns'; each column dict
                has 'name', 'type' and an optional 'description'.

        Returns:
            Parsed JSON dict with keys: model_name, description, sql,
            schema_yml, tests, dependencies.

        Raises:
            json.JSONDecodeError: If the LLM reply is not valid JSON.
        """

        # Format source information
        sources_context = self._format_sources(source_tables)

        response = await self.llm.chat.complete_async(
            deployment="gpt-4o",
            messages=[{
                "role": "system",
                "content": """You are a dbt expert. Generate dbt models following best practices:
                - Use CTEs for readability
                - Include appropriate tests
                - Write comprehensive documentation
                - Follow naming conventions (stg_, int_, fct_, dim_)
                - Use Jinja for reusability"""
            }, {
                "role": "user",
                "content": f"""Generate a dbt model based on these requirements:

                Requirements:
                {requirements}

                Available source tables:
                {sources_context}

                Return JSON with:
                {{
                    "model_name": "name following conventions",
                    "description": "what this model does",
                    "sql": "the dbt SQL model",
                    "schema_yml": "the schema.yml content for this model",
                    "tests": ["list of recommended tests"],
                    "dependencies": ["upstream models needed"]
                }}"""
            }]
        )

        # NOTE(review): assumes the deployment returns bare JSON; if replies
        # arrive fenced in ```json blocks this will raise — confirm whether a
        # JSON response mode is available on the client.
        return json.loads(response.choices[0].message.content)

    def _format_sources(self, sources: list[dict]) -> str:
        """Render source-table metadata as plain text for the prompt."""
        lines = []
        for source in sources:
            lines.append(f"Table: {source['name']}")
            lines.append("Columns:")
            for col in source['columns']:
                # A missing description renders as an empty string, not 'None'.
                lines.append(f"  - {col['name']}: {col['type']} - {col.get('description', '')}")
            lines.append("")  # blank separator line between tables
        return "\n".join(lines)

    def save_model(self, model: dict, layer: str = "marts") -> str:
        """Save a generated model under models/<inferred folder or layer>/.

        The folder is inferred from the dbt naming convention
        (stg_* -> staging, int_* -> intermediate); anything else goes to
        ``layer``. Returns the path of the written .sql file.
        """
        import os

        name = model["model_name"]

        # Determine path based on naming convention
        if name.startswith("stg_"):
            folder = "staging"
        elif name.startswith("int_"):
            folder = "intermediate"
        else:
            folder = layer

        model_path = os.path.join(self.project_path, "models", folder)
        os.makedirs(model_path, exist_ok=True)

        # Save SQL file
        sql_path = os.path.join(model_path, f"{name}.sql")
        with open(sql_path, 'w') as f:
            f.write(model["sql"])

        # Write the schema as a per-model YAML file instead of appending to a
        # shared schema.yml: naive appending would repeat the `version: 2`
        # header and produce invalid YAML. dbt picks up any *.yml under models/.
        if model.get("schema_yml"):
            with open(os.path.join(model_path, f"{name}.yml"), 'w') as f:
                f.write(model["schema_yml"])

        return sql_path

Example Generated Model

-- models/marts/fct_daily_sales.sql
-- Generated by AI, reviewed by human
-- Grain: one row per (sale_date, product, customer)

{{
    config(
        materialized='incremental',
        unique_key='sale_date_product_key',
        on_schema_change='sync_all_columns'
    )
}}

with source_sales as (
    select * from {{ ref('stg_sales_transactions') }}
    {% if is_incremental() %}
    -- Use >= (not >) so the latest, possibly partial, day is reprocessed in
    -- full: with a strict > the already-loaded rows for that day would be
    -- re-aggregated from only the new transactions and the merge on
    -- unique_key would silently undercount.
    where transaction_date >= (select max(sale_date) from {{ this }})
    {% endif %}
),

source_products as (
    select * from {{ ref('dim_products') }}
),

source_customers as (
    select * from {{ ref('dim_customers') }}
),

aggregated as (
    select
        date_trunc('day', s.transaction_date) as sale_date,
        s.product_id,
        p.product_key,
        p.category,
        p.subcategory,
        s.customer_id,
        c.customer_key,
        c.segment,
        c.region,
        count(*) as transaction_count,
        sum(s.quantity) as total_quantity,
        sum(s.amount) as total_revenue,
        avg(s.amount) as avg_transaction_value
    from source_sales s
    left join source_products p on s.product_id = p.product_id
    left join source_customers c on s.customer_id = c.customer_id
    group by 1, 2, 3, 4, 5, 6, 7, 8, 9
),

final as (
    select
        -- The key must cover the full grain (date x product x customer): the
        -- aggregation above groups by customer as well, so a date+product key
        -- alone collides across customers and corrupts incremental merges.
        -- Column name kept for backward compatibility with downstream models.
        {{ dbt_utils.generate_surrogate_key(['sale_date', 'product_key', 'customer_key']) }} as sale_date_product_key,
        sale_date,
        product_key,
        category,
        subcategory,
        customer_key,
        segment,
        region,
        transaction_count,
        total_quantity,
        total_revenue,
        avg_transaction_value,
        current_timestamp as loaded_at
    from aggregated
)

select * from final

AI-Generated Documentation

class DBTDocGenerator:
    """Writes schema.yml documentation for dbt models via an LLM."""

    def __init__(self, llm_client: AIFoundryClient):
        self.llm = llm_client  # chat-completion client

    async def generate_model_docs(self, sql_content: str, model_name: str) -> str:
        """Generate documentation for existing dbt model."""

        # Build the prompt up front so the request call stays readable.
        prompt = f"""Analyze this dbt model and generate schema.yml documentation:

                Model name: {model_name}
                SQL:
                ```sql
                {sql_content}
                ```

                Generate YAML in this format:
                ```yaml
                version: 2

                models:
                  - name: {model_name}
                    description: |
                      [Detailed description of what this model does]
                    columns:
                      - name: column_name
                        description: [What this column represents]
                        tests:
                          - [appropriate tests]
                ```

                For each column:
                - Write a clear business-friendly description
                - Suggest appropriate tests (not_null, unique, accepted_values, relationships)
                - Note any important business logic"""

        reply = await self.llm.chat.complete_async(
            deployment="gpt-4o",
            messages=[{"role": "user", "content": prompt}],
        )
        # Raw YAML text is returned verbatim for the caller to review/save.
        return reply.choices[0].message.content

    async def generate_column_descriptions(self, columns: list[str], context: str) -> dict:
        """Generate descriptions for a list of columns."""

        prompt = f"""Generate clear, business-friendly descriptions for these columns.

                Context: {context}
                Columns: {columns}

                Return JSON: {{"column_name": "description", ...}}"""

        reply = await self.llm.chat.complete_async(
            deployment="gpt-4o",
            messages=[{"role": "user", "content": prompt}],
        )
        # The model is asked for a flat {column: description} JSON object.
        return json.loads(reply.choices[0].message.content)

AI-Assisted Testing

class DBTTestGenerator:
    """Suggests schema, data, and freshness tests for dbt models via an LLM."""

    def __init__(self, llm_client: AIFoundryClient):
        self.llm = llm_client  # chat-completion client

    async def suggest_tests(self, model_sql: str, model_name: str) -> list[dict]:
        """Suggest tests for a dbt model."""

        prompt = f"""Analyze this dbt model and suggest comprehensive tests:

                Model: {model_name}
                SQL:
                ```sql
                {model_sql}
                ```

                Suggest tests in these categories:
                1. Schema tests (not_null, unique, accepted_values, relationships)
                2. Data tests (custom SQL tests)
                3. Freshness tests (if applicable)

                Return JSON:
                {{
                    "schema_tests": [
                        {{"column": "col_name", "tests": ["not_null", "unique"]}}
                    ],
                    "data_tests": [
                        {{"name": "test_name", "description": "what it tests", "sql": "SELECT..."}}
                    ],
                    "freshness_tests": [
                        {{"table": "table_name", "warn_after": "12 hours", "error_after": "24 hours"}}
                    ]
                }}"""

        reply = await self.llm.chat.complete_async(
            deployment="gpt-4o",
            messages=[{"role": "user", "content": prompt}],
        )
        return json.loads(reply.choices[0].message.content)

    async def generate_custom_test(self, requirement: str, model_name: str) -> str:
        """Generate a custom data test."""

        prompt = f"""Create a dbt data test for this requirement:

                Model: {model_name}
                Requirement: {requirement}

                Return a dbt test SQL file that returns rows that FAIL the test.
                Include comments explaining the test."""

        reply = await self.llm.chat.complete_async(
            deployment="gpt-4o",
            messages=[{"role": "user", "content": prompt}],
        )
        # Raw SQL text, not JSON — returned verbatim for the caller to save.
        return reply.choices[0].message.content

Intelligent Code Review

class DBTReviewer:
    """LLM-backed best-practice code review for dbt models."""

    def __init__(self, llm_client: AIFoundryClient):
        self.llm = llm_client  # chat-completion client

    async def review_model(self, sql_content: str) -> dict:
        """Review a dbt model for best practices."""

        # System message fixes the reviewer persona and checklist.
        system_prompt = """You are a senior data engineer reviewing dbt code.
                Check for:
                - Performance issues (full table scans, missing filters on incrementals)
                - Best practice violations
                - Potential bugs
                - Readability improvements
                - Missing tests"""

        user_prompt = f"""Review this dbt model:

                ```sql
                {sql_content}
                ```

                Return JSON:
                {{
                    "score": 1-10,
                    "issues": [
                        {{"severity": "high|medium|low", "type": "performance|bug|style|best_practice", "description": "issue description", "suggestion": "how to fix", "line": line_number_or_null}}
                    ],
                    "positive_aspects": ["things done well"],
                    "suggested_refactor": "improved code if significant changes needed"
                }}"""

        reply = await self.llm.chat.complete_async(
            deployment="gpt-4o",
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_prompt},
            ],
        )
        return json.loads(reply.choices[0].message.content)

Integration with dbt CLI

import subprocess
import os

class DBTAIWorkflow:
    """Orchestrates the full AI-assisted dbt loop: generate, save, review,
    suggest tests, and compile a new model."""

    def __init__(self, project_path: str, llm_client: AIFoundryClient):
        self.project_path = project_path
        self.generator = DBTModelGenerator(llm_client, project_path)
        self.doc_generator = DBTDocGenerator(llm_client)
        self.test_generator = DBTTestGenerator(llm_client)
        self.reviewer = DBTReviewer(llm_client)

    async def create_model_workflow(self, requirements: str, sources: list[dict]):
        """Complete workflow for creating a new model."""

        # Step 1: draft the model from the requirements.
        print("Generating model...")
        model = await self.generator.generate_model(requirements, sources)

        # Step 2: write it into the project tree.
        saved_path = self.generator.save_model(model)
        print(f"Model saved to: {saved_path}")

        # Step 3: have the reviewer score the generated SQL.
        print("Reviewing generated code...")
        review = await self.reviewer.review_model(model["sql"])
        if review["score"] < 7:
            print(f"Warning: Model scored {review['score']}/10")
            for problem in review["issues"]:
                print(f"  - [{problem['severity']}] {problem['description']}")

        # Step 4: collect suggested tests for the new model.
        print("Generating tests...")
        suggested = await self.test_generator.suggest_tests(model["sql"], model["model_name"])

        # Step 5: make sure the model actually compiles in the project.
        print("Compiling model...")
        proc = subprocess.run(
            ["dbt", "compile", "--select", model["model_name"]],
            cwd=self.project_path,
            capture_output=True
        )
        compiled_ok = proc.returncode == 0
        if not compiled_ok:
            print(f"Compilation failed: {proc.stderr.decode()}")

        return {
            "model": model,
            "review": review,
            "tests": suggested,
            "compiled": compiled_ok,
        }

Best Practices

  1. Human review required: AI-generated models are starting points
  2. Validate logic: Check business rules are correctly implemented
  3. Test thoroughly: AI can miss edge cases
  4. Iterate: Use AI for quick iterations, refine manually
  5. Version control: Track AI-generated vs. human-modified code

AI accelerates dbt development but doesn’t replace data engineering expertise. Use it to handle boilerplate and documentation while focusing your expertise on business logic and optimization.

Michael John Peña

Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.