Back to Blog
10 min read

Multi-Index RAG: Querying Across Multiple Knowledge Sources

Introduction

Multi-index RAG enables querying across multiple knowledge sources, each potentially using different indexing strategies or containing different types of content. This post covers architectures and techniques for building effective multi-index RAG systems.

Multi-Index Architecture

import re
import uuid
import zlib
from abc import ABC, abstractmethod
from collections import Counter
from dataclasses import dataclass, field
from enum import Enum
from typing import List, Dict, Optional, Callable

class IndexType(Enum):
    """Kinds of index a knowledge source can be registered as."""
    # Embedding-similarity retrieval; used by MultiIndexRAG.add_vector_index.
    VECTOR = "vector"
    # Term-matching retrieval; used by MultiIndexRAG.add_keyword_index.
    KEYWORD = "keyword"
    # Declared only; no graph-backed index class is visible in this file.
    GRAPH = "graph"
    # Declared only; no structured-data index class is visible in this file.
    STRUCTURED = "structured"

@dataclass
class IndexConfig:
    """Static description of one index: its name, kind and purpose."""
    # Identifier of the index; MultiIndexManager stores indexes under this name.
    name: str
    # Which kind of index this configuration describes.
    index_type: IndexType
    # Free-text summary; the routers compare query words against its words.
    description: str
    # Arbitrary extra settings; every instance gets its own empty dict by default.
    metadata: Dict = field(default_factory=dict)

@dataclass
class MultiIndexResult:
    """One retrieved item tagged with the index it came from.

    NOTE(review): the index classes visible in this file return plain dicts
    carrying these same four keys rather than instances of this class.
    """
    # Name of the index that produced the result.
    source_index: str
    # Text of the retrieved document.
    content: str
    # Relevance score assigned by the producing index.
    score: float
    # Per-document metadata; every instance gets its own empty dict by default.
    metadata: Dict = field(default_factory=dict)

class BaseIndex(ABC):
    """Common interface that every concrete index implements.

    Subclasses must provide ``add_documents`` and ``query``; the constructor
    only records the configuration and exposes its name as ``self.name``.
    """

    def __init__(self, config: IndexConfig):
        self.config = config
        self.name = config.name

    @abstractmethod
    def add_documents(self, documents: List[str], metadatas: List[Dict] = None):
        """Store ``documents``, optionally with positionally aligned ``metadatas``."""

    @abstractmethod
    def query(self, query: str, top_k: int = 5) -> List[Dict]:
        """Return up to ``top_k`` result dicts for ``query``."""

