From eadeb7e5ad68c88e91a81996ac6956f16779fba7 Mon Sep 17 00:00:00 2001 From: Seungpyo Hong Date: Tue, 23 Jun 2026 14:18:22 +0900 Subject: [PATCH 01/54] feat(verify): migrate verification layer from TechAPI --- app/verify/__init__.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) create mode 100644 app/verify/__init__.py diff --git a/app/verify/__init__.py b/app/verify/__init__.py new file mode 100644 index 0000000..981370d --- /dev/null +++ b/app/verify/__init__.py @@ -0,0 +1,18 @@ +"""TechAPI data *verification* layer (§ existence/trust, sits above structural validation). + +``app.validate`` answers "is this record well-formed?". ``app.verify`` answers +"does this record describe a real, actually-existing device/part — confidently +enough to mark it ``verified``?". + +It is a separate, additive layer: the structural validator (``app/validate.py``) +stays the fast CI gate and is never rewritten. Verification is tiered: + +* Tier 0 — offline deterministic plausibility score over the whole dataset + (``offline``/``signals``/``hosts``); bands records green/yellow/red. +* Tier 1 — ``source_urls`` HTTP liveness (``http_check``). +* Tier 2 — external cross-reference under an exact-heading rule (``crossref``). +* Tier 3 — hybrid escalation + safe ``verified:true`` write-back (``promote``). + +Decisions are recorded append-only in ``data/_verify/ledger.jsonl`` so runs are +incremental and resumable. 
+""" From b43d77fef71ab637a86706e5177673423c35c5f8 Mon Sep 17 00:00:00 2001 From: Seungpyo Hong Date: Tue, 23 Jun 2026 14:18:23 +0900 Subject: [PATCH 02/54] feat(verify): migrate verification layer from TechAPI --- app/verify/__main__.py | 8 ++++++++ 1 file changed, 8 insertions(+) create mode 100644 app/verify/__main__.py diff --git a/app/verify/__main__.py b/app/verify/__main__.py new file mode 100644 index 0000000..d90380f --- /dev/null +++ b/app/verify/__main__.py @@ -0,0 +1,8 @@ +"""``python -m app.verify`` entry point.""" + +import sys + +from .cli import main + +if __name__ == "__main__": + sys.exit(main()) From 74399ef3e7f9636fec40081f5096f2db88d0145f Mon Sep 17 00:00:00 2001 From: Seungpyo Hong Date: Tue, 23 Jun 2026 14:18:25 +0900 Subject: [PATCH 03/54] feat(verify): migrate verification layer from TechAPI --- app/verify/cli.py | 697 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 697 insertions(+) create mode 100644 app/verify/cli.py diff --git a/app/verify/cli.py b/app/verify/cli.py new file mode 100644 index 0000000..85dd063 --- /dev/null +++ b/app/verify/cli.py @@ -0,0 +1,697 @@ +"""Command-line entry for the verification layer: ``python -m app.verify ...``. + +Phase A implements the offline tier: + +* ``score`` — score records, print a band histogram, append Tier 0 ledger entries. +* ``report`` — summarize the latest ledger state per category. + +Network subcommands (``check-urls``, ``crossref``, ``promote``) are added in later +phases; they are declared here so ``--help`` lists the eventual surface. +""" + +from __future__ import annotations + +import argparse +import json +import subprocess +from collections import Counter, defaultdict +from datetime import datetime, timezone +from pathlib import Path + +from . 
def _now_iso() -> str:
    """Return the current UTC time as ``YYYY-MM-DDTHH:MM:SSZ`` (second precision)."""
    return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


def _changed_data_slugs() -> set[str]:
    """Repo-relative data/ paths changed vs origin/main (for CI --changed).

    Direct two-tree diff (``origin/main HEAD``), NOT three-dot ``origin/main...HEAD``:
    CI fetches main shallow (``--depth=1``), so there is no merge-base and the
    three-dot form silently returns nothing. A direct tree diff only needs both
    commit tips, which are always present.

    Runs git in the *data* repository (DATA_DIR's parent), so it works whether this
    package lives in TechAPI (data alongside) or TechEngine (data in a separate
    TechAPI checkout pointed at by TECHAPI_DATA_DIR).

    Returns:
        The changed ``*.json`` paths with the leading ``data/`` stripped, so they
        compare equal to ``Record.path``. Best-effort: when git cannot be run or
        exits non-zero, a warning goes to stderr and the set is empty.
    """
    import sys  # local import: only needed for the stderr warning below

    from .common import DATA_DIR

    try:
        out = subprocess.run(
            ["git", "diff", "--name-only", "origin/main", "HEAD", "--", "data/"],
            capture_output=True, text=True, check=True, cwd=DATA_DIR.parent,
        ).stdout
    except (OSError, subprocess.SubprocessError, ValueError) as exc:
        # Stay best-effort, but say so: a silent empty set is indistinguishable
        # from "nothing changed". stderr keeps stdout clean for report output.
        print(
            f"warning: could not diff against origin/main ({exc}); "
            "treating no records as changed",
            file=sys.stderr,
        )
        return set()

    # strip leading "data/" so it matches Record.path
    prefix = "data/"
    return {
        line[len(prefix):]
        for line in map(str.strip, out.splitlines())
        if line.startswith(prefix) and line.endswith(".json")
    }


def _iter_selected(
    records: dict[str, list[Record]],
    categories: tuple[str, ...],
    unverified_only: bool,
    changed: set[str] | None,
    limit: int | None,
):
    """Yield the records picked by the selection options, in category order.

    Args:
        records: ``{category: [record, ...]}``; every name in ``categories``
            must be a key.
        categories: categories to walk, in this order.
        unverified_only: when true, skip records whose ``verified`` is truthy.
        changed: when not ``None``, keep only records whose ``path`` is in it.
        limit: when not ``None``, stop once this many records were yielded.
    """
    count = 0
    for cat in categories:
        for rec in records[cat]:
            if unverified_only and rec.verified:
                continue
            if changed is not None and rec.path not in changed:
                continue
            yield rec
            count += 1
            if limit is not None and count >= limit:
                return


def _is_full_run(args: argparse.Namespace) -> bool:
    """Return True when no selection option narrows the run to a subset of records."""
    return (
        not args.category
        and args.max is None
        and not args.changed
        and not getattr(args, "unverified_only", False)
    )


def cmd_score(args: argparse.Namespace) -> int:
    """Run Tier 0 offline scoring and print the band histogram.

    Scores every selected record that has a slug, tallies bands per category and
    counts flags starting with ``!`` as hard violations. Output is a text
    histogram, or markdown when ``args.format == "md"``. On a full, unfiltered
    run (and unless ``--no-cache``) the scores cache at ``SCORES_PATH`` is
    replaced with the fresh entries.

    Returns:
        Always 0.
    """
    records = load_all()
    _, _, soc_release = foreign_key_sets(records)
    now_year = offline.now_year_today()
    ts = _now_iso()

    categories = tuple(args.category) if args.category else CATEGORIES
    changed = _changed_data_slugs() if args.changed else None

    # The scores cache is a full-dataset snapshot; only rewrite it on a full run.
    # --unverified-only is a filter too: writing then would replace the snapshot
    # with a subset that omits every verified record.
    write_cache = _is_full_run(args) and not args.no_cache

    # category -> band -> count
    hist: dict[str, Counter] = defaultdict(Counter)
    hard_flags: Counter = Counter()
    entries = []
    scored = 0

    for rec in _iter_selected(records, categories, args.unverified_only, changed, args.max):
        if not rec.slug:
            continue
        s = offline.score_record(rec, now_year, soc_release)
        hist[rec.category][s.band] += 1
        scored += 1
        hard_flags.update(f for f in s.flags if f.startswith("!"))
        if write_cache:
            entries.append(
                ledger.make_tier0_entry(
                    rec.category, rec.slug, rec.path, rec.content_hash(),
                    s.score, s.band, s.subscores, s.flags, s.best_tier, ts,
                )
            )

    if write_cache:
        ledger.replace_all(entries, SCORES_PATH)

    if getattr(args, "format", "text") == "md":
        _print_markdown(hist, scored, hard_flags)
    else:
        _print_histogram(hist, scored, hard_flags, wrote_cache=write_cache)
    return 0
def _band_bar(green: int, yellow: int, red: int, width: int = 12) -> str:
    """Render the green/yellow/red split as a row of colored squares.

    Each band gets a share of ``width`` cells proportional to its count, with
    rounding reconciled so the shares normally add up to ``width``. Squares are
    emitted in the fixed order 🟩, 🟨, 🟥. Returns "—" when all counts are zero.
    """
    raw = {"🟩": green, "🟨": yellow, "🟥": red}
    total = sum(raw.values())
    if total == 0:
        return "—"

    alloc = {sym: round(width * n / total) for sym, n in raw.items()}

    # Rounding may overshoot: trim the currently widest band one cell at a time.
    while sum(alloc.values()) > width:
        widest = max(alloc, key=alloc.get)
        alloc[widest] -= 1
    # ...or undershoot: hand the slack to the band with the largest raw count.
    while sum(alloc.values()) < width:
        biggest = max(raw, key=raw.get)
        alloc[biggest] += 1

    # A band that has records but rounded down to nothing takes one cell from
    # whichever band is widest at that moment.
    for sym, n in raw.items():
        if n > 0 and alloc[sym] == 0:
            alloc[sym] = 1
            alloc[max(alloc, key=alloc.get)] -= 1

    return "".join(sym * alloc[sym] for sym in raw)
def _percent(part: int, whole: int) -> float:
    """Return ``part`` as a percentage of ``whole`` rounded to 2 places (0.0 if ``whole`` is 0)."""
    return round(100 * part / whole, 2) if whole else 0.0


def cmd_status(args: argparse.Namespace) -> int:
    """Aggregate the verification state into one JSON file (the synced source of
    truth for "how much is verified"): per-category `verified` counts + Tier 0
    bands + promotion candidates. Default output: data/_verify/status.json.

    Records without a slug are ignored. With ``args.stdout`` the JSON is printed;
    otherwise it is written to ``args.output`` (or the default path) and a
    one-line summary is printed.

    Returns:
        Always 0.
    """
    records = load_all()
    _, _, soc_release = foreign_key_sets(records)
    now_year = offline.now_year_today()

    by_category: dict[str, dict] = {}
    tot = ver = g = y = r = 0
    for cat in CATEGORIES:
        ct = cv = cg = cy = cr = 0
        for rec in records[cat]:
            if not rec.slug:
                continue
            ct += 1
            if rec.verified:
                cv += 1
            band = offline.score_record(rec, now_year, soc_release).band
            cg += band == "green"
            cy += band == "yellow"
            cr += band == "red"
        by_category[cat] = {
            "total": ct,
            "verified": cv,
            "verified_pct": _percent(cv, ct),
            "green": cg,
            "yellow": cy,
            "red": cr,
            # green = high-confidence band; the promotion candidate pool.
            "promotable": cg,
        }
        tot += ct
        ver += cv
        g += cg
        y += cy
        r += cr

    # Computed once and reused for the JSON and the summary line, so an empty
    # dataset reports 0.00% in both places instead of dividing by zero.
    verified_pct = _percent(ver, tot)

    status = {
        "generated_at": _now_iso(),
        "schema": 1,
        "totals": {
            "records": tot,
            "verified": ver,
            "verified_pct": verified_pct,
            "green": g,
            "yellow": y,
            "red": r,
            "promotable": g,
        },
        "by_category": by_category,
    }
    blob = json.dumps(status, indent=2, ensure_ascii=False) + "\n"

    if args.stdout:
        print(blob, end="")
    else:
        out = args.output or (VERIFY_DIR / "status.json")
        ensure_verify_dirs()
        out.write_text(blob, encoding="utf-8")
        print(f"wrote verification status: {out} "
              f"({ver}/{tot} verified = {verified_pct:.2f}%, "
              f"{g} green / {y} yellow / {r} red)")
    return 0
