From c4303e1b2cede508e5e0571d92def9579be9b0c3 Mon Sep 17 00:00:00 2001 From: Seungpyo1007 Date: Mon, 22 Jun 2026 11:25:08 +0900 Subject: [PATCH 1/3] feat(verify): add tiered data verification layer (Tier 0 offline scoring) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds app/verify/, an existence/trust verification layer that sits above the structural validator (app/validate.py, untouched). It answers "does this record describe a real, existing device/part — confidently enough to set verified:true?" to lift the ~1.2% verified ratio. - Tier 0 (offline, deterministic, all ~102k records): completeness + cross-field consistency (signals.py) + source-host trust (hosts.py) + provenance -> a green/yellow/red band. Full scores cached to gitignored data/_verify/state/; the tracked data/_verify/ledger.jsonl is reserved for promotion decisions. - Tier 1 (http_check.py): source_urls HTTP liveness, urllib + ThreadPool, per-host rate limit, resumable TTL cache. - Tier 2 (crossref.py): external cross-reference under a strict exact-heading rule (no fuzzy matching; ambiguous candidates never auto-promote). - Tier 3 (promote.py): hybrid escalation + surgical verified:false->true write-back (only that token, atomic, LF-preserved, never clobbers curated data). CLI: python -m app.verify score|report|check-urls|crossref|promote. CI: non-blocking verify-offline job in validate-data.yml; scheduled/manual verify-network.yml for network tiers with a diff-scope guard. Validates that the offline scorer reproduces the human-curated verified CPU set (40 tests pass). 
Refs #1 --- .github/workflows/validate-data.yml | 26 ++ .github/workflows/verify-network.yml | 116 +++++++ .gitignore | 4 + app/verify/__init__.py | 18 ++ app/verify/__main__.py | 8 + app/verify/cli.py | 407 +++++++++++++++++++++++++ app/verify/common.py | 111 +++++++ app/verify/crossref.py | 124 ++++++++ app/verify/hosts.py | 115 +++++++ app/verify/http_check.py | 228 ++++++++++++++ app/verify/ledger.py | 101 ++++++ app/verify/offline.py | 135 ++++++++ app/verify/promote.py | 88 ++++++ app/verify/signals.py | 253 +++++++++++++++ tests/conftest.py | 4 + tests/verify/__init__.py | 0 tests/verify/test_http_check.py | 104 +++++++ tests/verify/test_offline.py | 65 ++++ tests/verify/test_parity_and_golden.py | 56 ++++ tests/verify/test_promote_crossref.py | 122 ++++++++ tests/verify/test_signals.py | 88 ++++++ 21 files changed, 2173 insertions(+) create mode 100644 .github/workflows/verify-network.yml create mode 100644 app/verify/__init__.py create mode 100644 app/verify/__main__.py create mode 100644 app/verify/cli.py create mode 100644 app/verify/common.py create mode 100644 app/verify/crossref.py create mode 100644 app/verify/hosts.py create mode 100644 app/verify/http_check.py create mode 100644 app/verify/ledger.py create mode 100644 app/verify/offline.py create mode 100644 app/verify/promote.py create mode 100644 app/verify/signals.py create mode 100644 tests/conftest.py create mode 100644 tests/verify/__init__.py create mode 100644 tests/verify/test_http_check.py create mode 100644 tests/verify/test_offline.py create mode 100644 tests/verify/test_parity_and_golden.py create mode 100644 tests/verify/test_promote_crossref.py create mode 100644 tests/verify/test_signals.py diff --git a/.github/workflows/validate-data.yml b/.github/workflows/validate-data.yml index eb276b3b863..f18c88dde71 100644 --- a/.github/workflows/validate-data.yml +++ b/.github/workflows/validate-data.yml @@ -7,11 +7,15 @@ on: paths: - "data/**" - "app/validate.py" + - "app/verify/**" + - 
"tests/verify/**" push: branches: [main] paths: - "data/**" - "app/validate.py" + - "app/verify/**" + - "tests/verify/**" jobs: self-validate: @@ -24,6 +28,28 @@ jobs: - name: Self-check (bundled validator) run: python -m app.validate + # Non-blocking existence/trust signal: scores the records changed in this PR + # with the Tier 0 offline verifier and prints a band histogram. Informational + # only — never gates the merge (continue-on-error). + verify-offline: + runs-on: ubuntu-latest + continue-on-error: true + env: + PYTHONIOENCODING: utf-8 + steps: + - uses: actions/checkout@v4 + with: + fetch-depth: 0 + - uses: actions/setup-python@v5 + with: + python-version: "3.12" + - name: Tier 0 verification report (changed records) + run: | + git fetch origin main --depth=1 || true + python -m app.verify score --changed --no-cache + - name: Verifier unit tests + run: python -m pytest tests/verify -q -m "not slow" + engine-validate: needs: self-validate uses: GetTechAPI/TechEngine/.github/workflows/validate-data.yml@main diff --git a/.github/workflows/verify-network.yml b/.github/workflows/verify-network.yml new file mode 100644 index 00000000000..92c3e3305b1 --- /dev/null +++ b/.github/workflows/verify-network.yml @@ -0,0 +1,116 @@ +name: verify-network + +# Network verification tiers (source-URL liveness + external cross-reference) and +# verified promotion. NEVER runs on pull_request — these tiers hit external sites, +# are rate-limited, and must not gate a merge. Scheduled + manual only. Promotions +# are written on a branch and opened as a PR for human review; the job hard-guards +# that nothing but `verified` flags and the ledger changed. 
+ +on: + workflow_dispatch: + inputs: + apply: + description: "Flip verified->true and open a PR (otherwise dry-run only)" + type: boolean + default: false + max_urls: + description: "Frontier records to URL-check" + default: "2000" + max_crossref: + description: "Yellow/red records to cross-reference" + default: "500" + schedule: + - cron: "0 4 * * 1" # Mondays 04:00 UTC + +permissions: + contents: write + pull-requests: write + +jobs: + verify-network: + runs-on: ubuntu-latest + env: + PYTHONIOENCODING: utf-8 + steps: + - uses: actions/checkout@v4 + with: + fetch-depth: 0 + + - uses: actions/setup-python@v5 + with: + python-version: "3.12" + + # Resumable caches (URL + crossref). Recomputable, so a miss is harmless. + - name: Restore verify caches + uses: actions/cache@v4 + with: + path: data/_verify/state + key: verify-state-${{ github.run_id }} + restore-keys: verify-state- + + - name: Tier 0 score (writes scores cache) + run: python -m app.verify score + + - name: Tier 1 source-URL liveness + run: python -m app.verify check-urls --max ${{ github.event.inputs.max_urls || '2000' }} + + - name: Tier 2 external cross-reference + run: python -m app.verify crossref --max ${{ github.event.inputs.max_crossref || '500' }} + + - name: Tier 3 promote (dry-run) + run: python -m app.verify promote + + - name: Tier 3 promote (apply) + if: ${{ github.event.inputs.apply == 'true' }} + run: python -m app.verify promote --apply + + - name: Structural validator self-check + if: ${{ github.event.inputs.apply == 'true' }} + run: python -m app.validate + + # Guard: the only tracked changes may be `verified` toggles in data/**.json + # plus the promotion ledger. Anything else fails the run loudly. 
+      - name: Guard diff scope
+        if: ${{ github.event.inputs.apply == 'true' }}
+        run: |
+          python - <<'PY'
+          import subprocess, sys
+          scope = ["data/", ":(exclude)data/_verify/ledger.jsonl"]  # ledger appends are expected; fail closed if git errors
+          out = subprocess.run(["git", "diff", "--unified=0", "--", *scope], capture_output=True, text=True, check=True).stdout
+          bad = []
+          for line in out.splitlines():
+              if line.startswith(("+++", "---", "@@", "diff ", "index ")):
+                  continue
+              if line.startswith(("+", "-")) and line[1:].strip():
+                  body = line[1:].strip().rstrip(",")
+                  if body not in ('"verified": true', '"verified": false'):
+                      bad.append(line)
+          if bad:
+              print("Unexpected non-verified changes:")
+              print("\n".join(bad[:50]))
+              sys.exit(1)
+          print("diff scope OK: only verified toggles")
+          PY
+
+      - name: Open promotion PR
+        if: ${{ github.event.inputs.apply == 'true' }}
+        env:
+          GH_TOKEN: ${{ secrets.TECHAPI_TOKEN || secrets.GITHUB_TOKEN }}
+        run: |
+          set -e
+          if git diff --quiet -- data/; then
+            echo "no promotions to commit"; exit 0
+          fi
+          branch="verify/promote-${{ github.run_id }}"
+          git config user.name "github-actions[bot]"
+          git config user.email "github-actions[bot]@users.noreply.github.com"
+          git checkout -b "$branch"
+          git add data/
+          git commit -m "data(verify): promote records to verified via cross-reference
+
+          Auto-promotions from the verification layer (green+live-T1 or crossref-confirm).
+          Each flip is verified:false->true only; see data/_verify/ledger.jsonl. Refs #1"
+          git push origin "$branch"
+          gh pr create --base main --head "$branch" \
+            --title "data(verify): verified promotions ($(date -u +%Y-%m-%d))" \
+            --body "Automated verified promotions from \`app.verify promote\`. Each change flips only the \`verified\` flag; structural validator passed and diff scope guarded. Review before merge. Refs #1"
diff --git a/.gitignore b/.gitignore
index 9fff308ed68..6b890c3926a 100644
--- a/.gitignore
+++ b/.gitignore
@@ -31,6 +31,10 @@ env/
 # Note: data/_staging/ (raw collected candidate pool) is intentionally tracked —
 # comprehensive data collection is a purpose of this repo.
