Milvus Vector Database Guide v0.1.18.0

Overview

NetIntel-OCR v0.1.18.0 integrates Milvus for high-performance vector similarity search, enabling semantic search, document retrieval, and intelligent querying at scale. This guide covers Milvus setup, core operations, optimization strategies, and best practices.

Table of Contents

  1. Milvus Setup
  2. Collection Management
  3. Vector Operations
  4. Index Management
  5. Search Operations
  6. Partitions and Segments
  7. Performance Optimization
  8. Monitoring and Maintenance
  9. Troubleshooting

Milvus Setup

Docker Deployment

# Run Milvus Standalone
docker-compose -f docker-compose-milvus.yml up -d

# docker-compose-milvus.yml
version: '3.5'

services:
  etcd:
    container_name: milvus-etcd
    image: quay.io/coreos/etcd:v3.5.5
    environment:
      - ETCD_AUTO_COMPACTION_MODE=revision
      - ETCD_AUTO_COMPACTION_RETENTION=1000
      - ETCD_QUOTA_BACKEND_BYTES=4294967296
      - ETCD_SNAPSHOT_COUNT=50000
    volumes:
      - ${DOCKER_VOLUME_DIRECTORY:-.}/volumes/etcd:/etcd
    command: etcd -advertise-client-urls=http://127.0.0.1:2379 -listen-client-urls http://0.0.0.0:2379 --data-dir /etcd

  minio:
    container_name: milvus-minio
    image: minio/minio:RELEASE.2023-03-20T20-16-18Z
    environment:
      MINIO_ACCESS_KEY: minioadmin
      MINIO_SECRET_KEY: minioadmin
    ports:
      - "9001:9001"
      - "9000:9000"
    volumes:
      - ${DOCKER_VOLUME_DIRECTORY:-.}/volumes/minio:/minio_data
    command: minio server /minio_data --console-address ":9001"

  milvus:
    container_name: milvus-standalone
    image: milvusdb/milvus:v2.3.3
    command: ["milvus", "run", "standalone"]
    environment:
      ETCD_ENDPOINTS: etcd:2379
      MINIO_ADDRESS: minio:9000
    volumes:
      - ${DOCKER_VOLUME_DIRECTORY:-.}/volumes/milvus:/var/lib/milvus
    ports:
      - "19530:19530"
      - "9091:9091"
    depends_on:
      - etcd
      - minio

Configuration

# milvus.yml configuration
log:
  level: info
  file:
    rootPath: /var/log/milvus
    maxSize: 300  # MB
    maxAge: 10
    maxBackups: 20

dataCoord:
  segment:
    maxSize: 512  # MB
    sealProportion: 0.25

indexCoord:
  autoIndex:
    enable: true

queryCoord:
  autoHandoff: true
  autoBalance: true
  balanceIntervalSeconds: 60
  memoryUsageMaxDifferencePercentage: 30

queryNode:
  cache:
    enabled: true
    memoryLimit: 2147483648  # bytes (2 GB)

proxy:
  port: 19530
  internalPort: 19529

Connecting to Milvus

from pymilvus import connections, Collection, utility

# Connect to Milvus
connections.connect(
    alias="default",
    host="localhost",
    port="19530",
    user="username",  # Optional
    password="password",  # Optional
    secure=False  # Set to True for TLS
)

# Check connection
print(f"Milvus server version: {utility.get_server_version()}")
print(f"Connected: {connections.has_connection('default')}")

Collection Management

Creating Collections

from pymilvus import CollectionSchema, FieldSchema, DataType, Collection

# Define schema
fields = [
    FieldSchema(
        name="id",
        dtype=DataType.INT64,
        is_primary=True,
        auto_id=True,
        description="Primary ID"
    ),
    FieldSchema(
        name="document_id",
        dtype=DataType.VARCHAR,
        max_length=100,
        description="Document identifier"
    ),
    FieldSchema(
        name="content",
        dtype=DataType.VARCHAR,
        max_length=65535,
        description="Text content"
    ),
    FieldSchema(
        name="embedding",
        dtype=DataType.FLOAT_VECTOR,
        dim=768,
        description="Text embedding vector"
    ),
    FieldSchema(
        name="metadata",
        dtype=DataType.JSON,
        description="Document metadata"
    ),
    FieldSchema(
        name="timestamp",
        dtype=DataType.INT64,
        description="Creation timestamp"
    )
]

# Create schema
schema = CollectionSchema(
    fields=fields,
    description="Document embeddings collection",
    enable_dynamic_field=True  # Allow additional fields
)

# Create collection
collection = Collection(
    name="documents",
    schema=schema,
    consistency_level="Session",  # Options: Strong, Session, Bounded, Eventually
    shards_num=2  # Number of shards
)

print(f"Collection created: {collection.name}")

Collection with Custom Properties

# Advanced collection creation with properties
collection = Collection(
    name="advanced_documents",
    schema=schema,
    properties={
        "collection.ttl.seconds": 3600,  # TTL for data
        "collection.autoID.enable": True,
        "collection.segment.rowLimit": 1024000
    }
)

# Set collection properties after creation
collection.set_properties({
    "collection.ttl.seconds": 7200
})

Managing Collections

from pymilvus import utility

# List all collections
collections = utility.list_collections()
print(f"Collections: {collections}")

# Check if collection exists
exists = utility.has_collection("documents")
print(f"Collection exists: {exists}")

# Get collection info
collection = Collection("documents")
print(f"Collection name: {collection.name}")
print(f"Description: {collection.description}")
print(f"Schema: {collection.schema}")
print(f"Number of entities: {collection.num_entities}")

# Rename collection
utility.rename_collection(
    old_collection_name="documents",
    new_collection_name="document_vectors"
)

# Drop collection
utility.drop_collection("old_collection")

Vector Operations

Inserting Vectors

import numpy as np
from datetime import datetime

# Prepare data
num_entities = 1000
dim = 768

# Generate sample data
documents = [f"document_{i}" for i in range(num_entities)]
contents = [f"This is the content of document {i}" for i in range(num_entities)]
embeddings = np.random.random((num_entities, dim)).tolist()
timestamps = [int(datetime.now().timestamp()) for _ in range(num_entities)]
metadata = [{"page": i % 10, "category": f"cat_{i % 5}"} for i in range(num_entities)]

# Insert data
data = [
    documents,  # document_id
    contents,   # content
    embeddings, # embedding
    metadata,   # metadata
    timestamps  # timestamp
]

collection = Collection("documents")
insert_result = collection.insert(data)

print(f"Inserted {insert_result.insert_count} entities")
print(f"Primary keys: {insert_result.primary_keys[:5]}...")

# Flush to persist data
collection.flush()

Batch Insert with Error Handling

import time

def batch_insert_with_retry(collection, data, batch_size=1000, max_retries=3):
    """Insert data in batches with retry logic"""

    total_inserted = 0
    failed_batches = []

    # Split data into batches
    num_entities = len(data[0])

    for start_idx in range(0, num_entities, batch_size):
        end_idx = min(start_idx + batch_size, num_entities)
        batch_data = [field[start_idx:end_idx] for field in data]

        # Try inserting batch with retries
        for attempt in range(max_retries):
            try:
                result = collection.insert(batch_data)
                total_inserted += result.insert_count
                print(f"Batch {start_idx}-{end_idx} inserted successfully")
                break
            except Exception as e:
                print(f"Attempt {attempt + 1} failed for batch {start_idx}-{end_idx}: {e}")
                if attempt == max_retries - 1:
                    failed_batches.append((start_idx, end_idx))
                else:
                    time.sleep(2 ** attempt)  # Exponential backoff

    collection.flush()

    return {
        "total_inserted": total_inserted,
        "failed_batches": failed_batches
    }
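
A usage sketch, reusing the collection handle and the data list prepared in the insert example above:

# Insert the previously prepared data in batches of 500, with retries
result = batch_insert_with_retry(collection, data, batch_size=500)
print(f"Inserted: {result['total_inserted']}, failed batches: {result['failed_batches']}")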

Upsert Operations

# Upsert: insert new rows or update existing ones, matched on the primary key.
# Note: collections created with auto_id=True may not support upsert on older
# Milvus releases; embedding1/2 and timestamp1/2 below are placeholders.
upsert_data = [
    ["document_1", "document_2"],  # document_id
    ["Updated content 1", "Updated content 2"],  # content
    [embedding1, embedding2],  # embeddings (placeholders)
    [{"updated": True}, {"updated": True}],  # metadata
    [timestamp1, timestamp2]  # timestamps (placeholders)
]

collection.upsert(upsert_data)
collection.flush()

Delete Operations

# Delete by primary key
collection.delete(expr="id in [1, 2, 3, 4, 5]")

# Delete by expression
collection.delete(expr="document_id == 'document_123'")

# Delete with a complex expression (JSON field filter)
collection.delete(
    expr='timestamp < 1609459200 and metadata["category"] == "obsolete"'
)

collection.flush()

Index Management

Creating Indexes

# Create IVF_FLAT index (good for accuracy)
index_params = {
    "metric_type": "L2",  # L2, IP, COSINE
    "index_type": "IVF_FLAT",
    "params": {"nlist": 1024}
}

collection.create_index(
    field_name="embedding",
    index_params=index_params,
    index_name="embedding_index"
)

# Create IVF_SQ8 index (balanced accuracy/speed/size)
index_params_sq8 = {
    "metric_type": "IP",  # Inner Product
    "index_type": "IVF_SQ8",
    "params": {"nlist": 2048}
}

collection.create_index(
    field_name="embedding",
    index_params=index_params_sq8
)

# Create HNSW index (good for high recall)
index_params_hnsw = {
    "metric_type": "COSINE",
    "index_type": "HNSW",
    "params": {
        "M": 16,
        "efConstruction": 200
    }
}

collection.create_index(
    field_name="embedding",
    index_params=index_params_hnsw
)

# Create scalar index
collection.create_index(
    field_name="timestamp",
    index_name="timestamp_index"
)

Index Types Comparison

Index Type   Speed       Accuracy   Memory     Use Case
FLAT         Slow        100%       High       Small datasets (<1M)
IVF_FLAT     Medium      High       Medium     General purpose
IVF_SQ8      Fast        Good       Low        Large datasets
IVF_PQ       Very fast   Moderate   Very low   Huge datasets
HNSW         Fast        High       High       High recall needed
ANNOY        Fast        Good       Low        Read-heavy workloads
DISKANN      Fast        High       Low        Disk-based search
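
If you would rather not pick from this table by hand, recent Milvus releases also accept AUTOINDEX, which delegates the index type and build parameters to the server. A minimal sketch, assuming the documents collection and embedding field defined earlier:

# Let Milvus choose the index type and build parameters automatically.
# Adjust metric_type to match how your embeddings were produced.
collection.create_index(
    field_name="embedding",
    index_params={
        "metric_type": "COSINE",
        "index_type": "AUTOINDEX"
    }
)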

Managing Indexes

# Check index build progress
print(utility.index_building_progress("documents"))

# List indexes
indexes = collection.indexes
for index in indexes:
    print(f"Index: {index.field_name} - {index.params}")

# Drop index
collection.drop_index(index_name="embedding_index")

# Rebuild index
collection.drop_index()
collection.create_index(field_name="embedding", index_params=new_params)

Search Operations

Basic Vector Search

import random

# Load the collection into memory before searching
collection.load()

# Prepare search vectors
search_vectors = [[random.random() for _ in range(768)] for _ in range(5)]

# Basic search
search_params = {
    "metric_type": "L2",
    "params": {"nprobe": 16}
}

results = collection.search(
    data=search_vectors,
    anns_field="embedding",
    param=search_params,
    limit=10,
    output_fields=["document_id", "content", "metadata"]
)

# Process results
for i, result in enumerate(results):
    print(f"\nQuery {i} results:")
    for j, hit in enumerate(result):
        print(f"  {j}. ID: {hit.id}, Score: {hit.distance}")
        print(f"     Document: {hit.entity.get('document_id')}")
        print(f"     Content: {hit.entity.get('content')[:100]}...")

Filtered Search

# Search with filter expressions
filter_expr = 'timestamp > 1609459200 and metadata["category"] == "cat_1"'

results = collection.search(
    data=search_vectors,
    anns_field="embedding",
    param=search_params,
    limit=10,
    expr=filter_expr,
    output_fields=["document_id", "content", "metadata", "timestamp"]
)

Hybrid Search

def hybrid_search(collection, text_query, vector_query, alpha=0.7):
    """
    Perform hybrid search combining vector and keyword search

    Args:
        collection: Milvus collection
        text_query: Text for keyword matching
        vector_query: Vector for similarity search
        alpha: Weight for vector search (1-alpha for keyword)
    """

    # Vector search
    vector_results = collection.search(
        data=[vector_query],
        anns_field="embedding",
        param={"metric_type": "L2", "params": {"nprobe": 32}},
        limit=50,
        output_fields=["document_id", "content", "metadata"]
    )[0]

    # Keyword search using filter
    keyword_filter = f"content like '%{text_query}%'"
    keyword_results = collection.query(
        expr=keyword_filter,
        output_fields=["document_id", "content", "metadata"],
        limit=50
    )

    # Combine and rerank results
    combined_scores = {}

    # Add vector search results
    for i, hit in enumerate(vector_results):
        doc_id = hit.entity.get('document_id')
        vector_score = 1.0 / (1.0 + hit.distance)  # Convert distance to score
        combined_scores[doc_id] = alpha * vector_score

    # Add keyword search results
    for i, result in enumerate(keyword_results):
        doc_id = result['document_id']
        keyword_score = 1.0 - (i / len(keyword_results))  # Rank-based score

        if doc_id in combined_scores:
            combined_scores[doc_id] += (1 - alpha) * keyword_score
        else:
            combined_scores[doc_id] = (1 - alpha) * keyword_score

    # Sort by combined score
    ranked_results = sorted(
        combined_scores.items(),
        key=lambda x: x[1],
        reverse=True
    )

    return ranked_results[:10]
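
A usage sketch for the helper above; the keyword query and the query vector are placeholders:

query_vector = [random.random() for _ in range(768)]

top_docs = hybrid_search(
    collection,
    text_query="network topology",  # placeholder keyword query
    vector_query=query_vector,
    alpha=0.7
)

for doc_id, score in top_docs:
    print(doc_id, round(score, 4))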

Range Search

# Search within a distance range
range_search_params = {
    "metric_type": "L2",
    "params": {
        "nprobe": 16,
        "radius": 1.0,  # Maximum distance
        "range_filter": 0.5  # Minimum distance
    }
}

results = collection.search(
    data=search_vectors,
    anns_field="embedding",
    param=range_search_params,
    limit=10
)

Multi-Vector Search

def multi_vector_search(collection, query_vectors, aggregation="mean"):
    """
    Search with multiple query vectors

    Args:
        collection: Milvus collection
        query_vectors: List of query vectors
        aggregation: How to combine results ("mean" or "reciprocal_rank")
    """

    all_results = []

    # Perform searches for each vector
    for vector in query_vectors:
        results = collection.search(
            data=[vector],
            anns_field="embedding",
            param={"metric_type": "L2", "params": {"nprobe": 16}},
            limit=50,
            output_fields=["document_id", "content"]
        )[0]
        all_results.append(results)

    # Aggregate results
    if aggregation == "mean":
        # Average scores across queries
        score_map = {}
        count_map = {}

        for results in all_results:
            for hit in results:
                doc_id = hit.entity.get('document_id')
                if doc_id not in score_map:
                    score_map[doc_id] = 0
                    count_map[doc_id] = 0

                score_map[doc_id] += hit.distance
                count_map[doc_id] += 1

        # Calculate mean scores
        final_scores = {
            doc_id: score_map[doc_id] / count_map[doc_id]
            for doc_id in score_map
        }

    elif aggregation == "reciprocal_rank":
        # Reciprocal Rank Fusion
        rrf_scores = {}
        k = 60  # RRF parameter

        for results in all_results:
            for rank, hit in enumerate(results):
                doc_id = hit.entity.get('document_id')
                if doc_id not in rrf_scores:
                    rrf_scores[doc_id] = 0

                rrf_scores[doc_id] += 1.0 / (k + rank + 1)

        final_scores = rrf_scores

    # Sort and return top results
    sorted_results = sorted(
        final_scores.items(),
        key=lambda x: x[1],
        reverse=(aggregation == "reciprocal_rank")
    )

    return sorted_results[:10]
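
A usage sketch: fuse the rankings of several query embeddings with Reciprocal Rank Fusion (the vectors here are random placeholders):

query_vectors = [[random.random() for _ in range(768)] for _ in range(3)]

fused = multi_vector_search(collection, query_vectors, aggregation="reciprocal_rank")
for doc_id, score in fused[:5]:
    print(doc_id, round(score, 4))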

Partitions and Segments

Creating and Managing Partitions

# Create partitions for data organization
collection.create_partition("2024_Q1")
collection.create_partition("2024_Q2")
collection.create_partition("2024_Q3")
collection.create_partition("2024_Q4")

# List partitions
partitions = collection.partitions
for partition in partitions:
    print(f"Partition: {partition.name}, Entities: {partition.num_entities}")

# Insert into specific partition
partition = collection.partition("2024_Q1")
partition.insert(data)

# Load specific partitions for search
collection.load(partition_names=["2024_Q3", "2024_Q4"])

# Search within partitions
results = collection.search(
    data=search_vectors,
    anns_field="embedding",
    param=search_params,
    limit=10,
    partition_names=["2024_Q3", "2024_Q4"]
)

# Drop partition
collection.drop_partition("2024_Q1")

Segment Management

# Get segment information
from pymilvus import utility

# Get query segment info
segments = utility.get_query_segment_info("documents")
for segment in segments:
    print(f"Segment ID: {segment.segmentID}")
    print(f"Collection: {segment.collectionID}")
    print(f"Partition: {segment.partitionID}")
    print(f"Mem size: {segment.mem_size}")
    print(f"Num rows: {segment.num_rows}")
    print(f"Index: {segment.index_name}")

# Compact segments
collection.compact()

# Get compaction state
compaction_state = collection.get_compaction_state()
print(f"Compaction state: {compaction_state}")

# Manual flush
collection.flush(_async=False)

Performance Optimization

1. Index Optimization

import numpy as np

def optimize_index_for_dataset(collection, num_entities, dim):
    """
    Choose an optimal index based on dataset characteristics
    """

    if num_entities < 10000:
        # Small dataset: use FLAT for accuracy
        index_params = {
            "metric_type": "L2",
            "index_type": "FLAT"
        }
    elif num_entities < 100000:
        # Medium dataset: use IVF_FLAT
        index_params = {
            "metric_type": "L2",
            "index_type": "IVF_FLAT",
            "params": {"nlist": int(np.sqrt(num_entities))}
        }
    elif num_entities < 1000000:
        # Large dataset: use IVF_SQ8
        index_params = {
            "metric_type": "L2",
            "index_type": "IVF_SQ8",
            "params": {"nlist": int(4 * np.sqrt(num_entities))}
        }
    else:
        # Very large dataset: use IVF_PQ or HNSW
        if dim > 128:
            # High dimension: use PQ for compression
            index_params = {
                "metric_type": "L2",
                "index_type": "IVF_PQ",
                "params": {
                    "nlist": 4096,
                    "m": 8,  # PQ subvector count
                    "nbits": 8  # Bits per subvector
                }
            }
        else:
            # Lower dimension: use HNSW
            index_params = {
                "metric_type": "L2",
                "index_type": "HNSW",
                "params": {
                    "M": 16,
                    "efConstruction": 200
                }
            }

    collection.create_index(
        field_name="embedding",
        index_params=index_params
    )

    return index_params
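
A usage sketch, assuming the embedding field does not already have an index:

chosen_params = optimize_index_for_dataset(
    collection,
    num_entities=collection.num_entities,
    dim=768
)
print(f"Index created with: {chosen_params}")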

2. Search Optimization

def optimize_search_params(index_type, recall_requirement=0.9):
    """
    Get optimal search parameters based on index type and recall requirement
    """

    search_params = {"metric_type": "L2"}

    if index_type == "IVF_FLAT" or index_type == "IVF_SQ8":
        if recall_requirement >= 0.95:
            search_params["params"] = {"nprobe": 64}
        elif recall_requirement >= 0.9:
            search_params["params"] = {"nprobe": 32}
        else:
            search_params["params"] = {"nprobe": 16}

    elif index_type == "IVF_PQ":
        if recall_requirement >= 0.95:
            search_params["params"] = {"nprobe": 128}
        elif recall_requirement >= 0.9:
            search_params["params"] = {"nprobe": 64}
        else:
            search_params["params"] = {"nprobe": 32}

    elif index_type == "HNSW":
        if recall_requirement >= 0.95:
            search_params["params"] = {"ef": 128}
        elif recall_requirement >= 0.9:
            search_params["params"] = {"ef": 64}
        else:
            search_params["params"] = {"ef": 32}

    return search_params
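
A usage sketch that pairs the derived parameters with the search_vectors from the Search Operations section:

# Target roughly 95% recall on an IVF_SQ8 index
params = optimize_search_params("IVF_SQ8", recall_requirement=0.95)

results = collection.search(
    data=search_vectors,
    anns_field="embedding",
    param=params,
    limit=10
)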

3. Batch Processing Optimization

class MilvusBatchProcessor:
    """Optimized batch processing for Milvus operations"""

    def __init__(self, collection, batch_size=1000):
        self.collection = collection
        self.batch_size = batch_size
        self.buffer = []

    def add_to_buffer(self, data):
        """Add data to buffer and auto-flush when full"""
        self.buffer.append(data)

        if len(self.buffer) >= self.batch_size:
            self.flush()

    def flush(self):
        """Flush buffer to Milvus"""
        if not self.buffer:
            return

        # Reorganize data for batch insert
        batch_data = [[] for _ in range(len(self.buffer[0]))]

        for item in self.buffer:
            for i, field_value in enumerate(item):
                batch_data[i].append(field_value)

        # Insert batch
        self.collection.insert(batch_data)
        self.buffer = []

    def search_parallel(self, query_vectors, num_threads=4):
        """Parallel search for multiple queries"""
        import concurrent.futures

        def single_search(vector):
            return self.collection.search(
                data=[vector],
                anns_field="embedding",
                param={"metric_type": "L2", "params": {"nprobe": 16}},
                limit=10
            )[0]

        with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
            results = list(executor.map(single_search, query_vectors))

        return results
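
A usage sketch: each buffered item is a tuple in the same order as the non-primary fields of the documents schema (document_id, content, embedding, metadata, timestamp):

import numpy as np
from datetime import datetime

processor = MilvusBatchProcessor(collection, batch_size=500)

for i in range(2000):
    processor.add_to_buffer((
        f"document_{i}",                      # document_id
        f"content of document {i}",           # content
        np.random.random(768).tolist(),       # embedding
        {"page": i % 10},                     # metadata
        int(datetime.now().timestamp())       # timestamp
    ))

processor.flush()   # flush any remaining buffered rows
collection.flush()  # persist to storage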

4. Memory Optimization

# Memory-related server settings (queryNode.cache.memoryLimit,
# dataCoord.segment.maxSize, dataCoord.segment.sealProportion) are set in
# milvus.yml (see the Configuration section above), not through the Python client.

from pymilvus import utility

# Release collection from memory when not needed
collection.release()

# Load only needed partitions
collection.load(partition_names=["recent_data"])

# Use resource groups for workload isolation (Milvus 2.3+)
utility.create_resource_group("high_priority")

# Move a query node and this collection's replicas into the group
utility.transfer_node("__default__", "high_priority", 1)
utility.transfer_replica("__default__", "high_priority", "documents", 1)

Monitoring and Maintenance

Monitoring Metrics

from pymilvus import utility
import json

def get_milvus_metrics():
    """Get comprehensive Milvus metrics"""

    metrics = {}

    # System info
    metrics['server_version'] = utility.get_server_version()

    # Collections info
    collections = utility.list_collections()
    metrics['collections'] = {}

    for coll_name in collections:
        coll = Collection(coll_name)
        coll.load()

        metrics['collections'][coll_name] = {
            'num_entities': coll.num_entities,
            'partitions': len(coll.partitions),
            'indexes': [idx.field_name for idx in coll.indexes],
            'loaded': str(utility.load_state(coll_name))
        }

        # Get query segment info
        segments = utility.get_query_segment_info(coll_name)
        metrics['collections'][coll_name]['segments'] = [
            {
                'id': seg.segmentID,
                'rows': seg.num_rows,
                'mem_size': seg.mem_size,
                'index': seg.index_name
            }
            for seg in segments
        ]

    # Resource groups
    resource_groups = utility.list_resource_groups()
    metrics['resource_groups'] = resource_groups

    return metrics

# Get and display metrics
metrics = get_milvus_metrics()
print(json.dumps(metrics, indent=2))

Maintenance Tasks

def perform_maintenance(collection_name):
    """Perform routine maintenance tasks"""

    collection = Collection(collection_name)

    # 1. Compact segments
    print("Compacting segments...")
    collection.compact()

    # Block until compaction finishes
    collection.wait_for_compaction_completed()
    print(f"Compaction completed: {collection.get_compaction_state()}")

    # 2. Rebuild index if needed
    segments = utility.get_query_segment_info(collection_name)
    unindexed_rows = sum(seg.num_rows for seg in segments if not seg.index_name)

    if unindexed_rows > 10000:  # Threshold for rebuilding
        print(f"Rebuilding index for {unindexed_rows} unindexed rows...")
        collection.drop_index()
        collection.create_index(
            field_name="embedding",
            index_params={
                "metric_type": "L2",
                "index_type": "IVF_SQ8",
                "params": {"nlist": 2048}
            }
        )

    # 3. Balance segments across query nodes
    # (node IDs are deployment-specific; replace with the IDs of your nodes)
    utility.load_balance(
        collection_name=collection_name,
        src_node_id=1,
        dst_node_ids=[2, 3],
        sealed_segment_ids=None  # Balance all sealed segments
    )

    # 4. Clean up deleted data
    collection.flush()

    print("Maintenance completed")

Backup and Recovery

def backup_collection(collection_name, backup_path):
    """Backup collection data and schema"""

    import pickle
    import os
    from datetime import datetime

    collection = Collection(collection_name)

    # Get schema
    schema = collection.schema.to_dict()

    # Query all data
    # Note: a single query returns a bounded number of rows (16,384 by default);
    # for larger collections, paginate with offset/limit or use query_iterator.
    all_data = collection.query(
        expr="id > 0",
        output_fields=["*"]
    )

    # Save backup
    backup = {
        "schema": schema,
        "data": all_data,
        "num_entities": collection.num_entities,
        "timestamp": datetime.now().isoformat()
    }

    backup_file = os.path.join(backup_path, f"{collection_name}_backup.pkl")

    with open(backup_file, 'wb') as f:
        pickle.dump(backup, f)

    print(f"Backup saved to {backup_file}")
    return backup_file

def restore_collection(backup_file, new_collection_name=None):
    """Restore collection from backup"""

    import pickle

    with open(backup_file, 'rb') as f:
        backup = pickle.load(f)

    # Create collection from schema
    schema_dict = backup['schema']
    fields = []

    for field in schema_dict['fields']:
        fields.append(FieldSchema(
            name=field['name'],
            dtype=DataType(field['type']),
            is_primary=field.get('is_primary', False),
            auto_id=field.get('auto_id', False),
            **field.get('params', {})
        ))

    schema = CollectionSchema(
        fields=fields,
        description=schema_dict.get('description', '')
    )

    collection_name = new_collection_name or schema_dict.get('name', 'restored_collection')
    collection = Collection(name=collection_name, schema=schema)

    # Restore data
    if backup['data']:
        # Reorganize data for insert
        field_names = [field['name'] for field in schema_dict['fields'] if not field.get('auto_id')]

        insert_data = []
        for field_name in field_names:
            field_data = [item[field_name] for item in backup['data']]
            insert_data.append(field_data)

        collection.insert(insert_data)
        collection.flush()

    print(f"Collection {collection_name} restored with {collection.num_entities} entities")
    return collection
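
A usage sketch; the backup directory is an arbitrary local path, and this in-Python approach suits small collections only (a single query returns a bounded number of rows):

import os

os.makedirs("/tmp/milvus_backups", exist_ok=True)  # example path

backup_file = backup_collection("documents", "/tmp/milvus_backups")
restored = restore_collection(backup_file, new_collection_name="documents_restored")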

Troubleshooting

Common Issues and Solutions

1. Connection Issues

def diagnose_connection():
    """Diagnose Milvus connection issues"""

    import socket

    host = "localhost"
    port = 19530

    # Check if port is open
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    result = sock.connect_ex((host, port))
    sock.close()

    if result == 0:
        print(f"✓ Port {port} is open")
    else:
        print(f"✗ Port {port} is closed")
        return False

    # Try connecting
    try:
        connections.connect(alias="test", host=host, port=str(port))
        print("✓ Successfully connected to Milvus")

        # Check server version
        version = utility.get_server_version()
        print(f"✓ Server version: {version}")

        connections.disconnect("test")
        return True

    except Exception as e:
        print(f"✗ Connection failed: {e}")
        return False

2. Search Performance Issues

def diagnose_search_performance(collection_name):
    """Diagnose search performance issues"""

    collection = Collection(collection_name)

    # Check if collection is loaded
    load_state = utility.load_state(collection_name)
    print(f"Load state: {load_state}")

    if "Loaded" not in str(load_state):  # load_state is a LoadState enum
        print("⚠ Collection not loaded in memory")
        collection.load()

    # Check index status
    indexes = collection.indexes
    if not indexes:
        print("⚠ No index created")
    else:
        for index in indexes:
            print(f"Index on {index.field_name}: {index.params}")

    # Check segment info
    segments = utility.get_query_segment_info(collection_name)
    print(f"Number of segments: {len(segments)}")

    # Check for too many small segments
    small_segments = [s for s in segments if s.num_rows < 1024]
    if len(small_segments) > 10:
        print(f"⚠ Too many small segments ({len(small_segments)}), consider compaction")

    # Test search performance
    import time
    import numpy as np

    test_vector = np.random.random(768).tolist()

    # Warm up
    collection.search(
        data=[test_vector],
        anns_field="embedding",
        param={"metric_type": "L2", "params": {"nprobe": 16}},
        limit=10
    )

    # Measure search time
    times = []
    for _ in range(10):
        start = time.time()
        collection.search(
            data=[test_vector],
            anns_field="embedding",
            param={"metric_type": "L2", "params": {"nprobe": 16}},
            limit=10
        )
        times.append(time.time() - start)

    avg_time = sum(times) / len(times)
    print(f"Average search time: {avg_time*1000:.2f}ms")

    if avg_time > 0.1:
        print("⚠ Search is slow, consider:")
        print("  - Reducing nprobe parameter")
        print("  - Using a faster index type (IVF_SQ8, IVF_PQ)")
        print("  - Adding more query nodes")

3. Memory Issues

def diagnose_memory_usage(collection_name):
    """Diagnose memory usage issues"""

    collection = Collection(collection_name)

    # Get memory usage
    segments = utility.get_query_segment_info(collection_name)
    total_memory = sum(seg.mem_size for seg in segments)

    print(f"Total memory usage: {total_memory / (1024**3):.2f} GB")
    print(f"Number of segments: {len(segments)}")

    # Memory per segment
    for seg in segments:
        print(f"  Segment {seg.segmentID}: {seg.mem_size / (1024**2):.2f} MB ({seg.num_rows} rows)")

    # Suggestions
    if total_memory > 10 * (1024**3):  # > 10GB
        print("\n⚠ High memory usage detected")
        print("Suggestions:")
        print("  - Use disk-based index (DiskANN)")
        print("  - Enable scalar field filtering to reduce loaded data")
        print("  - Use smaller embedding dimensions")
        print("  - Partition data and load only needed partitions")
        print("  - Use quantization (IVF_SQ8, IVF_PQ)")

Error Recovery

def recover_from_errors(collection_name):
    """Recover from common error states"""

    try:
        collection = Collection(collection_name)

        # Try to release and reload
        print("Attempting to release and reload collection...")
        try:
            collection.release()
        except Exception:
            pass

        time.sleep(2)

        # Reload
        collection.load()

        # Verify
        state = utility.load_state(collection_name)
        if "Loaded" in str(state):
            print("✓ Collection recovered and loaded")
        else:
            print("✗ Failed to load collection")

            # Try dropping and recreating index
            print("Attempting to rebuild index...")
            collection.drop_index()
            collection.create_index(
                field_name="embedding",
                index_params={
                    "metric_type": "L2",
                    "index_type": "IVF_FLAT",
                    "params": {"nlist": 128}
                }
            )
            collection.load()

    except Exception as e:
        print(f"Recovery failed: {e}")
        print("Manual intervention may be required")

Best Practices

1. Schema Design

# Good schema design practices
good_schema = CollectionSchema(
    fields=[
        # Use auto_id for primary key
        FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),

        # Use appropriate max_length for VARCHAR
        FieldSchema(name="doc_id", dtype=DataType.VARCHAR, max_length=100),

        # Use JSON for flexible metadata
        FieldSchema(name="metadata", dtype=DataType.JSON),

        # Choose appropriate vector dimension
        FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=768)
    ],
    # Enable dynamic fields for flexibility
    enable_dynamic_field=True,

    # Add meaningful description
    description="Document embeddings with metadata"
)

2. Insert Best Practices

# Batch inserts for efficiency
BATCH_SIZE = 10000

# Insert in batches, then flush once at the end
for batch in data_batches:
    collection.insert(batch)

# A single flush seals segments and persists the inserted data
collection.flush()

3. Search Best Practices

# Pre-filter to reduce search space
efficient_search = collection.search(
    data=query_vectors,
    anns_field="embedding",
    param=search_params,
    limit=10,
    # Filter first, then search
    expr="timestamp > 1609459200 AND category IN ['A', 'B']",
    # Only return needed fields
    output_fields=["doc_id", "title"],
    # Use consistency level based on needs
    consistency_level="Session"
)

4. Production Configuration

# production-milvus.yml
etcd:
  endpoints:
    - etcd-0:2379
    - etcd-1:2379
    - etcd-2:2379

minio:
  address: minio
  port: 9000
  useSSL: true
  bucketName: milvus-prod

proxy:
  port: 19530
  maxImportSize: 17179869184  # 16GB

dataCoord:
  segment:
    maxSize: 1024  # MB
    sealProportion: 0.25

queryCoord:
  autoHandoff: true
  autoBalance: true
  overloadedMemoryThresholdPercentage: 90

queryNode:
  cache:
    enabled: true
    memoryLimit: 8589934592  # bytes (8 GB)

log:
  level: warn
  file:
    maxSize: 1024  # MB
    maxBackups: 10

Next Steps

  1. Learn about Enterprise Features: See the Enterprise Features Guide
  2. Configure Authentication: Read the Authentication & Security Guide
  3. Deploy to Production: Follow the Production Deployment Guide