Overview
This example demonstrates how to build a chatbot with Mini RAG that maintains conversation history and provides contextual responses based on your document knowledge base.
Features
Conversation Memory
Track and maintain conversation history
Context-Aware
Use conversation context for better answers
Session Management
Support multiple concurrent users
Source Attribution
Show which documents informed the response
Complete Implementation
Copy
import os
from typing import List, Dict, Optional
from dataclasses import dataclass, field
from datetime import datetime
from mini import (
AgenticRAG,
LLMConfig,
RetrievalConfig,
EmbeddingModel,
VectorStore
)
from dotenv import load_dotenv
load_dotenv()
@dataclass
class Message:
    """One turn of a conversation: who spoke, what was said, and when."""

    # Speaker of this turn: "user" or "assistant".
    role: str
    # Text of the turn.
    content: str
    # Creation time; filled in with datetime.now() when not supplied.
    timestamp: datetime = field(default_factory=datetime.now)
    # Source records attached to this turn, or None when there are none.
    sources: Optional[List[Dict]] = field(default=None)
@dataclass
class Conversation:
    """A single chat session: its identifier plus the ordered message list."""

    # Identifier the session is stored under.
    session_id: str
    # Messages in the order they were added.
    messages: List["Message"] = field(default_factory=list)
    # Creation time; filled in with datetime.now() when not supplied.
    created_at: datetime = field(default_factory=datetime.now)

    def add_message(
        self,
        role: str,
        content: str,
        sources: Optional[List[Dict]] = None,
    ) -> None:
        """Append a new message to the end of the conversation.

        Args:
            role: Speaker of the message ("user" or "assistant").
            content: Text of the message.
            sources: Optional source records to attach to the message.
        """
        self.messages.append(Message(role=role, content=content, sources=sources))

    def get_context_window(self, last_n: int = 5) -> List["Message"]:
        """Return the most recent ``last_n`` messages, oldest first.

        Args:
            last_n: Maximum number of trailing messages to return.

        Returns:
            A new list holding at most ``last_n`` messages. It is empty when
            ``last_n`` is zero or negative, and is always a copy, so mutating
            it never changes the stored history.
        """
        # Guard explicitly: ``messages[-0:]`` is the *whole* list, and a
        # negative ``last_n`` would slice from the front instead of the back.
        if last_n <= 0:
            return []
        return self.messages[-last_n:]

    def format_history(self) -> str:
        """Render the full history as text, one ``ROLE: content`` line per message."""
        return "\n".join(
            f"{msg.role.upper()}: {msg.content}" for msg in self.messages
        )