class VectorIndex(BaseIndex):
    """In-memory index that ranks documents by cosine similarity.

    Texts are embedded with ``embedding_model.encode`` when a model is
    supplied; otherwise a hashed bag-of-words vector is used as a fallback.
    """

    # Number of buckets in the fallback hashed bag-of-words embedding.
    _FALLBACK_DIM = 128

    def __init__(self, config: IndexConfig, embedding_model=None):
        """Create an empty index.

        Args:
            config: Name, type and description of this index.
            embedding_model: Optional object exposing ``encode(text)``. When
                omitted (or falsy) the built-in fallback embedding is used.
        """
        super().__init__(config)
        self.embedding_model = embedding_model
        self.documents = []
        self.embeddings = []
        self.metadatas = []

    def add_documents(self, documents: List[str], metadatas: List[Dict] = None):
        """Embed and store ``documents``.

        Args:
            documents: Texts to index.
            metadatas: Optional per-document metadata, positionally aligned
                with ``documents``. When omitted or empty, every document
                gets its own empty dict.

        Raises:
            IndexError: If ``metadatas`` is non-empty but shorter than
                ``documents``. Nothing is added in that case.
        """
        documents = list(documents)

        # Stage everything before touching the index so that a failure while
        # embedding or reading metadata cannot leave the three parallel lists
        # with different lengths.
        staged_embeddings = [self._embed(doc) for doc in documents]
        staged_metadatas = [
            metadatas[i] if metadatas else {} for i, _ in enumerate(documents)
        ]

        self.documents.extend(documents)
        self.embeddings.extend(staged_embeddings)
        self.metadatas.extend(staged_metadatas)

    def query(self, query: str, top_k: int = 5) -> List[Dict]:
        """Return the ``top_k`` stored documents most similar to ``query``.

        Each result is a dict with the keys ``content``, ``score``,
        ``metadata`` and ``source_index`` (this index's name), ordered by
        descending cosine similarity.
        """
        query_embedding = self._embed(query)

        scored = [
            (i, self._cosine_similarity(query_embedding, emb))
            for i, emb in enumerate(self.embeddings)
        ]
        scored.sort(key=lambda pair: pair[1], reverse=True)

        return [
            {
                "content": self.documents[i],
                "score": score,
                "metadata": self.metadatas[i],
                "source_index": self.name,
            }
            for i, score in scored[:top_k]
        ]

    def _embed(self, text: str) -> List[float]:
        """Return the embedding of ``text``.

        Delegates to ``embedding_model.encode`` when a model is configured.
        The fallback is an L2-normalised bag-of-words vector in which every
        lower-cased, whitespace-separated word increments one bucket.
        """
        if self.embedding_model:
            return self.embedding_model.encode(text)

        embedding = [0.0] * self._FALLBACK_DIM
        for word in text.lower().split():
            # crc32 is stable across interpreter runs. The built-in hash() of
            # a str is salted per process, which would make the fallback
            # vectors irreproducible between runs.
            bucket = zlib.crc32(word.encode("utf-8", "surrogatepass"))
            embedding[bucket % self._FALLBACK_DIM] += 1

        norm = sum(x * x for x in embedding) ** 0.5
        if norm > 0:
            embedding = [x / norm for x in embedding]
        return embedding

    def _cosine_similarity(self, a: List[float], b: List[float]) -> float:
        """Return the cosine similarity of ``a`` and ``b``.

        Returns ``0.0`` when either vector has zero length, so an empty text
        never matches anything instead of dividing by zero.
        """
        dot = sum(x * y for x, y in zip(a, b))
        norm_a = sum(x * x for x in a) ** 0.5
        norm_b = sum(x * x for x in b) ** 0.5
        denominator = norm_a * norm_b
        return dot / denominator if denominator > 0 else 0.0

