Flow Superagent with Triage
Objective
This tutorial provides two demos, one for each Triage mode (all-match and first-match). Each demo shows how conditions are evaluated and how agents may be skipped based on the user-defined triage parameters and the resulting votes from parent agents.
The demos use a FlowSuperAgent with conditional triage to manage instances of UtilityAgent and enable custom routing logic. The process logic in the FlowSuperAgent is represented as a directed acyclic graph (DAG), where each node corresponds to a UtilityAgent and each edge denotes message flow. With conditional triage enabled, users can define custom conditions and the agents to execute when those conditions are satisfied.
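To make the DAG idea concrete, here is a minimal sketch in plain Python. It does not use the AI Refinery SDK: the `flow` dictionary layout and the `execution_order` helper are assumptions made for illustration, and the agent names are borrowed from the Onboarding Advisor demo further down. It only shows that a flow whose edges form a DAG always admits a parent-before-child ordering, and that a cycle can be detected up front.

```python
from graphlib import CycleError, TopologicalSorter

# Hypothetical in-memory picture of a flow: each agent maps to the agents it hands off to.
# The dict layout is an assumption; the SDK reads this information from the YAML file.
flow = {
    "Role Classifier": [
        "Technical Onboarding Agent",
        "Non Technical Onboarding Agent",
        "Technical Training Planner",  # default route in the demo configuration
    ],
    "Technical Onboarding Agent": ["Technical Training Planner"],
    "Non Technical Onboarding Agent": ["Non Technical Training Planner"],
    "Technical Training Planner": [],
    "Non Technical Training Planner": [],
}


def execution_order(children):
    """Return one valid parent-before-child order, or raise ValueError on a cycle."""
    # TopologicalSorter expects node -> predecessors, so invert the child lists.
    parents = {node: set() for node in children}
    for node, kids in children.items():
        for kid in kids:
            parents.setdefault(kid, set()).add(node)
    try:
        return list(TopologicalSorter(parents).static_order())
    except CycleError as exc:
        raise ValueError(f"flow is not a DAG: {exc.args[1]}") from exc


order = execution_order(flow)
print(order)
```

The exact order among siblings may vary, but every agent appears after all of its parents.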
Tutorial Workflow:
First, we will examine a configuration that uses all-match mode, and then proceed to first-match triage.
Onboarding Advisor (all-match)
Tutorial Description
We will use the AI Refinery SDK to create and run an AI system that helps with onboarding a new hire. In this tutorial, you'll use instances of SearchAgent and PlanningAgent to collect information about a new hire and direct their onboarding as necessary.
To coordinate the workflow between these agents, a FlowSuperAgent named "Onboarding Advisor" manages and oversees the process.
Agent Workflow Overview
The figure below depicts the logical graph flow managed by the "Onboarding Advisor", where green nodes represent instances of PlanningAgent and blue nodes correspond to instances of SearchAgent. The white node represents a conditional triage instance: descendants of the agent with conditional triage may be skipped, depending on user-specified conditions and the structure of the graph.
Example Queries
- Where can I find the slides from my orientation for my new role as a SWE?
- What are some of my to-do tasks to finish onboarding as a new AI Consultant?
- Which repos should I clone as a new backend developer?
- What’s expected of new HR team members in the first week?
- How do I access the policy documents as a new hire in operations?
Steps
1. Configuration file
To use the FlowSuperAgent, we will create a YAML file with the correct configuration. This is where the agents are defined and the flow is customized.
As a first step, you need to allow the orchestrator to invoke the given Flow Superagent by listing the superagent in the orchestrator's agent_list.
orchestrator:
  agent_list:
    - agent_name: "Onboarding Advisor" # Register the FlowSuperAgent so the orchestrator knows it can invoke it.
Then, you should define the assistant agents of the FlowSuperAgent as utility agents and list them under utility_agents.
utility_agents:
  - agent_class: PlanningAgent
    agent_name: "Role Classifier" # Decides if the user's role is technical or non-technical.
    agent_description: "Decide whether the role of the newly hired user is technical or non-technical"
  - agent_class: SearchAgent
    agent_name: "Technical Onboarding Agent" # Finds info about technical onboarding processes.
    agent_description: "Find relevant information about technical onboarding at Accenture"
  - agent_class: PlanningAgent
    agent_name: "Technical Training Planner" # Creates a plan for technical onboarding tasks.
    agent_description: "Provide a plan for technical onboarding tasks to be completed at Accenture"
  - agent_class: SearchAgent
    agent_name: "Non Technical Onboarding Agent" # Finds info about non-technical onboarding processes.
    agent_description: "Find relevant information about non-technical onboarding at Accenture"
  - agent_class: PlanningAgent
    agent_name: "Non Technical Training Planner" # Creates a plan for non-technical onboarding tasks.
    agent_description: "Provide a plan for non-technical onboarding tasks to be completed at Accenture"
Then, you should define the FlowSuperAgent and configure its workflow. To do so you should:
- Define the vertices as entries in the agent_list of the FlowSuperAgent.
- Define the edges by specifying, for each agent_name in the agent_list, the next_step that will be taken.
- Choose the desired triage behavior (either all-match or first-match) and express it in the YAML file.
super_agents:
  - agent_class: FlowSuperAgent
    agent_name: "Onboarding Advisor" # The main FlowSuperAgent orchestrating the workflow.
    agent_description: |
      The onboarding advisor can help users find resources as new employees at a specific company.
    config:
      goal: |
        The goal is to route the user to the correct resources for onboarding tasks.
      show_triage_feedback: True # Whether to display feedback about triage.
      agent_list: # Define the workflow and connections between agents.
        - agent_name: 'Role Classifier'
          mode: all-match # Runs all matching conditions instead of stopping at first match.
          next_step: # Conditional routing from Role Classifier based on detected role.
            - condition: 'user role is technical' # If technical, go to Technical Onboarding Agent.
              to: ["Technical Onboarding Agent"]
            - condition: 'user role is non technical' # If non-technical, go to Non Technical Onboarding Agent.
              to: ["Non Technical Onboarding Agent"]
            - default: true # Fallback route if no condition matches.
              to: ['Technical Training Planner']
        - agent_name: "Technical Onboarding Agent"
          next_step:
            - "Technical Training Planner" # After finding info, move to creating a training plan.
        - agent_name: "Non Technical Onboarding Agent"
          next_step:
            - "Non Technical Training Planner" # After finding info, move to creating a training plan.
        - agent_name: "Technical Training Planner" # End node for technical onboarding.
        - agent_name: "Non Technical Training Planner" # End node for non-technical onboarding.
In this example, the conditional triage is defined at the 'Role Classifier' node. This means that if the user role is technical, Role Classifier will vote to skip 'Non Technical Onboarding Agent' and all its children. Since these nodes have no other parents, they will be skipped.
2. Python file
Now, you can start developing your assistant with the following code:
import asyncio
import os

from air import DistillerClient, login
from air.utils import async_print
from dotenv import load_dotenv

load_dotenv()

# Authenticate with your credentials using environment variables.
auth = login(
    account=str(os.getenv("ACCOUNT")),
    api_key=str(os.getenv("API_KEY")),
)


async def process_query(query, project):
    """
    Process a single query using a new DistillerClient instance.
    Session memory is reset afterwards so that queries do not share state.
    """
    client = DistillerClient()
    # Build a session identifier from the process ID (identical for every query in one run).
    session_uuid = f"temp_user_{os.getpid()}"
    async with client(
        project=project,
        uuid=session_uuid,
    ) as dc:
        # Send query to the FlowSuperAgent project.
        responses = await dc.query(query=query)
        print(f"-------\nQuery: {query}")
        # Iterate through streaming responses from agents in the workflow.
        async for response in responses:
            role = response.get("role", "Unknown Agent")  # Agent name
            content = response.get("content", "No content")  # Agent's reply
            await async_print(f"\n<<< Response from {role} >>>")
            await async_print(content)
            await async_print("-" * 20)
        # Clear all stored memory for this session so next run is clean.
        await dc.reset_memory()
        await async_print("Memory reset complete.")
    await async_print("Query handled successfully.")
    await async_print("-" * 20)


async def flow_super_agent_triage_am_demo():
    """
    Demonstrates running the FlowSuperAgent with an all-match triage configuration.
    Queries are processed sequentially, with memory reset after each one.
    """
    # First, register the FlowSuperAgent project from its YAML configuration.
    registration_client = DistillerClient()
    registration_client.create_project(
        config_path="flow_superagent_triage_all_match.yaml",
        project="onboarding_advisor",
    )
    # Example queries to run through the FlowSuperAgent.
    queries = [
        "Can you please tell me my next onboarding task? I am joining as an AI Research Scientist."
    ]
    # Process each query one by one.
    for query in queries:
        await process_query(query, "onboarding_advisor")


if __name__ == "__main__":
    # Entry point: run the asynchronous demo function.
    asyncio.run(flow_super_agent_triage_am_demo())
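One detail of the script above is worth guarding against: `str(os.getenv("ACCOUNT"))` evaluates to the literal string "None" when the variable is missing, so a misconfigured environment is only noticed at login. The sketch below shows one way to fail earlier. `require_env` is a hypothetical helper, not part of the AI Refinery SDK.

```python
import os


def require_env(name):
    """Return the value of an environment variable, or raise a clear error.

    Hypothetical helper for illustration; it is not provided by the SDK.
    """
    value = os.getenv(name)
    if not value:
        raise RuntimeError(f"Environment variable {name} is not set")
    return value
```

With this helper, the login call could read its arguments from `require_env("ACCOUNT")` and `require_env("API_KEY")` instead of wrapping `os.getenv` in `str`.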
Support Router (first-match)
Tutorial Description
This example, Support Router, demonstrates Triage in first-match mode (unlike the all-match mode used in the previous example). In first-match mode, once a condition evaluates to true, all subsequent conditions are ignored.
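The difference between the two modes can be sketched in a few lines of plain Python. This is an illustration under stated assumptions, not the SDK's implementation: `select_targets` and the `is_true` callback are hypothetical names, the route dictionaries simply mirror the YAML next_step entries used in this tutorial, and the default route is assumed to apply only when no condition matches.

```python
def select_targets(routes, mode, is_true):
    """Return the agents chosen by a triage node.

    routes  : list of dicts shaped like the YAML next_step entries
    mode    : "first-match" or "all-match"
    is_true : callable deciding whether a condition string holds
              (a stand-in for however conditions are really evaluated)
    """
    if mode not in ("first-match", "all-match"):
        raise ValueError(f"unknown triage mode: {mode}")
    selected = []
    matched = False
    for route in routes:
        if "condition" not in route:
            continue  # the default route is handled after the loop
        if is_true(route["condition"]):
            matched = True
            selected.extend(t for t in route["to"] if t not in selected)
            if mode == "first-match":
                break  # ignore every later condition
    if not matched:
        # Assumption: the default route is a fallback used only when nothing matched.
        for route in routes:
            if route.get("default"):
                selected.extend(route["to"])
    return selected


routes = [
    {"condition": "issue is technical", "to": ["Technical Troubleshooter"]},
    {"condition": "issue is billing", "to": ["Billing Assistant"]},
    {"default": True, "to": ["Billing Assistant", "Technical Troubleshooter"]},
]

# Pretend a query satisfies both conditions at once.
print(select_targets(routes, "first-match", lambda c: True))
print(select_targets(routes, "all-match", lambda c: True))
# Pretend a query satisfies neither condition.
print(select_targets(routes, "first-match", lambda c: False))
```

When both conditions hold, first-match selects only the Technical Troubleshooter while all-match selects both agents; when neither holds, the default route is used in either mode.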
Agent Workflow Overview
The diagram below shows the workflow DAG for this example:
Green nodes represent PlanningAgent instances, and the white node represents a conditional triage step. The two central nodes share a common child node, which illustrates how skip votes from multiple parents propagate to downstream agents, as the configuration below demonstrates.
Example Queries
- "I need help with this billing issue urgently and I am very angry about this situation."
- "My internet keeps disconnecting. Please fix this ASAP."
- "I was double-charged on my bill and want a refund."
- "How do I reset my password?"
Steps
1. Configuration file
orchestrator:
  agent_list:
    - agent_name: "Support Router" # The FlowSuperAgent that will orchestrate the workflow.
utility_agents:
  - agent_class: PlanningAgent
    agent_name: "Support Classifier" # Determines if the issue is technical or billing.
    agent_description: "Classify the support issue type as technical or billing."
  - agent_class: PlanningAgent
    agent_name: "Technical Troubleshooter" # Handles troubleshooting steps for technical issues.
    agent_description: "Provide troubleshooting help for technical issues."
  - agent_class: PlanningAgent
    agent_name: "Billing Assistant" # Handles all billing-related questions and actions.
    agent_description: "Assist with billing-related questions and actions."
  - agent_class: PlanningAgent
    agent_name: "Final Support Summary" # Summarizes the actions taken and final recommendations.
    agent_description: "Summarize the solution steps and next actions for the user."
super_agents:
  - agent_class: FlowSuperAgent
    agent_name: "Support Router" # The main agent routing incoming queries.
    agent_description: |
      Directs support requests to the correct agents based on the user's issue type and combines their outputs into a final summary.
    config:
      goal: |
        Classify the issue and handle the support request accordingly.
      show_triage_feedback: True # Show feedback about triage.
      agent_list: # Define the workflow.
        - agent_name: Support Classifier
          mode: first-match # Stops at the first matching condition instead of checking all.
          next_step: # Conditional routing from the Support Classifier.
            - condition: 'issue is technical' # If issue is technical, go to Technical Troubleshooter.
              to: ['Technical Troubleshooter']
            - condition: 'issue is billing' # If issue is billing, go to Billing Assistant.
              to: ['Billing Assistant']
            - default: true # If no match, route to both Technical and Billing agents.
              to: ['Billing Assistant', 'Technical Troubleshooter']
        - agent_name: 'Technical Troubleshooter'
          next_step:
            - 'Final Support Summary' # After troubleshooting, provide a final summary.
        - agent_name: 'Billing Assistant'
          next_step:
            - 'Final Support Summary' # After billing assistance, provide a final summary.
        - agent_name: 'Final Support Summary' # End node that compiles the results for the user.
In this setup, the nodes Billing Assistant and Technical Troubleshooter share the same and only parent, Support Classifier, which votes to skip one or the other depending on the query. These votes are propagated to the child node, Final Support Summary. However, this node is skipped only if all of its parents vote to skip it. For the queries provided here, we can therefore expect only one of the two parents to vote to skip this node, meaning it will be executed.
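The voting rule described above can be simulated with a small, SDK-independent sketch. Everything here is an assumption made for illustration: the `PARENTS` table is a hand-written copy of the Support Router graph, and `skipped_agents` is a hypothetical helper that encodes the rule as stated in this tutorial (a node is skipped only when every parent votes to skip it, and a skipped node votes to skip its own children).

```python
from graphlib import TopologicalSorter

# Parent lists for the Support Router graph (hand-written for this sketch).
PARENTS = {
    "Support Classifier": [],
    "Technical Troubleshooter": ["Support Classifier"],
    "Billing Assistant": ["Support Classifier"],
    "Final Support Summary": ["Technical Troubleshooter", "Billing Assistant"],
}


def skipped_agents(parents, triage_node, selected):
    """Return the set of agents that end up skipped.

    The triage node votes to skip each of its children that is not in `selected`;
    a skipped agent votes to skip all of its children; an agent is skipped only
    when every one of its parents votes to skip it.
    """
    skip_votes = {node: set() for node in parents}  # node -> parents voting "skip"
    skipped = set()
    # Visit parents before children so votes are final when a node is examined.
    for node in TopologicalSorter(parents).static_order():
        node_parents = parents[node]
        for parent in node_parents:
            if parent in skipped or (parent == triage_node and node not in selected):
                skip_votes[node].add(parent)
        if node_parents and skip_votes[node] == set(node_parents):
            skipped.add(node)
    return skipped


# A technical query: only the Technical Troubleshooter is selected.
print(sorted(skipped_agents(PARENTS, "Support Classifier", {"Technical Troubleshooter"})))
# Hypothetical case in which neither branch is selected.
print(sorted(skipped_agents(PARENTS, "Support Classifier", set())))
```

In the first call only Billing Assistant is skipped, so Final Support Summary still runs because one of its parents did not vote to skip it. In the second, hypothetical call both parents are skipped and the summary node is skipped with them.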
2. Python file
To run the system, use the following code:
"""
This script demonstrates how to use FlowSuperAgent
with a first-match triage routing strategy.
Function process_query creates a new DistillerClient
instance for each query and formats the output.
"""
import asyncio
import os

from air import DistillerClient, login
from air.utils import async_print
from dotenv import load_dotenv

load_dotenv()

# Authenticate using account credentials from environment variables.
auth = login(
    account=str(os.getenv("ACCOUNT")),
    api_key=str(os.getenv("API_KEY")),
)


async def process_query(query, project):
    """
    Process a single query using a new client instance.
    Session memory is reset afterwards so that queries do not share state.
    """
    client = DistillerClient()
    # Build a session ID from the process ID (identical for every query in one run).
    session_uuid = f"temp_user_{os.getpid()}"
    async with client(
        project=project,
        uuid=session_uuid,
    ) as dc:
        # Send the query to the project and await streaming responses.
        responses = await dc.query(query=query)
        print(f"-------\nQuery: {query}")
        # Iterate through the agents' responses in order.
        async for response in responses:
            role = response.get("role", "Unknown Agent")  # Which agent responded
            content = response.get("content", "No content")  # Response text
            await async_print(f"\n<<< Response from {role} >>>")
            await async_print(content)
            await async_print("-" * 20)
        # Clear the stored conversation memory for this session.
        await dc.reset_memory()
        await async_print("Memory reset complete.")
    await async_print("Query handled successfully.")
    await async_print("-" * 20)


async def flow_super_agent_triage_fm_demo():
    """
    Demo function for running the FlowSuperAgent
    with a first-match triage configuration.
    Queries are processed sequentially,
    with memory reset after each one to avoid state leakage.
    """
    # Register the project with the FlowSuperAgent config.
    registration_client = DistillerClient()
    registration_client.create_project(
        config_path="flow_superagent_triage_first_match.yaml",
        project="support_router",
    )
    # Example query to run through the support router.
    queries = ["I'm having a technical issue in my account, can you help me fix it?"]
    # Process each query sequentially.
    for query in queries:
        await process_query(query, "support_router")


if __name__ == "__main__":
    # Entry point: run the demo asynchronously.
    asyncio.run(flow_super_agent_triage_fm_demo())
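The session ID in the script above is derived from the process ID, so every query in a single run shares it and isolation relies on the memory reset. If a distinct ID per query is preferred, a random one can be generated with the standard library, as in the sketch below. `new_session_id` is a hypothetical helper, and whether the SDK places any constraints on the format of the uuid value is not covered by this tutorial, so treat the format as an assumption.

```python
import uuid


def new_session_id(prefix="temp_user"):
    """Build a session identifier that differs on every call.

    Hypothetical helper; the "<prefix>_<hex>" format is an assumption.
    """
    return f"{prefix}_{uuid.uuid4().hex}"


print(new_session_id())
```

The returned string could be used in place of the process-ID-based value when opening a session.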