
Cognitive Services in Azure Synapse: Built-in AI Capabilities

Azure Synapse Analytics integrates with Azure Cognitive Services through the SynapseML library, so you can apply AI capabilities directly within your data pipelines: sentiment analysis, entity extraction, translation, computer vision, and more, all at Spark scale.

Synapse Cognitive Services Setup

# Synapse notebook with Cognitive Services
from synapse.ml.cognitive import *
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode

# Cognitive Services configuration
cognitive_key = mssparkutils.credentials.getSecret("keyvault", "cognitive-key")
cognitive_endpoint = "https://your-cognitive.cognitiveservices.azure.com/"

class SynapseCognitiveServices:
    """Cognitive Services integration for Synapse."""

    def __init__(self, spark: SparkSession, key: str, endpoint: str):
        self.spark = spark
        self.key = key
        self.endpoint = endpoint

    def analyze_sentiment(self, df, text_column: str, output_column: str = "sentiment"):
        """Analyze sentiment of text column."""
        from synapse.ml.cognitive import TextSentiment

        sentiment = (TextSentiment()
            .setTextCol(text_column)
            .setUrl(f"{self.endpoint}text/analytics/v3.1/sentiment")
            .setSubscriptionKey(self.key)
            .setOutputCol(output_column)
            .setErrorCol("sentiment_error"))

        return sentiment.transform(df)

    def extract_entities(self, df, text_column: str, output_column: str = "entities"):
        """Extract named entities from text."""
        from synapse.ml.cognitive import NER

        ner = (NER()
            .setTextCol(text_column)
            .setUrl(f"{self.endpoint}text/analytics/v3.1/entities/recognition/general")
            .setSubscriptionKey(self.key)
            .setOutputCol(output_column)
            .setErrorCol("ner_error"))

        return ner.transform(df)

    def extract_key_phrases(self, df, text_column: str, output_column: str = "key_phrases"):
        """Extract key phrases from text."""
        from synapse.ml.cognitive import KeyPhraseExtractor

        extractor = (KeyPhraseExtractor()
            .setTextCol(text_column)
            .setUrl(f"{self.endpoint}text/analytics/v3.1/keyPhrases")
            .setSubscriptionKey(self.key)
            .setOutputCol(output_column)
            .setErrorCol("keyphrase_error"))

        return extractor.transform(df)

# Initialize
cog = SynapseCognitiveServices(spark, cognitive_key, cognitive_endpoint)
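
To sanity-check the setup before wiring it into a full pipeline, you can score a small in-memory DataFrame; the sample rows and the feedback_text column below are illustrative only, and the output path matches the sentiment struct used later in this post.

# Quick smoke test on a tiny illustrative DataFrame
sample_df = spark.createDataFrame(
    [(1, "The onboarding experience was fantastic."),
     (2, "Support never responded to my ticket.")],
    ["id", "feedback_text"]
)

scored_df = cog.analyze_sentiment(sample_df, "feedback_text")
scored_df.select("id", "sentiment.document.sentiment").show(truncate=False)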

Sentiment Analysis at Scale

class SentimentPipeline:
    """Scalable sentiment analysis pipeline."""

    def __init__(self, cognitive_services):
        self.cog = cognitive_services

    def analyze_customer_feedback(self, df):
        """Analyze customer feedback sentiment."""

        # Apply sentiment analysis
        result = self.cog.analyze_sentiment(df, "feedback_text")

        # Extract sentiment scores
        from pyspark.sql.functions import col

        result = result.select(
            col("customer_id"),
            col("feedback_text"),
            col("feedback_date"),
            col("sentiment.document.sentiment").alias("overall_sentiment"),
            col("sentiment.document.confidenceScores.positive").alias("positive_score"),
            col("sentiment.document.confidenceScores.negative").alias("negative_score"),
            col("sentiment.document.confidenceScores.neutral").alias("neutral_score")
        )

        return result

    def batch_process_reviews(self, source_table: str, target_table: str):
        """Process reviews in batches."""

        # Read source data
        df = spark.table(source_table)

        # Process in chunks for large datasets
        batch_size = 10000
        total_rows = df.count()

        for offset in range(0, total_rows, batch_size):
            # offset() requires Spark 3.4+; sort the DataFrame first if you need
            # deterministic batches, since DataFrames have no inherent row order
            batch_df = df.offset(offset).limit(batch_size)
            result = self.analyze_customer_feedback(batch_df)

            # Write to target
            result.write.mode("append").saveAsTable(target_table)

            print(f"Processed {min(offset + batch_size, total_rows)}/{total_rows} rows")

    def aggregate_sentiment_metrics(self, sentiment_df) -> dict:
        """Aggregate sentiment metrics."""

        from pyspark.sql.functions import avg, count, when

        metrics = sentiment_df.agg(
            count("*").alias("total_reviews"),
            avg("positive_score").alias("avg_positive"),
            avg("negative_score").alias("avg_negative"),
            count(when(col("overall_sentiment") == "positive", 1)).alias("positive_count"),
            count(when(col("overall_sentiment") == "negative", 1)).alias("negative_count"),
            count(when(col("overall_sentiment") == "neutral", 1)).alias("neutral_count")
        ).collect()[0]

        return metrics.asDict()

# Usage
pipeline = SentimentPipeline(cog)
reviews_df = spark.table("bronze.customer_reviews")
sentiment_df = pipeline.analyze_customer_feedback(reviews_df)
sentiment_df.write.mode("overwrite").saveAsTable("silver.review_sentiments")
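
The aggregate_sentiment_metrics helper defined above can then summarize the scored output:

# Summarize the scored reviews
metrics = pipeline.aggregate_sentiment_metrics(sentiment_df)
print(f"{metrics['total_reviews']} reviews: "
      f"{metrics['positive_count']} positive, {metrics['negative_count']} negative, "
      f"avg positive score {metrics['avg_positive']:.2f}")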

Entity Extraction Pipeline

class EntityExtractionPipeline:
    """Extract and categorize entities at scale."""

    def __init__(self, cognitive_services):
        self.cog = cognitive_services

    def extract_and_flatten_entities(self, df, text_column: str):
        """Extract entities and flatten results."""

        # Apply NER
        result = self.cog.extract_entities(df, text_column)

        # Flatten entity results
        from pyspark.sql.functions import explode, col

        flattened = result.select(
            col("id"),
            col(text_column),
            explode(col("entities.document.entities")).alias("entity")
        ).select(
            col("id"),
            col(text_column),
            col("entity.text").alias("entity_text"),
            col("entity.category").alias("entity_category"),
            col("entity.subcategory").alias("entity_subcategory"),
            col("entity.confidenceScore").alias("confidence")
        )

        return flattened

    def build_entity_graph(self, entity_df):
        """Build entity co-occurrence graph."""

        from pyspark.sql.functions import collect_set, explode, col

        # Group entities by document
        doc_entities = entity_df.groupBy("id").agg(
            collect_set("entity_text").alias("entities")
        )

        # Create entity pairs: explode the entity set twice and keep each
        # unordered pair once (entity1 < entity2 drops duplicates and self-pairs)
        entity_pairs = (doc_entities
            .select(col("id"), explode(col("entities")).alias("entity1"), col("entities"))
            .select(col("id"), col("entity1"), explode(col("entities")).alias("entity2"))
            .filter(col("entity1") < col("entity2")))

        # Count co-occurrences across documents
        co_occurrence = entity_pairs.groupBy("entity1", "entity2").count()

        return co_occurrence

    def categorize_documents(self, df, text_column: str):
        """Categorize documents based on entities."""

        entities = self.extract_and_flatten_entities(df, text_column)

        # Pivot entity categories
        from pyspark.sql.functions import count

        category_counts = entities.groupBy("id").pivot("entity_category").agg(
            count("entity_text")
        ).na.fill(0)

        # Join back with original
        return df.join(category_counts, "id", "left")

# Usage
entity_pipeline = EntityExtractionPipeline(cog)
articles_df = spark.table("bronze.news_articles")
entities_df = entity_pipeline.extract_and_flatten_entities(articles_df, "article_text")
entities_df.write.mode("overwrite").saveAsTable("silver.article_entities")
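
The same flattened output feeds the co-occurrence and categorization helpers; the silver.entity_cooccurrence table name below is just an example.

# Build the entity co-occurrence graph and persist it
co_occurrence_df = entity_pipeline.build_entity_graph(entities_df)
co_occurrence_df.write.mode("overwrite").saveAsTable("silver.entity_cooccurrence")

# Add per-category entity counts back onto the articles
categorized_df = entity_pipeline.categorize_documents(articles_df, "article_text")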

Translation Pipeline

class TranslationPipeline:
    """Multi-language translation at scale."""

    def __init__(self, spark, translator_key: str, translator_endpoint: str):
        self.spark = spark
        self.key = translator_key
        self.endpoint = translator_endpoint

    def translate_column(
        self,
        df,
        text_column: str,
        target_language: str = "en",
        output_column: str = "translated"
    ):
        """Translate text column to target language."""

        from synapse.ml.cognitive import Translate

        translator = (Translate()
            .setTextCol(text_column)
            .setToLanguage([target_language])
            .setUrl(self.endpoint)
            .setSubscriptionKey(self.key)
            .setOutputCol(output_column)
            .setErrorCol("translation_error"))

        result = translator.transform(df)

        # Extract translated text
        result = result.withColumn(
            output_column,
            col(f"{output_column}")[0]["translations"][0]["text"]
        )

        return result

    def detect_language(self, df, text_column: str, output_column: str = "detected_language"):
        """Detect language of text."""

        from synapse.ml.cognitive import LanguageDetector

        # Language detection is a Text Analytics operation, not a Translator one;
        # the endpoint rewrite below assumes a shared naming convention, so point
        # setUrl at your Text Analytics resource directly if your resources differ
        detector = (LanguageDetector()
            .setTextCol(text_column)
            .setUrl(f"{self.endpoint.replace('translator', 'text/analytics')}/v3.1/languages")
            .setSubscriptionKey(self.key)
            .setOutputCol(output_column)
            .setErrorCol("detection_error"))

        return detector.transform(df)

    def translate_multilingual_dataset(
        self,
        df,
        text_column: str,
        target_language: str = "en"
    ):
        """Translate multi-language dataset to single language."""

        # First detect languages
        with_language = self.detect_language(df, text_column)

        # Filter non-target language rows
        needs_translation = with_language.filter(
            col("detected_language.document.detectedLanguage.iso6391Name") != target_language
        )

        already_target = with_language.filter(
            col("detected_language.document.detectedLanguage.iso6391Name") == target_language
        ).withColumn("translated", col(text_column))

        # Translate
        translated = self.translate_column(needs_translation, text_column, target_language)

        # Union results
        return already_target.unionByName(translated, allowMissingColumns=True)

# Usage (retrieve translator_key / translator_endpoint the same way as cognitive_key)
translator = TranslationPipeline(spark, translator_key, translator_endpoint)
multilingual_df = spark.table("bronze.international_reviews")
english_df = translator.translate_multilingual_dataset(multilingual_df, "review_text", "en")
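
You can also inspect the language mix of the source data with detect_language; the column path below matches the detector output used in the filter above.

# Check which source languages dominate before translating
language_counts = (translator.detect_language(multilingual_df, "review_text")
    .groupBy(col("detected_language.document.detectedLanguage.iso6391Name").alias("language"))
    .count()
    .orderBy(col("count").desc()))
language_counts.show()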

Computer Vision in Synapse

class SynapseComputerVision:
    """Computer Vision integration for image processing."""

    def __init__(self, spark, vision_key: str, vision_endpoint: str):
        self.spark = spark
        self.key = vision_key
        self.endpoint = vision_endpoint

    def analyze_images(self, df, image_url_column: str):
        """Analyze images and extract features."""

        from synapse.ml.cognitive import AnalyzeImage

        analyzer = (AnalyzeImage()
            .setImageUrlCol(image_url_column)
            .setUrl(f"{self.endpoint}vision/v3.2/analyze")
            .setSubscriptionKey(self.key)
            .setVisualFeatures(["Categories", "Tags", "Description", "Objects"])
            .setOutputCol("analysis")
            .setErrorCol("vision_error"))

        return analyzer.transform(df)

    def extract_text_from_images(self, df, image_url_column: str):
        """Extract text from images using OCR."""

        from synapse.ml.cognitive import OCR

        ocr = (OCR()
            .setImageUrlCol(image_url_column)
            .setUrl(f"{self.endpoint}vision/v3.2/ocr")
            .setSubscriptionKey(self.key)
            .setOutputCol("ocr_result")
            .setErrorCol("ocr_error"))

        result = ocr.transform(df)

        # Flatten the OCR response (regions -> lines -> words -> text) into one
        # string; this assumes the standard v3.2 OCR response shape parsed by SynapseML
        from pyspark.sql.functions import concat_ws, flatten

        lines = flatten(col("ocr_result.regions.lines"))
        words = flatten(lines.getField("words"))

        return result.withColumn("extracted_text", concat_ws(" ", words.getField("text")))

    def generate_image_captions(self, df, image_url_column: str):
        """Generate captions for images."""

        analyzed = self.analyze_images(df, image_url_column)

        return analyzed.withColumn(
            "caption",
            col("analysis.description.captions")[0]["text"]
        ).withColumn(
            "caption_confidence",
            col("analysis.description.captions")[0]["confidence"]
        )

# Usage (retrieve vision_key / vision_endpoint from Key Vault like cognitive_key)
vision = SynapseComputerVision(spark, vision_key, vision_endpoint)
products_df = spark.table("bronze.product_images")
analyzed_df = vision.generate_image_captions(products_df, "image_url")
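
The OCR helper follows the same pattern; here it pulls any text embedded in the product images (the target table name is just an example).

# Extract text embedded in product images and persist it
ocr_df = vision.extract_text_from_images(products_df, "image_url")
ocr_df.select("image_url", "extracted_text") \
    .write.mode("overwrite").saveAsTable("silver.product_image_text")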

Unified AI Pipeline

class UnifiedAIPipeline:
    """Combined Cognitive Services pipeline."""

    def __init__(self, spark, config: dict):
        self.spark = spark
        self.sentiment = SynapseCognitiveServices(
            spark, config["text_key"], config["text_endpoint"]
        )
        self.vision = SynapseComputerVision(
            spark, config["vision_key"], config["vision_endpoint"]
        )

    def process_social_media_posts(self, df):
        """Process social media posts with text and images."""

        # Separate text-only and image posts
        text_only = df.filter(col("image_url").isNull())
        with_images = df.filter(col("image_url").isNotNull())

        # Process text
        text_analyzed = self.sentiment.analyze_sentiment(text_only, "post_text")
        text_analyzed = self.sentiment.extract_entities(text_analyzed, "post_text")

        # Process images
        image_analyzed = self.vision.analyze_images(with_images, "image_url")
        image_analyzed = self.sentiment.analyze_sentiment(image_analyzed, "post_text")

        # Combine results
        return text_analyzed.unionByName(image_analyzed, allowMissingColumns=True)

# Usage
pipeline = UnifiedAIPipeline(spark, {
    "text_key": cognitive_key,
    "text_endpoint": cognitive_endpoint,
    "vision_key": vision_key,
    "vision_endpoint": vision_endpoint
})

social_df = spark.table("bronze.social_media_posts")
enriched_df = pipeline.process_social_media_posts(social_df)
enriched_df.write.mode("overwrite").saveAsTable("silver.enriched_social_posts")
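
A downstream query can then report on the combined output; the column path below mirrors the sentiment struct produced by analyze_sentiment.

# Example downstream summary over the enriched posts
(spark.table("silver.enriched_social_posts")
    .groupBy(col("sentiment.document.sentiment").alias("overall_sentiment"))
    .count()
    .show())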

Cognitive Services in Synapse brings AI capabilities directly into your data engineering workflows. Process millions of records with sentiment, entities, translation, and vision without leaving your Spark environment.

Michael John Pena

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.