class DocumentChatbot:
    """
    A chatbot that uses RAG to answer questions based on indexed documents,
    with support for conversation history and multi-session management.

    Sessions live in an in-memory dict keyed by session id, so history is
    lost when the object is discarded.
    """

    def __init__(
        self,
        milvus_uri: str,
        milvus_token: str,
        collection_name: str = "chatbot_kb",
        use_conversation_context: bool = True
    ):
        """Initialize the chatbot.

        Args:
            milvus_uri: URI handed to the vector store.
            milvus_token: Token handed to the vector store.
            collection_name: Name of the vector store collection to use.
            use_conversation_context: When True, recent messages of the
                session are prepended to each query sent to the RAG system.
        """
        # Initialize RAG components
        self.embedding_model = EmbeddingModel()
        self.vector_store = VectorStore(
            uri=milvus_uri,
            token=milvus_token,
            collection_name=collection_name,
            # NOTE(review): presumably must match the embedding model's
            # output size - confirm before changing either.
            dimension=1536
        )
        self.rag = AgenticRAG(
            vector_store=self.vector_store,
            embedding_model=self.embedding_model,
            llm_config=LLMConfig(
                model="gpt-4o-mini",
                temperature=0.7
            ),
            retrieval_config=RetrievalConfig(
                top_k=10,
                rerank_top_k=3,
                use_query_rewriting=True,
                use_reranking=True
            )
        )
        # Conversation management
        self.conversations: Dict[str, "Conversation"] = {}
        self.use_conversation_context = use_conversation_context

    def add_documents(self, document_paths: List[str]) -> Dict[str, int]:
        """
        Add documents to the knowledge base.

        Indexing is best-effort: a document that fails is reported on stdout
        and recorded with 0 chunks, and the remaining documents are still
        processed.

        Args:
            document_paths: List of paths to documents

        Returns:
            Dictionary mapping document paths to number of chunks
        """
        results: Dict[str, int] = {}
        print("📚 Indexing documents...")
        for doc_path in document_paths:
            try:
                num_chunks = self.rag.index_document(doc_path)
            except Exception as e:
                # Deliberately broad: one bad file must not abort the batch.
                print(f" ✗ {doc_path}: Error - {e}")
                results[doc_path] = 0
            else:
                results[doc_path] = num_chunks
                print(f" ✓ {doc_path}: {num_chunks} chunks")
        total_chunks = sum(results.values())
        print(f"\n✅ Total: {total_chunks} chunks indexed\n")
        return results

    def start_conversation(self, session_id: str) -> "Conversation":
        """
        Start a new conversation session.

        Any session already stored under ``session_id`` is replaced.

        Args:
            session_id: Unique identifier for the session

        Returns:
            New Conversation object
        """
        conversation = Conversation(session_id=session_id)
        self.conversations[session_id] = conversation
        return conversation

    def get_conversation(self, session_id: str) -> Optional["Conversation"]:
        """Get a conversation by session ID, or None if it does not exist."""
        return self.conversations.get(session_id)

    def _build_query(self, conversation, user_message: str) -> str:
        """Return the query text, prefixed with recent history when enabled."""
        if not (self.use_conversation_context and conversation.messages):
            return user_message
        context = "\n".join(
            f"{msg.role}: {msg.content}"
            for msg in conversation.get_context_window(last_n=3)
        )
        return (
            "\n"
            "Previous conversation:\n"
            f"{context}\n"
            f"Current question: {user_message}\n"
        )

    @staticmethod
    def _format_sources(chunks) -> List[Dict]:
        """Convert retrieved chunks into plain dicts with a short text preview."""
        sources = []
        for chunk in chunks:
            # Explicit None check: a rerank score of 0.0 is a real score and
            # must not fall through to the raw retrieval score.
            if chunk.reranked_score is not None:
                score = chunk.reranked_score
            else:
                score = chunk.score
            text = chunk.text
            # Only mark the preview as truncated when it actually was.
            preview = text if len(text) <= 200 else text[:200] + "..."
            sources.append({
                "text": preview,
                "score": score,
                "metadata": chunk.metadata
            })
        return sources

    def chat(
        self,
        session_id: str,
        user_message: str,
        include_sources: bool = True
    ) -> Dict:
        """
        Send a message and get a response.

        The session is created on first use. Both the user message and the
        reply (or the error text, if the query fails) are appended to the
        session history.

        Args:
            session_id: Unique session identifier
            user_message: The user's message
            include_sources: Whether to include source documents

        Returns:
            Dictionary with the keys ``answer``, ``sources``, ``metadata``
            and ``session_id``. On failure ``answer`` carries an apology
            containing the error text, ``sources`` is empty and ``metadata``
            is ``{"error": <message>}``.
        """
        # Initialize session if it doesn't exist
        if session_id not in self.conversations:
            self.start_conversation(session_id)
        conversation = self.conversations[session_id]

        # The context is built before the new message is stored, so it only
        # contains earlier turns.
        enhanced_query = self._build_query(conversation, user_message)

        try:
            response = self.rag.query(enhanced_query)
            sources = (
                self._format_sources(response.retrieved_chunks)
                if include_sources
                else []
            )
            # Add messages to conversation history
            conversation.add_message("user", user_message)
            conversation.add_message("assistant", response.answer, sources)
            return {
                "answer": response.answer,
                "sources": sources,
                "metadata": response.metadata,
                "session_id": session_id
            }
        except Exception as e:
            # Boundary handler: callers always get a reply dict, never a raise.
            error_msg = f"I apologize, but I encountered an error: {str(e)}"
            conversation.add_message("user", user_message)
            conversation.add_message("assistant", error_msg)
            return {
                "answer": error_msg,
                "sources": [],
                "metadata": {"error": str(e)},
                "session_id": session_id
            }

    def get_history(self, session_id: str) -> List[Dict]:
        """
        Get conversation history for a session.

        Args:
            session_id: Unique session identifier

        Returns:
            List of message dictionaries (``role``, ``content``, ISO-format
            ``timestamp``, ``sources``); empty when the session is unknown
        """
        conversation = self.conversations.get(session_id)
        if conversation is None:
            return []
        return [
            {
                "role": msg.role,
                "content": msg.content,
                "timestamp": msg.timestamp.isoformat(),
                "sources": msg.sources
            }
            for msg in conversation.messages
        ]

    def clear_history(self, session_id: str) -> bool:
        """
        Clear conversation history for a session.

        The session itself is kept; only its messages are dropped.

        Args:
            session_id: Unique session identifier

        Returns:
            True if cleared, False if session not found
        """
        conversation = self.conversations.get(session_id)
        if conversation is None:
            return False
        conversation.messages = []
        return True

    def delete_session(self, session_id: str) -> bool:
        """
        Delete a conversation session.

        Args:
            session_id: Unique session identifier

        Returns:
            True if deleted, False if session not found
        """
        if session_id not in self.conversations:
            return False
        del self.conversations[session_id]
        return True

    def get_stats(self) -> Dict:
        """Get chatbot statistics.

        Returns:
            Dictionary with the session count, the total message count over
            all sessions, the list of session ids, and whatever
            ``self.rag.get_stats()`` reports under ``rag_stats``.
        """
        total_messages = sum(
            len(conv.messages)
            for conv in self.conversations.values()
        )
        return {
            "total_conversations": len(self.conversations),
            "total_messages": total_messages,
            "active_sessions": list(self.conversations.keys()),
            "rag_stats": self.rag.get_stats()
        }
