A Retrieval-Augmented Generation (RAG) system retrieves relevant information from your own documents or other external sources and uses that context to generate accurate answers, thereby enhancing LLM responses.
A chatbot without RAG can respond to common questions based on its training data, but it may fail to answer questions that require up-to-date information or domain-specific knowledge. By connecting the chatbot to external sources, we address this limitation.
RAG has paved the way for scalable and flexible AI applications such as question answering, dialogue systems, and content generation.
A RAG system consists of two key components (a minimal sketch of how they fit together follows the list):
• A retrieval model that fetches relevant information from an external knowledge source, which could be a database, search engine, or any other information repository.
• A language model that generates responses based on the retrieved knowledge.
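Conceptually, the two components compose into a retrieve-then-generate flow. The toy sketch below is purely illustrative: retrieve and generate are hypothetical stand-ins for the vector store and LLM we build later in this post.

def retrieve(question: str) -> str:
    # Stand-in for the retrieval model: in the real system this is a
    # vector-store lookup over your document chunks.
    knowledge = {"what is rag": "RAG augments an LLM with retrieved context."}
    return knowledge.get(question.lower(), "")

def generate(question: str, context: str) -> str:
    # Stand-in for the language model: in the real system this is an LLM call
    # that answers using the retrieved context.
    return f"Based on the context '{context}', here is an answer to: {question}"

def answer(question: str) -> str:
    context = retrieve(question)        # 1. fetch relevant knowledge
    return generate(question, context)  # 2. generate a grounded response

print(answer("What is RAG"))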
Let's create a simple Python implementation of a RAG (Retrieval-Augmented Generation) API system that allows you to query your own documents (i.e., a predefined dataset) using a local database for retrieval and an LLM-powered response generator.
We will primarily use Python and LangChain. LangChain is a versatile framework designed to help developers easily build applications with LLMs, available in both Python and JavaScript. To run the models, we will use OpenAI (link).
The system will comprise the following components:
Embedding Model: A pretrained language model that converts input text into embeddings. An embedding is a vector (a list of floating-point numbers) that captures semantic meaning. These vectors are used to search for relevant information in the dataset. OpenAI's text embeddings measure the relatedness of text strings by distance: small distances suggest high relatedness and large distances suggest low relatedness (see the short embedding sketch after this component list).
Vector Database: A specialized storage system designed for storing knowledge and the corresponding embedding vectors. You can extract an embedding vector, save it in a vector database, and reuse it for many different use cases.
ChatBot: A language model that generates responses based on retrieved knowledge. This can be any language model such as GPT, Llama, or Gemma.
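As a quick illustration of the embedding component, the sketch below embeds two short texts and prints the vector length. It assumes OPENAI_API_KEY is set in your environment and uses the same text-embedding-3-small model we configure later in config.py.

from langchain_openai import OpenAIEmbeddings

embeddings = OpenAIEmbeddings(model="text-embedding-3-small")

vectors = embeddings.embed_documents([
    "RAG combines retrieval with generation.",
    "Paris is the capital of France.",
])

# Each document becomes a list of floats (1536 dimensions by default for this model).
print(len(vectors[0]))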
rag-app/
├── app.py                 # Main application
├── requirements.txt       # Dependencies
├── data/                  # Directory for your source documents
├── config.py              # Configuration settings
├── embeddings/            # Directory for storing embeddings
│   └── chroma_db/         # ChromaDB storage
└── modules/
    ├── __init__.py
    ├── document_loader.py
    ├── embedder.py
    ├── retriever.py
    └── generator.py
langchain>=0.0.267
langchain-openai>=0.0.2
chromadb>=0.4.15
pypdf>=3.15.1
sentence-transformers>=2.2.2
python-dotenv>=1.0.0
fastapi>=0.104.0
uvicorn>=0.23.2
OPENAI_API_KEY=your-api-key
import os
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

# Configuration
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
CHUNK_SIZE = 1000
CHUNK_OVERLAP = 200
EMBEDDING_MODEL = "text-embedding-3-small"
LLM_MODEL = "gpt-3.5-turbo"
TEMPERATURE = 0
VECTOR_DB_PATH = "./embeddings/chroma_db"
DATA_PATH = "./data"
This is the first step in creating a RAG system: processing and organizing large amounts of data. It involves breaking the dataset (or documents) into small chunks and calculating the vector representation of each chunk with an embedding model.
The size of each chunk can vary depending on the dataset and the application. For example, in a document retrieval system, each chunk can be a paragraph or a sentence. In a dialogue system, each chunk can be a conversation turn.
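To make chunking concrete, the short sketch below splits a sample string with the same RecursiveCharacterTextSplitter we use later; the tiny chunk_size and chunk_overlap values are chosen only to make the overlap visible.

from langchain.text_splitter import RecursiveCharacterTextSplitter

text = (
    "RAG systems index documents by splitting them into chunks. "
    "Each chunk is embedded and stored so that relevant pieces "
    "can be retrieved later to ground the model's answers."
)

splitter = RecursiveCharacterTextSplitter(chunk_size=80, chunk_overlap=20)
chunks = splitter.split_text(text)

for i, chunk in enumerate(chunks):
    print(i, repr(chunk))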
For indexing, we'll use ChromaDB, a free and open-source vector database, and OpenAI's text-embedding-3-small model to create embeddings for retrieval.
from langchain_community.document_loaders import PyPDFLoader, DirectoryLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from typing import List
from langchain.schema import Document
import os
import config


class DocumentProcessor:
    def __init__(self, data_dir=config.DATA_PATH):
        self.data_dir = data_dir
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=config.CHUNK_SIZE,
            chunk_overlap=config.CHUNK_OVERLAP
        )

    def load_documents(self) -> List[Document]:
        """Load documents from the data directory"""
        if not os.path.exists(self.data_dir):
            os.makedirs(self.data_dir)
            raise FileNotFoundError(f"No documents found in {self.data_dir}")

        loader = DirectoryLoader(self.data_dir, glob="**/*.pdf", loader_cls=PyPDFLoader)
        documents = loader.load()
        return documents

    def process_documents(self) -> List[Document]:
        """Load and split documents into chunks"""
        documents = self.load_documents()
        chunks = self.text_splitter.split_documents(documents)
        print(f"Split {len(documents)} documents into {len(chunks)} chunks")
        return chunks
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from typing import List
from langchain.schema import Document
import os
import config


class DocumentEmbedder:
    def __init__(self, persist_directory=config.VECTOR_DB_PATH):
        os.environ["OPENAI_API_KEY"] = config.OPENAI_API_KEY
        self.embeddings = OpenAIEmbeddings(model=config.EMBEDDING_MODEL)
        self.persist_directory = persist_directory

    def create_vector_store(self, documents: List[Document]) -> Chroma:
        """Create and persist a vector store from documents"""
        if not os.path.exists(self.persist_directory):
            os.makedirs(self.persist_directory)

        vector_store = Chroma.from_documents(
            documents=documents,
            embedding=self.embeddings,
            persist_directory=self.persist_directory
        )
        vector_store.persist()
        return vector_store

    def load_vector_store(self) -> Chroma:
        """Load an existing vector store"""
        if not os.path.exists(self.persist_directory):
            raise FileNotFoundError(f"No vector store found at {self.persist_directory}")

        vector_store = Chroma(
            persist_directory=self.persist_directory,
            embedding_function=self.embeddings
        )
        return vector_store
After the indexing phase, each chunk and its corresponding embedding vector will be stored in the ChromaDB vector database. The embedding vectors can later be used to retrieve relevant information for a given query.
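Putting the two modules together, indexing can be run as a small standalone script. This is only a sketch, assuming you have PDFs in ./data and your OpenAI key in .env:

from modules.document_loader import DocumentProcessor
from modules.embedder import DocumentEmbedder

# Split the PDFs in ./data into chunks and embed them into ChromaDB.
chunks = DocumentProcessor().process_documents()
vector_store = DocumentEmbedder().create_vector_store(chunks)

print(f"Indexed {len(chunks)} chunks")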
To compare two vectors, we measure the distance between them, most commonly using cosine distance (one minus the cosine similarity). Small distances suggest high relatedness and large distances suggest low relatedness.
Given a user input, the query is vectorized, i.e., given a vector representation with an embedding model. We then compare it against the vectors in the database using a similarity measure to find the most relevant chunks. Chroma uses cosine similarity by default, so we leverage that directly. The relevant chunks are retrieved from storage using a Retriever.
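For intuition, the sketch below scores a query embedding against a few stored chunk embeddings with cosine similarity. This is roughly what the vector store does for us internally; NumPy and the toy 3-dimensional vectors are used here only for illustration.

import numpy as np

def cosine_similarity(a, b):
    a, b = np.asarray(a), np.asarray(b)
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

# Toy "embeddings"; real ones have hundreds or thousands of dimensions.
chunk_vectors = {
    "chunk about RAG":   [0.9, 0.1, 0.0],
    "chunk about Paris": [0.0, 0.2, 0.9],
}
query_vector = [0.8, 0.2, 0.1]

for name, vec in chunk_vectors.items():
    print(name, round(cosine_similarity(query_vector, vec), 3))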
Many types of retrieval systems exist. Some of the common ones include vector stores, search APIs, relational or graph databases, and lexical search. Advanced retrieval patterns include the Ensemble and Multi-Vector retrievers.
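As an aside, here is a sketch of the Ensemble pattern, which blends lexical (BM25) and vector retrieval. It is not used in the rest of this post and assumes the extra rank_bm25 package is installed and that the Chroma index from this project already exists.

from langchain.retrievers import EnsembleRetriever
from langchain_community.retrievers import BM25Retriever
from modules.embedder import DocumentEmbedder

# Lexical retriever (keyword matching) built from a couple of raw texts.
bm25_retriever = BM25Retriever.from_texts([
    "RAG combines retrieval with generation.",
    "Chroma is an open-source vector database.",
])
bm25_retriever.k = 2

# Semantic retriever backed by the Chroma index built earlier.
vector_retriever = DocumentEmbedder().load_vector_store().as_retriever(search_kwargs={"k": 2})

# Blend the two result lists with equal weights.
ensemble = EnsembleRetriever(
    retrievers=[bm25_retriever, vector_retriever],
    weights=[0.5, 0.5],
)
docs = ensemble.get_relevant_documents("What is Chroma?")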
For the retriever, we are going to use a vector store. A vector store can be used as a retriever by calling its as_retriever() method. For the LLM, we will be using OpenAI's gpt-4o-mini.
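In its simplest form, that looks like the sketch below (a minimal example with an assumed query, loading the index built earlier); the DocumentRetriever class that follows wraps this call and adds a score threshold plus optional contextual compression.

from modules.embedder import DocumentEmbedder

vector_store = DocumentEmbedder().load_vector_store()

# Turn the vector store into a retriever that returns the top 4 chunks.
retriever = vector_store.as_retriever(search_kwargs={"k": 4})

docs = retriever.get_relevant_documents("What does the document say about pricing?")
for doc in docs:
    print(doc.page_content[:100])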
from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import LLMChainExtractor
from langchain_openai import ChatOpenAI
from langchain_community.vectorstores import Chroma
import os
import config


class DocumentRetriever:
    def __init__(self, vector_store: Chroma):
        os.environ["OPENAI_API_KEY"] = config.OPENAI_API_KEY
        self.vector_store = vector_store
        self.llm = ChatOpenAI(temperature=config.TEMPERATURE)

    def setup_retriever(self, use_compression=True, top_k=4, similarity_threshold=0.7):
        """Set up the retriever with cosine similarity and optional compression"""
        # Create base retriever using cosine similarity (Chroma's default)
        # Using similarity_score_threshold to filter by cosine similarity scores
        base_retriever = self.vector_store.as_retriever(
            search_type="similarity_score_threshold",
            search_kwargs={
                "k": top_k,
                "score_threshold": similarity_threshold
            }
        )

        if use_compression:
            compressor = LLMChainExtractor.from_llm(self.llm)
            retriever = ContextualCompressionRetriever(
                base_compressor=compressor,
                base_retriever=base_retriever
            )
            return retriever

        return base_retriever
A chatbot / LLM generates an answer based on the retrieved knowledge, returning the question, the answer, and the sources it used.
from langchain.chains import RetrievalQA
from langchain_openai import ChatOpenAI
from langchain.schema import BaseRetriever
import os
import config


class ResponseGenerator:
    def __init__(self, retriever: BaseRetriever):
        os.environ["OPENAI_API_KEY"] = config.OPENAI_API_KEY
        self.retriever = retriever
        self.llm = ChatOpenAI(model_name=config.LLM_MODEL, temperature=config.TEMPERATURE)

    def setup_qa_chain(self):
        """Set up the question-answering chain"""
        qa_chain = RetrievalQA.from_chain_type(
            llm=self.llm,
            chain_type="stuff",
            retriever=self.retriever,
            return_source_documents=True
        )
        return qa_chain

    def generate_response(self, query: str):
        """Generate a response to the query"""
        qa_chain = self.setup_qa_chain()
        result = qa_chain({"query": query})
        return {
            "question": query,
            "answer": result["result"],
            "sources": [doc.page_content for doc in result["source_documents"]]
        }
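Before wiring everything into an API, you can exercise the pipeline end to end from a small script. This is a sketch with an assumed question, and it expects the index to have been built already:

from modules.embedder import DocumentEmbedder
from modules.retriever import DocumentRetriever
from modules.generator import ResponseGenerator

vector_store = DocumentEmbedder().load_vector_store()
retriever = DocumentRetriever(vector_store).setup_retriever()
generator = ResponseGenerator(retriever)

result = generator.generate_response("What topics do these documents cover?")
print(result["answer"])
print(result["sources"])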
from fastapi import FastAPI, BackgroundTasks
from pydantic import BaseModel
import uvicorn
import os

from modules.document_loader import DocumentProcessor
from modules.embedder import DocumentEmbedder
from modules.retriever import DocumentRetriever
from modules.generator import ResponseGenerator
import config

app = FastAPI(title="RAG API")

# Global variables
generator = None


class Query(BaseModel):
    text: str


class RebuildRequest(BaseModel):
    rebuild: bool = True


def setup_rag_system(rebuild_index=False):
    """Set up the RAG system"""
    document_processor = DocumentProcessor()
    embedder = DocumentEmbedder()

    # Create or load vector store
    if rebuild_index or not os.path.exists(config.VECTOR_DB_PATH):
        print("Building document index...")
        chunks = document_processor.process_documents()
        vector_store = embedder.create_vector_store(chunks)
    else:
        print("Loading existing document index...")
        vector_store = embedder.load_vector_store()

    # Set up retriever and generator
    retriever = DocumentRetriever(vector_store).setup_retriever()
    generator = ResponseGenerator(retriever)
    return generator


@app.on_event("startup")
async def startup_event():
    global generator
    generator = setup_rag_system()


@app.post("/query")
async def process_query(query: Query):
    global generator
    if not generator:
        return {"error": "RAG system not initialized"}
    result = generator.generate_response(query.text)
    return result


@app.post("/rebuild-index")
async def rebuild_index(request: RebuildRequest, background_tasks: BackgroundTasks):
    global generator

    def rebuild():
        global generator
        generator = setup_rag_system(rebuild_index=True)

    background_tasks.add_task(rebuild)
    return {"message": "Rebuilding index in background"}


@app.get("/health")
async def health_check():
    return {"status": "healthy"}


if __name__ == "__main__":
    uvicorn.run("app:app", host="0.0.0.0", port=8000, reload=True)
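With the server running (python app.py), you can call the API from another Python process. This sketch assumes the requests package is installed and the server is listening on localhost:8000.

import requests

# Ask a question against the indexed documents.
response = requests.post(
    "http://localhost:8000/query",
    json={"text": "Summarize the main points of the documents."},
)
print(response.json()["answer"])

# Trigger a background re-index after adding new PDFs to ./data.
requests.post("http://localhost:8000/rebuild-index", json={"rebuild": True})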
That was a basic walkthrough of a simple Python RAG API, which can be particularly valuable for prototyping, small to medium-sized applications, and organizations that want to experiment with AI-powered information retrieval without complex infrastructure requirements.
RAG has key advantages: systems can be tailored to specific industries or use cases, and they can incorporate new information without retraining the underlying models, which makes them cost-effective for organizations adopting generative AI for domain-specific use cases.
By enabling sophisticated and intelligent question-answering (Q&A) chatbots, RAG represents a significant milestone in making LLMs more knowledgeable and accurate.
GitHub Repository - Source code and development
Hugging Face Tutorial - Make your own RAG
Python LangChain Documentation - Build a Retrieval Augmented Generation App
LangChain VectorStores Documentation
OpenAI Embeddings Guide - Vector Embeddings