class KeywordIndex(BaseIndex):
    """In-memory keyword index with a simple TF-IDF-like score.

    The score of a document is the sum, over the query tokens it contains, of
    ``tf * 1 / (1 + df)``, where ``tf`` is how often the token occurs in the
    document and ``df`` is the number of documents containing it.
    """

    # Common function words dropped during tokenization.
    _STOPWORDS = frozenset(
        {'the', 'a', 'an', 'is', 'are', 'was', 'were', 'in', 'on', 'at', 'to', 'for'}
    )
    # Compiled once instead of on every _tokenize call.
    _WORD_RE = re.compile(r'\w+')

    def __init__(self, config: IndexConfig):
        """Create an empty index described by ``config``."""
        super().__init__(config)
        self.documents = []
        self.metadatas = []
        # token -> positions (in self.documents) of the documents containing it
        self.inverted_index: Dict[str, List[int]] = {}
        # Per-document token counts, positionally aligned with self.documents.
        self._term_counts: List[Counter] = []

    def add_documents(self, documents: List[str], metadatas: List[Dict] = None):
        """Tokenize and store ``documents``.

        Args:
            documents: Texts to index.
            metadatas: Optional per-document metadata, positionally aligned
                with ``documents``. When omitted or empty, every document
                gets its own empty dict.

        Raises:
            IndexError: If ``metadatas`` is non-empty but shorter than
                ``documents``. Nothing is added in that case.
        """
        documents = list(documents)

        # Resolve all metadata up front so that a short ``metadatas`` list
        # fails before the index has been partially updated.
        staged_metadatas = [
            metadatas[i] if metadatas else {} for i, _ in enumerate(documents)
        ]

        for doc, metadata in zip(documents, staged_metadatas):
            doc_idx = len(self.documents)
            counts = Counter(self._tokenize(doc))

            self.documents.append(doc)
            self.metadatas.append(metadata)
            self._term_counts.append(counts)

            # One posting per distinct token in the document.
            for word in counts:
                self.inverted_index.setdefault(word, []).append(doc_idx)

    def query(self, query: str, top_k: int = 5) -> List[Dict]:
        """Return the ``top_k`` documents that best match the query tokens.

        Each result is a dict with the keys ``content``, ``score``,
        ``metadata`` and ``source_index`` (this index's name), ordered by
        descending score. Documents sharing no token with the query are
        never returned.
        """
        scores: Dict[int, float] = {}
        for word in self._tokenize(query):
            postings = self.inverted_index.get(word)
            if not postings:
                continue

            idf = 1 / (1 + len(postings))
            for doc_idx in postings:
                # Count whole tokens. A substring count on the raw text would
                # also count "ai" inside words such as "training".
                tf = self._term_counts[doc_idx][word]
                scores[doc_idx] = scores.get(doc_idx, 0) + tf * idf

        ranked = sorted(scores.items(), key=lambda item: item[1], reverse=True)

        return [
            {
                "content": self.documents[doc_idx],
                "score": score,
                "metadata": self.metadatas[doc_idx],
                "source_index": self.name,
            }
            for doc_idx, score in ranked[:top_k]
        ]

    def _tokenize(self, text: str) -> List[str]:
        """Split ``text`` into lower-cased word tokens without stop words."""
        return [
            word
            for word in self._WORD_RE.findall(text.lower())
            if word not in self._STOPWORDS
        ]

Multi-Index Manager

class MultiIndexManager:
    """Registry of named indexes that forwards queries to them."""

    def __init__(self):
        # Both maps are keyed by index name and are kept in step by
        # add_index / remove_index.
        self.indexes: Dict[str, BaseIndex] = {}
        self.index_configs: Dict[str, IndexConfig] = {}

    def add_index(self, index: BaseIndex):
        """Register ``index`` under its name, replacing any previous entry."""
        self.indexes[index.name] = index
        self.index_configs[index.name] = index.config

    def remove_index(self, name: str):
        """Unregister the index called ``name``; unknown names are ignored."""
        if name not in self.indexes:
            return
        del self.indexes[name]
        del self.index_configs[name]

    def get_index(self, name: str) -> Optional[BaseIndex]:
        """Return the index called ``name``, or ``None`` if it is not registered."""
        return self.indexes.get(name)

    def list_indexes(self) -> List[Dict]:
        """Return one ``name`` / ``type`` / ``description`` dict per index."""
        summaries = []
        for config in self.index_configs.values():
            summaries.append({
                "name": config.name,
                "type": config.index_type.value,
                "description": config.description
            })
        return summaries

    def query_index(
        self,
        index_name: str,
        query: str,
        top_k: int = 5
    ) -> List[Dict]:
        """Query one index by name; an unknown name yields an empty list."""
        target = self.indexes.get(index_name)
        return target.query(query, top_k) if target else []

    def query_all(
        self,
        query: str,
        top_k_per_index: int = 3
    ) -> Dict[str, List[Dict]]:
        """Query every registered index and return the results keyed by name."""
        return {
            name: index.query(query, top_k_per_index)
            for name, index in self.indexes.items()
        }

Query Routing

