Back to Blog
5 min read

Vector Search in Databricks: Semantic Search at Scale

Vector Search in Databricks: Semantic Search at Scale

Databricks Vector Search enables semantic similarity search over your lakehouse data. Build RAG applications, recommendation systems, and intelligent search with managed vector indexes.

Vector Search Architecture

# Quick-reference summary used by this post: the two index types, the
# embedding model options, and the similarity metrics it lists.
VECTOR_SEARCH_COMPONENTS = dict(
    index_types=dict(
        delta_sync=dict(
            description="Auto-syncs with Delta table",
            use_case="Production RAG applications",
            features=["Auto-refresh", "Managed embeddings", "Real-time sync"],
        ),
        direct_access=dict(
            description="Direct vector operations",
            use_case="Pre-computed embeddings",
            features=["Bring your own embeddings", "Manual updates"],
        ),
    ),
    embedding_models=[
        "databricks-bge-large-en",
        "databricks-gte-large-en",
        "Custom models via Model Serving",
    ],
    similarity_metrics=["cosine", "dot_product", "euclidean"],
)
from databricks.vector_search.client import VectorSearchClient

# Initialize the client
vsc = VectorSearchClient()

# Keep the endpoint name in one variable. NOTE(review): create_endpoint() is
# understood to return the raw API response (a dict), so reading
# `endpoint.name` from it would fail -- confirm against the SDK reference.
endpoint_name = "my-vector-search-endpoint"

# Create a Vector Search endpoint
endpoint = vsc.create_endpoint(
    name=endpoint_name,
    endpoint_type="STANDARD"
)

# Wait for endpoint to be ready
vsc.wait_for_endpoint(endpoint_name)
print(f"Endpoint {endpoint_name} is ready")

Creating a Delta Sync Index

# Create index that auto-syncs with Delta table
index = vsc.create_delta_sync_index(
    endpoint_name="my-vector-search-endpoint",
    index_name="my_catalog.my_schema.document_index",

    # Source Delta table
    source_table_name="my_catalog.my_schema.documents",

    # Pipeline configuration
    pipeline_type="TRIGGERED",  # or "CONTINUOUS"

    # Column containing primary key
    primary_key="doc_id",

    # Column to embed (text column)
    embedding_source_column="content",

    # Embedding model endpoint
    embedding_model_endpoint_name="databricks-bge-large-en"
)

# Wait for index to be ready. Readiness is awaited through the index handle
# returned above. NOTE(review): the client is not known to expose a
# wait_for_index() method -- confirm against the SDK reference.
index.wait_until_ready()

Creating a Direct Access Index

# Direct access index: used when the embeddings are pre-computed.
# The column layout is declared up front and passed as the index schema.
precomputed_schema = {
    "id": "string",
    "content": "string",
    "embedding": "array<float>",
    "metadata": "string",
}

index = vsc.create_direct_access_index(
    endpoint_name="my-vector-search-endpoint",
    index_name="my_catalog.my_schema.precomputed_index",
    primary_key="id",
    embedding_dimension=1024,
    embedding_vector_column="embedding",
    schema=precomputed_schema,
)
from databricks.vector_search.client import VectorSearchClient

vsc = VectorSearchClient()

# Get the index
index = vsc.get_index(
    endpoint_name="my-vector-search-endpoint",
    index_name="my_catalog.my_schema.document_index"
)

# Columns to return; kept in a list so result rows can be mapped back by name
columns = ["doc_id", "title", "content", "source"]

# Similarity search with text query (auto-embedded)
results = index.similarity_search(
    query_text="How do I configure auto-scaling in Databricks?",
    columns=columns,
    num_results=10,
    filters={"source": "documentation"}  # Optional filtering
)

# Process results. Each row carries the requested columns in the order given,
# followed by the similarity score as the LAST element (not the first).
# `data_array` may be absent when nothing matches, hence the defaults.
for row in results.get("result", {}).get("data_array", []):
    doc = dict(zip(columns, row))
    score = row[-1]  # Similarity score
    print(f"Score: {score:.4f} | {doc['title']}")

Building a RAG Application

import anthropic
from databricks.vector_search.client import VectorSearchClient

