Overview

Mini RAG is designed as a library that can be easily integrated into your Python applications. This guide shows you how to add RAG capabilities to various types of applications.
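
Whatever the host application, the workflow is the same: construct the core components once, index documents, then query. A minimal sketch (assuming MILVUS_URI and MILVUS_TOKEN are set in the environment):

import os
from mini import AgenticRAG, EmbeddingModel, VectorStore

# Wire the three core components together once, at startup
rag = AgenticRAG(
    vector_store=VectorStore(
        uri=os.getenv("MILVUS_URI"),
        token=os.getenv("MILVUS_TOKEN"),
        collection_name="knowledge_base",
        dimension=1536
    ),
    embedding_model=EmbeddingModel()
)

rag.index_document("manual.pdf")  # returns the number of chunks indexed
response = rag.query("How do I reset my password?")
print(response.answer)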

Integration Patterns

- REST API: FastAPI or Flask backend
- Chatbot: conversational interfaces
- CLI Tool: command-line applications
- Data Pipeline: ETL and batch processing

FastAPI Integration

Create a REST API for your RAG system.

Basic Setup

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from mini import AgenticRAG, EmbeddingModel, VectorStore
import os

app = FastAPI(title="RAG API")

# Global RAG instance
rag = None

@app.on_event("startup")
async def startup_event():
    """Initialize RAG on startup."""
    global rag
    
    embedding_model = EmbeddingModel()
    vector_store = VectorStore(
        uri=os.getenv("MILVUS_URI"),
        token=os.getenv("MILVUS_TOKEN"),
        collection_name="knowledge_base",
        dimension=1536
    )
    
    rag = AgenticRAG(
        vector_store=vector_store,
        embedding_model=embedding_model
    )
    print("RAG system initialized")

@app.on_event("shutdown")
async def shutdown_event():
    """Cleanup on shutdown."""
    if rag:
        rag.vector_store.disconnect()
    print("RAG system shutdown")

# Request/Response models
class QueryRequest(BaseModel):
    question: str
    top_k: int = 10
    rerank_top_k: int = 3

class QueryResponse(BaseModel):
    answer: str
    sources: list
    metadata: dict

@app.post("/query", response_model=QueryResponse)
async def query(request: QueryRequest):
    """Query the RAG system."""
    try:
        response = rag.query(
            query=request.question,
            top_k=request.top_k,
            rerank_top_k=request.rerank_top_k
        )
        
        return QueryResponse(
            answer=response.answer,
            sources=[
                {
                    "text": chunk.text[:200],
                    "score": chunk.reranked_score or chunk.score,
                    "metadata": chunk.metadata
                }
                for chunk in response.retrieved_chunks
            ],
            metadata=response.metadata
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/health")
async def health():
    """Health check endpoint."""
    return {"status": "healthy", "rag_initialized": rag is not None}

@app.get("/stats")
async def stats():
    """Get system statistics."""
    try:
        return rag.get_stats()
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
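
With the server running, any HTTP client can call the endpoint. A quick sanity check with the requests library (assuming the server is reachable at localhost:8000):

import requests

resp = requests.post(
    "http://localhost:8000/query",
    json={"question": "How do I reset my password?", "top_k": 10, "rerank_top_k": 3}
)
resp.raise_for_status()
data = resp.json()
print(data["answer"])
for source in data["sources"]:
    print(f"  [{source['score']:.3f}] {source['text']}")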

With File Upload

from fastapi import FastAPI, File, UploadFile
from pathlib import Path
import shutil

@app.post("/upload")
async def upload_document(file: UploadFile = File(...)):
    """Upload and index a document."""
    try:
        # Save uploaded file
        upload_dir = Path("uploads")
        upload_dir.mkdir(exist_ok=True)
        file_path = upload_dir / file.filename
        
        with file_path.open("wb") as buffer:
            shutil.copyfileobj(file.file, buffer)
        
        # Index the document
        num_chunks = rag.index_document(str(file_path))
        
        return {
            "filename": file.filename,
            "chunks": num_chunks,
            "status": "indexed"
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
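
Uploading from a client is a standard multipart request; the form field name must match the file parameter:

import requests

with open("manual.pdf", "rb") as f:
    resp = requests.post("http://localhost:8000/upload", files={"file": f})
print(resp.json())  # e.g. {"filename": "manual.pdf", "chunks": 42, "status": "indexed"}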

With Authentication

from fastapi import Depends, HTTPException, Security
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials

security = HTTPBearer()

def verify_token(credentials: HTTPAuthorizationCredentials = Security(security)):
    """Verify API token."""
    token = credentials.credentials
    if token != os.getenv("API_TOKEN"):
        raise HTTPException(status_code=401, detail="Invalid token")
    return token

@app.post("/query")
async def query(
    request: QueryRequest,
    token: str = Depends(verify_token)
):
    """Protected query endpoint."""
    # ... rest of implementation
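
HTTPBearer reads the token from the Authorization header, so clients pass it as a Bearer credential (the placeholder stands in for whatever API_TOKEN is set to on the server):

import requests

resp = requests.post(
    "http://localhost:8000/query",
    json={"question": "How do I reset my password?"},
    headers={"Authorization": "Bearer <your-api-token>"}
)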

Flask Integration

Alternative REST API using Flask.

from flask import Flask, request, jsonify
from mini import AgenticRAG, EmbeddingModel, VectorStore
import os

app = Flask(__name__)

# Initialize RAG
embedding_model = EmbeddingModel()
vector_store = VectorStore(
    uri=os.getenv("MILVUS_URI"),
    token=os.getenv("MILVUS_TOKEN"),
    collection_name="knowledge_base",
    dimension=1536
)
rag = AgenticRAG(vector_store=vector_store, embedding_model=embedding_model)

@app.route('/query', methods=['POST'])
def query():
    """Query endpoint."""
    try:
        data = request.get_json()
        question = data.get('question')
        
        if not question:
            return jsonify({"error": "Question is required"}), 400
        
        response = rag.query(question)
        
        return jsonify({
            "answer": response.answer,
            "sources": [
                {
                    "text": chunk.text[:200],
                    "score": chunk.reranked_score or chunk.score
                }
                for chunk in response.retrieved_chunks
            ]
        })
    except Exception as e:
        return jsonify({"error": str(e)}), 500

@app.route('/health', methods=['GET'])
def health():
    """Health check."""
    return jsonify({"status": "healthy"})

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8000)

Chatbot Integration

Build a chatbot with conversation history.

from mini import AgenticRAG, EmbeddingModel, VectorStore
from typing import List, Dict
import os

class DocumentChatbot:
    """Chatbot with RAG capabilities."""
    
    def __init__(self, milvus_uri: str, milvus_token: str, collection_name: str = "chatbot_kb"):
        # Initialize RAG
        self.embedding_model = EmbeddingModel()
        self.vector_store = VectorStore(
            uri=milvus_uri,
            token=milvus_token,
            collection_name=collection_name,
            dimension=1536
        )
        self.rag = AgenticRAG(
            vector_store=self.vector_store,
            embedding_model=self.embedding_model
        )
        
        # Conversation history
        self.conversations: Dict[str, List[Dict]] = {}
    
    def add_documents(self, document_paths: List[str]) -> int:
        """Add documents to the knowledge base."""
        return self.rag.index_documents(document_paths)
    
    def start_conversation(self, session_id: str):
        """Start a new conversation."""
        self.conversations[session_id] = []
    
    def chat(self, session_id: str, user_message: str) -> str:
        """Send a message and get a response."""
        # Initialize session if needed
        if session_id not in self.conversations:
            self.start_conversation(session_id)
        
        # Add user message to history
        self.conversations[session_id].append({
            "role": "user",
            "content": user_message
        })
        
        # Query RAG
        response = self.rag.query(user_message)
        
        # Add assistant response to history
        self.conversations[session_id].append({
            "role": "assistant",
            "content": response.answer
        })
        
        return response.answer
    
    def get_history(self, session_id: str) -> List[Dict]:
        """Get conversation history."""
        return self.conversations.get(session_id, [])
    
    def clear_history(self, session_id: str):
        """Clear conversation history."""
        if session_id in self.conversations:
            self.conversations[session_id] = []
    
    def get_last_sources(self, session_id: str) -> List[Dict]:
        """Get sources from last query."""
        # This requires storing sources alongside each assistant turn;
        # see the variant sketched after the usage example below.
        pass

# Usage
chatbot = DocumentChatbot(
    milvus_uri=os.getenv("MILVUS_URI"),
    milvus_token=os.getenv("MILVUS_TOKEN")
)

# Add knowledge base
chatbot.add_documents(["faq.pdf", "manual.pdf"])

# Chat
session_id = "user123"
answer1 = chatbot.chat(session_id, "How do I reset my password?")
answer2 = chatbot.chat(session_id, "What if I don't receive the email?")

# Get history
history = chatbot.get_history(session_id)
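
One way to make get_last_sources work is to store the retrieved chunks with each assistant turn. A minimal sketch of that variant, reusing the class above:

class DocumentChatbotWithSources(DocumentChatbot):
    """Variant that keeps retrieval sources with each assistant turn."""

    def chat(self, session_id: str, user_message: str) -> str:
        if session_id not in self.conversations:
            self.start_conversation(session_id)
        self.conversations[session_id].append({"role": "user", "content": user_message})

        response = self.rag.query(user_message)
        self.conversations[session_id].append({
            "role": "assistant",
            "content": response.answer,
            "sources": [
                {"text": c.text[:200], "score": c.reranked_score or c.score}
                for c in response.retrieved_chunks
            ]
        })
        return response.answer

    def get_last_sources(self, session_id: str) -> List[Dict]:
        # Walk backwards to the most recent assistant turn
        for message in reversed(self.conversations.get(session_id, [])):
            if message["role"] == "assistant":
                return message.get("sources", [])
        return []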

CLI Tool Integration

Create a command-line RAG interface.

import click
import os
from pathlib import Path
from mini import AgenticRAG, EmbeddingModel, VectorStore
from dotenv import load_dotenv

load_dotenv()

@click.group()
@click.pass_context
def cli(ctx):
    """RAG CLI Tool."""
    # Initialize RAG
    embedding_model = EmbeddingModel()
    vector_store = VectorStore(
        uri=os.getenv("MILVUS_URI"),
        token=os.getenv("MILVUS_TOKEN"),
        collection_name="cli_docs",
        dimension=1536
    )
    ctx.obj = AgenticRAG(
        vector_store=vector_store,
        embedding_model=embedding_model
    )

@cli.command()
@click.argument('paths', nargs=-1, type=click.Path(exists=True))
@click.pass_context
def index(ctx, paths):
    """Index documents."""
    rag = ctx.obj
    
    for path in paths:
        path = Path(path)
        if path.is_file():
            click.echo(f"Indexing {path}...")
            num_chunks = rag.index_document(str(path))
            click.echo(f"  ✓ {num_chunks} chunks indexed")
        elif path.is_dir():
            files = list(path.glob("**/*.pdf")) + list(path.glob("**/*.docx"))
            click.echo(f"Found {len(files)} documents in {path}")
            with click.progressbar(files) as bar:
                for file in bar:
                    rag.index_document(str(file))

@cli.command()
@click.argument('question')
@click.option('--top-k', default=10, help='Number of chunks to retrieve')
@click.option('--show-sources', is_flag=True, help='Show source documents')
@click.pass_context
def query(ctx, question, top_k, show_sources):
    """Ask a question."""
    rag = ctx.obj
    
    response = rag.query(question, top_k=top_k)
    
    click.echo("\n" + "="*80)
    click.echo(f"Question: {question}")
    click.echo("="*80)
    click.echo(f"\nAnswer:\n{response.answer}\n")
    
    if show_sources:
        click.echo("Sources:")
        for i, chunk in enumerate(response.retrieved_chunks, 1):
            score = chunk.reranked_score or chunk.score
            source = chunk.metadata.get('source', 'Unknown')
            click.echo(f"  {i}. [Score: {score:.3f}] {source}")

@cli.command()
@click.pass_context
def stats(ctx):
    """Show system statistics."""
    rag = ctx.obj
    stats = rag.get_stats()
    
    click.echo("\nRAG System Statistics:")
    click.echo(f"  Collection: {stats['collection_name']}")
    click.echo(f"  Total chunks: {stats['total_documents']}")

@cli.command()
@click.pass_context
def interactive(ctx):
    """Start interactive Q&A session."""
    rag = ctx.obj
    
    click.echo("Interactive Q&A (type 'quit' to exit)\n")
    
    while True:
        question = click.prompt("Question", type=str)
        
        if question.lower() in ['quit', 'exit', 'q']:
            break
        
        response = rag.query(question)
        click.echo(f"\nAnswer: {response.answer}\n")

if __name__ == '__main__':
    cli()
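
For quick smoke tests without a shell, click's CliRunner can invoke the commands in-process (the group callback still builds a real RAG instance, so MILVUS_URI and MILVUS_TOKEN must be set):

from click.testing import CliRunner

runner = CliRunner()
result = runner.invoke(cli, ["query", "How do I reset my password?", "--show-sources"])
print(result.output)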

Data Pipeline Integration

Integrate with ETL or data processing pipelines.

from mini import DocumentLoader, Chunker, EmbeddingModel, VectorStore
import pandas as pd
import os
from pathlib import Path
from typing import List

class DocumentProcessor:
    """Process documents in batch pipelines."""
    
    def __init__(self):
        self.loader = DocumentLoader()
        self.chunker = Chunker()
        self.embedding_model = EmbeddingModel()
        self.vector_store = VectorStore(
            uri=os.getenv("MILVUS_URI"),
            token=os.getenv("MILVUS_TOKEN"),
            collection_name="pipeline_docs",
            dimension=1536
        )
    
    def process_documents(self, file_paths: List[str]) -> pd.DataFrame:
        """Process documents and return DataFrame."""
        results = []
        
        for path in file_paths:
            try:
                # Load and chunk
                text = self.loader.load(path)
                chunks = self.chunker.chunk(text)
                
                # Generate embeddings
                embeddings = self.embedding_model.embed_chunks(
                    [chunk.text for chunk in chunks]
                )
                
                # Store results
                for i, (chunk, embedding) in enumerate(zip(chunks, embeddings)):
                    results.append({
                        'source': path,
                        'chunk_id': i,
                        'text': chunk.text,
                        'embedding': embedding,
                        'token_count': chunk.token_count
                    })
            except Exception as e:
                print(f"Error processing {path}: {e}")
        
        return pd.DataFrame(results)
    
    def index_batch(self, df: pd.DataFrame):
        """Index a batch of documents."""
        self.vector_store.insert(
            embeddings=df['embedding'].tolist(),
            texts=df['text'].tolist(),
            metadata=[
                {
                    'source': row['source'],
                    'chunk_id': row['chunk_id'],
                    'tokens': row['token_count']
                }
                for _, row in df.iterrows()
            ]
        )

# Usage in pipeline
processor = DocumentProcessor()

# Process batch
documents = list(Path("./documents").glob("*.pdf"))
df = processor.process_documents([str(f) for f in documents])

# Index
processor.index_batch(df)

# Save metadata
df.drop('embedding', axis=1).to_csv('processed_docs.csv', index=False)
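
Because the intermediate result is a plain DataFrame, standard pandas transformations fit between processing and indexing; for example, dropping near-empty chunks before they reach the vector store (the 20-token threshold is an arbitrary choice):

# Keep only chunks with enough content to be worth retrieving
df = df[df["token_count"] > 20].reset_index(drop=True)
processor.index_batch(df)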

Async Integration

For asynchronous applications, run the synchronous query in a thread pool so it does not block the event loop.

from fastapi import FastAPI
import asyncio
from concurrent.futures import ThreadPoolExecutor
from mini import AgenticRAG, EmbeddingModel, VectorStore

app = FastAPI()
executor = ThreadPoolExecutor(max_workers=4)

# Initialize RAG
rag = AgenticRAG(
    vector_store=VectorStore(...),
    embedding_model=EmbeddingModel()
)

async def async_query(question: str):
    """Run RAG query in thread pool."""
    loop = asyncio.get_running_loop()
    response = await loop.run_in_executor(
        executor,
        rag.query,
        question
    )
    return response

@app.post("/query")
async def query(question: str):
    """Async query endpoint."""
    response = await async_query(question)
    return {"answer": response.answer}

Best Practices

Create the RAG instance once at application startup, not per request.

# Good - startup event
@app.on_event("startup")
async def startup():
    global rag
    rag = AgenticRAG(...)

# Bad - per request
@app.post("/query")
def query(q: str):
    rag = AgenticRAG(...)  # Don't do this!

Handle errors gracefully and return meaningful messages.

import logging

logger = logging.getLogger(__name__)

try:
    response = rag.query(question)
    return {"answer": response.answer}
except Exception as e:
    logger.error(f"Query failed: {e}")
    raise HTTPException(
        status_code=500,
        detail="Failed to process query"
    )

Clean up resources on shutdown.

@app.on_event("shutdown")
async def shutdown():
    if rag:
        rag.vector_store.disconnect()

Reuse connections instead of creating new ones.

# RAG reuses the same vector store connection
# No need to disconnect/reconnect per request

Next Steps