# Enterprise Features Guide v0.1.18.0

## Overview

NetIntel-OCR v0.1.18.0 includes enterprise-grade features for deduplication, performance monitoring, batch processing, and module management. This guide covers all enterprise capabilities designed for production deployments at scale.

## Table of Contents
- Deduplication System
- Performance Monitoring
- Batch Processing
- Module Management
- Configuration Templates
- Distributed Processing
- High Availability
- Compliance and Audit
## Deduplication System

### Overview

The deduplication system uses multiple methods to identify and eliminate duplicate documents:

- MD5: exact duplicate detection
- SimHash: near-duplicate detection using Hamming distance (illustrated below)
- CDC (Content-Defined Chunking): block-level deduplication
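As a rough illustration of how SimHash near-duplicate detection works, consider the sketch below. It is not the NetIntel-OCR implementation; the fingerprint function, token hashing, and sample strings are all illustrative.

```python
import hashlib

def simhash64(text: str) -> int:
    """Build a 64-bit SimHash fingerprint from whitespace tokens."""
    weights = [0] * 64
    for token in text.lower().split():
        h = int.from_bytes(hashlib.md5(token.encode()).digest()[:8], "big")
        for bit in range(64):
            weights[bit] += 1 if (h >> bit) & 1 else -1
    return sum(1 << bit for bit in range(64) if weights[bit] > 0)

def hamming_distance(a: int, b: int) -> int:
    """Count differing bits between two fingerprints."""
    return bin(a ^ b).count("1")

# Near-duplicate pages score far lower than unrelated ones; the service
# compares this distance against the configured hamming_threshold.
page_v1 = "blocked source ip ranges per regional office for the third quarter " * 5
page_v2 = page_v1 + "appendix with updated totals"
unrelated = "seasonal marketing newsletter covering upcoming community events " * 5

print(hamming_distance(simhash64(page_v1), simhash64(page_v2)))    # small
print(hamming_distance(simhash64(page_v1), simhash64(unrelated)))  # ~32 on average
```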
### Configuration

```yaml
# dedup-config.yml
deduplication:
  enabled: true
  methods:
    md5:
      enabled: true
      store_hashes: true
    simhash:
      enabled: true
      bits: 128                # 64 or 128
      hamming_threshold: 5     # 0-10 for similarity
    cdc:
      enabled: true
      min_chunk_size: 4096
      max_chunk_size: 16384
      avg_chunk_size: 8192
      algorithm: "xxhash"      # xxhash, md5, sha256
  storage:
    backend: "redis"           # redis, sqlite, postgresql
    connection:
      host: localhost
      port: 6379
      db: 0
```
### Using the Deduplication API

```python
import requests
import hashlib

# `token` is the API bearer token obtained from the authentication flow
# (see the Authentication & Security Guide).

# Initialize dedup system
response = requests.post(
    "http://localhost:8000/api/v2/deduplication/initialize",
    headers={"Authorization": f"Bearer {token}"},
    json={
        "clear_existing": False,
        "config": {
            "simhash_bits": 128,
            "cdc_enabled": True
        }
    }
)

# Check for duplicates before processing
def check_duplicate(file_path):
    # Calculate MD5
    with open(file_path, 'rb') as f:
        md5_hash = hashlib.md5(f.read()).hexdigest()

    response = requests.post(
        "http://localhost:8000/api/v2/deduplication/check",
        headers={"Authorization": f"Bearer {token}"},
        json={
            "document_path": file_path,
            "md5_hash": md5_hash,
            "dedup_mode": "full",  # exact, fuzzy, hybrid, full
            "simhash_bits": 128,
            "hamming_threshold": 5
        }
    )

    result = response.json()
    if result["is_duplicate"]:
        print(f"Duplicate found: {result['similar_documents']}")
        return True
    return False

# Find similar documents
def find_similar(document_id, threshold=0.85):
    response = requests.post(
        "http://localhost:8000/api/v2/deduplication/find-similar",
        headers={"Authorization": f"Bearer {token}"},
        json={
            "document_id": document_id,
            "similarity_threshold": threshold,
            "include_cdc_analysis": True,
            "limit": 20
        }
    )
    return response.json()["similar_documents"]
```
### Advanced Deduplication with CDC

```python
import xxhash

class CDCDeduplicator:
    """Content-Defined Chunking for block-level deduplication"""

    def __init__(self, min_size=4096, max_size=16384, avg_size=8192):
        self.min_size = min_size
        self.max_size = max_size
        self.avg_size = avg_size
        self.mask = avg_size - 1   # assumes avg_size is a power of two
        self.chunk_store = {}      # hash -> content mapping

    def chunk_document(self, content):
        """Split document into content-defined chunks"""
        chunks = []
        offset = 0
        window_size = 48

        while offset < len(content):
            # Find chunk boundary using rolling hash
            chunk_end = min(offset + self.max_size, len(content))
            chunk_start = offset

            # Look for chunk boundary
            if offset + self.min_size < len(content):
                for i in range(offset + self.min_size, chunk_end):
                    # Rolling hash window
                    window = content[i - window_size:i]
                    hash_val = self._rolling_hash(window)
                    if (hash_val & self.mask) == 0:
                        chunk_end = i
                        break

            # Extract chunk
            chunk = content[chunk_start:chunk_end]
            chunk_hash = xxhash.xxh64(chunk).hexdigest()

            chunks.append({
                "hash": chunk_hash,
                "offset": chunk_start,
                "size": len(chunk),
                "is_duplicate": chunk_hash in self.chunk_store
            })

            # Store unique chunks
            if chunk_hash not in self.chunk_store:
                self.chunk_store[chunk_hash] = chunk

            offset = chunk_end

        return chunks

    def _rolling_hash(self, data):
        """Calculate rolling hash using xxhash"""
        return xxhash.xxh64(data).intdigest()

    def calculate_dedup_ratio(self, chunks):
        """Calculate deduplication ratio"""
        total_size = sum(c["size"] for c in chunks)
        unique_size = sum(c["size"] for c in chunks if not c["is_duplicate"])
        dedup_ratio = 1 - (unique_size / total_size) if total_size > 0 else 0

        return {
            "total_chunks": len(chunks),
            "unique_chunks": sum(1 for c in chunks if not c["is_duplicate"]),
            "total_size": total_size,
            "unique_size": unique_size,
            "dedup_ratio": dedup_ratio,
            "space_saved": total_size - unique_size
        }
```
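A quick usage sketch for the class above; the byte strings stand in for extracted document text.

```python
dedup = CDCDeduplicator()

# Two document versions that share most of their content.
base = b"NetIntel-OCR extracted page text. " * 2000
doc_v1 = base + b"Appendix A: network diagram notes."
doc_v2 = base + b"Appendix B: revised network diagram notes."

chunks_v1 = dedup.chunk_document(doc_v1)
chunks_v2 = dedup.chunk_document(doc_v2)

# Chunks of doc_v2 already seen while chunking doc_v1 are flagged as duplicates.
stats = dedup.calculate_dedup_ratio(chunks_v2)
print(f"dedup ratio: {stats['dedup_ratio']:.2%}, space saved: {stats['space_saved']} bytes")
```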
## Performance Monitoring

### Real-time Metrics Collection

```python
import requests
from datetime import datetime

class PerformanceMonitor:
    """Enterprise performance monitoring system"""

    def __init__(self, api_base="http://localhost:8000"):
        self.api_base = api_base
        self.metrics_history = []

    def get_current_metrics(self):
        """Get current system metrics"""
        response = requests.get(
            f"{self.api_base}/api/v2/performance/metrics",
            headers={"Authorization": f"Bearer {token}"}
        )
        metrics = response.json()

        # Add timestamp
        metrics["timestamp"] = datetime.now().isoformat()
        self.metrics_history.append(metrics)
        return metrics

    def run_benchmark(self, test_type="comprehensive"):
        """Run the performance benchmark suite"""
        benchmarks = {
            "simhash": {
                "dataset_size": 10000,
                "iterations": 5
            },
            "cdc": {
                "dataset_size": 5000,
                "iterations": 3
            },
            "vector_search": {
                "dataset_size": 100000,
                "iterations": 10
            },
            "kg_extraction": {
                "dataset_size": 1000,
                "iterations": 3
            }
        }

        results = {}
        for bench_type, params in benchmarks.items():
            response = requests.post(
                f"{self.api_base}/api/v2/performance/benchmark",
                headers={"Authorization": f"Bearer {token}"},
                json={
                    "test_type": bench_type,
                    **params
                }
            )
            results[bench_type] = response.json()
        return results

    def analyze_performance(self):
        """Analyze performance trends"""
        if len(self.metrics_history) < 2:
            return None

        # Calculate trends
        cpu_trend = self._calculate_trend(
            [m["cpu_usage"]["current"] for m in self.metrics_history]
        )
        memory_trend = self._calculate_trend(
            [m["memory"]["used_gb"] for m in self.metrics_history]
        )

        # Identify bottlenecks
        latest = self.metrics_history[-1]
        bottlenecks = []

        if latest["cpu_usage"]["current"] > 80:
            bottlenecks.append("High CPU usage")
        if latest["memory"]["used_gb"] / (latest["memory"]["used_gb"] + latest["memory"]["available_gb"]) > 0.9:
            bottlenecks.append("High memory usage")
        if latest["processing_stats"]["documents_per_hour"] < 50:
            bottlenecks.append("Low throughput")

        return {
            "trends": {
                "cpu": cpu_trend,
                "memory": memory_trend
            },
            "bottlenecks": bottlenecks,
            "recommendations": self._generate_recommendations(bottlenecks)
        }

    def _calculate_trend(self, values):
        """Calculate trend direction"""
        if len(values) < 2:
            return "stable"

        recent = values[-5:]
        avg_recent = sum(recent) / len(recent)
        avg_all = sum(values) / len(values)

        if avg_recent > avg_all * 1.1:
            return "increasing"
        elif avg_recent < avg_all * 0.9:
            return "decreasing"
        else:
            return "stable"

    def _generate_recommendations(self, bottlenecks):
        """Generate performance recommendations"""
        recommendations = []
        if "High CPU usage" in bottlenecks:
            recommendations.append("Consider scaling horizontally or optimizing algorithms")
        if "High memory usage" in bottlenecks:
            recommendations.append("Enable swap or increase RAM allocation")
        if "Low throughput" in bottlenecks:
            recommendations.append("Increase worker count or optimize pipeline")
        return recommendations
```
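A minimal usage sketch of the monitor above, assuming the API is reachable at the default address and `token` holds a valid bearer token.

```python
import time

monitor = PerformanceMonitor()

# Collect a few samples, then review trends and bottlenecks.
for _ in range(5):
    monitor.get_current_metrics()
    time.sleep(10)

analysis = monitor.analyze_performance()
if analysis and analysis["bottlenecks"]:
    for recommendation in analysis["recommendations"]:
        print(f"Recommendation: {recommendation}")
```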
### Custom Metrics

```python
# Define custom metrics
custom_metrics = {
    "document_processing": {
        "pages_per_second": 0,
        "ocr_accuracy": 0,
        "extraction_errors": 0
    },
    "search_performance": {
        "avg_latency_ms": 0,
        "queries_per_second": 0,
        "cache_hit_rate": 0
    },
    "storage_efficiency": {
        "dedup_ratio": 0,
        "compression_ratio": 0,
        "storage_saved_gb": 0
    }
}

# Report custom metrics
response = requests.post(
    "http://localhost:8000/api/v2/performance/metrics/custom",
    headers={"Authorization": f"Bearer {token}"},
    json=custom_metrics
)
```
## Batch Processing

### Batch Job Configuration

```yaml
# batch-config.yml
batch_processing:
  max_workers: 16
  worker_type: "process"       # thread, process, distributed
  batch_size: 100
  checkpoint_interval: 500
  memory_limit: "8GB"
  queue:
    type: "redis"              # redis, rabbitmq, sqs
    max_size: 10000
    ttl: 86400
  error_handling:
    max_retries: 3
    retry_delay: 60
    dead_letter_queue: true
  output:
    format: "jsonl"
    compression: "gzip"
    partitioning: "date"       # date, size, count
```
### Batch Processing Implementation

```python
import requests
import yaml

class EnterpriseBatchProcessor:
    """Enterprise batch processing system"""

    def __init__(self, config_path="batch-config.yml"):
        self.config = self._load_config(config_path)
        self.job_queue = []
        self.results = {}

    def _load_config(self, config_path):
        """Load batch settings (assumes the batch-config.yml layout shown above)"""
        with open(config_path) as f:
            return yaml.safe_load(f)["batch_processing"]

    def _send_alert(self, message):
        """Placeholder: hook this into your alerting channels (email, Slack, etc.)"""
        print(f"ALERT: {message}")

    def submit_batch_job(self, input_path, output_path, options=None):
        """Submit batch processing job"""
        job_request = {
            "input_path": input_path,
            "output_path": output_path,
            "parallel_workers": self.config["max_workers"],
            "batch_size": self.config["batch_size"],
            "checkpoint_interval": self.config["checkpoint_interval"],
            "auto_merge": True,
            "skip_existing": True,
            "processing_options": options or {
                "enable_ocr": True,
                "enable_kg": True,
                "enable_vector": True,
                "enable_dedup": True
            }
        }

        response = requests.post(
            "http://localhost:8000/api/v2/batch/submit",
            headers={"Authorization": f"Bearer {token}"},
            json=job_request
        )

        job_info = response.json()
        self.job_queue.append(job_info["batch_id"])
        return job_info

    def monitor_batch_jobs(self):
        """Monitor all batch jobs"""
        monitoring_data = {}

        for batch_id in self.job_queue:
            response = requests.get(
                f"http://localhost:8000/api/v2/batch/{batch_id}/progress",
                headers={"Authorization": f"Bearer {token}"}
            )
            progress = response.json()

            monitoring_data[batch_id] = {
                "status": progress["status"],
                "progress": f"{progress['processed']}/{progress['total_documents']}",
                "throughput": progress["current_throughput"],
                "eta": progress["eta_minutes"],
                "errors": progress.get("failed", 0)
            }

            # Alert on issues
            if progress.get("failed", 0) > 10:
                self._send_alert(f"High error rate in batch {batch_id}: {progress['failed']} failures")

        return monitoring_data

    def handle_failed_documents(self, batch_id):
        """Handle failed documents in batch"""
        response = requests.get(
            f"http://localhost:8000/api/v2/batch/{batch_id}/failures",
            headers={"Authorization": f"Bearer {token}"}
        )
        failures = response.json()

        # Retry failed documents
        retry_batch = {
            "documents": failures["failed_documents"],
            "retry_strategy": "exponential_backoff",
            "max_retries": 5
        }

        response = requests.post(
            f"http://localhost:8000/api/v2/batch/{batch_id}/retry",
            headers={"Authorization": f"Bearer {token}"},
            json=retry_batch
        )
        return response.json()

    def optimize_batch_performance(self, batch_id):
        """Optimize batch processing based on performance"""
        # Get current performance metrics
        response = requests.get(
            f"http://localhost:8000/api/v2/batch/{batch_id}/metrics",
            headers={"Authorization": f"Bearer {token}"}
        )
        metrics = response.json()

        # Calculate optimal settings
        optimization = {}

        if metrics["avg_document_time"] > 60:  # Slow processing
            optimization["parallel_workers"] = min(32, self.config["max_workers"] * 2)
            optimization["batch_size"] = max(10, self.config["batch_size"] // 2)

        if metrics["memory_usage"] > 0.8:  # High memory
            optimization["batch_size"] = max(10, self.config["batch_size"] // 2)
            optimization["enable_streaming"] = True

        if metrics["error_rate"] > 0.1:  # High errors
            optimization["validation_level"] = "strict"
            optimization["retry_policy"] = "aggressive"

        # Apply optimizations
        if optimization:
            response = requests.patch(
                f"http://localhost:8000/api/v2/batch/{batch_id}/optimize",
                headers={"Authorization": f"Bearer {token}"},
                json=optimization
            )
            return response.json()

        return {"status": "no_optimization_needed"}
```
### Distributed Batch Processing

```python
import asyncio
import multiprocessing as mp

import aiohttp

class DistributedBatchProcessor:
    """Distributed batch processing across multiple nodes"""

    def __init__(self, nodes):
        self.nodes = nodes   # List of worker node URLs
        self.task_queue = mp.Queue()
        self.result_queue = mp.Queue()

    async def distribute_work(self, documents, strategy="round_robin"):
        """Distribute documents across worker nodes"""
        if strategy == "round_robin":
            node_assignments = self._round_robin_distribution(documents)
        elif strategy == "load_balanced":
            node_assignments = await self._load_balanced_distribution(documents)
        elif strategy == "locality_aware":
            node_assignments = self._locality_aware_distribution(documents)
        else:
            raise ValueError(f"Unknown strategy: {strategy}")

        # Submit work to nodes
        tasks = []
        for node, docs in node_assignments.items():
            task = self._submit_to_node(node, docs)
            tasks.append(task)

        # Wait for all nodes to complete
        results = await asyncio.gather(*tasks)
        return self._merge_results(results)

    def _round_robin_distribution(self, documents):
        """Simple round-robin distribution"""
        assignments = {node: [] for node in self.nodes}
        for i, doc in enumerate(documents):
            node = self.nodes[i % len(self.nodes)]
            assignments[node].append(doc)
        return assignments

    async def _load_balanced_distribution(self, documents):
        """Distribute based on node load"""
        # Get load from each node
        loads = {}
        for node in self.nodes:
            response = await self._get_node_load(node)
            loads[node] = response["load"]

        # Sort nodes by load
        sorted_nodes = sorted(loads.items(), key=lambda x: x[1])

        # Distribute proportionally
        assignments = {node: [] for node in self.nodes}
        for i, doc in enumerate(documents):
            # Assign to least loaded node
            node = sorted_nodes[0][0]
            assignments[node].append(doc)

            # Update load estimate
            sorted_nodes[0] = (node, sorted_nodes[0][1] + 1)
            sorted_nodes.sort(key=lambda x: x[1])

        return assignments

    async def _submit_to_node(self, node, documents):
        """Submit batch to worker node"""
        async with aiohttp.ClientSession() as session:
            async with session.post(
                f"{node}/process_batch",
                json={"documents": documents},
                timeout=aiohttp.ClientTimeout(total=3600)
            ) as response:
                return await response.json()

    # _locality_aware_distribution, _get_node_load, and _merge_results are
    # deployment-specific hooks; see the sketch below for one possible shape.
```
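The class above leaves a few hooks unimplemented. A minimal sketch of two of them, assuming each worker node exposes a `/load` endpoint and returns a per-batch list of document results (both the endpoint path and response shapes are assumptions):

```python
import aiohttp

async def get_node_load(node):
    """Fetch a load indicator from a worker node (assumed /load endpoint)."""
    async with aiohttp.ClientSession() as session:
        async with session.get(
            f"{node}/load",
            timeout=aiohttp.ClientTimeout(total=10)
        ) as resp:
            return await resp.json()   # e.g. {"load": 0.42}

def merge_results(node_results):
    """Flatten per-node result payloads into a single list of document results."""
    merged = []
    for result in node_results:
        merged.extend(result.get("documents", []))
    return merged
```

These could be wired in as `_get_node_load` and `_merge_results` on the class, adapted to whatever your worker API actually returns.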
## Module Management

### Dynamic Module Configuration

```python
import requests

class ModuleManager:
    """Enterprise module management system"""

    def __init__(self):
        self.modules = self._discover_modules()
        self.config = self._load_module_config()

    def _discover_modules(self):
        """Discover available modules"""
        response = requests.get(
            "http://localhost:8000/api/v2/modules/status",
            headers={"Authorization": f"Bearer {token}"}
        )
        return response.json()

    def _load_module_config(self):
        """Placeholder: load per-module configuration (e.g. from a local YAML file)"""
        return {}

    def enable_module(self, module_name, config=None):
        """Enable a specific module"""
        request_data = {
            "module": module_name,
            "enabled": True,
            "config": config or {}
        }

        response = requests.post(
            f"http://localhost:8000/api/v2/modules/{module_name}/enable",
            headers={"Authorization": f"Bearer {token}"},
            json=request_data
        )
        return response.json()

    def configure_modules(self, configuration):
        """Configure multiple modules"""
        response = requests.post(
            "http://localhost:8000/api/v2/modules/configure",
            headers={"Authorization": f"Bearer {token}"},
            json=configuration
        )
        return response.json()

    def get_module_dependencies(self, module_name):
        """Get module dependencies"""
        response = requests.get(
            f"http://localhost:8000/api/v2/modules/{module_name}/dependencies",
            headers={"Authorization": f"Bearer {token}"}
        )
        return response.json()

    def validate_module_configuration(self):
        """Validate current module configuration"""
        response = requests.post(
            "http://localhost:8000/api/v2/modules/validate",
            headers={"Authorization": f"Bearer {token}"}
        )
        validation_result = response.json()

        if not validation_result["valid"]:
            print("Configuration issues found:")
            for issue in validation_result["issues"]:
                print(f"  - {issue}")

        return validation_result

    def optimize_module_loading(self):
        """Optimize module loading order"""
        # Get module dependencies
        dependencies = {}
        for module in self.modules["installed"]:
            deps = self.get_module_dependencies(module)
            dependencies[module] = deps["dependencies"]

        # Topological sort for optimal loading order
        loading_order = self._topological_sort(dependencies)

        return {
            "optimal_order": loading_order,
            "current_order": list(self.modules["installed"].keys()),
            "recommendation": "Apply optimal loading order for faster startup"
        }

    def _topological_sort(self, dependencies):
        """Topological sort for dependency resolution"""
        visited = set()
        stack = []

        def visit(module):
            if module in visited:
                return
            visited.add(module)
            for dep in dependencies.get(module, []):
                visit(dep)
            stack.append(module)

        for module in dependencies:
            visit(module)

        # Each module is appended only after its dependencies, so the stack is
        # already in dependency-first loading order.
        return stack
```
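A brief usage sketch; the module name "kg" matches the module list used later in this guide, and the API and `token` are assumed to be available as in the earlier examples.

```python
manager = ModuleManager()

# Enable the knowledge-graph module.
manager.enable_module("kg")

# Validate the resulting configuration and review the suggested load order.
manager.validate_module_configuration()
print(manager.optimize_module_loading()["optimal_order"])
```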
## Configuration Templates

### Template Management

```python
import requests
from datetime import datetime

class ConfigurationTemplateManager:
    """Manage and apply configuration templates"""

    def __init__(self):
        self.templates = self._load_templates()

    def _load_templates(self):
        """Load available templates"""
        response = requests.get(
            "http://localhost:8000/api/v2/config/templates",
            headers={"Authorization": f"Bearer {token}"}
        )
        return response.json()["templates"]

    def apply_template(self, template_name, customizations=None):
        """Apply configuration template"""
        request_data = {
            "template": template_name,
            "customize": customizations or {}
        }

        response = requests.post(
            "http://localhost:8000/api/v2/config/apply-template",
            headers={"Authorization": f"Bearer {token}"},
            json=request_data
        )
        return response.json()

    def create_custom_template(self, name, config):
        """Create custom configuration template"""
        template_data = {
            "name": name,
            "description": f"Custom template created on {datetime.now()}",
            "config": config,
            "metadata": {
                "created_by": "admin",
                "version": "1.0.0",
                "compatible_versions": ["0.1.18.0"]
            }
        }

        response = requests.post(
            "http://localhost:8000/api/v2/config/templates/create",
            headers={"Authorization": f"Bearer {token}"},
            json=template_data
        )
        return response.json()

    def get_template_recommendations(self, workload_profile):
        """Get template recommendations based on workload"""
        profiles = {
            "small": "minimal",
            "medium": "development",
            "large": "production",
            "enterprise": "enterprise",
            "cloud": "cloud"
        }

        recommended = profiles.get(workload_profile, "minimal")

        # Get template details
        template = next((t for t in self.templates if t["name"] == recommended), None)

        return {
            "recommended_template": recommended,
            "reason": f"Best suited for {workload_profile} workloads",
            "config": template,
            "alternatives": [t["name"] for t in self.templates if t["name"] != recommended]
        }
```
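For example, a large deployment could look up the recommended template and apply it with a site-specific override (the customization payload shown here is illustrative):

```python
templates = ConfigurationTemplateManager()

recommendation = templates.get_template_recommendations("large")
print(f"Recommended: {recommendation['recommended_template']}")

# Apply the recommended template with an illustrative customization.
templates.apply_template(
    recommendation["recommended_template"],
    customizations={"performance": {"worker_threads": 32}}
)
```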
### Enterprise Configuration Examples

```yaml
# enterprise-config.yml
enterprise:
  deployment:
    mode: "distributed"
    nodes: 8
    regions: ["us-east", "us-west", "eu-central"]
  high_availability:
    enabled: true
    replication_factor: 3
    failover_timeout: 30
    health_check_interval: 10
  security:
    encryption_at_rest: true
    encryption_in_transit: true
    audit_logging: true
    compliance_mode: ["HIPAA", "PCI-DSS", "SOC2"]
  performance:
    cache_size: "32GB"
    worker_threads: 32
    connection_pool_size: 100
    batch_size: 1000
  monitoring:
    metrics_collection_interval: 10
    log_level: "INFO"
    alerting:
      enabled: true
      channels: ["email", "slack", "pagerduty"]
  backup:
    enabled: true
    schedule: "0 2 * * *"
    retention_days: 30
    destinations: ["s3", "azure", "local"]
```
## Distributed Processing

### Multi-Node Architecture

```python
import asyncio

import aiohttp

class DistributedOrchestrator:
    """Orchestrate processing across multiple nodes"""

    def __init__(self, config):
        self.nodes = config["nodes"]
        self.coordinator_url = config["coordinator"]
        self.health_checker = HealthChecker(self.nodes)

    async def process_distributed(self, job):
        """Process job across distributed nodes"""
        # 1. Split job into tasks
        tasks = self._split_job(job)

        # 2. Check node health
        healthy_nodes = await self.health_checker.get_healthy_nodes()

        # 3. Distribute tasks
        assignments = self._assign_tasks(tasks, healthy_nodes)

        # 4. Execute tasks
        results = await self._execute_distributed(assignments)

        # 5. Handle failures
        failed_tasks = [t for t in results if t["status"] == "failed"]
        if failed_tasks:
            results = await self._handle_failures(failed_tasks)

        # 6. Aggregate results
        return self._aggregate_results(results)

    def _split_job(self, job):
        """Split job into distributable tasks"""
        tasks = []
        chunk_size = job["total_documents"] // len(self.nodes)

        for i in range(len(self.nodes)):
            start = i * chunk_size
            end = start + chunk_size if i < len(self.nodes) - 1 else job["total_documents"]
            tasks.append({
                "id": f"task_{i}",
                "start": start,
                "end": end,
                "job_id": job["id"],
                "config": job["config"]
            })
        return tasks

    async def _execute_distributed(self, assignments):
        """Execute tasks on assigned nodes"""
        async def execute_on_node(node, task):
            try:
                async with aiohttp.ClientSession() as session:
                    async with session.post(
                        f"{node}/execute",
                        json=task,
                        timeout=aiohttp.ClientTimeout(total=3600)
                    ) as response:
                        return await response.json()
            except Exception as e:
                return {
                    "status": "failed",
                    "error": str(e),
                    "task": task
                }

        tasks = []
        for node, node_tasks in assignments.items():
            for task in node_tasks:
                tasks.append(execute_on_node(node, task))

        return await asyncio.gather(*tasks)

    # _assign_tasks, _handle_failures, and _aggregate_results are
    # deployment-specific hooks; HealthChecker is sketched below.
```
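The orchestrator depends on a `HealthChecker` that is not shown above. A minimal sketch, assuming each node exposes a `/health` endpoint that returns HTTP 200 when healthy:

```python
import asyncio

import aiohttp

class HealthChecker:
    """Track which worker nodes respond to their /health endpoint (assumed)."""

    def __init__(self, nodes, timeout_seconds=5):
        self.nodes = nodes
        self.timeout = aiohttp.ClientTimeout(total=timeout_seconds)

    async def _is_healthy(self, session, node):
        try:
            async with session.get(f"{node}/health", timeout=self.timeout) as resp:
                return node, resp.status == 200
        except (aiohttp.ClientError, asyncio.TimeoutError):
            return node, False

    async def get_healthy_nodes(self):
        """Return the subset of nodes that currently pass the health check."""
        async with aiohttp.ClientSession() as session:
            checks = await asyncio.gather(
                *(self._is_healthy(session, node) for node in self.nodes)
            )
        return [node for node, healthy in checks if healthy]
```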
## High Availability

### Failover Configuration

```python
import asyncio

import aiohttp

class HighAvailabilityManager:
    """Manage high availability and failover"""

    def __init__(self, config):
        self.primary = config["primary"]
        self.replicas = config["replicas"]
        self.health_check_interval = config["health_check_interval"]
        self.failover_timeout = config["failover_timeout"]

    async def monitor_health(self):
        """Monitor health of all nodes"""
        while True:
            # Check primary
            primary_healthy = await self._check_node_health(self.primary)

            if not primary_healthy:
                # Initiate failover
                await self._initiate_failover()

            # Check replicas
            for replica in self.replicas:
                replica_healthy = await self._check_node_health(replica)
                if not replica_healthy:
                    await self._handle_replica_failure(replica)

            await asyncio.sleep(self.health_check_interval)

    async def _initiate_failover(self):
        """Initiate failover to a healthy replica"""
        # Select new primary
        for replica in self.replicas:
            if await self._check_node_health(replica):
                # Promote replica to primary
                await self._promote_to_primary(replica)

                # Update configuration
                self.replicas.remove(replica)
                self.replicas.append(self.primary)
                self.primary = replica

                # Notify clients
                await self._notify_failover(replica)
                break

    async def _promote_to_primary(self, node):
        """Promote replica to primary"""
        async with aiohttp.ClientSession() as session:
            async with session.post(
                f"{node}/promote",
                json={"role": "primary"}
            ) as response:
                return await response.json()

    # _check_node_health, _handle_replica_failure, and _notify_failover are
    # deployment-specific hooks (e.g. a /health probe and client notification).
```
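A minimal wiring sketch; the node URLs are placeholders, and the interval and timeout values mirror the high_availability block in the enterprise configuration above.

```python
import asyncio

ha_config = {
    "primary": "http://netintel-primary:8000",       # placeholder URL
    "replicas": [
        "http://netintel-replica-1:8000",            # placeholder URLs
        "http://netintel-replica-2:8000",
    ],
    "health_check_interval": 10,
    "failover_timeout": 30,
}

manager = HighAvailabilityManager(ha_config)

# monitor_health() loops forever; run it as (or inside) a long-lived service task.
asyncio.run(manager.monitor_health())
```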
## Compliance and Audit

### Audit System

```python
from datetime import datetime, timedelta

class ComplianceAuditSystem:
    """Enterprise compliance and audit system"""

    def __init__(self):
        self.audit_log = []
        self.compliance_rules = self._load_compliance_rules()

    def log_operation(self, operation):
        """Log operation for audit"""
        audit_entry = {
            "timestamp": datetime.now().isoformat(),
            "operation": operation["type"],
            "user": operation["user"],
            "resource": operation["resource"],
            "result": operation["result"],
            "metadata": operation.get("metadata", {})
        }

        self.audit_log.append(audit_entry)

        # Check compliance
        violations = self._check_compliance(audit_entry)
        if violations:
            self._handle_violations(violations)

        # Persist to audit store
        self._persist_audit_entry(audit_entry)

    def generate_compliance_report(self, standard="SOC2"):
        """Generate compliance report"""
        report = {
            "standard": standard,
            "period": {
                "start": datetime.now() - timedelta(days=30),
                "end": datetime.now()
            },
            "summary": {},
            "details": [],
            "violations": [],
            "recommendations": []
        }

        # Analyze audit log
        for entry in self.audit_log:
            # Check against compliance rules
            for rule in self.compliance_rules[standard]:
                if not self._evaluate_rule(rule, entry):
                    report["violations"].append({
                        "rule": rule["name"],
                        "entry": entry,
                        "severity": rule["severity"]
                    })

        # Generate summary
        total_operations = len(self.audit_log)
        report["summary"] = {
            "total_operations": total_operations,
            "violations": len(report["violations"]),
            "compliance_score": (
                100 - (len(report["violations"]) / total_operations * 100)
                if total_operations else 100.0
            )
        }

        return report

    def _check_compliance(self, entry):
        """Check entry against compliance rules"""
        violations = []

        for standard, rules in self.compliance_rules.items():
            for rule in rules:
                if not self._evaluate_rule(rule, entry):
                    violations.append({
                        "standard": standard,
                        "rule": rule,
                        "entry": entry
                    })

        return violations

    def _evaluate_rule(self, rule, entry):
        """Evaluate compliance rule"""
        # Example rule evaluation
        if rule["type"] == "data_retention":
            # Check if data retention policy is followed
            return True
        if rule["type"] == "access_control":
            # Check if proper access controls are in place
            return entry["user"] in rule["allowed_users"]
        if rule["type"] == "encryption":
            # Check if data is encrypted
            return entry.get("metadata", {}).get("encrypted", False)
        return True

    # _load_compliance_rules, _handle_violations, and _persist_audit_entry are
    # left to the deployment (rule store, alerting, and durable audit storage).
```
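The rule dictionaries consumed by `_evaluate_rule` need at least a `type`, `name`, and `severity`, plus per-type fields such as `allowed_users`. One illustrative shape that `_load_compliance_rules` could return:

```python
example_compliance_rules = {
    "SOC2": [
        {
            "type": "access_control",
            "name": "Administrative operations restricted to approved users",
            "severity": "high",
            "allowed_users": ["admin", "auditor"],   # illustrative user list
        },
        {
            "type": "encryption",
            "name": "Document payloads encrypted at rest",
            "severity": "critical",
        },
    ]
}
```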
## Best Practices

### 1. Deduplication Strategy

```python
# Optimal deduplication pipeline. The check_md5_exists, check_simhash_similarity,
# cdc_chunk, and get_dedup_ratio helpers are placeholders for the deduplication
# API calls shown earlier in this guide.
def optimal_dedup_pipeline(document):
    # 1. Quick MD5 check
    if check_md5_exists(document):
        return "exact_duplicate"

    # 2. SimHash for near-duplicates
    if check_simhash_similarity(document) > 0.95:
        return "near_duplicate"

    # 3. CDC for partial duplicates
    chunks = cdc_chunk(document)
    if get_dedup_ratio(chunks) > 0.5:
        return "partial_duplicate"

    return "unique"
```
### 2. Batch Processing Optimization

```python
from multiprocessing import cpu_count

# Optimal batch configuration. memory_available() is a placeholder for a memory
# probe returning free RAM in GB (e.g. via psutil).
optimal_batch = {
    "workers": min(32, cpu_count() * 2),
    "batch_size": 100 if memory_available() > 16 else 50,
    "checkpoint_interval": 500,
    "error_strategy": "dead_letter_queue",
    "monitoring": "prometheus"
}
```
### 3. Module Loading

```python
# Load modules in optimal order
module_order = [
    "base",
    "performance",  # C++ extensions first
    "kg",           # Heavy modules
    "vector",
    "api",          # API last
    "mcp"
]
```
## Next Steps
- Configure Security: See the Authentication & Security Guide
- Deploy to Production: Follow the Production Deployment Guide
- Learn API v2: Read the API v2 Guide