Overview
This example shows how to build a complete document question-answering system using Mini RAG. Users can ask questions and get answers based on your document collection.Complete Example
Copy
import os
from mini import (
AgenticRAG,
LLMConfig,
RetrievalConfig,
EmbeddingModel,
VectorStore
)
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
def setup_rag():
"""Initialize the RAG system."""
# Initialize embedding model
embedding_model = EmbeddingModel()
# Initialize vector store
vector_store = VectorStore(
uri=os.getenv("MILVUS_URI"),
token=os.getenv("MILVUS_TOKEN"),
collection_name="company_docs",
dimension=1536
)
# Initialize RAG with optimal settings
rag = AgenticRAG(
vector_store=vector_store,
embedding_model=embedding_model,
llm_config=LLMConfig(
model="gpt-4o-mini",
temperature=0.7
),
retrieval_config=RetrievalConfig(
top_k=10,
rerank_top_k=3,
use_query_rewriting=True,
use_reranking=True
)
)
return rag
def index_documents(rag, document_paths):
"""Index a collection of documents."""
print("📄 Indexing documents...")
total_chunks = 0
for doc_path in document_paths:
try:
num_chunks = rag.index_document(doc_path)
total_chunks += num_chunks
print(f" ✓ {doc_path}: {num_chunks} chunks")
except Exception as e:
print(f" ✗ {doc_path}: Error - {e}")
print(f"\n✅ Total: {total_chunks} chunks indexed\n")
return total_chunks
def interactive_qa(rag):
"""Run interactive Q&A session."""
print("=== Document Q&A System ===")
print("Ask questions about your documents (type 'quit' to exit)\n")
while True:
# Get user question
question = input("❓ Question: ").strip()
if question.lower() in ['quit', 'exit', 'q']:
print("\nGoodbye! 👋")
break
if not question:
continue
try:
# Query the RAG system
response = rag.query(question)
# Display answer
print(f"\n💬 Answer:\n{response.answer}\n")
# Show sources (optional)
if response.retrieved_chunks:
print(f"📚 Sources ({len(response.retrieved_chunks)} chunks):")
for i, chunk in enumerate(response.retrieved_chunks, 1):
score = chunk.reranked_score or chunk.score
source = chunk.metadata.get('source', 'Unknown')
print(f" {i}. [Score: {score:.3f}] {source}")
print()
except Exception as e:
print(f"\n❌ Error: {e}\n")
def main():
# Setup RAG system
rag = setup_rag()
# Documents to index
documents = [
"./docs/employee_handbook.pdf",
"./docs/company_policies.pdf",
"./docs/benefits_guide.pdf",
"./docs/faq.pdf"
]
# Index documents (only needed once)
stats = rag.get_stats()
if stats['total_documents'] == 0:
index_documents(rag, documents)
else:
print(f"📊 Using existing index: {stats['total_documents']} chunks\n")
# Start interactive Q&A
interactive_qa(rag)
if __name__ == "__main__":
main()
Running the Example
1
Save the Code
Save the code above as
document_qa.py2
Prepare Documents
Place your documents in a
./docs/ folder3
Configure Environment
Ensure your
.env file has the required credentials4
Run
Copy
python document_qa.py
Example Session
Copy
📄 Indexing documents...
✓ ./docs/employee_handbook.pdf: 45 chunks
✓ ./docs/company_policies.pdf: 32 chunks
✓ ./docs/benefits_guide.pdf: 28 chunks
✓ ./docs/faq.pdf: 15 chunks
✅ Total: 120 chunks indexed
=== Document Q&A System ===
Ask questions about your documents (type 'quit' to exit)
❓ Question: What is the vacation policy?
💬 Answer:
The company provides 15 days of paid vacation per year for full-time
employees. Vacation days accrue monthly and can be used after completing
3 months of employment. Unused vacation days can be carried over to the
next year, up to a maximum of 5 days.
📚 Sources (3 chunks):
1. [Score: 0.945] employee_handbook.pdf
2. [Score: 0.892] company_policies.pdf
3. [Score: 0.856] benefits_guide.pdf
❓ Question: How do I reset my password?
💬 Answer:
To reset your password, go to the company portal login page and click
"Forgot Password". Enter your email address and you'll receive a reset
link. The link is valid for 24 hours. If you don't receive the email,
check your spam folder or contact IT support.
📚 Sources (3 chunks):
1. [Score: 0.923] faq.pdf
2. [Score: 0.901] employee_handbook.pdf
3. [Score: 0.878] faq.pdf
❓ Question: quit
Goodbye! 👋
Features Demonstrated
Document Indexing
Load and index multiple documents from different formats
Query Processing
Automatic query rewriting and optimized retrieval
Re-ranking
Re-rank results for best quality answers
Source Attribution
Show which documents were used to generate answers
Enhancements
Add Metadata
Copy
# Index with metadata for better filtering
rag.index_document(
"employee_handbook.pdf",
metadata={
"category": "hr",
"department": "human_resources",
"last_updated": "2024-01-15",
"version": "2.0"
}
)
Add Citations
Copy
def format_answer_with_citations(response):
"""Format answer with inline citations."""
answer = response.answer
print(f"\n💬 Answer:\n{answer}\n")
print("📚 References:")
for i, chunk in enumerate(response.retrieved_chunks, 1):
source = chunk.metadata.get('source', 'Unknown')
page = chunk.metadata.get('page', 'N/A')
print(f" [{i}] {source} (page {page})")
print(f" {chunk.text[:100]}...")
print()
Add History
Copy
class QASession:
def __init__(self, rag):
self.rag = rag
self.history = []
def ask(self, question):
response = self.rag.query(question)
self.history.append({
"question": question,
"answer": response.answer,
"sources": len(response.retrieved_chunks)
})
return response
def show_history(self):
print("\n📜 Session History:")
for i, item in enumerate(self.history, 1):
print(f"{i}. Q: {item['question']}")
print(f" A: {item['answer'][:100]}...")
print(f" Sources: {item['sources']}\n")
Add Filters
Copy
# Allow filtering by category
def ask_with_filter(rag, question, category=None):
"""Ask question with optional category filter."""
# Note: Filtering would be implemented in the search call
response = rag.query(question)
return response
# Usage
response = ask_with_filter(rag, "What is the policy?", category="hr")
Production Enhancements
Add Observability
Copy
from mini import ObservabilityConfig
rag = AgenticRAG(
vector_store=vector_store,
embedding_model=embedding_model,
observability_config=ObservabilityConfig(enabled=True)
)
Add Caching
Copy
from functools import lru_cache
@lru_cache(maxsize=100)
def cached_query(question: str):
"""Cache common questions."""
return rag.query(question)
Add Error Handling
Copy
def robust_query(rag, question, max_retries=3):
"""Query with retry logic."""
for attempt in range(max_retries):
try:
return rag.query(question)
except Exception as e:
if attempt == max_retries - 1:
raise
print(f"Retry {attempt + 1}/{max_retries} after error: {e}")
time.sleep(1 * (attempt + 1))
