Overview

This guide covers deploying Mini RAG applications to production, including performance optimization, monitoring, security, and scaling.

Pre-Deployment Checklist

1. Environment Configuration: Set up proper environment variables and secrets management
2. Resource Planning: Determine infrastructure requirements (CPU, memory, GPU)
3. Monitoring Setup: Configure observability and logging
4. Security Review: Implement authentication and rate limiting
5. Performance Testing: Load test and optimize configuration

Environment Configuration

Environment Variables

Never hardcode credentials. Use environment variables:
# Production .env
OPENAI_API_KEY=sk-prod-...
MILVUS_URI=https://prod-milvus.company.com
MILVUS_TOKEN=${VAULT_MILVUS_TOKEN}
COHERE_API_KEY=${VAULT_COHERE_KEY}

# Langfuse for monitoring
LANGFUSE_PUBLIC_KEY=pk-lf-...
LANGFUSE_SECRET_KEY=${VAULT_LANGFUSE_SECRET}
LANGFUSE_HOST=https://cloud.langfuse.com

# Application settings
LOG_LEVEL=INFO
API_RATE_LIMIT=100
MAX_UPLOAD_SIZE=10485760  # 10MB
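
A small startup check catches missing configuration before the first request. Below is a minimal sketch, assuming python-dotenv is installed and using the variable names from the .env file above:
import os
from dotenv import load_dotenv

# Load .env if present; in production the values should come from the
# environment or a secrets manager rather than a file baked into the image.
load_dotenv()

REQUIRED_VARS = [
    "OPENAI_API_KEY",
    "MILVUS_URI",
    "MILVUS_TOKEN",
    "LANGFUSE_PUBLIC_KEY",
    "LANGFUSE_SECRET_KEY",
]

missing = [name for name in REQUIRED_VARS if not os.getenv(name)]
if missing:
    raise RuntimeError(f"Missing required environment variables: {', '.join(missing)}")

# Optional settings with defaults matching the .env above
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO")
API_RATE_LIMIT = int(os.getenv("API_RATE_LIMIT", "100"))
MAX_UPLOAD_SIZE = int(os.getenv("MAX_UPLOAD_SIZE", "10485760"))  # 10MB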

Secrets Management

Use a proper secrets manager:

AWS Secrets Manager

import os
import json
import boto3
from botocore.exceptions import ClientError

def get_secret(secret_name: str) -> dict:
    """Get secret from AWS Secrets Manager."""
    client = boto3.client('secretsmanager', region_name='us-east-1')
    
    try:
        response = client.get_secret_value(SecretId=secret_name)
        return json.loads(response['SecretString'])
    except ClientError as e:
        raise RuntimeError(f"Failed to get secret '{secret_name}': {e}") from e

# Load secrets
secrets = get_secret("prod/minirag/credentials")
os.environ['OPENAI_API_KEY'] = secrets['openai_key']
os.environ['MILVUS_TOKEN'] = secrets['milvus_token']

HashiCorp Vault

import os
import hvac

def load_vault_secrets():
    """Load secrets from Vault."""
    client = hvac.Client(url='https://vault.company.com')
    client.auth.approle.login(
        role_id=os.getenv('VAULT_ROLE_ID'),
        secret_id=os.getenv('VAULT_SECRET_ID')
    )
    
    secrets = client.secrets.kv.v2.read_secret_version(
        path='prod/minirag'
    )
    
    return secrets['data']['data']
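
As with the AWS example, the returned mapping can be pushed into the process environment at startup. The key names here are assumptions and should match whatever is stored at the prod/minirag path:
import os

secrets = load_vault_secrets()
os.environ['OPENAI_API_KEY'] = secrets['openai_key']
os.environ['MILVUS_TOKEN'] = secrets['milvus_token']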

Performance Optimization

Configuration Tuning

Optimize for production workloads:
from mini import AgenticRAG, LLMConfig, RetrievalConfig, RerankerConfig

rag = AgenticRAG(
    vector_store=vector_store,
    embedding_model=embedding_model,
    llm_config=LLMConfig(
        model="gpt-4o-mini",  # Faster, cheaper for production
        temperature=0.3,       # Lower for consistent answers
        timeout=30.0,          # Shorter timeout
        max_retries=2          # Fewer retries
    ),
    retrieval_config=RetrievalConfig(
        top_k=10,              # Balance speed/quality
        rerank_top_k=3,        # Keep it focused
        use_query_rewriting=True,
        use_reranking=True,
        use_hybrid_search=True  # Better quality
    ),
    reranker_config=RerankerConfig(
        type="cohere",         # High quality, fast
        kwargs={
            "model": "rerank-english-v3.0"
        }
    )
)

Caching

Implement caching for common queries:
from functools import lru_cache
from hashlib import sha256
import redis
import json

# Redis cache
redis_client = redis.Redis(
    host='redis.company.com',
    port=6379,
    db=0,
    decode_responses=True
)

def cached_query(rag, question: str, ttl: int = 3600):
    """Query with Redis caching."""
    # Create cache key
    cache_key = f"rag_query:{sha256(question.encode()).hexdigest()}"
    
    # Check cache
    cached = redis_client.get(cache_key)
    if cached:
        return json.loads(cached)
    
    # Query RAG
    response = rag.query(question)
    result = {
        "answer": response.answer,
        "sources": len(response.retrieved_chunks)
    }
    
    # Cache result
    redis_client.setex(cache_key, ttl, json.dumps(result))
    
    return result

# Memory cache for very hot queries
@lru_cache(maxsize=1000)
def memory_cached_query(question: str):
    """In-memory cache for hottest queries."""
    return rag.query(question)

Connection Pooling

Reuse connections efficiently:
import atexit
import os
from mini import VectorStore

# Single global connection
_vector_store = None

def get_vector_store():
    """Get or create vector store connection."""
    global _vector_store
    
    if _vector_store is None:
        _vector_store = VectorStore(
            uri=os.getenv("MILVUS_URI"),
            token=os.getenv("MILVUS_TOKEN"),
            collection_name="production",
            dimension=1536
        )
        
        # Ensure cleanup on exit
        atexit.register(lambda: _vector_store.disconnect())
    
    return _vector_store

Batch Processing

Process documents in batches:
from typing import List
from concurrent.futures import ThreadPoolExecutor, as_completed

def index_documents_batch(
    rag: AgenticRAG,
    document_paths: List[str],
    max_workers: int = 4
):
    """Index documents in parallel."""
    results = {}
    
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit all tasks
        future_to_doc = {
            executor.submit(rag.index_document, doc): doc
            for doc in document_paths
        }
        
        # Collect results
        for future in as_completed(future_to_doc):
            doc = future_to_doc[future]
            try:
                num_chunks = future.result()
                results[doc] = {"status": "success", "chunks": num_chunks}
            except Exception as e:
                results[doc] = {"status": "error", "error": str(e)}
    
    return results

Monitoring and Observability

Langfuse Integration

Enable comprehensive monitoring:
from mini import ObservabilityConfig

rag = AgenticRAG(
    vector_store=vector_store,
    embedding_model=embedding_model,
    observability_config=ObservabilityConfig(
        enabled=True,
        public_key=os.getenv("LANGFUSE_PUBLIC_KEY"),
        secret_key=os.getenv("LANGFUSE_SECRET_KEY")
    )
)

Logging

Implement structured logging:
import logging
import json
from datetime import datetime

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

logger = logging.getLogger(__name__)

class StructuredLogger:
    """Structured JSON logging."""
    
    @staticmethod
    def log_query(question: str, response, duration: float):
        """Log query with structured data."""
        logger.info(json.dumps({
            "event": "query",
            "timestamp": datetime.utcnow().isoformat(),
            "question": question,
            "answer_length": len(response.answer),
            "num_sources": len(response.retrieved_chunks),
            "duration_ms": duration * 1000,
            "metadata": response.metadata
        }))
    
    @staticmethod
    def log_error(error: Exception, context: dict):
        """Log error with context."""
        logger.error(json.dumps({
            "event": "error",
            "timestamp": datetime.utcnow().isoformat(),
            "error_type": type(error).__name__,
            "error_message": str(error),
            "context": context
        }))

# Usage
import time

start = time.time()
try:
    response = rag.query(question)
    duration = time.time() - start
    StructuredLogger.log_query(question, response, duration)
except Exception as e:
    StructuredLogger.log_error(e, {"question": question})

Metrics

Track key metrics:
from fastapi import FastAPI
from prometheus_client import Counter, Gauge, Histogram, make_asgi_app

# Define metrics
query_counter = Counter(
    'rag_queries_total',
    'Total number of queries'
)
query_duration = Histogram(
    'rag_query_duration_seconds',
    'Query duration in seconds'
)
index_counter = Counter(
    'rag_documents_indexed_total',
    'Total documents indexed'
)
error_counter = Counter(
    'rag_errors_total',
    'Total errors',
    ['error_type']
)
active_requests = Gauge(
    'rag_active_requests',
    'Number of active requests'
)

# FastAPI integration
app = FastAPI()

# Mount Prometheus metrics endpoint
metrics_app = make_asgi_app()
app.mount("/metrics", metrics_app)

@app.post("/query")
async def query(request: QueryRequest):
    """Query with metrics."""
    active_requests.inc()
    query_counter.inc()
    
    try:
        with query_duration.time():
            response = rag.query(request.question)
        return {"answer": response.answer}
    except Exception as e:
        error_counter.labels(error_type=type(e).__name__).inc()
        raise
    finally:
        active_requests.dec()

Security

API Authentication

Implement proper authentication:
from fastapi import FastAPI, Depends, HTTPException
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import os
import jwt
from datetime import datetime, timedelta

security = HTTPBearer()

def create_token(user_id: str) -> str:
    """Create JWT token."""
    payload = {
        "user_id": user_id,
        "exp": datetime.utcnow() + timedelta(hours=24)
    }
    return jwt.encode(payload, os.getenv("JWT_SECRET"), algorithm="HS256")

def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """Verify JWT token."""
    try:
        payload = jwt.decode(
            credentials.credentials,
            os.getenv("JWT_SECRET"),
            algorithms=["HS256"]
        )
        return payload["user_id"]
    except jwt.ExpiredSignatureError:
        raise HTTPException(status_code=401, detail="Token expired")
    except jwt.InvalidTokenError:
        raise HTTPException(status_code=401, detail="Invalid token")

@app.post("/query")
async def query(
    request: QueryRequest,
    user_id: str = Depends(verify_token)
):
    """Protected query endpoint."""
    logger.info(f"Query from user: {user_id}")
    response = rag.query(request.question)
    return {"answer": response.answer}

Rate Limiting

Prevent abuse with rate limiting:
from fastapi import Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded

limiter = Limiter(key_func=get_remote_address)
app = FastAPI()
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

@app.post("/query")
@limiter.limit("10/minute")  # 10 requests per minute
async def query(request: Request, query_request: QueryRequest):
    """Rate-limited query endpoint."""
    response = rag.query(query_request.question)
    return {"answer": response.answer}

Input Validation

Validate and sanitize inputs:
from pydantic import BaseModel, validator, Field

class QueryRequest(BaseModel):
    question: str = Field(..., min_length=1, max_length=500)
    top_k: int = Field(default=10, ge=1, le=50)
    rerank_top_k: int = Field(default=3, ge=1, le=10)
    
    @validator('question')
    def validate_question(cls, v):
        """Validate question content."""
        # Reject characters that could enable injection (including NUL bytes)
        dangerous_chars = ['<', '>', '{', '}', '\x00']
        for char in dangerous_chars:
            if char in v:
                raise ValueError(f"Invalid character: {char!r}")
        return v.strip()

Deployment Strategies

Docker Deployment

Create a production Dockerfile:
FROM python:3.11-slim

WORKDIR /app

# Install system dependencies
RUN apt-get update && apt-get install -y \
    build-essential \
    curl \
    && rm -rf /var/lib/apt/lists/*

# Copy requirements
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application
COPY . .

# Create non-root user
RUN useradd -m -u 1000 appuser && chown -R appuser:appuser /app
USER appuser

# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
    CMD curl -f http://localhost:8000/health || exit 1

# Run application
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]

Kubernetes Deployment

Deploy with Kubernetes:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: minirag-api
spec:
  replicas: 3
  selector:
    matchLabels:
      app: minirag-api
  template:
    metadata:
      labels:
        app: minirag-api
    spec:
      containers:
      - name: minirag
        image: your-registry/minirag:latest
        ports:
        - containerPort: 8000
        env:
        - name: OPENAI_API_KEY
          valueFrom:
            secretKeyRef:
              name: minirag-secrets
              key: openai-key
        - name: MILVUS_URI
          valueFrom:
            configMapKeyRef:
              name: minirag-config
              key: milvus-uri
        resources:
          requests:
            memory: "2Gi"
            cpu: "1000m"
          limits:
            memory: "4Gi"
            cpu: "2000m"
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 5
          periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
  name: minirag-service
spec:
  selector:
    app: minirag-api
  ports:
  - port: 80
    targetPort: 8000
  type: LoadBalancer

AWS Lambda (Serverless)

Deploy as serverless function:
# lambda_handler.py
import json
import os
from mini import AgenticRAG, EmbeddingModel, VectorStore

# Initialize once (outside handler)
embedding_model = EmbeddingModel()
vector_store = VectorStore(
    uri=os.getenv("MILVUS_URI"),
    token=os.getenv("MILVUS_TOKEN"),
    collection_name="production",
    dimension=1536
)
rag = AgenticRAG(vector_store=vector_store, embedding_model=embedding_model)

def lambda_handler(event, context):
    """AWS Lambda handler."""
    try:
        body = json.loads(event['body'])
        question = body.get('question')
        
        response = rag.query(question)
        
        return {
            'statusCode': 200,
            'body': json.dumps({
                'answer': response.answer,
                'sources': len(response.retrieved_chunks)
            })
        }
    except Exception as e:
        return {
            'statusCode': 500,
            'body': json.dumps({'error': str(e)})
        }

Scaling Considerations

Horizontal Scaling

  • Use load balancer to distribute requests across multiple instances
  • Each instance maintains its own connection to Milvus (see the sketch after this list)
  • Consider read replicas for Milvus for better performance
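
To keep the per-instance connection pattern explicit, build the RAG pipeline once per process at startup. A minimal sketch using FastAPI's lifespan hook, reusing the get_vector_store() helper from the connection pooling section; EmbeddingModel() with default arguments is an assumption, as in the Lambda example:
from contextlib import asynccontextmanager
from fastapi import FastAPI
from mini import AgenticRAG, EmbeddingModel

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Each instance (pod, container, VM) builds its own Milvus connection on startup
    app.state.rag = AgenticRAG(
        vector_store=get_vector_store(),
        embedding_model=EmbeddingModel()
    )
    yield
    # Cleanup is handled by the atexit hook registered in get_vector_store()

app = FastAPI(lifespan=lifespan)

Handlers then read app.state.rag instead of a module-level global, which keeps each instance self-contained behind the load balancer.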

Vertical Scaling

  • Increase CPU for faster embedding generation
  • Increase memory for larger models or caches
  • Use GPU instances for local reranking models

Database Scaling

  • Use Milvus cluster mode for high availability
  • Configure appropriate sharding and replication
  • Monitor and optimize index parameters (a sketch follows below)
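
Index parameters are tuned on the Milvus side rather than through the application. A hedged sketch using pymilvus directly; the collection name matches the earlier examples, while the field name "embedding" and the HNSW values are illustrative assumptions, not recommendations:
import os
from pymilvus import Collection, connections

# Connect with the same credentials the application uses
connections.connect(uri=os.getenv("MILVUS_URI"), token=os.getenv("MILVUS_TOKEN"))

collection = Collection("production")
collection.release()     # index changes require the collection to be released
collection.drop_index()

# HNSW trades memory for recall and latency; tune M and efConstruction per workload
collection.create_index(
    field_name="embedding",   # assumed vector field name
    index_params={
        "index_type": "HNSW",
        "metric_type": "COSINE",
        "params": {"M": 16, "efConstruction": 200}
    }
)
collection.load()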

Disaster Recovery

Backup Strategy

# Backup vector store
import boto3
from datetime import datetime

def backup_vector_store():
    """Backup vector store data and upload it to S3."""
    # Export collection to a local file
    rag.vector_store.export_collection("backup.json")

    # Upload the export to S3
    s3 = boto3.client('s3')
    s3.upload_file(
        'backup.json',
        'minirag-backups',
        f'backup-{datetime.now().strftime("%Y%m%d-%H%M%S")}.json'
    )

Health Checks

@app.get("/health")
async def health_check():
    """Comprehensive health check."""
    checks = {}
    
    # Check vector store
    try:
        count = rag.vector_store.count()
        checks['vector_store'] = {
            'status': 'healthy',
            'document_count': count
        }
    except Exception as e:
        checks['vector_store'] = {
            'status': 'unhealthy',
            'error': str(e)
        }
    
    # Check embedding model
    try:
        embedding_model.embed_query("test")
        checks['embedding_model'] = {'status': 'healthy'}
    except Exception as e:
        checks['embedding_model'] = {
            'status': 'unhealthy',
            'error': str(e)
        }
    
    # Overall status
    all_healthy = all(c['status'] == 'healthy' for c in checks.values())
    
    return {
        'status': 'healthy' if all_healthy else 'unhealthy',
        'checks': checks
    }

Next Steps