def main():
    """Run an interactive command-line chat against the document knowledge base.

    Reads the Milvus connection settings from the ``MILVUS_URI`` and
    ``MILVUS_TOKEN`` environment variables, indexes the sample documents when
    the RAG stats report no documents, then loops on user input until the
    user quits (``quit``/``exit``/``q``, Ctrl-D or Ctrl-C).
    """
    # Initialize chatbot
    chatbot = DocumentChatbot(
        milvus_uri=os.getenv("MILVUS_URI"),
        milvus_token=os.getenv("MILVUS_TOKEN"),
        use_conversation_context=True
    )

    # Index documents (do this once)
    documents = [
        "./docs/employee_handbook.pdf",
        "./docs/company_policies.pdf",
        "./docs/faq.pdf"
    ]
    stats = chatbot.rag.get_stats()
    if stats['total_documents'] == 0:
        chatbot.add_documents(documents)
    else:
        print(f"📊 Using existing index: {stats['total_documents']} chunks\n")

    # Start interactive chat
    print("=== Document Chatbot ===")
    print("Type 'quit' to exit, 'history' to see conversation, 'clear' to clear history\n")
    session_id = "user_123"

    while True:
        # input() raises EOFError on end-of-input (Ctrl-D, closed pipe) and
        # KeyboardInterrupt on Ctrl-C; treat both as a request to quit
        # instead of ending the session with a traceback.
        try:
            user_input = input("You: ").strip()
        except (EOFError, KeyboardInterrupt):
            print("\nGoodbye! 👋")
            break

        if not user_input:
            continue

        command = user_input.lower()
        if command in ('quit', 'exit', 'q'):
            print("\nGoodbye! 👋")
            break
        if command == 'history':
            print("\n📜 Conversation History:")
            for msg in chatbot.get_history(session_id):
                print(f" {msg['role'].upper()}: {msg['content']}")
            print()
            continue
        if command == 'clear':
            chatbot.clear_history(session_id)
            print("✓ History cleared\n")
            continue

        # Get chatbot response
        result = chatbot.chat(session_id, user_input)
        print(f"\nBot: {result['answer']}\n")

        # Show at most the first three sources
        if result['sources']:
            print(f"📚 Sources ({len(result['sources'])} documents):")
            for i, source in enumerate(result['sources'][:3], 1):
                score = source['score']
                source_file = source['metadata'].get('source', 'Unknown')
                print(f" {i}. [Score: {score:.3f}] {source_file}")
            print()


if __name__ == "__main__":
    main()
Running the Example
1
Install Dependencies
Copy
uv add mini-rag
2
Set Environment Variables
Create a .env file with your credentials:
Copy
OPENAI_API_KEY=sk-...
MILVUS_URI=https://...
MILVUS_TOKEN=...
3
Prepare Documents
Place your documents in a ./docs/ folder
4
Run the Chatbot
Copy
python chatbot.py
Example Session
Copy
📚 Indexing documents...
✓ ./docs/employee_handbook.pdf: 45 chunks
✓ ./docs/company_policies.pdf: 32 chunks
✓ ./docs/faq.pdf: 15 chunks
✅ Total: 92 chunks indexed
=== Document Chatbot ===
Type 'quit' to exit, 'history' to see conversation, 'clear' to clear history
You: What is the vacation policy?
Bot: The company provides 15 days of paid vacation per year for full-time
employees. Vacation days accrue monthly and can be used after completing
3 months of employment.
📚 Sources (3 documents):
1. [Score: 0.945] employee_handbook.pdf
2. [Score: 0.892] company_policies.pdf
3. [Score: 0.856] faq.pdf
You: Can I carry them over to next year?
Bot: Yes, you can carry over unused vacation days to the next year, but with
a maximum of 5 days. Any days beyond that will be forfeited.
📚 Sources (3 documents):
1. [Score: 0.923] employee_handbook.pdf
2. [Score: 0.901] company_policies.pdf
3. [Score: 0.878] faq.pdf
You: history
📜 Conversation History:
USER: What is the vacation policy?
ASSISTANT: The company provides 15 days of paid vacation...
USER: Can I carry them over to next year?
ASSISTANT: Yes, you can carry over unused vacation days...
You: quit
Goodbye! 👋
API Integration
Integrate the chatbot into a FastAPI application:
Copy
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional
import uuid
# Application object that the route decorators below attach to.
app = FastAPI()

