OneLake AI Workloads: Running AI Directly on Your Data Lake
OneLake AI Workloads let you run AI operations directly on your data lake, without copying data out. Let's explore how to use them for embeddings, vector search, and AI enrichment.
OneLake AI Architecture
┌─────────────────────────────────────────────────────────────┐
│ OneLake AI Workloads │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Documents │ │ Structured │ │ Images │ │
│ │ Files │ │ Data │ │ & Media │ │
│ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │
│ │ │ │ │
│ └────────────────┼────────────────┘ │
│ ↓ │
│ ┌───────────────────────┐ │
│ │ AI Workload Engine │ │
│ │ • Embedding │ │
│ │ • Vector Index │ │
│ │ • Semantic Search │ │
│ │ • AI Enrichment │ │
│ └───────────────────────┘ │
│ ↓ │
│ ┌───────────────────────┐ │
│ │ Output to OneLake │ │
│ │ (Delta Tables, Files)│ │
│ └───────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
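Everything below runs inside a Fabric notebook attached to a lakehouse, where OneLake shows up as the familiar Files/ and Tables/ paths. As a quick orientation, here is a minimal sketch of browsing and reading lake data from a notebook (the paths are illustrative, and notebookutils is Fabric's built-in notebook utility module, formerly mssparkutils):

# Minimal sketch: browsing and reading OneLake from a Fabric notebook.
# Paths are illustrative; "spark" is predefined in Fabric notebooks.
import notebookutils

# List raw files landed in the lakehouse Files area
for f in notebookutils.fs.ls("Files/documents"):
    print(f.name, f.size)

# Delta tables under Tables/ are directly queryable with Spark
df = spark.read.format("delta").load("Tables/documents")
df.printSchema()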
Creating Embeddings at Scale
# Create embeddings using Azure OpenAI and PySpark in Fabric notebooks
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, FloatType
from openai import AzureOpenAI
from azure.identity import DefaultAzureCredential, get_bearer_token_provider
import os

spark = SparkSession.builder.getOrCreate()

# Lazily create one Azure OpenAI client per executor. A client built on the
# driver can't be pickled into a UDF, and a static token would expire during
# long jobs -- the token provider refreshes credentials automatically.
def get_client():
    global _client
    if "_client" not in globals():
        credential = DefaultAzureCredential()
        _client = AzureOpenAI(
            azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
            api_version="2024-02-01",
            azure_ad_token_provider=get_bearer_token_provider(
                credential, "https://cognitiveservices.azure.com/.default"
            )
        )
    return _client

# UDF for generating embeddings
def get_embedding(text):
    if not text or len(text.strip()) == 0:
        return None
    try:
        response = get_client().embeddings.create(
            model="text-embedding-3-large",
            input=text[:8000]  # Truncate to stay under the model's input limit
        )
        return response.data[0].embedding
    except Exception as e:
        print(f"Error: {e}")
        return None

embedding_udf = F.udf(get_embedding, ArrayType(FloatType()))

# Read documents from OneLake Files (recursive lookup instead of a ** glob,
# which Hadoop-style path patterns don't support)
documents_df = spark.read \
    .option("recursiveFileLookup", "true") \
    .option("pathGlobFilter", "*.txt") \
    .text("Files/documents/")

# Chunk documents, keeping the source file for each chunk
# (simple approach: split on blank lines; see the batched variant below)
chunked_df = documents_df \
    .withColumn("source_path", F.input_file_name()) \
    .withColumn("chunks", F.split(F.col("value"), "\n\n")) \
    .select("source_path", F.explode("chunks").alias("chunk_text")) \
    .filter(F.length("chunk_text") > 50)

# Generate embeddings
embeddings_df = chunked_df \
    .withColumn("embedding", embedding_udf(F.col("chunk_text"))) \
    .withColumn("created_at", F.current_timestamp())

# Save to Delta table
embeddings_df.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable("document_embeddings")

print(f"Processed {spark.read.table('document_embeddings').count()} chunks")
Creating Vector Indexes
# For vector search, integrate with Azure AI Search or use in-memory search
# Option 1: Azure AI Search for production vector indexing
from pyspark.sql import functions as F
from azure.identity import DefaultAzureCredential
from azure.search.documents import SearchClient
from azure.search.documents.indexes import SearchIndexClient
from azure.search.documents.indexes.models import (
    SearchIndex, SearchField, SearchFieldDataType, VectorSearch,
    HnswAlgorithmConfiguration, HnswParameters, VectorSearchProfile
)

credential = DefaultAzureCredential()

# Create search index with a vector field
index_client = SearchIndexClient(
    endpoint="https://<search-service>.search.windows.net",
    credential=credential
)

# Define the index schema (3072 dimensions matches text-embedding-3-large)
fields = [
    SearchField(name="id", type=SearchFieldDataType.String, key=True),
    SearchField(name="chunk_text", type=SearchFieldDataType.String, searchable=True),
    SearchField(name="source_path", type=SearchFieldDataType.String, filterable=True),
    SearchField(
        name="embedding",
        type=SearchFieldDataType.Collection(SearchFieldDataType.Single),
        searchable=True,
        vector_search_dimensions=3072,
        vector_search_profile_name="vector-profile"
    )
]
vector_search = VectorSearch(
    algorithms=[HnswAlgorithmConfiguration(
        name="hnsw-config",
        parameters=HnswParameters(m=16, ef_construction=200, metric="cosine")
    )],
    profiles=[VectorSearchProfile(name="vector-profile", algorithm_configuration_name="hnsw-config")]
)
index = SearchIndex(name="document-search-index", fields=fields, vector_search=vector_search)
index_client.create_or_update_index(index)

# Upload embeddings from Fabric to Azure AI Search
search_client = SearchClient(
    endpoint="https://<search-service>.search.windows.net",
    index_name="document-search-index",
    credential=credential
)

# Read embeddings from the Delta table. A deterministic key (md5 of the
# chunk) makes re-uploads idempotent. collect() is fine for modest tables;
# for very large ones, iterate with toLocalIterator() instead.
rows = spark.read.table("document_embeddings") \
    .withColumn("id", F.md5(F.col("chunk_text"))) \
    .select("id", "chunk_text", "source_path", "embedding") \
    .collect()

# The service caps uploads at 1,000 documents per batch
for i in range(0, len(rows), 1000):
    search_client.upload_documents([r.asDict() for r in rows[i:i + 1000]])
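After the upload it's worth a quick sanity check that the index actually received the documents; note the count can lag a few seconds while indexing completes:

# Sketch: verify the upload landed (count may lag briefly after indexing)
print(f"Documents in index: {search_client.get_document_count()}")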
Semantic Search
# Semantic search using Azure AI Search
from azure.identity import DefaultAzureCredential, get_bearer_token_provider
from azure.search.documents import SearchClient
from azure.search.documents.models import VectorizedQuery
from openai import AzureOpenAI

credential = DefaultAzureCredential()

# Initialize clients (the token provider refreshes credentials automatically)
search_client = SearchClient(
    endpoint="https://<search-service>.search.windows.net",
    index_name="document-search-index",
    credential=credential
)
openai_client = AzureOpenAI(
    azure_endpoint="https://<openai>.openai.azure.com",
    api_version="2024-02-01",
    azure_ad_token_provider=get_bearer_token_provider(
        credential, "https://cognitiveservices.azure.com/.default"
    )
)

def semantic_search(query_text: str, top_k: int = 10, min_score: float = 0.7):
    # Generate the query embedding with the same model used at indexing time
    response = openai_client.embeddings.create(
        model="text-embedding-3-large",
        input=query_text
    )
    query_vector = response.data[0].embedding
    # Perform vector search
    vector_query = VectorizedQuery(
        vector=query_vector,
        k_nearest_neighbors=top_k,
        fields="embedding"
    )
    results = search_client.search(
        search_text=None,
        vector_queries=[vector_query],
        select=["chunk_text", "source_path"]
    )
    for result in results:
        score = result.get("@search.score", 0)
        if score >= min_score:
            print(f"Score: {score:.3f}")
            print(f"Text: {result['chunk_text'][:200]}...")
            print()

# Hybrid search (vector + keyword, fused by the service's ranking)
def hybrid_search(query_text: str, keyword_query: str, top_k: int = 10):
    response = openai_client.embeddings.create(model="text-embedding-3-large", input=query_text)
    vector_query = VectorizedQuery(vector=response.data[0].embedding, k_nearest_neighbors=top_k, fields="embedding")
    results = search_client.search(
        search_text=keyword_query,       # Full-text search
        vector_queries=[vector_query],   # Vector search
        select=["chunk_text", "source_path"]
    )
    return list(results)
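A quick usage sketch: the two query strings can differ, e.g. natural-language phrasing for the vector side and exact terms for the keyword side (the queries here are illustrative):

# Illustrative call: natural language for the vector side,
# exact terms for the keyword side
hits = hybrid_search(
    query_text="how should I lay out a medallion architecture?",
    keyword_query="lakehouse medallion bronze silver gold",
    top_k=5
)
for hit in hits:
    print(f"{hit['@search.score']:.3f}  {hit['source_path']}")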
AI Enrichment Workloads
# AI enrichment using Azure OpenAI and PySpark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType
from openai import AzureOpenAI
from azure.identity import DefaultAzureCredential, get_bearer_token_provider

spark = SparkSession.builder.getOrCreate()

# Lazily create one client per executor (same pattern as the embedding job:
# clients can't be pickled into UDFs, and the token provider handles refresh)
def get_openai_client():
    global _openai_client
    if "_openai_client" not in globals():
        credential = DefaultAzureCredential()
        _openai_client = AzureOpenAI(
            azure_endpoint="https://<openai>.openai.azure.com",
            api_version="2024-02-01",
            azure_ad_token_provider=get_bearer_token_provider(
                credential, "https://cognitiveservices.azure.com/.default"
            )
        )
    return _openai_client

# Classification UDF
def classify_product(product_name, description):
    prompt = f"""Classify this product into one category: Electronics, Clothing, Home, Sports, Other
Product: {product_name} - {description}
Return only the category name."""
    try:
        response = get_openai_client().chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}],
            max_tokens=20
        )
        return response.choices[0].message.content.strip()
    except Exception:
        return "Other"

classify_udf = F.udf(classify_product, StringType())

# Feature extraction UDF
def extract_features(description):
    prompt = f"Extract key features from this product description as a JSON array: {description}"
    try:
        response = get_openai_client().chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}],
            max_tokens=200
        )
        return response.choices[0].message.content
    except Exception:
        return "[]"

features_udf = F.udf(extract_features, StringType())

# Read source data
products_df = spark.read.table("bronze.raw_products")

# Apply AI enrichments
enriched_df = products_df \
    .withColumn("ai_category", classify_udf(F.col("product_name"), F.col("description"))) \
    .withColumn("extracted_features", features_udf(F.col("description")))

# Write to silver layer
enriched_df.write \
    .format("delta") \
    .mode("overwrite") \
    .option("mergeSchema", "true") \
    .saveAsTable("silver.enriched_products")
Building RAG on OneLake
# Build RAG pipeline using Azure OpenAI and Azure AI Search
from azure.identity import DefaultAzureCredential, get_bearer_token_provider
from azure.search.documents import SearchClient
from azure.search.documents.models import VectorizedQuery
from openai import AzureOpenAI

credential = DefaultAzureCredential()

# Initialize clients
search_client = SearchClient(
    endpoint="https://<search-service>.search.windows.net",
    index_name="document-search-index",
    credential=credential
)
openai_client = AzureOpenAI(
    azure_endpoint="https://<openai>.openai.azure.com",
    api_version="2024-02-01",
    azure_ad_token_provider=get_bearer_token_provider(
        credential, "https://cognitiveservices.azure.com/.default"
    )
)

def rag_query(question: str, top_k: int = 5):
    # Step 1: Generate query embedding
    embedding_response = openai_client.embeddings.create(
        model="text-embedding-3-large",
        input=question
    )
    query_vector = embedding_response.data[0].embedding

    # Step 2: Retrieve relevant documents
    vector_query = VectorizedQuery(vector=query_vector, k_nearest_neighbors=top_k, fields="embedding")
    search_results = search_client.search(search_text=None, vector_queries=[vector_query], select=["chunk_text", "source_path"])

    # Build numbered context from retrieved documents so the model can cite them
    context_parts = []
    sources = []
    for i, result in enumerate(search_results):
        context_parts.append(f"[{i+1}] {result['chunk_text']}")
        sources.append({"source": result["source_path"], "score": result.get("@search.score", 0)})
    context = "\n\n".join(context_parts)

    # Step 3: Generate an answer grounded in the retrieved context
    system_prompt = """You are a helpful assistant that answers questions based on the provided context.
Always cite your sources using [1], [2], etc. If the context doesn't contain the answer, say so."""
    response = openai_client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": f"Context:\n{context}\n\nQuestion: {question}"}
        ],
        max_tokens=1000,
        temperature=0.7
    )
    return {
        "answer": response.choices[0].message.content,
        "sources": sources,
        "tokens_used": response.usage.total_tokens
    }

# Example usage
result = rag_query("What are the best practices for Fabric lakehouse design?")
print(f"Answer: {result['answer']}\n\nSources: {result['sources']}")
Incremental Processing
# Incremental embedding updates using Delta Lake change data feed
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from delta.tables import DeltaTable

spark = SparkSession.builder.getOrCreate()

# Enable change data feed on the source table (one-time setting)
spark.sql("""
    ALTER TABLE documents
    SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")

# Read changes since last processing. startingVersion is inclusive, so
# resume from the version after the last one we processed.
last_version = spark.read.table("embedding_processing_state").collect()[0]["last_version"]
changes_df = spark.read.format("delta") \
    .option("readChangeFeed", "true") \
    .option("startingVersion", last_version + 1) \
    .table("documents") \
    .filter(F.col("_change_type").isin(["insert", "update_postimage"]))

# Process only changed documents
if changes_df.count() > 0:
    # Generate embeddings for changed docs (embedding_udf defined earlier)
    new_embeddings = changes_df \
        .withColumn("embedding", embedding_udf(F.col("content"))) \
        .select("document_id", "chunk_id", "content", "embedding",
                F.current_timestamp().alias("processed_at"))

    # Upsert to the embeddings table
    embeddings_table = DeltaTable.forName(spark, "document_embeddings")
    embeddings_table.alias("target").merge(
        new_embeddings.alias("source"),
        "target.document_id = source.document_id AND target.chunk_id = source.chunk_id"
    ).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()

    # Record the latest source version as processed
    current_version = spark.sql("DESCRIBE HISTORY documents LIMIT 1").collect()[0]["version"]
    spark.sql(f"UPDATE embedding_processing_state SET last_version = {current_version}")

# Schedule this notebook to run every 6 hours using Fabric Data Pipelines
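One prerequisite the pipeline above glosses over: the embedding_processing_state table has to exist before the first run. A one-time bootstrap sketch (the single-row layout is an assumption):

# Sketch: one-time bootstrap of the processing-state table.
# A single row tracks the last Delta version whose changes were embedded.
spark.sql("""
    CREATE TABLE IF NOT EXISTS embedding_processing_state (last_version BIGINT)
    USING DELTA
""")
if spark.read.table("embedding_processing_state").count() == 0:
    # Seed with the current version of the source table: the change feed
    # only exists from the version where it was enabled onward
    seed = spark.sql("DESCRIBE HISTORY documents LIMIT 1").collect()[0]["version"]
    spark.sql(f"INSERT INTO embedding_processing_state VALUES ({seed})")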
Monitoring AI Workloads
# Monitor AI workloads using Fabric monitoring and Azure Monitor
from pyspark.sql import SparkSession
from azure.identity import DefaultAzureCredential
import requests

spark = SparkSession.builder.getOrCreate()

# Monitor embedding job status via Delta table history
embeddings_history = spark.sql("DESCRIBE HISTORY document_embeddings LIMIT 10")
embeddings_history.show()

# Get table statistics (SIZE(embedding) is the vector's dimension count)
stats = spark.sql("""
    SELECT
        COUNT(*) AS total_embeddings,
        COUNT(DISTINCT document_id) AS unique_documents,
        MAX(processed_at) AS last_processed,
        AVG(SIZE(embedding)) AS avg_embedding_size
    FROM document_embeddings
""").collect()[0]
print(f"Total embeddings: {stats['total_embeddings']:,}")
print(f"Unique documents: {stats['unique_documents']:,}")
print(f"Last processed: {stats['last_processed']}")

# Monitor quota consumption via the Azure management API
credential = DefaultAzureCredential()
token = credential.get_token("https://management.azure.com/.default").token

# Get Azure OpenAI resource usage (requires appropriate permissions)
subscription_id = "your-subscription-id"
resource_group = "your-rg"
openai_resource = "your-openai"
usage_url = (
    f"https://management.azure.com/subscriptions/{subscription_id}"
    f"/resourceGroups/{resource_group}/providers/Microsoft.CognitiveServices"
    f"/accounts/{openai_resource}/usages?api-version=2023-05-01"
)
response = requests.get(usage_url, headers={"Authorization": f"Bearer {token}"})
usage = response.json()
for metric in usage.get("value", []):
    print(f"{metric['name']['value']}: {metric['currentValue']} / {metric['limit']}")

# Set up alerts using Azure Monitor or Fabric monitoring
# Configure in Azure Portal > Monitor > Alerts
OneLake AI Workloads bring AI capabilities directly to your data, eliminating data movement and enabling efficient large-scale AI processing.