Agent Runner
The Agent Runner is responsible for executing agentic flows in Astonish. It loads agent configurations, initializes tools, builds the execution graph, and runs the flow.
Overview
The agent_runner.py
module contains the core functionality for running agents. It provides the run_agent
function, which is the main entry point for executing agentic flows.
Functions
run_agent
async def run_agent(agent: str) -> None
Runs an agentic flow with the specified name.
Parameters
agent
(str): The name of the agent to run. This should correspond to a YAML file in one of the agent directories.
Returns
None
Example
await run_agent("file_summarizer")
Process
- Sets up colorama for colored terminal output
- Loads the agent configuration from the YAML file
- Initializes MCP tools if available
- Initializes the state with variables from the output models
- Builds the execution graph using the agent configuration
- Runs the graph with the initial state
- Handles any errors that occur during execution
Error Handling
The Agent Runner includes robust error handling to ensure that errors during agent execution are properly reported to the user. It handles:
- Missing agent files
- Errors loading agent configurations
- Errors initializing MCP tools
- Errors building the execution graph
- Errors during graph execution
Dependencies
The Agent Runner depends on several other modules:
astonish.globals
: Global variables and configurationlangchain.globals
: LangChain configurationlanggraph.checkpoint.sqlite.aio.AsyncSqliteSaver
: Checkpoint saving for graph executionastonish.core.utils
: Utility functions for setup, loading agents, and printingastonish.core.graph_builder
: Functions for building and running the execution graph
Implementation Details
State Initialization
The Agent Runner initializes the state dictionary with variables from the output models of all nodes in the agent configuration. This ensures that all variables used in the flow are properly initialized.
# Initialize state
initial_state = {}
for node in config['nodes']:
if 'output_model' in node:
for field, type_ in node['output_model'].items():
if field not in initial_state:
initial_state[field] = None
# Add initialization for limit_counter_field
if 'limit_counter_field' in node:
limit_counter_field = node['limit_counter_field']
if limit_counter_field not in initial_state:
initial_state[limit_counter_field] = 0 # Initialize to 0
Error Tracking
The Agent Runner adds special fields to the state for tracking errors:
# Add error tracking fields
initial_state['_error'] = None
initial_state['_end'] = False
These fields are used by the error handling system to track and report errors during execution.
Graph Execution
The Agent Runner uses the AsyncSqliteSaver
to save checkpoints during graph execution. This allows for potential future features like resuming interrupted flows.
async with AsyncSqliteSaver.from_conn_string(":memory:") as checkpointer:
thread = {"configurable": {"thread_id": "1"}, "recursion_limit": 200}
# Build and run the graph
graph = build_graph(config, mcp_client, checkpointer)
final_state = await run_graph(graph, initial_state, thread)
Related Modules
- Graph Builder: Builds and runs the execution graph
- Node Functions: Defines the functions for different node types
- Utils: Utility functions for agent execution