Back to Blog
5 min read

Building Custom Skills for Azure Cognitive Search

Custom skills extend Azure Cognitive Search’s AI enrichment capabilities with your own logic. Using Azure Functions or any web API, you can integrate custom ML models, business logic, or external services.

Custom Skill Architecture

Custom skills communicate via HTTP:

  1. Search receives a document during indexing
  2. Search calls your skill’s REST endpoint with the input data
  3. Your skill processes the input and returns enriched output
  4. Search continues indexing with the enriched document

Creating a Custom Skill with Azure Functions

# Azure Function: function_app.py
import azure.functions as func
import json
import logging

# Function app instance; the @app.function_name / @app.route decorators used
# by the skill functions in this file are looked up on this object.
app = func.FunctionApp()

@app.function_name(name="CustomEntityExtractor")
@app.route(route="extract-entities", methods=["POST"])
def extract_entities(req: func.HttpRequest) -> func.HttpResponse:
    """HTTP entry point of the custom entity-extraction skill.

    Reads the ``values`` array from the JSON request body, runs
    ``extract_custom_entities`` on each record's ``data.text`` and returns one
    result per record under ``values``. If extraction raises for a record,
    the exception message is reported in that record's ``errors`` list and
    the remaining records are still processed.

    Returns an HTTP 400 response with the text ``Invalid JSON`` when
    ``req.get_json()`` raises ``ValueError``.
    """
    logging.info('Custom entity extractor called')

    try:
        payload = req.get_json()
    except ValueError:
        return func.HttpResponse("Invalid JSON", status_code=400)

    results = []
    for item in payload.get("values", []):
        item_id = item.get("recordId")
        text = item.get("data", {}).get("text", "")

        # Work out the per-record output first, then assemble the result
        # entry once so the success and failure shapes cannot drift apart.
        try:
            output = {"customEntities": extract_custom_entities(text)}
            problems = []
        except Exception as exc:
            output = {}
            problems = [{"message": str(exc)}]

        results.append({
            "recordId": item_id,
            "data": output,
            "errors": problems,
            "warnings": []
        })

    response_body = json.dumps({"values": results})
    return func.HttpResponse(response_body, mimetype="application/json")

def extract_custom_entities(text):
    """Extract product codes and known technology terms from ``text``.

    Args:
        text: The string to scan. ``None`` or an empty string yields no
            entities.

    Returns:
        A list of dicts with the keys ``type``, ``value``, ``offset`` and
        ``length``. Product codes (``PRD-`` followed by five digits) come
        first, in order of appearance, followed by every occurrence of each
        technology term. ``offset`` always indexes into the original
        ``text``; a technology term's ``value`` is the canonical lowercase
        term, not the casing found in the text.
    """
    import re

    if not text:
        return []

    entities = []

    # Example: Extract product codes (e.g., PRD-12345)
    product_pattern = r'PRD-\d{5}'
    for match in re.finditer(product_pattern, text):
        entities.append({
            "type": "ProductCode",
            "value": match.group(),
            "offset": match.start(),
            "length": len(match.group())
        })

    # Example: Extract custom terms
    custom_terms = ["machine learning", "artificial intelligence", "data science"]
    for term in custom_terms:
        # Search the ORIGINAL text case-insensitively instead of a lowercased
        # copy: str.lower() can change the length of the string for some
        # characters, which would shift every offset after them. re.ASCII
        # limits case folding to ASCII letters, which is all the terms use.
        term_pattern = re.escape(term)
        for match in re.finditer(term_pattern, text, re.IGNORECASE | re.ASCII):
            entities.append({
                "type": "TechnologyTerm",
                "value": term,
                "offset": match.start(),
                "length": match.end() - match.start()
            })

    return entities

Integrating an ML Model

import azure.functions as func
import json
import joblib
import numpy as np

# Cached model instance; stays None until load_model() is first called
model = None