sum(sum(c.values()) for c in hist.values()) + _print_histogram(hist, scored, hard_flags, wrote_cache=False) + + # Promotion decisions live in the git-tracked ledger. + promoted: Counter = Counter() + for (cat, _slug), entry in ledger.latest_by_key().items(): + if entry.get("decision") == "promote": + promoted[cat] += 1 + if sum(promoted.values()): + print("\npromoted to verified (ledger):") + for cat, n in promoted.most_common(): + print(f" {n:>7} {cat}") + return 0 + + +def _ranked_unverified(records, soc_release, now_year, categories): + """Unverified records of the given categories, scored, highest-confidence first.""" + scored = [] + for cat in categories: + for rec in records[cat]: + if rec.verified or not rec.slug: + continue + s = offline.score_record(rec, now_year, soc_release) + scored.append((s.score, rec)) + scored.sort(key=lambda t: t[0], reverse=True) + return [rec for _score, rec in scored] + + +def cmd_check_urls(args: argparse.Namespace) -> int: + records = load_all() + _, _, soc_release = foreign_key_sets(records) + now_year = offline.now_year_today() + categories = tuple(args.category) if args.category else CATEGORIES + + frontier = _ranked_unverified(records, soc_release, now_year, categories) + if args.max is not None: + frontier = frontier[: args.max] + + urls: list[str] = [] + for rec in frontier: + urls.extend(u for u in rec.data.get("source_urls", []) if isinstance(u, str)) + targets = http_check.dedupe_urls(urls) + + cache = http_check.load_cache() + now = datetime.now(timezone.utc) + if args.recheck: + todo = targets + else: + todo = [u for u in targets if not ( + u in cache and http_check.is_fresh(cache[u], now, args.ttl_days) + )] + + print( + f"check-urls: {len(frontier)} record(s) -> {len(targets)} unique URL(s); " + f"{len(targets) - len(todo)} fresh in cache, checking {len(todo)}" + ) + if not todo: + _summarize_cache(cache, targets) + return 0 + + ts = _now_iso() + results = http_check.check_urls( + todo, + max_workers=args.workers, 
+ min_interval=args.min_interval, + ) + for r in results: + cache[r.url] = http_check.result_to_entry(r, ts) + http_check.save_cache(cache) + print(f"cache: wrote {len(cache)} URL result(s) to data/_verify/state/url_cache.jsonl") + _summarize_cache(cache, targets) + return 0 + + +def _summarize_cache(cache, targets) -> None: + from collections import Counter + alive = sum(1 for u in targets if cache.get(u, {}).get("alive")) + dead = sum(1 for u in targets if u in cache and not cache[u].get("alive")) + print(f"\nliveness over {len(targets)} targeted URL(s): {alive} alive, {dead} dead") + reasons = Counter( + cache[u].get("reason") for u in targets + if u in cache and not cache[u].get("alive") + ) + if reasons: + print("dead reasons:") + for reason, n in reasons.most_common(10): + print(f" {n:>6} {reason}") + + +def cmd_crossref(args: argparse.Namespace) -> int: + records = load_all() + _, _, soc_release = foreign_key_sets(records) + now_year = offline.now_year_today() + categories = tuple(args.category) if args.category else CATEGORIES + + # Cross-reference the whole unverified frontier, ranked by score. Greens are + # included on purpose: reality must be able to CONFIRM them (strongest promote) + # or CONTRADICT them (veto) before they are verified. 
+ targets = _ranked_unverified(records, soc_release, now_year, categories)[: args.max] + + fetcher = crossref.WikidataFetcher() + cache = promote.load_crossref_cache() + ts = _now_iso() + decisions: Counter[str] = Counter() + new_entries = [] + for rec in targets: + key = (rec.category, rec.slug) + if not args.recheck and key in cache: + decisions[cache[key].get("decision", "cached")] += 1 + continue + res = crossref.crossref_record(rec.data, fetcher) + decisions[res.decision] += 1 + new_entries.append({ + "ts": ts, "category": rec.category, "slug": rec.slug, + "source": res.source, "decision": res.decision, + "exact_heading": res.exact_heading, "matched_url": res.matched_url, + }) + if new_entries: + cache.update({(e["category"], e["slug"]): e for e in new_entries}) + ledger.replace_all(list(cache.values()), promote.CROSSREF_CACHE_PATH) + + print(f"crossref: examined {len(targets)} record(s)") + for decision, n in decisions.most_common(): + print(f" {n:>6} {decision}") + return 0 + + +def cmd_promote(args: argparse.Namespace) -> int: + records = load_all() + _, _, soc_release = foreign_key_sets(records) + now_year = offline.now_year_today() + categories = tuple(args.category) if args.category else CATEGORIES + + url_cache = http_check.load_cache() + xref_cache = promote.load_crossref_cache() + ts = _now_iso() + + candidates = [] # (rec, band, reason) + blocked: Counter[str] = Counter() + for cat in categories: + for rec in records[cat]: + if rec.verified or not rec.slug: + continue + s = offline.score_record(rec, now_year, soc_release) + urls = [u for u in rec.data.get("source_urls", []) if isinstance(u, str)] + xref = xref_cache.get((cat, rec.slug), {}).get("decision") + d = promote.decide( + band=s.band, source_urls=urls, url_cache=url_cache, crossref_decision=xref, + ) + if d.promote: + candidates.append((rec, s, d.reason)) + elif s.band == "green": + blocked["green-needs-live-t1"] += 1 + + if args.max is not None: + candidates = candidates[: args.max] + + 
print(f"promote: {len(candidates)} record(s) eligible " + f"({'APPLY' if args.apply else 'dry-run'})") + by_reason = Counter(reason for _r, _s, reason in candidates) + for reason, n in by_reason.most_common(): + print(f" {n:>6} {reason}") + if blocked: + print("blocked (green but no live T1 source yet — run check-urls):") + for reason, n in blocked.most_common(): + print(f" {n:>6} {reason}") + + if not args.apply: + for rec, s, reason in candidates[:20]: + print(f" would promote: {rec.path} [{s.band} {s.score}] {reason}") + if len(candidates) > 20: + print(f" ... and {len(candidates) - 20} more") + return 0 + + written = 0 + entries = [] + for rec, s, reason in candidates: + if promote.write_verified_true(repo_path(rec.path)): + written += 1 + entries.append({ + "ts": ts, "category": rec.category, "slug": rec.slug, "path": rec.path, + "hash": rec.content_hash(), "decision": "promote", + "prev_verified": False, "new_verified": True, "reason": reason, + "tier0": {"score": s.score, "band": s.band}, + "actor": "app.verify.promote", + }) + ledger.append_many(entries) + print(f"\napplied: flipped verified->true in {written} file(s); ledger updated") + print("next: run `python -m app.validate` and `git diff` to confirm only verified changed") + return 0 + + +def cmd_pr(args: argparse.Namespace) -> int: + """All-tiers verification of a PR's changed records, as one markdown report. + + Tier 0 (offline score) + Tier 1 (source-URL liveness) + Tier 2 (external + cross-reference) + Tier 3 (promotion decision, DRY-RUN — never writes). Network + tiers run only over the records changed vs origin/main, capped by --max. 
+ """ + records = load_all() + _, _, soc_release = foreign_key_sets(records) + now_year = offline.now_year_today() + + changed = _changed_data_slugs() + changed_recs = [ + rec for cat in CATEGORIES for rec in records[cat] + if rec.slug and rec.path in changed + ] + + print("## 🔎 Data verification — Tiers 0–3 (on demand)\n") + + if not changed_recs: + print("_No data records changed in this PR. Showing the full-dataset " + "Tier 0 baseline only; network tiers (1–3) have nothing to check._\n") + else: + sub = changed_recs[: args.max] + truncated = len(changed_recs) > args.max + note = f" (showing first {args.max} for network tiers)" if truncated else "" + print(f"**{len(changed_recs)} changed data record(s)**{note}. " + "Tier 3 is dry-run — no `verified` flags are written.\n") + + # Tier 0 — offline score of the changed records. + scored = [(r, offline.score_record(r, now_year, soc_release)) for r in sub] + print("### Tier 0 — offline score (changed)\n") + print("| Slug | Category | Band | Score | Flags |") + print("| --- | --- | :--: | ---: | --- |") + for r, s in scored: + badge = {"green": "🟢", "yellow": "🟡", "red": "🔴"}.get(s.band, s.band) + flags = ", ".join(f"`{f}`" for f in s.flags) or "—" + print(f"| {r.slug} | {r.category} | {badge} | {s.score} | {flags} |") + print() + + # Tier 1 — source-URL liveness (network). 
+ urls = sorted({u for r, _ in scored + for u in r.data.get("source_urls", []) if isinstance(u, str)}) + ts = _now_iso() + url_cache: dict[str, dict] = {} + try: + for res in http_check.check_urls(urls, min_interval=0.5): + url_cache[res.url] = http_check.result_to_entry(res, ts) + except Exception as exc: # network hiccup must not sink the report + print(f"_Tier 1 skipped: {exc}_\n") + alive = sum(1 for e in url_cache.values() if e.get("alive")) + dead = len(url_cache) - alive + print("### Tier 1 — source-URL liveness (changed)\n") + print(f"Checked **{len(url_cache)}** unique URL(s): **{alive} alive**, **{dead} dead**.\n") + dead_reasons = Counter(e["reason"] for e in url_cache.values() if not e.get("alive")) + if dead_reasons: + print("| Dead reason | Count |") + print("| --- | ---: |") + for reason, n in dead_reasons.most_common(8): + print(f"| `{reason}` | {n} |") + print() + + # Tier 2 — external cross-reference (network, exact-heading only). + fetcher = crossref.WikidataFetcher() + xref: dict[str, str] = {} + decisions: Counter[str] = Counter() + for r, _ in scored: + try: + xres = crossref.crossref_record(r.data, fetcher) + if r.slug: + xref[r.slug] = xres.decision + decisions[xres.decision] += 1 + except Exception: + decisions["error"] += 1 + print("### Tier 2 — external cross-reference (changed)\n") + if decisions: + print("| Decision | Count |") + print("| --- | ---: |") + for d, n in decisions.most_common(): + print(f"| `{d}` | {n} |") + print() + + # Tier 3 — promotion decision (DRY-RUN). 
+ promote_rows = [] + hold = 0 + for r, s in scored: + urls_r = [u for u in r.data.get("source_urls", []) if isinstance(u, str)] + dec = promote.decide(band=s.band, source_urls=urls_r, url_cache=url_cache, + crossref_decision=xref.get(r.slug) if r.slug else None) + if dec.promote: + promote_rows.append((r, dec.reason)) + else: + hold += 1 + print("### Tier 3 — promotion (dry-run)\n") + print(f"**{len(promote_rows)}** record(s) would promote to `verified:true`, " + f"**{hold}** held.\n") + if promote_rows: + print("| Slug | Reason |") + print("| --- | --- |") + for r, reason in promote_rows: + print(f"| {r.slug} | `{reason}` |") + print() + + # Full-dataset Tier 0 baseline (always). + hist: dict[str, Counter] = defaultdict(Counter) + hard_flags: Counter = Counter() + scored_n = 0 + for cat in CATEGORIES: + for rec in records[cat]: + if not rec.slug: + continue + s = offline.score_record(rec, now_year, soc_release) + hist[rec.category][s.band] += 1 + scored_n += 1 + for f in s.flags: + if f.startswith("!"): + hard_flags[f] += 1 + print("### Full-dataset Tier 0 baseline\n") + _print_markdown(hist, scored_n, hard_flags) + return 0 + + +def _not_implemented(args: argparse.Namespace) -> int: + print(f"`{args.cmd}` is a later-phase subcommand and is not implemented yet.") + return 2 + + +def build_parser() -> argparse.ArgumentParser: + p = argparse.ArgumentParser(prog="python -m app.verify", description=__doc__) + sub = p.add_subparsers(dest="cmd", required=True) + + sc = sub.add_parser("score", help="Tier 0 offline plausibility scoring") + sc.add_argument("--category", nargs="*", choices=CATEGORIES, help="limit to categories") + sc.add_argument("--max", type=int, default=None, help="cap number scored") + sc.add_argument("--unverified-only", action="store_true", help="skip verified:true records") + sc.add_argument("--changed", action="store_true", help="only records changed vs origin/main") + sc.add_argument("--no-cache", action="store_true", help="do not write the scores 
cache") + sc.add_argument("--format", choices=["text", "md"], default="text", + help="output format: text histogram (default) or markdown table") + sc.set_defaults(func=cmd_score) + + rp = sub.add_parser("report", help="summarize latest ledger state") + rp.set_defaults(func=cmd_report) + + st = sub.add_parser("status", help="write the aggregated verification status JSON") + st.add_argument("--output", type=Path, default=None, + help="output path (default: data/_verify/status.json)") + st.add_argument("--stdout", action="store_true", help="print JSON instead of writing a file") + st.set_defaults(func=cmd_status) + + cu = sub.add_parser("check-urls", help="Tier 1: source_urls HTTP liveness") + cu.add_argument("--category", nargs="*", choices=CATEGORIES, help="limit to categories") + cu.add_argument("--max", type=int, default=500, help="number of frontier records to target") + cu.add_argument("--workers", type=int, default=8, help="concurrent HTTP workers") + cu.add_argument("--min-interval", type=float, default=1.0, help="seconds between hits per host") + cu.add_argument("--ttl-days", type=int, default=http_check.DEFAULT_TTL_DAYS, help="cache freshness") + cu.add_argument("--recheck", action="store_true", help="ignore cache freshness") + cu.set_defaults(func=cmd_check_urls) + + cr = sub.add_parser("crossref", help="Tier 2: external cross-reference (exact heading)") + cr.add_argument("--category", nargs="*", choices=CATEGORIES, help="limit to categories") + cr.add_argument("--max", type=int, default=200, help="number of yellow/red records to escalate") + cr.add_argument("--recheck", action="store_true", help="ignore crossref cache") + cr.set_defaults(func=cmd_crossref) + + pm = sub.add_parser("promote", help="Tier 3: hybrid escalation + verified write-back") + pm.add_argument("--category", nargs="*", choices=CATEGORIES, help="limit to categories") + pm.add_argument("--max", type=int, default=None, help="cap number promoted") + pm.add_argument("--apply", 
action="store_true", help="actually flip verified (default: dry-run)") + pm.set_defaults(func=cmd_promote) + + pr = sub.add_parser("pr", help="all-tiers (0-3) markdown report for a PR's changed records") + pr.add_argument("--max", type=int, default=40, help="cap changed records for network tiers") + pr.set_defaults(func=cmd_pr) + + return p + + +def main(argv: list[str] | None = None) -> int: + configure_stdout() + parser = build_parser() + args = parser.parse_args(argv) + return args.func(args) From 066307b4067ac803f859b495cb74f473b839c5f7 Mon Sep 17 00:00:00 2001 From: Seungpyo Hong Date: Tue, 23 Jun 2026 14:18:26 +0900 Subject: [PATCH 04/54] feat(verify): migrate verification layer from TechAPI --- app/verify/common.py | 111 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 111 insertions(+) create mode 100644 app/verify/common.py diff --git a/app/verify/common.py b/app/verify/common.py new file mode 100644 index 0000000..06afb09 --- /dev/null +++ b/app/verify/common.py @@ -0,0 +1,111 @@ +"""Shared loading + identity helpers for the verification layer. + +Reuses ``app.validate._load`` (the canonical seed loader) rather than +re-implementing JSON discovery, and rebuilds the brand/SoC foreign-key slug sets +the same way ``app.validate.validate`` does, so the verifier sees exactly the +data the structural gate sees. +""" + +from __future__ import annotations + +import hashlib +import json +import sys +from pathlib import Path +from typing import Any, Iterable + +from app.validate import DATA_DIR, _load + +# Categories the verifier knows about, in load order. Mirrors app.validate.validate. +CATEGORIES: tuple[str, ...] 
= ( + "brand", + "soc", + "smartphone", + "tablet", + "watch", + "pda", + "gpu", + "cpu", +) + +VERIFY_DIR = DATA_DIR / "_verify" +LEDGER_PATH = VERIFY_DIR / "ledger.jsonl" # git-tracked: promotion decisions only +STATE_DIR = VERIFY_DIR / "state" # gitignored caches +SCORES_PATH = STATE_DIR / "scores.jsonl" # full Tier 0 results (cheap to recompute) + + +class Record: + """A single seed record paired with its repo-relative path and category.""" + + __slots__ = ("category", "path", "data") + + def __init__(self, category: str, path: str, data: dict[str, Any]) -> None: + self.category = category + self.path = path # e.g. "cpu/intel/2023/desktop/core-i9-14900k.json" + self.data = data + + @property + def slug(self) -> str | None: + slug = self.data.get("slug") + return slug if isinstance(slug, str) else None + + @property + def verified(self) -> bool: + return self.data.get("verified") is True + + def content_hash(self) -> str: + """Stable hash of the record body — invalidates stale ledger decisions on edit.""" + blob = json.dumps(self.data, sort_keys=True, ensure_ascii=False) + return hashlib.sha256(blob.encode("utf-8")).hexdigest()[:16] + + def __repr__(self) -> str: # pragma: no cover - debug aid + return f"Record({self.category}, {self.slug!r})" + + +def load_category(category: str) -> list[Record]: + """Load one category's records as :class:`Record` objects.""" + return [Record(category, path, data) for path, data in _load(category)] + + +def load_all(categories: Iterable[str] = CATEGORIES) -> dict[str, list[Record]]: + """Load every category into ``{category: [Record, ...]}``.""" + return {cat: load_category(cat) for cat in categories} + + +def foreign_key_sets( + records: dict[str, list[Record]], +) -> tuple[set[str], set[str], dict[str, str]]: + """Build FK lookups the way ``app.validate`` does, plus a SoC release-date map. 
+ + Returns ``(brand_slugs, soc_slugs, soc_release_date)`` where ``soc_release_date`` + maps a SoC slug to its ISO release date (used for "chip can't postdate device"). + """ + brand_slugs = {r.slug for r in records.get("brand", []) if r.slug} + soc_slugs = {r.slug for r in records.get("soc", []) if r.slug} + soc_release: dict[str, str] = {} + for r in records.get("soc", []): + rd = r.data.get("release_date") + if r.slug and isinstance(rd, str): + soc_release[r.slug] = rd + return brand_slugs, soc_slugs, soc_release + + +def configure_stdout() -> None: + """Force UTF-8 stdout so emoji/box-drawing don't crash on Windows cp949. + + Mirrors ``app.validate.run`` (validate.py:336-340). + """ + try: + sys.stdout.reconfigure(encoding="utf-8") # type: ignore[union-attr] + except Exception: + pass + + +def ensure_verify_dirs() -> None: + VERIFY_DIR.mkdir(parents=True, exist_ok=True) + STATE_DIR.mkdir(parents=True, exist_ok=True) + + +def repo_path(rel: str) -> Path: + """Resolve a repo-relative seed path (as stored on a Record) to an absolute path.""" + return DATA_DIR / rel From 332f46fd22b3f21dcebcaab1d2ccdb53b992ef17 Mon Sep 17 00:00:00 2001 From: Seungpyo Hong Date: Tue, 23 Jun 2026 14:18:28 +0900 Subject: [PATCH 05/54] feat(verify): migrate verification layer from TechAPI --- app/verify/crossref.py | 196 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 196 insertions(+) create mode 100644 app/verify/crossref.py diff --git a/app/verify/crossref.py b/app/verify/crossref.py new file mode 100644 index 0000000..adc8045 --- /dev/null +++ b/app/verify/crossref.py @@ -0,0 +1,196 @@ +"""Tier 2 — external cross-reference under a strict exact-heading rule. + +Confirms a record describes a real, documented part by finding an authoritative +page (Wikidata / Wikipedia) whose *title* matches the record name exactly after +normalization. 
# Decisions
CONFIRM = "confirm"
AMBIGUOUS = "ambiguous"
CONTRADICT = "contradict"
NOTFOUND = "notfound"

