
Error Handling for AI Applications: Building Resilient Systems

AI applications face error scenarios that traditional software rarely encounters: rate limits, token budgets, content filters, and model outages. Today, I will cover patterns for building resilient AI systems.

Common Error Types

from enum import Enum

class AIErrorType(Enum):
    RATE_LIMIT = "rate_limit"
    TOKEN_LIMIT = "token_limit"
    CONTENT_FILTER = "content_filter"
    INVALID_REQUEST = "invalid_request"
    MODEL_UNAVAILABLE = "model_unavailable"
    TIMEOUT = "timeout"
    NETWORK = "network"
    AUTHENTICATION = "authentication"
    QUOTA_EXCEEDED = "quota_exceeded"

error_handling_strategies = {
    AIErrorType.RATE_LIMIT: "Retry with exponential backoff",
    AIErrorType.TOKEN_LIMIT: "Reduce context and retry",
    AIErrorType.CONTENT_FILTER: "Modify input or use fallback",
    AIErrorType.INVALID_REQUEST: "Validate and correct request",
    AIErrorType.MODEL_UNAVAILABLE: "Use fallback model",
    AIErrorType.TIMEOUT: "Retry with shorter timeout",
    AIErrorType.NETWORK: "Retry with backoff",
    AIErrorType.AUTHENTICATION: "Refresh credentials",
    AIErrorType.QUOTA_EXCEEDED: "Alert and queue for later"
}
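The mapping doubles as runtime metadata. As a small illustrative helper (not part of any SDK), a log line can name the recommended strategy whenever an error is classified:

def log_strategy(error_type: AIErrorType) -> None:
    # Unmapped types get a generic recommendation (hypothetical default)
    strategy = error_handling_strategies.get(error_type, "Log and alert")
    print(f"{error_type.value}: {strategy}")

log_strategy(AIErrorType.RATE_LIMIT)  # rate_limit: Retry with exponential backoff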

Exception Hierarchy

class AIException(Exception):
    """Base exception for AI operations"""

    def __init__(self, message: str, error_type: AIErrorType, details: dict | None = None):
        super().__init__(message)
        self.error_type = error_type
        self.details = details or {}
        self.retryable = error_type in [
            AIErrorType.RATE_LIMIT,
            AIErrorType.TIMEOUT,
            AIErrorType.NETWORK
        ]

class RateLimitException(AIException):
    def __init__(self, message: str, retry_after: int | None = None):
        super().__init__(message, AIErrorType.RATE_LIMIT, {"retry_after": retry_after})
        self.retry_after = retry_after

class TokenLimitException(AIException):
    def __init__(self, message: str, tokens_used: int, limit: int):
        super().__init__(message, AIErrorType.TOKEN_LIMIT, {
            "tokens_used": tokens_used,
            "limit": limit
        })
        self.tokens_used = tokens_used
        self.limit = limit

class ContentFilterException(AIException):
    def __init__(self, message: str, categories: list):
        super().__init__(message, AIErrorType.CONTENT_FILTER, {"categories": categories})
        self.categories = categories
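A quick sketch of how the hierarchy is meant to be consumed: callers catch the base AIException and branch on the retryable flag instead of matching every subclass.

try:
    raise RateLimitException("Too many requests", retry_after=30)
except AIException as e:
    if e.retryable:
        print(f"Retrying ({e.error_type.value}), details={e.details}")
    else:
        print(f"Giving up: {e}")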

Error Handler

from openai import RateLimitError, APIError, APITimeoutError, BadRequestError
import logging

logger = logging.getLogger(__name__)

class AIErrorHandler:
    """Handle and classify AI errors"""

    def handle_openai_error(self, error: Exception) -> AIException:
        """Convert OpenAI errors to our exception types"""

        if isinstance(error, RateLimitError):
            # The SDK does not expose retry_after as an attribute; the server's
            # hint, if any, arrives in the Retry-After response header
            headers = getattr(getattr(error, "response", None), "headers", None) or {}
            try:
                retry_after = int(headers.get("retry-after", 60))
            except (TypeError, ValueError):
                retry_after = 60
            return RateLimitException(
                f"Rate limit exceeded. Retry after {retry_after}s",
                retry_after=retry_after
            )

        if isinstance(error, APITimeoutError):
            return AIException(
                "API request timed out",
                AIErrorType.TIMEOUT
            )

        if isinstance(error, BadRequestError):
            error_msg = str(error)

            # Check for content filter
            if "content_filter" in error_msg.lower():
                return ContentFilterException(
                    "Content was filtered",
                    categories=self._extract_filter_categories(error)
                )

            # Check for token limit
            if "maximum context length" in error_msg.lower():
                return TokenLimitException(
                    "Token limit exceeded",
                    tokens_used=self._extract_token_count(error),
                    limit=self._extract_limit(error)
                )

            return AIException(error_msg, AIErrorType.INVALID_REQUEST)

        if isinstance(error, APIError):
            return AIException(
                str(error),
                AIErrorType.MODEL_UNAVAILABLE if "model" in str(error).lower()
                else AIErrorType.NETWORK
            )

        # Unknown error
        logger.error(f"Unexpected error: {type(error).__name__}: {error}")
        return AIException(str(error), AIErrorType.NETWORK)

    def _extract_filter_categories(self, error) -> list:
        # Parse error message for filter categories
        return []

    def _extract_token_count(self, error) -> int:
        # Parse error message for token count
        return 0

    def _extract_limit(self, error) -> int:
        # Parse error message for the model's context limit
        return 0
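The extraction helpers above are stubs. A minimal sketch of the token parsers, assuming the error text follows OpenAI's usual phrasing ("This model's maximum context length is 8192 tokens. However, your messages resulted in 10000 tokens."):

import re

def extract_token_count(error: Exception) -> int:
    # Assumed message shape; returns 0 if the pattern is absent
    match = re.search(r"resulted in (\d+) tokens", str(error))
    return int(match.group(1)) if match else 0

def extract_limit(error: Exception) -> int:
    match = re.search(r"maximum context length is (\d+) tokens", str(error))
    return int(match.group(1)) if match else 0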

Retry Logic

import asyncio
from functools import wraps
import random

def retry_with_backoff(
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    exponential_base: float = 2.0,
    jitter: bool = True
):
    """Decorator for retry with exponential backoff"""

    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            error_handler = AIErrorHandler()
            last_exception = None

            for attempt in range(max_retries + 1):
                try:
                    return await func(*args, **kwargs)
                except Exception as e:
                    # Pass through errors we have already classified; re-wrapping
                    # an AIException would misclassify it as a network error
                    if isinstance(e, AIException):
                        ai_error = e
                    else:
                        ai_error = error_handler.handle_openai_error(e)
                    last_exception = ai_error

                    if not ai_error.retryable or attempt == max_retries:
                        raise ai_error

                    # Calculate delay
                    delay = min(
                        base_delay * (exponential_base ** attempt),
                        max_delay
                    )

                    # Honor the server's retry_after hint verbatim; otherwise
                    # add jitter so concurrent clients don't retry in lockstep
                    if getattr(ai_error, 'retry_after', None):
                        delay = ai_error.retry_after
                    elif jitter:
                        delay *= (0.5 + random.random())

                    logger.warning(
                        f"Attempt {attempt + 1} failed: {ai_error}. "
                        f"Retrying in {delay:.2f}s"
                    )

                    await asyncio.sleep(delay)

            raise last_exception

        return wrapper
    return decorator

# Usage (assumes an AsyncOpenAI client with credentials in the environment)
from openai import AsyncOpenAI

client = AsyncOpenAI()

@retry_with_backoff(max_retries=3, base_delay=1.0)
async def call_openai(messages: list):
    return await client.chat.completions.create(
        model="gpt-4",
        messages=messages
    )
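To exercise the retry path end to end, a minimal driver might look like this (the prompt is just a placeholder):

import asyncio

async def main():
    response = await call_openai([{"role": "user", "content": "Hello"}])
    print(response.choices[0].message.content)

asyncio.run(main())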

Fallback Strategies

class FallbackManager:
    """Manage fallback strategies for AI calls"""

    def __init__(self, primary_client, fallback_client=None):
        self.primary = primary_client
        self.fallback = fallback_client
        # "gpt-35-turbo" is Azure OpenAI deployment naming; the OpenAI API
        # equivalent is "gpt-3.5-turbo"
        self.fallback_models = ["gpt-4-turbo", "gpt-35-turbo"]

    async def chat_with_fallback(
        self,
        messages: list,
        model: str = "gpt-4"
    ) -> dict:
        """Try primary, fall back to alternatives"""

        # Try primary
        try:
            response = await self.primary.chat.completions.create(
                model=model,
                messages=messages
            )
            return {"response": response, "fallback_used": False}
        except Exception as e:
            logger.warning(f"Primary model failed: {e}")

        # Try fallback models
        for fallback_model in self.fallback_models:
            if fallback_model == model:
                continue

            try:
                response = await self.primary.chat.completions.create(
                    model=fallback_model,
                    messages=messages
                )
                return {
                    "response": response,
                    "fallback_used": True,
                    "model_used": fallback_model
                }
            except Exception as e:
                logger.warning(f"Fallback {fallback_model} failed: {e}")

        # Try fallback client (different region/endpoint)
        if self.fallback:
            try:
                response = await self.fallback.chat.completions.create(
                    model="gpt-35-turbo",
                    messages=messages
                )
                return {
                    "response": response,
                    "fallback_used": True,
                    "fallback_client": True
                }
            except Exception as e:
                logger.error(f"All fallbacks failed: {e}")

        raise AIException(
            "All models and fallbacks failed",
            AIErrorType.MODEL_UNAVAILABLE
        )
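Wiring this up, a sketch assuming a secondary client pointed at a different endpoint (the URL below is a placeholder):

from openai import AsyncOpenAI

async def demo():
    primary = AsyncOpenAI()
    secondary = AsyncOpenAI(base_url="https://backup.example.com/v1")
    manager = FallbackManager(primary, fallback_client=secondary)

    result = await manager.chat_with_fallback(
        [{"role": "user", "content": "Summarize this ticket."}]
    )
    if result["fallback_used"]:
        logger.info(f"Served by {result.get('model_used', 'fallback client')}")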

Graceful Degradation

class GracefulDegradation:
    """Provide degraded responses when AI fails"""

    def __init__(self):
        self.canned_responses = {
            "greeting": "Hello! I'm here to help, though I'm experiencing some issues. Please try again in a moment.",
            "error": "I apologize, but I'm having trouble processing your request. Please try again later.",
            "fallback": "I cannot provide a detailed response right now. Here's a general answer: {topic}"
        }

    def get_fallback_response(self, user_input: str, error: AIException) -> str:
        """Generate fallback response based on error type"""

        # Content filter - can't respond
        if error.error_type == AIErrorType.CONTENT_FILTER:
            return "I'm unable to respond to that type of request. Please rephrase your question."

        # Rate limit - temporary
        if error.error_type == AIErrorType.RATE_LIMIT:
            return "I'm handling many requests right now. Please try again in a moment."

        # Check for simple queries we can handle
        simple_intent = self._detect_simple_intent(user_input)
        if simple_intent:
            return self._handle_simple_intent(simple_intent)

        return self.canned_responses["error"]

    def _detect_simple_intent(self, text: str) -> str | None:
        """Detect simple intents without AI"""
        # Match whole words, not substrings ("hi" should not match "this")
        words = set(text.lower().split())

        if words & {"hi", "hello", "hey"}:
            return "greeting"
        if words & {"bye", "goodbye", "thanks"}:
            return "farewell"
        if "help" in words:
            return "help"

        return None

    def _handle_simple_intent(self, intent: str) -> str:
        """Handle simple intents with canned responses"""
        responses = {
            "greeting": "Hello! How can I help you today?",
            "farewell": "Goodbye! Have a great day!",
            "help": "I can help with questions, analysis, and more. What would you like to know?"
        }
        return responses.get(intent, self.canned_responses["error"])
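A quick check of the degraded path. Because the rate-limit branch runs before intent detection, a rate-limited greeting still gets the "try again" message:

degradation = GracefulDegradation()
error = RateLimitException("429 from upstream", retry_after=10)
print(degradation.get_fallback_response("hello there", error))
# -> I'm handling many requests right now. Please try again in a moment.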

Complete Error-Handling Flow

class ResilientAIClient:
    """AI client with comprehensive error handling"""

    def __init__(self, primary_client, fallback_client=None):
        self.error_handler = AIErrorHandler()
        self.fallback_manager = FallbackManager(primary_client, fallback_client)
        self.degradation = GracefulDegradation()

    @retry_with_backoff(max_retries=3)
    async def _chat_once(self, messages: list, model: str) -> dict:
        # Kept separate so the retry decorator sees raised AIExceptions;
        # catching them directly inside chat() would disable retries entirely
        return await self.fallback_manager.chat_with_fallback(messages, model)

    async def chat(self, messages: list, model: str = "gpt-4") -> dict:
        try:
            result = await self._chat_once(messages, model)
            return {
                "success": True,
                "response": result["response"].choices[0].message.content,
                "fallback_used": result.get("fallback_used", False)
            }
        except AIException as e:
            # Log for monitoring
            logger.error(f"AI call failed: {e.error_type}: {e}")

            # Get degraded response
            user_input = messages[-1]["content"] if messages else ""
            fallback_response = self.degradation.get_fallback_response(user_input, e)

            return {
                "success": False,
                "response": fallback_response,
                "error_type": e.error_type.value,
                "degraded": True
            }
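Putting it all together, a minimal end-to-end sketch (the prompt is a placeholder):

import asyncio
from openai import AsyncOpenAI

async def main():
    client = ResilientAIClient(AsyncOpenAI())
    result = await client.chat([{"role": "user", "content": "What is RAG?"}])
    if result["success"]:
        print(result["response"])
    else:
        # Degraded canned reply; error_type feeds monitoring and alerts
        print(f"[{result['error_type']}] {result['response']}")

asyncio.run(main())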

Robust error handling ensures AI applications remain useful even during failures. Tomorrow, I will cover retry strategies in more depth.


Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.