def load_model():
    """Return the shared model, loading ``model.pkl`` on the first call.

    The loaded object is stored in the module-level ``model`` variable so
    later calls return it without reading the file again.
    """
    global model
    if model is not None:
        return model
    model = joblib.load("model.pkl")
    return model

@app.function_name(name="SentimentClassifier")
@app.route(route="classify-sentiment", methods=["POST"])
def classify_sentiment(req: func.HttpRequest) -> func.HttpResponse:
    """Custom skill using ML model for sentiment classification.

    For every record in the request's ``values`` array, the record's
    ``data.text`` is passed through ``preprocess_text`` and then to the model
    returned by ``load_model()``. Each result carries ``sentiment`` (the
    predicted label) and ``confidence`` (the highest value returned by
    ``predict_proba``). If prediction raises for a record, the exception
    message is reported in that record's ``errors`` list and the remaining
    records are still processed.

    Returns an HTTP 400 response with the text ``Invalid JSON`` when
    ``req.get_json()`` raises ``ValueError``.
    """
    # Reject unparseable bodies up front, as extract_entities does, instead
    # of letting the ValueError escape as an unhandled server error.
    try:
        body = req.get_json()
    except ValueError:
        return func.HttpResponse("Invalid JSON", status_code=400)

    values = body.get("values", [])
    results = []

    clf = load_model()

    for record in values:
        record_id = record.get("recordId")
        text = record.get("data", {}).get("text", "")

        try:
            # Preprocess and predict
            features = preprocess_text(text)
            prediction = clf.predict([features])[0]
            confidence = clf.predict_proba([features]).max()

            # json.dumps cannot serialize NumPy scalars such as np.int64.
            # Convert inside the try so a bad value fails this record only,
            # not the json.dumps call for the whole batch.
            if isinstance(prediction, np.generic):
                prediction = prediction.item()

            results.append({
                "recordId": record_id,
                "data": {
                    "sentiment": prediction,
                    "confidence": float(confidence)
                },
                "errors": [],
                "warnings": []
            })

        except Exception as e:
            results.append({
                "recordId": record_id,
                "data": {},
                "errors": [{"message": str(e)}],
                "warnings": []
            })

    return func.HttpResponse(
        json.dumps({"values": results}),
        mimetype="application/json"
    )

def preprocess_text(text):
    """Return ``text`` converted to lowercase.

    Placeholder for real preprocessing; extend it with whatever the model
    needs.
    """
    lowered = text.lower()
    return lowered

Custom Skill Using External API

import azure.functions as func
import json
import requests

@app.function_name(name="TranslationSkill")
@app.route(route="translate", methods=["POST"])
def translate_text(req: func.HttpRequest) -> func.HttpResponse:
    """Custom skill that calls external translation API.

    For every record in the request's ``values`` array, ``data.text`` is sent
    to ``call_translation_api`` together with ``data.sourceLanguage``
    (default ``"auto"``) and ``data.targetLanguage`` (default ``"en"``). Each
    result carries ``translatedText`` and ``detectedLanguage`` taken from the
    API response. If the call raises for a record, the exception message is
    reported in that record's ``errors`` list and the remaining records are
    still processed.

    The API key is read from the ``TRANSLATION_API_KEY`` environment
    variable.

    Returns an HTTP 400 response with the text ``Invalid JSON`` when
    ``req.get_json()`` raises ``ValueError``.
    """
    # Imported locally because the surrounding snippet never imports os;
    # without it the os.environ lookup below raises NameError on every call.
    import os

    # Reject unparseable bodies up front instead of letting the ValueError
    # escape as an unhandled server error.
    try:
        body = req.get_json()
    except ValueError:
        return func.HttpResponse("Invalid JSON", status_code=400)

    values = body.get("values", [])
    results = []

    api_key = os.environ.get("TRANSLATION_API_KEY")

    for record in values:
        record_id = record.get("recordId")
        data = record.get("data", {})
        text = data.get("text", "")
        source_lang = data.get("sourceLanguage", "auto")
        target_lang = data.get("targetLanguage", "en")

        try:
            # Call external translation API
            translated = call_translation_api(text, source_lang, target_lang, api_key)

            results.append({
                "recordId": record_id,
                "data": {
                    "translatedText": translated["text"],
                    "detectedLanguage": translated["detectedLanguage"]
                },
                "errors": [],
                "warnings": []
            })

        except Exception as e:
            results.append({
                "recordId": record_id,
                "data": {},
                "errors": [{"message": str(e)}],
                "warnings": []
            })

    return func.HttpResponse(
        json.dumps({"values": results}),
        mimetype="application/json"
    )

