6 min read
Milvus Vector Database: High-Performance Similarity Search
Milvus is a high-performance, open-source vector database designed for massive-scale similarity search. Written primarily in Go with a C++ vector-index core and optimized for production workloads, it can handle billions of vectors with millisecond-level search latency.
Getting Started
# Install Milvus client
pip install pymilvus openai
# Run Milvus with Docker Compose
# Download docker-compose.yml from Milvus docs
docker-compose up -d
from pymilvus import (
    connections,
    Collection,
    CollectionSchema,
    FieldSchema,
    DataType,
    utility,
)
import openai
# Connect to Milvus
connections.connect("default", host="localhost", port="19530")
# Configure Azure OpenAI
openai.api_type = "azure"
openai.api_base = "https://your-resource.openai.azure.com/"
openai.api_version = "2023-03-15-preview"
openai.api_key = "your-azure-key"
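Before going further, it is worth confirming that both services respond. A minimal sanity check, assuming the connection above succeeded, that your pymilvus version exposes utility.get_server_version, and that a text-embedding-ada-002 deployment exists:
# Confirm the Milvus server responds (helper assumed available in pymilvus 2.x)
print(utility.get_server_version())
# Confirm the Azure OpenAI deployment responds; ada-002 vectors are 1536-dimensional
vec = openai.Embedding.create(
    engine="text-embedding-ada-002",
    input="connectivity check"
)['data'][0]['embedding']
print(len(vec))  # expect 1536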
Creating Collections
from pymilvus import CollectionSchema, FieldSchema, DataType, Collection
def create_collection(
    name: str,
    dimension: int = 1536,
    description: str = ""
) -> Collection:
    """Create a Milvus collection."""
    # Define fields
    fields = [
        FieldSchema(
            name="id",
            dtype=DataType.VARCHAR,
            max_length=100,
            is_primary=True,
            auto_id=False
        ),
        FieldSchema(
            name="embedding",
            dtype=DataType.FLOAT_VECTOR,
            dim=dimension
        ),
        FieldSchema(
            name="text",
            dtype=DataType.VARCHAR,
            max_length=5000
        ),
        FieldSchema(
            name="category",
            dtype=DataType.VARCHAR,
            max_length=100
        ),
        FieldSchema(
            name="timestamp",
            dtype=DataType.INT64
        )
    ]
    schema = CollectionSchema(
        fields=fields,
        description=description
    )
    collection = Collection(
        name=name,
        schema=schema
    )
    return collection
# Create collection
collection = create_collection("azure_docs", description="Azure documentation")
# Check existing collections
print(utility.list_collections())
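Constructing a Collection with a schema that conflicts with an existing collection of the same name can raise an error, so a small guard with utility.has_collection (the same pattern the service class later in this post uses) keeps the script re-runnable. A minimal sketch:
def get_or_create_collection(name: str, dimension: int = 1536) -> Collection:
    """Return the existing collection, or create it if missing."""
    if utility.has_collection(name):
        return Collection(name)
    return create_collection(name, dimension=dimension)

collection = get_or_create_collection("azure_docs")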
Creating Indexes
# Create index for vector search
index_params = {
    "metric_type": "IP",  # Inner Product (cosine for normalized vectors)
    "index_type": "IVF_FLAT",
    "params": {"nlist": 1024}
}
collection.create_index(
    field_name="embedding",
    index_params=index_params
)
# Index types comparison
INDEX_TYPES = {
    "FLAT": {
        "description": "Brute force, 100% accurate",
        "use_case": "Small datasets (< 1M vectors)",
        "params": {}
    },
    "IVF_FLAT": {
        "description": "Inverted file index",
        "use_case": "Balanced accuracy/speed",
        "params": {"nlist": 1024}  # Number of clusters
    },
    "IVF_SQ8": {
        "description": "IVF with scalar quantization",
        "use_case": "Memory constrained",
        "params": {"nlist": 1024}
    },
    "HNSW": {
        "description": "Graph-based, very fast",
        "use_case": "Low latency requirements",
        "params": {"M": 16, "efConstruction": 256}
    }
}
# HNSW index for low latency
hnsw_params = {
    "metric_type": "IP",
    "index_type": "HNSW",
    "params": {"M": 16, "efConstruction": 256}
}
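Index building happens asynchronously on the server, so a freshly created index may not be ready immediately. A sketch of waiting for the build to finish, assuming your pymilvus version provides the wait_for_index_building_complete and index_building_progress helpers:
collection.create_index(field_name="embedding", index_params=hnsw_params)
# Block until the server reports the build is done (assumed pymilvus helper);
# alternatively, poll utility.index_building_progress in a loop
utility.wait_for_index_building_complete("azure_docs")
print(utility.index_building_progress("azure_docs"))  # e.g. {'total_rows': N, 'indexed_rows': N}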
Inserting Data
from typing import List, Optional
import time
def get_embedding(text: str) -> List[float]:
    """Get embedding from Azure OpenAI."""
    response = openai.Embedding.create(
        engine="text-embedding-ada-002",
        input=text
    )
    return response['data'][0]['embedding']

def insert_documents(
    collection: Collection,
    documents: List[dict]
) -> int:
    """Insert documents into collection."""
    ids = []
    embeddings = []
    texts = []
    categories = []
    timestamps = []
    for doc in documents:
        ids.append(doc["id"])
        embeddings.append(get_embedding(doc["text"]))
        texts.append(doc["text"][:5000])  # Truncate to max length
        categories.append(doc.get("category", ""))
        timestamps.append(int(time.time()))
    # Insert data as column lists, in schema field order
    collection.insert([
        ids,
        embeddings,
        texts,
        categories,
        timestamps
    ])
    # Flush to persist
    collection.flush()
    return len(ids)
# Insert example documents
documents = [
    {
        "id": "doc1",
        "text": "Azure Virtual Machines provide scalable IaaS compute",
        "category": "compute"
    },
    {
        "id": "doc2",
        "text": "Azure Functions is a serverless compute service",
        "category": "compute"
    },
    {
        "id": "doc3",
        "text": "Azure Cosmos DB is a globally distributed NoSQL database",
        "category": "database"
    }
]
count = insert_documents(collection, documents)
print(f"Inserted {count} documents")
Searching
def search(
    collection: Collection,
    query: str,
    top_k: int = 10,
    output_fields: Optional[List[str]] = None,
    expr: Optional[str] = None
):
    """Search for similar vectors."""
    # Load collection into memory before searching
    collection.load()
    # Get query embedding
    query_embedding = get_embedding(query)
    # Search parameters
    search_params = {
        "metric_type": "IP",
        "params": {"nprobe": 16}  # Number of clusters to probe
    }
    results = collection.search(
        data=[query_embedding],
        anns_field="embedding",
        param=search_params,
        limit=top_k,
        output_fields=output_fields or ["text", "category"],
        expr=expr  # Optional boolean filter expression
    )
    return results
# Simple search
results = search(collection, "serverless computing")
for hits in results:
    for hit in hits:
        print(f"[{hit.score:.4f}] {hit.entity.get('text')[:60]}...")

# Search with filter
results = search(
    collection,
    "database for high throughput",
    expr='category == "database"'
)
# Complex filter expressions
expr_examples = [
    'category == "compute"',
    'category in ["compute", "database"]',
    'timestamp > 1672531200',
    'category == "compute" and timestamp > 1672531200'
]
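These expressions also work without a query vector at all: collection.query retrieves entities by scalar filter alone, which is handy for spot-checking what was inserted. A quick sketch, assuming the collection has been loaded:
collection.load()
rows = collection.query(
    expr='category == "compute"',
    output_fields=["id", "text", "category"],
    limit=10
)
for row in rows:
    print(row["id"], row["text"][:60])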
Batch Operations
def batch_insert(
    collection: Collection,
    documents: List[dict],
    batch_size: int = 1000
) -> int:
    """Insert documents in batches."""
    total = 0
    for i in range(0, len(documents), batch_size):
        batch = documents[i:i + batch_size]
        ids = [doc["id"] for doc in batch]
        texts = [doc["text"][:5000] for doc in batch]
        categories = [doc.get("category", "") for doc in batch]
        timestamps = [int(time.time()) for _ in batch]
        # Embed the whole batch in one API call
        response = openai.Embedding.create(
            engine="text-embedding-ada-002",
            input=texts
        )
        embeddings = [item['embedding'] for item in response['data']]
        collection.insert([ids, embeddings, texts, categories, timestamps])
        total += len(batch)
        print(f"Inserted batch {i//batch_size + 1}, total: {total}")
    collection.flush()
    return total
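One caveat: Azure OpenAI embedding endpoints cap how many inputs a single request may carry (16 per request was a common limit for ada-002 deployments) and rate-limit bursts, so a 1000-document Milvus batch usually needs several embedding calls. A hedged sketch that chunks the texts and retries failures with exponential backoff, assuming those limits apply to your deployment:
import time

EMBED_CHUNK = 16   # assumed per-request input cap for the deployment
MAX_RETRIES = 5

def embed_with_retry(texts: List[str]) -> List[List[float]]:
    """Embed texts in small chunks, retrying failed chunks with backoff."""
    embeddings: List[List[float]] = []
    for i in range(0, len(texts), EMBED_CHUNK):
        chunk = texts[i:i + EMBED_CHUNK]
        for attempt in range(MAX_RETRIES):
            try:
                response = openai.Embedding.create(
                    engine="text-embedding-ada-002",
                    input=chunk
                )
                embeddings.extend(item['embedding'] for item in response['data'])
                break
            except Exception:
                if attempt == MAX_RETRIES - 1:
                    raise
                time.sleep(2 ** attempt)  # backoff: 1s, 2s, 4s, ...
    return embeddings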
Complete Search Service
from pymilvus import connections, Collection, utility
from typing import List, Dict, Optional
class MilvusSearchService:
    """Search service using Milvus and Azure OpenAI."""

    def __init__(
        self,
        host: str = "localhost",
        port: str = "19530",
        embedding_deployment: str = "text-embedding-ada-002"
    ):
        connections.connect("default", host=host, port=port)
        self.embedding_deployment = embedding_deployment

    def _embed(self, text: str) -> List[float]:
        """Get embedding for text."""
        response = openai.Embedding.create(
            engine=self.embedding_deployment,
            input=text
        )
        return response['data'][0]['embedding']

    def _embed_batch(self, texts: List[str]) -> List[List[float]]:
        """Get embeddings for multiple texts."""
        response = openai.Embedding.create(
            engine=self.embedding_deployment,
            input=texts
        )
        return [item['embedding'] for item in response['data']]

    def get_collection(self, name: str) -> Collection:
        """Get an existing collection; raise if it is missing."""
        if utility.has_collection(name):
            return Collection(name)
        raise ValueError(f"Collection {name} does not exist")

    def add_documents(
        self,
        collection_name: str,
        documents: List[Dict],
        text_field: str = "text",
        batch_size: int = 100
    ):
        """Add documents to collection."""
        collection = self.get_collection(collection_name)
        for i in range(0, len(documents), batch_size):
            batch = documents[i:i + batch_size]
            texts = [doc[text_field][:5000] for doc in batch]
            embeddings = self._embed_batch(texts)
            # Prepare column data in schema field order.
            # NOTE: hash() is not stable across Python runs; supply real IDs in production.
            ids = [doc.get("id", str(hash(doc[text_field]))) for doc in batch]
            categories = [doc.get("category", "") for doc in batch]
            timestamps = [int(time.time()) for _ in batch]
            collection.insert([ids, embeddings, texts, categories, timestamps])
        collection.flush()

    def search(
        self,
        collection_name: str,
        query: str,
        top_k: int = 10,
        filter_expr: Optional[str] = None
    ) -> List[Dict]:
        """Search for similar documents."""
        collection = self.get_collection(collection_name)
        collection.load()
        query_embedding = self._embed(query)
        results = collection.search(
            data=[query_embedding],
            anns_field="embedding",
            param={"metric_type": "IP", "params": {"nprobe": 16}},
            limit=top_k,
            output_fields=["text", "category"],
            expr=filter_expr
        )
        return [
            {
                "id": hit.id,
                "score": hit.score,
                "text": hit.entity.get("text"),
                "category": hit.entity.get("category")
            }
            for hits in results
            for hit in hits
        ]

    def delete_documents(
        self,
        collection_name: str,
        expr: str
    ):
        """Delete documents matching expression."""
        collection = self.get_collection(collection_name)
        collection.delete(expr)

    def get_stats(self, collection_name: str) -> Dict:
        """Get collection statistics."""
        collection = self.get_collection(collection_name)
        return {
            "name": collection_name,
            "num_entities": collection.num_entities,
            "schema": str(collection.schema)
        }
# Usage
service = MilvusSearchService()
# Search
results = service.search("azure_docs", "serverless computing", top_k=5)
for r in results:
    print(f"[{r['score']:.4f}] {r['text'][:60]}...")
Performance Tuning
# Search parameter tuning
SEARCH_PARAMS = {
    "IVF_FLAT": {
        "nprobe": 16,  # Increase for accuracy, decrease for speed
    },
    "HNSW": {
        "ef": 64,  # Increase for accuracy, decrease for speed
    }
}
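These are search-time knobs, passed per query rather than baked into the index. For an HNSW-indexed collection the call looks like this (a sketch reusing the query_embedding variable from earlier; note that ef must be at least as large as the limit):
results = collection.search(
    data=[query_embedding],
    anns_field="embedding",
    param={"metric_type": "IP", "params": {"ef": 64}},  # HNSW search-time parameter
    limit=10,
    output_fields=["text"]
)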
# Memory management
collection.release() # Release from memory when not in use
collection.load() # Load when searching
# Partition for better performance
collection.create_partition("compute")
collection.create_partition("database")
# Insert to specific partition
collection.insert(data, partition_name="compute")
# Search a specific partition (remaining arguments as in the earlier search calls)
collection.search(
    data=[query_embedding],
    partition_names=["compute"],
    ...
)
Best Practices
- Choose index wisely: HNSW for low latency, IVF for memory efficiency
- Tune search parameters: Balance accuracy and speed
- Use partitions: For filtered searches and data management
- Manage memory: Load/release collections appropriately
- Batch operations: Insert in batches for efficiency
- Monitor performance: Track query latency and recall (see the sketch below)
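A minimal way to track the latency half of that, reusing the search helper defined earlier (measuring recall additionally needs ground-truth results, e.g. exact matches from a FLAT index, to compare against):
import time

def timed_search(collection: Collection, query: str, top_k: int = 10):
    """Run a search and report wall-clock latency."""
    start = time.perf_counter()
    results = search(collection, query, top_k=top_k)
    latency_ms = (time.perf_counter() - start) * 1000
    print(f"query latency: {latency_ms:.1f} ms")
    return results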
Resources
- Milvus Documentation
- PyMilvus
- Zilliz Cloud (Managed Milvus)