class DatabricksRAG:
    """RAG application using Databricks Vector Search.

    Retrieves the most similar documents from a Vector Search index and
    hands them to an Anthropic chat model as context for answering a
    question.
    """

    # Columns requested from the index. Result rows contain these values in
    # this order, followed by the similarity score as the last element.
    _COLUMNS = ("doc_id", "title", "content")

    def __init__(
        self,
        endpoint_name: str,
        index_name: str,
        llm_model: str = "claude-3-sonnet-20240229"
    ):
        """Connect to the index and create the LLM client.

        Args:
            endpoint_name: Vector Search endpoint that serves the index.
            index_name: Name of the index to query.
            llm_model: Model identifier passed to the Anthropic API.
        """
        self.vsc = VectorSearchClient()
        self.index = self.vsc.get_index(endpoint_name, index_name)
        self.llm = anthropic.Anthropic()
        self.llm_model = llm_model

    def retrieve(
        self,
        query: str,
        num_results: int = 5,
        filters: dict = None
    ) -> list:
        """Retrieve relevant documents.

        Args:
            query: Text to search for; passed to the index as `query_text`.
            num_results: Maximum number of documents to return.
            filters: Optional filters forwarded to the index unchanged.

        Returns:
            A list of dicts with keys ``score``, ``doc_id``, ``title`` and
            ``content``, in the order returned by the index. Empty when the
            search has no hits.
        """
        results = self.index.similarity_search(
            query_text=query,
            columns=list(self._COLUMNS),
            num_results=num_results,
            filters=filters
        )

        # `data_array` can be missing when there are no matches.
        rows = (results.get("result") or {}).get("data_array") or []

        # The score is appended after the requested columns, so it is the
        # last element of each row; the columns start at index 0.
        return [
            {
                "score": row[-1],
                "doc_id": row[0],
                "title": row[1],
                "content": row[2]
            }
            for row in rows
        ]

    def generate(
        self,
        query: str,
        context: list,
        system_prompt: str = None
    ) -> str:
        """Generate response using retrieved context.

        Args:
            query: The user's question.
            context: Documents as returned by `retrieve`; each must provide
                ``title`` and ``content``.
            system_prompt: Optional override for the default system prompt.

        Returns:
            The text of the first content block of the model's reply.
        """
        if system_prompt is None:
            system_prompt = """You are a helpful assistant that answers questions
            based on the provided context. If the context doesn't contain
            relevant information, say so."""

        # Format context: one section per document, separated by a rule
        context_text = "\n\n---\n\n".join(
            f"Source: {doc['title']}\n{doc['content']}"
            for doc in context
        )

        user_prompt = f"""Based on the following context, answer the question.

Context:
{context_text}

Question: {query}

Answer:"""

        # Generate response
        response = self.llm.messages.create(
            model=self.llm_model,
            max_tokens=1000,
            system=system_prompt,
            messages=[{"role": "user", "content": user_prompt}]
        )

        return response.content[0].text

    def query(
        self,
        question: str,
        num_context: int = 5,
        filters: dict = None
    ) -> dict:
        """Complete RAG pipeline: retrieve context, then generate an answer.

        Args:
            question: The user's question.
            num_context: Number of documents to retrieve as context.
            filters: Optional filters forwarded to `retrieve`.

        Returns:
            A dict with ``question``, ``answer`` and ``sources`` (a list of
            ``{"title", "score"}`` dicts, one per retrieved document).
        """
        # Retrieve
        context = self.retrieve(question, num_context, filters)

        # Generate
        answer = self.generate(question, context)

        return {
            "question": question,
            "answer": answer,
            "sources": [
                {"title": doc["title"], "score": doc["score"]}
                for doc in context
            ]
        }

# Usage: build the pipeline, ask one question, then print the answer
# followed by the title and score of every source document.
rag = DatabricksRAG(
    index_name="my_catalog.my_schema.document_index",
    endpoint_name="my-vector-search-endpoint",
)

result = rag.query("How do I create a cluster in Databricks?")
print(result["answer"])
print("\nSources:")
for entry in result["sources"]:
    title, score = entry["title"], entry["score"]
    print(f"  - {title} (score: {score:.3f})")