def call_translation_api(text, source, target, api_key, timeout=10):
    """Call external translation service.

    Args:
        text: Text to translate.
        source: Source language code sent as ``source``.
        target: Target language code sent as ``target``.
        api_key: Token sent as a ``Bearer`` value in the ``Authorization``
            header.
        timeout: Seconds to wait for the service before giving up. Without a
            timeout a stalled connection would block the skill indefinitely.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.HTTPError: If the service answers with a 4xx/5xx status.
        requests.Timeout: If the service does not respond within ``timeout``.
    """
    response = requests.post(
        "https://api.translation-service.com/translate",
        json={"text": text, "source": source, "target": target},
        headers={"Authorization": f"Bearer {api_key}"},
        timeout=timeout
    )
    # Fail here on HTTP errors so callers get the real cause instead of a
    # KeyError when they index into an error payload.
    response.raise_for_status()
    return response.json()

Registering Custom Skill in Skillset

from azure.search.documents.indexes.models import (
    SearchIndexerSkillset,
    WebApiSkill,
    KeyPhraseExtractionSkill,
    InputFieldMappingEntry,
    OutputFieldMappingEntry
)

# Define custom skill
# Points at the extract-entities function route; the document's content field
# is passed as the "text" input and the "customEntities" output is mapped back
# onto the document.
custom_entity_skill = WebApiSkill(
    name="custom-entity-extractor",
    description="Extract custom entities using Azure Function",
    context="/document",
    uri="https://myfunctionapp.azurewebsites.net/api/extract-entities",
    http_method="POST",
    timeout="PT30S",
    batch_size=10,
    degree_of_parallelism=5,
    http_headers={
        # Placeholder value: substitute the real function key at deploy time
        "x-functions-key": "your-function-key"
    },
    inputs=[
        InputFieldMappingEntry(name="text", source="/document/content")
    ],
    outputs=[
        OutputFieldMappingEntry(name="customEntities", target_name="customEntities")
    ]
)

# Custom ML skill
# Points at the classify-sentiment function route; its "sentiment" and
# "confidence" outputs are stored under different target names.
sentiment_skill = WebApiSkill(
    name="custom-sentiment",
    description="Classify sentiment using custom ML model",
    context="/document",
    uri="https://myfunctionapp.azurewebsites.net/api/classify-sentiment",
    http_method="POST",
    timeout="PT60S",
    batch_size=25,
    inputs=[
        InputFieldMappingEntry(name="text", source="/document/content")
    ],
    outputs=[
        OutputFieldMappingEntry(name="sentiment", target_name="customSentiment"),
        OutputFieldMappingEntry(name="confidence", target_name="sentimentConfidence")
    ]
)

# Create skillset with custom skills
skillset = SearchIndexerSkillset(
    name="custom-enrichment-skillset",
    skills=[
        custom_entity_skill,
        sentiment_skill,
        # Can combine with built-in skills
        KeyPhraseExtractionSkill(
            name="extract-keyphrases",
            context="/document",
            inputs=[InputFieldMappingEntry(name="text", source="/document/content")],
            outputs=[OutputFieldMappingEntry(name="keyPhrases", target_name="keyPhrases")]
        )
    ]
)

# NOTE(review): indexer_client is not defined in this snippet; presumably a
# SearchIndexerClient created elsewhere. Confirm before running.
indexer_client.create_or_update_skillset(skillset)

Testing Custom Skills

def test_custom_skill(function_url, test_records):
    """POST ``test_records`` to a custom skill endpoint and print the outcome.

    The records are wrapped in a ``{"values": [...]}`` payload. On a 200
    response each returned record is printed with either its errors or its
    data; any other status prints the status code and the response text.
    """
    import requests

    response = requests.post(
        function_url,
        json={"values": test_records},
        headers={"Content-Type": "application/json"}
    )

    # Anything but 200 is reported as-is and ends the run
    if response.status_code != 200:
        print(f"Error: {response.status_code} - {response.text}")
        return

    for item in response.json().get("values", []):
        print(f"Record {item['recordId']}:")
        issues = item.get("errors")
        if issues:
            print(f"  Errors: {issues}")
        else:
            print(f"  Data: {item['data']}")

# Sample skill inputs: each record pairs a recordId with the text to enrich
test_records = [
    {"recordId": "1", "data": {"text": "Our product PRD-12345 uses machine learning for predictions."}},
    {"recordId": "2", "data": {"text": "Contact support at support@company.com for PRD-67890."}},
]

# Send the sample records to the deployed entity-extraction endpoint
test_custom_skill(
    function_url="https://myfunctionapp.azurewebsites.net/api/extract-entities",
    test_records=test_records,
)

Error Handling Best Practices

@app.function_name(name="RobustCustomSkill")
@app.route(route="robust-skill", methods=["POST"])
def robust_skill(req: func.HttpRequest) -> func.HttpResponse:
    """Custom skill that reports failures per record.

    Behaviour, in order:

    * When ``req.get_json()`` raises ``ValueError`` the response is HTTP 400
      with an empty ``values`` list and an ``error`` field.
    * A missing or empty ``values`` array yields an empty ``values`` response.
    * A record with empty ``data.text`` gets ``{"result": None}`` plus a
      warning and is not processed.
    * Otherwise ``data.text`` is passed to ``process_with_timeout`` (defined
      outside this block) with ``timeout_seconds=25``.
    * ``TimeoutError`` and any other exception are recorded in the record's
      ``errors`` list with empty ``data``; other exceptions are also logged.
    """
    # Validate request
    try:
        body = req.get_json()
    except ValueError:
        invalid_body = json.dumps({"values": [], "error": "Invalid JSON"})
        return func.HttpResponse(
            invalid_body,
            status_code=400,
            mimetype="application/json"
        )

    values = body.get("values", [])
    if not values:
        return func.HttpResponse(
            json.dumps({"values": []}),
            mimetype="application/json"
        )

    results = []

    for record in values:
        record_id = record.get("recordId", "unknown")
        warnings, errors = [], []
        # Stays empty unless one of the branches below completes successfully
        output = {}

        try:
            text = record.get("data", {}).get("text", "")

            if text:
                # Process with timeout protection
                output = {"result": process_with_timeout(text, timeout_seconds=25)}
            else:
                # Nothing to process: flag it and emit a null result
                warnings.append({
                    "message": "Empty text input, skipping processing"
                })
                output = {"result": None}

        except TimeoutError:
            errors.append({
                "message": "Processing timeout exceeded"
            })

        except Exception as e:
            logging.error(f"Error processing record {record_id}: {str(e)}")
            errors.append({
                "message": f"Processing error: {str(e)}"
            })

        # Single assembly point for the success, warning and error shapes
        results.append({
            "recordId": record_id,
            "data": output,
            "errors": errors,
            "warnings": warnings
        })

    return func.HttpResponse(
        json.dumps({"values": results}),
        mimetype="application/json"
    )

Custom skills unlock unlimited possibilities for AI enrichment tailored to your specific business needs.

Michael John Peña

Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.