1 min read
Building Custom Skills for Azure Cognitive Search
I wrote “Building Custom Skills for Azure Cognitive Search” to share practical, production-minded guidance on this topic.
Custom Skill Architecture
Custom skills communicate over HTTP:
- Search receives a document during indexing
- Search calls your skill’s REST endpoint with the input data
- Your skill processes the input and returns enriched output
- Search continues indexing with the enriched document
Creating a Custom Skill with Azure Functions
# Azure Function: function_app.py
import azure.functions as func
import json
import logging
# Function app instance; the @app.function_name / @app.route decorators below register against it.
app = func.FunctionApp()
@app.function_name(name="CustomEntityExtractor")
@app.route(route="extract-entities", methods=["POST"])
def extract_entities(req: func.HttpRequest) -> func.HttpResponse:
    """Custom skill endpoint that extracts entities from each record's text.

    Reads a payload of the form ``{"values": [{"recordId": ..., "data":
    {"text": ...}}, ...]}`` and answers with one entry per input record,
    carrying either ``customEntities`` or a per-record error message.

    Args:
        req: Incoming HTTP request whose body is the JSON skill payload.

    Returns:
        A JSON ``{"values": [...]}`` response, or a 400 response when the
        body is not valid JSON or is not a JSON object.
    """
    logging.info('Custom entity extractor called')

    try:
        body = req.get_json()
    except ValueError:
        return func.HttpResponse("Invalid JSON", status_code=400)

    # A JSON array or scalar parses successfully but has no "values" key;
    # reject it here instead of failing with AttributeError further down.
    if not isinstance(body, dict):
        return func.HttpResponse(
            "Request body must be a JSON object", status_code=400
        )

    results = []
    for record in body.get("values") or []:
        record_id = record.get("recordId") if isinstance(record, dict) else None
        try:
            # Reading the record inside the try keeps one malformed record
            # (non-object record, null "data") from failing the whole batch.
            data = record.get("data") or {}
            text = data.get("text", "")

            # Your custom extraction logic
            entities = extract_custom_entities(text)
        except Exception as e:
            logging.exception("Entity extraction failed for record %s", record_id)
            results.append({
                "recordId": record_id,
                "data": {},
                "errors": [{"message": str(e)}],
                "warnings": []
            })
        else:
            results.append({
                "recordId": record_id,
                "data": {
                    "customEntities": entities
                },
                "errors": [],
                "warnings": []
            })

    return func.HttpResponse(
        json.dumps({"values": results}),
        mimetype="application/json"
    )
def extract_custom_entities(text):
    """Extract product codes and known technology terms from ``text``.

    Product codes are matches of ``PRD-`` followed by five digits. Technology
    terms are matched case-insensitively and reported with the canonical
    lowercase spelling as ``value``.

    Args:
        text: The string to scan. ``None`` or an empty string yields no
            entities.

    Returns:
        A list of dicts with ``type``, ``value``, ``offset`` and ``length``.
        Product codes come first, followed by technology terms grouped by
        term. Every occurrence is reported, and offsets index into the
        original ``text``.
    """
    import re

    if not text:
        return []

    # Example: Extract product codes (e.g., PRD-12345)
    product_pattern = r'PRD-\d{5}'
    entities = [
        {
            "type": "ProductCode",
            "value": match.group(),
            "offset": match.start(),
            "length": len(match.group())
        }
        for match in re.finditer(product_pattern, text)
    ]

    # Example: Extract custom terms
    custom_terms = ["machine learning", "artificial intelligence", "data science"]
    for term in custom_terms:
        # Match against the original text rather than text.lower(): lowercasing
        # can change the string length (e.g. "İ"), which would shift offsets.
        for match in re.finditer(re.escape(term), text, re.IGNORECASE):
            entities.append({
                "type": "TechnologyTerm",
                "value": term,
                "offset": match.start(),
                "length": match.end() - match.start()
            })

    return entities
Integrating ML Model
import azure.functions as func
import json
import joblib
import numpy as np
# Cached model instance; stays None until load_model() is first called.
model = None


