
OneLake AI Workloads: Running AI Directly on Your Data Lake

OneLake AI Workloads let you run AI operations directly on your data lake, with no data movement out of OneLake. Let's explore how to use them for embeddings, vector search, AI enrichment, and RAG.

OneLake AI Architecture

┌────────────────────────────────────────────────────────────┐
│                    OneLake AI Workloads                    │
├────────────────────────────────────────────────────────────┤
│                                                            │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐         │
│  │  Documents  │  │  Structured │  │    Images   │         │
│  │    Files    │  │    Data     │  │   & Media   │         │
│  └──────┬──────┘  └──────┬──────┘  └──────┬──────┘         │
│         │                │                │                │
│         └────────────────┼────────────────┘                │
│                          ↓                                 │
│              ┌───────────────────────┐                     │
│              │   AI Workload Engine  │                     │
│              │  • Embedding          │                     │
│              │  • Vector Index       │                     │
│              │  • Semantic Search    │                     │
│              │  • AI Enrichment      │                     │
│              └───────────────────────┘                     │
│                          ↓                                 │
│              ┌───────────────────────┐                     │
│              │    Output to OneLake  │                     │
│              │ (Delta Tables, Files) │                     │
│              └───────────────────────┘                     │
│                                                            │
└────────────────────────────────────────────────────────────┘
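
Everything in this flow reads from and writes back to OneLake. Inside a Fabric notebook, the attached lakehouse is addressable through relative Files/ and Tables/ paths; from outside Fabric, the same data is reachable over the ABFS driver. A minimal sketch, where the workspace and lakehouse names are placeholders:

# Minimal sketch of OneLake addressing -- <workspace> and <lakehouse>
# are placeholders for your own Fabric items
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Inside Fabric: relative paths resolve against the attached lakehouse
docs = spark.read.text("Files/documents/sample.txt")

# From any ABFS-capable client: the fully qualified OneLake path
onelake_path = (
    "abfss://<workspace>@onelake.dfs.fabric.microsoft.com/"
    "<lakehouse>.Lakehouse/Files/documents/sample.txt"
)
docs_remote = spark.read.text(onelake_path)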

Creating Embeddings at Scale

# Create embeddings using Azure OpenAI and PySpark in Fabric notebooks
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, FloatType
from openai import AzureOpenAI
from azure.identity import DefaultAzureCredential
import os

spark = SparkSession.builder.getOrCreate()

# Initialize the Azure OpenAI client lazily so each executor builds its own
# instance -- the client holds live connections and cannot be pickled into
# a UDF closure
_client = None

def get_client():
    global _client
    if _client is None:
        credential = DefaultAzureCredential()
        token = credential.get_token("https://cognitiveservices.azure.com/.default").token
        _client = AzureOpenAI(
            azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
            api_version="2024-02-01",
            azure_ad_token=token
        )
    return _client

# UDF for generating embeddings
def get_embedding(text):
    if not text or not text.strip():
        return None
    try:
        response = get_client().embeddings.create(
            model="text-embedding-3-large",
            input=text[:8000]  # Truncate to stay within the model's input limit
        )
        return response.data[0].embedding
    except Exception as e:
        print(f"Error: {e}")
        return None

embedding_udf = F.udf(get_embedding, ArrayType(FloatType()))

# Read documents from OneLake Files, keeping each file's path for lineage
documents_df = spark.read.text("Files/documents/**/*.txt") \
    .withColumn("source_path", F.input_file_name())

# Chunk documents (simple approach: split on blank lines), and derive a
# stable id per chunk -- the search index later needs a key field
chunked_df = documents_df \
    .withColumn("chunks", F.split(F.col("value"), "\n\n")) \
    .select("source_path", F.explode("chunks").alias("chunk_text")) \
    .filter(F.length("chunk_text") > 50) \
    .withColumn("id", F.sha2(F.concat_ws("|", "source_path", "chunk_text"), 256))

# Generate embeddings (see the batched variant below for large datasets)
embeddings_df = chunked_df \
    .withColumn("embedding", embedding_udf(F.col("chunk_text"))) \
    .withColumn("created_at", F.current_timestamp())

# Save to a Delta table
embeddings_df.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable("document_embeddings")

# Count from the saved table: counting embeddings_df directly would re-run
# the UDF and call the embeddings API a second time
print(f"Processed {spark.read.table('document_embeddings').count()} chunks")

Creating Vector Indexes

# For vector search, integrate with Azure AI Search or use in-memory search
# Option 1: Azure AI Search for production vector indexing

from azure.identity import DefaultAzureCredential
from azure.search.documents import SearchClient
from azure.search.documents.indexes import SearchIndexClient
from azure.search.documents.indexes.models import (
    SearchIndex, SearchField, SearchFieldDataType,
    VectorSearch, HnswAlgorithmConfiguration, HnswParameters, VectorSearchProfile
)

credential = DefaultAzureCredential()

# Create search index with vector field
index_client = SearchIndexClient(
    endpoint="https://<search-service>.search.windows.net",
    credential=credential
)

# Define index schema
fields = [
    SearchField(name="id", type=SearchFieldDataType.String, key=True),
    SearchField(name="chunk_text", type=SearchFieldDataType.String, searchable=True),
    SearchField(name="source_path", type=SearchFieldDataType.String, filterable=True),
    SearchField(
        name="embedding",
        type=SearchFieldDataType.Collection(SearchFieldDataType.Single),
        searchable=True,
        vector_search_dimensions=3072,
        vector_search_profile_name="vector-profile"
    )
]

vector_search = VectorSearch(
    algorithms=[HnswAlgorithmConfiguration(
        name="hnsw-config",
        parameters=HnswParameters(m=16, ef_construction=200, metric="cosine")
    )],
    profiles=[VectorSearchProfile(name="vector-profile", algorithm_configuration_name="hnsw-config")]
)

index = SearchIndex(name="document-search-index", fields=fields, vector_search=vector_search)
index_client.create_or_update_index(index)

# Upload embeddings from Fabric to Azure AI Search
search_client = SearchClient(
    endpoint="https://<search-service>.search.windows.net",
    index_name="document-search-index",
    credential=credential
)

# Read embeddings from the Delta table and upload
# (select only the fields defined in the index -- unknown fields are rejected)
embeddings_df = spark.read.table("document_embeddings") \
    .select("id", "chunk_text", "source_path", "embedding")

# collect() is fine for moderate volumes; the service caps each request,
# so push documents in batches
documents = [row.asDict() for row in embeddings_df.collect()]
for i in range(0, len(documents), 1000):
    search_client.upload_documents(documents[i:i + 1000])
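
For larger corpora, collecting everything to the driver and batching by hand gets unwieldy. The SDK also offers SearchIndexingBufferedSender, which batches, flushes, and retries automatically; a brief sketch:

# Sketch: buffered uploads -- the sender batches and flushes automatically
from azure.search.documents import SearchIndexingBufferedSender

with SearchIndexingBufferedSender(
    endpoint="https://<search-service>.search.windows.net",
    index_name="document-search-index",
    credential=credential
) as sender:
    # Stream rows to the driver instead of materializing them all at once
    for row in embeddings_df.toLocalIterator():
        sender.upload_documents([row.asDict()])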

Semantic Search

# Semantic search using Azure AI Search
from azure.identity import DefaultAzureCredential
from azure.search.documents import SearchClient
from azure.search.documents.models import VectorizedQuery
from openai import AzureOpenAI

credential = DefaultAzureCredential()

# Initialize clients
search_client = SearchClient(
    endpoint="https://<search-service>.search.windows.net",
    index_name="document-search-index",
    credential=credential
)

openai_client = AzureOpenAI(
    azure_endpoint="https://<openai>.openai.azure.com",
    api_version="2024-02-01",
    azure_ad_token=credential.get_token("https://cognitiveservices.azure.com/.default").token
)

def semantic_search(query_text: str, top_k: int = 10, min_score: float = 0.7):
    # Generate query embedding
    response = openai_client.embeddings.create(
        model="text-embedding-3-large",
        input=query_text
    )
    query_vector = response.data[0].embedding

    # Perform vector search
    vector_query = VectorizedQuery(
        vector=query_vector,
        k_nearest_neighbors=top_k,
        fields="embedding"
    )

    results = search_client.search(
        search_text=None,
        vector_queries=[vector_query],
        select=["chunk_text", "source_path"]
    )

    for result in results:
        score = result.get("@search.score", 0)
        if score >= min_score:
            print(f"Score: {score:.3f}")
            print(f"Text: {result['chunk_text'][:200]}...")
            print()

# Hybrid search (vector + keyword)
def hybrid_search(query_text: str, keyword_query: str, top_k: int = 10):
    response = openai_client.embeddings.create(model="text-embedding-3-large", input=query_text)
    vector_query = VectorizedQuery(vector=response.data[0].embedding, k_nearest_neighbors=top_k, fields="embedding")

    results = search_client.search(
        search_text=keyword_query,  # Full-text search
        vector_queries=[vector_query],  # Vector search
        select=["chunk_text", "source_path"]
    )
    return list(results)
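
Calling the hybrid version looks like this (the query strings are illustrative):

# Example usage -- queries are illustrative
hits = hybrid_search(
    query_text="how do I design a lakehouse medallion architecture",
    keyword_query="medallion architecture",
    top_k=5
)
for hit in hits:
    print(hit["source_path"], "-", hit["chunk_text"][:120])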

AI Enrichment Workloads

# AI enrichment using Azure OpenAI and PySpark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType
from openai import AzureOpenAI
from azure.identity import DefaultAzureCredential

spark = SparkSession.builder.getOrCreate()

# As with the embedding job, build the client lazily so each executor
# creates its own instance instead of pickling one into the UDF closure
_openai_client = None

def get_openai_client():
    global _openai_client
    if _openai_client is None:
        credential = DefaultAzureCredential()
        _openai_client = AzureOpenAI(
            azure_endpoint="https://<openai>.openai.azure.com",
            api_version="2024-02-01",
            azure_ad_token=credential.get_token("https://cognitiveservices.azure.com/.default").token
        )
    return _openai_client

# Classification UDF
def classify_product(product_name, description):
    prompt = f"""Classify this product into one category: Electronics, Clothing, Home, Sports, Other
Product: {product_name} - {description}
Return only the category name."""
    try:
        response = get_openai_client().chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}],
            max_tokens=20
        )
        return response.choices[0].message.content.strip()
    except Exception:
        return "Other"

classify_udf = F.udf(classify_product, StringType())

# Feature extraction UDF
def extract_features(description):
    prompt = f"Extract key features from this product description as a JSON array: {description}"
    try:
        response = get_openai_client().chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}],
            max_tokens=200
        )
        return response.choices[0].message.content
    except Exception:
        return "[]"

features_udf = F.udf(extract_features, StringType())

# Read source data
products_df = spark.read.table("bronze.raw_products")

# Apply AI enrichments
enriched_df = products_df \
    .withColumn("ai_category", classify_udf(F.col("product_name"), F.col("description"))) \
    .withColumn("extracted_features", features_udf(F.col("description")))

# Write to silver layer
enriched_df.write \
    .format("delta") \
    .mode("overwrite") \
    .option("mergeSchema", "true") \
    .saveAsTable("silver.enriched_products")

Building RAG on OneLake

# Build RAG pipeline using Azure OpenAI and Azure AI Search
from azure.identity import DefaultAzureCredential
from azure.search.documents import SearchClient
from azure.search.documents.models import VectorizedQuery
from openai import AzureOpenAI

credential = DefaultAzureCredential()

# Initialize clients
search_client = SearchClient(
    endpoint="https://<search-service>.search.windows.net",
    index_name="document-search-index",
    credential=credential
)

openai_client = AzureOpenAI(
    azure_endpoint="https://<openai>.openai.azure.com",
    api_version="2024-02-01",
    azure_ad_token=credential.get_token("https://cognitiveservices.azure.com/.default").token
)

def rag_query(question: str, top_k: int = 5):
    # Step 1: Generate query embedding
    embedding_response = openai_client.embeddings.create(
        model="text-embedding-3-large",
        input=question
    )
    query_vector = embedding_response.data[0].embedding

    # Step 2: Retrieve relevant documents
    vector_query = VectorizedQuery(vector=query_vector, k_nearest_neighbors=top_k, fields="embedding")
    search_results = search_client.search(search_text=None, vector_queries=[vector_query], select=["chunk_text", "source_path"])

    # Build context from retrieved documents
    context_parts = []
    sources = []
    for i, result in enumerate(search_results):
        context_parts.append(f"[{i+1}] {result['chunk_text']}")
        sources.append({"source": result["source_path"], "score": result.get("@search.score", 0)})

    context = "\n\n".join(context_parts)

    # Step 3: Generate answer with context
    system_prompt = """You are a helpful assistant that answers questions based on the provided context.
Always cite your sources using [1], [2], etc. If the context doesn't contain the answer, say so."""

    response = openai_client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": f"Context:\n{context}\n\nQuestion: {question}"}
        ],
        max_tokens=1000,
        temperature=0.7
    )

    return {
        "answer": response.choices[0].message.content,
        "sources": sources,
        "tokens_used": response.usage.total_tokens
    }

# Example usage
result = rag_query("What are the best practices for Fabric lakehouse design?")
print(f"Answer: {result['answer']}\n\nSources: {result['sources']}")

Incremental Processing

# Incremental embedding updates using Delta Lake change data feed
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from delta.tables import DeltaTable

spark = SparkSession.builder.getOrCreate()

# Enable change data feed on source table
spark.sql("""
    ALTER TABLE documents
    SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")

# Read changes since last processing
last_version = spark.read.table("embedding_processing_state").collect()[0]["last_version"]

changes_df = spark.read.format("delta") \
    .option("readChangeFeed", "true") \
    .option("startingVersion", last_version) \
    .table("documents") \
    .filter(F.col("_change_type").isin(["insert", "update_postimage"]))

# Process only changed documents (isEmpty avoids scanning just to count)
if not changes_df.isEmpty():
    # Generate embeddings for changed docs (using UDF from earlier)
    new_embeddings = changes_df \
        .withColumn("embedding", embedding_udf(F.col("content"))) \
        .select("document_id", "chunk_id", "content", "embedding", F.current_timestamp().alias("processed_at"))

    # Upsert to embeddings table
    embeddings_table = DeltaTable.forName(spark, "document_embeddings")
    embeddings_table.alias("target").merge(
        new_embeddings.alias("source"),
        "target.document_id = source.document_id AND target.chunk_id = source.chunk_id"
    ).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()

    # Update processing state
    current_version = spark.sql("DESCRIBE HISTORY documents LIMIT 1").collect()[0]["version"]
    spark.sql(f"UPDATE embedding_processing_state SET last_version = {current_version}")

# Schedule this notebook to run every 6 hours using Fabric Data Pipelines
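
The job above assumes embedding_processing_state already exists. Bootstrapping it is a one-off step; a minimal sketch:

# One-off bootstrap for the processing-state table the incremental job reads.
# Starting from version 0 makes the first run process the full change feed.
spark.sql("""
    CREATE TABLE IF NOT EXISTS embedding_processing_state (last_version BIGINT)
    USING DELTA
""")
if spark.read.table("embedding_processing_state").isEmpty():
    spark.sql("INSERT INTO embedding_processing_state VALUES (0)")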

Monitoring AI Workloads

# Monitor AI workloads using Fabric monitoring and Azure Monitor
from pyspark.sql import SparkSession
from azure.identity import DefaultAzureCredential
import requests

spark = SparkSession.builder.getOrCreate()

# Monitor embedding job status via Delta table metadata
embeddings_history = spark.sql("DESCRIBE HISTORY document_embeddings LIMIT 10")
embeddings_history.show()

# Get table statistics
stats = spark.sql("""
    SELECT
        COUNT(*) as total_embeddings,
        COUNT(DISTINCT document_id) as unique_documents,
        MAX(processed_at) as last_processed,
        AVG(SIZE(embedding)) as avg_embedding_size
    FROM document_embeddings
""").collect()[0]

print(f"Total embeddings: {stats['total_embeddings']:,}")
print(f"Unique documents: {stats['unique_documents']:,}")
print(f"Last processed: {stats['last_processed']}")

# Monitor costs via Azure OpenAI usage API
credential = DefaultAzureCredential()
token = credential.get_token("https://management.azure.com/.default").token

# Get Azure OpenAI resource usage (requires appropriate permissions)
subscription_id = "your-subscription-id"
resource_group = "your-rg"
openai_resource = "your-openai"

usage_url = f"https://management.azure.com/subscriptions/{subscription_id}/resourceGroups/{resource_group}/providers/Microsoft.CognitiveServices/accounts/{openai_resource}/usages?api-version=2023-05-01"
response = requests.get(usage_url, headers={"Authorization": f"Bearer {token}"})
usage = response.json()

for metric in usage.get("value", []):
    print(f"{metric['name']['value']}: {metric['currentValue']} / {metric['limit']}")

# Set up alerts using Azure Monitor or Fabric monitoring
# Configure in Azure Portal > Monitor > Alerts

OneLake AI Workloads bring AI capabilities directly to your data, eliminating data movement and enabling efficient large-scale AI processing.

Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.