## Overview
Mini RAG is designed as a library that can be easily integrated into your Python applications. This guide shows you how to add RAG capabilities to various types of applications.

## Integration Patterns
- **REST API**: FastAPI or Flask backend
- **Chatbot**: conversational interfaces
- **CLI Tool**: command-line applications
- **Data Pipeline**: ETL and batch processing
## FastAPI Integration

Create a REST API for your RAG system.

### Basic Setup
```python
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from mini import AgenticRAG, EmbeddingModel, VectorStore
import os

app = FastAPI(title="RAG API")

# Global RAG instance
rag = None


@app.on_event("startup")
async def startup_event():
    """Initialize RAG on startup."""
    global rag
    embedding_model = EmbeddingModel()
    vector_store = VectorStore(
        uri=os.getenv("MILVUS_URI"),
        token=os.getenv("MILVUS_TOKEN"),
        collection_name="knowledge_base",
        dimension=1536
    )
    rag = AgenticRAG(
        vector_store=vector_store,
        embedding_model=embedding_model
    )
    print("RAG system initialized")


@app.on_event("shutdown")
async def shutdown_event():
    """Cleanup on shutdown."""
    if rag:
        rag.vector_store.disconnect()
    print("RAG system shutdown")


# Request/response models
class QueryRequest(BaseModel):
    question: str
    top_k: int = 10
    rerank_top_k: int = 3


class QueryResponse(BaseModel):
    answer: str
    sources: list
    metadata: dict


@app.post("/query", response_model=QueryResponse)
async def query(request: QueryRequest):
    """Query the RAG system."""
    try:
        response = rag.query(
            query=request.question,
            top_k=request.top_k,
            rerank_top_k=request.rerank_top_k
        )
        return QueryResponse(
            answer=response.answer,
            sources=[
                {
                    "text": chunk.text[:200],
                    "score": chunk.reranked_score or chunk.score,
                    "metadata": chunk.metadata
                }
                for chunk in response.retrieved_chunks
            ],
            metadata=response.metadata
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.get("/health")
async def health():
    """Health check endpoint."""
    return {"status": "healthy", "rag_initialized": rag is not None}


@app.get("/stats")
async def stats():
    """Get system statistics."""
    try:
        return rag.get_stats()
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
```
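Once the server is running, you can exercise the endpoints with any HTTP client. A minimal sketch using `requests`, assuming the API is served locally on port 8000:

```python
import requests

BASE_URL = "http://localhost:8000"  # adjust for your deployment

# Ask a question and print the answer with its sources
resp = requests.post(
    f"{BASE_URL}/query",
    json={"question": "How do I reset my password?", "top_k": 5, "rerank_top_k": 3},
)
resp.raise_for_status()
result = resp.json()
print(result["answer"])
for source in result["sources"]:
    print(f"  [{source['score']:.3f}] {source['text'][:80]}")
```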
### With File Upload

```python
from fastapi import FastAPI, File, HTTPException, UploadFile
from pathlib import Path
import shutil


@app.post("/upload")
async def upload_document(file: UploadFile = File(...)):
    """Upload and index a document."""
    try:
        # Save the uploaded file
        upload_dir = Path("uploads")
        upload_dir.mkdir(exist_ok=True)
        file_path = upload_dir / file.filename
        with file_path.open("wb") as buffer:
            shutil.copyfileobj(file.file, buffer)

        # Index the document
        num_chunks = rag.index_document(str(file_path))

        return {
            "filename": file.filename,
            "chunks": num_chunks,
            "status": "indexed"
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
```
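From the client side, the file goes up as multipart form data. A sketch with `requests`; the filename `report.pdf` is just a placeholder:

```python
import requests

# Upload a local document for indexing ("report.pdf" is a placeholder)
with open("report.pdf", "rb") as f:
    resp = requests.post(
        "http://localhost:8000/upload",
        files={"file": ("report.pdf", f, "application/pdf")},
    )
print(resp.json())  # e.g. {"filename": "report.pdf", "chunks": 12, "status": "indexed"}
```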
### With Authentication

```python
from fastapi import Depends, HTTPException, Security
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials

security = HTTPBearer()


def verify_token(credentials: HTTPAuthorizationCredentials = Security(security)):
    """Verify the API token."""
    token = credentials.credentials
    if token != os.getenv("API_TOKEN"):
        raise HTTPException(status_code=401, detail="Invalid token")
    return token


@app.post("/query")
async def query(
    request: QueryRequest,
    token: str = Depends(verify_token)
):
    """Protected query endpoint."""
    # ... rest of implementation
```
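Clients then send the token in the `Authorization` header. A sketch, assuming the same `API_TOKEN` is available in the client environment:

```python
import os
import requests

resp = requests.post(
    "http://localhost:8000/query",
    json={"question": "How do I reset my password?"},
    headers={"Authorization": f"Bearer {os.getenv('API_TOKEN')}"},
)
print(resp.status_code)  # 401 if the token does not match
```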
## Flask Integration

An alternative REST API using Flask.

```python
from flask import Flask, request, jsonify
from mini import AgenticRAG, EmbeddingModel, VectorStore
import os

app = Flask(__name__)

# Initialize RAG once at module load
embedding_model = EmbeddingModel()
vector_store = VectorStore(
    uri=os.getenv("MILVUS_URI"),
    token=os.getenv("MILVUS_TOKEN"),
    collection_name="knowledge_base",
    dimension=1536
)
rag = AgenticRAG(vector_store=vector_store, embedding_model=embedding_model)


@app.route('/query', methods=['POST'])
def query():
    """Query endpoint."""
    try:
        data = request.get_json()
        question = data.get('question')
        if not question:
            return jsonify({"error": "Question is required"}), 400

        response = rag.query(question)
        return jsonify({
            "answer": response.answer,
            "sources": [
                {
                    "text": chunk.text[:200],
                    "score": chunk.reranked_score or chunk.score
                }
                for chunk in response.retrieved_chunks
            ]
        })
    except Exception as e:
        return jsonify({"error": str(e)}), 500


@app.route('/health', methods=['GET'])
def health():
    """Health check."""
    return jsonify({"status": "healthy"})


if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8000)
```
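Flask's built-in test client is a convenient way to exercise these routes without starting a server. A sketch, assuming the code above lives in a module named `app.py`:

```python
from app import app  # assumes the Flask app above is saved as app.py

client = app.test_client()

# A normal query
resp = client.post("/query", json={"question": "How do I reset my password?"})
print(resp.status_code, resp.get_json())

# A missing question should come back as a 400
resp = client.post("/query", json={})
assert resp.status_code == 400
```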
## Chatbot Integration

Build a chatbot with conversation history.

```python
from mini import AgenticRAG, EmbeddingModel, VectorStore
from typing import List, Dict
import os


class DocumentChatbot:
    """Chatbot with RAG capabilities."""

    def __init__(self, milvus_uri: str, milvus_token: str, collection_name: str = "chatbot_kb"):
        # Initialize RAG
        self.embedding_model = EmbeddingModel()
        self.vector_store = VectorStore(
            uri=milvus_uri,
            token=milvus_token,
            collection_name=collection_name,
            dimension=1536
        )
        self.rag = AgenticRAG(
            vector_store=self.vector_store,
            embedding_model=self.embedding_model
        )
        # Conversation history
        self.conversations: Dict[str, List[Dict]] = {}

    def add_documents(self, document_paths: List[str]) -> int:
        """Add documents to the knowledge base."""
        return self.rag.index_documents(document_paths)

    def start_conversation(self, session_id: str):
        """Start a new conversation."""
        self.conversations[session_id] = []

    def chat(self, session_id: str, user_message: str) -> str:
        """Send a message and get a response."""
        # Initialize the session if needed
        if session_id not in self.conversations:
            self.start_conversation(session_id)

        # Add the user message to history
        self.conversations[session_id].append({
            "role": "user",
            "content": user_message
        })

        # Query RAG
        response = self.rag.query(user_message)

        # Add the assistant response to history
        self.conversations[session_id].append({
            "role": "assistant",
            "content": response.answer
        })
        return response.answer

    def get_history(self, session_id: str) -> List[Dict]:
        """Get conversation history."""
        return self.conversations.get(session_id, [])

    def clear_history(self, session_id: str):
        """Clear conversation history."""
        if session_id in self.conversations:
            self.conversations[session_id] = []

    def get_last_sources(self, session_id: str) -> List[Dict]:
        """Get sources from the last query."""
        # This would require storing sources with the conversation;
        # implementation depends on your needs (see the sketch below)
        pass


# Usage
chatbot = DocumentChatbot(
    milvus_uri=os.getenv("MILVUS_URI"),
    milvus_token=os.getenv("MILVUS_TOKEN")
)

# Add a knowledge base
chatbot.add_documents(["faq.pdf", "manual.pdf"])

# Chat
session_id = "user123"
answer1 = chatbot.chat(session_id, "How do I reset my password?")
answer2 = chatbot.chat(session_id, "What if I don't receive the email?")

# Get history
history = chatbot.get_history(session_id)
```
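`get_last_sources` above is left as a stub. One way to fill it in, sketched here as a hypothetical extension rather than part of the library, is to store the retrieved chunks alongside each assistant turn:

```python
from typing import Dict, List


class SourceTrackingChatbot(DocumentChatbot):
    """Hypothetical subclass that keeps retrieved sources with each turn."""

    def chat(self, session_id: str, user_message: str) -> str:
        if session_id not in self.conversations:
            self.start_conversation(session_id)
        self.conversations[session_id].append(
            {"role": "user", "content": user_message}
        )
        response = self.rag.query(user_message)
        self.conversations[session_id].append({
            "role": "assistant",
            "content": response.answer,
            # Stored per turn so get_last_sources can read them back
            "sources": [
                {"text": chunk.text[:200], "metadata": chunk.metadata}
                for chunk in response.retrieved_chunks
            ],
        })
        return response.answer

    def get_last_sources(self, session_id: str) -> List[Dict]:
        """Return the sources attached to the most recent assistant message."""
        for message in reversed(self.conversations.get(session_id, [])):
            if message["role"] == "assistant":
                return message.get("sources", [])
        return []
```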
## CLI Tool Integration

Create a command-line RAG interface.

```python
import click
import os
from pathlib import Path
from mini import AgenticRAG, EmbeddingModel, VectorStore
from dotenv import load_dotenv

load_dotenv()


@click.group()
@click.pass_context
def cli(ctx):
    """RAG CLI tool."""
    # Initialize RAG and share it with subcommands via the context object
    embedding_model = EmbeddingModel()
    vector_store = VectorStore(
        uri=os.getenv("MILVUS_URI"),
        token=os.getenv("MILVUS_TOKEN"),
        collection_name="cli_docs",
        dimension=1536
    )
    ctx.obj = AgenticRAG(
        vector_store=vector_store,
        embedding_model=embedding_model
    )


@cli.command()
@click.argument('paths', nargs=-1, type=click.Path(exists=True))
@click.pass_context
def index(ctx, paths):
    """Index documents."""
    rag = ctx.obj
    for path in paths:
        path = Path(path)
        if path.is_file():
            click.echo(f"Indexing {path}...")
            num_chunks = rag.index_document(str(path))
            click.echo(f"  ✓ {num_chunks} chunks indexed")
        elif path.is_dir():
            files = list(path.glob("**/*.pdf")) + list(path.glob("**/*.docx"))
            click.echo(f"Found {len(files)} documents in {path}")
            with click.progressbar(files) as bar:
                for file in bar:
                    rag.index_document(str(file))


@cli.command()
@click.argument('question')
@click.option('--top-k', default=10, help='Number of chunks to retrieve')
@click.option('--show-sources', is_flag=True, help='Show source documents')
@click.pass_context
def query(ctx, question, top_k, show_sources):
    """Ask a question."""
    rag = ctx.obj
    response = rag.query(question, top_k=top_k)
    click.echo("\n" + "=" * 80)
    click.echo(f"Question: {question}")
    click.echo("=" * 80)
    click.echo(f"\nAnswer:\n{response.answer}\n")
    if show_sources:
        click.echo("Sources:")
        for i, chunk in enumerate(response.retrieved_chunks, 1):
            score = chunk.reranked_score or chunk.score
            source = chunk.metadata.get('source', 'Unknown')
            click.echo(f"  {i}. [Score: {score:.3f}] {source}")


@cli.command()
@click.pass_context
def stats(ctx):
    """Show system statistics."""
    rag = ctx.obj
    stats = rag.get_stats()
    click.echo("\nRAG System Statistics:")
    click.echo(f"  Collection: {stats['collection_name']}")
    click.echo(f"  Total chunks: {stats['total_documents']}")


@cli.command()
@click.pass_context
def interactive(ctx):
    """Start an interactive Q&A session."""
    rag = ctx.obj
    click.echo("Interactive Q&A (type 'quit' to exit)\n")
    while True:
        question = click.prompt("Question", type=str)
        if question.lower() in ['quit', 'exit', 'q']:
            break
        response = rag.query(question)
        click.echo(f"\nAnswer: {response.answer}\n")


if __name__ == '__main__':
    cli()
```
## Data Pipeline Integration

Integrate with ETL or data processing pipelines.

```python
from mini import DocumentLoader, Chunker, EmbeddingModel, VectorStore
import os
import pandas as pd
from pathlib import Path
from typing import List


class DocumentProcessor:
    """Process documents in batch pipelines."""

    def __init__(self):
        self.loader = DocumentLoader()
        self.chunker = Chunker()
        self.embedding_model = EmbeddingModel()
        self.vector_store = VectorStore(
            uri=os.getenv("MILVUS_URI"),
            token=os.getenv("MILVUS_TOKEN"),
            collection_name="pipeline_docs",
            dimension=1536
        )

    def process_documents(self, file_paths: List[str]) -> pd.DataFrame:
        """Process documents and return a DataFrame."""
        results = []
        for path in file_paths:
            try:
                # Load and chunk
                text = self.loader.load(path)
                chunks = self.chunker.chunk(text)

                # Generate embeddings
                embeddings = self.embedding_model.embed_chunks(
                    [chunk.text for chunk in chunks]
                )

                # Store results
                for i, (chunk, embedding) in enumerate(zip(chunks, embeddings)):
                    results.append({
                        'source': path,
                        'chunk_id': i,
                        'text': chunk.text,
                        'embedding': embedding,
                        'token_count': chunk.token_count
                    })
            except Exception as e:
                print(f"Error processing {path}: {e}")
        return pd.DataFrame(results)

    def index_batch(self, df: pd.DataFrame):
        """Index a batch of documents."""
        self.vector_store.insert(
            embeddings=df['embedding'].tolist(),
            texts=df['text'].tolist(),
            metadata=[
                {
                    'source': row['source'],
                    'chunk_id': row['chunk_id'],
                    'tokens': row['token_count']
                }
                for _, row in df.iterrows()
            ]
        )


# Usage in a pipeline
processor = DocumentProcessor()

# Process a batch
documents = list(Path("./documents").glob("*.pdf"))
df = processor.process_documents([str(f) for f in documents])

# Index
processor.index_batch(df)

# Save metadata (drop embeddings, which don't belong in a CSV)
df.drop('embedding', axis=1).to_csv('processed_docs.csv', index=False)
```
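In a recurring pipeline you usually want to skip files that have already been indexed. A minimal sketch that treats the `processed_docs.csv` written above as a ledger of completed work:

```python
import pandas as pd
from pathlib import Path

# Incremental run: only process files not yet recorded in the ledger
ledger_path = Path("processed_docs.csv")
already_done: set = set()
if ledger_path.exists():
    already_done = set(pd.read_csv(ledger_path)["source"].unique())

new_files = [
    str(f) for f in Path("./documents").glob("*.pdf")
    if str(f) not in already_done
]

if new_files:
    df = processor.process_documents(new_files)
    processor.index_batch(df)
    # Append the new rows to the ledger
    df.drop("embedding", axis=1).to_csv(
        ledger_path, mode="a", header=not ledger_path.exists(), index=False
    )
```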
## Async Integration

For asynchronous applications, run the blocking query in a thread pool.

```python
from fastapi import FastAPI
import asyncio
from concurrent.futures import ThreadPoolExecutor
from mini import AgenticRAG, EmbeddingModel, VectorStore

app = FastAPI()
executor = ThreadPoolExecutor(max_workers=4)

# Initialize RAG
rag = AgenticRAG(
    vector_store=VectorStore(...),
    embedding_model=EmbeddingModel()
)


async def async_query(question: str):
    """Run a blocking RAG query in the thread pool."""
    loop = asyncio.get_running_loop()
    response = await loop.run_in_executor(
        executor,
        rag.query,
        question
    )
    return response


@app.post("/query")
async def query(question: str):
    """Async query endpoint."""
    response = await async_query(question)
    return {"answer": response.answer}
```
## Best Practices
### Initialize Once

Create the RAG instance once at application startup, not per request.

```python
# Good - initialize once in a startup event
@app.on_event("startup")
async def startup():
    global rag
    rag = AgenticRAG(...)

# Bad - a new instance per request
@app.post("/query")
def query(q: str):
    rag = AgenticRAG(...)  # Don't do this!
```
### Error Handling

Handle errors gracefully and return meaningful messages.

```python
# Inside an endpoint handler; assumes logger = logging.getLogger(__name__)
try:
    response = rag.query(question)
    return {"answer": response.answer}
except Exception as e:
    logger.error(f"Query failed: {e}")
    raise HTTPException(
        status_code=500,
        detail="Failed to process query"
    )
```
### Resource Cleanup

Clean up resources on shutdown.

```python
@app.on_event("shutdown")
async def shutdown():
    if rag:
        rag.vector_store.disconnect()
```
### Connection Pooling

Reuse connections instead of creating new ones.

```python
# The RAG instance reuses the same vector store connection;
# there is no need to disconnect and reconnect per request.
```