Overview
This guide covers everything you need to know to deploy Mini RAG applications to production, including performance optimization, monitoring, security, and scalability.
Pre-Deployment Checklist
1. Environment Configuration: Set up proper environment variables and secrets management.
2. Resource Planning: Determine infrastructure requirements (CPU, memory, GPU).
3. Monitoring Setup: Configure observability and logging.
4. Security Review: Implement authentication and rate limiting.
5. Performance Testing: Load test and optimize configuration.
Environment Configuration
Environment Variables
Never hardcode credentials. Use environment variables:
# Production .env
OPENAI_API_KEY=sk-prod-...
MILVUS_URI=https://prod-milvus.company.com
MILVUS_TOKEN=${VAULT_MILVUS_TOKEN}
COHERE_API_KEY=${VAULT_COHERE_KEY}
# Langfuse for monitoring
LANGFUSE_PUBLIC_KEY=pk-lf-...
LANGFUSE_SECRET_KEY=${VAULT_LANGFUSE_SECRET}
LANGFUSE_HOST=https://cloud.langfuse.com
# Application settings
LOG_LEVEL=INFO
API_RATE_LIMIT=100
MAX_UPLOAD_SIZE=10485760 # 10MB
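A lightweight startup check catches missing configuration before the service takes traffic. This is a minimal sketch; the variable names mirror the .env file above.

import os

# Names match the .env file above
REQUIRED_VARS = ["OPENAI_API_KEY", "MILVUS_URI", "MILVUS_TOKEN", "COHERE_API_KEY"]

missing = [name for name in REQUIRED_VARS if not os.getenv(name)]
if missing:
    raise RuntimeError(f"Missing required environment variables: {', '.join(missing)}")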
Secrets Management
Use a proper secrets manager:
AWS Secrets Manager
import os
import json

import boto3
from botocore.exceptions import ClientError

def get_secret(secret_name: str) -> dict:
    """Get secret from AWS Secrets Manager."""
    client = boto3.client('secretsmanager', region_name='us-east-1')
    try:
        response = client.get_secret_value(SecretId=secret_name)
        return json.loads(response['SecretString'])
    except ClientError as e:
        raise RuntimeError(f"Failed to get secret: {e}") from e

# Load secrets
secrets = get_secret("prod/minirag/credentials")
os.environ['OPENAI_API_KEY'] = secrets['openai_key']
os.environ['MILVUS_TOKEN'] = secrets['milvus_token']
HashiCorp Vault
import os

import hvac

def load_vault_secrets():
    """Load secrets from Vault."""
    client = hvac.Client(url='https://vault.company.com')
    client.auth.approle.login(
        role_id=os.getenv('VAULT_ROLE_ID'),
        secret_id=os.getenv('VAULT_SECRET_ID')
    )
    secrets = client.secrets.kv.v2.read_secret_version(
        path='prod/minirag'
    )
    return secrets['data']['data']
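As with the AWS example above, the returned values can be exported into the process environment at startup. The key names below mirror that example and are assumptions about how the secret is laid out in Vault.

# Key names are assumed to match the layout stored under prod/minirag
secrets = load_vault_secrets()
os.environ['OPENAI_API_KEY'] = secrets['openai_key']
os.environ['MILVUS_TOKEN'] = secrets['milvus_token']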
Performance Optimization
Configuration Tuning
Optimize for production workloads:
from mini import AgenticRAG, LLMConfig, RetrievalConfig, RerankerConfig

rag = AgenticRAG(
    vector_store=vector_store,
    embedding_model=embedding_model,
    llm_config=LLMConfig(
        model="gpt-4o-mini",      # Faster, cheaper for production
        temperature=0.3,          # Lower for consistent answers
        timeout=30.0,             # Shorter timeout
        max_retries=2             # Fewer retries
    ),
    retrieval_config=RetrievalConfig(
        top_k=10,                 # Balance speed/quality
        rerank_top_k=3,           # Keep it focused
        use_query_rewriting=True,
        use_reranking=True,
        use_hybrid_search=True    # Better quality
    ),
    reranker_config=RerankerConfig(
        type="cohere",            # High quality, fast
        kwargs={
            "model": "rerank-english-v3.0"
        }
    )
)
Caching
Implement caching for common queries:
import json
from functools import lru_cache
from hashlib import sha256

import redis

# Redis cache
redis_client = redis.Redis(
    host='redis.company.com',
    port=6379,
    db=0,
    decode_responses=True
)

def cached_query(rag, question: str, ttl: int = 3600):
    """Query with Redis caching."""
    # Create cache key
    cache_key = f"rag_query:{sha256(question.encode()).hexdigest()}"

    # Check cache
    cached = redis_client.get(cache_key)
    if cached:
        return json.loads(cached)

    # Query RAG
    response = rag.query(question)
    result = {
        "answer": response.answer,
        "sources": len(response.retrieved_chunks)
    }

    # Cache result
    redis_client.setex(cache_key, ttl, json.dumps(result))
    return result

# Memory cache for very hot queries
@lru_cache(maxsize=1000)
def memory_cached_query(question: str):
    """In-memory cache for hottest queries."""
    return rag.query(question)
Connection Pooling
Reuse connections efficiently:
import os
import atexit

from mini import VectorStore

# Single global connection
_vector_store = None

def get_vector_store():
    """Get or create vector store connection."""
    global _vector_store
    if _vector_store is None:
        _vector_store = VectorStore(
            uri=os.getenv("MILVUS_URI"),
            token=os.getenv("MILVUS_TOKEN"),
            collection_name="production",
            dimension=1536
        )
        # Ensure cleanup on exit (registered once, when the connection is created)
        atexit.register(lambda: _vector_store.disconnect())
    return _vector_store
Batch Processing
Process documents in batches:
from typing import List
from concurrent.futures import ThreadPoolExecutor, as_completed

from mini import AgenticRAG

def index_documents_batch(
    rag: AgenticRAG,
    document_paths: List[str],
    max_workers: int = 4
):
    """Index documents in parallel."""
    results = {}

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit all tasks
        future_to_doc = {
            executor.submit(rag.index_document, doc): doc
            for doc in document_paths
        }

        # Collect results
        for future in as_completed(future_to_doc):
            doc = future_to_doc[future]
            try:
                num_chunks = future.result()
                results[doc] = {"status": "success", "chunks": num_chunks}
            except Exception as e:
                results[doc] = {"status": "error", "error": str(e)}

    return results
Monitoring and Observability
Langfuse Integration
Enable comprehensive monitoring:
import os

from mini import AgenticRAG, ObservabilityConfig

rag = AgenticRAG(
    vector_store=vector_store,
    embedding_model=embedding_model,
    observability_config=ObservabilityConfig(
        enabled=True,
        public_key=os.getenv("LANGFUSE_PUBLIC_KEY"),
        secret_key=os.getenv("LANGFUSE_SECRET_KEY")
    )
)
Logging
Implement structured logging:
import json
import logging
import time
from datetime import datetime

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

class StructuredLogger:
    """Structured JSON logging."""

    @staticmethod
    def log_query(question: str, response, duration: float):
        """Log query with structured data."""
        logger.info(json.dumps({
            "event": "query",
            "timestamp": datetime.utcnow().isoformat(),
            "question": question,
            "answer_length": len(response.answer),
            "num_sources": len(response.retrieved_chunks),
            "duration_ms": duration * 1000,
            "metadata": response.metadata
        }))

    @staticmethod
    def log_error(error: Exception, context: dict):
        """Log error with context."""
        logger.error(json.dumps({
            "event": "error",
            "timestamp": datetime.utcnow().isoformat(),
            "error_type": type(error).__name__,
            "error_message": str(error),
            "context": context
        }))

# Usage
start = time.time()
try:
    response = rag.query(question)
    duration = time.time() - start
    StructuredLogger.log_query(question, response, duration)
except Exception as e:
    StructuredLogger.log_error(e, {"question": question})
Metrics
Track key metrics:
from fastapi import FastAPI
from prometheus_client import Counter, Histogram, Gauge, make_asgi_app

# Define metrics
query_counter = Counter(
    'rag_queries_total',
    'Total number of queries'
)
query_duration = Histogram(
    'rag_query_duration_seconds',
    'Query duration in seconds'
)
index_counter = Counter(
    'rag_documents_indexed_total',
    'Total documents indexed'
)
error_counter = Counter(
    'rag_errors_total',
    'Total errors',
    ['error_type']
)
active_requests = Gauge(
    'rag_active_requests',
    'Number of active requests'
)

# FastAPI integration
app = FastAPI()

# Mount Prometheus metrics endpoint
metrics_app = make_asgi_app()
app.mount("/metrics", metrics_app)

@app.post("/query")
async def query(request: QueryRequest):
    """Query with metrics."""
    active_requests.inc()
    query_counter.inc()
    try:
        with query_duration.time():
            response = rag.query(request.question)
        return {"answer": response.answer}
    except Exception as e:
        error_counter.labels(error_type=type(e).__name__).inc()
        raise
    finally:
        active_requests.dec()
Security
API Authentication
Implement proper authentication:
import os
from datetime import datetime, timedelta

import jwt
from fastapi import FastAPI, Depends, HTTPException
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials

security = HTTPBearer()

def create_token(user_id: str) -> str:
    """Create JWT token."""
    payload = {
        "user_id": user_id,
        "exp": datetime.utcnow() + timedelta(hours=24)
    }
    return jwt.encode(payload, os.getenv("JWT_SECRET"), algorithm="HS256")

def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """Verify JWT token."""
    try:
        payload = jwt.decode(
            credentials.credentials,
            os.getenv("JWT_SECRET"),
            algorithms=["HS256"]
        )
        return payload["user_id"]
    except jwt.ExpiredSignatureError:
        raise HTTPException(status_code=401, detail="Token expired")
    except jwt.InvalidTokenError:
        raise HTTPException(status_code=401, detail="Invalid token")

@app.post("/query")
async def query(
    request: QueryRequest,
    user_id: str = Depends(verify_token)
):
    """Protected query endpoint."""
    logger.info(f"Query from user: {user_id}")
    response = rag.query(request.question)
    return {"answer": response.answer}
Rate Limiting
Prevent abuse with rate limiting:
from fastapi import FastAPI, Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded

limiter = Limiter(key_func=get_remote_address)
app = FastAPI()
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

@app.post("/query")
@limiter.limit("10/minute")  # 10 requests per minute
async def query(request: Request, query_request: QueryRequest):
    """Rate-limited query endpoint."""
    response = rag.query(query_request.question)
    return {"answer": response.answer}
Input Validation
Validate and sanitize inputs:
from pydantic import BaseModel, validator, Field

class QueryRequest(BaseModel):
    question: str = Field(..., min_length=1, max_length=500)
    top_k: int = Field(default=10, ge=1, le=50)
    rerank_top_k: int = Field(default=3, ge=1, le=10)

    @validator('question')
    def validate_question(cls, v):
        """Validate question content."""
        # Reject dangerous characters (including the null byte)
        dangerous_chars = ['<', '>', '{', '}', '\x00']
        for char in dangerous_chars:
            if char in v:
                raise ValueError(f"Invalid character: {char}")
        return v.strip()
Deployment Strategies
Docker Deployment
Create a production Dockerfile:
FROM python:3.11-slim

WORKDIR /app

# Install system dependencies (curl is needed for the health check below)
RUN apt-get update && apt-get install -y \
    build-essential \
    curl \
    && rm -rf /var/lib/apt/lists/*

# Copy requirements
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application
COPY . .

# Create non-root user
RUN useradd -m -u 1000 appuser && chown -R appuser:appuser /app
USER appuser

# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
    CMD curl -f http://localhost:8000/health || exit 1

# Run application
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]
Kubernetes Deployment
Deploy with Kubernetes:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: minirag-api
spec:
  replicas: 3
  selector:
    matchLabels:
      app: minirag-api
  template:
    metadata:
      labels:
        app: minirag-api
    spec:
      containers:
        - name: minirag
          image: your-registry/minirag:latest
          ports:
            - containerPort: 8000
          env:
            - name: OPENAI_API_KEY
              valueFrom:
                secretKeyRef:
                  name: minirag-secrets
                  key: openai-key
            - name: MILVUS_URI
              valueFrom:
                configMapKeyRef:
                  name: minirag-config
                  key: milvus-uri
          resources:
            requests:
              memory: "2Gi"
              cpu: "1000m"
            limits:
              memory: "4Gi"
              cpu: "2000m"
          livenessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 5
            periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
  name: minirag-service
spec:
  selector:
    app: minirag-api
  ports:
    - port: 80
      targetPort: 8000
  type: LoadBalancer
AWS Lambda (Serverless)
Deploy as a serverless function:
# lambda_handler.py
import json
import os

from mini import AgenticRAG, EmbeddingModel, VectorStore

# Initialize once (outside the handler) so connections are reused across invocations
embedding_model = EmbeddingModel()
vector_store = VectorStore(
    uri=os.getenv("MILVUS_URI"),
    token=os.getenv("MILVUS_TOKEN"),
    collection_name="production",
    dimension=1536
)
rag = AgenticRAG(vector_store=vector_store, embedding_model=embedding_model)

def lambda_handler(event, context):
    """AWS Lambda handler."""
    try:
        body = json.loads(event['body'])
        question = body.get('question')
        response = rag.query(question)
        return {
            'statusCode': 200,
            'body': json.dumps({
                'answer': response.answer,
                'sources': len(response.retrieved_chunks)
            })
        }
    except Exception as e:
        return {
            'statusCode': 500,
            'body': json.dumps({'error': str(e)})
        }
Scaling Considerations
Horizontal Scaling
- Use a load balancer to distribute requests across multiple instances
- Each instance maintains its own connection to Milvus
- Consider read replicas for Milvus to improve read throughput
Vertical Scaling
- Increase CPU for faster embedding generation
- Increase memory for larger models or caches
- Use GPU instances for local reranking models
Database Scaling
- Use Milvus cluster mode for high availability
- Configure appropriate sharding and replication
- Monitor and optimize index parameters (see the sketch after this list)
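For the last point, a minimal sketch of tuning index parameters, assuming direct access to the underlying collection through pymilvus; the field name "embedding" and the HNSW values are illustrative assumptions, not settings prescribed by Mini RAG:

import os

from pymilvus import connections, Collection

# Connect to the production Milvus instance (same URI/token as the env file above)
connections.connect(uri=os.getenv("MILVUS_URI"), token=os.getenv("MILVUS_TOKEN"))

collection = Collection("production")

# Rebuild the vector index with explicit, tunable parameters
collection.release()                      # index changes require the collection to be released
collection.drop_index()
collection.create_index(
    field_name="embedding",               # assumed vector field name; match your schema
    index_params={
        "index_type": "HNSW",
        "metric_type": "COSINE",
        "params": {"M": 16, "efConstruction": 200},
    },
)
collection.load()                         # reload before serving queries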
Disaster Recovery
Backup Strategy
from datetime import datetime

import boto3

# Backup vector store
def backup_vector_store():
    """Backup vector store data."""
    # Export collection
    rag.vector_store.export_collection("backup.json")

    # Upload to S3
    s3 = boto3.client('s3')
    s3.upload_file(
        'backup.json',
        'minirag-backups',
        f'backup-{datetime.now().strftime("%Y%m%d-%H%M%S")}.json'
    )
Health Checks
@app.get("/health")
async def health_check():
"""Comprehensive health check."""
checks = {}
# Check vector store
try:
count = rag.vector_store.count()
checks['vector_store'] = {
'status': 'healthy',
'document_count': count
}
except Exception as e:
checks['vector_store'] = {
'status': 'unhealthy',
'error': str(e)
}
# Check embedding model
try:
test_embedding = embedding_model.embed_query("test")
checks['embedding_model'] = {'status': 'healthy'}
except Exception as e:
checks['embedding_model'] = {
'status': 'unhealthy',
'error': str(e)
}
# Overall status
all_healthy = all(c['status'] == 'healthy' for c in checks.values())
return {
'status': 'healthy' if all_healthy else 'unhealthy',
'checks': checks
}