_NORM_RE = re.compile(r"[^a-z0-9]+")

# Shortest normalized heading accepted as the suffix side of a suffix match.
_MIN_SUFFIX_LEN = 4


def normalize_heading(text: str) -> str:
    """Lowercase, drop everything but [a-z0-9]. 'iPhone XR' -> 'iphonexr'."""
    return _NORM_RE.sub("", text.lower())


class Candidate(NamedTuple):
    """One search hit from an external source."""

    title: str
    url: str
    year: int | None = None  # release/inception year if the source exposes one


class Fetcher(Protocol):
    """Anything that can look a record name up and return candidate pages."""

    def search(self, name: str) -> list[Candidate]:
        ...


class CrossrefResult(NamedTuple):
    """Outcome of cross-referencing one record."""

    slug: str
    source: str
    decision: str
    exact_heading: bool
    matched_url: str | None
    spec_agreements: int


def _year_of(value: Any) -> int | None:
    """Return the leading 4-digit year of a date string, or ``None`` if there is none."""
    if isinstance(value, str) and len(value) >= 4 and value[:4].isdigit():
        return int(value[:4])
    return None


def _heading_matches(rec_name: str, cand_title: str) -> bool:
    """Exact normalized match, or one heading is the model-name suffix of the
    other (authoritative sources often omit the maker prefix: record 'AMD Ryzen 7
    5800X' vs Wikidata label 'Ryzen 7 5800X'). This is NOT fuzzy matching — it
    requires a full, contiguous suffix of >=4 chars, so it can't drift to a
    different SKU the way Levenshtein does.

    The length floor applies to whichever side is the suffix. Checking only the
    candidate let a one-character record name ('X') match any title ending in
    that character ('iPhone X').
    """
    r, c = normalize_heading(rec_name), normalize_heading(cand_title)
    if not r or not c:
        return False
    if r == c:
        return True
    shorter, longer = (c, r) if len(c) <= len(r) else (r, c)
    return len(shorter) >= _MIN_SUFFIX_LEN and longer.endswith(shorter)


def crossref_record(
    rec: dict[str, Any], fetcher: Fetcher, source: str = "wikidata"
) -> CrossrefResult:
    """Decide confirm/ambiguous/contradict/notfound for one record.

    Reality-based: CONFIRM requires an exact-heading authoritative entity whose
    release year agrees. A year disagreement is a CONTRADICT (reality veto — the
    record must NOT be promoted, even if it scored green). A name match with no
    comparable year is only AMBIGUOUS (existence, but specs unconfirmed).

    Args:
        rec: record body; reads ``name``, ``slug`` and ``release_date``.
        fetcher: source of candidates for ``rec["name"]``.
        source: label copied into the result.

    Returns:
        A :class:`CrossrefResult`. ``NOTFOUND`` when the name is missing or blank
        or the fetcher returns nothing; ``AMBIGUOUS`` with ``exact_heading=False``
        (pointing at the first candidate) when no heading matches.
    """
    name = rec.get("name")
    slug = rec.get("slug") or ""
    if not isinstance(name, str) or not name.strip():
        return CrossrefResult(slug, source, NOTFOUND, False, None, 0)

    candidates = fetcher.search(name)
    if not candidates:
        return CrossrefResult(slug, source, NOTFOUND, False, None, 0)

    exact = [c for c in candidates if _heading_matches(name, c.title)]
    if not exact:
        return CrossrefResult(slug, source, AMBIGUOUS, False, candidates[0].url, 0)

    # Prefer an exact match that carries a year (so we can actually confirm specs).
    cand = next((c for c in exact if c.year is not None), exact[0])
    rec_year = _year_of(rec.get("release_date"))
    if rec_year is not None and cand.year is not None:
        # One year of slack between the record and the external source.
        if abs(cand.year - rec_year) <= 1:
            return CrossrefResult(slug, source, CONFIRM, True, cand.url, 1)
        return CrossrefResult(slug, source, CONTRADICT, True, cand.url, 0)
    # Name matches an authoritative entity but no year to verify the data against.
    return CrossrefResult(slug, source, AMBIGUOUS, True, cand.url, 0)
+ return CrossrefResult(slug, source, AMBIGUOUS, True, cand.url, 0) + + +# --- concrete fetchers (network; not exercised by unit tests) -------------------- + + +def _wikidata_claim_year(entity: dict) -> int | None: + """First year from inception (P571) or publication date (P577) claims.""" + claims = entity.get("claims", {}) + for prop in ("P571", "P577"): + for claim in claims.get(prop, []): + try: + t = claim["mainsnak"]["datavalue"]["value"]["time"] # "+2007-02-19T..." + except (KeyError, TypeError): + continue + digits = t.lstrip("+")[:4] + if digits.isdigit(): + return int(digits) + return None + + +class WikidataFetcher: + """Structured cross-reference against Wikidata: search entities by label, then + read their release year (P571/P577) to verify the record's data against reality. + Two HTTP calls per record (search + a batched entity fetch).""" + + API = "https://www.wikidata.org/w/api.php" + UA = "TechAPI-verify/0.1 (https://github.com/GetTechAPI)" + + def __init__(self, timeout: float = 10.0, limit: int = 5) -> None: + self.timeout = timeout + self.limit = limit + + def _get(self, url: str) -> dict: + req = Request(url, headers={"User-Agent": self.UA}) + with urlopen(req, timeout=self.timeout) as resp: + return json.loads(resp.read().decode("utf-8")) + + def search(self, name: str) -> list[Candidate]: + try: + data = self._get( + f"{self.API}?action=wbsearchentities&format=json&language=en" + f"&limit={self.limit}&search={quote(name)}" + ) + hits = data.get("search", []) + if not hits: + return [] + ids = "|".join(h["id"] for h in hits if h.get("id")) + ent = self._get( + f"{self.API}?action=wbgetentities&format=json&props=claims&ids={ids}" + ).get("entities", {}) + except Exception: + return [] + out: list[Candidate] = [] + for h in hits: + qid = h.get("id") + label = h.get("label") or h.get("match", {}).get("text", "") + year = _wikidata_claim_year(ent.get(qid, {})) if qid else None + out.append(Candidate(title=label, 
class WikipediaFetcher:
    """Candidate lookup through the MediaWiki ``opensearch`` API.

    Produces page titles and URLs only; no year is supplied, so each
    :class:`Candidate` keeps its default ``year``.

    ``search`` is best-effort: any network or decoding failure yields an empty
    list, which callers cannot tell apart from "no hits".
    """

    API = "https://en.wikipedia.org/w/api.php"
    UA = "TechAPI-verify/0.1 (https://github.com/GetTechAPI)"

    def __init__(self, timeout: float = 10.0, limit: int = 5) -> None:
        self.timeout = timeout  # seconds, passed to urlopen
        self.limit = limit  # max number of titles requested

    @staticmethod
    def _parse_opensearch(data: Any) -> list[Candidate]:
        """Turn a decoded opensearch payload into candidates.

        The expected shape is ``[query, [titles...], [descs...], [urls...]]``.
        Anything else (for example a JSON object instead of that list) gives
        an empty result rather than an exception. Non-string titles are
        skipped; a missing or non-string URL becomes ``""``.
        """
        if not isinstance(data, list) or len(data) < 2 or not isinstance(data[1], list):
            return []
        titles = data[1]
        urls = data[3] if len(data) > 3 and isinstance(data[3], list) else []
        out: list[Candidate] = []
        for i, title in enumerate(titles):
            if not isinstance(title, str):
                continue
            url_i = urls[i] if i < len(urls) and isinstance(urls[i], str) else ""
            out.append(Candidate(title=title, url=url_i))
        return out

    def search(self, name: str) -> list[Candidate]:
        """Return up to ``limit`` title candidates for ``name`` (``[]`` on failure)."""
        url = (
            f"{self.API}?action=opensearch&format=json&limit={self.limit}"
            f"&search={quote(name)}"
        )
        try:
            req = Request(url, headers={"User-Agent": self.UA})
            with urlopen(req, timeout=self.timeout) as resp:
                data = json.loads(resp.read().decode("utf-8"))
        except Exception:
            # Deliberate best-effort boundary: one unreachable lookup must not
            # abort a batch run.
            return []
        return self._parse_opensearch(data)
def host_of(url: str) -> str:
    """Return the lowercased host of ``url`` with a leading ``www.`` removed.

    Userinfo (``user:pw@``), the port and a trailing root dot
    (``example.com.``) are discarded so equivalent spellings of one host
    classify identically. Returns ``""`` when no host can be extracted:
    a non-string argument, a scheme-less or otherwise unparseable URL.
    """
    if not isinstance(url, str):
        return ""
    try:
        # ``hostname`` already strips userinfo and port, lowercases the name
        # and unwraps bracketed IPv6 literals.
        host = urlparse(url).hostname
    except ValueError:
        return ""
    if not host:
        return ""
    host = host.rstrip(".")  # "en.wikipedia.org." is the same host
    if host.startswith("www."):
        host = host[4:]
    return host
def best_tier(urls: Iterable[str]) -> int:
    """Return the most trusted tier found among ``urls``.

    Every URL is reduced to its host and classified. Unclassified hosts
    (tier 0) are ignored. A lower tier number means higher trust, so the
    result is the minimum of the classified tiers (1 beats 2 beats 3).
    Returns 0 when no URL belongs to a known host.
    """
    tiers = (tier_of_host(host_of(link)) for link in urls)
    return min((tier for tier in tiers if tier), default=0)
+""" + +from __future__ import annotations + +import threading +import time +from concurrent.futures import ThreadPoolExecutor +from datetime import datetime, timezone +from typing import Any, Callable, Iterable, NamedTuple +from urllib.parse import urlparse +from urllib.request import Request, build_opener + +from . import ledger +from .common import STATE_DIR +from .hosts import host_of + +URL_CACHE_PATH = STATE_DIR / "url_cache.jsonl" +DEFAULT_TTL_DAYS = 30 +USER_AGENT = ( + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 " + "(KHTML, like Gecko) Chrome/124.0 Safari/537.36 TechAPI-verify/0.1" +) + + +class CheckResult(NamedTuple): + url: str + status: int | None + final_url: str | None + alive: bool + reason: str + + +# --- opener abstraction (injectable for tests) ----------------------------------- + + +class _Opener: + """Thin wrapper over urllib's opener exposing ``open(url, method) -> (status, final)``.""" + + def __init__(self, timeout: float = 10.0) -> None: + self._opener = build_opener() + self.timeout = timeout + + def open(self, url: str, method: str) -> tuple[int, str]: + req = Request(url, method=method, headers={"User-Agent": USER_AGENT}) + resp = self._opener.open(req, timeout=self.timeout) + try: + status = getattr(resp, "status", None) or resp.getcode() + final = resp.geturl() + return int(status), final + finally: + resp.close() + + +def default_opener_factory(timeout: float = 10.0) -> _Opener: + return _Opener(timeout=timeout) + + +# --- classification -------------------------------------------------------------- + + +def _path_depth(url: str) -> int: + try: + path = urlparse(url).path.strip("/") + except Exception: + return 0 + return len([p for p in path.split("/") if p]) + + +def _is_homepage_redirect(original: str, final: str) -> bool: + """A deep page that redirects to the site root is a soft-404 ("not found" page).""" + if not final or final == original: + return False + return _path_depth(original) >= 1 and 
class HostRateLimiter:
    """Per-host limiter enforcing a minimum interval between requests.

    Each ``wait(host)`` call reserves the next free slot for that host and
    blocks until the slot starts. Hosts are independent of one another.

    Intervals are measured on a monotonic clock, so a wall-clock adjustment
    during a long run cannot stretch or collapse the spacing. ``clock`` and
    ``sleep`` are injectable (keyword-only) so the behaviour can be exercised
    without real delays.
    """

    def __init__(
        self,
        min_interval: float = 1.0,
        *,
        clock: Callable[[], float] = time.monotonic,
        sleep: Callable[[float], None] = time.sleep,
    ) -> None:
        self.min_interval = min_interval
        self._clock = clock
        self._sleep = sleep
        # host -> start time of the most recently reserved slot
        self._last: dict[str, float] = {}
        self._lock = threading.Lock()

    def wait(self, host: str) -> None:
        """Block until ``host`` may be contacted again, then return."""
        with self._lock:
            now = self._clock()
            prev = self._last.get(host)
            # First request to a host goes out immediately. A missing entry
            # is tracked as None because a monotonic clock may legitimately
            # read close to zero.
            if prev is None:
                sleep_for = 0.0
            else:
                sleep_for = max(0.0, self.min_interval - (now - prev))
            # Reserve the slot while still holding the lock so concurrent
            # callers for the same host queue up behind it.
            self._last[host] = now + sleep_for
        # Sleep outside the lock: other hosts must not be held up.
        if sleep_for > 0:
            self._sleep(sleep_for)