class QueryRouter:
    """Choose which indexes a query should be sent to.

    Uses the LLM client when one is configured and a word-overlap rule
    otherwise. Both routes fall back to every index when nothing matches.
    """

    def __init__(
        self,
        index_manager: MultiIndexManager,
        llm_client=None
    ):
        self.manager = index_manager
        self.llm = llm_client

    def route(self, query: str) -> List[str]:
        """Return the names of the indexes to query for ``query``."""
        chooser = self._llm_route if self.llm else self._rule_based_route
        return chooser(query)

    def _rule_based_route(self, query: str) -> List[str]:
        """Select the indexes whose description shares a word with the query.

        Words are the lower-cased, whitespace-separated pieces of each text.
        """
        # The query words do not depend on the index, so build them once.
        query_words = set(query.lower().split())

        matched = [
            name
            for name, config in self.manager.index_configs.items()
            if not query_words.isdisjoint(config.description.lower().split())
        ]

        # No overlap anywhere: query everything rather than nothing.
        return matched or list(self.manager.indexes.keys())

    def _llm_route(self, query: str) -> List[str]:
        """Ask the LLM which sources are relevant and pick the names it mentions.

        An index is selected when its lower-cased name occurs anywhere in the
        lower-cased response text.
        """
        description_lines = []
        for config in self.manager.index_configs.values():
            description_lines.append(f"- {config.name}: {config.description}")
        index_descriptions = "\n".join(description_lines)

        prompt = f"""Given the following query and available knowledge sources, select which sources are most relevant.

Query: {query}

Available Sources:
{index_descriptions}

List the names of relevant sources (comma-separated):"""

        response = self.llm.generate(prompt)

        mentioned = [
            name
            for name in self.manager.indexes.keys()
            if name.lower() in response.lower()
        ]

        return mentioned or list(self.manager.indexes.keys())

class SmartRouter:
    """Router that scores every index for a query and applies a strategy.

    Every strategy returns a dict with at least ``indexes`` (names to query),
    ``strategy`` (the strategy applied) and ``weights`` (name -> fusion
    weight).
    """

    def __init__(
        self,
        index_manager: MultiIndexManager,
        llm_client=None
    ):
        self.manager = index_manager
        self.llm = llm_client
        self.query_history: List[Dict] = []

    def route_with_strategy(
        self,
        query: str,
        strategy: str = "balanced"
    ) -> Dict:
        """Route ``query`` using ``strategy``.

        Args:
            query: The user query.
            strategy: One of ``"all"``, ``"single_best"``, ``"balanced"`` or
                ``"cascade"``. Any other value behaves like ``"all"``.

        Returns:
            The routing decision produced by the selected strategy.
        """
        if strategy == "single_best":
            return self._route_single_best(query)
        if strategy == "balanced":
            return self._route_balanced(query)
        if strategy == "cascade":
            return self._route_cascade(query)
        # "all" and every unrecognised strategy query everything.
        return self._route_all()

    def _score_indexes(self, query: str) -> Dict[str, float]:
        """Return the estimated relevance of every registered index."""
        return {
            name: self._estimate_relevance(query, config)
            for name, config in self.manager.index_configs.items()
        }

    def _route_all(self) -> Dict:
        """Select every index with an equal weight of 1.0."""
        names = list(self.manager.indexes.keys())
        return {
            "indexes": names,
            "strategy": "all",
            "weights": {n: 1.0 for n in names}
        }

    def _route_single_best(self, query: str) -> Dict:
        """Select only the highest-scoring index (the first one on ties).

        Returns an empty selection when no index is registered.
        """
        scores = self._score_indexes(query)

        # max() raises on an empty mapping, so handle "no indexes" explicitly.
        if not scores:
            return {
                "indexes": [],
                "strategy": "single_best",
                "weights": {},
                "scores": scores
            }

        best = max(scores, key=scores.get)

        return {
            "indexes": [best],
            "strategy": "single_best",
            "weights": {best: 1.0},
            "scores": scores
        }

    def _route_balanced(self, query: str) -> Dict:
        """Select the indexes scoring at least half the best score.

        Weights are the selected scores normalised to sum to 1. When the
        query overlaps with no index at all, every index is selected with an
        equal weight. Returns an empty selection when no index is registered.
        """
        scores = self._score_indexes(query)

        if not scores:
            return {
                "indexes": [],
                "strategy": "balanced",
                "weights": {},
                "scores": scores
            }

        threshold = max(scores.values()) * 0.5
        selected = [n for n, s in scores.items() if s >= threshold]

        total = sum(scores[n] for n in selected)
        if total > 0:
            weights = {n: scores[n] / total for n in selected}
        else:
            # Every score is 0, so the threshold is 0 and every index was
            # selected. Dividing by the zero total would raise, so share the
            # weight evenly instead.
            weights = {n: 1.0 / len(selected) for n in selected}

        return {
            "indexes": selected,
            "strategy": "balanced",
            "weights": weights,
            "scores": scores
        }

    def _route_cascade(self, query: str) -> Dict:
        """Select every index, ordered by descending score.

        The index at position ``i`` (0-based) gets weight ``1 / (i + 1)``.
        """
        scores = self._score_indexes(query)

        ordered = sorted(scores.keys(), key=lambda n: scores[n], reverse=True)

        return {
            "indexes": ordered,
            "strategy": "cascade",
            "weights": {n: 1.0 / (i + 1) for i, n in enumerate(ordered)},
            "order": ordered
        }

    def _estimate_relevance(self, query: str, config: IndexConfig) -> float:
        """Return the fraction of query words found in the index description or name.

        Words are lower-cased and whitespace-separated; underscores in the
        index name are treated as spaces. The result lies in ``[0, 1]``.
        """
        query_words = set(query.lower().split())
        desc_words = set(config.description.lower().split())
        name_words = set(config.name.lower().replace("_", " ").split())

        overlap = len(query_words & (desc_words | name_words))
        # max(..., 1) keeps an empty query from dividing by zero.
        return overlap / max(len(query_words), 1)

