Milvus Vector Database Guide v0.1.18.0¶
Overview¶
NetIntel-OCR v0.1.18.0 features comprehensive Milvus integration for high-performance vector similarity search, enabling semantic search, document retrieval, and intelligent querying at scale. This guide covers Milvus setup, collection management, vector and index operations, search, and the optimization, monitoring, and troubleshooting practices that go with them.
Table of Contents¶
- Milvus Setup
- Collection Management
- Vector Operations
- Index Management
- Search Operations
- Partitions and Segments
- Performance Optimization
- Monitoring and Maintenance
- Troubleshooting
Milvus Setup¶
Docker Deployment¶
# Run Milvus Standalone
docker-compose -f docker-compose-milvus.yml up -d
# docker-compose-milvus.yml
version: '3.5'
services:
etcd:
container_name: milvus-etcd
image: quay.io/coreos/etcd:v3.5.5
environment:
- ETCD_AUTO_COMPACTION_MODE=revision
- ETCD_AUTO_COMPACTION_RETENTION=1000
- ETCD_QUOTA_BACKEND_BYTES=4294967296
- ETCD_SNAPSHOT_COUNT=50000
volumes:
- ${DOCKER_VOLUME_DIRECTORY:-.}/volumes/etcd:/etcd
command: etcd -advertise-client-urls=http://127.0.0.1:2379 -listen-client-urls http://0.0.0.0:2379
minio:
container_name: milvus-minio
image: minio/minio:RELEASE.2023-03-20T20-16-18Z
environment:
MINIO_ACCESS_KEY: minioadmin
MINIO_SECRET_KEY: minioadmin
ports:
- "9001:9001"
- "9000:9000"
volumes:
- ${DOCKER_VOLUME_DIRECTORY:-.}/volumes/minio:/minio_data
command: minio server /minio_data --console-address ":9001"
milvus:
container_name: milvus-standalone
image: milvusdb/milvus:v2.3.3
command: ["milvus", "run", "standalone"]
environment:
ETCD_ENDPOINTS: etcd:2379
MINIO_ADDRESS: minio:9000
volumes:
- ${DOCKER_VOLUME_DIRECTORY:-.}/volumes/milvus:/var/lib/milvus
ports:
- "19530:19530"
- "9091:9091"
depends_on:
- etcd
- minio
Configuration¶
# milvus.yml configuration
log:
  level: info
  file:
    rootPath: /var/log/milvus
    maxSize: 300      # MB
    maxAge: 10        # days
    maxBackups: 20
dataCoord:
  segment:
    maxSize: 512      # MB
    sealProportion: 0.25
indexCoord:
  autoIndex:
    enable: true
queryCoord:
  autoHandoff: true
  autoBalance: true
  balanceIntervalSeconds: 60
  memoryUsageMaxDifferencePercentage: 30
queryNode:
  cache:
    enabled: true
    memoryLimit: 2147483648   # bytes (2 GB)
proxy:
  port: 19530
  internalPort: 19529
Connecting to Milvus¶
from pymilvus import connections, Collection, utility
# Connect to Milvus
connections.connect(
alias="default",
host="localhost",
port="19530",
user="username", # Optional
password="password", # Optional
secure=False # Set to True for TLS
)
# Check connection
print(f"Milvus server version: {utility.get_server_version()}")
print(f"Connected: {connections.has_connection('default')}")
Collection Management¶
Creating Collections¶
from pymilvus import CollectionSchema, FieldSchema, DataType, Collection
# Define schema
fields = [
FieldSchema(
name="id",
dtype=DataType.INT64,
is_primary=True,
auto_id=True,
description="Primary ID"
),
FieldSchema(
name="document_id",
dtype=DataType.VARCHAR,
max_length=100,
description="Document identifier"
),
FieldSchema(
name="content",
dtype=DataType.VARCHAR,
max_length=65535,
description="Text content"
),
FieldSchema(
name="embedding",
dtype=DataType.FLOAT_VECTOR,
dim=768,
description="Text embedding vector"
),
FieldSchema(
name="metadata",
dtype=DataType.JSON,
description="Document metadata"
),
FieldSchema(
name="timestamp",
dtype=DataType.INT64,
description="Creation timestamp"
)
]
# Create schema
schema = CollectionSchema(
fields=fields,
description="Document embeddings collection",
enable_dynamic_field=True # Allow additional fields
)
# Create collection
collection = Collection(
name="documents",
schema=schema,
consistency_level="Session", # Options: Strong, Session, Bounded, Eventually
shards_num=2 # Number of shards
)
print(f"Collection created: {collection.name}")
Collection with Custom Properties¶
# Advanced collection creation with properties
collection = Collection(
    name="advanced_documents",
    schema=schema,
    properties={
        # Entities expire this many seconds after insertion
        "collection.ttl.seconds": 3600
    }
)
# Set collection properties after creation
collection.set_properties({
"collection.ttl.seconds": 7200
})
Managing Collections¶
from pymilvus import utility
# List all collections
collections = utility.list_collections()
print(f"Collections: {collections}")
# Check if collection exists
exists = utility.has_collection("documents")
print(f"Collection exists: {exists}")
# Get collection info
collection = Collection("documents")
print(f"Collection name: {collection.name}")
print(f"Description: {collection.description}")
print(f"Schema: {collection.schema}")
print(f"Number of entities: {collection.num_entities}")
# Rename collection
utility.rename_collection(
old_collection_name="documents",
new_collection_name="document_vectors"
)
# Drop collection
utility.drop_collection("old_collection")
Vector Operations¶
Inserting Vectors¶
import numpy as np
from datetime import datetime
# Prepare data
num_entities = 1000
dim = 768
# Generate sample data
documents = [f"document_{i}" for i in range(num_entities)]
contents = [f"This is the content of document {i}" for i in range(num_entities)]
embeddings = np.random.random((num_entities, dim)).tolist()
timestamps = [int(datetime.now().timestamp()) for _ in range(num_entities)]
metadata = [{"page": i % 10, "category": f"cat_{i % 5}"} for i in range(num_entities)]
# Insert data
data = [
documents, # document_id
contents, # content
embeddings, # embedding
metadata, # metadata
timestamps # timestamp
]
collection = Collection("documents")
insert_result = collection.insert(data)
print(f"Inserted {insert_result.insert_count} entities")
print(f"Primary keys: {insert_result.primary_keys[:5]}...")
# Flush to persist data
collection.flush()
Batch Insert with Error Handling¶
import time

def batch_insert_with_retry(collection, data, batch_size=1000, max_retries=3):
"""Insert data in batches with retry logic"""
total_inserted = 0
failed_batches = []
# Split data into batches
num_entities = len(data[0])
for start_idx in range(0, num_entities, batch_size):
end_idx = min(start_idx + batch_size, num_entities)
batch_data = [field[start_idx:end_idx] for field in data]
# Try inserting batch with retries
for attempt in range(max_retries):
try:
result = collection.insert(batch_data)
total_inserted += result.insert_count
print(f"Batch {start_idx}-{end_idx} inserted successfully")
break
except Exception as e:
print(f"Attempt {attempt + 1} failed for batch {start_idx}-{end_idx}: {e}")
if attempt == max_retries - 1:
failed_batches.append((start_idx, end_idx))
else:
time.sleep(2 ** attempt) # Exponential backoff
collection.flush()
return {
"total_inserted": total_inserted,
"failed_batches": failed_batches
}
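A quick usage sketch, reusing the column-oriented `data` lists prepared in the insert example above:
result = batch_insert_with_retry(collection, data, batch_size=500)
print(f"Inserted {result['total_inserted']} entities")
if result["failed_batches"]:
    print(f"Batches needing attention: {result['failed_batches']}")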
Upsert Operations¶
# Upsert: insert new rows or update existing ones.
# Upsert matches rows on the primary key field; because the schema above uses
# auto_id=True, real upserts require a user-supplied primary key instead.
upsert_data = [
    ["document_1", "document_2"],               # document_id
    ["Updated content 1", "Updated content 2"], # content
    [embedding1, embedding2],                   # 768-dim vectors (placeholders)
    [{"updated": True}, {"updated": True}],     # metadata
    [timestamp1, timestamp2]                    # epoch-second timestamps (placeholders)
]
collection.upsert(upsert_data)
collection.flush()
Delete Operations¶
# Delete by primary key
collection.delete(expr="id in [1, 2, 3, 4, 5]")
# Delete by expression
collection.delete(expr="document_id == 'document_123'")
# Delete with a complex expression (JSON fields are filtered with bracket syntax)
collection.delete(
    expr='timestamp < 1609459200 and metadata["category"] == "obsolete"'
)
collection.flush()
Index Management¶
Creating Indexes¶
# Create IVF_FLAT index (good for accuracy)
index_params = {
"metric_type": "L2", # L2, IP, COSINE
"index_type": "IVF_FLAT",
"params": {"nlist": 1024}
}
collection.create_index(
field_name="embedding",
index_params=index_params,
index_name="embedding_index"
)
# Create IVF_SQ8 index (balanced accuracy/speed/size)
index_params_sq8 = {
"metric_type": "IP", # Inner Product
"index_type": "IVF_SQ8",
"params": {"nlist": 2048}
}
collection.create_index(
field_name="embedding",
index_params=index_params_sq8
)
# Create HNSW index (good for high recall)
index_params_hnsw = {
"metric_type": "COSINE",
"index_type": "HNSW",
"params": {
"M": 16,
"efConstruction": 200
}
}
collection.create_index(
field_name="embedding",
index_params=index_params_hnsw
)
# Create scalar index
collection.create_index(
field_name="timestamp",
index_name="timestamp_index"
)
Index Types Comparison¶
| Index Type | Speed | Accuracy | Memory | Use Case |
|---|---|---|---|---|
| FLAT | Slow | 100% | High | Small datasets (<1M) |
| IVF_FLAT | Medium | High | Medium | General purpose |
| IVF_SQ8 | Fast | Good | Low | Large datasets |
| IVF_PQ | Very Fast | Moderate | Very Low | Huge datasets |
| HNSW | Fast | High | High | High recall needed |
| ANNOY | Fast | Good | Low | Read-heavy workloads (removed in Milvus 2.3) |
| DISKANN | Fast | High | Low | Disk-based search |
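If you would rather not pick from this table by hand, recent Milvus releases also expose an AUTOINDEX type that lets the server choose and tune the index itself. A minimal sketch; behaviour varies by Milvus version, so treat these parameters as a starting point:
# Let Milvus pick and tune the index automatically
auto_index_params = {
    "metric_type": "COSINE",
    "index_type": "AUTOINDEX",
    "params": {}
}
collection.create_index(
    field_name="embedding",
    index_params=auto_index_params
)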
Managing Indexes¶
# Check index status
utility.index_building_progress("documents")
# List indexes
indexes = collection.indexes
for index in indexes:
print(f"Index: {index.field_name} - {index.params}")
# Drop index
collection.drop_index(index_name="embedding_index")
# Rebuild index
collection.drop_index()
collection.create_index(field_name="embedding", index_params=new_params)
Search Operations¶
Basic Vector Search¶
# Load collection into memory
collection.load()
import random

# Prepare search vectors
search_vectors = [[random.random() for _ in range(768)] for _ in range(5)]
# Basic search
search_params = {
"metric_type": "L2",
"params": {"nprobe": 16}
}
results = collection.search(
data=search_vectors,
anns_field="embedding",
param=search_params,
limit=10,
output_fields=["document_id", "content", "metadata"]
)
# Process results
for i, result in enumerate(results):
print(f"\nQuery {i} results:")
for j, hit in enumerate(result):
print(f" {j}. ID: {hit.id}, Score: {hit.distance}")
print(f" Document: {hit.entity.get('document_id')}")
print(f" Content: {hit.entity.get('content')[:100]}...")
Filtered Search¶
# Search with a filter expression (JSON fields use bracket syntax)
filter_expr = 'timestamp > 1609459200 and metadata["category"] == "cat_1"'
results = collection.search(
data=search_vectors,
anns_field="embedding",
param=search_params,
limit=10,
expr=filter_expr,
output_fields=["document_id", "content", "metadata", "timestamp"]
)
Hybrid Search¶
def hybrid_search(collection, text_query, vector_query, alpha=0.7):
"""
Perform hybrid search combining vector and keyword search
Args:
collection: Milvus collection
text_query: Text for keyword matching
vector_query: Vector for similarity search
alpha: Weight for vector search (1-alpha for keyword)
"""
# Vector search
vector_results = collection.search(
data=[vector_query],
anns_field="embedding",
param={"metric_type": "L2", "params": {"nprobe": 32}},
limit=50,
output_fields=["document_id", "content", "metadata"]
)[0]
    # Keyword search using a filter (infix '%...%' matching requires a recent Milvus release)
    keyword_filter = f"content like '%{text_query}%'"
keyword_results = collection.query(
expr=keyword_filter,
output_fields=["document_id", "content", "metadata"],
limit=50
)
# Combine and rerank results
combined_scores = {}
# Add vector search results
for i, hit in enumerate(vector_results):
doc_id = hit.entity.get('document_id')
vector_score = 1.0 / (1.0 + hit.distance) # Convert distance to score
combined_scores[doc_id] = alpha * vector_score
# Add keyword search results
for i, result in enumerate(keyword_results):
doc_id = result['document_id']
keyword_score = 1.0 - (i / len(keyword_results)) # Rank-based score
if doc_id in combined_scores:
combined_scores[doc_id] += (1 - alpha) * keyword_score
else:
combined_scores[doc_id] = (1 - alpha) * keyword_score
# Sort by combined score
ranked_results = sorted(
combined_scores.items(),
key=lambda x: x[1],
reverse=True
)
return ranked_results[:10]
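A usage sketch for hybrid_search; embed_text is a hypothetical helper standing in for whatever embedding model produced the stored 768-dimensional vectors (it is not part of pymilvus):
query_text = "firewall configuration"
query_vector = embed_text(query_text)  # hypothetical: must return a 768-dim list of floats

top_docs = hybrid_search(collection, text_query=query_text, vector_query=query_vector, alpha=0.7)
for doc_id, score in top_docs:
    print(f"{doc_id}: {score:.4f}")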
Range Search¶
# Search within a distance range
range_search_params = {
"metric_type": "L2",
"params": {
"nprobe": 16,
"radius": 1.0, # Maximum distance
"range_filter": 0.5 # Minimum distance
}
}
results = collection.search(
data=search_vectors,
anns_field="embedding",
param=range_search_params,
limit=10
)
Multi-Vector Search¶
def multi_vector_search(collection, query_vectors, aggregation="mean"):
"""
Search with multiple query vectors
Args:
collection: Milvus collection
query_vectors: List of query vectors
        aggregation: How to combine results ("mean" or "reciprocal_rank")
"""
all_results = []
# Perform searches for each vector
for vector in query_vectors:
results = collection.search(
data=[vector],
anns_field="embedding",
param={"metric_type": "L2", "params": {"nprobe": 16}},
limit=50,
output_fields=["document_id", "content"]
)[0]
all_results.append(results)
# Aggregate results
if aggregation == "mean":
# Average scores across queries
score_map = {}
count_map = {}
for results in all_results:
for hit in results:
doc_id = hit.entity.get('document_id')
if doc_id not in score_map:
score_map[doc_id] = 0
count_map[doc_id] = 0
score_map[doc_id] += hit.distance
count_map[doc_id] += 1
# Calculate mean scores
final_scores = {
doc_id: score_map[doc_id] / count_map[doc_id]
for doc_id in score_map
}
elif aggregation == "reciprocal_rank":
# Reciprocal Rank Fusion
rrf_scores = {}
k = 60 # RRF parameter
for results in all_results:
for rank, hit in enumerate(results):
doc_id = hit.entity.get('document_id')
if doc_id not in rrf_scores:
rrf_scores[doc_id] = 0
rrf_scores[doc_id] += 1.0 / (k + rank + 1)
final_scores = rrf_scores
# Sort and return top results
sorted_results = sorted(
final_scores.items(),
key=lambda x: x[1],
reverse=(aggregation == "reciprocal_rank")
)
return sorted_results[:10]
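For example, several paraphrases of one question can be fused with Reciprocal Rank Fusion (again using the hypothetical embed_text helper from the hybrid search example):
paraphrases = [
    "How is the firewall configured?",
    "Firewall configuration settings",
    "Which ports does the firewall allow?"
]
query_vectors = [embed_text(q) for q in paraphrases]  # embed_text: hypothetical embedding helper

top_docs = multi_vector_search(collection, query_vectors, aggregation="reciprocal_rank")
print(top_docs[:5])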
Partitions and Segments¶
Creating and Managing Partitions¶
# Create partitions for data organization
collection.create_partition("2024_Q1")
collection.create_partition("2024_Q2")
collection.create_partition("2024_Q3")
collection.create_partition("2024_Q4")
# List partitions
partitions = collection.partitions
for partition in partitions:
print(f"Partition: {partition.name}, Entities: {partition.num_entities}")
# Insert into specific partition
partition = collection.partition("2024_Q1")
partition.insert(data)
# Load specific partitions for search
collection.load(partition_names=["2024_Q3", "2024_Q4"])
# Search within partitions
results = collection.search(
data=search_vectors,
anns_field="embedding",
param=search_params,
limit=10,
partition_names=["2024_Q3", "2024_Q4"]
)
# Drop partition
collection.drop_partition("2024_Q1")
Segment Management¶
# Get segment information
from pymilvus import utility
# Get query segment info
segments = utility.get_query_segment_info("documents")
for segment in segments:
print(f"Segment ID: {segment.segmentID}")
print(f"Collection: {segment.collectionID}")
print(f"Partition: {segment.partitionID}")
print(f"Mem size: {segment.mem_size}")
print(f"Num rows: {segment.num_rows}")
print(f"Index: {segment.index_name}")
# Compact segments
collection.compact()
# Get compaction state
compaction_state = collection.get_compaction_state()
print(f"Compaction state: {compaction_state}")
# Manual flush (blocks until data is sealed and persisted)
collection.flush()
Performance Optimization¶
1. Index Optimization¶
def optimize_index_for_dataset(collection, num_entities, dim):
"""
Choose optimal index based on dataset characteristics
"""
if num_entities < 10000:
# Small dataset: use FLAT for accuracy
index_params = {
"metric_type": "L2",
"index_type": "FLAT"
}
elif num_entities < 100000:
# Medium dataset: use IVF_FLAT
index_params = {
"metric_type": "L2",
"index_type": "IVF_FLAT",
"params": {"nlist": int(np.sqrt(num_entities))}
}
elif num_entities < 1000000:
# Large dataset: use IVF_SQ8
index_params = {
"metric_type": "L2",
"index_type": "IVF_SQ8",
"params": {"nlist": int(4 * np.sqrt(num_entities))}
}
else:
# Very large dataset: use IVF_PQ or HNSW
if dim > 128:
# High dimension: use PQ for compression
index_params = {
"metric_type": "L2",
"index_type": "IVF_PQ",
"params": {
"nlist": 4096,
"m": 8, # PQ subvector count
"nbits": 8 # Bits per subvector
}
}
else:
# Lower dimension: use HNSW
index_params = {
"metric_type": "L2",
"index_type": "HNSW",
"params": {
"M": 16,
"efConstruction": 200
}
}
collection.create_index(
field_name="embedding",
index_params=index_params
)
return index_params
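Usage sketch (assumes no index exists yet on the embedding field, since the helper calls create_index directly):
chosen = optimize_index_for_dataset(
    collection,
    num_entities=collection.num_entities,
    dim=768
)
print(f"Created index: {chosen['index_type']}")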
2. Search Optimization¶
def optimize_search_params(index_type, recall_requirement=0.9):
"""
Get optimal search parameters based on index type and recall requirement
"""
search_params = {"metric_type": "L2"}
if index_type == "IVF_FLAT" or index_type == "IVF_SQ8":
if recall_requirement >= 0.95:
search_params["params"] = {"nprobe": 64}
elif recall_requirement >= 0.9:
search_params["params"] = {"nprobe": 32}
else:
search_params["params"] = {"nprobe": 16}
elif index_type == "IVF_PQ":
if recall_requirement >= 0.95:
search_params["params"] = {"nprobe": 128}
elif recall_requirement >= 0.9:
search_params["params"] = {"nprobe": 64}
else:
search_params["params"] = {"nprobe": 32}
elif index_type == "HNSW":
if recall_requirement >= 0.95:
search_params["params"] = {"ef": 128}
elif recall_requirement >= 0.9:
search_params["params"] = {"ef": 64}
else:
search_params["params"] = {"ef": 32}
return search_params
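The returned dictionary plugs straight into collection.search(); for example, against the IVF_FLAT index created earlier:
params = optimize_search_params("IVF_FLAT", recall_requirement=0.9)
results = collection.search(
    data=search_vectors,
    anns_field="embedding",
    param=params,
    limit=10
)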
3. Batch Processing Optimization¶
class MilvusBatchProcessor:
"""Optimized batch processing for Milvus operations"""
def __init__(self, collection, batch_size=1000):
self.collection = collection
self.batch_size = batch_size
self.buffer = []
def add_to_buffer(self, data):
"""Add data to buffer and auto-flush when full"""
self.buffer.append(data)
if len(self.buffer) >= self.batch_size:
self.flush()
def flush(self):
"""Flush buffer to Milvus"""
if not self.buffer:
return
# Reorganize data for batch insert
batch_data = [[] for _ in range(len(self.buffer[0]))]
for item in self.buffer:
for i, field_value in enumerate(item):
batch_data[i].append(field_value)
# Insert batch
self.collection.insert(batch_data)
self.buffer = []
def search_parallel(self, query_vectors, num_threads=4):
"""Parallel search for multiple queries"""
import concurrent.futures
def single_search(vector):
return self.collection.search(
data=[vector],
anns_field="embedding",
param={"metric_type": "L2", "params": {"nprobe": 16}},
limit=10
)[0]
with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
results = list(executor.map(single_search, query_vectors))
return results
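Usage sketch, streaming per-entity rows into the buffer and flushing the remainder at the end; `rows` is assumed to be any iterable of (document_id, content, embedding, metadata, timestamp) tuples:
processor = MilvusBatchProcessor(collection, batch_size=500)
for doc_id, content, vector, meta, ts in rows:  # rows: assumed iterable of per-entity tuples
    processor.add_to_buffer([doc_id, content, vector, meta, ts])
processor.flush()      # push whatever is left in the buffer
collection.flush()     # persist inserted data to storage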
4. Memory Optimization¶
# Memory-related limits (e.g. queryNode.cache.memoryLimit, dataCoord.segment.maxSize,
# dataCoord.segment.sealProportion) are server-side settings: configure them in
# milvus.yaml or via deployment overrides rather than through pymilvus at runtime.
from pymilvus import utility
# Release collection from memory when not needed
collection.release()
# Load only needed partitions
collection.load(partition_names=["recent_data"])
# Use resource groups for workload isolation (cluster deployments only)
utility.create_resource_group("high_priority")
# Move a query node into the group, then move the collection's replica onto it
utility.transfer_node("__default__", "high_priority", 1)
utility.transfer_replica("__default__", "high_priority", "documents", 1)
Monitoring and Maintenance¶
Monitoring Metrics¶
from pymilvus import utility
import json
def get_milvus_metrics():
"""Get comprehensive Milvus metrics"""
metrics = {}
# System info
metrics['server_version'] = utility.get_server_version()
# Collections info
collections = utility.list_collections()
metrics['collections'] = {}
for coll_name in collections:
coll = Collection(coll_name)
coll.load()
metrics['collections'][coll_name] = {
'num_entities': coll.num_entities,
'partitions': len(coll.partitions),
'indexes': [idx.field_name for idx in coll.indexes],
            'loaded': str(utility.load_state(coll_name))  # stringify enum so json.dumps works
}
# Get query segment info
segments = utility.get_query_segment_info(coll_name)
metrics['collections'][coll_name]['segments'] = [
{
'id': seg.segmentID,
'rows': seg.num_rows,
'mem_size': seg.mem_size,
'index': seg.index_name
}
for seg in segments
]
# Resource groups
resource_groups = utility.list_resource_groups()
metrics['resource_groups'] = resource_groups
return metrics
# Get and display metrics
metrics = get_milvus_metrics()
print(json.dumps(metrics, indent=2))
Maintenance Tasks¶
def perform_maintenance(collection_name):
"""Perform routine maintenance tasks"""
collection = Collection(collection_name)
# 1. Compact segments
print("Compacting segments...")
collection.compact()
    # Wait for compaction to finish
    collection.wait_for_compaction_completed()
    print("Compaction completed")
# 2. Rebuild index if needed
segments = utility.get_query_segment_info(collection_name)
unindexed_rows = sum(seg.num_rows for seg in segments if not seg.index_name)
if unindexed_rows > 10000: # Threshold for rebuilding
print(f"Rebuilding index for {unindexed_rows} unindexed rows...")
collection.drop_index()
collection.create_index(
field_name="embedding",
index_params={
"metric_type": "L2",
"index_type": "IVF_SQ8",
"params": {"nlist": 2048}
}
)
    # 3. Balance sealed segments across query nodes (cluster deployments only;
    #    node IDs must be real query-node IDs from your cluster)
    utility.load_balance(
        collection_name=collection_name,
        src_node_id=1,
        dst_node_ids=[2, 3],
        sealed_segment_ids=None  # None = balance all sealed segments
    )
# 4. Clean up deleted data
collection.flush()
print("Maintenance completed")
Backup and Recovery¶
def backup_collection(collection_name, backup_path):
"""Backup collection data and schema"""
import pickle
import os
collection = Collection(collection_name)
# Get schema
schema = collection.schema.to_dict()
    # Query all data (a single query returns at most 16,384 entities;
    # use query_iterator or the milvus-backup tool for larger collections)
    all_data = collection.query(
        expr="id > 0",
        output_fields=["*"]
    )
    # Save backup
    backup = {
        "collection_name": collection_name,
        "schema": schema,
        "data": all_data,
        "num_entities": collection.num_entities,
        "timestamp": datetime.now().isoformat()
    }
backup_file = os.path.join(backup_path, f"{collection_name}_backup.pkl")
with open(backup_file, 'wb') as f:
pickle.dump(backup, f)
print(f"Backup saved to {backup_file}")
return backup_file
def restore_collection(backup_file, new_collection_name=None):
"""Restore collection from backup"""
import pickle
with open(backup_file, 'rb') as f:
backup = pickle.load(f)
# Create collection from schema
schema_dict = backup['schema']
    fields = []
    for field in schema_dict['fields']:
        fields.append(FieldSchema(
            name=field['name'],
            dtype=DataType(field['type']),  # to_dict() stores the DataType value, not its name
            is_primary=field.get('is_primary', False),
            auto_id=field.get('auto_id', False),
            **field.get('params', {})
        ))
schema = CollectionSchema(
fields=fields,
description=schema_dict.get('description', '')
)
    collection_name = new_collection_name or backup["collection_name"]
collection = Collection(name=collection_name, schema=schema)
# Restore data
if backup['data']:
# Reorganize data for insert
field_names = [field['name'] for field in schema_dict['fields'] if not field.get('auto_id')]
insert_data = []
for field_name in field_names:
field_data = [item[field_name] for item in backup['data']]
insert_data.append(field_data)
collection.insert(insert_data)
collection.flush()
print(f"Collection {collection_name} restored with {collection.num_entities} entities")
return collection
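Round-trip sketch; the backup directory must already exist, and for large production collections the dedicated milvus-backup tool is a better fit than this pickle-based approach:
backup_file = backup_collection("documents", backup_path="/tmp/milvus_backups")
restored = restore_collection(backup_file, new_collection_name="documents_restored")
print(f"Restored entities: {restored.num_entities}")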
Troubleshooting¶
Common Issues and Solutions¶
1. Connection Issues¶
def diagnose_connection():
"""Diagnose Milvus connection issues"""
import socket
host = "localhost"
port = 19530
# Check if port is open
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    result = sock.connect_ex((host, port))
    sock.close()
    if result == 0:
        print(f"✓ Port {port} is open")
    else:
        print(f"✗ Port {port} is closed")
        return False
# Try connecting
try:
connections.connect(alias="test", host=host, port=str(port))
print("✓ Successfully connected to Milvus")
# Check server version
version = utility.get_server_version()
print(f"✓ Server version: {version}")
connections.disconnect("test")
return True
except Exception as e:
print(f"✗ Connection failed: {e}")
return False
2. Search Performance Issues¶
def diagnose_search_performance(collection_name):
"""Diagnose search performance issues"""
collection = Collection(collection_name)
# Check if collection is loaded
load_state = utility.load_state(collection_name)
print(f"Load state: {load_state}")
    if "Loaded" not in str(load_state):
print("⚠ Collection not loaded in memory")
collection.load()
# Check index status
indexes = collection.indexes
if not indexes:
print("⚠ No index created")
else:
for index in indexes:
print(f"Index on {index.field_name}: {index.params}")
# Check segment info
segments = utility.get_query_segment_info(collection_name)
print(f"Number of segments: {len(segments)}")
# Check for too many small segments
small_segments = [s for s in segments if s.num_rows < 1024]
if len(small_segments) > 10:
print(f"⚠ Too many small segments ({len(small_segments)}), consider compaction")
# Test search performance
import time
import numpy as np
test_vector = np.random.random(768).tolist()
# Warm up
collection.search(
data=[test_vector],
anns_field="embedding",
param={"metric_type": "L2", "params": {"nprobe": 16}},
limit=10
)
# Measure search time
times = []
for _ in range(10):
start = time.time()
collection.search(
data=[test_vector],
anns_field="embedding",
param={"metric_type": "L2", "params": {"nprobe": 16}},
limit=10
)
times.append(time.time() - start)
avg_time = sum(times) / len(times)
print(f"Average search time: {avg_time*1000:.2f}ms")
if avg_time > 0.1:
print("⚠ Search is slow, consider:")
print(" - Reducing nprobe parameter")
print(" - Using a faster index type (IVF_SQ8, IVF_PQ)")
print(" - Adding more query nodes")
3. Memory Issues¶
def diagnose_memory_usage(collection_name):
"""Diagnose memory usage issues"""
collection = Collection(collection_name)
# Get memory usage
segments = utility.get_query_segment_info(collection_name)
total_memory = sum(seg.mem_size for seg in segments)
print(f"Total memory usage: {total_memory / (1024**3):.2f} GB")
print(f"Number of segments: {len(segments)}")
# Memory per segment
for seg in segments:
print(f" Segment {seg.segmentID}: {seg.mem_size / (1024**2):.2f} MB ({seg.num_rows} rows)")
# Suggestions
if total_memory > 10 * (1024**3): # > 10GB
print("\n⚠ High memory usage detected")
print("Suggestions:")
print(" - Use disk-based index (DiskANN)")
print(" - Enable scalar field filtering to reduce loaded data")
print(" - Use smaller embedding dimensions")
print(" - Partition data and load only needed partitions")
print(" - Use quantization (IVF_SQ8, IVF_PQ)")
Error Recovery¶
def recover_from_errors(collection_name):
    """Recover from common error states"""
    import time
    try:
collection = Collection(collection_name)
# Try to release and reload
print("Attempting to release and reload collection...")
try:
collection.release()
        except Exception:
pass
time.sleep(2)
# Reload
collection.load()
# Verify
state = utility.load_state(collection_name)
        if "Loaded" in str(state):
print("✓ Collection recovered and loaded")
else:
print("✗ Failed to load collection")
# Try dropping and recreating index
print("Attempting to rebuild index...")
collection.drop_index()
collection.create_index(
field_name="embedding",
index_params={
"metric_type": "L2",
"index_type": "IVF_FLAT",
"params": {"nlist": 128}
}
)
collection.load()
except Exception as e:
print(f"Recovery failed: {e}")
print("Manual intervention may be required")
Best Practices¶
1. Schema Design¶
# Good schema design practices
good_schema = CollectionSchema(
fields=[
# Use auto_id for primary key
FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
# Use appropriate max_length for VARCHAR
FieldSchema(name="doc_id", dtype=DataType.VARCHAR, max_length=100),
# Use JSON for flexible metadata
FieldSchema(name="metadata", dtype=DataType.JSON),
# Choose appropriate vector dimension
FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=768)
],
# Enable dynamic fields for flexibility
enable_dynamic_field=True,
# Add meaningful description
description="Document embeddings with metadata"
)
2. Insert Best Practices¶
# Batch inserts are far more efficient than row-by-row inserts
BATCH_SIZE = 10000

# Insert in batches, then flush once at the end to seal and persist segments
# (pymilvus has no insert context manager)
for batch in data_batches:
    collection.insert(batch)
collection.flush()
3. Search Best Practices¶
# Pre-filter to reduce search space
efficient_search = collection.search(
data=query_vectors,
anns_field="embedding",
param=search_params,
limit=10,
# Filter first, then search
expr="timestamp > 1609459200 AND category IN ['A', 'B']",
# Only return needed fields
output_fields=["doc_id", "title"],
# Use consistency level based on needs
consistency_level="Session"
)
4. Production Configuration¶
# production-milvus.yml
etcd:
endpoints:
- etcd-0:2379
- etcd-1:2379
- etcd-2:2379
minio:
address: minio:9000
useSSL: true
bucketName: milvus-prod
proxy:
port: 19530
maxImportSize: 17179869184 # 16GB
dataCoord:
segment:
maxSize: 1024 # MB
sealProportion: 0.25
queryCoord:
autoHandoff: true
autoBalance: true
overloadedMemoryThresholdPercentage: 90
queryNode:
cache:
enabled: true
memoryLimit: 8192 # MB
log:
level: warn
file:
maxSize: 1024 # MB
maxBackups: 10
Next Steps¶
- Learn about Enterprise Features: See the Enterprise Features Guide
- Configure Authentication: Read the Authentication & Security Guide
- Deploy to Production: Follow the Production Deployment Guide