6 min read
dbt with AI: Accelerating Data Transformation Development
dbt (data build tool) has revolutionized data transformation. Adding AI to the workflow accelerates model development, improves documentation, and catches errors early. Let’s explore how to integrate AI into your dbt workflow.
AI-Powered dbt Model Generation
From Requirements to Models
import json
import os

from azure.ai.foundry import AIFoundryClient
class DBTModelGenerator:
def __init__(self, llm_client: AIFoundryClient, project_path: str):
self.llm = llm_client
self.project_path = project_path
async def generate_model(self, requirements: str, source_tables: list[dict]) -> dict:
"""Generate a dbt model from requirements."""
# Format source information
sources_context = self._format_sources(source_tables)
response = await self.llm.chat.complete_async(
deployment="gpt-4o",
messages=[{
"role": "system",
"content": """You are a dbt expert. Generate dbt models following best practices:
- Use CTEs for readability
- Include appropriate tests
- Write comprehensive documentation
- Follow naming conventions (stg_, int_, fct_, dim_)
- Use Jinja for reusability"""
}, {
"role": "user",
"content": f"""Generate a dbt model based on these requirements:
Requirements:
{requirements}
Available source tables:
{sources_context}
Return JSON with:
{{
"model_name": "name following conventions",
"description": "what this model does",
"sql": "the dbt SQL model",
"schema_yml": "the schema.yml content for this model",
"tests": ["list of recommended tests"],
"dependencies": ["upstream models needed"]
}}"""
}]
)
return json.loads(response.choices[0].message.content)
def _format_sources(self, sources: list[dict]) -> str:
lines = []
for source in sources:
lines.append(f"Table: {source['name']}")
lines.append("Columns:")
for col in source['columns']:
lines.append(f" - {col['name']}: {col['type']} - {col.get('description', '')}")
lines.append("")
return "\n".join(lines)
def save_model(self, model: dict, layer: str = "marts"):
"""Save generated model to appropriate location."""
import os
# Determine path based on naming convention
if model["model_name"].startswith("stg_"):
folder = "staging"
elif model["model_name"].startswith("int_"):
folder = "intermediate"
else:
folder = layer
model_path = os.path.join(self.project_path, "models", folder)
os.makedirs(model_path, exist_ok=True)
# Save SQL file
sql_path = os.path.join(model_path, f"{model['model_name']}.sql")
with open(sql_path, 'w') as f:
f.write(model["sql"])
# Append to schema.yml
schema_path = os.path.join(model_path, "schema.yml")
# ... append schema content
return sql_path
Example Generated Model
-- models/marts/fct_daily_sales.sql
-- Generated by AI, reviewed by human
{{
    config(
        materialized='incremental',
        unique_key='daily_sales_key',
        on_schema_change='sync_all_columns'
    )
}}

with source_sales as (
    select * from {{ ref('stg_sales_transactions') }}
    {% if is_incremental() %}
    -- >= (not >) so transactions stamped exactly at midnight of the most
    -- recently loaded day are not dropped; the re-aggregated rows for that
    -- day replace the prior ones via the unique_key merge.
    where transaction_date >= (select max(sale_date) from {{ this }})
    {% endif %}
),

source_products as (
    select * from {{ ref('dim_products') }}
),

source_customers as (
    select * from {{ ref('dim_customers') }}
),

aggregated as (
    select
        date_trunc('day', s.transaction_date) as sale_date,
        s.product_id,
        p.product_key,
        p.category,
        p.subcategory,
        s.customer_id,
        c.customer_key,
        c.segment,
        c.region,
        count(*) as transaction_count,
        sum(s.quantity) as total_quantity,
        sum(s.amount) as total_revenue,
        avg(s.amount) as avg_transaction_value
    from source_sales s
    left join source_products p on s.product_id = p.product_id
    left join source_customers c on s.customer_id = c.customer_id
    group by 1, 2, 3, 4, 5, 6, 7, 8, 9
),

final as (
    select
        -- The key must cover the full grain of the aggregation
        -- (day x product x customer); a key on day + product alone would
        -- collide across customers and corrupt the incremental merge.
        {{ dbt_utils.generate_surrogate_key(['sale_date', 'product_key', 'customer_key']) }} as daily_sales_key,
        sale_date,
        product_key,
        category,
        subcategory,
        customer_key,
        segment,
        region,
        transaction_count,
        total_quantity,
        total_revenue,
        avg_transaction_value,
        current_timestamp as loaded_at
    from aggregated
)

