
This python project shows you how to create a custom agent that can query either your LlamaCloud index for RAG-based retrieval or a separate SQL query engine as a tool. In this example, we'll use PDFs of Wikipedia pages of US cities and a SQL database of their populations and states as documents.

NOTE: Any Text-to-SQL application should be aware that executing arbitrary SQL queries can be a security risk. It is recommended to take precautions as needed, such as using restricted roles, read-only databases, sandboxing, etc.
See the RagBased TextToSql in action:
![]()
Video Demo of Text-To-Sql in Action:
Video Demo
Set up Token keys for OpenAI and Llama Cloud Apis in the .env file. You will find the required code in the config.py
OPENAI_API_KEY=<YOUR OPEN AI TOKEN KEY> LlamaCloud_TOKEN_KEY=<YOUR LLAMACLOUD TOKEN KEY>
Download the following Wikipedia pages into PDFs by either pressing Ctrl-P/Cmd-P or right-clicking and selecting "Print" and then "Save as PDF" as the destination.
-New York City
-Los Angeles
-Chicago
-Houston
-Miami
-Seattle
After that, create a new index in LlamaCloud and upload your PDFs.For this you need to create an account in LlamaCloud and create the index using your OpenAi key or by selecting any other LLM model.

Ensure you have Python 3.11 or later installed
pip install -u requirements.txt
Run the app by running the following command:
streamlit run TextToSql.py
The SQL database in this example will be created in memory and will contain three columns: the city name, the city's population, and the state the city is located in. The table creation and the information for each city is shown in the snippets below.
from sqlalchemy import create_engine, MetaData, Table, Column, String, Integer, insert # Initialize the SQLite database engine engine = create_engine("sqlite:///:memory:", future=True) metadata_obj = MetaData() # Define city statistics table table_name = "city_stats" city_stats_table = Table( table_name, metadata_obj, Column("city_name", String(16), primary_key=True), Column("population", Integer), Column("state", String(16), nullable=False), ) # Create the table metadata_obj.create_all(engine) # Insert initial data rows = [ {"city_name": "New York City", "population": 8336000, "state": "New York"}, {"city_name": "Los Angeles", "population": 3822000, "state": "California"}, {"city_name": "Chicago", "population": 2665000, "state": "Illinois"}, {"city_name": "Houston", "population": 2303000, "state": "Texas"}, {"city_name": "Miami", "population": 449514, "state": "Florida"}, {"city_name": "Seattle", "population": 749256, "state": "Washington"}, ] for row in rows: stmt = insert(city_stats_table).values(**row) with engine.begin() as connection: connection.execute(stmt) def get_database_engine(): """Returns the database engine instance.""" return engine
Create a query engine based on SQL database.
from llama_index.core.query_engine import NLSQLTableQueryEngine sql_database = SQLDatabase(engine, include_tables=["city_stats"]) sql_query_engine = NLSQLTableQueryEngine( sql_database=sql_database, tables=["city_stats"] )
Create an index and a query engine around the index you've created.
from llama_index.indices.managed.llama_cloud import LlamaCloudIndex index = LlamaCloudIndex( name="<Your Index Name>", project_name="<Your Project Name>", organization_id="<Your Org ID>", api_key="<Your API Key>" ) llama_cloud_query_engine = index.as_query_engine()
Create a query engine tool around these query engines.
from llama_index.core.tools import QueryEngineTool sql_tool = QueryEngineTool.from_defaults( query_engine=sql_query_engine, description=( "Useful for translating a natural language query into a SQL query over" " a table containing: city_stats, containing the population/state of" " each city located in the USA." ), name="sql_tool" ) cities = ["New York City", "Los Angeles", "Chicago", "Houston", "Miami", "Seattle"] llama_cloud_tool = QueryEngineTool.from_defaults( query_engine=llama_cloud_query_engine, description=( f"Useful for answering semantic questions about certain cities in the US." ), name="llama_cloud_tool" )
We'll create a workflow that acts as an agent around the two query engines. In this workflow, we need four events:
-GatherToolsEvent: Gets all tools that need to be called (which is determined by the LLM).
-ToolCallEvent: An individual tool call. Multiple of these events will be triggered at the same time.
-ToolCallEventResult: Gets result from a tool call.
-GatherEvent: Returned from dispatcher that triggers the ToolCallEvent.
This workflow consists of the following steps:
-chat(): Appends the message to the chat history. This chat history is fed into the LLM, along with the given tools, and the LLM determines which tools to call. This returns a GatherToolsEvent.
-dispatch_calls(): Triggers a ToolCallEvent for each tool call given in the GatherToolsEvent using send_event().Returns a GatherEvent with the number of tool calls.
-call_tool(): Calls an individual tool. This step will run multiple times if there is more than one tool call. This step calls the tool and appends the result as a chat message to the chat history. It returns a ToolCallEventResult with the result of the tool call.
-gather(): Gathers the results from all tool calls using collect_events(). Waits for all tool calls to finish, then feeds chat history (following all tool calls) into the LLM. Returns the response from the LLM.
from typing import Dict, List, Any, Optional from llama_index.core.tools import BaseTool from llama_index.core.llms import ChatMessage from llama_index.core.llms.llm import ToolSelection, LLM from llama_index.core.workflow import ( Workflow, Event, StartEvent, StopEvent, step, ) from llama_index.core.base.response.schema import Response from llama_index.core.tools import FunctionTool class InputEvent(Event): """Input event.""" class GatherToolsEvent(Event): """Gather Tools Event""" tool_calls: Any class ToolCallEvent(Event): """Tool Call event""" tool_call: ToolSelection class ToolCallEventResult(Event): """Tool call event result.""" msg: ChatMessage class RouterOutputAgentWorkflow(Workflow): """Custom router output agent workflow.""" def __init__(self, tools: List[BaseTool], timeout: Optional[float] = 10.0, disable_validation: bool = False, verbose: bool = False, llm: Optional[LLM] = None, chat_history: Optional[List[ChatMessage]] = None, ): """Constructor.""" super().__init__(timeout=timeout, disable_validation=disable_validation, verbose=verbose) self.tools: List[BaseTool] = tools self.tools_dict: Optional[Dict[str, BaseTool]] = {tool.metadata.name: tool for tool in self.tools} self.llm: LLM = llm or OpenAI(temperature=0, model="gpt-3.5-turbo") self.chat_history: List[ChatMessage] = chat_history or [] def reset(self) -> None: """Resets Chat History""" self.chat_history = [] @step() async def prepare_chat(self, ev: StartEvent) -> InputEvent: message = ev.get("message") if message is None: raise ValueError("'message' field is required.") # add msg to chat history chat_history = self.chat_history chat_history.append(ChatMessage(role="user", content=message)) return InputEvent() @step() async def chat(self, ev: InputEvent) -> GatherToolsEvent | StopEvent: """Appends msg to chat history, then gets tool calls.""" # Put msg into LLM with tools included chat_res = await self.llm.achat_with_tools( self.tools, chat_history=self.chat_history, verbose=self._verbose, allow_parallel_tool_calls=True ) tool_calls = self.llm.get_tool_calls_from_response(chat_res, error_on_no_tool_call=False) ai_message = chat_res.message self.chat_history.append(ai_message) if self._verbose: print(f"Chat message: {ai_message.content}") # no tool calls, return chat message. if not tool_calls: return StopEvent(result=ai_message.content) return GatherToolsEvent(tool_calls=tool_calls) @step(pass_context=True) async def dispatch_calls(self, ctx: Context, ev: GatherToolsEvent) -> ToolCallEvent: """Dispatches calls.""" tool_calls = ev.tool_calls await ctx.set("num_tool_calls", len(tool_calls)) # trigger tool call events for tool_call in tool_calls: ctx.send_event(ToolCallEvent(tool_call=tool_call)) return None @step() async def call_tool(self, ev: ToolCallEvent) -> ToolCallEventResult: """Calls tool.""" tool_call = ev.tool_call # get tool ID and function call id_ = tool_call.tool_id if self._verbose: print(f"Calling function {tool_call.tool_name} with msg {tool_call.tool_kwargs}") # call function and put result into a chat message tool = self.tools_dict[tool_call.tool_name] output = await tool.acall(**tool_call.tool_kwargs) msg = ChatMessage( name=tool_call.tool_name, content=str(output), role="tool", additional_kwargs={ "tool_call_id": id_, "name": tool_call.tool_name } ) return ToolCallEventResult(msg=msg) @step(pass_context=True) async def gather(self, ctx: Context, ev: ToolCallEventResult) -> StopEvent | None: """Gathers tool calls.""" # wait for all tool call events to finish. tool_events = ctx.collect_events(ev, [ToolCallEventResult] * await ctx.get("num_tool_calls")) if not tool_events: return None for tool_event in tool_events: # append tool call chat messages to history self.chat_history.append(tool_event.msg) # # after all tool calls finish, pass input event back, restart agent loop return InputEvent()
Create the workflow instance.
wf = RouterOutputAgentWorkflow(tools=[sql_tool, llama_cloud_tool], verbose=True, timeout=120)
Visualize Workflow
from llama_index.utils.workflow import draw_all_possible_flows draw_all_possible_flows(RouterOutputAgentWorkflow)
from IPython.display import display, Markdown result = await wf.run(message="Which city has the highest population?") display(Markdown(result))
Running step prepare_chat Step prepare_chat produced event InputEvent Running step chat Chat message: None Step chat produced event GatherToolsEvent Running step dispatch_calls Step dispatch_calls produced no event Running step call_tool Calling function sql_tool with msg {'input': 'SELECT city FROM city_stats ORDER BY population DESC LIMIT 1'} Step call_tool produced event ToolCallEventResult Running step gather Step gather produced event InputEvent Running step chat Chat message: New York City has the highest population. Step chat produced event StopEvent New York City has the highest population.