Skip to main content

Overview

This example demonstrates how to build a chatbot with Mini RAG that maintains conversation history and provides contextual responses based on your document knowledge base.

Features

Conversation Memory

Track and maintain conversation history

Context-Aware

Use conversation context for better answers

Session Management

Keep a separate conversation session for each user

Source Attribution

Show which documents informed the response

Complete Implementation

import os
from typing import List, Dict, Optional
from dataclasses import dataclass, field
from datetime import datetime
from mini import (
    AgenticRAG,
    LLMConfig,
    RetrievalConfig,
    EmbeddingModel,
    VectorStore
)
from dotenv import load_dotenv

load_dotenv()

@dataclass
class Message:
    """One turn of a conversation.

    Every field except ``timestamp`` is supplied by the caller; ``timestamp``
    is stamped with ``datetime.now`` when the message object is created.
    """

    # Who produced the message: "user" or "assistant".
    role: str
    # The message text.
    content: str
    # Creation time (naive local time from datetime.now).
    timestamp: datetime = field(default_factory=datetime.now)
    # Source summaries attached to an assistant reply, or None.
    sources: Optional[List[Dict]] = field(default=None)

@dataclass
class Conversation:
    """An in-memory conversation session: ordered messages plus metadata."""

    # Caller-supplied identifier for this session.
    session_id: str
    # Messages in the order they were added (oldest first).
    messages: List[Message] = field(default_factory=list)
    # When this conversation object was created (naive local time).
    created_at: datetime = field(default_factory=datetime.now)

    def add_message(self, role: str, content: str, sources: Optional[List[Dict]] = None):
        """Append a new message to the end of the conversation.

        Args:
            role: Speaker of the message, e.g. "user" or "assistant".
            content: Message text.
            sources: Optional source summaries to attach (assistant replies).
        """
        self.messages.append(Message(role=role, content=content, sources=sources))

    def get_context_window(self, last_n: int = 5) -> List[Message]:
        """Return a new list with the most recent ``last_n`` messages, oldest first.

        Args:
            last_n: Number of trailing messages wanted. Zero or a negative
                value yields an empty list.

        Returns:
            At most ``last_n`` messages; the whole history if it is shorter.
        """
        # Guard explicitly: ``messages[-0:]`` is the *entire* list, which
        # would turn a request for zero messages into "everything".
        if last_n <= 0:
            return []
        # Slicing always returns a copy, so callers can't mutate the history.
        return self.messages[-last_n:]

    def format_history(self) -> str:
        """Render the full history as ``ROLE: content`` lines joined by newlines."""
        return "\n".join(f"{msg.role.upper()}: {msg.content}" for msg in self.messages)

class DocumentChatbot:
    """
    A chatbot that uses RAG to answer questions based on indexed documents,
    with support for conversation history and multi-session management.

    Conversations live in an in-memory dict keyed by session ID; nothing is
    persisted, so all history is lost when the process exits.
    """

    def __init__(
        self,
        milvus_uri: str,
        milvus_token: str,
        collection_name: str = "chatbot_kb",
        use_conversation_context: bool = True,
        context_window: int = 3,
    ):
        """Initialize the RAG pipeline and an empty session registry.

        Args:
            milvus_uri: Connection URI passed to the vector store.
            milvus_token: Authentication token passed to the vector store.
            collection_name: Vector-store collection holding the knowledge base.
            use_conversation_context: When True, recent messages are prepended
                to each query so follow-up questions can be resolved.
            context_window: Number of most recent messages prepended when
                ``use_conversation_context`` is enabled (0 disables it).
        """
        # Initialize RAG components
        self.embedding_model = EmbeddingModel()
        self.vector_store = VectorStore(
            uri=milvus_uri,
            token=milvus_token,
            collection_name=collection_name,
            # NOTE(review): must equal the embedding model's vector size — confirm 1536.
            dimension=1536,
        )

        self.rag = AgenticRAG(
            vector_store=self.vector_store,
            embedding_model=self.embedding_model,
            llm_config=LLMConfig(
                model="gpt-4o-mini",
                temperature=0.7,
            ),
            retrieval_config=RetrievalConfig(
                top_k=10,
                rerank_top_k=3,
                use_query_rewriting=True,
                use_reranking=True,
            ),
        )

        # Conversation management
        self.conversations: Dict[str, "Conversation"] = {}
        self.use_conversation_context = use_conversation_context
        self.context_window = context_window

    def add_documents(self, document_paths: List[str]) -> Dict[str, int]:
        """
        Add documents to the knowledge base.

        Indexing is best-effort per document: a failure is printed and
        recorded as 0 chunks, and the remaining documents are still indexed.

        Args:
            document_paths: List of paths to documents

        Returns:
            Dictionary mapping document paths to number of chunks
        """
        results = {}
        print("📚 Indexing documents...")

        for doc_path in document_paths:
            try:
                num_chunks = self.rag.index_document(doc_path)
            except Exception as e:
                print(f"  ✗ {doc_path}: Error - {e}")
                results[doc_path] = 0
            else:
                results[doc_path] = num_chunks
                print(f"  ✓ {doc_path}: {num_chunks} chunks")

        total_chunks = sum(results.values())
        print(f"\n✅ Total: {total_chunks} chunks indexed\n")

        return results

    def start_conversation(self, session_id: str) -> "Conversation":
        """
        Start a new conversation session.

        An existing session with the same ID is replaced.

        Args:
            session_id: Unique identifier for the session

        Returns:
            New Conversation object
        """
        conversation = Conversation(session_id=session_id)
        self.conversations[session_id] = conversation
        return conversation

    def get_conversation(self, session_id: str) -> Optional["Conversation"]:
        """Get a conversation by session ID, or None if it does not exist."""
        return self.conversations.get(session_id)

    @staticmethod
    def _build_contextual_query(history, user_message: str) -> str:
        """Prefix ``user_message`` with recent turns; return it unchanged if ``history`` is empty."""
        if not history:
            return user_message
        context = "\n".join(f"{msg.role}: {msg.content}" for msg in history)
        # Assembled explicitly (not as an indented triple-quoted f-string) so
        # no source-code indentation leaks into the query text.
        return f"Previous conversation:\n{context}\n\nCurrent question: {user_message}"

    @staticmethod
    def _summarize_chunk(chunk, max_chars: int = 200) -> Dict:
        """Convert a retrieved chunk into a source summary dict.

        The text is cut to ``max_chars`` characters and "..." is appended only
        when something was actually cut. The reranker score is preferred
        whenever it is not None, so a legitimate 0.0 is kept.
        """
        text = chunk.text
        if len(text) > max_chars:
            text = text[:max_chars] + "..."
        score = chunk.reranked_score if chunk.reranked_score is not None else chunk.score
        return {"text": text, "score": score, "metadata": chunk.metadata}

    def chat(
        self,
        session_id: str,
        user_message: str,
        include_sources: bool = True,
    ) -> Dict:
        """
        Send a message and get a response.

        The session is created on first use. The user message and the reply
        are appended to the session history. If the RAG call fails, the reply
        is an apology that contains the error text.

        Args:
            session_id: Unique session identifier
            user_message: The user's message
            include_sources: Whether to include source documents

        Returns:
            Dictionary with "answer", "sources", "metadata" and "session_id".
            On failure "sources" is empty and "metadata" is {"error": <text>}.
        """
        conversation = self.conversations.get(session_id)
        if conversation is None:
            conversation = self.start_conversation(session_id)

        # Prepend recent turns so follow-ups ("Can I carry them over?") are resolvable.
        history = []
        if self.use_conversation_context and self.context_window > 0:
            history = conversation.get_context_window(last_n=self.context_window)
        query = self._build_contextual_query(history, user_message)

        try:
            response = self.rag.query(query)
            answer = response.answer
            metadata = response.metadata
            sources = (
                [self._summarize_chunk(chunk) for chunk in response.retrieved_chunks]
                if include_sources
                else []
            )
        except Exception as e:
            # Boundary for one chat turn: report the failure to the user and
            # keep the session usable instead of propagating the exception.
            error_msg = f"I apologize, but I encountered an error: {e}"
            conversation.add_message("user", user_message)
            conversation.add_message("assistant", error_msg)
            return {
                "answer": error_msg,
                "sources": [],
                "metadata": {"error": str(e)},
                "session_id": session_id,
            }

        conversation.add_message("user", user_message)
        conversation.add_message("assistant", answer, sources)
        return {
            "answer": answer,
            "sources": sources,
            "metadata": metadata,
            "session_id": session_id,
        }

    def get_history(self, session_id: str) -> List[Dict]:
        """
        Get conversation history for a session.

        Args:
            session_id: Unique session identifier

        Returns:
            List of message dictionaries (empty if the session does not exist)
        """
        conversation = self.conversations.get(session_id)
        if not conversation:
            return []

        return [
            {
                "role": msg.role,
                "content": msg.content,
                "timestamp": msg.timestamp.isoformat(),
                "sources": msg.sources,
            }
            for msg in conversation.messages
        ]

    def clear_history(self, session_id: str) -> bool:
        """
        Clear conversation history for a session (the session itself is kept).

        Args:
            session_id: Unique session identifier

        Returns:
            True if cleared, False if session not found
        """
        if session_id in self.conversations:
            self.conversations[session_id].messages = []
            return True
        return False

    def delete_session(self, session_id: str) -> bool:
        """
        Delete a conversation session.

        Args:
            session_id: Unique session identifier

        Returns:
            True if deleted, False if session not found
        """
        if session_id in self.conversations:
            del self.conversations[session_id]
            return True
        return False

    def get_stats(self) -> Dict:
        """Get session counts plus whatever the RAG pipeline's ``get_stats`` reports."""
        total_messages = sum(len(conv.messages) for conv in self.conversations.values())
        return {
            "total_conversations": len(self.conversations),
            "total_messages": total_messages,
            "active_sessions": list(self.conversations.keys()),
            "rag_stats": self.rag.get_stats(),
        }

def main():
    """Run an interactive terminal chat over the sample documents.

    Reads the Milvus connection settings from the environment (``load_dotenv``
    at import time also loads them from ``.env``). Indexes the sample
    documents when the index reports none, then loops on user input until the
    user types quit/exit/q or input ends (Ctrl-D / Ctrl-C).
    """
    milvus_uri = os.getenv("MILVUS_URI")
    if not milvus_uri:
        # Fail fast with an actionable message instead of passing None to the vector store.
        raise SystemExit("MILVUS_URI is not set; add it to your environment or .env file.")

    # Initialize chatbot
    chatbot = DocumentChatbot(
        milvus_uri=milvus_uri,
        milvus_token=os.getenv("MILVUS_TOKEN"),
        use_conversation_context=True,
    )

    # Index documents (do this once)
    documents = [
        "./docs/employee_handbook.pdf",
        "./docs/company_policies.pdf",
        "./docs/faq.pdf",
    ]

    stats = chatbot.rag.get_stats()
    if stats['total_documents'] == 0:
        chatbot.add_documents(documents)
    else:
        # NOTE(review): this labels `total_documents` as chunks — confirm which unit get_stats reports.
        print(f"📊 Using existing index: {stats['total_documents']} chunks\n")

    # Start interactive chat
    print("=== Document Chatbot ===")
    print("Type 'quit' to exit, 'history' to see conversation, 'clear' to clear history\n")

    session_id = "user_123"

    while True:
        try:
            user_input = input("You: ").strip()
        except (EOFError, KeyboardInterrupt):
            # Ctrl-D / Ctrl-C (or piped input running out): exit cleanly, not with a traceback.
            print("\nGoodbye! 👋")
            break

        if not user_input:
            continue

        command = user_input.lower()

        if command in ('quit', 'exit', 'q'):
            print("\nGoodbye! 👋")
            break

        if command == 'history':
            history = chatbot.get_history(session_id)
            print("\n📜 Conversation History:")
            for msg in history:
                print(f"  {msg['role'].upper()}: {msg['content']}")
            print()
            continue

        if command == 'clear':
            chatbot.clear_history(session_id)
            print("✓ History cleared\n")
            continue

        # Get chatbot response
        result = chatbot.chat(session_id, user_input)
        print(f"\nBot: {result['answer']}\n")

        # Show up to three sources
        if result['sources']:
            print(f"📚 Sources ({len(result['sources'])} documents):")
            for i, source in enumerate(result['sources'][:3], 1):
                # Metadata is passed through from the retrieved chunk; tolerate it being None.
                metadata = source['metadata'] or {}
                source_file = metadata.get('source', 'Unknown')
                print(f"  {i}. [Score: {source['score']:.3f}] {source_file}")
            print()


if __name__ == "__main__":
    main()

Running the Example

Step 1: Install Dependencies

uv add mini-rag
Step 2: Set Environment Variables

Create a .env file with your credentials:
OPENAI_API_KEY=sk-...
MILVUS_URI=https://...
MILVUS_TOKEN=...
Step 3: Prepare Documents

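Place your documents in a `./docs/` folder. The script indexes `employee_handbook.pdf`, `company_policies.pdf`, and `faq.pdf`; edit the `documents` list in `main()` to use your own files.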
Step 4: Run the Chatbot

Save the complete implementation above as `chatbot.py`, then run:

python chatbot.py

Example Session

📚 Indexing documents...
  ✓ ./docs/employee_handbook.pdf: 45 chunks
  ✓ ./docs/company_policies.pdf: 32 chunks
  ✓ ./docs/faq.pdf: 15 chunks

✅ Total: 92 chunks indexed

=== Document Chatbot ===
Type 'quit' to exit, 'history' to see conversation, 'clear' to clear history

You: What is the vacation policy?

Bot: The company provides 15 days of paid vacation per year for full-time 
employees. Vacation days accrue monthly and can be used after completing 
3 months of employment.

📚 Sources (3 documents):
  1. [Score: 0.945] employee_handbook.pdf
  2. [Score: 0.892] company_policies.pdf
  3. [Score: 0.856] faq.pdf

You: Can I carry them over to next year?

Bot: Yes, you can carry over unused vacation days to the next year, but with 
a maximum of 5 days. Any days beyond that will be forfeited.

📚 Sources (3 documents):
  1. [Score: 0.923] employee_handbook.pdf
  2. [Score: 0.901] company_policies.pdf
  3. [Score: 0.878] faq.pdf

You: history

📜 Conversation History:
  USER: What is the vacation policy?
  ASSISTANT: The company provides 15 days of paid vacation...
  USER: Can I carry them over to next year?
  ASSISTANT: Yes, you can carry over unused vacation days...

You: quit

Goodbye! 👋

API Integration

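Integrate the chatbot into a FastAPI application. This snippet reuses `os` and `DocumentChatbot` from the script above, requires the `fastapi` package, and must be served by an ASGI server such as Uvicorn: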
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional
import uuid

app = FastAPI()

# Initialize chatbot. Relies on `os` and `DocumentChatbot` from the script above.
_milvus_uri = os.getenv("MILVUS_URI")
if not _milvus_uri:
    # Fail at startup with a clear message rather than on the first request.
    raise RuntimeError("MILVUS_URI is not set; add it to your environment or .env file.")
chatbot = DocumentChatbot(
    milvus_uri=_milvus_uri,
    milvus_token=os.getenv("MILVUS_TOKEN"),
)

class ChatRequest(BaseModel):
    # Request body for POST /chat.
    # The user's message text.
    message: str
    # Session to continue; when omitted, the /chat endpoint generates a new UUID.
    session_id: Optional[str] = None

class ChatResponse(BaseModel):
    # Response body for POST /chat.
    # The chatbot's reply (an apology containing the error text if the RAG call failed).
    answer: str
    # Session the exchange was recorded under; send it back to continue the conversation.
    session_id: str
    # Source summaries returned by DocumentChatbot.chat (empty on failure).
    sources: list

@app.post("/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
    """Answer one chat message, starting a new session when none is supplied.

    ``DocumentChatbot.chat`` is synchronous: it calls the RAG pipeline
    directly, presumably doing network-bound retrieval and LLM calls. Calling
    it inline in an ``async def`` endpoint would block the event loop and
    stall every other request, so it is run in FastAPI's thread pool.

    NOTE(review): requests for the *same* session can now run concurrently
    and may interleave their history entries; serialize per session if that
    matters.
    """
    from fastapi.concurrency import run_in_threadpool

    # Generate session ID if not provided
    session_id = request.session_id or str(uuid.uuid4())

    result = await run_in_threadpool(chatbot.chat, session_id, request.message)

    return ChatResponse(
        answer=result['answer'],
        session_id=session_id,
        sources=result['sources'],
    )

@app.get("/history/{session_id}")
async def get_history(session_id: str):
    """Return the stored messages of a session under the "history" key (empty if unknown)."""
    return {"history": chatbot.get_history(session_id)}

@app.delete("/history/{session_id}")
async def clear_history(session_id: str):
    """Erase the messages of an existing session; respond 404 when it does not exist."""
    if chatbot.clear_history(session_id):
        return {"status": "cleared"}
    raise HTTPException(status_code=404, detail="Session not found")

@app.get("/stats")
async def get_stats():
    """Get chatbot statistics.

    ``chatbot.get_stats`` is synchronous and calls ``rag.get_stats()``, which
    may query the vector store. It runs in FastAPI's thread pool so a slow
    call cannot block the event loop.
    """
    from fastapi.concurrency import run_in_threadpool

    return await run_in_threadpool(chatbot.get_stats)

Enhancements

Add Sentiment Analysis (requires the `textblob` package)

from textblob import TextBlob

def analyze_sentiment(text: str) -> str:
    """Label ``text`` as "positive", "negative" or "neutral".

    Uses TextBlob's polarity score: above 0.3 is positive, below -0.3 is
    negative, and anything in between (inclusive) is neutral.
    """
    score = TextBlob(text).sentiment.polarity
    if score > 0.3:
        return "positive"
    if score < -0.3:
        return "negative"
    return "neutral"

# Use in chatbot
# NOTE: this is a fragment, not standalone code — `user_message` is only
# defined inside `DocumentChatbot.chat`, so these lines belong in that method,
# not at module level.
sentiment = analyze_sentiment(user_message)
print(f"Sentiment: {sentiment}")

Add Response Streaming (simulated: the full answer is generated first, then emitted word by word)

async def stream_response(session_id: str, message: str, delay: float = 0.05):
    """Yield the chatbot's answer to ``message`` piece by piece.

    This simulates streaming: the full answer is generated first and then
    re-emitted word by word with a short pause between pieces.

    Args:
        session_id: Conversation session to use.
        message: The user's message.
        delay: Seconds to wait after each yielded piece.

    Yields:
        Successive pieces of the answer. Joined together they reproduce the
        answer exactly, including its whitespace and line breaks.
    """
    import asyncio
    import re

    # chatbot.chat is synchronous; run it in a worker thread so the event
    # loop keeps serving other requests while the answer is generated.
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(None, chatbot.chat, session_id, message)

    # Split at each whitespace -> non-whitespace boundary so every piece keeps
    # its trailing whitespace (str.split() would collapse newlines to spaces).
    for piece in re.split(r"(?<=\s)(?=\S)", result['answer']):
        if piece:
            yield piece
            await asyncio.sleep(delay)

Add Feedback Collection

@dataclass
class Message:
    """One turn of a conversation, with optional user feedback on it.

    Every field except ``timestamp`` is supplied by the caller; ``timestamp``
    is stamped with ``datetime.now`` when the message object is created.
    """

    # Who produced the message: "user" or "assistant".
    role: str
    # The message text.
    content: str
    # Creation time (naive local time from datetime.now).
    timestamp: datetime = field(default_factory=datetime.now)
    # Source summaries attached to an assistant reply, or None.
    sources: Optional[List[Dict]] = field(default=None)
    # Rating left on this message: 'positive' or 'negative' (None if unrated).
    feedback: Optional[str] = field(default=None)
    
def provide_feedback(session_id: str, message_index: int, feedback: str, bot=None) -> bool:
    """Record feedback on one message of a session.

    Args:
        session_id: Session containing the message.
        message_index: Zero-based position of the message in the session's
            history, matching the order returned by ``get_history``. Negative
            indices are rejected rather than silently counting from the end.
        feedback: Either "positive" or "negative".
        bot: Chatbot to look the session up in; defaults to the module-level
            ``chatbot``.

    Returns:
        True if the feedback was stored, False if the session or message
        does not exist.

    Raises:
        ValueError: If ``feedback`` is not "positive" or "negative".
    """
    if feedback not in ("positive", "negative"):
        raise ValueError(f"feedback must be 'positive' or 'negative', got {feedback!r}")
    if bot is None:
        bot = chatbot
    conversation = bot.get_conversation(session_id)
    if conversation is None or not 0 <= message_index < len(conversation.messages):
        return False
    conversation.messages[message_index].feedback = feedback
    return True

Next Steps