Skip to content

Flow Superagent with Loops

Objective

This tutorial demonstrates how to use the Loop feature of the FlowSuperAgent to enable iterative workflows. The demo shows how a workflow can rewind to an upstream loop head node when a user-defined condition is met at a loop tail node, repeating up to a configurable maximum number of iterations.

Tutorial Workflow

We will examine a configuration showing a FlowSuperAgent that drives an iterative project intake session, rewinding to re-collect stakeholder information if it is found to be incomplete after the customer specialist has run.

Information Collection Agent

Tutorial Description

This tutorial uses the AI Refinery SDK to build an intake assistant with three PlanningAgent instances. A FlowSuperAgent named "Information Collection Agent" coordinates the specialists and enforces iterative collection logic via a loop condition.

Agent Workflow Overview

The diagram below shows the workflow graph managed by the "Information Collection Agent":

  • All nodes are instances of PlanningAgent.
  • Stakeholder Specialist runs first, feeding into Customer Specialist.
  • Customer Specialist is the loop tail: if stakeholder information is incomplete after it executes, execution rewinds to Stakeholder Specialist (the loop head), re-running both nodes.
  • This loop can trigger up to 2 times.
  • Once the condition is false or the iteration limit is reached, Supplier Specialist runs to complete the intake.
Stakeholder Specialist  ──►  Customer Specialist  ──►  Supplier Specialist
        ▲                           │ (loop condition:
        └───────────────────────────┘  stakeholder info incomplete)

Example Queries

  • "I need to start a new vendor onboarding project."
  • "We're kicking off a new client engagement and need to capture intake details."
  • "Please help me collect the project requirements for our new initiative."
  • "Let's begin the intake process for our upcoming supplier contract."
  • "Call the Information Collection Agent to collect additional all necessary data. None of the project information has been provided yet."

Steps

1. Configuration File

To use the FlowSuperAgent, we will create a YAML file with the correct configuration. This is where the agents are defined and the loop workflow is set up.

As a first step, allow the orchestrator to invoke the FlowSuperAgent by listing it in the orchestrator's agent_list:

orchestrator:
  agent_list:
    - agent_name: 'Information Collection Agent'  # Register the FlowSuperAgent with the orchestrator.

Then, define the specialist agents as utility agents under utility_agents:

utility_agents:
  - agent_class: PlanningAgent
    agent_name: "Stakeholder Specialist"
    agent_description: "Collects stakeholder information and follow-ups for the intake session."

  - agent_class: PlanningAgent
    agent_name: "Supplier Specialist"
    agent_description: "Confirms suppliers and captures supplier-related details using available tools."

  - agent_class: PlanningAgent
    agent_name: "Customer Specialist"
    agent_description: "Confirms customer needs and captures customer-related details using available tools."

Then, define the FlowSuperAgent and configure its loop workflow. To do so:

  • Define the vertices as entries in the agent_list of the FlowSuperAgent.
  • Define the edges by specifying for each agent_name the next_step that will be taken.
  • At the loop tail node, define a condition with a loop block specifying the loop head (to) and max_iterations.
  • Include exactly one default block at the end of the loop tail's next_step to define the forward path when the loop exits.

super_agents:
  - agent_class: FlowSuperAgent
    agent_name: "Information Collection Agent"
    agent_description: "Collect user specified information to complete the project intake requirements."
    config:
      goal: |
        Evaluate project progress, invoke the appropriate specialist agent, and surface the next
        follow-up question so the intake can advance through stakeholder, supplier, and customer
        information collection to complete the project requirements.
      show_loop_feedback: true  # Display feedback about loop condition evaluations in the output.
      agent_list:
        - agent_name: "Stakeholder Specialist"  # Loop head — execution rewinds to here.
          next_step:
            - 'Customer Specialist'

        - agent_name: 'Customer Specialist'  # Loop tail — loop conditions are evaluated here.
          next_step:
            - condition: 'stakeholder info is incomplete'  # If true, rewind to Stakeholder Specialist.
              loop:
                to: ['Stakeholder Specialist']  # Must be upstream of Customer Specialist.
                max_iterations: 2               # Allow at most 2 rewinds.
            - default: true                     # Proceed forward when condition is false or limit reached.
              to: ['Supplier Specialist']

        - agent_name: "Supplier Specialist"  # End node — runs once intake is complete.
In this example:

  • Loop feedback will appear in the output.
  • The loop condition is evaluated at Customer Specialist after it executes.
  • If 'stakeholder info is incomplete' is true, execution rewinds to Stakeholder Specialist, re-running both Stakeholder Specialist and Customer Specialist (up to 2 times).
  • If the condition is false or the iteration limit is reached, Supplier Specialist runs to complete the intake.

2. Python File

Now, you can start the development of your assistant using these lines of code:

import asyncio
import os

from air import AsyncAIRefinery
from air.utils import async_print


async def process_query(query, project, client):
    """
    Process a single query using a new DistillerClient instance.
    Ensures each query is independent by using a unique session UUID.
    """
    # Generate a unique identifier for this temporary session using process ID.
    session_uuid = f"temp_user_{os.getpid()}"
    async with client(
        project=project,
        uuid=session_uuid,
    ) as dc:
        # Send query to the FlowSuperAgent project.
        responses = await dc.query(query=query)
        print(f"-------\nQuery: {query}")
        # Iterate through streaming responses from agents in the workflow.
        async for response in responses:
            role = response.get("role", "Unknown Agent")  # Agent name
            content = response.get("content", "No content")  # Agent's reply
            await async_print(f"\n<<< Response from {role} >>>")
            await async_print(content)
            await async_print("-" * 20)

        # Clear all stored memory for this session so next run is clean.
        await dc.reset_memory()
        await async_print("Memory reset complete.")
        await async_print("Query handled successfully.")
        await async_print("-" * 20)


async def flow_super_agent_loop_demo(api_key):
    """
    Demonstrates running the FlowSuperAgent with a loop configuration.
    Each query is processed sequentially with a fresh session.
    """
    # First, register the FlowSuperAgent project from its YAML configuration.
    client = AsyncAIRefinery(api_key=api_key)
    client.create_project(
        config_path="flow_superagent_loop.yaml",
        project="information_collection_agent",
    )

    # Example queries to run through the FlowSuperAgent.
    queries = [
        "Call the Information Collection Agent to collect additional all necessary data. None of the project information has been provided yet."
    ]
    # Process each query one by one.
    for query in queries:
        await process_query(query, "information_collection_agent", client)


if __name__ == "__main__":
    # Load API_KEY and ACCOUNT from local environment variables
    api_key = str(os.getenv("API_KEY"))

    # Entry point: run the asynchronous demo function.
    asyncio.run(flow_super_agent_loop_demo(api_key))