Result Fusion

class ResultFusion:
    """Merge per-index result lists into a single ranked list.

    Results are dicts that carry at least a ``content`` string. Two results
    are treated as the same document when their ``content`` is equal.
    """

    def __init__(self, k: int = 60):
        """Store ``k``, the rank offset used by reciprocal rank fusion."""
        self.k = k  # RRF parameter

    @staticmethod
    def _doc_key(result: Dict) -> str:
        """Return the identity of a result: its full content.

        Keying on a hash of a content prefix would silently merge distinct
        documents that share their opening text.
        """
        return result["content"]

    @staticmethod
    def _rank(scores: Dict[str, float], docs: Dict[str, Dict]) -> List[Dict]:
        """Return copies of ``docs`` sorted by descending score.

        Each copy gets the score under ``fused_score``; the input dicts are
        left untouched.
        """
        ranked = sorted(scores.items(), key=lambda item: item[1], reverse=True)
        fused = []
        for key, score in ranked:
            doc = docs[key].copy()
            doc["fused_score"] = score
            fused.append(doc)
        return fused

    def reciprocal_rank_fusion(
        self,
        results_by_index: Dict[str, List[Dict]],
        weights: Dict[str, float] = None
    ) -> List[Dict]:
        """Combine results using weighted reciprocal rank fusion.

        A document at 0-based position ``rank`` in an index contributes
        ``weight / (k + rank + 1)``; contributions from all indexes add up.

        Args:
            results_by_index: Index name -> results ordered best first.
            weights: Optional index name -> weight. Missing names, or a
                missing mapping, default to 1.0.

        Returns:
            Copies of the first-seen result of every document, each with a
            ``fused_score``, ordered by descending fused score.
        """
        rrf_scores: Dict[str, float] = {}
        first_seen: Dict[str, Dict] = {}

        for index_name, results in results_by_index.items():
            weight = weights.get(index_name, 1.0) if weights else 1.0

            for rank, result in enumerate(results):
                key = self._doc_key(result)
                first_seen.setdefault(key, result)
                contribution = weight / (self.k + rank + 1)
                rrf_scores[key] = rrf_scores.get(key, 0) + contribution

        return self._rank(rrf_scores, first_seen)

    def weighted_score_fusion(
        self,
        results_by_index: Dict[str, List[Dict]],
        weights: Optional[Dict[str, float]] = None
    ) -> List[Dict]:
        """Combine results using min-max normalised, weighted scores.

        Scores are normalised to ``[0, 1]`` within each index before being
        multiplied by the index weight and summed per document. When all
        scores of an index are equal, each of its results counts as 1.0.

        Args:
            results_by_index: Index name -> results, each with a ``score``.
            weights: Optional index name -> weight. Missing names, or a
                missing mapping, default to 1.0.

        Returns:
            Copies of the first-seen result of every document, each with a
            ``fused_score``, ordered by descending fused score.
        """
        combined_scores: Dict[str, float] = {}
        first_seen: Dict[str, Dict] = {}

        for index_name, results in results_by_index.items():
            weight = weights.get(index_name, 1.0) if weights else 1.0

            raw_scores = [r["score"] for r in results]
            max_score = max(raw_scores) if raw_scores else 1
            min_score = min(raw_scores) if raw_scores else 0
            spread = max_score - min_score

            for result in results:
                key = self._doc_key(result)
                first_seen.setdefault(key, result)

                if spread > 0:
                    normalized = (result["score"] - min_score) / spread
                else:
                    normalized = 1.0

                combined_scores[key] = combined_scores.get(key, 0) + normalized * weight

        return self._rank(combined_scores, first_seen)

    def cross_encoder_rerank(
        self,
        results: List[Dict],
        query: str,
        reranker_model=None,
        top_k: int = 10
    ) -> List[Dict]:
        """Rerank fused results with a cross-encoder.

        Args:
            results: Result dicts to rerank.
            query: The query the results were retrieved for.
            reranker_model: Optional object exposing ``score(query, text)``.
                Without it the first ``top_k`` results are returned as-is.
            top_k: Maximum number of results to return.

        Returns:
            Up to ``top_k`` copies of the results, each with a
            ``rerank_score``, ordered by descending rerank score. The input
            dicts are not modified.
        """
        if not reranker_model:
            return results[:top_k]

        # Annotate copies so the caller's result dicts are not mutated,
        # matching the two fusion methods.
        scored = [
            {**result, "rerank_score": reranker_model.score(query, result["content"])}
            for result in results
        ]
        scored.sort(key=lambda item: item["rerank_score"], reverse=True)

        return scored[:top_k]