seen: dict[tuple[str, str], str] = {} + for u in urls: + try: + p = urlparse(u) + except Exception: + continue + key = (p.netloc.lower(), p.path.rstrip("/")) + seen.setdefault(key, u) + return list(seen.values()) + + +def check_urls( + urls: list[str], + *, + max_workers: int = 8, + min_interval: float = 1.0, + opener_factory: Callable[[], Any] = default_opener_factory, + limiter: HostRateLimiter | None = None, +) -> list[CheckResult]: + limiter = limiter or HostRateLimiter(min_interval) + local = threading.local() + + def _get_opener() -> Any: + op = getattr(local, "opener", None) + if op is None: + op = opener_factory() + local.opener = op + return op + + def _task(url: str) -> CheckResult: + limiter.wait(host_of(url)) + return check_one(url, _get_opener()) + + if not urls: + return [] + with ThreadPoolExecutor(max_workers=max_workers) as pool: + return list(pool.map(_task, urls)) + + +# --- cache ----------------------------------------------------------------------- + + +def load_cache(path=URL_CACHE_PATH) -> dict[str, dict[str, Any]]: + return {e["url"]: e for e in ledger.iter_entries(path) if isinstance(e.get("url"), str)} + + +def _parse_ts(ts: str) -> datetime | None: + try: + return datetime.strptime(ts, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=timezone.utc) + except Exception: + return None + + +def is_fresh(entry: dict[str, Any], now: datetime, ttl_days: int) -> bool: + ts = _parse_ts(entry.get("checked_at", "")) + if ts is None: + return False + return (now - ts).days < ttl_days + + +def save_cache(cache: dict[str, dict[str, Any]], path=URL_CACHE_PATH) -> None: + ledger.replace_all(list(cache.values()), path) + + +def result_to_entry(r: CheckResult, ts: str) -> dict[str, Any]: + return { + "url": r.url, + "status": r.status, + "final_url": r.final_url, + "alive": r.alive, + "reason": r.reason, + "checked_at": ts, + } + + +def record_liveness(source_urls: list[str], cache: dict[str, dict[str, Any]]) -> tuple[int, int]: + """(#live, #dead) for a record's URLs 
that are present in the cache.""" + live = dead = 0 + for u in source_urls: + e = cache.get(u) + if e is None: + continue + if e.get("alive"): + live += 1 + else: + dead += 1 + return live, dead From ddeb1332284f8252d9a7587e049757bed232e032 Mon Sep 17 00:00:00 2001 From: Seungpyo Hong Date: Tue, 23 Jun 2026 14:18:32 +0900 Subject: [PATCH 08/54] feat(verify): migrate verification layer from TechAPI --- app/verify/ledger.py | 101 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 101 insertions(+) create mode 100644 app/verify/ledger.py diff --git a/app/verify/ledger.py b/app/verify/ledger.py new file mode 100644 index 0000000..4f890bd --- /dev/null +++ b/app/verify/ledger.py @@ -0,0 +1,101 @@ +"""Append-only JSONL verification ledger — the audit trail + resume cursor. + +One decision per line in ``data/_verify/ledger.jsonl`` (git-tracked, diffable, +merge-friendly). Each tier appends; the latest entry per (category, slug) wins. +A record whose ``content_hash`` is unchanged since its last fresh decision can be +skipped, which is what makes multi-tier runs incremental and resumable. + +Timestamps are passed in by the caller (never generated here) so the module stays +pure and the CLI controls the clock. 
def replace_all(entries: list[dict[str, Any]], path: Path) -> None:
    """Atomically rewrite ``path`` with one JSON object per line.

    Used for full-result caches. The new content is written to a sibling
    ``<name>.tmp`` file and then moved over ``path``. A crash, or an entry
    that cannot be serialized, therefore leaves the previous file intact
    instead of a truncated one. The temp file is removed on failure.

    Output format matches the appenders: UTF-8, LF line endings, sorted keys,
    non-ASCII kept as-is.
    """
    ensure_verify_dirs()
    tmp = path.with_name(path.name + ".tmp")
    try:
        with tmp.open("w", encoding="utf-8", newline="\n") as fh:
            fh.writelines(
                json.dumps(entry, ensure_ascii=False, sort_keys=True) + "\n"
                for entry in entries
            )
        tmp.replace(path)  # same directory, so this is a rename over the target
    except BaseException:
        try:
            tmp.unlink()
        except OSError:
            pass  # nothing to clean up, or cleanup impossible; keep the real error
        raise
def is_fresh(
    entry: dict[str, Any] | None, content_hash: str, tier: str
) -> bool:
    """Tell whether ``entry`` still holds a valid result for ``tier``.

    An entry is fresh when it exists, its stored ``hash`` equals
    ``content_hash`` (the record has not been edited since), and it contains
    the ``tier`` key. A missing or empty entry is never fresh.
    """
    return (
        bool(entry)
        and entry.get("hash") == content_hash  # edited since -> stale
        and tier in entry
    )
