Back to Blog
5 min read

Conversation Management: Building Production Chat Systems

Production chat systems require robust conversation management beyond simple message passing. Today, I will cover strategies for building reliable conversational AI systems.

Conversation Lifecycle

import json
from collections import Counter
from datetime import datetime, timedelta, timezone
from enum import Enum
from typing import Optional

class ConversationStatus(Enum):
    """Lifecycle states of a conversation; each member's value is a lowercase string."""
    ACTIVE = "active"
    # Returned by ConversationLifecycle.check_status once the idle timeout has elapsed.
    IDLE = "idle"
    WAITING_FOR_INPUT = "waiting_for_input"
    PROCESSING = "processing"
    # Assigned to a conversation after it has been handed off to a human agent.
    ESCALATED = "escalated"
    # Returned by ConversationLifecycle.check_status once the maximum duration has elapsed.
    CLOSED = "closed"

class ConversationLifecycle:
    """Derive a conversation's lifecycle status from its timestamps.

    A conversation older than ``max_duration`` is reported as closed, and one
    that has not been updated for longer than ``idle_timeout`` is reported as
    idle. Otherwise the conversation's own ``status`` is returned unchanged.
    """

    def __init__(self, idle_timeout_minutes: int = 30, max_duration_hours: int = 24):
        """Store both limits as ``timedelta`` objects.

        Args:
            idle_timeout_minutes: Minutes without an update before a
                conversation counts as idle.
            max_duration_hours: Hours since creation before a conversation
                counts as closed.
        """
        self.idle_timeout = timedelta(minutes=idle_timeout_minutes)
        self.max_duration = timedelta(hours=max_duration_hours)

    @staticmethod
    def _elapsed_since(timestamp: datetime, now_utc: datetime) -> timedelta:
        """Return ``now_utc - timestamp`` for naive or aware timestamps.

        Naive timestamps are treated as UTC (the original behaviour); aware
        timestamps are compared against the aware clock directly, which
        avoids the ``TypeError`` raised by mixing the two kinds.
        """
        if timestamp.utcoffset() is None:
            return now_utc.replace(tzinfo=None) - timestamp
        return now_utc - timestamp

    def check_status(self, conversation) -> "ConversationStatus":
        """Return the status implied by the conversation's age and activity.

        Args:
            conversation: Object exposing ``created_at``, ``updated_at``
                (datetimes, naive-UTC or timezone-aware) and ``status``.

        Returns:
            ``ConversationStatus.CLOSED`` when the maximum duration is
            exceeded, ``ConversationStatus.IDLE`` when the idle timeout is
            exceeded, otherwise ``conversation.status`` as stored.
        """
        # NOTE(review): the stored status is not consulted first, so an
        # already closed or escalated conversation can be reported as IDLE.
        # Confirm whether terminal states should short-circuit here.
        now_utc = datetime.now(timezone.utc)

        # The max-duration check takes precedence over the idle check.
        if self._elapsed_since(conversation.created_at, now_utc) > self.max_duration:
            return ConversationStatus.CLOSED

        if self._elapsed_since(conversation.updated_at, now_utc) > self.idle_timeout:
            return ConversationStatus.IDLE

        return conversation.status

    def handle_idle(self, conversation, manager):
        """Send one reminder for an idle conversation, then close it next time.

        Args:
            conversation: Object exposing ``session_id``; ``reminder_sent``
                is read if present and set to ``True`` after the reminder.
            manager: Object providing ``send_message(session_id, text, role=...)``
                and ``close_conversation(session_id, reason)``.
        """
        # A conversation that never had the flag set counts as "not reminded"
        # instead of raising AttributeError.
        if getattr(conversation, "reminder_sent", False):
            manager.close_conversation(conversation.session_id, "Closed due to inactivity")
            return

        manager.send_message(
            conversation.session_id,
            "Are you still there? Let me know if you need any more help.",
            role="system"
        )
        conversation.reminder_sent = True

Conversation Routing

