Observability API

The Observability API provides access to logs, metrics, and distributed traces collected from your AI Refinery projects via OpenTelemetry. You can use this API through the AIRefinery or AsyncAIRefinery client.

Note: The Observability API is available starting from SDK version 1.25.0. The server automatically resolves the caller's organization_id from the bearer token, so you do not need to supply it.

Deprecation Notice: The USE_AIR_API_V2_BASE_URL environment variable is deprecated and no longer needed as of SDK version 1.28.0. The observability endpoints are now served from the default API URL (api.airefinery.accenture.com).

Overview

The AIRefinery and AsyncAIRefinery clients expose three observability sub-clients:

Sub-client       Endpoint                 Response Type    Description
client.logs      /observability/logs      LogsResponse     Query application logs
client.metrics   /observability/metrics   MetricsResponse  Query application metrics
client.traces    /observability/traces    TracesResponse   Query distributed traces

Each sub-client provides a query(**parameters) method that accepts the same parameters as the corresponding REST endpoint.

  • On AsyncAIRefinery, query() is an async method — use await.
  • On AIRefinery, query() is a synchronous blocking method.

Response Classes

Each query() method returns a typed response wrapper that provides:

  • Dict-like access for backward compatibility: response["status"], "key" in response
  • Typed helper methods for common access patterns: response.status(), response.samples()
  • Iterators for efficient data processing
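
For illustration, the dual access contract can be sketched with a minimal stand-in class. This is not the SDK's actual implementation, only the shape of the behavior described above:

```python
class ResponseSketch:
    """Illustrative stand-in for a typed response wrapper.

    Not the SDK class -- it only mimics the dual access contract:
    dict-like lookups plus typed helper methods over one payload.
    """

    def __init__(self, payload: dict):
        self._payload = payload

    # Dict-like access for backward compatibility
    def __getitem__(self, key):
        return self._payload[key]

    def __contains__(self, key):
        return key in self._payload

    def keys(self):
        return self._payload.keys()

    # Typed helper method over the same payload
    def status(self) -> str:
        return self._payload["status"]


resp = ResponseSketch({"status": "success", "data": {"result": []}})
print(resp.status())   # typed helper
print(resp["status"])  # dict-like access, same value
print("data" in resp)  # membership test
```

Both styles read from the same underlying payload, so they always agree; the typed helpers simply add discoverability and editor autocompletion.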

Raises

  • httpx.HTTPStatusError: If the server returns a non-2xx status code.

Logs

Query application logs with timestamps, filterable by project, severity, and time range. Logs capture request handling, authentication flows, system interactions, and external dependency behavior.

Asynchronous Log Retrieval

AsyncAIRefinery.logs.query()

Parameters:
  • project_name (string, Optional): Project name to filter logs.
  • severity (string, Optional): Filter by severity level: debug, info, warning, error.
  • time_window (string, Optional): Time range for logs (e.g., '5m', '1h', '24h'). Default: '24h'.
  • limit (integer, Optional): Maximum number of log entries to return. Default: 500.
Returns:

Returns a LogsResponse object containing:

  • status() → str: Query status (e.g., "success").
  • result_type() → str: Result type (e.g., "streams").
  • samples() → List[LogStreamSample]: List of log stream samples, each with:
    • stream: Dict of stream labels (e.g., {"severity": "error"}).
    • values: List of [timestamp_ns, message] tuples.
    • iter_messages(): Iterator yielding (timestamp_ns, message) tuples.
    • iter_messages_seconds(): Iterator yielding (timestamp_seconds, message) tuples.
  • streams() → Iterator[Dict]: Iterator over raw stream dicts.
  • stream_values(stream_index) → List: Raw values list for a specific stream.
  • iter_pairs() → Iterator[Tuple[Dict, List]]: Iterator of (stream_labels, values) tuples.
  • Dict-like access: response["status"], "data" in response, response.keys().
Example Usage:
import asyncio
import os

from air import AsyncAIRefinery
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()
api_key = str(os.getenv("API_KEY"))


async def get_logs():
    # Initialize the async client with your API key
    client = AsyncAIRefinery(api_key=api_key)

    # Query logs filtered by project, severity, and time range
    response = await client.logs.query(
        project_name="project-x",
        severity="error",
        time_window="30m",
        limit=100,
    )

    # Check the query status using the typed helper method
    print(f"Status: {response.status()}")

    # Iterate over LogStreamSample objects returned by samples()
    # Each sample contains stream labels and log messages
    for sample in response.samples():
        print(f"Stream labels: {sample.stream}")
        # iter_messages_seconds() converts nanosecond timestamps to seconds
        for ts_seconds, message in sample.iter_messages_seconds():
            print(f"  [{ts_seconds}] {message}")

    # Dict-like access is supported for backward compatibility
    print(response["status"])


if __name__ == "__main__":
    asyncio.run(get_logs())

Synchronous Log Retrieval

AIRefinery.logs.query()

This method supports the same parameters and return structure as the asynchronous method (AsyncAIRefinery.logs.query()) described above.

Example Usage:
import os

from air import AIRefinery
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()
api_key = str(os.getenv("API_KEY"))


def get_logs():
    # Initialize the synchronous client
    client = AIRefinery(api_key=api_key)

    # Query logs for a specific project within the last 30 minutes
    response = client.logs.query(
        project_name="project-x",
        time_window="30m",
        limit=100,
    )

    # samples() returns a list of LogStreamSample dataclass objects
    print(f"Found {len(response.samples())} log streams")
    for sample in response.samples():
        # Convert nanosecond timestamps to seconds for readability
        for ts, msg in sample.iter_messages_seconds():
            print(f"[{ts}] {msg}")


if __name__ == "__main__":
    get_logs()
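
Beyond printing, the (stream_labels, values) pairs yielded by iter_pairs() lend themselves to quick aggregation. A sketch over plain data shaped like the documented response (the helper itself is not part of the SDK):

```python
from collections import Counter


def count_by_severity(pairs):
    """Tally log messages per severity from (stream_labels, values)
    pairs, the shape yielded by LogsResponse.iter_pairs()."""
    counts = Counter()
    for labels, values in pairs:
        counts[labels.get("severity", "unknown")] += len(values)
    return counts


# Data shaped like the documented response: each stream has a labels
# dict and a list of [timestamp_ns, message] entries.
pairs = [
    ({"severity": "error"}, [["1700000000000000000", "boom"]]),
    ({"severity": "info"}, [["1700000001000000000", "ok"],
                            ["1700000002000000000", "done"]]),
]
print(count_by_severity(pairs))  # Counter({'info': 2, 'error': 1})
```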

Metrics

Query application metrics covering inference performance, agent operations, token consumption, RAI compliance, and session analytics. For a complete list of available metrics, see Metrics & Traces Reference.

Asynchronous Metrics Retrieval

AsyncAIRefinery.metrics.query()

Parameters:
  • metric (string, Required): Metric name from the Metrics & Traces Reference (e.g., 'token_consumption', 'agent_task_total').
  • project_name (string, Optional): Project name to filter metrics.
  • agent_name (string, Optional): Agent name to filter metrics.
  • agent_class (string, Optional): Agent class to filter metrics (e.g., 'ToolUseAgent', 'SearchAgent').
  • model_key (string, Optional): Model identifier for inference metrics.
  • session_id (string, Optional): Session ID to filter metrics to a specific user session.
  • status (string, Optional): Status filter (e.g., 'success', 'failure', 'timeout'). Default: 'success'.
  • category (string, Optional): RAI rejection category: harassment, hate, self-harm, sexual, violence, illicit.
  • percentile (string, Optional): Percentile for latency metrics (e.g., '0.50', '0.95'). Default: '0.95'.
  • time_window (string, Optional): Time range (e.g., '5m', '1h', '24h'). Default: '1h'.
  • step (string, Optional): Bucket interval for time-series output (e.g., '15m', '1h'). Default: '1h'.
Returns:

Returns a MetricsResponse object containing:

  • status() → str: Query status (e.g., "success").
  • query() → str: The PromQL query executed.
  • result_type() → str: Result type (e.g., "matrix", "vector").
  • samples() → List[MetricSample]: List of metric samples, each with:
    • metric: Dict of metric labels.
    • values: List of [timestamp, value] tuples (for matrix) or a single value.
  • sample_values(sample_index) → List: Raw values for a specific sample.
  • iter_pairs() → Iterator[Tuple[Dict, List]]: Iterator of (metric_labels, values) tuples.
  • Dict-like access: response["status"], "data" in response, response.keys().
Example Usage:
import asyncio
import os

from air import AsyncAIRefinery
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()
api_key = str(os.getenv("API_KEY"))


async def get_metrics():
    # Initialize the async client with your API key
    client = AsyncAIRefinery(api_key=api_key)

    # Query token consumption metrics for a project over the last hour
    response = await client.metrics.query(
        metric="token_consumption",
        project_name="project-x",
        time_window="1h",
    )

    # Access response metadata via typed helper methods
    # query() returns the PromQL query that was executed
    print(f"Status: {response.status()}, Query: {response.query()}")

    # samples() returns a list of MetricSample dataclass objects
    # Each sample has 'metric' (labels dict) and 'values' (time-series data)
    for sample in response.samples():
        print(f"Labels: {sample.metric}")
        print(f"Values: {sample.values}")

    # iter_pairs() provides a convenient iterator over (labels, values) tuples
    for labels, values in response.iter_pairs():
        print(f"{labels} -> {values}")


if __name__ == "__main__":
    asyncio.run(get_metrics())
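
For matrix results, each sample's values list holds [timestamp, value] pairs with the value encoded as a string, following the Prometheus convention. A small sketch for totaling a counter metric across buckets, again over data shaped like the iter_pairs() output (the helper is illustrative, not part of the SDK):

```python
def total_over_buckets(pairs):
    """Sum string-encoded values across all (labels, values) pairs,
    as yielded by MetricsResponse.iter_pairs() for matrix results."""
    return sum(
        float(value)
        for _labels, values in pairs
        for _timestamp, value in values
    )


# Two series with three buckets total: 120 + 80 + 40
pairs = [
    ({"project": "project-x"}, [[1700000000, "120"], [1700003600, "80"]]),
    ({"project": "project-y"}, [[1700000000, "40"]]),
]
print(total_over_buckets(pairs))  # 240.0
```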

Synchronous Metrics Retrieval

AIRefinery.metrics.query()

This method supports the same parameters and return structure as the asynchronous method (AsyncAIRefinery.metrics.query()) described above.

Example Usage:
import os

from air import AIRefinery
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()
api_key = str(os.getenv("API_KEY"))


def get_metrics():
    # Initialize the synchronous client
    client = AIRefinery(api_key=api_key)

    # Query agent task metrics filtered by project and session
    response = client.metrics.query(
        metric="agent_task_total",
        project_name="project-x",
        session_id="sess-abc123",
        time_window="1h",
    )

    # result_type() returns 'matrix' (time-series) or 'vector' (instant)
    print(f"Result type: {response.result_type()}")
    # Iterate over MetricSample objects containing labels and values
    for sample in response.samples():
        print(f"{sample.metric}: {sample.values}")


if __name__ == "__main__":
    get_metrics()

Traces

Query distributed traces across AI Refinery services, enabling you to inspect agent workflows, identify performance bottlenecks, and debug cross-service interactions. For available trace presets, see Metrics & Traces Reference.

Asynchronous Trace Retrieval

AsyncAIRefinery.traces.query()

Parameters:
  • trace (string, Required): Trace name from the Metrics & Traces Reference (e.g., 'inference_traces', 'distiller_traces').
  • project_name (string, Optional): Project name to filter traces.
  • trace_id (string, Optional): Specific trace ID to retrieve.
  • time_window (string, Optional): Time range for query (e.g., '5m', '1h', '24h').
  • detail (boolean, Optional): Whether to include detailed trace information. Default: true.
  • limit (integer, Optional): Maximum number of traces to return. Default: 100.
Returns:

Returns a TracesResponse object containing:

  • traces() → List[Dict]: List of trace objects.
  • batches() → List[TraceBatch]: List of trace batches, each with:
    • resource: Resource information dict.
    • scope_spans: List of scope span objects.
  • spans() → List[Dict]: Flattened list of all spans across all batches.
  • iter_spans() → Iterator[Dict]: Iterator over all spans.
  • Dict-like access: response["traces"], "batches" in response, response.keys().
Example Usage:
import asyncio
import os

from air import AsyncAIRefinery
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()
api_key = str(os.getenv("API_KEY"))


async def get_traces():
    # Initialize the async client with your API key
    client = AsyncAIRefinery(api_key=api_key)

    # Query distiller traces for a project over the last hour
    response = await client.traces.query(
        trace="distiller_traces",
        project_name="project-x",
        time_window="1h",
    )

    # traces() returns the list of trace objects
    print(f"Found {len(response.traces())} traces")

    # iter_spans() flattens all spans across all batches into a single iterator
    for span in response.iter_spans():
        print(f"Span: {span.get('name')} - {span.get('status')}")

    # batches() returns TraceBatch dataclass objects with resource metadata
    for batch in response.batches():
        print(f"Resource: {batch.resource}")


if __name__ == "__main__":
    asyncio.run(get_traces())

Synchronous Trace Retrieval

AIRefinery.traces.query()

This method supports the same parameters and return structure as the asynchronous method (AsyncAIRefinery.traces.query()) described above.

Example Usage:
import os

from air import AIRefinery
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()
api_key = str(os.getenv("API_KEY"))


def get_traces():
    # Initialize the synchronous client
    client = AIRefinery(api_key=api_key)

    # Query inference traces over the last hour
    response = client.traces.query(
        trace="inference_traces",
        time_window="1h",
    )

    # traces() returns the raw trace list, spans() returns all spans flattened
    print(f"Traces: {len(response.traces())}")
    print(f"Total spans: {len(response.spans())}")
    # iter_spans() iterates over spans without loading all into memory
    for span in response.iter_spans():
        print(span.get("name"))


if __name__ == "__main__":
    get_traces()
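
Span dicts that follow the OTLP JSON encoding typically carry startTimeUnixNano and endTimeUnixNano as string-encoded integers; the field names below are an assumption based on that convention, not something this API guarantees. Under that assumption, span durations can be computed like so:

```python
def span_duration_ms(span: dict) -> float:
    """Duration of one OTLP-style span in milliseconds, assuming
    string-encoded startTimeUnixNano / endTimeUnixNano fields."""
    start = int(span["startTimeUnixNano"])
    end = int(span["endTimeUnixNano"])
    return (end - start) / 1_000_000


# A span shaped like the assumed OTLP JSON encoding
span = {
    "name": "inference",
    "startTimeUnixNano": "1700000000000000000",
    "endTimeUnixNano": "1700000000250000000",
}
print(f"{span['name']}: {span_duration_ms(span):.1f} ms")  # inference: 250.0 ms
```

Mapping this over iter_spans() gives a quick way to spot the slowest spans in a workflow.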