class HybridSearch:
    """Combine vector search with keyword search.

    Runs a similarity search against the vector index and a substring
    search via Spark SQL, then merges both rankings with weighted
    reciprocal rank fusion.
    """

    def __init__(self, vsc_client, index, spark):
        """Store the collaborators used by `search`.

        Args:
            vsc_client: Vector Search client (kept for callers; not used by
                the methods of this class).
            index: Index handle providing `similarity_search`.
            spark: Spark session used for the keyword query.
        """
        self.vsc = vsc_client
        self.index = index
        self.spark = spark

    def search(
        self,
        query: str,
        num_results: int = 10,
        vector_weight: float = 0.7
    ) -> list:
        """Perform hybrid search.

        Args:
            query: Search text. Matched case-insensitively as a literal
                substring in the keyword branch.
            num_results: Maximum number of fused results to return.
            vector_weight: Weight of the vector ranking in the fusion; the
                keyword ranking gets ``1 - vector_weight``.

        Returns:
            Up to `num_results` ``(doc_id, fused_score)`` tuples, best first.
            An empty list when `query` is empty.
        """
        # An empty term would match every row and divide by zero below.
        if not query:
            return []

        # Over-fetch from both sources so fusion has candidates to reorder.
        candidate_limit = int(num_results) * 2

        # Vector search
        vector_results = self.index.similarity_search(
            query_text=query,
            columns=["doc_id", "title", "content"],
            num_results=candidate_limit
        )

        # Keyword search using Spark SQL. The search term is bound as a named
        # parameter (Spark 3.4+) instead of being spliced into the statement,
        # so quotes or SQL fragments in user input cannot alter the query.
        # Occurrences are counted with REPLACE/LENGTH so the term is treated
        # literally rather than as a regex (SPLIT) or LIKE pattern.
        keyword_sql = f"""
            SELECT
                doc_id,
                title,
                content,
                -- Simple term-frequency scoring; title matches count double
                (
                    (LENGTH(LOWER(content)) - LENGTH(REPLACE(LOWER(content), :term, ''))) +
                    (LENGTH(LOWER(title)) - LENGTH(REPLACE(LOWER(title), :term, ''))) * 2
                ) / LENGTH(:term) as keyword_score
            FROM documents
            WHERE INSTR(LOWER(content), :term) > 0
               OR INSTR(LOWER(title), :term) > 0
            ORDER BY keyword_score DESC
            LIMIT {candidate_limit}
        """
        keyword_results = self.spark.sql(
            keyword_sql,
            args={"term": query.lower()}
        ).collect()

        # `data_array` can be missing when the vector search has no hits.
        vector_rows = (vector_results.get("result") or {}).get("data_array") or []

        # Combine and rank results
        combined = self._fuse_results(vector_rows, keyword_results, vector_weight)

        return combined[:num_results]

    def _fuse_results(
        self,
        vector_results: list,
        keyword_results: list,
        vector_weight: float,
        k: int = 60
    ) -> list:
        """Fuse results using reciprocal rank fusion.

        Args:
            vector_results: Rows from the vector search, best first. Each row
                starts with the document id (the first requested column);
                the similarity score is the last element and is not used.
            keyword_results: Rows from the keyword search, best first; each
                must support ``row['doc_id']``.
            vector_weight: Weight applied to vector ranks; keyword ranks get
                ``1 - vector_weight``.
            k: RRF smoothing constant.

        Returns:
            ``(doc_id, fused_score)`` tuples sorted by score, descending.
        """
        scores = {}
        keyword_weight = 1 - vector_weight

        # Score vector results. doc_id is column 0 of each row.
        for rank, row in enumerate(vector_results):
            doc_id = row[0]
            scores[doc_id] = scores.get(doc_id, 0) + \
                            vector_weight * (1 / (k + rank + 1))

        # Score keyword results
        for rank, row in enumerate(keyword_results):
            doc_id = row['doc_id']
            scores[doc_id] = scores.get(doc_id, 0) + \
                            keyword_weight * (1 / (k + rank + 1))

        # Sort by combined score
        return sorted(scores.items(), key=lambda item: item[1], reverse=True)

Monitoring and Maintenance

# Monitor index status
def monitor_index(vsc, endpoint_name: str, index_name: str):
    """Print a short status report for a vector search index.

    Args:
        vsc: Client providing `get_index(endpoint_name, index_name)`.
        endpoint_name: Endpoint that serves the index.
        index_name: Name of the index to inspect.
    """
    index = vsc.get_index(endpoint_name, index_name)

    # Index metadata is read from describe() rather than from attributes on
    # the handle. NOTE(review): the field names used below (`index_type`,
    # `status.detailed_state`, `status.ready`, `status.indexed_row_count`,
    # `status.message`) follow the documented describe() response -- confirm
    # against the SDK version in use. Missing fields fall back to defaults.
    details = index.describe() or {}
    status = details.get("status") or {}

    print(f"Index: {index_name}")
    print(f"Type: {details.get('index_type', 'unknown')}")
    print(f"Status: {status.get('detailed_state', 'UNKNOWN')}")
    print(f"Ready: {status.get('ready', False)}")
    print(f"Num vectors: {status.get('indexed_row_count', 'unknown')}")

    # Only shown when the service reports one
    if status.get("message"):
        print(f"Message: {status['message']}")

# Trigger manual sync
def sync_index(vsc, endpoint_name: str, index_name: str):
    """Trigger index sync with source table.

    Args:
        vsc: Client providing `get_index(endpoint_name=..., index_name=...)`.
        endpoint_name: Endpoint that serves the index.
        index_name: Name of the index to sync.
    """
    # Sync is requested on the index handle. NOTE(review): the client itself
    # is not known to expose a sync_index() method -- confirm against the
    # SDK reference.
    index = vsc.get_index(
        endpoint_name=endpoint_name,
        index_name=index_name
    )
    index.sync()
    print(f"Sync triggered for {index_name}")

Conclusion

Databricks Vector Search provides managed semantic search infrastructure integrated with your lakehouse. Use it to build powerful RAG applications, recommendation systems, and intelligent search experiences.

Michael John Peña

Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.