class ConversationRouter:
    """Route conversations based on intent and context"""

    def __init__(self, client, model: str = "gpt-3.5-turbo"):
        """Create a router with no handlers registered.

        Args:
            client: Chat client exposing ``chat.completions.create(...)``.
            model: Model (or deployment) name used for intent classification.
        """
        self.client = client
        self.model = model
        self.handlers = {}

    def register_handler(self, intent: str, handler: callable):
        """Register ``handler(message, conversation)`` for ``intent``.

        A later registration for the same intent replaces the earlier one.
        """
        self.handlers[intent] = handler

    def classify_intent(self, message: str, context: dict) -> str:
        """Ask the model for the intent of ``message``.

        Args:
            message: The user's message text.
            context: Conversation context embedded in the prompt as JSON.

        Returns:
            The model's reply lower-cased, with surrounding whitespace,
            quotes, backticks and full stops removed. An empty string is
            returned when the model produced no content.
        """
        # default=str keeps prompt building from failing on values such as
        # datetimes; the JSON is only shown to the model, never parsed back.
        context_json = json.dumps(context, default=str)

        classification_prompt = f"""Classify the user's intent from this message.

Context: {context_json}
User message: {message}

Possible intents:
- order_status: Checking order status
- product_inquiry: Questions about products
- support_request: Technical support needed
- billing_question: Payment or billing related
- general_inquiry: General questions
- complaint: Expressing dissatisfaction
- human_agent: Requesting human assistance

Return only the intent name."""

        response = self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": classification_prompt}],
            max_tokens=20,
            temperature=0
        )

        # The content can be None; fall back to "" so normalisation never raises.
        raw_intent = response.choices[0].message.content or ""
        # Models sometimes wrap the label in quotes or end it with a full stop.
        return raw_intent.strip().strip("\"'`.").strip().lower()

    def route(self, message: str, conversation) -> str:
        """Classify ``message`` and return the matching handler's reply.

        Unknown intents go to the ``"general_inquiry"`` handler when one is
        registered, otherwise to :meth:`default_handler`.
        """
        intent = self.classify_intent(message, conversation.context)

        handler = self.handlers.get(intent)
        if handler is None:
            handler = self.handlers.get("general_inquiry", self.default_handler)

        return handler(message, conversation)

    def default_handler(self, message: str, conversation) -> str:
        """Return the fixed fallback reply used when no handler applies."""
        return "I'm not sure how to help with that. Could you please rephrase?"

Handoff to Human Agents