+# Verification layer caches: full Tier 0 scores + network caches are cheap to +# recompute. Only data/_verify/ledger.jsonl (the promotion audit trail) is tracked. +data/_verify/state/ + # Testing / coverage .pytest_cache/ .coverage diff --git a/app/verify/__init__.py b/app/verify/__init__.py new file mode 100644 index 00000000000..981370d1713 --- /dev/null +++ b/app/verify/__init__.py @@ -0,0 +1,18 @@ +"""TechAPI data *verification* layer (§ existence/trust, sits above structural validation). + +``app.validate`` answers "is this record well-formed?". ``app.verify`` answers +"does this record describe a real, actually-existing device/part — confidently +enough to mark it ``verified``?". + +It is a separate, additive layer: the structural validator (``app/validate.py``) +stays the fast CI gate and is never rewritten. Verification is tiered: + +* Tier 0 — offline deterministic plausibility score over the whole dataset + (``offline``/``signals``/``hosts``); bands records green/yellow/red. +* Tier 1 — ``source_urls`` HTTP liveness (``http_check``). +* Tier 2 — external cross-reference under an exact-heading rule (``crossref``). +* Tier 3 — hybrid escalation + safe ``verified:true`` write-back (``promote``). + +Decisions are recorded append-only in ``data/_verify/ledger.jsonl`` so runs are +incremental and resumable. +""" diff --git a/app/verify/__main__.py b/app/verify/__main__.py new file mode 100644 index 00000000000..d90380f8460 --- /dev/null +++ b/app/verify/__main__.py @@ -0,0 +1,8 @@ +"""``python -m app.verify`` entry point.""" + +import sys + +from .cli import main + +if __name__ == "__main__": + sys.exit(main()) diff --git a/app/verify/cli.py b/app/verify/cli.py new file mode 100644 index 00000000000..57a49b4fdb9 --- /dev/null +++ b/app/verify/cli.py @@ -0,0 +1,407 @@ +"""Command-line entry for the verification layer: ``python -m app.verify ...``. 
+
+Subcommands, one per verification tier:
+
+* ``score``  — Tier 0: score records, print a band histogram; full runs rewrite the scores cache.
+* ``report`` — summarize the cached Tier 0 scores and the promotions recorded in the ledger.
+
+Network subcommands: ``check-urls`` (Tier 1 source-URL liveness), ``crossref`` (Tier 2
+exact-heading cross-reference) and ``promote`` (Tier 3 write-back; dry-run unless ``--apply``).
+"""
+
+from __future__ import annotations
+
+import argparse
+import subprocess
+from collections import Counter, defaultdict
+from datetime import datetime, timezone
+
+from . import crossref, http_check, ledger, offline, promote
+from .common import (
+    CATEGORIES,
+    SCORES_PATH,
+    Record,
+    configure_stdout,
+    foreign_key_sets,
+    load_all,
+    repo_path,
+)
+
+BANDS = ("green", "yellow", "red")
+
+
+def _now_iso() -> str:
+    return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
+
+
+def _changed_data_slugs() -> set[str]:
+    """Repo-relative data/ paths changed vs origin/main (for CI --changed)."""
+    try:
+        out = subprocess.run(
+            ["git", "diff", "--name-only", "origin/main...HEAD", "--", "data/"],
+            capture_output=True, text=True, check=True,
+        ).stdout
+    except Exception:
+        out = ""
+    # strip leading "data/" so it matches Record.path
+    paths = set()
+    for line in out.splitlines():
+        line = line.strip()
+        if line.startswith("data/") and line.endswith(".json"):
+            paths.add(line[len("data/"):])
+    return paths
+
+
+def _iter_selected(
+    records: dict[str, list[Record]],
+    categories: tuple[str, ...],
+    unverified_only: bool,
+    changed: set[str] | None,
+    limit: int | None,
+):
+    count = 0
+    for cat in categories:
+        for rec in records[cat]:
+            if unverified_only and rec.verified:
+                continue
+            if changed is not None and rec.path not in changed:
+                continue
+            yield rec
+            count += 1
+            if limit is not None and count >= limit:
+                return
+
+
+def cmd_score(args: argparse.Namespace) -> int:
+    records = load_all()
+    _, _, soc_release = foreign_key_sets(records)
+    now_year = offline.now_year_today()
+    ts = _now_iso()
+
+
categories = tuple(args.category) if args.category else CATEGORIES + changed = _changed_data_slugs() if args.changed else None + + # The scores cache is a full-dataset snapshot; only rewrite it on a full run. + full_scope = args.category is None and args.max is None and not args.changed + write_cache = full_scope and not args.no_cache + + # category -> band -> count + hist: dict[str, Counter] = defaultdict(Counter) + hard_flags: Counter = Counter() + entries = [] + scored = 0 + + for rec in _iter_selected(records, categories, args.unverified_only, changed, args.max): + if not rec.slug: + continue + s = offline.score_record(rec, now_year, soc_release) + hist[rec.category][s.band] += 1 + scored += 1 + for f in s.flags: + if f.startswith("!"): + hard_flags[f] += 1 + if write_cache: + entries.append( + ledger.make_tier0_entry( + rec.category, rec.slug, rec.path, rec.content_hash(), + s.score, s.band, s.subscores, s.flags, s.best_tier, ts, + ) + ) + + if write_cache: + ledger.replace_all(entries, SCORES_PATH) + + _print_histogram(hist, scored, hard_flags, wrote_cache=write_cache) + return 0 + + +def _print_histogram(hist, scored, hard_flags, wrote_cache) -> None: + print(f"Tier 0 offline score — {scored} record(s)\n") + header = f"{'category':<12} {'green':>8} {'yellow':>8} {'red':>8} {'total':>8}" + print(header) + print("-" * len(header)) + totals = Counter() + for cat in CATEGORIES: + if cat not in hist: + continue + c = hist[cat] + tot = sum(c.values()) + totals.update(c) + print(f"{cat:<12} {c['green']:>8} {c['yellow']:>8} {c['red']:>8} {tot:>8}") + print("-" * len(header)) + gtot = sum(totals.values()) or 1 + print( + f"{'ALL':<12} {totals['green']:>8} {totals['yellow']:>8} " + f"{totals['red']:>8} {sum(totals.values()):>8}" + ) + print( + f"\nbands: green {100*totals['green']/gtot:.1f}% " + f"yellow {100*totals['yellow']/gtot:.1f}% red {100*totals['red']/gtot:.1f}%" + ) + if hard_flags: + print("\ntop hard violations:") + for name, n in 
hard_flags.most_common(10): + print(f" {n:>7} {name}") + if wrote_cache: + print("\ncache: wrote full Tier 0 scores to data/_verify/state/scores.jsonl") + + +def cmd_report(args: argparse.Namespace) -> int: + if not SCORES_PATH.exists(): + print("no scores cache — run `python -m app.verify score` first") + return 0 + hist: dict[str, Counter] = defaultdict(Counter) + hard_flags: Counter = Counter() + for entry in ledger.iter_entries(SCORES_PATH): + cat = entry.get("category") + t0 = entry.get("tier0", {}) + band = t0.get("band") + if cat and band: + hist[cat][band] += 1 + for f in t0.get("flags", []): + if isinstance(f, str) and f.startswith("!"): + hard_flags[f] += 1 + scored = sum(sum(c.values()) for c in hist.values()) + _print_histogram(hist, scored, hard_flags, wrote_cache=False) + + # Promotion decisions live in the git-tracked ledger. + promoted: Counter = Counter() + for (cat, _slug), entry in ledger.latest_by_key().items(): + if entry.get("decision") == "promote": + promoted[cat] += 1 + if sum(promoted.values()): + print("\npromoted to verified (ledger):") + for cat, n in promoted.most_common(): + print(f" {n:>7} {cat}") + return 0 + + +def _ranked_unverified(records, soc_release, now_year, categories): + """Unverified records of the given categories, scored, highest-confidence first.""" + scored = [] + for cat in categories: + for rec in records[cat]: + if rec.verified or not rec.slug: + continue + s = offline.score_record(rec, now_year, soc_release) + scored.append((s.score, rec)) + scored.sort(key=lambda t: t[0], reverse=True) + return [rec for _score, rec in scored] + + +def cmd_check_urls(args: argparse.Namespace) -> int: + records = load_all() + _, _, soc_release = foreign_key_sets(records) + now_year = offline.now_year_today() + categories = tuple(args.category) if args.category else CATEGORIES + + frontier = _ranked_unverified(records, soc_release, now_year, categories) + if args.max is not None: + frontier = frontier[: args.max] + + urls: list[str] 
= [] + for rec in frontier: + urls.extend(u for u in rec.data.get("source_urls", []) if isinstance(u, str)) + targets = http_check.dedupe_urls(urls) + + cache = http_check.load_cache() + now = datetime.now(timezone.utc) + if args.recheck: + todo = targets + else: + todo = [u for u in targets if not ( + u in cache and http_check.is_fresh(cache[u], now, args.ttl_days) + )] + + print( + f"check-urls: {len(frontier)} record(s) -> {len(targets)} unique URL(s); " + f"{len(targets) - len(todo)} fresh in cache, checking {len(todo)}" + ) + if not todo: + _summarize_cache(cache, targets) + return 0 + + ts = _now_iso() + results = http_check.check_urls( + todo, + max_workers=args.workers, + min_interval=args.min_interval, + ) + for r in results: + cache[r.url] = http_check.result_to_entry(r, ts) + http_check.save_cache(cache) + print(f"cache: wrote {len(cache)} URL result(s) to data/_verify/state/url_cache.jsonl") + _summarize_cache(cache, targets) + return 0 + + +def _summarize_cache(cache, targets) -> None: + from collections import Counter + alive = sum(1 for u in targets if cache.get(u, {}).get("alive")) + dead = sum(1 for u in targets if u in cache and not cache[u].get("alive")) + print(f"\nliveness over {len(targets)} targeted URL(s): {alive} alive, {dead} dead") + reasons = Counter( + cache[u].get("reason") for u in targets + if u in cache and not cache[u].get("alive") + ) + if reasons: + print("dead reasons:") + for reason, n in reasons.most_common(10): + print(f" {n:>6} {reason}") + + +def cmd_crossref(args: argparse.Namespace) -> int: + records = load_all() + _, _, soc_release = foreign_key_sets(records) + now_year = offline.now_year_today() + categories = tuple(args.category) if args.category else CATEGORIES + + # Escalation target: yellow/red unverified frontier (greens promote via live T1). 
+    targets = []
+    for rec in _ranked_unverified(records, soc_release, now_year, categories):
+        s = offline.score_record(rec, now_year, soc_release)
+        if s.band in ("yellow", "red"):
+            targets.append(rec)
+    targets = targets[: args.max]
+
+    fetcher = crossref.WikipediaFetcher()
+    cache = promote.load_crossref_cache()
+    ts = _now_iso()
+    decisions = Counter()
+    new_entries = []
+    for rec in targets:
+        key = (rec.category, rec.slug)
+        if not args.recheck and key in cache:
+            decisions[cache[key].get("decision", "cached")] += 1
+            continue
+        res = crossref.crossref_record(rec.data, fetcher, source="wikipedia")  # label must match the fetcher used
+        decisions[res.decision] += 1
+        new_entries.append({
+            "ts": ts, "category": rec.category, "slug": rec.slug,
+            "source": res.source, "decision": res.decision,
+            "exact_heading": res.exact_heading, "matched_url": res.matched_url,
+        })
+    if new_entries:
+        cache.update({(e["category"], e["slug"]): e for e in new_entries})
+        ledger.replace_all(list(cache.values()), promote.CROSSREF_CACHE_PATH)
+
+    print(f"crossref: examined {len(targets)} record(s)")
+    for decision, n in decisions.most_common():
+        print(f" {n:>6} {decision}")
+    return 0
+
+
+def cmd_promote(args: argparse.Namespace) -> int:
+    records = load_all()
+    _, _, soc_release = foreign_key_sets(records)
+    now_year = offline.now_year_today()
+    categories = tuple(args.category) if args.category else CATEGORIES
+
+    url_cache = http_check.load_cache()
+    xref_cache = promote.load_crossref_cache()
+    ts = _now_iso()
+
+    candidates = []  # (rec, tier0 score object, reason)
+    blocked = Counter()
+    for cat in categories:
+        for rec in records[cat]:
+            if rec.verified or not rec.slug:
+                continue
+            s = offline.score_record(rec, now_year, soc_release)
+            urls = [u for u in rec.data.get("source_urls", []) if isinstance(u, str)]
+            xref = xref_cache.get((cat, rec.slug), {}).get("decision")
+            d = promote.decide(
+                band=s.band, source_urls=urls, url_cache=url_cache, crossref_decision=xref,
+            )
+            if d.promote:
+                candidates.append((rec, s, d.reason))
+            elif s.band ==
"green": + blocked["green-needs-live-t1"] += 1 + + if args.max is not None: + candidates = candidates[: args.max] + + print(f"promote: {len(candidates)} record(s) eligible " + f"({'APPLY' if args.apply else 'dry-run'})") + by_reason = Counter(reason for _r, _s, reason in candidates) + for reason, n in by_reason.most_common(): + print(f" {n:>6} {reason}") + if blocked: + print("blocked (green but no live T1 source yet — run check-urls):") + for reason, n in blocked.most_common(): + print(f" {n:>6} {reason}") + + if not args.apply: + for rec, s, reason in candidates[:20]: + print(f" would promote: {rec.path} [{s.band} {s.score}] {reason}") + if len(candidates) > 20: + print(f" ... and {len(candidates) - 20} more") + return 0 + + written = 0 + entries = [] + for rec, s, reason in candidates: + if promote.write_verified_true(repo_path(rec.path)): + written += 1 + entries.append({ + "ts": ts, "category": rec.category, "slug": rec.slug, "path": rec.path, + "hash": rec.content_hash(), "decision": "promote", + "prev_verified": False, "new_verified": True, "reason": reason, + "tier0": {"score": s.score, "band": s.band}, + "actor": "app.verify.promote", + }) + ledger.append_many(entries) + print(f"\napplied: flipped verified->true in {written} file(s); ledger updated") + print("next: run `python -m app.validate` and `git diff` to confirm only verified changed") + return 0 + + +def _not_implemented(args: argparse.Namespace) -> int: + print(f"`{args.cmd}` is a later-phase subcommand and is not implemented yet.") + return 2 + + +def build_parser() -> argparse.ArgumentParser: + p = argparse.ArgumentParser(prog="python -m app.verify", description=__doc__) + sub = p.add_subparsers(dest="cmd", required=True) + + sc = sub.add_parser("score", help="Tier 0 offline plausibility scoring") + sc.add_argument("--category", nargs="*", choices=CATEGORIES, help="limit to categories") + sc.add_argument("--max", type=int, default=None, help="cap number scored") + 
sc.add_argument("--unverified-only", action="store_true", help="skip verified:true records") + sc.add_argument("--changed", action="store_true", help="only records changed vs origin/main") + sc.add_argument("--no-cache", action="store_true", help="do not write the scores cache") + sc.set_defaults(func=cmd_score) + + rp = sub.add_parser("report", help="summarize latest ledger state") + rp.set_defaults(func=cmd_report) + + cu = sub.add_parser("check-urls", help="Tier 1: source_urls HTTP liveness") + cu.add_argument("--category", nargs="*", choices=CATEGORIES, help="limit to categories") + cu.add_argument("--max", type=int, default=500, help="number of frontier records to target") + cu.add_argument("--workers", type=int, default=8, help="concurrent HTTP workers") + cu.add_argument("--min-interval", type=float, default=1.0, help="seconds between hits per host") + cu.add_argument("--ttl-days", type=int, default=http_check.DEFAULT_TTL_DAYS, help="cache freshness") + cu.add_argument("--recheck", action="store_true", help="ignore cache freshness") + cu.set_defaults(func=cmd_check_urls) + + cr = sub.add_parser("crossref", help="Tier 2: external cross-reference (exact heading)") + cr.add_argument("--category", nargs="*", choices=CATEGORIES, help="limit to categories") + cr.add_argument("--max", type=int, default=200, help="number of yellow/red records to escalate") + cr.add_argument("--recheck", action="store_true", help="ignore crossref cache") + cr.set_defaults(func=cmd_crossref) + + pr = sub.add_parser("promote", help="Tier 3: hybrid escalation + verified write-back") + pr.add_argument("--category", nargs="*", choices=CATEGORIES, help="limit to categories") + pr.add_argument("--max", type=int, default=None, help="cap number promoted") + pr.add_argument("--apply", action="store_true", help="actually flip verified (default: dry-run)") + pr.set_defaults(func=cmd_promote) + + return p + + +def main(argv: list[str] | None = None) -> int: + configure_stdout() + parser = 
build_parser() + args = parser.parse_args(argv) + return args.func(args) diff --git a/app/verify/common.py b/app/verify/common.py new file mode 100644 index 00000000000..06afb0996e0 --- /dev/null +++ b/app/verify/common.py @@ -0,0 +1,111 @@ +"""Shared loading + identity helpers for the verification layer. + +Reuses ``app.validate._load`` (the canonical seed loader) rather than +re-implementing JSON discovery, and rebuilds the brand/SoC foreign-key slug sets +the same way ``app.validate.validate`` does, so the verifier sees exactly the +data the structural gate sees. +""" + +from __future__ import annotations + +import hashlib +import json +import sys +from pathlib import Path +from typing import Any, Iterable + +from app.validate import DATA_DIR, _load + +# Categories the verifier knows about, in load order. Mirrors app.validate.validate. +CATEGORIES: tuple[str, ...] = ( + "brand", + "soc", + "smartphone", + "tablet", + "watch", + "pda", + "gpu", + "cpu", +) + +VERIFY_DIR = DATA_DIR / "_verify" +LEDGER_PATH = VERIFY_DIR / "ledger.jsonl" # git-tracked: promotion decisions only +STATE_DIR = VERIFY_DIR / "state" # gitignored caches +SCORES_PATH = STATE_DIR / "scores.jsonl" # full Tier 0 results (cheap to recompute) + + +class Record: + """A single seed record paired with its repo-relative path and category.""" + + __slots__ = ("category", "path", "data") + + def __init__(self, category: str, path: str, data: dict[str, Any]) -> None: + self.category = category + self.path = path # e.g. 
"cpu/intel/2023/desktop/core-i9-14900k.json" + self.data = data + + @property + def slug(self) -> str | None: + slug = self.data.get("slug") + return slug if isinstance(slug, str) else None + + @property + def verified(self) -> bool: + return self.data.get("verified") is True + + def content_hash(self) -> str: + """Stable hash of the record body — invalidates stale ledger decisions on edit.""" + blob = json.dumps(self.data, sort_keys=True, ensure_ascii=False) + return hashlib.sha256(blob.encode("utf-8")).hexdigest()[:16] + + def __repr__(self) -> str: # pragma: no cover - debug aid + return f"Record({self.category}, {self.slug!r})" + + +def load_category(category: str) -> list[Record]: + """Load one category's records as :class:`Record` objects.""" + return [Record(category, path, data) for path, data in _load(category)] + + +def load_all(categories: Iterable[str] = CATEGORIES) -> dict[str, list[Record]]: + """Load every category into ``{category: [Record, ...]}``.""" + return {cat: load_category(cat) for cat in categories} + + +def foreign_key_sets( + records: dict[str, list[Record]], +) -> tuple[set[str], set[str], dict[str, str]]: + """Build FK lookups the way ``app.validate`` does, plus a SoC release-date map. + + Returns ``(brand_slugs, soc_slugs, soc_release_date)`` where ``soc_release_date`` + maps a SoC slug to its ISO release date (used for "chip can't postdate device"). + """ + brand_slugs = {r.slug for r in records.get("brand", []) if r.slug} + soc_slugs = {r.slug for r in records.get("soc", []) if r.slug} + soc_release: dict[str, str] = {} + for r in records.get("soc", []): + rd = r.data.get("release_date") + if r.slug and isinstance(rd, str): + soc_release[r.slug] = rd + return brand_slugs, soc_slugs, soc_release + + +def configure_stdout() -> None: + """Force UTF-8 stdout so emoji/box-drawing don't crash on Windows cp949. + + Mirrors ``app.validate.run`` (validate.py:336-340). 
+ """ + try: + sys.stdout.reconfigure(encoding="utf-8") # type: ignore[union-attr] + except Exception: + pass + + +def ensure_verify_dirs() -> None: + VERIFY_DIR.mkdir(parents=True, exist_ok=True) + STATE_DIR.mkdir(parents=True, exist_ok=True) + + +def repo_path(rel: str) -> Path: + """Resolve a repo-relative seed path (as stored on a Record) to an absolute path.""" + return DATA_DIR / rel diff --git a/app/verify/crossref.py b/app/verify/crossref.py new file mode 100644 index 00000000000..d181aa3c6d0 --- /dev/null +++ b/app/verify/crossref.py @@ -0,0 +1,124 @@ +"""Tier 2 — external cross-reference under a strict exact-heading rule. + +Confirms a record describes a real, documented part by finding an authoritative +page (Wikidata / Wikipedia) whose *title* matches the record name exactly after +normalization. Fuzzy matches are explicitly NOT trusted: project experience shows +fuzzy heading matching serves the wrong SKU ~35% of the time, so a non-exact +candidate yields ``ambiguous`` (never an auto-promote). + +All network access goes through an injected ``fetcher`` so the decision logic is +unit-tested offline. The concrete fetcher (urllib against the Wikipedia/Wikidata +REST APIs) is only used by the CLI / scheduled workflow. +""" + +from __future__ import annotations + +import json +import re +from typing import Any, NamedTuple, Protocol +from urllib.parse import quote +from urllib.request import Request, urlopen + +# Decisions +CONFIRM = "confirm" +AMBIGUOUS = "ambiguous" +CONTRADICT = "contradict" +NOTFOUND = "notfound" + +_NORM_RE = re.compile(r"[^a-z0-9]+") + + +def normalize_heading(text: str) -> str: + """Lowercase, drop everything but [a-z0-9]. 'iPhone XR' -> 'iphonexr'.""" + return _NORM_RE.sub("", text.lower()) + + +class Candidate(NamedTuple): + title: str + url: str + year: int | None = None # release/inception year if the source exposes one + + +class Fetcher(Protocol): + def search(self, name: str) -> list[Candidate]: + ... 
+ + +class CrossrefResult(NamedTuple): + slug: str + source: str + decision: str + exact_heading: bool + matched_url: str | None + spec_agreements: int + + +def _year_of(value: Any) -> int | None: + if isinstance(value, str) and len(value) >= 4 and value[:4].isdigit(): + return int(value[:4]) + return None + + +def crossref_record( + rec: dict[str, Any], fetcher: Fetcher, source: str = "wikidata" +) -> CrossrefResult: + """Decide confirm/ambiguous/contradict/notfound for one record.""" + name = rec.get("name") + slug = rec.get("slug") or "" + if not isinstance(name, str) or not name.strip(): + return CrossrefResult(slug, source, NOTFOUND, False, None, 0) + + candidates = fetcher.search(name) + if not candidates: + return CrossrefResult(slug, source, NOTFOUND, False, None, 0) + + target = normalize_heading(name) + exact = [c for c in candidates if normalize_heading(c.title) == target] + if not exact: + # Something came back, but no title matches exactly -> do not trust. + return CrossrefResult(slug, source, AMBIGUOUS, False, candidates[0].url, 0) + + cand = exact[0] + # Secondary gate: if both sides expose a release year, they must roughly agree. 
+    rec_year = _year_of(rec.get("release_date"))
+    agreements = 0
+    if rec_year is not None and cand.year is not None:
+        if abs(cand.year - rec_year) <= 1:
+            agreements = 1
+        else:
+            return CrossrefResult(slug, source, CONTRADICT, True, cand.url, 0)
+    return CrossrefResult(slug, source, CONFIRM, True, cand.url, agreements)
+
+
+# --- concrete fetchers (network; not exercised by unit tests) --------------------
+
+
+class WikipediaFetcher:
+    """Queries the MediaWiki opensearch API for candidate page titles."""
+
+    API = "https://en.wikipedia.org/w/api.php"
+    UA = "TechAPI-verify/0.1 (https://github.com/GetTechAPI)"
+
+    def __init__(self, timeout: float = 10.0, limit: int = 5) -> None:
+        self.timeout = timeout
+        self.limit = limit
+
+    def search(self, name: str) -> list[Candidate]:
+        url = (
+            f"{self.API}?action=opensearch&format=json&limit={self.limit}"
+            f"&search={quote(name)}"
+        )
+        try:
+            req = Request(url, headers={"User-Agent": self.UA})
+            with urlopen(req, timeout=self.timeout) as resp:
+                data = json.loads(resp.read().decode("utf-8"))
+        except Exception:
+            return []
+        # opensearch returns [query, [titles...], [descs...], [urls...]]; an API error is a JSON object, not a list
+        titles = data[1] if isinstance(data, list) and len(data) > 1 else []
+        urls = data[3] if isinstance(data, list) and len(data) > 3 else []
+        out: list[Candidate] = []
+        for i, title in enumerate(titles):
+            url_i = urls[i] if i < len(urls) else ""
+            out.append(Candidate(title=title, url=url_i))
+        return out
diff --git a/app/verify/hosts.py b/app/verify/hosts.py
new file mode 100644
index 00000000000..00ed9152e6e
--- /dev/null
+++ b/app/verify/hosts.py
@@ -0,0 +1,115 @@
+"""Source-host trust classification.
+
+Grounded in a real signal in the dataset: already-``verified`` records cite
+authoritative hosts (en.wikipedia.org, ark.intel.com, amd.com, apple.com,
+cpubenchmark.net, ...), while bulk-imported unverified records cite *only*
+kaggle.com. The host a record's ``source_urls`` point at is therefore a strong,
+learned discriminator of "is this a real, documented part?".
def host_of(url: str) -> str:
    """Return the lowercased registrable-ish host of a URL (``www.`` stripped).

    Userinfo (``user:pw@``), the port, IPv6 brackets, and a trailing root dot
    are removed so the result can be compared against the tier host sets.

    Args:
        url: An absolute URL. Values without a network location (no scheme,
            empty string, unparsable input) are tolerated.

    Returns:
        The bare host name, or ``""`` when no host can be determined.
    """
    try:
        # .hostname handles userinfo, port and bracketed IPv6 literals, and
        # lowercases the result; manual netloc splitting on ":" does not
        # survive "[::1]:8080".
        host = urlparse(url).hostname
    except Exception:
        return ""
    if not isinstance(host, str):
        return ""  # None when there is no netloc; bytes for bytes input
    # "en.wikipedia.org." (fully-qualified form) names the same host.
    host = host.rstrip(".")
    if host.startswith("www."):
        host = host[4:]
    return host
def best_tier(urls: Iterable[str]) -> int:
    """Return the most trusted known tier cited by ``urls``, or 0 if none is classified.

    Tier numbers run the opposite way to trust (1 is strongest), so the most
    trusted tier is the numerically smallest non-zero one.
    """
    known_tiers = (
        tier
        for tier in (tier_of_host(host_of(link)) for link in urls)
        if tier  # 0 means "unclassified" and must not win the min()
    )
    return min(known_tiers, default=0)
import ledger +from .common import STATE_DIR +from .hosts import host_of + +URL_CACHE_PATH = STATE_DIR / "url_cache.jsonl" +DEFAULT_TTL_DAYS = 30 +USER_AGENT = ( + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 " + "(KHTML, like Gecko) Chrome/124.0 Safari/537.36 TechAPI-verify/0.1" +) + + +class CheckResult(NamedTuple): + url: str + status: int | None + final_url: str | None + alive: bool + reason: str + + +# --- opener abstraction (injectable for tests) ----------------------------------- + + +class _Opener: + """Thin wrapper over urllib's opener exposing ``open(url, method) -> (status, final)``.""" + + def __init__(self, timeout: float = 10.0) -> None: + self._opener = build_opener() + self.timeout = timeout + + def open(self, url: str, method: str) -> tuple[int, str]: + req = Request(url, method=method, headers={"User-Agent": USER_AGENT}) + resp = self._opener.open(req, timeout=self.timeout) + try: + status = getattr(resp, "status", None) or resp.getcode() + final = resp.geturl() + return int(status), final + finally: + resp.close() + + +def default_opener_factory(timeout: float = 10.0) -> _Opener: + return _Opener(timeout=timeout) + + +# --- classification -------------------------------------------------------------- + + +def _path_depth(url: str) -> int: + try: + path = urlparse(url).path.strip("/") + except Exception: + return 0 + return len([p for p in path.split("/") if p]) + + +def _is_homepage_redirect(original: str, final: str) -> bool: + """A deep page that redirects to the site root is a soft-404 ("not found" page).""" + if not final or final == original: + return False + return _path_depth(original) >= 1 and _path_depth(final) == 0 + + +def classify(original_url: str, status: int | None, final_url: str | None) -> tuple[bool, str]: + if status is None: + return False, "error" + if status >= 400: + return False, f"http-{status}" + if final_url and _is_homepage_redirect(original_url, final_url): + return False, "homepage-redirect" + 
def dedupe_urls(urls: Iterable[str]) -> list[str]:
    """Collapse URLs to one representative per (host, path, query).

    Bulk imports cite the same dump URL thousands of times, so checking each
    distinct target once keeps the liveness run small. The query string is
    part of the identity: pages such as ``index.php?m=device&id=1`` and
    ``...&id=2`` are different resources and must both be checked.

    Args:
        urls: Candidate URLs; unparsable entries are skipped.

    Returns:
        The first-seen URL for each distinct target, in first-seen order.
        Host case and a trailing slash on the path do not create new targets.
    """
    seen: dict[tuple[str, str, str], str] = {}
    for u in urls:
        try:
            p = urlparse(u)
        except Exception:
            continue
        key = (p.netloc.lower(), p.path.rstrip("/"), p.query)
        seen.setdefault(key, u)
    return list(seen.values())
