Observability API¶
The Observability API provides access to logs, metrics, and distributed traces collected from your AI Refinery projects via OpenTelemetry. You can use this API through the AIRefinery or AsyncAIRefinery client.
Note: The Observability API is available starting from SDK version 1.25.0. The server automatically resolves the caller's `organization_id` from the bearer token, so you do not need to supply it.

Deprecation Notice: The `USE_AIR_API_V2_BASE_URL` environment variable is deprecated and no longer needed as of SDK version 1.28.0. The observability endpoints are now served from the default API URL (api.airefinery.accenture.com).
Overview¶
The AIRefinery and AsyncAIRefinery clients expose three observability sub-clients:
| Sub-client | Endpoint | Response Type | Description |
|---|---|---|---|
| `client.logs` | `/observability/logs` | `LogsResponse` | Query application logs |
| `client.metrics` | `/observability/metrics` | `MetricsResponse` | Query application metrics |
| `client.traces` | `/observability/traces` | `TracesResponse` | Query distributed traces |
Each sub-client provides a `query(**parameters)` method that accepts the same parameters as the corresponding REST endpoint.

- On `AsyncAIRefinery`, `query()` is an async method; call it with `await`.
- On `AIRefinery`, `query()` is a synchronous blocking method.
Response Classes¶
Each `query()` method returns a typed response wrapper that provides:

- Dict-like access for backward compatibility: `response["status"]`, `"key" in response`
- Typed helper methods for common access patterns: `response.status()`, `response.samples()`
- Iterators for efficient data processing
Raises¶
`httpx.HTTPStatusError`: If the server returns a non-2xx status code.
Logs¶
Query application logs with timestamps, filterable by project, severity, and time range. Logs capture request handling, authentication flows, system interactions, and external dependency behavior.
Asynchronous Log Retrieval¶
AsyncAIRefinery.logs.query()¶
Parameters:¶
- project_name (string, Optional): Project name to filter logs.
- severity (string, Optional): Filter by severity level: `debug`, `info`, `warning`, `error`.
- time_window (string, Optional): Time range for logs (e.g., `'5m'`, `'1h'`, `'24h'`). Default: `'24h'`.
- limit (integer, Optional): Maximum number of log entries to return. Default: `500`.
Returns:¶
Returns a `LogsResponse` object containing:

- `status()` → `str`: Query status (e.g., `"success"`).
- `result_type()` → `str`: Result type (e.g., `"streams"`).
- `samples()` → `List[LogStreamSample]`: List of log stream samples, each with:
  - `stream`: Dict of stream labels (e.g., `{"severity": "error"}`).
  - `values`: List of `[timestamp_ns, message]` tuples.
  - `iter_messages()`: Iterator yielding `(timestamp_ns, message)` tuples.
  - `iter_messages_seconds()`: Iterator yielding `(timestamp_seconds, message)` tuples.
- `streams()` → `Iterator[Dict]`: Iterator over raw stream dicts.
- `stream_values(stream_index)` → `List`: Raw values list for a specific stream.
- `iter_pairs()` → `Iterator[Tuple[Dict, List]]`: Iterator of `(stream_labels, values)` tuples.
- Dict-like access: `response["status"]`, `"data" in response`, `response.keys()`.
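The nanosecond-to-second conversion performed by `iter_messages_seconds()` is easy to see on raw data. The values below are fabricated for illustration and assume the `[timestamp_ns, message]` shape described above, with timestamps encoded as nanosecond strings.

```python
# Hypothetical raw LogStreamSample.values: [timestamp_ns, message] pairs,
# with each timestamp a string of Unix nanoseconds.
values = [
    ["1700000000000000000", "authentication succeeded"],
    ["1700000001500000000", "request handled in 120ms"],
]

# iter_messages_seconds() is equivalent to dividing each timestamp by 1e9
for ts_ns, message in values:
    ts_seconds = int(ts_ns) / 1e9
    print(f"[{ts_seconds}] {message}")
```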
Example Usage:¶
```python
import asyncio
import os

from air import AsyncAIRefinery
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()

api_key = str(os.getenv("API_KEY"))


async def get_logs():
    # Initialize the async client with your API key
    client = AsyncAIRefinery(api_key=api_key)

    # Query logs filtered by project, severity, and time range
    response = await client.logs.query(
        project_name="project-x",
        severity="error",
        time_window="30m",
        limit=100,
    )

    # Check the query status using the typed helper method
    print(f"Status: {response.status()}")

    # Iterate over LogStreamSample objects returned by samples()
    # Each sample contains stream labels and log messages
    for sample in response.samples():
        print(f"Stream labels: {sample.stream}")
        # iter_messages_seconds() converts nanosecond timestamps to seconds
        for ts_seconds, message in sample.iter_messages_seconds():
            print(f"  [{ts_seconds}] {message}")

    # Dict-like access is supported for backward compatibility
    print(response["status"])


if __name__ == "__main__":
    asyncio.run(get_logs())
```
Synchronous Log Retrieval¶
AIRefinery.logs.query()¶
This method supports the same parameters and return structure as the asynchronous method (AsyncAIRefinery.logs.query()) described above.
Example Usage:¶
```python
import os

from air import AIRefinery
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()

api_key = str(os.getenv("API_KEY"))


def get_logs():
    # Initialize the synchronous client
    client = AIRefinery(api_key=api_key)

    # Query logs for a specific project within the last 30 minutes
    response = client.logs.query(
        project_name="project-x",
        time_window="30m",
        limit=100,
    )

    # samples() returns a list of LogStreamSample dataclass objects
    print(f"Found {len(response.samples())} log streams")

    for sample in response.samples():
        # Convert nanosecond timestamps to seconds for readability
        for ts, msg in sample.iter_messages_seconds():
            print(f"[{ts}] {msg}")


if __name__ == "__main__":
    get_logs()
```
Metrics¶
Query application metrics covering inference performance, agent operations, token consumption, RAI compliance, and session analytics. For a complete list of available metrics, see Metrics & Traces Reference.
Asynchronous Metrics Retrieval¶
AsyncAIRefinery.metrics.query()¶
Parameters:¶
- metric (string, Required): Metric name from the Metrics & Traces Reference (e.g., `'token_consumption'`, `'agent_task_total'`).
- project_name (string, Optional): Project name to filter metrics.
- agent_name (string, Optional): Agent name to filter metrics.
- agent_class (string, Optional): Agent class to filter metrics (e.g., `'ToolUseAgent'`, `'SearchAgent'`).
- model_key (string, Optional): Model identifier for inference metrics.
- session_id (string, Optional): Session ID to filter metrics to a specific user session.
- status (string, Optional): Status filter (e.g., `'success'`, `'failure'`, `'timeout'`). Default: `'success'`.
- category (string, Optional): RAI rejection category: `harassment`, `hate`, `self-harm`, `sexual`, `violence`, `illicit`.
- percentile (string, Optional): Percentile for latency metrics (e.g., `'0.50'`, `'0.95'`). Default: `'0.95'`.
- time_window (string, Optional): Time range (e.g., `'5m'`, `'1h'`, `'24h'`). Default: `'1h'`.
- step (string, Optional): Bucket interval for time-series output (e.g., `'15m'`, `'1h'`). Default: `'1h'`.
Returns:¶
Returns a `MetricsResponse` object containing:

- `status()` → `str`: Query status (e.g., `"success"`).
- `query()` → `str`: The PromQL query executed.
- `result_type()` → `str`: Result type (e.g., `"matrix"`, `"vector"`).
- `samples()` → `List[MetricSample]`: List of metric samples, each with:
  - `metric`: Dict of metric labels.
  - `values`: List of `[timestamp, value]` tuples (for matrix results) or a single value.
- `sample_values(sample_index)` → `List`: Raw values for a specific sample.
- `iter_pairs()` → `Iterator[Tuple[Dict, List]]`: Iterator of `(metric_labels, values)` tuples.
- Dict-like access: `response["status"]`, `"data" in response`, `response.keys()`.
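Matrix-style `values` lend themselves to simple post-processing. The sketch below totals token counts from a fabricated sample, assuming Prometheus-style `[timestamp, value]` pairs in which the value arrives as a string.

```python
# Hypothetical MetricSample.values for a 'token_consumption' matrix result:
# one [unix_timestamp, value] pair per step bucket, values as strings.
sample_values = [
    [1700000000, "1250"],
    [1700003600, "980"],
    [1700007200, "1410"],
]

# Convert each bucket's value to a float and total the consumption
total_tokens = sum(float(v) for _, v in sample_values)
print(f"Total tokens over window: {total_tokens:.0f}")
```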
Example Usage:¶
```python
import asyncio
import os

from air import AsyncAIRefinery
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()

api_key = str(os.getenv("API_KEY"))


async def get_metrics():
    # Initialize the async client with your API key
    client = AsyncAIRefinery(api_key=api_key)

    # Query token consumption metrics for a project over the last hour
    response = await client.metrics.query(
        metric="token_consumption",
        project_name="project-x",
        time_window="1h",
    )

    # Access response metadata via typed helper methods
    # query() returns the PromQL query that was executed
    print(f"Status: {response.status()}, Query: {response.query()}")

    # samples() returns a list of MetricSample dataclass objects
    # Each sample has 'metric' (labels dict) and 'values' (time-series data)
    for sample in response.samples():
        print(f"Labels: {sample.metric}")
        print(f"Values: {sample.values}")

    # iter_pairs() provides a convenient iterator over (labels, values) tuples
    for labels, values in response.iter_pairs():
        print(f"{labels} -> {values}")


if __name__ == "__main__":
    asyncio.run(get_metrics())
```
Synchronous Metrics Retrieval¶
AIRefinery.metrics.query()¶
This method supports the same parameters and return structure as the asynchronous method (AsyncAIRefinery.metrics.query()) described above.
Example Usage:¶
```python
import os

from air import AIRefinery
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()

api_key = str(os.getenv("API_KEY"))


def get_metrics():
    # Initialize the synchronous client
    client = AIRefinery(api_key=api_key)

    # Query agent task metrics filtered by project and session
    response = client.metrics.query(
        metric="agent_task_total",
        project_name="project-x",
        session_id="sess-abc123",
        time_window="1h",
    )

    # result_type() returns 'matrix' (time-series) or 'vector' (instant)
    print(f"Result type: {response.result_type()}")

    # Iterate over MetricSample objects containing labels and values
    for sample in response.samples():
        print(f"{sample.metric}: {sample.values}")


if __name__ == "__main__":
    get_metrics()
```
Traces¶
Query distributed traces across AI Refinery services, enabling you to inspect agent workflows, identify performance bottlenecks, and debug cross-service interactions. For available trace presets, see Metrics & Traces Reference.
Asynchronous Trace Retrieval¶
AsyncAIRefinery.traces.query()¶
Parameters:¶
- trace (string, Required): Trace name from the Metrics & Traces Reference (e.g., `'inference_traces'`, `'distiller_traces'`).
- project_name (string, Optional): Project name to filter traces.
- trace_id (string, Optional): Specific trace ID to retrieve.
- time_window (string, Optional): Time range for the query (e.g., `'5m'`, `'1h'`, `'24h'`).
- detail (boolean, Optional): Whether to include detailed trace information. Default: `true`.
- limit (integer, Optional): Maximum number of traces to return. Default: `100`.
Returns:¶
Returns a `TracesResponse` object containing:

- `traces()` → `List[Dict]`: List of trace objects.
- `batches()` → `List[TraceBatch]`: List of trace batches, each with:
  - `resource`: Resource information dict.
  - `scope_spans`: List of scope span objects.
- `spans()` → `List[Dict]`: Flattened list of all spans across all batches.
- `iter_spans()` → `Iterator[Dict]`: Iterator over all spans.
- Dict-like access: `response["traces"]`, `"batches" in response`, `response.keys()`.
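A common use of the flattened span dicts is computing span durations. The span below is fabricated and assumes OTLP JSON conventions (`startTimeUnixNano`/`endTimeUnixNano` as nanosecond strings); the exact field names in your payloads may differ.

```python
# Hypothetical span dict in the shape yielded by spans()/iter_spans(),
# following OTLP JSON conventions for timestamps.
span = {
    "name": "agent.execute_task",
    "startTimeUnixNano": "1700000000000000000",
    "endTimeUnixNano": "1700000000250000000",
}

# Duration in milliseconds: (end - start) nanoseconds divided by 1e6
duration_ms = (int(span["endTimeUnixNano"]) - int(span["startTimeUnixNano"])) / 1e6
print(f"{span['name']}: {duration_ms:.1f} ms")
```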
Example Usage:¶
```python
import asyncio
import os

from air import AsyncAIRefinery
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()

api_key = str(os.getenv("API_KEY"))


async def get_traces():
    # Initialize the async client with your API key
    client = AsyncAIRefinery(api_key=api_key)

    # Query distiller traces for a project over the last hour
    response = await client.traces.query(
        trace="distiller_traces",
        project_name="project-x",
        time_window="1h",
    )

    # traces() returns the list of trace objects
    print(f"Found {len(response.traces())} traces")

    # iter_spans() flattens all spans across all batches into a single iterator
    for span in response.iter_spans():
        print(f"Span: {span.get('name')} - {span.get('status')}")

    # batches() returns TraceBatch dataclass objects with resource metadata
    for batch in response.batches():
        print(f"Resource: {batch.resource}")


if __name__ == "__main__":
    asyncio.run(get_traces())
```
Synchronous Trace Retrieval¶
AIRefinery.traces.query()¶
This method supports the same parameters and return structure as the asynchronous method (AsyncAIRefinery.traces.query()) described above.
Example Usage:¶
```python
import os

from air import AIRefinery
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()

api_key = str(os.getenv("API_KEY"))


def get_traces():
    # Initialize the synchronous client
    client = AIRefinery(api_key=api_key)

    # Query inference traces over the last hour
    response = client.traces.query(
        trace="inference_traces",
        time_window="1h",
    )

    # traces() returns the raw trace list, spans() returns all spans flattened
    print(f"Traces: {len(response.traces())}")
    print(f"Total spans: {len(response.spans())}")

    # iter_spans() iterates over spans without loading all into memory
    for span in response.iter_spans():
        print(span.get("name"))


if __name__ == "__main__":
    get_traces()
```