Complete Multi-Index RAG

class MultiIndexRAG:
    """End-to-end multi-index RAG pipeline: route, retrieve, fuse, generate."""

    # Maximum number of content characters shown in a source preview.
    _PREVIEW_CHARS = 100

    def __init__(self, generator):
        """Create a pipeline with no indexes.

        Args:
            generator: Object exposing ``generate(prompt)``; it produces the
                final answer and is also handed to the router as LLM client.
        """
        self.manager = MultiIndexManager()
        self.router = None
        self.fusion = ResultFusion()
        self.generator = generator

    def add_vector_index(
        self,
        name: str,
        description: str,
        documents: List[str],
        metadatas: List[Dict] = None,
        embedding_model=None
    ):
        """Create, fill and register a vector index.

        Args:
            name: Unique name of the index.
            description: Free-text description used for routing.
            documents: Texts to index.
            metadatas: Optional per-document metadata aligned with ``documents``.
            embedding_model: Optional model passed through to ``VectorIndex``;
                when omitted the index uses its fallback embedding.
        """
        config = IndexConfig(
            name=name,
            index_type=IndexType.VECTOR,
            description=description
        )
        index = VectorIndex(config, embedding_model=embedding_model)
        self._register_index(index, documents, metadatas)

    def add_keyword_index(
        self,
        name: str,
        description: str,
        documents: List[str],
        metadatas: List[Dict] = None
    ):
        """Create, fill and register a keyword index.

        Args:
            name: Unique name of the index.
            description: Free-text description used for routing.
            documents: Texts to index.
            metadatas: Optional per-document metadata aligned with ``documents``.
        """
        config = IndexConfig(
            name=name,
            index_type=IndexType.KEYWORD,
            description=description
        )
        self._register_index(KeywordIndex(config), documents, metadatas)

    def _register_index(self, index, documents: List[str], metadatas: List[Dict] = None):
        """Fill ``index`` with documents, register it and refresh the router."""
        index.add_documents(documents, metadatas)
        self.manager.add_index(index)
        self._update_router()

    def _update_router(self):
        """Create the router over the current index manager."""
        self.router = SmartRouter(self.manager, self.generator)

    def query(
        self,
        question: str,
        routing_strategy: str = "balanced",
        top_k: int = 5
    ) -> Dict:
        """Answer ``question`` from the registered indexes.

        Args:
            question: The user question.
            routing_strategy: Strategy name passed to the router.
            top_k: Number of results requested from each selected index and
                number of fused results used as context.

        Returns:
            ``{"error": ...}`` when no index has been added yet. Otherwise a
            dict with ``answer``, ``routing`` (the routing decision),
            ``sources`` (index, fused score and preview of each context
            document) and ``indexes_queried``.
        """
        if not self.router:
            return {"error": "No indexes configured"}

        routing = self.router.route_with_strategy(question, routing_strategy)

        results_by_index = {
            index_name: self.manager.query_index(index_name, question, top_k)
            for index_name in routing["indexes"]
        }

        fused = self.fusion.reciprocal_rank_fusion(
            results_by_index,
            routing["weights"]
        )

        top_results = fused[:top_k]
        context = self._build_context(top_results)

        prompt = f"""Answer the question based on information from multiple sources.

Sources:
{context}

Question: {question}

Provide a comprehensive answer:"""

        answer = self.generator.generate(prompt)

        return {
            "answer": answer,
            "routing": routing,
            "sources": [
                {
                    "index": r.get("source_index", "unknown"),
                    "score": r.get("fused_score", 0),
                    "preview": self._preview(r["content"])
                }
                for r in top_results
            ],
            "indexes_queried": routing["indexes"]
        }

    @classmethod
    def _preview(cls, content: str) -> str:
        """Return ``content`` shortened to the preview length.

        The ellipsis is appended only when text was actually cut off, so a
        short document is not presented as truncated.
        """
        if len(content) <= cls._PREVIEW_CHARS:
            return content
        return content[:cls._PREVIEW_CHARS] + "..."

    def _build_context(self, results: List[Dict]) -> str:
        """Join the results into one prompt context.

        Each result is rendered as a ``[Source: <index>]`` header followed by
        its content; results are separated by a ``---`` line.
        """
        parts = [
            f"[Source: {r.get('source_index', 'unknown')}]\n{r['content']}"
            for r in results
        ]
        return "\n\n---\n\n".join(parts)

