
Content Moderation Patterns for AI Applications

Introduction

Content moderation is essential for responsible AI deployment. This post covers design patterns and architectural approaches for implementing scalable, effective content moderation in AI applications.

Moderation Architecture Patterns

from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import List, Dict, Optional
from enum import Enum

class ModerationDecision(Enum):
    APPROVE = "approve"
    REJECT = "reject"
    REVIEW = "review"
    MODIFY = "modify"

@dataclass
class ModerationResult:
    decision: ModerationDecision
    confidence: float
    reasons: List[str]
    modified_content: Optional[str] = None

class ContentModerator(ABC):
    """Abstract base class for content moderators"""

    @abstractmethod
    def moderate(self, content: str) -> ModerationResult:
        pass

class ChainOfResponsibilityPattern:
    """Chain of responsibility for moderation"""

    def __init__(self):
        self.handlers: List[ContentModerator] = []

    def add_handler(self, handler: ContentModerator):
        """Add handler to chain"""
        self.handlers.append(handler)
        return self

    def moderate(self, content: str) -> ModerationResult:
        """Process content through the chain"""
        original = content
        for handler in self.handlers:
            result = handler.moderate(content)

            # If rejected or flagged for review, stop the chain
            if result.decision in (ModerationDecision.REJECT, ModerationDecision.REVIEW):
                return result

            # If modified, continue with the modified content
            if result.decision == ModerationDecision.MODIFY and result.modified_content is not None:
                content = result.modified_content

        return ModerationResult(
            decision=ModerationDecision.APPROVE,
            confidence=1.0,
            reasons=["Passed all moderation checks"],
            # Surface any modifications made along the chain
            modified_content=content if content != original else None
        )
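
For example, assuming the KeywordFilter and RegexFilter handlers defined in the next section, a chain can be wired up fluently. This sketch is illustrative, not a complete configuration:

chain = (
    ChainOfResponsibilityPattern()
    .add_handler(KeywordFilter(["spam"]))
    .add_handler(RegexFilter({"phone_number": r"\d{3}[-.]?\d{3}[-.]?\d{4}"}))
)

result = chain.moderate("Call me at 555-123-4567")
print(result.decision.value)  # "review" (the phone number pattern matched)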

Layered Moderation Pattern

import re

class KeywordFilter(ContentModerator):
    """Fast keyword-based filtering"""

    def __init__(self, blocked_words: List[str]):
        self.blocked_words = set(w.lower() for w in blocked_words)

    def moderate(self, content: str) -> ModerationResult:
        content_lower = content.lower()
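        # Note: plain substring matching also flags embedded words
        # ("hack" inside "hackathon"); switch to word-boundary regexes
        # if that proves too aggressive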
        found_words = [
            word for word in self.blocked_words
            if word in content_lower
        ]

        if found_words:
            return ModerationResult(
                decision=ModerationDecision.REJECT,
                confidence=1.0,
                reasons=[f"Blocked words found: {', '.join(found_words)}"]
            )

        return ModerationResult(
            decision=ModerationDecision.APPROVE,
            confidence=1.0,
            reasons=["No blocked words found"]
        )

class RegexFilter(ContentModerator):
    """Pattern-based filtering using regex"""

    def __init__(self, patterns: Dict[str, str]):
        self.patterns = {
            name: re.compile(pattern, re.IGNORECASE)
            for name, pattern in patterns.items()
        }

    def moderate(self, content: str) -> ModerationResult:
        matches = []

        for name, pattern in self.patterns.items():
            if pattern.search(content):
                matches.append(name)

        if matches:
            return ModerationResult(
                decision=ModerationDecision.REVIEW,
                confidence=0.8,
                reasons=[f"Pattern matches: {', '.join(matches)}"]
            )

        return ModerationResult(
            decision=ModerationDecision.APPROVE,
            confidence=0.9,
            reasons=["No suspicious patterns found"]
        )

class MLClassifier(ContentModerator):
    """ML-based content classification"""

    def __init__(self, model, threshold: float = 0.8):
        self.model = model
        self.threshold = threshold

    def moderate(self, content: str) -> ModerationResult:
        # Get model predictions
        prediction = self.model.predict(content)

        if prediction["harmful_score"] > self.threshold:
            return ModerationResult(
                decision=ModerationDecision.REJECT,
                confidence=prediction["harmful_score"],
                reasons=[f"ML classifier flagged: {prediction['category']}"]
            )

        if prediction["harmful_score"] > self.threshold * 0.7:
            return ModerationResult(
                decision=ModerationDecision.REVIEW,
                confidence=prediction["harmful_score"],
                reasons=[f"ML classifier uncertain: {prediction['category']}"]
            )

        return ModerationResult(
            decision=ModerationDecision.APPROVE,
            confidence=1 - prediction["harmful_score"],
            reasons=["ML classifier approved"]
        )
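
The classifier only assumes the injected model exposes a predict method returning a dict with harmful_score and category keys. That contract is easy to satisfy with a stub for testing; the scoring rule below is a placeholder, not a real model:

class StubHarmModel:
    """Hypothetical stand-in matching the predict() contract above"""

    def predict(self, content: str) -> Dict:
        # Toy heuristic for demonstration only
        score = 0.95 if "attack" in content.lower() else 0.05
        return {"harmful_score": score, "category": "violence"}

classifier = MLClassifier(StubHarmModel(), threshold=0.8)
print(classifier.moderate("hello world").decision.value)  # "approve"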

class LayeredModerationSystem:
    """Multi-layer moderation with fast path"""

    def __init__(self):
        # Layer 1: Fast, rule-based
        self.keyword_filter = KeywordFilter([
            "spam", "scam", "hack"
        ])

        # Layer 2: Pattern matching
        self.regex_filter = RegexFilter({
            "email_solicitation": r"send.*email.*password",
            "phone_number": r"\d{3}[-.]?\d{3}[-.]?\d{4}",
            "credit_card": r"\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}"
        })

        # Layer 3: ML-based (heavier)
        # self.ml_classifier = MLClassifier(model)

    def moderate(self, content: str) -> ModerationResult:
        """Moderate through layers, short-circuit on rejection"""
        # Layer 1: Keyword filter (fastest)
        result = self.keyword_filter.moderate(content)
        if result.decision == ModerationDecision.REJECT:
            return result

        # Layer 2: Regex patterns
        result = self.regex_filter.moderate(content)
        if result.decision in [ModerationDecision.REJECT, ModerationDecision.REVIEW]:
            return result

        # Layer 3: ML classifier (only if passed previous layers)
        # result = self.ml_classifier.moderate(content)

        return result

# Usage
moderation_system = LayeredModerationSystem()
result = moderation_system.moderate("This is a test message")
print(f"Decision: {result.decision.value}")

Async Moderation Pattern

import asyncio
from typing import Callable, Awaitable

class AsyncModerationPipeline:
    """Asynchronous moderation for high throughput"""

    def __init__(self):
        self.sync_filters: List[ContentModerator] = []
        self.async_filters: List[Callable[[str], Awaitable[ModerationResult]]] = []

    def add_sync_filter(self, moderator: ContentModerator):
        """Add a synchronous filter"""
        self.sync_filters.append(moderator)
        return self

    def add_async_filter(self, moderator: Callable[[str], Awaitable[ModerationResult]]):
        """Add an asynchronous filter"""
        self.async_filters.append(moderator)
        return self

    async def moderate(self, content: str) -> ModerationResult:
        """Moderate content asynchronously"""
        # Run sync filters first (fast path)
        for moderator in self.sync_filters:
            result = moderator.moderate(content)
            if result.decision == ModerationDecision.REJECT:
                return result

        # Run async filters in parallel
        if self.async_filters:
            tasks = [f(content) for f in self.async_filters]
            results = await asyncio.gather(*tasks)

            # Aggregate results
            for result in results:
                if result.decision == ModerationDecision.REJECT:
                    return result

            # Escalate if any filter requested human review
            review_results = [r for r in results if r.decision == ModerationDecision.REVIEW]
            if review_results:
                return ModerationResult(
                    decision=ModerationDecision.REVIEW,
                    confidence=min(r.confidence for r in review_results),
                    reasons=[reason for r in review_results for reason in r.reasons]
                )

        return ModerationResult(
            decision=ModerationDecision.APPROVE,
            confidence=1.0,
            reasons=["All filters passed"]
        )

    async def moderate_batch(self, contents: List[str]) -> List[ModerationResult]:
        """Moderate multiple items in parallel"""
        tasks = [self.moderate(content) for content in contents]
        return await asyncio.gather(*tasks)

# Example async filter
async def external_api_filter(content: str) -> ModerationResult:
    """Call external moderation API"""
    # Simulate API call
    await asyncio.sleep(0.1)
    return ModerationResult(
        decision=ModerationDecision.APPROVE,
        confidence=0.95,
        reasons=["External API approved"]
    )

# Usage
async def main():
    pipeline = AsyncModerationPipeline()
    pipeline.add_sync_filter(KeywordFilter(["spam"]))
    pipeline.add_async_filter(external_api_filter)

    result = await pipeline.moderate("Test content")
    print(f"Decision: {result.decision.value}")

# asyncio.run(main())
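
Many moderation SDKs only ship a blocking client. One way to fit such a client into this pipeline, assuming Python 3.9+, is to wrap the call in asyncio.to_thread so it runs in a worker thread instead of stalling the event loop; blocking_client_check here is a hypothetical stand-in for the real SDK call:

def blocking_client_check(content: str) -> ModerationResult:
    """Hypothetical blocking SDK call"""
    return ModerationResult(
        decision=ModerationDecision.APPROVE,
        confidence=0.9,
        reasons=["Blocking client approved"]
    )

async def wrapped_blocking_filter(content: str) -> ModerationResult:
    # Run the blocking call in a worker thread (Python 3.9+)
    return await asyncio.to_thread(blocking_client_check, content)

# pipeline.add_async_filter(wrapped_blocking_filter)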

Human-in-the-Loop Pattern

from datetime import datetime
import uuid

@dataclass
class ReviewItem:
    id: str
    content: str
    automated_result: ModerationResult
    created_at: datetime
    priority: int
    status: str = "pending"
    reviewer: Optional[str] = None
    final_decision: Optional[ModerationDecision] = None
    notes: str = ""

class HumanReviewQueue:
    """Queue for human review of flagged content"""

    def __init__(self):
        self.queue: Dict[str, ReviewItem] = {}

    def add_for_review(
        self,
        content: str,
        automated_result: ModerationResult,
        priority: int = 5
    ) -> str:
        """Add item to review queue"""
        item_id = str(uuid.uuid4())
        item = ReviewItem(
            id=item_id,
            content=content,
            automated_result=automated_result,
            created_at=datetime.now(),
            priority=priority
        )
        self.queue[item_id] = item
        return item_id

    def get_next_item(self, reviewer: str) -> Optional[ReviewItem]:
        """Get next item for review"""
        pending = [
            item for item in self.queue.values()
            if item.status == "pending"
        ]

        if not pending:
            return None

        # Sort by priority (higher first) and age (older first)
        pending.sort(key=lambda x: (-x.priority, x.created_at))

        item = pending[0]
        item.status = "in_review"
        item.reviewer = reviewer
        return item

    def submit_review(
        self,
        item_id: str,
        decision: ModerationDecision,
        notes: str = ""
    ) -> bool:
        """Submit a human review decision"""
        if item_id not in self.queue:
            return False

        item = self.queue[item_id]
        item.status = "reviewed"
        item.final_decision = decision
        item.notes = notes
        return True

    def get_stats(self) -> Dict:
        """Get queue statistics"""
        items = list(self.queue.values())

        pending = sum(1 for i in items if i.status == "pending")
        in_review = sum(1 for i in items if i.status == "in_review")
        reviewed = sum(1 for i in items if i.status == "reviewed")

        # Calculate agreement rate
        reviewed_items = [i for i in items if i.status == "reviewed"]
        agreements = sum(
            1 for i in reviewed_items
            if i.final_decision == i.automated_result.decision
        )

        return {
            "total": len(items),
            "pending": pending,
            "in_review": in_review,
            "reviewed": reviewed,
            "agreement_rate": agreements / len(reviewed_items) if reviewed_items else 0
        }

class ModerationWithHITL:
    """Moderation system with human-in-the-loop"""

    def __init__(self):
        self.automated = LayeredModerationSystem()
        self.review_queue = HumanReviewQueue()

    def moderate(self, content: str) -> Dict:
        """Moderate with potential human review"""
        result = self.automated.moderate(content)

        if result.decision == ModerationDecision.REVIEW:
            # Add to human review queue
            review_id = self.review_queue.add_for_review(
                content=content,
                automated_result=result,
                priority=self._calculate_priority(result)
            )

            return {
                "decision": "pending_review",
                "review_id": review_id,
                "automated_result": result
            }

        return {
            "decision": result.decision.value,
            "confidence": result.confidence,
            "reasons": result.reasons
        }

    def _calculate_priority(self, result: ModerationResult) -> int:
        """Calculate review priority"""
        # Higher priority for lower confidence
        if result.confidence < 0.5:
            return 10
        elif result.confidence < 0.7:
            return 7
        else:
            return 5
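
End to end, flagged content lands in the queue, a reviewer claims it, and the decision is recorded. A minimal sketch of that flow (the reviewer name is illustrative):

system = ModerationWithHITL()

# A phone number triggers the regex layer, which routes to human review
outcome = system.moderate("Call me at 555-123-4567")

if outcome["decision"] == "pending_review":
    item = system.review_queue.get_next_item(reviewer="alice")
    if item is not None:
        system.review_queue.submit_review(
            item.id, ModerationDecision.APPROVE, notes="Benign contact info"
        )

print(system.review_queue.get_stats())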

Feedback Loop Pattern

import hashlib

class FeedbackCollector:
    """Collect feedback on moderation decisions"""

    def __init__(self):
        self.feedback_log = []

    def log_feedback(
        self,
        content: str,
        automated_decision: ModerationDecision,
        correct_decision: ModerationDecision,
        source: str = "user"
    ):
        """Log feedback on moderation decision"""
        self.feedback_log.append({
            "timestamp": datetime.now(),
            # Use a stable hash; the builtin hash() is salted per process
            "content_hash": hashlib.sha256(content.encode()).hexdigest(),
            "automated": automated_decision.value,
            "correct": correct_decision.value,
            "was_correct": automated_decision == correct_decision,
            "source": source
        })

    def get_accuracy_report(self) -> Dict:
        """Generate accuracy report from feedback"""
        if not self.feedback_log:
            return {"error": "No feedback collected"}

        total = len(self.feedback_log)
        correct = sum(1 for f in self.feedback_log if f["was_correct"])

        # Breakdown by decision type
        by_automated = {}
        for f in self.feedback_log:
            auto = f["automated"]
            if auto not in by_automated:
                by_automated[auto] = {"total": 0, "correct": 0}
            by_automated[auto]["total"] += 1
            if f["was_correct"]:
                by_automated[auto]["correct"] += 1

        return {
            "total_feedback": total,
            "overall_accuracy": correct / total,
            "by_decision_type": {
                k: v["correct"] / v["total"] if v["total"] > 0 else 0
                for k, v in by_automated.items()
            },
            "false_positive_rate": self._calculate_fpr(),
            "false_negative_rate": self._calculate_fnr()
        }

    def _calculate_fpr(self) -> float:
        """Calculate false positive rate"""
        # Rejected when should be approved
        rejected_feedback = [
            f for f in self.feedback_log
            if f["automated"] == "reject"
        ]
        if not rejected_feedback:
            return 0.0

        false_positives = sum(
            1 for f in rejected_feedback
            if f["correct"] == "approve"
        )
        return false_positives / len(rejected_feedback)

    def _calculate_fnr(self) -> float:
        """Calculate false negative rate"""
        # Approved when should be rejected
        approved_feedback = [
            f for f in self.feedback_log
            if f["automated"] == "approve"
        ]
        if not approved_feedback:
            return 0.0

        false_negatives = sum(
            1 for f in approved_feedback
            if f["correct"] == "reject"
        )
        return false_negatives / len(approved_feedback)

class AdaptiveModerationSystem:
    """Moderation system that adapts based on feedback"""

    def __init__(self):
        self.base_system = LayeredModerationSystem()
        self.feedback = FeedbackCollector()
        self.adjustment_threshold = 0.1

    def moderate(self, content: str) -> ModerationResult:
        """Moderate with adaptive thresholds"""
        base_result = self.base_system.moderate(content)

        # Adjust based on feedback
        accuracy_report = self.feedback.get_accuracy_report()

        if "false_positive_rate" in accuracy_report:
            if accuracy_report["false_positive_rate"] > self.adjustment_threshold:
                # Too many false positives - be more lenient
                if base_result.decision == ModerationDecision.REJECT:
                    if base_result.confidence < 0.9:
                        return ModerationResult(
                            decision=ModerationDecision.REVIEW,
                            confidence=base_result.confidence,
                            reasons=base_result.reasons + ["Adjusted due to high FPR"]
                        )

        return base_result

    def process_feedback(
        self,
        content: str,
        automated_decision: ModerationDecision,
        correct_decision: ModerationDecision
    ):
        """Process user feedback"""
        self.feedback.log_feedback(
            content=content,
            automated_decision=automated_decision,
            correct_decision=correct_decision
        )
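
A short sketch of the loop in practice, using hand-labelled corrections as the feedback source; after enough false-positive reports, low-confidence rejections start flowing to review instead:

adaptive = AdaptiveModerationSystem()

# Simulated corrections: the automated system rejected content that a
# human later judged acceptable
for _ in range(3):
    adaptive.process_feedback(
        content="borderline marketing copy",
        automated_decision=ModerationDecision.REJECT,
        correct_decision=ModerationDecision.APPROVE
    )

report = adaptive.feedback.get_accuracy_report()
print(report["false_positive_rate"])  # 1.0, so low-confidence rejects become reviews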

Caching Pattern

import hashlib

class CachedModerationSystem:
    """Moderation with caching for repeated content"""

    def __init__(self, base_system: ContentModerator, cache_size: int = 10000):
        self.base_system = base_system
        self.cache_size = cache_size
        self._cache = {}

    def _get_content_hash(self, content: str) -> str:
        """Generate hash for content"""
        return hashlib.sha256(content.encode()).hexdigest()

    def moderate(self, content: str) -> ModerationResult:
        """Moderate with caching"""
        content_hash = self._get_content_hash(content)

        # Check cache
        if content_hash in self._cache:
            cached = self._cache[content_hash]
            return ModerationResult(
                decision=cached["decision"],
                confidence=cached["confidence"],
                reasons=cached["reasons"] + ["(cached)"]
            )

        # Moderate
        result = self.base_system.moderate(content)

        # Cache the result; dicts preserve insertion order (Python 3.7+),
        # so evicting the first key is simple FIFO, not true LRU
        if len(self._cache) >= self.cache_size:
            oldest_key = next(iter(self._cache))
            del self._cache[oldest_key]

        self._cache[content_hash] = {
            "decision": result.decision,
            "confidence": result.confidence,
            "reasons": result.reasons
        }

        return result

    def invalidate_cache(self):
        """Clear the cache"""
        self._cache.clear()

    def get_cache_stats(self) -> Dict:
        """Get cache statistics"""
        return {
            "cache_size": len(self._cache),
            "max_size": self.cache_size,
            "utilization": len(self._cache) / self.cache_size
        }
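
Wrapping the layered system from earlier gives cached moderation for repeated content. Note that LayeredModerationSystem is not a ContentModerator subclass, but it exposes the same moderate method, so duck typing suffices for this sketch:

cached = CachedModerationSystem(LayeredModerationSystem(), cache_size=1000)

first = cached.moderate("This is a test message")   # computed
second = cached.moderate("This is a test message")  # served from cache
print(second.reasons)   # [..., "(cached)"]
print(cached.get_cache_stats())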

Conclusion

Content moderation patterns provide structured approaches to building effective moderation systems. Key patterns include chain of responsibility for modular processing, layered filtering for efficiency, async pipelines for throughput, human-in-the-loop for complex cases, feedback loops for continuous improvement, and caching for performance. Combining these patterns creates robust, scalable moderation systems.

Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.