
Azure Databricks + OpenAI: LLMs for Data Engineering

Azure Databricks combined with Azure OpenAI enables powerful data engineering workflows: generating SQL from natural language, automating data quality checks, and building intelligent data pipelines.

Setting Up the Integration

# Databricks notebook setup
# Assumes the legacy openai Python SDK (< 1.0), e.g. %pip install openai==0.28.1,
# which is configured through module-level attributes as shown below.
import openai
from pyspark.sql import SparkSession

# Configure Azure OpenAI from a Databricks secret scope named "openai"
# that holds the resource endpoint URL and API key
openai.api_type = "azure"
openai.api_base = dbutils.secrets.get("openai", "endpoint")
openai.api_key = dbutils.secrets.get("openai", "key")
openai.api_version = "2023-03-15-preview"

class DatabricksOpenAI:
    """Azure OpenAI integration for Databricks."""

    def __init__(self, deployment_name: str = "gpt-4"):
        self.deployment = deployment_name

    def chat(self, prompt: str, temperature: float = 0.3) -> str:
        """Send chat completion request."""
        response = openai.ChatCompletion.create(
            engine=self.deployment,
            messages=[{"role": "user", "content": prompt}],
            temperature=temperature
        )
        return response.choices[0].message.content

ai = DatabricksOpenAI()
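
A quick round-trip call is a cheap way to confirm the deployment name, endpoint, and key are all wired up before building anything on top of the wrapper; the prompt here is arbitrary.

# Sanity check: any short reply confirms the secrets and deployment resolve correctly
print(ai.chat("Reply with the single word OK."))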

Natural Language to SQL

class NL2SQLDatabricks:
    """Convert natural language to Spark SQL."""

    def __init__(self, ai_client, spark: SparkSession):
        self.ai = ai_client
        self.spark = spark

    def get_schema_context(self, database: str) -> str:
        """Get database schema for context."""
        tables = self.spark.sql(f"SHOW TABLES IN {database}").collect()

        schema_parts = []
        for table in tables:
            table_name = f"{database}.{table.tableName}"
            columns = self.spark.sql(f"DESCRIBE {table_name}").collect()

            # Skip the "# Partition Information" rows DESCRIBE adds for partitioned tables
            cols_str = ", ".join(
                f"{c.col_name} {c.data_type}"
                for c in columns
                if c.col_name and not c.col_name.startswith("#")
            )
            schema_parts.append(f"Table {table_name}: {cols_str}")

        return "\n".join(schema_parts)

    def query(self, natural_language: str, database: str) -> dict:
        """Convert natural language to SQL and execute."""

        schema = self.get_schema_context(database)

        prompt = f"""Convert this question to Spark SQL.

Database Schema:
{schema}

Question: {natural_language}

Return only the SQL query, no explanation."""

        sql = self.ai.chat(prompt, temperature=0)

        # Clean up response
        sql = sql.strip().replace("```sql", "").replace("```", "").strip()

        # Execute
        try:
            df = self.spark.sql(sql)
            return {
                "sql": sql,
                "success": True,
                "result": df,
                "row_count": df.count()
            }
        except Exception as e:
            return {
                "sql": sql,
                "success": False,
                "error": str(e)
            }

# Usage
nl2sql = NL2SQLDatabricks(ai, spark)
result = nl2sql.query(
    "Show me the top 10 customers by total spend last quarter",
    "sales_db"
)
if result["success"]:
    display(result["result"])
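
Because the generated SQL is executed as-is, a lightweight guard before self.spark.sql(sql) is a sensible addition in shared workspaces. The helper below is a minimal sketch, not part of the class above; it simply rejects anything that is not a plain SELECT or CTE statement.

def is_read_only(sql: str) -> bool:
    """Illustrative guard: accept only SELECT / WITH statements from the model."""
    stripped = sql.lstrip()
    first_word = stripped.split(None, 1)[0].upper() if stripped else ""
    return first_word in {"SELECT", "WITH"}

# Example: call inside NL2SQLDatabricks.query before executing the generated SQL
# if not is_read_only(sql):
#     return {"sql": sql, "success": False, "error": "Only read-only queries are allowed"}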

AI-Powered Data Quality

class AIDataQuality:
    """AI-assisted data quality checks."""

    def __init__(self, ai_client, spark):
        self.ai = ai_client
        self.spark = spark

    def analyze_data_quality(self, table_name: str) -> dict:
        """Analyze data quality issues."""

        # Get sample and stats
        df = self.spark.table(table_name)
        sample = df.limit(100).toPandas().to_string()
        stats = df.describe().toPandas().to_string()

        prompt = f"""Analyze this data for quality issues.

Table: {table_name}

Sample (100 rows):
{sample}

Statistics:
{stats}

Identify:
1. Missing value patterns
2. Potential outliers
3. Data type issues
4. Inconsistencies
5. Suggested cleaning steps"""

        analysis = self.ai.chat(prompt)

        return {
            "table": table_name,
            "analysis": analysis,
            "row_count": df.count()
        }

    def generate_validation_rules(self, table_name: str) -> str:
        """Generate data validation rules."""

        schema = self.spark.table(table_name).schema
        schema_str = "\n".join([f"{f.name}: {f.dataType}" for f in schema.fields])

        prompt = f"""Generate data validation rules for this table.

Table: {table_name}
Schema:
{schema_str}

Generate:
1. Null checks for required fields
2. Range validations for numeric fields
3. Format validations for strings
4. Referential integrity checks

Return as Python code using Great Expectations or similar."""

        return self.ai.chat(prompt)

# Usage
dq = AIDataQuality(ai, spark)
quality_report = dq.analyze_data_quality("bronze.transactions")
print(quality_report["analysis"])
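
The rule generator is called the same way; its output is LLM-generated code, so treat it as a draft to review rather than something to run blindly. The table name is the same illustrative one used above.

# Generate candidate validation rules for review
validation_code = dq.generate_validation_rules("bronze.transactions")
print(validation_code)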

Intelligent ETL Documentation

class ETLDocGenerator:
    """Generate ETL documentation with AI."""

    def __init__(self, ai_client):
        self.ai = ai_client

    def document_notebook(self, notebook_code: str) -> str:
        """Generate documentation for a Databricks notebook."""

        prompt = f"""Document this Databricks ETL notebook.

Code:
{notebook_code[:10000]}

Generate:
1. Overview of what the notebook does
2. Input data sources
3. Transformations applied
4. Output tables/files
5. Dependencies
6. Scheduling recommendations"""

        return self.ai.chat(prompt)

    def generate_lineage_description(
        self,
        source_tables: list[str],
        target_table: str,
        transformations: str
    ) -> str:
        """Describe data lineage."""

        prompt = f"""Describe the data lineage for this pipeline.

Source tables: {', '.join(source_tables)}
Target table: {target_table}
Transformations: {transformations}

Create a clear description of:
1. Data flow
2. Key transformations
3. Business logic applied
4. Data freshness expectations"""

        return self.ai.chat(prompt)
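
Both methods take plain strings, so usage is straightforward; the table names and transformation summary below are illustrative.

# Usage (illustrative source/target tables and transformation summary)
doc_gen = ETLDocGenerator(ai)
lineage = doc_gen.generate_lineage_description(
    source_tables=["bronze.transactions", "bronze.customers"],
    target_table="silver.customer_spend",
    transformations="Join on customer_id, filter test accounts, aggregate spend per quarter"
)
print(lineage)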

Auto-Generated Notebooks

class NotebookGenerator:
    """Generate Databricks notebooks from descriptions."""

    def __init__(self, ai_client):
        self.ai = ai_client

    def generate_etl_notebook(
        self,
        description: str,
        source_format: str,
        target_format: str
    ) -> str:
        """Generate ETL notebook code."""

        prompt = f"""Generate a Databricks notebook for this ETL task.

Description: {description}
Source format: {source_format}
Target format: {target_format}

Include:
1. Configuration cell
2. Data reading
3. Transformations
4. Data quality checks
5. Writing to target
6. Error handling
7. Comments explaining each step

Use PySpark and Delta Lake best practices."""

        return self.ai.chat(prompt)

    def generate_analysis_notebook(
        self,
        data_description: str,
        analysis_goals: list[str]
    ) -> str:
        """Generate data analysis notebook."""

        goals_str = "\n".join(f"- {g}" for g in analysis_goals)

        prompt = f"""Generate a Databricks analysis notebook.

Data: {data_description}
Analysis Goals:
{goals_str}

Include:
1. Data exploration
2. Visualizations
3. Statistical analysis
4. Insights summary
5. Recommendations"""

        return self.ai.chat(prompt)

# Usage
gen = NotebookGenerator(ai)
notebook_code = gen.generate_etl_notebook(
    "Load customer data from CSV, clean it, and write to Delta table",
    "CSV",
    "Delta Lake"
)
print(notebook_code)
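
Rather than pasting the output straight into a production workspace, one option is to park the generated source somewhere reviewable first. The sketch below writes it to DBFS with dbutils.fs.put; the path is an arbitrary example.

# Save the generated source to DBFS for review before importing it as a notebook
# (third argument True = overwrite an existing file at this path)
dbutils.fs.put("/tmp/generated/customer_etl_notebook.py", notebook_code, True)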

Azure Databricks with Azure OpenAI creates intelligent data platforms where natural language becomes the interface for data engineering tasks.

Michael John Pena

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.