# Usage
class MockGenerator:
    """Stand-in generator that returns a fixed answer for any prompt."""

    def generate(self, prompt):
        return "Generated multi-source answer."

rag = MultiIndexRAG(MockGenerator())

# Two small corpora, one per knowledge source
technical_docs = [
    "Machine learning models require training data.",
    "Neural networks have multiple layers.",
    "Deep learning is a subset of machine learning.",
]

business_docs = [
    "AI improves business efficiency.",
    "Companies invest in ML for automation.",
    "ROI of AI projects varies by industry.",
]

# Register each corpus under its own index type
rag.add_vector_index(
    name="technical_knowledge",
    description="Technical documentation about ML and AI",
    documents=technical_docs,
)

rag.add_keyword_index(
    name="business_knowledge",
    description="Business and strategy documents about AI adoption",
    documents=business_docs,
)

# Ask one question across both indexes
result = rag.query(
    question="How do companies use machine learning?",
    routing_strategy="balanced",
)

# Report the answer and where it came from
print("Answer:", result["answer"])
print("Indexes queried:", result["indexes_queried"])
print("Sources:", len(result["sources"]))

Conclusion

Multi-index RAG enables querying across heterogeneous knowledge sources, combining results intelligently for comprehensive answers. Key components include diverse index types, smart query routing, result fusion strategies, and a flexible architecture. This approach is essential for enterprise RAG systems that need to leverage multiple knowledge bases, document types, and retrieval strategies.

Michael John Peña

Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.