Azure Synapse Analytics + OpenAI: Intelligent Data Warehousing

Combining Azure Synapse Analytics with Azure OpenAI enables intelligent data warehousing: natural language queries against the warehouse, automated documentation, and AI-powered data exploration at scale.

Synapse OpenAI Integration

# Synapse notebook integration
from azure.identity import DefaultAzureCredential
from pyspark.sql import SparkSession
import openai
import json

class SynapseOpenAI:
    """OpenAI integration for Azure Synapse."""

    def __init__(self, endpoint: str, deployment: str):
        self.endpoint = endpoint
        self.deployment = deployment
        self._setup_client()

    def _setup_client(self):
        """Configure the legacy openai SDK (< 1.0) for Azure AD token auth."""
        credential = DefaultAzureCredential()
        token = credential.get_token("https://cognitiveservices.azure.com/.default")

        openai.api_type = "azure_ad"  # use "azure" only when passing an API key
        openai.api_base = self.endpoint
        openai.api_key = token.token  # AAD tokens expire, so refresh for long sessions
        openai.api_version = "2023-03-15-preview"

    def query(self, prompt: str, temperature: float = 0.3) -> str:
        """Send query to OpenAI."""
        response = openai.ChatCompletion.create(
            engine=self.deployment,
            messages=[{"role": "user", "content": prompt}],
            temperature=temperature
        )
        return response.choices[0].message.content

# Initialize
ai = SynapseOpenAI(
    endpoint="https://your-openai.openai.azure.com/",
    deployment="gpt-4"
)
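A quick smoke test of the wrapper; the endpoint and deployment above are placeholders for your own resources:

# Sanity check (assumes the endpoint and deployment exist in your subscription)
print(ai.query("In one sentence, what is a Synapse dedicated SQL pool?"))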

Natural Language to Synapse SQL

class SynapseNL2SQL:
    """Convert natural language to Synapse SQL."""

    def __init__(self, ai_client, spark: SparkSession):
        self.ai = ai_client
        self.spark = spark

    def get_warehouse_schema(self, database: str) -> str:
        """Get Synapse dedicated pool schema."""
        query = f"""
        SELECT
            s.name as schema_name,
            t.name as table_name,
            c.name as column_name,
            ty.name as data_type
        FROM {database}.sys.tables t
        JOIN {database}.sys.schemas s ON t.schema_id = s.schema_id
        JOIN {database}.sys.columns c ON t.object_id = c.object_id
        JOIN {database}.sys.types ty ON c.user_type_id = ty.user_type_id
        ORDER BY s.name, t.name, c.column_id
        """

        df = self.spark.sql(query)
        return self._format_schema(df.collect())

    def _format_schema(self, rows) -> str:
        """Format schema for LLM context."""
        tables = {}
        for row in rows:
            key = f"{row.schema_name}.{row.table_name}"
            if key not in tables:
                tables[key] = []
            tables[key].append(f"{row.column_name} ({row.data_type})")

        parts = []
        for table, columns in tables.items():
            parts.append(f"Table {table}:\n  Columns: {', '.join(columns)}")

        return "\n\n".join(parts)

    def natural_language_query(
        self,
        question: str,
        database: str
    ) -> dict:
        """Convert natural language to SQL and execute."""

        schema = self.get_warehouse_schema(database)

        prompt = f"""Convert this question to Synapse SQL (T-SQL dialect).

Database Schema:
{schema}

Question: {question}

Rules:
- Use proper T-SQL syntax
- Include appropriate JOINs
- Handle NULLs properly
- Add TOP 1000 for unbounded queries
- Use table aliases

Return only the SQL query."""

        sql = self.ai.query(prompt, temperature=0)
        sql = self._clean_sql(sql)

        try:
            df = self.spark.sql(sql)
            return {
                "question": question,
                "sql": sql,
                "success": True,
                "result": df,
                "row_count": df.count()
            }
        except Exception as e:
            return {
                "question": question,
                "sql": sql,
                "success": False,
                "error": str(e)
            }

    def _clean_sql(self, response: str) -> str:
        """Clean SQL from response."""
        sql = response.strip()
        if sql.startswith("```"):
            sql = sql.split("```")[1]
            if sql.startswith("sql"):
                sql = sql[3:]
        return sql.strip()

# Usage
nl2sql = SynapseNL2SQL(ai, spark)
result = nl2sql.natural_language_query(
    "What are the top 10 products by revenue in Q1 2023?",
    "sales_warehouse"
)
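The returned dictionary carries either a Spark DataFrame or the error message, so a notebook cell can branch on success; a minimal pattern:

# Review the generated SQL, then preview rows only if execution succeeded
print(result["sql"])
if result["success"]:
    result["result"].show(10)
else:
    print(f"Query failed: {result['error']}")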

Automated Data Dictionary

class SynapseDataDictionary:
    """AI-generated data dictionary for Synapse."""

    def __init__(self, ai_client, spark):
        self.ai = ai_client
        self.spark = spark

    def generate_table_documentation(
        self,
        database: str,
        schema: str,
        table: str
    ) -> dict:
        """Generate documentation for a table."""

        # Get table metadata
        columns = self._get_column_info(database, schema, table)
        sample = self._get_sample_data(database, schema, table)
        stats = self._get_basic_stats(database, schema, table)

        prompt = f"""Generate comprehensive documentation for this Synapse table.

Table: {database}.{schema}.{table}

Columns:
{columns}

Sample Data (5 rows):
{sample}

Statistics:
{stats}

Generate:
1. Table description (what data it contains)
2. Column descriptions (purpose of each)
3. Business context
4. Common use cases
5. Data quality notes
6. Related tables (inferred)

Return as JSON."""

        response = self.ai.query(prompt)

        try:
            return json.loads(response)
        except json.JSONDecodeError:
            return {"raw_documentation": response}

    def _get_column_info(self, db, schema, table) -> str:
        """Get column information."""
        query = f"""
        SELECT
            c.name,
            t.name as type,
            c.is_nullable,
            c.max_length
        FROM {db}.sys.columns c
        JOIN {db}.sys.types t ON c.user_type_id = t.user_type_id
        JOIN {db}.sys.tables tab ON c.object_id = tab.object_id
        JOIN {db}.sys.schemas s ON tab.schema_id = s.schema_id
        WHERE tab.name = '{table}' AND s.name = '{schema}'
        """
        rows = self.spark.sql(query).collect()
        return "\n".join([f"- {r.name}: {r.type}, nullable={r.is_nullable}" for r in rows])

    def _get_sample_data(self, db, schema, table) -> str:
        """Get sample data."""
        query = f"SELECT TOP 5 * FROM {db}.{schema}.{table}"
        df = self.spark.sql(query)
        return df.toPandas().to_string()

    def _get_basic_stats(self, db, schema, table) -> str:
        """Get basic statistics."""
        query = f"SELECT COUNT(*) as row_count FROM {db}.{schema}.{table}"
        count = self.spark.sql(query).collect()[0].row_count
        return f"Row count: {count:,}"

    def _get_all_tables(self, database: str) -> list:
        """List all tables in the database."""
        query = f"""
        SELECT s.name as schema_name, t.name as table_name
        FROM {database}.sys.tables t
        JOIN {database}.sys.schemas s ON t.schema_id = s.schema_id
        """
        return self.spark.sql(query).collect()

    def generate_full_catalog(self, database: str) -> dict:
        """Generate documentation for every table in the database."""
        tables = self._get_all_tables(database)
        catalog = {}

        for table in tables:
            key = f"{table.schema_name}.{table.table_name}"
            catalog[key] = self.generate_table_documentation(
                database,
                table.schema_name,
                table.table_name
            )

        return catalog
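Usage follows the same pattern as the NL2SQL helper; the database, schema, and table names below are illustrative:

# Document a single table (names are illustrative)
dictionary = SynapseDataDictionary(ai, spark)
docs = dictionary.generate_table_documentation("sales_warehouse", "sales", "fact_orders")
print(json.dumps(docs, indent=2))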

Query Optimization Assistant

class SynapseQueryOptimizer:
    """AI-powered query optimization for Synapse."""

    def __init__(self, ai_client, spark):
        self.ai = ai_client
        self.spark = spark

    def analyze_query(self, sql: str, database: str) -> dict:
        """Analyze query for optimization opportunities."""

        # Get the Spark SQL execution plan (may differ from the dedicated pool's plan)
        explain_query = f"EXPLAIN {sql}"
        try:
            plan = self.spark.sql(explain_query).collect()
            plan_str = "\n".join(row[0] for row in plan)
        except Exception:
            plan_str = "Plan not available"

        prompt = f"""Analyze this Synapse SQL query for optimization.

Query:
{sql}

Execution Plan:
{plan_str}

Provide:
1. Performance issues identified
2. Optimization recommendations
3. Index suggestions
4. Distribution key recommendations (for Synapse dedicated pools)
5. Materialized view opportunities
6. Rewritten optimized query

Format as JSON."""

        response = self.ai.query(prompt)

        try:
            return json.loads(response)
        except json.JSONDecodeError:
            return {"analysis": response}

    def suggest_distribution(
        self,
        table_name: str,
        common_queries: list[str]
    ) -> dict:
        """Suggest optimal distribution strategy."""

        prompt = f"""Recommend distribution strategy for Synapse dedicated pool.

Table: {table_name}

Common Queries:
{chr(10).join(common_queries)}

Consider:
1. HASH distribution - best for large fact tables
2. ROUND_ROBIN - good for staging tables
3. REPLICATE - best for small dimension tables

Recommend:
- Distribution type
- Distribution column (if HASH)
- Reasoning
- Impact on provided queries"""

        return {"recommendation": self.ai.query(prompt)}

# Usage
optimizer = SynapseQueryOptimizer(ai, spark)
analysis = optimizer.analyze_query(
    "SELECT * FROM sales.fact_orders WHERE order_date > '2023-01-01'",
    "sales_warehouse"
)
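The same optimizer can recommend a distribution strategy for a frequently joined fact table; the table name and query list below are illustrative:

# Distribution advice (table name and queries are illustrative)
distribution = optimizer.suggest_distribution(
    "sales.fact_orders",
    [
        "SELECT customer_id, SUM(amount) FROM sales.fact_orders GROUP BY customer_id",
        "SELECT o.*, c.segment FROM sales.fact_orders o JOIN sales.dim_customer c ON o.customer_id = c.customer_id"
    ]
)
print(distribution["recommendation"])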

Intelligent Data Profiling

from pyspark.sql.types import NumericType

class SynapseDataProfiler:
    """AI-enhanced data profiling for Synapse."""

    def __init__(self, ai_client, spark):
        self.ai = ai_client
        self.spark = spark

    def profile_table(self, database: str, schema: str, table: str) -> dict:
        """Generate comprehensive data profile."""

        full_table = f"{database}.{schema}.{table}"

        # Gather statistics
        stats = self._gather_statistics(full_table)

        prompt = f"""Analyze this data profile and provide insights.

Table: {full_table}

Statistics:
{json.dumps(stats, indent=2, default=str)}

Provide:
1. Data quality assessment
2. Anomalies detected
3. Pattern observations
4. Recommendations for data cleaning
5. Potential issues for analytics"""

        analysis = self.ai.query(prompt)

        return {
            "table": full_table,
            "statistics": stats,
            "ai_analysis": analysis
        }

    def _gather_statistics(self, table: str) -> dict:
        """Gather table statistics."""
        df = self.spark.table(table)

        stats = {
            "row_count": df.count(),
            "column_count": len(df.columns),
            "columns": {}
        }

        for col in df.columns:
            col_stats = {
                "null_count": df.filter(df[col].isNull()).count(),
                "distinct_count": df.select(col).distinct().count()
            }

            # Numeric stats (type check via NumericType rather than string comparison)
            if isinstance(df.schema[col].dataType, NumericType):
                numeric_stats = df.select(col).describe().collect()
                col_stats["numeric_stats"] = {
                    row[0]: row[1] for row in numeric_stats
                }

            stats["columns"][col] = col_stats

        return stats
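Profiling a table then reads as a single call; the database, schema, and table names below are placeholders:

# Usage (names are placeholders)
profiler = SynapseDataProfiler(ai, spark)
profile = profiler.profile_table("sales_warehouse", "sales", "dim_customer")
print(profile["ai_analysis"])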

Azure Synapse combined with OpenAI shifts data warehousing from a discipline that demands SQL expertise toward conversational data access: business users can explore enterprise data using natural language, while the generated SQL remains available for review.

Michael John Pena

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.