class HumanHandoff:
    """Manage handoff to human agents"""

    def __init__(self, queue_service, client=None, model: str = "gpt-3.5-turbo"):
        """Create the handoff manager.

        Args:
            queue_service: Object providing ``add(request)`` and
                ``estimate_wait_time()``.
            client: Optional chat client exposing
                ``chat.completions.create(...)``, used to summarise the
                conversation. Without it the plain transcript is used.
            model: Model (or deployment) name used for the summary.
        """
        self.queue = queue_service
        self.client = client
        self.model = model
        self.escalation_triggers = [
            "speak to human",
            "talk to agent",
            "real person",
            "escalate",
            "supervisor"
        ]

    def should_escalate(self, message: str, conversation) -> tuple[bool, Optional[str]]:
        """Determine if conversation should escalate.

        Checks run in order: explicit request, failed attempts, detected
        frustration, complexity score. The first match wins.

        Returns:
            ``(True, reason)`` when escalation is warranted, otherwise
            ``(False, None)``.
        """
        # Explicit request (case-insensitive substring match).
        lowered = message.lower()
        if any(trigger in lowered for trigger in self.escalation_triggers):
            return True, "User requested human assistance"

        # Too many failed attempts
        if conversation.context.get("failed_attempts", 0) >= 3:
            return True, "Multiple failed resolution attempts"

        # Keyword-based frustration heuristic over recent user messages.
        if self._detect_frustration(conversation.messages):
            return True, "Detected user frustration"

        # Complex issue
        if conversation.context.get("complexity_score", 0) > 8:
            return True, "Issue too complex for automated resolution"

        return False, None

    def initiate_handoff(self, conversation, reason: str) -> dict:
        """Queue a handoff request and return what to tell the user.

        Args:
            conversation: Object exposing ``session_id``, ``context``,
                ``messages`` (items with ``role``, ``content``, ``to_dict()``)
                and ``created_at``.
            reason: Why the conversation is being escalated.

        Returns:
            Dict with ``ticket_id`` (from ``queue.add``), ``estimated_wait``
            (from ``queue.estimate_wait_time``) and a user-facing ``message``.
        """
        summary = self._create_handoff_summary(conversation)

        handoff_request = {
            "conversation_id": conversation.session_id,
            "reason": reason,
            "summary": summary,
            "priority": self._calculate_priority(conversation),
            "context": conversation.context,
            # Only the 20 most recent messages are forwarded.
            "messages": [m.to_dict() for m in conversation.messages[-20:]],
            # Naive-UTC ISO string, the same format utcnow() produced,
            # without calling the deprecated API.
            "created_at": datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
        }

        ticket_id = self.queue.add(handoff_request)

        return {
            "ticket_id": ticket_id,
            "estimated_wait": self.queue.estimate_wait_time(),
            "message": "I'm connecting you with a human agent. Please wait..."
        }

    def _create_handoff_summary(self, conversation) -> str:
        """Create summary for human agent.

        Uses the chat client when one was supplied. Without a client, or
        when the model returns no content, the ``role: content`` transcript
        is returned so a handoff never fails for lack of a summary.
        """
        transcript = "\n".join(f"{m.role}: {m.content}" for m in conversation.messages)

        if self.client is None:
            return transcript

        prompt = f"""Create a brief summary for a human agent taking over this conversation:

Conversation:
{transcript}

Include:
1. Customer's main issue/request
2. What the bot tried
3. Why escalation is needed
4. Any relevant context (order numbers, account info, etc.)"""

        response = self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            max_tokens=200
        )

        return response.choices[0].message.content or transcript

    def _detect_frustration(self, messages) -> bool:
        """Detect user frustration from messages.

        Looks at user messages among the last five messages and returns
        ``True`` when at least two of them contain an indicator.
        """
        recent_user_messages = [m.content for m in messages[-5:] if m.role == "user"]

        # NOTE: "!" alone is an indicator, so any exclamation mark counts.
        indicators = ["!", "?!", "frustrated", "annoyed", "useless", "terrible", "worst"]
        frustration_count = sum(
            1 for msg in recent_user_messages
            if any(ind in msg.lower() for ind in indicators)
        )

        return frustration_count >= 2

    def _calculate_priority(self, conversation) -> int:
        """Calculate handoff priority (1=highest).

        Starts at 5 and subtracts 2 for premium customers, 1 for billing
        intents and 1 for conversations older than 15 minutes; the result
        never goes below 1.
        """
        priority = 5  # Default medium

        # VIP customer
        if conversation.context.get("customer_tier") == "premium":
            priority -= 2

        # Payment issue: the router's intent name is "billing_question";
        # "billing" is still accepted for existing callers.
        if conversation.context.get("intent") in ("billing", "billing_question"):
            priority -= 1

        # Long wait already. Naive timestamps are treated as UTC; aware ones
        # are compared against an aware clock to avoid a TypeError.
        created_at = conversation.created_at
        now = datetime.now(timezone.utc)
        if created_at.utcoffset() is None:
            now = now.replace(tzinfo=None)
        if now - created_at > timedelta(minutes=15):
            priority -= 1

        return max(1, priority)

Conversation Analytics

class ConversationAnalytics:
    """Track and analyze conversation metrics"""

    def __init__(self, metrics_store):
        """Keep the store; it must provide ``record(metrics)`` and ``query(timeframe)``."""
        self.store = metrics_store

    def track_conversation(self, conversation):
        """Record one metrics row for ``conversation`` in the store.

        The row holds the session id, duration in seconds
        (``updated_at - created_at``), total and user message counts,
        resolution flag, escalation flag, detected intents, satisfaction
        score and a naive-UTC ISO timestamp.
        """
        user_message_count = sum(1 for m in conversation.messages if m.role == "user")

        metrics = {
            "session_id": conversation.session_id,
            "duration_seconds": (conversation.updated_at - conversation.created_at).total_seconds(),
            "message_count": len(conversation.messages),
            "user_messages": user_message_count,
            "resolution_status": conversation.context.get("resolved", False),
            "escalated": conversation.status == ConversationStatus.ESCALATED,
            "intents": conversation.context.get("intents_detected", []),
            "satisfaction_score": conversation.context.get("satisfaction"),
            # Same naive-UTC format utcnow() produced, without the deprecated call.
            "timestamp": datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
        }

        self.store.record(metrics)

    def get_summary_stats(self, timeframe: str = "24h") -> dict:
        """Get summary statistics for the rows the store returns.

        Args:
            timeframe: Passed unchanged to ``store.query``.

        Returns:
            Dict with ``total_conversations``, ``avg_duration_seconds``,
            ``resolution_rate``, ``escalation_rate``, ``avg_messages``,
            ``top_intents`` and ``avg_satisfaction``. Averages and rates are
            ``0`` when there is nothing to average.
        """
        data = self.store.query(timeframe) or []
        total = len(data)

        if not total:
            return {
                "total_conversations": 0,
                "avg_duration_seconds": 0,
                "resolution_rate": 0,
                "escalation_rate": 0,
                "avg_messages": 0,
                "top_intents": [],
                "avg_satisfaction": 0
            }

        # Only rows that carry a score take part in the satisfaction average.
        # "is not None" keeps a legitimate score of 0 from being dropped.
        scores = [
            d["satisfaction_score"] for d in data
            if d.get("satisfaction_score") is not None
        ]

        return {
            "total_conversations": total,
            "avg_duration_seconds": sum(d["duration_seconds"] for d in data) / total,
            "resolution_rate": sum(1 for d in data if d["resolution_status"]) / total,
            "escalation_rate": sum(1 for d in data if d["escalated"]) / total,
            "avg_messages": sum(d["message_count"] for d in data) / total,
            "top_intents": self._get_top_intents(data),
            # Guard against rows existing but none being rated.
            "avg_satisfaction": sum(scores) / len(scores) if scores else 0
        }

    def _get_top_intents(self, data, top_n=5) -> list:
        """Get most common intents.

        Returns:
            Up to ``top_n`` ``(intent, count)`` tuples, most frequent first;
            ties keep first-seen order.
        """
        intent_counts = Counter(
            intent
            for d in data
            for intent in (d.get("intents") or ())
        )
        return intent_counts.most_common(top_n)

