diff --git a/scripts/benchmark.py b/scripts/benchmark.py
index 18c531b..cfc358f 100644
--- a/scripts/benchmark.py
+++ b/scripts/benchmark.py
@@ -22,8 +22,9 @@
 import subprocess
 import sys
 import time
+from concurrent.futures import Future, ThreadPoolExecutor
 from pathlib import Path
-from typing import Dict, List, Optional, Any
+from typing import Any, Dict, List, Optional, Tuple

 from lib_agent import (
     cleanup_agent_sessions,
@@ -33,7 +34,7 @@
     slugify_model,
     validate_openrouter_model,
 )
-from lib_grading import GradeResult, grade_task
+from lib_grading import DEFAULT_JUDGE_TIMEOUT_SECONDS, GradeResult, grade_task, grade_tasks_batch
 from lib_tasks import Task, TaskLoader

@@ -252,6 +253,13 @@ def _parse_args() -> argparse.Namespace:
         action="store_true",
         help="Continue running all tasks even if sanity check scores 0%%",
     )
+    parser.add_argument(
+        "--batch-size",
+        type=int,
+        default=0,
+        metavar="N",
+        help="Number of tasks to grade in a single batch LLM call (0 disables batching and grades inline; default: 0)",
+    )
     parser.add_argument(
         "--trend",
         action="store_true",
@@ -270,6 +278,16 @@
         default=-0.5,
         help="Slope (%%/run) below which regression is flagged (default: -0.5)",
     )
+    parser.add_argument(
+        "--no-parallel-judge",
+        action="store_true",
+        help="Disable parallel judge execution (grade synchronously after each run)",
+    )
+    parser.add_argument(
+        "--no-judge-cache",
+        action="store_true",
+        help="Disable judge response caching (forces fresh evaluation)",
+    )
     args = parser.parse_args()

     # Validate --trend-window
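How `--batch-size` and the two new flags combine is easiest to see in isolation. A hypothetical helper (not part of the patch) that mirrors the Phase 1 dispatch in the hunk below:

```python
# Hypothetical helper showing how the new flags select a grading mode
# (mirrors the Phase 1 dispatch below; not part of the patch).
def grading_mode(batch_size: int, no_parallel_judge: bool) -> str:
    if batch_size > 0:
        return "batch"         # defer all grading to Phase 2
    if not no_parallel_judge:
        return "pipelined"     # one judge call in flight alongside execution
    return "synchronous"       # grade inline after each run

assert grading_mode(8, False) == "batch"
assert grading_mode(0, False) == "pipelined"
assert grading_mode(0, True) == "synchronous"
```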
@@ -799,122 +817,418 @@ def _write_incremental_results():
         except OSError:
             pass

-    for i, task in enumerate(tasks_to_run, 1):
-        task_grades = []
-        task_results = []
-        for run_index in range(runs_per_task):
-            logger.info("\n%s", "=" * 80)
-            logger.info(
-                "πŸ“‹ Task %s/%s (Run %s/%s)",
-                i,
-                len(tasks_to_run),
-                run_index + 1,
-                runs_per_task,
-            )
-            logger.info("%s", "=" * 80)
-            execution_error = None
-            try:
-                result = execute_openclaw_task(
-                    task=task,
-                    agent_id=agent_id,
-                    model_id=args.model,
-                    run_id=f"{run_id}-{run_index + 1}",
-                    timeout_multiplier=args.timeout_multiplier,
-                    skill_dir=skill_dir,
-                    output_dir=Path(args.output_dir) / f"{run_id}_transcripts",
-                    verbose=args.verbose,
-                )
-            except Exception as exc:
-                execution_error = str(exc)
-                logger.warning("Task execution failed for %s, continuing: %s", task.task_id, exc)
-                result = {
-                    "agent_id": agent_id,
-                    "task_id": task.task_id,
-                    "status": "error",
-                    "transcript": [],
-                    "usage": {},
-                    "workspace": "",
-                    "exit_code": -1,
-                    "timed_out": False,
-                    "execution_time": 0.0,
-                    "stdout": "",
-                    "stderr": execution_error,
-                }
-            try:
-                grade_kwargs = dict(
-                    task=task, execution_result=result, skill_dir=skill_dir, verbose=args.verbose
-                )
-                if args.judge:
-                    grade_kwargs["judge_model"] = args.judge
-                    grade_kwargs["judge_backend"] = "api"
-                grade = grade_task(**grade_kwargs)
-            except Exception as exc:
-                if execution_error:
-                    note = f"Execution failed: {execution_error}; Grading failed: {exc}"
-                else:
-                    note = f"Grading failed: {exc}"
-                logger.warning("Task grading failed for %s, continuing: %s", task.task_id, exc)
-                grade = GradeResult(
-                    task_id=task.task_id,
-                    score=0.0,
-                    max_score=1.0,
-                    grading_type=task.grading_type,
-                    breakdown={},
-                    notes=note,
-                )
-            task_grades.append(grade)
-            task_results.append(result)
-            results.append(result)
-
-            # Log score immediately after grading
-            score_pct = grade.score / grade.max_score * 100 if grade.max_score > 0 else 0
-            status_emoji = (
-                "βœ…" if grade.score >= grade.max_score else "⚠️" if grade.score > 0 else "❌"
-            )
-            logger.info(
-                "%s Task %s: %.1f/%.1f (%.0f%%) - %s",
-                status_emoji,
-                task.task_id,
-                grade.score,
-                grade.max_score,
-                score_pct,
-                grade.grading_type,
-            )
-            if grade.notes:
-                logger.info("   Notes: %s", grade.notes[:200])
-
-        task_scores = [grade.score for grade in task_grades]
-        grades_by_task_id[task.task_id] = {
-            "runs": [grade.to_dict() for grade in task_grades],
-            "mean": statistics.mean(task_scores),
-            "std": statistics.stdev(task_scores) if len(task_scores) > 1 else 0.0,
-            "min": min(task_scores),
-            "max": max(task_scores),
-        }
-
-        all_runs_missing_transcript = all(
-            not run_result.get("transcript") for run_result in task_results
-        )
-        if (
-            task.task_id == sanity_task_id
-            and grades_by_task_id[task.task_id]["mean"] == 0.0
-            and not args.no_fail_fast
-            and not all_runs_missing_transcript
-        ):
-            logger.error(
-                "🚨 FAIL FAST: Sanity check (%s) scored 0%%. Aborting benchmark run to avoid wasting resources.",
-                sanity_task_id,
-            )
-            sys.exit(3)
-        if task.task_id == sanity_task_id and grades_by_task_id[task.task_id]["mean"] == 0.0:
-            if all_runs_missing_transcript:
-                logger.warning(
-                    "⚠️ Sanity check scored 0%% but transcripts were missing for all runs; skipping fail-fast as likely infrastructure/logging issue."
-                )
-
-        # Incremental write: update result JSON after each task so partial
-        # results are available while the benchmark is still running.
-        _write_incremental_results()
+    # Phase 1: Execute all tasks
+    task_execution_map = {}  # Maps task_id -> list of per-run dicts: {"task", "result", "run_index"}
+    use_batch_grading = args.batch_size > 0
+
+    # Initialize judge executor for parallel grading (only when not using batch grading)
+    judge_executor: Optional[ThreadPoolExecutor] = None
+    if use_batch_grading:
+        logger.info("πŸ“¦ Batch grading enabled (batch_size=%d). Deferring all grading to Phase 2.", args.batch_size)
+    elif not args.no_parallel_judge:
+        judge_executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="judge")
+        logger.info("πŸš€ Parallel judge execution enabled")
+    else:
+        logger.info("⏸️ Parallel judge execution disabled (--no-parallel-judge)")
+
+    # Track pending grade from previous run (only used in inline grading mode)
+    pending_grade: Optional[Tuple[str, List, List, int, Future]] = None
+
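+    # Design note: max_workers=1 keeps at most one judge call in flight, so grades
+    # return in submission order and the single pending_grade slot declared above
+    # is enough bookkeeping; a larger pool would need a queue of futures.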
+    try:
+        for i, task in enumerate(tasks_to_run, 1):
+            task_grades = []
+            task_results = []
+            for run_index in range(runs_per_task):
+                logger.info("\n%s", "=" * 80)
+                logger.info(
+                    "πŸ“‹ Task %s/%s (Run %s/%s)",
+                    i,
+                    len(tasks_to_run),
+                    run_index + 1,
+                    runs_per_task,
+                )
+                logger.info("%s", "=" * 80)
+
+                execution_error = None
+                try:
+                    result = execute_openclaw_task(
+                        task=task,
+                        agent_id=agent_id,
+                        model_id=args.model,
+                        run_id=f"{run_id}-{run_index + 1}",
+                        timeout_multiplier=args.timeout_multiplier,
+                        skill_dir=skill_dir,
+                        output_dir=Path(args.output_dir) / f"{run_id}_transcripts",
+                        verbose=args.verbose,
+                    )
+                except Exception as exc:
+                    execution_error = str(exc)
+                    logger.warning("Task execution failed for %s, continuing: %s", task.task_id, exc)
+                    result = {
+                        "agent_id": agent_id,
+                        "task_id": task.task_id,
+                        "status": "error",
+                        "transcript": [],
+                        "usage": {},
+                        "workspace": "",
+                        "exit_code": -1,
+                        "timed_out": False,
+                        "execution_time": 0.0,
+                        "stdout": "",
+                        "stderr": execution_error,
+                    }
+
+                task_results.append(result)
+                results.append(result)
+
+                # Collect the pending grade from the previous run. Doing this after
+                # the execution above (rather than before it) is what lets the
+                # background judge call actually overlap with execution; the
+                # trade-off is that a sanity-check fail-fast can fire one run late.
+                if pending_grade:
+                    prev_task_id, prev_grades, prev_results, prev_run_idx, future = pending_grade
+                    try:
+                        grade = future.result(timeout=DEFAULT_JUDGE_TIMEOUT_SECONDS)
+                    except Exception as exc:
+                        logger.warning(
+                            "Background grade failed for %s (run %d): %s",
+                            prev_task_id,
+                            prev_run_idx + 1,
+                            exc,
+                        )
+                        grade = GradeResult(
+                            task_id=prev_task_id,
+                            score=0.0,
+                            max_score=1.0,
+                            grading_type="error",
+                            breakdown={},
+                            notes=f"Background grading failed: {exc}",
+                        )
+                    prev_grades.append(grade)
+
+                    score_pct = grade.score / grade.max_score * 100 if grade.max_score > 0 else 0
+                    status_emoji = (
+                        "βœ…" if grade.score >= grade.max_score else "⚠️" if grade.score > 0 else "❌"
+                    )
+                    logger.info(
+                        "%s Task %s (run %d, background): %.1f/%.1f (%.0f%%) - %s",
+                        status_emoji,
+                        prev_task_id,
+                        prev_run_idx + 1,
+                        grade.score,
+                        grade.max_score,
+                        score_pct,
+                        grade.grading_type,
+                    )
+                    if grade.notes:
+                        logger.info("   Notes: %s", grade.notes[:200])
+
+                    # If that was the last run of the previous task, compute its
+                    # aggregates. This now happens even when the background grade
+                    # failed, so the task still lands in grades_by_task_id with
+                    # its fallback zero score.
+                    if len(prev_grades) == runs_per_task:
+                        task_scores = [g.score for g in prev_grades]
+                        grades_by_task_id[prev_task_id] = {
+                            "runs": [g.to_dict() for g in prev_grades],
+                            "mean": statistics.mean(task_scores),
+                            "std": statistics.stdev(task_scores) if len(task_scores) > 1 else 0.0,
+                            "min": min(task_scores),
+                            "max": max(task_scores),
+                        }
+
+                        all_runs_missing_transcript = all(
+                            not run_result.get("transcript") for run_result in prev_results
+                        )
+                        if (
+                            prev_task_id == sanity_task_id
+                            and grades_by_task_id[prev_task_id]["mean"] == 0.0
+                            and not args.no_fail_fast
+                            and not all_runs_missing_transcript
+                        ):
+                            logger.error(
+                                "🚨 FAIL FAST: Sanity check (%s) scored 0%%. Aborting benchmark run.",
+                                sanity_task_id,
+                            )
+                            sys.exit(3)
+                        if prev_task_id == sanity_task_id and grades_by_task_id[prev_task_id]["mean"] == 0.0:
+                            if all_runs_missing_transcript:
+                                logger.warning(
+                                    "⚠️ Sanity check scored 0%% but transcripts missing; skipping fail-fast."
+                                )
+
+                    _write_incremental_results()
+                    pending_grade = None
+
+                if use_batch_grading:
+                    # Store execution data for batch grading in Phase 2
+                    if task.task_id not in task_execution_map:
+                        task_execution_map[task.task_id] = []
+                    task_execution_map[task.task_id].append({
+                        "task": task,
+                        "result": result,
+                        "run_index": run_index,
+                    })
+                else:
+                    # Inline grading: decide sync vs async
+                    is_last_run = (run_index == runs_per_task - 1) and (i == len(tasks_to_run))
+
+                    if args.no_parallel_judge or is_last_run:
+                        # Synchronous grading
+                        try:
+                            grade_kwargs = dict(
+                                task=task,
+                                execution_result=result,
+                                skill_dir=skill_dir,
+                                verbose=args.verbose,
+                                use_judge_cache=not args.no_judge_cache,
+                            )
+                            if args.judge:
+                                grade_kwargs["judge_model"] = args.judge
+                                grade_kwargs["judge_backend"] = "api"
+                            grade = grade_task(**grade_kwargs)
+                        except Exception as exc:
+                            if execution_error:
+                                note = f"Execution failed: {execution_error}; Grading failed: {exc}"
+                            else:
+                                note = f"Grading failed: {exc}"
+                            logger.warning("Task grading failed for %s, continuing: %s", task.task_id, exc)
+                            grade = GradeResult(
+                                task_id=task.task_id,
+                                score=0.0,
+                                max_score=1.0,
+                                grading_type=task.grading_type,
+                                breakdown={},
+                                notes=note,
+                            )
+                        task_grades.append(grade)
+
+                        # Log score immediately
+                        score_pct = grade.score / grade.max_score * 100 if grade.max_score > 0 else 0
+                        status_emoji = (
+                            "βœ…" if grade.score >= grade.max_score else "⚠️" if grade.score > 0 else "❌"
+                        )
+                        logger.info(
+                            "%s Task %s: %.1f/%.1f (%.0f%%) - %s",
+                            status_emoji,
+                            task.task_id,
+                            grade.score,
+                            grade.max_score,
+                            score_pct,
+                            grade.grading_type,
+                        )
+                        if grade.notes:
+                            logger.info("   Notes: %s", grade.notes[:200])
+                    else:
+                        # Async grading - submit to the background judge thread
+                        logger.info("⏭️ Submitting grading to background thread...")
+                        grade_kwargs = dict(
+                            task=task,
+                            execution_result=result,
+                            skill_dir=skill_dir,
+                            verbose=args.verbose,
+                            use_judge_cache=not args.no_judge_cache,
+                        )
+                        if args.judge:
+                            grade_kwargs["judge_model"] = args.judge
+                            grade_kwargs["judge_backend"] = "api"
+
+                        future = judge_executor.submit(grade_task, **grade_kwargs)
+                        pending_grade = (task.task_id, task_grades, task_results, run_index, future)
+
+            # Compute aggregates once every grade for this task is in hand. Checking
+            # len() rather than --no-parallel-judge also covers the final task in
+            # parallel mode, whose last run is graded synchronously above; tasks with
+            # a grade still in flight are aggregated by the pending-grade handler.
+            if not use_batch_grading and len(task_grades) == runs_per_task:
+                task_scores = [grade.score for grade in task_grades]
+                grades_by_task_id[task.task_id] = {
+                    "runs": [grade.to_dict() for grade in task_grades],
+                    "mean": statistics.mean(task_scores),
+                    "std": statistics.stdev(task_scores) if len(task_scores) > 1 else 0.0,
+                    "min": min(task_scores),
+                    "max": max(task_scores),
+                }
+
+                all_runs_missing_transcript = all(
+                    not run_result.get("transcript") for run_result in task_results
+                )
+                if (
+                    task.task_id == sanity_task_id
+                    and grades_by_task_id[task.task_id]["mean"] == 0.0
+                    and not args.no_fail_fast
+                    and not all_runs_missing_transcript
+                ):
+                    logger.error(
+                        "🚨 FAIL FAST: Sanity check (%s) scored 0%%. Aborting benchmark run.",
+                        sanity_task_id,
+                    )
+                    sys.exit(3)
+                if task.task_id == sanity_task_id and grades_by_task_id[task.task_id]["mean"] == 0.0:
+                    if all_runs_missing_transcript:
+                        logger.warning(
+                            "⚠️ Sanity check scored 0%% but transcripts missing; skipping fail-fast."
+                        )
+
+            _write_incremental_results()
+
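+        # Drain any remaining pending grade. This is defensive: is_last_run above
+        # forces the final run to be graded synchronously, so pending_grade should
+        # already be None here unless the loop exited early.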
+        if pending_grade:
+            prev_task_id, prev_grades, prev_results, prev_run_idx, future = pending_grade
+            try:
+                grade = future.result(timeout=DEFAULT_JUDGE_TIMEOUT_SECONDS)
+            except Exception as exc:
+                logger.warning("Final background grade failed for %s: %s", prev_task_id, exc)
+                grade = GradeResult(
+                    task_id=prev_task_id,
+                    score=0.0,
+                    max_score=1.0,
+                    grading_type="error",
+                    breakdown={},
+                    notes=f"Background grading failed: {exc}",
+                )
+            prev_grades.append(grade)
+
+            score_pct = grade.score / grade.max_score * 100 if grade.max_score > 0 else 0
+            status_emoji = (
+                "βœ…" if grade.score >= grade.max_score else "⚠️" if grade.score > 0 else "❌"
+            )
+            logger.info(
+                "%s Task %s (run %d, final): %.1f/%.1f (%.0f%%) - %s",
+                status_emoji,
+                prev_task_id,
+                prev_run_idx + 1,
+                grade.score,
+                grade.max_score,
+                score_pct,
+                grade.grading_type,
+            )
+            if grade.notes:
+                logger.info("   Notes: %s", grade.notes[:200])
+
+            task_scores = [g.score for g in prev_grades]
+            grades_by_task_id[prev_task_id] = {
+                "runs": [g.to_dict() for g in prev_grades],
+                "mean": statistics.mean(task_scores),
+                "std": statistics.stdev(task_scores) if len(task_scores) > 1 else 0.0,
+                "min": min(task_scores),
+                "max": max(task_scores),
+            }
+            _write_incremental_results()
+    finally:
+        # Cleanup executor
+        if judge_executor:
+            logger.info("🧹 Shutting down judge executor...")
+            judge_executor.shutdown(wait=True)
+
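+    # In batch mode, the sanity-check fail-fast runs only in Phase 3 below, after
+    # every task has already executed; it aborts result finalization, not execution.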
+    if use_batch_grading:
+        logger.info("\n%s", "=" * 80)
+        logger.info("πŸ“Š Phase 1 complete: All tasks executed. Starting batch grading phase...")
+        logger.info("%s", "=" * 80)
+
+        # Phase 2: Grade in batches
+        batch_size = args.batch_size
+
+        # Flatten all task data across all runs
+        all_task_data = []
+        for task_id in task_execution_map:
+            all_task_data.extend(task_execution_map[task_id])
+
+        # Process in batches
+        for batch_start in range(0, len(all_task_data), batch_size):
+            batch_end = min(batch_start + batch_size, len(all_task_data))
+            batch_items = all_task_data[batch_start:batch_end]
+
+            batch_tasks = [item["task"] for item in batch_items]
+            batch_results = [item["result"] for item in batch_items]
+
+            logger.info(
+                "\nπŸ“Š Grading batch %d-%d of %d total task runs...",
+                batch_start + 1,
+                batch_end,
+                len(all_task_data),
+            )
+
+            try:
+                grade_kwargs = dict(
+                    tasks=batch_tasks,
+                    execution_results=batch_results,
+                    skill_dir=skill_dir,
+                    verbose=args.verbose,
+                )
+                if args.judge:
+                    grade_kwargs["judge_model"] = args.judge
+                    grade_kwargs["judge_backend"] = "api"
+
+                batch_grades = grade_tasks_batch(**grade_kwargs)
+            except Exception as exc:
+                logger.warning("Batch grading failed, using individual fallback: %s", exc)
+                # Fall back to individual grading
+                batch_grades = []
+                for item in batch_items:
+                    task = item["task"]
+                    result = item["result"]
+                    try:
+                        individual_kwargs = dict(
+                            task=task,
+                            execution_result=result,
+                            skill_dir=skill_dir,
+                            verbose=args.verbose,
+                            use_judge_cache=not args.no_judge_cache,
+                        )
+                        if args.judge:
+                            individual_kwargs["judge_model"] = args.judge
+                            individual_kwargs["judge_backend"] = "api"
+                        grade = grade_task(**individual_kwargs)
+                    except Exception as inner_exc:
+                        logger.warning("Individual grading also failed for %s: %s", task.task_id, inner_exc)
+                        grade = GradeResult(
+                            task_id=task.task_id,
+                            score=0.0,
+                            max_score=1.0,
+                            grading_type=task.grading_type,
+                            breakdown={},
+                            notes=f"Grading failed: {inner_exc}",
+                        )
+                    batch_grades.append(grade)
+
+            # Store grades back into the items and log each score
+            for item, grade in zip(batch_items, batch_grades):
+                item["grade"] = grade
+
+                score_pct = grade.score / grade.max_score * 100 if grade.max_score > 0 else 0
+                status_emoji = (
+                    "βœ…" if grade.score >= grade.max_score else "⚠️" if grade.score > 0 else "❌"
+                )
+                logger.info(
+                    "%s Task %s: %.1f/%.1f (%.0f%%) - %s",
+                    status_emoji,
+                    item["task"].task_id,
+                    grade.score,
+                    grade.max_score,
+                    score_pct,
+                    grade.grading_type,
+                )
+                if grade.notes:
+                    logger.info("   Notes: %s", grade.notes[:200])
+
+        # Phase 3: Aggregate grades by task
+        for task_id, items in task_execution_map.items():
+            task_grades = [item["grade"] for item in items]
+            task_scores = [grade.score for grade in task_grades]
+
+            grades_by_task_id[task_id] = {
+                "runs": [grade.to_dict() for grade in task_grades],
+                "mean": statistics.mean(task_scores),
+                "std": statistics.stdev(task_scores) if len(task_scores) > 1 else 0.0,
+                "min": min(task_scores),
+                "max": max(task_scores),
+            }
+
+            # Fail-fast check for the sanity task
+            task = items[0]["task"]  # Get the task object from the first item
+            all_runs_missing_transcript = all(
+                not item["result"].get("transcript") for item in items
+            )
+
+            if (
+                task.task_id == sanity_task_id
+                and grades_by_task_id[task.task_id]["mean"] == 0.0
+                and not args.no_fail_fast
+                and not all_runs_missing_transcript
+            ):
+                logger.error(
+                    "🚨 FAIL FAST: Sanity check (%s) scored 0%%. Aborting before results are finalized.",
+                    sanity_task_id,
+                )
+                sys.exit(3)
+            if task.task_id == sanity_task_id and grades_by_task_id[task.task_id]["mean"] == 0.0:
+                if all_runs_missing_transcript:
+                    logger.warning(
+                        "⚠️ Sanity check scored 0%% but transcripts were missing for all runs; skipping fail-fast as likely infrastructure/logging issue."
+                    )
+    else:
+        logger.info("\n%s", "=" * 80)
+        logger.info("πŸ“Š Phase 1 complete: All tasks executed and graded inline.")
+        logger.info("%s", "=" * 80)
+
+    # Write final incremental results
+    _write_incremental_results()

     output_dir = Path(args.output_dir)
     output_dir.mkdir(parents=True, exist_ok=True)
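Before the grading-library changes, the Phase 1 scheduling is easiest to see in isolation. A minimal, self-contained sketch of the same one-in-flight pattern, where `run_task` and `judge` are hypothetical stand-ins for `execute_openclaw_task` and `grade_task`:

```python
# Sketch of Phase 1's one-in-flight judge pipeline; run_task/judge are
# hypothetical stand-ins for execute_openclaw_task/grade_task.
from concurrent.futures import Future, ThreadPoolExecutor
from typing import List, Optional
import time

def run_task(task_id: str) -> dict:
    time.sleep(0.2)  # pretend to execute the agent
    return {"task_id": task_id}

def judge(result: dict) -> float:
    time.sleep(0.2)  # pretend to call the judge LLM
    return 1.0

tasks = ["t1", "t2", "t3"]
scores: List[float] = []
pending: Optional[Future] = None
with ThreadPoolExecutor(max_workers=1) as executor:
    for i, task_id in enumerate(tasks):
        result = run_task(task_id)             # runs while the previous judge call is in flight
        if pending is not None:
            scores.append(pending.result())    # collect the previous run's grade
            pending = None
        if i == len(tasks) - 1:
            scores.append(judge(result))       # last run: grade synchronously
        else:
            pending = executor.submit(judge, result)

assert scores == [1.0, 1.0, 1.0]
```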
diff --git a/scripts/lib_grading.py b/scripts/lib_grading.py
index ccadcda..6e4463e 100644
--- a/scripts/lib_grading.py
+++ b/scripts/lib_grading.py
@@ -96,6 +96,357 @@ def grade_task(
     raise ValueError(f"Unknown grading type: {grading_type}")


+def grade_tasks_batch(
+    *,
+    tasks: List[Task],
+    execution_results: List[Dict[str, Any]],
+    skill_dir: Path,
+    judge_model: str = DEFAULT_JUDGE_MODEL,
+    judge_agent_prefix: str = DEFAULT_JUDGE_AGENT_PREFIX,
+    judge_timeout_seconds: float = DEFAULT_JUDGE_TIMEOUT_SECONDS,
+    judge_backend: str = "openclaw",
+    verbose: bool = False,
+) -> List[GradeResult]:
+    """
+    Grade multiple tasks in a single batch.
+
+    For tasks requiring an LLM judge, this batches them into a single API call
+    to reduce overhead. Automated grading is still done individually (it's fast).
+
+    Returns a list of GradeResults in the same order as the input tasks.
+    """
+    if len(tasks) != len(execution_results):
+        raise ValueError(f"Mismatch: {len(tasks)} tasks but {len(execution_results)} results")
+
+    if not tasks:
+        return []
+
+    results: List[Optional[GradeResult]] = [None] * len(tasks)
+
+    # Separate tasks by grading type
+    automated_indices = []
+    llm_judge_indices = []
+    hybrid_indices = []
+
+    for i, task in enumerate(tasks):
+        if task.grading_type == "automated":
+            automated_indices.append(i)
+        elif task.grading_type == "llm_judge":
+            llm_judge_indices.append(i)
+        elif task.grading_type == "hybrid":
+            hybrid_indices.append(i)
+        else:
+            # Unknown type - grade individually and let it raise
+            results[i] = grade_task(
+                task=task,
+                execution_result=execution_results[i],
+                skill_dir=skill_dir,
+                judge_model=judge_model,
+                judge_backend=judge_backend,
+                verbose=verbose,
+            )
+
+    # Grade automated tasks individually (they're fast)
+    for i in automated_indices:
+        results[i] = _grade_automated(tasks[i], execution_results[i], skill_dir=skill_dir, verbose=verbose)
+
+    # Batch LLM judge tasks
+    if llm_judge_indices:
+        llm_tasks = [tasks[i] for i in llm_judge_indices]
+        llm_results = [execution_results[i] for i in llm_judge_indices]
+        batch_grades = _batch_grade_llm_judge(
+            tasks=llm_tasks,
+            execution_results=llm_results,
+            judge_model=judge_model,
+            judge_agent_prefix=judge_agent_prefix,
+            judge_timeout_seconds=judge_timeout_seconds * len(llm_tasks),  # Scale timeout
+            judge_backend=judge_backend,
+            skill_dir=skill_dir,
+            verbose=verbose,
+        )
+        for idx, grade in zip(llm_judge_indices, batch_grades):
+            results[idx] = grade
+
+    # Hybrid tasks: automated is fast, batch the LLM parts
+    if hybrid_indices:
+        # First do automated grading for all hybrid tasks
+        auto_grades = {}
+        for i in hybrid_indices:
+            auto_grades[i] = _grade_automated(tasks[i], execution_results[i], skill_dir=skill_dir, verbose=verbose)
+
+        # Batch the LLM judge part
+        hybrid_tasks = [tasks[i] for i in hybrid_indices]
+        hybrid_results = [execution_results[i] for i in hybrid_indices]
+        llm_grades = _batch_grade_llm_judge(
+            tasks=hybrid_tasks,
+            execution_results=hybrid_results,
+            judge_model=judge_model,
+            judge_agent_prefix=judge_agent_prefix,
+            judge_timeout_seconds=judge_timeout_seconds * len(hybrid_tasks),
+            judge_backend=judge_backend,
+            skill_dir=skill_dir,
+            verbose=verbose,
+        )
+
+        # Combine auto + llm for each hybrid task
+        for idx, llm_grade in zip(hybrid_indices, llm_grades):
+            results[idx] = _combine_grades(tasks[idx], auto_grades[idx], llm_grade)
+
+    # Verify all results filled
+    for i, result in enumerate(results):
+        if result is None:
+            raise RuntimeError(f"Bug: no grade computed for task index {i} ({tasks[i].task_id})")
+
+    return results  # type: ignore
+
+
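+# NOTE: callers scale judge_timeout_seconds by the batch size before calling
+# _batch_grade_llm_judge and divide it back out for the per-task fallback, so a
+# batch gets roughly the same total judge budget as individual grading would.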
+def _batch_grade_llm_judge(
+    *,
+    tasks: List[Task],
+    execution_results: List[Dict[str, Any]],
+    judge_model: str,
+    judge_agent_prefix: str,
+    judge_timeout_seconds: float,
+    judge_backend: str = "openclaw",
+    skill_dir: Optional[Path] = None,
+    verbose: bool = False,
+) -> List[GradeResult]:
+    """
+    Grade multiple tasks with a single LLM judge call.
+
+    Builds a combined prompt with all tasks and parses a JSON array response.
+    Falls back to individual grading if batch parsing fails.
+    """
+    if not tasks:
+        return []
+
+    # Build combined prompt
+    prompt_parts = [
+        "You are grading multiple benchmark tasks. For each task, evaluate the agent's performance against the rubric.",
+        "",
+        "Respond with a JSON array containing one object per task, in order:",
+        "```json",
+        "[",
+        '  {"task_id": "task_name", "scores": {"criterion1": 0.8, "criterion2": 1.0}, "total": 0.85, "notes": "Brief explanation"},',
+        "  ...",
+        "]",
+        "```",
+        "",
+        "IMPORTANT: Return ONLY the JSON array. Ensure 'total' is a float between 0.0 and 1.0.",
+        "",
+        "=" * 60,
+        "",
+    ]
+
+    for i, (task, result) in enumerate(zip(tasks, execution_results)):
+        transcript = result.get("transcript", [])
+        transcript_summary = _summarize_transcript(transcript)
+        workspace_content = _read_workspace_files(result.get("workspace", ""))
+        rubric = task.llm_judge_rubric or _format_grading_criteria(task)
+
+        prompt_parts.extend([
+            f"## Task {i + 1}: {task.task_id}",
+            f"**Name:** {task.name}",
+            f"**Category:** {task.category}",
+            "",
+            "**Rubric:**",
+            rubric if rubric else "(No specific rubric provided)",
+            "",
+            "**Agent Transcript:**",
+            transcript_summary[:3000] if transcript_summary else "(Empty transcript - task may have failed)",
+            "",
+        ])
+        if workspace_content:
+            prompt_parts.extend([
+                "**Workspace Files:**",
+                workspace_content[:1500],
+                "",
+            ])
+        prompt_parts.append("=" * 60)
+        prompt_parts.append("")
+
+    combined_prompt = "\n".join(prompt_parts)
+
+    if verbose:
+        logger.info("  [VERBOSE] Batch judge prompt length: %d chars for %d tasks", len(combined_prompt), len(tasks))
+
+    # Make the API call
+    if judge_backend == "api":
+        judge_result = call_judge_api(
+            prompt=combined_prompt,
+            model=judge_model,
+            timeout_seconds=judge_timeout_seconds,
+        )
+
+        if judge_result.get("status") != "success":
+            logger.warning("Batch judge API call failed: %s", judge_result.get("error", judge_result.get("status")))
+            # Fall back to individual grading
+            return _fallback_individual_grading(
+                tasks, execution_results, judge_model, judge_agent_prefix,
+                judge_timeout_seconds / len(tasks), judge_backend, skill_dir, verbose
+            )
+
+        raw_text = judge_result.get("text", "")
+    else:
+        # OpenClaw agent backend
+        judge_skill_dir = skill_dir if skill_dir is not None else Path.cwd()
+        agent_id = _ensure_judge_agent(judge_agent_prefix, judge_model, judge_skill_dir)
+        judge_workspace = Path(f"/tmp/pinchbench/judge/batch_{int(time.time())}")
+        judge_result = run_openclaw_prompt(
+            agent_id=agent_id,
+            prompt=combined_prompt,
+            workspace=judge_workspace,
+            timeout_seconds=judge_timeout_seconds,
+        )
+
+        if judge_result.get("status") != "success":
+            logger.warning("Batch judge execution failed: %s", judge_result.get("status"))
+            return _fallback_individual_grading(
+                tasks, execution_results, judge_model, judge_agent_prefix,
+                judge_timeout_seconds / len(tasks), judge_backend, skill_dir, verbose
+            )
+
+        # Extract text from transcript
+        raw_text = ""
+        for entry in judge_result.get("transcript", []):
+            if entry.get("type") == "message":
+                msg = entry.get("message", {})
+                if msg.get("role") == "assistant":
+                    for item in msg.get("content", []):
+                        if item.get("type") == "text":
+                            raw_text += item.get("text", "")
+
+    # Parse the JSON array response
+    grades = _parse_batch_response(raw_text, tasks, verbose)
+
+    if len(grades) != len(tasks):
+        logger.warning("Batch response parsing returned %d grades for %d tasks, falling back", len(grades), len(tasks))
+        return _fallback_individual_grading(
+            tasks, execution_results, judge_model, judge_agent_prefix,
+            judge_timeout_seconds / len(tasks), judge_backend, skill_dir, verbose
+        )
+
+    return grades
+
+
+def _parse_batch_response(raw_text: str, tasks: List[Task], verbose: bool = False) -> List[GradeResult]:
+    """Parse a JSON array response from batch grading."""
+    import re
+
+    # Try to extract a JSON array from the response.
+    # Look for a fenced code block first.
+    json_match = re.search(r"```(?:json)?\s*(\[[\s\S]*?\])\s*```", raw_text)
+    if json_match:
+        json_str = json_match.group(1)
+    else:
+        # Try to find a bare JSON array
+        array_match = re.search(r"\[[\s\S]*\]", raw_text)
+        if array_match:
+            json_str = array_match.group(0)
+        else:
+            logger.warning("No JSON array found in batch response")
+            return []
+
+    try:
+        parsed = json.loads(json_str)
+    except json.JSONDecodeError as exc:
+        logger.warning("Failed to parse batch JSON: %s", exc)
+        return []
+
+    if not isinstance(parsed, list):
+        logger.warning("Batch response is not a list: %s", type(parsed))
+        return []
+
+    if len(parsed) != len(tasks):
+        logger.warning("Batch response has %d items but expected %d", len(parsed), len(tasks))
+        # Could match by task_id when lengths differ; for now return
+        # empty to trigger the individual-grading fallback.
+        return []
+
+    grades = []
+    for i, (item, task) in enumerate(zip(parsed, tasks)):
+        if not isinstance(item, dict):
+            logger.warning("Batch item %d is not a dict", i)
+            grades.append(GradeResult(
+                task_id=task.task_id,
+                score=0.0,
+                max_score=1.0,
+                grading_type="llm_judge",
+                breakdown={},
+                notes="Batch parsing failed for this item",
+            ))
+            continue
+
+        # Extract score
+        total = item.get("total")
+        if total is None:
+            total = item.get("score", 0.0)
+        try:
+            total = float(total)
+        except (TypeError, ValueError):
+            total = 0.0
+
+        # Clamp to valid range
+        total = max(0.0, min(1.0, total))
+
+        # Extract breakdown scores
+        scores = item.get("scores", {})
+        if not isinstance(scores, dict):
+            scores = {}
+
+        notes = item.get("notes", "") or ""
+
+        grades.append(GradeResult(
+            task_id=task.task_id,
+            score=total,
+            max_score=1.0,
+            grading_type="llm_judge",
+            breakdown=_normalize_score_dict(scores),
+            notes=str(notes)[:500],  # Truncate long notes
+        ))
+
+    return grades
+
+
+def _fallback_individual_grading(
+    tasks: List[Task],
+    execution_results: List[Dict[str, Any]],
+    judge_model: str,
+    judge_agent_prefix: str,
+    judge_timeout_seconds: float,
+    judge_backend: str,
+    skill_dir: Optional[Path],
+    verbose: bool,
+) -> List[GradeResult]:
+    """Fall back to grading tasks individually when batch fails."""
+    logger.info("Falling back to individual grading for %d tasks", len(tasks))
+    grades = []
+    for task, result in zip(tasks, execution_results):
+        try:
+            grade = _grade_llm_judge(
+                task=task,
+                execution_result=result,
+                judge_model=judge_model,
+                judge_agent_prefix=judge_agent_prefix,
+                judge_timeout_seconds=judge_timeout_seconds,
+                judge_backend=judge_backend,
+                skill_dir=skill_dir,
+                verbose=verbose,
+            )
+        except Exception as exc:
+            logger.warning("Individual grading failed for %s: %s", task.task_id, exc)
+            grade = GradeResult(
+                task_id=task.task_id,
+                score=0.0,
+                max_score=1.0,
+                grading_type="llm_judge",
+                breakdown={},
+                notes=f"Grading failed: {exc}",
+            )
+        grades.append(grade)
+    return grades
+
+
 def _grade_automated(
     task: Task,
     execution_result: Dict[str, Any],
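The extraction order in `_parse_batch_response` (a fenced code block first, then any bare array, then clamping) is the crux of the parser. A standalone rendition of just that step, with a made-up judge reply; nothing here is part of the patch:

```python
# Standalone rendition of _parse_batch_response's extraction order:
# prefer a fenced code block, then fall back to any bare JSON array.
import json
import re

# A made-up judge reply wrapping the array in a fenced block:
raw_text = "Grades below.\n```json\n[{\"task_id\": \"sanity_check\", \"scores\": {\"ran\": 1.0}, \"total\": 1.7, \"notes\": \"ok\"}]\n```"

match = re.search(r"```(?:json)?\s*(\[[\s\S]*?\])\s*```", raw_text)
if match:
    json_str = match.group(1)
else:
    bare = re.search(r"\[[\s\S]*\]", raw_text)
    json_str = bare.group(0) if bare else "[]"

for item in json.loads(json_str):
    total = max(0.0, min(1.0, float(item.get("total", 0.0))))  # clamp as the parser does
    print(item["task_id"], total)  # -> sanity_check 1.0
```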
@@ -304,7 +655,8 @@ def _grade_llm_judge(
         task.task_id,
         raw_parsed,
     )
-    return GradeResult(
+
+    grade = GradeResult(
         task_id=task.task_id,
         score=float(total) if total is not None else 0.0,
         max_score=1.0,
@@ -312,6 +664,8 @@
         breakdown=_normalize_score_dict(breakdown),
         notes=str(notes) if notes is not None else "",
     )
+
+    return grade

 def _combine_grades(task: Task, auto_result: GradeResult, llm_result: GradeResult) -> GradeResult:
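The index bookkeeping in `grade_tasks_batch` (partition inputs by grading type, grade each group, then write results back by original index) is a reusable pattern. A small sketch of just that bookkeeping, with toy graders standing in for the real judge calls:

```python
# Sketch of grade_tasks_batch's partition-and-reassemble bookkeeping;
# the graders here are toy stand-ins, not the real judge calls.
from typing import Dict, List, Optional

tasks = [("t1", "automated"), ("t2", "llm_judge"), ("t3", "automated"), ("t4", "llm_judge")]
results: List[Optional[float]] = [None] * len(tasks)

# Partition indices by grading type, preserving input order within each group.
groups: Dict[str, List[int]] = {}
for i, (_, kind) in enumerate(tasks):
    groups.setdefault(kind, []).append(i)

# "Grade" each group, then write back by index so the output order
# matches the input order regardless of grouping.
for i in groups.get("automated", []):
    results[i] = 1.0                        # individual automated grading
batch = groups.get("llm_judge", [])
batch_scores = [0.5 for _ in batch]         # one batched judge call in the real code
for i, score in zip(batch, batch_scores):
    results[i] = score

assert all(r is not None for r in results)
print(results)  # [1.0, 0.5, 1.0, 0.5], same order as the input tasks
```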