def record_liveness(source_urls: list[str], cache: dict[str, dict[str, Any]]) -> tuple[int, int]:
    """Tally ``(#live, #dead)`` over the record's URLs that have a cache entry.

    URLs absent from ``cache`` are ignored. A cached entry counts as live only
    when its ``"alive"`` value is truthy. Repeated URLs are counted each time.
    """
    looked_up = (cache.get(link) for link in source_urls)
    verdicts = [bool(entry.get("alive")) for entry in looked_up if entry is not None]
    alive_count = sum(verdicts)
    return alive_count, len(verdicts) - alive_count
def replace_all(entries: list[dict[str, Any]], path: Path) -> None:
    """Rewrite ``path`` so it holds exactly ``entries``, one JSON object per line.

    The new content goes to a sibling ``<name>.tmp`` file that is then renamed
    over ``path``. An interrupted run therefore leaves the previous file
    intact rather than a truncated one.

    Args:
        entries: Full result set to persist; keys are sorted for stable diffs.
        path: Destination JSONL file.
    """
    ensure_verify_dirs()
    tmp = path.with_name(path.name + ".tmp")
    try:
        with tmp.open("w", encoding="utf-8", newline="\n") as fh:
            for entry in entries:
                fh.write(json.dumps(entry, ensure_ascii=False, sort_keys=True) + "\n")
        tmp.replace(path)
    finally:
        # No-op after a successful rename; removes the partial file otherwise.
        tmp.unlink(missing_ok=True)
