This project implements a production-ready Retrieval-Augmented Generation (RAG) assistant that combines the power of large language models with domain-specific knowledge retrieval. The system addresses the critical challenge of providing accurate, contextual responses while maintaining source attribution and factual grounding.
The RAG assistant demonstrates significant improvements over traditional chatbots by providing:
Information Retrieval Challenges in Enterprise:
Current Limitation | Impact | Our Solution |
---|---|---|
Generic LLM responses | 40% inaccuracy rate | Domain-specific retrieval |
No source attribution | Trust and verification issues | Automatic source citation |
Static knowledge cutoff | Outdated information | Real-time document ingestion |
High deployment costs | Limited scalability | Vector-based efficient retrieval |
Complex user interfaces | Low adoption rates | Intuitive chat interface |
Target Use Cases:
Success Criteria:
Our RAG assistant follows a modular, microservices-inspired architecture that separates concerns while maintaining high cohesion. The design prioritizes:
graph TB A[User Interface] --> B[Query Processor] B --> C[Vector Store] B --> D[Conversation Memory] C --> E[Document Retriever] E --> F[LLM Response Generator] F --> G[Response Formatter] G --> A H[Document Ingestion] --> I[Text Processor] I --> J[Embedding Generator] J --> C K[Monitoring] --> L[Performance Metrics] K --> M[Query Analytics] K --> N[System Health]
Component | Technology | Justification |
---|---|---|
Vector Store | FAISS | High-performance similarity search, memory efficient |
LLM Integration | OpenAI API | Proven reliability, comprehensive model options |
Embedding Model | text-embedding-ada-002 | Cost-effective, high-quality embeddings |
Framework | LangChain | Mature RAG ecosystem, extensive integrations |
Web Interface | Streamlit | Rapid prototyping, built-in components |
Document Processing | PyPDF2, BeautifulSoup | Robust multi-format support |
Monitoring | Python Logging | Native integration, structured logging |
Iterative Development Process:
Retrieval Performance:
Response Quality:
System Performance:
Expert Evaluation Criteria:
Technical Documentation:
Research Papers:
Enterprise Knowledge Bases:
# Example data collection pipeline def collect_documentation(): sources = [ "https://python.langchain.com/docs/", "https://platform.openai.com/docs/", "https://docs.python.org/3/" ] documents = [] for source in sources: # Web scraping with rate limiting content = scrape_with_backoff(source) # Content validation and cleaning cleaned = validate_and_clean(content) # Metadata extraction metadata = extract_metadata(source, cleaned) documents.append({ 'content': cleaned, 'metadata': metadata, 'source': source, 'collection_date': datetime.now() }) return documents
Dataset Category | Documents | Total Tokens | Avg Doc Length | Update Frequency |
---|---|---|---|---|
Technical Docs | 1,247 | 2.3M | 1,847 tokens | Weekly |
Research Papers | 523 | 4.1M | 7,839 tokens | Monthly |
Enterprise KB | 892 | 1.8M | 2,018 tokens | Daily |
Total | 2,662 | 8.2M | 3,082 tokens | Variable |
Content Analysis:
Technical Characteristics:
Stage 1: Format Normalization
def normalize_documents(raw_docs): normalized = [] for doc in raw_docs: # Extract text based on format if doc.format == 'pdf': text = extract_pdf_text(doc.content) elif doc.format == 'html': text = extract_html_text(doc.content) else: text = doc.content # Clean and standardize clean_text = clean_whitespace(text) clean_text = remove_artifacts(clean_text) clean_text = normalize_encoding(clean_text) normalized.append({ 'text': clean_text, 'metadata': doc.metadata }) return normalized
Stage 2: Intelligent Chunking
Stage 3: Quality Filtering
Technical Implementation:
class EmbeddingGenerator: def __init__(self, model_name="text-embedding-ada-002"): self.model = model_name self.batch_size = 100 self.rate_limiter = RateLimiter(requests_per_minute=3000) def generate_embeddings(self, chunks): embeddings = [] for batch in batch_chunks(chunks, self.batch_size): with self.rate_limiter: response = openai.Embedding.create( input=[chunk.text for chunk in batch], model=self.model ) batch_embeddings = [item.embedding for item in response.data] embeddings.extend(batch_embeddings) return embeddings
Optimization Strategies:
class RAGAssistant: """ Production-ready RAG assistant with comprehensive error handling, monitoring, and optimization features. """ def __init__(self, config: RAGConfig): self.config = config self.vector_store = None self.retriever = None self.llm = None self.conversation_memory = ConversationBufferWindowMemory( k=self.config.memory_window_size ) self.query_logger = QueryLogger() self.performance_monitor = PerformanceMonitor() self._initialize_components() def _initialize_components(self): """Initialize all RAG components with error handling.""" try: # Initialize LLM with retry logic self.llm = ChatOpenAI( model_name=self.config.model_name, temperature=self.config.temperature, max_retries=3 ) # Load or create vector store if self.config.vector_store_path.exists(): self.vector_store = FAISS.load_local( str(self.config.vector_store_path), self._get_embeddings() ) else: self.vector_store = FAISS.from_texts( [""], self._get_embeddings() ) # Configure retriever with hybrid search self.retriever = self.vector_store.as_retriever( search_type="mmr", # Maximum Marginal Relevance search_kwargs={ "k": self.config.retrieval_k, "fetch_k": self.config.fetch_k, "lambda_mult": self.config.diversity_lambda } ) except Exception as e: logger.error(f"Failed to initialize RAG components: {e}") raise @performance_monitor.measure_time def query(self, question: str, conversation_id: str = None) -> RAGResponse: """ Process a user query through the complete RAG pipeline. Args: question: User's question conversation_id: Optional conversation tracking ID Returns: RAGResponse with answer, sources, and metadata """ query_start = time.time() try: # Log query for analytics self.query_logger.log_query(question, conversation_id) # Retrieve relevant documents retrieval_start = time.time() relevant_docs = self.retriever.get_relevant_documents(question) retrieval_time = time.time() - retrieval_start # Generate response with context generation_start = time.time() response = self._generate_response(question, relevant_docs) generation_time = time.time() - generation_start # Update conversation memory self.conversation_memory.save_context( {"input": question}, {"output": response.answer} ) # Log performance metrics total_time = time.time() - query_start self.performance_monitor.log_metrics({ "retrieval_time": retrieval_time, "generation_time": generation_time, "total_time": total_time, "documents_retrieved": len(relevant_docs), "answer_length": len(response.answer) }) return response except Exception as e: logger.error(f"Query processing failed: {e}") return self._generate_error_response(str(e)) def _generate_response(self, question: str, documents: List[Document]) -> RAGResponse: """Generate LLM response with context and citations.""" # Prepare context from retrieved documents context = self._prepare_context(documents) # Get conversation history history = self.conversation_memory.buffer # Construct prompt with context and history prompt = self._build_prompt(question, context, history) # Generate response response = self.llm.predict(prompt) # Extract citations and format response formatted_response = self._format_response(response, documents) return RAGResponse( answer=formatted_response.answer, source_documents=documents, citations=formatted_response.citations, confidence_score=self._calculate_confidence(question, documents, response) ) def _prepare_context(self, documents: List[Document]) -> str: """Prepare context string from retrieved documents.""" context_parts = [] for i, doc in enumerate(documents): # Add source attribution source_info = f"Source {i+1}: {doc.metadata.get('source', 'Unknown')}" # Add document content with proper formatting content = doc.page_content.strip() context_parts.append(f"{source_info}\n{content}\n") return "\n---\n".join(context_parts) def _build_prompt(self, question: str, context: str, history: str) -> str: """Build optimized prompt for question answering.""" prompt_template = """ You are a helpful AI assistant that answers questions based on the provided context. Use the context below to answer the user's question accurately and comprehensively. Context: {context} Previous conversation: {history} Question: {question} Instructions: 1. Answer based primarily on the provided context 2. If the context doesn't contain enough information, clearly state this 3. Include relevant details and examples from the context 4. Cite specific sources when making claims (e.g., "According to Source 1...") 5. Be concise but thorough in your response Answer: """ return prompt_template.format( context=context, history=history, question=question ) def ingest_documents(self, documents: List[str], metadata: List[dict] = None) -> bool: """ Ingest new documents into the vector store. Args: documents: List of document texts metadata: Optional metadata for each document Returns: Success status """ try: # Validate inputs if not documents: raise ValueError("No documents provided for ingestion") # Process documents processed_docs = self._process_documents(documents, metadata) # Generate embeddings in batches embeddings = self._generate_embeddings_batch(processed_docs) # Add to vector store if self.vector_store.index.ntotal == 0: # First ingestion self.vector_store = FAISS.from_texts( [doc.page_content for doc in processed_docs], self._get_embeddings(), metadatas=[doc.metadata for doc in processed_docs] ) else: # Add to existing store self.vector_store.add_texts( [doc.page_content for doc in processed_docs], metadatas=[doc.metadata for doc in processed_docs] ) # Save updated vector store self.vector_store.save_local(str(self.config.vector_store_path)) logger.info(f"Successfully ingested {len(documents)} documents") return True except Exception as e: logger.error(f"Document ingestion failed: {e}") return False
1. Conversation Memory Management
class ConversationMemoryManager: """Manage conversation context with optimization for long conversations.""" def __init__(self, max_tokens=4000, summarization_threshold=3000): self.max_tokens = max_tokens self.summarization_threshold = summarization_threshold self.conversation_buffer = [] def add_exchange(self, question: str, answer: str): """Add Q&A exchange to conversation buffer.""" exchange = { "question": question, "answer": answer, "timestamp": datetime.now(), "tokens": self._count_tokens(question + answer) } self.conversation_buffer.append(exchange) # Manage buffer size if self._get_total_tokens() > self.max_tokens: self._compress_buffer() def _compress_buffer(self): """Compress conversation buffer when it gets too long.""" if len(self.conversation_buffer) <= 2: return # Keep the most recent exchanges recent_exchanges = self.conversation_buffer[-2:] # Summarize older exchanges older_exchanges = self.conversation_buffer[:-2] summary = self._summarize_exchanges(older_exchanges) # Replace buffer with summary + recent exchanges self.conversation_buffer = [ {"summary": summary, "timestamp": datetime.now(), "tokens": len(summary.split())} ] + recent_exchanges
2. Performance Monitoring System
class PerformanceMonitor: """Comprehensive performance monitoring for RAG system.""" def __init__(self): self.metrics = defaultdict(list) self.alerts = [] def log_query_performance(self, metrics: dict): """Log performance metrics for a query.""" self.metrics['query_times'].append(metrics['total_time']) self.metrics['retrieval_times'].append(metrics['retrieval_time']) self.metrics['generation_times'].append(metrics['generation_time']) # Check for performance issues self._check_performance_alerts(metrics) def get_performance_stats(self) -> dict: """Get comprehensive performance statistics.""" return { 'avg_query_time': np.mean(self.metrics['query_times']), 'p95_query_time': np.percentile(self.metrics['query_times'], 95), 'avg_retrieval_time': np.mean(self.metrics['retrieval_times']), 'avg_generation_time': np.mean(self.metrics['generation_times']), 'total_queries': len(self.metrics['query_times']), 'performance_alerts': len(self.alerts) } def _check_performance_alerts(self, metrics: dict): """Monitor for performance degradation.""" if metrics['total_time'] > 10.0: # 10 second threshold self.alerts.append({ 'type': 'slow_query', 'time': metrics['total_time'], 'timestamp': datetime.now() })
Horizontal Scaling Strategy:
Resource Optimization:
Comprehensive Error Recovery:
class ResilientRAGAssistant(RAGAssistant): """RAG Assistant with enhanced error handling and recovery.""" def __init__(self, config): super().__init__(config) self.circuit_breaker = CircuitBreaker( failure_threshold=5, recovery_timeout=60 ) self.retry_strategy = ExponentialBackoff( initial_delay=1, max_delay=30, max_retries=3 ) @circuit_breaker.protect def query(self, question: str) -> RAGResponse: """Query with circuit breaker protection.""" return self.retry_strategy.execute( lambda: super().query(question) ) def _handle_llm_failure(self, error: Exception) -> str: """Graceful degradation when LLM fails.""" if isinstance(error, RateLimitError): return "I'm experiencing high demand. Please try again in a moment." elif isinstance(error, AuthenticationError): return "Authentication issue. Please check API configuration." else: return "I'm having trouble processing your request. Please try rephrasing."
Data Protection:
Privacy Compliance:
1. Retrieval Quality Assessment
class RetrievalEvaluator: """Comprehensive evaluation of retrieval performance.""" def __init__(self, test_dataset): self.test_dataset = test_dataset self.ground_truth = self._load_ground_truth() def evaluate_retrieval(self, rag_system) -> dict: """Evaluate retrieval performance across multiple metrics.""" results = { 'precision_at_k': [], 'recall_at_k': [], 'mrr_scores': [], 'ndcg_scores': [] } for query, relevant_docs in self.test_dataset: retrieved_docs = rag_system.retrieve(query, k=10) # Calculate metrics precision = self._calculate_precision_at_k(retrieved_docs, relevant_docs) recall = self._calculate_recall_at_k(retrieved_docs, relevant_docs) mrr = self._calculate_mrr(retrieved_docs, relevant_docs) ndcg = self._calculate_ndcg(retrieved_docs, relevant_docs) results['precision_at_k'].append(precision) results['recall_at_k'].append(recall) results['mrr_scores'].append(mrr) results['ndcg_scores'].append(ndcg) return { 'avg_precision_at_k': np.mean(results['precision_at_k']), 'avg_recall_at_k': np.mean(results['recall_at_k']), 'avg_mrr': np.mean(results['mrr_scores']), 'avg_ndcg': np.mean(results['ndcg_scores']) }
2. Response Quality Evaluation
class ResponseQualityEvaluator: """Evaluate the quality of generated responses.""" def __init__(self): self.bleu_scorer = BLEUScore() self.rouge_scorer = ROUGEScore() self.bert_scorer = BERTScore() def evaluate_response_quality(self, generated_answers, reference_answers): """Comprehensive response quality assessment.""" # Automatic metrics bleu_scores = [ self.bleu_scorer.score(gen, ref) for gen, ref in zip(generated_answers, reference_answers) ] rouge_scores = [ self.rouge_scorer.score(gen, ref) for gen, ref in zip(generated_answers, reference_answers) ] bert_scores = [ self.bert_scorer.score(gen, ref) for gen, ref in zip(generated_answers, reference_answers) ] return { 'avg_bleu': np.mean(bleu_scores), 'avg_rouge_l': np.mean([score['rouge-l'] for score in rouge_scores]), 'avg_bert_score': np.mean(bert_scores), 'score_distribution': self._analyze_score_distribution(bleu_scores) }
Expert Assessment Framework:
Evaluation Dataset:
Metric | Our RAG System | Baseline (GPT-3.5) | Industry Standard |
---|---|---|---|
Precision@5 | 0.847 | 0.623 | 0.750 |
Recall@5 | 0.782 | 0.541 | 0.680 |
MRR | 0.789 | 0.612 | 0.720 |
NDCG@10 | 0.823 | 0.598 | 0.745 |
BLEU Score | 0.342 | 0.289 | 0.310 |
ROUGE-L | 0.456 | 0.387 | 0.425 |
BERTScore | 0.834 | 0.712 | 0.780 |
Response Time (avg) | 2.34s | 1.89s | 3.20s |
Simple Factual Queries (35% of test set):
Complex Analytical Queries (40% of test set):
Multi-step Reasoning Queries (25% of test set):
1. vs. Vanilla GPT-3.5-turbo:
This project bridges the gap between static LLMs and dynamic knowledge-driven systems. It:
This project:
Milestone | Description | Completed |
---|---|---|
π₯ v1.0 | Basic RAG system with CLI | β |
π v1.1 | Streamlit Web Interface | β |
π v1.2 | Source attribution and citations | β |
π v1.3 | Performance monitoring | β |
π v1.4 | Scalable ingestion pipeline | β |
π‘ v2.0 | Auto-updating via web crawling | π |
π± v2.1 | Mobile-ready frontend | π§ |
Github : https://github.com/kartavya4874/RAG-Assistant---AAIDC
Contact : kartavyabaluja453@gmail.com