The Deep Research Agent is a multi-agent system that researches a user-specified topic within a chosen domain. It uses the Meta LLaMA large language model to generate targeted research questions and analyze the results, Tavily for web searches so that findings draw on current sources, and Composio tools to store the final report in Google Docs. A Streamlit interface runs the whole pipeline, from question generation to report creation and storage, and supports iterative refinement of the research. The project emphasizes production readiness, safety, and usability, and is aimed at researchers, analysts, and professionals who need data-driven insights.

The project is organized in a clean, modular structure for easy maintenance and scalability. Here's the directory tree:
```text
deep-research-agent/
├── .env.example # Example environment variables file for configuration
├── .gitignore # Git ignore file to exclude unnecessary files
├── .gitattributes # Git attributes for handling file types
├── LICENSE # Project license file (MIT)
├── README.md # Main README with setup instructions and usage
├── Dockerfile # Docker configuration for containerization
├── docker-compose.yml # Docker Compose for multi-container setups
├── pytest.ini # Configuration for pytest testing framework
├── requirements.txt # List of Python dependencies
├── notebook/ # Jupyter notebooks for experimentation
│   └── deep_research.ipynb # Notebook for interactive deep research demos
├── tests/ # Test suite directory
│   ├── __init__.py # Init file for tests package
│   ├── conftest.py # Pytest fixtures and configurations
│   ├── unit/ # Unit tests
│   │   ├── __init__.py
│   │   ├── test_nodes.py # Tests for individual nodes
│   │   └── test_tools.py # Tests for tools like LLM and Composio
│   ├── integration/ # Integration tests
│   │   ├── __init__.py
│   │   └── test_workflow.py # Tests for workflow integration
│   └── system/ # System/end-to-end tests
│       ├── __init__.py
│       └── test_e2e.py # Full system tests including UI
└── src/ # Source code directory
    ├── __init__.py # Init file for src package
    ├── app.py # Streamlit application entry point
    ├── graph.py # LangGraph workflow definition
    ├── state.py # State management for the agent
    ├── config.py # Configuration management
    ├── resilience.py # Resilience patterns (retries, circuit breakers)
    ├── monitoring/ # Monitoring tools
    │   ├── __init__.py
    │   ├── logger.py # Custom logging module
    │   ├── metrics.py # Metrics tracking (e.g., performance)
    │   └── health.py # Health check endpoints
    ├── guardrails/ # Safety and validation modules
    │   ├── __init__.py
    │   ├── input_validator.py # Input validation logic
    │   └── prompt_injection.py # Prompt injection detection
    ├── nodes/ # Agent nodes (research steps)
    │   ├── __init__.py
    │   └── nodes.py # Definitions of nodes like research_agent_node
    ├── tools/ # AI tools and integrations
    │   ├── __init__.py
    │   ├── llm.py # LLM wrapper (e.g., for Meta LLaMA)
    │   └── composio_tools.py # Tools for external integrations like Google Docs
    └── prompts.py # Prompt templates for the LLM
```
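The layout separates concerns: application code lives under `src/`, tests under `tests/` (split by level), experiments under `notebook/`, and documentation and deployment configuration at the repository root.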
The workflow of the Deep Research Agent is illustrated in a flowchart: it starts with user input, proceeds through question generation and research, and ends with report creation and storage.
When refinements are needed, the workflow loops back to question generation, so the research improves iteratively.
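As a minimal sketch of this loop in plain Python (not the project's LangGraph code in `src/graph.py`), the function below runs question generation, research, and report writing, then loops back while a refinement check asks for another pass, up to an iteration cap. The node functions and `needs_refinement` are hypothetical stand-ins for the LLM-backed nodes; in LangGraph the same decision would typically be expressed as a conditional edge added with `add_conditional_edges`.

```python
from typing import Any, Callable, Dict

# Hypothetical plain-Python sketch; the real workflow is a LangGraph graph
State = Dict[str, Any]
Node = Callable[[State], State]


def run_research_cycle(
    state: State,
    generate_questions: Node,
    research: Node,
    write_report: Node,
    needs_refinement: Callable[[State], bool],
    max_iterations: int = 3,
) -> State:
    """Run question generation -> research -> report, looping while refinement is requested."""
    for iteration in range(1, max_iterations + 1):
        state = generate_questions(state)
        state = research(state)
        state = write_report(state)
        state = {**state, "iterations": iteration}
        # Stop once the report is accepted; otherwise loop back to question generation
        if not needs_refinement(state):
            break
    return state


# Stub nodes standing in for the LLM-backed ones (hypothetical)
def gen(s: State) -> State:
    return {**s, "questions": [f"What is the state of {s['topic']}?"]}


def res(s: State) -> State:
    return {**s, "findings": [f"finding for {q}" for q in s["questions"]]}


def rep(s: State) -> State:
    return {**s, "report": "<html>" + "; ".join(s["findings"]) + "</html>"}


# The first report is sent back for refinement once; the second is accepted
final = run_research_cycle(
    {"topic": "AI in health", "domain": "Health"},
    gen, res, rep,
    needs_refinement=lambda s: s["iterations"] < 2,
)
print(final["iterations"])  # 2
```

The iteration cap matters: without it, a refinement check that never accepts a report would loop indefinitely.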
The test suite is divided into unit, integration, and system levels and runs under pytest.
Unit tests isolate and verify individual components.
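```python
# tests/unit/test_nodes.py
from src.nodes import nodes


def test_research_agent_node_returns_report(monkeypatch):
    # Replace the LLM-backed helpers with stubs so the test is fast and offline
    monkeypatch.setattr(nodes, "process_questions_parallel", lambda topic, questions: ["finding"])
    monkeypatch.setattr(nodes, "generate_research_report", lambda findings: "<html>report</html>")

    state = {
        "topic": "AI in health",
        "domain": "Health",
        # research_agent_node raises NodeError when no questions are provided
        "questions": ["How is AI used in medical imaging?"],
    }
    result = nodes.research_agent_node(state)

    assert isinstance(result, dict)
    assert "report" in result
```
Explanation: This test passes a state containing a topic, a domain, and a list of research questions (the node rejects a state without questions) and checks that `research_agent_node` returns a dictionary with a "report" key. The LLM-backed helpers are replaced with stubs through pytest's `monkeypatch` fixture, so the test verifies the node's data flow without network calls or side effects.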
Integration tests verify component interactions.
```python
# tests/integration/test_workflow.py
from src.graph import build_graph


def test_workflow_end_to_end():
    graph = build_graph()
    state = graph.invoke({"topic": "AI in health", "domain": "Health"})
    assert "report" in state
    # Match "<html" so reports with attributes such as <html lang="en"> also pass
    assert "<html" in state["report"].lower()
```
Explanation: This test builds the full LangGraph workflow, invokes it with sample input, and asserts that the output state includes a "report" key with HTML content. It simulates the entire pipeline to catch integration issues.
System tests validate the end-to-end application, including the UI.
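```python
# tests/system/test_e2e.py
import subprocess
import sys
import time

import requests

BASE_URL = "http://localhost:8501"


def wait_for_server(url: str, timeout: float = 30.0) -> None:
    """Poll the URL until it answers with HTTP 200 or the timeout expires."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            if requests.get(url, timeout=2).status_code == 200:
                return
        except requests.RequestException:
            pass  # server not up yet
        time.sleep(0.5)
    raise TimeoutError(f"Server at {url} did not become healthy within {timeout}s")


def test_streamlit_ui_loads():
    proc = subprocess.Popen(
        [sys.executable, "-m", "streamlit", "run", "src/app.py",
         "--server.headless=true", "--server.port=8501"]
    )
    try:
        # Streamlit exposes a built-in health endpoint once the server is ready
        wait_for_server(f"{BASE_URL}/_stcore/health")
        resp = requests.get(BASE_URL, timeout=5)
        assert resp.status_code == 200
    finally:
        proc.terminate()
        proc.wait()
```
Explanation: This test launches the Streamlit app in headless mode, polls Streamlit's `/_stcore/health` endpoint until the server is ready (instead of sleeping for a fixed time), and then checks that the UI page is served with status 200. Streamlit renders page content in the browser with JavaScript, so the raw HTTP response does not contain the app's title; asserting on visible text such as "Deep Research Agent" requires a browser-driven tool such as Playwright or Selenium.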
Prevents invalid or malicious inputs.
```python
# guardrails/input_validator.py
MAX_LEN = 200
REJECTED_KEYWORDS = ["drop table", "delete from", "<script"]


def validate_input(user_text: str) -> bool:
    """Return True if the input is a string of at most MAX_LEN characters with no rejected keywords."""
    if not isinstance(user_text, str) or len(user_text) > MAX_LEN:
        return False
    lowered = user_text.lower()
    return not any(bad in lowered for bad in REJECTED_KEYWORDS)
```
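Explanation: This function accepts only strings of at most 200 characters that contain none of a few SQL-injection and XSS keywords ("drop table", "delete from", "<script"). It returns True for inputs that pass these checks, so obviously malicious input is rejected before it reaches the workflow. A keyword denylist is a coarse first filter, not a complete defense.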
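A keyword denylist only catches the exact strings it lists. A complementary approach, sketched below under the assumption that topics and domains are short natural-language phrases, is an allowlist that accepts only letters (in any script), digits, whitespace, and a few punctuation marks. `ALLOWED_PATTERN` and `validate_topic_allowlist` are hypothetical names, not part of the project.

```python
import re

# Hypothetical allowlist validator, complementary to validate_input's keyword denylist.
# \w matches Unicode letters, digits, and underscore; the rest is common topic punctuation.
ALLOWED_PATTERN = re.compile(r"[\w\s,.'()&/+-]+")
MAX_LEN = 200


def validate_topic_allowlist(user_text: object) -> bool:
    """Return True only for a non-empty string of at most MAX_LEN allowed characters."""
    if not isinstance(user_text, str):
        return False
    stripped = user_text.strip()
    if not stripped or len(stripped) > MAX_LEN:
        return False
    return ALLOWED_PATTERN.fullmatch(stripped) is not None
```

The allowlist rejects markup such as `<script>` outright, but it does not replace the keyword check: a phrase like "drop table users" consists entirely of allowed characters.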
Detects and blocks prompt injection attempts.
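```python
# guardrails/prompt_injection.py
import re

# Matches phrases such as "ignore previous instructions" or "disregard the prompt".
# The optional "s" lets plural forms ("instructions", "prompts") match as whole words,
# and DOTALL lets a match span line breaks.
PROMPT_INJ_PATTERN = re.compile(
    r"\b(ignore|disregard|forget|override)\b.*\b(previous|instructions?|prompts?)\b",
    re.IGNORECASE | re.DOTALL,
)


def detect_prompt_injection(text: str) -> bool:
    """Return True if the text looks like an attempt to override the system prompt."""
    return bool(PROMPT_INJ_PATTERN.search(text))
```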
Explanation: Uses regex to scan for patterns like "ignore previous instructions," common in prompt injections. Returns True if detected, allowing the system to block harmful prompts.
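Regex screening is easy to sidestep with extra whitespace, unusual casing, or look-alike characters. A minimal hardening step, assuming the same pattern idea as above, is to normalize text before matching: Unicode NFKC folding (which maps fullwidth and other compatibility characters to their plain forms), case folding, and whitespace collapsing. `normalize_for_screening` and `detect_prompt_injection_normalized` are hypothetical names; this remains a heuristic filter, not a complete defense against prompt injection.

```python
import re
import unicodedata

# Hypothetical hardening step; not part of guardrails/prompt_injection.py
INJECTION_PATTERN = re.compile(
    r"\b(ignore|disregard|forget|override)\b.*\b(previous|instructions?|prompts?)\b",
    re.IGNORECASE | re.DOTALL,
)


def normalize_for_screening(text: str) -> str:
    """Fold compatibility characters and case, and collapse runs of whitespace."""
    folded = unicodedata.normalize("NFKC", text).casefold()
    return re.sub(r"\s+", " ", folded).strip()


def detect_prompt_injection_normalized(text: str) -> bool:
    """Run the injection pattern on the normalized text."""
    return bool(INJECTION_PATTERN.search(normalize_for_screening(text)))
```

For example, fullwidth input such as "ｉｇｎｏｒｅ the prompt" is folded to "ignore the prompt" before matching, so it is flagged like its plain-ASCII equivalent.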
The Streamlit UI provides a simple, guided experience with enhanced error handling and metrics display.
```python
# src/app.py
import json
import time
import traceback
from typing import Any, Dict, List, Optional, Tuple

import streamlit as st

from graph import WorkflowManager, WorkflowError, WorkflowState
from monitoring.logger import get_logger
from guardrails.input_validator import validate_input
from guardrails.prompt_injection import detect_prompt_injection
from nodes.nodes import NodeError, MaxRetriesExceededError

# Constants
MAX_RETRIES = 3
RETRY_DELAY = 1  # seconds

# Initialize workflow manager
workflow_manager = WorkflowManager()
logger = get_logger("streamlit_ui")


def initialize_session_state() -> None:
    """Initialize the session state variables if they don't exist."""
    defaults = {
        'research_started': False,
        'research_complete': False,
        'error_occurred': False,
        'error_message': "",
        'error_count': 0,
        'last_error': None,
        'error_traceback': None,
        'show_technical_details': False,
        'workflow_state': None,
        'execution_metrics': {
            'start_time': None,
            'end_time': None,
            'duration': None,
            'nodes_executed': 0,
            'retries': 0,
        },
    }
    for key, value in defaults.items():
        if key not in st.session_state:
            st.session_state[key] = value


def validate_inputs(topic: str, domain: str) -> Tuple[bool, str]:
    """Validate user inputs.

    Args:
        topic: The research topic
        domain: The research domain/industry

    Returns:
        tuple: (is_valid, error_message)
    """
    if not topic.strip():
        return False, "Topic cannot be empty."
    if not domain.strip():
        return False, "Domain cannot be empty."
    if not validate_input(topic) or not validate_input(domain):
        # validate_input enforces a length limit and a keyword denylist
        return False, "Invalid input detected. Use at most 200 characters and avoid code or SQL keywords."
    if detect_prompt_injection(topic) or detect_prompt_injection(domain):
        logger.warning("Blocked suspicious input: %s | %s", topic, domain)
        return False, "Invalid input detected. Please try different keywords."
    return True, ""


def run_research_workflow(topic: str, domain: str) -> Dict[str, Any]:
    """Run the research workflow with enhanced resilience patterns.

    Args:
        topic: The research topic
        domain: The research domain/industry

    Returns:
        dict: Result containing report, doc_url, or error information
    """
    metrics = st.session_state.execution_metrics
    metrics['start_time'] = time.time()

    try:
        # Execute the workflow using the workflow manager
        result = workflow_manager.execute_workflow(topic.strip(), domain.strip())

        # Update execution metrics
        metrics['end_time'] = time.time()
        metrics['duration'] = metrics['end_time'] - metrics['start_time']

        node_statuses = result.get('node_statuses')
        if node_statuses:
            metrics['nodes_executed'] = sum(
                1 for node in node_statuses.values()
                if node['status'] in ('completed', 'failed')
            )
            # The first attempt of each node is not a retry
            metrics['retries'] = sum(
                node.get('attempts', 1) - 1 for node in node_statuses.values()
            )

        # Store workflow state for debugging
        st.session_state.workflow_state = result

        # Log the workflow execution status
        if result.get("success"):
            logger.info("Workflow completed successfully")
        else:
            logger.error("Workflow failed: %s", result.get('error', 'Unknown error'))
        return result

    except WorkflowError as e:
        error_msg = f"Workflow execution failed: {e}"
        logger.error("%s", error_msg, exc_info=True)
        return {"success": False, "error": error_msg, "type": "workflow_error"}

    except Exception as e:
        error_msg = f"An unexpected error occurred: {e}"
        error_traceback = traceback.format_exc()
        logger.error("%s\n%s", error_msg, error_traceback)
        # Store error details for the technical details view
        st.session_state.last_error = str(e)
        st.session_state.error_traceback = error_traceback
        return {"success": False, "error": error_msg, "type": "unexpected_error"}
```
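Explanation: This script sets up the Streamlit front end. `initialize_session_state` seeds default session values, `validate_inputs` applies the guardrail checks to the topic and domain, and `run_research_workflow` executes the research pipeline through `WorkflowManager`, recording execution metrics (duration, nodes executed, retries) and turning `WorkflowError` and unexpected exceptions into structured error results. The UI portion of the script provides input fields for the topic and domain; on button click it validates the inputs, runs the workflow while displaying a spinner, and renders the HTML report with a download option.

Catches and logs exceptions gracefully, with retry-with-backoff and circuit-breaker patterns.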
```python
# src/nodes/nodes.py
import hashlib
import json

from monitoring.logger import get_logger
from monitoring.metrics import timed
from guardrails.input_validator import validate_input
from guardrails.prompt_injection import detect_prompt_injection
from resilience import retry_with_backoff, CircuitBreaker

# GraphState, NodeError, cache, process_questions_parallel and generate_research_report
# are defined elsewhere in this module or project (not shown in this excerpt).
logger = get_logger(__name__)


@timed
@retry_with_backoff(max_retries=2)
@CircuitBreaker(failure_threshold=3, recovery_timeout=60)
def research_agent_node(state: GraphState) -> GraphState:
    """Conduct research based on generated questions.

    Args:
        state: Current graph state containing 'topic', 'domain', and 'questions'

    Returns:
        Updated state with 'report' key containing research findings

    Raises:
        NodeError: If research fails or no questions are provided (surfaced as
            MaxRetriesExceededError by the retry decorator once retries are exhausted)
    """
    try:
        topic = state["topic"].strip()
        questions = state.get("questions", [])
        if not questions:
            raise NodeError("No research questions provided")

        # Check cache for an existing report. A SHA-256 digest yields the same key in every
        # process, whereas hash() on strings is randomized per interpreter run.
        questions_hash = hashlib.sha256(json.dumps(questions).encode("utf-8")).hexdigest()
        cache_key = f"report:{topic}:{questions_hash}"
        cached_report = cache.get(cache_key)
        if cached_report:
            logger.info("Using cached report for topic: %s", topic)
            return {**state, "report": cached_report}

        # Process questions in parallel for better performance
        findings = process_questions_parallel(topic, questions)
        if not findings:
            raise NodeError("No research findings were generated")

        # Generate the report and cache it
        report = generate_research_report(findings)
        cache.set(cache_key, report)

        logger.info("Successfully generated report, length=%d chars", len(report))
        return {**state, "report": report}

    except NodeError:
        # Already descriptive; re-raise without wrapping it a second time
        raise
    except Exception as e:
        logger.error("Error in research_agent_node: %s", e, exc_info=True)
        raise NodeError(f"Research failed: {e}") from e
```
Explanation: `research_agent_node` reads the generated research questions from the graph state and returns a cached report when one exists for the same topic and questions. Otherwise it processes the questions in parallel, generates a report from the findings, caches it, and returns the updated state. The node is wrapped with `@timed` for metrics, `@retry_with_backoff` to retry transient failures, and a `CircuitBreaker` that stops calling the node after repeated failures. Unexpected errors are logged and re-raised as `NodeError`.
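`process_questions_parallel` is called by the node but not shown. A minimal sketch using a thread pool, assuming each question is answered by an I/O-bound callable (for example, a web search followed by an LLM call), could look like the following. The `answer_question` parameter and the `{"question", "answer"}` result shape are hypothetical; the project's node calls the helper with `(topic, questions)` only, so the real helper would bind the answering function internally. Threads suit this workload because the time is dominated by network waits rather than CPU.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable, Dict, List

# Hypothetical sketch: the real helper's signature and result shape are not shown in the project


def process_questions_parallel(
    topic: str,
    questions: List[str],
    answer_question: Callable[[str, str], str],
    max_workers: int = 4,
) -> List[Dict[str, str]]:
    """Answer questions concurrently; return findings in the original question order.

    Questions whose call raises are skipped, so the result may be shorter than the input.
    """
    results: Dict[int, Dict[str, str]] = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {
            pool.submit(answer_question, topic, question): index
            for index, question in enumerate(questions)
        }
        for future in as_completed(futures):
            index = futures[future]
            try:
                results[index] = {"question": questions[index], "answer": future.result()}
            except Exception:
                # Drop the failed question; the node raises NodeError if nothing succeeds
                continue
    return [results[i] for i in sorted(results)]


# Usage with a stub answerer that fails for one question
def fake_answer(topic: str, question: str) -> str:
    if question == "bad":
        raise RuntimeError("search failed")
    return f"{topic}: {question}"


findings = process_questions_parallel("AI in health", ["q1", "bad", "q2"], fake_answer)
```

Sorting by the original index keeps the report's section order stable even though `as_completed` yields results in completion order.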
```python
# src/resilience.py
import logging
import random
import time
from functools import wraps
from typing import Callable, Optional, TypeVar

logger = logging.getLogger(__name__)

R = TypeVar("R")

# Placeholder defaults: the project's actual values are not shown in this excerpt
DEFAULT_MAX_RETRIES = 3
DEFAULT_INITIAL_DELAY = 1.0  # seconds
DEFAULT_MAX_DELAY = 30.0  # seconds


class MaxRetriesExceededError(Exception):
    """Raised when a call still fails after all retries."""


class CircuitOpenError(Exception):
    """Raised when a call is rejected because the circuit is OPEN."""


def retry_with_backoff(
    func: Optional[Callable[..., R]] = None,
    *,
    max_retries: int = DEFAULT_MAX_RETRIES,
    initial_delay: float = DEFAULT_INITIAL_DELAY,
    max_delay: float = DEFAULT_MAX_DELAY,
    exponential_base: float = 2.0,
    jitter: bool = True,
    exceptions: tuple = (Exception,),
    on_retry: Optional[Callable[[int, Exception], None]] = None,
):
    """Retry a function with exponential backoff and optional full jitter.

    Works both bare (@retry_with_backoff) and with arguments
    (@retry_with_backoff(max_retries=2)), as used in nodes.py.
    """

    def decorator(fn: Callable[..., R]) -> Callable[..., R]:
        @wraps(fn)
        def wrapper(*args, **kwargs) -> R:
            for attempt in range(max_retries + 1):
                try:
                    return fn(*args, **kwargs)
                except exceptions as e:
                    if attempt == max_retries:
                        logger.error("Max retries (%d) exceeded.", max_retries)
                        raise MaxRetriesExceededError(
                            f"Max retries ({max_retries}) exceeded. Last error: {e}"
                        ) from e
                    # Grow the delay from initial_delay on each attempt, capped at max_delay
                    delay = min(initial_delay * exponential_base ** attempt, max_delay)
                    if jitter:
                        delay = random.uniform(0, delay)  # full jitter
                    if on_retry:
                        on_retry(attempt + 1, e)
                    logger.warning(
                        "Attempt %d/%d failed. Retrying in %.2fs. Error: %s",
                        attempt + 1, max_retries + 1, delay, e,
                    )
                    time.sleep(delay)

        return wrapper

    # @retry_with_backoff(...) returns the decorator; bare @retry_with_backoff decorates now
    return decorator if func is None else decorator(func)


class CircuitBreaker:
    """Stop calling a failing function until a recovery timeout has passed.

    CLOSED: calls go through. OPEN: calls are rejected with CircuitOpenError.
    HALF-OPEN: after recovery_timeout, the next call is let through as a test.
    """

    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 300.0,  # 5 minutes
        name: str = "default",
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.name = name
        self._failures = 0
        self._state = "CLOSED"
        self._last_failure_time: Optional[float] = None
        self._test_mode = False

    @property
    def state(self) -> str:
        """Get the current state of the circuit breaker."""
        if self._state == "OPEN" and self._should_try_recovery():
            self._state = "HALF-OPEN"
            self._test_mode = True
        return self._state

    def _should_try_recovery(self) -> bool:
        """Check if we should attempt to recover from an open state."""
        if self._state != "OPEN" or not self._last_failure_time:
            return False
        return time.time() - self._last_failure_time >= self.recovery_timeout

    def record_failure(self) -> None:
        """Record a failed operation."""
        self._failures += 1
        self._last_failure_time = time.time()
        if self._failures >= self.failure_threshold:
            self._state = "OPEN"
            logger.warning(
                "Circuit breaker '%s' is now OPEN. Failures: %d", self.name, self._failures
            )

    def record_success(self) -> None:
        """Record a successful operation."""
        if self._state == "HALF-OPEN" and self._test_mode:
            # Test operation succeeded, close the circuit
            self._reset()
            logger.info("Circuit breaker '%s' is now CLOSED.", self.name)
        else:
            # Decay the failure count on success
            self._failures = max(0, self._failures - 1)

    def _reset(self) -> None:
        """Reset the circuit breaker to its initial state."""
        self._failures = 0
        self._state = "CLOSED"
        self._last_failure_time = None
        self._test_mode = False

    def __call__(self, func: Callable[..., R]) -> Callable[..., R]:
        """Use the circuit breaker as a decorator."""

        @wraps(func)
        def wrapper(*args, **kwargs) -> R:
            if self.state == "OPEN":
                raise CircuitOpenError(
                    f"Circuit '{self.name}' is OPEN. Last failure: {self._last_failure_time}"
                )
            try:
                result = func(*args, **kwargs)
            except Exception:
                self.record_failure()
                raise
            self.record_success()
            return result

        return wrapper
```
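Explanation: This module implements resilience patterns for the AI workflow. `retry_with_backoff` re-runs a failing call with exponentially growing, optionally jittered delays and raises `MaxRetriesExceededError` once the retries are used up. `CircuitBreaker` switches from CLOSED to OPEN after `failure_threshold` failures, rejects calls with `CircuitOpenError` while open, and after `recovery_timeout` seconds lets a test call through (HALF-OPEN), closing again if that call succeeds. The full module also provides resource usage limits, state validation, and iteration controls as decorators and context managers, so the system can recover from transient errors, timeouts, and resource constraints.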
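The retry schedule is easiest to reason about as a formula: standard exponential backoff waits min(initial_delay · base^n, max_delay) before retry n (counting from 0), and "full jitter" replaces that value with a uniform draw from [0, value] so that many clients do not retry in lockstep. Computing each delay from `initial_delay`, rather than from the previous (possibly jittered) delay, keeps the growth exponential. The sketch below computes the schedule with an injectable random generator so the output is reproducible; `backoff_delays` is a hypothetical helper, not part of `resilience.py`.

```python
import random
from typing import List, Optional

# Hypothetical helper for inspecting a backoff schedule; not part of resilience.py


def backoff_delays(
    max_retries: int,
    initial_delay: float = 1.0,
    exponential_base: float = 2.0,
    max_delay: float = 30.0,
    jitter: bool = False,
    rng: Optional[random.Random] = None,
) -> List[float]:
    """Return the sleep before each retry: min(initial * base**n, max_delay) for n = 0..max_retries-1."""
    rng = rng or random.Random()
    delays = []
    for attempt in range(max_retries):
        capped = min(initial_delay * exponential_base ** attempt, max_delay)
        # Full jitter: sleep a uniform random amount between 0 and the capped delay
        delays.append(rng.uniform(0, capped) if jitter else capped)
    return delays


print(backoff_delays(5, initial_delay=1.0, max_delay=10.0))  # [1.0, 2.0, 4.0, 8.0, 10.0]
```

With the node's `max_retries=2`, the function would be called at most three times, with two sleeps in between.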
Provides detailed, formatted logs for debugging with structured logging.
```python
# monitoring/logger.py
import json
import logging
import os
from datetime import datetime

LOG_DIR = "logs"


def get_logger(name: str) -> logging.Logger:
    """Return a logger that writes to the console and to a dated file under logs/.

    Handlers are attached only once per logger name, so repeated calls (for example,
    on every Streamlit rerun) do not duplicate log lines.
    """
    logger = logging.getLogger(name)
    if logger.handlers:
        return logger
    logger.setLevel(logging.INFO)
    logger.propagate = False  # avoid printing records twice via the root logger

    # Console handler with structured format
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(
        logging.Formatter("%(asctime)s | %(levelname)s | %(name)s | %(message)s")
    )
    logger.addHandler(console_handler)

    # File handler for persistent logs; create the directory if it is missing
    os.makedirs(LOG_DIR, exist_ok=True)
    file_handler = logging.FileHandler(
        os.path.join(LOG_DIR, f"{name}_{datetime.now().strftime('%Y%m%d')}.log")
    )
    file_handler.setFormatter(
        logging.Formatter(
            "%(asctime)s | %(levelname)s | %(name)s | %(funcName)s:%(lineno)d | %(message)s"
        )
    )
    logger.addHandler(file_handler)
    return logger


def log_structured_event(
    logger: logging.Logger, event_type: str, data: dict, level: str = "info"
) -> None:
    """Log structured events with consistent formatting."""
    log_data = {
        "timestamp": datetime.now().isoformat(),
        "event_type": event_type,
        "data": data,
    }
    # Unknown level names fall back to INFO instead of being silently dropped
    log_level = getattr(logging, level.upper(), logging.INFO)
    logger.log(log_level, json.dumps(log_data, default=str))
```
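Explanation: `get_logger` creates an INFO-level logger that writes timestamped lines to the console and, with the function name and line number added, to a file under `logs/` named after the logger and the date it was created. `log_structured_event` wraps an event type and its payload in a JSON object so events can be parsed later. Both are used across modules for consistent monitoring of events, warnings, and errors.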
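`log_structured_event` puts JSON inside the message text, while the surrounding format string is still plain text. An alternative, sketched below, is a `logging.Formatter` subclass that renders every record as one JSON object, so ordinary `logger.info(...)` calls are structured too and fields such as level and logger name become directly queryable by log tooling. `JsonFormatter` is a hypothetical class name; it uses only the standard library.

```python
import io
import json
import logging
from datetime import datetime, timezone

# Hypothetical formatter (not part of monitoring/logger.py); standard library only


class JsonFormatter(logging.Formatter):
    """Render each log record as a single-line JSON object."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "timestamp": datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),  # applies %-style args
        }
        if record.exc_info:
            payload["exception"] = self.formatException(record.exc_info)
        # default=str keeps non-serializable values from breaking logging
        return json.dumps(payload, default=str)


# Usage: attach the formatter to a handler that writes to an in-memory stream
stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(JsonFormatter())
demo_logger = logging.getLogger("json_demo")
demo_logger.setLevel(logging.INFO)
demo_logger.propagate = False
demo_logger.addHandler(handler)
demo_logger.info("workflow finished in %.1fs", 12.5)
record = json.loads(stream.getvalue().strip())
```

In `get_logger`, the same formatter could be set on the file handler while the console keeps the human-readable format.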
Provides comprehensive health monitoring for production deployment.
```python
# monitoring/health.py
"""Health check endpoints for the application."""
import logging
from typing import Dict

from fastapi import APIRouter, HTTPException

# Set up logging
logger = logging.getLogger(__name__)

# Create router
router = APIRouter()


@router.get(
    "/healthz",
    summary="Health check endpoint",
    description="Returns the health status of the application.",
    response_description="Application health status",
    response_model=Dict[str, str],
)
async def health_check() -> Dict[str, str]:
    """Health check endpoint that returns the status of the application.

    Returns:
        Dict with the application status
    """
    try:
        # Add any health checks here (e.g., database connection, external services)
        return {"status": "ok"}
    except Exception as e:
        logger.error("Health check failed: %s", e, exc_info=True)
        raise HTTPException(status_code=503, detail="Service Unavailable") from e


@router.get(
    "/readyz",
    summary="Readiness check endpoint",
    description="Returns the readiness status of the application.",
    response_description="Application readiness status",
    response_model=Dict[str, str],
)
async def readiness_check() -> Dict[str, str]:
    """Readiness check endpoint that verifies all required services are available.

    Returns:
        Dict with the readiness status
    """
    try:
        # Add readiness checks here (e.g., database connection, external services)
        return {"status": "ready"}
    except Exception as e:
        logger.error("Readiness check failed: %s", e, exc_info=True)
        raise HTTPException(status_code=503, detail="Service Not Ready") from e
```
Explanation: This module defines liveness (`/healthz`) and readiness (`/readyz`) endpoints on a FastAPI `APIRouter`. Each returns a small JSON status object and, if a check fails, logs the error and responds with HTTP 503. The router is served only once it is included in a FastAPI application (for example with `app.include_router(router)`); the Streamlit container itself is probed through Streamlit's built-in `/_stcore/health` endpoint in the Docker health check.
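The "Add readiness checks here" placeholder could be filled by running a set of named dependency probes and reporting which ones fail. The sketch below is framework-agnostic; `run_readiness_checks` and the probe names are hypothetical. In the `/readyz` route, a "not ready" result would be turned into the existing `HTTPException(status_code=503, ...)`, and returning the list of failed probes would also require widening the route's `response_model`.

```python
from typing import Callable, Dict, List, Mapping, Union

# Hypothetical helper; probe names below are illustrative, not the project's actual checks


def run_readiness_checks(
    checks: Mapping[str, Callable[[], bool]],
) -> Dict[str, Union[str, List[str]]]:
    """Run each probe; a probe that returns False or raises counts as failed."""
    failed: List[str] = []
    for name, probe in checks.items():
        try:
            ok = probe()
        except Exception:
            ok = False
        if not ok:
            failed.append(name)
    return {"status": "ready" if not failed else "not ready", "failed": failed}


def failing_probe() -> bool:
    raise ConnectionError("search API unreachable")


result = run_readiness_checks({
    "llm_api_key_configured": lambda: True,  # hypothetical probe
    "search_api": failing_probe,             # hypothetical probe
})
```

Treating an exception as a failed probe keeps one broken dependency from crashing the readiness endpoint itself.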
Enables containerized, scalable deployment with optimized images.
```dockerfile
# Dockerfile
# Build stage: install Python dependencies with the compilers they may need
FROM python:3.11-slim AS builder
WORKDIR /app

# Install build dependencies
RUN apt-get update && apt-get install -y \
    build-essential \
    && rm -rf /var/lib/apt/lists/*

# Copy requirements and install Python dependencies into /root/.local
COPY requirements.txt .
RUN pip install --no-cache-dir --user -r requirements.txt

# Production stage
FROM python:3.11-slim
WORKDIR /app

# Create non-root user for security
RUN useradd --create-home --shell /bin/bash app \
    && chown -R app:app /app

# Copy Python packages from the builder stage into the app user's home,
# because /root is not readable by the non-root user
COPY --from=builder --chown=app:app /root/.local /home/app/.local

# Copy application code
COPY --chown=app:app . .

# Create logs directory
RUN mkdir -p logs && chown -R app:app logs

# Switch to non-root user
USER app

# Point Python's user site and PATH at the copied packages and scripts (e.g. streamlit)
ENV PYTHONUSERBASE=/home/app/.local
ENV PATH=/home/app/.local/bin:$PATH

# Expose Streamlit port
EXPOSE 8501

# Health check: python:3.11-slim does not ship curl, so use Python's standard library
HEALTHCHECK CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8501/_stcore/health', timeout=3)" || exit 1

# Run the application
CMD ["streamlit", "run", "src/app.py", "--server.address=0.0.0.0", "--server.port=8501"]
```
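Explanation: This Dockerfile uses a multi-stage build: build tools such as `build-essential` are installed only in the builder stage, so the final image ships just the installed Python packages and the application code. It runs the app as a non-root user, defines a health check against Streamlit's health endpoint, and creates a logs directory that the app user can write to.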
This project is released under the MIT License.
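You are free to use, modify, and distribute the codebase for both commercial and non-commercial purposes, provided that the copyright notice and the permission notice are included in all copies or substantial portions of the software.
The software is provided "as is", without warranty of any kind, and the authors accept no liability; use it at your own risk.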
The Deep Research Agent is a production-ready, AI-driven system that streamlines research workflows while prioritizing safety and usability. It showcases advanced agentic AI capabilities, from multi-agent orchestration to secure integrations. Future enhancements could include support for more LLMs, advanced analytics visualizations, or multi-domain expansions. This capstone project highlights readiness for real-world applications in research and analysis. 🚀