1 min read
dbt with AI: Accelerating Data Transformation Development
This post shares practical, production-minded guidance on using AI to accelerate dbt development: generating models, documentation, and tests, reviewing the generated code, and wiring the whole flow into the dbt CLI.
AI-Powered dbt Model Generation
From Requirements to Models
import json
import os

from azure.ai.foundry import AIFoundryClient
class DBTModelGenerator:
    """Generate dbt models from written requirements with an LLM and save them to a project.

    Attributes:
        llm: Chat client; must expose ``chat.complete_async(deployment=..., messages=...)``.
        project_path: Root directory of the dbt project that models are written into.
    """

    def __init__(self, llm_client: AIFoundryClient, project_path: str):
        self.llm = llm_client
        self.project_path = project_path

    async def generate_model(self, requirements: str, source_tables: list[dict]) -> dict:
        """Generate a dbt model from requirements.

        Args:
            requirements: Free-text description of what the model should produce.
            source_tables: Table descriptions in the shape accepted by
                ``_format_sources``.

        Returns:
            The parsed JSON object from the LLM reply. The prompt asks for the
            keys ``model_name``, ``description``, ``sql``, ``schema_yml``,
            ``tests`` and ``dependencies``; the reply is not validated here.

        Raises:
            json.JSONDecodeError: If the reply is not valid JSON.
        """
        # Format source information
        sources_context = self._format_sources(source_tables)
        response = await self.llm.chat.complete_async(
            deployment="gpt-4o",
            messages=[{
                "role": "system",
                "content": """You are a dbt expert. Generate dbt models following best practices:
- Use CTEs for readability
- Include appropriate tests
- Write comprehensive documentation
- Follow naming conventions (stg_, int_, fct_, dim_)
- Use Jinja for reusability"""
            }, {
                "role": "user",
                "content": f"""Generate a dbt model based on these requirements:
Requirements:
{requirements}
Available source tables:
{sources_context}
Return JSON with:
{{
"model_name": "name following conventions",
"description": "what this model does",
"sql": "the dbt SQL model",
"schema_yml": "the schema.yml content for this model",
"tests": ["list of recommended tests"],
"dependencies": ["upstream models needed"]
}}"""
            }]
        )
        return json.loads(response.choices[0].message.content)

    def _format_sources(self, sources: list[dict]) -> str:
        """Render source tables as plain text for the prompt.

        Each source needs a ``name`` and a ``columns`` list; each column needs
        ``name`` and ``type`` and may carry a ``description``. Every table is
        followed by an empty line.
        """
        lines = []
        for source in sources:
            lines.append(f"Table: {source['name']}")
            lines.append("Columns:")
            lines.extend(
                f" - {col['name']}: {col['type']} - {col.get('description', '')}"
                for col in source['columns']
            )
            lines.append("")
        return "\n".join(lines)

    def save_model(self, model: dict, layer: str = "marts"):
        """Save generated model to appropriate location.

        The target folder is ``models/staging`` for ``stg_`` models,
        ``models/intermediate`` for ``int_`` models and ``models/<layer>``
        for everything else.

        Args:
            model: Generated model; must contain ``model_name`` and ``sql``.
                A non-empty string under ``schema_yml`` is written as well.
            layer: Folder used for models without a ``stg_``/``int_`` prefix.

        Returns:
            Path of the written ``.sql`` file.

        Raises:
            ValueError: If ``model_name`` is empty or is not a plain file name.
        """
        model_name = model["model_name"]
        # The name comes from LLM output and ends up in a file path, so refuse
        # anything that could escape the models folder.
        if (
            not model_name
            or model_name in (".", "..")
            or any(sep in model_name for sep in "/\\")
        ):
            raise ValueError(f"Invalid dbt model name: {model_name!r}")

        # Determine path based on naming convention
        if model_name.startswith("stg_"):
            folder = "staging"
        elif model_name.startswith("int_"):
            folder = "intermediate"
        else:
            folder = layer
        model_dir = os.path.join(self.project_path, "models", folder)
        os.makedirs(model_dir, exist_ok=True)

        # Save SQL file
        sql_path = os.path.join(model_dir, f"{model_name}.sql")
        with open(sql_path, "w", encoding="utf-8") as f:
            f.write(model["sql"])

        # Save the properties YAML next to the model, one file per model.
        # Blindly appending to a shared schema.yml would repeat the top-level
        # `version:`/`models:` keys and leave the file invalid.
        schema_yml = model.get("schema_yml")
        if isinstance(schema_yml, str) and schema_yml.strip():
            schema_path = os.path.join(model_dir, f"{model_name}.yml")
            with open(schema_path, "w", encoding="utf-8") as f:
                f.write(schema_yml)
        return sql_path
Example Generated Model
-- models/marts/fct_daily_sales.sql
-- Generated by AI, reviewed by human
--
-- Daily sales fact. Grain: one row per sale_date x product x customer,
-- which is what the `aggregated` CTE groups by.

{{
    config(
        materialized='incremental',
        unique_key='sale_date_product_key',
        on_schema_change='sync_all_columns'
    )
}}

with source_sales as (

    select * from {{ ref('stg_sales_transactions') }}
    {% if is_incremental() %}
    -- Use >= rather than >: sale_date is truncated to the day, so the most
    -- recent loaded day must be re-read in full and merged on unique_key.
    -- With > that day is rebuilt from a partial set of rows (or skipped
    -- entirely when transaction_date is a plain date).
    where transaction_date >= (select max(sale_date) from {{ this }})
    {% endif %}

),

source_products as (

    select * from {{ ref('dim_products') }}

),

source_customers as (

    select * from {{ ref('dim_customers') }}

),

aggregated as (

    select
        date_trunc('day', s.transaction_date) as sale_date,
        s.product_id,
        p.product_key,
        p.category,
        p.subcategory,
        s.customer_id,
        c.customer_key,
        c.segment,
        c.region,
        count(*) as transaction_count,
        sum(s.quantity) as total_quantity,
        sum(s.amount) as total_revenue,
        avg(s.amount) as avg_transaction_value
    from source_sales s
    left join source_products p on s.product_id = p.product_id
    left join source_customers c on s.customer_id = c.customer_id
    group by 1, 2, 3, 4, 5, 6, 7, 8, 9

),

final as (

    select
        -- The key must cover the full grain. Hashing only date + product
        -- gives every customer's row for that product/day the same
        -- unique_key, so incremental merges collide. The source ids are used
        -- instead of the dimension keys because the left joins can leave
        -- product_key / customer_key null for unmatched rows.
        -- The column name is kept so downstream references keep working;
        -- run a full refresh once after deploying this change so existing
        -- rows are re-keyed.
        {{ dbt_utils.generate_surrogate_key(['sale_date', 'product_id', 'customer_id']) }} as sale_date_product_key,
        sale_date,
        product_key,
        category,
        subcategory,
        customer_key,
        segment,
        region,
        transaction_count,
        total_quantity,
        total_revenue,
        avg_transaction_value,
        current_timestamp as loaded_at
    from aggregated

)

select * from final
AI-Generated Documentation
class DBTDocGenerator:
    """Produce dbt documentation text and column descriptions with an LLM."""

    def __init__(self, llm_client: AIFoundryClient):
        self.llm = llm_client

    async def generate_model_docs(self, sql_content: str, model_name: str) -> str:
        """Generate documentation for existing dbt model.

        Sends the model's SQL and name to the LLM and returns the reply text
        exactly as received (no parsing or clean-up is done here).
        """
        prompt = f"""Analyze this dbt model and generate schema.yml documentation:
Model name: {model_name}
SQL:
```sql
{sql_content}
```
Generate YAML in this format:
```yaml
version: 2
models:
- name: {model_name}
description: |
[Detailed description of what this model does]
columns:
- name: column_name
description: [What this column represents]
tests:
- [appropriate tests]
```
For each column:
- Write a clear business-friendly description
- Suggest appropriate tests (not_null, unique, accepted_values, relationships)
- Note any important business logic"""
        reply = await self.llm.chat.complete_async(
            deployment="gpt-4o",
            messages=[{"role": "user", "content": prompt}],
        )
        return reply.choices[0].message.content

    async def generate_column_descriptions(self, columns: list[str], context: str) -> dict:
        """Generate descriptions for a list of columns.

        Returns the LLM reply parsed as JSON; the prompt asks for an object
        mapping each column name to its description.
        """
        prompt = f"""Generate clear, business-friendly descriptions for these columns.
Context: {context}
Columns: {columns}
Return JSON: {{"column_name": "description", ...}}"""
        reply = await self.llm.chat.complete_async(
            deployment="gpt-4o",
            messages=[{"role": "user", "content": prompt}],
        )
        raw_json = reply.choices[0].message.content
        return json.loads(raw_json)
AI-Assisted Testing
class DBTTestGenerator:
    """Ask an LLM to propose and write tests for dbt models."""

    def __init__(self, llm_client: AIFoundryClient):
        self.llm = llm_client

    async def suggest_tests(self, model_sql: str, model_name: str) -> dict:
        """Suggest tests for a dbt model.

        Args:
            model_sql: SQL source of the model to analyze.
            model_name: Name of the model, included in the prompt.

        Returns:
            The parsed JSON reply. The prompt asks for a single object with the
            keys ``schema_tests``, ``data_tests`` and ``freshness_tests`` (each
            a list), so the result is a dict rather than a list; the reply is
            not validated here.

        Raises:
            json.JSONDecodeError: If the reply is not valid JSON.
        """
        response = await self.llm.chat.complete_async(
            deployment="gpt-4o",
            messages=[{
                "role": "user",
                "content": f"""Analyze this dbt model and suggest comprehensive tests:
Model: {model_name}
SQL:
```sql
{model_sql}
```
Suggest tests in these categories:
1. Schema tests (not_null, unique, accepted_values, relationships)
2. Data tests (custom SQL tests)
3. Freshness tests (if applicable)
Return JSON:
{{
"schema_tests": [
{{"column": "col_name", "tests": ["not_null", "unique"]}}
],
"data_tests": [
{{"name": "test_name", "description": "what it tests", "sql": "SELECT..."}}
],
"freshness_tests": [
{{"table": "table_name", "warn_after": "12 hours", "error_after": "24 hours"}}
]
}}"""
            }]
        )
        return json.loads(response.choices[0].message.content)

    async def generate_custom_test(self, requirement: str, model_name: str) -> str:
        """Generate a custom data test.

        Args:
            requirement: Plain-language rule the test should enforce.
            model_name: Name of the model under test, included in the prompt.

        Returns:
            The LLM reply text, unmodified. The prompt asks for a commented SQL
            test that selects the rows violating the requirement.
        """
        response = await self.llm.chat.complete_async(
            deployment="gpt-4o",
            messages=[{
                "role": "user",
                "content": f"""Create a dbt data test for this requirement:
Model: {model_name}
Requirement: {requirement}
Return a dbt test SQL file that returns rows that FAIL the test.
Include comments explaining the test."""
            }]
        )
        return response.choices[0].message.content
Intelligent Code Review
class DBTReviewer:
    """Run an LLM-based code review over dbt model SQL."""

    def __init__(self, llm_client: AIFoundryClient):
        self.llm = llm_client

    async def review_model(self, sql_content: str) -> dict:
        """Review a dbt model for best practices.

        Returns the LLM reply parsed as JSON. The prompt asks for the keys
        ``score``, ``issues``, ``positive_aspects`` and ``suggested_refactor``;
        the reply is not validated here.
        """
        reviewer_brief = """You are a senior data engineer reviewing dbt code.
Check for:
- Performance issues (full table scans, missing filters on incrementals)
- Best practice violations
- Potential bugs
- Readability improvements
- Missing tests"""
        review_request = f"""Review this dbt model:
```sql
{sql_content}
```
Return JSON:
{{
"score": 1-10,
"issues": [
{{"severity": "high|medium|low", "type": "performance|bug|style|best_practice", "description": "issue description", "suggestion": "how to fix", "line": line_number_or_null}}
],
"positive_aspects": ["things done well"],
"suggested_refactor": "improved code if significant changes needed"
}}"""
        conversation = [
            {"role": "system", "content": reviewer_brief},
            {"role": "user", "content": review_request},
        ]
        reply = await self.llm.chat.complete_async(
            deployment="gpt-4o",
            messages=conversation,
        )
        verdict_text = reply.choices[0].message.content
        return json.loads(verdict_text)
Integration with dbt CLI
import subprocess
import os
class DBTAIWorkflow:
    """Chain model generation, saving, review, test suggestion and ``dbt compile``.

    Attributes:
        project_path: Root of the dbt project; used as the working directory
            for ``dbt`` commands.
        generator: Creates and saves models.
        doc_generator: Documentation helper (not used by the workflow below).
        test_generator: Suggests tests for the generated SQL.
        reviewer: Reviews the generated SQL.
    """

    def __init__(self, project_path: str, llm_client: AIFoundryClient):
        self.project_path = project_path
        self.generator = DBTModelGenerator(llm_client, project_path)
        self.doc_generator = DBTDocGenerator(llm_client)
        self.test_generator = DBTTestGenerator(llm_client)
        self.reviewer = DBTReviewer(llm_client)

    async def create_model_workflow(self, requirements: str, sources: list[dict]):
        """Complete workflow for creating a new model.

        Args:
            requirements: Free-text description of the model to build.
            sources: Source table descriptions passed to the generator.

        Returns:
            A dict with the generated ``model``, its ``review``, the suggested
            ``tests`` and ``compiled`` (True only if ``dbt compile`` exited
            with status 0).
        """
        # 1. Generate model
        print("Generating model...")
        model = await self.generator.generate_model(requirements, sources)

        # 2. Save model
        model_path = self.generator.save_model(model)
        print(f"Model saved to: {model_path}")

        # 3. Review generated code
        print("Reviewing generated code...")
        review = await self.reviewer.review_model(model["sql"])
        if review["score"] < 7:
            print(f"Warning: Model scored {review['score']}/10")
            for issue in review["issues"]:
                print(f" - [{issue['severity']}] {issue['description']}")

        # 4. Generate tests
        print("Generating tests...")
        tests = await self.test_generator.suggest_tests(model["sql"], model["model_name"])

        # 5. Compile and test
        print("Compiling model...")
        compiled = self._compile_model(model["model_name"])

        return {
            "model": model,
            "review": review,
            "tests": tests,
            "compiled": compiled
        }

    def _compile_model(self, model_name: str) -> bool:
        """Run ``dbt compile`` for one model; return True if it exited with status 0."""
        try:
            result = subprocess.run(
                ["dbt", "compile", "--select", model_name],
                cwd=self.project_path,
                capture_output=True,
                text=True,
                # Never let undecodable tool output abort the workflow.
                errors="replace",
            )
        except OSError as exc:
            # dbt is not installed / not on PATH, or project_path does not
            # exist: report it as a failed compile instead of crashing after
            # the model has already been generated and reviewed.
            print(f"Compilation failed: {exc}")
            return False

        if result.returncode != 0:
            # dbt typically logs errors to stdout (stderr is often empty), so
            # show both streams.
            details = "\n".join(
                stream.strip()
                for stream in (result.stdout, result.stderr)
                if stream and stream.strip()
            )
            print(f"Compilation failed: {details}")
            return False
        return True
Best Practices
- Human review required: AI-generated models are starting points
- Validate logic: Check business rules are correctly implemented
- Test thoroughly: AI can miss edge cases
- Iterate: Use AI for quick iterations, refine manually
- Version control: Track AI-generated vs. human-modified code
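AI accelerates dbt development but doesn’t replace data engineering expertise. Use it to handle boilerplate and documentation while focusing your expertise on business logic and optimization.

## Takeaways

- Treat AI-generated models, documentation, and tests as drafts: review them, compile them with dbt, and verify the business logic before merging.
- Next steps: try the workflow on a single staging model, compare the AI review with your own, and keep whatever survives human review under version control.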