Complete Example

class ProductionChatSystem:
    """Wire storage, routing, human handoff and analytics into one entry point."""

    # Intent name -> name of the method on this class (or a subclass) that
    # handles it.
    _HANDLER_METHODS = (
        ("order_status", "handle_order_status"),
        ("product_inquiry", "handle_product_inquiry"),
        ("human_agent", "handle_human_request"),
    )

    def __init__(self, config):
        """Build all collaborators from ``config``.

        Args:
            config: Mapping with the keys ``"openai"`` (keyword arguments for
                the client), ``"system_prompt"``, ``"queue_service"``,
                ``"metrics_store"`` and ``"redis_url"``.
        """
        self.client = AzureOpenAI(**config["openai"])
        self.manager = ConversationManager(self.client, config["system_prompt"])
        self.router = ConversationRouter(self.client)
        self.handoff = HumanHandoff(config["queue_service"])
        self.analytics = ConversationAnalytics(config["metrics_store"])
        self.store = ConversationStore(config["redis_url"])

        self._register_handlers()

    def _register_handlers(self):
        """Register every intent handler this object actually provides.

        The handler methods are not defined on this class, so looking them
        up unconditionally raised ``AttributeError`` during construction.
        Handlers that are missing are skipped; their intents then fall
        through to the router's default handling.
        """
        for intent, method_name in self._HANDLER_METHODS:
            handler = getattr(self, method_name, None)
            if handler is not None:
                self.router.register_handler(intent, handler)

    async def handle_message(self, session_id: str, message: str) -> dict:
        """Process one user message and return the reply payload.

        Args:
            session_id: Identifier used to load or create the conversation.
            message: The user's message text.

        Returns:
            ``{"type": "escalation", "data": ...}`` when the conversation is
            handed off to a human, otherwise
            ``{"type": "response", "message": ...}``.
        """
        # NOTE(review): the collaborators are called synchronously inside
        # this coroutine; confirm they do not block the event loop.

        # Load or create conversation
        conversation = self.store.load(session_id) or self.manager.get_or_create_conversation(session_id)

        # Check for escalation
        should_escalate, reason = self.handoff.should_escalate(message, conversation)
        if should_escalate:
            # Record the triggering message first so the handoff summary and
            # the saved history include it.
            conversation.add_message("user", message)
            result = self.handoff.initiate_handoff(conversation, reason)
            conversation.status = ConversationStatus.ESCALATED
            self.store.save(conversation)
            # Escalated conversations must be tracked too, otherwise the
            # escalation rate is under-reported.
            self.analytics.track_conversation(conversation)
            return {"type": "escalation", "data": result}

        # Route and process
        response = self.router.route(message, conversation)
        conversation.add_message("user", message)
        conversation.add_message("assistant", response)

        # Save state
        self.store.save(conversation)

        # Track analytics
        self.analytics.track_conversation(conversation)

        return {"type": "response", "message": response}

Robust conversation management enables reliable production chat systems. Tomorrow, I will cover context pruning strategies.

Resources

Michael John Peña

Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.