From 151b5a9a6eeb534639ca64c8cd27fe68a0211537 Mon Sep 17 00:00:00 2001 From: Negiiiin Date: Tue, 3 Feb 2026 11:00:58 -0500 Subject: [PATCH 1/3] New Implementation of IRT metrics --- src/cfg/irt_cfg.yaml | 31 ++ src/run_irt_analysis.py | 436 ++++++++++++++++++++++++++++ src/schemas/__init__.py | 4 + src/schemas/irt_schemas.py | 92 ++++++ src/utils/irt_utils.py | 566 +++++++++++++++++++++++++++++++++++++ 5 files changed, 1129 insertions(+) create mode 100644 src/cfg/irt_cfg.yaml create mode 100644 src/run_irt_analysis.py create mode 100644 src/schemas/irt_schemas.py create mode 100644 src/utils/irt_utils.py diff --git a/src/cfg/irt_cfg.yaml b/src/cfg/irt_cfg.yaml new file mode 100644 index 00000000..d6bfc695 --- /dev/null +++ b/src/cfg/irt_cfg.yaml @@ -0,0 +1,31 @@ +# IRT analysis configuration (src/run_irt_analysis.py). +# CLI overrides: data_cfg.scores_dir=/path, data_cfg.per_capability=true + +defaults: + - _self_ + +hydra: + run: + dir: . + +data_cfg: # Set this to the path to the scores directory containing evaluation JSON files + scores_dir: data/scores_sample # required; all **/*.json under this dir + per_capability: true # true: fit per capability; false: one combined fit + +output_cfg: + output_dir: null # null = irt_results/_ in project root + output_filename: irt_analysis.json + +irt_cfg: + model_type: "3PL" # 1PL, 2PL, or 3PL + max_iterations: 1000 + tolerance: 1.0e-6 + quadrature_n: 41 + +# Top-level fallbacks when nested keys not overridden +model_type: "3PL" +max_iterations: 2000 +quadrature_n: 41 +output_dir: null +dataset_id: null +verbose: false diff --git a/src/run_irt_analysis.py b/src/run_irt_analysis.py new file mode 100644 index 00000000..a4e7152b --- /dev/null +++ b/src/run_irt_analysis.py @@ -0,0 +1,436 @@ +"""Run IRT analysis on evaluation results. + +Loads all **/*.json under data_cfg.scores_dir, extracts (model, question) responses, +builds a response matrix, and fits 1PL/2PL/3PL IRT (girth). data_cfg.per_capability +controls whether to fit one combined model (false) or one model per capability (true). +Configuration: src/cfg/irt_cfg.yaml. Override from CLI, e.g.: + python src/run_irt_analysis.py data_cfg.scores_dir=/path/to/scores +""" + +import logging +import os +from datetime import datetime +from typing import Any, Dict, List, Tuple + +import hydra +from omegaconf import DictConfig + +from src.schemas.irt_schemas import IRTAnalysis, IRTItemParameters +from src.utils import constants +from src.utils.data_utils import list_dir, write_json_file +from src.utils.irt_utils import ( + build_response_matrix_from_inspect_score_files, + create_response_matrix_from_flat, + extract_question_responses, + fit_3pl_irt, + group_response_data_by_capability, + load_score_files, +) + +logger = logging.getLogger(__name__) + + +def discover_model_score_files( + scores_dir: str, + domain: str, + capability_name: str, + model_names: List[str] | None = None, +) -> List[Tuple[str, str]]: + """ + Discover (model_name, score_json_path) for a capability. + + Expects layout: scores_dir / model_name / domain / capability_name / *.json + + Args + ---- + scores_dir: Base directory containing model subdirs. + domain: Capability domain (e.g. "mathematics"). + capability_name: Capability name (e.g. "number_theory_combinatorics"). + model_names: If provided, only these models are used; otherwise + all subdirs of scores_dir are treated as model names. + + Returns + ------- + List of (model_name, json_path). Only includes models that have + at least one .json file in their capability folder. + """ + if model_names is None: + try: + model_names = list_dir(scores_dir) + except (FileNotFoundError, NotADirectoryError) as e: + logger.error("Cannot list scores_dir %s: %s", scores_dir, e) + return [] + + result: List[Tuple[str, str]] = [] + for model_name in model_names: + cap_dir = os.path.join( + scores_dir, model_name, domain, capability_name + ) + if not os.path.isdir(cap_dir): + logger.debug( + "Skipping model %s: no directory %s", + model_name, + cap_dir, + ) + continue + try: + files = [ + f + for f in list_dir(cap_dir) + if f.endswith(".json") + ] + except Exception as e: + logger.warning( + "Skipping model %s: failed to list %s: %s", + model_name, + cap_dir, + e, + ) + continue + if not files: + logger.debug( + "Skipping model %s: no .json in %s", + model_name, + cap_dir, + ) + continue + # Use first JSON file (e.g. single eval run per model) + json_path = os.path.join(cap_dir, files[0]) + result.append((model_name, json_path)) + return result + + +def _fit_and_build( + response_matrix: List[List[int]], + question_ids: List[str], + model_names: List[str], + dataset_id: str, + model_type: str, + max_iterations: int, + quadrature_n: int, + tolerance: float, +) -> IRTAnalysis: + """Fit IRT and build IRTAnalysis (no save).""" + fit_result = fit_3pl_irt( + response_matrix=response_matrix, + question_ids=question_ids, + model_names=model_names, + max_iterations=max_iterations, + quadrature_n=quadrature_n, + model_type=model_type, + ) + evaluation_settings: Dict[str, Any] = { + "model_type": model_type, + "max_iterations": max_iterations, + "quadrature_n": quadrature_n, + "tolerance": tolerance, + } + item_parameters = { + qid: IRTItemParameters( + task_id=qid, + discrimination=p["discrimination"], + difficulty=p["difficulty"], + guessing=p["guessing"], + ) + for qid, p in fit_result["item_parameters"].items() + } + return IRTAnalysis( + dataset_id=dataset_id, + subject_model_names=model_names, + evaluation_settings=evaluation_settings, + item_parameters=item_parameters, + model_info=fit_result["model_info"], + ) + + +def run_irt_analysis_flat( + scores_dir: str, + model_type: str = "3PL", + max_iterations: int = 1000, + quadrature_n: int = 41, + tolerance: float = 1e-6, + output_dir: str | None = None, + output_filename: str = "irt_analysis.json", + dataset_id: str | None = None, + per_capability: bool = False, +) -> IRTAnalysis | Dict[str, IRTAnalysis]: + """ + Run IRT pipeline using flat loading: load all **/*.json under scores_dir, + extract (model, question) responses (custom_scorer C), build matrix, fit IRT. + + Args + ---- + scores_dir: Base directory containing evaluation JSON files (recursive). + model_type, max_iterations, quadrature_n, tolerance: IRT fitting options. + output_dir: If set, save IRTAnalysis here as output_filename. + output_filename: Output JSON filename (default irt_analysis.json). + dataset_id: Dataset identifier (default "flat"). + per_capability: If True, group by capability (eval.task), fit IRT separately + per capability, and save one JSON with all capability analyses. If False, + one combined response matrix and one IRT fit (current behavior). + + Returns + ------- + IRTAnalysis when per_capability=False; Dict[capability_name, IRTAnalysis] when True. + """ + data = load_score_files(scores_dir) + if not data: + raise FileNotFoundError( + f"No JSON score files found under {scores_dir}. " + "Ensure the directory exists and contains **/*.json." + ) + response_data, question_info = extract_question_responses(data) + if not response_data: + raise ValueError( + f"No (model, question) responses extracted from {scores_dir}. " + "Check that score files contain 'samples' and 'eval.task'." + ) + + if per_capability: + by_capability = group_response_data_by_capability( + response_data, question_info + ) + if not by_capability: + raise ValueError( + "No capabilities found after grouping. Check question_info has 'task'." + ) + analyses: Dict[str, IRTAnalysis] = {} + for cap_name, cap_response_data in by_capability.items(): + if not cap_response_data: + continue + response_matrix, question_ids, model_names = ( + create_response_matrix_from_flat(cap_response_data) + ) + if not response_matrix or not question_ids or not model_names: + logger.warning( + "Skipping capability %s: empty or insufficient matrix", + cap_name, + ) + continue + # IRT (girth) needs at least 2 items and 2 persons + if len(question_ids) < 2 or len(model_names) < 2: + logger.warning( + "Skipping capability %s: need at least 2 questions and 2 models (got %d questions, %d models)", + cap_name, + len(question_ids), + len(model_names), + ) + continue + try: + analyses[cap_name] = _fit_and_build( + response_matrix=response_matrix, + question_ids=question_ids, + model_names=model_names, + dataset_id=cap_name, + model_type=model_type, + max_iterations=max_iterations, + quadrature_n=quadrature_n, + tolerance=tolerance, + ) + logger.info("Fitted IRT for capability: %s", cap_name) + except (ValueError, AttributeError, RuntimeError) as e: + logger.warning( + "Skipping capability %s: IRT fit failed (%s)", + cap_name, + e, + ) + continue + if output_dir: + os.makedirs(output_dir, exist_ok=True) + out_path = os.path.join(output_dir, output_filename) + payload: Dict[str, Any] = { + "per_capability": True, + "capabilities": { + name: a.to_dict() for name, a in analyses.items() + }, + } + write_json_file(out_path, payload) + logger.info( + "Per-capability IRT analyses saved to %s (%d capabilities)", + out_path, + len(analyses), + ) + return analyses + + # Combined: one response matrix, one IRT fit + response_matrix, question_ids, model_names = create_response_matrix_from_flat( + response_data + ) + if not response_matrix or not question_ids or not model_names: + raise ValueError("Response matrix is empty after flat extraction.") + analysis = _fit_and_build( + response_matrix=response_matrix, + question_ids=question_ids, + model_names=model_names, + dataset_id=dataset_id or "flat", + model_type=model_type, + max_iterations=max_iterations, + quadrature_n=quadrature_n, + tolerance=tolerance, + ) + if output_dir: + os.makedirs(output_dir, exist_ok=True) + out_path = os.path.join(output_dir, output_filename) + write_json_file(out_path, analysis.to_dict()) + logger.info("IRT analysis saved to %s", out_path) + return analysis + + +def run_irt_analysis( + scores_dir: str, + domain: str, + capability_name: str, + model_names: List[str] | None = None, + model_type: str = "3PL", + max_iterations: int = 2000, + quadrature_n: int = 41, + tolerance: float = 1e-6, + output_dir: str | None = None, + output_filename: str = "irt_analysis.json", + dataset_id: str | None = None, +) -> IRTAnalysis: + """ + Run full IRT pipeline (per-capability): discover score files by domain/capability, + build matrix, fit IRT, return IRTAnalysis. + + Args + ---- + scores_dir: Base directory for evaluation scores (layout: scores_dir/model/domain/capability/*.json). + domain: Capability domain. + capability_name: Capability name. + model_names: Optional list of model names; if None, discovered from scores_dir. + model_type: "1PL", "2PL", or "3PL". + max_iterations: MML max iterations. + quadrature_n: Quadrature points. + tolerance: Convergence tolerance (stored in evaluation_settings). + output_dir: If set, IRTAnalysis is saved here as output_filename. + output_filename: Output JSON filename. + dataset_id: Identifier for the dataset (default: capability_name). + + Returns + ------- + IRTAnalysis instance with item parameters and context. + """ + model_score_files = discover_model_score_files( + scores_dir=scores_dir, + domain=domain, + capability_name=capability_name, + model_names=model_names, + ) + if not model_score_files: + raise FileNotFoundError( + f"No score files found for capability {capability_name} " + f"(domain={domain}) under {scores_dir}. " + "Ensure evaluation has been run for at least one model." + ) + + response_matrix, question_ids, names = ( + build_response_matrix_from_inspect_score_files(model_score_files) + ) + if not response_matrix or not question_ids or not names: + raise ValueError( + f"Response matrix is empty for {capability_name}. " + "Check that all model score files share at least one task id." + ) + + analysis = _fit_and_build( + response_matrix=response_matrix, + question_ids=question_ids, + model_names=names, + dataset_id=dataset_id or capability_name, + model_type=model_type, + max_iterations=max_iterations, + quadrature_n=quadrature_n, + tolerance=tolerance, + ) + if output_dir: + os.makedirs(output_dir, exist_ok=True) + out_path = os.path.join(output_dir, output_filename) + write_json_file(out_path, analysis.to_dict()) + logger.info("IRT analysis saved to %s", out_path) + return analysis + + +def _resolve_output_dir( + cfg: DictConfig, + default_dataset_id: str, +) -> str: + """Resolve output_dir from output_cfg.output_dir or output_dir; default to irt_results/_.""" + output_cfg = cfg.get("output_cfg") or {} + out = output_cfg.get("output_dir") if output_cfg else None + if out is None: + out = cfg.get("output_dir") + if out is None or (isinstance(out, str) and out.lower() == "null"): + timestamp = datetime.utcnow().strftime("%Y-%m-%dT%H-%M-%SZ") + # Use local project directory instead of shared artifacts directory + project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) + return os.path.join( + project_root, + "irt_results", + f"{default_dataset_id}_{timestamp}", + ) + return str(out) + + +def _resolve_output_filename(cfg: DictConfig) -> str: + """Resolve output filename from output_cfg.output_filename or default irt_analysis.json.""" + output_cfg = cfg.get("output_cfg") or {} + name = output_cfg.get("output_filename") if output_cfg else None + if name is None or (isinstance(name, str) and name.strip() == ""): + return "irt_analysis.json" + return str(name).strip() + + +def _resolve_irt_params(cfg: DictConfig) -> tuple[str, int, float, int]: + """Return (model_type, max_iterations, tolerance, quadrature_n) from irt_cfg with top-level fallback.""" + irt = cfg.get("irt_cfg") or {} + model_type = str(irt.get("model_type") or cfg.get("model_type", "3PL")) + max_iterations = int(irt.get("max_iterations") or cfg.get("max_iterations", 2000)) + tolerance = float(irt.get("tolerance", 1e-6)) + quadrature_n = int(irt.get("quadrature_n") or cfg.get("quadrature_n", 41)) + return model_type, max_iterations, tolerance, quadrature_n + + +@hydra.main(version_base=None, config_path="cfg", config_name="irt_cfg") +def main(cfg: DictConfig) -> None: + """Run IRT analysis using config from cfg/irt_cfg.yaml (overridable via CLI).""" + logging.basicConfig( + level=logging.DEBUG if cfg.get("verbose", False) else logging.INFO, + format="%(asctime)s %(levelname)s %(name)s: %(message)s", + ) + + model_type, max_iterations, tolerance, quadrature_n = _resolve_irt_params(cfg) + output_filename = _resolve_output_filename(cfg) + + data_cfg = cfg.get("data_cfg") or {} + scores_dir = data_cfg.get("scores_dir") if data_cfg else None + if not scores_dir or str(scores_dir).strip().lower() in ("null", ""): + raise ValueError( + "data_cfg.scores_dir is required. Set it in irt_cfg.yaml or override from CLI, e.g. " + "python src/run_irt_analysis.py data_cfg.scores_dir=/path/to/scores" + ) + scores_dir = str(scores_dir).strip() + dataset_id = cfg.get("dataset_id") + dataset_id = str(dataset_id) if dataset_id is not None else "flat" + output_dir = _resolve_output_dir(cfg, dataset_id) + per_capability = bool(data_cfg.get("per_capability", False)) + + run_irt_analysis_flat( + scores_dir=scores_dir, + model_type=model_type, + max_iterations=max_iterations, + quadrature_n=quadrature_n, + tolerance=tolerance, + output_dir=output_dir, + output_filename=output_filename, + dataset_id=dataset_id if dataset_id != "flat" else None, + per_capability=per_capability, + ) + logger.info( + "IRT analysis completed (per_capability=%s).", + per_capability, + ) + + +if __name__ == "__main__": + main() diff --git a/src/schemas/__init__.py b/src/schemas/__init__.py index 29e46fc9..c900ec38 100644 --- a/src/schemas/__init__.py +++ b/src/schemas/__init__.py @@ -7,6 +7,7 @@ from src.schemas.area_schemas import Area from src.schemas.capability_schemas import Capability from src.schemas.domain_schemas import Domain +from src.schemas.irt_schemas import IRTAnalysis, IRTItemParameters from src.schemas.experiment_schemas import Experiment from src.schemas.io_utils import ( load_areas, @@ -40,6 +41,9 @@ "Area", # Capability schemas "Capability", + # IRT schemas + "IRTAnalysis", + "IRTItemParameters", # Task schemas "Task", # Solution schemas diff --git a/src/schemas/irt_schemas.py b/src/schemas/irt_schemas.py new file mode 100644 index 00000000..06d7a593 --- /dev/null +++ b/src/schemas/irt_schemas.py @@ -0,0 +1,92 @@ +"""Schemas for Item Response Theory (IRT) analysis. + +Defines IRTItemParameters and IRTAnalysis dataclasses. IRT parameters are +context-dependent (dataset, subject models, evaluation settings), so they +are stored in a separate analysis object rather than on the Task class. +""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Any, Dict, List + + +@dataclass +class IRTItemParameters: + """IRT parameters for a single task/item.""" + + task_id: str + discrimination: float + difficulty: float + guessing: float + + def to_dict(self) -> Dict[str, Any]: + """Convert to dictionary.""" + return { + "task_id": self.task_id, + "discrimination": self.discrimination, + "difficulty": self.difficulty, + "guessing": self.guessing, + } + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> IRTItemParameters: + """Create from dictionary.""" + return cls( + task_id=data["task_id"], + discrimination=float(data["discrimination"]), + difficulty=float(data["difficulty"]), + guessing=float(data["guessing"]), + ) + + +@dataclass +class IRTAnalysis: + """Complete IRT analysis for a dataset evaluation. + + Stores item parameters per task along with the context (dataset, + subject models, evaluation settings) so that parameters are + interpretable and comparable. + """ + + # Context: which evaluation this analysis belongs to + dataset_id: str + subject_model_names: List[str] + evaluation_settings: Dict[str, Any] = field(default_factory=dict) + + # IRT parameters per task (keyed by task_id) + item_parameters: Dict[str, IRTItemParameters] = field(default_factory=dict) + + # Model fit info (n_items, n_persons, model_type, method, note) + model_info: Dict[str, Any] = field(default_factory=dict) + + def to_dict(self) -> Dict[str, Any]: + """Convert to dictionary.""" + return { + "dataset_id": self.dataset_id, + "subject_model_names": self.subject_model_names, + "evaluation_settings": self.evaluation_settings, + "item_parameters": { + tid: p.to_dict() for tid, p in self.item_parameters.items() + }, + "model_info": self.model_info, + } + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> IRTAnalysis: + """Create from dictionary.""" + item_params = { + tid: IRTItemParameters.from_dict(p) + for tid, p in data.get("item_parameters", {}).items() + } + return cls( + dataset_id=data["dataset_id"], + subject_model_names=list(data["subject_model_names"]), + evaluation_settings=dict(data.get("evaluation_settings", {})), + item_parameters=item_params, + model_info=dict(data.get("model_info", {})), + ) + + def get_parameters_for_task(self, task_id: str) -> IRTItemParameters | None: + """Get IRT parameters for a task by id.""" + return self.item_parameters.get(task_id) diff --git a/src/utils/irt_utils.py b/src/utils/irt_utils.py new file mode 100644 index 00000000..99ed9a44 --- /dev/null +++ b/src/utils/irt_utils.py @@ -0,0 +1,566 @@ +"""IRT analysis utilities using 3PL model via girth library. + +Builds response matrices from inspect score JSON files and fits 1PL/2PL/3PL +IRT models to estimate item parameters (difficulty, discrimination, guessing). + +Supports two loading paths: +- Per-capability: discover model/domain/capability score files, then build matrix. +- Flat/glob: load all JSONs under a scores directory, extract responses (custom_scorer C), + then build matrix for IRT. +""" + +import glob +import json +import logging +import os +from collections import defaultdict +from typing import Any, Dict, List, Tuple + +import numpy as np + +from src.utils import constants +from src.utils.capability_utils import CAPABILITY_SCORER_MAP +from src.utils.data_utils import read_json_file + +logger = logging.getLogger(__name__) + +try: + import girth + from girth import ability_3pl_eap, rasch_mml, threepl_mml, twopl_mml + + GIRTH_AVAILABLE = True +except ImportError: + GIRTH_AVAILABLE = False + logger.warning( + "girth not available. IRT fitting will fail. Install via: pip install girth" + ) + +# Inspect AI uses "C" for correct, "I" for incorrect (CORRECT/INCORRECT from inspect_ai.scorer) +INSPECT_SCORE_CORRECT = "C" + + +def load_score_files(scores_dir: str) -> List[Dict]: + """Load all JSON score files from the scores directory (recursive **/*.json). + + Args + ---- + scores_dir: Path to the scores directory. + + Returns + ------- + List of loaded JSON dicts; each entry has '_file_path' added. + """ + if not os.path.exists(scores_dir): + logger.error("Directory does not exist: %s", scores_dir) + return [] + + pattern = os.path.join(scores_dir, "**", "*.json") + files = glob.glob(pattern, recursive=True) + logger.info("Found %d JSON score files under %s", len(files), scores_dir) + + data: List[Dict] = [] + errors = 0 + for i, file_path in enumerate(files): + try: + with open(file_path, encoding="utf-8") as f: + file_data = json.load(f) + file_data["_file_path"] = file_path + data.append(file_data) + if (i + 1) % 50 == 0: + logger.debug("Loaded %d/%d files...", i + 1, len(files)) + except Exception as e: + errors += 1 + logger.warning("Error loading %s: %s", file_path, e) + + if errors > 0: + logger.warning("%d files failed to load", errors) + logger.info("Successfully loaded %d files", len(data)) + return data + + +def extract_model_name_from_path(file_path: str) -> str: + """Extract model name from file path (segment after directory containing 'scores'). + + Args + ---- + file_path: Full path to the score file. + + Returns + ------- + Model name or empty string. + """ + parts = file_path.replace(os.sep, "/").split("/") + # Look for any directory containing "scores" (e.g., "scores", "scores_sample") + for idx, part in enumerate(parts): + if "scores" in part.lower(): + if idx + 1 < len(parts): + return parts[idx + 1] + return "" + + +def extract_question_responses( + data: List[Dict], +) -> Tuple[Dict[Tuple[str, str], int], Dict[str, Any]]: + """Extract (model, question_id) -> score from score file data. + + Uses eval.task and sample id; score is 1 if scorer value is 'C', else 0. + Uses capability-specific scorer name (from CAPABILITY_SCORER_MAP) or default. + Unique question id is task_name + question_id. + + Args + ---- + data: List of score file dicts (with '_file_path'). + + Returns + ------- + (response_data, question_info): + - response_data: (model_name, unique_question_id) -> 0 or 1 + - question_info: unique_question_id -> {task, question_id, input, target} + """ + response_data: Dict[Tuple[str, str], int] = {} + question_info: Dict[str, Dict] = {} + files_processed = 0 + samples_processed = 0 + + for file_idx, file_data in enumerate(data): + if "samples" not in file_data: + continue + + file_path = file_data.get("_file_path", "") + model_name = extract_model_name_from_path(file_path) + if not model_name: + logger.debug( + "Skipping file %s: could not extract model name from path", + file_path, + ) + continue + + eval_data = file_data.get("eval", {}) + task_name = eval_data.get("task", "unknown") + capability_name = _clean_task_name(task_name) + scorer_name = CAPABILITY_SCORER_MAP.get( + capability_name, constants.DEFAULT_INSPECT_SCORER_NAME + ) + files_processed += 1 + + for sample in file_data["samples"]: + samples_processed += 1 + question_id = sample.get("id", "") + if not question_id: + continue + + unique_question_id = f"{task_name}_{question_id}" + + score_value = 0 + scores = sample.get("scores", {}) + if scorer_name in scores: + scorer_result = scores[scorer_name] + if isinstance(scorer_result, dict): + value = scorer_result.get("value", "") + score_value = 1 if value == INSPECT_SCORE_CORRECT else 0 + elif scorer_result == INSPECT_SCORE_CORRECT: + score_value = 1 + elif "custom_scorer" in scores: + # Fallback to custom_scorer if capability-specific scorer not found + scorer_result = scores["custom_scorer"] + if isinstance(scorer_result, dict): + value = scorer_result.get("value", "") + score_value = 1 if value == INSPECT_SCORE_CORRECT else 0 + elif scorer_result == INSPECT_SCORE_CORRECT: + score_value = 1 + + response_data[(model_name, unique_question_id)] = score_value + if unique_question_id not in question_info: + question_info[unique_question_id] = { + "task": task_name, + "question_id": question_id, + "input": sample.get("input", ""), + "target": sample.get("target", ""), + } + + if (file_idx + 1) % 20 == 0: + logger.debug( + "Processed %d/%d files, %d samples...", + file_idx + 1, + len(data), + samples_processed, + ) + + logger.info( + "Extracted %d unique questions, %d model-question responses from %d files", + len(question_info), + len(response_data), + files_processed, + ) + return response_data, question_info + + +def create_response_matrix_from_flat( + response_data: Dict[Tuple[str, str], int], +) -> Tuple[List[List[int]], List[str], List[str]]: + """Build response matrix for IRT from (model, question_id) -> score dict. + + Missing (model, question_id) entries are treated as 0. + + Args + ---- + response_data: (model_name, question_id) -> 0 or 1. + + Returns + ------- + (response_matrix, question_ids, model_names): + - response_matrix: 2D list, rows=questions, columns=models + - question_ids: row order + - model_names: column order + """ + models = sorted(set(m for m, _ in response_data.keys())) + questions = sorted(set(q for _, q in response_data.keys())) + + matrix: List[List[int]] = [] + for question_id in questions: + row = [response_data.get((model_name, question_id), 0) for model_name in models] + matrix.append(row) + + logger.info( + "Created response matrix: %d questions x %d models", + len(questions), + len(models), + ) + return matrix, questions, models + + +def get_model_question_counts( + response_data: Dict[Tuple[str, str], int], +) -> Dict[str, int]: + """Count questions per model from (model, question_id) -> score dict.""" + counts: Dict[str, int] = defaultdict(int) + for model_name, _ in response_data.keys(): + counts[model_name] += 1 + return dict(counts) + + +def group_response_data_by_capability( + response_data: Dict[Tuple[str, str], int], + question_info: Dict[str, Any], +) -> Dict[str, Dict[Tuple[str, str], int]]: + """Group (model, question_id) -> score by capability (task name). + + Uses question_info[unique_question_id]["task"] to determine capability. + Each capability gets a subset of response_data for that task only. + + Args + ---- + response_data: (model_name, unique_question_id) -> 0 or 1. + question_info: unique_question_id -> {task, question_id, ...}. + + Returns + ------- + capability_name -> response_data subset for that capability. + """ + by_capability: Dict[str, Dict[Tuple[str, str], int]] = {} + for (model_name, qid), score in response_data.items(): + task = question_info.get(qid, {}).get("task", "unknown") + if task not in by_capability: + by_capability[task] = {} + by_capability[task][(model_name, qid)] = score + logger.info( + "Grouped response data into %d capabilities: %s", + len(by_capability), + list(by_capability.keys()), + ) + return by_capability + + +def _clean_task_name(x: str) -> str: + """Extract capability/task name from eval task path.""" + return x.split("/")[-1] + + +def read_inspect_score_samples( + json_path: str, +) -> Tuple[Dict[str, int], str]: + """ + Read raw task-level scores from an inspect evaluation JSON file. + + Args + ---- + json_path: Path to the inspect score JSON file. + + Returns + ------- + (task_scores, capability_name): Dict mapping task_id to 0/1 score, + and the capability name from the eval config. + """ + data = read_json_file(json_path) + samples = data.get("samples", []) + eval_info = data.get("eval", {}) + task_name = eval_info.get("master_task") or eval_info.get("task", "") + capability_name = _clean_task_name(task_name) + scorer_name = CAPABILITY_SCORER_MAP.get( + capability_name, constants.DEFAULT_INSPECT_SCORER_NAME + ) + + task_scores: Dict[str, int] = {} + for sample in samples: + task_id = sample.get("id") + if task_id is None: + continue + try: + score_val = sample.get("scores", {}).get(scorer_name, {}).get("value") + correct = 1 if score_val == INSPECT_SCORE_CORRECT else 0 + task_scores[task_id] = correct + except (TypeError, KeyError) as e: + logger.warning( + "Skipping sample id=%s in %s: %s", task_id, json_path, e + ) + return task_scores, capability_name + + +def build_response_matrix_from_inspect_score_files( + model_score_files: List[Tuple[str, str]], +) -> Tuple[List[List[int]], List[str], List[str]]: + """ + Build a response matrix from multiple inspect score JSON files (one per model). + + Uses the intersection of task ids across all files so that every cell + is defined (every model has a score for every task). + + Args + ---- + model_score_files: List of (model_name, json_file_path) for each subject model. + + Returns + ------- + (response_matrix, question_ids, model_names): + - response_matrix: 2D list, rows = questions (tasks), columns = models. + - question_ids: List of task ids in row order. + - model_names: List of model names in column order. + """ + if not model_score_files: + return [], [], [] + + all_task_scores: List[Tuple[str, Dict[str, int]]] = [] + model_names: List[str] = [] + task_id_sets: List[set] = [] + + for model_name, json_path in model_score_files: + task_scores, _ = read_inspect_score_samples(json_path) + if not task_scores: + logger.warning( + "No task scores found for model %s in %s", model_name, json_path + ) + continue + all_task_scores.append((model_name, task_scores)) + model_names.append(model_name) + task_id_sets.append(set(task_scores.keys())) + + if not all_task_scores: + return [], [], [] + + # Use intersection of task ids so every (task, model) has a value + common_ids = task_id_sets[0] + for s in task_id_sets[1:]: + common_ids = common_ids & s + question_ids = sorted(common_ids) + + if not question_ids: + logger.warning( + "No common task ids across model files; cannot build response matrix." + ) + return [], [], [] + + n_items = len(question_ids) + n_persons = len(model_names) + response_matrix = [ + [all_task_scores[j][1][qid] for j in range(n_persons)] + for qid in question_ids + ] + return response_matrix, question_ids, model_names + + +def fit_3pl_irt( + response_matrix: List[List[int]], + question_ids: List[str], + model_names: List[str], + max_iterations: int = 2000, + quadrature_n: int = 41, + model_type: str = "3PL", +) -> Dict[str, Any]: + """ + Fit 1PL, 2PL, or 3PL IRT model using the girth library. + + For 1PL and 2PL, the corresponding girth MML routines are used. + For 3PL, the three-parameter logistic model is fit with upper asymptote + fixed at 1.0. + + Args + ---- + response_matrix: 2D list, rows = questions, columns = models (0/1). + question_ids: List of question (task) IDs in row order. + model_names: List of model names (subjects) in column order. + max_iterations: Maximum MML iterations. + quadrature_n: Quadrature points for numerical integration. + model_type: "1PL", "2PL", or "3PL". + + Returns + ------- + Dictionary with keys: + - item_parameters: dict task_id -> {discrimination, difficulty, guessing} + - model_info: n_items, n_persons, model_type, method, note + """ + if not GIRTH_AVAILABLE: + raise ImportError( + "The 'girth' library is required for IRT fitting. " + "Install it with: pip install girth" + ) + + model_type = (model_type or "3PL").upper() + if model_type not in {"1PL", "2PL", "3PL"}: + raise ValueError( + f"Unsupported IRT model_type '{model_type}'. " + "Supported values are '1PL', '2PL', and '3PL'." + ) + + data = np.array(response_matrix, dtype=int) + n_items, n_persons = data.shape + + # Girth 3PL/2PL/1PL need at least 2 items and 2 persons (otherwise internal .dot fails) + if n_items < 2 or n_persons < 2: + raise ValueError( + f"IRT fitting requires at least 2 items (questions) and 2 persons (models). " + f"Got n_items={n_items}, n_persons={n_persons}. " + "Skip this capability or add more data." + ) + + logger.info( + "Fitting %s IRT model on %d items and %d models ...", + model_type, + n_items, + n_persons, + ) + + options = { + "max_iteration": int(max_iterations), + "quadrature_n": int(quadrature_n), + } + + if model_type == "1PL": + item_results = rasch_mml(data, options=options) + difficulty = item_results["Difficulty"] + discrimination = np.ones_like(difficulty, dtype=float) + guessing = np.zeros_like(difficulty, dtype=float) + note = ( + "1PL (Rasch)-style parameters: discrimination fixed to 1, " + "guessing fixed to 0; upper asymptote fixed at 1.0." + ) + elif model_type == "2PL": + item_results = twopl_mml(data, options=options) + discrimination = item_results["Discrimination"] + difficulty = item_results["Difficulty"] + guessing = np.zeros_like(difficulty, dtype=float) + note = ( + "2PL-style parameters from fit with guessing fixed to 0; " + "upper asymptote fixed at 1.0." + ) + else: + item_results = threepl_mml(data, options=options) + discrimination = item_results["Discrimination"] + difficulty = item_results["Difficulty"] + guessing = item_results.get("Guessing") + if guessing is None: + guessing = np.zeros_like(difficulty, dtype=float) + note = ( + "3PL model: upper asymptote is fixed at 1.0 (not estimated)." + ) + + # Optionally estimate person abilities (not returned) + ability_3pl_eap(data, difficulty, discrimination, guessing) + + item_parameters: Dict[str, Dict[str, float]] = {} + for idx, q_id in enumerate(question_ids): + if idx < len(discrimination): + item_parameters[q_id] = { + "discrimination": float(discrimination[idx]), + "difficulty": float(difficulty[idx]), + "guessing": float(guessing[idx]), + } + + model_info = { + "n_items": n_items, + "n_persons": n_persons, + "model_type": model_type, + "method": "MML (Marginal Maximum Likelihood)", + "note": note, + } + + logger.info( + "IRT %s fit complete. Discrimination range [%.3f, %.3f], " + "difficulty range [%.3f, %.3f], guessing range [%.3f, %.3f].", + model_type, + float(np.min(discrimination)), + float(np.max(discrimination)), + float(np.min(difficulty)), + float(np.max(difficulty)), + float(np.min(guessing)), + float(np.max(guessing)), + ) + + return { + "item_parameters": item_parameters, + "model_info": model_info, + } + + +def calculate_response_statistics( + response_matrix: List[List[int]], + question_ids: List[str], + model_names: List[str], +) -> Dict[str, Any]: + """Compute basic statistics for the response matrix.""" + matrix = np.array(response_matrix) + if matrix.size == 0: + return { + "question_statistics": {}, + "model_statistics": {}, + "overall": { + "total_responses": 0, + "correct_responses": 0, + "accuracy": 0.0, + "n_questions": 0, + "n_models": 0, + }, + } + + stats: Dict[str, Any] = { + "question_statistics": {}, + "model_statistics": {}, + "overall": { + "total_responses": int(matrix.size), + "correct_responses": int(np.sum(matrix)), + "accuracy": float(np.mean(matrix)), + "n_questions": len(question_ids), + "n_models": len(model_names), + }, + } + + for idx, question_id in enumerate(question_ids): + if idx < matrix.shape[0]: + row = matrix[idx, :] + stats["question_statistics"][question_id] = { + "mean_score": float(np.mean(row)), + "std_score": float(np.std(row)) if row.size > 1 else 0.0, + "total_correct": int(np.sum(row)), + "total_attempts": len(row), + } + + for idx, model_name in enumerate(model_names): + if idx < matrix.shape[1]: + col = matrix[:, idx] + stats["model_statistics"][model_name] = { + "mean_score": float(np.mean(col)), + "std_score": float(np.std(col)) if col.size > 1 else 0.0, + "total_correct": int(np.sum(col)), + "total_attempts": len(col), + } + + return stats From df31927f2ab0b212aa5c03d4220a8aaa83964b56 Mon Sep 17 00:00:00 2001 From: Negiiiin Date: Tue, 3 Feb 2026 12:26:07 -0500 Subject: [PATCH 2/3] Added Task fields --- src/cfg/irt_cfg.yaml | 4 +- src/run_irt_analysis.py | 54 +++++++++++++++ src/utils/irt_utils.py | 146 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 203 insertions(+), 1 deletion(-) diff --git a/src/cfg/irt_cfg.yaml b/src/cfg/irt_cfg.yaml index d6bfc695..4ab4b392 100644 --- a/src/cfg/irt_cfg.yaml +++ b/src/cfg/irt_cfg.yaml @@ -8,9 +8,11 @@ hydra: run: dir: . -data_cfg: # Set this to the path to the scores directory containing evaluation JSON files +data_cfg: scores_dir: data/scores_sample # required; all **/*.json under this dir per_capability: true # true: fit per capability; false: one combined fit + # If set, task JSONs under this dir (**/tasks.json) are updated with IRT params and saved to output_dir/updated_tasks/ + capabilities_dir: /projects/aieng/public/ace/artifacts/negin_ace/taks/math output_cfg: output_dir: null # null = irt_results/_ in project root diff --git a/src/run_irt_analysis.py b/src/run_irt_analysis.py index a4e7152b..31c2b91b 100644 --- a/src/run_irt_analysis.py +++ b/src/run_irt_analysis.py @@ -21,10 +21,12 @@ from src.utils.irt_utils import ( build_response_matrix_from_inspect_score_files, create_response_matrix_from_flat, + discover_tasks_files, extract_question_responses, fit_3pl_irt, group_response_data_by_capability, load_score_files, + update_tasks_with_irt_and_save, ) logger = logging.getLogger(__name__) @@ -153,6 +155,7 @@ def run_irt_analysis_flat( output_filename: str = "irt_analysis.json", dataset_id: str | None = None, per_capability: bool = False, + capabilities_dir: str | None = None, ) -> IRTAnalysis | Dict[str, IRTAnalysis]: """ Run IRT pipeline using flat loading: load all **/*.json under scores_dir, @@ -168,6 +171,8 @@ def run_irt_analysis_flat( per_capability: If True, group by capability (eval.task), fit IRT separately per capability, and save one JSON with all capability analyses. If False, one combined response matrix and one IRT fit (current behavior). + capabilities_dir: If set with output_dir, task JSONs under this dir (**/tasks.json) + are updated with IRT item parameters and saved to output_dir/updated_tasks/. Returns ------- @@ -250,6 +255,26 @@ def run_irt_analysis_flat( out_path, len(analyses), ) + if capabilities_dir and output_dir: + task_files = discover_tasks_files(capabilities_dir) + for cap_name, analysis in analyses.items(): + if cap_name not in task_files: + continue + params_plain = { + qid: { + "difficulty": p.difficulty, + "discrimination": p.discrimination, + "guessing": p.guessing, + } + for qid, p in analysis.item_parameters.items() + } + update_tasks_with_irt_and_save( + capability_name=cap_name, + item_parameters=params_plain, + task_files=task_files[cap_name], + capabilities_dir=capabilities_dir, + output_dir=output_dir, + ) return analyses # Combined: one response matrix, one IRT fit @@ -273,6 +298,27 @@ def run_irt_analysis_flat( out_path = os.path.join(output_dir, output_filename) write_json_file(out_path, analysis.to_dict()) logger.info("IRT analysis saved to %s", out_path) + if capabilities_dir and output_dir and question_info: + task_files = discover_tasks_files(capabilities_dir) + cap_to_params: Dict[str, Dict[str, Dict[str, float]]] = {} + for qid, p in analysis.item_parameters.items(): + cap = question_info.get(qid, {}).get("task", "") + if cap: + cap_to_params.setdefault(cap, {})[qid] = { + "difficulty": p.difficulty, + "discrimination": p.discrimination, + "guessing": p.guessing, + } + for cap_name, params in cap_to_params.items(): + if cap_name not in task_files: + continue + update_tasks_with_irt_and_save( + capability_name=cap_name, + item_parameters=params, + task_files=task_files[cap_name], + capabilities_dir=capabilities_dir, + output_dir=output_dir, + ) return analysis @@ -414,6 +460,13 @@ def main(cfg: DictConfig) -> None: dataset_id = str(dataset_id) if dataset_id is not None else "flat" output_dir = _resolve_output_dir(cfg, dataset_id) per_capability = bool(data_cfg.get("per_capability", False)) + capabilities_dir_val = data_cfg.get("capabilities_dir") + capabilities_dir = ( + str(capabilities_dir_val).strip() + if capabilities_dir_val + and str(capabilities_dir_val).strip().lower() not in ("null", "") + else None + ) run_irt_analysis_flat( scores_dir=scores_dir, @@ -425,6 +478,7 @@ def main(cfg: DictConfig) -> None: output_filename=output_filename, dataset_id=dataset_id if dataset_id != "flat" else None, per_capability=per_capability, + capabilities_dir=capabilities_dir, ) logger.info( "IRT analysis completed (per_capability=%s).", diff --git a/src/utils/irt_utils.py b/src/utils/irt_utils.py index 99ed9a44..461a720f 100644 --- a/src/utils/irt_utils.py +++ b/src/utils/irt_utils.py @@ -564,3 +564,149 @@ def calculate_response_statistics( } return stats + + +def discover_tasks_files( + capabilities_dir: str, +) -> Dict[str, List[Tuple[str, Dict[str, Any]]]]: + """Discover tasks.json files under capabilities_dir and group by capability name. + + Args + ---- + capabilities_dir: Base directory to search for **/tasks.json. + + Returns + ------- + capability_name -> [(absolute_file_path, loaded_data)], where data has "metadata" and "tasks". + Capability name is taken from the first task's capability_name in each file. + """ + if not os.path.isdir(capabilities_dir): + logger.warning("Capabilities dir does not exist: %s", capabilities_dir) + return {} + + pattern = os.path.join(capabilities_dir, "**", "tasks.json") + files = glob.glob(pattern, recursive=True) + out: Dict[str, List[Tuple[str, Dict[str, Any]]]] = defaultdict(list) + + for path in files: + try: + with open(path, encoding="utf-8") as f: + data = json.load(f) + except Exception as e: + logger.warning("Failed to load %s: %s", path, e) + continue + tasks = data.get("tasks", []) + if not tasks: + logger.debug("No tasks in %s", path) + continue + # Use capability_name from first task (tasks from one file belong to one capability) + first = tasks[0] + cap_name = first.get("capability_name") or first.get("capability_id") or "" + if not cap_name: + logger.debug("No capability_name in %s", path) + continue + out[cap_name].append((os.path.abspath(path), data)) + + logger.info( + "Discovered tasks files for %d capabilities under %s", + len(out), + capabilities_dir, + ) + return dict(out) + + +def update_tasks_with_irt_and_save( + capability_name: str, + item_parameters: Dict[str, Dict[str, float]], + task_files: List[Tuple[str, Dict[str, Any]]], + capabilities_dir: str, + output_dir: str, +) -> int: + """Update task dicts with IRT params and save to output_dir/updated_tasks/. + + item_parameters is keyed by unique_question_id (e.g. capability_name_task_id) or task_id. + Adds irt_difficulty, irt_discrimination, irt_guessing to each matching task. + + Returns + ------- + Number of files written. + """ + capabilities_dir_abs = os.path.abspath(capabilities_dir) + out_subdir = os.path.join(output_dir, "updated_tasks") + written = 0 + + # Set of task identifiers we have IRT params for (normalize to bare task_id for comparison) + prefix = capability_name + "_" + irt_task_ids_bare = set() + for k in item_parameters: + if k.startswith(prefix): + irt_task_ids_bare.add(k[len(prefix) :]) + else: + irt_task_ids_bare.add(k) + + for file_path, data in task_files: + tasks = data.get("tasks", []) + if not tasks: + continue + file_task_ids = {t.get("task_id") for t in tasks if t.get("task_id")} + + # Check if the capability file's tasks match what we have scores for + file_ids_match_irt = file_task_ids <= irt_task_ids_bare + irt_ids_in_file = { + tid for tid in irt_task_ids_bare if tid in file_task_ids + } + irt_ids_not_in_file = irt_task_ids_bare - file_task_ids + if not file_ids_match_irt or irt_ids_not_in_file: + logger.warning( + "Capability %s: task set mismatch in %s. " + "File has %d task_ids; IRT has %d. " + "File tasks not in IRT: %s. IRT tasks not in file: %s.", + capability_name, + file_path, + len(file_task_ids), + len(irt_task_ids_bare), + sorted(file_task_ids - irt_ids_in_file)[:10] + + (["..."] if len(file_task_ids - irt_ids_in_file) > 10 else []), + sorted(irt_ids_not_in_file)[:10] + + (["..."] if len(irt_ids_not_in_file) > 10 else []), + ) + + updated = 0 + for task in tasks: + task_id = task.get("task_id") + if not task_id: + continue + unique_id = f"{capability_name}_{task_id}" + params = item_parameters.get(unique_id) or item_parameters.get(task_id) + if not params: + continue + task["irt_difficulty"] = params.get("difficulty") + task["irt_discrimination"] = params.get("discrimination") + task["irt_guessing"] = params.get("guessing") + updated += 1 + + if updated == 0: + logger.debug( + "No IRT match for capability %s in %s (task_ids in file may not match scores)", + capability_name, + file_path, + ) + continue + + try: + rel = os.path.relpath(file_path, capabilities_dir_abs) + except ValueError: + rel = os.path.basename(file_path) + out_path = os.path.join(out_subdir, rel) + os.makedirs(os.path.dirname(out_path), exist_ok=True) + with open(out_path, "w", encoding="utf-8") as f: + json.dump(data, f, indent=2, ensure_ascii=False) + written += 1 + logger.info( + "Updated %d tasks with IRT params for %s -> %s", + updated, + capability_name, + out_path, + ) + + return written From 3de2b55e4ca486d64b7b1a91124a05c6cd348a34 Mon Sep 17 00:00:00 2001 From: Negiiiin Date: Sat, 14 Feb 2026 00:58:18 -0500 Subject: [PATCH 3/3] Added IRT params to task schemas --- src/cfg/irt_cfg.yaml | 21 +-- src/run_irt_analysis.py | 254 +++++++----------------------------- src/schemas/task_schemas.py | 13 ++ 3 files changed, 63 insertions(+), 225 deletions(-) diff --git a/src/cfg/irt_cfg.yaml b/src/cfg/irt_cfg.yaml index 4ab4b392..4f6ed8b1 100644 --- a/src/cfg/irt_cfg.yaml +++ b/src/cfg/irt_cfg.yaml @@ -1,13 +1,6 @@ -# IRT analysis configuration (src/run_irt_analysis.py). -# CLI overrides: data_cfg.scores_dir=/path, data_cfg.per_capability=true - defaults: - _self_ -hydra: - run: - dir: . - data_cfg: scores_dir: data/scores_sample # required; all **/*.json under this dir per_capability: true # true: fit per capability; false: one combined fit @@ -15,19 +8,11 @@ data_cfg: capabilities_dir: /projects/aieng/public/ace/artifacts/negin_ace/taks/math output_cfg: - output_dir: null # null = irt_results/_ in project root - output_filename: irt_analysis.json + output_dir: null # required: directory to write irt_analysis.json (and updated_tasks/ if capabilities_dir set) + output_filename: irt_analysis.json # required irt_cfg: - model_type: "3PL" # 1PL, 2PL, or 3PL + model_type: "3PL" # 1PL, 2PL, or 3PL (required) max_iterations: 1000 tolerance: 1.0e-6 quadrature_n: 41 - -# Top-level fallbacks when nested keys not overridden -model_type: "3PL" -max_iterations: 2000 -quadrature_n: 41 -output_dir: null -dataset_id: null -verbose: false diff --git a/src/run_irt_analysis.py b/src/run_irt_analysis.py index 31c2b91b..d93706ba 100644 --- a/src/run_irt_analysis.py +++ b/src/run_irt_analysis.py @@ -1,15 +1,11 @@ -"""Run IRT analysis on evaluation results. +"""Run IRT analysis on evaluation scores. -Loads all **/*.json under data_cfg.scores_dir, extracts (model, question) responses, -builds a response matrix, and fits 1PL/2PL/3PL IRT (girth). data_cfg.per_capability -controls whether to fit one combined model (false) or one model per capability (true). -Configuration: src/cfg/irt_cfg.yaml. Override from CLI, e.g.: - python src/run_irt_analysis.py data_cfg.scores_dir=/path/to/scores +Loads **/*.json from data_cfg.scores_dir, builds a response matrix, fits 1PL/2PL/3PL (girth). +Config: src/cfg/irt_cfg.yaml. Override from CLI, e.g. data_cfg.scores_dir=/path output_cfg.output_dir=/out. """ import logging import os -from datetime import datetime from typing import Any, Dict, List, Tuple import hydra @@ -17,9 +13,8 @@ from src.schemas.irt_schemas import IRTAnalysis, IRTItemParameters from src.utils import constants -from src.utils.data_utils import list_dir, write_json_file +from src.utils.data_utils import write_json_file from src.utils.irt_utils import ( - build_response_matrix_from_inspect_score_files, create_response_matrix_from_flat, discover_tasks_files, extract_question_responses, @@ -32,76 +27,6 @@ logger = logging.getLogger(__name__) -def discover_model_score_files( - scores_dir: str, - domain: str, - capability_name: str, - model_names: List[str] | None = None, -) -> List[Tuple[str, str]]: - """ - Discover (model_name, score_json_path) for a capability. - - Expects layout: scores_dir / model_name / domain / capability_name / *.json - - Args - ---- - scores_dir: Base directory containing model subdirs. - domain: Capability domain (e.g. "mathematics"). - capability_name: Capability name (e.g. "number_theory_combinatorics"). - model_names: If provided, only these models are used; otherwise - all subdirs of scores_dir are treated as model names. - - Returns - ------- - List of (model_name, json_path). Only includes models that have - at least one .json file in their capability folder. - """ - if model_names is None: - try: - model_names = list_dir(scores_dir) - except (FileNotFoundError, NotADirectoryError) as e: - logger.error("Cannot list scores_dir %s: %s", scores_dir, e) - return [] - - result: List[Tuple[str, str]] = [] - for model_name in model_names: - cap_dir = os.path.join( - scores_dir, model_name, domain, capability_name - ) - if not os.path.isdir(cap_dir): - logger.debug( - "Skipping model %s: no directory %s", - model_name, - cap_dir, - ) - continue - try: - files = [ - f - for f in list_dir(cap_dir) - if f.endswith(".json") - ] - except Exception as e: - logger.warning( - "Skipping model %s: failed to list %s: %s", - model_name, - cap_dir, - e, - ) - continue - if not files: - logger.debug( - "Skipping model %s: no .json in %s", - model_name, - cap_dir, - ) - continue - # Use first JSON file (e.g. single eval run per model) - json_path = os.path.join(cap_dir, files[0]) - result.append((model_name, json_path)) - return result - - def _fit_and_build( response_matrix: List[List[int]], question_ids: List[str], @@ -145,7 +70,7 @@ def _fit_and_build( ) -def run_irt_analysis_flat( +def run_irt_analysis( scores_dir: str, model_type: str = "3PL", max_iterations: int = 1000, @@ -158,8 +83,8 @@ def run_irt_analysis_flat( capabilities_dir: str | None = None, ) -> IRTAnalysis | Dict[str, IRTAnalysis]: """ - Run IRT pipeline using flat loading: load all **/*.json under scores_dir, - extract (model, question) responses (custom_scorer C), build matrix, fit IRT. + Run IRT pipeline: load all **/*.json under scores_dir, extract (model, question) + responses (custom_scorer C), build matrix, fit IRT. Args ---- @@ -322,143 +247,58 @@ def run_irt_analysis_flat( return analysis -def run_irt_analysis( - scores_dir: str, - domain: str, - capability_name: str, - model_names: List[str] | None = None, - model_type: str = "3PL", - max_iterations: int = 2000, - quadrature_n: int = 41, - tolerance: float = 1e-6, - output_dir: str | None = None, - output_filename: str = "irt_analysis.json", - dataset_id: str | None = None, -) -> IRTAnalysis: - """ - Run full IRT pipeline (per-capability): discover score files by domain/capability, - build matrix, fit IRT, return IRTAnalysis. +def _require_config(cfg: DictConfig) -> None: + """Validate required config; raise ValueError if any required key is missing or null.""" + missing: List[str] = [] - Args - ---- - scores_dir: Base directory for evaluation scores (layout: scores_dir/model/domain/capability/*.json). - domain: Capability domain. - capability_name: Capability name. - model_names: Optional list of model names; if None, discovered from scores_dir. - model_type: "1PL", "2PL", or "3PL". - max_iterations: MML max iterations. - quadrature_n: Quadrature points. - tolerance: Convergence tolerance (stored in evaluation_settings). - output_dir: If set, IRTAnalysis is saved here as output_filename. - output_filename: Output JSON filename. - dataset_id: Identifier for the dataset (default: capability_name). + data_cfg = cfg.get("data_cfg") or {} + if not data_cfg.get("scores_dir") or str(data_cfg.get("scores_dir", "")).strip().lower() in ("null", ""): + missing.append("data_cfg.scores_dir") - Returns - ------- - IRTAnalysis instance with item parameters and context. - """ - model_score_files = discover_model_score_files( - scores_dir=scores_dir, - domain=domain, - capability_name=capability_name, - model_names=model_names, - ) - if not model_score_files: - raise FileNotFoundError( - f"No score files found for capability {capability_name} " - f"(domain={domain}) under {scores_dir}. " - "Ensure evaluation has been run for at least one model." - ) + output_cfg = cfg.get("output_cfg") or {} + if not output_cfg.get("output_dir") or str(output_cfg.get("output_dir", "")).strip().lower() == "null": + missing.append("output_cfg.output_dir") + if not output_cfg.get("output_filename") or str(output_cfg.get("output_filename", "")).strip() == "": + missing.append("output_cfg.output_filename") - response_matrix, question_ids, names = ( - build_response_matrix_from_inspect_score_files(model_score_files) - ) - if not response_matrix or not question_ids or not names: + irt = cfg.get("irt_cfg") or {} + if not irt.get("model_type") or str(irt.get("model_type", "")).strip() == "": + missing.append("irt_cfg.model_type") + if irt.get("max_iterations") is None: + missing.append("irt_cfg.max_iterations") + if irt.get("tolerance") is None: + missing.append("irt_cfg.tolerance") + if irt.get("quadrature_n") is None: + missing.append("irt_cfg.quadrature_n") + + if missing: raise ValueError( - f"Response matrix is empty for {capability_name}. " - "Check that all model score files share at least one task id." + "Missing required config (set in irt_cfg.yaml or override from CLI): " + ", ".join(missing) ) - analysis = _fit_and_build( - response_matrix=response_matrix, - question_ids=question_ids, - model_names=names, - dataset_id=dataset_id or capability_name, - model_type=model_type, - max_iterations=max_iterations, - quadrature_n=quadrature_n, - tolerance=tolerance, - ) - if output_dir: - os.makedirs(output_dir, exist_ok=True) - out_path = os.path.join(output_dir, output_filename) - write_json_file(out_path, analysis.to_dict()) - logger.info("IRT analysis saved to %s", out_path) - return analysis +@hydra.main(version_base=None, config_path="cfg", config_name="irt_cfg") +def main(cfg: DictConfig) -> None: + """Run IRT analysis using config from cfg/irt_cfg.yaml (overridable via CLI).""" + logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s") -def _resolve_output_dir( - cfg: DictConfig, - default_dataset_id: str, -) -> str: - """Resolve output_dir from output_cfg.output_dir or output_dir; default to irt_results/_.""" - output_cfg = cfg.get("output_cfg") or {} - out = output_cfg.get("output_dir") if output_cfg else None - if out is None: - out = cfg.get("output_dir") - if out is None or (isinstance(out, str) and out.lower() == "null"): - timestamp = datetime.utcnow().strftime("%Y-%m-%dT%H-%M-%SZ") - # Use local project directory instead of shared artifacts directory - project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) - return os.path.join( - project_root, - "irt_results", - f"{default_dataset_id}_{timestamp}", - ) - return str(out) - + _require_config(cfg) -def _resolve_output_filename(cfg: DictConfig) -> str: - """Resolve output filename from output_cfg.output_filename or default irt_analysis.json.""" + data_cfg = cfg.get("data_cfg") or {} output_cfg = cfg.get("output_cfg") or {} - name = output_cfg.get("output_filename") if output_cfg else None - if name is None or (isinstance(name, str) and name.strip() == ""): - return "irt_analysis.json" - return str(name).strip() - - -def _resolve_irt_params(cfg: DictConfig) -> tuple[str, int, float, int]: - """Return (model_type, max_iterations, tolerance, quadrature_n) from irt_cfg with top-level fallback.""" irt = cfg.get("irt_cfg") or {} - model_type = str(irt.get("model_type") or cfg.get("model_type", "3PL")) - max_iterations = int(irt.get("max_iterations") or cfg.get("max_iterations", 2000)) - tolerance = float(irt.get("tolerance", 1e-6)) - quadrature_n = int(irt.get("quadrature_n") or cfg.get("quadrature_n", 41)) - return model_type, max_iterations, tolerance, quadrature_n + scores_dir = str(data_cfg["scores_dir"]).strip() + output_dir = str(output_cfg["output_dir"]).strip() + output_filename = str(output_cfg["output_filename"]).strip() + model_type = str(irt["model_type"]).strip() + max_iterations = int(irt["max_iterations"]) + tolerance = float(irt["tolerance"]) + quadrature_n = int(irt["quadrature_n"]) -@hydra.main(version_base=None, config_path="cfg", config_name="irt_cfg") -def main(cfg: DictConfig) -> None: - """Run IRT analysis using config from cfg/irt_cfg.yaml (overridable via CLI).""" - logging.basicConfig( - level=logging.DEBUG if cfg.get("verbose", False) else logging.INFO, - format="%(asctime)s %(levelname)s %(name)s: %(message)s", - ) - - model_type, max_iterations, tolerance, quadrature_n = _resolve_irt_params(cfg) - output_filename = _resolve_output_filename(cfg) + dataset_id = data_cfg.get("dataset_id") or cfg.get("dataset_id") + dataset_id = str(dataset_id).strip() if dataset_id is not None and str(dataset_id).strip().lower() not in ("null", "") else None - data_cfg = cfg.get("data_cfg") or {} - scores_dir = data_cfg.get("scores_dir") if data_cfg else None - if not scores_dir or str(scores_dir).strip().lower() in ("null", ""): - raise ValueError( - "data_cfg.scores_dir is required. Set it in irt_cfg.yaml or override from CLI, e.g. " - "python src/run_irt_analysis.py data_cfg.scores_dir=/path/to/scores" - ) - scores_dir = str(scores_dir).strip() - dataset_id = cfg.get("dataset_id") - dataset_id = str(dataset_id) if dataset_id is not None else "flat" - output_dir = _resolve_output_dir(cfg, dataset_id) per_capability = bool(data_cfg.get("per_capability", False)) capabilities_dir_val = data_cfg.get("capabilities_dir") capabilities_dir = ( @@ -468,7 +308,7 @@ def main(cfg: DictConfig) -> None: else None ) - run_irt_analysis_flat( + run_irt_analysis( scores_dir=scores_dir, model_type=model_type, max_iterations=max_iterations, @@ -476,7 +316,7 @@ def main(cfg: DictConfig) -> None: tolerance=tolerance, output_dir=output_dir, output_filename=output_filename, - dataset_id=dataset_id if dataset_id != "flat" else None, + dataset_id=dataset_id, per_capability=per_capability, capabilities_dir=capabilities_dir, ) diff --git a/src/schemas/task_schemas.py b/src/schemas/task_schemas.py index ed399d04..1cc2ccf3 100644 --- a/src/schemas/task_schemas.py +++ b/src/schemas/task_schemas.py @@ -27,6 +27,10 @@ class Task: None # [{"label": "A", "solution": "..."}] ) generation_metadata: Optional[Dict[str, Any]] = field(default_factory=dict) + # IRT (Item Response Theory) parameters from 3PL fit (optional until analysis is run) + irt_difficulty: Optional[float] = None + irt_discrimination: Optional[float] = None + irt_guessing: Optional[float] = None def to_dict(self) -> Dict[str, Any]: """Convert to dictionary.""" @@ -42,6 +46,12 @@ def to_dict(self) -> Dict[str, Any]: } if self.generation_metadata: result["generation_metadata"] = self.generation_metadata + if self.irt_difficulty is not None: + result["irt_difficulty"] = self.irt_difficulty + if self.irt_discrimination is not None: + result["irt_discrimination"] = self.irt_discrimination + if self.irt_guessing is not None: + result["irt_guessing"] = self.irt_guessing return result @classmethod @@ -58,4 +68,7 @@ def from_dict(cls, data: Dict[str, Any]) -> Task: choices=data.get("choices"), capability=capability, generation_metadata=data.get("generation_metadata", {}), + irt_difficulty=data.get("irt_difficulty"), + irt_discrimination=data.get("irt_discrimination"), + irt_guessing=data.get("irt_guessing"), )