def is_fresh(
    entry: dict[str, Any] | None, content_hash: str, tier: str
) -> bool:
    """Report whether ``entry`` can be reused for ``tier`` without re-running it.

    That holds only when an entry exists, its stored ``"hash"`` equals
    ``content_hash`` (the record has not been edited since), and it already
    has a ``tier`` key.
    """
    return (
        bool(entry)
        and entry.get("hash") == content_hash
        and tier in entry
    )
+RICH_FIELDS: dict[str, tuple[str, ...]] = { + "cpu": ("architecture", "base_clock_ghz", "boost_clock_ghz", "l3_cache_mb", + "socket", "tdp_w", "passmark_cpu_mark"), + "gpu": ("architecture", "boost_clock_mhz", "memory_type", "memory_bandwidth_gbps", + "fp32_tflops", "cuda_cores", "stream_processors"), + "soc": ("transistors_billion", "cpu_config", "gpu_cores", "gpu_clock_mhz", + "npu_tops", "geekbench_multi"), + "smartphone": ("soc", "display.size_inch", "display.resolution", "display.ppi", + "cameras", "storage_options_gb", "charging_wired_w", "os_version"), + "tablet": ("display.size_inch", "display.resolution", "storage_options_gb", + "cameras", "os_version"), + "watch": ("display.size_inch", "display.resolution", "os_version"), + "pda": ("display.size_inch", "display.resolution", "os_version"), + "brand": ("founded_year", "description_en"), +} + + +class Score(NamedTuple): + score: float + band: str # "green" | "yellow" | "red" + subscores: dict[str, float] + flags: list[str] # names of failed predicates (hard prefixed with "!") + best_tier: int + + +def _get_path(data: dict[str, Any], path: str) -> Any: + cur: Any = data + for part in path.split("."): + if not isinstance(cur, dict): + return None + cur = cur.get(part) + return cur + + +def _completeness(category: str, data: dict[str, Any]) -> float: + fields = RICH_FIELDS.get(category, ()) + if not fields: + return W_COMPLETENESS + present = sum(1 for f in fields if _get_path(data, f) not in (None, "", [], {})) + return W_COMPLETENESS * present / len(fields) + + +def _consistency(sigs: list[signals.Signal]) -> tuple[float, list[str], bool]: + evaluated = [s for s in sigs if s.result in ("pass", "fail")] + failed = [s for s in sigs if s.failed] + hard_failed = any(s.hard for s in failed) + flags = [("!" 
def score_record(
    rec: Record, now_year: int, soc_release: dict[str, str]
) -> Score:
    """Compute the Tier 0 score and band for one record.

    The total is the sum of four sub-scores: completeness, consistency, host
    trust, and provenance. Banding:

    * any failed hard predicate -> ``"red"``, whatever the total;
    * total >= ``GREEN_MIN`` and best cited host tier is 1 or 2 -> ``"green"``;
    * total < ``RED_MAX`` -> ``"red"``;
    * otherwise ``"yellow"``.

    Args:
        rec: Record whose ``category`` and ``data`` are scored.
        now_year: Current year, passed through to the consistency predicates.
        soc_release: Lookup passed through to :func:`signals.signals_for`.

    Returns:
        A ``Score`` holding the rounded total, the band, the rounded
        sub-scores, the names of failed predicates, and the best host tier.
    """
    data = rec.data
    # ``.get("source_urls", [])`` is not enough: an explicit JSON null returns
    # None (TypeError on iteration) and a bare string would be iterated
    # character by character. Only a real sequence of strings is accepted.
    raw_urls = data.get("source_urls")
    if isinstance(raw_urls, (list, tuple)):
        urls = [u for u in raw_urls if isinstance(u, str)]
    else:
        urls = []

    completeness = _completeness(rec.category, data)
    sigs = signals.signals_for(rec.category, data, now_year, soc_release)
    consistency, flags, hard_failed = _consistency(sigs)
    host, best_tier = _host_score(urls)
    provenance = _provenance(data, best_tier)

    total = completeness + consistency + host + provenance
    subscores = {
        "completeness": round(completeness, 1),
        "consistency": round(consistency, 1),
        "host": round(host, 1),
        "provenance": round(provenance, 1),
    }

    if hard_failed:
        band = "red"
    elif total >= GREEN_MIN and best_tier in (1, 2):
        band = "green"
    elif total < RED_MAX:
        band = "red"
    else:
        band = "yellow"

    return Score(round(total, 1), band, subscores, flags, best_tier)