select * from final
AI-Generated Documentation
class DBTDocGenerator:
    """Generate schema.yml documentation for dbt models via an LLM."""

    def __init__(self, llm_client: "AIFoundryClient"):
        self.llm = llm_client

    async def generate_model_docs(self, sql_content: str, model_name: str) -> str:
        """Generate documentation for an existing dbt model.

        Returns the model's raw reply (expected to contain YAML); callers
        should review before committing.
        """
        response = await self.llm.chat.complete_async(
            deployment="gpt-4o",
            messages=[{
                "role": "user",
                "content": f"""Analyze this dbt model and generate schema.yml documentation:
Model name: {model_name}
SQL:
```sql
{sql_content}
```
Generate YAML in this format:
```yaml
version: 2
models:
- name: {model_name}
description: |
[Detailed description of what this model does]
columns:
- name: column_name
description: [What this column represents]
tests:
- [appropriate tests]
```
For each column:
- Write a clear business-friendly description
- Suggest appropriate tests (not_null, unique, accepted_values, relationships)
- Note any important business logic"""
            }]
        )
        return response.choices[0].message.content

    async def generate_column_descriptions(self, columns: list[str], context: str) -> dict:
        """Generate descriptions for a list of columns.

        Returns a dict mapping column name -> description.
        """
        response = await self.llm.chat.complete_async(
            deployment="gpt-4o",
            messages=[{
                "role": "user",
                "content": f"""Generate clear, business-friendly descriptions for these columns.
Context: {context}
Columns: {columns}
Return JSON: {{"column_name": "description", ...}}"""
            }]
        )
        # Tolerate markdown-fenced JSON instead of json.loads on the raw reply.
        return self._extract_json(response.choices[0].message.content)

    @staticmethod
    def _extract_json(content: str) -> dict:
        """Parse a JSON object from an LLM reply, tolerating ```json fences."""
        text = content.strip()
        if text.startswith("```"):
            text = text.split("\n", 1)[1] if "\n" in text else ""
            if text.rstrip().endswith("```"):
                text = text.rstrip()[:-3]
        return json.loads(text)
AI-Assisted Testing
class DBTTestGenerator:
    """Suggest schema/data/freshness tests for dbt models via an LLM."""

    def __init__(self, llm_client: "AIFoundryClient"):
        self.llm = llm_client

    async def suggest_tests(self, model_sql: str, model_name: str) -> list[dict]:
        """Suggest tests for a dbt model.

        Returns the parsed JSON structure with schema_tests, data_tests and
        freshness_tests keys (as requested in the prompt).
        """
        response = await self.llm.chat.complete_async(
            deployment="gpt-4o",
            messages=[{
                "role": "user",
                "content": f"""Analyze this dbt model and suggest comprehensive tests:
Model: {model_name}
SQL:
```sql
{model_sql}
```
Suggest tests in these categories:
1. Schema tests (not_null, unique, accepted_values, relationships)
2. Data tests (custom SQL tests)
3. Freshness tests (if applicable)
Return JSON:
{{
"schema_tests": [
{{"column": "col_name", "tests": ["not_null", "unique"]}}
],
"data_tests": [
{{"name": "test_name", "description": "what it tests", "sql": "SELECT..."}}
],
"freshness_tests": [
{{"table": "table_name", "warn_after": "12 hours", "error_after": "24 hours"}}
]
}}"""
            }]
        )
        # Tolerate markdown-fenced JSON instead of json.loads on the raw reply.
        return self._extract_json(response.choices[0].message.content)

    async def generate_custom_test(self, requirement: str, model_name: str) -> str:
        """Generate a custom data test (raw SQL text from the model)."""
        response = await self.llm.chat.complete_async(
            deployment="gpt-4o",
            messages=[{
                "role": "user",
                "content": f"""Create a dbt data test for this requirement:
Model: {model_name}
Requirement: {requirement}
Return a dbt test SQL file that returns rows that FAIL the test.
Include comments explaining the test."""
            }]
        )
        return response.choices[0].message.content

    @staticmethod
    def _extract_json(content: str):
        """Parse JSON from an LLM reply, tolerating ```json fences."""
        text = content.strip()
        if text.startswith("```"):
            text = text.split("\n", 1)[1] if "\n" in text else ""
            if text.rstrip().endswith("```"):
                text = text.rstrip()[:-3]
        return json.loads(text)
