diff --git a/scripts/build_fixture_matrix.py b/scripts/build_fixture_matrix.py index 70c0069..d377e91 100644 --- a/scripts/build_fixture_matrix.py +++ b/scripts/build_fixture_matrix.py @@ -1172,6 +1172,77 @@ def _try_salvage_complete_shard( _AUTO_SUBSHARD_MAX_DEPTH = 2 +def _has_rootlevel_eligible_files( + root: str, extra_filename_filters: list, +) -> bool: + """True iff at least one file DIRECTLY at ``root`` (no subdir) passes + the same ingestion filters as :func:`_iter_ingestable_files`. + + Used by :func:`_decompose_oversized_root` (#213) to decide whether a + decomposed root needs a ``__root`` catch-all entry. A + non-recursive scan: subdir contents are the subshards' business. + """ + try: + entries = os.listdir(root) + except OSError: + return False + for fname in entries: + ext = os.path.splitext(fname)[1].lower() + if ext not in INGEST_EXTS: + continue + fpath = os.path.join(root, fname) + if not os.path.isfile(fpath): + continue + if any(f(fpath) for f in extra_filename_filters): + continue + try: + size = os.path.getsize(fpath) + except OSError: + continue + if size < MIN_FILE_SIZE or size > MAX_FILE_SIZE: + continue + return True + return False + + +def _catchall_indices(entries: list[tuple[str, str]]) -> set[int]: + """Indices of ``(slug, path)`` entries whose path is a strict ancestor + of another entry's path — i.e. the ``__root`` catch-alls emitted by + :func:`_decompose_oversized_root` (#213). + + Structural (label-agnostic) so a genuine subdir that happens to slug + to ``root`` is never misclassified: a non-decomposed subdir entry has + no other entry's path underneath it. + """ + norm = [os.path.normpath(p) for _slug, p in entries] + out: set[int] = set() + for i, pi in enumerate(norm): + prefix = pi + os.sep + for j, pj in enumerate(norm): + if i != j and pj.startswith(prefix): + out.add(i) + break + return out + + +def _rootfiles_only_skip_dirs(root: str, skip_dirs: set[str]) -> set[str]: + """Skip-set for a ``__root`` catch-all task (#213): the shared + ``skip_dirs`` plus every immediate subdirectory name of ``root``. + + With all level-1 dirs skipped, ``os.walk`` never descends, so the + catch-all shard ingests exactly the root-level files and cannot + duplicate the per-subdir shards. + """ + try: + subdirs = { + entry for entry in os.listdir(root) + if os.path.isdir(os.path.join(root, entry)) + } + except OSError: + subdirs = set() + return set(skip_dirs) | subdirs + + def _decompose_oversized_root( root: str, skip_dirs: set[str], @@ -1197,6 +1268,11 @@ def _decompose_oversized_root( stays flat at the main-DB level. See issue #147 for the design and diagnostic that motivated this. + When the decomposed root also has eligible files DIRECTLY at its top + level, a ``__root`` catch-all entry for those files is + appended (#213) — without it they matched no sub-shard prefix and + were silently never ingested. + A non-existent root returns ``[]`` — caller (typically :func:`build_profile_sharded`) tracks the missing entry in ``stats["missing_roots"]`` separately, same as the pre-#147 @@ -1240,6 +1316,26 @@ def _decompose_oversized_root( # carry the full path lineage in the main-DB shards table. for sub_slug, sub_path in sub_parts: parts.append((f"{parent_slug}__{sub_slug}", sub_path)) + # Root-level catch-all (#213): decomposing along subdirs must not + # drop files living DIRECTLY at the root (e.g. ``slack/-*.json`` + # with no channel subfolder — measured as 0-indexed-anywhere across + # all 100 shards of the v2 Onyx corpus). Emit a dedicated + # ``__root`` entry pointing at the root itself; + # ``build_profile_sharded`` marks it ``rootfiles_only`` so its walk + # skips every immediate subdir and ingests only the root-level + # files. Omitted when nothing eligible lives at the root, keeping + # the pre-#213 (#147) output unchanged for subdir-only roots. + if parts and _has_rootlevel_eligible_files(root, extra_filename_filters): + taken = {slug for slug, _path in parts} + catchall_slug = f"{parent_slug}__root" + n = 2 + while catchall_slug in taken: + # A genuine subdir named ``root`` already owns + # ``__root``; disambiguate deterministically + # instead of clobbering its label. + catchall_slug = f"{parent_slug}__root{n}" + n += 1 + parts.append((catchall_slug, root)) # Flat-layout fallback: no subdirs (or all skipped) → single shard. return parts or [(parent_slug, root)] @@ -1473,6 +1569,90 @@ def _shard_worker_entry(task: dict) -> dict: ) +def _assert_shard_coverage( + roots: list[str], + tasks: list[dict], + skip_dirs: set[str], + extra_filename_filters: list, +) -> None: + """Builder coverage assertion (#213): every eligible file under the + profile roots must fall under at least one task's root prefix. + + The slack-root gap shipped silently — 4/84 never-surfaced golds were + files no shard ever ingested. This turns the fall-through into a + build error: on violation, log ERROR with the orphan list and raise + ``RuntimeError``. Set ``HELIX_BFM_COVERAGE_CHECK=0`` to skip on huge + corpora (default ON). + + Cost: ONE walk of the roots (same pruning + eligibility filters as + ingest) with a per-directory coverage cache — a directory's verdict + is shared by all files in it, so the inner prefix loop runs once per + directory, not once per file, and per-file ``stat`` eligibility only + runs inside uncovered directories. No re-walk per task. + + A ``rootfiles_only`` task (the ``__root`` catch-all) covers only the + files directly at its root: counting its whole subtree as covered + would let a dropped-subdir bug slide through unnoticed. + """ + if not _env_flag("HELIX_BFM_COVERAGE_CHECK"): + log.info( + "shard coverage check skipped (HELIX_BFM_COVERAGE_CHECK=0)", + ) + return + prefixes = [ + (os.path.normpath(t["root"]), bool(t.get("rootfiles_only"))) + for t in tasks + ] + orphans: list[str] = [] + dir_covered: dict[str, bool] = {} + for root in roots: + if not os.path.exists(root): + continue + for dirpath, dirnames, filenames in os.walk(root): + dirnames[:] = [d for d in dirnames if d not in skip_dirs] + dpath = os.path.normpath(dirpath) + covered = dir_covered.get(dpath) + if covered is None: + covered = False + for troot, rootfiles_only in prefixes: + if rootfiles_only: + if dpath == troot: + covered = True + break + elif dpath == troot or dpath.startswith(troot + os.sep): + covered = True + break + dir_covered[dpath] = covered + if covered: + continue + for fname in filenames: + ext = os.path.splitext(fname)[1].lower() + if ext not in INGEST_EXTS: + continue + fpath = os.path.join(dirpath, fname) + if any(f(fpath) for f in extra_filename_filters): + continue + try: + size = os.path.getsize(fpath) + except OSError: + continue + if size < MIN_FILE_SIZE or size > MAX_FILE_SIZE: + continue + orphans.append(fpath) + if orphans: + preview = orphans[:20] + log.error( + "shard coverage gap (#213): %d eligible file(s) under the " + "profile roots map to no shard task and would be silently " + "skipped — first %d: %s", + len(orphans), len(preview), preview, + ) + raise RuntimeError( + f"shard coverage gap (#213): {len(orphans)} eligible file(s) " + f"map to no shard task; first orphans: {preview}" + ) + + def build_profile_sharded( name: str, profile_out_dir: str, @@ -1509,7 +1689,11 @@ def build_profile_sharded( decomposable subdirs falls back to single-shard (flat-layout fallback). The shard label nests under ``__`` so the slack root decomposes into e.g. ``slack__aditya_rao``, ``slack__eng_sre`` in - the main-DB ``shards`` table. + the main-DB ``shards`` table. Files living directly at a decomposed + root get a dedicated ``__root`` catch-all shard (#213), + and a post-construction coverage assertion verifies every eligible + file falls under some task's root (``HELIX_BFM_COVERAGE_CHECK=0`` + to skip). """ profile = PROFILES[name] if shard_file_workers <= 0: @@ -1589,9 +1773,10 @@ def build_profile_sharded( root, auto_subshard_threshold_bytes, auto_subshard_threshold_files, len(sub_entries), ) - for label, sub_root in sub_entries: + catchall_idx = _catchall_indices(sub_entries) + for i, (label, sub_root) in enumerate(sub_entries): shard_db = corpus_shard_db(sub_root, label, profile_out_dir) - tasks.append({ + task = { "label": label, "root": sub_root, "shard_db_path": str(shard_db), @@ -1601,13 +1786,34 @@ def build_profile_sharded( "shard_file_workers": shard_file_workers, "shard_file_chunksize": 4, "rebuild": rebuild, - }) + } + if i in catchall_idx: + # Root-level catch-all (#213): this shard owns only the + # files living directly at its root — every immediate + # subdir already has (or contains) its own shard task, + # so the walk skips them all. + task["rootfiles_only"] = True + task["skip_dirs"] = _rootfiles_only_skip_dirs( + sub_root, skip_dirs, + ) + tasks.append(task) if decomposed_count: log.info( "auto-subshard: decomposed %d of %d profile roots into %d total shards", decomposed_count, len(profile["roots"]), len(tasks), ) + # Builder coverage assertion (#213): every eligible file under the + # profile roots must fall under some task's root, or the build dies + # here instead of shipping a fixture with 0-indexed-anywhere docs. + try: + _assert_shard_coverage( + profile["roots"], tasks, skip_dirs, extra_filename_filters, + ) + except Exception: + main_conn.close() + raise + # Pre-ingest sizing — order shards largest-first so the long pole # gets the longest head start on the worker pool (issue #97 A.1). # When ``shard_workers <= 1`` the order doesn't affect wall-clock, @@ -1615,8 +1821,11 @@ def build_profile_sharded( if sort_largest_first and tasks: sizing_t0 = time.perf_counter() for task in tasks: + # Per-task skip set (#213): a ``__root`` catch-all walks only + # its root-level files, so size it the same way. files, bytes_ = _estimate_eligible_bytes( - task["root"], skip_dirs, extra_filename_filters, + task["root"], task["skip_dirs"], + task["extra_filename_filters"], ) task["eligible_files"] = files task["eligible_bytes"] = bytes_ diff --git a/tests/test_sharder_rootfile_coverage.py b/tests/test_sharder_rootfile_coverage.py new file mode 100644 index 0000000..aaf1540 --- /dev/null +++ b/tests/test_sharder_rootfile_coverage.py @@ -0,0 +1,352 @@ +"""Sharder catch-all for root-level files + builder coverage assertion (#213). + +Measured by an external collaborator on the 100-shard v2 Onyx corpus +(cc-exchange embedding-upgrade turn 0024, L1a): four ``slack/-*.json`` +files living DIRECTLY at the slack source root (no channel subfolder) were +never ingested into ANY shard — 4/84 never-surfaced golds missing, 0 +slack-root docs indexed anywhere, a +3.2pp ceiling @200_lt on his 125-query +set. Structural cause: ``_decompose_oversized_root`` (the #147 +auto-subshard) splits an oversized root along its top-level subdirectories +and silently DROPS files that live directly at the root — they match no +sub-shard prefix. + +Tests pin: + +- the failing-on-master repro: root-level files map to a shard after the + fix (a dedicated ``__root`` catch-all entry) +- decompose-with-no-subdirs (flat root) output unchanged +- decompose with subdirs but no root-level files: no catch-all emitted + (#147 output unchanged) +- ``__root`` shard naming/slug is stable + deterministic, with a + collision guard for a genuine subdir named ``root`` +- the catch-all task's walk picks up ONLY root-level files (no + duplication of subdir shards) +- the builder coverage assertion: passes on a complete mapping, raises + with the orphan list on a synthetic gap, and honours the + ``HELIX_BFM_COVERAGE_CHECK=0`` kill-switch +- ``build_profile_sharded`` wires the catch-all task end-to-end +""" +from __future__ import annotations + +import logging +import os +import sys +from pathlib import Path + +import pytest + +# Make scripts/ importable. +sys.path.insert(0, str(Path(__file__).resolve().parents[1] / "scripts")) + +import build_fixture_matrix as bfm + + +# --- helpers ------------------------------------------------------------ + +# MIN_FILE_SIZE is 50 bytes; keep synthetic files comfortably eligible. +_BODY = b"x" * 256 + + +def _make_slack_like_root(tmp_path: Path) -> Path: + """Mirror the measured corpus shape: a ``slack`` root with two channel + subfolders plus two ``-*.json`` files DIRECTLY at the root.""" + root = tmp_path / "slack" + for chan in ("eng-sre", "aditya-rao"): + d = root / chan + d.mkdir(parents=True) + for i in range(30): + (d / f"17183{i:03d}-msg.json").write_bytes(_BODY) + # The orphans from issue #213: channel-less root-level exports. + (root / "1718323200-export.json").write_bytes(_BODY) + (root / "1718409600-export.json").write_bytes(_BODY) + return root + + +def _eligible_files_under(root: Path) -> list[str]: + """Every file in the synthetic trees is eligible (ext + size).""" + return [ + os.path.join(dirpath, f) + for dirpath, _dirs, files in os.walk(root) + for f in files + ] + + +def _covered_by_entries(fpath: str, entries: list[tuple[str, str]]) -> bool: + """True iff ``fpath`` falls under at least one decomposed shard root.""" + f = os.path.normpath(fpath) + for _slug, p in entries: + p = os.path.normpath(p) + if f == p or f.startswith(p + os.sep): + return True + return False + + +def _tasks_from_entries( + entries: list[tuple[str, str]], skip_dirs: set[str], +) -> list[dict]: + """Minimal task dicts the way ``build_profile_sharded`` constructs + them: catch-all entries (a path that is a strict ancestor of another + entry's path) are marked ``rootfiles_only`` with an extended + skip-set so their walk stays at the top level.""" + catchalls = bfm._catchall_indices(entries) + tasks = [] + for i, (label, root) in enumerate(entries): + task = {"label": label, "root": root, "skip_dirs": skip_dirs} + if i in catchalls: + task["rootfiles_only"] = True + task["skip_dirs"] = bfm._rootfiles_only_skip_dirs(root, skip_dirs) + tasks.append(task) + return tasks + + +_LOW_THRESHOLDS = dict(threshold_bytes=10_000, threshold_files=10) + + +# --- the #213 repro (FAILS on master) ----------------------------------- + + +def test_root_level_files_map_to_a_shard_after_decompose(tmp_path): + """THE bug: decompose an oversized root that has both subdirs and + root-level files; every eligible file must map to >= 1 shard entry. + + On master the decomposition returns only the per-subdir entries, so + the two slack-root ``-*.json`` files match no shard prefix — + this assertion fails there (0 slack-root docs indexed anywhere). + """ + root = _make_slack_like_root(tmp_path) + + entries = bfm._decompose_oversized_root( + str(root), skip_dirs=set(), extra_filename_filters=[], + **_LOW_THRESHOLDS, + ) + + orphans = [ + f for f in _eligible_files_under(root) + if not _covered_by_entries(f, entries) + ] + assert not orphans, ( + f"{len(orphans)} eligible file(s) fell through the decomposition " + f"and map to no shard: {orphans}" + ) + + # The catch-all self-identifies: a dedicated ``__root`` + # entry whose path is the root itself. + by_slug = dict(entries) + assert "slack__root" in by_slug, f"expected slack__root in {sorted(by_slug)}" + assert by_slug["slack__root"] == str(root) + # Channel subshards are still present and unchanged. + assert by_slug["slack__eng-sre"] == str(root / "eng-sre") + assert by_slug["slack__aditya-rao"] == str(root / "aditya-rao") + + +# --- no-regression pins on #147 behaviour -------------------------------- + + +def test_decompose_flat_root_unchanged(tmp_path): + """An oversized root with NO subdirs still falls back to a single + ``(slug, root)`` shard — no ``__root`` entry is invented for it.""" + root = tmp_path / "flatdump" + root.mkdir() + for i in range(40): + (root / f"loose_{i:03d}.txt").write_bytes(_BODY) + + entries = bfm._decompose_oversized_root( + str(root), skip_dirs=set(), extra_filename_filters=[], + **_LOW_THRESHOLDS, + ) + assert entries == [("flatdump", str(root))] + + +def test_decompose_without_rootlevel_files_emits_no_catchall(tmp_path): + """Subdirs only, nothing directly at the root: the #147 output is + byte-for-byte what it was before — no ``__root`` entry.""" + root = tmp_path / "bigproject" + for sub in ("alpha", "beta"): + d = root / sub + d.mkdir(parents=True) + for i in range(30): + (d / f"doc_{i:03d}.txt").write_bytes(_BODY) + + entries = bfm._decompose_oversized_root( + str(root), skip_dirs=set(), extra_filename_filters=[], + **_LOW_THRESHOLDS, + ) + assert sorted(s for s, _ in entries) == [ + "bigproject__alpha", "bigproject__beta", + ] + + +# --- __root naming stability --------------------------------------------- + + +def test_root_catchall_slug_naming_stable(tmp_path): + """The catch-all slug is ``__root``, deterministic across + calls, and collision-guarded against a genuine subdir named ``root``.""" + root = tmp_path / "Slack Export-v2" + (root / "general").mkdir(parents=True) + for i in range(30): + (root / "general" / f"m_{i:03d}.json").write_bytes(_BODY) + (root / "1718323200-export.json").write_bytes(_BODY) + + kwargs = dict(skip_dirs=set(), extra_filename_filters=[], **_LOW_THRESHOLDS) + first = bfm._decompose_oversized_root(str(root), **kwargs) + second = bfm._decompose_oversized_root(str(root), **kwargs) + assert first == second, "decomposition must be deterministic" + by_slug = dict(first) + assert by_slug["slack-export-v2__root"] == str(root) + + # Collision guard: a genuine subdir literally named ``root`` already + # owns ``__root``; the catch-all must take a distinct, + # deterministic slug instead of clobbering it. + (root / "root").mkdir() + for i in range(30): + (root / "root" / f"r_{i:03d}.json").write_bytes(_BODY) + entries = bfm._decompose_oversized_root(str(root), **kwargs) + slugs = [s for s, _ in entries] + assert len(slugs) == len(set(slugs)), f"duplicate shard labels: {slugs}" + by_slug = dict(entries) + assert by_slug["slack-export-v2__root"] == str(root / "root") + catchall = [ + s for s, p in entries + if p == str(root) and s.startswith("slack-export-v2__root") + ] + assert catchall, f"catch-all lost in collision case: {entries}" + + +# --- the catch-all task walks ONLY root-level files ----------------------- + + +def test_rootfiles_only_walk_excludes_subdirs(tmp_path): + """The ``__root`` shard must not re-ingest the subdir shards' files: + its task walks the root with every immediate subdir skipped.""" + root = _make_slack_like_root(tmp_path) + + task_skip = bfm._rootfiles_only_skip_dirs(str(root), {"node_modules"}) + assert {"eng-sre", "aditya-rao", "node_modules"} <= task_skip + + stats = {"missing_roots": [], "skipped": 0} + files = bfm._iter_ingestable_files( + [str(root)], task_skip, [], stats, + ) + names = sorted(os.path.basename(f) for f, _ext in files) + assert names == ["1718323200-export.json", "1718409600-export.json"] + + +# --- builder coverage assertion (#213) ------------------------------------ + + +def test_coverage_check_passes_on_complete_mapping(tmp_path, monkeypatch): + """A decomposition that covers every eligible file (post-fix shape, + including the rootfiles-only catch-all) passes silently.""" + monkeypatch.delenv("HELIX_BFM_COVERAGE_CHECK", raising=False) + root = _make_slack_like_root(tmp_path) + entries = bfm._decompose_oversized_root( + str(root), skip_dirs=set(), extra_filename_filters=[], + **_LOW_THRESHOLDS, + ) + tasks = _tasks_from_entries(entries, set()) + # Must not raise. + bfm._assert_shard_coverage([str(root)], tasks, set(), []) + + +def test_coverage_check_raises_with_orphan_list(tmp_path, caplog): + """A synthetic gap (the master bug shape: subdir tasks only) raises + and logs ERROR with the orphan list.""" + root = _make_slack_like_root(tmp_path) + tasks = [ + {"label": "slack__eng-sre", "root": str(root / "eng-sre"), + "skip_dirs": set()}, + {"label": "slack__aditya-rao", "root": str(root / "aditya-rao"), + "skip_dirs": set()}, + ] + with caplog.at_level(logging.ERROR, logger="build_fixture_matrix"): + with pytest.raises(RuntimeError, match=r"coverage gap.*2 eligible"): + bfm._assert_shard_coverage([str(root)], tasks, set(), []) + assert any( + "1718323200-export.json" in rec.getMessage() + and rec.levelno == logging.ERROR + for rec in caplog.records + ), "ERROR log must carry the orphan list" + + +def test_coverage_check_env_kill_switch(tmp_path, monkeypatch): + """``HELIX_BFM_COVERAGE_CHECK=0`` skips the check (speed valve for + huge corpora) — the same synthetic gap no longer raises.""" + root = _make_slack_like_root(tmp_path) + tasks = [ + {"label": "slack__eng-sre", "root": str(root / "eng-sre"), + "skip_dirs": set()}, + ] + monkeypatch.setenv("HELIX_BFM_COVERAGE_CHECK", "0") + bfm._assert_shard_coverage([str(root)], tasks, set(), []) # no raise + + +def test_coverage_check_default_on(tmp_path, monkeypatch): + """Unset env means the check runs (default ON).""" + monkeypatch.delenv("HELIX_BFM_COVERAGE_CHECK", raising=False) + root = _make_slack_like_root(tmp_path) + with pytest.raises(RuntimeError): + bfm._assert_shard_coverage([str(root)], [], set(), []) + + +# --- build_profile_sharded wiring ------------------------------------------ + + +def test_build_profile_sharded_wires_catchall_task(tmp_path, monkeypatch): + """End-to-end through the builder's task construction: the catch-all + becomes a real task (rootfiles_only + extended skip set), the sizing + pass counts only its root-level files, and the coverage assertion + passes on the resulting task list.""" + monkeypatch.delenv("HELIX_BFM_COVERAGE_CHECK", raising=False) + root = _make_slack_like_root(tmp_path) + out_dir = tmp_path / "out" + + monkeypatch.setitem(bfm.PROFILES, "tiny213", { + "label": "issue-213 repro profile", + "active_roots": 1, + "roots": [str(root)], + "extra_skip_dirs": set(), + "extra_filename_filters": [], + }) + # Force decomposition of the small synthetic root. + monkeypatch.setattr(bfm, "DEFAULT_AUTO_SUBSHARD_THRESHOLD_BYTES", 10_000) + monkeypatch.setattr(bfm, "DEFAULT_AUTO_SUBSHARD_THRESHOLD_FILES", 10) + + seen_tasks: list[dict] = [] + + def _stub_worker(task: dict) -> dict: + seen_tasks.append(task) + return { + "label": task["label"], "root": task["root"], + "shard_db_path": task["shard_db_path"], "gene_count": 0, + "byte_size": 0, "elapsed_s": 0.0, "files": 0, "genes": 0, + "skipped": 0, "errors": 0, "missing_roots": [], + "fingerprint_payload": [], "source_index_payload": [], + "dense_coverage": 0.0, "dense_genes_populated": 0, + "paused": False, + } + + monkeypatch.setattr(bfm, "_shard_worker_entry", _stub_worker) + + totals = bfm.build_profile_sharded( + "tiny213", str(out_dir), shard_workers=1, shard_file_workers=1, + auto_subshard_threshold_bytes=10_000, + auto_subshard_threshold_files=10, + ) + + by_label = {t["label"]: t for t in seen_tasks} + assert set(by_label) == { + "slack__eng-sre", "slack__aditya-rao", "slack__root", + }, f"unexpected task labels: {sorted(by_label)}" + + catchall = by_label["slack__root"] + assert catchall["root"] == str(root) + assert catchall.get("rootfiles_only") is True + assert {"eng-sre", "aditya-rao"} <= catchall["skip_dirs"] + # Sizing must respect the restricted walk: 2 root-level files only. + assert catchall["eligible_files"] == 2 + # Subdir tasks keep the shared skip set (no rootfiles_only flag). + assert by_label["slack__eng-sre"].get("rootfiles_only") is None + assert by_label["slack__eng-sre"]["eligible_files"] == 30 + + assert totals["shard_count"] == 3