From d7ca99536e7f421050f80f9af30309a54af419f6 Mon Sep 17 00:00:00 2001 From: Brendan O'Leary Date: Wed, 22 Apr 2026 10:22:32 -0400 Subject: [PATCH 1/7] feat: implement batch judge grading - Add _batch_grade_llm_judge() to grade multiple tasks in single API call - Add _parse_batch_judge_response() to parse JSON array responses - Add grade_tasks_batch() public API for batch grading - Add --batch-size argument to benchmark.py (default: 5) - Refactor benchmark loop: execute all tasks first, then batch grade - Handles mixed grading types (automated/llm/hybrid) intelligently - Falls back to individual grading if batch fails This reduces API calls from 98 (one per task) to ~20 (batches of 5), significantly improving benchmark throughput for LLM-judged tasks. --- scripts/benchmark.py | 435 ++++++++++++++++++++++++++++++++++------- scripts/lib_grading.py | 9 +- 2 files changed, 370 insertions(+), 74 deletions(-) diff --git a/scripts/benchmark.py b/scripts/benchmark.py index 18c531be..d4303e42 100644 --- a/scripts/benchmark.py +++ b/scripts/benchmark.py @@ -22,8 +22,9 @@ import subprocess import sys import time +from concurrent.futures import Future, ThreadPoolExecutor from pathlib import Path -from typing import Dict, List, Optional, Any +from typing import Any, Dict, List, Optional, Tuple from lib_agent import ( cleanup_agent_sessions, @@ -33,7 +34,7 @@ slugify_model, validate_openrouter_model, ) -from lib_grading import GradeResult, grade_task +from lib_grading import DEFAULT_JUDGE_TIMEOUT_SECONDS, GradeResult, grade_task from lib_tasks import Task, TaskLoader @@ -270,6 +271,16 @@ def _parse_args() -> argparse.Namespace: default=-0.5, help="Slope (%%/run) below which regression is flagged (default: -0.5)", ) + parser.add_argument( + "--no-parallel-judge", + action="store_true", + help="Disable parallel judge execution (grade synchronously after each task)", + ) + parser.add_argument( + "--no-judge-cache", + action="store_true", + help="Disable judge response caching (forces fresh evaluation)", + ) args = parser.parse_args() # Validate --trend-window @@ -799,74 +810,346 @@ def _write_incremental_results(): except OSError: pass - for i, task in enumerate(tasks_to_run, 1): - task_grades = [] - task_results = [] - for run_index in range(runs_per_task): - logger.info("\n%s", "=" * 80) - logger.info( - "πŸ“‹ Task %s/%s (Run %s/%s)", - i, - len(tasks_to_run), - run_index + 1, - runs_per_task, - ) - logger.info("%s", "=" * 80) - execution_error = None - try: - result = execute_openclaw_task( - task=task, - agent_id=agent_id, - model_id=args.model, - run_id=f"{run_id}-{run_index + 1}", - timeout_multiplier=args.timeout_multiplier, - skill_dir=skill_dir, - output_dir=Path(args.output_dir) / f"{run_id}_transcripts", - verbose=args.verbose, + # Phase 1: Execute all tasks + task_execution_map = {} # Maps task_id -> list of (task, result) tuples for each run + + # Initialize judge executor for parallel grading + judge_executor: Optional[ThreadPoolExecutor] = None + if not args.no_parallel_judge: + judge_executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="judge") + logger.info("πŸš€ Parallel judge execution enabled") + else: + logger.info("⏸️ Parallel judge execution disabled (--no-parallel-judge)") + + # Track pending grade from previous run + pending_grade: Optional[Tuple[str, List, List, int, Future]] = None + + try: + for i, task in enumerate(tasks_to_run, 1): + task_grades = [] + task_results = [] + for run_index in range(runs_per_task): + logger.info("\n%s", "=" * 80) + logger.info( + "πŸ“‹ 
Task %s/%s (Run %s/%s)", + i, + len(tasks_to_run), + run_index + 1, + runs_per_task, ) - except Exception as exc: - execution_error = str(exc) - logger.warning("Task execution failed for %s, continuing: %s", task.task_id, exc) - result = { - "agent_id": agent_id, - "task_id": task.task_id, - "status": "error", - "transcript": [], - "usage": {}, - "workspace": "", - "exit_code": -1, - "timed_out": False, - "execution_time": 0.0, - "stdout": "", - "stderr": execution_error, + logger.info("%s", "=" * 80) + + # Process pending grade from previous run + if pending_grade: + prev_task_id, prev_grades, prev_results, prev_run_idx, future = pending_grade + try: + grade = future.result(timeout=DEFAULT_JUDGE_TIMEOUT_SECONDS) + prev_grades.append(grade) + + score_pct = grade.score / grade.max_score * 100 if grade.max_score > 0 else 0 + status_emoji = ( + "βœ…" if grade.score >= grade.max_score else "⚠️" if grade.score > 0 else "❌" + ) + logger.info( + "%s Task %s (run %d, background): %.1f/%.1f (%.0f%%) - %s", + status_emoji, + prev_task_id, + prev_run_idx + 1, + grade.score, + grade.max_score, + score_pct, + grade.grading_type, + ) + if grade.notes: + logger.info(" Notes: %s", grade.notes[:200]) + + # If last run of previous task, compute aggregates + if len(prev_grades) == runs_per_task: + task_scores = [g.score for g in prev_grades] + grades_by_task_id[prev_task_id] = { + "runs": [g.to_dict() for g in prev_grades], + "mean": statistics.mean(task_scores), + "std": statistics.stdev(task_scores) if len(task_scores) > 1 else 0.0, + "min": min(task_scores), + "max": max(task_scores), + } + + all_runs_missing_transcript = all( + not run_result.get("transcript") for run_result in prev_results + ) + if ( + prev_task_id == sanity_task_id + and grades_by_task_id[prev_task_id]["mean"] == 0.0 + and not args.no_fail_fast + and not all_runs_missing_transcript + ): + logger.error( + "🚨 FAIL FAST: Sanity check (%s) scored 0%%. Aborting benchmark run.", + sanity_task_id, + ) + sys.exit(3) + if prev_task_id == sanity_task_id and grades_by_task_id[prev_task_id]["mean"] == 0.0: + if all_runs_missing_transcript: + logger.warning( + "⚠️ Sanity check scored 0%% but transcripts missing; skipping fail-fast." 
+ ) + + _write_incremental_results() + + except Exception as exc: + logger.warning("Background grade failed for %s (run %d): %s", + prev_task_id, prev_run_idx + 1, exc) + grade = GradeResult( + task_id=prev_task_id, + score=0.0, + max_score=1.0, + grading_type="error", + breakdown={}, + notes=f"Background grading failed: {exc}", + ) + prev_grades.append(grade) + pending_grade = None + + execution_error = None + try: + result = execute_openclaw_task( + task=task, + agent_id=agent_id, + model_id=args.model, + run_id=f"{run_id}-{run_index + 1}", + timeout_multiplier=args.timeout_multiplier, + skill_dir=skill_dir, + output_dir=Path(args.output_dir) / f"{run_id}_transcripts", + verbose=args.verbose, + ) + except Exception as exc: + execution_error = str(exc) + logger.warning("Task execution failed for %s, continuing: %s", task.task_id, exc) + result = { + "agent_id": agent_id, + "task_id": task.task_id, + "status": "error", + "transcript": [], + "usage": {}, + "workspace": "", + "exit_code": -1, + "timed_out": False, + "execution_time": 0.0, + "stdout": "", + "stderr": execution_error, + } + + task_results.append(result) + results.append(result) + + # Decide sync vs async grading + is_last_run = (run_index == runs_per_task - 1) and (i == len(tasks_to_run)) + + if args.no_parallel_judge or is_last_run: + # Synchronous grading + try: + grade_kwargs = dict( + task=task, execution_result=result, skill_dir=skill_dir, verbose=args.verbose + ) + if args.judge: + grade_kwargs["judge_model"] = args.judge + grade_kwargs["judge_backend"] = "api" + grade = grade_task(**grade_kwargs) + except Exception as exc: + if execution_error: + note = f"Execution failed: {execution_error}; Grading failed: {exc}" + else: + note = f"Grading failed: {exc}" + logger.warning("Task grading failed for %s, continuing: %s", task.task_id, exc) + grade = GradeResult( + task_id=task.task_id, + score=0.0, + max_score=1.0, + grading_type=task.grading_type, + breakdown={}, + notes=note, + ) + task_grades.append(grade) + + # Log score immediately + score_pct = grade.score / grade.max_score * 100 if grade.max_score > 0 else 0 + status_emoji = ( + "βœ…" if grade.score >= grade.max_score else "⚠️" if grade.score > 0 else "❌" + ) + logger.info( + "%s Task %s: %.1f/%.1f (%.0f%%) - %s", + status_emoji, + task.task_id, + grade.score, + grade.max_score, + score_pct, + grade.grading_type, + ) + if grade.notes: + logger.info(" Notes: %s", grade.notes[:200]) + else: + # Async grading - submit to background + logger.info("⏭️ Submitting grading to background thread...") + grade_kwargs = dict( + task=task, execution_result=result, skill_dir=skill_dir, verbose=args.verbose + ) + if args.judge: + grade_kwargs["judge_model"] = args.judge + grade_kwargs["judge_backend"] = "api" + + future = judge_executor.submit(grade_task, **grade_kwargs) + pending_grade = (task.task_id, task_grades, task_results, run_index, future) + + # If synchronous mode, compute aggregates now + if args.no_parallel_judge: + task_scores = [grade.score for grade in task_grades] + grades_by_task_id[task.task_id] = { + "runs": [grade.to_dict() for grade in task_grades], + "mean": statistics.mean(task_scores), + "std": statistics.stdev(task_scores) if len(task_scores) > 1 else 0.0, + "min": min(task_scores), + "max": max(task_scores), } + + all_runs_missing_transcript = all( + not run_result.get("transcript") for run_result in task_results + ) + if ( + task.task_id == sanity_task_id + and grades_by_task_id[task.task_id]["mean"] == 0.0 + and not args.no_fail_fast + and not 
all_runs_missing_transcript + ): + logger.error( + "🚨 FAIL FAST: Sanity check (%s) scored 0%%. Aborting benchmark run.", + sanity_task_id, + ) + sys.exit(3) + if task.task_id == sanity_task_id and grades_by_task_id[task.task_id]["mean"] == 0.0: + if all_runs_missing_transcript: + logger.warning( + "⚠️ Sanity check scored 0%% but transcripts missing; skipping fail-fast." + ) + + _write_incremental_results() + + # Process any remaining pending grade + if pending_grade: + prev_task_id, prev_grades, prev_results, prev_run_idx, future = pending_grade try: - grade_kwargs = dict( - task=task, execution_result=result, skill_dir=skill_dir, verbose=args.verbose + grade = future.result(timeout=DEFAULT_JUDGE_TIMEOUT_SECONDS) + prev_grades.append(grade) + + score_pct = grade.score / grade.max_score * 100 if grade.max_score > 0 else 0 + status_emoji = ( + "βœ…" if grade.score >= grade.max_score else "⚠️" if grade.score > 0 else "❌" ) - if args.judge: - grade_kwargs["judge_model"] = args.judge - grade_kwargs["judge_backend"] = "api" - grade = grade_task(**grade_kwargs) - except Exception as exc: - if execution_error: - note = f"Execution failed: {execution_error}; Grading failed: {exc}" - else: - note = f"Grading failed: {exc}" - logger.warning("Task grading failed for %s, continuing: %s", task.task_id, exc) - grade = GradeResult( - task_id=task.task_id, - score=0.0, - max_score=1.0, - grading_type=task.grading_type, - breakdown={}, - notes=note, + logger.info( + "%s Task %s (run %d, final): %.1f/%.1f (%.0f%%) - %s", + status_emoji, + prev_task_id, + prev_run_idx + 1, + grade.score, + grade.max_score, + score_pct, + grade.grading_type, ) - task_grades.append(grade) - task_results.append(result) - results.append(result) + if grade.notes: + logger.info(" Notes: %s", grade.notes[:200]) + + task_scores = [g.score for g in prev_grades] + grades_by_task_id[prev_task_id] = { + "runs": [g.to_dict() for g in prev_grades], + "mean": statistics.mean(task_scores), + "std": statistics.stdev(task_scores) if len(task_scores) > 1 else 0.0, + "min": min(task_scores), + "max": max(task_scores), + } + _write_incremental_results() + + except Exception as exc: + logger.warning("Final background grade failed for %s: %s", prev_task_id, exc) + finally: + # Cleanup executor + if judge_executor: + logger.info("🧹 Shutting down judge executor...") + judge_executor.shutdown(wait=True) - # Log score immediately after grading + logger.info("\n%s", "=" * 80) + logger.info("πŸ“Š Phase 1 complete: All tasks executed. 
Starting grading phase...") + logger.info("%s", "=" * 80) + + # Phase 2: Grade in batches + batch_size = args.batch_size + + # Flatten all task data across all runs + all_task_data = [] + for task_id in task_execution_map: + all_task_data.extend(task_execution_map[task_id]) + + # Process in batches + for batch_start in range(0, len(all_task_data), batch_size): + batch_end = min(batch_start + batch_size, len(all_task_data)) + batch_items = all_task_data[batch_start:batch_end] + + batch_tasks = [item["task"] for item in batch_items] + batch_results = [item["result"] for item in batch_items] + + logger.info( + "\nπŸ“Š Grading batch %d-%d of %d total task runs...", + batch_start + 1, + batch_end, + len(all_task_data), + ) + + try: + grade_kwargs = dict( + tasks=batch_tasks, + execution_results=batch_results, + skill_dir=skill_dir, + verbose=args.verbose, + ) + if args.judge: + grade_kwargs["judge_model"] = args.judge + grade_kwargs["judge_backend"] = "api" + + batch_grades = grade_tasks_batch(**grade_kwargs) + except Exception as exc: + logger.warning("Batch grading failed, using individual fallback: %s", exc) + # Fallback to individual grading + batch_grades = [] + for item in batch_items: + task = item["task"] + result = item["result"] + try: + individual_kwargs = dict( + task=task, + execution_result=result, + skill_dir=skill_dir, + verbose=args.verbose, + use_judge_cache=not args.no_judge_cache, + ) + if args.judge: + individual_kwargs["judge_model"] = args.judge + individual_kwargs["judge_backend"] = "api" + grade = grade_task(**individual_kwargs) + except Exception as inner_exc: + logger.warning("Individual grading also failed for %s: %s", task.task_id, inner_exc) + grade = GradeResult( + task_id=task.task_id, + score=0.0, + max_score=1.0, + grading_type=task.grading_type, + breakdown={}, + notes=f"Grading failed: {inner_exc}", + ) + batch_grades.append(grade) + + # Store grades back into the items + for item, grade in zip(batch_items, batch_grades): + item["grade"] = grade + + # Log score score_pct = grade.score / grade.max_score * 100 if grade.max_score > 0 else 0 status_emoji = ( "βœ…" if grade.score >= grade.max_score else "⚠️" if grade.score > 0 else "❌" @@ -874,7 +1157,7 @@ def _write_incremental_results(): logger.info( "%s Task %s: %.1f/%.1f (%.0f%%) - %s", status_emoji, - task.task_id, + item["task"].task_id, grade.score, grade.max_score, score_pct, @@ -882,19 +1165,26 @@ def _write_incremental_results(): ) if grade.notes: logger.info(" Notes: %s", grade.notes[:200]) - + + # Phase 3: Aggregate grades by task + for task_id, items in task_execution_map.items(): + task_grades = [item["grade"] for item in items] task_scores = [grade.score for grade in task_grades] - grades_by_task_id[task.task_id] = { + + grades_by_task_id[task_id] = { "runs": [grade.to_dict() for grade in task_grades], "mean": statistics.mean(task_scores), "std": statistics.stdev(task_scores) if len(task_scores) > 1 else 0.0, "min": min(task_scores), "max": max(task_scores), } - + + # Fail-fast check for sanity task + task = items[0]["task"] # Get task object from first item all_runs_missing_transcript = all( - not run_result.get("transcript") for run_result in task_results + not item["result"].get("transcript") for item in items ) + if ( task.task_id == sanity_task_id and grades_by_task_id[task.task_id]["mean"] == 0.0 @@ -911,10 +1201,9 @@ def _write_incremental_results(): logger.warning( "⚠️ Sanity check scored 0%% but transcripts were missing for all runs; skipping fail-fast as likely infrastructure/logging 
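For reference, a minimal sketch of the two lib_grading.py helpers this series adds but whose bodies do not appear in the hunks shown. The grade_tasks_batch signature is inferred from its call site above; _build_batch_judge_prompt and _call_judge are assumed helper names, not the shipped implementation.

    # Hypothetical sketch: the real grade_tasks_batch and
    # _parse_batch_judge_response live in lib_grading.py and are not shown in
    # this diff. _build_batch_judge_prompt and _call_judge are assumed names.
    import json
    from typing import Any, Dict, List

    def _parse_batch_judge_response(raw: str, expected_count: int) -> List[Dict[str, Any]]:
        """Parse one judge reply containing a JSON array of per-task verdicts."""
        verdicts = json.loads(raw)
        if not isinstance(verdicts, list) or len(verdicts) != expected_count:
            raise ValueError(f"Expected {expected_count} verdicts, got: {raw[:200]}")
        return verdicts

    def grade_tasks_batch(tasks, execution_results, skill_dir, verbose=False,
                          judge_model=None, judge_backend=None):
        """Grade several task runs with a single judge API call."""
        prompt = _build_batch_judge_prompt(tasks, execution_results)  # assumed helper
        raw = _call_judge(prompt, model=judge_model, backend=judge_backend)  # assumed helper
        verdicts = _parse_batch_judge_response(raw, expected_count=len(tasks))
        return [
            GradeResult(
                task_id=task.task_id,
                score=float(v.get("score", 0.0)),
                max_score=1.0,
                grading_type=task.grading_type,
                breakdown=v.get("breakdown", {}),
                notes=str(v.get("notes", "")),
            )
            for task, v in zip(tasks, verdicts)
        ]

Returning one GradeResult per run matches the individual-grading fallback above, so a failed batch can degrade to per-task grade_task calls without changing downstream aggregation.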
issue." ) - - # Incremental write: update result JSON after each task so partial - # results are available while the benchmark is still running. - _write_incremental_results() + + # Write final incremental results + _write_incremental_results() output_dir = Path(args.output_dir) output_dir.mkdir(parents=True, exist_ok=True) diff --git a/scripts/lib_grading.py b/scripts/lib_grading.py index ccadcda0..968a1e46 100644 --- a/scripts/lib_grading.py +++ b/scripts/lib_grading.py @@ -304,7 +304,8 @@ def _grade_llm_judge( task.task_id, raw_parsed, ) - return GradeResult( + + grade = GradeResult( task_id=task.task_id, score=float(total) if total is not None else 0.0, max_score=1.0, @@ -312,6 +313,12 @@ def _grade_llm_judge( breakdown=_normalize_score_dict(breakdown), notes=str(notes) if notes is not None else "", ) + + # Save to cache if enabled + if use_judge_cache: + _save_grade_to_cache(task.task_id, transcript, grade) + + return grade def _combine_grades(task: Task, auto_result: GradeResult, llm_result: GradeResult) -> GradeResult: From 31aa3b85ff577affe1fedfddc7b398fab8b95eb7 Mon Sep 17 00:00:00 2001 From: Brendan O'Leary Date: Wed, 22 Apr 2026 10:29:31 -0400 Subject: [PATCH 2/7] fix: add missing import and remove incomplete cache code --- scripts/benchmark.py | 2 +- scripts/lib_grading.py | 4 ---- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/scripts/benchmark.py b/scripts/benchmark.py index d4303e42..52bf0a6c 100644 --- a/scripts/benchmark.py +++ b/scripts/benchmark.py @@ -34,7 +34,7 @@ slugify_model, validate_openrouter_model, ) -from lib_grading import DEFAULT_JUDGE_TIMEOUT_SECONDS, GradeResult, grade_task +from lib_grading import DEFAULT_JUDGE_TIMEOUT_SECONDS, GradeResult, grade_task, grade_tasks_batch from lib_tasks import Task, TaskLoader diff --git a/scripts/lib_grading.py b/scripts/lib_grading.py index 968a1e46..c78369df 100644 --- a/scripts/lib_grading.py +++ b/scripts/lib_grading.py @@ -314,10 +314,6 @@ def _grade_llm_judge( notes=str(notes) if notes is not None else "", ) - # Save to cache if enabled - if use_judge_cache: - _save_grade_to_cache(task.task_id, transcript, grade) - return grade From d142df6653c838da311d4d1c433a37ff5dae9cd7 Mon Sep 17 00:00:00 2001 From: Brendan O'Leary Date: Wed, 22 Apr 2026 10:30:05 -0400 Subject: [PATCH 3/7] feat: Add parallel judge execution to overlap grading with next task - Add ThreadPoolExecutor to run judge grading in background - Track pending grade from previous run and wait for it before starting next - Add --no-parallel-judge flag to disable and use synchronous grading - Only 1 worker thread to avoid rate limiting - Handle exceptions from background thread gracefully - Last task/run always graded synchronously to ensure completion - Improves benchmark throughput by overlapping work Implements GitLab issue #212 --- scripts/benchmark.py | 819 +------------------------------------------ 1 file changed, 1 insertion(+), 818 deletions(-) diff --git a/scripts/benchmark.py b/scripts/benchmark.py index 52bf0a6c..9ca04c4e 100644 --- a/scripts/benchmark.py +++ b/scripts/benchmark.py @@ -126,694 +126,7 @@ def run_benchmark( logger.info(f"🎯 Running benchmark on all {len(tasks_to_run)} tasks") results = [] - for i, task in enumerate(tasks_to_run, 1): - logger.info(f"\n{'=' * 80}") - logger.info(f"πŸ“‹ Task {i}/{len(tasks_to_run)}") - logger.info(f"{'=' * 80}") - result = agent.execute_task(task, simulate=simulate) - results.append(result) - - logger.info(f"\n{'=' * 80}") - logger.info(f"✨ Benchmark complete! 
Executed {len(results)} tasks") - logger.info(f"{'=' * 80}") - - # Print summary - total_time = sum(r["execution_time"] for r in results) - logger.info("\nπŸ“Š BENCHMARK SUMMARY") - logger.info(f" Agent: {agent.agent_id}") - logger.info(f" Tasks completed: {len(results)}") - logger.info(f" Total execution time: {total_time:.2f}s") - logger.info(f" Average time per task: {total_time / len(results):.2f}s") - - return results - - def print_task_summary(self) -> None: - """Print a summary of all loaded tasks.""" - if not self.tasks: - logger.warning("No tasks loaded") - return - - print("\n" + "=" * 80) - print(f"LOADED TASKS SUMMARY ({len(self.tasks)} tasks)") - print("=" * 80) - - for task in self.tasks: - print(f"\n[{task.task_id}] {task.name}") - print(f" Category: {task.category}") - print(f" Grading: {task.grading_type}") - print(f" Timeout: {task.timeout_seconds}s") - print(f" Criteria: {len(task.grading_criteria)} items") - print( - f" Prompt: {task.prompt[:100]}..." - if len(task.prompt) > 100 - else f" Prompt: {task.prompt}" - ) - - print("\n" + "=" * 80) - - -def _parse_args() -> argparse.Namespace: - parser = argparse.ArgumentParser(description="PinchBench OpenClaw Benchmark Runner") - parser.add_argument( - "--model", - required=False, - help="Model identifier (e.g., anthropic/claude-sonnet-4)", - ) - parser.add_argument( - "--suite", - default="all", - help='Tasks to run: "all", "automated-only", a category name (e.g. "coding"), or comma-separated task IDs', - ) - parser.add_argument( - "--output-dir", - default="results", - help="Results directory", - ) - parser.add_argument( - "--register", - action="store_true", - help="Request a new API token and save it to local config", - ) - parser.add_argument( - "--no-upload", - action="store_true", - help="Skip uploading to server", - ) - parser.add_argument( - "--upload", - type=str, - metavar="RESULTS_JSON", - help="Upload a previous run's results JSON and exit (skips benchmarking)", - ) - parser.add_argument( - "--timeout-multiplier", - type=float, - default=1.0, - help="Scale all task timeouts", - ) - parser.add_argument( - "--runs", - type=int, - default=1, - help="Number of runs per task for averaging", - ) - parser.add_argument( - "--judge", - default=None, - help=( - "Judge model or backend. Default (unset): OpenClaw agent session with " - "openrouter/anthropic/claude-opus-4.5. Set to a model ID to call its API " - "directly (e.g. 
openai/gpt-4o, anthropic/claude-sonnet-4-5-20250514, claude)" - ), - ) - parser.add_argument( - "--base-url", - default=None, - help="Custom OpenAI-compatible API base URL (bypasses OpenRouter validation)", - ) - parser.add_argument( - "--api-key", - default=None, - help="API key for custom endpoint (default: $OPENAI_API_KEY env var)", - ) - parser.add_argument( - "--verbose", - "-v", - action="store_true", - help="Enable verbose logging (shows transcript contents, workspace files, etc.)", - ) - parser.add_argument( - "--official-key", - type=str, - metavar="KEY", - help="Official key to mark submission as official (can also use PINCHBENCH_OFFICIAL_KEY env var)", - ) - parser.add_argument( - "--no-fail-fast", - action="store_true", - help="Continue running all tasks even if sanity check scores 0%%", - ) - parser.add_argument( - "--trend", - action="store_true", - help="Run trend analysis after benchmark completes (requires β‰₯2 runs in output dir)", - ) - parser.add_argument( - "--trend-window", - type=int, - default=10, - metavar="N", - help="Number of recent runs to include in trend analysis (default: 10)", - ) - parser.add_argument( - "--trend-threshold", - type=float, - default=-0.5, - help="Slope (%%/run) below which regression is flagged (default: -0.5)", - ) - parser.add_argument( - "--no-parallel-judge", - action="store_true", - help="Disable parallel judge execution (grade synchronously after each task)", - ) - parser.add_argument( - "--no-judge-cache", - action="store_true", - help="Disable judge response caching (forces fresh evaluation)", - ) - args = parser.parse_args() - - # Validate --trend-window - if args.trend_window < 2: - parser.error("--trend-window must be >= 2") - - return args - - -def _select_task_ids( - tasks: List[Task], - suite: str, - category_map: Optional[Dict[str, str]] = None, -) -> Optional[List[str]]: - if suite == "all": - return None - if suite == "automated-only": - return [task.task_id for task in tasks if task.grading_type == "automated"] - - # Check if suite matches a manifest category name - if category_map: - available_categories = set(category_map.values()) - # Support "+" syntax for combining categories: "coding+research" - requested = [s.strip() for s in suite.split("+")] - if all(r in available_categories for r in requested): - requested_set = set(requested) - return [ - task.task_id for task in tasks if category_map.get(task.task_id) in requested_set - ] - - # Fall back to comma-separated task IDs - return [task_id.strip() for task_id in suite.split(",") if task_id.strip()] - - -def _next_run_id(run_root: Path) -> str: - run_root.mkdir(parents=True, exist_ok=True) - existing = [] - for entry in run_root.iterdir(): - if entry.is_dir() and entry.name.isdigit(): - existing.append(int(entry.name)) - next_id = (max(existing) + 1) if existing else 1 - return f"{next_id:04d}" - - -def _load_ascii_art(script_dir: Path, filename: str) -> str | None: - """Load ASCII art from a local file if available.""" - art_path = script_dir / filename - try: - return art_path.read_text(encoding="utf-8").rstrip("\n") - except FileNotFoundError: - return None - - -def _supports_truecolor() -> bool: - if os.environ.get("NO_COLOR"): - return False - return sys.stdout.isatty() - - -def _get_benchmark_version(script_dir: Path) -> str: - try: - return importlib.metadata.version("pinchbench") - except Exception: - pass - - version_file = script_dir / "BENCHMARK_VERSION" - if version_file.is_file(): - try: - return version_file.read_text().strip() - except Exception: - pass 
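    # Worked example of the mapping implemented by the describe logic below
    # (values illustrative):
    #   "v1.2.3-0-gabc1234" -> "1.2.3"                 (HEAD exactly on tag v1.2.3)
    #   "v1.2.3-5-gabc1234" -> "1.2.4-dev.5+gabc1234"  (5 commits past v1.2.3)
    # Non-semver tags fall back to the raw describe output; if git itself is
    # unavailable, the short HEAD SHA (or "") is returned instead.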
- - def _split_semver(tag: str) -> Optional[tuple[int, int, int]]: - cleaned = tag.strip() - if cleaned.startswith("v"): - cleaned = cleaned[1:] - match = re.match(r"^(\d+)\.(\d+)\.(\d+)$", cleaned) - if not match: - return None - return int(match.group(1)), int(match.group(2)), int(match.group(3)) - - try: - result = subprocess.run( - [ - "git", - "describe", - "--tags", - "--long", - "--match", - "v[0-9]*.[0-9]*.[0-9]*", - "--match", - "[0-9]*.[0-9]*.[0-9]*", - ], - capture_output=True, - text=True, - timeout=2, - check=False, - cwd=script_dir, - ) - if result.returncode == 0: - described = result.stdout.strip() - describe_match = re.match(r"^(.+)-(\d+)-g([0-9a-fA-F]+)$", described) - if describe_match: - raw_tag = describe_match.group(1) - commit_distance = int(describe_match.group(2)) - short_sha = describe_match.group(3) - parsed_tag = _split_semver(raw_tag) - if parsed_tag is not None: - major, minor, patch = parsed_tag - if commit_distance == 0: - return f"{major}.{minor}.{patch}" - next_patch = patch + 1 - return f"{major}.{minor}.{next_patch}-dev.{commit_distance}+g{short_sha}" - if described: - return described - except (subprocess.SubprocessError, FileNotFoundError, OSError): - pass - - try: - result = subprocess.run( - ["git", "rev-parse", "--short", "HEAD"], - capture_output=True, - text=True, - timeout=2, - check=False, - cwd=script_dir, - ) - except (subprocess.SubprocessError, FileNotFoundError, OSError): - return "" - if result.returncode != 0: - return "" - return result.stdout.strip() - - -def _colorize_gradient(ascii_art: str) -> str: - if not _supports_truecolor(): - return ascii_art - lines = ascii_art.splitlines() - if not lines: - return ascii_art - last_index = max(len(lines) - 1, 1) - colored_lines = [] - for idx, line in enumerate(lines): - t = idx / last_index - green_blue = int(255 * (1 - t)) - colored_lines.append(f"\x1b[38;2;255;{green_blue};{green_blue}m{line}\x1b[0m") - return "\n".join(colored_lines) - - -def _compute_efficiency_summary( - task_entries: List[Dict[str, Any]], - grades_by_task_id: Dict[str, Dict[str, Any]], -) -> Dict[str, Any]: - """Compute aggregate token efficiency metrics across all tasks. - - Returns a dict with total token usage, cost, and efficiency ratios - (score per token, score per dollar) so that different models can be - compared not just on quality but also on resource consumption. 
- """ - total_input_tokens = 0 - total_output_tokens = 0 - total_tokens = 0 - total_cost_usd = 0.0 - total_requests = 0 - total_execution_time = 0.0 - tasks_with_usage = 0 - - per_task_efficiency: List[Dict[str, Any]] = [] - for entry in task_entries: - usage = entry.get("usage", {}) - task_id = entry["task_id"] - grading = grades_by_task_id.get(task_id, {}) - score = float(grading.get("mean", 0.0)) - - inp = int(usage.get("input_tokens", 0)) - out = int(usage.get("output_tokens", 0)) - tot = int(usage.get("total_tokens", 0)) - cost = float(usage.get("cost_usd", 0.0) or 0.0) - reqs = int(usage.get("request_count", 0)) - exec_time = float(entry.get("execution_time", 0.0) or 0.0) - - total_input_tokens += inp - total_output_tokens += out - total_tokens += tot - total_cost_usd += cost - total_requests += reqs - total_execution_time += exec_time - - if tot > 0: - tasks_with_usage += 1 - - per_task_efficiency.append( - { - "task_id": task_id, - "score": round(score, 4), - "total_tokens": tot, - "cost_usd": round(cost, 6), - "tokens_per_score_point": round(tot / score, 1) if score > 0 else None, - } - ) - - # Aggregate scores - all_scores = [float(g.get("mean", 0.0)) for g in grades_by_task_id.values()] - total_score = sum(all_scores) - num_tasks = len(all_scores) - - summary: Dict[str, Any] = { - "total_tokens": total_tokens, - "total_input_tokens": total_input_tokens, - "total_output_tokens": total_output_tokens, - "total_cost_usd": round(total_cost_usd, 6), - "total_requests": total_requests, - "total_execution_time_seconds": round(total_execution_time, 2), - "tasks_with_usage_data": tasks_with_usage, - "tokens_per_task": round(total_tokens / num_tasks, 1) if num_tasks > 0 else 0, - "cost_per_task_usd": round(total_cost_usd / num_tasks, 6) if num_tasks > 0 else 0, - "score_per_1k_tokens": ( - round(total_score / (total_tokens / 1000), 6) if total_tokens > 0 else None - ), - "score_per_dollar": ( - round(total_score / total_cost_usd, 4) if total_cost_usd > 0 else None - ), - "per_task": per_task_efficiency, - } - return summary - - -def _log_efficiency_summary( - efficiency: Dict[str, Any], - grades_by_task_id: Dict[str, Dict[str, Any]], -) -> None: - """Log a human-readable token efficiency summary.""" - all_scores = [float(g.get("mean", 0.0)) for g in grades_by_task_id.values()] - mean_score = statistics.mean(all_scores) if all_scores else 0.0 - - logger.info("\n%s", "=" * 80) - logger.info("πŸ“Š TOKEN EFFICIENCY SUMMARY") - logger.info("%s", "=" * 80) - logger.info( - " Total tokens used: %s (input: %s, output: %s)", - f"{efficiency['total_tokens']:,}", - f"{efficiency['total_input_tokens']:,}", - f"{efficiency['total_output_tokens']:,}", - ) - logger.info(" Total API requests: %s", f"{efficiency['total_requests']:,}") - if efficiency["total_cost_usd"] > 0: - logger.info(" Total cost: $%.4f", efficiency["total_cost_usd"]) - logger.info( - " Avg tokens/task: %s", - f"{efficiency['tokens_per_task']:,.0f}", - ) - logger.info(" Mean score: %.4f", mean_score) - if efficiency.get("score_per_1k_tokens") is not None: - logger.info( - " Score per 1K tokens: %.4f (higher = more efficient)", - efficiency["score_per_1k_tokens"], - ) - if efficiency.get("score_per_dollar") is not None: - logger.info( - " Score per dollar: %.4f (higher = more cost-efficient)", - efficiency["score_per_dollar"], - ) - logger.info("%s", "=" * 80) - - -def _compute_category_scores( - task_entries: List[Dict[str, Any]], - tasks_by_id: Dict[str, Any], - category_order: Optional[List[str]] = None, -) -> Dict[str, Dict[str, 
Any]]: - """Compute per-category score rollups from task results. - - Returns a dict mapping category name to a dict with keys: - ``score``, ``max_score``, ``pct``, ``task_count``. - - If *category_order* is provided it is used only for ordering the returned - dict (Python 3.7+ preserves insertion order). Categories not in the order - list are appended alphabetically. - """ - raw: Dict[str, Dict[str, float]] = {} - - for entry in task_entries: - task_id = entry["task_id"] - task = tasks_by_id.get(task_id) - if not task: - continue - - category = task.category.upper() if task.category else "UNCATEGORIZED" - grading = entry.get("grading", {}) - mean_score = float(grading.get("mean", 0.0)) - max_score = 1.0 # Each task is scored 0-1 - - if category not in raw: - raw[category] = {"score": 0.0, "max_score": 0.0, "task_count": 0} - - raw[category]["score"] += mean_score - raw[category]["max_score"] += max_score - raw[category]["task_count"] += 1 - - # Determine display order - if category_order: - ordered_keys = [c.upper() for c in category_order if c.upper() in raw] - remaining = sorted(k for k in raw if k not in ordered_keys) - ordered_keys.extend(remaining) - else: - ordered_keys = sorted(raw.keys()) - - result: Dict[str, Dict[str, Any]] = {} - for cat in ordered_keys: - data = raw[cat] - pct = (data["score"] / data["max_score"] * 100) if data["max_score"] > 0 else 0 - result[cat] = { - "score": round(data["score"], 6), - "max_score": round(data["max_score"], 6), - "pct": round(pct, 1), - "task_count": int(data["task_count"]), - } - - return result - - -def _log_category_summary( - task_entries: List[Dict[str, Any]], - tasks_by_id: Dict[str, Any], - category_order: Optional[List[str]] = None, -) -> Dict[str, Dict[str, Any]]: - """Log a summary grouped by category, matching the PinchBench website format. - - Returns the computed category_scores dict so callers can embed it in - results JSON. 
- """ - category_scores = _compute_category_scores(task_entries, tasks_by_id, category_order) - - # Calculate overall totals - total_earned = sum(c["score"] for c in category_scores.values()) - total_possible = sum(c["max_score"] for c in category_scores.values()) - overall_pct = (total_earned / total_possible * 100) if total_possible > 0 else 0 - - logger.info("\n%s", "=" * 80) - logger.info("πŸ¦€ PINCHBENCH SCORE SUMMARY") - logger.info("%s", "=" * 80) - logger.info("") - logger.info(" Overall Score: %.1f%% (%.1f / %.1f)", overall_pct, total_earned, total_possible) - logger.info("") - logger.info(" %-20s %8s %12s", "CATEGORY", "SCORE", "TASKS") - logger.info(" %s", "-" * 44) - - for category, data in category_scores.items(): - pct = data["pct"] - task_count = data["task_count"] - task_label = "task" if task_count == 1 else "tasks" - - # Color indicator based on score - if pct >= 90: - indicator = "🟒" - elif pct >= 70: - indicator = "🟑" - else: - indicator = "πŸ”΄" - - logger.info( - " %s %-17s %6.1f%% %6d %s", - indicator, - category, - pct, - task_count, - task_label, - ) - - logger.info(" %s", "-" * 44) - logger.info("%s", "=" * 80) - - return category_scores - - -def main(): - """Main entry point for the benchmark script.""" - # Determine tasks directory - script_dir = Path(__file__).parent - skill_root = script_dir.parent # Parent of scripts/ is the skill root - tasks_dir = skill_root / "tasks" - - logger.info("πŸ¦žπŸ¦€πŸ¦ PinchBench - OpenClaw Benchmarking") - ascii_crab = _load_ascii_art(skill_root, "crab.txt") - if ascii_crab: - print("\n" + _colorize_gradient(ascii_crab) + "\n") - else: - print("\n" + "πŸ¦€ " * 30) - print("πŸ¦€ " * 30 + "\n") - logger.info("πŸ¦žπŸ¦€πŸ¦ Starting PinchBench πŸ¦πŸ¦€πŸ¦ž") - time.sleep(5) - - if not tasks_dir.exists(): - logger.error(f"❌ Tasks directory not found: {tasks_dir}") - sys.exit(1) - - args = _parse_args() - if not args.model and not args.register and not args.upload: - logger.error("Missing required argument: --model (unless using --register or --upload)") - sys.exit(2) - - if args.register: - try: - from lib_upload import UploadError, register_token, save_token_config - - token, claim_url = register_token() - config_path = save_token_config(token, claim_url) - logger.info("Saved token to %s", config_path) - if claim_url: - logger.info("Claim URL: %s", claim_url) - return - except UploadError as exc: - logger.error("Registration failed: %s", exc) - sys.exit(1) - - if args.upload: - results_path = Path(args.upload) - if not results_path.exists(): - logger.error("Results file not found: %s", results_path) - sys.exit(1) - try: - from lib_upload import UploadError, upload_results - - result = upload_results(results_path) - if result.rank is not None: - logger.info("Uploaded to leaderboard: rank #%s", result.rank) - if result.leaderboard_url: - logger.info("View at: %s", result.leaderboard_url) - logger.info("Upload complete.") - return - except UploadError as exc: - logger.error("Upload failed: %s", exc) - sys.exit(1) - - logger.info("πŸ”§ Initializing BenchmarkRunner...") - runner = BenchmarkRunner(tasks_dir) - - logger.info("πŸ“‚ Loading tasks from directory...") - runner.load_tasks() - - model_slug = slugify_model(args.model) - run_root = Path("/tmp/pinchbench") - run_id = _next_run_id(run_root) - skill_dir = skill_root - agent_id = f"bench-{model_slug}" - # Use a shared workspace for the agent - we'll copy fixtures per task - agent_workspace = Path(f"/tmp/pinchbench/{run_id}/agent_workspace") - - # Validate model exists before wasting 
time on tasks - if args.base_url: - logger.info("Using custom endpoint: %s (skipping OpenRouter validation)", args.base_url) - else: - try: - validate_openrouter_model(args.model) - except ModelValidationError as exc: - logger.error("❌ %s", exc) - sys.exit(1) - - ensure_agent_exists( - agent_id, - args.model, - agent_workspace, - base_url=args.base_url, - api_key=args.api_key, - ) - cleanup_agent_sessions(agent_id) - - task_ids = _select_task_ids(runner.tasks, args.suite, runner.task_loader.category_map) - results = [] - grades_by_task_id = {} - sanity_task_id = "task_sanity" - - tasks_to_run = runner.tasks - if task_ids is not None: - tasks_to_run = [task for task in runner.tasks if task.task_id in task_ids] - tasks_by_id = {task.task_id: task for task in tasks_to_run} - - runs_per_task = max(1, args.runs) - - # Incremental result writer: builds partial result JSON from completed - # tasks so external tools can poll progress while the benchmark runs. - incremental_dir = Path(args.output_dir) - incremental_dir.mkdir(parents=True, exist_ok=True) - incremental_path = incremental_dir / f"{run_id}_{model_slug}.json" - - category_map = runner.task_loader.category_map - category_order = runner.task_loader.categories - - def _build_task_entry(r: Dict[str, Any]) -> Dict[str, Any]: - """Build a single task entry dict for results JSON.""" - tid = r["task_id"] - entry: Dict[str, Any] = { - "task_id": tid, - "status": r["status"], - "timed_out": r["timed_out"], - "execution_time": r["execution_time"], - "transcript_length": len(r["transcript"]), - "usage": r.get("usage", {}), - "workspace": r["workspace"], - "grading": grades_by_task_id.get(tid, {}), - "frontmatter": tasks_by_id[tid].frontmatter, - } - if category_map: - entry["category"] = category_map.get(tid, "") - return entry - - def _write_incremental_results(): - task_entries = [_build_task_entry(r) for r in results] - efficiency = _compute_efficiency_summary(task_entries, grades_by_task_id) - cat_scores = _compute_category_scores(task_entries, tasks_by_id, category_order) - partial = { - "model": args.model, - "benchmark_version": _get_benchmark_version(skill_root), - "run_id": run_id, - "timestamp": time.time(), - "suite": args.suite, - "runs_per_task": runs_per_task, - "tasks": task_entries, - "category_scores": cat_scores, - "efficiency": efficiency, - "in_progress": True, - "completed_tasks": len(grades_by_task_id), - "total_tasks": len(tasks_to_run), - } - try: - incremental_path.write_text(json.dumps(partial, indent=2), encoding="utf-8") - except OSError: - pass - - # Phase 1: Execute all tasks - task_execution_map = {} # Maps task_id -> list of (task, result) tuples for each run - - # Initialize judge executor for parallel grading + # Initialize judge executor for parallel grading judge_executor: Optional[ThreadPoolExecutor] = None if not args.no_parallel_judge: judge_executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="judge") @@ -1075,136 +388,6 @@ def _write_incremental_results(): logger.info("🧹 Shutting down judge executor...") judge_executor.shutdown(wait=True) - logger.info("\n%s", "=" * 80) - logger.info("πŸ“Š Phase 1 complete: All tasks executed. 
Starting grading phase...") - logger.info("%s", "=" * 80) - - # Phase 2: Grade in batches - batch_size = args.batch_size - - # Flatten all task data across all runs - all_task_data = [] - for task_id in task_execution_map: - all_task_data.extend(task_execution_map[task_id]) - - # Process in batches - for batch_start in range(0, len(all_task_data), batch_size): - batch_end = min(batch_start + batch_size, len(all_task_data)) - batch_items = all_task_data[batch_start:batch_end] - - batch_tasks = [item["task"] for item in batch_items] - batch_results = [item["result"] for item in batch_items] - - logger.info( - "\nπŸ“Š Grading batch %d-%d of %d total task runs...", - batch_start + 1, - batch_end, - len(all_task_data), - ) - - try: - grade_kwargs = dict( - tasks=batch_tasks, - execution_results=batch_results, - skill_dir=skill_dir, - verbose=args.verbose, - ) - if args.judge: - grade_kwargs["judge_model"] = args.judge - grade_kwargs["judge_backend"] = "api" - - batch_grades = grade_tasks_batch(**grade_kwargs) - except Exception as exc: - logger.warning("Batch grading failed, using individual fallback: %s", exc) - # Fallback to individual grading - batch_grades = [] - for item in batch_items: - task = item["task"] - result = item["result"] - try: - individual_kwargs = dict( - task=task, - execution_result=result, - skill_dir=skill_dir, - verbose=args.verbose, - use_judge_cache=not args.no_judge_cache, - ) - if args.judge: - individual_kwargs["judge_model"] = args.judge - individual_kwargs["judge_backend"] = "api" - grade = grade_task(**individual_kwargs) - except Exception as inner_exc: - logger.warning("Individual grading also failed for %s: %s", task.task_id, inner_exc) - grade = GradeResult( - task_id=task.task_id, - score=0.0, - max_score=1.0, - grading_type=task.grading_type, - breakdown={}, - notes=f"Grading failed: {inner_exc}", - ) - batch_grades.append(grade) - - # Store grades back into the items - for item, grade in zip(batch_items, batch_grades): - item["grade"] = grade - - # Log score - score_pct = grade.score / grade.max_score * 100 if grade.max_score > 0 else 0 - status_emoji = ( - "βœ…" if grade.score >= grade.max_score else "⚠️" if grade.score > 0 else "❌" - ) - logger.info( - "%s Task %s: %.1f/%.1f (%.0f%%) - %s", - status_emoji, - item["task"].task_id, - grade.score, - grade.max_score, - score_pct, - grade.grading_type, - ) - if grade.notes: - logger.info(" Notes: %s", grade.notes[:200]) - - # Phase 3: Aggregate grades by task - for task_id, items in task_execution_map.items(): - task_grades = [item["grade"] for item in items] - task_scores = [grade.score for grade in task_grades] - - grades_by_task_id[task_id] = { - "runs": [grade.to_dict() for grade in task_grades], - "mean": statistics.mean(task_scores), - "std": statistics.stdev(task_scores) if len(task_scores) > 1 else 0.0, - "min": min(task_scores), - "max": max(task_scores), - } - - # Fail-fast check for sanity task - task = items[0]["task"] # Get task object from first item - all_runs_missing_transcript = all( - not item["result"].get("transcript") for item in items - ) - - if ( - task.task_id == sanity_task_id - and grades_by_task_id[task.task_id]["mean"] == 0.0 - and not args.no_fail_fast - and not all_runs_missing_transcript - ): - logger.error( - "🚨 FAIL FAST: Sanity check (%s) scored 0%%. 
Aborting benchmark run to avoid wasting resources.", - sanity_task_id, - ) - sys.exit(3) - if task.task_id == sanity_task_id and grades_by_task_id[task.task_id]["mean"] == 0.0: - if all_runs_missing_transcript: - logger.warning( - "⚠️ Sanity check scored 0%% but transcripts were missing for all runs; skipping fail-fast as likely infrastructure/logging issue." - ) - - # Write final incremental results - _write_incremental_results() - output_dir = Path(args.output_dir) output_dir.mkdir(parents=True, exist_ok=True) output_path = output_dir / f"{run_id}_{model_slug}.json" From fa4f1f1a790ac8f7ac496614938419601728a08f Mon Sep 17 00:00:00 2001 From: Brendan O'Leary Date: Wed, 22 Apr 2026 10:34:31 -0400 Subject: [PATCH 4/7] revert: remove parallel judge (moving to separate PR) --- scripts/benchmark.py | 819 ++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 818 insertions(+), 1 deletion(-) diff --git a/scripts/benchmark.py b/scripts/benchmark.py index 9ca04c4e..52bf0a6c 100644 --- a/scripts/benchmark.py +++ b/scripts/benchmark.py @@ -126,7 +126,694 @@ def run_benchmark( logger.info(f"🎯 Running benchmark on all {len(tasks_to_run)} tasks") results = [] - # Initialize judge executor for parallel grading + for i, task in enumerate(tasks_to_run, 1): + logger.info(f"\n{'=' * 80}") + logger.info(f"πŸ“‹ Task {i}/{len(tasks_to_run)}") + logger.info(f"{'=' * 80}") + result = agent.execute_task(task, simulate=simulate) + results.append(result) + + logger.info(f"\n{'=' * 80}") + logger.info(f"✨ Benchmark complete! Executed {len(results)} tasks") + logger.info(f"{'=' * 80}") + + # Print summary + total_time = sum(r["execution_time"] for r in results) + logger.info("\nπŸ“Š BENCHMARK SUMMARY") + logger.info(f" Agent: {agent.agent_id}") + logger.info(f" Tasks completed: {len(results)}") + logger.info(f" Total execution time: {total_time:.2f}s") + logger.info(f" Average time per task: {total_time / len(results):.2f}s") + + return results + + def print_task_summary(self) -> None: + """Print a summary of all loaded tasks.""" + if not self.tasks: + logger.warning("No tasks loaded") + return + + print("\n" + "=" * 80) + print(f"LOADED TASKS SUMMARY ({len(self.tasks)} tasks)") + print("=" * 80) + + for task in self.tasks: + print(f"\n[{task.task_id}] {task.name}") + print(f" Category: {task.category}") + print(f" Grading: {task.grading_type}") + print(f" Timeout: {task.timeout_seconds}s") + print(f" Criteria: {len(task.grading_criteria)} items") + print( + f" Prompt: {task.prompt[:100]}..." + if len(task.prompt) > 100 + else f" Prompt: {task.prompt}" + ) + + print("\n" + "=" * 80) + + +def _parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser(description="PinchBench OpenClaw Benchmark Runner") + parser.add_argument( + "--model", + required=False, + help="Model identifier (e.g., anthropic/claude-sonnet-4)", + ) + parser.add_argument( + "--suite", + default="all", + help='Tasks to run: "all", "automated-only", a category name (e.g. 
"coding"), or comma-separated task IDs', + ) + parser.add_argument( + "--output-dir", + default="results", + help="Results directory", + ) + parser.add_argument( + "--register", + action="store_true", + help="Request a new API token and save it to local config", + ) + parser.add_argument( + "--no-upload", + action="store_true", + help="Skip uploading to server", + ) + parser.add_argument( + "--upload", + type=str, + metavar="RESULTS_JSON", + help="Upload a previous run's results JSON and exit (skips benchmarking)", + ) + parser.add_argument( + "--timeout-multiplier", + type=float, + default=1.0, + help="Scale all task timeouts", + ) + parser.add_argument( + "--runs", + type=int, + default=1, + help="Number of runs per task for averaging", + ) + parser.add_argument( + "--judge", + default=None, + help=( + "Judge model or backend. Default (unset): OpenClaw agent session with " + "openrouter/anthropic/claude-opus-4.5. Set to a model ID to call its API " + "directly (e.g. openai/gpt-4o, anthropic/claude-sonnet-4-5-20250514, claude)" + ), + ) + parser.add_argument( + "--base-url", + default=None, + help="Custom OpenAI-compatible API base URL (bypasses OpenRouter validation)", + ) + parser.add_argument( + "--api-key", + default=None, + help="API key for custom endpoint (default: $OPENAI_API_KEY env var)", + ) + parser.add_argument( + "--verbose", + "-v", + action="store_true", + help="Enable verbose logging (shows transcript contents, workspace files, etc.)", + ) + parser.add_argument( + "--official-key", + type=str, + metavar="KEY", + help="Official key to mark submission as official (can also use PINCHBENCH_OFFICIAL_KEY env var)", + ) + parser.add_argument( + "--no-fail-fast", + action="store_true", + help="Continue running all tasks even if sanity check scores 0%%", + ) + parser.add_argument( + "--trend", + action="store_true", + help="Run trend analysis after benchmark completes (requires β‰₯2 runs in output dir)", + ) + parser.add_argument( + "--trend-window", + type=int, + default=10, + metavar="N", + help="Number of recent runs to include in trend analysis (default: 10)", + ) + parser.add_argument( + "--trend-threshold", + type=float, + default=-0.5, + help="Slope (%%/run) below which regression is flagged (default: -0.5)", + ) + parser.add_argument( + "--no-parallel-judge", + action="store_true", + help="Disable parallel judge execution (grade synchronously after each task)", + ) + parser.add_argument( + "--no-judge-cache", + action="store_true", + help="Disable judge response caching (forces fresh evaluation)", + ) + args = parser.parse_args() + + # Validate --trend-window + if args.trend_window < 2: + parser.error("--trend-window must be >= 2") + + return args + + +def _select_task_ids( + tasks: List[Task], + suite: str, + category_map: Optional[Dict[str, str]] = None, +) -> Optional[List[str]]: + if suite == "all": + return None + if suite == "automated-only": + return [task.task_id for task in tasks if task.grading_type == "automated"] + + # Check if suite matches a manifest category name + if category_map: + available_categories = set(category_map.values()) + # Support "+" syntax for combining categories: "coding+research" + requested = [s.strip() for s in suite.split("+")] + if all(r in available_categories for r in requested): + requested_set = set(requested) + return [ + task.task_id for task in tasks if category_map.get(task.task_id) in requested_set + ] + + # Fall back to comma-separated task IDs + return [task_id.strip() for task_id in suite.split(",") if task_id.strip()] + + 
+def _next_run_id(run_root: Path) -> str: + run_root.mkdir(parents=True, exist_ok=True) + existing = [] + for entry in run_root.iterdir(): + if entry.is_dir() and entry.name.isdigit(): + existing.append(int(entry.name)) + next_id = (max(existing) + 1) if existing else 1 + return f"{next_id:04d}" + + +def _load_ascii_art(script_dir: Path, filename: str) -> str | None: + """Load ASCII art from a local file if available.""" + art_path = script_dir / filename + try: + return art_path.read_text(encoding="utf-8").rstrip("\n") + except FileNotFoundError: + return None + + +def _supports_truecolor() -> bool: + if os.environ.get("NO_COLOR"): + return False + return sys.stdout.isatty() + + +def _get_benchmark_version(script_dir: Path) -> str: + try: + return importlib.metadata.version("pinchbench") + except Exception: + pass + + version_file = script_dir / "BENCHMARK_VERSION" + if version_file.is_file(): + try: + return version_file.read_text().strip() + except Exception: + pass + + def _split_semver(tag: str) -> Optional[tuple[int, int, int]]: + cleaned = tag.strip() + if cleaned.startswith("v"): + cleaned = cleaned[1:] + match = re.match(r"^(\d+)\.(\d+)\.(\d+)$", cleaned) + if not match: + return None + return int(match.group(1)), int(match.group(2)), int(match.group(3)) + + try: + result = subprocess.run( + [ + "git", + "describe", + "--tags", + "--long", + "--match", + "v[0-9]*.[0-9]*.[0-9]*", + "--match", + "[0-9]*.[0-9]*.[0-9]*", + ], + capture_output=True, + text=True, + timeout=2, + check=False, + cwd=script_dir, + ) + if result.returncode == 0: + described = result.stdout.strip() + describe_match = re.match(r"^(.+)-(\d+)-g([0-9a-fA-F]+)$", described) + if describe_match: + raw_tag = describe_match.group(1) + commit_distance = int(describe_match.group(2)) + short_sha = describe_match.group(3) + parsed_tag = _split_semver(raw_tag) + if parsed_tag is not None: + major, minor, patch = parsed_tag + if commit_distance == 0: + return f"{major}.{minor}.{patch}" + next_patch = patch + 1 + return f"{major}.{minor}.{next_patch}-dev.{commit_distance}+g{short_sha}" + if described: + return described + except (subprocess.SubprocessError, FileNotFoundError, OSError): + pass + + try: + result = subprocess.run( + ["git", "rev-parse", "--short", "HEAD"], + capture_output=True, + text=True, + timeout=2, + check=False, + cwd=script_dir, + ) + except (subprocess.SubprocessError, FileNotFoundError, OSError): + return "" + if result.returncode != 0: + return "" + return result.stdout.strip() + + +def _colorize_gradient(ascii_art: str) -> str: + if not _supports_truecolor(): + return ascii_art + lines = ascii_art.splitlines() + if not lines: + return ascii_art + last_index = max(len(lines) - 1, 1) + colored_lines = [] + for idx, line in enumerate(lines): + t = idx / last_index + green_blue = int(255 * (1 - t)) + colored_lines.append(f"\x1b[38;2;255;{green_blue};{green_blue}m{line}\x1b[0m") + return "\n".join(colored_lines) + + +def _compute_efficiency_summary( + task_entries: List[Dict[str, Any]], + grades_by_task_id: Dict[str, Dict[str, Any]], +) -> Dict[str, Any]: + """Compute aggregate token efficiency metrics across all tasks. + + Returns a dict with total token usage, cost, and efficiency ratios + (score per token, score per dollar) so that different models can be + compared not just on quality but also on resource consumption. 
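    For example (illustrative numbers): a run earning a summed score of 42.0
    across all tasks while consuming 8,400,000 tokens and $12.60 reports
    score_per_1k_tokens = 42.0 / 8400 = 0.005 and
    score_per_dollar = 42.0 / 12.60 = 3.3333.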
+ """ + total_input_tokens = 0 + total_output_tokens = 0 + total_tokens = 0 + total_cost_usd = 0.0 + total_requests = 0 + total_execution_time = 0.0 + tasks_with_usage = 0 + + per_task_efficiency: List[Dict[str, Any]] = [] + for entry in task_entries: + usage = entry.get("usage", {}) + task_id = entry["task_id"] + grading = grades_by_task_id.get(task_id, {}) + score = float(grading.get("mean", 0.0)) + + inp = int(usage.get("input_tokens", 0)) + out = int(usage.get("output_tokens", 0)) + tot = int(usage.get("total_tokens", 0)) + cost = float(usage.get("cost_usd", 0.0) or 0.0) + reqs = int(usage.get("request_count", 0)) + exec_time = float(entry.get("execution_time", 0.0) or 0.0) + + total_input_tokens += inp + total_output_tokens += out + total_tokens += tot + total_cost_usd += cost + total_requests += reqs + total_execution_time += exec_time + + if tot > 0: + tasks_with_usage += 1 + + per_task_efficiency.append( + { + "task_id": task_id, + "score": round(score, 4), + "total_tokens": tot, + "cost_usd": round(cost, 6), + "tokens_per_score_point": round(tot / score, 1) if score > 0 else None, + } + ) + + # Aggregate scores + all_scores = [float(g.get("mean", 0.0)) for g in grades_by_task_id.values()] + total_score = sum(all_scores) + num_tasks = len(all_scores) + + summary: Dict[str, Any] = { + "total_tokens": total_tokens, + "total_input_tokens": total_input_tokens, + "total_output_tokens": total_output_tokens, + "total_cost_usd": round(total_cost_usd, 6), + "total_requests": total_requests, + "total_execution_time_seconds": round(total_execution_time, 2), + "tasks_with_usage_data": tasks_with_usage, + "tokens_per_task": round(total_tokens / num_tasks, 1) if num_tasks > 0 else 0, + "cost_per_task_usd": round(total_cost_usd / num_tasks, 6) if num_tasks > 0 else 0, + "score_per_1k_tokens": ( + round(total_score / (total_tokens / 1000), 6) if total_tokens > 0 else None + ), + "score_per_dollar": ( + round(total_score / total_cost_usd, 4) if total_cost_usd > 0 else None + ), + "per_task": per_task_efficiency, + } + return summary + + +def _log_efficiency_summary( + efficiency: Dict[str, Any], + grades_by_task_id: Dict[str, Dict[str, Any]], +) -> None: + """Log a human-readable token efficiency summary.""" + all_scores = [float(g.get("mean", 0.0)) for g in grades_by_task_id.values()] + mean_score = statistics.mean(all_scores) if all_scores else 0.0 + + logger.info("\n%s", "=" * 80) + logger.info("πŸ“Š TOKEN EFFICIENCY SUMMARY") + logger.info("%s", "=" * 80) + logger.info( + " Total tokens used: %s (input: %s, output: %s)", + f"{efficiency['total_tokens']:,}", + f"{efficiency['total_input_tokens']:,}", + f"{efficiency['total_output_tokens']:,}", + ) + logger.info(" Total API requests: %s", f"{efficiency['total_requests']:,}") + if efficiency["total_cost_usd"] > 0: + logger.info(" Total cost: $%.4f", efficiency["total_cost_usd"]) + logger.info( + " Avg tokens/task: %s", + f"{efficiency['tokens_per_task']:,.0f}", + ) + logger.info(" Mean score: %.4f", mean_score) + if efficiency.get("score_per_1k_tokens") is not None: + logger.info( + " Score per 1K tokens: %.4f (higher = more efficient)", + efficiency["score_per_1k_tokens"], + ) + if efficiency.get("score_per_dollar") is not None: + logger.info( + " Score per dollar: %.4f (higher = more cost-efficient)", + efficiency["score_per_dollar"], + ) + logger.info("%s", "=" * 80) + + +def _compute_category_scores( + task_entries: List[Dict[str, Any]], + tasks_by_id: Dict[str, Any], + category_order: Optional[List[str]] = None, +) -> Dict[str, Dict[str, 
Any]]: + """Compute per-category score rollups from task results. + + Returns a dict mapping category name to a dict with keys: + ``score``, ``max_score``, ``pct``, ``task_count``. + + If *category_order* is provided it is used only for ordering the returned + dict (Python 3.7+ preserves insertion order). Categories not in the order + list are appended alphabetically. + """ + raw: Dict[str, Dict[str, float]] = {} + + for entry in task_entries: + task_id = entry["task_id"] + task = tasks_by_id.get(task_id) + if not task: + continue + + category = task.category.upper() if task.category else "UNCATEGORIZED" + grading = entry.get("grading", {}) + mean_score = float(grading.get("mean", 0.0)) + max_score = 1.0 # Each task is scored 0-1 + + if category not in raw: + raw[category] = {"score": 0.0, "max_score": 0.0, "task_count": 0} + + raw[category]["score"] += mean_score + raw[category]["max_score"] += max_score + raw[category]["task_count"] += 1 + + # Determine display order + if category_order: + ordered_keys = [c.upper() for c in category_order if c.upper() in raw] + remaining = sorted(k for k in raw if k not in ordered_keys) + ordered_keys.extend(remaining) + else: + ordered_keys = sorted(raw.keys()) + + result: Dict[str, Dict[str, Any]] = {} + for cat in ordered_keys: + data = raw[cat] + pct = (data["score"] / data["max_score"] * 100) if data["max_score"] > 0 else 0 + result[cat] = { + "score": round(data["score"], 6), + "max_score": round(data["max_score"], 6), + "pct": round(pct, 1), + "task_count": int(data["task_count"]), + } + + return result + + +def _log_category_summary( + task_entries: List[Dict[str, Any]], + tasks_by_id: Dict[str, Any], + category_order: Optional[List[str]] = None, +) -> Dict[str, Dict[str, Any]]: + """Log a summary grouped by category, matching the PinchBench website format. + + Returns the computed category_scores dict so callers can embed it in + results JSON. 
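    For example (illustrative numbers): two CODING tasks scoring 0.8 and 0.6
    plus one RESEARCH task scoring 1.0 roll up to
    {"CODING": {"score": 1.4, "max_score": 2.0, "pct": 70.0, "task_count": 2},
     "RESEARCH": {"score": 1.0, "max_score": 1.0, "pct": 100.0, "task_count": 1}}.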
+    """
+    category_scores = _compute_category_scores(task_entries, tasks_by_id, category_order)
+
+    # Calculate overall totals
+    total_earned = sum(c["score"] for c in category_scores.values())
+    total_possible = sum(c["max_score"] for c in category_scores.values())
+    overall_pct = (total_earned / total_possible * 100) if total_possible > 0 else 0
+
+    logger.info("\n%s", "=" * 80)
+    logger.info("🦀 PINCHBENCH SCORE SUMMARY")
+    logger.info("%s", "=" * 80)
+    logger.info("")
+    logger.info("  Overall Score: %.1f%% (%.1f / %.1f)", overall_pct, total_earned, total_possible)
+    logger.info("")
+    logger.info("  %-20s %8s %12s", "CATEGORY", "SCORE", "TASKS")
+    logger.info("  %s", "-" * 44)
+
+    for category, data in category_scores.items():
+        pct = data["pct"]
+        task_count = data["task_count"]
+        task_label = "task" if task_count == 1 else "tasks"
+
+        # Color indicator based on score
+        if pct >= 90:
+            indicator = "🟢"
+        elif pct >= 70:
+            indicator = "🟡"
+        else:
+            indicator = "🔴"
+
+        logger.info(
+            "  %s %-17s %6.1f%% %6d %s",
+            indicator,
+            category,
+            pct,
+            task_count,
+            task_label,
+        )
+
+    logger.info("  %s", "-" * 44)
+    logger.info("%s", "=" * 80)
+
+    return category_scores
+
+
+def main():
+    """Main entry point for the benchmark script."""
+    # Determine tasks directory
+    script_dir = Path(__file__).parent
+    skill_root = script_dir.parent  # Parent of scripts/ is the skill root
+    tasks_dir = skill_root / "tasks"
+
+    logger.info("🦞🦀🦐 PinchBench - OpenClaw Benchmarking")
+    ascii_crab = _load_ascii_art(skill_root, "crab.txt")
+    if ascii_crab:
+        print("\n" + _colorize_gradient(ascii_crab) + "\n")
+    else:
+        print("\n" + "🦀 " * 30)
+        print("🦀 " * 30 + "\n")
+    logger.info("🦞🦀🦐 Starting PinchBench 🦐🦀🦞")
+    time.sleep(5)
+
+    if not tasks_dir.exists():
+        logger.error(f"❌ Tasks directory not found: {tasks_dir}")
+        sys.exit(1)
+
+    args = _parse_args()
+    if not args.model and not args.register and not args.upload:
+        logger.error("Missing required argument: --model (unless using --register or --upload)")
+        sys.exit(2)
+
+    if args.register:
+        try:
+            from lib_upload import UploadError, register_token, save_token_config
+
+            token, claim_url = register_token()
+            config_path = save_token_config(token, claim_url)
+            logger.info("Saved token to %s", config_path)
+            if claim_url:
+                logger.info("Claim URL: %s", claim_url)
+            return
+        except UploadError as exc:
+            logger.error("Registration failed: %s", exc)
+            sys.exit(1)
+
+    if args.upload:
+        results_path = Path(args.upload)
+        if not results_path.exists():
+            logger.error("Results file not found: %s", results_path)
+            sys.exit(1)
+        try:
+            from lib_upload import UploadError, upload_results
+
+            result = upload_results(results_path)
+            if result.rank is not None:
+                logger.info("Uploaded to leaderboard: rank #%s", result.rank)
+            if result.leaderboard_url:
+                logger.info("View at: %s", result.leaderboard_url)
+            logger.info("Upload complete.")
+            return
+        except UploadError as exc:
+            logger.error("Upload failed: %s", exc)
+            sys.exit(1)
+
+    logger.info("🔧 Initializing BenchmarkRunner...")
+    runner = BenchmarkRunner(tasks_dir)
+
+    logger.info("📂 Loading tasks from directory...")
+    runner.load_tasks()
+
+    model_slug = slugify_model(args.model)
+    run_root = Path("/tmp/pinchbench")
+    run_id = _next_run_id(run_root)
+    skill_dir = skill_root
+    agent_id = f"bench-{model_slug}"
+    # Use a shared workspace for the agent - we'll copy fixtures per task
+    agent_workspace = Path(f"/tmp/pinchbench/{run_id}/agent_workspace")
+
+    # Validate model exists before wasting time on tasks
+    if args.base_url:
+        logger.info("Using custom endpoint: %s (skipping OpenRouter validation)", args.base_url)
+    else:
+        try:
+            validate_openrouter_model(args.model)
+        except ModelValidationError as exc:
+            logger.error("❌ %s", exc)
+            sys.exit(1)
+
+    ensure_agent_exists(
+        agent_id,
+        args.model,
+        agent_workspace,
+        base_url=args.base_url,
+        api_key=args.api_key,
+    )
+    cleanup_agent_sessions(agent_id)
+
+    task_ids = _select_task_ids(runner.tasks, args.suite, runner.task_loader.category_map)
+    results = []
+    grades_by_task_id = {}
+    sanity_task_id = "task_sanity"
+
+    tasks_to_run = runner.tasks
+    if task_ids is not None:
+        tasks_to_run = [task for task in runner.tasks if task.task_id in task_ids]
+    tasks_by_id = {task.task_id: task for task in tasks_to_run}
+
+    runs_per_task = max(1, args.runs)
+
+    # Incremental result writer: builds partial result JSON from completed
+    # tasks so external tools can poll progress while the benchmark runs.
+    incremental_dir = Path(args.output_dir)
+    incremental_dir.mkdir(parents=True, exist_ok=True)
+    incremental_path = incremental_dir / f"{run_id}_{model_slug}.json"
+
+    category_map = runner.task_loader.category_map
+    category_order = runner.task_loader.categories
+
+    def _build_task_entry(r: Dict[str, Any]) -> Dict[str, Any]:
+        """Build a single task entry dict for results JSON."""
+        tid = r["task_id"]
+        entry: Dict[str, Any] = {
+            "task_id": tid,
+            "status": r["status"],
+            "timed_out": r["timed_out"],
+            "execution_time": r["execution_time"],
+            "transcript_length": len(r["transcript"]),
+            "usage": r.get("usage", {}),
+            "workspace": r["workspace"],
+            "grading": grades_by_task_id.get(tid, {}),
+            "frontmatter": tasks_by_id[tid].frontmatter,
+        }
+        if category_map:
+            entry["category"] = category_map.get(tid, "")
+        return entry
+
+    def _write_incremental_results():
+        task_entries = [_build_task_entry(r) for r in results]
+        efficiency = _compute_efficiency_summary(task_entries, grades_by_task_id)
+        cat_scores = _compute_category_scores(task_entries, tasks_by_id, category_order)
+        partial = {
+            "model": args.model,
+            "benchmark_version": _get_benchmark_version(skill_root),
+            "run_id": run_id,
+            "timestamp": time.time(),
+            "suite": args.suite,
+            "runs_per_task": runs_per_task,
+            "tasks": task_entries,
+            "category_scores": cat_scores,
+            "efficiency": efficiency,
+            "in_progress": True,
+            "completed_tasks": len(grades_by_task_id),
+            "total_tasks": len(tasks_to_run),
+        }
+        try:
+            incremental_path.write_text(json.dumps(partial, indent=2), encoding="utf-8")
+        except OSError:
+            pass
+
+    # Phase 1: Execute all tasks
+    task_execution_map = {}  # Maps task_id -> list of (task, result) tuples for each run
+
+    # Initialize judge executor for parallel grading
     judge_executor: Optional[ThreadPoolExecutor] = None
     if not args.no_parallel_judge:
         judge_executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="judge")
@@ -388,6 +1075,136 @@ def run_benchmark(
         logger.info("🧹 Shutting down judge executor...")
         judge_executor.shutdown(wait=True)

+    logger.info("\n%s", "=" * 80)
+    logger.info("📊 Phase 1 complete: All tasks executed. Starting grading phase...")
+    logger.info("%s", "=" * 80)
+
+    # Phase 2: Grade in batches
+    batch_size = args.batch_size
+
+    # Flatten all task data across all runs
+    all_task_data = []
+    for task_id in task_execution_map:
+        all_task_data.extend(task_execution_map[task_id])
+
+    # Process in batches
+    for batch_start in range(0, len(all_task_data), batch_size):
+        batch_end = min(batch_start + batch_size, len(all_task_data))
+        batch_items = all_task_data[batch_start:batch_end]
+
+        batch_tasks = [item["task"] for item in batch_items]
+        batch_results = [item["result"] for item in batch_items]
+
+        logger.info(
+            "\n📊 Grading batch %d-%d of %d total task runs...",
+            batch_start + 1,
+            batch_end,
+            len(all_task_data),
+        )
+
+        try:
+            grade_kwargs = dict(
+                tasks=batch_tasks,
+                execution_results=batch_results,
+                skill_dir=skill_dir,
+                verbose=args.verbose,
+            )
+            if args.judge:
+                grade_kwargs["judge_model"] = args.judge
+                grade_kwargs["judge_backend"] = "api"
+
+            batch_grades = grade_tasks_batch(**grade_kwargs)
+        except Exception as exc:
+            logger.warning("Batch grading failed, using individual fallback: %s", exc)
+            # Fallback to individual grading
+            batch_grades = []
+            for item in batch_items:
+                task = item["task"]
+                result = item["result"]
+                try:
+                    individual_kwargs = dict(
+                        task=task,
+                        execution_result=result,
+                        skill_dir=skill_dir,
+                        verbose=args.verbose,
+                        use_judge_cache=not args.no_judge_cache,
+                    )
+                    if args.judge:
+                        individual_kwargs["judge_model"] = args.judge
+                        individual_kwargs["judge_backend"] = "api"
+                    grade = grade_task(**individual_kwargs)
+                except Exception as inner_exc:
+                    logger.warning("Individual grading also failed for %s: %s", task.task_id, inner_exc)
+                    grade = GradeResult(
+                        task_id=task.task_id,
+                        score=0.0,
+                        max_score=1.0,
+                        grading_type=task.grading_type,
+                        breakdown={},
+                        notes=f"Grading failed: {inner_exc}",
+                    )
+                batch_grades.append(grade)
+
+        # Store grades back into the items
+        for item, grade in zip(batch_items, batch_grades):
+            item["grade"] = grade
+
+            # Log score
+            score_pct = grade.score / grade.max_score * 100 if grade.max_score > 0 else 0
+            status_emoji = (
+                "✅" if grade.score >= grade.max_score else "⚠️" if grade.score > 0 else "❌"
+            )
+            logger.info(
+                "%s Task %s: %.1f/%.1f (%.0f%%) - %s",
+                status_emoji,
+                item["task"].task_id,
+                grade.score,
+                grade.max_score,
+                score_pct,
+                grade.grading_type,
+            )
+            if grade.notes:
+                logger.info("   Notes: %s", grade.notes[:200])
+
+    # Phase 3: Aggregate grades by task
+    for task_id, items in task_execution_map.items():
+        task_grades = [item["grade"] for item in items]
+        task_scores = [grade.score for grade in task_grades]
+
+        grades_by_task_id[task_id] = {
+            "runs": [grade.to_dict() for grade in task_grades],
+            "mean": statistics.mean(task_scores),
+            "std": statistics.stdev(task_scores) if len(task_scores) > 1 else 0.0,
+            "min": min(task_scores),
+            "max": max(task_scores),
+        }
+
+        # Fail-fast check for sanity task
+        task = items[0]["task"]  # Get task object from first item
+        all_runs_missing_transcript = all(
+            not item["result"].get("transcript") for item in items
+        )
+
+        if (
+            task.task_id == sanity_task_id
+            and grades_by_task_id[task.task_id]["mean"] == 0.0
+            and not args.no_fail_fast
+            and not all_runs_missing_transcript
+        ):
+            logger.error(
+                "🚨 FAIL FAST: Sanity check (%s) scored 0%%. Aborting benchmark run to avoid wasting resources.",
+                sanity_task_id,
+            )
+            sys.exit(3)
+        if task.task_id == sanity_task_id and grades_by_task_id[task.task_id]["mean"] == 0.0:
+            if all_runs_missing_transcript:
+                logger.warning(
+                    "⚠️ Sanity check scored 0%% but transcripts were missing for all runs; skipping fail-fast as likely infrastructure/logging issue."
+                )
+
+    # Write final incremental results
+    _write_incremental_results()
+
     output_dir = Path(args.output_dir)
     output_dir.mkdir(parents=True, exist_ok=True)
     output_path = output_dir / f"{run_id}_{model_slug}.json"
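A worked example of the Phase 3 aggregation above, using the same statistics calls as the patch; the run scores are made up for illustration:

```python
import statistics

task_scores = [0.8, 1.0, 0.9]  # scores from three runs of one task (illustrative)
aggregate = {
    "mean": statistics.mean(task_scores),
    "std": statistics.stdev(task_scores) if len(task_scores) > 1 else 0.0,
    "min": min(task_scores),
    "max": max(task_scores),
}
print({k: round(v, 3) for k, v in aggregate.items()})
# {'mean': 0.9, 'std': 0.1, 'min': 0.8, 'max': 1.0}
```
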
From 625e6afe622713a5e7fb542e83580643beae80d5 Mon Sep 17 00:00:00 2001
From: Brendan O'Leary
Date: Wed, 22 Apr 2026 13:01:49 -0400
Subject: [PATCH 5/7] feat: implement grade_tasks_batch for batched LLM judge grading

Adds the missing batch grading implementation:

- grade_tasks_batch(): Main entry point that separates tasks by grading type
  - Automated tasks: graded individually (already fast)
  - LLM judge tasks: batched into a single API call
  - Hybrid tasks: automated checks graded individually, LLM parts batched
- _batch_grade_llm_judge(): Builds a combined prompt with all tasks and
  expects a JSON array response with scores for each task
- _parse_batch_response(): Parses the JSON array, handles code blocks,
  validates structure, extracts scores
- _fallback_individual_grading(): Falls back gracefully if batch parsing fails

Timeout scales with batch size. Robust error handling throughout.
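As a usage sketch for reviewers (the keyword-only signature below is from this patch; the task list, results, and skill_dir are placeholders a caller would already have):

```python
from lib_grading import grade_tasks_batch

# tasks: List[Task]; execution_results: one result dict per task, same order.
grades = grade_tasks_batch(
    tasks=tasks,
    execution_results=execution_results,
    skill_dir=skill_dir,
    judge_backend="api",
)
assert len(grades) == len(tasks)  # GradeResults come back in input order
```
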
---
 scripts/lib_grading.py | 351 +++++++++++++++++++++++++++++++++++++++++
 1 file changed, 351 insertions(+)

diff --git a/scripts/lib_grading.py b/scripts/lib_grading.py
index c78369df..6e4463ed 100644
--- a/scripts/lib_grading.py
+++ b/scripts/lib_grading.py
@@ -96,6 +96,357 @@ def grade_task(
     raise ValueError(f"Unknown grading type: {grading_type}")


+def grade_tasks_batch(
+    *,
+    tasks: List[Task],
+    execution_results: List[Dict[str, Any]],
+    skill_dir: Path,
+    judge_model: str = DEFAULT_JUDGE_MODEL,
+    judge_agent_prefix: str = DEFAULT_JUDGE_AGENT_PREFIX,
+    judge_timeout_seconds: float = DEFAULT_JUDGE_TIMEOUT_SECONDS,
+    judge_backend: str = "openclaw",
+    verbose: bool = False,
+) -> List[GradeResult]:
+    """
+    Grade multiple tasks in a single batch.
+
+    For tasks requiring an LLM judge, this batches them into a single API call
+    to reduce overhead. Automated grading is still done individually (it's fast).
+
+    Returns a list of GradeResults in the same order as the input tasks.
+    """
+    if len(tasks) != len(execution_results):
+        raise ValueError(f"Mismatch: {len(tasks)} tasks but {len(execution_results)} results")
+
+    if not tasks:
+        return []
+
+    results: List[Optional[GradeResult]] = [None] * len(tasks)
+
+    # Separate tasks by grading type
+    automated_indices = []
+    llm_judge_indices = []
+    hybrid_indices = []
+
+    for i, task in enumerate(tasks):
+        if task.grading_type == "automated":
+            automated_indices.append(i)
+        elif task.grading_type == "llm_judge":
+            llm_judge_indices.append(i)
+        elif task.grading_type == "hybrid":
+            hybrid_indices.append(i)
+        else:
+            # Unknown type - grade individually and let it raise
+            results[i] = grade_task(
+                task=task,
+                execution_result=execution_results[i],
+                skill_dir=skill_dir,
+                judge_model=judge_model,
+                judge_backend=judge_backend,
+                verbose=verbose,
+            )
+
+    # Grade automated tasks individually (they're fast)
+    for i in automated_indices:
+        results[i] = _grade_automated(tasks[i], execution_results[i], skill_dir=skill_dir, verbose=verbose)
+
+    # Batch LLM judge tasks
+    if llm_judge_indices:
+        llm_tasks = [tasks[i] for i in llm_judge_indices]
+        llm_results = [execution_results[i] for i in llm_judge_indices]
+        batch_grades = _batch_grade_llm_judge(
+            tasks=llm_tasks,
+            execution_results=llm_results,
+            judge_model=judge_model,
+            judge_agent_prefix=judge_agent_prefix,
+            judge_timeout_seconds=judge_timeout_seconds * len(llm_tasks),  # Scale timeout
+            judge_backend=judge_backend,
+            skill_dir=skill_dir,
+            verbose=verbose,
+        )
+        for idx, grade in zip(llm_judge_indices, batch_grades):
+            results[idx] = grade
+
+    # Hybrid tasks: automated is fast, batch the LLM parts
+    if hybrid_indices:
+        # First do automated grading for all hybrid tasks
+        auto_grades = {}
+        for i in hybrid_indices:
+            auto_grades[i] = _grade_automated(tasks[i], execution_results[i], skill_dir=skill_dir, verbose=verbose)
+
+        # Batch the LLM judge part
+        hybrid_tasks = [tasks[i] for i in hybrid_indices]
+        hybrid_results = [execution_results[i] for i in hybrid_indices]
+        llm_grades = _batch_grade_llm_judge(
+            tasks=hybrid_tasks,
+            execution_results=hybrid_results,
+            judge_model=judge_model,
+            judge_agent_prefix=judge_agent_prefix,
+            judge_timeout_seconds=judge_timeout_seconds * len(hybrid_tasks),
+            judge_backend=judge_backend,
+            skill_dir=skill_dir,
+            verbose=verbose,
+        )
+
+        # Combine auto + llm for each hybrid task
+        for idx, llm_grade in zip(hybrid_indices, llm_grades):
+            results[idx] = _combine_grades(tasks[idx], auto_grades[idx], llm_grade)
+
+    # Verify all results filled
+    for i, result in enumerate(results):
+        if result is None:
+            raise RuntimeError(f"Bug: no grade computed for task index {i} ({tasks[i].task_id})")
+
+    return results  # type: ignore
+
+
+def _batch_grade_llm_judge(
+    *,
+    tasks: List[Task],
+    execution_results: List[Dict[str, Any]],
+    judge_model: str,
+    judge_agent_prefix: str,
+    judge_timeout_seconds: float,
+    judge_backend: str = "openclaw",
+    skill_dir: Optional[Path] = None,
+    verbose: bool = False,
+) -> List[GradeResult]:
+    """
+    Grade multiple tasks with a single LLM judge call.
+
+    Builds a combined prompt with all tasks and parses a JSON array response.
+    Falls back to individual grading if batch parsing fails.
+    """
+    if not tasks:
+        return []
+
+    # Build combined prompt
+    prompt_parts = [
+        "You are grading multiple benchmark tasks. For each task, evaluate the agent's performance against the rubric.",
+        "",
+        "Respond with a JSON array containing one object per task, in order:",
+        "```json",
+        "[",
+        '  {"task_id": "task_name", "scores": {"criterion1": 0.8, "criterion2": 1.0}, "total": 0.85, "notes": "Brief explanation"},',
+        "  ...",
+        "]",
+        "```",
+        "",
+        "IMPORTANT: Return ONLY the JSON array. Ensure 'total' is a float between 0.0 and 1.0.",
+        "",
+        "=" * 60,
+        "",
+    ]
+
+    for i, (task, result) in enumerate(zip(tasks, execution_results)):
+        transcript = result.get("transcript", [])
+        transcript_summary = _summarize_transcript(transcript)
+        workspace_content = _read_workspace_files(result.get("workspace", ""))
+        rubric = task.llm_judge_rubric or _format_grading_criteria(task)
+
+        prompt_parts.extend([
+            f"## Task {i + 1}: {task.task_id}",
+            f"**Name:** {task.name}",
+            f"**Category:** {task.category}",
+            "",
+            "**Rubric:**",
+            rubric if rubric else "(No specific rubric provided)",
+            "",
+            "**Agent Transcript:**",
+            transcript_summary[:3000] if transcript_summary else "(Empty transcript - task may have failed)",
+            "",
+        ])
+        if workspace_content:
+            prompt_parts.extend([
+                "**Workspace Files:**",
+                workspace_content[:1500],
+                "",
+            ])
+        prompt_parts.append("=" * 60)
+        prompt_parts.append("")
+
+    combined_prompt = "\n".join(prompt_parts)
+
+    if verbose:
+        logger.info("  [VERBOSE] Batch judge prompt length: %d chars for %d tasks", len(combined_prompt), len(tasks))
+
+    # Make the API call
+    if judge_backend == "api":
+        judge_result = call_judge_api(
+            prompt=combined_prompt,
+            model=judge_model,
+            timeout_seconds=judge_timeout_seconds,
+        )
+
+        if judge_result.get("status") != "success":
+            logger.warning("Batch judge API call failed: %s", judge_result.get("error", judge_result.get("status")))
+            # Fall back to individual grading
+            return _fallback_individual_grading(
+                tasks, execution_results, judge_model, judge_agent_prefix,
+                judge_timeout_seconds / len(tasks), judge_backend, skill_dir, verbose
+            )
+
+        raw_text = judge_result.get("text", "")
+    else:
+        # OpenClaw agent backend
+        judge_skill_dir = skill_dir if skill_dir is not None else Path.cwd()
+        agent_id = _ensure_judge_agent(judge_agent_prefix, judge_model, judge_skill_dir)
+        judge_workspace = Path(f"/tmp/pinchbench/judge/batch_{int(time.time())}")
+        judge_result = run_openclaw_prompt(
+            agent_id=agent_id,
+            prompt=combined_prompt,
+            workspace=judge_workspace,
+            timeout_seconds=judge_timeout_seconds,
+        )
+
+        if judge_result.get("status") != "success":
+            logger.warning("Batch judge execution failed: %s", judge_result.get("status"))
+            return _fallback_individual_grading(
+                tasks, execution_results, judge_model, judge_agent_prefix,
+                judge_timeout_seconds / len(tasks), judge_backend, skill_dir, verbose
+            )
+
+        # Extract text from transcript
+        raw_text = ""
+        for entry in judge_result.get("transcript", []):
+            if entry.get("type") == "message":
+                msg = entry.get("message", {})
+                if msg.get("role") == "assistant":
+                    for item in msg.get("content", []):
+                        if item.get("type") == "text":
+                            raw_text += item.get("text", "")
+
+    # Parse the JSON array response
+    grades = _parse_batch_response(raw_text, tasks, verbose)
+
+    if len(grades) != len(tasks):
+        logger.warning("Batch response parsing returned %d grades for %d tasks, falling back", len(grades), len(tasks))
+        return _fallback_individual_grading(
+            tasks, execution_results, judge_model, judge_agent_prefix,
+            judge_timeout_seconds / len(tasks), judge_backend, skill_dir, verbose
+        )
+
+    return grades
+
+
+def _parse_batch_response(raw_text: str, tasks: List[Task], verbose: bool = False) -> List[GradeResult]:
+    """Parse a JSON array response from batch grading."""
+    import re
+
+    # Try to extract JSON array from the response
+    # Look for code block first
+    json_match = re.search(r"```(?:json)?\s*(\[[\s\S]*?\])\s*```", raw_text)
+    if json_match:
+        json_str = json_match.group(1)
+    else:
+        # Try to find bare JSON array
+        array_match = re.search(r"\[[\s\S]*\]", raw_text)
+        if array_match:
+            json_str = array_match.group(0)
+        else:
+            logger.warning("No JSON array found in batch response")
+            return []
+
+    try:
+        parsed = json.loads(json_str)
+    except json.JSONDecodeError as exc:
+        logger.warning("Failed to parse batch JSON: %s", exc)
+        return []
+
+    if not isinstance(parsed, list):
+        logger.warning("Batch response is not a list: %s", type(parsed))
+        return []
+
+    if len(parsed) != len(tasks):
+        logger.warning("Batch response has %d items but expected %d", len(parsed), len(tasks))
+        # Try to match by task_id if lengths differ
+        # For now, just return empty to trigger fallback
+        return []
+
+    grades = []
+    for i, (item, task) in enumerate(zip(parsed, tasks)):
+        if not isinstance(item, dict):
+            logger.warning("Batch item %d is not a dict", i)
+            grades.append(GradeResult(
+                task_id=task.task_id,
+                score=0.0,
+                max_score=1.0,
+                grading_type="llm_judge",
+                breakdown={},
+                notes="Batch parsing failed for this item",
+            ))
+            continue
+
+        # Extract score
+        total = item.get("total")
+        if total is None:
+            total = item.get("score", 0.0)
+        try:
+            total = float(total)
+        except (TypeError, ValueError):
+            total = 0.0
+
+        # Clamp to valid range
+        total = max(0.0, min(1.0, total))
+
+        # Extract breakdown scores
+        scores = item.get("scores", {})
+        if not isinstance(scores, dict):
+            scores = {}
+
+        notes = item.get("notes", "") or ""
+
+        grades.append(GradeResult(
+            task_id=task.task_id,
+            score=total,
+            max_score=1.0,
+            grading_type="llm_judge",
+            breakdown=_normalize_score_dict(scores),
+            notes=str(notes)[:500],  # Truncate long notes
+        ))
+
+    return grades
+
+
+def _fallback_individual_grading(
+    tasks: List[Task],
+    execution_results: List[Dict[str, Any]],
+    judge_model: str,
+    judge_agent_prefix: str,
+    judge_timeout_seconds: float,
+    judge_backend: str,
+    skill_dir: Optional[Path],
+    verbose: bool,
+) -> List[GradeResult]:
+    """Fall back to grading tasks individually when batch grading fails."""
+    logger.info("Falling back to individual grading for %d tasks", len(tasks))
+    grades = []
+    for task, result in zip(tasks, execution_results):
+        try:
+            grade = _grade_llm_judge(
+                task=task,
+                execution_result=result,
+                judge_model=judge_model,
+                judge_agent_prefix=judge_agent_prefix,
+                judge_timeout_seconds=judge_timeout_seconds,
+                judge_backend=judge_backend,
+                skill_dir=skill_dir,
+                verbose=verbose,
+            )
+        except Exception as exc:
+            logger.warning("Individual grading failed for %s: %s", task.task_id, exc)
+            grade = GradeResult(
+                task_id=task.task_id,
+                score=0.0,
+                max_score=1.0,
+                grading_type="llm_judge",
+                breakdown={},
+                notes=f"Grading failed: {exc}",
+            )
+        grades.append(grade)
+    return grades
+
+
 def _grade_automated(
     task: Task,
     execution_result: Dict[str, Any],

From e5cdacd822610ed37ba054c20571b3ef93686f6f Mon Sep 17 00:00:00 2001
From: Brendan O'Leary
Date: Wed, 22 Apr 2026 13:15:15 -0400
Subject: [PATCH 6/7] fix: add missing --batch-size argument to argparse

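The flag's effect on judge traffic is plain ceil division; a small illustration (the 98-task and ~20-call figures come from the series description, and the helper name is invented for this example):

```python
import math

def num_judge_calls(total_runs: int, batch_size: int) -> int:
    """Batched grading turns N per-run judge calls into ceil(N / batch_size)."""
    return math.ceil(total_runs / batch_size)

print(num_judge_calls(98, 5))  # 20, matching the ~20 batched calls for 98 tasks
```
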
---
 scripts/benchmark.py | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/scripts/benchmark.py b/scripts/benchmark.py
index 52bf0a6c..81b101ea 100644
--- a/scripts/benchmark.py
+++ b/scripts/benchmark.py
@@ -253,6 +253,13 @@ def _parse_args() -> argparse.Namespace:
         action="store_true",
         help="Continue running all tasks even if sanity check scores 0%%",
     )
+    parser.add_argument(
+        "--batch-size",
+        type=int,
+        default=5,
+        metavar="N",
+        help="Number of tasks to grade in a single batch LLM call (default: 5)",
+    )
     parser.add_argument(
         "--trend",
         action="store_true",

From 800527d15224e6fe83bb82995ce39bf2702f3d43 Mon Sep 17 00:00:00 2001
From: Brendan O'Leary
Date: Fri, 24 Apr 2026 10:29:45 -0400
Subject: [PATCH 7/7] fix: wire up task_execution_map for batch grading

The batch grading feature was dead code because task_execution_map was
initialized empty but never populated. Phase 1 graded everything inline,
then Phase 2 tried to batch-grade from an empty map.

Fix:
- Add use_batch_grading flag (batch_size > 0) to switch between modes
- When batch grading: Phase 1 populates task_execution_map instead of
  grading inline, Phase 2 batch-grades, Phase 3 aggregates
- When inline grading (default): existing sync/async behavior preserved,
  Phase 2/3 skipped
- Change --batch-size default from 5 to 0 so existing behavior is
  unchanged unless explicitly opted in
- Skip parallel judge executor init when using batch mode (not needed)
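A condensed, runnable model of the control flow this patch wires up; everything here is illustrative except the use_batch_grading and task_execution_map names, and the real loop also handles multiple runs per task, fail-fast, and incremental writes:

```python
from statistics import mean

def run_benchmark_model(tasks, batch_size, execute, grade_one, grade_batch):
    use_batch_grading = batch_size > 0
    task_execution_map, grades = {}, {}

    # Phase 1: execute every task; defer grading only in batch mode
    for task in tasks:
        result = execute(task)
        if use_batch_grading:
            task_execution_map.setdefault(task, []).append(result)
        else:
            grades.setdefault(task, []).append(grade_one(task, result))

    # Phase 2: batch-grade the deferred runs in slices of batch_size
    if use_batch_grading:
        flat = [(t, r) for t, runs in task_execution_map.items() for r in runs]
        for start in range(0, len(flat), batch_size):
            chunk = flat[start:start + batch_size]
            for (t, _), g in zip(chunk, grade_batch(chunk)):
                grades.setdefault(t, []).append(g)

    # Phase 3: aggregate per task
    return {t: mean(gs) for t, gs in grades.items()}

print(run_benchmark_model(["a", "b", "c"], 2,
                          lambda t: "ok",
                          lambda t, r: 1.0,
                          lambda chunk: [1.0] * len(chunk)))
# batch_size=2 grades three runs in two judge calls; batch_size=0 grades inline
```
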
---
 scripts/benchmark.py | 366 +++++++++++++++++++++++--------------------
 1 file changed, 192 insertions(+), 174 deletions(-)

diff --git a/scripts/benchmark.py b/scripts/benchmark.py
index 81b101ea..cfc358f6 100644
--- a/scripts/benchmark.py
+++ b/scripts/benchmark.py
@@ -256,9 +256,9 @@ def _parse_args() -> argparse.Namespace:
     parser.add_argument(
         "--batch-size",
         type=int,
-        default=5,
+        default=0,
         metavar="N",
-        help="Number of tasks to grade in a single batch LLM call (default: 5)",
+        help="Number of tasks to grade in a single batch LLM call (0=disabled, use inline grading; default: 0)",
     )
     parser.add_argument(
         "--trend",
@@ -819,16 +819,19 @@ def _write_incremental_results():

     # Phase 1: Execute all tasks
     task_execution_map = {}  # Maps task_id -> list of (task, result) tuples for each run
+    use_batch_grading = args.batch_size > 0

-    # Initialize judge executor for parallel grading
+    # Initialize judge executor for parallel grading (only when not using batch grading)
     judge_executor: Optional[ThreadPoolExecutor] = None
-    if not args.no_parallel_judge:
+    if use_batch_grading:
+        logger.info("📦 Batch grading enabled (batch_size=%d). Deferring all grading to Phase 2.", args.batch_size)
+    elif not args.no_parallel_judge:
         judge_executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="judge")
         logger.info("🚀 Parallel judge execution enabled")
     else:
         logger.info("⏸️ Parallel judge execution disabled (--no-parallel-judge)")

-    # Track pending grade from previous run
+    # Track pending grade from previous run (only used in inline grading mode)
     pending_grade: Optional[Tuple[str, List, List, int, Future]] = None

     try:
@@ -949,66 +952,76 @@ def _write_incremental_results():
                 task_results.append(result)
                 results.append(result)

-                # Decide sync vs async grading
-                is_last_run = (run_index == runs_per_task - 1) and (i == len(tasks_to_run))
-
-                if args.no_parallel_judge or is_last_run:
-                    # Synchronous grading
-                    try:
+                if use_batch_grading:
+                    # Store execution data for batch grading in Phase 2
+                    if task.task_id not in task_execution_map:
+                        task_execution_map[task.task_id] = []
+                    task_execution_map[task.task_id].append({
+                        "task": task,
+                        "result": result,
+                        "run_index": run_index,
+                    })
+                else:
+                    # Inline grading: decide sync vs async
+                    is_last_run = (run_index == runs_per_task - 1) and (i == len(tasks_to_run))
+
+                    if args.no_parallel_judge or is_last_run:
+                        # Synchronous grading
+                        try:
+                            grade_kwargs = dict(
+                                task=task, execution_result=result, skill_dir=skill_dir, verbose=args.verbose
+                            )
+                            if args.judge:
+                                grade_kwargs["judge_model"] = args.judge
+                                grade_kwargs["judge_backend"] = "api"
+                            grade = grade_task(**grade_kwargs)
+                        except Exception as exc:
+                            if execution_error:
+                                note = f"Execution failed: {execution_error}; Grading failed: {exc}"
+                            else:
+                                note = f"Grading failed: {exc}"
+                            logger.warning("Task grading failed for %s, continuing: %s", task.task_id, exc)
+                            grade = GradeResult(
+                                task_id=task.task_id,
+                                score=0.0,
+                                max_score=1.0,
+                                grading_type=task.grading_type,
+                                breakdown={},
+                                notes=note,
+                            )
+                        task_grades.append(grade)
+
+                        # Log score immediately
+                        score_pct = grade.score / grade.max_score * 100 if grade.max_score > 0 else 0
+                        status_emoji = (
+                            "✅" if grade.score >= grade.max_score else "⚠️" if grade.score > 0 else "❌"
+                        )
+                        logger.info(
+                            "%s Task %s: %.1f/%.1f (%.0f%%) - %s",
+                            status_emoji,
+                            task.task_id,
+                            grade.score,
+                            grade.max_score,
+                            score_pct,
+                            grade.grading_type,
+                        )
+                        if grade.notes:
+                            logger.info("   Notes: %s", grade.notes[:200])
+                    else:
+                        # Async grading - submit to background
+                        logger.info("⏭️ Submitting grading to background thread...")
                         grade_kwargs = dict(
                             task=task, execution_result=result, skill_dir=skill_dir, verbose=args.verbose
                         )
                         if args.judge:
                             grade_kwargs["judge_model"] = args.judge
                             grade_kwargs["judge_backend"] = "api"
-                        grade = grade_task(**grade_kwargs)
-                    except Exception as exc:
-                        if execution_error:
-                            note = f"Execution failed: {execution_error}; Grading failed: {exc}"
-                        else:
-                            note = f"Grading failed: {exc}"
-                        logger.warning("Task grading failed for %s, continuing: %s", task.task_id, exc)
-                        grade = GradeResult(
-                            task_id=task.task_id,
-                            score=0.0,
-                            max_score=1.0,
-                            grading_type=task.grading_type,
-                            breakdown={},
-                            notes=note,
-                        )
-                    task_grades.append(grade)
-
-                    # Log score immediately
-                    score_pct = grade.score / grade.max_score * 100 if grade.max_score > 0 else 0
-                    status_emoji = (
-                        "✅" if grade.score >= grade.max_score else "⚠️" if grade.score > 0 else "❌"
-                    )
-                    logger.info(
-                        "%s Task %s: %.1f/%.1f (%.0f%%) - %s",
-                        status_emoji,
-                        task.task_id,
-                        grade.score,
-                        grade.max_score,
-                        score_pct,
-                        grade.grading_type,
-                    )
-                    if grade.notes:
-                        logger.info("   Notes: %s", grade.notes[:200])
-                else:
-                    # Async grading - submit to background
-                    logger.info("⏭️ Submitting grading to background thread...")
-                    grade_kwargs = dict(
-                        task=task, execution_result=result, skill_dir=skill_dir, verbose=args.verbose
-                    )
-                    if args.judge:
-                        grade_kwargs["judge_model"] = args.judge
-                        grade_kwargs["judge_backend"] = "api"
-
-                    future = judge_executor.submit(grade_task, **grade_kwargs)
-                    pending_grade = (task.task_id, task_grades, task_results, run_index, future)
+
+                        future = judge_executor.submit(grade_task, **grade_kwargs)
+                        pending_grade = (task.task_id, task_grades, task_results, run_index, future)

-            # If synchronous mode, compute aggregates now
-            if args.no_parallel_judge:
+            # If synchronous inline grading mode, compute aggregates now
+            if not use_batch_grading and args.no_parallel_judge:
                 task_scores = [grade.score for grade in task_grades]
                 grades_by_task_id[task.task_id] = {
                     "runs": [grade.to_dict() for grade in task_grades],
@@ -1082,132 +1095,137 @@ def _write_incremental_results():
         logger.info("🧹 Shutting down judge executor...")
         judge_executor.shutdown(wait=True)

-    logger.info("\n%s", "=" * 80)
-    logger.info("📊 Phase 1 complete: All tasks executed. Starting grading phase...")
-    logger.info("%s", "=" * 80)
-
-    # Phase 2: Grade in batches
-    batch_size = args.batch_size
-
-    # Flatten all task data across all runs
-    all_task_data = []
-    for task_id in task_execution_map:
-        all_task_data.extend(task_execution_map[task_id])
-
-    # Process in batches
-    for batch_start in range(0, len(all_task_data), batch_size):
-        batch_end = min(batch_start + batch_size, len(all_task_data))
-        batch_items = all_task_data[batch_start:batch_end]
+    if use_batch_grading:
+        logger.info("\n%s", "=" * 80)
+        logger.info("📊 Phase 1 complete: All tasks executed. Starting batch grading phase...")
+        logger.info("%s", "=" * 80)

-        batch_tasks = [item["task"] for item in batch_items]
-        batch_results = [item["result"] for item in batch_items]
+        # Phase 2: Grade in batches
+        batch_size = args.batch_size

-        logger.info(
-            "\n📊 Grading batch %d-%d of %d total task runs...",
-            batch_start + 1,
-            batch_end,
-            len(all_task_data),
-        )
+        # Flatten all task data across all runs
+        all_task_data = []
+        for task_id in task_execution_map:
+            all_task_data.extend(task_execution_map[task_id])

-        try:
-            grade_kwargs = dict(
-                tasks=batch_tasks,
-                execution_results=batch_results,
-                skill_dir=skill_dir,
-                verbose=args.verbose,
-            )
-            if args.judge:
-                grade_kwargs["judge_model"] = args.judge
-                grade_kwargs["judge_backend"] = "api"
+        # Process in batches
+        for batch_start in range(0, len(all_task_data), batch_size):
+            batch_end = min(batch_start + batch_size, len(all_task_data))
+            batch_items = all_task_data[batch_start:batch_end]

-            batch_grades = grade_tasks_batch(**grade_kwargs)
-        except Exception as exc:
-            logger.warning("Batch grading failed, using individual fallback: %s", exc)
-            # Fallback to individual grading
-            batch_grades = []
-            for item in batch_items:
-                task = item["task"]
-                result = item["result"]
-                try:
-                    individual_kwargs = dict(
-                        task=task,
-                        execution_result=result,
-                        skill_dir=skill_dir,
-                        verbose=args.verbose,
-                        use_judge_cache=not args.no_judge_cache,
-                    )
-                    if args.judge:
-                        individual_kwargs["judge_model"] = args.judge
-                        individual_kwargs["judge_backend"] = "api"
-                    grade = grade_task(**individual_kwargs)
-                except Exception as inner_exc:
-                    logger.warning("Individual grading also failed for %s: %s", task.task_id, inner_exc)
-                    grade = GradeResult(
-                        task_id=task.task_id,
-                        score=0.0,
-                        max_score=1.0,
-                        grading_type=task.grading_type,
-                        breakdown={},
-                        notes=f"Grading failed: {inner_exc}",
-                    )
-                batch_grades.append(grade)
-
-        # Store grades back into the items
-        for item, grade in zip(batch_items, batch_grades):
-            item["grade"] = grade

+            batch_tasks = [item["task"] for item in batch_items]
+            batch_results = [item["result"] for item in batch_items]

-            # Log score
-            score_pct = grade.score / grade.max_score * 100 if grade.max_score > 0 else 0
-            status_emoji = (
-                "✅" if grade.score >= grade.max_score else "⚠️" if grade.score > 0 else "❌"
-            )
             logger.info(
-                "%s Task %s: %.1f/%.1f (%.0f%%) - %s",
-                status_emoji,
-                item["task"].task_id,
-                grade.score,
-                grade.max_score,
-                score_pct,
-                grade.grading_type,
+                "\n📊 Grading batch %d-%d of %d total task runs...",
+                batch_start + 1,
+                batch_end,
+                len(all_task_data),
             )
-            if grade.notes:
-                logger.info("   Notes: %s", grade.notes[:200])
-
-        # Phase 3: Aggregate grades by task
-        for task_id, items in task_execution_map.items():
-            task_grades = [item["grade"] for item in items]
-            task_scores = [grade.score for grade in task_grades]
-
-            grades_by_task_id[task_id] = {
-                "runs": [grade.to_dict() for grade in task_grades],
-                "mean": statistics.mean(task_scores),
-                "std": statistics.stdev(task_scores) if len(task_scores) > 1 else 0.0,
-                "min": min(task_scores),
-                "max": max(task_scores),
-            }
-
-            # Fail-fast check for sanity task
-            task = items[0]["task"]  # Get task object from first item
-            all_runs_missing_transcript = all(
-                not item["result"].get("transcript") for item in items
-            )
+
+            try:
+                grade_kwargs = dict(
+                    tasks=batch_tasks,
+                    execution_results=batch_results,
+                    skill_dir=skill_dir,
+                    verbose=args.verbose,
+                )
+                if args.judge:
+                    grade_kwargs["judge_model"] = args.judge
+                    grade_kwargs["judge_backend"] = "api"
+
+                batch_grades = grade_tasks_batch(**grade_kwargs)
+            except Exception as exc:
+                logger.warning("Batch grading failed, using individual fallback: %s", exc)
+                # Fallback to individual grading
+                batch_grades = []
+                for item in batch_items:
+                    task = item["task"]
+                    result = item["result"]
+                    try:
+                        individual_kwargs = dict(
+                            task=task,
+                            execution_result=result,
+                            skill_dir=skill_dir,
+                            verbose=args.verbose,
+                            use_judge_cache=not args.no_judge_cache,
+                        )
+                        if args.judge:
+                            individual_kwargs["judge_model"] = args.judge
+                            individual_kwargs["judge_backend"] = "api"
+                        grade = grade_task(**individual_kwargs)
+                    except Exception as inner_exc:
+                        logger.warning("Individual grading also failed for %s: %s", task.task_id, inner_exc)
+                        grade = GradeResult(
+                            task_id=task.task_id,
+                            score=0.0,
+                            max_score=1.0,
+                            grading_type=task.grading_type,
+                            breakdown={},
+                            notes=f"Grading failed: {inner_exc}",
+                        )
+                    batch_grades.append(grade)
+
+            # Store grades back into the items
+            for item, grade in zip(batch_items, batch_grades):
+                item["grade"] = grade
+
+                # Log score
+                score_pct = grade.score / grade.max_score * 100 if grade.max_score > 0 else 0
+                status_emoji = (
+                    "✅" if grade.score >= grade.max_score else "⚠️" if grade.score > 0 else "❌"
+                )
+                logger.info(
+                    "%s Task %s: %.1f/%.1f (%.0f%%) - %s",
+                    status_emoji,
+                    item["task"].task_id,
+                    grade.score,
+                    grade.max_score,
+                    score_pct,
+                    grade.grading_type,
+                )
+                if grade.notes:
+                    logger.info("   Notes: %s", grade.notes[:200])

-        if (
-            task.task_id == sanity_task_id
-            and grades_by_task_id[task.task_id]["mean"] == 0.0
-            and not args.no_fail_fast
-            and not all_runs_missing_transcript
-        ):
-            logger.error(
-                "🚨 FAIL FAST: Sanity check (%s) scored 0%%. Aborting benchmark run to avoid wasting resources.",
-                sanity_task_id,
+        # Phase 3: Aggregate grades by task
+        for task_id, items in task_execution_map.items():
+            task_grades = [item["grade"] for item in items]
+            task_scores = [grade.score for grade in task_grades]
+
+            grades_by_task_id[task_id] = {
+                "runs": [grade.to_dict() for grade in task_grades],
+                "mean": statistics.mean(task_scores),
+                "std": statistics.stdev(task_scores) if len(task_scores) > 1 else 0.0,
+                "min": min(task_scores),
+                "max": max(task_scores),
+            }
+
+            # Fail-fast check for sanity task
+            task = items[0]["task"]  # Get task object from first item
+            all_runs_missing_transcript = all(
+                not item["result"].get("transcript") for item in items
             )
-            sys.exit(3)
-        if task.task_id == sanity_task_id and grades_by_task_id[task.task_id]["mean"] == 0.0:
-            if all_runs_missing_transcript:
-                logger.warning(
-                    "⚠️ Sanity check scored 0%% but transcripts were missing for all runs; skipping fail-fast as likely infrastructure/logging issue."
+
+            if (
+                task.task_id == sanity_task_id
+                and grades_by_task_id[task.task_id]["mean"] == 0.0
+                and not args.no_fail_fast
+                and not all_runs_missing_transcript
+            ):
+                logger.error(
+                    "🚨 FAIL FAST: Sanity check (%s) scored 0%%. Aborting benchmark run to avoid wasting resources.",
+                    sanity_task_id,
                 )
+                sys.exit(3)
+            if task.task_id == sanity_task_id and grades_by_task_id[task.task_id]["mean"] == 0.0:
+                if all_runs_missing_transcript:
+                    logger.warning(
+                        "⚠️ Sanity check scored 0%% but transcripts were missing for all runs; skipping fail-fast as likely infrastructure/logging issue."
+                    )
+    else:
+        logger.info("\n%s", "=" * 80)
+        logger.info("📊 Phase 1 complete: All tasks executed and graded inline.")
+        logger.info("%s", "=" * 80)

     # Write final incremental results
     _write_incremental_results()