diff --git a/.gitignore b/.gitignore index 762fa1f..b2a1260 100644 --- a/.gitignore +++ b/.gitignore @@ -146,3 +146,19 @@ outputs/ # inspect result logs seed_datasets_inspect_logs/ seed_tasks_results/ + +# Generated experiment/evaluation artifacts +base_output/ +base_output_tmp/ +base_output_tmp_2/ +logs_tmp/ +Finance_Book1_Book2/ +Finance_Book3_Book4/ +Finance_Book5_Book6/ +topic.csv + +# Local benchmark/task JSON exports +finance_tasks.json +seed_tasks.json +task_4.json +tasks_2.json diff --git a/scripts/flatten_inspect_logs.py b/scripts/flatten_inspect_logs.py new file mode 100644 index 0000000..d6943a2 --- /dev/null +++ b/scripts/flatten_inspect_logs.py @@ -0,0 +1,108 @@ +"""Utility to flatten Inspect JSON logs into a simple, readable format. + +Given an Inspect eval log file (one of the large JSON files under +base_output//eval/results/////), +this script writes out a JSONL file with, per row: + +- id: sample id +- question: original input +- ground_truth: target string +- model_output: subject model's answer text +- grade: judge letter grade (if present, e.g. \"C\" or \"I\") + +Usage: + python scripts/flatten_inspect_logs.py \\ + --log_path base_output/test_exp/eval/results/_20260316_031445/\\ + gpt-5-nano/static_benchmarks/integral/\\ + 2026-03-15T23-14-46-04-00_task_mZxA3jKBseS2smuk4ppcxN.json \\ + --out_path base_output/test_exp/eval/results/_20260316_031445/\\ + gpt-5-nano/static_benchmarks/integral/flat_integral.jsonl + +The first line of the JSONL file is a summary object with: +- num_samples +- num_correct +- num_incorrect +- accuracy +- f1 (computed treating "C" as correct, "I" as incorrect) +""" + +from __future__ import annotations + +import argparse +import json +from pathlib import Path +from typing import Any, Dict, List + + +def flatten_inspect_log(log_path: Path) -> List[Dict[str, Any]]: + data = json.loads(log_path.read_text(encoding="utf-8")) + + samples = data.get("samples", []) + flattened: List[Dict[str, Any]] = [] + + for s in samples: + sid = s.get("id") + question = s.get("input") + target = s.get("target") + + model_output = None + output = s.get("output") or {} + choices = output.get("choices") or [] + if choices: + msg = (choices[0] or {}).get("message") or {} + model_output = msg.get("content") + + grade = None + scores = s.get("scores") or {} + fact = scores.get("model_graded_fact") or {} + grade = fact.get("value") + + flattened.append( + { + "id": sid, + "question": question, + "ground_truth": target, + "model_output": model_output, + "grade": grade, + } + ) + + return flattened + + +def main() -> None: + parser = argparse.ArgumentParser() + parser.add_argument("--log_path", type=str, required=True) + parser.add_argument("--out_path", type=str, required=True) + args = parser.parse_args() + + log_path = Path(args.log_path) + out_path = Path(args.out_path) + out_path.parent.mkdir(parents=True, exist_ok=True) + + rows = flatten_inspect_log(log_path) + + num_samples = len(rows) + num_correct = sum(1 for r in rows if r.get("grade") == "C") + num_incorrect = sum(1 for r in rows if r.get("grade") == "I") + accuracy = (num_correct / num_samples) if num_samples else 0.0 + # In this binary setting with grades only, we treat F1 as equal to accuracy. + f1 = accuracy + + with out_path.open("w", encoding="utf-8") as f: + summary = { + "summary": True, + "num_samples": num_samples, + "num_correct": num_correct, + "num_incorrect": num_incorrect, + "accuracy": accuracy, + "f1": f1, + } + f.write(json.dumps(summary, ensure_ascii=False) + "\n") + for row in rows: + f.write(json.dumps(row, ensure_ascii=False) + "\n") + + +if __name__ == "__main__": + main() + diff --git a/scripts/static_benchmarks/bizbench_eval.sh b/scripts/static_benchmarks/bizbench_eval.sh new file mode 100644 index 0000000..a86f59a --- /dev/null +++ b/scripts/static_benchmarks/bizbench_eval.sh @@ -0,0 +1,73 @@ +#!/bin/bash +#SBATCH --job-name=bizbench_eval +#SBATCH --output=logs/bizbench_eval_%A_%a.out +#SBATCH --error=logs/bizbench_eval_%A_%a.err +#SBATCH --time=08:00:00 +#SBATCH --cpus-per-task=4 +#SBATCH --mem=16G +#SBATCH --array=0-50 + +set -euo pipefail + +cd /projects/DeepLesion/projects/new_ace/automated_capability_evaluation + +# shellcheck disable=SC1091 +source "scripts/static_benchmarks/env_slurm_inspect.sh" + +# Allow running either via sbatch (with SLURM_ARRAY_TASK_ID set) +# or directly (default to a single chunk 0). +: "${SLURM_ARRAY_TASK_ID:=0}" + +CHUNK=100 +OFFSET=$((SLURM_ARRAY_TASK_ID * CHUNK)) +VALIDATION_TAG="_BIZBENCH_Commercial_${SLURM_ARRAY_TASK_ID}_SundayNight" + +# Stage 0_static: build datasets from kensho/bizbench +python -m src.run_eval_pipeline \ + stage=0_static \ + validation_tag="$VALIDATION_TAG" \ + +static_benchmark_cfg.benchmark_id=kensho/bizbench \ + +static_benchmark_cfg.split=test \ + +static_benchmark_cfg.offset="$OFFSET" \ + +static_benchmark_cfg.limit="$CHUNK" + +# Stage 1: run subject models on the static datasets +python -m src.run_eval_pipeline \ + stage=1 \ + validation_tag="$VALIDATION_TAG" \ + eval_tag="$VALIDATION_TAG" + +# Stage 2: aggregate scores +python -m src.run_eval_pipeline \ + stage=2 \ + eval_tag="$VALIDATION_TAG" + +echo "Stage 0_static datasets: base_output/test_exp/eval/datasets/$VALIDATION_TAG" +echo "Stage 1 results (Inspect logs): base_output/test_exp/eval/results/$VALIDATION_TAG" +echo "Stage 2 scores: base_output/test_exp/eval/scores/$VALIDATION_TAG" + +# Optional: generate flattened JSONL views of Inspect logs for easier reading +RESULTS_DIR="base_output/test_exp/eval/results/$VALIDATION_TAG" +if [ -d "$RESULTS_DIR" ]; then + echo "Flattening Inspect logs under $RESULTS_DIR ..." + for model_dir in "$RESULTS_DIR"/*/; do + [ -d "$model_dir" ] || continue + model_name="$(basename "$model_dir")" + for area_dir in "$model_dir"*/; do + [ -d "$area_dir" ] || continue + for cap_dir in "$area_dir"*/; do + [ -d "$cap_dir" ] || continue + cap_name="$(basename "$cap_dir")" + log_file="$(ls "$cap_dir"/*_task_*.json 2>/dev/null | head -n 1 || true)" + if [ -n "$log_file" ]; then + out_file="$cap_dir/flat_${cap_name}.jsonl" + python scripts/flatten_inspect_logs.py \ + --log_path "$log_file" \ + --out_path "$out_file" + echo " Wrote flattened log for $model_name/$cap_name to $out_file" + fi + done + done + done +fi + diff --git a/scripts/static_benchmarks/bizbench_local_array_eval.sh b/scripts/static_benchmarks/bizbench_local_array_eval.sh new file mode 100644 index 0000000..3da3941 --- /dev/null +++ b/scripts/static_benchmarks/bizbench_local_array_eval.sh @@ -0,0 +1,88 @@ +#!/bin/bash +#SBATCH --job-name=gemma_bizbench_local_array +#SBATCH --output=/projects/DeepLesion/projects/new_ace/automated_capability_evaluation/logs/bizbench_local_array_%A_%a.out +#SBATCH --error=/projects/DeepLesion/projects/new_ace/automated_capability_evaluation/logs/bizbench_local_array_%A_%a.err +#SBATCH --time=06:00:00 +#SBATCH --cpus-per-task=8 +#SBATCH --mem=64G +#SBATCH --gres=gpu:a40:1 +#SBATCH --array=0-7%8 + +set -euo pipefail + +cd /projects/DeepLesion/projects/new_ace/automated_capability_evaluation + +# shellcheck disable=SC1091 +source /projects/DeepLesion/py311_env/bin/activate + +# shellcheck disable=SC1091 +source "scripts/static_benchmarks/env_slurm_inspect.sh" + +# Allow direct execution without sbatch by defaulting to shard 0. +: "${SLURM_ARRAY_TASK_ID:=0}" + +NUM_SHARDS=8 + +# Count only FinKnow rows that survive adapter filtering. +TOTAL=$( +python - <<'PY' +from datasets import load_dataset + +ds = load_dataset("kensho/bizbench", split="test") + +def is_valid(row): + question = str(row.get("question", "")).strip() + task = str(row.get("task", "") or "").lower() + answer = row.get("answer") + if answer is None: + answer_text = "" + elif isinstance(answer, dict): + for key in ("answer", "label", "text", "value"): + if key in answer and answer[key] is not None: + answer_text = str(answer[key]).strip() + break + else: + answer_text = str(answer).strip() + else: + answer_text = str(answer).strip() + # Adapter default is `finknow_only=true`, so we shard based on the same subset. + return bool("finknow" in task and question and answer_text) + +print(sum(1 for row in ds if is_valid(row))) +PY +) + +CHUNK=$(((TOTAL + NUM_SHARDS - 1) / NUM_SHARDS)) +OFFSET=$((SLURM_ARRAY_TASK_ID * CHUNK)) +TAG="_BIZBENCH_TEST_GEMMA_3" + +if [ "$OFFSET" -ge "$TOTAL" ]; then + echo "No work for shard ${SLURM_ARRAY_TASK_ID} (OFFSET=$OFFSET >= TOTAL=$TOTAL). Exiting." + exit 0 +fi + +echo "TOTAL=$TOTAL NUM_SHARDS=$NUM_SHARDS CHUNK=$CHUNK OFFSET=$OFFSET TAG=$TAG" + +# Stage 0_static: build dataset shard from BizBench test split. +python -m src.run_eval_pipeline \ + stage=0_static \ + validation_tag="$TAG" \ + +static_benchmark_cfg.benchmark_id=kensho/bizbench \ + +static_benchmark_cfg.split=test \ + +static_benchmark_cfg.offset="$OFFSET" \ + +static_benchmark_cfg.limit="$CHUNK" + +# Stage 1_local: evaluate local subject model(s) from run_cfg.yaml. +python -m src.run_eval_pipeline \ + stage=1_local \ + validation_tag="$TAG" \ + eval_tag="$TAG" + +# Stage 2: aggregate per-shard scores. +python -m src.run_eval_pipeline \ + stage=2 \ + eval_tag="$TAG" + +echo "Stage 0_static datasets: base_output/test_exp/eval/datasets/$TAG" +echo "Stage 1_local results: base_output/test_exp/eval/results/$TAG" +echo "Stage 2 scores: base_output/test_exp/eval/scores/$TAG" diff --git a/scripts/static_benchmarks/classify_static_benchmark_topics_vllm.py b/scripts/static_benchmarks/classify_static_benchmark_topics_vllm.py new file mode 100644 index 0000000..84d3b69 --- /dev/null +++ b/scripts/static_benchmarks/classify_static_benchmark_topics_vllm.py @@ -0,0 +1,336 @@ +#!/usr/bin/env python3 +"""Classify static benchmark questions into finance capabilities with vLLM. + +This script: +1) Loads one static benchmark through existing Stage-0 adapters. +2) Reads `topic.csv` and builds a high-level-area -> capabilities taxonomy. +3) Prompts a local model (e.g., Qwen3-32B) to return only one capability. +4) Saves outputs incrementally after every processed batch. +""" + +from __future__ import annotations + +import argparse +import csv +import json +import os +import re +from dataclasses import dataclass +from pathlib import Path +from typing import Dict, Iterable, List, Sequence, Set + +from src.eval_stages.stage0_static_benchmarks import _build_datasets_from_spec +from src.eval_stages.static_benchmarks.specs import StaticBenchmarkSpec + + +@dataclass(frozen=True) +class Taxonomy: + """Prompt-ready taxonomy container.""" + + high_level_areas: List[str] + capabilities: List[str] + area_to_capabilities: Dict[str, List[str]] + prompt_block: str + + +def _read_topic_taxonomy(topic_csv_path: Path) -> Taxonomy: + """Read high-level areas + capabilities from topic.csv.""" + area_to_caps_set: Dict[str, Set[str]] = {} + + # Be robust to UTF-8 BOM and leading blank/comment lines before header. + with topic_csv_path.open("r", encoding="utf-8-sig", newline="") as f: + rows = list(csv.reader(f)) + + required = {"High Level Area", "Capability"} + header_idx = None + header: List[str] = [] + for idx, row in enumerate(rows): + normalized = [str(cell).strip() for cell in row] + if not any(normalized): + continue + if required.issubset(set(normalized)): + header_idx = idx + header = normalized + break + + if header_idx is None: + raise ValueError( + f"Missing expected columns in {topic_csv_path}: {sorted(required)}" + ) + + for row in rows[header_idx + 1 :]: + if not row or not any(str(cell).strip() for cell in row): + continue + rec = { + header[i]: str(row[i]).strip() if i < len(row) else "" + for i in range(len(header)) + } + area = rec.get("High Level Area", "").strip() + capability = rec.get("Capability", "").strip() + if not area or not capability: + continue + area_to_caps_set.setdefault(area, set()).add(capability) + + if not area_to_caps_set: + raise ValueError(f"No usable area/capability rows found in {topic_csv_path}") + + area_to_capabilities: Dict[str, List[str]] = { + area: sorted(caps) for area, caps in sorted(area_to_caps_set.items()) + } + high_level_areas = list(area_to_capabilities.keys()) + capabilities = sorted( + {cap for caps in area_to_capabilities.values() for cap in caps} + ) + + lines: List[str] = [] + lines.append("High-level areas and their capabilities:") + for area in high_level_areas: + lines.append(f"- {area}:") + for cap in area_to_capabilities[area]: + lines.append(f" - {cap}") + prompt_block = "\n".join(lines) + + return Taxonomy( + high_level_areas=high_level_areas, + capabilities=capabilities, + area_to_capabilities=area_to_capabilities, + prompt_block=prompt_block, + ) + + +def _load_tasks_from_static_benchmark( + *, + benchmark_id: str, + split: str, + offset: int | None, + limit: int | None, +) -> List[Dict[str, str]]: + """Load tasks via the same adapters used by stage=0_static.""" + spec = StaticBenchmarkSpec( + benchmark_id=benchmark_id, + split=split, + offset=offset, + limit=limit, + area_id="static_benchmarks", + domain="finance", + ) + datasets = _build_datasets_from_spec(spec) + + tasks: List[Dict[str, str]] = [] + for ds in datasets: + for task in ds.tasks: + tid = str(task.get("id", "")).strip() + q = str(task.get("input", "")).strip() + if tid and q: + tasks.append({"id": tid, "question": q}) + return tasks + + +def _build_prompt(question: str, taxonomy: Taxonomy) -> str: + """Build short-thinking classification prompt.""" + return ( + "You are classifying a finance question into ONE capability.\n" + "Think very briefly.\n" + "Choose exactly one capability from the provided list.\n" + "Do not explain.\n" + "Return exactly one line in this format:\n" + "CAPABILITY: \n\n" + f"{taxonomy.prompt_block}\n\n" + "Question:\n" + f"{question}\n\n" + "Only output the final capability line." + ) + + +def _extract_capability(raw_text: str, allowed_capabilities: Sequence[str]) -> str: + """Parse model output and map to one allowed capability if possible.""" + text = (raw_text or "").strip() + allowed_map = {cap.lower(): cap for cap in allowed_capabilities} + + m = re.search(r"(?im)^\s*CAPABILITY\s*:\s*(.+?)\s*$", text) + if m: + value = m.group(1).strip() + if value.lower() in allowed_map: + return allowed_map[value.lower()] + text = value + + text_norm = re.sub(r"\s+", " ", text).strip().lower() + if text_norm in allowed_map: + return allowed_map[text_norm] + + # Fallback: find capability mention in output. + # Longest-first reduces accidental partial matches. + for cap in sorted(allowed_capabilities, key=len, reverse=True): + if cap.lower() in text.lower(): + return cap + + return "" + + +def _batched(items: Sequence[Dict[str, str]], batch_size: int) -> Iterable[List[Dict[str, str]]]: + if batch_size <= 0: + raise ValueError("batch_size must be positive") + for i in range(0, len(items), batch_size): + yield list(items[i : i + batch_size]) + + +def _load_done_ids(output_jsonl: Path) -> Set[str]: + """Read already-processed task IDs for resume support.""" + done: Set[str] = set() + if not output_jsonl.exists(): + return done + with output_jsonl.open("r", encoding="utf-8") as f: + for line in f: + line = line.strip() + if not line: + continue + try: + row = json.loads(line) + except json.JSONDecodeError: + continue + tid = str(row.get("id", "")).strip() + if tid: + done.add(tid) + return done + + +def run(args: argparse.Namespace) -> None: + # Safer default in many cluster environments. + os.environ.setdefault("VLLM_WORKER_MULTIPROC_METHOD", "spawn") + + try: + from transformers import PreTrainedTokenizerBase + from vllm import LLM, SamplingParams + except Exception as exc: # noqa: BLE001 + raise RuntimeError("This script requires transformers and vllm.") from exc + + # Compatibility shim for some tokenizer/vLLM combinations. + if not hasattr(PreTrainedTokenizerBase, "all_special_tokens_extended"): + PreTrainedTokenizerBase.all_special_tokens_extended = property( # type: ignore[attr-defined] + lambda self: list(self.all_special_tokens) + ) + + topic_csv_path = Path(args.topic_csv).resolve() + output_jsonl = Path(args.output_jsonl).resolve() + output_jsonl.parent.mkdir(parents=True, exist_ok=True) + + taxonomy = _read_topic_taxonomy(topic_csv_path) + tasks = _load_tasks_from_static_benchmark( + benchmark_id=args.benchmark_id, + split=args.split, + offset=args.offset, + limit=args.limit, + ) + if not tasks: + raise ValueError("No tasks loaded from benchmark.") + + done_ids = _load_done_ids(output_jsonl) if args.resume else set() + pending_tasks = [task for task in tasks if task["id"] not in done_ids] + + print( + f"Loaded {len(tasks)} tasks; pending={len(pending_tasks)}; " + f"already_done={len(done_ids)}" + ) + print(f"Model: {args.model_path}") + print(f"Benchmark: {args.benchmark_id} (split={args.split})") + print(f"Output: {output_jsonl}") + + if not pending_tasks: + print("Nothing to do.") + return + + llm = LLM( + model=args.model_path, + tokenizer=args.model_path, + trust_remote_code=args.trust_remote_code, + tensor_parallel_size=args.tensor_parallel_size, + gpu_memory_utilization=args.gpu_memory_utilization, + dtype=args.dtype, + max_model_len=args.max_model_len, + ) + tokenizer = llm.get_tokenizer() if hasattr(llm, "get_tokenizer") else None + + sampling = SamplingParams( + temperature=0.0, + top_p=1.0, + max_tokens=args.max_tokens, + repetition_penalty=1.0, + ) + + total = len(pending_tasks) + processed = 0 + with output_jsonl.open("a", encoding="utf-8") as out_f: + for batch_idx, batch in enumerate(_batched(pending_tasks, args.batch_size), start=1): + prompts: List[str] = [] + for row in batch: + user_prompt = _build_prompt(row["question"], taxonomy) + if tokenizer is not None and hasattr(tokenizer, "apply_chat_template"): + text_prompt = tokenizer.apply_chat_template( + [{"role": "user", "content": user_prompt}], + tokenize=False, + add_generation_prompt=True, + ) + else: + text_prompt = user_prompt + prompts.append(text_prompt) + + outputs = llm.generate(prompts, sampling) + for row, output in zip(batch, outputs, strict=True): + raw = output.outputs[0].text.strip() if output.outputs else "" + predicted = _extract_capability(raw, taxonomy.capabilities) + record = { + "id": row["id"], + "question": row["question"], + "predicted_capability": predicted, + "model_output_raw": raw, + "model_name": "Qwen3-32B", + "benchmark_id": args.benchmark_id, + "split": args.split, + } + out_f.write(json.dumps(record, ensure_ascii=False) + "\n") + out_f.flush() + + processed += len(batch) + print( + f"[batch {batch_idx}] wrote {len(batch)} rows | " + f"progress {processed}/{total}" + ) + + print("Done.") + + +def build_arg_parser() -> argparse.ArgumentParser: + p = argparse.ArgumentParser( + description="Classify one static benchmark into finance capabilities with vLLM." + ) + p.add_argument("--benchmark-id", required=True, help="Static benchmark ID or local JSON path.") + p.add_argument("--split", default="test", help="Benchmark split (default: test).") + p.add_argument("--offset", type=int, default=None, help="Optional benchmark offset.") + p.add_argument("--limit", type=int, default=None, help="Optional benchmark limit.") + p.add_argument( + "--topic-csv", + default="topic.csv", + help="Path to topic.csv with High Level Area and Capability columns.", + ) + p.add_argument( + "--output-jsonl", + required=True, + help="Where to append classification rows (JSONL).", + ) + p.add_argument("--resume", action="store_true", help="Skip IDs already in output JSONL.") + + # Model / vLLM args + p.add_argument("--model-path", default="/model-weights/Qwen3-32B") + p.add_argument("--batch-size", type=int, default=16) + p.add_argument("--max-tokens", type=int, default=32) + p.add_argument("--trust-remote-code", action="store_true") + p.add_argument("--tensor-parallel-size", type=int, default=1) + p.add_argument("--gpu-memory-utilization", type=float, default=0.9) + p.add_argument("--dtype", default="auto") + p.add_argument("--max-model-len", type=int, default=8192) + return p + + +if __name__ == "__main__": + parser = build_arg_parser() + run(parser.parse_args()) diff --git a/scripts/static_benchmarks/env_slurm_inspect.sh b/scripts/static_benchmarks/env_slurm_inspect.sh new file mode 100644 index 0000000..65bd73c --- /dev/null +++ b/scripts/static_benchmarks/env_slurm_inspect.sh @@ -0,0 +1,16 @@ +# Sourced by *_eval.sh SLURM jobs. Puts platformdirs user_data / cache on local +# scratch so Inspect's samplebuffer and logging are not on flaky NFS home mounts. +# Avoid /tmp — it is often shared and fills up when vLLM writes torch compile caches. +_SCRATCH="${SLURM_TMPDIR:-}" +if [ -z "$_SCRATCH" ] || [ "$_SCRATCH" = "/tmp" ]; then + _SCRATCH="/projects/DeepLesion/tmp_cache" +fi +export XDG_DATA_HOME="${_SCRATCH}/inspect_xdg_data" +export XDG_CACHE_HOME="${_SCRATCH}/inspect_xdg_cache" +export TMPDIR="${_SCRATCH}" +_CACHE_BASE="${_SCRATCH}/job_${SLURM_JOB_ID:-local}" +export TORCHINDUCTOR_CACHE_DIR="${_CACHE_BASE}/torchinductor_${USER:-user}" +export TRITON_CACHE_DIR="${_CACHE_BASE}/triton_${USER:-user}" +mkdir -p "$XDG_DATA_HOME" "$XDG_CACHE_HOME" "$TORCHINDUCTOR_CACHE_DIR" "$TRITON_CACHE_DIR" +unset _SCRATCH +unset _CACHE_BASE diff --git a/scripts/static_benchmarks/finance_book1_book2_local_array_eval.sh b/scripts/static_benchmarks/finance_book1_book2_local_array_eval.sh new file mode 100644 index 0000000..3d5c0df --- /dev/null +++ b/scripts/static_benchmarks/finance_book1_book2_local_array_eval.sh @@ -0,0 +1,142 @@ +#!/bin/bash +#SBATCH --job-name=gemma_book1_book2_local_array +#SBATCH --output=/projects/DeepLesion/projects/new_ace/automated_capability_evaluation/logs/gemma_book1_book2_local_array_%A_%a.out +#SBATCH --error=/projects/DeepLesion/projects/new_ace/automated_capability_evaluation/logs/gemma_book1_book2_local_array_%A_%a.err +#SBATCH --time=24:00:00 +#SBATCH --cpus-per-task=8 +#SBATCH --mem=64G +#SBATCH --gres=gpu:a100:1 +#SBATCH --array=0-7%8 + +set -euo pipefail + +cd /projects/DeepLesion/projects/new_ace/automated_capability_evaluation + +# shellcheck disable=SC1091 +source /projects/DeepLesion/py311_env/bin/activate + +# shellcheck disable=SC1091 +source "scripts/static_benchmarks/env_slurm_inspect.sh" + +# Allow direct execution without sbatch by defaulting to shard 0. +: "${SLURM_ARRAY_TASK_ID:=0}" + +NUM_SHARDS=8 +ROOT_DIR="${ROOT_DIR:-/projects/DeepLesion/projects/new_ace/automated_capability_evaluation/Finance_Book1_Book2}" + +export ROOT_DIR NUM_SHARDS SLURM_ARRAY_TASK_ID + +mapfile -t ASSIGNED_FILES < <( +python - <<'PY' +from pathlib import Path +import os + +root = Path(os.environ["ROOT_DIR"]) +num_shards = int(os.environ["NUM_SHARDS"]) +shard_id = int(os.environ["SLURM_ARRAY_TASK_ID"]) + +files = sorted(root.glob("**/tasks.json")) +for idx, path in enumerate(files): + if idx % num_shards == shard_id: + print(path) +PY +) + +TOTAL_FILES=$( +python - <<'PY' +from pathlib import Path +import os + +root = Path(os.environ["ROOT_DIR"]) +print(len(sorted(root.glob("**/tasks.json")))) +PY +) + +SHARD_FILES="${#ASSIGNED_FILES[@]}" +echo "ROOT_DIR=$ROOT_DIR TOTAL_FILES=$TOTAL_FILES NUM_SHARDS=$NUM_SHARDS SHARD=$SLURM_ARRAY_TASK_ID ASSIGNED_FILES=$SHARD_FILES" + +if [ "$SHARD_FILES" -eq 0 ]; then + echo "No tasks.json files assigned to shard ${SLURM_ARRAY_TASK_ID}. Exiting." + exit 0 +fi + +for TASKS_JSON in "${ASSIGNED_FILES[@]}"; do + export TASKS_JSON + + mapfile -t META < <( + python - <<'PY' +import json +import os +import re +from pathlib import Path + +path = Path(os.environ["TASKS_JSON"]) +payload = json.loads(path.read_text(encoding="utf-8")) +tasks = payload.get("tasks", []) +first = tasks[0] if tasks and isinstance(tasks[0], dict) else {} + +def clean(value: str, fallback: str) -> str: + value = str(value or "").strip() + if not value: + value = fallback + return value + +def slug(value: str, fallback: str) -> str: + value = clean(value, fallback) + return re.sub(r"[^a-zA-Z0-9]+", "_", value).strip("_").lower() or fallback + +area_dir = path.parent.parent.name +cap_dir = path.parent.name + +area_id = clean(first.get("area_id"), area_dir) +capability_id = clean(first.get("capability_id"), cap_dir) +area_name = clean(first.get("area_name"), area_id) +capability_name = clean(first.get("capability_name"), capability_id) +tag_suffix = slug(path.parent.relative_to(Path(os.environ["ROOT_DIR"])).as_posix(), capability_id) + +print(area_id) +print(capability_id) +print(area_name) +print(capability_name) +print(tag_suffix) +PY + ) + + AREA_ID="${META[0]}" + CAPABILITY_ID="${META[1]}" + AREA_NAME="${META[2]}" + CAPABILITY_NAME="${META[3]}" + TAG_SUFFIX="${META[4]}" + TAG="_FINANCE_BOOK1_BOOK2_GEMMA_3_${TAG_SUFFIX}" + + echo "Evaluating $TASKS_JSON" + echo " AREA_ID=$AREA_ID" + echo " CAPABILITY_ID=$CAPABILITY_ID" + echo " TAG=$TAG" + + # Stage 0_static: ingest one local tasks.json export using the local JSON adapter. + python -m src.run_eval_pipeline \ + stage=0_static \ + validation_tag="$TAG" \ + +static_benchmark_cfg.benchmark_id="$TASKS_JSON" \ + +static_benchmark_cfg.area_id="$AREA_ID" \ + +static_benchmark_cfg.capability_id="$CAPABILITY_ID" \ + +static_benchmark_cfg.capability_name="$CAPABILITY_NAME" \ + +static_benchmark_cfg.domain=finance + + # Stage 1_local: evaluate local subject model(s) from run_cfg.yaml. + python -m src.run_eval_pipeline \ + stage=1_local \ + validation_tag="$TAG" \ + eval_tag="$TAG" + + # Stage 2: aggregate scores for this tasks.json bundle. + python -m src.run_eval_pipeline \ + stage=2 \ + eval_tag="$TAG" + + echo "Finished $TASKS_JSON" + echo " Stage 0_static datasets: base_output/test_exp/eval/datasets/$TAG" + echo " Stage 1_local results: base_output/test_exp/eval/results/$TAG" + echo " Stage 2 scores: base_output/test_exp/eval/scores/$TAG" +done diff --git a/scripts/static_benchmarks/finance_book3_book4_local_array_eval.sh b/scripts/static_benchmarks/finance_book3_book4_local_array_eval.sh new file mode 100644 index 0000000..af7ae9e --- /dev/null +++ b/scripts/static_benchmarks/finance_book3_book4_local_array_eval.sh @@ -0,0 +1,142 @@ +#!/bin/bash +#SBATCH --job-name=gemma_book3_book4_local_array +#SBATCH --output=/projects/DeepLesion/projects/new_ace/automated_capability_evaluation/logs/gemma_book3_book4_local_array_%A_%a.out +#SBATCH --error=/projects/DeepLesion/projects/new_ace/automated_capability_evaluation/logs/gemma_book3_book4_local_array_%A_%a.err +#SBATCH --time=24:00:00 +#SBATCH --cpus-per-task=8 +#SBATCH --mem=64G +#SBATCH --gres=gpu:a100:1 +#SBATCH --array=0-7%8 + +set -euo pipefail + +cd /projects/DeepLesion/projects/new_ace/automated_capability_evaluation + +# shellcheck disable=SC1091 +source /projects/DeepLesion/py311_env/bin/activate + +# shellcheck disable=SC1091 +source "scripts/static_benchmarks/env_slurm_inspect.sh" + +# Allow direct execution without sbatch by defaulting to shard 0. +: "${SLURM_ARRAY_TASK_ID:=0}" + +NUM_SHARDS=8 +ROOT_DIR="${ROOT_DIR:-/projects/DeepLesion/projects/new_ace/automated_capability_evaluation/Finance_Book3_Book4}" + +export ROOT_DIR NUM_SHARDS SLURM_ARRAY_TASK_ID + +mapfile -t ASSIGNED_FILES < <( +python - <<'PY' +from pathlib import Path +import os + +root = Path(os.environ["ROOT_DIR"]) +num_shards = int(os.environ["NUM_SHARDS"]) +shard_id = int(os.environ["SLURM_ARRAY_TASK_ID"]) + +files = sorted(root.glob("**/tasks.json")) +for idx, path in enumerate(files): + if idx % num_shards == shard_id: + print(path) +PY +) + +TOTAL_FILES=$( +python - <<'PY' +from pathlib import Path +import os + +root = Path(os.environ["ROOT_DIR"]) +print(len(sorted(root.glob("**/tasks.json")))) +PY +) + +SHARD_FILES="${#ASSIGNED_FILES[@]}" +echo "ROOT_DIR=$ROOT_DIR TOTAL_FILES=$TOTAL_FILES NUM_SHARDS=$NUM_SHARDS SHARD=$SLURM_ARRAY_TASK_ID ASSIGNED_FILES=$SHARD_FILES" + +if [ "$SHARD_FILES" -eq 0 ]; then + echo "No tasks.json files assigned to shard ${SLURM_ARRAY_TASK_ID}. Exiting." + exit 0 +fi + +for TASKS_JSON in "${ASSIGNED_FILES[@]}"; do + export TASKS_JSON + + mapfile -t META < <( + python - <<'PY' +import json +import os +import re +from pathlib import Path + +path = Path(os.environ["TASKS_JSON"]) +payload = json.loads(path.read_text(encoding="utf-8")) +tasks = payload.get("tasks", []) +first = tasks[0] if tasks and isinstance(tasks[0], dict) else {} + +def clean(value: str, fallback: str) -> str: + value = str(value or "").strip() + if not value: + value = fallback + return value + +def slug(value: str, fallback: str) -> str: + value = clean(value, fallback) + return re.sub(r"[^a-zA-Z0-9]+", "_", value).strip("_").lower() or fallback + +area_dir = path.parent.parent.name +cap_dir = path.parent.name + +area_id = clean(first.get("area_id"), area_dir) +capability_id = clean(first.get("capability_id"), cap_dir) +area_name = clean(first.get("area_name"), area_id) +capability_name = clean(first.get("capability_name"), capability_id) +tag_suffix = slug(path.parent.relative_to(Path(os.environ["ROOT_DIR"])).as_posix(), capability_id) + +print(area_id) +print(capability_id) +print(area_name) +print(capability_name) +print(tag_suffix) +PY + ) + + AREA_ID="${META[0]}" + CAPABILITY_ID="${META[1]}" + AREA_NAME="${META[2]}" + CAPABILITY_NAME="${META[3]}" + TAG_SUFFIX="${META[4]}" + TAG="_FINANCE_BOOK3_BOOK4_GEMMA_3_${TAG_SUFFIX}" + + echo "Evaluating $TASKS_JSON" + echo " AREA_ID=$AREA_ID" + echo " CAPABILITY_ID=$CAPABILITY_ID" + echo " TAG=$TAG" + + # Stage 0_static: ingest one local tasks.json export using the local JSON adapter. + python -m src.run_eval_pipeline \ + stage=0_static \ + validation_tag="$TAG" \ + +static_benchmark_cfg.benchmark_id="$TASKS_JSON" \ + +static_benchmark_cfg.area_id="$AREA_ID" \ + +static_benchmark_cfg.capability_id="$CAPABILITY_ID" \ + +static_benchmark_cfg.capability_name="$CAPABILITY_NAME" \ + +static_benchmark_cfg.domain=finance + + # Stage 1_local: evaluate local subject model(s) from run_cfg.yaml. + python -m src.run_eval_pipeline \ + stage=1_local \ + validation_tag="$TAG" \ + eval_tag="$TAG" + + # Stage 2: aggregate scores for this tasks.json bundle. + python -m src.run_eval_pipeline \ + stage=2 \ + eval_tag="$TAG" + + echo "Finished $TASKS_JSON" + echo " Stage 0_static datasets: base_output/test_exp/eval/datasets/$TAG" + echo " Stage 1_local results: base_output/test_exp/eval/results/$TAG" + echo " Stage 2 scores: base_output/test_exp/eval/scores/$TAG" +done diff --git a/scripts/static_benchmarks/finance_book5_book6_local_array_eval.sh b/scripts/static_benchmarks/finance_book5_book6_local_array_eval.sh new file mode 100644 index 0000000..3e2126e --- /dev/null +++ b/scripts/static_benchmarks/finance_book5_book6_local_array_eval.sh @@ -0,0 +1,142 @@ +#!/bin/bash +#SBATCH --job-name=gemma_book5_book6_local_array +#SBATCH --output=/projects/DeepLesion/projects/new_ace/automated_capability_evaluation/logs/gemma_book5_book6_local_array_%A_%a.out +#SBATCH --error=/projects/DeepLesion/projects/new_ace/automated_capability_evaluation/logs/gemma_book5_book6_local_array_%A_%a.err +#SBATCH --time=24:00:00 +#SBATCH --cpus-per-task=8 +#SBATCH --mem=64G +#SBATCH --gres=gpu:a100:1 +#SBATCH --array=0-7%8 + +set -euo pipefail + +cd /projects/DeepLesion/projects/new_ace/automated_capability_evaluation + +# shellcheck disable=SC1091 +source /projects/DeepLesion/py311_env/bin/activate + +# shellcheck disable=SC1091 +source "scripts/static_benchmarks/env_slurm_inspect.sh" + +# Allow direct execution without sbatch by defaulting to shard 0. +: "${SLURM_ARRAY_TASK_ID:=0}" + +NUM_SHARDS=8 +ROOT_DIR="${ROOT_DIR:-/projects/DeepLesion/projects/new_ace/automated_capability_evaluation/Finance_Book5_Book6}" + +export ROOT_DIR NUM_SHARDS SLURM_ARRAY_TASK_ID + +mapfile -t ASSIGNED_FILES < <( +python - <<'PY' +from pathlib import Path +import os + +root = Path(os.environ["ROOT_DIR"]) +num_shards = int(os.environ["NUM_SHARDS"]) +shard_id = int(os.environ["SLURM_ARRAY_TASK_ID"]) + +files = sorted(root.glob("**/tasks.json")) +for idx, path in enumerate(files): + if idx % num_shards == shard_id: + print(path) +PY +) + +TOTAL_FILES=$( +python - <<'PY' +from pathlib import Path +import os + +root = Path(os.environ["ROOT_DIR"]) +print(len(sorted(root.glob("**/tasks.json")))) +PY +) + +SHARD_FILES="${#ASSIGNED_FILES[@]}" +echo "ROOT_DIR=$ROOT_DIR TOTAL_FILES=$TOTAL_FILES NUM_SHARDS=$NUM_SHARDS SHARD=$SLURM_ARRAY_TASK_ID ASSIGNED_FILES=$SHARD_FILES" + +if [ "$SHARD_FILES" -eq 0 ]; then + echo "No tasks.json files assigned to shard ${SLURM_ARRAY_TASK_ID}. Exiting." + exit 0 +fi + +for TASKS_JSON in "${ASSIGNED_FILES[@]}"; do + export TASKS_JSON + + mapfile -t META < <( + python - <<'PY' +import json +import os +import re +from pathlib import Path + +path = Path(os.environ["TASKS_JSON"]) +payload = json.loads(path.read_text(encoding="utf-8")) +tasks = payload.get("tasks", []) +first = tasks[0] if tasks and isinstance(tasks[0], dict) else {} + +def clean(value: str, fallback: str) -> str: + value = str(value or "").strip() + if not value: + value = fallback + return value + +def slug(value: str, fallback: str) -> str: + value = clean(value, fallback) + return re.sub(r"[^a-zA-Z0-9]+", "_", value).strip("_").lower() or fallback + +area_dir = path.parent.parent.name +cap_dir = path.parent.name + +area_id = clean(first.get("area_id"), area_dir) +capability_id = clean(first.get("capability_id"), cap_dir) +area_name = clean(first.get("area_name"), area_id) +capability_name = clean(first.get("capability_name"), capability_id) +tag_suffix = slug(path.parent.relative_to(Path(os.environ["ROOT_DIR"])).as_posix(), capability_id) + +print(area_id) +print(capability_id) +print(area_name) +print(capability_name) +print(tag_suffix) +PY + ) + + AREA_ID="${META[0]}" + CAPABILITY_ID="${META[1]}" + AREA_NAME="${META[2]}" + CAPABILITY_NAME="${META[3]}" + TAG_SUFFIX="${META[4]}" + TAG="_FINANCE_BOOK5_BOOK6_GEMMA_3_${TAG_SUFFIX}" + + echo "Evaluating $TASKS_JSON" + echo " AREA_ID=$AREA_ID" + echo " CAPABILITY_ID=$CAPABILITY_ID" + echo " TAG=$TAG" + + # Stage 0_static: ingest one local tasks.json export using the local JSON adapter. + python -m src.run_eval_pipeline \ + stage=0_static \ + validation_tag="$TAG" \ + +static_benchmark_cfg.benchmark_id="$TASKS_JSON" \ + +static_benchmark_cfg.area_id="$AREA_ID" \ + +static_benchmark_cfg.capability_id="$CAPABILITY_ID" \ + +static_benchmark_cfg.capability_name="$CAPABILITY_NAME" \ + +static_benchmark_cfg.domain=finance + + # Stage 1_local: evaluate local subject model(s) from run_cfg.yaml. + python -m src.run_eval_pipeline \ + stage=1_local \ + validation_tag="$TAG" \ + eval_tag="$TAG" + + # Stage 2: aggregate scores for this tasks.json bundle. + python -m src.run_eval_pipeline \ + stage=2 \ + eval_tag="$TAG" + + echo "Finished $TASKS_JSON" + echo " Stage 0_static datasets: base_output/test_exp/eval/datasets/$TAG" + echo " Stage 1_local results: base_output/test_exp/eval/results/$TAG" + echo " Stage 2 scores: base_output/test_exp/eval/scores/$TAG" +done diff --git a/scripts/static_benchmarks/finance_math_eval.sh b/scripts/static_benchmarks/finance_math_eval.sh new file mode 100644 index 0000000..aaa8dd1 --- /dev/null +++ b/scripts/static_benchmarks/finance_math_eval.sh @@ -0,0 +1,73 @@ +#!/bin/bash +#SBATCH --job-name=finance_math_eval +#SBATCH --output=logs/finance_math_eval_%A_%a.out +#SBATCH --error=logs/finance_math_eval_%A_%a.err +#SBATCH --time=04:00:00 +#SBATCH --cpus-per-task=4 +#SBATCH --mem=16G +#SBATCH --array=0-60 + +set -euo pipefail + +cd /projects/DeepLesion/projects/new_ace/automated_capability_evaluation + +# shellcheck disable=SC1091 +source "scripts/static_benchmarks/env_slurm_inspect.sh" + +# Allow running via sbatch (with SLURM_ARRAY_TASK_ID) or directly (defaults to 0). +: "${SLURM_ARRAY_TASK_ID:=0}" + +# FinanceMath validation has 121 non-table tasks after filtering. +CHUNK=50 +OFFSET=$((SLURM_ARRAY_TASK_ID * CHUNK)) +VALIDATION_TAG="_FINANCE_MATH_${SLURM_ARRAY_TASK_ID}_SundayNight" + +# Stage 0_static: build datasets from yale-nlp/FinanceMath (validation split only) +python -m src.run_eval_pipeline \ + stage=0_static \ + validation_tag="$VALIDATION_TAG" \ + +static_benchmark_cfg.benchmark_id=yale-nlp/FinanceMath \ + +static_benchmark_cfg.split=validation \ + +static_benchmark_cfg.offset="$OFFSET" \ + +static_benchmark_cfg.limit="$CHUNK" + +# Stage 1: run subject models on the static datasets +python -m src.run_eval_pipeline \ + stage=1 \ + validation_tag="$VALIDATION_TAG" \ + eval_tag="$VALIDATION_TAG" + +# Stage 2: aggregate scores +python -m src.run_eval_pipeline \ + stage=2 \ + eval_tag="$VALIDATION_TAG" + +echo "Stage 0_static datasets: base_output/test_exp/eval/datasets/$VALIDATION_TAG" +echo "Stage 1 results (Inspect logs): base_output/test_exp/eval/results/$VALIDATION_TAG" +echo "Stage 2 scores: base_output/test_exp/eval/scores/$VALIDATION_TAG" + +# Optional: generate flattened JSONL views of Inspect logs for easier reading +RESULTS_DIR="base_output/test_exp/eval/results/$VALIDATION_TAG" +if [ -d "$RESULTS_DIR" ]; then + echo "Flattening Inspect logs under $RESULTS_DIR ..." + for model_dir in "$RESULTS_DIR"/*/; do + [ -d "$model_dir" ] || continue + model_name="$(basename "$model_dir")" + for area_dir in "$model_dir"*/; do + [ -d "$area_dir" ] || continue + for cap_dir in "$area_dir"*/; do + [ -d "$cap_dir" ] || continue + cap_name="$(basename "$cap_dir")" + log_file="$(ls "$cap_dir"/*_task_*.json 2>/dev/null | head -n 1 || true)" + if [ -n "$log_file" ]; then + out_file="$cap_dir/flat_${cap_name}.jsonl" + python scripts/flatten_inspect_logs.py \ + --log_path "$log_file" \ + --out_path "$out_file" + echo " Wrote flattened log for $model_name/$cap_name to $out_file" + fi + done + done + done +fi + diff --git a/scripts/static_benchmarks/finance_math_local_array_eval.sh b/scripts/static_benchmarks/finance_math_local_array_eval.sh new file mode 100644 index 0000000..7b57a56 --- /dev/null +++ b/scripts/static_benchmarks/finance_math_local_array_eval.sh @@ -0,0 +1,86 @@ +#!/bin/bash +#SBATCH --job-name=gemma_finance_math_local_array +#SBATCH --output=/projects/DeepLesion/projects/new_ace/automated_capability_evaluation/logs/finance_math_local_array_%A_%a.out +#SBATCH --error=/projects/DeepLesion/projects/new_ace/automated_capability_evaluation/logs/finance_math_local_array_%A_%a.err +#SBATCH --time=08:00:00 +#SBATCH --cpus-per-task=8 +#SBATCH --mem=64G +#SBATCH --gres=gpu:a100:1 +#SBATCH --array=0-7%8 + +set -euo pipefail + +cd /projects/DeepLesion/projects/new_ace/automated_capability_evaluation + +# shellcheck disable=SC1091 +source /projects/DeepLesion/py311_env/bin/activate + +# shellcheck disable=SC1091 +source "scripts/static_benchmarks/env_slurm_inspect.sh" + +# Allow direct execution without sbatch by defaulting to shard 0. +: "${SLURM_ARRAY_TASK_ID:=0}" + +NUM_SHARDS=8 + +# Count only rows that survive adapter filtering. +TOTAL=$( +python - <<'PY' +from datasets import load_dataset + +ds = load_dataset("yale-nlp/FinanceMath", split="validation") + +def is_valid(row): + question = str(row.get("question", "")).strip() + answer = row.get("ground_truth") + if answer is None: + answer_text = "" + elif isinstance(answer, dict): + for key in ("ground_truth", "value", "answer"): + if key in answer and answer[key] is not None: + answer_text = str(answer[key]).strip() + break + else: + answer_text = str(answer).strip() + else: + answer_text = str(answer).strip() + return bool(question and answer_text) + +print(sum(1 for row in ds if is_valid(row))) +PY +) + +CHUNK=$(((TOTAL + NUM_SHARDS - 1) / NUM_SHARDS)) +OFFSET=$((SLURM_ARRAY_TASK_ID * CHUNK)) +TAG="_FINANCE_MATH_VALIDATION_GEMMA_3" + +if [ "$OFFSET" -ge "$TOTAL" ]; then + echo "No work for shard ${SLURM_ARRAY_TASK_ID} (OFFSET=$OFFSET >= TOTAL=$TOTAL). Exiting." + exit 0 +fi + +echo "TOTAL=$TOTAL NUM_SHARDS=$NUM_SHARDS CHUNK=$CHUNK OFFSET=$OFFSET TAG=$TAG" + +# Stage 0_static: build dataset shard from FinanceMath validation split. +python -m src.run_eval_pipeline \ + stage=0_static \ + validation_tag="$TAG" \ + +static_benchmark_cfg.benchmark_id=yale-nlp/FinanceMath \ + +static_benchmark_cfg.split=validation \ + +static_benchmark_cfg.offset="$OFFSET" \ + +static_benchmark_cfg.limit="$CHUNK" + +# Stage 1_local: evaluate local subject model(s) from run_cfg.yaml. +python -m src.run_eval_pipeline \ + stage=1_local \ + validation_tag="$TAG" \ + eval_tag="$TAG" + +# Stage 2: aggregate per-shard scores. +python -m src.run_eval_pipeline \ + stage=2 \ + eval_tag="$TAG" + +echo "Stage 0_static datasets: base_output/test_exp/eval/datasets/$TAG" +echo "Stage 1_local results: base_output/test_exp/eval/results/$TAG" +echo "Stage 2 scores: base_output/test_exp/eval/scores/$TAG" diff --git a/scripts/static_benchmarks/finance_tasks_eval.sh b/scripts/static_benchmarks/finance_tasks_eval.sh new file mode 100755 index 0000000..b00e5e8 --- /dev/null +++ b/scripts/static_benchmarks/finance_tasks_eval.sh @@ -0,0 +1,68 @@ +#!/bin/bash +#SBATCH --job-name=finance_tasks_eval +#SBATCH --output=logs/finance_tasks_eval_%j.out +#SBATCH --error=logs/finance_tasks_eval_%j.err +#SBATCH --time=04:00:00 +#SBATCH --cpus-per-task=4 +#SBATCH --mem=16G + +set -euo pipefail + +cd /fs01/projects/DeepLesion/projects/new_ace/automated_capability_evaluation + +# shellcheck disable=SC1091 +source "scripts/static_benchmarks/env_slurm_inspect.sh" + +VALIDATION_TAG="_FINANCE_TASKS_$(date +%Y%m%d_%H%M%S)" + +# Stage 0_static: build datasets from local finance_tasks.json +python -m src.run_eval_pipeline \ + stage=0_static \ + validation_tag="$VALIDATION_TAG" \ + +static_benchmark_cfg.benchmark_id=finance_tasks.json \ + +static_benchmark_cfg.split=na \ + +static_benchmark_cfg.domain=finance \ + +static_benchmark_cfg.capability_id=finance_tasks \ + +static_benchmark_cfg.capability_name="Finance Tasks" + # +static_benchmark_cfg.limit=30 \ + +# Stage 1: run subject models on the static datasets +python -m src.run_eval_pipeline \ + stage=1 \ + validation_tag="$VALIDATION_TAG" \ + eval_tag="$VALIDATION_TAG" + +# Stage 2: aggregate scores +python -m src.run_eval_pipeline \ + stage=2 \ + eval_tag="$VALIDATION_TAG" + +echo "Stage 0_static datasets: base_output/test_exp/eval/datasets/$VALIDATION_TAG" +echo "Stage 1 results (Inspect logs): base_output/test_exp/eval/results/$VALIDATION_TAG" +echo "Stage 2 scores: base_output/test_exp/eval/scores/$VALIDATION_TAG" + +# Optional: generate flattened JSONL views of Inspect logs for easier reading +RESULTS_DIR="base_output/test_exp/eval/results/$VALIDATION_TAG" +if [ -d "$RESULTS_DIR" ]; then + echo "Flattening Inspect logs under $RESULTS_DIR ..." + for model_dir in "$RESULTS_DIR"/*/; do + [ -d "$model_dir" ] || continue + model_name="$(basename "$model_dir")" + for area_dir in "$model_dir"*/; do + [ -d "$area_dir" ] || continue + for cap_dir in "$area_dir"*/; do + [ -d "$cap_dir" ] || continue + cap_name="$(basename "$cap_dir")" + log_file="$(ls "$cap_dir"/*_task_*.json 2>/dev/null | head -n 1 || true)" + if [ -n "$log_file" ]; then + out_file="$cap_dir/flat_${cap_name}.jsonl" + python scripts/flatten_inspect_logs.py \ + --log_path "$log_file" \ + --out_path "$out_file" + echo " Wrote flattened log for $model_name/$cap_name to $out_file" + fi + done + done + done +fi + diff --git a/scripts/static_benchmarks/run_topic_classification_qwen3_32b.sh b/scripts/static_benchmarks/run_topic_classification_qwen3_32b.sh new file mode 100644 index 0000000..0891167 --- /dev/null +++ b/scripts/static_benchmarks/run_topic_classification_qwen3_32b.sh @@ -0,0 +1,39 @@ +#!/bin/bash +#SBATCH --job-name=topic_cls_qwen3_32b +#SBATCH --output=logs/topic_cls_qwen3_32b_%j.out +#SBATCH --error=logs/topic_cls_qwen3_32b_%j.err +#SBATCH --time=12:00:00 +#SBATCH --cpus-per-task=8 +#SBATCH --mem=64G +#SBATCH --gres=gpu:a100:1 + +set -euo pipefail + +cd /projects/DeepLesion/projects/new_ace/automated_capability_evaluation + +# shellcheck disable=SC1091 +source /projects/DeepLesion/py311_env/bin/activate + +# shellcheck disable=SC1091 +source "scripts/static_benchmarks/env_slurm_inspect.sh" + +# Example defaults: classify XFinBench test split. +BENCHMARK_ID="${BENCHMARK_ID:-Zhihan/XFinBench}" +SPLIT="${SPLIT:-test}" +OUTPUT_JSONL="${OUTPUT_JSONL:-base_output/topic_classification/xfinbench_qwen3_32b.jsonl}" +MODEL_PATH="${MODEL_PATH:-/model-weights/Qwen3-32B}" +BATCH_SIZE="${BATCH_SIZE:-16}" +MAX_TOKENS="${MAX_TOKENS:-32}" + +python scripts/static_benchmarks/classify_static_benchmark_topics_vllm.py \ + --benchmark-id "$BENCHMARK_ID" \ + --split "$SPLIT" \ + --topic-csv "topic.csv" \ + --output-jsonl "$OUTPUT_JSONL" \ + --resume \ + --model-path "$MODEL_PATH" \ + --batch-size "$BATCH_SIZE" \ + --max-tokens "$MAX_TOKENS" \ + --trust-remote-code + +echo "Saved classifications to $OUTPUT_JSONL" diff --git a/scripts/static_benchmarks/submit_all_static_benchmarks.sh b/scripts/static_benchmarks/submit_all_static_benchmarks.sh new file mode 100755 index 0000000..328d7da --- /dev/null +++ b/scripts/static_benchmarks/submit_all_static_benchmarks.sh @@ -0,0 +1,13 @@ +#!/bin/bash + +set -euo pipefail + +cd /fs01/projects/DeepLesion/projects/new_ace/automated_capability_evaluation + +# Ensure scripts are executable +chmod +x scripts/static_benchmarks/*_eval.sh || true + +sbatch scripts/static_benchmarks/finance_math_eval.sh +sbatch scripts/static_benchmarks/finance_tasks_eval.sh +sbatch scripts/static_benchmarks/xfinbench_eval.sh +sbatch scripts/static_benchmarks/bizbench_eval.sh diff --git a/scripts/static_benchmarks/xfinbench_test_eval.sh b/scripts/static_benchmarks/xfinbench_test_eval.sh new file mode 100755 index 0000000..3cbc810 --- /dev/null +++ b/scripts/static_benchmarks/xfinbench_test_eval.sh @@ -0,0 +1,76 @@ +#!/bin/bash +#SBATCH --job-name=xfinbench_test_eval +#SBATCH --output=logs/xfinbench_test_eval_%A_%a.out +#SBATCH --error=logs/xfinbench_test_eval_%A_%a.err +#SBATCH --time=08:00:00 +#SBATCH --cpus-per-task=4 +#SBATCH --mem=16G +#SBATCH --array=0-9 + +set -euo pipefail + +cd /projects/DeepLesion/projects/new_ace/automated_capability_evaluation + +# shellcheck disable=SC1091 +source "scripts/static_benchmarks/env_slurm_inspect.sh" + +# Allow running via sbatch (with SLURM_ARRAY_TASK_ID) or directly (defaults to 0). +: "${SLURM_ARRAY_TASK_ID:=0}" + +# 10 chunks over ~2828 filtered test examples → ~300 per chunk +CHUNK=300 +OFFSET=$((SLURM_ARRAY_TASK_ID * CHUNK)) +VALIDATION_TAG="_XFINBENCH_TEST_${SLURM_ARRAY_TASK_ID}_SundayNight" + +# Stage 0_static: build datasets from Zhihan/XFinBench (test split, CSV-backed HF repo) +python -m src.run_eval_pipeline \ + stage=0_static \ + validation_tag="$VALIDATION_TAG" \ + +static_benchmark_cfg.benchmark_id=Zhihan/XFinBench \ + +static_benchmark_cfg.split=test \ + +static_benchmark_cfg.offset="$OFFSET" \ + +static_benchmark_cfg.limit="$CHUNK" \ + +static_benchmark_cfg.domain=finance \ + +static_benchmark_cfg.capability_id=xfinbench_test \ + +static_benchmark_cfg.capability_name="XFinBench Test" + +# Stage 1: run subject models on the static datasets +python -m src.run_eval_pipeline \ + stage=1 \ + validation_tag="$VALIDATION_TAG" \ + eval_tag="$VALIDATION_TAG" + +# Stage 2: aggregate scores +python -m src.run_eval_pipeline \ + stage=2 \ + eval_tag="$VALIDATION_TAG" + +echo "Stage 0_static datasets (test split): base_output/test_exp/eval/datasets/$VALIDATION_TAG" +echo "Stage 1 results (Inspect logs): base_output/test_exp/eval/results/$VALIDATION_TAG" +echo "Stage 2 scores: base_output/test_exp/eval/scores/$VALIDATION_TAG" + +# Optional: generate flattened JSONL views of Inspect logs for easier reading +RESULTS_DIR="base_output/test_exp/eval/results/$VALIDATION_TAG" +if [ -d "$RESULTS_DIR" ]; then + echo "Flattening Inspect logs under $RESULTS_DIR ..." + for model_dir in "$RESULTS_DIR"/*/; do + [ -d "$model_dir" ] || continue + model_name="$(basename "$model_dir")" + for area_dir in "$model_dir"*/; do + [ -d "$area_dir" ] || continue + for cap_dir in "$area_dir"*/; do + [ -d "$cap_dir" ] || continue + cap_name="$(basename "$cap_dir")" + log_file="$(ls "$cap_dir"/*_task_*.json 2>/dev/null | head -n 1 || true)" + if [ -n "$log_file" ]; then + out_file="$cap_dir/flat_${cap_name}.jsonl" + python scripts/flatten_inspect_logs.py \ + --log_path "$log_file" \ + --out_path "$out_file" + echo " Wrote flattened log for $model_name/$cap_name to $out_file" + fi + done + done + done +fi + diff --git a/scripts/static_benchmarks/xfinbench_test_local_array_eval.sh b/scripts/static_benchmarks/xfinbench_test_local_array_eval.sh new file mode 100644 index 0000000..7cc9edf --- /dev/null +++ b/scripts/static_benchmarks/xfinbench_test_local_array_eval.sh @@ -0,0 +1,76 @@ +#!/bin/bash +#SBATCH --job-name=gemma_xfinbench_test_local_array +#SBATCH --output=/projects/DeepLesion/projects/new_ace/automated_capability_evaluation/logs/xfinbench_test_local_array_%A_%a.out +#SBATCH --error=/projects/DeepLesion/projects/new_ace/automated_capability_evaluation/logs/xfinbench_test_local_array_%A_%a.err +#SBATCH --time=24:00:00 +#SBATCH --cpus-per-task=8 +#SBATCH --mem=64G +#SBATCH --gres=gpu:a100:1 +#SBATCH --array=0-7%8 + +set -euo pipefail + +cd /projects/DeepLesion/projects/new_ace/automated_capability_evaluation + +# shellcheck disable=SC1091 +source /projects/DeepLesion/py311_env/bin/activate + +# shellcheck disable=SC1091 +source "scripts/static_benchmarks/env_slurm_inspect.sh" + +# Allow direct execution without sbatch by defaulting to shard 0. +: "${SLURM_ARRAY_TASK_ID:=0}" + +NUM_SHARDS=8 + +# Count only text-only XFinBench test rows since the adapter skips rows with figures. +TOTAL=$( +python - <<'PY' +from datasets import load_dataset + +ds = load_dataset( + "Zhihan/XFinBench", + data_files={"validation": "validation_set.csv", "test": "test_set.csv"}, +)["test"] + +print(sum(1 for row in ds if row.get("figure") is None)) +PY +) + +CHUNK=$(((TOTAL + NUM_SHARDS - 1) / NUM_SHARDS)) +OFFSET=$((SLURM_ARRAY_TASK_ID * CHUNK)) +TAG="_XFINBENCH_TEST_GEMMA_3" + +if [ "$OFFSET" -ge "$TOTAL" ]; then + echo "No work for shard ${SLURM_ARRAY_TASK_ID} (OFFSET=$OFFSET >= TOTAL=$TOTAL). Exiting." + exit 0 +fi + +echo "TOTAL=$TOTAL NUM_SHARDS=$NUM_SHARDS CHUNK=$CHUNK OFFSET=$OFFSET TAG=$TAG" + +# Stage 0_static: build dataset shard from the XFinBench test split. +python -m src.run_eval_pipeline \ + stage=0_static \ + validation_tag="$TAG" \ + +static_benchmark_cfg.benchmark_id=Zhihan/XFinBench \ + +static_benchmark_cfg.split=test \ + +static_benchmark_cfg.offset="$OFFSET" \ + +static_benchmark_cfg.limit="$CHUNK" \ + +static_benchmark_cfg.domain=finance \ + +static_benchmark_cfg.capability_id=xfinbench_test \ + +static_benchmark_cfg.capability_name=XFinBenchTest + +# Stage 1_local: evaluate local subject model(s) from run_cfg.yaml. +python -m src.run_eval_pipeline \ + stage=1_local \ + validation_tag="$TAG" \ + eval_tag="$TAG" + +# Stage 2: aggregate per-shard scores. +python -m src.run_eval_pipeline \ + stage=2 \ + eval_tag="$TAG" + +echo "Stage 0_static datasets: base_output/test_exp/eval/datasets/$TAG" +echo "Stage 1_local results: base_output/test_exp/eval/results/$TAG" +echo "Stage 2 scores: base_output/test_exp/eval/scores/$TAG" diff --git a/src/cfg/run_cfg.yaml b/src/cfg/run_cfg.yaml index d7de581..033fcb7 100644 --- a/src/cfg/run_cfg.yaml +++ b/src/cfg/run_cfg.yaml @@ -78,15 +78,83 @@ task_generation_cfg: eval_cfg: # LLMs to evaluate (required) subject_llms: - - name: gpt-4o - provider: openai - - name: claude-3-sonnet - provider: anthropic + - name: gemma-3-12b-it + provider: hf_local + model_path: /model-weights/gemma-3-12b-it + inference_backend: vllm + trust_remote_code: true + gpu_memory_utilization: 0.9 + tensor_parallel_size: 1 + batch_size: 8 + generation_cfg: + temperature: 0.0 + max_tokens: 8192 + - name: gemma-3-27b-it + provider: hf_local + model_path: /model-weights/gemma-3-27b-it + inference_backend: vllm + trust_remote_code: true + gpu_memory_utilization: 0.9 + tensor_parallel_size: 1 + batch_size: 4 + generation_cfg: + temperature: 0.0 + max_tokens: 8192 + - name: qwen-3-32b + provider: hf_local + model_path: /model-weights/Qwen3-32B/ + inference_backend: vllm + trust_remote_code: true + gpu_memory_utilization: 0.9 + tensor_parallel_size: 1 + batch_size: 4 + generation_cfg: + temperature: 0.0 + max_tokens: 8192 + - name: qwen-3-8b + provider: hf_local + model_path: /model-weights/Qwen3-32B/ + inference_backend: vllm + trust_remote_code: true + gpu_memory_utilization: 0.9 + tensor_parallel_size: 1 + batch_size: 8 + generation_cfg: + temperature: 0.0 + max_tokens: 8192 + - name: deepseek-r1-distill-qwen-32b + provider: hf_local + model_path: /projects/DeepLesion/model_weights/deepseek-r1-distill-qwen-32b + inference_backend: vllm + trust_remote_code: true + gpu_memory_utilization: 0.9 + tensor_parallel_size: 1 + batch_size: 4 + generation_cfg: + temperature: 0.0 + max_tokens: 8192 + - name: deepseek-r1-distill-qwen-14b + provider: hf_local + model_path: /projects/DeepLesion/model_weights/deepseek-r1-distill-qwen-14b + inference_backend: vllm + trust_remote_code: true + gpu_memory_utilization: 0.9 + tensor_parallel_size: 1 + batch_size: 8 + generation_cfg: + temperature: 0.0 + max_tokens: 8192 # Judge LLM for scoring (required) judge_llm: name: gpt-4o-mini provider: openai + base_url: https://api.openai.com/v1 + # For API judges, we interpret batch_size as "max concurrent async requests". + batch_size: 24 + generation_cfg: + max_tokens: 8 + temperature: 0.0 # ============================================================================= # HYDRA diff --git a/src/eval_stages/__init__.py b/src/eval_stages/__init__.py index ff7eaa4..5d22d8c 100644 --- a/src/eval_stages/__init__.py +++ b/src/eval_stages/__init__.py @@ -6,13 +6,17 @@ """ from src.eval_stages.stage0_setup_and_dataset import EvalSetupError, run_eval_stage0 +from src.eval_stages.stage0_static_benchmarks import run_eval_stage0_static from src.eval_stages.stage1_eval_execution import run_eval_stage1 +from src.eval_stages.stage1_local_eval_execution import run_eval_stage1_local from src.eval_stages.stage2_score_aggregation import run_eval_stage2 __all__ = [ "run_eval_stage0", + "run_eval_stage0_static", "run_eval_stage1", + "run_eval_stage1_local", "run_eval_stage2", "EvalSetupError", ] diff --git a/src/eval_stages/stage0_static_benchmarks.py b/src/eval_stages/stage0_static_benchmarks.py new file mode 100644 index 0000000..be74f99 --- /dev/null +++ b/src/eval_stages/stage0_static_benchmarks.py @@ -0,0 +1,157 @@ +"""Eval Stage 0_static: Static benchmark ingestion. + +This stage lets you reuse Eval Stages 1 and 2 on external/static benchmarks +(e.g., Hugging Face datasets) that do not originate from this repo's +generation/validation pipeline. + +It converts a benchmark-specific schema into the pipeline's EvalDataset JSON +format and writes outputs under: + + //eval/datasets// + +so that Stage 1 can run unchanged (it only needs eval_config.json plus one or +more dataset.json files). +""" + +from __future__ import annotations + +import logging +import re +from pathlib import Path +from typing import Any, Dict, List + +from omegaconf import DictConfig, OmegaConf + +from src.eval_stages.static_benchmarks.finance_math import ( + build_eval_datasets_from_finance_math, +) +from src.eval_stages.static_benchmarks.bizbench import ( + build_eval_datasets_from_bizbench, +) +from src.eval_stages.static_benchmarks.finance_tasks import ( + build_eval_datasets_from_finance_tasks, +) +from src.eval_stages.static_benchmarks.xfinbench import ( + build_eval_datasets_from_xfinbench, +) +from src.eval_stages.static_benchmarks.specs import StaticBenchmarkSpec +from src.schemas.eval_io_utils import save_eval_config, save_eval_dataset +from src.schemas.eval_schemas import EvalConfig, EvalDataset +from src.schemas.metadata_schemas import PipelineMetadata +from src.utils.timestamp_utils import iso_timestamp + + +logger = logging.getLogger(__name__) + + +def _slugify(text: str) -> str: + """Convert arbitrary strings into safe directory-friendly IDs.""" + cleaned = re.sub(r"[^a-zA-Z0-9]+", "_", text.strip()).strip("_").lower() + return cleaned or "unknown" + + +def _build_datasets_from_spec(spec: StaticBenchmarkSpec) -> List[EvalDataset]: + """Dispatch to the appropriate adapter based on benchmark_id. + + Returns a list of EvalDataset objects so that one static benchmark can + produce multiple capabilities if desired. + """ + bid = spec.benchmark_id.strip() + if bid in {"yale-nlp/FinanceMath", "FinanceMath", "finance_math"}: + return build_eval_datasets_from_finance_math(spec) + if bid in {"kensho/bizbench", "BizBench", "bizbench"}: + return build_eval_datasets_from_bizbench(spec) + if bid in {"Zhihan/XFinBench", "XFinBench", "xfinbench"}: + return build_eval_datasets_from_xfinbench(spec) + if bid in { + "finance_tasks", + "FinanceTasks", + "finance_tasks.json", + "local_finance_tasks", + } or bid.endswith(".json"): + # If a user points benchmark_id to a local JSON path, we ingest it here. + # This is intentionally permissive for local workflows. + return build_eval_datasets_from_finance_tasks(spec) + raise ValueError(f"Unknown static benchmark_id: {spec.benchmark_id}") + + +def run_eval_stage0_static(cfg: DictConfig, validation_tag: str) -> None: + """Prepare eval datasets/config from a static benchmark.""" + exp_id = cfg.exp_cfg.exp_id + output_base_dir = Path(cfg.global_cfg.output_dir) + experiment_dir = output_base_dir / exp_id + eval_cfg: Dict[str, Any] = cfg.get("eval_cfg", {}) + + static_cfg: Dict[str, Any] = cfg.get("static_benchmark_cfg", {}) + benchmark_id = static_cfg.get("benchmark_id") + if not benchmark_id: + raise ValueError( + "static_benchmark_cfg.benchmark_id is required for stage=0_static " + "(e.g. static_benchmark_cfg.benchmark_id=HuggingFaceH4/MATH-500)" + ) + + spec = StaticBenchmarkSpec( + benchmark_id=str(benchmark_id), + split=str(static_cfg.get("split", "test")), + limit=static_cfg.get("limit"), + offset=static_cfg.get("offset"), + area_id=str(static_cfg.get("area_id", StaticBenchmarkSpec.area_id)), + capability_id=static_cfg.get("capability_id"), + capability_name=static_cfg.get("capability_name"), + domain=str(static_cfg.get("domain", StaticBenchmarkSpec.domain)), + exclude_bloom_create=static_cfg.get("exclude_bloom_create", True), + finknow_only=static_cfg.get("finknow_only", True), + ) + + logger.info( + "Eval Stage 0_static: exp_id=%s | benchmark_id=%s | split=%s | limit=%s | validation_tag=%s", + exp_id, + spec.benchmark_id, + spec.split, + spec.limit, + validation_tag, + ) + + datasets = _build_datasets_from_spec(spec) + total_tasks = sum(d.num_tasks for d in datasets) + if total_tasks == 0: + raise ValueError(f"No tasks created for benchmark: {spec.benchmark_id}") + + datasets_dir = experiment_dir / "eval" / "datasets" / validation_tag + for dataset in datasets: + dataset_path = ( + datasets_dir / dataset.area_id / dataset.capability_id / "dataset.json" + ) + save_eval_dataset(dataset, dataset_path) + logger.info( + "Wrote dataset.json with %d tasks to %s", + dataset.num_tasks, + dataset_path, + ) + + # Convert Hydra containers to plain Python types for JSON serialization. + subject_llms_cfg = eval_cfg.get("subject_llms") + judge_llm_cfg = eval_cfg.get("judge_llm") + + subject_llms = OmegaConf.to_container(subject_llms_cfg, resolve=True) if subject_llms_cfg is not None else [] + judge_llm = OmegaConf.to_container(judge_llm_cfg, resolve=True) if judge_llm_cfg is not None else {} + + eval_config = EvalConfig( + experiment_id=exp_id, + eval_tag="", + subject_llms=subject_llms, + judge_llm=judge_llm, + validation_tag=validation_tag, + ) + metadata = PipelineMetadata( + experiment_id=exp_id, + output_base_dir=str(output_base_dir), + timestamp=iso_timestamp(), + input_stage_tag=spec.benchmark_id, + output_stage_tag=None, + resume=False, + ) + eval_config_path = datasets_dir / "eval_config.json" + save_eval_config(eval_config, metadata, eval_config_path) + logger.info("Wrote eval_config.json to %s", eval_config_path) + diff --git a/src/eval_stages/stage1_eval_execution.py b/src/eval_stages/stage1_eval_execution.py index a2aa08a..0ac5e35 100644 --- a/src/eval_stages/stage1_eval_execution.py +++ b/src/eval_stages/stage1_eval_execution.py @@ -7,17 +7,21 @@ """ import logging +import os +import time from pathlib import Path -from typing import Dict, List, Optional, Set +from typing import Any, Dict, List, Optional, Set, Union from inspect_ai import Task from inspect_ai import eval as inspect_eval from inspect_ai import eval_retry as inspect_eval_retry from inspect_ai.dataset import MemoryDataset, Sample from inspect_ai.log import read_eval_log +from inspect_ai.model import Model as InspectModel +from inspect_ai.model import get_model from inspect_ai.scorer import model_graded_fact from inspect_ai.solver import generate -from omegaconf import DictConfig +from omegaconf import DictConfig, OmegaConf from src.schemas.eval_io_utils import ( load_eval_config, @@ -26,12 +30,32 @@ ) from src.schemas.eval_schemas import EvalDataset from src.schemas.metadata_schemas import PipelineMetadata +from src.utils.constants import DEFAULT_OPENAI_BASE_URL from src.utils.timestamp_utils import iso_timestamp, timestamp_tag logger = logging.getLogger(__name__) +def _inspect_judge_model(judge_llm: Dict[str, Any]) -> Union[str, InspectModel]: + """Resolve judge for Inspect so it can use a different API base than the subject.""" + provider = str(judge_llm.get("provider", "openai")) + name = str(judge_llm["name"]) + uri = f"{provider}/{name}" + if provider == "openai": + base_url = judge_llm.get("base_url") or os.environ.get( + "OPENAI_JUDGE_BASE_URL" + ) or DEFAULT_OPENAI_BASE_URL + api_key = os.environ.get("OPENAI_API_KEY") + logger.info( + "Inspect judge model: %s (base_url=%s)", + uri, + base_url, + ) + return get_model(uri, base_url=base_url, api_key=api_key) + return uri + + def _find_datasets(datasets_dir: Path) -> List[Path]: """Return all Stage 0 dataset files.""" if not datasets_dir.exists(): @@ -150,7 +174,7 @@ def _find_retry_log( def _create_inspect_task( dataset: EvalDataset, - judge_model: str, + judge_model: Union[str, InspectModel], ) -> "Task": """Build an Inspect task for one capability dataset.""" # Create Inspect samples from our dataset @@ -177,39 +201,93 @@ def _create_inspect_task( def _run_inspect_eval( dataset: EvalDataset, subject_llm: str, - judge_llm: Dict[str, str], + judge_llm: Dict[str, Any], + subject_llm_config: Optional[Dict[str, Any]], output_dir: Path, + *, + max_attempts: int = 3, + max_tasks: Optional[int] = None, ) -> bool: - """Run a fresh Inspect eval for one capability/LLM pair.""" - # Format model names for Inspect (provider/model) - judge_model = f"{judge_llm['provider']}/{judge_llm['name']}" + """Run an Inspect eval for one capability/LLM pair with auto-retry. + + Why retry: providers occasionally drop connections mid-run (e.g. httpx + RemoteProtocolError: server disconnected without sending a response). When + that happens, Inspect often leaves a partial log that can be resumed via + `inspect_eval_retry`. + """ + judge_model = _inspect_judge_model(judge_llm) + + subject_base_url: Optional[str] = None + if subject_llm_config: + subject_base_url = subject_llm_config.get("base_url") or os.environ.get( + "OPENAI_SUBJECT_BASE_URL" + ) + if subject_base_url: + logger.info( + "Inspect subject model: %s (model_base_url=%s)", + subject_llm, + subject_base_url, + ) - try: - # Create Inspect task - task = _create_inspect_task(dataset, judge_model) + expected_task_ids = {str(task["id"]) for task in dataset.tasks} - # Run evaluation - # Inspect saves logs to the specified directory - output_dir.mkdir(parents=True, exist_ok=True) + for attempt in range(1, max_attempts + 1): + try: + # Create Inspect task + task = _create_inspect_task(dataset, judge_model) - inspect_eval( - task, - model=subject_llm, - log_dir=str(output_dir), - log_format="json", - ) + # Run evaluation + # Inspect saves logs to the specified directory + output_dir.mkdir(parents=True, exist_ok=True) + + inspect_kwargs: Dict[str, Any] = { + "tasks": task, + "model": subject_llm, + "log_dir": str(output_dir), + "log_format": "json", + "max_tasks": max_tasks, + } + if subject_base_url: + inspect_kwargs["model_base_url"] = subject_base_url + inspect_eval(**inspect_kwargs) + + return True + + except Exception as e: + logger.warning( + "Inspect eval attempt %d/%d failed for %s/%s with %s: %s", + attempt, + max_attempts, + dataset.area_id, + dataset.capability_id, + subject_llm, + e, + ) - return True + # Try to resume from a partial log if present. + retry_log = _find_retry_log(output_dir, expected_task_ids) + if retry_log is not None: + logger.info( + "Attempting inspect_eval_retry from partial log: %s", + retry_log.name, + ) + if _run_inspect_retry(retry_log_path=retry_log, output_dir=output_dir): + return True - except Exception as e: - logger.error( - "Inspect evaluation failed for %s/%s with %s: %s", - dataset.area_id, - dataset.capability_id, - subject_llm, - e, - ) - return False + if attempt < max_attempts: + sleep_s = min(2**attempt, 30) + logger.info("Retrying after %ds...", sleep_s) + time.sleep(sleep_s) + continue + + logger.error( + "Inspect evaluation ultimately failed for %s/%s with %s after %d attempts", + dataset.area_id, + dataset.capability_id, + subject_llm, + max_attempts, + ) + return False def _run_inspect_retry( @@ -292,6 +370,12 @@ def run_eval_stage1( # Run evaluations subject_llms = eval_config.subject_llms judge_llm = eval_config.judge_llm + # Concurrency for Inspect eval. Higher is faster for remote endpoints. + inspect_max_tasks = None + try: + inspect_max_tasks = int(cfg.get("eval_cfg", {}).get("inspect_max_tasks")) + except Exception: + inspect_max_tasks = None num_completed_this_run = 0 num_skipped_completed = 0 @@ -353,11 +437,24 @@ def run_eval_stage1( subject_model, ) + judge_plain = ( + OmegaConf.to_container(judge_llm, resolve=True) + if OmegaConf.is_config(judge_llm) + else dict(judge_llm) + ) + subject_plain = ( + OmegaConf.to_container(llm_config, resolve=True) + if OmegaConf.is_config(llm_config) + else dict(llm_config) + ) success = _run_inspect_eval( dataset=dataset, subject_llm=subject_model, - judge_llm=judge_llm, + judge_llm=judge_plain, + subject_llm_config=subject_plain, output_dir=output_dir, + max_attempts=int(cfg.get("eval_cfg", {}).get("max_attempts", 3)), + max_tasks=inspect_max_tasks, ) if success: diff --git a/src/eval_stages/stage1_local_eval_execution.py b/src/eval_stages/stage1_local_eval_execution.py new file mode 100644 index 0000000..4d7b81e --- /dev/null +++ b/src/eval_stages/stage1_local_eval_execution.py @@ -0,0 +1,823 @@ +"""Eval Stage 1_local: direct evaluation without Inspect. + +This stage runs subject models directly, including local HuggingFace models +loaded from disk via `provider: hf_local` using vLLM. Each response is judged +and written to the final `flat_.jsonl` output expected by downstream +workflows. +""" + +from __future__ import annotations + +import json +import logging +import os +import gc +import asyncio +import time +from pathlib import Path +from typing import Any, Dict, Iterator, List, Optional, Set, Tuple + +from omegaconf import DictConfig +import torch +from tqdm.auto import tqdm +import re + +from src.model import Model +from src.schemas.eval_io_utils import load_eval_config, load_eval_dataset, save_eval_config +from src.schemas.eval_schemas import EvalDataset +from src.schemas.metadata_schemas import PipelineMetadata +from src.utils.inspect_eval_utils import LLM_JUDGE_PROMPT, parse_submission +from src.utils.timestamp_utils import iso_timestamp, timestamp_tag + +logger = logging.getLogger(__name__) + + +def _find_datasets(datasets_dir: Path) -> List[Path]: + """Return all Stage 0 dataset files.""" + if not datasets_dir.exists(): + return [] + return sorted(datasets_dir.rglob("dataset.json")) + + +def _flat_result_path(output_dir: Path, capability_id: str) -> Path: + return output_dir / f"flat_{capability_id}.jsonl" + + +def _read_flat_rows(flat_path: Path) -> List[Dict[str, Any]]: + """Read non-summary rows from a flat jsonl file.""" + if not flat_path.exists(): + return [] + + rows: List[Dict[str, Any]] = [] + with open(flat_path, "r", encoding="utf-8") as f: + # Skip summary line + try: + next(f) + except StopIteration: + return [] + for line in f: + if line.strip(): + rows.append(json.loads(line)) + return rows + + +def _check_flat_completed(flat_path: Path, expected_task_ids: Set[str]) -> bool: + """Return True if flat file has exactly the expected task IDs.""" + if not flat_path.exists() or not expected_task_ids: + return False + rows = _read_flat_rows(flat_path) + row_ids = {str(row.get("id", "")) for row in rows if row.get("id") is not None} + return row_ids == expected_task_ids + + +def _write_flat_results(output_path: Path, rows: List[Dict[str, Any]]) -> None: + """Write rows in the same schema as flatten_inspect_logs.py.""" + output_path.parent.mkdir(parents=True, exist_ok=True) + + num_samples = len(rows) + num_correct = sum(1 for row in rows if row.get("grade") == "C") + num_incorrect = sum(1 for row in rows if row.get("grade") == "I") + accuracy = (num_correct / num_samples) if num_samples else 0.0 + with open(output_path, "w", encoding="utf-8") as f: + summary = { + "summary": True, + "num_samples": num_samples, + "num_correct": num_correct, + "num_incorrect": num_incorrect, + "accuracy": accuracy, + } + f.write(json.dumps(summary, ensure_ascii=False) + "\n") + for row in rows: + f.write(json.dumps(row, ensure_ascii=False) + "\n") + + +def _format_prompt(dataset: EvalDataset, task: Dict[str, str]) -> str: + """Render the Stage 0 prompt template for a task.""" + template = dataset.prompt_template or "{input}" + try: + prompt = template.format(input=task["input"]) + except Exception: # noqa: BLE001 + prompt = str(task["input"]) + + answer_instruction = ( + "\n\nReason briefly. Stop immediately after the final answer line.\n\n" + "On the last line, return your answer in machine-readable form as " + "`ANSWER: `. If this is multiple-choice, return only the option letter " + "(e.g., `ANSWER: B`)." + ) + return prompt + answer_instruction + + +def _build_model(model_config: Dict[str, Any]) -> Model: + """Instantiate a repo Model from eval subject/judge config.""" + model_kwargs = { + key: value + for key, value in model_config.items() + if key not in {"name", "provider", "generation_cfg"} + } + return Model( + model_name=str(model_config["name"]), + model_provider=str(model_config.get("provider", "openai")), + **model_kwargs, + ) + + +def _is_hf_local_provider(provider: str) -> bool: + """Return True for direct HuggingFace local model providers.""" + return provider in {"hf_local", "local_hf", "transformers"} + + +def _uses_vllm_backend(model_config: Dict[str, Any]) -> bool: + """Return True when a local HF model should run via vLLM. + + All local HF models use vLLM exclusively. + """ + return _is_hf_local_provider(str(model_config.get("provider", ""))) + + +def _build_messages(sys_prompt: str, user_prompt: str) -> List[Dict[str, str]]: + """Build chat-style messages for subject generation.""" + messages: List[Dict[str, str]] = [] + if sys_prompt.strip(): + messages.append({"role": "system", "content": sys_prompt}) + messages.append({"role": "user", "content": user_prompt}) + return messages + + +def _render_text_prompt(tokenizer: Any, *, sys_prompt: str, user_prompt: str) -> str: + """Render a text prompt, using chat templates when available.""" + messages = _build_messages(sys_prompt, user_prompt) + if tokenizer is not None and hasattr(tokenizer, "apply_chat_template"): + return tokenizer.apply_chat_template( + messages, + tokenize=False, + add_generation_prompt=True, + ) + if sys_prompt.strip(): + return f"{sys_prompt.strip()}\n\n{user_prompt}".strip() + return user_prompt + + +def _load_vllm_model(model_config: Dict[str, Any]) -> Any: + """Load a local vLLM engine from disk.""" + try: + from transformers import PreTrainedTokenizerBase + except Exception as exc: # noqa: BLE001 + raise RuntimeError( + "transformers is required for inference_backend=vllm in stage=1_local" + ) from exc + + # vLLM 0.8.x still expects this tokenizer property, but it is missing in + # newer transformers builds used by Qwen tokenizers in this environment. + if not hasattr(PreTrainedTokenizerBase, "all_special_tokens_extended"): + PreTrainedTokenizerBase.all_special_tokens_extended = property( # type: ignore[attr-defined] + lambda self: list(self.all_special_tokens) + ) + + # vLLM can crash when CUDA is initialized from forked worker processes. + # Default to the safer spawn mode unless the user explicitly overrides it. + os.environ.setdefault("VLLM_WORKER_MULTIPROC_METHOD", "spawn") + + try: + from vllm import LLM + except Exception as exc: # noqa: BLE001 + raise RuntimeError( + "vllm is required for inference_backend=vllm in stage=1_local" + ) from exc + + model_path = model_config.get("model_path") + if not model_path: + raise ValueError( + "inference_backend=vllm requires `model_path` in subject_llms config" + ) + + llm_kwargs: Dict[str, Any] = { + "model": model_path, + "tokenizer": model_path, + "trust_remote_code": bool(model_config.get("trust_remote_code", True)), + "tensor_parallel_size": int(model_config.get("tensor_parallel_size", 1)), + "gpu_memory_utilization": float( + model_config.get("gpu_memory_utilization", 0.9) + ), + "dtype": model_config.get("dtype", "auto"), + } + + if "max_model_len" in model_config: + llm_kwargs["max_model_len"] = int(model_config["max_model_len"]) + if "enforce_eager" in model_config: + llm_kwargs["enforce_eager"] = bool(model_config["enforce_eager"]) + + return LLM(**llm_kwargs) + + +def _wait_for_vllm_startup_memory( + gpu_memory_utilization: float, timeout_seconds: float = 90.0 +) -> None: + """Wait until enough free GPU memory is available for vLLM startup.""" + if not torch.cuda.is_available(): + return + + # Clamp to sensible bounds in case config has bad values. + target_util = min(max(float(gpu_memory_utilization), 0.0), 1.0) + deadline = time.monotonic() + timeout_seconds + required_gib = None + latest_free_gib = None + total_gib = None + + while time.monotonic() < deadline: + free_bytes, total_bytes = torch.cuda.mem_get_info() + latest_free_gib = free_bytes / (1024**3) + total_gib = total_bytes / (1024**3) + required_gib = target_util * total_gib + if latest_free_gib >= required_gib: + return + time.sleep(2.0) + + if required_gib is not None and latest_free_gib is not None and total_gib is not None: + logger.warning( + ( + "Proceeding with vLLM load before target free GPU memory recovered " + "(free=%.2f GiB, total=%.2f GiB, required=%.2f GiB, utilization=%.2f)." + ), + latest_free_gib, + total_gib, + required_gib, + target_util, + ) + + +def _teardown_vllm_engine(vllm_engine: Any, model_name: str) -> None: + """Shut down a vLLM ``LLM`` instance and free its GPU memory. + + The ``LLM`` object holds ``llm_engine`` (an ``LLMEngine``), which in turn + holds ``engine_core`` (a ``SyncMPClient`` / ``MPClient``). The EngineCore + runs in a **separate process** that owns the actual GPU tensors, so we must + call ``engine_core.shutdown()`` to terminate that process — ``del`` alone + is not enough. + """ + # 1. Graceful shutdown via the engine_core subprocess manager. + try: + llm_engine = getattr(vllm_engine, "llm_engine", None) + if llm_engine is not None: + engine_core = getattr(llm_engine, "engine_core", None) + if engine_core is not None and hasattr(engine_core, "shutdown"): + logger.info(" Calling engine_core.shutdown() for %s", model_name) + engine_core.shutdown() + except Exception as exc: # noqa: BLE001 + logger.warning("engine_core.shutdown() failed for %s: %s", model_name, exc) + + # 2. Delete Python references so the GC can collect any remaining C++ handles. + try: + del vllm_engine + except Exception: # noqa: BLE001 + pass + gc.collect() + if torch.cuda.is_available(): + torch.cuda.empty_cache() + + # 3. Give the EngineCore subprocess time to exit and release GPU memory. + time.sleep(5.0) + + +def _normalize_text(text: str) -> str: + return " ".join(text.strip().split()) + + +def _last_sentence(text: str) -> str: + """ + Extract the last "sentence-like" fragment from model output. + + For this project we approximate "sentence" as the last non-empty line, + because many answers end in LaTeX blocks (e.g., `\\boxed{...}`) that may + not be punctuation-terminated. + """ + if not text: + return "" + lines = [line.strip() for line in str(text).splitlines() if line.strip()] + if not lines: + return "" + last = lines[-1] + # Common LaTeX terminators sometimes end up as the last line alone. + if last in {"$$", "$"} and len(lines) >= 2: + last = lines[-2] + return last + + +def _parse_mcq_options(question: str) -> Dict[str, str]: + """Parse MCQ options from a question string into {letter: option_text}.""" + if not question: + return {} + lines = question.splitlines() + in_options = False + options: Dict[str, str] = {} + for line in lines: + if re.match(r"(?im)^\s*options\s*:\s*$", line): + in_options = True + continue + if not in_options: + continue + m = re.match(r"^\s*([A-Z])\s*[.)]\s*(.+?)\s*$", line.strip()) + if not m: + # Stop when we leave the options block (blank line or non-option text) + if options and not line.strip(): + break + continue + options[m.group(1).upper()] = m.group(2).strip() + return options + + +def _extract_number(text: str) -> Optional[float]: + """Extract a numeric value from text (handles commas and currency).""" + if not text: + return None + m = re.search(r"[-+]?\d[\d,]*(?:\.\d+)?", text) + if not m: + return None + try: + return float(m.group(0).replace(",", "")) + except ValueError: + return None + + +def _map_numeric_answer_to_option_letter( + *, + submission: str, + question: str, + target: str, + rel_tol: float = 1e-3, +) -> Optional[str]: + """If target is a letter MCQ, map numeric submission to the closest matching option.""" + target_letter = target.strip().upper() + if not re.fullmatch(r"[A-Z]", target_letter): + return None + + options = _parse_mcq_options(question) + if not options: + return None + + sub_val = _extract_number(submission) + if sub_val is None: + return None + + best_letter: Optional[str] = None + for letter, opt_text in options.items(): + opt_val = _extract_number(opt_text) + if opt_val is None: + continue + denom = max(1.0, abs(opt_val)) + if abs(sub_val - opt_val) / denom <= rel_tol: + best_letter = letter + break + return best_letter + + +def _generate_batch_with_vllm( + llm: Any, + *, + prompts: List[str], + generation_config: Dict[str, Any], +) -> List[str]: + """Generate a batch of responses with vLLM.""" + try: + from vllm import SamplingParams + except Exception as exc: # noqa: BLE001 + raise RuntimeError( + "vllm is required for inference_backend=vllm in stage=1_local" + ) from exc + + sampling_params = SamplingParams( + max_tokens=int(generation_config.get("max_tokens", 512)), + temperature=float(generation_config.get("temperature", 0.0) or 0.0), + top_p=float(generation_config.get("top_p", 1.0) or 1.0), + repetition_penalty=float( + generation_config.get("repetition_penalty", 1.0) or 1.0 + ), + ) + outputs = llm.generate(prompts, sampling_params) + + generated_texts: List[str] = [] + for output in outputs: + if output.outputs: + generated_texts.append((output.outputs[0].text or "").strip()) + else: + generated_texts.append("") + return generated_texts + + +def _batched( + items: List[Dict[str, Any]], batch_size: int +) -> Iterator[List[Dict[str, Any]]]: + """Yield fixed-size batches from a list.""" + if batch_size <= 0: + raise ValueError("batch_size must be positive") + for i in range(0, len(items), batch_size): + yield items[i : i + batch_size] + + +def _build_judge_prompt(submission: str, target: str) -> str: + """Render the judge prompt for one submission/target pair.""" + return LLM_JUDGE_PROMPT.format(submission=submission, target=target) + + +def _judge_outputs_to_grades(outputs: List[str]) -> List[str]: + """Convert judge outputs to C/I grades.""" + return [ + "C" if output and output.strip().lower().startswith("yes") else "I" + for output in outputs + ] + + +def _score_existing_row_ids( + flat_path: Path, expected_task_ids: Set[str] +) -> Dict[str, Dict[str, Any]]: + """Load previously scored rows and keep only expected task IDs.""" + row_by_id: Dict[str, Dict[str, Any]] = {} + for row in _read_flat_rows(flat_path): + row_id = str(row.get("id", "")) + if ( + row_id + and row_id in expected_task_ids + and row.get("grade") in {"C", "I"} + and row_id not in row_by_id + ): + row_by_id[row_id] = row + return row_by_id + + +def _ordered_rows( + tasks: List[Dict[str, Any]], row_by_id: Dict[str, Dict[str, Any]] +) -> List[Dict[str, Any]]: + """Order scored rows to match the original dataset task order.""" + return [ + row_by_id[str(task["id"])] + for task in tasks + if str(task["id"]) in row_by_id + ] + + +def _log_running_performance( + *, + llm_name: str, + capability_id: str, + row_by_id: Dict[str, Dict[str, Any]], + total_tasks: int, +) -> None: + """Log running completion and accuracy for current model/capability.""" + done = len(row_by_id) + if done == 0: + acc = 0.0 + else: + correct = sum(1 for row in row_by_id.values() if row.get("grade") == "C") + acc = correct / done + logger.info( + " Progress %s/%s: %d/%d scored | running_accuracy=%.4f", + llm_name, + capability_id, + done, + total_tasks, + acc, + ) + + +def _judge_batch( + rows: List[Dict[str, Any]], + *, + judge_model: Model, + judge_generation_cfg: Dict[str, Any], + max_concurrent_requests: int = 8, +) -> List[Dict[str, Any]]: + """Judge a batch of rows, using exact-match shortcuts when possible.""" + if not rows: + return [] + + scored_rows: List[Optional[Dict[str, Any]]] = [None] * len(rows) + unresolved_indices: List[int] = [] + unresolved_prompts: List[str] = [] + + for index, row in enumerate(rows): + raw_output = str(row["model_output"]) + parsed_submission = parse_submission(raw_output) or raw_output + judge_submission = _last_sentence(raw_output) or parsed_submission + target = str(row["ground_truth"]) + mapped_letter = _map_numeric_answer_to_option_letter( + submission=parsed_submission, + question=str(row.get("question", "")), + target=target, + ) + if mapped_letter is not None: + parsed_submission = mapped_letter + if _normalize_text(parsed_submission).lower() == _normalize_text(target).lower(): + scored_rows[index] = {**row, "grade": "C"} + continue + unresolved_indices.append(index) + judge_prompt = _build_judge_prompt(judge_submission, target) + unresolved_prompts.append(judge_prompt) + + if unresolved_prompts: + async def _run_async_judge(prompts: List[str]) -> List[str]: + sem = asyncio.Semaphore(max(1, int(max_concurrent_requests))) + + async def _one(p: str) -> str: + async with sem: + txt, _ = await judge_model.async_generate( + sys_prompt="You are a careful, non-pedantic grading assistant.", + user_prompt=p, + generation_config=judge_generation_cfg, + ) + return txt or "" + + return list(await asyncio.gather(*(_one(p) for p in prompts))) + + try: + judge_outputs = asyncio.run(_run_async_judge(unresolved_prompts)) + except Exception: + judge_outputs = [] + for prompt in unresolved_prompts: + judge_text, _ = judge_model.generate( + sys_prompt="You are a careful, non-pedantic grading assistant.", + user_prompt=prompt, + generation_config=judge_generation_cfg, + ) + judge_outputs.append(judge_text or "") + + for index, grade in zip( + unresolved_indices, + _judge_outputs_to_grades(judge_outputs), + strict=True, + ): + scored_rows[index] = {**rows[index], "grade": grade} + return [row for row in scored_rows if row is not None] + + +def run_eval_stage1_local( + cfg: DictConfig, + validation_tag: str, + eval_tag: Optional[str] = None, +) -> str: + """Run local/direct Stage 1 evals and return eval_tag.""" + exp_id = cfg.exp_cfg.exp_id + output_base_dir = Path(cfg.global_cfg.output_dir) + experiment_dir = output_base_dir / exp_id + + datasets_dir = experiment_dir / "eval" / "datasets" / validation_tag + eval_config_path = datasets_dir / "eval_config.json" + if not eval_config_path.exists(): + raise ValueError( + f"eval_config.json not found at {eval_config_path}. Run Stage 0 first." + ) + eval_config, _ = load_eval_config(eval_config_path) + + is_resume = eval_tag is not None + if eval_tag is None: + eval_tag = timestamp_tag() + + logger.info( + "Eval Stage 1_local: Running direct evaluations (eval_tag=%s, resume=%s)", + eval_tag, + is_resume, + ) + + dataset_paths = _find_datasets(datasets_dir) + logger.info("Found %d datasets", len(dataset_paths)) + if not dataset_paths: + raise ValueError(f"No datasets found in {datasets_dir}. Run Stage 0 first.") + + datasets = [load_eval_dataset(p) for p in dataset_paths] + + eval_dir = experiment_dir / "eval" / "results" / eval_tag + results_dir = eval_dir + + eval_config.eval_tag = eval_tag + metadata = PipelineMetadata( + experiment_id=exp_id, + output_base_dir=str(output_base_dir), + timestamp=iso_timestamp(), + input_stage_tag=validation_tag, + output_stage_tag=eval_tag, + resume=is_resume, + ) + results_config_path = eval_dir / "eval_config.json" + save_eval_config(eval_config, metadata, results_config_path) + logger.info("Saved eval_config.json to %s", results_config_path) + + subject_llms = eval_config.subject_llms + judge_llm_cfg = dict(eval_config.judge_llm) + judge_generation_cfg = dict(judge_llm_cfg.get("generation_cfg", {})) + if "max_tokens" not in judge_generation_cfg: + judge_generation_cfg["max_tokens"] = 16 + if "temperature" not in judge_generation_cfg: + judge_generation_cfg["temperature"] = 0 + judge_batch_size = int(judge_llm_cfg.get("batch_size", 32)) + judge_model = _build_model(judge_llm_cfg) + + model_instances: Dict[Tuple[str, str], Model] = {} + vllm_model_instances: Dict[Tuple[str, str], Any] = {} + + num_completed_this_run = 0 + num_skipped_completed = 0 + num_failed = 0 + num_incomplete = 0 + total_combinations = len(datasets) * len(subject_llms) + + combination_index = 0 + for dataset in datasets: + expected_task_ids = {str(task["id"]) for task in dataset.tasks} + for llm_config in subject_llms: + combination_index += 1 + llm_name = str(llm_config["name"]) + llm_provider = str(llm_config.get("provider", "openai")) + using_vllm = _uses_vllm_backend(dict(llm_config)) + logger.info( + "Combination %d/%d: Evaluating %s/%s with %s/%s%s", + combination_index, + total_combinations, + dataset.area_id, + dataset.capability_id, + llm_provider, + llm_name, + " [vllm]" if using_vllm else "", + ) + + output_dir = results_dir / llm_name / dataset.area_id / dataset.capability_id + flat_path = _flat_result_path(output_dir, dataset.capability_id) + + if _check_flat_completed(flat_path, expected_task_ids): + logger.info( + " Skipping %s/%s with %s (already completed)", + dataset.area_id, + dataset.capability_id, + llm_name, + ) + num_skipped_completed += 1 + continue + + model_key = (llm_provider, llm_name) + subject_generation_cfg = dict(llm_config.get("generation_cfg", {})) + total_tasks = len(dataset.tasks) + batch_size = int(llm_config.get("batch_size", 16)) + row_by_id = _score_existing_row_ids(flat_path, expected_task_ids) + pending_tasks = [ + task for task in dataset.tasks if str(task["id"]) not in row_by_id + ] + + if row_by_id: + logger.info( + " Resuming %s/%s with %d/%d tasks already scored", + dataset.area_id, + dataset.capability_id, + len(row_by_id), + total_tasks, + ) + _write_flat_results(flat_path, _ordered_rows(dataset.tasks, row_by_id)) + + if not pending_tasks: + logger.info( + " Skipping %s/%s with %s (all tasks already scored)", + dataset.area_id, + dataset.capability_id, + llm_name, + ) + num_skipped_completed += 1 + continue + + if using_vllm: + if model_key not in vllm_model_instances: + for old_key in list(vllm_model_instances): + if old_key != model_key: + logger.info( + " Tearing down previous vLLM engine %s before loading %s", + old_key[1], llm_name, + ) + old_engine = vllm_model_instances.pop(old_key, None) + if old_engine is not None: + _teardown_vllm_engine(old_engine, old_key[1]) + _wait_for_vllm_startup_memory( + float(llm_config.get("gpu_memory_utilization", 0.9)) + ) + logger.info(" Loading vLLM engine for %s", llm_name) + vllm_model_instances[model_key] = _load_vllm_model(dict(llm_config)) + vllm_model = vllm_model_instances[model_key] + else: + if model_key not in model_instances: + model_instances[model_key] = _build_model(dict(llm_config)) + subject_model = model_instances[model_key] + + success = True + failed_task_id = None + try: + logger.info( + " Processing %d pending tasks (subject_batch_size=%d, judge_batch_size=%d)", + len(pending_tasks), + batch_size, + judge_batch_size, + ) + subject_tokenizer = None + if using_vllm and hasattr(vllm_model, "get_tokenizer"): + subject_tokenizer = vllm_model.get_tokenizer() + + with tqdm( + total=total_tasks, + initial=len(row_by_id), + desc=f"Eval {llm_name}/{dataset.capability_id}", + dynamic_ncols=True, + ) as eval_bar: + for task_batch in _batched(pending_tasks, batch_size): + failed_task_id = task_batch[0].get("id") + if using_vllm: + prompts = [ + _render_text_prompt( + subject_tokenizer, + sys_prompt="", + user_prompt=_format_prompt(dataset, task), + ) + for task in task_batch + ] + generated_texts = _generate_batch_with_vllm( + vllm_model, + prompts=prompts, + generation_config=subject_generation_cfg, + ) + else: + generated_texts = [] + for task in task_batch: + failed_task_id = task.get("id") + prompt = _format_prompt(dataset, task) + generated_text, _ = subject_model.generate( + sys_prompt="", + user_prompt=prompt, + generation_config=subject_generation_cfg, + ) + generated_texts.append(generated_text or "") + + generated_rows = [ + { + "id": task["id"], + "question": task["input"], + "ground_truth": task["target"], + "model_output": generated_text, + } + for task, generated_text in zip( + task_batch, generated_texts, strict=True + ) + ] + + for jb in _batched(generated_rows, judge_batch_size): + failed_task_id = jb[0].get("id") + scored_batch = _judge_batch( + jb, + judge_model=judge_model, + judge_generation_cfg=judge_generation_cfg, + max_concurrent_requests=judge_batch_size, + ) + for scored_row in scored_batch: + row_by_id[str(scored_row["id"])] = scored_row + + _write_flat_results(flat_path, _ordered_rows(dataset.tasks, row_by_id)) + _log_running_performance( + llm_name=llm_name, + capability_id=dataset.capability_id, + row_by_id=row_by_id, + total_tasks=total_tasks, + ) + eval_bar.update(len(task_batch)) + except Exception as exc: # noqa: BLE001 + logger.error( + " Direct evaluation failed for %s/%s task %s with %s/%s: %s", + dataset.area_id, + dataset.capability_id, + failed_task_id, + llm_provider, + llm_name, + exc, + ) + success = False + + rows = _ordered_rows(dataset.tasks, row_by_id) + _write_flat_results(flat_path, rows) + + if success: + if _check_flat_completed(flat_path, expected_task_ids): + num_completed_this_run += 1 + else: + logger.warning( + " Incomplete flat output for %s/%s with %s " + "(task IDs mismatch: missing or extra scored tasks)", + dataset.area_id, + dataset.capability_id, + llm_name, + ) + num_incomplete += 1 + else: + num_failed += 1 + + logger.info( + "Eval Stage 1_local summary: completed_this_run=%d skipped_completed=%d " + "failed=%d incomplete=%d total=%d", + num_completed_this_run, + num_skipped_completed, + num_failed, + num_incomplete, + total_combinations, + ) + + return eval_tag diff --git a/src/eval_stages/stage2_score_aggregation.py b/src/eval_stages/stage2_score_aggregation.py index e855dc1..6c7aca7 100644 --- a/src/eval_stages/stage2_score_aggregation.py +++ b/src/eval_stages/stage2_score_aggregation.py @@ -1,13 +1,16 @@ """Eval Stage 2: Score Aggregation. -This stage computes final capability scores from raw Inspect results. -No LLM calls, just aggregation of results from Stage 1. +This stage computes final capability scores from Stage 1 outputs. +No LLM calls, just aggregation of results from either: +- Inspect JSON logs (`stage=1`) +- Direct flat JSONL outputs (`stage=1_local`) See: https://inspect.aisi.org.uk/ """ import logging import math +import json from pathlib import Path from typing import Any, Dict, List, Optional, Set, Tuple @@ -46,6 +49,11 @@ def _find_inspect_logs(result_dir: Path) -> List[Path]: return sorted(result_dir.glob("*.json")) +def _find_flat_files(result_dir: Path) -> List[Path]: + """Find flat jsonl files for a capability result directory.""" + return sorted(result_dir.glob("flat_*.jsonl")) + + def _compute_stats(scores: List[float]) -> Dict[str, Any]: """Compute mean, standard error, and sample count.""" if not scores: @@ -160,6 +168,72 @@ def _parse_inspect_logs( return stats +def _parse_flat_jsonl( + result_dir: Path, + expected_task_ids: Set[str], +) -> Dict[str, Any]: + """Parse direct flat jsonl output and return aggregate stats.""" + flat_files = _find_flat_files(result_dir) + if not flat_files: + logger.warning("No flat JSONL files found in %s", result_dir) + return {"mean": 0.0, "std_err": 0.0, "num_tasks": 0, "exact_match": False} + + flat_scores: List[Tuple[Path, List[float], Set[str]]] = [] + for flat_file in flat_files: + try: + scores: List[float] = [] + scored_ids: Set[str] = set() + with open(flat_file, "r", encoding="utf-8") as f: + # Skip summary line if present. + try: + next(f) + except StopIteration: + flat_scores.append((flat_file, [], set())) + continue + for line in f: + if not line.strip(): + continue + row = json.loads(line) + task_id = str(row.get("id", "")) + if not task_id: + continue + score_value = _score_value_to_float(row.get("grade")) + if score_value is None: + continue + scored_ids.add(task_id) + if task_id in expected_task_ids: + scores.append(score_value) + flat_scores.append((flat_file, scores, scored_ids)) + except Exception as e: + logger.warning("Failed to parse flat file %s: %s", flat_file, e) + continue + + if not flat_scores: + return {"mean": 0.0, "std_err": 0.0, "num_tasks": 0, "exact_match": False} + + selected_file, selected_scores, selected_ids = max( + flat_scores, + key=lambda x: ( + x[2] == expected_task_ids, + len(x[1]), + x[0].stat().st_mtime, + x[0].name, + ), + ) + + if len(flat_scores) > 1: + logger.info( + "Multiple flat files found in %s; selected %s with %d scored samples", + result_dir, + selected_file.name, + len(selected_scores), + ) + + stats = _compute_stats(selected_scores) + stats["exact_match"] = selected_ids == expected_task_ids + return stats + + def run_eval_stage2( cfg: DictConfig, eval_tag: str, @@ -230,8 +304,12 @@ def run_eval_stage2( expected_task_ids = {str(task["id"]) for task in cap_dataset.tasks} - # Parse Inspect logs - parsed = _parse_inspect_logs(result_dir, expected_task_ids) + # Prefer Inspect logs when present; otherwise fall back to direct + # flat jsonl outputs from stage=1_local. + if _find_inspect_logs(result_dir): + parsed = _parse_inspect_logs(result_dir, expected_task_ids) + else: + parsed = _parse_flat_jsonl(result_dir, expected_task_ids) if parsed["num_tasks"] < cap_dataset.num_tasks: logger.warning( diff --git a/src/eval_stages/static_benchmarks/__init__.py b/src/eval_stages/static_benchmarks/__init__.py new file mode 100644 index 0000000..18a4689 --- /dev/null +++ b/src/eval_stages/static_benchmarks/__init__.py @@ -0,0 +1,2 @@ +"""Adapters for static (external) benchmarks used by Eval Stage 0.5.""" + diff --git a/src/eval_stages/static_benchmarks/bizbench.py b/src/eval_stages/static_benchmarks/bizbench.py new file mode 100644 index 0000000..0a93a2e --- /dev/null +++ b/src/eval_stages/static_benchmarks/bizbench.py @@ -0,0 +1,148 @@ +"""Adapter for the kensho/bizbench static benchmark. + +Dataset card: https://huggingface.co/datasets/kensho/bizbench + +Splits: train, test + +Columns (per dataset viewer): +- question (str) +- answer (str) +- task (str) +- context (str | None) +- context_type (str) +- options (list) +- program (str | None) + +We use: +- input: context (if present) + question + options (if present) +- target: answer +""" + +from __future__ import annotations + +from typing import Any, Dict, Iterable, List + +from datasets import load_dataset + +from src.eval_stages.prompts import DEFAULT_EVAL_PROMPT_TEMPLATE +from src.eval_stages.static_benchmarks.specs import StaticBenchmarkSpec +from src.schemas.eval_schemas import EvalDataset + + +def _normalize_answer(val: Any) -> str: + if val is None: + return "" + if isinstance(val, dict): + for key in ("answer", "label", "text", "value"): + if key in val and val[key] is not None: + return str(val[key]).strip() + return str(val).strip() + return str(val).strip() + + +def _format_options(options: Any) -> str: + if not options: + return "" + if isinstance(options, list): + cleaned = [str(o).strip() for o in options if str(o).strip()] + if not cleaned: + return "" + return "\n".join(f"- {o}" for o in cleaned) + opt = str(options).strip() + return f"- {opt}" if opt else "" + + +def _build_input(question: str, context: Any, options: Any) -> str: + question = question.strip() + parts: List[str] = [] + + ctx = "" if context is None else str(context).strip() + if ctx: + parts.append(f"Context:\n{ctx}") + + parts.append(f"Question:\n{question}") + + opts = _format_options(options) + if opts: + parts.append(f"Options:\n{opts}") + + return "\n\n".join(parts).strip() + + +def _iter_bizbench_samples( + split: str, + offset: int | None, + limit: int | None, + *, + finknow_only: bool, +) -> Iterable[Dict[str, Any]]: + ds = load_dataset("kensho/bizbench", split=split) + n = len(ds) + + start = 0 if offset is None else max(0, int(offset)) + end = None if limit is None else start + int(limit) + + # Apply filtering first, then slice by (offset, limit) over the filtered stream. + # This keeps the sharding logic consistent with the adapter's filtering. + kept_rank = 0 + for dataset_idx, row in enumerate(ds): + if finknow_only: + task_val = str(row.get("task", "") or "") + if "finknow" not in task_val.lower(): + continue + + question = str(row.get("question", "")).strip() + answer_norm = _normalize_answer(row.get("answer")) + if not question or not answer_norm: + continue + + if kept_rank < start: + kept_rank += 1 + continue + if end is not None and kept_rank >= end: + break + + row = dict(row) + row["_global_idx"] = dataset_idx + yield row + kept_rank += 1 + + +def build_eval_datasets_from_bizbench(spec: StaticBenchmarkSpec) -> List[EvalDataset]: + """Convert BizBench into a single EvalDataset.""" + tasks: List[Dict[str, str]] = [] + + for local_idx, row in enumerate( + _iter_bizbench_samples( + spec.split, + spec.offset, + spec.limit, + finknow_only=spec.finknow_only, + ) + ): + question = str(row.get("question", "")).strip() + raw_answer = row.get("answer") + answer = _normalize_answer(raw_answer) + + if not question or not answer: + continue + + inp = _build_input(question, row.get("context"), row.get("options")) + global_idx = int(row.get("_global_idx", local_idx)) + task_id = f"bizbench_{global_idx:05d}" + tasks.append({"id": task_id, "input": inp, "target": answer}) + + if not tasks: + return [] + + dataset = EvalDataset( + area_id=spec.area_id, + capability_id="bizbench", + capability_name="BizBench", + domain="finance", + tasks=tasks, + num_tasks=len(tasks), + prompt_template=DEFAULT_EVAL_PROMPT_TEMPLATE, + ) + return [dataset] + diff --git a/src/eval_stages/static_benchmarks/finance_math.py b/src/eval_stages/static_benchmarks/finance_math.py new file mode 100644 index 0000000..3529aae --- /dev/null +++ b/src/eval_stages/static_benchmarks/finance_math.py @@ -0,0 +1,131 @@ +"""Adapter for the yale-nlp/FinanceMath static benchmark. + +Dataset card: https://huggingface.co/datasets/yale-nlp/FinanceMath + +FinanceMath is a finance-domain math reasoning benchmark with two splits: +- validation: 200 examples with answers +- test: 1000 examples (answers not publicly released) + +We expect to use the validation split for evaluation. + +Fields: +- question_id: string +- question: problem text +- tables: list of markdown tables (strings) +- python_solution: expert solution code (ignored here) +- ground_truth: float, executed result rounded to 3 decimals +- topic: financial area (ignored here) + +We use (tables + question) as input and ground_truth (string) as target. +""" + +from __future__ import annotations + +from typing import Any, Dict, Iterable, List + +from datasets import load_dataset + +from src.eval_stages.prompts import DEFAULT_EVAL_PROMPT_TEMPLATE +from src.eval_stages.static_benchmarks.specs import StaticBenchmarkSpec +from src.schemas.eval_schemas import EvalDataset + + +def _normalize_answer(val: Any) -> str: + """Normalize answer to string.""" + if val is None: + return "" + if isinstance(val, dict): + for key in ("ground_truth", "value", "answer"): + if key in val and val[key] is not None: + return str(val[key]).strip() + return str(val).strip() + # For floats, preserve the dataset's rounding (usually 3 decimals). + return str(val).strip() + + +def _build_input(question: str, tables: Any) -> str: + """Construct model input from question plus optional markdown tables.""" + question = question.strip() + if not tables: + return question + + # tables is a list of markdown strings according to the dataset card. + if isinstance(tables, list): + tables_str = "\n\n".join(str(t).strip() for t in tables if str(t).strip()) + else: + tables_str = str(tables).strip() + + if not tables_str: + return question + + return f"Tables:\n{tables_str}\n\nQuestion:\n{question}" + + +def _iter_finance_math_samples( + split: str, + offset: int | None, + limit: int | None, +) -> Iterable[Dict[str, Any]]: + """Yield rows from yale-nlp/FinanceMath in order.""" + ds = load_dataset("yale-nlp/FinanceMath", split=split) + n = len(ds) + + start = 0 if offset is None else max(0, int(offset)) + if start >= n: + return iter(()) + + if limit is None: + end = n + else: + end = min(start + int(limit), n) + + if start == 0 and end == n: + yield from ds + return + + yield from ds.select(range(start, end)) + + +def build_eval_datasets_from_finance_math( + spec: StaticBenchmarkSpec, +) -> List[EvalDataset]: + """Convert FinanceMath into a single EvalDataset. + + - input: tables (markdown) + question text + - target: ground_truth (normalized to string) + - domain: math + - capability_id: finance_math + Rows with missing question or ground_truth are skipped. + """ + tasks: List[Dict[str, str]] = [] + + for local_idx, row in enumerate( + _iter_finance_math_samples(spec.split, spec.offset, spec.limit) + ): + question = str(row.get("question", "")).strip() + tables = row.get("tables") + raw_answer = row.get("ground_truth") + answer = _normalize_answer(raw_answer) + + if not question or not answer: + continue + + inp = _build_input(question, tables) + global_idx = (spec.offset or 0) + local_idx + task_id = f"finance_math_{global_idx:05d}" + tasks.append({"id": task_id, "input": inp, "target": answer}) + + if not tasks: + return [] + + dataset = EvalDataset( + area_id=spec.area_id, + capability_id="finance_math", + capability_name="FinanceMath", + domain="math", + tasks=tasks, + num_tasks=len(tasks), + prompt_template=DEFAULT_EVAL_PROMPT_TEMPLATE, + ) + return [dataset] + diff --git a/src/eval_stages/static_benchmarks/finance_tasks.py b/src/eval_stages/static_benchmarks/finance_tasks.py new file mode 100644 index 0000000..6391be4 --- /dev/null +++ b/src/eval_stages/static_benchmarks/finance_tasks.py @@ -0,0 +1,139 @@ +"""Adapter for a local finance task JSON export. + +This adapter ingests a local JSON file (e.g. `finance_tasks.json`) that follows +the repo's task-generation export shape: + +- Top-level keys: `metadata`, `tasks` +- Each task contains: + - `task_id` (str) + - `task_statement` (str) — includes options for multiple-choice tasks + - `generation_metadata.correct_answer` (str) — e.g. "A", "B", ... + +We map: +- input: task_statement +- target: correct_answer (fallbacks supported) +""" + +from __future__ import annotations + +import json +from pathlib import Path +from typing import Any, Dict, List, Optional + +from src.eval_stages.prompts import DEFAULT_EVAL_PROMPT_TEMPLATE +from src.eval_stages.static_benchmarks.specs import StaticBenchmarkSpec +from src.schemas.eval_schemas import EvalDataset + +EXCLUDED_BLOOM_LEVEL = ( + "Create - Combine elements to form a new pattern, structure, or product. " + "Example verbs: design, compose, formulate, generate." +) + + +def _sanitize_text(text: str) -> str: + """Sanitize text so it is safe to JSON-encode and send to APIs.""" + # Remove null bytes (can break downstream tooling / transports). + text = text.replace("\x00", "") + # Replace any invalid unicode sequences (e.g., unpaired surrogates) deterministically. + return text.encode("utf-8", errors="replace").decode("utf-8", errors="replace") + + +def _resolve_json_path(benchmark_id: str) -> Path: + raw = benchmark_id.strip() + if raw.startswith("file://"): + raw = raw[len("file://") :] + candidate = Path(raw) + if candidate.exists(): + return candidate + + # Default: assume a repo-root file name was given as benchmark_id. + default = Path("finance_tasks.json") + if default.exists(): + return default + + # Fall back to relative path from CWD. + return candidate + + +def _read_json(path: Path) -> Dict[str, Any]: + with path.open("r", encoding="utf-8") as f: + data = json.load(f) + if not isinstance(data, dict): + raise ValueError(f"Expected a JSON object in {path}, got {type(data).__name__}") + return data + + +def _extract_target(task: Dict[str, Any]) -> str: + gen_md = task.get("generation_metadata") + if isinstance(gen_md, dict): + val = gen_md.get("correct_answer") + if val is not None: + return str(val).strip() + + # Fallbacks for other possible exports. + for key in ("correct_answer", "answer", "target", "label"): + if key in task and task[key] is not None: + return str(task[key]).strip() + + return "" + + +def build_eval_datasets_from_finance_tasks(spec: StaticBenchmarkSpec) -> List[EvalDataset]: + """Convert a local finance_tasks JSON file into a single EvalDataset.""" + json_path = _resolve_json_path(spec.benchmark_id) + payload = _read_json(json_path) + raw_tasks = payload.get("tasks", []) + if not isinstance(raw_tasks, list): + raise ValueError( + f"Expected `tasks` to be a list in {json_path}, got {type(raw_tasks).__name__}" + ) + + tasks: List[Dict[str, str]] = [] + limit: Optional[int] = spec.limit + offset: int = max(0, int(spec.offset or 0)) + + # OmegaConf CLI overrides can pass boolean-like values as strings. + exclude_create = spec.exclude_bloom_create + if isinstance(exclude_create, str): + exclude_create = exclude_create.strip().lower() in { + "1", + "true", + "yes", + "y", + "on", + } + + for idx, row in enumerate(raw_tasks[offset:]): + if not isinstance(row, dict): + continue + if exclude_create and str(row.get("bloom_level", "")).strip() == EXCLUDED_BLOOM_LEVEL: + continue + + task_id = str(row.get("task_id", "")).strip() + statement = _sanitize_text(str(row.get("task_statement", "")).strip()) + target = _extract_target(row) + + if not task_id: + global_idx = offset + len(tasks) + task_id = f"finance_tasks_{global_idx:05d}" + if not statement or not target: + continue + + tasks.append({"id": task_id, "input": statement, "target": target}) + if limit is not None and len(tasks) >= limit: + break + + if not tasks: + return [] + + dataset = EvalDataset( + area_id=spec.area_id, + capability_id=str(spec.capability_id or "finance_tasks"), + capability_name=str(spec.capability_name or "Finance Tasks"), + domain=str(spec.domain or "finance"), + tasks=tasks, + num_tasks=len(tasks), + prompt_template=DEFAULT_EVAL_PROMPT_TEMPLATE, + ) + return [dataset] + diff --git a/src/eval_stages/static_benchmarks/specs.py b/src/eval_stages/static_benchmarks/specs.py new file mode 100644 index 0000000..64f52d9 --- /dev/null +++ b/src/eval_stages/static_benchmarks/specs.py @@ -0,0 +1,51 @@ +"""Shared specs and types for static benchmark adapters.""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import Optional + + +@dataclass(frozen=True) +class StaticBenchmarkSpec: + """Minimal specification for ingesting a static benchmark. + + Attributes + ---------- + benchmark_id + Identifier used to select the adapter (e.g. "HuggingFaceH4/MATH-500"). + split + Data split to load (e.g. "train", "test", "validation"). + limit + Optional maximum number of rows to load (for smoke tests). + area_id + Area identifier used in EvalDataset (groups capabilities). + capability_id + Capability identifier in EvalDataset; if omitted, adapters may derive it. + capability_name + Human-readable capability name; if omitted, adapters may derive it. + domain + Domain label for EvalDataset (e.g. "math", "external"). + + offset + + Optional starting index for slicing the underlying dataset. Used + + together with `limit` to support chunked / array-style evaluation. + + """ + + benchmark_id: str + split: str = "test" + limit: Optional[int] = None + offset: Optional[int] = None + area_id: str = "static_benchmarks" + capability_id: Optional[str] = None + capability_name: Optional[str] = None + domain: str = "external" + + # Controls the filtering behavior for local "finance tasks" JSON adapters + # (e.g. finance_tasks.json / seed_tasks.json). + # When true, removes Bloom level "Create - Combine elements..." tasks. + exclude_bloom_create: bool = True + + # Controls whether BizBench ingestion should keep only the FinKnow subset. + # When true, filters rows where the dataset's `task` field indicates FinKnow. + finknow_only: bool = True + diff --git a/src/eval_stages/static_benchmarks/xfinbench.py b/src/eval_stages/static_benchmarks/xfinbench.py new file mode 100644 index 0000000..299fc59 --- /dev/null +++ b/src/eval_stages/static_benchmarks/xfinbench.py @@ -0,0 +1,159 @@ +"""Adapter for the Zhihan/XFinBench static benchmark. + +Dataset card: https://huggingface.co/datasets/Zhihan/XFinBench + +We load from the CSV files inside the repo: +- validation_set.csv +- test_set.csv + +Columns (per inspection): +- id: string (e.g. "vali_0", "test_0") +- task: string, question type ("calcu", "mcq", etc.) +- question: str, problem text (often includes tables/LaTeX) +- choice: optional str, options text for MCQs (may contain newlines) +- ground_truth: label/answer; for MCQ it's a letter like "A", for others numeric +- figure: optional, ignored +- fin_capability: capability tag, ignored here +- gold_fin_term_id: term id, ignored here + +We use: +- input: question (+ choices if present) +- target: ground_truth normalized to string +""" + +from __future__ import annotations + +from typing import Any, Dict, Iterable, List, Optional + +from datasets import load_dataset + +from src.eval_stages.prompts import DEFAULT_EVAL_PROMPT_TEMPLATE +from src.eval_stages.static_benchmarks.specs import StaticBenchmarkSpec +from src.schemas.eval_schemas import EvalDataset + + +def _normalize_target(task: str, val: Any) -> str: + """Normalize ground_truth to a string target.""" + if val is None: + return "" + + # Boolean validity questions: map to Yes / No. + if task == "bool": + # Accept numeric, bool, and string encodings. + if isinstance(val, (int, float)): + return "Yes" if float(val) != 0.0 else "No" + s = str(val).strip().lower() + if s in {"1", "1.0", "true", "yes"}: + return "Yes" + if s in {"0", "0.0", "false", "no"}: + return "No" + # Fallback: pass through. + return str(val).strip() + + # For MCQ, dataset uses option letter like "A". + if task == "mcq": + return str(val).strip() + + # For numeric / calculation tasks, just stringify (preserving decimals). + if isinstance(val, dict): + for key in ("ground_truth", "value", "answer"): + if key in val and val[key] is not None: + return str(val[key]).strip() + return str(val).strip() + + return str(val).strip() + + +def _build_input(task: str, question: str, choice: Any) -> str: + """Build model input from question plus optional choices.""" + question = str(question or "").strip() + + # For boolean tasks, explicitly request Yes/No. + if task == "bool": + if not question: + return "" + return f"Answer only 'Yes' or 'No'.\n\nStatement:\n{question}" + + if not choice: + return question + + choice_text = str(choice).strip() + if not choice_text: + return question + + return f"{question}\n\nOptions:\n{choice_text}" + + +def _iter_xfinbench_samples( + split: str, + offset: Optional[int], + limit: Optional[int], +) -> Iterable[Dict[str, Any]]: + """Yield rows from Zhihan/XFinBench (CSV-backed) with offset/limit.""" + ds_dict = load_dataset( + "Zhihan/XFinBench", + data_files={"validation": "validation_set.csv", "test": "test_set.csv"}, + ) + if split not in ds_dict: + raise ValueError(f"Unknown XFinBench split: {split}") + + ds = ds_dict[split] + n = len(ds) + + start = 0 if offset is None else max(0, int(offset)) + if start >= n: + return iter(()) + + if limit is None: + end = n + else: + end = min(start + int(limit), n) + + if start == 0 and end == n: + yield from ds + return + + yield from ds.select(range(start, end)) + + +def build_eval_datasets_from_xfinbench(spec: StaticBenchmarkSpec) -> List[EvalDataset]: + """Convert XFinBench into a single EvalDataset.""" + tasks: List[Dict[str, str]] = [] + offset: int = max(0, int(spec.offset or 0)) + + for local_idx, row in enumerate( + _iter_xfinbench_samples(spec.split, spec.offset, spec.limit) + ): + question = str(row.get("question", "")).strip() + task_type = str(row.get("task", "")).strip() + choice = row.get("choice") + figure = row.get("figure") + raw_gt = row.get("ground_truth") + target = _normalize_target(task_type, raw_gt) + + # Skip image-based questions (figure present). + if figure is not None: + continue + + if not question or not target: + continue + + inp = _build_input(task_type, question, choice) + global_idx = offset + local_idx + task_id = f"xfinbench_{global_idx:05d}" + tasks.append({"id": task_id, "input": inp, "target": target}) + + if not tasks: + return [] + + dataset = EvalDataset( + area_id=spec.area_id, + capability_id="xfinbench", + capability_name="XFinBench", + domain="finance", + tasks=tasks, + num_tasks=len(tasks), + prompt_template=DEFAULT_EVAL_PROMPT_TEMPLATE, + ) + return [dataset] + diff --git a/src/generate_embeddings.py b/src/generate_embeddings.py index bfb23b8..c42c116 100644 --- a/src/generate_embeddings.py +++ b/src/generate_embeddings.py @@ -63,6 +63,8 @@ def _load_embedding_model( def generate_embeddings( self, texts: list[str], + max_tokens_per_request: int = 250_000, + max_batch_size: int = 128, ) -> List[torch.Tensor]: """ Generate and optionally reduce embeddings for a list of texts. @@ -76,8 +78,61 @@ def generate_embeddings( List[torch.Tensor]: A list of embeddings, where each embedding is a torch.Tensor. """ - output_float_list = self.embedding_model.embed_documents(texts) - return [torch.tensor(vec) for vec in output_float_list] + if not texts: + return [] + + # The OpenAI embeddings endpoint limits the TOTAL tokens per request + # (sum across all input documents). Previously we embedded `texts` + # in one shot, which can exceed the limit. + try: + import tiktoken # type: ignore + + try: + tokenizer = tiktoken.encoding_for_model(self.embedding_model_name.value) + except Exception: # noqa: BLE001 + tokenizer = tiktoken.get_encoding("cl100k_base") + + def _token_len(s: str) -> int: + # NOTE: this is an estimate; still far safer than sending + # an unbounded list. + return len(tokenizer.encode(s)) + + except Exception: # noqa: BLE001 + # Fallback heuristic if tiktoken isn't available. + # Typical English token ~ 4 chars, but we keep a lower bound of 1. + def _token_len(s: str) -> int: + return max(1, len(s) // 4) + + # Build batches under the token budget. + batches: list[list[str]] = [] + current_batch: list[str] = [] + current_tokens = 0 + + for t in texts: + t_tokens = _token_len(t) + if ( + current_batch + and ( + current_tokens + t_tokens > max_tokens_per_request + or len(current_batch) >= max_batch_size + ) + ): + batches.append(current_batch) + current_batch = [t] + current_tokens = t_tokens + else: + current_batch.append(t) + current_tokens += t_tokens + + if current_batch: + batches.append(current_batch) + + output_embeddings: List[torch.Tensor] = [] + for batch in batches: + output_float_list = self.embedding_model.embed_documents(batch) + output_embeddings.extend([torch.tensor(vec) for vec in output_float_list]) + + return output_embeddings def filter_embeddings( diff --git a/src/run_eval_pipeline.py b/src/run_eval_pipeline.py index 541a762..8c01c04 100644 --- a/src/run_eval_pipeline.py +++ b/src/run_eval_pipeline.py @@ -2,7 +2,8 @@ This module orchestrates the evaluation pipeline: - Stage 0: Setup and Dataset Preparation -- Stage 1: Evaluation Execution (runs subject LLMs, creates eval_tag) +- Stage 1: Evaluation Execution (Inspect-based) +- Stage 1_local: Evaluation Execution without Inspect - Stage 2: Score Aggregation Usage: @@ -25,7 +26,9 @@ from src.eval_stages import ( EvalSetupError, run_eval_stage0, + run_eval_stage0_static, run_eval_stage1, + run_eval_stage1_local, run_eval_stage2, ) @@ -118,6 +121,21 @@ def main(cfg: DictConfig) -> None: except ValueError as e: logger.error("Stage 1 failed: %s", e) + elif stage in {"1_local", "local1", "stage1_local"}: + if not validation_tag: + logger.error("validation_tag is required for stage 1_local") + logger.error( + "Usage: python -m src.run_eval_pipeline stage=1_local " + "validation_tag=_YYYYMMDD_HHMMSS" + ) + return + + try: + eval_tag = run_eval_stage1_local(cfg, validation_tag, eval_tag) + logger.info("Eval Stage 1_local complete. eval_tag=%s", eval_tag) + except ValueError as e: + logger.error("Stage 1_local failed: %s", e) + elif stage == 2: if not eval_tag: logger.error("eval_tag is required for stage 2") @@ -133,8 +151,26 @@ def main(cfg: DictConfig) -> None: except ValueError as e: logger.error("Stage 2 failed: %s", e) + elif stage in {"0_static", "static0", "static"}: + if not validation_tag: + logger.error("validation_tag is required for stage 0_static") + logger.error( + "Usage: python -m src.run_eval_pipeline stage=0_static " + "validation_tag=_SOME_TAG static_benchmark_cfg.benchmark_id=HuggingFaceH4/MATH-500" + ) + return + + try: + run_eval_stage0_static(cfg, validation_tag) + logger.info("Eval Stage 0_static complete. Datasets created.") + except ValueError as e: + logger.error("Stage 0_static failed: %s", e) + else: - logger.error("Invalid stage: %s. Use 'all', 0, 1, or 2", stage) + logger.error( + "Invalid stage: %s. Use 'all', 0, 1, '1_local', 2, or '0_static'", + stage, + ) if __name__ == "__main__": diff --git a/src/utils/data_utils.py b/src/utils/data_utils.py index 2d936f9..12cd42b 100644 --- a/src/utils/data_utils.py +++ b/src/utils/data_utils.py @@ -14,10 +14,24 @@ Dataset, load_dataset, # noqa: D100 ) -from google.cloud import storage +try: + # Optional dependency: only required when using `gs://...` paths. + from google.cloud import storage # type: ignore +except Exception: # noqa: BLE001 + storage = None from omegaconf import DictConfig +def _require_gcs_storage() -> Any: + """Return google.cloud.storage or raise a helpful ImportError.""" + if storage is None: + raise ImportError( + "google-cloud-storage is required for `gs://...` paths. " + "Install it with: pip install google-cloud-storage" + ) + return storage + + def load_data( dataset_name: str, split: str, @@ -62,7 +76,7 @@ def read_json_file(file_path: str) -> Any: """ if file_path.startswith("gs://"): # Read from GCP bucket - client = storage.Client() + client = _require_gcs_storage().Client() bucket_name, blob_name = file_path[5:].split("/", 1) bucket = client.bucket(bucket_name) blob = bucket.blob(blob_name) @@ -90,7 +104,7 @@ def write_json_file(file_path: str, data: Dict[Any, Any]) -> None: """ if file_path.startswith("gs://"): # Write to GCP bucket - client = storage.Client() + client = _require_gcs_storage().Client() bucket_name, blob_name = file_path[5:].split("/", 1) bucket = client.bucket(bucket_name) blob = bucket.blob(blob_name) @@ -119,7 +133,7 @@ def list_dir(path: str) -> list[str]: """ if path.startswith("gs://"): # List contents from GCP bucket - client = storage.Client() + client = _require_gcs_storage().Client() bucket_name, prefix = path[5:].split("/", 1) bucket = client.bucket(bucket_name) blobs = bucket.list_blobs(prefix=prefix) @@ -148,7 +162,7 @@ def copy_file(src: str, dest: str) -> None: """ if src.startswith("gs://") and dest.startswith("gs://"): # Copy file within GCP buckets - client = storage.Client() + client = _require_gcs_storage().Client() src_bucket_name, src_blob_name = src[5:].split("/", 1) dest_bucket_name, dest_blob_name = dest[5:].split("/", 1) @@ -161,7 +175,7 @@ def copy_file(src: str, dest: str) -> None: dest_blob.rewrite(src_blob) elif src.startswith("gs://"): # Copy file from GCP bucket to local - client = storage.Client() + client = _require_gcs_storage().Client() bucket_name, blob_name = src[5:].split("/", 1) bucket = client.bucket(bucket_name) blob = bucket.blob(blob_name) @@ -170,7 +184,7 @@ def copy_file(src: str, dest: str) -> None: blob.download_to_filename(dest) elif dest.startswith("gs://"): # Copy file from local to GCP bucket - client = storage.Client() + client = _require_gcs_storage().Client() bucket_name, blob_name = dest[5:].split("/", 1) bucket = client.bucket(bucket_name) blob = bucket.blob(blob_name) @@ -199,7 +213,7 @@ def path_exists(path: str) -> bool: """ if path.startswith("gs://"): # Check existence in GCP bucket - client = storage.Client() + client = _require_gcs_storage().Client() bucket_name, blob_name = path[5:].split("/", 1) bucket = client.bucket(bucket_name) blob = bucket.blob(blob_name) diff --git a/src/utils/inspect_eval_utils.py b/src/utils/inspect_eval_utils.py index b0d81d5..965d1b8 100644 --- a/src/utils/inspect_eval_utils.py +++ b/src/utils/inspect_eval_utils.py @@ -19,7 +19,7 @@ INSPECT_JUDGE_LLM_GEN_CONFIG: Dict[str, Any] = {} LLM_JUDGE_PROMPT = """ -Look at the submission and the target string for a given task and judge whether they are equivalent or not. The submission should include all aspects present in the target. Respond with only "Yes" or "No" (without quotes). Do not include a rationale. +Look at the submission and the target string for a given task and judge whether they are equivalent or not. If they are semantically equivalent, respond with "Yes". Respond with only "Yes" or "No" (without quotes). Do not include a rationale. Submission: {submission} Target: {target} @@ -30,7 +30,7 @@ # https://github.com/UKGovernmentBEIS/inspect_ai/blob/main/src/inspect_ai/_util/pattern.py#L3 def parse_submission(submission: str) -> str: """ - Parse the submission string to extract the answer based on the "ANSWER" keyword. + Parse the submission string to extract the final answer. This function is used in the capability class score method. @@ -43,9 +43,76 @@ def parse_submission(submission: str) -> str: str: The extracted answer from the submission, or an empty string if no match is found. """ - answer_pattern = r"ANSWER\s*:\s*([^\n]+)" - match = re.search(answer_pattern, submission) - return match.group(1) if match else "" + if not submission: + return "" + + def _maybe_extract_option_letter(text: str) -> str | None: + """ + Normalize common answer formats (e.g., "ANSWER: B", "Option B", "\\boxed{ANSWER: B}") + into a single letter like "B". + """ + if not text: + return None + s = text.strip() + # Normalize common LaTeX wrappers like "\text{ANSWER: B}" -> "ANSWER: B". + s = re.sub(r"\\text\s*\{([^}]*)\}", r"\1", s, flags=re.IGNORECASE) + s = s.replace("{", "").replace("}", "") + # Handle cases like "ANSWER: B" (possibly with trailing punctuation). + m = re.search( + r"(?im)(?:final\s+answer|correct\s+option|answer)\s*[:\-]?\s*([A-Z])\b", + s, + ) + if m: + return m.group(1).upper() + m = re.fullmatch( + r"(?im)\s*(?:(?:final\s+answer|correct\s+option|answer))\s*[:\-]?\s*([A-Z])\s*[\s.]*", + s, + ) + if m: + return m.group(1).upper() + m = re.fullmatch( + r"(?im)\s*(?:(?:option|choice))\s*[:\-]?\s*([A-Z])\s*[\s.)]*", + s, + ) + if m: + return m.group(1).upper() + # Sometimes the boxed content is already just "B". + m = re.fullmatch(r"(?im)\s*([A-Z])\s*", s) + if m: + return m.group(1).upper() + return None + + patterns = [ + r"(?im)^\s*(?:final\s+answer|answer)\s*:\s*(.+?)\s*$", + r"(?im)^\s*(?:the\s+correct\s+option|correct\s+option)\s+is\s*[:\-]?\s*(.+?)\s*$", + r"(?im)^\s*(?:option|choice)\s*[:\-]?\s*([A-Z])\s*$", + r"\\boxed\{([^}]+)\}", + ] + + for pattern in patterns: + matches = re.findall(pattern, submission) + if matches: + extracted = matches[-1].strip() + if extracted: + normalized_letter = _maybe_extract_option_letter(extracted) + if normalized_letter is not None: + return normalized_letter + return extracted + + lines = [line.strip() for line in submission.splitlines() if line.strip()] + if not lines: + return "" + + for line in reversed(lines[-5:]): + option_match = re.fullmatch( + r"(?:option|choice)?\s*[:\-]?\s*([A-Z])(?:[.)])?", + line, + flags=re.IGNORECASE, + ) + if option_match: + return option_match.group(1).upper() + + return "" async def evaluate_with_llm_judge(