Intelligent Code Review
class DBTReviewer:
    """Review dbt models for best practices via an LLM."""

    def __init__(self, llm_client: "AIFoundryClient"):
        self.llm = llm_client

    async def review_model(self, sql_content: str) -> dict:
        """Review a dbt model for best practices.

        Returns the parsed JSON report with score, issues, positive_aspects
        and suggested_refactor keys (as requested in the prompt).
        """
        response = await self.llm.chat.complete_async(
            deployment="gpt-4o",
            messages=[{
                "role": "system",
                "content": """You are a senior data engineer reviewing dbt code.
Check for:
- Performance issues (full table scans, missing filters on incrementals)
- Best practice violations
- Potential bugs
- Readability improvements
- Missing tests"""
            }, {
                "role": "user",
                "content": f"""Review this dbt model:
```sql
{sql_content}
```
Return JSON:
{{
"score": 1-10,
"issues": [
{{"severity": "high|medium|low", "type": "performance|bug|style|best_practice", "description": "issue description", "suggestion": "how to fix", "line": line_number_or_null}}
],
"positive_aspects": ["things done well"],
"suggested_refactor": "improved code if significant changes needed"
}}"""
            }]
        )
        # Tolerate markdown-fenced JSON instead of json.loads on the raw reply.
        return self._extract_json(response.choices[0].message.content)

    @staticmethod
    def _extract_json(content: str) -> dict:
        """Parse a JSON object from an LLM reply, tolerating ```json fences."""
        text = content.strip()
        if text.startswith("```"):
            text = text.split("\n", 1)[1] if "\n" in text else ""
            if text.rstrip().endswith("```"):
                text = text.rstrip()[:-3]
        return json.loads(text)
Integration with dbt CLI
import subprocess
import os
class DBTAIWorkflow:
    """End-to-end workflow: generate, save, review, test, and compile a dbt model."""

    def __init__(self, project_path: str, llm_client: "AIFoundryClient"):
        self.project_path = project_path
        self.generator = DBTModelGenerator(llm_client, project_path)
        self.doc_generator = DBTDocGenerator(llm_client)
        self.test_generator = DBTTestGenerator(llm_client)
        self.reviewer = DBTReviewer(llm_client)

    async def create_model_workflow(self, requirements: str, sources: list[dict]):
        """Complete workflow for creating a new model.

        Returns a dict with the generated model, the review report, the
        suggested tests, and whether `dbt compile` succeeded.
        """
        # 1. Generate model
        print("Generating model...")
        model = await self.generator.generate_model(requirements, sources)
        # 2. Save model
        model_path = self.generator.save_model(model)
        print(f"Model saved to: {model_path}")
        # 3. Review generated code
        print("Reviewing generated code...")
        review = await self.reviewer.review_model(model["sql"])
        if review["score"] < 7:
            print(f"Warning: Model scored {review['score']}/10")
            for issue in review["issues"]:
                print(f" - [{issue['severity']}] {issue['description']}")
        # 4. Generate tests
        print("Generating tests...")
        tests = await self.test_generator.suggest_tests(model["sql"], model["model_name"])
        # 5. Compile — confirms the generated SQL/Jinja is valid in this project.
        print("Compiling model...")
        compiled = self._compile_model(model["model_name"])
        return {
            "model": model,
            "review": review,
            "tests": tests,
            "compiled": compiled
        }

    def _compile_model(self, model_name: str) -> bool:
        """Run `dbt compile` for one model; return True on success.

        Returns False (instead of raising) when compilation fails or the
        dbt CLI is not installed, so the workflow can still report results.
        """
        try:
            result = subprocess.run(
                ["dbt", "compile", "--select", model_name],
                cwd=self.project_path,
                capture_output=True,
                text=True  # decode stdout/stderr to str so no .decode() needed
            )
        except FileNotFoundError:
            # dbt not on PATH — report rather than crash the whole workflow.
            print("dbt executable not found; skipping compile step")
            return False
        if result.returncode != 0:
            print(f"Compilation failed: {result.stderr}")
        return result.returncode == 0
Best Practices
- Human review required: AI-generated models are starting points
- Validate logic: Check business rules are correctly implemented
- Test thoroughly: AI can miss edge cases
- Iterate: Use AI for quick iterations, refine manually
- Version control: Track AI-generated vs. human-modified code
AI accelerates dbt development but doesn’t replace data engineering expertise. Use it to handle boilerplate and documentation while focusing your expertise on business logic and optimization.