Realtime Distiller API

Realtime Distiller extends AI Refinery's Distiller to support real-time streaming interactions with both text and voice input. It supports:

  • Voice input: Real-time audio streaming from microphone
  • Voice output: Speech synthesis responses
  • Text input: Text queries with voice responses

Before you begin, you must create an authenticated AsyncAIRefinery client, as shown below. All Realtime Distiller APIs are accessed via client.realtime_distiller.

import os
from air import AsyncAIRefinery
from dotenv import load_dotenv


load_dotenv()  # loads your API_KEY from your local '.env' file
api_key = str(os.getenv("API_KEY"))


client = AsyncAIRefinery(api_key=api_key)
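
One caveat with the snippet above: `str(os.getenv("API_KEY"))` evaluates to the string "None" when the variable is unset, so a missing key may go unnoticed until authentication fails. A minimal sketch of a fail-fast alternative follows; `require_env` is a hypothetical helper written for illustration, not part of the AI Refinery SDK.

```python
import os


def require_env(name: str) -> str:
    """Return an environment variable's value, or raise if it is unset or empty.

    Hypothetical helper for illustration; not part of the AI Refinery SDK.
    """
    value = os.getenv(name)
    if not value:
        raise RuntimeError(f"Environment variable {name!r} is not set")
    return value
```

With this helper, `api_key = require_env("API_KEY")` could replace the `str(os.getenv(...))` line after `load_dotenv()` has run.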

Realtime Distiller Workflow



Preliminaries

Creating Your Project

client.realtime_distiller.create_project() (synchronous)

Creates a new project based on the specified YAML configuration file.

Parameters:

  • config_path (str): The path to the YAML configuration file.
  • project (str): A name for your project (letters, digits, hyphens, underscores only).

Returns:

  • bool: True if the project is successfully created.

Project Versioning:

  • Realtime Distiller automatically handles project versioning, starting at version 0.
  • The first time you create a project with a given name, it is assigned version 0. If you create another project with the same name, Distiller increments the version to 1, and so on.
  • By default, connections are made to the latest project version unless a specific version is specified. For more details, see "Connecting to Realtime Distiller" below.
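
The versioning rules above can be pictured with a small in-memory model. This is only a toy illustration of the documented behavior (versions start at 0, increment per creation, and the latest is used by default); `ToyProjectRegistry` is a hypothetical class and says nothing about how the server is implemented.

```python
from typing import Dict, List, Optional


class ToyProjectRegistry:
    """Toy model of the documented versioning rules (not the real server)."""

    def __init__(self) -> None:
        self._versions: Dict[str, List[dict]] = {}

    def create(self, project: str, config: dict) -> int:
        """Store a config and return the version number assigned to it."""
        versions = self._versions.setdefault(project, [])
        versions.append(config)
        return len(versions) - 1  # first creation gets version 0

    def get(self, project: str, project_version: Optional[str] = None) -> dict:
        """Return a specific version, or the latest when none is given."""
        versions = self._versions[project]
        if project_version is None:
            return versions[-1]
        return versions[int(project_version)]
```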

Example:

# This command registers the project "example" using the "example.yaml" configuration file.
client.realtime_distiller.create_project(config_path="example.yaml", project="example")
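
Because project names are restricted to letters, digits, hyphens, and underscores, it can be convenient to check a name locally before calling the API. The sketch below is one way to do that; `is_valid_name` is a hypothetical helper, and it assumes that "letters" means ASCII letters and that empty names are not allowed.

```python
import re

# Documented rule: letters, digits, hyphens, underscores only.
# Assumptions: ASCII letters only, and at least one character.
_NAME_PATTERN = re.compile(r"[A-Za-z0-9_-]+")


def is_valid_name(name: str) -> bool:
    """Return True if the whole string matches the documented character set."""
    return _NAME_PATTERN.fullmatch(name) is not None
```

The same check would apply to the uuid parameter used when connecting, which is documented with the same character restrictions.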

Downloading Your Project Configuration

client.realtime_distiller.download_project() (synchronous)

Retrieves the configuration of a specified project from the server.

Parameters:

  • project (str): The name of the project whose configuration you want to download.
  • project_version (str, optional): The version of the project configuration to download. Defaults to the latest version if not provided.

Returns:

  • dict: A Python dictionary containing the downloaded configuration.

Example:

# This command downloads version "1" of the "example" project.
project_config = client.realtime_distiller.download_project(project="example", project_version="1")

Connecting to Realtime Distiller

client.realtime_distiller.__call__() (asynchronous)

Establishes an asynchronous WebSocket connection to the Realtime Distiller endpoint for a specific project. Use it as an async context manager (async with) so that all Distiller-related operations are managed within a single context.

Parameters:

  • project (str): The project name (letters, digits, hyphens, underscores only).
  • uuid (str): A unique user identifier (letters, digits, hyphens, underscores only).
  • executor_dict (dict[str, Callable], optional): A dictionary mapping custom agent names to callable functions. These callables are invoked when their corresponding agents are triggered by the super agent or orchestrator. Defaults to {}.
  • project_version (str, optional): The project version to connect to. If not provided, Distiller uses the latest version.

Returns:

  • _VoiceDistillerContextManager: An asynchronous context manager that handles operations within the given project.

Example:

async with client.realtime_distiller(
    project="example",
    uuid="test"
) as vc:
    # Your asynchronous operations here
    pass

Audio Input

client.realtime_distiller.send_audio_chunk() (asynchronous)

Sends a chunk of audio bytes containing part of a voice query over the WebSocket asynchronously. Typically called in a loop to stream audio input.

Parameters:

  • audio_bytes (bytes): Raw audio data to send to the server.

Example:

async with client.realtime_distiller(
    project="example",
    uuid="test"
) as vc:
    async for audio_chunk in audio:
        await vc.send_audio_chunk(audio_chunk)
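
The `audio` iterable in the example above is left undefined. As a sketch of what it could be, the async generator below slices an in-memory buffer into fixed-duration chunks. `iter_pcm_chunks` is a hypothetical helper, and the audio format (16-bit mono PCM) and the 100 ms chunk duration are assumptions; the encoding the server expects is not stated here.

```python
import asyncio
from typing import AsyncIterator


async def iter_pcm_chunks(
    pcm: bytes,
    sample_rate: int = 16000,
    chunk_ms: int = 100,
    bytes_per_sample: int = 2,  # assumption: 16-bit mono PCM
) -> AsyncIterator[bytes]:
    """Yield fixed-duration slices of a raw PCM buffer (hypothetical helper)."""
    chunk_size = (sample_rate * chunk_ms // 1000) * bytes_per_sample
    if chunk_size <= 0:
        raise ValueError("chunk size must be positive")
    for start in range(0, len(pcm), chunk_size):
        yield pcm[start:start + chunk_size]
        # Yield control so other tasks (e.g. a response reader) can run.
        await asyncio.sleep(0)
```

Inside the connection context, `async for audio_chunk in iter_pcm_chunks(pcm): await vc.send_audio_chunk(audio_chunk)` would then stream the buffer chunk by chunk.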

Text Input

client.realtime_distiller.send_text_query() (asynchronous)

Sends a text query over the WebSocket asynchronously.

Parameters:

  • text (str): The text query to send.

Example:

async with client.realtime_distiller(
    project="example",
    uuid="test"
) as vc:
    text = "example query"
    await vc.send_text_query(text)

Response Stream

client.realtime_distiller.get_responses() (asynchronous)

Continuously retrieves text and audio responses from the WebSocket asynchronously.

Yields:

  • Dict: A dictionary representing a Realtime Event, containing a response type and optional response content. A response can be a status event, a text response, or a speech response delivered as streamed audio chunks.

Example:

async with client.realtime_distiller(
    project="example",
    uuid="test"
) as vc:
    async for response in vc.get_responses():
        print(response)
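
Instead of printing each event, a client will usually branch on the event type. The sketch below accumulates text and decoded audio from the events listed under Realtime Events. `ResponseAccumulator` is a hypothetical class, and it assumes the documented fields (`content`, `audio`) sit at the top level of the event dictionary.

```python
import base64
from typing import Any, Dict, List


class ResponseAccumulator:
    """Collect text and audio from Realtime Events (hypothetical helper)."""

    def __init__(self) -> None:
        self._text_parts: List[str] = []
        self.audio = bytearray()

    @property
    def text(self) -> str:
        return "".join(self._text_parts)

    def handle(self, event: Dict[str, Any]) -> bool:
        """Process one event; return True once response.done is seen."""
        event_type = event.get("type")
        if event_type == "response.text.delta":
            # Assumption: partial text is in a top-level "content" field.
            self._text_parts.append(event.get("content", ""))
        elif event_type == "response.audio.delta":
            # Audio chunks are documented as Base64-encoded strings.
            self.audio.extend(base64.b64decode(event.get("audio", "")))
        return event_type == "response.done"
```

Within the loop above, `acc.handle(response)` would replace `print(response)`, with `acc.text` and `acc.audio` available afterwards.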

Cancel Response

client.realtime_distiller.cancel_response() (asynchronous)

Requests cancellation of the current in-progress TTS (speech synthesis) by sending a response.cancel event to the server. Note that cancellation only affects audio playback — the Distiller (LLM) continues processing all agents to completion in the background.

Parameters:

  • None

Returns:

  • None

Behavior:

  1. Sends a response.cancel event to the server
  2. The server stops TTS synthesis for the current agent immediately
  3. If additional agents are queued (multi-agent flows), the next agent's response begins automatically — only the current agent is skipped
  4. If no more agents are queued, the server emits response.audio.done, response.text.done, and response.done to close the response
  5. If called when no response is active (e.g. between queries, before response.created, or after response.done), the call returns immediately without sending any event to the server

Single-agent example:

async with client.realtime_distiller(
    project="example",
    uuid="test"
) as vc:
    await vc.send_text_query("Tell me about the solar system")
    async for response in vc.get_responses():
        if response.get("type") == "response.audio.delta":
            # ... handle audio ...
            # should_cancel is a placeholder for your own cancellation condition
            if should_cancel:
                await vc.cancel_response()
        # response.done arrives normally after cancel,
        # terminating the get_responses() loop
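
One way to supply the cancellation condition in the example above is an asyncio.Event that another task sets. The sketch below shows that pattern using only get_responses() and cancel_response(). `consume_with_cancel` and `FakeConnection` are hypothetical names; the fake stands in for the connection object so the loop can be exercised without a server.

```python
import asyncio


async def consume_with_cancel(vc, cancel_requested: asyncio.Event) -> int:
    """Drain vc.get_responses(), cancelling TTS when the event is set.

    Returns the number of response.audio.delta events seen.
    """
    audio_events = 0
    async for response in vc.get_responses():
        if response.get("type") == "response.audio.delta":
            audio_events += 1
            if cancel_requested.is_set():
                await vc.cancel_response()
                cancel_requested.clear()  # re-arm for a later request
    return audio_events


class FakeConnection:
    """Stand-in for the real connection object; for local testing only."""

    def __init__(self, events):
        self._events = events
        self.cancel_calls = 0

    async def get_responses(self):
        for event in self._events:
            yield event

    async def cancel_response(self):
        self.cancel_calls += 1
```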

Multi-agent example (per-agent cancellation):

In multi-agent flows (e.g., FlowSuperAgent), each agent's TTS can be cancelled individually. When cancelled, the server skips the current agent and moves to the next one. The response only ends after all agents have either played or been skipped.

To enable per-agent cancellation with the wrapper methods, pass a cancel_event (asyncio.Event). The SDK provides a built-in CancelOnKeypress helper that listens for spacebar presses:

from air.distiller.utils import realtime_helper

async with client.realtime_distiller(
    project="example",
    uuid="test"
) as vc:
    print("Press [SPACE] to skip the current agent...")
    async with realtime_helper.CancelOnKeypress() as cancel_event:
        await vc.send_text_and_respond(
            text="How can I protect my investments?",
            sample_rate=16000,
            cancel_event=cancel_event,
        )
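
Since cancel_event is a plain asyncio.Event, triggers other than a keypress can be built the same way. The sketch below sets the event after a fixed delay. `cancel_after` is a hypothetical helper, not part of the SDK, and it fires only once, so in a multi-agent flow it would skip at most one agent.

```python
import asyncio
import contextlib


@contextlib.asynccontextmanager
async def cancel_after(delay_seconds: float):
    """Yield an asyncio.Event that is set once after delay_seconds."""
    event = asyncio.Event()

    async def _fire() -> None:
        await asyncio.sleep(delay_seconds)
        event.set()

    task = asyncio.create_task(_fire())
    try:
        yield event
    finally:
        # Stop the timer if the response finished before it fired.
        task.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await task
```

It could be used in place of CancelOnKeypress, for example `async with cancel_after(5.0) as cancel_event:` followed by the same send_text_and_respond() call.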

Realtime Wrapper Methods

High-level methods that handle the complete voice interaction loop. These wrap the base voice APIs (send_audio_chunk(), send_text_query(), get_responses()) to provide a ready-to-use, end-to-end realtime voice experience.

client.realtime_distiller.listen_and_respond() (asynchronous)

Captures audio from the microphone, streams it to the server, and plays back audio responses through the speaker.

Parameters:

  • sample_rate (int, optional): Audio sample rate in Hz. Must match the sample_rate in your YAML speech_config. Defaults to 16000.
  • cancel_event (asyncio.Event, optional): When provided, enables mid-response cancellation. Setting this event triggers cancel_response() and stops audio playback for the current agent. In multi-agent flows, the event is automatically cleared when the next agent starts, allowing each agent to be cancelled individually with separate triggers. See CancelOnKeypress for a ready-to-use spacebar-based trigger.

Behavior:

  1. Streams microphone audio to the server using send_audio_chunk()
  2. Stops microphone capture when the server begins responding
  3. Receives server responses via get_responses()
  4. Plays TTS audio responses through the speaker
  5. Prints text transcriptions
  6. If cancel_event is provided and set, cancels the current agent's audio playback. In multi-agent flows, the next agent's response begins automatically.

Example:

async with client.realtime_distiller(
    project="example",
    uuid="test"
) as vc:
    await vc.listen_and_respond(sample_rate=16000)

Example with cancellation:

from air.distiller.utils import realtime_helper

async with client.realtime_distiller(
    project="example",
    uuid="test"
) as vc:
    async with realtime_helper.CancelOnKeypress() as cancel_event:
        await vc.listen_and_respond(
            sample_rate=16000,
            cancel_event=cancel_event,
        )


client.realtime_distiller.send_text_and_respond() (asynchronous)

Sends a text query to the server and plays back audio responses through the speaker.

Parameters:

  • text (str): The text query to send.
  • sample_rate (int, optional): Audio sample rate in Hz. Must match the sample_rate in your YAML speech_config. Defaults to 16000.
  • cancel_event (asyncio.Event, optional): When provided, enables mid-response cancellation. Setting this event triggers cancel_response() and stops audio playback for the current agent. In multi-agent flows, the event is automatically cleared when the next agent starts, allowing each agent to be cancelled individually with separate triggers.

Raises:

  • ValueError: If text is empty.

Behavior:

  1. Sends the text query using send_text_query()
  2. Receives server responses via get_responses()
  3. Plays TTS audio responses through the speaker
  4. Prints text transcriptions
  5. If cancel_event is provided and set, cancels the current agent's audio playback. In multi-agent flows, the next agent's response begins automatically.

Example:

async with client.realtime_distiller(
    project="example",
    uuid="test"
) as vc:
    await vc.send_text_and_respond(
        text="example query",
        sample_rate=16000
    )

Example with cancellation:

from air.distiller.utils import realtime_helper

async with client.realtime_distiller(
    project="example",
    uuid="test"
) as vc:
    async with realtime_helper.CancelOnKeypress() as cancel_event:
        await vc.send_text_and_respond(
            text="example query",
            sample_rate=16000,
            cancel_event=cancel_event,
        )


Realtime Events

Realtime events represent a status update, a text response, or a speech response. Each event has a type and, for some types, additional fields:

| Type | Fields/Description |
| --- | --- |
| session.created | Status event indicating Realtime session creation. |
| response.audio_transcript.delta | delta (string): Partial transcription text. |
| response.audio_transcript.done | text (string): Final transcription text. |
| response.created | Status event indicating the response has started. |
| response.audio.delta | audio (string): Base64-encoded audio chunk. |
| response.audio.done | Status event indicating the current audio response is complete. |
| response.text.delta | content (string): Partial text output from Distiller. role (string): The agent name that produced this text. In multi-agent flows, a new response.text.delta with a different role signals an agent boundary. |
| response.text.done | Status event indicating the Distiller text response is complete. |
| response.done | Status event indicating the response has completed. |
| response.cancel | Client-initiated event to request cancellation of the current in-progress response. In multi-agent flows, cancels only the current agent's TTS; the next agent's response begins automatically. |
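
The agent-boundary rule for response.text.delta (a change in role marks a new agent) can be applied on the client to split a response into per-agent segments. The following is a minimal sketch; `split_by_agent` is a hypothetical helper and assumes `role` and `content` are top-level keys of the event dictionary.

```python
from typing import Any, Dict, Iterable, List, Tuple


def split_by_agent(events: Iterable[Dict[str, Any]]) -> List[Tuple[str, str]]:
    """Group consecutive response.text.delta events by their role.

    Returns a list of (role, text) pairs in arrival order.
    """
    segments: List[Tuple[str, str]] = []
    for event in events:
        if event.get("type") != "response.text.delta":
            continue  # ignore status and audio events
        role = event.get("role", "")
        content = event.get("content", "")
        if segments and segments[-1][0] == role:
            # Same agent as the previous delta: extend its text.
            segments[-1] = (role, segments[-1][1] + content)
        else:
            # Different role: an agent boundary, so start a new segment.
            segments.append((role, content))
    return segments
```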

For examples of using Realtime Distiller, check out the tutorials: