requirements.txt:
langchain
openai
faiss-cpu
streamlit
pypdf
tiktoken
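A note on versions: the scripts below use import paths (langchain.document_loaders, langchain.embeddings, langchain.chat_models) that were deprecated in langchain 0.1 and removed in 0.2, while FAISS.load_local is called with allow_dangerous_deserialization, which only exists in newer releases. A 0.1-series release should satisfy both; the pin below is an illustrative, untested example, not a verified requirement:

langchain==0.1.16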
data/business.txt (sample content):
Artificial Intelligence (AI) is transforming business processes across industries.
Startups in 2025 are focusing on AI-driven marketing, customer personalization, and automation.
Digital transformation strategies now include machine learning-based analytics, predictive insights, and chatbots.
AI can help small businesses optimize their sales and reduce costs through smart automation.
(Replace this text or add PDFs later; build_index.py can support PDFs if you extend it, as shown in the sketch after the script below.)
build_index.py:

import os

from langchain.document_loaders import TextLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import FAISS

os.environ.setdefault("OPENAI_API_KEY", "your_openai_api_key_here")


def build():
    print("Loading documents from data/ ...")
    loader = TextLoader("data/business.txt")
    docs = loader.load()
    print(f"Loaded {len(docs)} documents.")

    print("Splitting documents into chunks ...")
    splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=100)
    chunks = splitter.split_documents(docs)
    print(f"Created {len(chunks)} chunks.")

    print("Creating embeddings and FAISS index (this will call OpenAI embeddings)...")
    embeddings = OpenAIEmbeddings()
    db = FAISS.from_documents(chunks, embeddings)
    db.save_local("faiss_index")
    print("✅ FAISS index saved to ./faiss_index")


if __name__ == "__main__":
    build()
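As noted above, build_index.py only indexes data/business.txt. One way to extend it to PDFs, as a rough sketch: it assumes PyPDFLoader (backed by the pypdf package already in requirements.txt), and load_pdf_docs is a hypothetical helper, not part of the files in this project.

import glob

from langchain.document_loaders import PyPDFLoader


def load_pdf_docs(folder="data"):
    # Collect every PDF in the folder; PyPDFLoader returns one Document per page.
    docs = []
    for path in glob.glob(f"{folder}/*.pdf"):
        docs.extend(PyPDFLoader(path).load())
    return docs

# Inside build(), you would then combine: docs = loader.load() + load_pdf_docs()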
Run once before using the app:
python build_index.py
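If the script runs cleanly, the console output should look roughly like this (the counts depend on your data; the small sample file yields a single document and a single chunk):

Loading documents from data/ ...
Loaded 1 documents.
Splitting documents into chunks ...
Created 1 chunks.
Creating embeddings and FAISS index (this will call OpenAI embeddings)...
✅ FAISS index saved to ./faiss_index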
agents.py:

import os
from typing import List, Dict, Any

from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import FAISS
from langchain.chat_models import ChatOpenAI  # gpt-4-turbo is a chat model, so use the chat wrapper
from langchain.chains import RetrievalQA

os.environ.setdefault("OPENAI_API_KEY", "your_openai_api_key_here")
class AgentBase:
    """Base class for all agents; provides a logging helper."""

    def __init__(self, name: str):
        self.name = name
        self.log: List[str] = []

    def info(self, text: str):
        entry = f"[{self.name}] {text}"
        self.log.append(entry)
        print(entry)
class PlannerAgent(AgentBase):
    """
    Planner: analyzes the user query and decides on steps.
    For this beginner version, it returns a simple plan dict.
    """

    def plan(self, query: str) -> Dict[str, Any]:
        self.info(f"Planning for query: {query}")
        # Simple static plan: retrieve relevant docs, then generate an answer.
        plan = {
            "query": query,
            "steps": ["retrieve", "generate", "critic", "summarize"],
            "k_retrieval": 3,
        }
        self.info(f"Plan created: {plan}")
        return plan
class RetrieverAgent(AgentBase):
    """
    Retriever: loads the FAISS index and returns the top-k documents.
    """

    def __init__(self, name="Retriever"):
        super().__init__(name)
        self.embeddings = OpenAIEmbeddings()
        index_path = "faiss_index"
        if not os.path.isdir(index_path):
            raise RuntimeError("FAISS index not found. Run build_index.py first.")
        self.db = FAISS.load_local(index_path, self.embeddings, allow_dangerous_deserialization=True)
        self.info("Loaded FAISS index.")

    def retrieve(self, query: str, k: int = 3) -> List[Dict[str, Any]]:
        self.info(f"Retrieving top {k} docs for query: {query}")
        results = self.db.similarity_search_with_score(query, k=k)
        # results is a list of (Document, score) tuples
        hits = []
        for doc, score in results:
            snippet = (doc.page_content[:800] + "...") if len(doc.page_content) > 800 else doc.page_content
            hits.append({"content": doc.page_content, "snippet": snippet, "score": float(score)})
        self.info(f"Retrieved {len(hits)} hits.")
        return hits
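# Note on scores: with LangChain's default FAISS setup,
# similarity_search_with_score returns an L2 distance, so a LOWER score
# means a closer match; it is not a 0-1 similarity value.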
class GeneratorAgent(AgentBase):
    """
    Generator: uses the retriever and an LLM to produce an answer via RAG.
    We'll use LangChain's RetrievalQA for a simple implementation.
    """

    def __init__(self, retriever, name="Generator", model_name="gpt-4-turbo"):
        super().__init__(name)
        self.retriever = retriever
        self.llm = ChatOpenAI(model_name=model_name, temperature=0.2)
        # Build a simple RetrievalQA chain on top of the passed retriever object
        self.qa = RetrievalQA.from_chain_type(
            llm=self.llm,
            chain_type="stuff",
            retriever=self.retriever,
            return_source_documents=True,
        )

    def generate(self, query: str) -> Dict[str, Any]:
        self.info(f"Generating answer for query: {query}")
        # The chain returns a dict with 'result' and 'source_documents' when return_source_documents=True
        result = self.qa({"query": query})
        answer = result.get("result") if isinstance(result, dict) else str(result)
        source_docs = result.get("source_documents", [])
        # Prepare a short list of sources
        sources = []
        for d in source_docs:
            snippet = (d.page_content[:500] + "...") if len(d.page_content) > 500 else d.page_content
            sources.append({"snippet": snippet})
        self.info(f"Generated answer (len {len(answer)} chars). Sources: {len(sources)}")
        return {"answer": answer, "sources": sources}
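# Note: chain_type="stuff" simply concatenates ("stuffs") all retrieved
# chunks into one prompt. That is fine for k=3 short chunks, but it can
# overflow the model's context window if k or the chunk size grows.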
class CriticAgent(AgentBase):
    """
    Critic: checks the answer against retrieved snippets to reduce hallucination.
    A very simple heuristic: look for key terms from the query in the sources.
    """

    def critique(self, query: str, answer: str, sources: List[Dict[str, Any]]) -> Dict[str, Any]:
        self.info("Critic checking for hallucination / grounding...")
        # Basic heuristic: check whether at least one source contains a word from the query
        query_terms = [t.lower() for t in query.split() if len(t) > 3]
        matched = 0
        for s in sources:
            text = s.get("snippet", "").lower()
            for term in query_terms:
                if term in text:
                    matched += 1
                    break
        grounded = matched > 0
        critique = {
            "grounded": grounded,
            "matched_sources": matched,
            "message": "Grounded in retrieved docs." if grounded else "No direct matches found in retrieved snippets; verify facts.",
        }
        self.info(f"Critic result: {critique}")
        return critique
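# Worked example (hypothetical query): for "How can AI help small businesses?"
# query_terms becomes ["help", "small", "businesses?"] -- only words longer
# than three characters, lowercased, with punctuation kept. A snippet
# containing "small" counts as a match, but "businesses?" only matches that
# literal string including the "?", one limitation of this simple heuristic.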
class SummarizerAgent(AgentBase):
    """
    Summarizer: shortens or formats the final answer and attaches citation info.
    """

    def __init__(self, name="Summarizer", llm_model="gpt-4-turbo"):
        super().__init__(name)
        self.llm = ChatOpenAI(model_name=llm_model, temperature=0.2)

    def summarize(self, answer: str, sources: List[Dict[str, Any]], max_length: int = 400) -> Dict[str, Any]:
        self.info("Summarizing final answer...")
        # Simple truncation for beginners; alternatively, use self.llm to rewrite concisely.
        if len(answer) > max_length:
            short = answer[:max_length].rsplit(".", 1)[0] + "..."
        else:
            short = answer
        citations = []
        for i, s in enumerate(sources[:3], start=1):
            snip = s.get("snippet", "")
            citations.append({"id": i, "text": snip[:300]})
        final = {"summary": short, "citations": citations}
        self.info("Summarization done.")
        return final
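# Truncation example: with max_length=400, summarize() keeps the first 400
# characters of the answer, cuts back to the last complete sentence (the
# final "."), and appends "...". If no "." occurs in those 400 characters,
# rsplit leaves the 400-character prefix unchanged.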
class Orchestrator(AgentBase):
    """
    Orchestrator: coordinates the agents and returns an execution log plus the final structured response.
    """

    def __init__(self):
        super().__init__("Orchestrator")
        self.planner = PlannerAgent("Planner")
        self.retriever = RetrieverAgent("Retriever")
        # For the generator, pass the Retriever's underlying retriever interface (db.as_retriever)
        retriever_interface = self.retriever.db.as_retriever(search_kwargs={"k": 3})
        self.generator = GeneratorAgent(retriever_interface, name="Generator")
        self.critic = CriticAgent("Critic")
        self.summarizer = SummarizerAgent("Summarizer")

    def run(self, user_query: str) -> Dict[str, Any]:
        self.info(f"Orchestration started for query: {user_query}")
        plan = self.planner.plan(user_query)
        k = plan.get("k_retrieval", 3)

        # 1) Retrieval step (shown separately in the UI as evidence;
        #    RetrievalQA performs its own retrieval again inside generate())
        retrieved = self.retriever.retrieve(plan["query"], k=k)

        # 2) Generation step
        gen_result = self.generator.generate(plan["query"])
        answer = gen_result["answer"]
        sources = gen_result["sources"]

        # 3) Critic step
        critique = self.critic.critique(plan["query"], answer, sources)

        # 4) Summarize the final answer
        final = self.summarizer.summarize(answer, sources)

        # Build the structured response
        response = {
            "query": user_query,
            "plan": plan,
            "retrieved": retrieved,
            "answer": answer,
            "critique": critique,
            "final": final,
            "logs": {
                "planner": self.planner.log,
                "retriever": self.retriever.log,
                "generator": self.generator.log,
                "critic": self.critic.log,
                "summarizer": self.summarizer.log,
                "orchestrator": self.log,
            },
        }
        self.info("Orchestration finished.")
        return response
Notes on agents.py:
This single file contains the agent classes and the orchestrator.
Replace your_openai_api_key_here with your real API key, preferably by setting the OPENAI_API_KEY environment variable rather than editing the default string in the source.
The Generator uses RetrievalQA, which retrieves documents and runs the LLM in one call; the RetrieverAgent is also invoked separately so the UI can show the retrieved evidence (which means retrieval happens twice per query).
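To sanity-check agents.py without the UI, here is a minimal sketch (it assumes the FAISS index has already been built and OPENAI_API_KEY is set; the file name smoke_test.py is just an example):

smoke_test.py:

from agents import Orchestrator

orch = Orchestrator()
response = orch.run("How can AI help small businesses reduce costs?")

print(response["final"]["summary"])         # summarized answer
print(response["critique"]["message"])      # grounding verdict
print(len(response["retrieved"]), "hits")   # retrieved evidence count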
app.py:

import streamlit as st

from agents import Orchestrator

st.set_page_config(page_title="AI Business Assistant - Multi Agent", page_icon="🤖")

st.title("🤖 AI Business Assistant — Multi-Agent System")

st.markdown("""
This demo uses multiple cooperating agents:
Planner → Retriever → Generator → Critic → Summarizer.
Type a business or AI question and press Ask.
""")

query = st.text_input("Enter your question about business, startups, or AI:")

# Create the Orchestrator once per session; reloading the FAISS index on every rerun would be wasteful.
if "orchestrator" not in st.session_state:
    st.session_state.orchestrator = Orchestrator()
orch: Orchestrator = st.session_state.orchestrator

if st.button("Ask") and query.strip():
    with st.spinner("Agents are working..."):
        result = orch.run(query)

    # Show the final summarized answer
    st.subheader("Final (summarized) answer")
    st.write(result["final"]["summary"])

    st.subheader("Citations (top snippets)")
    for c in result["final"]["citations"]:
        st.write(f"- ({c['id']}) {c['text'][:500]}")

    st.subheader("Full generated answer")
    st.write(result["answer"])

    st.subheader("Critic verdict")
    st.write(result["critique"]["message"])
    st.write(result["critique"])

    st.subheader("Retrieved snippets (top results)")
    for i, r in enumerate(result["retrieved"], start=1):
        st.markdown(f"**Hit {i} (score {r['score']:.3f})**")
        st.write(r["snippet"])

    st.subheader("Agent logs")
    for agent_name, logs in result["logs"].items():
        st.markdown(f"**{agent_name} logs**")
        for entry in logs:
            st.write(entry)
else:
    st.info("Type a question above and press Ask.")
This project demonstrates a small multi-agent system (MAS) for a RAG-powered AI Business Assistant.
Agents: Planner, Retriever, Generator, Critic, and Summarizer, coordinated by an Orchestrator.
Project layout:

ai_business_assistant_mas/
├── app.py
├── agents.py
├── build_index.py
├── data/
│   └── business.txt
├── requirements.txt
└── faiss_index/ (created by build_index.py)

Quick start:

pip install -r requirements.txt
export OPENAI_API_KEY="sk-..."   (macOS/Linux)
setx OPENAI_API_KEY "sk-..."     (Windows)
python build_index.py
streamlit run app.py

Open http://localhost:8501 in your browser.