+ +Promotion rules (only ever ``false -> true``, never a demotion): +* band green AND >=1 cited source is a *live* Tier-1 host -> auto-promote +* Tier 2 cross-reference returned ``confirm`` (exact heading) -> promote +* otherwise stay unverified, with a logged reason + +Write-back is *surgical*: only the ``"verified": false`` token is rewritten to +``true`` in the raw bytes. Full re-serialization is intentionally avoided because +the seed files keep short arrays inline (``[64, 128, 256]``) while ``json.dumps`` +would expand them, producing a huge spurious diff and defeating the "only verified +changed" guard. Edits are atomic (temp file + ``os.replace``) and preserve LF. +""" + +from __future__ import annotations + +import os +import re +from pathlib import Path +from typing import Any, NamedTuple + +from . import hosts +from .common import STATE_DIR + +CROSSREF_CACHE_PATH = STATE_DIR / "crossref_cache.jsonl" + +# A top-level, one-key-per-line "verified": false entry (2-space indented). 
def write_verified_true(abs_path: Path) -> bool:
    """Atomically flip ``verified`` false->true in a seed file.

    The file is read and written as raw bytes (UTF-8) so line endings are not
    translated. The new content goes to a sibling ``<name>.tmp`` file that is
    then moved over the original with ``os.replace``.

    Args:
        abs_path: Path of the seed file to edit.

    Returns:
        ``True`` if the file was rewritten; ``False`` if
        :func:`flip_verified_text` refused the content, in which case nothing
        is written.
    """
    raw = abs_path.read_bytes().decode("utf-8")
    new = flip_verified_text(raw)
    if new is None:
        return False
    tmp = abs_path.with_suffix(abs_path.suffix + ".tmp")
    try:
        tmp.write_bytes(new.encode("utf-8"))
        os.replace(tmp, abs_path)
    finally:
        # No-op after a successful replace. If the write or rename failed,
        # this keeps a stray .tmp file out of the data tree.
        tmp.unlink(missing_ok=True)
    return True
import ledger + out: dict[tuple[str, str], dict[str, Any]] = {} + for e in ledger.iter_entries(path): + cat, slug = e.get("category"), e.get("slug") + if isinstance(cat, str) and isinstance(slug, str): + out[(cat, slug)] = e + return out diff --git a/app/verify/signals.py b/app/verify/signals.py new file mode 100644 index 00000000000..d7573681605 --- /dev/null +++ b/app/verify/signals.py @@ -0,0 +1,253 @@ +"""Per-category cross-field consistency predicates (pure functions). + +The structural validator only range-checks single fields. These predicates check +*relations between fields* — the kind of contradiction that means a record cannot +describe a real part (threads < cores, a chip that postdates the device it powers, +a clock that boosts below its base). Each predicate yields a :class:`Signal`. + +Severity: +* ``hard`` — logically impossible. Forces the record's band to red regardless of score. +* soft — implausible but physically possible; only subtracts from the score. + +``NA`` results (inputs absent) are neither pass nor fail and never penalize. +""" + +from __future__ import annotations + +import math +import re +from typing import Any, NamedTuple + +# Range table mirrored from app.validate's _check_range call sites, keyed by +# (category, field) -> (lo, hi). A parity smoke test asserts this stays in sync. 
+RANGES: dict[tuple[str, str], tuple[float, float]] = { + ("brand", "founded_year"): (1800, 2100), + ("soc", "process_nm"): (1.0, 100.0), + ("smartphone", "ram_gb"): (1, 64), + ("smartphone", "battery_mah"): (500, 12000), + ("smartphone", "weight_g"): (50, 500), + ("smartphone", "msrp_usd"): (50, 5000), + ("mobile", "ram_gb"): (0.016, 64), + ("mobile", "battery_mah"): (50, 20000), + ("mobile", "weight_g"): (10, 2000), + ("mobile", "msrp_usd"): (10, 10000), + ("gpu", "memory_gb"): (0.001, 512), + ("gpu", "tdp_w"): (1, 3000), + ("gpu", "msrp_usd"): (50, 100000), + ("cpu", "cores"): (1, 512), + ("cpu", "threads"): (1, 1024), + ("cpu", "msrp_usd"): (20, 50000), +} + +_RESOLUTION_RE = re.compile(r"(\d{2,5})\s*[x×]\s*(\d{2,5})") +_ANDROID_RE = re.compile(r"android\s*(\d{1,2})", re.IGNORECASE) + +# Earliest plausible release year for a given Android major version (release-vs-era). +_ANDROID_MIN_YEAR: dict[int, int] = { + 4: 2011, 5: 2014, 6: 2015, 7: 2016, 8: 2017, 9: 2018, + 10: 2019, 11: 2020, 12: 2021, 13: 2022, 14: 2023, 15: 2024, 16: 2025, +} + + +class Signal(NamedTuple): + name: str + result: str # "pass" | "fail" | "na" + hard: bool = False + + @property + def failed(self) -> bool: + return self.result == "fail" + + +def _num(value: Any) -> float | None: + return value if isinstance(value, (int, float)) and not isinstance(value, bool) else None + + +def _cmp_ge(name: str, a: Any, b: Any, *, hard: bool) -> Signal: + """``a >= b`` when both present, else NA.""" + x, y = _num(a), _num(b) + if x is None or y is None: + return Signal(name, "na", hard) + return Signal(name, "pass" if x >= y else "fail", hard) + + +def _year_of(value: Any) -> int | None: + if isinstance(value, str) and len(value) >= 4 and value[:4].isdigit(): + return int(value[:4]) + return None + + +def parse_resolution(value: Any) -> tuple[int, int] | None: + if not isinstance(value, str): + return None + m = _RESOLUTION_RE.search(value) + if not m: + return None + return int(m.group(1)), 
def _release_not_future(rec: dict[str, Any], now_year: int) -> Signal:
    """Hard predicate: the release year is no later than ``now_year + 1``.

    Yields NA when ``release_date`` has no parseable year.
    """
    year = _year_of(rec.get("release_date"))
    if year is None:
        outcome = "na"
    elif year <= now_year + 1:
        outcome = "pass"
    else:
        outcome = "fail"
    return Signal("release_not_future", outcome, hard=True)