Combines four sub-scores into 0..100 and a green/yellow/red band: + +* completeness 0..25 — how richly populated beyond the required fields +* consistency 0..35 — cross-field predicates from :mod:`signals` +* host trust 0..30 — authority of the cited ``source_urls`` (:mod:`hosts`) +* provenance 0..10 — clean normalized data vs raw-blob-only imports + +Hard predicate violations (threads red + +# "Rich" fields per category: presence (non-null) signals a fleshed-out record. +# Dotted paths index into nested dicts (e.g. "display.ppi"). +RICH_FIELDS: dict[str, tuple[str, ...]] = { + "cpu": ("architecture", "base_clock_ghz", "boost_clock_ghz", "l3_cache_mb", + "socket", "tdp_w", "passmark_cpu_mark"), + "gpu": ("architecture", "boost_clock_mhz", "memory_type", "memory_bandwidth_gbps", + "fp32_tflops", "cuda_cores", "stream_processors"), + "soc": ("transistors_billion", "cpu_config", "gpu_cores", "gpu_clock_mhz", + "npu_tops", "geekbench_multi"), + "smartphone": ("soc", "display.size_inch", "display.resolution", "display.ppi", + "cameras", "storage_options_gb", "charging_wired_w", "os_version"), + "tablet": ("display.size_inch", "display.resolution", "storage_options_gb", + "cameras", "os_version"), + "watch": ("display.size_inch", "display.resolution", "os_version"), + "pda": ("display.size_inch", "display.resolution", "os_version"), + "brand": ("founded_year", "description_en"), +} + + +class Score(NamedTuple): + score: float + band: str # "green" | "yellow" | "red" + subscores: dict[str, float] + flags: list[str] # names of failed predicates (hard prefixed with "!") + best_tier: int + + +def _get_path(data: dict[str, Any], path: str) -> Any: + cur: Any = data + for part in path.split("."): + if not isinstance(cur, dict): + return None + cur = cur.get(part) + return cur + + +def _completeness(category: str, data: dict[str, Any]) -> float: + fields = RICH_FIELDS.get(category, ()) + if not fields: + return W_COMPLETENESS + present = sum(1 for f in fields if _get_path(data, 
def score_record(
    rec: Record, now_year: int, soc_release: dict[str, str]
) -> Score:
    """Compute the Tier 0 plausibility score and band for one record.

    The total is the sum of four sub-scores: completeness, consistency, host
    trust and provenance. The band is chosen as follows:

    * any hard predicate failure -> ``red``, regardless of the total;
    * total >= ``GREEN_MIN`` and the best cited host is tier 1 or 2 -> ``green``;
    * total < ``RED_MAX`` -> ``red``;
    * otherwise ``yellow``.

    ``soc_release`` is passed through to the signal predicates. Returned
    numbers are rounded to one decimal.
    """
    data = rec.data
    # A seed may carry ``"source_urls": null`` or a bare string; both are
    # treated as "no usable sources" rather than crashing the whole run or
    # iterating the string character by character.
    raw_urls = data.get("source_urls")
    urls = (
        [u for u in raw_urls if isinstance(u, str)]
        if isinstance(raw_urls, (list, tuple))
        else []
    )

    completeness = _completeness(rec.category, data)
    sigs = signals.signals_for(rec.category, data, now_year, soc_release)
    consistency, flags, hard_failed = _consistency(sigs)
    host, best_tier = _host_score(urls)
    provenance = _provenance(data, best_tier)

    total = completeness + consistency + host + provenance
    subscores = {
        "completeness": round(completeness, 1),
        "consistency": round(consistency, 1),
        "host": round(host, 1),
        "provenance": round(provenance, 1),
    }

    if hard_failed:
        band = "red"
    elif total >= GREEN_MIN and best_tier in (1, 2):
        band = "green"
    elif total < RED_MAX:
        band = "red"
    else:
        band = "yellow"

    return Score(round(total, 1), band, subscores, flags, best_tier)
return date.today().year From 84fccf1f99478c21d0c1344331275db885107acc Mon Sep 17 00:00:00 2001 From: Seungpyo Hong Date: Tue, 23 Jun 2026 14:18:35 +0900 Subject: [PATCH 10/54] feat(verify): migrate verification layer from TechAPI --- app/verify/promote.py | 109 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 109 insertions(+) create mode 100644 app/verify/promote.py diff --git a/app/verify/promote.py b/app/verify/promote.py new file mode 100644 index 0000000..2485f89 --- /dev/null +++ b/app/verify/promote.py @@ -0,0 +1,109 @@ +"""Tier 3 — hybrid escalation + safe ``verified:true`` write-back. + +Promotion rules (only ever ``false -> true``, never a demotion): +* band green AND >=1 cited source is a *live* Tier-1 host -> auto-promote +* Tier 2 cross-reference returned ``confirm`` (exact heading) -> promote +* otherwise stay unverified, with a logged reason + +Write-back is *surgical*: only the ``"verified": false`` token is rewritten to +``true`` in the raw bytes. Full re-serialization is intentionally avoided because +the seed files keep short arrays inline (``[64, 128, 256]``) while ``json.dumps`` +would expand them, producing a huge spurious diff and defeating the "only verified +changed" guard. Edits are atomic (temp file + ``os.replace``) and preserve LF. +""" + +from __future__ import annotations + +import os +import re +from pathlib import Path +from typing import Any, NamedTuple + +from . import hosts +from .common import STATE_DIR + +CROSSREF_CACHE_PATH = STATE_DIR / "crossref_cache.jsonl" + +# A top-level, one-key-per-line "verified": false entry (2-space indented). +_VERIFIED_FALSE_RE = re.compile(r'^( )"verified": false(,?)[ \t]*$', re.MULTILINE) + + +class PromotionDecision(NamedTuple): + promote: bool + reason: str + + +def has_live_authoritative_source( + source_urls: list[str], url_cache: dict[str, dict[str, Any]] +) -> bool: + """True if some cited URL is an authoritative host (Tier 1 *or* Tier 2) AND + confirmed alive. 
def decide(
    *, band: str, source_urls: list[str], url_cache: dict[str, dict[str, Any]],
    crossref_decision: str | None,
) -> PromotionDecision:
    """Decide whether a record may be promoted to ``verified``.

    Rules are evaluated in priority order:

    1. Cross-reference ``contradict`` vetoes promotion, even for a green
       record.
    2. Cross-reference ``confirm`` promotes.
    3. With no decisive cross-reference, a green record promotes only if one
       of its authoritative sources is confirmed alive.
    4. Anything else stays unverified.

    The returned reason string names the rule that fired.
    """
    if crossref_decision == "contradict":
        verdict = PromotionDecision(False, "crossref-contradict")
    elif crossref_decision == "confirm":
        verdict = PromotionDecision(True, "crossref-confirm")
    elif band == "green" and has_live_authoritative_source(source_urls, url_cache):
        # Heuristic fallback, used only when the cross-reference is silent.
        verdict = PromotionDecision(True, "green+live-source")
    else:
        verdict = PromotionDecision(False, "needs-confirmation")
    return verdict
def write_verified_true(abs_path: Path) -> bool:
    """Atomically flip ``verified`` from false to true in one seed file.

    The file is read as UTF-8 bytes and only the token rewritten by
    :func:`flip_verified_text` changes. The result is written to a sibling
    ``*.tmp`` file and moved over the original with ``os.replace``.

    Returns True when the file was rewritten, and False when the flip was
    refused (the file is then not touched). If writing or replacing fails,
    the temp file is removed so no stray ``*.tmp`` is left beside the seed,
    and the original error is re-raised.
    """
    raw = abs_path.read_bytes().decode("utf-8")
    new = flip_verified_text(raw)
    if new is None:
        return False
    tmp = abs_path.with_suffix(abs_path.suffix + ".tmp")
    try:
        tmp.write_bytes(new.encode("utf-8"))
        os.replace(tmp, abs_path)
    except BaseException:
        try:
            tmp.unlink()
        except OSError:
            pass  # already gone or not removable; surface the original error
        raise
    return True
+* soft — implausible but physically possible; only subtracts from the score. + +``NA`` results (inputs absent) are neither pass nor fail and never penalize. +""" + +from __future__ import annotations + +import math +import re +from typing import Any, NamedTuple + +# Range table mirrored from app.validate's _check_range call sites, keyed by +# (category, field) -> (lo, hi). A parity smoke test asserts this stays in sync. +RANGES: dict[tuple[str, str], tuple[float, float]] = { + ("brand", "founded_year"): (1800, 2100), + ("soc", "process_nm"): (1.0, 100.0), + ("smartphone", "ram_gb"): (1, 64), + ("smartphone", "battery_mah"): (500, 12000), + ("smartphone", "weight_g"): (50, 500), + ("smartphone", "msrp_usd"): (50, 5000), + ("mobile", "ram_gb"): (0.016, 64), + ("mobile", "battery_mah"): (50, 20000), + ("mobile", "weight_g"): (10, 2000), + ("mobile", "msrp_usd"): (10, 10000), + ("gpu", "memory_gb"): (0.001, 512), + ("gpu", "tdp_w"): (1, 3000), + ("gpu", "msrp_usd"): (50, 100000), + ("cpu", "cores"): (1, 512), + ("cpu", "threads"): (1, 1024), + ("cpu", "msrp_usd"): (20, 50000), +} + +_RESOLUTION_RE = re.compile(r"(\d{2,5})\s*[x×]\s*(\d{2,5})") +_ANDROID_RE = re.compile(r"android\s*(\d{1,2})", re.IGNORECASE) + +# Earliest plausible release year for a given Android major version (release-vs-era). 
+_ANDROID_MIN_YEAR: dict[int, int] = { + 4: 2011, 5: 2014, 6: 2015, 7: 2016, 8: 2017, 9: 2018, + 10: 2019, 11: 2020, 12: 2021, 13: 2022, 14: 2023, 15: 2024, 16: 2025, +} + + +class Signal(NamedTuple): + name: str + result: str # "pass" | "fail" | "na" + hard: bool = False + + @property + def failed(self) -> bool: + return self.result == "fail" + + +def _num(value: Any) -> float | None: + return value if isinstance(value, (int, float)) and not isinstance(value, bool) else None + + +def _cmp_ge(name: str, a: Any, b: Any, *, hard: bool) -> Signal: + """``a >= b`` when both present, else NA.""" + x, y = _num(a), _num(b) + if x is None or y is None: + return Signal(name, "na", hard) + return Signal(name, "pass" if x >= y else "fail", hard) + + +def _year_of(value: Any) -> int | None: + if isinstance(value, str) and len(value) >= 4 and value[:4].isdigit(): + return int(value[:4]) + return None + + +def parse_resolution(value: Any) -> tuple[int, int] | None: + if not isinstance(value, str): + return None + m = _RESOLUTION_RE.search(value) + if not m: + return None + return int(m.group(1)), int(m.group(2)) + + +def _release_not_future(rec: dict[str, Any], now_year: int) -> Signal: + y = _year_of(rec.get("release_date")) + if y is None: + return Signal("release_not_future", "na", hard=True) + return Signal("release_not_future", "pass" if y <= now_year + 1 else "fail", hard=True) + + +# --- per-category predicate sets ------------------------------------------------- + + +def cpu_signals(rec: dict[str, Any], now_year: int) -> list[Signal]: + out = [ + _cmp_ge("threads_ge_cores", rec.get("threads"), rec.get("cores"), hard=True), + _cmp_ge("boost_ge_base", rec.get("boost_clock_ghz"), rec.get("base_clock_ghz"), hard=True), + _cmp_ge("max_tdp_ge_tdp", rec.get("max_tdp_w"), rec.get("tdp_w"), hard=False), + _cmp_ge("passmark_multi_ge_single", rec.get("passmark_cpu_mark"), rec.get("passmark_single"), hard=False), + _cmp_ge("cb23_multi_ge_single", rec.get("cinebench_r23_multi"), 
rec.get("cinebench_r23_single"), hard=False), + _cmp_ge("gb_multi_ge_single", rec.get("geekbench_multi"), rec.get("geekbench_single"), hard=False), + _release_not_future(rec, now_year), + ] + # p_cores + e_cores == cores (hybrid parts), only when both core splits given. + p, e, c = _num(rec.get("p_cores")), _num(rec.get("e_cores")), _num(rec.get("cores")) + if p is not None and e is not None and c is not None: + out.append(Signal("hybrid_core_sum", "pass" if p + e == c else "fail", hard=False)) + else: + out.append(Signal("hybrid_core_sum", "na", hard=False)) + return out + + +def gpu_signals(rec: dict[str, Any], now_year: int) -> list[Signal]: + out = [ + _cmp_ge("boost_ge_base", rec.get("boost_clock_mhz"), rec.get("base_clock_mhz"), hard=True), + _release_not_future(rec, now_year), + ] + # Vendor core field present: nvidia -> cuda_cores, amd/intel -> stream_processors. + mfr = str(rec.get("manufacturer") or "").lower() + if mfr == "nvidia": + has_core = _num(rec.get("cuda_cores")) is not None + elif mfr in {"amd", "intel"}: + has_core = _num(rec.get("stream_processors")) is not None + else: + has_core = _num(rec.get("cuda_cores")) is not None or _num(rec.get("stream_processors")) is not None + out.append(Signal("vendor_core_field", "pass" if has_core else "fail", hard=False)) + # RT / Tensor cores only plausible on post-2018 (Turing / RDNA2) parts. 
+ y = _year_of(rec.get("release_date")) + rt = _num(rec.get("rt_cores")) + if rt is not None and rt > 0 and y is not None: + out.append(Signal("rt_cores_era", "pass" if y >= 2018 else "fail", hard=False)) + else: + out.append(Signal("rt_cores_era", "na", hard=False)) + return out + + +def _ppi_signal(display: dict[str, Any]) -> Signal: + size = _num(display.get("size_inch")) + ppi = _num(display.get("ppi")) + res = parse_resolution(display.get("resolution")) + if size is None or ppi is None or res is None or size <= 0: + return Signal("ppi_consistent", "na", hard=False) + w, h = res + computed = math.hypot(w, h) / size + return Signal("ppi_consistent", "pass" if abs(computed - ppi) <= 0.15 * ppi else "fail", hard=False) + + +def _storage_signal(rec: dict[str, Any]) -> Signal: + vals = rec.get("storage_options_gb") + if not isinstance(vals, list) or not vals: + return Signal("storage_sane", "na", hard=False) + nums = [v for v in vals if isinstance(v, int) and not isinstance(v, bool)] + if len(nums) != len(vals): + return Signal("storage_sane", "fail", hard=False) + ok = all(v >= 1 for v in nums) and len(set(nums)) == len(nums) and nums == sorted(nums) + return Signal("storage_sane", "pass" if ok else "fail", hard=False) + + +def _android_era_signal(rec: dict[str, Any]) -> Signal: + text = f"{rec.get('os') or ''} {rec.get('os_version') or ''}" + m = _ANDROID_RE.search(text) + y = _year_of(rec.get("release_date")) + if not m or y is None: + return Signal("os_era", "na", hard=False) + major = int(m.group(1)) + min_year = _ANDROID_MIN_YEAR.get(major) + if min_year is None: + return Signal("os_era", "na", hard=False) + return Signal("os_era", "pass" if y >= min_year else "fail", hard=False) + + +def mobile_signals( + rec: dict[str, Any], now_year: int, soc_release: dict[str, str] +) -> list[Signal]: + """Shared by smartphone / tablet / watch / pda.""" + raw_display = rec.get("display") + display: dict[str, Any] = raw_display if isinstance(raw_display, dict) else {} + out 
= [ + _ppi_signal(display), + _storage_signal(rec), + _android_era_signal(rec), + _release_not_future(rec, now_year), + ] + # ram_gb <= max(storage_options_gb) + ram = _num(rec.get("ram_gb")) + vals = rec.get("storage_options_gb") + if ram is not None and isinstance(vals, list) and vals: + nums = [v for v in vals if isinstance(v, (int, float)) and not isinstance(v, bool)] + if nums: + out.append(Signal("ram_le_storage", "pass" if ram <= max(nums) else "fail", hard=False)) + else: + out.append(Signal("ram_le_storage", "na", hard=False)) + else: + out.append(Signal("ram_le_storage", "na", hard=False)) + # SoC should not postdate the device it powers. SOFT, not hard: the dataset's + # SoC release_dates are largely placeholder "YYYY-01-01" values that skew late + # (e.g. Snapdragon 888 stored as 2022-01-01), so a mismatch usually means the + # *SoC* record's date is wrong, not the device. We flag + penalize but don't + # force-red the device on the strength of a second record's bad date. + soc = rec.get("soc") + dev_year = _year_of(rec.get("release_date")) + soc_year = _year_of(soc_release.get(soc)) if isinstance(soc, str) else None + if dev_year is not None and soc_year is not None: + out.append(Signal("soc_not_after_device", "pass" if soc_year <= dev_year else "fail", hard=False)) + else: + out.append(Signal("soc_not_after_device", "na", hard=False)) + return out + + +def soc_signals(rec: dict[str, Any], now_year: int) -> list[Signal]: + out = [_release_not_future(rec, now_year)] + # process_nm vs era: no sub-7nm before 2017, no sub-3nm before 2022 (coarse guard). 
+ nm = _num(rec.get("process_nm")) + y = _year_of(rec.get("release_date")) + if nm is not None and y is not None: + too_advanced = (nm < 7 and y < 2017) or (nm < 3 and y < 2022) + out.append(Signal("process_nm_era", "fail" if too_advanced else "pass", hard=False)) + else: + out.append(Signal("process_nm_era", "na", hard=False)) + gpu_name = rec.get("gpu_name") + out.append( + Signal("gpu_name_present", "pass" if isinstance(gpu_name, str) and gpu_name.strip() else "fail", hard=False) + ) + return out + + +def brand_signals(rec: dict[str, Any], now_year: int) -> list[Signal]: + fy = _num(rec.get("founded_year")) + if fy is None: + founded = Signal("founded_not_future", "na", hard=False) + else: + founded = Signal("founded_not_future", "pass" if fy <= now_year else "fail", hard=False) + return [founded] + + +def signals_for( + category: str, rec: dict[str, Any], now_year: int, soc_release: dict[str, str] +) -> list[Signal]: + if category == "cpu": + return cpu_signals(rec, now_year) + if category == "gpu": + return gpu_signals(rec, now_year) + if category == "soc": + return soc_signals(rec, now_year) + if category == "brand": + return brand_signals(rec, now_year) + if category in {"smartphone", "tablet", "watch", "pda"}: + return mobile_signals(rec, now_year, soc_release) + return [] From 46792a64e5ccbc0f34ea6f07108989cb7d96df78 Mon Sep 17 00:00:00 2001 From: Seungpyo Hong Date: Tue, 23 Jun 2026 14:18:37 +0900 Subject: [PATCH 12/54] feat(verify): migrate verification layer from TechAPI --- tests/verify/__init__.py | 0 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 tests/verify/__init__.py diff --git a/tests/verify/__init__.py b/tests/verify/__init__.py new file mode 100644 index 0000000..e69de29 From 047d3f74a43046b4e2a3fb7571c1121d4a9e0ec5 Mon Sep 17 00:00:00 2001 From: Seungpyo Hong Date: Tue, 23 Jun 2026 14:18:39 +0900 Subject: [PATCH 13/54] feat(verify): migrate verification layer from TechAPI --- tests/verify/test_http_check.py | 103 
++++++++++++++++++++++++++++++++ 1 file changed, 103 insertions(+) create mode 100644 tests/verify/test_http_check.py diff --git a/tests/verify/test_http_check.py b/tests/verify/test_http_check.py new file mode 100644 index 0000000..de57ec6 --- /dev/null +++ b/tests/verify/test_http_check.py @@ -0,0 +1,103 @@ +"""Tier 1 liveness tests — fully offline via a fake opener.""" + +from app.verify import http_check +from app.verify.http_check import CheckResult + + +class FakeOpener: + """Maps url -> (status, final_url) or raises a urllib-style error with .code.""" + + def __init__(self, table): + self.table = table + self.calls = [] + + def open(self, url, method): + self.calls.append((url, method)) + val = self.table[url] + if isinstance(val, Exception): + raise val + return val + + +def _factory(table): + op = FakeOpener(table) + return lambda: op + + +def test_alive_200(): + table = {"https://en.wikipedia.org/wiki/X": (200, "https://en.wikipedia.org/wiki/X")} + [res] = http_check.check_urls(list(table), opener_factory=_factory(table), min_interval=0) + assert res.alive and res.status == 200 + + +def test_dead_404(): + table = {"https://gsmarena.com/x-9999.php": (404, "https://gsmarena.com/x-9999.php")} + [res] = http_check.check_urls(list(table), opener_factory=_factory(table), min_interval=0) + assert not res.alive and res.reason == "http-404" + + +def test_homepage_redirect_is_soft_dead(): + table = {"https://phonedb.net/index.php?m=device&id=123": (200, "https://phonedb.net/")} + [res] = http_check.check_urls(list(table), opener_factory=_factory(table), min_interval=0) + assert not res.alive and res.reason == "homepage-redirect" + + +def test_head_rejected_falls_back_to_get(): + err = type("E", (Exception,), {"code": 405, "url": None})() + + class TwoStep: + def __init__(self): + self.n = 0 + + def open(self, url, method): + self.n += 1 + if method == "HEAD": + raise err + return (200, "https://x.com/deep/page") + + res = 
http_check.check_one("https://x.com/deep/page", TwoStep()) + assert res.alive and res.status == 200 + + +def test_connection_error_is_dead(): + table = {"https://nope.invalid/x": ConnectionError("no route")} + [res] = http_check.check_urls(list(table), opener_factory=_factory(table), min_interval=0) + assert not res.alive and res.reason == "error" + + +def test_dedupe_by_host_and_path(): + urls = [ + "https://www.kaggle.com/datasets/a", + "https://www.kaggle.com/datasets/a", # exact dup + "https://www.kaggle.com/datasets/b", + ] + assert len(http_check.dedupe_urls(urls)) == 2 + + +def test_cache_freshness(): + from datetime import datetime, timezone + now = datetime(2026, 6, 22, tzinfo=timezone.utc) + fresh = {"checked_at": "2026-06-20T00:00:00Z"} + stale = {"checked_at": "2026-01-01T00:00:00Z"} + assert http_check.is_fresh(fresh, now, ttl_days=30) + assert not http_check.is_fresh(stale, now, ttl_days=30) + + +def test_record_liveness(): + cache = { + "a": {"alive": True}, "b": {"alive": False}, "c": {"alive": True}, + } + assert http_check.record_liveness(["a", "b", "c", "missing"], cache) == (2, 1) + + +def test_cache_roundtrip(): + # tmp_path fixture is unreliable on this Windows runner; use a local scratch file. 
+ from pathlib import Path + path = Path(__file__).parent / "_scratch_url_cache.jsonl" + try: + r = CheckResult("https://x.com/y", 200, "https://x.com/y", True, "http-200") + http_check.save_cache({r.url: http_check.result_to_entry(r, "2026-06-22T00:00:00Z")}, path) + loaded = http_check.load_cache(path) + assert loaded["https://x.com/y"]["alive"] is True + finally: + path.unlink(missing_ok=True) From 6924ddeb1329bd97af9d362a95f447047015ad0a Mon Sep 17 00:00:00 2001 From: Seungpyo Hong Date: Tue, 23 Jun 2026 14:18:40 +0900 Subject: [PATCH 14/54] feat(verify): migrate verification layer from TechAPI --- tests/verify/test_offline.py | 65 ++++++++++++++++++++++++++++++++++++ 1 file changed, 65 insertions(+) create mode 100644 tests/verify/test_offline.py diff --git a/tests/verify/test_offline.py b/tests/verify/test_offline.py new file mode 100644 index 0000000..f985e29 --- /dev/null +++ b/tests/verify/test_offline.py @@ -0,0 +1,65 @@ +"""Tier 0 scorer + host classification tests.""" + +from app.verify import hosts, offline +from app.verify.common import Record + +NOW = 2026 +NO_SOC: dict[str, str] = {} + + +def _score(category, data): + return offline.score_record(Record(category, f"{category}/x.json", data), NOW, NO_SOC) + + +def test_host_tiers(): + assert hosts.tier_of_host("en.wikipedia.org") == 1 + assert hosts.tier_of_host("ark.intel.com") == 1 # subdomain of intel.com + assert hosts.tier_of_host("gsmarena.com") == 2 + assert hosts.tier_of_host("www.kaggle.com") == 3 + assert hosts.tier_of_host("example.org") == 0 + assert hosts.best_tier(["https://kaggle.com/x", "https://en.wikipedia.org/y"]) == 1 + + +def test_complete_authoritative_cpu_is_green(): + rec = { + "slug": "core-i9-14900k", "cores": 24, "threads": 32, + "base_clock_ghz": 3.2, "boost_clock_ghz": 6.0, "l3_cache_mb": 36, + "socket": "LGA1700", "tdp_w": 125, "passmark_cpu_mark": 60000, + "architecture": "Raptor Lake", "release_date": "2023-10-17", + "source_urls": ["https://ark.intel.com/x", 
"https://en.wikipedia.org/wiki/x"], + } + s = _score("cpu", rec) + assert s.band == "green" + assert s.best_tier == 1 + + +def test_hard_violation_forces_red_despite_good_source(): + rec = { + "slug": "bad", "cores": 16, "threads": 8, # threads < cores -> hard + "base_clock_ghz": 3.0, "boost_clock_ghz": 4.0, "release_date": "2023-01-01", + "architecture": "x", "socket": "y", "tdp_w": 65, "l3_cache_mb": 8, + "passmark_cpu_mark": 20000, + "source_urls": ["https://en.wikipedia.org/wiki/x"], + } + s = _score("cpu", rec) + assert s.band == "red" + assert "!threads_ge_cores" in s.flags + + +def test_kaggle_only_sparse_is_not_green(): + rec = { + "slug": "sgh-x", "name": "SGH-X", "release_date": "2016-01-01", + "display": {"type": "Alphanumeric"}, + "source_urls": ["https://www.kaggle.com/datasets/msainani/gsmarena-mobile-devices"], + } + s = _score("smartphone", rec) + assert s.band != "green" # T3-only source can never auto-green + assert s.best_tier == 3 + + +def test_future_release_red(): + rec = { + "slug": "ghost", "cores": 8, "threads": 16, "release_date": "2099-01-01", + "source_urls": ["https://en.wikipedia.org/wiki/x"], + } + assert _score("cpu", rec).band == "red" From 50e01cd429b97e72410c2f2b542aaf6e81256717 Mon Sep 17 00:00:00 2001 From: Seungpyo Hong Date: Tue, 23 Jun 2026 14:18:42 +0900 Subject: [PATCH 15/54] feat(verify): migrate verification layer from TechAPI --- tests/verify/test_parity_and_golden.py | 56 ++++++++++++++++++++++++++ 1 file changed, 56 insertions(+) create mode 100644 tests/verify/test_parity_and_golden.py diff --git a/tests/verify/test_parity_and_golden.py b/tests/verify/test_parity_and_golden.py new file mode 100644 index 0000000..f3ba369 --- /dev/null +++ b/tests/verify/test_parity_and_golden.py @@ -0,0 +1,56 @@ +"""Guardrail tests: + +* RANGES parity — signals.RANGES must not drift from app.validate's bounds. 
+* Golden subset — the offline scorer, blind to the ``verified`` flag, should + reproduce the human-curated verified CPU set with high agreement. This is the + empirical justification for using the offline score to drive promotion. +""" + +import pytest + +from app.verify import offline, signals +from app.verify.common import foreign_key_sets, load_all + + +def test_ranges_parity_with_validator(): + """If app.validate's numeric bounds change, this test should force a sync. + + Mirrors the _check_range call sites in app/validate.py. Keep in lockstep. + """ + expected = { + ("brand", "founded_year"): (1800, 2100), + ("soc", "process_nm"): (1.0, 100.0), + ("smartphone", "ram_gb"): (1, 64), + ("smartphone", "battery_mah"): (500, 12000), + ("smartphone", "weight_g"): (50, 500), + ("smartphone", "msrp_usd"): (50, 5000), + ("mobile", "ram_gb"): (0.016, 64), + ("mobile", "battery_mah"): (50, 20000), + ("mobile", "weight_g"): (10, 2000), + ("mobile", "msrp_usd"): (10, 10000), + ("gpu", "memory_gb"): (0.001, 512), + ("gpu", "tdp_w"): (1, 3000), + ("gpu", "msrp_usd"): (50, 100000), + ("cpu", "cores"): (1, 512), + ("cpu", "threads"): (1, 1024), + ("cpu", "msrp_usd"): (20, 50000), + } + assert signals.RANGES == expected + + +@pytest.mark.slow +def test_verified_cpus_land_green(): + """≥95% of already-verified CPUs should score green under the offline tier.""" + records = load_all() + _, _, soc_release = foreign_key_sets(records) + now_year = offline.now_year_today() + + verified = [r for r in records["cpu"] if r.verified and r.slug] + if not verified: + pytest.skip("no verified CPUs in dataset") + green = sum( + 1 for r in verified + if offline.score_record(r, now_year, soc_release).band == "green" + ) + ratio = green / len(verified) + assert ratio >= 0.95, f"only {ratio:.1%} of verified CPUs scored green" From a209b52db9597a23d2ab904a9e17623740e72f6e Mon Sep 17 00:00:00 2001 From: Seungpyo Hong Date: Tue, 23 Jun 2026 14:18:43 +0900 Subject: [PATCH 16/54] feat(verify): migrate 
verification layer from TechAPI --- tests/verify/test_promote_crossref.py | 167 ++++++++++++++++++++++++++ 1 file changed, 167 insertions(+) create mode 100644 tests/verify/test_promote_crossref.py diff --git a/tests/verify/test_promote_crossref.py b/tests/verify/test_promote_crossref.py new file mode 100644 index 0000000..4457dc1 --- /dev/null +++ b/tests/verify/test_promote_crossref.py @@ -0,0 +1,167 @@ +"""Tier 2/3 tests: exact-heading rule, surgical write-back, no-clobber, escalation.""" + +from pathlib import Path + +from app.verify import crossref, promote +from app.verify.crossref import Candidate + + +class FakeFetcher: + def __init__(self, candidates): + self._c = candidates + + def search(self, name): + return self._c + + +# --- exact-heading rule ---------------------------------------------------------- + + +def test_exact_heading_confirms(): + rec = {"slug": "iphone-xr", "name": "iPhone XR", "release_date": "2018-10-26"} + f = FakeFetcher([Candidate("iPhone XR", "https://en.wikipedia.org/wiki/IPhone_XR", 2018)]) + res = crossref.crossref_record(rec, f) + assert res.decision == crossref.CONFIRM and res.exact_heading + + +def test_near_miss_is_ambiguous_not_confirm(): + # A different SKU comes back; fuzzy match must NOT auto-confirm. 
+ rec = {"slug": "iphone-xr", "name": "iPhone XR"} + f = FakeFetcher([Candidate("iPhone XS", "https://en.wikipedia.org/wiki/IPhone_XS")]) + res = crossref.crossref_record(rec, f) + assert res.decision == crossref.AMBIGUOUS and not res.exact_heading + + +def test_year_contradiction_blocks_confirm(): + rec = {"slug": "x", "name": "Widget 9000", "release_date": "2018-01-01"} + f = FakeFetcher([Candidate("Widget 9000", "http://x", 2010)]) + assert crossref.crossref_record(rec, f).decision == crossref.CONTRADICT + + +def test_no_candidates_is_notfound(): + rec = {"slug": "x", "name": "Obscure Thing"} + assert crossref.crossref_record(rec, FakeFetcher([])).decision == crossref.NOTFOUND + + +def test_exact_heading_without_year_is_ambiguous(): + # Name matches an authoritative entity but there's no year to verify specs. + rec = {"slug": "x", "name": "Widget 9000", "release_date": "2018-01-01"} + f = FakeFetcher([Candidate("Widget 9000", "http://x", None)]) + assert crossref.crossref_record(rec, f).decision == crossref.AMBIGUOUS + + +def test_model_suffix_matches_maker_prefixed_record(): + # Wikidata often labels without the maker prefix. 
+ rec = {"slug": "x", "name": "AMD Ryzen 7 5800X", "release_date": "2020-11-05"} + f = FakeFetcher([Candidate("Ryzen 7 5800X", "http://x", 2020)]) + assert crossref.crossref_record(rec, f).decision == crossref.CONFIRM + + +def test_normalize_heading(): + assert crossref.normalize_heading("iPhone XR") == "iphonexr" + assert crossref.normalize_heading("Core i9-14900K") == "corei914900k" + + +# --- surgical write-back --------------------------------------------------------- + +SEED = ( + '{\n' + ' "slug": "demo",\n' + ' "name": "Demo",\n' + ' "storage_options_gb": [64, 128, 256],\n' + ' "verified": false,\n' + ' "source_urls": [\n' + ' "https://en.wikipedia.org/wiki/Demo"\n' + ' ]\n' + '}\n' +) + + +def test_flip_only_touches_verified_token(): + out = promote.flip_verified_text(SEED) + assert out is not None + # Exactly one line changed; inline array preserved verbatim. + assert '"verified": true,' in out + assert '"storage_options_gb": [64, 128, 256],' in out + diff = [(a, b) for a, b in zip(SEED.splitlines(), out.splitlines()) if a != b] + assert diff == [(' "verified": false,', ' "verified": true,')] + + +def test_flip_refuses_already_true(): + assert promote.flip_verified_text(SEED.replace("false", "true")) is None + + +def test_write_back_atomic_lf_preserved(): + path = Path(__file__).parent / "_scratch_seed.json" + try: + path.write_bytes(SEED.encode("utf-8")) + assert promote.write_verified_true(path) is True + raw = path.read_bytes() + assert b'"verified": true,' in raw + assert b"\r\n" not in raw # LF preserved on Windows + assert raw.endswith(b"}\n") + # idempotent guard: second call refuses (already true) + assert promote.write_verified_true(path) is False + finally: + path.unlink(missing_ok=True) + + +# --- promotion decision ---------------------------------------------------------- + + +def test_green_with_live_t1_promotes(): + cache = {"https://en.wikipedia.org/wiki/X": {"alive": True}} + d = promote.decide( + band="green", 
source_urls=["https://en.wikipedia.org/wiki/X"], + url_cache=cache, crossref_decision=None, + ) + assert d.promote and d.reason == "green+live-source" + + +def test_green_with_live_t2_promotes(): + # A reputable T2 spec/benchmark DB (cpubenchmark) that is alive also promotes. + cache = {"https://www.cpubenchmark.net/cpu.php?id=1": {"alive": True}} + d = promote.decide( + band="green", source_urls=["https://www.cpubenchmark.net/cpu.php?id=1"], + url_cache=cache, crossref_decision=None, + ) + assert d.promote and d.reason == "green+live-source" + + +def test_green_with_only_t3_source_held(): + # kaggle (T3) alive is NOT enough to promote even if green. + cache = {"https://www.kaggle.com/x": {"alive": True}} + d = promote.decide( + band="green", source_urls=["https://www.kaggle.com/x"], + url_cache=cache, crossref_decision=None, + ) + assert not d.promote + + +def test_green_without_live_source_blocked(): + d = promote.decide(band="green", source_urls=["https://en.wikipedia.org/wiki/X"], + url_cache={}, crossref_decision=None) + assert not d.promote + + +def test_yellow_with_crossref_confirm_promotes(): + d = promote.decide(band="yellow", source_urls=[], url_cache={}, crossref_decision="confirm") + assert d.promote and d.reason == "crossref-confirm" + + +def test_crossref_contradict_vetoes_even_green(): + # Reality veto: a green record with a live source is NOT promoted if an + # authoritative source contradicts its specs. 
+ cache = {"https://en.wikipedia.org/wiki/X": {"alive": True}} + d = promote.decide( + band="green", source_urls=["https://en.wikipedia.org/wiki/X"], + url_cache=cache, crossref_decision="contradict", + ) + assert not d.promote and d.reason == "crossref-contradict" + + +def test_dead_t1_does_not_promote(): + cache = {"https://en.wikipedia.org/wiki/X": {"alive": False}} + d = promote.decide(band="green", source_urls=["https://en.wikipedia.org/wiki/X"], + url_cache=cache, crossref_decision=None) + assert not d.promote From 31bbf7e5740074e5e27f7f7a868b37931f525236 Mon Sep 17 00:00:00 2001 From: Seungpyo Hong Date: Tue, 23 Jun 2026 14:18:45 +0900 Subject: [PATCH 17/54] feat(verify): migrate verification layer from TechAPI --- tests/verify/test_signals.py | 88 ++++++++++++++++++++++++++++++++++++ 1 file changed, 88 insertions(+) create mode 100644 tests/verify/test_signals.py diff --git a/tests/verify/test_signals.py b/tests/verify/test_signals.py new file mode 100644 index 0000000..5f74c1e --- /dev/null +++ b/tests/verify/test_signals.py @@ -0,0 +1,88 @@ +"""Unit tests for cross-field consistency predicates (app.verify.signals).""" + +from app.verify import signals + +NOW = 2026 +NO_SOC: dict[str, str] = {} + + +def _named(sigs, name): + return next(s for s in sigs if s.name == name) + + +def test_threads_below_cores_is_hard_fail(): + rec = {"cores": 8, "threads": 4, "release_date": "2020-01-01"} + s = _named(signals.cpu_signals(rec, NOW), "threads_ge_cores") + assert s.failed and s.hard + + +def test_threads_ge_cores_passes(): + rec = {"cores": 8, "threads": 16, "release_date": "2020-01-01"} + assert _named(signals.cpu_signals(rec, NOW), "threads_ge_cores").result == "pass" + + +def test_boost_below_base_is_hard_fail(): + rec = {"base_clock_ghz": 3.5, "boost_clock_ghz": 3.0, "cores": 4, "threads": 4} + s = _named(signals.cpu_signals(rec, NOW), "boost_ge_base") + assert s.failed and s.hard + + +def test_missing_inputs_are_na_not_fail(): + rec = {"cores": 4, "threads": 
4} # no clocks + assert _named(signals.cpu_signals(rec, NOW), "boost_ge_base").result == "na" + + +def test_future_release_is_hard_fail(): + rec = {"cores": 1, "threads": 1, "release_date": "2099-01-01"} + s = _named(signals.cpu_signals(rec, NOW), "release_not_future") + assert s.failed and s.hard + + +def test_hybrid_core_sum(): + ok = {"cores": 8, "threads": 8, "p_cores": 4, "e_cores": 4} + bad = {"cores": 8, "threads": 8, "p_cores": 4, "e_cores": 2} + assert _named(signals.cpu_signals(ok, NOW), "hybrid_core_sum").result == "pass" + assert _named(signals.cpu_signals(bad, NOW), "hybrid_core_sum").result == "fail" + + +def test_gpu_boost_and_vendor_core(): + rec = { + "manufacturer": "nvidia", "base_clock_mhz": 1500, "boost_clock_mhz": 1800, + "cuda_cores": 4096, "release_date": "2022-01-01", + } + sigs = signals.gpu_signals(rec, NOW) + assert _named(sigs, "boost_ge_base").result == "pass" + assert _named(sigs, "vendor_core_field").result == "pass" + + +def test_gpu_rt_cores_before_turing_fail(): + rec = {"manufacturer": "nvidia", "rt_cores": 50, "release_date": "2015-01-01", + "cuda_cores": 2048} + assert _named(signals.gpu_signals(rec, NOW), "rt_cores_era").result == "fail" + + +def test_ppi_consistency(): + # 1792x828 over 6.1" -> ~326 ppi (matches iPhone XR). 
+ good = {"display": {"size_inch": 6.1, "resolution": "1792x828", "ppi": 326}} + bad = {"display": {"size_inch": 6.1, "resolution": "1792x828", "ppi": 500}} + assert _named(signals.mobile_signals(good, NOW, NO_SOC), "ppi_consistent").result == "pass" + assert _named(signals.mobile_signals(bad, NOW, NO_SOC), "ppi_consistent").result == "fail" + + +def test_storage_must_be_sorted_positive_unique(): + good = {"storage_options_gb": [64, 128, 256]} + bad = {"storage_options_gb": [256, 64]} + assert _named(signals.mobile_signals(good, NOW, NO_SOC), "storage_sane").result == "pass" + assert _named(signals.mobile_signals(bad, NOW, NO_SOC), "storage_sane").result == "fail" + + +def test_soc_not_after_device_is_soft(): + rec = {"soc": "chip-x", "release_date": "2020-01-01"} + soc_release = {"chip-x": "2022-01-01"} + s = _named(signals.mobile_signals(rec, NOW, soc_release), "soc_not_after_device") + assert s.failed and not s.hard # flagged but never forces red + + +def test_soc_process_nm_era(): + rec = {"process_nm": 5.0, "release_date": "2010-01-01", "gpu_name": "x"} + assert _named(signals.soc_signals(rec, NOW), "process_nm_era").result == "fail" From e1a41467d38673e0407e3f47c72f54fc4b4b4f37 Mon Sep 17 00:00:00 2001 From: Seungpyo Hong Date: Tue, 23 Jun 2026 14:18:46 +0900 Subject: [PATCH 18/54] chore: omit verify CLI from coverage + register slow marker --- pyproject.toml | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 40d6105..db06087 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -58,7 +58,17 @@ testpaths = ["tests"] addopts = "-q" asyncio_mode = "auto" pythonpath = ["."] +markers = [ + "slow: full-dataset scans (deselect with -m 'not slow')", +] [tool.coverage.run] source = ["app"] -omit = ["app/main.py"] +omit = [ + "app/main.py", + # verify CLI orchestration is integration-tested (score/pr smoke), not unit- + # tested line-by-line; the verify logic modules (signals/offline/promote/...) 
+ # are unit-covered. + "app/verify/cli.py", + "app/verify/__main__.py", +] From b38fc7b9db88d07017387c4bb930f9c8abebd552 Mon Sep 17 00:00:00 2001 From: Seungpyo Hong Date: Tue, 23 Jun 2026 14:25:40 +0900 Subject: [PATCH 19/54] style(verify): satisfy TechEngine ruff (UP/B/E501) + mypy From ea45c156070d5e214c2977819ffe383e3277643f Mon Sep 17 00:00:00 2001 From: Seungpyo Hong Date: Tue, 23 Jun 2026 14:25:42 +0900 Subject: [PATCH 20/54] style(verify): satisfy TechEngine ruff (UP/B/E501) + mypy From d447c20f639b9c015399afb6fc810fdaab2a8c92 Mon Sep 17 00:00:00 2001 From: Seungpyo Hong Date: Tue, 23 Jun 2026 14:25:44 +0900 Subject: [PATCH 21/54] style(verify): satisfy TechEngine ruff (UP/B/E501) + mypy --- app/verify/cli.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/app/verify/cli.py b/app/verify/cli.py index 85dd063..8523dcc 100644 --- a/app/verify/cli.py +++ b/app/verify/cli.py @@ -667,7 +667,8 @@ def build_parser() -> argparse.ArgumentParser: cu.add_argument("--max", type=int, default=500, help="number of frontier records to target") cu.add_argument("--workers", type=int, default=8, help="concurrent HTTP workers") cu.add_argument("--min-interval", type=float, default=1.0, help="seconds between hits per host") - cu.add_argument("--ttl-days", type=int, default=http_check.DEFAULT_TTL_DAYS, help="cache freshness") + cu.add_argument("--ttl-days", type=int, default=http_check.DEFAULT_TTL_DAYS, + help="cache freshness") cu.add_argument("--recheck", action="store_true", help="ignore cache freshness") cu.set_defaults(func=cmd_check_urls) @@ -680,7 +681,8 @@ def build_parser() -> argparse.ArgumentParser: pm = sub.add_parser("promote", help="Tier 3: hybrid escalation + verified write-back") pm.add_argument("--category", nargs="*", choices=CATEGORIES, help="limit to categories") pm.add_argument("--max", type=int, default=None, help="cap number promoted") - pm.add_argument("--apply", action="store_true", help="actually flip verified (default: 
dry-run)") + pm.add_argument("--apply", action="store_true", + help="actually flip verified (default: dry-run)") pm.set_defaults(func=cmd_promote) pr = sub.add_parser("pr", help="all-tiers (0-3) markdown report for a PR's changed records") From 5bead1040784fdb763b84523ee6a0155ba987d3d Mon Sep 17 00:00:00 2001 From: Seungpyo Hong Date: Tue, 23 Jun 2026 14:25:45 +0900 Subject: [PATCH 22/54] style(verify): satisfy TechEngine ruff (UP/B/E501) + mypy --- app/verify/common.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/app/verify/common.py b/app/verify/common.py index 06afb09..01136a4 100644 --- a/app/verify/common.py +++ b/app/verify/common.py @@ -11,8 +11,9 @@ import hashlib import json import sys +from collections.abc import Iterable from pathlib import Path -from typing import Any, Iterable +from typing import Any from app.validate import DATA_DIR, _load From 91bae3d495471fd8eec3164748eb8e9a244ba358 Mon Sep 17 00:00:00 2001 From: Seungpyo Hong Date: Tue, 23 Jun 2026 14:25:47 +0900 Subject: [PATCH 23/54] style(verify): satisfy TechEngine ruff (UP/B/E501) + mypy --- app/verify/crossref.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/app/verify/crossref.py b/app/verify/crossref.py index adc8045..5731c34 100644 --- a/app/verify/crossref.py +++ b/app/verify/crossref.py @@ -161,7 +161,9 @@ def search(self, name: str) -> list[Candidate]: qid = h.get("id") label = h.get("label") or h.get("match", {}).get("text", "") year = _wikidata_claim_year(ent.get(qid, {})) if qid else None - out.append(Candidate(title=label, url=f"https://www.wikidata.org/wiki/{qid}", year=year)) + out.append( + Candidate(title=label, url=f"https://www.wikidata.org/wiki/{qid}", year=year) + ) return out From f94e561c61e6b1b9f1d19b83c0b0246283603c0b Mon Sep 17 00:00:00 2001 From: Seungpyo Hong Date: Tue, 23 Jun 2026 14:25:48 +0900 Subject: [PATCH 24/54] style(verify): satisfy TechEngine ruff (UP/B/E501) + mypy --- app/verify/hosts.py | 2 +- 1 file changed, 
1 insertion(+), 1 deletion(-) diff --git a/app/verify/hosts.py b/app/verify/hosts.py index 00ed915..13102dc 100644 --- a/app/verify/hosts.py +++ b/app/verify/hosts.py @@ -9,7 +9,7 @@ from __future__ import annotations -from typing import Iterable +from collections.abc import Iterable from urllib.parse import urlparse # Tier 1 — primary/manufacturer + top reference encyclopaedias. A live T1 source From 627ab02221ee7989f1b5e19dce62f9f34c9f3c59 Mon Sep 17 00:00:00 2001 From: Seungpyo Hong Date: Tue, 23 Jun 2026 14:25:49 +0900 Subject: [PATCH 25/54] style(verify): satisfy TechEngine ruff (UP/B/E501) + mypy --- app/verify/http_check.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/app/verify/http_check.py b/app/verify/http_check.py index f22470d..c4b9503 100644 --- a/app/verify/http_check.py +++ b/app/verify/http_check.py @@ -12,9 +12,10 @@ import threading import time +from collections.abc import Callable, Iterable from concurrent.futures import ThreadPoolExecutor from datetime import datetime, timezone -from typing import Any, Callable, Iterable, NamedTuple +from typing import Any, NamedTuple from urllib.parse import urlparse from urllib.request import Request, build_opener From 1a35f867b890a421af86956fa6fee28ed336daaa Mon Sep 17 00:00:00 2001 From: Seungpyo Hong Date: Tue, 23 Jun 2026 14:25:51 +0900 Subject: [PATCH 26/54] style(verify): satisfy TechEngine ruff (UP/B/E501) + mypy --- app/verify/ledger.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/app/verify/ledger.py b/app/verify/ledger.py index 4f890bd..c7459d9 100644 --- a/app/verify/ledger.py +++ b/app/verify/ledger.py @@ -12,8 +12,9 @@ from __future__ import annotations import json +from collections.abc import Iterator from pathlib import Path -from typing import Any, Iterator +from typing import Any from .common import LEDGER_PATH, ensure_verify_dirs From 1395890a915c886f5be3a7b2116cad001a4e24ec Mon Sep 17 00:00:00 2001 From: Seungpyo Hong Date: Tue, 23 Jun 2026 
14:25:52 +0900 Subject: [PATCH 27/54] style(verify): satisfy TechEngine ruff (UP/B/E501) + mypy From fcd5e7e62f81f9a2ec887e31aae63cb252675fbf Mon Sep 17 00:00:00 2001 From: Seungpyo Hong Date: Tue, 23 Jun 2026 14:25:54 +0900 Subject: [PATCH 28/54] style(verify): satisfy TechEngine ruff (UP/B/E501) + mypy From 339caf8ff8a9e619ee9eaadb043123ee75498618 Mon Sep 17 00:00:00 2001 From: Seungpyo Hong Date: Tue, 23 Jun 2026 14:25:55 +0900 Subject: [PATCH 29/54] style(verify): satisfy TechEngine ruff (UP/B/E501) + mypy --- app/verify/signals.py | 26 +++++++++++++++++++------- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/app/verify/signals.py b/app/verify/signals.py index b7b140f..50bf370 100644 --- a/app/verify/signals.py +++ b/app/verify/signals.py @@ -101,9 +101,12 @@ def cpu_signals(rec: dict[str, Any], now_year: int) -> list[Signal]: _cmp_ge("threads_ge_cores", rec.get("threads"), rec.get("cores"), hard=True), _cmp_ge("boost_ge_base", rec.get("boost_clock_ghz"), rec.get("base_clock_ghz"), hard=True), _cmp_ge("max_tdp_ge_tdp", rec.get("max_tdp_w"), rec.get("tdp_w"), hard=False), - _cmp_ge("passmark_multi_ge_single", rec.get("passmark_cpu_mark"), rec.get("passmark_single"), hard=False), - _cmp_ge("cb23_multi_ge_single", rec.get("cinebench_r23_multi"), rec.get("cinebench_r23_single"), hard=False), - _cmp_ge("gb_multi_ge_single", rec.get("geekbench_multi"), rec.get("geekbench_single"), hard=False), + _cmp_ge("passmark_multi_ge_single", rec.get("passmark_cpu_mark"), + rec.get("passmark_single"), hard=False), + _cmp_ge("cb23_multi_ge_single", rec.get("cinebench_r23_multi"), + rec.get("cinebench_r23_single"), hard=False), + _cmp_ge("gb_multi_ge_single", rec.get("geekbench_multi"), + rec.get("geekbench_single"), hard=False), _release_not_future(rec, now_year), ] # p_cores + e_cores == cores (hybrid parts), only when both core splits given. 
@@ -127,7 +130,10 @@ def gpu_signals(rec: dict[str, Any], now_year: int) -> list[Signal]: elif mfr in {"amd", "intel"}: has_core = _num(rec.get("stream_processors")) is not None else: - has_core = _num(rec.get("cuda_cores")) is not None or _num(rec.get("stream_processors")) is not None + has_core = ( + _num(rec.get("cuda_cores")) is not None + or _num(rec.get("stream_processors")) is not None + ) out.append(Signal("vendor_core_field", "pass" if has_core else "fail", hard=False)) # RT / Tensor cores only plausible on post-2018 (Turing / RDNA2) parts. y = _year_of(rec.get("release_date")) @@ -147,7 +153,8 @@ def _ppi_signal(display: dict[str, Any]) -> Signal: return Signal("ppi_consistent", "na", hard=False) w, h = res computed = math.hypot(w, h) / size - return Signal("ppi_consistent", "pass" if abs(computed - ppi) <= 0.15 * ppi else "fail", hard=False) + ok = abs(computed - ppi) <= 0.15 * ppi + return Signal("ppi_consistent", "pass" if ok else "fail", hard=False) def _storage_signal(rec: dict[str, Any]) -> Signal: @@ -206,7 +213,8 @@ def mobile_signals( dev_year = _year_of(rec.get("release_date")) soc_year = _year_of(soc_release.get(soc)) if isinstance(soc, str) else None if dev_year is not None and soc_year is not None: - out.append(Signal("soc_not_after_device", "pass" if soc_year <= dev_year else "fail", hard=False)) + ok = soc_year <= dev_year + out.append(Signal("soc_not_after_device", "pass" if ok else "fail", hard=False)) else: out.append(Signal("soc_not_after_device", "na", hard=False)) return out @@ -224,7 +232,11 @@ def soc_signals(rec: dict[str, Any], now_year: int) -> list[Signal]: out.append(Signal("process_nm_era", "na", hard=False)) gpu_name = rec.get("gpu_name") out.append( - Signal("gpu_name_present", "pass" if isinstance(gpu_name, str) and gpu_name.strip() else "fail", hard=False) + Signal( + "gpu_name_present", + "pass" if isinstance(gpu_name, str) and gpu_name.strip() else "fail", + hard=False, + ) ) return out From 
e54a5161dcc5c445359de98db1d111300f3a5aea Mon Sep 17 00:00:00 2001 From: Seungpyo Hong Date: Tue, 23 Jun 2026 14:25:57 +0900 Subject: [PATCH 30/54] style(verify): satisfy TechEngine ruff (UP/B/E501) + mypy From 8db0562f1daf40a94a10b6151ab762558f5719f5 Mon Sep 17 00:00:00 2001 From: Seungpyo Hong Date: Tue, 23 Jun 2026 14:25:58 +0900 Subject: [PATCH 31/54] style(verify): satisfy TechEngine ruff (UP/B/E501) + mypy From 82c6b5aaebb7f834d3fe1790528bb3bdc9270f6c Mon Sep 17 00:00:00 2001 From: Seungpyo Hong Date: Tue, 23 Jun 2026 14:26:00 +0900 Subject: [PATCH 32/54] style(verify): satisfy TechEngine ruff (UP/B/E501) + mypy From c53a3e7b254b6f3fed18b5626c8b56857300c964 Mon Sep 17 00:00:00 2001 From: Seungpyo Hong Date: Tue, 23 Jun 2026 14:26:01 +0900 Subject: [PATCH 33/54] style(verify): satisfy TechEngine ruff (UP/B/E501) + mypy --- tests/verify/test_promote_crossref.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/tests/verify/test_promote_crossref.py b/tests/verify/test_promote_crossref.py index 4457dc1..7aa5865 100644 --- a/tests/verify/test_promote_crossref.py +++ b/tests/verify/test_promote_crossref.py @@ -83,7 +83,11 @@ def test_flip_only_touches_verified_token(): # Exactly one line changed; inline array preserved verbatim. 
assert '"verified": true,' in out assert '"storage_options_gb": [64, 128, 256],' in out - diff = [(a, b) for a, b in zip(SEED.splitlines(), out.splitlines()) if a != b] + diff = [ + (a, b) + for a, b in zip(SEED.splitlines(), out.splitlines(), strict=False) + if a != b + ] assert diff == [(' "verified": false,', ' "verified": true,')] From e61e4ff865a171ff2117c401579cbd7548b179af Mon Sep 17 00:00:00 2001 From: Seungpyo Hong Date: Tue, 23 Jun 2026 14:26:03 +0900 Subject: [PATCH 34/54] style(verify): satisfy TechEngine ruff (UP/B/E501) + mypy From e9e0443c92aed60b7d051d03e21ae3c5c574d083 Mon Sep 17 00:00:00 2001 From: Seungpyo Hong Date: Tue, 23 Jun 2026 14:30:51 +0900 Subject: [PATCH 35/54] style(verify): UP017 datetime.UTC (py312) --- app/verify/cli.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/app/verify/cli.py b/app/verify/cli.py index 8523dcc..ada6581 100644 --- a/app/verify/cli.py +++ b/app/verify/cli.py @@ -15,7 +15,7 @@ import json import subprocess from collections import Counter, defaultdict -from datetime import datetime, timezone +from datetime import UTC, datetime from pathlib import Path from . 
import crossref, http_check, ledger, offline, promote @@ -35,7 +35,7 @@ def _now_iso() -> str: - return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") + return datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%SZ") def _changed_data_slugs() -> set[str]: @@ -364,7 +364,7 @@ def cmd_check_urls(args: argparse.Namespace) -> int: targets = http_check.dedupe_urls(urls) cache = http_check.load_cache() - now = datetime.now(timezone.utc) + now = datetime.now(UTC) if args.recheck: todo = targets else: From 9b6d63c654e89675373583f326a3c2e87a940da1 Mon Sep 17 00:00:00 2001 From: Seungpyo Hong Date: Tue, 23 Jun 2026 14:30:52 +0900 Subject: [PATCH 36/54] style(verify): UP017 datetime.UTC (py312) --- app/verify/http_check.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/app/verify/http_check.py b/app/verify/http_check.py index c4b9503..c49f98e 100644 --- a/app/verify/http_check.py +++ b/app/verify/http_check.py @@ -14,7 +14,7 @@ import time from collections.abc import Callable, Iterable from concurrent.futures import ThreadPoolExecutor -from datetime import datetime, timezone +from datetime import UTC, datetime from typing import Any, NamedTuple from urllib.parse import urlparse from urllib.request import Request, build_opener @@ -188,7 +188,7 @@ def load_cache(path=URL_CACHE_PATH) -> dict[str, dict[str, Any]]: def _parse_ts(ts: str) -> datetime | None: try: - return datetime.strptime(ts, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=timezone.utc) + return datetime.strptime(ts, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=UTC) except Exception: return None From 729604f4dd2410e1fae49211594d16aca388ea44 Mon Sep 17 00:00:00 2001 From: Seungpyo Hong Date: Tue, 23 Jun 2026 14:30:53 +0900 Subject: [PATCH 37/54] style(verify): UP017 datetime.UTC (py312) --- tests/verify/test_http_check.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tests/verify/test_http_check.py b/tests/verify/test_http_check.py index de57ec6..a2852b5 100644 --- 
a/tests/verify/test_http_check.py +++ b/tests/verify/test_http_check.py @@ -1,5 +1,7 @@ """Tier 1 liveness tests — fully offline via a fake opener.""" +from datetime import UTC + from app.verify import http_check from app.verify.http_check import CheckResult @@ -75,8 +77,8 @@ def test_dedupe_by_host_and_path(): def test_cache_freshness(): - from datetime import datetime, timezone - now = datetime(2026, 6, 22, tzinfo=timezone.utc) + from datetime import datetime + now = datetime(2026, 6, 22, tzinfo=UTC) fresh = {"checked_at": "2026-06-20T00:00:00Z"} stale = {"checked_at": "2026-01-01T00:00:00Z"} assert http_check.is_fresh(fresh, now, ttl_days=30) From 560f6e3af08e5dcdcbf09cdc5820ededa69354c3 Mon Sep 17 00:00:00 2001 From: Seungpyo Hong Date: Tue, 23 Jun 2026 14:39:34 +0900 Subject: [PATCH 38/54] style(verify): strict mypy + ruff compliance for TechEngine From 633f64c3d85b89356ac376f4f8e67b7dc96e40c5 Mon Sep 17 00:00:00 2001 From: Seungpyo Hong Date: Tue, 23 Jun 2026 14:39:36 +0900 Subject: [PATCH 39/54] style(verify): strict mypy + ruff compliance for TechEngine From 6d4ff934063692f2d61c013583579b20e7ef7481 Mon Sep 17 00:00:00 2001 From: Seungpyo Hong Date: Tue, 23 Jun 2026 14:39:37 +0900 Subject: [PATCH 40/54] style(verify): strict mypy + ruff compliance for TechEngine --- app/verify/cli.py | 43 ++++++++++++++++++++++++++----------------- 1 file changed, 26 insertions(+), 17 deletions(-) diff --git a/app/verify/cli.py b/app/verify/cli.py index ada6581..c129e65 100644 --- a/app/verify/cli.py +++ b/app/verify/cli.py @@ -15,8 +15,12 @@ import json import subprocess from collections import Counter, defaultdict +from collections.abc import Iterator from datetime import UTC, datetime from pathlib import Path +from typing import Any + +from app.validate import DATA_DIR from . 
import crossref, http_check, ledger, offline, promote from .common import ( @@ -50,7 +54,6 @@ def _changed_data_slugs() -> set[str]: package lives in TechAPI (data alongside) or TechEngine (data in a separate TechAPI checkout pointed at by TECHAPI_DATA_DIR). """ - from .common import DATA_DIR try: out = subprocess.run( ["git", "diff", "--name-only", "origin/main", "HEAD", "--", "data/"], @@ -73,7 +76,7 @@ def _iter_selected( unverified_only: bool, changed: set[str] | None, limit: int | None, -): +) -> Iterator[Record]: count = 0 for cat in categories: for rec in records[cat]: @@ -101,8 +104,8 @@ def cmd_score(args: argparse.Namespace) -> int: write_cache = full_scope and not args.no_cache # category -> band -> count - hist: dict[str, Counter] = defaultdict(Counter) - hard_flags: Counter = Counter() + hist: dict[str, Counter[str]] = defaultdict(Counter) + hard_flags: Counter[str] = Counter() entries = [] scored = 0 @@ -133,7 +136,9 @@ def cmd_score(args: argparse.Namespace) -> int: return 0 -def _print_histogram(hist, scored, hard_flags, wrote_cache) -> None: +def _print_histogram( + hist: dict[str, Counter[str]], scored: int, hard_flags: Counter[str], wrote_cache: bool +) -> None: print(f"Tier 0 offline score — {scored} record(s)\n") header = f"{'category':<12} {'green':>8} {'yellow':>8} {'red':>8} {'total':>8}" print(header) @@ -185,7 +190,7 @@ def _band_bar(green: int, yellow: int, red: int, width: int = 12) -> str: return "🟩" * counts["🟩"] + "🟨" * counts["🟨"] + "🟥" * counts["🟥"] -def _print_markdown(hist, scored, hard_flags) -> None: +def _print_markdown(hist: dict[str, Counter[str]], scored: int, hard_flags: Counter[str]) -> None: """Readable PR-comment report: a Mermaid pie of the overall band split (GitHub renders it natively) + a per-category table with a proportional colored bar.""" if scored == 0: @@ -247,7 +252,7 @@ def cmd_status(args: argparse.Namespace) -> int: _, _, soc_release = foreign_key_sets(records) now_year = offline.now_year_today() - 
by_category: dict[str, dict] = {} + by_category: dict[str, dict[str, Any]] = {} tot = ver = g = y = r = 0 for cat in CATEGORIES: ct = cv = cg = cy = cr = 0 @@ -309,8 +314,8 @@ def cmd_report(args: argparse.Namespace) -> int: if not SCORES_PATH.exists(): print("no scores cache — run `python -m app.verify score` first") return 0 - hist: dict[str, Counter] = defaultdict(Counter) - hard_flags: Counter = Counter() + hist: dict[str, Counter[str]] = defaultdict(Counter) + hard_flags: Counter[str] = Counter() for entry in ledger.iter_entries(SCORES_PATH): cat = entry.get("category") t0 = entry.get("tier0", {}) @@ -324,7 +329,7 @@ def cmd_report(args: argparse.Namespace) -> int: _print_histogram(hist, scored, hard_flags, wrote_cache=False) # Promotion decisions live in the git-tracked ledger. - promoted: Counter = Counter() + promoted: Counter[str] = Counter() for (cat, _slug), entry in ledger.latest_by_key().items(): if entry.get("decision") == "promote": promoted[cat] += 1 @@ -335,7 +340,10 @@ def cmd_report(args: argparse.Namespace) -> int: return 0 -def _ranked_unverified(records, soc_release, now_year, categories): +def _ranked_unverified( + records: dict[str, list[Record]], soc_release: dict[str, str], now_year: int, + categories: tuple[str, ...], +) -> list[Record]: """Unverified records of the given categories, scored, highest-confidence first.""" scored = [] for cat in categories: @@ -394,7 +402,7 @@ def cmd_check_urls(args: argparse.Namespace) -> int: return 0 -def _summarize_cache(cache, targets) -> None: +def _summarize_cache(cache: dict[str, dict[str, Any]], targets: list[str]) -> None: from collections import Counter alive = sum(1 for u in targets if cache.get(u, {}).get("alive")) dead = sum(1 for u in targets if u in cache and not cache[u].get("alive")) @@ -438,7 +446,7 @@ def cmd_crossref(args: argparse.Namespace) -> int: "exact_heading": res.exact_heading, "matched_url": res.matched_url, }) if new_entries: - cache.update({(e["category"], e["slug"]): e for e 
in new_entries}) + cache.update({(str(e["category"]), str(e["slug"])): e for e in new_entries}) ledger.replace_all(list(cache.values()), promote.CROSSREF_CACHE_PATH) print(f"crossref: examined {len(targets)} record(s)") @@ -556,7 +564,7 @@ def cmd_pr(args: argparse.Namespace) -> int: urls = sorted({u for r, _ in scored for u in r.data.get("source_urls", []) if isinstance(u, str)}) ts = _now_iso() - url_cache: dict[str, dict] = {} + url_cache: dict[str, dict[str, Any]] = {} try: for res in http_check.check_urls(urls, min_interval=0.5): url_cache[res.url] = http_check.result_to_entry(res, ts) @@ -616,8 +624,8 @@ def cmd_pr(args: argparse.Namespace) -> int: print() # Full-dataset Tier 0 baseline (always). - hist: dict[str, Counter] = defaultdict(Counter) - hard_flags: Counter = Counter() + hist: dict[str, Counter[str]] = defaultdict(Counter) + hard_flags: Counter[str] = Counter() scored_n = 0 for cat in CATEGORIES: for rec in records[cat]: @@ -696,4 +704,5 @@ def main(argv: list[str] | None = None) -> int: configure_stdout() parser = build_parser() args = parser.parse_args(argv) - return args.func(args) + result: int = args.func(args) + return result From 2f7f0b684fbbc93343ffb3dcb2f190c98e35fd01 Mon Sep 17 00:00:00 2001 From: Seungpyo Hong Date: Tue, 23 Jun 2026 14:39:38 +0900 Subject: [PATCH 41/54] style(verify): strict mypy + ruff compliance for TechEngine From 31ab2e6392e221d595fd563173dfd76d2189ffbc Mon Sep 17 00:00:00 2001 From: Seungpyo Hong Date: Tue, 23 Jun 2026 14:39:40 +0900 Subject: [PATCH 42/54] style(verify): strict mypy + ruff compliance for TechEngine --- app/verify/crossref.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/app/verify/crossref.py b/app/verify/crossref.py index 5731c34..b9a4142 100644 --- a/app/verify/crossref.py +++ b/app/verify/crossref.py @@ -109,7 +109,7 @@ def crossref_record( # --- concrete fetchers (network; not exercised by unit tests) -------------------- -def _wikidata_claim_year(entity: dict) -> int | 
None: +def _wikidata_claim_year(entity: dict[str, Any]) -> int | None: """First year from inception (P571) or publication date (P577) claims.""" claims = entity.get("claims", {}) for prop in ("P571", "P577"): @@ -136,10 +136,11 @@ def __init__(self, timeout: float = 10.0, limit: int = 5) -> None: self.timeout = timeout self.limit = limit - def _get(self, url: str) -> dict: + def _get(self, url: str) -> dict[str, Any]: req = Request(url, headers={"User-Agent": self.UA}) with urlopen(req, timeout=self.timeout) as resp: - return json.loads(resp.read().decode("utf-8")) + data: dict[str, Any] = json.loads(resp.read().decode("utf-8")) + return data def search(self, name: str) -> list[Candidate]: try: From 8787ab274cbfacbea8b7c7adcb6fd28e792dcc07 Mon Sep 17 00:00:00 2001 From: Seungpyo Hong Date: Tue, 23 Jun 2026 14:39:41 +0900 Subject: [PATCH 43/54] style(verify): strict mypy + ruff compliance for TechEngine From fc17108dc31c6249f6ca1730976c89563b920fbc Mon Sep 17 00:00:00 2001 From: Seungpyo Hong Date: Tue, 23 Jun 2026 14:39:43 +0900 Subject: [PATCH 44/54] style(verify): strict mypy + ruff compliance for TechEngine --- app/verify/http_check.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/app/verify/http_check.py b/app/verify/http_check.py index c49f98e..d56b754 100644 --- a/app/verify/http_check.py +++ b/app/verify/http_check.py @@ -15,6 +15,7 @@ from collections.abc import Callable, Iterable from concurrent.futures import ThreadPoolExecutor from datetime import UTC, datetime +from pathlib import Path from typing import Any, NamedTuple from urllib.parse import urlparse from urllib.request import Request, build_opener @@ -182,7 +183,7 @@ def _task(url: str) -> CheckResult: # --- cache ----------------------------------------------------------------------- -def load_cache(path=URL_CACHE_PATH) -> dict[str, dict[str, Any]]: +def load_cache(path: Path = URL_CACHE_PATH) -> dict[str, dict[str, Any]]: return {e["url"]: e for e in 
ledger.iter_entries(path) if isinstance(e.get("url"), str)} @@ -200,7 +201,7 @@ def is_fresh(entry: dict[str, Any], now: datetime, ttl_days: int) -> bool: return (now - ts).days < ttl_days -def save_cache(cache: dict[str, dict[str, Any]], path=URL_CACHE_PATH) -> None: +def save_cache(cache: dict[str, dict[str, Any]], path: Path = URL_CACHE_PATH) -> None: ledger.replace_all(list(cache.values()), path) From 311b25a5466123993f508503aa1cab5ec0730aea Mon Sep 17 00:00:00 2001 From: Seungpyo Hong Date: Tue, 23 Jun 2026 14:39:44 +0900 Subject: [PATCH 45/54] style(verify): strict mypy + ruff compliance for TechEngine From dee504b3e8fa32f7166f855470a8ae0bd6b895fb Mon Sep 17 00:00:00 2001 From: Seungpyo Hong Date: Tue, 23 Jun 2026 14:39:45 +0900 Subject: [PATCH 46/54] style(verify): strict mypy + ruff compliance for TechEngine From 66f7d08c292229183ec5e2cbd5110d5a4d8e117b Mon Sep 17 00:00:00 2001 From: Seungpyo Hong Date: Tue, 23 Jun 2026 14:39:47 +0900 Subject: [PATCH 47/54] style(verify): strict mypy + ruff compliance for TechEngine --- app/verify/promote.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/verify/promote.py b/app/verify/promote.py index 2485f89..1448fea 100644 --- a/app/verify/promote.py +++ b/app/verify/promote.py @@ -99,7 +99,7 @@ def write_verified_true(abs_path: Path) -> bool: return True -def load_crossref_cache(path=CROSSREF_CACHE_PATH) -> dict[tuple[str, str], dict[str, Any]]: +def load_crossref_cache(path: Path = CROSSREF_CACHE_PATH) -> dict[tuple[str, str], dict[str, Any]]: from . 
import ledger out: dict[tuple[str, str], dict[str, Any]] = {} for e in ledger.iter_entries(path): From 588fdb92771734c8b5bd6d60cfa2bc3b65a36efc Mon Sep 17 00:00:00 2001 From: Seungpyo Hong Date: Tue, 23 Jun 2026 14:39:48 +0900 Subject: [PATCH 48/54] style(verify): strict mypy + ruff compliance for TechEngine From 7494530376f87a660399cde7d0577865b70a8c65 Mon Sep 17 00:00:00 2001 From: Seungpyo Hong Date: Tue, 23 Jun 2026 14:39:50 +0900 Subject: [PATCH 49/54] style(verify): strict mypy + ruff compliance for TechEngine From 1bf56e8916d0e94c7c810cbebd2daa9054a05f6c Mon Sep 17 00:00:00 2001 From: Seungpyo Hong Date: Tue, 23 Jun 2026 14:39:51 +0900 Subject: [PATCH 50/54] style(verify): strict mypy + ruff compliance for TechEngine From 2c7cb123aa36486a2f84812afc84db53be7b0a32 Mon Sep 17 00:00:00 2001 From: Seungpyo Hong Date: Tue, 23 Jun 2026 14:39:53 +0900 Subject: [PATCH 51/54] style(verify): strict mypy + ruff compliance for TechEngine From fb572a20c296cd4f0a3c8372d6393070df765f4e Mon Sep 17 00:00:00 2001 From: Seungpyo Hong Date: Tue, 23 Jun 2026 14:39:54 +0900 Subject: [PATCH 52/54] style(verify): strict mypy + ruff compliance for TechEngine From 2a4a5c4316234e426a8274f17c5b4b88f2e92371 Mon Sep 17 00:00:00 2001 From: Seungpyo Hong Date: Tue, 23 Jun 2026 14:39:55 +0900 Subject: [PATCH 53/54] style(verify): strict mypy + ruff compliance for TechEngine From 7b3a5f57098cf172d377f1a835c8231552cf937d Mon Sep 17 00:00:00 2001 From: Seungpyo Hong Date: Tue, 23 Jun 2026 14:49:23 +0900 Subject: [PATCH 54/54] test(mobile): don't hardcode verified=False (data-driven via verification) --- tests/integration/test_mobile_devices.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/integration/test_mobile_devices.py b/tests/integration/test_mobile_devices.py index 46b0518..102ada3 100644 --- a/tests/integration/test_mobile_devices.py +++ b/tests/integration/test_mobile_devices.py @@ -30,7 +30,9 @@ def test_mobile_device_detail_includes_variant_fields(client: 
TestClient) -> Non assert body["brand"]["slug"] == "apple" assert body["variant"]["region"] == "global" assert body["variant"]["memory"] == {"ram_gb": 8, "storage_gb": 256} - assert body["verified"] is False + # `verified` is present and boolean; its value is data-driven (the verification + # layer may promote this record), so don't assert a fixed value here. + assert isinstance(body["verified"], bool) def test_mobile_device_filters(client: TestClient) -> None: