From 79f780f81dd36107cb5919f5c130502099501d6f Mon Sep 17 00:00:00 2001 From: Copilot <223556219+Copilot@users.noreply.github.com> Date: Fri, 29 May 2026 02:07:44 -0300 Subject: [PATCH] fix(cloud-eval): lift grader execution errors into RowMetric.error When a Foundry azure_ai_evaluator grader fails to execute (e.g., the evaluator service principal lacks Cognitive Services OpenAI User on the model deployment), the per-metric score returns null and the real cause is buried in result.sample.error.message. Without surfacing it, operators see only actual=missing in the threshold table and have to dig into cloud_output_items.json to find the RBAC failure. The parser now extracts sample.error.message (and top-level error dicts), prefixing the error code when present. The orchestrator's 0-usable-scores warning quotes the first grader error so CI logs carry the actionable cause. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- CHANGELOG.md | 14 + src/agentops/pipeline/cloud_results.py | 565 ++++---- src/agentops/pipeline/orchestrator.py | 1628 ++++++++++++------------ tests/unit/test_cloud_results.py | 574 +++++---- 4 files changed, 1487 insertions(+), 1294 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 993973e..2c39949 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,20 @@ This format follows [Keep a Changelog](https://keepachangelog.com/) and adheres ## [Unreleased] +### Fixed +- **Cloud eval surfaces grader execution errors instead of silent nulls.** + When a Foundry `azure_ai_evaluator` grader fails to execute (most + commonly because the evaluator service principal lacks + `Cognitive Services OpenAI User` on the target model deployment), the + per-metric `score` comes back `null` and the real cause is buried in + `result.sample.error.message`. The cloud-results parser now lifts that + message into `RowMetric.error` (including the error `code` prefix + when present), so the actionable error appears in `results.json` and + `report.md` instead of operators only seeing `actual=missing` in the + threshold table. The orchestrator's "0 usable metric scores" warning + also quotes the first grader error so CI logs carry the signal + without operators having to download the raw artifact. + ### Added - **`cloud_output_items.json` is now uploaded as a CI artifact.** Generated PR and deploy workflows (GitHub Actions and Azure DevOps) include `.agentops/results/latest/cloud_output_items.json` in the `agentops-*-results` artifact bundle alongside `results.json`, `report.md`, and `cloud_evaluation.json`. Pairs with the "0 usable scores" warning so operators can diagnose unrecognized Foundry grader shapes without re-running locally. - **`cloud_output_items.json` raw dump.** Every cloud eval run now diff --git a/src/agentops/pipeline/cloud_results.py b/src/agentops/pipeline/cloud_results.py index 2d13a1c..b259927 100644 --- a/src/agentops/pipeline/cloud_results.py +++ b/src/agentops/pipeline/cloud_results.py @@ -1,254 +1,311 @@ -"""Map Foundry cloud eval output items into AgentOps result shapes. - -When ``execution: cloud`` is used in ``agentops.yaml``, the agent and -evaluators run server-side via the Foundry / OpenAI Evals API. We then -download per-row ``output_items`` from Foundry and reshape them into the -same :class:`RowResult` / :class:`RunResult` schema that local execution -produces, so downstream consumers (``report.md``, ``--baseline`` diffing, -CI gates) behave identically regardless of where the run executed. - -The cloud output schema is intentionally loose: we accept multiple field -spellings (``output_text`` / ``output`` / ``message``; ``score`` / -``value`` / ``passed``) and fall back gracefully when a field is absent. -""" - -from __future__ import annotations - -import json -from typing import Any, Dict, List, Optional - -from agentops.core.results import RowMetric, RowResult - - -def rows_from_cloud_output_items( - output_items: List[Dict[str, Any]], -) -> List[RowResult]: - """Build a list of :class:`RowResult` from raw Foundry output items. - - ``output_items`` is the list returned by - ``cloud_runner._list_output_items``. Each item is a dict with at - least ``datasource_item``, ``sample`` and ``results``; missing keys - yield blank fields rather than raising. - """ - rows: List[RowResult] = [] - for index, item in enumerate(output_items): - rows.append(_row_from_item(index, item)) - return rows - - -def _row_from_item(index: int, item: Dict[str, Any]) -> RowResult: - datasource = _as_dict(item.get("datasource_item")) or {} - sample = _as_dict(item.get("sample")) or {} - results = item.get("results") or [] - - metrics: List[RowMetric] = [] - if isinstance(results, list): - for entry in results: - metric = _metric_from_result(entry) - if metric is not None: - metrics.append(metric) - - return RowResult( - row_index=index, - input=_as_str(datasource.get("input")), - expected=_optional_str(datasource.get("expected")), - response=_extract_response_text(sample), - context=_optional_str(datasource.get("context")), - latency_seconds=None, # Foundry-side latency is not client-perceived. - tool_calls=datasource.get("tool_calls") if isinstance(datasource.get("tool_calls"), list) else None, - metrics=metrics, - error=_extract_item_error(item), - ) - - -_NUMERIC_SCORE_KEYS = ( - "score", - "value", - "result", - "metric_value", - "rating", - "grader_score", - "numeric_value", -) -_PASS_LABELS = {"pass", "passed", "true", "yes", "1", "ok", "success"} -_FAIL_LABELS = {"fail", "failed", "false", "no", "0", "error", "errored"} - - -def _score_from_label(value: Any) -> Optional[float]: - """Map a textual pass/fail label onto 1.0 / 0.0.""" - if not isinstance(value, str): - return None - token = value.strip().lower() - if not token: - return None - if token in _PASS_LABELS: - return 1.0 - if token in _FAIL_LABELS: - return 0.0 - return None - - -def _score_from_mapping(entry: Dict[str, Any]) -> Optional[float]: - """Probe a dict-like result envelope for a score using a wide net of - field names. Mirrors the loose-shape contract documented at the top - of this module: Foundry / OpenAI Evals API have shipped at least - ``score``, ``value``, ``result``, ``grader_score`` and ``rating`` as - the numeric carrier across SDK versions and grader types, plus - ``passed`` (bool) and ``label`` (string) as binary fallbacks.""" - score = _coerce_float(*(entry.get(k) for k in _NUMERIC_SCORE_KEYS)) - if score is not None: - return score - passed = entry.get("passed") - if isinstance(passed, bool): - return 1.0 if passed else 0.0 - label_score = _score_from_label(entry.get("label")) - if label_score is not None: - return label_score - return None - - -def _metric_from_result(entry: Any) -> Optional[RowMetric]: - if not isinstance(entry, dict): - return None - name = entry.get("name") or entry.get("metric") - if not isinstance(name, str) or not name: - return None - - # First try the top-level envelope where azure_ai_evaluator graders - # populate `score` + `passed` directly. - score = _score_from_mapping(entry) - - # Some Foundry server-side graders (especially custom prompt-based - # evaluators) tuck the score down inside `sample` or `details` - # instead. Probe those as a fallback so a missing top-level score - # doesn't mask an evaluator that actually returned a number. - if score is None: - sample = entry.get("sample") - if isinstance(sample, dict): - score = _score_from_mapping(sample) - if score is None: - details = entry.get("details") - if isinstance(details, dict): - score = _score_from_mapping(details) - - reason = entry.get("reason") if isinstance(entry.get("reason"), str) else None - err = entry.get("error") if isinstance(entry.get("error"), str) else None - if score is None and err is None: - # Surface the missing-score case as a structured reason instead of - # silently writing null. The orchestrator/reporter use this string - # to point operators at `cloud_output_items.json` for triage. - err = ( - "no numeric score returned by Foundry grader; inspect " - "cloud_output_items.json in the results directory." - ) - return RowMetric(name=name, value=score, error=err, reason=reason) - - -def _extract_response_text(sample: Dict[str, Any]) -> str: - """Reach into a Foundry sample payload and pull a plain text response. - - Foundry's sample shape varies: sometimes the response is a clean string - under ``output_text``, sometimes it's a list of output items under - ``output`` / ``output_items``, and occasionally ``output_text`` is set - to a JSON-encoded version of the structured output. Try structured - fields first (they're authoritative), and recurse into JSON-encoded - strings rather than passing them through as the response. - """ - # 1. Structured fields are authoritative. Walk them first. - for key in ("output", "messages", "output_items"): - text = _text_from_structured(sample.get(key)) - if text: - return text - - # 2. Flat string fields. If the value looks like JSON, parse and recurse. - for key in ("output_text", "text", "content"): - value = sample.get(key) - if isinstance(value, str) and value: - stripped = value.strip() - if stripped.startswith("[") or stripped.startswith("{"): - try: - parsed = json.loads(stripped) - except (ValueError, TypeError): - return value - if isinstance(parsed, list): - text = _text_from_structured(parsed) - if text: - return text - elif isinstance(parsed, dict): - return _extract_response_text(parsed) - # Fall through to raw value if we couldn't extract. - return value - if isinstance(value, list): - text = _text_from_structured(value) - if text: - return text - return "" - - -def _text_from_structured(value: Any) -> str: - """Walk a list-of-dicts (output / messages / output_items shape) and - return the first textual payload encountered, or ``""`` when none is - found. Iterates in reverse so the assistant's final message wins. - """ - if not isinstance(value, list): - return "" - for entry in reversed(value): - if not isinstance(entry, dict): - continue - # Try flat text fields first. - for field in ("output_text", "text"): - candidate = entry.get(field) - if isinstance(candidate, str) and candidate: - return candidate - # Some shapes nest under "content" as either a string or a list of - # content blocks (OpenAI Responses API: content: [{type, text}, ...]). - nested = entry.get("content") - if isinstance(nested, str) and nested: - return nested - if isinstance(nested, list): - text = _text_from_structured(nested) - if text: - return text - return "" - - -def _extract_item_error(item: Dict[str, Any]) -> Optional[str]: - err = item.get("error") - if isinstance(err, str) and err: - return err - if isinstance(err, dict): - msg = err.get("message") or err.get("error") - if isinstance(msg, str) and msg: - return msg - status = item.get("status") - if isinstance(status, str) and status.lower() in {"failed", "error"}: - return f"output item status: {status}" - return None - - -def _as_dict(value: Any) -> Optional[Dict[str, Any]]: - return value if isinstance(value, dict) else None - - -def _as_str(value: Any) -> str: - return value if isinstance(value, str) else "" - - -def _optional_str(value: Any) -> Optional[str]: - return value if isinstance(value, str) and value else None - - -def _coerce_float(*candidates: Any) -> Optional[float]: - for value in candidates: - if value is None: - continue - if isinstance(value, bool): - return 1.0 if value else 0.0 - if isinstance(value, (int, float)): - return float(value) - if isinstance(value, str): - try: - return float(value) - except ValueError: - continue - return None +"""Map Foundry cloud eval output items into AgentOps result shapes. + +When ``execution: cloud`` is used in ``agentops.yaml``, the agent and +evaluators run server-side via the Foundry / OpenAI Evals API. We then +download per-row ``output_items`` from Foundry and reshape them into the +same :class:`RowResult` / :class:`RunResult` schema that local execution +produces, so downstream consumers (``report.md``, ``--baseline`` diffing, +CI gates) behave identically regardless of where the run executed. + +The cloud output schema is intentionally loose: we accept multiple field +spellings (``output_text`` / ``output`` / ``message``; ``score`` / +``value`` / ``passed``) and fall back gracefully when a field is absent. +""" + +from __future__ import annotations + +import json +from typing import Any, Dict, List, Optional + +from agentops.core.results import RowMetric, RowResult + + +def rows_from_cloud_output_items( + output_items: List[Dict[str, Any]], +) -> List[RowResult]: + """Build a list of :class:`RowResult` from raw Foundry output items. + + ``output_items`` is the list returned by + ``cloud_runner._list_output_items``. Each item is a dict with at + least ``datasource_item``, ``sample`` and ``results``; missing keys + yield blank fields rather than raising. + """ + rows: List[RowResult] = [] + for index, item in enumerate(output_items): + rows.append(_row_from_item(index, item)) + return rows + + +def _row_from_item(index: int, item: Dict[str, Any]) -> RowResult: + datasource = _as_dict(item.get("datasource_item")) or {} + sample = _as_dict(item.get("sample")) or {} + results = item.get("results") or [] + + metrics: List[RowMetric] = [] + if isinstance(results, list): + for entry in results: + metric = _metric_from_result(entry) + if metric is not None: + metrics.append(metric) + + return RowResult( + row_index=index, + input=_as_str(datasource.get("input")), + expected=_optional_str(datasource.get("expected")), + response=_extract_response_text(sample), + context=_optional_str(datasource.get("context")), + latency_seconds=None, # Foundry-side latency is not client-perceived. + tool_calls=datasource.get("tool_calls") if isinstance(datasource.get("tool_calls"), list) else None, + metrics=metrics, + error=_extract_item_error(item), + ) + + +_NUMERIC_SCORE_KEYS = ( + "score", + "value", + "result", + "metric_value", + "rating", + "grader_score", + "numeric_value", +) +_PASS_LABELS = {"pass", "passed", "true", "yes", "1", "ok", "success"} +_FAIL_LABELS = {"fail", "failed", "false", "no", "0", "error", "errored"} + + +def _score_from_label(value: Any) -> Optional[float]: + """Map a textual pass/fail label onto 1.0 / 0.0.""" + if not isinstance(value, str): + return None + token = value.strip().lower() + if not token: + return None + if token in _PASS_LABELS: + return 1.0 + if token in _FAIL_LABELS: + return 0.0 + return None + + +def _score_from_mapping(entry: Dict[str, Any]) -> Optional[float]: + """Probe a dict-like result envelope for a score using a wide net of + field names. Mirrors the loose-shape contract documented at the top + of this module: Foundry / OpenAI Evals API have shipped at least + ``score``, ``value``, ``result``, ``grader_score`` and ``rating`` as + the numeric carrier across SDK versions and grader types, plus + ``passed`` (bool) and ``label`` (string) as binary fallbacks.""" + score = _coerce_float(*(entry.get(k) for k in _NUMERIC_SCORE_KEYS)) + if score is not None: + return score + passed = entry.get("passed") + if isinstance(passed, bool): + return 1.0 if passed else 0.0 + label_score = _score_from_label(entry.get("label")) + if label_score is not None: + return label_score + return None + + +def _metric_from_result(entry: Any) -> Optional[RowMetric]: + if not isinstance(entry, dict): + return None + name = entry.get("name") or entry.get("metric") + if not isinstance(name, str) or not name: + return None + + # First try the top-level envelope where azure_ai_evaluator graders + # populate `score` + `passed` directly. + score = _score_from_mapping(entry) + + # Some Foundry server-side graders (especially custom prompt-based + # evaluators) tuck the score down inside `sample` or `details` + # instead. Probe those as a fallback so a missing top-level score + # doesn't mask an evaluator that actually returned a number. + if score is None: + sample = entry.get("sample") + if isinstance(sample, dict): + score = _score_from_mapping(sample) + if score is None: + details = entry.get("details") + if isinstance(details, dict): + score = _score_from_mapping(details) + + reason = entry.get("reason") if isinstance(entry.get("reason"), str) else None + err = _extract_grader_error(entry) + if score is None and err is None: + # Surface the missing-score case as a structured reason instead of + # silently writing null. The orchestrator/reporter use this string + # to point operators at `cloud_output_items.json` for triage. + err = ( + "no numeric score returned by Foundry grader; inspect " + "cloud_output_items.json in the results directory." + ) + return RowMetric(name=name, value=score, error=err, reason=reason) + + +def _extract_grader_error(entry: Dict[str, Any]) -> Optional[str]: + """Pull a human-readable error out of a Foundry grader result envelope. + + The on-the-wire shape we have seen in production when an + ``azure_ai_evaluator`` grader fails to execute (e.g., the evaluator + service principal lacks RBAC on the model deployment) is:: + + { + "name": "coherence", + "score": null, + "passed": null, + "status": "error", + "sample": { + "error": { + "code": "FAILED_EXECUTION", + "message": "(UserError) OpenAI API hits AuthenticationError: ..." + } + } + } + + Without lifting ``sample.error.message`` into ``RowMetric.error``, the + real cause is buried in ``cloud_output_items.json`` and the user only + sees ``actual=missing`` in the threshold table. Probe (in order): + + 1. Top-level ``error`` (string or ``{message, code}`` dict). + 2. ``sample.error`` (string or ``{message, code}`` dict). + 3. ``status == "error"`` as a last-resort signal so we at least flag + the row even when no error payload is present. + """ + primary = _normalize_error_payload(entry.get("error")) + if primary is not None: + return primary + sample = entry.get("sample") + if isinstance(sample, dict): + nested = _normalize_error_payload(sample.get("error")) + if nested is not None: + return nested + status = entry.get("status") + if isinstance(status, str) and status.strip().lower() == "error": + return "grader status: error (no error payload returned)" + return None + + +def _normalize_error_payload(value: Any) -> Optional[str]: + """Flatten ``error`` (string or ``{message, code}`` dict) into one line.""" + if isinstance(value, str) and value.strip(): + return value.strip() + if isinstance(value, dict): + message = value.get("message") or value.get("error") + if isinstance(message, str) and message.strip(): + code = value.get("code") + if isinstance(code, str) and code.strip(): + return f"{code.strip()}: {message.strip()}" + return message.strip() + return None + + +def _extract_response_text(sample: Dict[str, Any]) -> str: + """Reach into a Foundry sample payload and pull a plain text response. + + Foundry's sample shape varies: sometimes the response is a clean string + under ``output_text``, sometimes it's a list of output items under + ``output`` / ``output_items``, and occasionally ``output_text`` is set + to a JSON-encoded version of the structured output. Try structured + fields first (they're authoritative), and recurse into JSON-encoded + strings rather than passing them through as the response. + """ + # 1. Structured fields are authoritative. Walk them first. + for key in ("output", "messages", "output_items"): + text = _text_from_structured(sample.get(key)) + if text: + return text + + # 2. Flat string fields. If the value looks like JSON, parse and recurse. + for key in ("output_text", "text", "content"): + value = sample.get(key) + if isinstance(value, str) and value: + stripped = value.strip() + if stripped.startswith("[") or stripped.startswith("{"): + try: + parsed = json.loads(stripped) + except (ValueError, TypeError): + return value + if isinstance(parsed, list): + text = _text_from_structured(parsed) + if text: + return text + elif isinstance(parsed, dict): + return _extract_response_text(parsed) + # Fall through to raw value if we couldn't extract. + return value + if isinstance(value, list): + text = _text_from_structured(value) + if text: + return text + return "" + + +def _text_from_structured(value: Any) -> str: + """Walk a list-of-dicts (output / messages / output_items shape) and + return the first textual payload encountered, or ``""`` when none is + found. Iterates in reverse so the assistant's final message wins. + """ + if not isinstance(value, list): + return "" + for entry in reversed(value): + if not isinstance(entry, dict): + continue + # Try flat text fields first. + for field in ("output_text", "text"): + candidate = entry.get(field) + if isinstance(candidate, str) and candidate: + return candidate + # Some shapes nest under "content" as either a string or a list of + # content blocks (OpenAI Responses API: content: [{type, text}, ...]). + nested = entry.get("content") + if isinstance(nested, str) and nested: + return nested + if isinstance(nested, list): + text = _text_from_structured(nested) + if text: + return text + return "" + + +def _extract_item_error(item: Dict[str, Any]) -> Optional[str]: + err = item.get("error") + if isinstance(err, str) and err: + return err + if isinstance(err, dict): + msg = err.get("message") or err.get("error") + if isinstance(msg, str) and msg: + return msg + status = item.get("status") + if isinstance(status, str) and status.lower() in {"failed", "error"}: + return f"output item status: {status}" + return None + + +def _as_dict(value: Any) -> Optional[Dict[str, Any]]: + return value if isinstance(value, dict) else None + + +def _as_str(value: Any) -> str: + return value if isinstance(value, str) else "" + + +def _optional_str(value: Any) -> Optional[str]: + return value if isinstance(value, str) and value else None + + +def _coerce_float(*candidates: Any) -> Optional[float]: + for value in candidates: + if value is None: + continue + if isinstance(value, bool): + return 1.0 if value else 0.0 + if isinstance(value, (int, float)): + return float(value) + if isinstance(value, str): + try: + return float(value) + except ValueError: + continue + return None diff --git a/src/agentops/pipeline/orchestrator.py b/src/agentops/pipeline/orchestrator.py index 7789b73..8880124 100644 --- a/src/agentops/pipeline/orchestrator.py +++ b/src/agentops/pipeline/orchestrator.py @@ -1,804 +1,824 @@ -"""End-to-end evaluation orchestrator for AgentOps 1.0. - -This is the single entry point exercised by ``agentops eval``. It loads the -flat config, classifies the target, infers evaluators from the dataset shape, -invokes the target row-by-row, runs each evaluator, applies thresholds, and -writes ``results.json`` and ``report.md``. -""" - -from __future__ import annotations - -import json -import logging -import statistics -import sys -import time -import os -from dataclasses import dataclass, field -from datetime import datetime, timezone -from pathlib import Path -from typing import Any, Callable, Dict, Iterable, List, Optional - -from agentops.core.agentops_config import AgentOpsConfig, Threshold, classify_agent -from agentops.core.evaluators import ( - detect_dataset_shape, - merge_thresholds, - select_evaluators, -) -from agentops.core.results import ( - RowMetric, - RowResult, - RunResult, - RunSummary, - TargetInfo, -) -from agentops.pipeline import comparison as comparison_module -from agentops.pipeline import invocations, publisher, reporter, runtime, thresholds -from agentops.utils import telemetry -from agentops.utils.colors import style - -logger = logging.getLogger("agentops.pipeline") - - -# --------------------------------------------------------------------------- -# Public entry point -# --------------------------------------------------------------------------- - - -@dataclass -class RunOptions: - config_path: Path - output_dir: Path - baseline_path: Optional[Path] = None - timeout_seconds: float = 120.0 - dataset_override: Optional[Path] = None - agent_override: Optional[str] = None - # Optional callback invoked with progress messages during a run. The - # CLI wires this to ``typer.echo`` so users see per-row progress - # ("invoking", "scored", ...) instead of long unexplained pauses. - # Library callers can leave it as ``None`` to keep runs silent. - progress: Optional[Callable[[str], None]] = field(default=None, repr=False) - - -def run_evaluation( - config: AgentOpsConfig, - *, - options: RunOptions, -) -> RunResult: - """Run a full evaluation and persist artifacts. Returns the RunResult.""" - telemetry.init_tracing() - try: - return _run_evaluation(config, options=options) - finally: - telemetry.shutdown() - - -def _run_evaluation( - config: AgentOpsConfig, - *, - options: RunOptions, -) -> RunResult: - """Run a full evaluation after optional telemetry has been initialized.""" - if options.baseline_path is not None and not options.baseline_path.exists(): - raise FileNotFoundError( - f"baseline file not found: {options.baseline_path}. " - "Run `agentops eval run` once without `--baseline` first, then copy " - "`.agentops/results/latest/results.json` to the baseline path." - ) - - if config.execution == "cloud": - return _run_evaluation_cloud(config, options=options) - return _run_evaluation_local(config, options=options) - - -def _run_evaluation_local( - config: AgentOpsConfig, - *, - options: RunOptions, -) -> RunResult: - """Local execution: AgentOps invokes the agent + evaluators row-by-row.""" - - started_at = datetime.now(timezone.utc) - started_perf = time.perf_counter() - - target = classify_agent( - options.agent_override or config.agent, - config.protocol, - ) - - dataset_path = options.dataset_override or _resolve_dataset_path(config, options) - shape = detect_dataset_shape(dataset_path) - - overrides = ( - [override.name for override in config.evaluators] if config.evaluators else None - ) - presets = select_evaluators( - target, - shape, - overrides=overrides, - threshold_metrics=config.thresholds.keys(), - ) - user_thresholds = [ - Threshold.from_expression(metric, expr) - for metric, expr in config.thresholds.items() - ] - threshold_rules = merge_thresholds(presets, user_thresholds) - - evaluator_runtimes = runtime.load_evaluators(presets) - - progress = options.progress or (lambda _msg: None) - - dataset_rows = list(_iter_dataset(dataset_path)) - total = len(dataset_rows) - from agentops import __version__ as _agentops_version - py = f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}" - progress( - f"{style('agentops', 'bold', 'cyan')} {style(_agentops_version, 'cyan')} " - f"{style('|', 'dim')} python {py} " - f"{style('|', 'dim')} config: {style(options.config_path.name, 'cyan')}" - ) - progress( - f"Loaded {style(str(total), 'bold')} row(s) from " - f"{style(dataset_path.name, 'cyan')}; running " - f"{style(str(len(presets)), 'bold')} evaluator(s) against " - f"{_friendly_target_kind(target.kind)}: {style(target.raw, 'bold')}." - ) - - with telemetry.eval_run_span( - bundle_name=options.config_path.stem, - dataset_name=dataset_path.name, - backend_type=target.kind, - target=target.raw, - model=target.deployment, - agent_id=target.raw if target.kind.startswith("foundry") else None, - ) as run_span: - rows: List[RowResult] = [] - rules_by_metric = {rule.metric: rule for rule in threshold_rules} - for index, row in enumerate(dataset_rows): - rows.append( - _evaluate_row( - row=row, - index=index, - total=total, - target=target, - config=config, - evaluators=evaluator_runtimes, - timeout=options.timeout_seconds, - progress=progress, - rules_by_metric=rules_by_metric, - ) - ) - - aggregate = _aggregate_metrics(rows) - threshold_results = thresholds.evaluate(threshold_rules, aggregate) - summary = _summarize(rows, threshold_results) - telemetry.set_eval_run_result( - run_span, - passed=summary.overall_passed, - items_total=summary.items_total, - items_passed=summary.items_passed_all, - ) - - finished_at = datetime.now(timezone.utc) - duration = time.perf_counter() - started_perf - - result = RunResult( - started_at=started_at.isoformat(), - finished_at=finished_at.isoformat(), - duration_seconds=duration, - target=TargetInfo( - kind=target.kind, - raw=target.raw, - protocol=target.protocol, - name=target.name, - version=target.version, - url=target.url, - deployment=target.deployment, - ), - dataset_path=str(dataset_path), - evaluators=[preset.name for preset in presets], - rows=rows, - aggregate_metrics=aggregate, - thresholds=threshold_results, - summary=summary, - config={ - "version": config.version, - "agent": config.agent, - "thresholds": dict(config.thresholds), - }, - ) - - if options.baseline_path is not None: - baseline = comparison_module.load_baseline(options.baseline_path) - result.comparison = comparison_module.build_comparison( - current=result, - baseline=baseline, - baseline_path=options.baseline_path, - ) - - _persist(result, options.output_dir) - - # Local execution only ever publishes to Classic Foundry. Cloud - # execution goes through _run_evaluation_cloud and never reaches here. - if config.publish_target() == "foundry": - _publish_to_foundry_safely(result, config, options.output_dir, progress=progress) - - return result - - -def _run_evaluation_cloud( - config: AgentOpsConfig, - *, - options: RunOptions, -) -> RunResult: - """Cloud execution: Foundry invokes the agent + evaluators server-side. - - The agent is invoked exactly once - on Foundry's side. AgentOps does - not run the row-by-row local loop. After the cloud run completes we - download the per-row ``output_items`` and reshape them into the same - :class:`RunResult` schema that local execution produces, so - ``report.md`` and ``--baseline`` work identically. - """ - started_at = datetime.now(timezone.utc) - started_perf = time.perf_counter() - - target = classify_agent( - options.agent_override or config.agent, - config.protocol, - ) - if target.kind != "foundry_prompt": - raise ValueError( - "execution: cloud only supports Foundry prompt agents " - f"('name:version'); got target.kind={target.kind!r}." - ) - - dataset_path = options.dataset_override or _resolve_dataset_path(config, options) - shape = detect_dataset_shape(dataset_path) - overrides = ( - [override.name for override in config.evaluators] if config.evaluators else None - ) - all_presets = select_evaluators( - target, - shape, - overrides=overrides, - threshold_metrics=config.thresholds.keys(), - ) - - # Cloud execution runs server-side, so client-side runtime evaluators - # (e.g. avg_latency_seconds) cannot be measured. Excluding them is the - # right choice - otherwise their default thresholds would mark the run - # FAILED for a metric we never had a chance to observe. - presets = [p for p in all_presets if "runtime" not in p.categories] - skipped_runtime = [p.name for p in all_presets if "runtime" in p.categories] - - user_thresholds = [ - Threshold.from_expression(metric, expr) - for metric, expr in config.thresholds.items() - # Drop user-specified thresholds for runtime metrics too - they - # would otherwise fail with actual="missing". - if metric not in {p.score_key for p in all_presets if "runtime" in p.categories} - ] - threshold_rules = merge_thresholds(presets, user_thresholds) - - # Build a "shell" result that carries just enough metadata for the - # cloud publisher to map evaluator class names onto Azure AI evaluator - # testing criteria. - shell_target = TargetInfo( - kind=target.kind, - raw=target.raw, - protocol=target.protocol, - name=target.name, - version=target.version, - url=target.url, - deployment=target.deployment, - ) - - progress = options.progress or (lambda _msg: None) - from agentops import __version__ as _agentops_version - py = f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}" - progress( - f"{style('agentops', 'bold', 'cyan')} {style(_agentops_version, 'cyan')} " - f"{style('|', 'dim')} python {py} " - f"{style('|', 'dim')} config: {style(options.config_path.name, 'cyan')}" - ) - progress( - f"execution: {style('cloud', 'bold')} - Foundry will run the agent " - f"and {style(str(len(presets)), 'bold')} evaluator(s) server-side. " - f"Agent: {style(target.raw, 'bold')}." - ) - if skipped_runtime: - progress( - f" (skipped client-side runtime evaluators: " - f"{', '.join(skipped_runtime)} - not measurable in cloud mode)" - ) - - shell_result = RunResult( - started_at=started_at.isoformat(), - finished_at=started_at.isoformat(), - duration_seconds=0.0, - target=shell_target, - dataset_path=str(dataset_path), - evaluators=[preset.name for preset in presets], - rows=[], - aggregate_metrics={}, - thresholds=[], - summary=_summarize([], []), - config={ - "version": config.version, - "agent": config.agent, - "thresholds": dict(config.thresholds), - }, - ) - - endpoint = config.project_endpoint or os.getenv("AZURE_AI_FOUNDRY_PROJECT_ENDPOINT") - if not endpoint: - raise ValueError( - "execution: cloud requires either 'project_endpoint' in " - "agentops.yaml or the AZURE_AI_FOUNDRY_PROJECT_ENDPOINT env var." - ) - - from agentops.pipeline import cloud_runner - from agentops.pipeline import cloud_results - - with telemetry.eval_run_span( - bundle_name=options.config_path.stem, - dataset_name=dataset_path.name, - backend_type="foundry_cloud", - target=target.raw, - model=target.deployment, - agent_id=target.raw, - ) as run_span: - published = cloud_runner.run_on_foundry_cloud( - shell_result, - dataset_path=dataset_path, - project_endpoint=endpoint, - dataset_sync=config.dataset_sync, - progress=progress, - ) - - if run_span is not None: - run_span.set_attribute("agentops.eval.execution", "cloud") - run_span.set_attribute("agentops.eval.cloud.eval_id", published.eval_id) - run_span.set_attribute("agentops.eval.cloud.run_id", published.run_id) - run_span.set_attribute("agentops.eval.cloud.status", published.status) - if published.report_url: - run_span.set_attribute( - "agentops.eval.cloud.report_url", - published.report_url, - ) - if published.dataset: - run_span.set_attribute( - "agentops.eval.cloud.dataset.mode", - str(published.dataset.get("mode") or ""), - ) - dataset_id = published.dataset.get("id") - if dataset_id: - run_span.set_attribute( - "agentops.eval.cloud.dataset.id", - str(dataset_id), - ) - - rows = cloud_results.rows_from_cloud_output_items(published.output_items) - aggregate = _aggregate_metrics(rows) - threshold_results = thresholds.evaluate(threshold_rules, aggregate) - summary = _summarize(rows, threshold_results) - telemetry.set_eval_run_result( - run_span, - passed=summary.overall_passed, - items_total=summary.items_total, - items_passed=summary.items_passed_all, - ) - - finished_at = datetime.now(timezone.utc) - duration = time.perf_counter() - started_perf - - # Always persist the raw Foundry output_items next to results.json / - # report.md so the run is debuggable from the artifact bundle alone. - # This is the only place the per-row grader payloads survive in their - # native shape; without it a parser regression looks the same in CI - # as a real eval failure. - try: - options.output_dir.mkdir(parents=True, exist_ok=True) - raw_items_path = options.output_dir / "cloud_output_items.json" - raw_items_path.write_text( - json.dumps(list(published.output_items), indent=2, default=str), - encoding="utf-8", - ) - except (OSError, TypeError) as exc: - progress(f"warning: failed to write cloud_output_items.json: {exc}") - raw_items_path = None - - # If the cloud run yielded zero usable metric values despite running - # graders, surface that loudly so the user does not chase a phantom - # "threshold failed" gate. The artifact dumped above is the triage - # entry point. - if presets and not aggregate and rows: - suffix = ( - f" Inspect {raw_items_path}." if raw_items_path is not None else "" - ) - progress( - "warning: cloud eval returned 0 usable metric scores across " - f"{len(rows)} row(s).{suffix}" - ) - - result = RunResult( - started_at=started_at.isoformat(), - finished_at=finished_at.isoformat(), - duration_seconds=duration, - target=shell_target, - dataset_path=str(dataset_path), - evaluators=[preset.name for preset in presets], - rows=rows, - aggregate_metrics=aggregate, - thresholds=threshold_results, - summary=summary, - config={ - "version": config.version, - "agent": config.agent, - "thresholds": dict(config.thresholds), - "execution": "cloud", - "cloud_evaluation": { - "mode": "cloud", - "evaluation_name": published.evaluation_name, - "eval_id": published.eval_id, - "run_id": published.run_id, - "status": published.status, - "report_url": published.report_url, - "dataset": published.dataset, - }, - }, - ) - - if options.baseline_path is not None: - baseline = comparison_module.load_baseline(options.baseline_path) - result.comparison = comparison_module.build_comparison( - current=result, - baseline=baseline, - baseline_path=options.baseline_path, - ) - - _persist(result, options.output_dir) - - # Write cloud_evaluation.json next to the other artifacts for parity - # with the (now-removed) post-run cloud publish path. - cloud_meta_path = options.output_dir / "cloud_evaluation.json" - cloud_meta_path.write_text( - json.dumps(result.config["cloud_evaluation"], indent=2), - encoding="utf-8", - ) - - progress( - f"Submitted to {style('New Foundry Evaluations', 'bold')}: " - f"{style(published.report_url or '(no portal URL)', 'cyan')}" - ) - progress( - f" eval_id={published.eval_id} run_id={published.run_id} " - f"status={style(published.status, 'green' if published.status == 'completed' else 'yellow')} " - f"rows={len(rows)}" - ) - - if not rows: - progress( - f"{style('WARNING', 'yellow')}: no per-row results were " - f"downloaded from Foundry; report.md will be minimal. The " - f"canonical view is the Foundry portal." - ) - - return result - - -def _publish_to_foundry_safely( - result: RunResult, - config: AgentOpsConfig, - output_dir: Path, - *, - progress: Optional[Callable[[str], None]] = None, -) -> None: - """Best-effort Classic Foundry publish. Failures are logged, never fatal.""" - if config.publish_target() != "foundry": - return - - notify = progress or (lambda _msg: None) - - try: - published = publisher.publish_to_foundry( - result, - project_endpoint=config.project_endpoint, - ) - except Exception as exc: # noqa: BLE001 - logger.debug("foundry publish failed: %s", exc) - notify( - f"{style('publish foundry FAILED', 'red')}: {exc}. " - f"Local results.json is the source of truth." - ) - return - - cloud_meta_path = output_dir / "cloud_evaluation.json" - cloud_meta_path.write_text( - json.dumps( - { - "mode": "classic", - "evaluation_name": published.evaluation_name, - "report_url": published.studio_url, - }, - indent=2, - ), - encoding="utf-8", - ) - notify( - f"Published to {style('Classic Foundry Evaluations', 'bold')}: " - f"{style(published.studio_url, 'cyan')}" - ) - notify( - f"Tip: to run server-side in the {style('New Foundry', 'bold')} " - f"experience, set 'execution: cloud' + 'publish: true' (preview)." - ) - - -def exit_code_from(result: RunResult) -> int: - """Translate a run's outcome into the ``agentops`` CLI contract. - - * ``0`` - success, all thresholds passed. - * ``2`` - invocations succeeded but a threshold failed. - * ``1`` - runtime errors are raised as exceptions before this is called. - """ - return 0 if result.summary.overall_passed else 2 - - -# --------------------------------------------------------------------------- -# Dataset -# --------------------------------------------------------------------------- - - -def _resolve_dataset_path(config: AgentOpsConfig, options: RunOptions) -> Path: - candidate = config.dataset - if candidate.is_absolute() and candidate.exists(): - return candidate - base = options.config_path.parent - resolved = (base / candidate).resolve() - if not resolved.exists(): - raise FileNotFoundError(f"dataset not found: {resolved}") - return resolved - - -_FRIENDLY_KIND = { - "foundry_prompt": "foundry agent", - "foundry_hosted": "foundry agent (hosted)", - "http_json": "http endpoint", - "model_direct": "model deployment", -} - - -def _friendly_target_kind(kind: str) -> str: - return _FRIENDLY_KIND.get(kind, kind) - - -def _iter_dataset(path: Path) -> Iterable[Dict[str, Any]]: - with path.open("r", encoding="utf-8") as handle: - for line_number, line in enumerate(handle, start=1): - stripped = line.strip() - if not stripped: - continue - try: - row = json.loads(stripped) - except json.JSONDecodeError as exc: - raise ValueError( - f"{path}: invalid JSON on line {line_number}: {exc}" - ) from exc - if not isinstance(row, dict): - raise ValueError( - f"{path}: line {line_number} is not a JSON object" - ) - yield row - - -# --------------------------------------------------------------------------- -# Per-row execution -# --------------------------------------------------------------------------- - - -def _metric_passes(rule: Threshold, value: float) -> bool: - if rule.value is None or rule.criteria in {"true", "false"}: - return True - target_v = float(rule.value) - c = rule.criteria - if c == ">=": - return value >= target_v - if c == ">": - return value > target_v - if c == "<=": - return value <= target_v - if c == "<": - return value < target_v - if c == "==": - return value == target_v - return True - - -def _evaluate_row( - *, - row: Dict[str, Any], - index: int, - total: int, - target, - config: AgentOpsConfig, - evaluators: List[runtime.EvaluatorRuntime], - timeout: float, - progress: Callable[[str], None], - rules_by_metric: Optional[Dict[str, Threshold]] = None, -) -> RowResult: - label = style(f"[{index + 1}/{total}]", "dim") - preview = str(row.get("input", "")).strip().replace("\n", " ") - if len(preview) > 80: - preview = preview[:77] + "..." - progress(f"{label} invoking target: {preview!r}") - expected = row.get("expected") - expected_text = str(expected) if expected is not None else None - - with telemetry.eval_item_span( - row_index=index, - input_text=str(row.get("input", "")), - expected_text=expected_text, - ) as item_span: - try: - with telemetry.agent_invoke_span( - target="agent" if target.kind.startswith("foundry") else "model", - model=target.deployment, - agent_id=target.raw if target.kind.startswith("foundry") else None, - agent_name=target.name, - agent_version=target.version, - ) as invoke_span: - invocation = invocations.invoke(target, config, row, timeout=timeout) - telemetry.set_agent_invoke_result( - invoke_span, - response_model=target.deployment, - ) - except Exception as exc: # noqa: BLE001 - telemetry.set_eval_item_result(item_span, passed=False) - logger.debug("row %d invocation failed: %s", index, exc) - progress(f"{label} {style('invocation FAILED', 'bold', 'red')}: {exc}") - return RowResult( - row_index=index, - input=str(row.get("input", "")), - expected=row.get("expected"), - response="", - context=row.get("context"), - error=str(exc), - ) - - tool_count = len(invocation.tool_calls) if invocation.tool_calls else 0 - progress( - f"{label} replied in {style(f'{invocation.latency_seconds:.2f}s', 'cyan')} " - f"({tool_count} tool call(s)); scoring..." - ) - - metrics: List[RowMetric] = [] - for evaluator in evaluators: - metric = runtime.run_evaluator( - evaluator, - row=row, - response=invocation.response, - latency_seconds=invocation.latency_seconds, - actual_tool_calls=invocation.tool_calls, - ) - metrics.append(metric) - - rule = (rules_by_metric or {}).get(metric.name) - metric_passed = ( - None - if metric.value is None or rule is None - else _metric_passes(rule, float(metric.value)) - ) - telemetry.record_evaluator_span( - evaluator_name=evaluator.preset.name, - builtin_name=metric.name, - source=( - "local" - if evaluator.preset.class_name == "_latency" - else "azure-ai-evaluation" - ), - score=float(metric.value) if metric.value is not None else 0.0, - threshold=rule.value if rule is not None else None, - criteria=rule.criteria if rule is not None else None, - passed=metric_passed, - ) - - telemetry.set_eval_item_result( - item_span, - passed=all(metric.error is None for metric in metrics), - ) - - rules = rules_by_metric or {} - - def _format_metric(m: RowMetric) -> str: - if isinstance(m.value, (int, float)): - rule = rules.get(m.name) - text = f"{m.value:.2f}" - if rule is None: - # No user threshold for this metric: keep value neutral - # so the line stays readable. - return f"{m.name}={text}" - color = "green" if _metric_passes(rule, float(m.value)) else "red" - return f"{m.name}={style(text, color)}" - if m.error: - return f"{m.name}={style('ERR', 'red')}" - return f"{m.name}={style('n/a', 'dim')}" - - scored = ", ".join(_format_metric(m) for m in metrics) - progress(f"{label} scored: {scored}") - - return RowResult( - row_index=index, - input=str(row.get("input", "")), - expected=row.get("expected"), - response=invocation.response, - context=row.get("context"), - latency_seconds=invocation.latency_seconds, - tool_calls=invocation.tool_calls, - metrics=metrics, - ) - - -# --------------------------------------------------------------------------- -# Aggregation -# --------------------------------------------------------------------------- - - -def _aggregate_metrics(rows: List[RowResult]) -> Dict[str, float]: - by_metric: Dict[str, List[float]] = {} - for row in rows: - for metric in row.metrics: - if metric.value is None: - continue - by_metric.setdefault(metric.name, []).append(metric.value) - aggregate: Dict[str, float] = {} - for name, values in by_metric.items(): - if values: - aggregate[name] = statistics.fmean(values) - return aggregate - - -def _summarize( - rows: List[RowResult], - threshold_results, -) -> RunSummary: - items_total = len(rows) - items_passed_all = sum( - 1 - for row in rows - if row.error is None and all(m.error is None for m in row.metrics) - ) - items_pass_rate = items_passed_all / items_total if items_total else 0.0 - thresholds_total = len(threshold_results) - thresholds_passed = sum(1 for t in threshold_results if t.passed) - threshold_pass_rate = ( - thresholds_passed / thresholds_total if thresholds_total else 1.0 - ) - overall = items_total > 0 and threshold_pass_rate == 1.0 and items_passed_all > 0 - return RunSummary( - items_total=items_total, - items_passed_all=items_passed_all, - items_pass_rate=items_pass_rate, - thresholds_total=thresholds_total, - thresholds_passed=thresholds_passed, - threshold_pass_rate=threshold_pass_rate, - overall_passed=overall, - ) - - -# --------------------------------------------------------------------------- -# Persistence -# --------------------------------------------------------------------------- - - -def _persist(result: RunResult, output_dir: Path) -> None: - output_dir.mkdir(parents=True, exist_ok=True) - results_path = output_dir / "results.json" - report_path = output_dir / "report.md" - - payload = result.model_dump(mode="json") - results_path.write_text( - json.dumps(payload, indent=2, ensure_ascii=False), - encoding="utf-8", - ) - report_path.write_text(reporter.render(result), encoding="utf-8") +"""End-to-end evaluation orchestrator for AgentOps 1.0. + +This is the single entry point exercised by ``agentops eval``. It loads the +flat config, classifies the target, infers evaluators from the dataset shape, +invokes the target row-by-row, runs each evaluator, applies thresholds, and +writes ``results.json`` and ``report.md``. +""" + +from __future__ import annotations + +import json +import logging +import statistics +import sys +import time +import os +from dataclasses import dataclass, field +from datetime import datetime, timezone +from pathlib import Path +from typing import Any, Callable, Dict, Iterable, List, Optional + +from agentops.core.agentops_config import AgentOpsConfig, Threshold, classify_agent +from agentops.core.evaluators import ( + detect_dataset_shape, + merge_thresholds, + select_evaluators, +) +from agentops.core.results import ( + RowMetric, + RowResult, + RunResult, + RunSummary, + TargetInfo, +) +from agentops.pipeline import comparison as comparison_module +from agentops.pipeline import invocations, publisher, reporter, runtime, thresholds +from agentops.utils import telemetry +from agentops.utils.colors import style + +logger = logging.getLogger("agentops.pipeline") + + +# --------------------------------------------------------------------------- +# Public entry point +# --------------------------------------------------------------------------- + + +@dataclass +class RunOptions: + config_path: Path + output_dir: Path + baseline_path: Optional[Path] = None + timeout_seconds: float = 120.0 + dataset_override: Optional[Path] = None + agent_override: Optional[str] = None + # Optional callback invoked with progress messages during a run. The + # CLI wires this to ``typer.echo`` so users see per-row progress + # ("invoking", "scored", ...) instead of long unexplained pauses. + # Library callers can leave it as ``None`` to keep runs silent. + progress: Optional[Callable[[str], None]] = field(default=None, repr=False) + + +def run_evaluation( + config: AgentOpsConfig, + *, + options: RunOptions, +) -> RunResult: + """Run a full evaluation and persist artifacts. Returns the RunResult.""" + telemetry.init_tracing() + try: + return _run_evaluation(config, options=options) + finally: + telemetry.shutdown() + + +def _run_evaluation( + config: AgentOpsConfig, + *, + options: RunOptions, +) -> RunResult: + """Run a full evaluation after optional telemetry has been initialized.""" + if options.baseline_path is not None and not options.baseline_path.exists(): + raise FileNotFoundError( + f"baseline file not found: {options.baseline_path}. " + "Run `agentops eval run` once without `--baseline` first, then copy " + "`.agentops/results/latest/results.json` to the baseline path." + ) + + if config.execution == "cloud": + return _run_evaluation_cloud(config, options=options) + return _run_evaluation_local(config, options=options) + + +def _run_evaluation_local( + config: AgentOpsConfig, + *, + options: RunOptions, +) -> RunResult: + """Local execution: AgentOps invokes the agent + evaluators row-by-row.""" + + started_at = datetime.now(timezone.utc) + started_perf = time.perf_counter() + + target = classify_agent( + options.agent_override or config.agent, + config.protocol, + ) + + dataset_path = options.dataset_override or _resolve_dataset_path(config, options) + shape = detect_dataset_shape(dataset_path) + + overrides = ( + [override.name for override in config.evaluators] if config.evaluators else None + ) + presets = select_evaluators( + target, + shape, + overrides=overrides, + threshold_metrics=config.thresholds.keys(), + ) + user_thresholds = [ + Threshold.from_expression(metric, expr) + for metric, expr in config.thresholds.items() + ] + threshold_rules = merge_thresholds(presets, user_thresholds) + + evaluator_runtimes = runtime.load_evaluators(presets) + + progress = options.progress or (lambda _msg: None) + + dataset_rows = list(_iter_dataset(dataset_path)) + total = len(dataset_rows) + from agentops import __version__ as _agentops_version + py = f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}" + progress( + f"{style('agentops', 'bold', 'cyan')} {style(_agentops_version, 'cyan')} " + f"{style('|', 'dim')} python {py} " + f"{style('|', 'dim')} config: {style(options.config_path.name, 'cyan')}" + ) + progress( + f"Loaded {style(str(total), 'bold')} row(s) from " + f"{style(dataset_path.name, 'cyan')}; running " + f"{style(str(len(presets)), 'bold')} evaluator(s) against " + f"{_friendly_target_kind(target.kind)}: {style(target.raw, 'bold')}." + ) + + with telemetry.eval_run_span( + bundle_name=options.config_path.stem, + dataset_name=dataset_path.name, + backend_type=target.kind, + target=target.raw, + model=target.deployment, + agent_id=target.raw if target.kind.startswith("foundry") else None, + ) as run_span: + rows: List[RowResult] = [] + rules_by_metric = {rule.metric: rule for rule in threshold_rules} + for index, row in enumerate(dataset_rows): + rows.append( + _evaluate_row( + row=row, + index=index, + total=total, + target=target, + config=config, + evaluators=evaluator_runtimes, + timeout=options.timeout_seconds, + progress=progress, + rules_by_metric=rules_by_metric, + ) + ) + + aggregate = _aggregate_metrics(rows) + threshold_results = thresholds.evaluate(threshold_rules, aggregate) + summary = _summarize(rows, threshold_results) + telemetry.set_eval_run_result( + run_span, + passed=summary.overall_passed, + items_total=summary.items_total, + items_passed=summary.items_passed_all, + ) + + finished_at = datetime.now(timezone.utc) + duration = time.perf_counter() - started_perf + + result = RunResult( + started_at=started_at.isoformat(), + finished_at=finished_at.isoformat(), + duration_seconds=duration, + target=TargetInfo( + kind=target.kind, + raw=target.raw, + protocol=target.protocol, + name=target.name, + version=target.version, + url=target.url, + deployment=target.deployment, + ), + dataset_path=str(dataset_path), + evaluators=[preset.name for preset in presets], + rows=rows, + aggregate_metrics=aggregate, + thresholds=threshold_results, + summary=summary, + config={ + "version": config.version, + "agent": config.agent, + "thresholds": dict(config.thresholds), + }, + ) + + if options.baseline_path is not None: + baseline = comparison_module.load_baseline(options.baseline_path) + result.comparison = comparison_module.build_comparison( + current=result, + baseline=baseline, + baseline_path=options.baseline_path, + ) + + _persist(result, options.output_dir) + + # Local execution only ever publishes to Classic Foundry. Cloud + # execution goes through _run_evaluation_cloud and never reaches here. + if config.publish_target() == "foundry": + _publish_to_foundry_safely(result, config, options.output_dir, progress=progress) + + return result + + +def _run_evaluation_cloud( + config: AgentOpsConfig, + *, + options: RunOptions, +) -> RunResult: + """Cloud execution: Foundry invokes the agent + evaluators server-side. + + The agent is invoked exactly once - on Foundry's side. AgentOps does + not run the row-by-row local loop. After the cloud run completes we + download the per-row ``output_items`` and reshape them into the same + :class:`RunResult` schema that local execution produces, so + ``report.md`` and ``--baseline`` work identically. + """ + started_at = datetime.now(timezone.utc) + started_perf = time.perf_counter() + + target = classify_agent( + options.agent_override or config.agent, + config.protocol, + ) + if target.kind != "foundry_prompt": + raise ValueError( + "execution: cloud only supports Foundry prompt agents " + f"('name:version'); got target.kind={target.kind!r}." + ) + + dataset_path = options.dataset_override or _resolve_dataset_path(config, options) + shape = detect_dataset_shape(dataset_path) + overrides = ( + [override.name for override in config.evaluators] if config.evaluators else None + ) + all_presets = select_evaluators( + target, + shape, + overrides=overrides, + threshold_metrics=config.thresholds.keys(), + ) + + # Cloud execution runs server-side, so client-side runtime evaluators + # (e.g. avg_latency_seconds) cannot be measured. Excluding them is the + # right choice - otherwise their default thresholds would mark the run + # FAILED for a metric we never had a chance to observe. + presets = [p for p in all_presets if "runtime" not in p.categories] + skipped_runtime = [p.name for p in all_presets if "runtime" in p.categories] + + user_thresholds = [ + Threshold.from_expression(metric, expr) + for metric, expr in config.thresholds.items() + # Drop user-specified thresholds for runtime metrics too - they + # would otherwise fail with actual="missing". + if metric not in {p.score_key for p in all_presets if "runtime" in p.categories} + ] + threshold_rules = merge_thresholds(presets, user_thresholds) + + # Build a "shell" result that carries just enough metadata for the + # cloud publisher to map evaluator class names onto Azure AI evaluator + # testing criteria. + shell_target = TargetInfo( + kind=target.kind, + raw=target.raw, + protocol=target.protocol, + name=target.name, + version=target.version, + url=target.url, + deployment=target.deployment, + ) + + progress = options.progress or (lambda _msg: None) + from agentops import __version__ as _agentops_version + py = f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}" + progress( + f"{style('agentops', 'bold', 'cyan')} {style(_agentops_version, 'cyan')} " + f"{style('|', 'dim')} python {py} " + f"{style('|', 'dim')} config: {style(options.config_path.name, 'cyan')}" + ) + progress( + f"execution: {style('cloud', 'bold')} - Foundry will run the agent " + f"and {style(str(len(presets)), 'bold')} evaluator(s) server-side. " + f"Agent: {style(target.raw, 'bold')}." + ) + if skipped_runtime: + progress( + f" (skipped client-side runtime evaluators: " + f"{', '.join(skipped_runtime)} - not measurable in cloud mode)" + ) + + shell_result = RunResult( + started_at=started_at.isoformat(), + finished_at=started_at.isoformat(), + duration_seconds=0.0, + target=shell_target, + dataset_path=str(dataset_path), + evaluators=[preset.name for preset in presets], + rows=[], + aggregate_metrics={}, + thresholds=[], + summary=_summarize([], []), + config={ + "version": config.version, + "agent": config.agent, + "thresholds": dict(config.thresholds), + }, + ) + + endpoint = config.project_endpoint or os.getenv("AZURE_AI_FOUNDRY_PROJECT_ENDPOINT") + if not endpoint: + raise ValueError( + "execution: cloud requires either 'project_endpoint' in " + "agentops.yaml or the AZURE_AI_FOUNDRY_PROJECT_ENDPOINT env var." + ) + + from agentops.pipeline import cloud_runner + from agentops.pipeline import cloud_results + + with telemetry.eval_run_span( + bundle_name=options.config_path.stem, + dataset_name=dataset_path.name, + backend_type="foundry_cloud", + target=target.raw, + model=target.deployment, + agent_id=target.raw, + ) as run_span: + published = cloud_runner.run_on_foundry_cloud( + shell_result, + dataset_path=dataset_path, + project_endpoint=endpoint, + dataset_sync=config.dataset_sync, + progress=progress, + ) + + if run_span is not None: + run_span.set_attribute("agentops.eval.execution", "cloud") + run_span.set_attribute("agentops.eval.cloud.eval_id", published.eval_id) + run_span.set_attribute("agentops.eval.cloud.run_id", published.run_id) + run_span.set_attribute("agentops.eval.cloud.status", published.status) + if published.report_url: + run_span.set_attribute( + "agentops.eval.cloud.report_url", + published.report_url, + ) + if published.dataset: + run_span.set_attribute( + "agentops.eval.cloud.dataset.mode", + str(published.dataset.get("mode") or ""), + ) + dataset_id = published.dataset.get("id") + if dataset_id: + run_span.set_attribute( + "agentops.eval.cloud.dataset.id", + str(dataset_id), + ) + + rows = cloud_results.rows_from_cloud_output_items(published.output_items) + aggregate = _aggregate_metrics(rows) + threshold_results = thresholds.evaluate(threshold_rules, aggregate) + summary = _summarize(rows, threshold_results) + telemetry.set_eval_run_result( + run_span, + passed=summary.overall_passed, + items_total=summary.items_total, + items_passed=summary.items_passed_all, + ) + + finished_at = datetime.now(timezone.utc) + duration = time.perf_counter() - started_perf + + # Always persist the raw Foundry output_items next to results.json / + # report.md so the run is debuggable from the artifact bundle alone. + # This is the only place the per-row grader payloads survive in their + # native shape; without it a parser regression looks the same in CI + # as a real eval failure. + try: + options.output_dir.mkdir(parents=True, exist_ok=True) + raw_items_path = options.output_dir / "cloud_output_items.json" + raw_items_path.write_text( + json.dumps(list(published.output_items), indent=2, default=str), + encoding="utf-8", + ) + except (OSError, TypeError) as exc: + progress(f"warning: failed to write cloud_output_items.json: {exc}") + raw_items_path = None + + # If the cloud run yielded zero usable metric values despite running + # graders, surface that loudly so the user does not chase a phantom + # "threshold failed" gate. The artifact dumped above is the triage + # entry point, but we also lift the first per-metric error string + # into the warning itself so CI logs carry the actionable cause + # (most often: the Foundry evaluator service principal lacks + # `Cognitive Services OpenAI User` on the model deployment). + if presets and not aggregate and rows: + suffix = ( + f" Inspect {raw_items_path}." if raw_items_path is not None else "" + ) + first_error = _first_metric_error(rows) + cause_suffix = f" First grader error: {first_error}" if first_error else "" + progress( + "warning: cloud eval returned 0 usable metric scores across " + f"{len(rows)} row(s).{cause_suffix}{suffix}" + ) + + result = RunResult( + started_at=started_at.isoformat(), + finished_at=finished_at.isoformat(), + duration_seconds=duration, + target=shell_target, + dataset_path=str(dataset_path), + evaluators=[preset.name for preset in presets], + rows=rows, + aggregate_metrics=aggregate, + thresholds=threshold_results, + summary=summary, + config={ + "version": config.version, + "agent": config.agent, + "thresholds": dict(config.thresholds), + "execution": "cloud", + "cloud_evaluation": { + "mode": "cloud", + "evaluation_name": published.evaluation_name, + "eval_id": published.eval_id, + "run_id": published.run_id, + "status": published.status, + "report_url": published.report_url, + "dataset": published.dataset, + }, + }, + ) + + if options.baseline_path is not None: + baseline = comparison_module.load_baseline(options.baseline_path) + result.comparison = comparison_module.build_comparison( + current=result, + baseline=baseline, + baseline_path=options.baseline_path, + ) + + _persist(result, options.output_dir) + + # Write cloud_evaluation.json next to the other artifacts for parity + # with the (now-removed) post-run cloud publish path. + cloud_meta_path = options.output_dir / "cloud_evaluation.json" + cloud_meta_path.write_text( + json.dumps(result.config["cloud_evaluation"], indent=2), + encoding="utf-8", + ) + + progress( + f"Submitted to {style('New Foundry Evaluations', 'bold')}: " + f"{style(published.report_url or '(no portal URL)', 'cyan')}" + ) + progress( + f" eval_id={published.eval_id} run_id={published.run_id} " + f"status={style(published.status, 'green' if published.status == 'completed' else 'yellow')} " + f"rows={len(rows)}" + ) + + if not rows: + progress( + f"{style('WARNING', 'yellow')}: no per-row results were " + f"downloaded from Foundry; report.md will be minimal. The " + f"canonical view is the Foundry portal." + ) + + return result + + +def _publish_to_foundry_safely( + result: RunResult, + config: AgentOpsConfig, + output_dir: Path, + *, + progress: Optional[Callable[[str], None]] = None, +) -> None: + """Best-effort Classic Foundry publish. Failures are logged, never fatal.""" + if config.publish_target() != "foundry": + return + + notify = progress or (lambda _msg: None) + + try: + published = publisher.publish_to_foundry( + result, + project_endpoint=config.project_endpoint, + ) + except Exception as exc: # noqa: BLE001 + logger.debug("foundry publish failed: %s", exc) + notify( + f"{style('publish foundry FAILED', 'red')}: {exc}. " + f"Local results.json is the source of truth." + ) + return + + cloud_meta_path = output_dir / "cloud_evaluation.json" + cloud_meta_path.write_text( + json.dumps( + { + "mode": "classic", + "evaluation_name": published.evaluation_name, + "report_url": published.studio_url, + }, + indent=2, + ), + encoding="utf-8", + ) + notify( + f"Published to {style('Classic Foundry Evaluations', 'bold')}: " + f"{style(published.studio_url, 'cyan')}" + ) + notify( + f"Tip: to run server-side in the {style('New Foundry', 'bold')} " + f"experience, set 'execution: cloud' + 'publish: true' (preview)." + ) + + +def exit_code_from(result: RunResult) -> int: + """Translate a run's outcome into the ``agentops`` CLI contract. + + * ``0`` - success, all thresholds passed. + * ``2`` - invocations succeeded but a threshold failed. + * ``1`` - runtime errors are raised as exceptions before this is called. + """ + return 0 if result.summary.overall_passed else 2 + + +# --------------------------------------------------------------------------- +# Dataset +# --------------------------------------------------------------------------- + + +def _resolve_dataset_path(config: AgentOpsConfig, options: RunOptions) -> Path: + candidate = config.dataset + if candidate.is_absolute() and candidate.exists(): + return candidate + base = options.config_path.parent + resolved = (base / candidate).resolve() + if not resolved.exists(): + raise FileNotFoundError(f"dataset not found: {resolved}") + return resolved + + +_FRIENDLY_KIND = { + "foundry_prompt": "foundry agent", + "foundry_hosted": "foundry agent (hosted)", + "http_json": "http endpoint", + "model_direct": "model deployment", +} + + +def _friendly_target_kind(kind: str) -> str: + return _FRIENDLY_KIND.get(kind, kind) + + +def _iter_dataset(path: Path) -> Iterable[Dict[str, Any]]: + with path.open("r", encoding="utf-8") as handle: + for line_number, line in enumerate(handle, start=1): + stripped = line.strip() + if not stripped: + continue + try: + row = json.loads(stripped) + except json.JSONDecodeError as exc: + raise ValueError( + f"{path}: invalid JSON on line {line_number}: {exc}" + ) from exc + if not isinstance(row, dict): + raise ValueError( + f"{path}: line {line_number} is not a JSON object" + ) + yield row + + +# --------------------------------------------------------------------------- +# Per-row execution +# --------------------------------------------------------------------------- + + +def _metric_passes(rule: Threshold, value: float) -> bool: + if rule.value is None or rule.criteria in {"true", "false"}: + return True + target_v = float(rule.value) + c = rule.criteria + if c == ">=": + return value >= target_v + if c == ">": + return value > target_v + if c == "<=": + return value <= target_v + if c == "<": + return value < target_v + if c == "==": + return value == target_v + return True + + +def _evaluate_row( + *, + row: Dict[str, Any], + index: int, + total: int, + target, + config: AgentOpsConfig, + evaluators: List[runtime.EvaluatorRuntime], + timeout: float, + progress: Callable[[str], None], + rules_by_metric: Optional[Dict[str, Threshold]] = None, +) -> RowResult: + label = style(f"[{index + 1}/{total}]", "dim") + preview = str(row.get("input", "")).strip().replace("\n", " ") + if len(preview) > 80: + preview = preview[:77] + "..." + progress(f"{label} invoking target: {preview!r}") + expected = row.get("expected") + expected_text = str(expected) if expected is not None else None + + with telemetry.eval_item_span( + row_index=index, + input_text=str(row.get("input", "")), + expected_text=expected_text, + ) as item_span: + try: + with telemetry.agent_invoke_span( + target="agent" if target.kind.startswith("foundry") else "model", + model=target.deployment, + agent_id=target.raw if target.kind.startswith("foundry") else None, + agent_name=target.name, + agent_version=target.version, + ) as invoke_span: + invocation = invocations.invoke(target, config, row, timeout=timeout) + telemetry.set_agent_invoke_result( + invoke_span, + response_model=target.deployment, + ) + except Exception as exc: # noqa: BLE001 + telemetry.set_eval_item_result(item_span, passed=False) + logger.debug("row %d invocation failed: %s", index, exc) + progress(f"{label} {style('invocation FAILED', 'bold', 'red')}: {exc}") + return RowResult( + row_index=index, + input=str(row.get("input", "")), + expected=row.get("expected"), + response="", + context=row.get("context"), + error=str(exc), + ) + + tool_count = len(invocation.tool_calls) if invocation.tool_calls else 0 + progress( + f"{label} replied in {style(f'{invocation.latency_seconds:.2f}s', 'cyan')} " + f"({tool_count} tool call(s)); scoring..." + ) + + metrics: List[RowMetric] = [] + for evaluator in evaluators: + metric = runtime.run_evaluator( + evaluator, + row=row, + response=invocation.response, + latency_seconds=invocation.latency_seconds, + actual_tool_calls=invocation.tool_calls, + ) + metrics.append(metric) + + rule = (rules_by_metric or {}).get(metric.name) + metric_passed = ( + None + if metric.value is None or rule is None + else _metric_passes(rule, float(metric.value)) + ) + telemetry.record_evaluator_span( + evaluator_name=evaluator.preset.name, + builtin_name=metric.name, + source=( + "local" + if evaluator.preset.class_name == "_latency" + else "azure-ai-evaluation" + ), + score=float(metric.value) if metric.value is not None else 0.0, + threshold=rule.value if rule is not None else None, + criteria=rule.criteria if rule is not None else None, + passed=metric_passed, + ) + + telemetry.set_eval_item_result( + item_span, + passed=all(metric.error is None for metric in metrics), + ) + + rules = rules_by_metric or {} + + def _format_metric(m: RowMetric) -> str: + if isinstance(m.value, (int, float)): + rule = rules.get(m.name) + text = f"{m.value:.2f}" + if rule is None: + # No user threshold for this metric: keep value neutral + # so the line stays readable. + return f"{m.name}={text}" + color = "green" if _metric_passes(rule, float(m.value)) else "red" + return f"{m.name}={style(text, color)}" + if m.error: + return f"{m.name}={style('ERR', 'red')}" + return f"{m.name}={style('n/a', 'dim')}" + + scored = ", ".join(_format_metric(m) for m in metrics) + progress(f"{label} scored: {scored}") + + return RowResult( + row_index=index, + input=str(row.get("input", "")), + expected=row.get("expected"), + response=invocation.response, + context=row.get("context"), + latency_seconds=invocation.latency_seconds, + tool_calls=invocation.tool_calls, + metrics=metrics, + ) + + +# --------------------------------------------------------------------------- +# Aggregation +# --------------------------------------------------------------------------- + + +def _aggregate_metrics(rows: List[RowResult]) -> Dict[str, float]: + by_metric: Dict[str, List[float]] = {} + for row in rows: + for metric in row.metrics: + if metric.value is None: + continue + by_metric.setdefault(metric.name, []).append(metric.value) + aggregate: Dict[str, float] = {} + for name, values in by_metric.items(): + if values: + aggregate[name] = statistics.fmean(values) + return aggregate + + +def _first_metric_error(rows: List[RowResult]) -> Optional[str]: + """Return the first non-empty per-metric error string across all rows. + + Used by the cloud-eval orchestrator to lift the actionable cause of + an all-null aggregate (e.g., RBAC failure on the evaluator's model + deployment) into the user-facing warning so CI logs carry the signal + without operators having to download the raw artifact. + """ + for row in rows: + for metric in row.metrics: + if isinstance(metric.error, str) and metric.error.strip(): + return metric.error.strip() + return None + + +def _summarize( + rows: List[RowResult], + threshold_results, +) -> RunSummary: + items_total = len(rows) + items_passed_all = sum( + 1 + for row in rows + if row.error is None and all(m.error is None for m in row.metrics) + ) + items_pass_rate = items_passed_all / items_total if items_total else 0.0 + thresholds_total = len(threshold_results) + thresholds_passed = sum(1 for t in threshold_results if t.passed) + threshold_pass_rate = ( + thresholds_passed / thresholds_total if thresholds_total else 1.0 + ) + overall = items_total > 0 and threshold_pass_rate == 1.0 and items_passed_all > 0 + return RunSummary( + items_total=items_total, + items_passed_all=items_passed_all, + items_pass_rate=items_pass_rate, + thresholds_total=thresholds_total, + thresholds_passed=thresholds_passed, + threshold_pass_rate=threshold_pass_rate, + overall_passed=overall, + ) + + +# --------------------------------------------------------------------------- +# Persistence +# --------------------------------------------------------------------------- + + +def _persist(result: RunResult, output_dir: Path) -> None: + output_dir.mkdir(parents=True, exist_ok=True) + results_path = output_dir / "results.json" + report_path = output_dir / "report.md" + + payload = result.model_dump(mode="json") + results_path.write_text( + json.dumps(payload, indent=2, ensure_ascii=False), + encoding="utf-8", + ) + report_path.write_text(reporter.render(result), encoding="utf-8") diff --git a/tests/unit/test_cloud_results.py b/tests/unit/test_cloud_results.py index 1a39486..894662d 100644 --- a/tests/unit/test_cloud_results.py +++ b/tests/unit/test_cloud_results.py @@ -1,236 +1,338 @@ -"""Tests for :mod:`agentops.pipeline.cloud_results`.""" - -from __future__ import annotations - -import json - -from agentops.pipeline.cloud_results import rows_from_cloud_output_items - - -def _item(datasource, sample, results): - return {"datasource_item": datasource, "sample": sample, "results": results} - - -def test_extracts_text_from_output_items_list(): - """sample.output as a flat list of {text, type} dicts (Foundry shape).""" - items = [ - _item( - {"input": "hi", "expected": "hello"}, - { - "output": [ - { - "annotations": [], - "text": "hello", - "type": "output_text", - "logprobs": [], - } - ] - }, - [{"name": "similarity", "score": 5.0}], - ), - ] - rows = rows_from_cloud_output_items(items) - assert rows[0].response == "hello" - - -def test_extracts_text_from_responses_api_content_blocks(): - """sample.output[i].content as a list of {type, text} blocks - (OpenAI Responses API canonical shape).""" - items = [ - _item( - {"input": "hi"}, - { - "output": [ - { - "type": "message", - "role": "assistant", - "content": [ - {"type": "output_text", "text": "Paris is the capital."} - ], - } - ] - }, - [], - ), - ] - rows = rows_from_cloud_output_items(items) - assert rows[0].response == "Paris is the capital." - - -def test_reparses_json_encoded_output_text(): - """When Foundry stores a JSON-stringified output_items list under - ``output_text``, we should reparse it instead of passing the JSON - through as the response.""" - payload = json.dumps( - [ - { - "annotations": [], - "text": "Paris is the capital of France.", - "type": "output_text", - "logprobs": [], - } - ] - ) - items = [_item({"input": "hi"}, {"output_text": payload}, [])] - rows = rows_from_cloud_output_items(items) - assert rows[0].response == "Paris is the capital of France." - - -def test_falls_back_to_empty_string_when_sample_unrecognized(): - items = [_item({"input": "hi"}, {"strange_field": 42}, [])] - rows = rows_from_cloud_output_items(items) - assert rows[0].response == "" - - -def test_extracts_metric_scores_from_results(): - items = [ - _item( - {"input": "hi", "expected": "hello"}, - {"output_text": "hello"}, - [ - {"name": "similarity", "score": 5.0}, - {"name": "coherence", "score": 4}, - {"name": "f1_score", "value": 1.0}, - {"name": "fluency", "passed": True}, - ], - ), - ] - rows = rows_from_cloud_output_items(items) - by_name = {m.name: m.value for m in rows[0].metrics} - assert by_name == { - "similarity": 5.0, - "coherence": 4.0, - "f1_score": 1.0, - "fluency": 1.0, - } - - -def test_passes_through_context_and_tool_calls_from_datasource(): - items = [ - _item( - { - "input": "hi", - "expected": "hello", - "context": "greeting context", - "tool_calls": [{"name": "lookup"}], - }, - {"output_text": "hello"}, - [], - ), - ] - rows = rows_from_cloud_output_items(items) - assert rows[0].context == "greeting context" - assert rows[0].tool_calls == [{"name": "lookup"}] - - -def test_extracts_score_zero_as_legitimate_value(): - """``score: 0`` is the lowest valid number and must not be coerced to None. - Real Foundry safety graders (violence/sexual/self_harm) emit ``score: 0`` - on a clean row plus ``label: "pass"``; treating zero as missing collapses - the row to ``missing`` in the threshold table.""" - items = [ - _item( - {"input": "hi"}, - {"output_text": "hello"}, - [{"name": "violence", "score": 0, "label": "pass", "passed": True}], - ), - ] - rows = rows_from_cloud_output_items(items) - by_name = {m.name: m.value for m in rows[0].metrics} - assert by_name == {"violence": 0.0} - - -def test_extracts_real_foundry_azure_ai_evaluator_result_shape(): - """The on-the-wire shape emitted by Foundry's ``azure_ai_evaluator`` - grader carries both ``metric`` and ``name`` plus a ``label``, a - ``threshold``, and a ``passed`` boolean. The parser must find the - score under the canonical ``score`` field even with all the extra keys - present (extras must not shadow the score). Schema sourced from - Azure/azure-sdk-for-python evaluation fixture - ``evaluation_util_convert_expected_output.json``.""" - items = [ - _item( - {"input": "hi", "expected": "hello"}, - {"output_text": "hello"}, - [ - { - "type": "azure_ai_evaluator", - "name": "violence", - "metric": "violence", - "score": 0, - "label": "pass", - "reason": "no violent content detected", - "threshold": 3, - "passed": True, - "sample": {"output_text": "hello"}, - "status": "completed", - }, - { - "type": "azure_ai_evaluator", - "name": "coherence", - "metric": "coherence", - "score": 4.5, - "reason": "well-structured response", - "passed": True, - "status": "completed", - }, - ], - ), - ] - rows = rows_from_cloud_output_items(items) - by_name = {m.name: m.value for m in rows[0].metrics} - assert by_name == {"violence": 0.0, "coherence": 4.5} - - -def test_extracts_score_nested_in_sample_when_top_level_missing(): - """Some custom Foundry prompt-based graders only populate - ``result["sample"]["score"]`` rather than ``result["score"]``. The - parser must descend into ``sample`` as a fallback so those metrics - don't show up as missing.""" - items = [ - _item( - {"input": "hi"}, - {"output_text": "hello"}, - [{"name": "custom_quality", "sample": {"score": 3.5}}], - ), - ] - rows = rows_from_cloud_output_items(items) - by_name = {m.name: m.value for m in rows[0].metrics} - assert by_name == {"custom_quality": 3.5} - - -def test_extracts_score_from_label_when_no_numeric_score(): - """Binary content-safety graders sometimes return only ``label: pass`` - / ``label: fail`` with no numeric score. Treat those as 1.0 / 0.0 so - they don't drop out of the threshold table as missing.""" - items = [ - _item( - {"input": "hi"}, - {"output_text": "hello"}, - [ - {"name": "protected_material", "label": "pass"}, - {"name": "hate_unfairness", "label": "fail"}, - ], - ), - ] - rows = rows_from_cloud_output_items(items) - by_name = {m.name: m.value for m in rows[0].metrics} - assert by_name == {"protected_material": 1.0, "hate_unfairness": 0.0} - - -def test_records_diagnostic_reason_when_score_is_missing(): - """When a grader returns absolutely no usable score the parser emits a - structured ``error`` pointing operators at the raw items file. Silent - nulls were the symptom that motivated this fix.""" - items = [ - _item( - {"input": "hi"}, - {"output_text": "hello"}, - [{"name": "coherence"}], - ), - ] - rows = rows_from_cloud_output_items(items) - metric = rows[0].metrics[0] - assert metric.value is None - assert metric.error is not None - assert "cloud_output_items.json" in metric.error +"""Tests for :mod:`agentops.pipeline.cloud_results`.""" + +from __future__ import annotations + +import json + +from agentops.pipeline.cloud_results import rows_from_cloud_output_items + + +def _item(datasource, sample, results): + return {"datasource_item": datasource, "sample": sample, "results": results} + + +def test_extracts_text_from_output_items_list(): + """sample.output as a flat list of {text, type} dicts (Foundry shape).""" + items = [ + _item( + {"input": "hi", "expected": "hello"}, + { + "output": [ + { + "annotations": [], + "text": "hello", + "type": "output_text", + "logprobs": [], + } + ] + }, + [{"name": "similarity", "score": 5.0}], + ), + ] + rows = rows_from_cloud_output_items(items) + assert rows[0].response == "hello" + + +def test_extracts_text_from_responses_api_content_blocks(): + """sample.output[i].content as a list of {type, text} blocks + (OpenAI Responses API canonical shape).""" + items = [ + _item( + {"input": "hi"}, + { + "output": [ + { + "type": "message", + "role": "assistant", + "content": [ + {"type": "output_text", "text": "Paris is the capital."} + ], + } + ] + }, + [], + ), + ] + rows = rows_from_cloud_output_items(items) + assert rows[0].response == "Paris is the capital." + + +def test_reparses_json_encoded_output_text(): + """When Foundry stores a JSON-stringified output_items list under + ``output_text``, we should reparse it instead of passing the JSON + through as the response.""" + payload = json.dumps( + [ + { + "annotations": [], + "text": "Paris is the capital of France.", + "type": "output_text", + "logprobs": [], + } + ] + ) + items = [_item({"input": "hi"}, {"output_text": payload}, [])] + rows = rows_from_cloud_output_items(items) + assert rows[0].response == "Paris is the capital of France." + + +def test_falls_back_to_empty_string_when_sample_unrecognized(): + items = [_item({"input": "hi"}, {"strange_field": 42}, [])] + rows = rows_from_cloud_output_items(items) + assert rows[0].response == "" + + +def test_extracts_metric_scores_from_results(): + items = [ + _item( + {"input": "hi", "expected": "hello"}, + {"output_text": "hello"}, + [ + {"name": "similarity", "score": 5.0}, + {"name": "coherence", "score": 4}, + {"name": "f1_score", "value": 1.0}, + {"name": "fluency", "passed": True}, + ], + ), + ] + rows = rows_from_cloud_output_items(items) + by_name = {m.name: m.value for m in rows[0].metrics} + assert by_name == { + "similarity": 5.0, + "coherence": 4.0, + "f1_score": 1.0, + "fluency": 1.0, + } + + +def test_passes_through_context_and_tool_calls_from_datasource(): + items = [ + _item( + { + "input": "hi", + "expected": "hello", + "context": "greeting context", + "tool_calls": [{"name": "lookup"}], + }, + {"output_text": "hello"}, + [], + ), + ] + rows = rows_from_cloud_output_items(items) + assert rows[0].context == "greeting context" + assert rows[0].tool_calls == [{"name": "lookup"}] + + +def test_extracts_score_zero_as_legitimate_value(): + """``score: 0`` is the lowest valid number and must not be coerced to None. + Real Foundry safety graders (violence/sexual/self_harm) emit ``score: 0`` + on a clean row plus ``label: "pass"``; treating zero as missing collapses + the row to ``missing`` in the threshold table.""" + items = [ + _item( + {"input": "hi"}, + {"output_text": "hello"}, + [{"name": "violence", "score": 0, "label": "pass", "passed": True}], + ), + ] + rows = rows_from_cloud_output_items(items) + by_name = {m.name: m.value for m in rows[0].metrics} + assert by_name == {"violence": 0.0} + + +def test_extracts_real_foundry_azure_ai_evaluator_result_shape(): + """The on-the-wire shape emitted by Foundry's ``azure_ai_evaluator`` + grader carries both ``metric`` and ``name`` plus a ``label``, a + ``threshold``, and a ``passed`` boolean. The parser must find the + score under the canonical ``score`` field even with all the extra keys + present (extras must not shadow the score). Schema sourced from + Azure/azure-sdk-for-python evaluation fixture + ``evaluation_util_convert_expected_output.json``.""" + items = [ + _item( + {"input": "hi", "expected": "hello"}, + {"output_text": "hello"}, + [ + { + "type": "azure_ai_evaluator", + "name": "violence", + "metric": "violence", + "score": 0, + "label": "pass", + "reason": "no violent content detected", + "threshold": 3, + "passed": True, + "sample": {"output_text": "hello"}, + "status": "completed", + }, + { + "type": "azure_ai_evaluator", + "name": "coherence", + "metric": "coherence", + "score": 4.5, + "reason": "well-structured response", + "passed": True, + "status": "completed", + }, + ], + ), + ] + rows = rows_from_cloud_output_items(items) + by_name = {m.name: m.value for m in rows[0].metrics} + assert by_name == {"violence": 0.0, "coherence": 4.5} + + +def test_extracts_score_nested_in_sample_when_top_level_missing(): + """Some custom Foundry prompt-based graders only populate + ``result["sample"]["score"]`` rather than ``result["score"]``. The + parser must descend into ``sample`` as a fallback so those metrics + don't show up as missing.""" + items = [ + _item( + {"input": "hi"}, + {"output_text": "hello"}, + [{"name": "custom_quality", "sample": {"score": 3.5}}], + ), + ] + rows = rows_from_cloud_output_items(items) + by_name = {m.name: m.value for m in rows[0].metrics} + assert by_name == {"custom_quality": 3.5} + + +def test_extracts_score_from_label_when_no_numeric_score(): + """Binary content-safety graders sometimes return only ``label: pass`` + / ``label: fail`` with no numeric score. Treat those as 1.0 / 0.0 so + they don't drop out of the threshold table as missing.""" + items = [ + _item( + {"input": "hi"}, + {"output_text": "hello"}, + [ + {"name": "protected_material", "label": "pass"}, + {"name": "hate_unfairness", "label": "fail"}, + ], + ), + ] + rows = rows_from_cloud_output_items(items) + by_name = {m.name: m.value for m in rows[0].metrics} + assert by_name == {"protected_material": 1.0, "hate_unfairness": 0.0} + + +def test_records_diagnostic_reason_when_score_is_missing(): + """When a grader returns absolutely no usable score the parser emits a + structured ``error`` pointing operators at the raw items file. Silent + nulls were the symptom that motivated this fix.""" + items = [ + _item( + {"input": "hi"}, + {"output_text": "hello"}, + [{"name": "coherence"}], + ), + ] + rows = rows_from_cloud_output_items(items) + metric = rows[0].metrics[0] + assert metric.value is None + assert metric.error is not None + assert "cloud_output_items.json" in metric.error + + +def test_lifts_grader_execution_error_from_sample_error_dict(): + """When a Foundry ``azure_ai_evaluator`` grader fails to execute (e.g. + the evaluator service principal lacks ``Cognitive Services OpenAI + User`` on the model deployment), the failure is buried inside + ``result.sample.error.message`` and the top-level score is just + ``null``. The parser must lift that message into ``RowMetric.error`` + so the actionable cause shows up in ``results.json`` / ``report.md`` + instead of operators seeing only ``actual=missing`` in the threshold + table. Schema mirrors the on-the-wire shape captured from real + Foundry ``cloud_output_items.json`` artifacts.""" + items = [ + _item( + {"input": "hi", "expected": "hello"}, + {"output_text": "hello"}, + [ + { + "name": "coherence", + "metric": "coherence", + "type": "azure_ai_evaluator", + "score": None, + "passed": None, + "label": None, + "reason": None, + "threshold": None, + "status": "error", + "sample": { + "error": { + "code": "FAILED_EXECUTION", + "message": ( + "(UserError) OpenAI API hits " + "AuthenticationError: Error code: 401 - " + "PermissionDenied: lacks the required data " + "action chat/completions/action" + ), + } + }, + } + ], + ), + ] + rows = rows_from_cloud_output_items(items) + metric = rows[0].metrics[0] + assert metric.name == "coherence" + assert metric.value is None + assert metric.error is not None + assert metric.error.startswith("FAILED_EXECUTION: ") + assert "AuthenticationError" in metric.error + assert "PermissionDenied" in metric.error + assert "cloud_output_items.json" not in metric.error + + +def test_lifts_grader_error_from_top_level_dict_payload(): + """Some Foundry response shapes carry the error as a dict at the top + level of the result envelope rather than inside ``sample.error``. + The parser must accept both shapes interchangeably.""" + items = [ + _item( + {"input": "hi"}, + {"output_text": "hello"}, + [ + { + "name": "fluency", + "score": None, + "error": { + "code": "RateLimited", + "message": "rate limit exceeded", + }, + } + ], + ), + ] + rows = rows_from_cloud_output_items(items) + metric = rows[0].metrics[0] + assert metric.value is None + assert metric.error == "RateLimited: rate limit exceeded" + + +def test_grader_error_payload_does_not_shadow_a_real_score(): + """A grader can return ``status: completed`` plus a numeric score + even when ``sample.error`` exists for an upstream warning. In that + case the numeric score wins and ``RowMetric.error`` stays empty so + aggregation continues normally.""" + items = [ + _item( + {"input": "hi"}, + {"output_text": "hello"}, + [ + { + "name": "coherence", + "score": 4.5, + "status": "completed", + "sample": {"error": None}, + } + ], + ), + ] + rows = rows_from_cloud_output_items(items) + metric = rows[0].metrics[0] + assert metric.value == 4.5 + assert metric.error is None