def _storage_signal(rec: dict[str, Any]) -> Signal:
    """Soft predicate: ``storage_options_gb`` is a sane list of capacities.

    NA when the field is missing, not a list, or empty. Fails when any element
    is not a plain ``int`` (bools are excluded). Passes only when the values
    are all at least 1, unique, and in ascending order.
    """
    options = rec.get("storage_options_gb")
    if not (isinstance(options, list) and options):
        return Signal("storage_sane", "na", hard=False)
    if any(isinstance(v, bool) or not isinstance(v, int) for v in options):
        return Signal("storage_sane", "fail", hard=False)
    # "All >= 1, no duplicates, sorted" is the same as "strictly increasing
    # and the first (smallest) value is >= 1".
    strictly_increasing = all(a < b for a, b in zip(options, options[1:]))
    sane = options[0] >= 1 and strictly_increasing
    return Signal("storage_sane", "pass" if sane else "fail", hard=False)
major = int(m.group(1)) + min_year = _ANDROID_MIN_YEAR.get(major) + if min_year is None: + return Signal("os_era", "na", hard=False) + return Signal("os_era", "pass" if y >= min_year else "fail", hard=False) + + +def mobile_signals( + rec: dict[str, Any], now_year: int, soc_release: dict[str, str] +) -> list[Signal]: + """Shared by smartphone / tablet / watch / pda.""" + display = rec.get("display") if isinstance(rec.get("display"), dict) else {} + out = [ + _ppi_signal(display), + _storage_signal(rec), + _android_era_signal(rec), + _release_not_future(rec, now_year), + ] + # ram_gb <= max(storage_options_gb) + ram = _num(rec.get("ram_gb")) + vals = rec.get("storage_options_gb") + if ram is not None and isinstance(vals, list) and vals: + nums = [v for v in vals if isinstance(v, (int, float)) and not isinstance(v, bool)] + if nums: + out.append(Signal("ram_le_storage", "pass" if ram <= max(nums) else "fail", hard=False)) + else: + out.append(Signal("ram_le_storage", "na", hard=False)) + else: + out.append(Signal("ram_le_storage", "na", hard=False)) + # SoC should not postdate the device it powers. SOFT, not hard: the dataset's + # SoC release_dates are largely placeholder "YYYY-01-01" values that skew late + # (e.g. Snapdragon 888 stored as 2022-01-01), so a mismatch usually means the + # *SoC* record's date is wrong, not the device. We flag + penalize but don't + # force-red the device on the strength of a second record's bad date. 
def signals_for(
    category: str, rec: dict[str, Any], now_year: int, soc_release: dict[str, str]
) -> list[Signal]:
    """Run the predicate set for ``category`` against ``rec``.

    The mobile family (smartphone / tablet / watch / pda) shares one predicate
    set, which also receives ``soc_release``. Unknown categories get no signals.
    """
    if category in {"smartphone", "tablet", "watch", "pda"}:
        return mobile_signals(rec, now_year, soc_release)
    per_category = {
        "cpu": cpu_signals,
        "gpu": gpu_signals,
        "soc": soc_signals,
        "brand": brand_signals,
    }
    handler = per_category.get(category)
    if handler is None:
        return []
    return handler(rec, now_year)
class FakeOpener:
    """Offline stand-in for the HTTP opener, driven by a lookup table.

    ``table`` maps a URL either to the ``(status, final_url)`` pair to return
    or to an exception instance to raise. Every call is appended to ``calls``
    as ``(url, method)``.
    """

    def __init__(self, table):
        self.calls = []
        self.table = table

    def open(self, url, method):
        self.calls.append((url, method))
        outcome = self.table[url]
        if not isinstance(outcome, Exception):
            return outcome
        raise outcome
{"https://x.com/deep/page": err} + + class TwoStep: + def __init__(self): + self.n = 0 + + def open(self, url, method): + self.n += 1 + if method == "HEAD": + raise err + return (200, "https://x.com/deep/page") + + res = http_check.check_one("https://x.com/deep/page", TwoStep()) + assert res.alive and res.status == 200 + + +def test_connection_error_is_dead(): + table = {"https://nope.invalid/x": ConnectionError("no route")} + [res] = http_check.check_urls(list(table), opener_factory=_factory(table), min_interval=0) + assert not res.alive and res.reason == "error" + + +def test_dedupe_by_host_and_path(): + urls = [ + "https://www.kaggle.com/datasets/a", + "https://www.kaggle.com/datasets/a", # exact dup + "https://www.kaggle.com/datasets/b", + ] + assert len(http_check.dedupe_urls(urls)) == 2 + + +def test_cache_freshness(): + from datetime import datetime, timezone + now = datetime(2026, 6, 22, tzinfo=timezone.utc) + fresh = {"checked_at": "2026-06-20T00:00:00Z"} + stale = {"checked_at": "2026-01-01T00:00:00Z"} + assert http_check.is_fresh(fresh, now, ttl_days=30) + assert not http_check.is_fresh(stale, now, ttl_days=30) + + +def test_record_liveness(): + cache = { + "a": {"alive": True}, "b": {"alive": False}, "c": {"alive": True}, + } + assert http_check.record_liveness(["a", "b", "c", "missing"], cache) == (2, 1) + + +def test_cache_roundtrip(): + # tmp_path fixture is unreliable on this Windows runner; use a local scratch file. 
+ from pathlib import Path + path = Path(__file__).parent / "_scratch_url_cache.jsonl" + try: + r = CheckResult("https://x.com/y", 200, "https://x.com/y", True, "http-200") + http_check.save_cache({r.url: http_check.result_to_entry(r, "2026-06-22T00:00:00Z")}, path) + loaded = http_check.load_cache(path) + assert loaded["https://x.com/y"]["alive"] is True + finally: + path.unlink(missing_ok=True) diff --git a/tests/verify/test_offline.py b/tests/verify/test_offline.py new file mode 100644 index 00000000000..f985e291cc1 --- /dev/null +++ b/tests/verify/test_offline.py @@ -0,0 +1,65 @@ +"""Tier 0 scorer + host classification tests.""" + +from app.verify import hosts, offline +from app.verify.common import Record + +NOW = 2026 +NO_SOC: dict[str, str] = {} + + +def _score(category, data): + return offline.score_record(Record(category, f"{category}/x.json", data), NOW, NO_SOC) + + +def test_host_tiers(): + assert hosts.tier_of_host("en.wikipedia.org") == 1 + assert hosts.tier_of_host("ark.intel.com") == 1 # subdomain of intel.com + assert hosts.tier_of_host("gsmarena.com") == 2 + assert hosts.tier_of_host("www.kaggle.com") == 3 + assert hosts.tier_of_host("example.org") == 0 + assert hosts.best_tier(["https://kaggle.com/x", "https://en.wikipedia.org/y"]) == 1 + + +def test_complete_authoritative_cpu_is_green(): + rec = { + "slug": "core-i9-14900k", "cores": 24, "threads": 32, + "base_clock_ghz": 3.2, "boost_clock_ghz": 6.0, "l3_cache_mb": 36, + "socket": "LGA1700", "tdp_w": 125, "passmark_cpu_mark": 60000, + "architecture": "Raptor Lake", "release_date": "2023-10-17", + "source_urls": ["https://ark.intel.com/x", "https://en.wikipedia.org/wiki/x"], + } + s = _score("cpu", rec) + assert s.band == "green" + assert s.best_tier == 1 + + +def test_hard_violation_forces_red_despite_good_source(): + rec = { + "slug": "bad", "cores": 16, "threads": 8, # threads < cores -> hard + "base_clock_ghz": 3.0, "boost_clock_ghz": 4.0, "release_date": "2023-01-01", + "architecture": "x", 
"socket": "y", "tdp_w": 65, "l3_cache_mb": 8, + "passmark_cpu_mark": 20000, + "source_urls": ["https://en.wikipedia.org/wiki/x"], + } + s = _score("cpu", rec) + assert s.band == "red" + assert "!threads_ge_cores" in s.flags + + +def test_kaggle_only_sparse_is_not_green(): + rec = { + "slug": "sgh-x", "name": "SGH-X", "release_date": "2016-01-01", + "display": {"type": "Alphanumeric"}, + "source_urls": ["https://www.kaggle.com/datasets/msainani/gsmarena-mobile-devices"], + } + s = _score("smartphone", rec) + assert s.band != "green" # T3-only source can never auto-green + assert s.best_tier == 3 + + +def test_future_release_red(): + rec = { + "slug": "ghost", "cores": 8, "threads": 16, "release_date": "2099-01-01", + "source_urls": ["https://en.wikipedia.org/wiki/x"], + } + assert _score("cpu", rec).band == "red" diff --git a/tests/verify/test_parity_and_golden.py b/tests/verify/test_parity_and_golden.py new file mode 100644 index 00000000000..f3ba3697547 --- /dev/null +++ b/tests/verify/test_parity_and_golden.py @@ -0,0 +1,56 @@ +"""Guardrail tests: + +* RANGES parity — signals.RANGES must not drift from app.validate's bounds. +* Golden subset — the offline scorer, blind to the ``verified`` flag, should + reproduce the human-curated verified CPU set with high agreement. This is the + empirical justification for using the offline score to drive promotion. +""" + +import pytest + +from app.verify import offline, signals +from app.verify.common import foreign_key_sets, load_all + + +def test_ranges_parity_with_validator(): + """If app.validate's numeric bounds change, this test should force a sync. + + Mirrors the _check_range call sites in app/validate.py. Keep in lockstep. 
+ """ + expected = { + ("brand", "founded_year"): (1800, 2100), + ("soc", "process_nm"): (1.0, 100.0), + ("smartphone", "ram_gb"): (1, 64), + ("smartphone", "battery_mah"): (500, 12000), + ("smartphone", "weight_g"): (50, 500), + ("smartphone", "msrp_usd"): (50, 5000), + ("mobile", "ram_gb"): (0.016, 64), + ("mobile", "battery_mah"): (50, 20000), + ("mobile", "weight_g"): (10, 2000), + ("mobile", "msrp_usd"): (10, 10000), + ("gpu", "memory_gb"): (0.001, 512), + ("gpu", "tdp_w"): (1, 3000), + ("gpu", "msrp_usd"): (50, 100000), + ("cpu", "cores"): (1, 512), + ("cpu", "threads"): (1, 1024), + ("cpu", "msrp_usd"): (20, 50000), + } + assert signals.RANGES == expected + + +@pytest.mark.slow +def test_verified_cpus_land_green(): + """≥95% of already-verified CPUs should score green under the offline tier.""" + records = load_all() + _, _, soc_release = foreign_key_sets(records) + now_year = offline.now_year_today() + + verified = [r for r in records["cpu"] if r.verified and r.slug] + if not verified: + pytest.skip("no verified CPUs in dataset") + green = sum( + 1 for r in verified + if offline.score_record(r, now_year, soc_release).band == "green" + ) + ratio = green / len(verified) + assert ratio >= 0.95, f"only {ratio:.1%} of verified CPUs scored green" diff --git a/tests/verify/test_promote_crossref.py b/tests/verify/test_promote_crossref.py new file mode 100644 index 00000000000..82374c57c47 --- /dev/null +++ b/tests/verify/test_promote_crossref.py @@ -0,0 +1,122 @@ +"""Tier 2/3 tests: exact-heading rule, surgical write-back, no-clobber, escalation.""" + +from pathlib import Path + +from app.verify import crossref, promote +from app.verify.crossref import Candidate + + +class FakeFetcher: + def __init__(self, candidates): + self._c = candidates + + def search(self, name): + return self._c + + +# --- exact-heading rule ---------------------------------------------------------- + + +def test_exact_heading_confirms(): + rec = {"slug": "iphone-xr", "name": "iPhone XR", 
"release_date": "2018-10-26"} + f = FakeFetcher([Candidate("iPhone XR", "https://en.wikipedia.org/wiki/IPhone_XR", 2018)]) + res = crossref.crossref_record(rec, f) + assert res.decision == crossref.CONFIRM and res.exact_heading + + +def test_near_miss_is_ambiguous_not_confirm(): + # A different SKU comes back; fuzzy match must NOT auto-confirm. + rec = {"slug": "iphone-xr", "name": "iPhone XR"} + f = FakeFetcher([Candidate("iPhone XS", "https://en.wikipedia.org/wiki/IPhone_XS")]) + res = crossref.crossref_record(rec, f) + assert res.decision == crossref.AMBIGUOUS and not res.exact_heading + + +def test_year_contradiction_blocks_confirm(): + rec = {"slug": "x", "name": "Widget 9000", "release_date": "2018-01-01"} + f = FakeFetcher([Candidate("Widget 9000", "http://x", 2010)]) + assert crossref.crossref_record(rec, f).decision == crossref.CONTRADICT + + +def test_no_candidates_is_notfound(): + rec = {"slug": "x", "name": "Obscure Thing"} + assert crossref.crossref_record(rec, FakeFetcher([])).decision == crossref.NOTFOUND + + +def test_normalize_heading(): + assert crossref.normalize_heading("iPhone XR") == "iphonexr" + assert crossref.normalize_heading("Core i9-14900K") == "corei914900k" + + +# --- surgical write-back --------------------------------------------------------- + +SEED = ( + '{\n' + ' "slug": "demo",\n' + ' "name": "Demo",\n' + ' "storage_options_gb": [64, 128, 256],\n' + ' "verified": false,\n' + ' "source_urls": [\n' + ' "https://en.wikipedia.org/wiki/Demo"\n' + ' ]\n' + '}\n' +) + + +def test_flip_only_touches_verified_token(): + out = promote.flip_verified_text(SEED) + assert out is not None + # Exactly one line changed; inline array preserved verbatim. 
+ assert '"verified": true,' in out + assert '"storage_options_gb": [64, 128, 256],' in out + diff = [(a, b) for a, b in zip(SEED.splitlines(), out.splitlines()) if a != b] + assert diff == [(' "verified": false,', ' "verified": true,')] + + +def test_flip_refuses_already_true(): + assert promote.flip_verified_text(SEED.replace("false", "true")) is None + + +def test_write_back_atomic_lf_preserved(): + path = Path(__file__).parent / "_scratch_seed.json" + try: + path.write_bytes(SEED.encode("utf-8")) + assert promote.write_verified_true(path) is True + raw = path.read_bytes() + assert b'"verified": true,' in raw + assert b"\r\n" not in raw # LF preserved on Windows + assert raw.endswith(b"}\n") + # idempotent guard: second call refuses (already true) + assert promote.write_verified_true(path) is False + finally: + path.unlink(missing_ok=True) + + +# --- promotion decision ---------------------------------------------------------- + + +def test_green_with_live_t1_promotes(): + cache = {"https://en.wikipedia.org/wiki/X": {"alive": True}} + d = promote.decide( + band="green", source_urls=["https://en.wikipedia.org/wiki/X"], + url_cache=cache, crossref_decision=None, + ) + assert d.promote and d.reason == "green+live-t1" + + +def test_green_without_live_source_blocked(): + d = promote.decide(band="green", source_urls=["https://en.wikipedia.org/wiki/X"], + url_cache={}, crossref_decision=None) + assert not d.promote + + +def test_yellow_with_crossref_confirm_promotes(): + d = promote.decide(band="yellow", source_urls=[], url_cache={}, crossref_decision="confirm") + assert d.promote and d.reason == "crossref-confirm" + + +def test_dead_t1_does_not_promote(): + cache = {"https://en.wikipedia.org/wiki/X": {"alive": False}} + d = promote.decide(band="green", source_urls=["https://en.wikipedia.org/wiki/X"], + url_cache=cache, crossref_decision=None) + assert not d.promote diff --git a/tests/verify/test_signals.py b/tests/verify/test_signals.py new file mode 100644 index 
00000000000..5f74c1ec542 --- /dev/null +++ b/tests/verify/test_signals.py @@ -0,0 +1,88 @@ +"""Unit tests for cross-field consistency predicates (app.verify.signals).""" + +from app.verify import signals + +NOW = 2026 +NO_SOC: dict[str, str] = {} + + +def _named(sigs, name): + return next(s for s in sigs if s.name == name) + + +def test_threads_below_cores_is_hard_fail(): + rec = {"cores": 8, "threads": 4, "release_date": "2020-01-01"} + s = _named(signals.cpu_signals(rec, NOW), "threads_ge_cores") + assert s.failed and s.hard + + +def test_threads_ge_cores_passes(): + rec = {"cores": 8, "threads": 16, "release_date": "2020-01-01"} + assert _named(signals.cpu_signals(rec, NOW), "threads_ge_cores").result == "pass" + + +def test_boost_below_base_is_hard_fail(): + rec = {"base_clock_ghz": 3.5, "boost_clock_ghz": 3.0, "cores": 4, "threads": 4} + s = _named(signals.cpu_signals(rec, NOW), "boost_ge_base") + assert s.failed and s.hard + + +def test_missing_inputs_are_na_not_fail(): + rec = {"cores": 4, "threads": 4} # no clocks + assert _named(signals.cpu_signals(rec, NOW), "boost_ge_base").result == "na" + + +def test_future_release_is_hard_fail(): + rec = {"cores": 1, "threads": 1, "release_date": "2099-01-01"} + s = _named(signals.cpu_signals(rec, NOW), "release_not_future") + assert s.failed and s.hard + + +def test_hybrid_core_sum(): + ok = {"cores": 8, "threads": 8, "p_cores": 4, "e_cores": 4} + bad = {"cores": 8, "threads": 8, "p_cores": 4, "e_cores": 2} + assert _named(signals.cpu_signals(ok, NOW), "hybrid_core_sum").result == "pass" + assert _named(signals.cpu_signals(bad, NOW), "hybrid_core_sum").result == "fail" + + +def test_gpu_boost_and_vendor_core(): + rec = { + "manufacturer": "nvidia", "base_clock_mhz": 1500, "boost_clock_mhz": 1800, + "cuda_cores": 4096, "release_date": "2022-01-01", + } + sigs = signals.gpu_signals(rec, NOW) + assert _named(sigs, "boost_ge_base").result == "pass" + assert _named(sigs, "vendor_core_field").result == "pass" + + +def 
test_gpu_rt_cores_before_turing_fail(): + rec = {"manufacturer": "nvidia", "rt_cores": 50, "release_date": "2015-01-01", + "cuda_cores": 2048} + assert _named(signals.gpu_signals(rec, NOW), "rt_cores_era").result == "fail" + + +def test_ppi_consistency(): + # 1792x828 over 6.1" -> ~326 ppi (matches iPhone XR). + good = {"display": {"size_inch": 6.1, "resolution": "1792x828", "ppi": 326}} + bad = {"display": {"size_inch": 6.1, "resolution": "1792x828", "ppi": 500}} + assert _named(signals.mobile_signals(good, NOW, NO_SOC), "ppi_consistent").result == "pass" + assert _named(signals.mobile_signals(bad, NOW, NO_SOC), "ppi_consistent").result == "fail" + + +def test_storage_must_be_sorted_positive_unique(): + good = {"storage_options_gb": [64, 128, 256]} + bad = {"storage_options_gb": [256, 64]} + assert _named(signals.mobile_signals(good, NOW, NO_SOC), "storage_sane").result == "pass" + assert _named(signals.mobile_signals(bad, NOW, NO_SOC), "storage_sane").result == "fail" + + +def test_soc_not_after_device_is_soft(): + rec = {"soc": "chip-x", "release_date": "2020-01-01"} + soc_release = {"chip-x": "2022-01-01"} + s = _named(signals.mobile_signals(rec, NOW, soc_release), "soc_not_after_device") + assert s.failed and not s.hard # flagged but never forces red + + +def test_soc_process_nm_era(): + rec = {"process_nm": 5.0, "release_date": "2010-01-01", "gpu_name": "x"} + assert _named(signals.soc_signals(rec, NOW), "process_nm_era").result == "fail" From dacc817bc157791bb55c6ed968d6e53f847777bc Mon Sep 17 00:00:00 2001 From: Seungpyo1007 Date: Mon, 22 Jun 2026 11:47:28 +0900 Subject: [PATCH 2/3] ci(verify): route Tier 0 analysis to a TechEngineBot PR comment Reworks how verification surfaces on PRs so TechEngineBot owns the analysis, instead of TechAPI running its own (failing) job: - Remove the self-run verify-offline job from validate-data.yml. 
It failed because the stdlib-only CI image has no pytest, and having TechAPI score its own PRs duplicated what the bot should own. validate-data.yml is back to the pure structural gate. - Add verify-report.yml: runs `app.verify score` (changed records + full baseline) and has TechEngineBot post the band histogram as a PR comment via ENGINE_TOKEN. Dormant if the token is unset; same-repo PRs only; never gates a merge; updates one marked comment in place. - Add app/verify/** to request-engine-pr-validation paths so the engine's PR validation (and its TechEngineBot comment) also covers verifier changes. Refs #1 --- .../request-engine-pr-validation.yml | 1 + .github/workflows/validate-data.yml | 26 ----- .github/workflows/verify-report.yml | 99 +++++++++++++++++++ 3 files changed, 100 insertions(+), 26 deletions(-) create mode 100644 .github/workflows/verify-report.yml diff --git a/.github/workflows/request-engine-pr-validation.yml b/.github/workflows/request-engine-pr-validation.yml index 0bec07247b5..372cb6e1830 100644 --- a/.github/workflows/request-engine-pr-validation.yml +++ b/.github/workflows/request-engine-pr-validation.yml @@ -17,6 +17,7 @@ on: - "site/package.json" - "site/package-lock.json" - "app/validate.py" + - "app/verify/**" workflow_dispatch: inputs: pr_number: diff --git a/.github/workflows/validate-data.yml b/.github/workflows/validate-data.yml index f18c88dde71..eb276b3b863 100644 --- a/.github/workflows/validate-data.yml +++ b/.github/workflows/validate-data.yml @@ -7,15 +7,11 @@ on: paths: - "data/**" - "app/validate.py" - - "app/verify/**" - - "tests/verify/**" push: branches: [main] paths: - "data/**" - "app/validate.py" - - "app/verify/**" - - "tests/verify/**" jobs: self-validate: @@ -28,28 +24,6 @@ jobs: - name: Self-check (bundled validator) run: python -m app.validate - # Non-blocking existence/trust signal: scores the records changed in this PR - # with the Tier 0 offline verifier and prints a band histogram. 
Informational - # only — never gates the merge (continue-on-error). - verify-offline: - runs-on: ubuntu-latest - continue-on-error: true - env: - PYTHONIOENCODING: utf-8 - steps: - - uses: actions/checkout@v4 - with: - fetch-depth: 0 - - uses: actions/setup-python@v5 - with: - python-version: "3.12" - - name: Tier 0 verification report (changed records) - run: | - git fetch origin main --depth=1 || true - python -m app.verify score --changed --no-cache - - name: Verifier unit tests - run: python -m pytest tests/verify -q -m "not slow" - engine-validate: needs: self-validate uses: GetTechAPI/TechEngine/.github/workflows/validate-data.yml@main diff --git a/.github/workflows/verify-report.yml b/.github/workflows/verify-report.yml new file mode 100644 index 00000000000..2805dd9185c --- /dev/null +++ b/.github/workflows/verify-report.yml @@ -0,0 +1,99 @@ +name: verify-report + +# Run the Tier 0 offline data *verification* (existence/trust scoring) on a PR and +# let TechEngineBot post the band analysis as a PR comment. The bot owns the +# analysis surface: this workflow only computes the report and hands it to the bot +# (the comment is authored via the bot's token). It never gates a merge. +# +# Dormant unless ENGINE_TOKEN (TechEngineBot's PAT) is configured. Restricted to +# same-repo branch PRs so fork PRs never see the token. The structural gate stays +# in validate-data.yml; this is purely informational. 
+on: + pull_request: + types: [opened, synchronize, reopened, ready_for_review] + paths: + - "data/**" + - "app/validate.py" + - "app/verify/**" + +permissions: + contents: read + pull-requests: write + +concurrency: + group: verify-report-${{ github.event.pull_request.number }} + cancel-in-progress: true + +jobs: + verify-report: + runs-on: ubuntu-latest + if: github.event.pull_request.head.repo.full_name == github.repository + env: + PYTHONIOENCODING: utf-8 + ENGINE_TOKEN: ${{ secrets.ENGINE_TOKEN }} + steps: + - name: Dormant when ENGINE_TOKEN is unset + if: env.ENGINE_TOKEN == '' + run: echo "::warning::ENGINE_TOKEN not configured — TechEngineBot verify comment skipped." + + - uses: actions/checkout@v4 + if: env.ENGINE_TOKEN != '' + with: + fetch-depth: 0 + + - uses: actions/setup-python@v5 + if: env.ENGINE_TOKEN != '' + with: + python-version: "3.12" + + - name: Tier 0 verification (changed + full baseline) + if: env.ENGINE_TOKEN != '' + id: verify + run: | + git fetch origin main --depth=1 || true + { + echo 'report<> "$GITHUB_OUTPUT" + + - name: TechEngineBot posts the verification analysis + if: env.ENGINE_TOKEN != '' + uses: actions/github-script@v7 + env: + REPORT: ${{ steps.verify.outputs.report }} + with: + github-token: ${{ secrets.ENGINE_TOKEN }} + script: | + const marker = ''; + const report = (process.env.REPORT || '').trim() || '(no output)'; + const body = [ + marker, + '## 🔎 Data verification — Tier 0 (offline existence/trust)', + '', + 'Scored by `app.verify`; posted by **TechEngineBot**. Informational only —', + 'the structural gate (`app.validate`) is separate and authoritative for merge.', + '', + '```text', + report, + '```', + '', + 'green = authoritative source + complete + consistent · ' + + 'yellow = plausible, needs confirmation · red = sparse/weak source or a hard contradiction. 
' + + 'Promotion to `verified` runs in the scheduled `verify-network` workflow.', + ].join('\n'); + const { owner, repo } = context.repo; + const issue_number = context.payload.pull_request.number; + const comments = await github.paginate(github.rest.issues.listComments, { + owner, repo, issue_number, per_page: 100, + }); + const existing = comments.find((c) => c.body && c.body.includes(marker)); + if (existing) { + await github.rest.issues.updateComment({ owner, repo, comment_id: existing.id, body }); + } else { + await github.rest.issues.createComment({ owner, repo, issue_number, body }); + } From 228a38feb269a6d928e381e5c3d99624420673c9 Mon Sep 17 00:00:00 2001 From: Seungpyo Hong Date: Mon, 22 Jun 2026 11:55:55 +0900 Subject: [PATCH 3/3] ci(verify): author the verify analysis comment as TechEngineBot Use TECHENGINEBOT_TOKEN (the bot's PAT) for the github-script step so the Tier 0 analysis comment is authored by TechEngineBot, falling back to ENGINE_TOKEN only to keep the workflow running if the bot token is absent. Refs #1 --- .github/workflows/verify-report.yml | 32 ++++++++++++++++------------- 1 file changed, 18 insertions(+), 14 deletions(-) diff --git a/.github/workflows/verify-report.yml b/.github/workflows/verify-report.yml index 2805dd9185c..1d2a4dc5d15 100644 --- a/.github/workflows/verify-report.yml +++ b/.github/workflows/verify-report.yml @@ -2,12 +2,13 @@ name: verify-report # Run the Tier 0 offline data *verification* (existence/trust scoring) on a PR and # let TechEngineBot post the band analysis as a PR comment. The bot owns the -# analysis surface: this workflow only computes the report and hands it to the bot -# (the comment is authored via the bot's token). It never gates a merge. +# analysis surface: this workflow only computes the report and hands it to the bot, +# which authors the comment via its own PAT (TECHENGINEBOT_TOKEN). It never gates a +# merge. # -# Dormant unless ENGINE_TOKEN (TechEngineBot's PAT) is configured. 
Restricted to -# same-repo branch PRs so fork PRs never see the token. The structural gate stays -# in validate-data.yml; this is purely informational. +# Dormant unless a bot/automation token is configured. Restricted to same-repo +# branch PRs so fork PRs never see the token. The structural gate stays in +# validate-data.yml; this is purely informational. on: pull_request: types: [opened, synchronize, reopened, ready_for_review] @@ -30,24 +31,27 @@ jobs: if: github.event.pull_request.head.repo.full_name == github.repository env: PYTHONIOENCODING: utf-8 - ENGINE_TOKEN: ${{ secrets.ENGINE_TOKEN }} + # Prefer TechEngineBot's PAT so the analysis comment is authored by the bot + # (TECHENGINEBOT_TOKEN, Issues/PR write on both repos). Fall back to + # ENGINE_TOKEN only so the workflow still runs if the bot token is absent. + BOT_TOKEN: ${{ secrets.TECHENGINEBOT_TOKEN || secrets.ENGINE_TOKEN }} steps: - - name: Dormant when ENGINE_TOKEN is unset - if: env.ENGINE_TOKEN == '' - run: echo "::warning::ENGINE_TOKEN not configured — TechEngineBot verify comment skipped." + - name: Dormant when no bot token is configured + if: env.BOT_TOKEN == '' + run: echo "::warning::No TECHENGINEBOT_TOKEN/ENGINE_TOKEN — TechEngineBot verify comment skipped." 
- uses: actions/checkout@v4 - if: env.ENGINE_TOKEN != '' + if: env.BOT_TOKEN != '' with: fetch-depth: 0 - uses: actions/setup-python@v5 - if: env.ENGINE_TOKEN != '' + if: env.BOT_TOKEN != '' with: python-version: "3.12" - name: Tier 0 verification (changed + full baseline) - if: env.ENGINE_TOKEN != '' + if: env.BOT_TOKEN != '' id: verify run: | git fetch origin main --depth=1 || true @@ -62,12 +66,12 @@ jobs: } >> "$GITHUB_OUTPUT" - name: TechEngineBot posts the verification analysis - if: env.ENGINE_TOKEN != '' + if: env.BOT_TOKEN != '' uses: actions/github-script@v7 env: REPORT: ${{ steps.verify.outputs.report }} with: - github-token: ${{ secrets.ENGINE_TOKEN }} + github-token: ${{ secrets.TECHENGINEBOT_TOKEN || secrets.ENGINE_TOKEN }} script: | const marker = ''; const report = (process.env.REPORT || '').trim() || '(no output)';