def load_model():
    """Return the cached model, loading ``model.pkl`` with joblib on first use."""
    global model
    if model is not None:
        return model
    model = joblib.load("model.pkl")
    return model
@app.function_name(name="SentimentClassifier")
@app.route(route="classify-sentiment", methods=["POST"])
def classify_sentiment(req: func.HttpRequest) -> func.HttpResponse:
    """Custom skill using ML model for sentiment classification.

    For every record in the ``values`` array the text is preprocessed, passed
    to the cached model, and the predicted label plus the highest class
    probability are returned as ``sentiment`` and ``confidence``.

    Args:
        req: Incoming HTTP request whose body is the JSON skill payload.

    Returns:
        A JSON ``{"values": [...]}`` response with one entry per record, or a
        400 response when the body is not valid JSON or not a JSON object.
    """
    try:
        body = req.get_json()
    except ValueError:
        return func.HttpResponse("Invalid JSON", status_code=400)

    if not isinstance(body, dict):
        return func.HttpResponse(
            "Request body must be a JSON object", status_code=400
        )

    clf = load_model()

    results = []
    for record in body.get("values") or []:
        record_id = record.get("recordId") if isinstance(record, dict) else None
        try:
            # Parsed inside the try so a malformed record only fails itself.
            text = (record.get("data") or {}).get("text", "")

            # Preprocess and predict
            features = preprocess_text(text)
            prediction = clf.predict([features])[0]
            confidence = clf.predict_proba([features]).max()

            # NOTE(review): the model presumably returns NumPy scalars, which
            # json.dumps cannot serialize (e.g. int64); .item() unwraps them
            # to plain Python values. Confirm against the actual model.
            if hasattr(prediction, "item"):
                prediction = prediction.item()

            results.append({
                "recordId": record_id,
                "data": {
                    "sentiment": prediction,
                    "confidence": float(confidence)
                },
                "errors": [],
                "warnings": []
            })
        except Exception as e:
            results.append({
                "recordId": record_id,
                "data": {},
                "errors": [{"message": str(e)}],
                "warnings": []
            })

    return func.HttpResponse(
        json.dumps({"values": results}),
        mimetype="application/json"
    )
def preprocess_text(text):
    """Normalize ``text`` for the model by lowercasing it.

    Placeholder hook: replace the body with your own preprocessing logic.
    """
    lowered = text.lower()
    return lowered
Custom Skill Using External API
import azure.functions as func
import json
import requests
@app.function_name(name="TranslationSkill")
@app.route(route="translate", methods=["POST"])
def translate_text(req: func.HttpRequest) -> func.HttpResponse:
    """Custom skill that calls external translation API.

    Each record may supply ``text``, ``sourceLanguage`` (default ``"auto"``)
    and ``targetLanguage`` (default ``"en"``). The API key is read from the
    ``TRANSLATION_API_KEY`` environment variable.

    Args:
        req: Incoming HTTP request whose body is the JSON skill payload.

    Returns:
        A JSON ``{"values": [...]}`` response with ``translatedText`` and
        ``detectedLanguage`` per record (or a per-record error), or a 400
        response when the body is not valid JSON or not a JSON object.
    """
    # Local import: the module-level imports visible for this snippet do not
    # include os, so os.environ would otherwise raise NameError.
    import os

    try:
        body = req.get_json()
    except ValueError:
        return func.HttpResponse("Invalid JSON", status_code=400)

    if not isinstance(body, dict):
        return func.HttpResponse(
            "Request body must be a JSON object", status_code=400
        )

    api_key = os.environ.get("TRANSLATION_API_KEY")

    results = []
    for record in body.get("values") or []:
        record_id = record.get("recordId") if isinstance(record, dict) else None
        try:
            # Parsed inside the try so a malformed record only fails itself.
            data = record.get("data") or {}
            text = data.get("text", "")
            source_lang = data.get("sourceLanguage", "auto")
            target_lang = data.get("targetLanguage", "en")

            # Call external translation API
            translated = call_translation_api(text, source_lang, target_lang, api_key)

            results.append({
                "recordId": record_id,
                "data": {
                    "translatedText": translated["text"],
                    "detectedLanguage": translated["detectedLanguage"]
                },
                "errors": [],
                "warnings": []
            })
        except Exception as e:
            results.append({
                "recordId": record_id,
                "data": {},
                "errors": [{"message": str(e)}],
                "warnings": []
            })

    return func.HttpResponse(
        json.dumps({"values": results}),
        mimetype="application/json"
    )
def call_translation_api(text, source, target, api_key, timeout=10):
    """Call external translation service.

    Args:
        text: Text to translate.
        source: Source language code sent as ``source``.
        target: Target language code sent as ``target``.
        api_key: Bearer token placed in the ``Authorization`` header.
        timeout: Seconds to wait for the service before giving up. Without a
            timeout, requests waits indefinitely.

    Returns:
        The decoded JSON body of the service response.

    Raises:
        requests.HTTPError: If the service answers with a 4xx/5xx status.
        requests.Timeout: If the service does not respond within ``timeout``.
    """
    response = requests.post(
        "https://api.translation-service.com/translate",
        json={"text": text, "source": source, "target": target},
        headers={"Authorization": f"Bearer {api_key}"},
        timeout=timeout
    )
    # Surface HTTP failures directly instead of letting the caller hit a
    # KeyError on an error payload.
    response.raise_for_status()
    return response.json()
Registering Custom Skill in Skillset
# KeyPhraseExtractionSkill is used in the skillset below, so it is imported here.
from azure.search.documents.indexes.models import (
    InputFieldMappingEntry,
    KeyPhraseExtractionSkill,
    OutputFieldMappingEntry,
    SearchIndexerSkillset,
    WebApiSkill
)

# Define custom skill
custom_entity_skill = WebApiSkill(
    name="custom-entity-extractor",
    description="Extract custom entities using Azure Function",
    context="/document",
    uri="https://myfunctionapp.azurewebsites.net/api/extract-entities",
    http_method="POST",
    timeout="PT30S",
    batch_size=10,
    degree_of_parallelism=5,
    # Placeholder value: supply the real function key from secure
    # configuration rather than committing it to source.
    http_headers={
        "x-functions-key": "your-function-key"
    },
    inputs=[
        InputFieldMappingEntry(name="text", source="/document/content")
    ],
    outputs=[
        OutputFieldMappingEntry(name="customEntities", target_name="customEntities")
    ]
)

# Custom ML skill
sentiment_skill = WebApiSkill(
    name="custom-sentiment",
    description="Classify sentiment using custom ML model",
    context="/document",
    uri="https://myfunctionapp.azurewebsites.net/api/classify-sentiment",
    http_method="POST",
    timeout="PT60S",
    batch_size=25,
    # Same function app as the entity skill, so it sends the same key header.
    http_headers={
        "x-functions-key": "your-function-key"
    },
    inputs=[
        InputFieldMappingEntry(name="text", source="/document/content")
    ],
    outputs=[
        OutputFieldMappingEntry(name="sentiment", target_name="customSentiment"),
        OutputFieldMappingEntry(name="confidence", target_name="sentimentConfidence")
    ]
)

# Create skillset with custom skills
skillset = SearchIndexerSkillset(
    name="custom-enrichment-skillset",
    skills=[
        custom_entity_skill,
        sentiment_skill,
        # Can combine with built-in skills
        KeyPhraseExtractionSkill(
            name="extract-keyphrases",
            context="/document",
            inputs=[InputFieldMappingEntry(name="text", source="/document/content")],
            outputs=[OutputFieldMappingEntry(name="keyPhrases", target_name="keyPhrases")]
        )
    ]
)

# NOTE(review): indexer_client is not created in this snippet; presumably a
# SearchIndexerClient constructed elsewhere. Confirm before running.
indexer_client.create_or_update_skillset(skillset)
Testing Custom Skills
def test_custom_skill(function_url, test_records, timeout=30):
    """Test custom skill endpoint.

    Posts ``test_records`` wrapped in a ``{"values": [...]}`` payload and
    prints each returned record's errors or data. A non-200 response prints
    the status code and body instead.

    Args:
        function_url: URL of the skill endpoint to call.
        test_records: List of ``{"recordId": ..., "data": {...}}`` dicts.
        timeout: Seconds to wait for the endpoint, so an unresponsive function
            cannot block the test run indefinitely.
    """
    import requests

    response = requests.post(
        function_url,
        json={"values": test_records},
        headers={"Content-Type": "application/json"},
        timeout=timeout
    )

    if response.status_code != 200:
        print(f"Error: {response.status_code} - {response.text}")
        return

    for result in response.json().get("values", []):
        print(f"Record {result['recordId']}:")
        if result.get("errors"):
            print(f" Errors: {result['errors']}")
        else:
            print(f" Data: {result['data']}")
# Sample skill inputs: one record with a product code and a technology term,
# one with a product code only.
test_records = [
    dict(
        recordId="1",
        data=dict(text="Our product PRD-12345 uses machine learning for predictions."),
    ),
    dict(
        recordId="2",
        data=dict(text="Contact support at support@company.com for PRD-67890."),
    ),
]

# Send the sample records to the deployed entity-extraction endpoint.
test_custom_skill(
    "https://myfunctionapp.azurewebsites.net/api/extract-entities",
    test_records
)
Error Handling Best Practices
@app.function_name(name="RobustCustomSkill")
@app.route(route="robust-skill", methods=["POST"])
def robust_skill(req: func.HttpRequest) -> func.HttpResponse:
    """Custom skill with comprehensive error handling.

    Request-level problems (invalid JSON, a body that is not an object, a
    ``values`` entry that is not an array) produce a 400 response. Problems
    with a single record are reported in that record's ``errors`` or
    ``warnings`` so the rest of the batch is still processed.

    Args:
        req: Incoming HTTP request whose body is the JSON skill payload.

    Returns:
        A JSON ``{"values": [...]}`` response with one entry per input record.
    """
    def json_response(payload, status_code=200):
        # Single place that serializes a payload into a JSON HTTP response.
        return func.HttpResponse(
            json.dumps(payload),
            status_code=status_code,
            mimetype="application/json"
        )

    # Validate request
    try:
        body = req.get_json()
    except ValueError:
        return json_response({"values": [], "error": "Invalid JSON"}, 400)

    if not isinstance(body, dict):
        return json_response(
            {"values": [], "error": "Request body must be a JSON object"}, 400
        )

    # A missing, null or empty "values" falls through to an empty 200 response.
    values = body.get("values") or []
    if not isinstance(values, list):
        return json_response(
            {"values": [], "error": "'values' must be an array"}, 400
        )

    results = []
    for record in values:
        record_id = "unknown"
        warnings = []
        errors = []
        data_out = {}

        try:
            # Everything that touches the record sits inside the try so a
            # malformed record is reported instead of failing the batch.
            record_id = record.get("recordId", "unknown")
            text = (record.get("data") or {}).get("text", "")

            # Validate input
            if not text:
                warnings.append({
                    "message": "Empty text input, skipping processing"
                })
                data_out = {"result": None}
            else:
                # Process with timeout protection
                # NOTE(review): process_with_timeout is not defined in the
                # visible source; presumably it raises TimeoutError on expiry.
                data_out = {
                    "result": process_with_timeout(text, timeout_seconds=25)
                }
        except TimeoutError:
            errors.append({
                "message": "Processing timeout exceeded"
            })
        except Exception as e:
            # logging.exception also records the traceback.
            logging.exception("Error processing record %s: %s", record_id, e)
            errors.append({
                "message": f"Processing error: {str(e)}"
            })

        results.append({
            "recordId": record_id,
            "data": data_out,
            "errors": errors,
            "warnings": warnings
        })

    return json_response({"values": results})
Custom skills unlock unlimited possibilities for AI enrichment tailored to your specific business needs.

## Takeaways

Add a concise, personal takeaway and recommended next steps here.