The AI-Powered Analytics Assistant is a production-oriented, multi-agent system that enables true self-service business intelligence (BI). It transforms natural language questions into validated SQL, retrieves datasets from a semantic warehouse, and produces interactive visualizations and narrative insights. Built with LangGraph for orchestration, OpenAI LLMs for reasoning, and Plotly/Streamlit for visualization, the system demonstrates agent collaboration, tool integration, and robust stateful workflows suitable for business analytics.
Self-service BI has long been a strategic goal for organizations seeking to empower business users to independently explore data and generate insights. In practice, however, traditional BI platforms often fall short of this goal, leaving business users dependent on specialists for ad hoc questions.
The AI-Powered Analytics Assistant addresses these challenges with a conversational, agentic approach. Users ask plain-English questions; specialized agents parse intent, generate and validate SQL, fetch data, and render charts with narrative context. The system provides real-time progress updates, enforces validation checkpoints, and is easily extensible to new domains via a semantic layer that is currently YAML-based and can be federated to enterprise catalogs (e.g., Microsoft Purview) over time.
The foundation of true self-service BI lies in eliminating the guesswork that typically plagues LLM-driven analytics systems. Rather than forcing the language model to infer business rules, calculate KPI definitions, or guess at data relationships, this system employs a curated semantic configuration that explicitly encodes all necessary business logic. The current implementation expresses these semantics through YAML configuration files (detailed in Section 6) that comprehensively define metrics, dimensions, time grains, and filtering rules.
This architectural approach is deliberately designed for enterprise evolution. The same semantic contract that currently reads from YAML files can seamlessly transition to being backed by an enterprise data catalog such as Microsoft Purview, where business rules and data lineage live in a centrally governed system maintained by designated data owners. This separation of concerns creates a powerful advantage: teams can evolve business logic, add new metrics, or refine calculations without the overhead of retraining or fine-tuning machine learning models.
By leveraging this semantic layer for all data calculation and extraction operations, the application achieves maximum determinism in its responses. The LLM can focus on understanding user intent and selecting appropriate metrics rather than deriving complex business calculations, resulting in consistent, reliable answers that align with organizational standards and definitions.
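As a rough illustration of this division of labor, the sketch below loads the YAML semantics and exposes only the allowed metric and dimension names to the intent-parsing step. The file path and helper names are assumptions for illustration, not the project's actual API.

```python
# Illustrative sketch only: load the semantic YAML and expose the vocabulary the
# LLM is allowed to choose from. Path and helper names are assumptions.
import yaml

def load_semantics(path: str = "config/ag_user_query_parser_config/metrics.yaml") -> dict:
    with open(path, "r", encoding="utf-8") as f:
        return yaml.safe_load(f)

def allowed_vocabulary(semantics: dict) -> dict:
    # The LLM only selects from these names; all calculations stay in the semantic layer.
    return {
        "metrics": semantics.get("metrics", []),
        "dimensions": semantics.get("dimensions", []),
    }

if __name__ == "__main__":
    vocab = allowed_vocabulary(load_semantics())
    print(vocab["metrics"])  # e.g., ['actual_revenue', 'budget_revenue', ...]
```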
The system's architecture leverages LangGraph, a state machine framework designed for complex AI agent interactions, to coordinate specialized agents while maintaining clean separation of concerns and enabling seamless extensibility.
While this project focuses on self-service BI, the architecture is meant to generalize to other multi-agent applications. The goals are simple: keep the code easy to change, use clear boundaries, and let multiple people work in parallel without conflicts, while keeping data behavior predictable and iteration fast. These goals map directly to the Core Architectural Principles below.
Core Architectural Principles:
Three-Tier Structure:
- Nodes (e.g., `sql_generate_node.py`, `chart_render_node.py`): handle workflow coordination and state transitions
- Services (e.g., `sql_generation_service.py`, `charting_service.py`): contain business logic for their respective domains
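The split can be pictured with a small sketch. The class, method, and state-key names below are illustrative stand-ins, not the project's actual signatures; the point is only that the node coordinates state while the service owns the domain logic.

```python
# Sketch of the node/service split: the service owns domain logic, the node only
# coordinates state. Class, method, and state-key names here are illustrative.
class SQLGenerationService:
    """Stand-in for a service such as sql_generation_service.py."""
    def generate_sql(self, question: dict, semantics: dict) -> str:
        # Real logic would translate the structured question via the semantic layer.
        return "SELECT product, SUM(actual_revenue) FROM fact_sales GROUP BY product"

def sql_generate_node(state: dict) -> dict:
    """Stand-in for a node such as sql_generate_node.py: state in, merged state out."""
    sql = SQLGenerationService().generate_sql(state["current_question"], state["semantic"])
    # Merge so unrelated fields (e.g., progress_messages) survive the transition.
    return {
        **state,
        "generated_sql": sql,
        "progress_messages": state["progress_messages"] + ["sql generated"],
    }
```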
Key Advantages:
Incremental Updates with Minimal Impact
The architectural philosophy prioritizes operational flexibility and development velocity through deliberate design choices that minimize coupling between components. When a new SQL generation algorithm emerges or a chart rendering improvement becomes available, developers can update individual agents or nodes with surgical precision: there are no cascading changes across the system and no disruption to other workflows. This isolation means that enhancements roll out safely and quickly, whether it's swapping to a more sophisticated LLM for parsing or introducing an entirely new SQL generation and validation approach.
Comprehensive Testing and Standalone Development
Equally important is the architecture's support for comprehensive testing strategies. Each agent can be developed, tested, and validated in complete isolation before integration. The clear contracts defined by Pydantic models mean that developers can unit test individual nodes with mock inputs, integration test services independently, and even run agents standalone for debugging or experimentation. This standalone capability proves invaluable during development; for example, a node can be exercised end to end with nothing more than a mock state dict, as in the sketch below.
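This test exercises the illustrative `sql_generate_node` from the earlier node/service sketch, not the project's actual node; the state keys are assumptions based on the fields described in this document.

```python
# Sketch of a standalone node test using only a mock state dict; it targets the
# illustrative sql_generate_node shown earlier, not project code.
def test_sql_generate_node_preserves_progress_messages():
    state = {
        "current_question": {"metrics": ["actual_revenue"], "dimensions": ["product"]},
        "semantic": {},
        "progress_messages": ["parsed question"],
    }
    new_state = sql_generate_node(state)

    assert "generated_sql" in new_state
    # Fields owned by other nodes must survive the merged-state update.
    assert new_state["progress_messages"][0] == "parsed question"
    # The input state is not mutated in place.
    assert "generated_sql" not in state
```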
Confident System Evolution
The design also enables confident evolution of the system. Multiple implementation strategies can coexist, allowing teams to gradually migrate from one approach to another or run A/B tests comparing different agent behaviors. Combined with the clear layer boundaries, this creates an environment where improvements can be introduced incrementally, validated thoroughly, and scaled according to specific performance requirements.
The orchestrator loop proceeds as follows:

1. `__start__` → `run_parsing`: turns the user question into structured `DataQuestion`s; if none are valid, the flow ends.
2. `init_loop` seeds the question queue and `pick_next` selects the next item.
3. `run_extractor`: generates a single Postgres SQL statement (from the YAML semantics), validates it (MCP preferred; asyncpg fallback), and executes it (MCP or SQLAlchemy) to produce a dataset.
4. `run_render_chart`: uses `chart_hint` + data to emit Plotly JSON and a short narrative, then validates the figure; any errors are fed back for a quick retry.
5. `accumulate`: stores results and progress; if more questions remain, the flow loops back to `pick_next`, otherwise it ends (`__end__`).
State streams to the UI while session‑scoped JSON logs capture each step for traceability.
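To make the loop concrete, here is a minimal LangGraph sketch of the wiring described above. It mirrors the node names in the flow, but the state schema, node bodies, and conditions are simplified stand-ins rather than the contents of `graphs/orchestrator_graph.py`.

```python
# Simplified sketch of the orchestrator wiring; the state schema and node bodies
# are stand-ins, not the project's actual code.
from typing import Any, TypedDict
from langgraph.graph import StateGraph, START, END

class OrchestratorState(TypedDict, total=False):
    user_query: str
    questions: list
    queue: list
    current: Any
    dataset: list
    chart_figure_json: str
    processed_questions: list
    progress_messages: list

def run_parsing(state):      # parse the question into structured DataQuestions
    return {**state, "questions": ["q1"], "progress_messages": ["parsed"]}

def init_loop(state):        # seed the processing queue
    return {**state, "queue": list(state["questions"])}

def pick_next(state):        # select the next question to process
    return {**state, "current": state["queue"][0]}

def run_extractor(state):    # SQL generation -> validation -> execution
    return {**state, "dataset": [{"month": "2025-01", "revenue": 100}]}

def run_render_chart(state): # Plotly JSON + narrative, then figure validation
    return {**state, "chart_figure_json": "{}"}

def accumulate(state):       # store results and advance the loop
    done = state.get("processed_questions", []) + [state["current"]]
    return {**state, "queue": state["queue"][1:], "processed_questions": done}

graph = StateGraph(OrchestratorState)
for name, fn in [("run_parsing", run_parsing), ("init_loop", init_loop),
                 ("pick_next", pick_next), ("run_extractor", run_extractor),
                 ("run_render_chart", run_render_chart), ("accumulate", accumulate)]:
    graph.add_node(name, fn)

graph.add_edge(START, "run_parsing")
graph.add_conditional_edges("run_parsing", lambda s: "init_loop" if s.get("questions") else END)
graph.add_edge("init_loop", "pick_next")
graph.add_edge("pick_next", "run_extractor")
graph.add_edge("run_extractor", "run_render_chart")
graph.add_edge("run_render_chart", "accumulate")
graph.add_conditional_edges("accumulate", lambda s: "pick_next" if s.get("queue") else END)

app = graph.compile()
for state in app.stream({"user_query": "Show monthly revenue by product in 2025."},
                        stream_mode="values"):
    print(state.get("progress_messages"))  # Streamlit would update the UI here
```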
Underlying graphs invoked
- `graphs/orchestrator_graph.py`: coordinates the whole flow and streams state.
- Parsing (`run_parsing`): calls `graphs/parser_graph.py` to produce `DataQuestion`s.
- Data extraction (`run_extractor`): calls `graphs/data_extractor_graph.py` for SQL → validate → execute.
- Charting (`run_render_chart`): calls `graphs/charting_graph.py` to render and validate the figure.

Nodes:
- Parsing: `nodes/parser_node.py`, `nodes/parser_validation_node.py`
- Data extraction: `nodes/sql_generate_node.py`, `nodes/sql_validate_node.py`, `nodes/sql_extract_node.py`, `nodes/run_extractor_node.py`
- Charting: `nodes/chart_render_node.py`, `nodes/chart_validate_node.py`, `nodes/run_render_chart_node.py`
- Loop control: `nodes/accumulate_and_advance_node.py`, `nodes/init_loop_node.py`, `nodes/pick_next_question_node.py`

States:
- `states/agentic_orchestrator_state.py`: `user_query`, `semantic` (YAML), `processed_questions`, `progress_messages`, validation flags, etc.
- `states/parser_state.py`, `states/charting_state.py`, `states/data_extractor_state.py`: per-subgraph states.

Design notes:
- Nodes return a merged copy of state (e.g., `{**state, "field": value}`) to preserve fields like `progress_messages` across nodes.
- The orchestrator streams state via `app.stream(initial, stream_mode="values")` so Streamlit can update the UI as nodes complete.

Structured JSON logging (session-aware)
The system now emits logs as newline-delimited JSON to support downstream analysis and shipping to external tools (e.g., ELK/OpenSearch, Datadog, Splunk). Each record is machine-readable and includes a session_id so runs can be analyzed per session or aggregated across sessions. The logger can write a single combined log file or one file per session, depending on configuration.
Typical fields include:
- `timestamp`, `level`, `logger`
- `session_id` and `request_id` for correlation across agents and nodes
- `event` (e.g., `run_started`, `progress`, `dq_chart_rendered`) and/or `message`

Example entry (abbreviated):
{"timestamp":"2025-10-31T20:32:15.896Z","level":"INFO","session_id":"…","logger":"agentic_data_assistant","event":"run_started","request_id":"…","user_query":"Show monthly revenue by product in 2025."}
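A session-aware JSON formatter along these lines is straightforward to build with the standard library. The sketch below is illustrative only and is not the project's logger module; field names follow the list above.

```python
# Sketch of a session-aware NDJSON formatter using only the standard library.
# Field names mirror the list above; this is not the project's actual logger.
import json
import logging
from datetime import datetime, timezone

class SessionJsonFormatter(logging.Formatter):
    def format(self, record: logging.LogRecord) -> str:
        entry = {
            "timestamp": datetime.now(timezone.utc).isoformat(timespec="milliseconds"),
            "level": record.levelname,
            "logger": record.name,
            "session_id": getattr(record, "session_id", None),
            "request_id": getattr(record, "request_id", None),
            "event": getattr(record, "event", None),
            "message": record.getMessage(),
        }
        # Drop empty fields so each line stays compact.
        return json.dumps({k: v for k, v in entry.items() if v is not None})

logger = logging.getLogger("agentic_data_assistant")
handler = logging.StreamHandler()
handler.setFormatter(SessionJsonFormatter())
logger.addHandler(handler)
logger.setLevel(logging.INFO)

logger.info("run started", extra={"session_id": "abc123", "request_id": "req-1",
                                  "event": "run_started"})
```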
Multi-agent instrumentation
Agents and nodes log key steps end‑to‑end: parsing, SQL generation and validation, data extraction, chart generation and validation, and final rendering. These events capture the flow of work with consistent correlation IDs, so a single session can be traced from input to chart.
Designed for analysis and improvement
Because logs are structured, they can be queried and aggregated across sessions. This creates a practical feedback loop for tuning prompts, extending the semantic layer, improving validation rules, and optimizing performance. Session-level logging also makes it easy to reproduce, debug, or audit specific user runs.
MCP (Model Context Protocol) is used for safe database access through a dedicated TCP server. The server lives at `code/mcp_server/sql_postgres_tcp_server.py` and runs as a separate process that speaks newline-delimited JSON (NDJSON) over TCP.
An MCP client integrates the MCP tools with the application. Before running the agents, start the server by executing the following command from the `code` directory.
python -m mcp_server.sql_postgres_tcp_server
What the SQL MCP server provides:
- Tools: `sql.validate`, `sql.query`, and `schema.introspect`
- Read-only guardrails: rejects statements containing `insert|update|delete|alter|drop|truncate|create|grant|revoke`
- Per-query `statement_timeout` limits

Protocol (TCP, NDJSON):
- Request: `{ "tool": "sql.validate"|"sql.query"|"schema.introspect", "arguments": {…} }`
- Response: `{ "ok": true, "result": {…} }` or `{ "ok": false, "error": "…" }`
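For reference, a raw NDJSON round trip against such a server could look like the following asyncio sketch. The envelope follows the protocol above, but this is not the project's client module, and the `"sql"` argument key and host/port defaults are assumptions.

```python
# Sketch: one NDJSON request/response round trip over TCP with asyncio.
# The envelope follows the protocol above; this is not utils.mcp_client_tcp,
# and the "sql" argument key is an assumption.
import asyncio
import json

async def call_tool(tool: str, arguments: dict,
                    host: str = "127.0.0.1", port: int = 8765) -> dict:
    reader, writer = await asyncio.open_connection(host, port)
    try:
        writer.write((json.dumps({"tool": tool, "arguments": arguments}) + "\n").encode())
        await writer.drain()
        line = await reader.readline()   # one JSON object per line
        return json.loads(line.decode())
    finally:
        writer.close()
        await writer.wait_closed()

async def main():
    resp = await call_tool("sql.validate", {"sql": "SELECT 1"})
    if not resp.get("ok"):
        raise RuntimeError(resp.get("error"))
    print(resp["result"])

asyncio.run(main())
```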
How the app uses it:
- `SQLValidationService` and `DataExtractionService` prefer MCP when `MCP_ENABLED=1` (set in `config.SETTINGS`) and a client is available via `utils.mcp_client_tcp.get_tcp_mcp_sql_client_from_settings()`; otherwise they fall back to local validators (asyncpg/SQLAlchemy).

Example service usage (MCP enabled):
```python
# config.settings.py
# ...
MCP_ENABLED = os.getenv("MCP_ENABLED", "0")  # "1" to enable MCP, "0" to disable
MCP_SQL_MAX_ROWS = int(os.getenv("MCP_SQL_MAX_ROWS", "5000"))
MCP_SQL_TIMEOUT_MS = int(os.getenv("MCP_SQL_TIMEOUT_MS", "20000"))
MCP_TCP_HOST = os.getenv("MCP_TCP_HOST", "127.0.0.1")
MCP_TCP_PORT = int(os.getenv("MCP_TCP_PORT", "8765"))
```
EXAMPLE USAGE:
```python
from code.services.sql_validation_service import SQLValidationService
from code.services.data_extraction_service import DataExtractionService

sql = "SELECT date, revenue FROM fact_sales WHERE date >= '2025-01-01'"

# Validate - runs via MCP if MCP_ENABLED=1
validator = SQLValidationService()
ok, err = validator.validate(sql)
if not ok:
    raise ValueError(f"SQL invalid: {err}")

# Extract (run SQL) - runs via MCP if MCP_ENABLED=1
extractor = DataExtractionService()
df = extractor.run_query(sql)
```
Development fallback (non‑MCP): see 4.2 for the direct SQLAlchemy adapter used mainly during local development. The SQL validator also supports a local asyncpg path when MCP is disabled.
For environments without MCP support, `tools/sqldb_sqlalchemy.py` provides direct SQLAlchemy connections. This fallback keeps the system usable while teams transition to MCP-based workflows, or during development when MCP is not available.
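A minimal version of such a direct adapter might look like the sketch below. It assumes the `POSTGRES_URI` variable from the environment setup later in this document and uses pandas; it is not the contents of `tools/sqldb_sqlalchemy.py`.

```python
# Sketch of a direct SQLAlchemy fallback, assuming POSTGRES_URI from the .env
# setup shown later; not the actual tools/sqldb_sqlalchemy.py implementation.
import os
import pandas as pd
from sqlalchemy import create_engine, text

_engine = create_engine(os.environ["POSTGRES_URI"], pool_pre_ping=True)

def run_query(sql: str) -> pd.DataFrame:
    # Read-only usage: execute the SELECT and return a DataFrame.
    with _engine.connect() as conn:
        return pd.read_sql(text(sql), conn)

if __name__ == "__main__":
    print(run_query("SELECT 1 AS ok"))
```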
All other tools (parsing helpers, chart validation, rendering utilities) run in‑process as plain Python functions that you can optionally register with an agent—no separate MCP servers are used for them.
Non‑MCP tools used in this app (run in‑process):
- `tools/user_parser_tools.py`: `alias_to_canonical`, `try_map_template` — used by the user-query parser agent
- `tools/chart_validation_tools.py`: `validate_plotly_fig_json` — validates Plotly figure JSON produced by the charting agent

You can call these directly or register them with an agent. For example, the chart validator:
```python
from langchain.tools import Tool
from tools.chart_validation_tools import (
    validate_plotly_fig_json,      # tool object for agent use
    validate_plotly_fig_json_fn,   # plain function for direct calls
)

# Agent registration
tools = [validate_plotly_fig_json]

# Direct usage in services/nodes
res = validate_plotly_fig_json_fn(fig_json)
if not res["valid"]:
    # feed error back to the LLM or retry renderer
    handle_validation_error(res["error"])
```
The system's semantic understanding is driven by YAML configuration files that define business rules and metrics:
- `config/ag_data_extractor_config/warehouse.yaml`: database schema, table joins, column mappings
- `config/ag_user_query_parser_config/metrics.yaml`: valid metrics and dimension registry with canonical names and aliases

Benefits: metrics and dimensions can be added or refined without retraining models or changing code, and answers stay aligned with governed business definitions.
Example metrics definition:
```yaml
metrics:
  - actual_revenue
  - budget_revenue
  - units_sold
  - gross_margin

dimensions:
  - product
  - product_category
  - customer
  - region

aliases:
  actual_revenue: [revenue, turnover, sales_revenue, income, total revenue]
```
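Given this structure, alias resolution reduces to a dictionary lookup. The helper below is a sketch of the idea behind `alias_to_canonical`, not the project's implementation in `tools/user_parser_tools.py`.

```python
# Sketch: resolve a user's phrasing to a canonical metric name using the
# aliases mapping above. Illustrative only; not tools/user_parser_tools.py.
def build_alias_index(semantics: dict) -> dict:
    index = {m: m for m in semantics.get("metrics", [])}
    for canonical, aliases in semantics.get("aliases", {}).items():
        for alias in aliases:
            index[alias.lower()] = canonical
    return index

def alias_to_canonical(term: str, index: dict) -> str | None:
    return index.get(term.strip().lower())

semantics = {
    "metrics": ["actual_revenue", "budget_revenue"],
    "aliases": {"actual_revenue": ["revenue", "turnover", "total revenue"]},
}
index = build_alias_index(semantics)
print(alias_to_canonical("Total Revenue", index))  # -> "actual_revenue"
```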
The current YAML semantics are a bridge to an enterprise catalog (e.g., Microsoft Purview) without changing the public contract. We keep the same interface and swap YAML readers for catalog-backed adapters, gaining centralized governance, lineage-aware impact analysis, glossary-aligned terminology, and federated domain models under a unified API. Migration is incremental: maintain the contract, introduce adapters, migrate domains gradually, and move change control to catalog approval workflows—avoiding a disruptive rewrite.
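One way to keep that contract stable is a small provider interface with interchangeable backends, sketched below. All class and method names here (`SemanticProvider`, `YamlSemanticProvider`, `PurviewSemanticProvider`) are invented for illustration, not existing modules.

```python
# Sketch of a stable semantic contract with swappable backends. All names here
# are invented for illustration.
from typing import Protocol
import yaml

class SemanticProvider(Protocol):
    def metrics(self) -> list[str]: ...
    def dimensions(self) -> list[str]: ...
    def aliases(self) -> dict[str, list[str]]: ...

class YamlSemanticProvider:
    """Reads the current YAML files."""
    def __init__(self, path: str):
        with open(path, "r", encoding="utf-8") as f:
            self._cfg = yaml.safe_load(f)

    def metrics(self) -> list[str]:
        return self._cfg.get("metrics", [])

    def dimensions(self) -> list[str]:
        return self._cfg.get("dimensions", [])

    def aliases(self) -> dict[str, list[str]]:
        return self._cfg.get("aliases", {})

class PurviewSemanticProvider:
    """Placeholder: would return the same shapes from a catalog API."""
    def metrics(self) -> list[str]: raise NotImplementedError
    def dimensions(self) -> list[str]: raise NotImplementedError
    def aliases(self) -> dict[str, list[str]]: raise NotImplementedError
```

Because both providers satisfy the same interface, consumers such as the parser never need to know which backend supplied the semantics.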
models/user_request_parser_model.py
- `DataQuestion` includes: `metrics`, `dimensions`, optional `time_range`, `filters`, `sort`, `top_k`, `chart_hint`
- `dataset` stored in a JSON-serializable form (e.g., list of dicts) for portability
- `chart_figure_json` stored as a string; narrative text optional
- `SQLGenerationInput` in `services/sql_generation_service.py`
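As a rough Pydantic sketch of the shape described above (field names follow the list; the exact types and defaults are assumptions, not the project's model):

```python
# Sketch of the DataQuestion shape described above; types/defaults are assumed.
from typing import Any, Optional
from pydantic import BaseModel, Field

class DataQuestion(BaseModel):
    metrics: list[str]
    dimensions: list[str] = Field(default_factory=list)
    time_range: Optional[str] = None
    filters: dict[str, Any] = Field(default_factory=dict)
    sort: Optional[str] = None
    top_k: Optional[int] = None
    chart_hint: Optional[str] = None
    dataset: list[dict[str, Any]] = Field(default_factory=list)  # JSON-serializable rows
    chart_figure_json: Optional[str] = None                      # Plotly figure as a string
    narrative: Optional[str] = None
```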
LLM structured output considerations:
- Strict structured output schemas disallow open-ended objects (e.g., they require `additionalProperties: false`).
- Represent the dataset as `List[Dict[str, Any]]`, or post-process it into a defined superset model when using strict function calling.

Nodes increment `validation_attempts` counters and retry with corrected inputs. Example pattern:
```python
if not is_valid and state["validation_attempts"] < max_attempts:
    # Regenerate with error feedback
    return generate_corrected_output(error_message)
```
When validation fails repeatedly, the system provides partial results with clear error explanations rather than complete failure.
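In node terms, that graceful-degradation pattern can be as simple as recording the partial result together with the error. The sketch below uses state keys assumed from this document, not the project's exact code.

```python
# Sketch: after max_attempts, keep what succeeded and attach the error so the
# UI can explain it. State keys are assumptions based on this document.
def finalize_question(state: dict, max_attempts: int = 3) -> dict:
    result = {
        "question": state["current_question"],
        "dataset": state.get("dataset"),                    # may exist even if charting failed
        "chart_figure_json": state.get("chart_figure_json"),
        "error": None,
    }
    if state.get("validation_attempts", 0) >= max_attempts and state.get("last_error"):
        result["error"] = f"Validation failed after {max_attempts} attempts: {state['last_error']}"
    return {**state, "processed_questions": state.get("processed_questions", []) + [result]}
```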
Setup

Clone the repository:
git clone https://github.com/pkasseran/ai-powered-analytics-assistant.git
Install dependencies:
```bash
# Navigate to project
cd ai-powered-analytics-assistant

# Install the python dependencies
pip install -r requirements.txt
```
Environment Variables:
Copy `.env.example` and rename it to `.env`:

```bash
# For using OpenAI
OPENAI_API_KEY=sk-your-openai-api-key-here
DEFAULT_LLM_MODEL="gpt-4-mini"

# FOR SQLALCHEMY (direct Database connection)
# Change as needed to match your local Postgres setup
POSTGRES_URI=postgresql+psycopg2://postgres:postgres@localhost:5435/dwdb

# FOR MCP CONFIGURATION
# Change as needed to match your local Postgres setup
MCP_PG_DSN=postgresql://postgres:postgres@localhost:5435/dwdb
MCP_PG_MAX_ROWS=5000
MCP_PG_TIMEOUT_MS=20000
MCP_TCP_HOST=127.0.0.1
MCP_TCP_PORT=8765
MCP_ENABLED=1
```
Setup Database
Prerequisites
- `setup/dwdb.dump` for restoring the Postgres database data

Defaults used by scripts/examples
- Host: localhost, Port: 5435, User: postgres, Password: postgres, Database: dwdb

macOS/Linux (Docker + restore)
```bash
cd setup
chmod +x setup_docker_postgres_db.sh
./setup_docker_postgres_db.sh
```
What it does: pulls postgres:latest, starts postgres_dwdb (5435->5432), creates dwdb, and restores dwdb.dump if present.
Verify:
PGPASSWORD=postgres psql -h localhost -p 5435 -U postgres -d dwdb -c "\\dt"
Windows (PowerShell)
```powershell
cd setup
./setup_docker_postgres_db.ps1
```
The script starts or reuses postgres_dwdb on port 5435, creates dwdb (if missing), restores dwdb.dump when available, and prints the connection string.
Verify:
docker exec -e PGPASSWORD=postgres postgres_dwdb psql -U postgres -d dwdb -c "\\dt"
Connection strings (align with env above)
- SQLAlchemy (`POSTGRES_URI`): `postgresql+psycopg2://postgres:postgres@localhost:5435/dwdb`
- MCP (`MCP_PG_DSN`): `postgresql://postgres:postgres@localhost:5435/dwdb`

Common operations
- Stop: `docker stop postgres_dwdb`
- Start: `docker start postgres_dwdb`
- Remove: `docker rm -f postgres_dwdb` (irreversible)

Configure semantic layer (NO CHANGE):
- Update `config/ag_data_extractor_config/warehouse.yaml` for your database schema.
- Update `config/ag_user_query_parser_config/metrics.yaml` for business metrics and dimensions.

Streamlit UI:
streamlit run code/app_streamlit.py
Make sure the following are set in .env
```bash
# FOR MCP CONFIGURATION
# Change as needed to match your local Postgres setup
MCP_PG_DSN=postgresql://postgres:postgres@localhost:5435/dwdb  # Required
MCP_PG_MAX_ROWS=5000
MCP_PG_TIMEOUT_MS=20000
MCP_TCP_HOST=127.0.0.1
MCP_TCP_PORT=8765
MCP_ENABLED=1
```
Run the PostgreSQL MCP server as a standalone TCP process:
```bash
# Start the server
python -m code.mcp_server.sql_postgres_tcp_server
```
The app’s MCP client (utils.mcp_client) connects to this TCP server. SQLValidationService and DataExtractionService will automatically use it when constructed with use_mcp=True and when the client is configured in settings.
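Based on that description, wiring the services explicitly to MCP would look roughly like the sketch below. The `use_mcp` parameter comes from the text above, and the `validate`/`run_query` calls mirror the earlier example; any other details are assumptions.

```python
# Sketch: explicit MCP-backed construction, per the description above. The
# use_mcp argument is taken from the text; other details are assumptions.
from code.services.sql_validation_service import SQLValidationService
from code.services.data_extraction_service import DataExtractionService

validator = SQLValidationService(use_mcp=True)   # validate via the TCP MCP server
extractor = DataExtractionService(use_mcp=True)  # execute SQL via the TCP MCP server

ok, err = validator.validate("SELECT 1")
df = extractor.run_query("SELECT 1") if ok else None
```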