
Flow Superagent with Triage

Objective

This tutorial provides two demos, one for each triage mode (all-match and first-match). Each demo shows how conditions are evaluated and how agents may be skipped based on the user-defined triage parameters and the resulting votes from parent agents.

The demos use a FlowSuperAgent with conditional triage to manage instances of UtilityAgent and enable custom routing logic. The process logic in the FlowSuperAgent is represented as a directed acyclic graph (DAG), where each node corresponds to a UtilityAgent and each edge denotes the message flow between two agents. With conditional triage enabled, users can define one or more conditions, together with the agents to execute when each condition is satisfied.
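To make the graph view concrete, here is a minimal sketch in plain Python that does not use the AI Refinery SDK. It stores a flow as a mapping from each node to its next steps and derives one valid execution order with the standard-library graphlib module (Python 3.9+). The node names and the helper execution_order are hypothetical and only illustrate the DAG idea; they are not how FlowSuperAgent is implemented.

```python
import graphlib


def execution_order(next_steps):
    """Return one valid execution order for a flow given as node -> list of children."""
    # TopologicalSorter expects node -> predecessors, so invert the edge map.
    predecessors = {node: set() for node in next_steps}
    for node, children in next_steps.items():
        for child in children:
            predecessors.setdefault(child, set()).add(node)
    # static_order() raises graphlib.CycleError if the graph contains a cycle.
    return list(graphlib.TopologicalSorter(predecessors).static_order())


# Hypothetical three-node chain: classify, then search, then plan.
flow = {
    "Classifier": ["Searcher"],
    "Searcher": ["Planner"],
    "Planner": [],
}
print(execution_order(flow))  # ['Classifier', 'Searcher', 'Planner']
```

Passing a cyclic mapping such as `{"A": ["B"], "B": ["A"]}` raises `graphlib.CycleError`, which is a cheap way to catch a flow definition that is not acyclic.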

Tutorial Workflow:

First, we will examine a configuration showing the all-match mode, and then proceed with first-match triage.

Onboarding Advisor (all-match)

Tutorial Description

We will use the AI Refinery SDK to create and run an AI system that can provide help with onboarding a new hire. In this tutorial, you'll utilize instances of SearchAgent and PlanningAgent to collect information about a new hire and direct their onboarding as necessary.

To coordinate the workflow between these agents, a FlowSuperAgent named "Onboarding Advisor" will be employed to manage and oversee the process.

Agent Workflow Overview

The figure below depicts the logical graph flow managed by the "Onboarding Advisor", where green nodes represent instances of PlanningAgent and blue nodes correspond to instances of SearchAgent. The white node represents a conditional triage instance. Descendants of the agent with conditional triage may be skipped, depending on the user-specified conditions and the structure of the graph.

Figure: Logical graph flow managed by the Onboarding Advisor

Example Queries

  • Where can I find the slides from my orientation for my new role as a SWE?

  • What are some of my to-do tasks to finish onboarding as a new AI Consultant?

  • Which repos should I clone as a new backend developer?

  • What’s expected of new HR team members in the first week?

  • How do I access the policy documents as a new hire in operations?

Steps

1. Configuration file

To use the FlowSuperAgent, we will create a YAML file with the correct configuration. This is where the agents are defined and the flow is customized.

As a first step, you need to allow the orchestrator to invoke the given Flow Superagent by listing the superagent in the orchestrator's agent_list.

orchestrator:
  agent_list:
    - agent_name: "Onboarding Advisor"  # Register the FlowSuperAgent so the orchestrator knows it can invoke it.

Then, you should define the assistant agents of the FlowSuperAgent as utility agents and list them under utility_agents.

utility_agents:
  - agent_class: PlanningAgent
    agent_name: "Role Classifier"  # Decides if the user's role is technical or non-technical.
    agent_description: "Decide whether the role of the newly hired user is technical or non-technical"

  - agent_class: SearchAgent
    agent_name: "Technical Onboarding Agent"  # Finds info about technical onboarding processes.
    agent_description: "Find relevant information about technical onboarding at Accenture"

  - agent_class: PlanningAgent
    agent_name: "Technical Training Planner"  # Creates a plan for technical onboarding tasks.
    agent_description: "Provide a plan for technical onboarding tasks to be completed at Accenture"

  - agent_class: SearchAgent
    agent_name: "Non Technical Onboarding Agent"  # Finds info about non-technical onboarding processes.
    agent_description: "Find relevant information about non-technical onboarding at Accenture"

  - agent_class: PlanningAgent
    agent_name: "Non Technical Training Planner"  # Creates a plan for non-technical onboarding tasks.
    agent_description: "Provide a plan for non-technical onboarding tasks to be completed at Accenture"

Then, you should define the FlowSuperAgent and configure its workflow. To do so you should:

  • Define the vertices as entries in the agent_list of the FlowSuperAgent.
  • Define the edges by specifying for each agent_name in the agent_list the next_step that will be taken.
  • Choose the desired triage behavior (it can be either all-match or first-match) and express this accordingly in the YAML file.

super_agents:
  - agent_class: FlowSuperAgent
    agent_name: "Onboarding Advisor"  # The main FlowSuperAgent orchestrating the workflow.
    agent_description: |
      The onboarding advisor can help users find resources as new employees at a specific company.
    config:
      goal: |
        The goal is to route the user to the correct resources for onboarding tasks.
      show_triage_feedback: True  # Whether to display feedback about triage.
      agent_list:  # Define the workflow and connections between agents.
        - agent_name: 'Role Classifier'
          mode: all-match  # Runs all matching conditions instead of stopping at first match.
          next_step:  # Conditional routing from Role Classifier based on detected role.
          - condition: 'user role is technical'  # If technical, go to Technical Onboarding Agent.
            to: ["Technical Onboarding Agent"]
          - condition: 'user role is non technical'  # If non-technical, go to Non Technical Onboarding Agent.
            to: ["Non Technical Onboarding Agent"]
          - default: true  # Fallback route if no condition matches.
            to: ['Technical Training Planner']

        - agent_name: "Technical Onboarding Agent"
          next_step:
          - "Technical Training Planner"  # After finding info, move to creating a training plan.

        - agent_name: "Non Technical Onboarding Agent"
          next_step:
          - "Non Technical Training Planner"  # After finding info, move to creating a training plan.

        - agent_name: "Technical Training Planner"  # End node for technical onboarding.

        - agent_name: "Non Technical Training Planner"  # End node for non-technical onboarding.
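Because edges are written as agent names, a typo in a next_step entry is easy to miss. The following is a minimal sketch of a consistency check, assuming the agent_list above has been copied into a plain Python list of dictionaries (no YAML loader is used here). The helper undeclared_targets is hypothetical and not part of the AI Refinery SDK.

```python
def undeclared_targets(agent_list):
    """Return next_step targets that have no matching agent_name entry."""
    declared = {entry["agent_name"] for entry in agent_list}
    targets = set()
    for entry in agent_list:
        for step in entry.get("next_step", []):
            if isinstance(step, str):
                targets.add(step)  # plain edge, e.g. "Technical Training Planner"
            else:
                targets.update(step["to"])  # conditional or default rule
    return sorted(targets - declared)


# Mirror of the Onboarding Advisor agent_list shown above.
agent_list = [
    {
        "agent_name": "Role Classifier",
        "mode": "all-match",
        "next_step": [
            {"condition": "user role is technical", "to": ["Technical Onboarding Agent"]},
            {"condition": "user role is non technical", "to": ["Non Technical Onboarding Agent"]},
            {"default": True, "to": ["Technical Training Planner"]},
        ],
    },
    {"agent_name": "Technical Onboarding Agent", "next_step": ["Technical Training Planner"]},
    {"agent_name": "Non Technical Onboarding Agent", "next_step": ["Non Technical Training Planner"]},
    {"agent_name": "Technical Training Planner"},
    {"agent_name": "Non Technical Training Planner"},
]

print(undeclared_targets(agent_list))  # []
```

An empty list means every edge points at a declared vertex. Any name that is returned appears as a target but never as an agent_name.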

In this example, the conditional triage is defined at the 'Role Classifier' node. If the user role is technical, Role Classifier votes to skip 'Non Technical Onboarding Agent', and that vote propagates to its children. Since these nodes have no other parents, they are skipped.
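One way to reason about which agents run is to treat skipping as a reachability question: starting at the triage node, follow only the targets it selected and see what can still be reached. The sketch below is a simplified model under that assumption, not the SDK's implementation. The helper executed_nodes and the `selected` argument (the targets chosen by triage) are hypothetical.

```python
from collections import deque


def executed_nodes(children, root, triage_node, selected):
    """Return the nodes reachable from root when triage forwards only to `selected`."""
    seen = {root}
    queue = deque([root])
    while queue:
        node = queue.popleft()
        kids = children.get(node, [])
        if node == triage_node:
            # The triage node only passes control to the targets it selected.
            kids = [kid for kid in kids if kid in selected]
        for kid in kids:
            if kid not in seen:
                seen.add(kid)
                queue.append(kid)
    return seen


# Edges of the Onboarding Advisor flow, including the default target.
children = {
    "Role Classifier": [
        "Technical Onboarding Agent",
        "Non Technical Onboarding Agent",
        "Technical Training Planner",
    ],
    "Technical Onboarding Agent": ["Technical Training Planner"],
    "Non Technical Onboarding Agent": ["Non Technical Training Planner"],
}

ran = executed_nodes(
    children, "Role Classifier", "Role Classifier", {"Technical Onboarding Agent"}
)
all_nodes = set(children) | {kid for kids in children.values() for kid in kids}
print(sorted(all_nodes - ran))
# ['Non Technical Onboarding Agent', 'Non Technical Training Planner']
```

'Technical Training Planner' still runs in this model even though the default rule was not selected, because it remains reachable through 'Technical Onboarding Agent'.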

2. Python file

Now, you can start the development of your assistant using these lines of code:

import asyncio
import os

from air import DistillerClient, login
from dotenv import load_dotenv
from air.utils import async_print

load_dotenv()

# Authenticate with your credentials using environment variables.
auth = login(
    account=str(os.getenv("ACCOUNT")),
    api_key=str(os.getenv("API_KEY")),
)


async def process_query(query, project):
    """
    Process a single query using a new DistillerClient instance.
    Session memory is reset after the query so that runs do not share state.
    """
    client = DistillerClient()
    # Derive a session identifier from the process ID. Note that this value is
    # the same for every query issued by this process.
    session_uuid = f"temp_user_{os.getpid()}"
    async with client(
        project=project,
        uuid=session_uuid,
    ) as dc:
        # Send query to the FlowSuperAgent project.
        responses = await dc.query(query=query)
        print(f"-------\nQuery: {query}")
        # Iterate through streaming responses from agents in the workflow.
        async for response in responses:
            role = response.get("role", "Unknown Agent")  # Agent name
            content = response.get("content", "No content")  # Agent's reply
            await async_print(f"\n<<< Response from {role} >>>")
            await async_print(content)
            await async_print("-" * 20)

        # Clear all stored memory for this session so next run is clean.
        await dc.reset_memory()
        await async_print("Memory reset complete.")
        await async_print("Query handled successfully.")
        await async_print("-" * 20)


async def flow_super_agent_triage_am_demo():
    """
    Demonstrates running the FlowSuperAgent with an all-match triage configuration.
    Each query is processed sequentially with a fresh session.
    """
    # First, register the FlowSuperAgent project from its YAML configuration.
    registration_client = DistillerClient()
    registration_client.create_project(
        config_path="flow_superagent_triage_all_match.yaml",
        project="onboarding_advisor",
    )

    # Example queries to run through the FlowSuperAgent.
    queries = [
        "Can you please tell me my next onboarding task? I am joining as an AI Research Scientist."
    ]
    # Process each query one by one.
    for query in queries:
        await process_query(query, "onboarding_advisor")


if __name__ == "__main__":
    # Entry point: run the asynchronous demo function.
    asyncio.run(flow_super_agent_triage_am_demo())
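The script above builds its session identifier from os.getpid(), which returns the same value for every query in a single run. If a distinct identifier per query is wanted, a minimal sketch using the standard-library uuid module could look like the following. The helper new_session_uuid is hypothetical, and whether the SDK places any constraints on the format of the uuid argument is not covered by this tutorial.

```python
import uuid


def new_session_uuid(prefix="temp_user"):
    """Return a session identifier that differs on every call."""
    # uuid4() is random, so two calls are overwhelmingly unlikely to collide.
    return f"{prefix}_{uuid.uuid4().hex}"


first = new_session_uuid()
second = new_session_uuid()
print(first != second)  # True
```

Under this assumption, the value returned by new_session_uuid() would replace the f-string assigned to session_uuid in process_query.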

Support Router (first-match)

Tutorial Description

This example, Support Router, demonstrates Triage in first-match mode (unlike the all-match mode used in the previous example). In first-match mode, once a condition evaluates to true, all subsequent conditions are ignored.
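The difference between the two modes can be sketched in a few lines of plain Python. This is a simplified model, not the SDK's implementation. The `verdicts` mapping is a hypothetical stand-in for however the triage step judges each condition, and the default rule is treated as a fallback that applies only when no condition matches. The rule list uses the Support Router conditions from this example's configuration file.

```python
def select_targets(rules, verdicts, mode):
    """Pick the agents to route to.

    rules    -- list of {"condition": str, "to": [...]} or {"default": True, "to": [...]}
    verdicts -- hypothetical mapping: condition -> True if it was judged satisfied
    mode     -- "first-match" or "all-match"
    """
    if mode not in ("first-match", "all-match"):
        raise ValueError(f"unknown triage mode: {mode}")
    targets = []
    for rule in rules:
        if rule.get("default"):
            continue  # the default rule is handled as a fallback below
        if verdicts.get(rule["condition"], False):
            targets.extend(rule["to"])
            if mode == "first-match":
                break  # all subsequent conditions are ignored
    if not targets:
        for rule in rules:
            if rule.get("default"):
                targets.extend(rule["to"])
    return list(dict.fromkeys(targets))  # drop duplicates, keep order


rules = [
    {"condition": "issue is technical", "to": ["Technical Troubleshooter"]},
    {"condition": "issue is billing", "to": ["Billing Assistant"]},
    {"default": True, "to": ["Billing Assistant", "Technical Troubleshooter"]},
]
both_true = {"issue is technical": True, "issue is billing": True}

print(select_targets(rules, both_true, "first-match"))
# ['Technical Troubleshooter']
print(select_targets(rules, both_true, "all-match"))
# ['Technical Troubleshooter', 'Billing Assistant']
print(select_targets(rules, {}, "first-match"))
# ['Billing Assistant', 'Technical Troubleshooter']
```

With both conditions judged true, first-match stops after the technical rule, while all-match collects the targets of every satisfied rule.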

Agent Workflow Overview

The diagram below shows the workflow DAG for this example:

Figure: Support Router DAG

Green nodes represent PlanningAgent instances, and the white node represents a conditional triage step. The two central nodes share a common child node, which illustrates how skip votes from multiple parents propagate to downstream agents. This behavior is discussed after the configuration file below.

Example Queries

  • "I need help with this billing issue urgently and I am very angry about this situation."
  • "My internet keeps disconnecting. Please fix this ASAP."
  • "I was double-charged on my bill and want a refund."
  • "How do I reset my password?"

Steps

1. Configuration file

orchestrator:
  agent_list:
    - agent_name: "Support Router"  # The FlowSuperAgent that will orchestrate the workflow.

utility_agents:
  - agent_class: PlanningAgent
    agent_name: "Support Classifier"  # Determines if the issue is technical or billing.
    agent_description: "Classify the support issue type as technical or billing."

  - agent_class: PlanningAgent
    agent_name: "Technical Troubleshooter"  # Handles troubleshooting steps for technical issues.
    agent_description: "Provide troubleshooting help for technical issues."

  - agent_class: PlanningAgent
    agent_name: "Billing Assistant"  # Handles all billing-related questions and actions.
    agent_description: "Assist with billing-related questions and actions."

  - agent_class: PlanningAgent
    agent_name: "Final Support Summary"  # Summarizes the actions taken and final recommendations.
    agent_description: "Summarize the solution steps and next actions for the user."

super_agents:
  - agent_class: FlowSuperAgent
    agent_name: "Support Router"  # The main agent routing incoming queries.
    agent_description: |
      Directs support requests to the correct agents based on the user's issue type and combines their outputs into a final summary.
    config:
      goal: |
        Classify the issue and handle the support request accordingly.
      show_triage_feedback: True  # Show feedback about triage.
      agent_list:  # Define the workflow.
        - agent_name: Support Classifier
          mode: first-match  # Stops at the first matching condition instead of checking all.
          next_step:  # Conditional routing from the Support Classifier.
            - condition: 'issue is technical'  # If issue is technical, go to Technical Troubleshooter.
              to: ['Technical Troubleshooter']
            - condition: 'issue is billing'  # If issue is billing, go to Billing Assistant.
              to: ['Billing Assistant']
            - default: true  # If no match, route to both Technical and Billing agents.
              to: ['Billing Assistant', 'Technical Troubleshooter']

        - agent_name: 'Technical Troubleshooter'
          next_step:
            - 'Final Support Summary'  # After troubleshooting, provide a final summary.

        - agent_name: 'Billing Assistant'
          next_step:
            - 'Final Support Summary'  # After billing assistance, provide a final summary.

        - agent_name: 'Final Support Summary'  # End node that compiles the results for the user.

In this setup, Billing Assistant and Technical Troubleshooter share a single parent, Support Classifier, which votes to skip one or the other depending on the query. A skipped node passes its skip vote on to its child, Final Support Summary. However, Final Support Summary is skipped only if all of its parents vote to skip it. For the queries provided here, we can expect at most one of the two parents to vote to skip it, so it will be executed.
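The "skipped only if all parents vote to skip" rule can be modelled with explicit votes. The sketch below is a simplified illustration under stated assumptions, not the SDK's implementation. The helper resolve_skips is hypothetical, `selected` stands for the targets chosen by triage, and the `children` mapping is assumed to list every node in a valid execution order.

```python
def resolve_skips(children, order, triage_node, selected):
    """Return the set of skipped nodes, processing nodes in execution order."""
    parents = {}
    for node, kids in children.items():
        for kid in kids:
            parents.setdefault(kid, []).append(node)

    skipped = set()
    for node in order:
        votes = []
        for parent in parents.get(node, []):
            if parent in skipped:
                votes.append(True)  # a skipped parent votes to skip its children
            elif parent == triage_node:
                votes.append(node not in selected)  # triage skips unselected targets
            else:
                votes.append(False)  # an executed parent votes to run the child
        if votes and all(votes):
            skipped.add(node)
    return skipped


children = {
    "Support Classifier": ["Technical Troubleshooter", "Billing Assistant"],
    "Technical Troubleshooter": ["Final Support Summary"],
    "Billing Assistant": ["Final Support Summary"],
    "Final Support Summary": [],
}
order = list(children)  # already listed in a valid execution order

skipped = resolve_skips(
    children, order, "Support Classifier", {"Technical Troubleshooter"}
)
print(sorted(skipped))  # ['Billing Assistant']
```

Final Support Summary receives one skip vote (from the skipped Billing Assistant) and one run vote (from Technical Troubleshooter), so it is executed. It would be skipped in this model only if both of its parents were skipped.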

2. Python file

To run the system, use the following code:

"""
This script demonstrates how to use FlowSuperAgent
with a first-match triage routing strategy.

Function process_query creates a new DistillerClient
instance for each query and formats the output.
"""

import asyncio
import os

from air import DistillerClient, login
from dotenv import load_dotenv
from air.utils import async_print

load_dotenv()

# Authenticate using account credentials from environment variables.
auth = login(
    account=str(os.getenv("ACCOUNT")),
    api_key=str(os.getenv("API_KEY")),
)


async def process_query(query, project):
    """
    Process a single query using a new client instance.
    Session memory is reset after the query so that runs do not share state.
    """
    client = DistillerClient()
    # Derive a session ID from the process ID. Note that this value is
    # the same for every query issued by this process.
    session_uuid = f"temp_user_{os.getpid()}"
    async with client(
        project=project,
        uuid=session_uuid,
    ) as dc:
        # Send the query to the project and await streaming responses.
        responses = await dc.query(query=query)
        print(f"-------\nQuery: {query}")
        # Iterate through the agents' responses in order.
        async for response in responses:
            role = response.get("role", "Unknown Agent")  # Which agent responded
            content = response.get("content", "No content")  # Response text
            await async_print(f"\n<<< Response from {role} >>>")
            await async_print(content)
            await async_print("-" * 20)

        # Clear the stored conversation memory for this session.
        await dc.reset_memory()
        await async_print("Memory reset complete.")
        await async_print("Query handled successfully.")
        await async_print("-" * 20)


async def flow_super_agent_triage_fm_demo():
    """
    Demo function for running the FlowSuperAgent
    with a first-match triage configuration.

    Queries are processed sequentially,
    each in a fresh session to avoid state leakage.
    """
    # Register the project with the FlowSuperAgent config.
    registration_client = DistillerClient()
    registration_client.create_project(
        config_path="flow_superagent_triage_first_match.yaml",
        project="support_router",
    )
    # Example query to run through the support router.
    queries = ["I'm having a technical issue in my account, can you help me fix it?"]
    # Process each query sequentially.
    for query in queries:
        await process_query(query, "support_router")


if __name__ == "__main__":
    # Entry point: run the demo asynchronously.
    asyncio.run(flow_super_agent_triage_fm_demo())