API Gateway (FastAPI): exposes REST / WebSocket endpoints, auth, rate limiting.
Orchestrator service: receives the user request, builds a plan, enqueues agent tasks.
Agent workers (microservices / Celery workers): Retriever, Generator, Critic, Summarizer.
Vector DB: production-ready options are Pinecone / Milvus / Weaviate / Qdrant. (Pinecone or Qdrant are easiest to manage.)
LLM provider: OpenAI, or managed model hosting. Use provider clients from secure env vars.
Message broker: Redis (simple) or RabbitMQ (robust). I'll show Redis + Celery for simplicity.
Persistent DB / audit logs: Postgres for storing queries, user metadata, citations, cost metrics.
Object storage: S3-compatible (AWS S3 / DigitalOcean Spaces) for uploaded files / backups.
Observability: Prometheus + Grafana for metrics, ELK or Loki for logs, Sentry for errors.
CI/CD: GitHub Actions to build and push Docker images and deploy to Kubernetes (EKS/GKE/AKS) or Cloud Run.
Flow: client → FastAPI → orchestrator → enqueue tasks → workers (retrieve → generate → critic → summarize) → Postgres logs → return to client.
Vector DB: Pinecone or Qdrant (Pinecone easiest; Qdrant cheaper/self-host). Don't use FAISS alone in prod unless you manage persistence & scaling yourself.
LLM: Start with OpenAI (gpt-4o or gpt-4o-mini for cost/speed tradeoffs). Add caching & response truncation.
Workers: Run each agent as a Celery worker type (separate queues) so you can scale generator workers independently (they're expensive).
Authentication: OAuth2 / JWT for users. Rate limit per-user (a minimal Redis-based sketch follows this list).
Secrets: Store API keys in a secrets manager (AWS Secrets Manager / GCP Secret Manager / GitHub Secrets). Never commit keys.
Costs: Track LLM tokens per query; limit prompt size and chunk size; batch or cache frequent queries.
Security: TLS everywhere, store PII hashed/encrypted; review privacy/regulatory needs (GDPR).
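As an illustration of the per-user rate limiting above, here is a minimal sketch using a Redis counter callable from the FastAPI gateway. The 30 requests/minute quota, key format, and Redis DB index are assumptions for the example, not part of the stack described above.

# rate_limit.py (sketch) - per-user, per-minute quota backed by Redis
import os
import time
from fastapi import HTTPException
from redis import Redis

redis_client = Redis.from_url(os.getenv("REDIS_URL", "redis://redis:6379/2"))
REQUESTS_PER_MINUTE = 30  # illustrative quota

def rate_limit(user_id: str) -> None:
    """Raise HTTP 429 when a user exceeds the per-minute quota."""
    window = int(time.time() // 60)            # current minute bucket
    key = f"ratelimit:{user_id}:{window}"      # one counter per user per minute
    count = redis_client.incr(key)
    if count == 1:
        redis_client.expire(key, 90)           # let the bucket expire shortly after its window
    if count > REQUESTS_PER_MINUTE:
        raise HTTPException(status_code=429, detail="Rate limit exceeded")

Call rate_limit(req.user_id) at the top of the /v1/query handler, or wrap it in a FastAPI dependency once JWT auth supplies the user identity.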
Below are ready-to-copy files for a containerized, production-style setup using a FastAPI gateway + Celery workers + Redis broker + Postgres, with Qdrant or Pinecone as the optional vector DB. First, a simple monorepo layout:
ai_business_assistant_prod/
├── api/                      # FastAPI gateway (exposes REST)
│   ├── app.py
│   └── Dockerfile
├── orchestrator/             # Orchestrator service (enqueues tasks)
│   ├── orchestrator.py
│   └── Dockerfile
├── workers/                  # Celery worker that runs agent functions
│   ├── tasks.py
│   └── Dockerfile
├── k8s/                      # Kubernetes manifests
│   ├── deployment-api.yaml
│   ├── deployment-worker.yaml
│   └── ...
├── docker-compose.yaml
├── requirements.txt
└── infra/                    # (optional) Terraform / Cloud config
A – requirements.txt
fastapi
uvicorn[standard]
celery[redis]
redis
requests
python-dotenv
pydantic
sqlalchemy
psycopg2-binary
openai
langchain
qdrant-client
pinecone-client
loguru
sentry-sdk
prometheus-client
gunicorn
B – FastAPI gateway: api/app.py
import os
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from celery import Celery
from loguru import logger
CELERY_BROKER = os.getenv("CELERY_BROKER_URL", "redis://redis:6379/0")
CELERY_BACKEND = os.getenv("CELERY_RESULT_BACKEND", "redis://redis:6379/1")
celery = Celery("api", broker=CELERY_BROKER, backend=CELERY_BACKEND)
app = FastAPI(title="AI Business Assistant - API")
class QueryRequest(BaseModel):
user_id: str
query: str
@app.post("/v1/query")
def submit_query(req: QueryRequest):
# Basic validation
if not req.query or not req.user_id:
raise HTTPException(status_code=400, detail="user_id and query required")
# Enqueue orchestration task (orchestrator handles enqueueing agent subtasks)
task = celery.send_task("orchestrator.run_orchestration", args=[req.user_id, req.query])
logger.info(f"Enqueued orchestration task {task.id} for user {req.user_id}")
return {"task_id": task.id, "status": "queued"}
@app.get("/v1/result/{task_id}")
def get_result(task_id: str):
res = celery.AsyncResult(task_id)
if res.state == "PENDING":
return {"status": "pending"}
if res.state in ("STARTED", "RETRY"):
return {"status": res.state}
if res.state == "SUCCESS":
return {"status": "success", "result": res.result}
return {"status": res.state, "info": str(res.info)}
Notes: The FastAPI app only enqueues a top-level orchestration task. You can also make the API block/wait with a timeout or use websockets for updates.
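A minimal client-side sketch of that enqueue-then-poll pattern (the base URL assumes the local docker-compose setup shown below; requests is already in requirements.txt):

import time
import requests

BASE = "http://localhost:8000"

# Submit a query, then poll until the orchestration task reaches a terminal state
resp = requests.post(f"{BASE}/v1/query", json={"user_id": "u1", "query": "What are AI business trends in 2025?"})
task_id = resp.json()["task_id"]

while True:
    status = requests.get(f"{BASE}/v1/result/{task_id}").json()
    if status["status"] in ("success", "FAILURE"):
        break
    time.sleep(2)
print(status)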
C – Celery tasks / worker: workers/tasks.py
import os
from celery import Celery, Task
from loguru import logger
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import Qdrant
from langchain.chat_models import ChatOpenAI  # gpt-4o is a chat model, so use the chat wrapper
from langchain.chains import RetrievalQA
from qdrant_client import QdrantClient
BROKER = os.getenv("CELERY_BROKER_URL", "redis://redis:6379/0")
BACKEND = os.getenv("CELERY_RESULT_BACKEND", "redis://redis:6379/1")
celery = Celery("workers", broker=BROKER, backend=BACKEND)
OPENAI_KEY = os.getenv("OPENAI_API_KEY")
class BaseTask(Task):
autoretry_for = (Exception,)
retry_kwargs = {"max_retries": 3, "countdown": 5}
retry_backoff = True
@celery.task(base=BaseTask, name="orchestrator.run_orchestration")
def run_orchestration(user_id: str, query: str):
"""
High-level orchestration: decide plan, run retriever -> generator -> critic -> summarizer.
Keep this function small; in real world, orchestrator may enqueue subtasks separately.
"""
logger.info(f"Orchestration started for user {user_id}: {query}")
# 1) Retriever (vector DB)
embeddings = OpenAIEmbeddings(openai_api_key=OPENAI_KEY)
# Example with Qdrant; adapt to your vector DB choice
    qdrant_url = os.getenv("QDRANT_URL", "http://qdrant:6333")
    db = Qdrant(client=QdrantClient(url=qdrant_url), collection_name="business", embeddings=embeddings)
    retriever = db.as_retriever(search_kwargs={"k": 4})
# 2) Generator (LLM) using RetrievalQA (LangChain)
    llm = ChatOpenAI(openai_api_key=OPENAI_KEY, model_name="gpt-4o", temperature=0.2)
qa = RetrievalQA.from_chain_type(llm=llm, chain_type="stuff", retriever=retriever, return_source_documents=True)
result = qa({"query": query})
answer = result.get("result")
source_docs = result.get("source_documents", [])
# 3) Critic (simple heuristic)
grounded = False
for d in source_docs:
if any(w in d.page_content.lower() for w in query.lower().split()):
grounded = True
break
# 4) Summarize (optionally call LLM again)
summary = answer if len(answer) < 600 else answer[:600] + "..."
# 5) Store to DB / audit (sketch)
# TODO: Implement Postgres insertion
logger.info(f"Orchestration finished. Grounded={grounded}")
response = {
"user_id": user_id,
"query": query,
"answer": answer,
"summary": summary,
"grounded": grounded,
"sources": [d.page_content[:400] for d in source_docs],
}
return response
Notes: This simple orchestration runs synchronously inside the Celery task. For longer or more complex flows, split into separate Celery tasks per agent and use task chaining / chord to coordinate.
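Step 5 above is left as a TODO. A minimal audit-logging sketch with SQLAlchemy could look like the following; the query_logs table, its columns, and the DATABASE_URL variable are assumptions for illustration (the default URL simply mirrors the Postgres credentials in docker-compose.yaml), not an existing schema.

# audit.py (sketch) - store query/answer audit rows in Postgres
import os
from datetime import datetime
from sqlalchemy import create_engine, Column, Integer, String, Text, Boolean, DateTime
from sqlalchemy.orm import declarative_base, Session

Base = declarative_base()

class QueryLog(Base):
    __tablename__ = "query_logs"
    id = Column(Integer, primary_key=True)
    user_id = Column(String(64), index=True)
    query = Column(Text)
    answer = Column(Text)
    grounded = Column(Boolean)
    created_at = Column(DateTime, default=datetime.utcnow)

engine = create_engine(os.getenv("DATABASE_URL", "postgresql+psycopg2://app:example@postgres:5432/app_db"))
Base.metadata.create_all(engine)  # in production, manage the schema with Alembic migrations instead

def log_query(user_id: str, query: str, answer: str, grounded: bool) -> None:
    with Session(engine) as session:
        session.add(QueryLog(user_id=user_id, query=query, answer=answer, grounded=grounded))
        session.commit()

run_orchestration would call log_query(...) just before returning the response.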
D – Dockerfiles
api/Dockerfile
FROM python:3.11-slim
WORKDIR /app
COPY ./requirements.txt /app/
RUN pip install --no-cache-dir -r /app/requirements.txt
COPY ./api /app/api
ENV PYTHONUNBUFFERED=1
EXPOSE 8000
CMD ["uvicorn", "api.app
", "--host", "0.0.0.0", "--port", "8000", "--workers", "1"]workers/Dockerfile
FROM python:3.11-slim
WORKDIR /app
COPY ./requirements.txt /app/
RUN pip install --no-cache-dir -r /app/requirements.txt
COPY ./workers /app/workers
ENV PYTHONUNBUFFERED=1
CMD ["celery", "-A", "workers.tasks", "worker", "--loglevel=info", "--concurrency=1"]
E – docker-compose.yaml (local / staging)
version: "3.8"
services:
redis:
image: redis:7-alpine
ports: ["6379:6379"]
postgres:
image: postgres:15-alpine
environment:
POSTGRES_USER: app
POSTGRES_PASSWORD: example
POSTGRES_DB: app_db
volumes:
- pgdata:/var/lib/postgresql/data
ports:
- "5432:5432"
  qdrant:
    image: qdrant/qdrant
    volumes:
      - qdrant_storage:/qdrant/storage
api:
build:
context: .
dockerfile: api/Dockerfile
depends_on:
- redis
- qdrant
- postgres
environment:
- CELERY_BROKER_URL=redis://redis:6379/0
- CELERY_RESULT_BACKEND=redis://redis:6379/1
- OPENAI_API_KEY=${OPENAI_API_KEY}
- QDRANT_URL=http://qdrant:6333
ports:
- "8000:8000"
worker:
build:
context: .
dockerfile: workers/Dockerfile
depends_on:
- redis
- qdrant
environment:
- CELERY_BROKER_URL=redis://redis:6379/0
- CELERY_RESULT_BACKEND=redis://redis:6379/1
- OPENAI_API_KEY=${OPENAI_API_KEY}
- QDRANT_URL=http://qdrant:6333
volumes:
pgdata:
qdrant_storage:
Usage (local):
export OPENAI_API_KEY="sk-..."
docker compose up --build
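Then submit a query and poll for the result (using the task_id from the first response), for example:

curl -X POST http://localhost:8000/v1/query \
  -H "Content-Type: application/json" \
  -d '{"user_id": "u1", "query": "What are AI business trends in 2025?"}'
curl http://localhost:8000/v1/result/<task_id>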
F – Kubernetes snippets (production)
Below are minimal manifests. You'll want to parameterize with Helm in a real deployment.
k8s/deployment-api.yaml
apiVersion: v1
kind: Service
metadata:
name: ai-api-svc
spec:
selector:
app: ai-api
ports:
- port: 80
targetPort: 8000
type: LoadBalancer
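The Service above selects pods labeled app: ai-api, but no API Deployment is shown. A minimal sketch might be the following; the image name is a placeholder matching the CI workflow in section G, and ai-secrets is an assumed Secret holding OPENAI_API_KEY and the broker URLs.

apiVersion: apps/v1
kind: Deployment
metadata:
  name: ai-api
spec:
  replicas: 2
  selector:
    matchLabels:
      app: ai-api
  template:
    metadata:
      labels:
        app: ai-api
    spec:
      containers:
        - name: api
          image: ghcr.io/yourorg/ai-api   # placeholder; built and pushed by the CI workflow in section G
          ports:
            - containerPort: 8000
          envFrom:
            - secretRef:
                name: ai-secrets          # assumed Secret with OPENAI_API_KEY, CELERY_BROKER_URL, etc.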
k8s/deployment-worker.yaml (for Celery workers)
apiVersion: apps/v1
kind: Deployment
metadata:
name: ai-worker
spec:
replicas: 2
selector:
matchLabels:
app: ai-worker
template:
metadata:
labels:
app: ai-worker
spec:
containers:
- name: worker
image: ghcr.io/yourorg/ai-worker
Notes: Use Kubernetes HorizontalPodAutoscaler (HPA) to scale generator worker replicas based on CPU / custom metrics (e.g., queue length). Use KEDA to autoscale based on Redis queue length.
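For CPU-based scaling of the ai-worker Deployment above, a minimal HPA might look like this (replica counts and the 70% target are illustrative); for queue-length-based scaling, a KEDA ScaledObject with the Redis scaler would replace this object.

apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: ai-worker-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: ai-worker
  minReplicas: 2
  maxReplicas: 10
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70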
G – GitHub Actions (CI): .github/workflows/ci-cd.yaml
name: CI/CD
on:
push:
branches: [ main ]
jobs:
build-and-push:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
- name: Build API image
run: |
docker build -t ghcr.io/${{ github.repository_owner }}/ai-api:latest -f api/Dockerfile .
docker build -t ghcr.io/${{ github.repository_owner }}/ai-worker:latest -f workers/Dockerfile .
- name: Log in to GitHub Container Registry
uses: docker/login-action@v2
with:
registry: ghcr.io
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Push images
run: |
docker push ghcr.io/${{ github.repository_owner }}/ai-api:latest
docker push ghcr.io/${{ github.repository_owner }}/ai-worker:latest
deploy:
runs-on: ubuntu-latest
needs: build-and-push
if: github.ref == 'refs/heads/main'
steps:
- name: Deploy to Kubernetes
        uses: appleboy/ssh-action@v0.1.6
        with:
          host: ${{ secrets.DEPLOY_HOST }}       # placeholder secret names
          username: ${{ secrets.DEPLOY_USER }}
          key: ${{ secrets.DEPLOY_SSH_KEY }}
          script: kubectl apply -f k8s/          # assumes kubectl + kubeconfig on the target host
Note: Replace the deploy step with your cloud provider's GitHub Action (e.g., GKE, EKS, or use kubectl with a kubeconfig stored as a secret).
Logging: Use structured logs (JSON). Send to Loki or ELK stack. Add request_id/correlation ID for tracing.
Metrics: Expose Prometheus metrics from FastAPI and Celery (task counts, queue length, worker concurrency); a minimal sketch follows this list. Track:
LLM token usage per request
Latency (retrieval time, LLM time)
Error rate per user
Tracing: Use OpenTelemetry to trace requests through API → orchestrator → workers → LLM.
Alerts: CPU/memory, queue backlog, latency SLO breaches, LLM API error spikes.
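A minimal sketch of the FastAPI side using prometheus-client (already in requirements.txt); the metric names are assumptions for illustration.

# metrics.py (sketch) - counters/histograms exposed at /metrics by the gateway
from prometheus_client import Counter, Histogram, make_asgi_app

LLM_TOKENS = Counter("llm_tokens_total", "LLM tokens consumed", ["user_id"])
REQUEST_LATENCY = Histogram("api_request_latency_seconds", "API request handling latency")

# In api/app.py:
#   app.mount("/metrics", make_asgi_app())                     # Prometheus scrape endpoint
#   with REQUEST_LATENCY.time():                               # around the /v1/query handler body
#       task = celery.send_task(...)
#   LLM_TOKENS.labels(user_id=req.user_id).inc(tokens_used)    # once token usage is known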
Cache answers for repeated queries: use Redis with TTL (a minimal sketch follows this list).
Limit context: keep chunk sizes around 500–1000 tokens.
Batch retrieval & embeddings for multiple requests.
Use cheaper LLMs for drafts (gpt-4o-mini) and only call larger models for final polishing.
Use streaming or progressive responses to show partial answers early.
Monitor token usage and set per-user/tenant quotas.
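A minimal answer-cache sketch for the first item above; the key scheme, Redis DB index, and 1-hour TTL are assumptions.

# cache.py (sketch) - cache final answers for repeated queries
import hashlib
import json
import os
from redis import Redis

redis_client = Redis.from_url(os.getenv("REDIS_URL", "redis://redis:6379/3"))
CACHE_TTL_SECONDS = 3600  # illustrative TTL

def cache_key(query: str) -> str:
    return "answer:" + hashlib.sha256(query.strip().lower().encode()).hexdigest()

def get_cached_answer(query: str):
    raw = redis_client.get(cache_key(query))
    return json.loads(raw) if raw else None

def set_cached_answer(query: str, payload: dict) -> None:
    redis_client.set(cache_key(query), json.dumps(payload), ex=CACHE_TTL_SECONDS)

run_orchestration would check get_cached_answer(query) before retrieval and call set_cached_answer(query, response) after summarization.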
PII: Never send PII to LLM providers unless contractually allowed. Hash/anonymize before embedding.
Data retention: define retention policies for logs and vector DB.
GDPR: allow users to delete their data; keep a deletion pipeline that deletes vector embeddings and DB rows.
Encryption: encrypt secrets at rest; use TLS in transit.
Create an ingestion pipeline (Airflow / simple cron job):
fetch_files() → extract_text() → split_and_embed() → upsert_to_vector_db() (a minimal sketch follows this list)
Run periodically or trigger on file upload.
Keep metadata (source URL, doc id, timestamp) stored in Postgres for traceability.
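A minimal version of that pipeline for local text files, upserting into the same business collection the workers query; the default file path points at the sample data file in the split repo below, and the Postgres metadata insert is left as a comment.

# ingest.py (sketch) - fetch -> extract -> split/embed -> upsert
import os
from datetime import datetime
from langchain.document_loaders import TextLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import Qdrant

def ingest(path: str = "data/business.txt") -> None:
    docs = TextLoader(path).load()                      # fetch_files() / extract_text()
    splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=100)
    chunks = splitter.split_documents(docs)             # split
    for c in chunks:                                    # traceability metadata on each chunk
        c.metadata.update({"source": path, "ingested_at": datetime.utcnow().isoformat()})
    Qdrant.from_documents(                              # embed + upsert_to_vector_db()
        chunks,
        OpenAIEmbeddings(openai_api_key=os.getenv("OPENAI_API_KEY")),
        url=os.getenv("QDRANT_URL", "http://qdrant:6333"),
        collection_name="business",
    )
    # TODO: insert (source, doc id, timestamp) rows into Postgres for traceability

if __name__ == "__main__":
    ingest()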
Unit tests for agent logic (mock LLM & vector DB); a small example follows this list.
Integration tests using staging LLM keys / small models.
E2E tests for orchestration: submit query → verify final answer exists, audit logs saved.
Use contract tests for service interfaces (e.g., API returns 200 & task_id).
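As a small example, the critic heuristic can be unit-tested without any LLM or vector DB by calling the Celery task directly; the import path assumes the split workers/tasks.py shown later and running python -m pytest from the repo root.

# tests/test_critic.py (sketch)
from workers.tasks import critic_task

def test_critic_marks_grounded_when_sources_match():
    gen_res = {
        "query": "AI business trends",
        "answer": "AI-driven marketing is a key trend.",
        "sources": [{"snippet": "AI business trends for 2025 include AI-driven marketing."}],
    }
    out = critic_task(gen_res)   # calling a Celery task directly runs it in-process
    assert out["critique"]["grounded"] is True

def test_critic_flags_ungrounded_answer():
    gen_res = {"query": "quarterly revenue", "answer": "Revenue grew 40%.", "sources": []}
    out = critic_task(gen_res)
    assert out["critique"]["grounded"] is False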
Staging: 2 small VMs (api + worker), small Redis, single-node Qdrant. Cost: roughly $30–100/mo depending on cloud.
Production initial: load balancer + 2-3 API pods, 3 generator worker pods, managed Redis, entry-level Pinecone plan. Cost: roughly $200–600/mo depending on LLM usage.
Scale: If many users, scale generator workers vertically and consider LLM batching/async modes.
A complete monorepo zipped with the files above (API, workers, Dockerfiles, docker-compose).
Split Celery tasks into separate agent tasks (retrieve → generate → critic → summarize) with Celery chains & chords.
Helm chart to deploy to Kubernetes with values for secrets, resource requests/limits.
Terraform skeleton to provision infra (EKS/GKE, RDS, Redis, S3, Pinecone setup guide).
Full CI/CD pipeline (GitHub Actions) tuned for your cloud provider.
Staging deploy guide to Streamlit Cloud/Hugging Face (for demo) vs. Kubernetes (prod).
Repo layout
ai_business_assistant_prod_split/
├── api/
│   ├── app.py
│   └── Dockerfile
├── orchestrator/
│   ├── orchestrator.py
│   └── Dockerfile
├── workers/
│   ├── tasks.py
│   └── Dockerfile
├── data/
│   └── business.txt
├── faiss_index/          # optional (created by build_index.py)
├── build_index.py
├── docker-compose.yml
├── requirements.txt
└── README.md
requirements.txt
fastapi
uvicorn[standard]
celery[redis]
redis
python-dotenv
pydantic
openai
langchain
faiss-cpu
qdrant-client
loguru
psycopg2-binary
sqlalchemy
prometheus-client
version: "3.8"
services:
redis:
image: redis:7-alpine
ports: ["6379:6379"]
  qdrant:
    image: qdrant/qdrant
    volumes:
      - qdrant_storage:/qdrant/storage
  api:
    build:
      context: .                    # build from the repo root so requirements.txt is in the context
      dockerfile: api/Dockerfile
depends_on:
- redis
environment:
- CELERY_BROKER_URL=redis://redis:6379/0
- CELERY_RESULT_BACKEND=redis://redis:6379/1
- OPENAI_API_KEY=${OPENAI_API_KEY}
- QDRANT_URL=http://qdrant:6333
ports:
- "8000:8000"
  orchestrator:
    build:
      context: .
      dockerfile: orchestrator/Dockerfile
depends_on:
- redis
- qdrant
environment:
- CELERY_BROKER_URL=redis://redis:6379/0
- CELERY_RESULT_BACKEND=redis://redis:6379/1
- OPENAI_API_KEY=${OPENAI_API_KEY}
- QDRANT_URL=http://qdrant:6333
  worker:
    build:
      context: .
      dockerfile: workers/Dockerfile
depends_on:
- redis
- qdrant
environment:
- CELERY_BROKER_URL=redis://redis:6379/0
- CELERY_RESULT_BACKEND=redis://redis:6379/1
- OPENAI_API_KEY=${OPENAI_API_KEY}
- QDRANT_URL=http://qdrant:6333
command: ["celery", "-A", "tasks.celery", "worker", "--loglevel=info", "-Q", "retrieval,generation,critique,summarize,orch"]
volumes:
qdrant_storage:
build_index.py
import os
from langchain.document_loaders import TextLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import FAISS
os.environ.setdefault("OPENAI_API_KEY", "your_openai_api_key_here")
def build():
loader = TextLoader("data/business.txt")
docs = loader.load()
splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=100)
chunks = splitter.split_documents(docs)
embed = OpenAIEmbeddings()
db = FAISS.from_documents(chunks, embed)
db.save_local("faiss_index")
print("FAISS index built in ./faiss_index")
if __name__ == "__main__":
build()
api/app.py
import os
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from celery import Celery
app = FastAPI(title="AI Business Assistant - API")
CELERY_BROKER = os.getenv("CELERY_BROKER_URL", "redis://redis:6379/0")
CELERY_BACKEND = os.getenv("CELERY_RESULT_BACKEND", "redis://redis:6379/1")
celery = Celery("api", broker=CELERY_BROKER, backend=CELERY_BACKEND)
class QueryRequest(BaseModel):
user_id: str
query: str
@app.post("/v1/query")
def submit_query(req: QueryRequest):
if not req.query.strip():
raise HTTPException(status_code=400, detail="Empty query")
# enqueue high-level orchestrator task (runs chain of agent tasks)
task = celery.send_task("tasks.run_orchestration", args=[req.user_id, req.query], queue="orch")
return {"task_id": task.id, "status": "queued"}
@app.get("/v1/result/{task_id}")
def get_result(task_id: str):
res = celery.AsyncResult(task_id)
if res.state == "PENDING":
return {"status": "pending"}
if res.state in ("STARTED", "RETRY"):
return {"status": res.state}
if res.state == "SUCCESS":
return {"status": "success", "result": res.result}
return {"status": res.state, "info": str(res.info)}
api/Dockerfile
FROM python:3.11-slim
WORKDIR /app
# Build context is the repo root (see docker-compose.yml), so requirements.txt is available
COPY requirements.txt /app/
RUN pip install --no-cache-dir -r /app/requirements.txt
COPY api/ /app/
ENV PYTHONUNBUFFERED=1
EXPOSE 8000
CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8000"]
orchestrator/orchestrator.py
from celery import Celery, chain
import os
CELERY_BROKER = os.getenv("CELERY_BROKER_URL", "redis://redis:6379/0")
CELERY_BACKEND = os.getenv("CELERY_RESULT_BACKEND", "redis://redis:6379/1")
celery = Celery("orch", broker=CELERY_BROKER, backend=CELERY_BACKEND)
def start_orchestration(user_id: str, query: str):
# Build chain: retrieve -> generate -> critic -> summarize
    # Each downstream task receives the previous task's result as its only argument,
    # so only the first signature needs explicit args.
    task_chain = chain(
        celery.signature("tasks.retrieve_task", args=[user_id, query], queue="retrieval"),
        celery.signature("tasks.generate_task", queue="generation"),
        celery.signature("tasks.critic_task", queue="critique"),
        celery.signature("tasks.summarize_task", queue="summarize"),
    )
res = task_chain.apply_async()
return res.id
if __name__ == "__main__":
import sys
if len(sys.argv) >= 3:
print(start_orchestration(sys.argv[1], " ".join(sys.argv[2:])))
else:
print("usage: python orchestrator.py <user_id> <query...>")
orchestrator/Dockerfile
FROM python:3.11-slim
WORKDIR /app
# Build context is the repo root (see docker-compose.yml)
COPY requirements.txt /app/
RUN pip install --no-cache-dir -r /app/requirements.txt
COPY orchestrator/ /app/
ENV PYTHONUNBUFFERED=1
CMD ["python", "orchestrator.py"]
workers/tasks.py: contains the Celery app and task definitions (retrieve_task, generate_task, critic_task, summarize_task) plus a convenience run_orchestration that chains them when called directly.
import os
from celery import Celery, Task, chain
from loguru import logger
from typing import List, Dict, Any
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import FAISS, Qdrant
from langchain.chat_models import ChatOpenAI  # gpt-4o is a chat model, so use the chat wrapper
from langchain.chains import RetrievalQA
from qdrant_client import QdrantClient
BROKER = os.getenv("CELERY_BROKER_URL", "redis://redis:6379/0")
BACKEND = os.getenv("CELERY_RESULT_BACKEND", "redis://redis:6379/1")
OPENAI_KEY = os.getenv("OPENAI_API_KEY")
celery = Celery("tasks", broker=BROKER, backend=BACKEND)
class BaseTask(Task):
autoretry_for = (Exception,)
retry_kwargs = {"max_retries": 2, "countdown": 5}
retry_backoff = True
def load_vectorstore():
"""
Try FAISS local first, else Qdrant (production). Returns a retriever object.
"""
embeddings = OpenAIEmbeddings(openai_api_key=OPENAI_KEY)
# Prefer FAISS (local) if index exists
if os.path.isdir("faiss_index"):
db = FAISS.load_local("faiss_index", embeddings, allow_dangerous_deserialization=True)
return db.as_retriever(search_kwargs={"k": 3})
# Else try Qdrant
qdrant_url = os.getenv("QDRANT_URL")
if qdrant_url:
        client = QdrantClient(url=qdrant_url)  # LangChain's Qdrant wrapper needs a qdrant-client instance
        db = Qdrant(client=client, collection_name="business", embeddings=embeddings)
return db.as_retriever(search_kwargs={"k": 3})
raise RuntimeError("No vectorstore available (create faiss_index or set QDRANT_URL)")
@celery.task(base=BaseTask, name="tasks.retrieve_task")
def retrieve_task(user_id: str, query: str) -> Dict[str, Any]:
"""
Returns: {'query': query, 'hits': [{'content':..., 'score':...},...]}
"""
logger.info(f"[Retriever] user={user_id} query={query}")
retriever = load_vectorstore()
hits = retriever.get_relevant_documents(query) if hasattr(retriever, "get_relevant_documents") else retriever.search(query, k=3)
formatted = []
for d in hits:
snippet = (d.page_content[:800] + "...") if len(d.page_content) > 800 else d.page_content
formatted.append({"content": d.page_content, "snippet": snippet})
logger.info(f"[Retriever] found {len(formatted)} hits")
return {"query": query, "hits": formatted}
@celery.task(base=BaseTask, name="tasks.generate_task")
def generate_task(retrieve_res: Dict[str, Any]) -> Dict[str, Any]:
"""
Input: output of retrieve_task
Returns: {'answer': ..., 'sources': [...]}
"""
query = retrieve_res["query"]
logger.info(f"[Generator] generating answer for: {query}")
retriever = load_vectorstore()
    llm = ChatOpenAI(openai_api_key=OPENAI_KEY, model_name="gpt-4o", temperature=0.2)
qa = RetrievalQA.from_chain_type(llm=llm, chain_type="stuff", retriever=retriever, return_source_documents=True)
result = qa({"query": query})
answer = result.get("result") if isinstance(result, dict) else str(result)
source_docs = result.get("source_documents", [])
sources = [{"snippet": (d.page_content[:500] + "...") if len(d.page_content) > 500 else d.page_content} for d in source_docs]
logger.info(f"[Generator] produced answer length {len(answer)} and {len(sources)} source docs")
return {"query": query, "answer": answer, "sources": sources}
@celery.task(base=BaseTask, name="tasks.critic_task")
def critic_task(gen_res: Dict[str, Any]) -> Dict[str, Any]:
"""
Checks grounding: basic heuristics; can be replaced with LLM-based verifier.
Returns gen_res augmented with 'critique'
"""
query = gen_res["query"]
answer = gen_res["answer"]
sources = gen_res.get("sources", [])
terms = [t.lower() for t in query.split() if len(t) > 3]
matched = 0
for s in sources:
text = s.get("snippet", "").lower()
for term in terms:
if term in text:
matched += 1
break
grounded = matched > 0
critique = {"grounded": grounded, "matched_sources": matched,
"message": "Grounded" if grounded else "No matches in sources - verify"}
logger.info(f"[Critic] critique: {critique}")
gen_res["critique"] = critique
return gen_res
@celery.task(base=BaseTask, name="tasks.summarize_task")
def summarize_task(critic_res: Dict[str, Any]) -> Dict[str, Any]:
"""
Final formatting: create summary & citations. Returns final payload.
"""
answer = critic_res["answer"]
sources = critic_res.get("sources", [])
# simple truncate summary for now
summary = answer if len(answer) <= 600 else answer[:600].rsplit(".", 1)[0] + "..."
citations = [{"id": i+1, "text": s["snippet"][:300]} for i, s in enumerate(sources[:3])]
final = {
"query": critic_res["query"],
"summary": summary,
"answer": answer,
"critique": critic_res.get("critique", {}),
"citations": citations
}
logger.info(f"[Summarizer] finalizing result for query: {critic_res['query']}")
return final
@celery.task(base=BaseTask, name="tasks.run_orchestration")
def run_orchestration(user_id: str, query: str):
"""
Runs the full chain synchronously via Celery chord/chain and returns final result.
This task will create a chain (retrieve->generate->critic->summarize) and wait for it to complete.
"""
logger.info(f"[Orchestrator-Task] start orchestration user={user_id}")
# Build chain: note that Celery expects signatures. We pass args progressively by letting tasks accept previous result.
result = chain(
retrieve_task.s(user_id, query).set(queue="retrieval"),
generate_task.s().set(queue="generation"),
critic_task.s().set(queue="critique"),
summarize_task.s().set(queue="summarize")
)()
    # Celery blocks .get() inside a task by default to avoid deadlocks; disable_sync_subtasks=False
    # opts out. With a single worker consuming every queue this can still deadlock, so in production
    # prefer returning result.id and letting the API poll it instead of waiting here.
    final = result.get(disable_sync_subtasks=False)
logger.info(f"[Orchestrator-Task] finished user={user_id}")
return final
workers/Dockerfile
FROM python:3.11-slim
WORKDIR /app
# Build context is the repo root (see docker-compose.yml)
COPY requirements.txt /app/
RUN pip install --no-cache-dir -r /app/requirements.txt
COPY workers/ /app/
ENV PYTHONUNBUFFERED=1
CMD ["celery", "-A", "tasks.celery", "worker", "--loglevel=info"]
Note: the worker service in docker-compose overrides the command to set proper queues and concurrency.
data/business.txt (sample content)
Artificial Intelligence (AI) is transforming business processes across industries.
Startups in 2025 are focusing on AI-driven marketing, customer personalization, and automation.
Digital transformation strategies include machine learning-based analytics, predictive insights, and chatbots.
AI can help small businesses optimize sales and reduce costs through smart automation.
README.md
This project runs a multi-agent RAG pipeline with tasks separated into Celery queues:
retrieval -> generation -> critique -> summarize.
Create a .env file with: OPENAI_API_KEY=sk-...
docker compose up --build
Use API:
POST http://localhost:8000/v1/query
Body: {"user_id":"u1","query":"What are AI business trends in 2025?"}
Poll result:
GET http://localhost:8000/v1/result/<task_id>
Vector index: the worker looks for a local FAISS index (./faiss_index). Create it with python build_index.py, or set QDRANT_URL to use Qdrant instead.
Secrets: Don't store API keys in code; use environment variables or secret stores.
Rate limiting & auth: Add JWT auth to API and per-user rate limits.
Task timeouts: Configure Celery task time limits and retries (a minimal config sketch follows these notes).
Observability: Add Prometheus metrics & structured logs.
Scaling: Run multiple worker replicas, scale generator workers separately because LLM calls are the expensive part.
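A minimal sketch of those timeout/retry settings, added to workers/tasks.py right after the Celery app is created; the specific values are illustrative.

# Celery reliability settings (sketch; tune the limits for your LLM latency)
celery.conf.update(
    task_time_limit=300,           # hard kill after 5 minutes
    task_soft_time_limit=240,      # raises SoftTimeLimitExceeded so the task can clean up
    task_acks_late=True,           # re-deliver a task if the worker dies mid-run
    worker_prefetch_multiplier=1,  # avoid hoarding expensive generation tasks
)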
Convert this to Kubernetes + Helm manifests (ready for EKS/GKE).
Add Postgres audit logging for queries & citations (with SQLAlchemy models and migrations).
Replace FAISS with a Pinecone or Qdrant upsert example and ingestion pipeline.
Add a simple web UI (Streamlit) that calls the API and streams progress via websockets.
Add unit tests & mocking for LLMs and vector DB to enable CI.
API available: http://localhost:8000/v1/query