# A single chatbot instance shared by every request; the Milvus connection
# settings are read from the environment.
chatbot = DocumentChatbot(
    milvus_token=os.getenv("MILVUS_TOKEN"),
    milvus_uri=os.getenv("MILVUS_URI"),
)
class ChatRequest(BaseModel):
    """Request body accepted by the chat endpoint."""

    # Text of the user's message.
    message: str
    # Session to continue; None (the default) when the client supplies none.
    session_id: Optional[str] = None
class ChatResponse(BaseModel):
    """Response body returned by the chat endpoint."""

    # The chatbot's reply text.
    answer: str
    # Session the exchange was recorded under.
    session_id: str
    # Source entries reported by the chatbot for this reply.
    sources: list
@app.post("/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
    """Answer one chat message and report the session it was recorded under.

    When the request carries no session id (or an empty one), a new UUID4
    string is generated and returned so the client can continue the session.
    """
    if request.session_id:
        session_id = request.session_id
    else:
        session_id = str(uuid.uuid4())

    # chatbot.chat is a plain synchronous call; it is not awaited.
    reply = chatbot.chat(session_id, request.message)
    return ChatResponse(
        answer=reply["answer"],
        session_id=session_id,
        sources=reply["sources"],
    )
@app.get("/history/{session_id}")
async def get_history(session_id: str):
    """Return the stored messages of a session under the ``history`` key."""
    return {"history": chatbot.get_history(session_id)}
@app.delete("/history/{session_id}")
async def clear_history(session_id: str):
    """Drop all messages of a session.

    Raises:
        HTTPException: 404 when the chatbot reports the session as unknown.
    """
    if chatbot.clear_history(session_id):
        return {"status": "cleared"}
    raise HTTPException(status_code=404, detail="Session not found")
@app.get("/stats")
async def get_stats():
    """Return the statistics dictionary produced by the chatbot."""
    stats = chatbot.get_stats()
    return stats
Enhancements
Add Sentiment Analysis
Copy
from textblob import TextBlob
def analyze_sentiment(text: str) -> str:
    """Classify a message as "positive", "negative" or "neutral".

    The label comes from TextBlob's polarity value for ``text``: above 0.3
    is positive, below -0.3 is negative, anything in between is neutral.
    """
    polarity = TextBlob(text).sentiment.polarity
    if polarity > 0.3:
        return "positive"
    if polarity < -0.3:
        return "negative"
    return "neutral"
# Use in chatbot
# Fragment: `user_message` is not defined at this level; run these lines where
# a user message is in scope, e.g. inside DocumentChatbot.chat.
sentiment = analyze_sentiment(user_message)
print(f"Sentiment: {sentiment}")
Add Response Streaming
Copy
async def stream_response(session_id: str, message: str):
    """Stream chatbot response word by word.

    The full answer is obtained first with a synchronous ``chatbot.chat``
    call, then re-emitted one whitespace-separated word at a time (each
    followed by a single space) with a 0.05 s pause after every word.

    Args:
        session_id: Session the message belongs to.
        message: The user's message.

    Yields:
        Each word of the answer followed by one space.
    """
    # Local import keeps this snippet self-contained: asyncio.sleep is used
    # below and must be in scope.
    import asyncio

    result = chatbot.chat(session_id, message)
    for word in result['answer'].split():
        yield f"{word} "
        await asyncio.sleep(0.05)
Add Feedback Collection
Copy
@dataclass
class Message:
    """One turn of a conversation, with an optional feedback rating."""

    # Speaker of this turn.
    role: str
    # Text of the turn.
    content: str
    # Creation time; filled in with datetime.now() when not supplied.
    timestamp: datetime = field(default_factory=datetime.now)
    # Source records attached to this turn, or None when there are none.
    sources: Optional[List[Dict]] = field(default=None)
    # Feedback on this turn: 'positive' or 'negative'; None until provided.
    feedback: Optional[str] = field(default=None)
def provide_feedback(session_id: str, message_index: int, feedback: str) -> bool:
    """Provide feedback on a response.

    Args:
        session_id: Session that contains the message.
        message_index: Zero-based position of the message in the session's
            message list.
        feedback: Feedback value to store on the message ('positive' or
            'negative').

    Returns:
        True when the feedback was stored; False when the session does not
        exist or ``message_index`` is outside the message list.
    """
    conversation = chatbot.get_conversation(session_id)
    if conversation is None:
        return False
    # Check both bounds: a negative index would otherwise silently address a
    # message from the end of the list, or raise IndexError when too large.
    if not 0 <= message_index < len(conversation.messages):
        return False
    conversation.messages[message_index].feedback = feedback
    return True
