Realtime Distiller API¶
Realtime Distiller extends AI Refinery's Distiller to support real-time streaming interactions with both text and voice input. It supports:
Push-to-talk mode:
- Voice input: Real-time audio streaming from microphone
- Voice output: Speech synthesis responses
- Text input: Text queries with voice responses
Barge-in mode:
- Voice input: Continuous audio streaming, including during AI playback
- Voice output: Interruptible speech synthesis responses
- Interruption: Speak at any time to interrupt AI audio — detected by server-side Voice Activity Detection (VAD)
Before you begin, you must create an authenticated AsyncAIRefinery client, as shown below. All Realtime Distiller APIs are accessed via client.realtime_distiller.
import os

from air import AsyncAIRefinery
from dotenv import load_dotenv

load_dotenv()  # loads your API_KEY from your local '.env' file
api_key = str(os.getenv("API_KEY"))
client = AsyncAIRefinery(api_key=api_key)
Realtime Distiller Workflow¶
Preliminaries¶
Creating Your Project¶
client.realtime_distiller.create_project() (synchronous)¶
Creates a new project based on the specified YAML configuration file.
Parameters:
- config_path (str): The path to the YAML configuration file.
- project (str): A name for your project (letters, digits, hyphens, underscores only).
Returns:
bool: True if the project is successfully created.
Project Versioning:
- Realtime Distiller automatically handles project versioning, starting at version 0.
- The first time you create a project with a given name, it is assigned version 0. If you create another project with the same name, Distiller increments the version to 1, and so on.
- By default, connections are made to the latest project version unless a specific version is specified. For more details, refer to the distiller connection section below.
Example:
# This command registers the project "example" using the "example.yaml" configuration file.
client.realtime_distiller.create_project(config_path="example.yaml", project="example")
Downloading Your Project Configuration¶
client.realtime_distiller.download_project() (synchronous)¶
Retrieves the configuration of a specified project from the server.
Parameters:
- project (str): The name of the project whose configuration you want to download.
- project_version (str, optional): The version of the project configuration to download. Defaults to the latest version if not provided.
Returns:
dict: A Python dictionary containing the downloaded configuration.
Example:
# This command downloads version "1" of the "example" project.
project_config = client.realtime_distiller.download_project(project="example", project_version="1")
Connecting to Realtime Distiller¶
client.realtime_distiller.__call__() (asynchronous)¶
Establishes an asynchronous connection (via a WebSocket) to the RealtimeDistiller endpoint for a specific project. Using this function as an async context manager simplifies management of all Distiller-related operations.
Parameters:
- project (str): The project name (letters, digits, hyphens, underscores only).
- uuid (str): A unique user identifier (letters, digits, hyphens, underscores only).
- executor_dict (dict[str, Callable], optional): A dictionary mapping custom agent names to callable functions. These callables are invoked when their corresponding agents are triggered by the super agent or orchestrator. Defaults to {}.
- project_version (str, optional): The project version to connect to. If not provided, Distiller uses the latest version.
Returns:
_VoiceDistillerContextManager: An asynchronous context manager that handles operations within the given project.
Example:
async with client.realtime_distiller(
    project="example",
    uuid="test"
) as vc:
    # Your asynchronous operations here
    pass
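The optional parameters can be combined, for example to pin a project version and register a local executor for a custom agent. The sketch below is illustrative only: the agent name "utility_agent" and its handler are hypothetical and must match a custom agent declared in your project's YAML configuration, and `client` is the authenticated client created earlier on this page.

```python
import asyncio

# Hypothetical local executor; invoked when the orchestrator routes
# a query to the custom agent "utility_agent" (name is illustrative).
async def utility_executor(query: str) -> str:
    return f"handled locally: {query}"

async def main():
    # `client` is the authenticated AsyncAIRefinery client from above.
    async with client.realtime_distiller(
        project="example",
        uuid="test",
        executor_dict={"utility_agent": utility_executor},
        project_version="0",  # pin version 0 instead of the latest
    ) as vc:
        pass  # Distiller operations here
```

In an application, run the coroutine with asyncio.run(main()).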
Audio Input¶
client.realtime_distiller.send_audio_chunk() (asynchronous)¶
Sends chunks of audio bytes containing a voice query to the WebSocket asynchronously. Typically used within a loop to stream audio input.
Parameters:
audio_bytes (bytes): Raw audio data to send to the server.
Example:
async with client.realtime_distiller(
    project="example",
    uuid="test"
) as vc:
    # `audio` is an async iterable of microphone audio chunks
    async for audio_chunk in audio:
        await vc.send_audio_chunk(audio_chunk)
Text Input¶
client.realtime_distiller.send_text_query() (asynchronous)¶
Sends a text-based query to the WebSocket asynchronously.
Parameters:
text (str): The text query to send.
Example:
async with client.realtime_distiller(
    project="example",
    uuid="test"
) as vc:
    text = "example query"
    await vc.send_text_query(text)
Response Stream¶
client.realtime_distiller.get_responses() (asynchronous)¶
Continuously retrieves output (text or audio) responses from the WebSocket asynchronously.
Yields:
Dict: A dictionary representing a Realtime Event, containing a response type and optional response content. Responses can be status events, text responses, or speech responses in the form of streamed audio chunks.
Example:
async with client.realtime_distiller(
    project="example",
    uuid="test"
) as vc:
    async for response in vc.get_responses():
        print(response)
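The yielded event dictionaries can be aggregated client-side, for example to assemble each agent's full text from its response.text.delta events. A minimal, illustrative sketch; the helper below is not part of the SDK, and the sample events are fabricated for demonstration:

```python
def collect_text_by_role(events):
    """Concatenate response.text.delta content per agent role,
    preserving the order in which agents first appear."""
    texts = {}
    for event in events:
        if event.get("type") == "response.text.delta":
            role = event.get("role", "unknown")
            texts.setdefault(role, "")
            texts[role] += event.get("content", "")
    return texts

# Fabricated sample stream: two agents answering in sequence.
sample_events = [
    {"type": "response.created"},
    {"type": "response.text.delta", "role": "search_agent", "content": "The answer "},
    {"type": "response.text.delta", "role": "search_agent", "content": "is 42."},
    {"type": "response.text.delta", "role": "summary_agent", "content": "In short: 42."},
    {"type": "response.done"},
]

print(collect_text_by_role(sample_events))
# {'search_agent': 'The answer is 42.', 'summary_agent': 'In short: 42.'}
```

In a live session, you would feed events from `async for response in vc.get_responses()` into the same accumulation logic.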
Cancel Response¶
client.realtime_distiller.cancel_response() (asynchronous)¶
Requests cancellation of the current in-progress TTS (speech synthesis) by sending a response.cancel event to the server. Note that cancellation only affects audio playback — the Distiller (LLM) continues processing all agents to completion in the background.
Parameters:
None.
Returns:
None
Behavior:
- Sends a response.cancel event to the server
- The server stops TTS synthesis for the current agent immediately
- If additional agents are queued (multi-agent flows), the next agent's response begins automatically — only the current agent is skipped
- If no more agents are queued, the server emits response.audio.done, response.text.done, and response.done to close the response
- If called when no response is active (e.g. between queries, before response.created, or after response.done), the call returns immediately without sending any event to the server
Single-agent example:
async with client.realtime_distiller(
    project="example",
    uuid="test"
) as vc:
    await vc.send_text_query("Tell me about the solar system")
    async for response in vc.get_responses():
        if response.get("type") == "response.audio.delta":
            # ... handle audio ...
            if should_cancel:
                await vc.cancel_response()
                # response.done arrives normally after cancel,
                # terminating the get_responses() loop
Multi-agent example (per-agent cancellation):
In multi-agent flows (e.g., FlowSuperAgent), each agent's TTS can be cancelled individually. When cancelled, the server skips the current agent and moves to the next one. The response only ends after all agents have either played or been skipped.
To enable per-agent cancellation with the wrapper methods, pass a cancel_event (asyncio.Event). The SDK provides a built-in CancelOnKeypress helper that listens for spacebar presses:
from air.distiller.utils import realtime_helper
async with client.realtime_distiller(
    project="example",
    uuid="test"
) as vc:
    print("Press [SPACE] to skip the current agent...")
    async with realtime_helper.CancelOnKeypress() as cancel_event:
        await vc.send_text_and_respond(
            text="How can I protect my investments?",
            sample_rate=16000,
            cancel_event=cancel_event,
        )
Realtime Wrapper Methods¶
High-level methods that handle the complete voice interaction loop. These wrap the base voice APIs (send_audio_chunk(), send_text_query(), get_responses()) to provide a ready-to-use, end-to-end realtime voice experience.
client.realtime_distiller.listen_and_respond() (asynchronous)¶
Captures audio from the microphone, streams it to the server, and plays back audio responses through the speaker.
Parameters:
- sample_rate (int, optional): Audio sample rate in Hz. Must match the sample_rate in your YAML speech_config. Defaults to 16000.
- cancel_event (asyncio.Event, optional): When provided, enables mid-response cancellation. Setting this event triggers cancel_response() and stops audio playback for the current agent. In multi-agent flows, the event is automatically cleared when the next agent starts, allowing each agent to be cancelled individually with separate triggers. See CancelOnKeypress for a ready-to-use spacebar-based trigger.
Behavior:
- Streams microphone audio to the server using send_audio_chunk()
- Stops microphone capture when the server begins responding
- Receives server responses via get_responses()
- Plays TTS audio responses through the speaker
- Prints text transcriptions
- If cancel_event is provided and set, cancels the current agent's audio playback. In multi-agent flows, the next agent resumes automatically.
Example:
async with client.realtime_distiller(
    project="example",
    uuid="test"
) as vc:
    await vc.listen_and_respond(sample_rate=16000)
Example with cancellation:
from air.distiller.utils import realtime_helper
async with client.realtime_distiller(
    project="example",
    uuid="test"
) as vc:
    async with realtime_helper.CancelOnKeypress() as cancel_event:
        await vc.listen_and_respond(
            sample_rate=16000,
            cancel_event=cancel_event,
        )
client.realtime_distiller.send_text_and_respond() (asynchronous)¶
Sends a text query to the server and plays back audio responses through the speaker.
Parameters:
- text (str): The text query to send.
- sample_rate (int, optional): Audio sample rate in Hz. Must match the sample_rate in your YAML speech_config. Defaults to 16000.
- cancel_event (asyncio.Event, optional): When provided, enables mid-response cancellation. Setting this event triggers cancel_response() and stops audio playback for the current agent. In multi-agent flows, the event is automatically cleared when the next agent starts, allowing each agent to be cancelled individually with separate triggers.
Raises:
ValueError: If text is empty.
Behavior:
- Sends the text query using send_text_query()
- Receives server responses via get_responses()
- Plays TTS audio responses through the speaker
- Prints text transcriptions
- If cancel_event is provided and set, cancels the current agent's audio playback. In multi-agent flows, the next agent resumes automatically.
Example:
async with client.realtime_distiller(
    project="example",
    uuid="test"
) as vc:
    await vc.send_text_and_respond(
        text="example query",
        sample_rate=16000
    )
Example with cancellation:
from air.distiller.utils import realtime_helper
async with client.realtime_distiller(
    project="example",
    uuid="test"
) as vc:
    async with realtime_helper.CancelOnKeypress() as cancel_event:
        await vc.send_text_and_respond(
            text="example query",
            sample_rate=16000,
            cancel_event=cancel_event,
        )
Barge-In Mode¶
Barge-in enables users to interrupt an AI audio response by speaking during playback. The server continuously monitors the incoming audio stream using Voice Activity Detection (VAD) and sends a response.interrupted event when user speech is detected while the AI is active.
Barge-in is server-initiated: the server decides when an interruption has occurred and notifies the client. This is distinct from cancel_response(), which is a deliberate client-initiated action.
Behavior:
- The client continuously streams microphone audio to the server — including during AI playback
- The server VAD monitors the stream and checks whether the AI is currently active using three signals: server TTS state, client playback state (from notify_playback_started/stopped), and a configurable grace period after the last audio chunk was sent
- When speech is detected above the configured threshold and the lockout period has elapsed, the server sends response.interrupted and stops TTS synthesis
- The client stops audio playback, clears buffered audio, and resumes listening for the user's next query
Note: Barge-in requires Acoustic Echo Cancellation (AEC) on the client to prevent the AI's own audio from triggering false interruptions. See the tutorial for AEC options.
For configuration details (YAML setup and VAD parameters), see the Barge-In Tutorial.
Playback State Notifications¶
client.realtime_distiller.notify_playback_started() (asynchronous)¶
Notifies the server that the client has started playing TTS audio. Call this when the first audio chunk of a response begins playing. Used by the server VAD to accurately determine when client playback is active for barge-in detection.
Parameters:
None.
Returns:
None
Example:
async with client.realtime_distiller(
    project="example",
    uuid="test"
) as vc:
    playback_started = False
    async for response in vc.get_responses():
        if response.get("type") == "response.audio.delta":
            if not playback_started:
                await vc.notify_playback_started()
                playback_started = True
            # ... play audio chunk ...
client.realtime_distiller.notify_playback_stopped() (asynchronous)¶
Notifies the server that the client has finished playing TTS audio. Call this after the last audio chunk finishes playing normally. Do not call after a response.interrupted event — the server already knows playback was cut short.
Parameters:
None.
Returns:
None
Example:
async with client.realtime_distiller(
    project="example",
    uuid="test"
) as vc:
    async for response in vc.get_responses():
        if response.get("type") == "response.audio.done":
            await vc.notify_playback_stopped()
Handling response.interrupted¶
When response.interrupted is received in get_responses(), stop audio playback immediately and discard any buffered audio chunks. The server follows this event with response.done to close the response normally.
async with client.realtime_distiller(
    project="example",
    uuid="test"
) as vc:
    playback_started = False
    audio_chunks = []
    async for response in vc.get_responses():
        t = response.get("type")
        if t == "response.audio.delta":
            audio_chunks.append(response.get("audio"))
            if not playback_started:
                playback_started = True
                await vc.notify_playback_started()
            # ... play audio chunk ...
        elif t == "response.interrupted":
            # Barge-in detected — stop playback and discard buffered audio
            audio_chunks.clear()
            playback_started = False
            # response.done follows to close the stream
        elif t == "response.done":
            if playback_started:
                await vc.notify_playback_stopped()
            break
Realtime Events¶
Events representing status, text responses, or speech responses.
| Type | Fields/Description |
|---|---|
| session.created | Status event indicating Realtime session creation |
| response.audio_transcript.delta | delta (string): Partial transcription text |
| response.audio_transcript.done | text (string): Final transcription text |
| response.created | Status event indicating the response has started |
| response.audio.delta | audio (string): Base64-encoded audio chunk |
| response.audio.done | Status event indicating the current audio response is complete |
| response.text.delta | content (string): Partial text output from Distiller. role (string): The agent name that produced this text. In multi-agent flows, a new response.text.delta with a different role signals an agent boundary |
| response.text.done | Status event indicating the Distiller text response is complete |
| response.done | Status event indicating the response has completed |
| response.cancel | Client-initiated event requesting cancellation of the current in-progress response. In multi-agent flows, cancels only the current agent's TTS; the next agent resumes automatically |
| response.interrupted | Server-initiated event indicating barge-in was detected. On receipt, stop audio playback immediately and clear pending audio buffers. Always followed by response.done |
| playback.started | Client-initiated event sent via notify_playback_started() when the client begins playing TTS audio. Informs the server VAD that audio is actively playing |
| playback.stopped | Client-initiated event sent via notify_playback_stopped() when the client finishes playing TTS audio normally. Clears the server's client-playback tracking state |
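Because response.audio.delta carries base64-encoded audio, clients must decode each chunk before playback. A minimal, illustrative decoder (not part of the SDK; the sample event below is fabricated):

```python
import base64

def decode_audio_delta(event):
    """Return raw audio bytes from a response.audio.delta event,
    or None for any other event type."""
    if event.get("type") != "response.audio.delta":
        return None
    return base64.b64decode(event["audio"])

# Fabricated event wrapping the bytes b"\x00\x01\x02\x03"
event = {
    "type": "response.audio.delta",
    "audio": base64.b64encode(b"\x00\x01\x02\x03").decode(),
}
print(decode_audio_delta(event))  # b'\x00\x01\x02\x03'
```

The decoded bytes can then be written directly to your audio playback device.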
For examples of using Realtime Distiller, check out the tutorials: