Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
219 changes: 214 additions & 5 deletions scripts/build_fixture_matrix.py
Original file line number Diff line number Diff line change
Expand Up @@ -1172,6 +1172,77 @@ def _try_salvage_complete_shard(
_AUTO_SUBSHARD_MAX_DEPTH = 2


def _has_rootlevel_eligible_files(
root: str, extra_filename_filters: list,
) -> bool:
"""True iff at least one file DIRECTLY at ``root`` (no subdir) passes
the same ingestion filters as :func:`_iter_ingestable_files`.

Used by :func:`_decompose_oversized_root` (#213) to decide whether a
decomposed root needs a ``<root-slug>__root`` catch-all entry. A
non-recursive scan: subdir contents are the subshards' business.
"""
try:
entries = os.listdir(root)
except OSError:
return False
for fname in entries:
ext = os.path.splitext(fname)[1].lower()
if ext not in INGEST_EXTS:
continue
fpath = os.path.join(root, fname)
if not os.path.isfile(fpath):
continue
if any(f(fpath) for f in extra_filename_filters):
continue
try:
size = os.path.getsize(fpath)
except OSError:
continue
if size < MIN_FILE_SIZE or size > MAX_FILE_SIZE:
continue
return True
return False


def _catchall_indices(entries: list[tuple[str, str]]) -> set[int]:
"""Indices of ``(slug, path)`` entries whose path is a strict ancestor
of another entry's path — i.e. the ``__root`` catch-alls emitted by
:func:`_decompose_oversized_root` (#213).

Structural (label-agnostic) so a genuine subdir that happens to slug
to ``root`` is never misclassified: a non-decomposed subdir entry has
no other entry's path underneath it.
"""
norm = [os.path.normpath(p) for _slug, p in entries]
out: set[int] = set()
for i, pi in enumerate(norm):
prefix = pi + os.sep
for j, pj in enumerate(norm):
if i != j and pj.startswith(prefix):
out.add(i)
break
return out


def _rootfiles_only_skip_dirs(root: str, skip_dirs: set[str]) -> set[str]:
"""Skip-set for a ``__root`` catch-all task (#213): the shared
``skip_dirs`` plus every immediate subdirectory name of ``root``.

With all level-1 dirs skipped, ``os.walk`` never descends, so the
catch-all shard ingests exactly the root-level files and cannot
duplicate the per-subdir shards.
"""
try:
subdirs = {
entry for entry in os.listdir(root)
if os.path.isdir(os.path.join(root, entry))
}
except OSError:
subdirs = set()
return set(skip_dirs) | subdirs


def _decompose_oversized_root(
root: str,
skip_dirs: set[str],
Expand All @@ -1197,6 +1268,11 @@ def _decompose_oversized_root(
stays flat at the main-DB level. See issue #147 for the design and
diagnostic that motivated this.

When the decomposed root also has eligible files DIRECTLY at its top
level, a ``<root-slug>__root`` catch-all entry for those files is
appended (#213) — without it they matched no sub-shard prefix and
were silently never ingested.

A non-existent root returns ``[]`` — caller (typically
:func:`build_profile_sharded`) tracks the missing entry in
``stats["missing_roots"]`` separately, same as the pre-#147
Expand Down Expand Up @@ -1240,6 +1316,26 @@ def _decompose_oversized_root(
# carry the full path lineage in the main-DB shards table.
for sub_slug, sub_path in sub_parts:
parts.append((f"{parent_slug}__{sub_slug}", sub_path))
# Root-level catch-all (#213): decomposing along subdirs must not
# drop files living DIRECTLY at the root (e.g. ``slack/<epoch>-*.json``
# with no channel subfolder — measured as 0-indexed-anywhere across
# all 100 shards of the v2 Onyx corpus). Emit a dedicated
# ``<root-slug>__root`` entry pointing at the root itself;
# ``build_profile_sharded`` marks it ``rootfiles_only`` so its walk
# skips every immediate subdir and ingests only the root-level
# files. Omitted when nothing eligible lives at the root, keeping
# the pre-#213 (#147) output unchanged for subdir-only roots.
if parts and _has_rootlevel_eligible_files(root, extra_filename_filters):
taken = {slug for slug, _path in parts}
catchall_slug = f"{parent_slug}__root"
n = 2
while catchall_slug in taken:
# A genuine subdir named ``root`` already owns
# ``<parent>__root``; disambiguate deterministically
# instead of clobbering its label.
catchall_slug = f"{parent_slug}__root{n}"
n += 1
parts.append((catchall_slug, root))
# Flat-layout fallback: no subdirs (or all skipped) → single shard.
return parts or [(parent_slug, root)]

Expand Down Expand Up @@ -1473,6 +1569,90 @@ def _shard_worker_entry(task: dict) -> dict:
)


def _assert_shard_coverage(
    roots: list[str],
    tasks: list[dict],
    skip_dirs: set[str],
    extra_filename_filters: list,
) -> None:
    """Fail the build when an eligible file belongs to no shard task (#213).

    Each existing entry of ``roots`` is walked once, pruning directory
    names found in ``skip_dirs``. A directory is *covered* when its
    normalized path

    * equals the normalized ``root`` of any task, or
    * lies underneath the ``root`` of a task that is not marked
      ``rootfiles_only``.

    A ``rootfiles_only`` task (the ``__root`` catch-all) therefore covers
    only the files directly at its root: counting its whole subtree as
    covered would let a dropped-subdir bug slide through unnoticed.

    Files are only examined inside uncovered directories. There, a file
    is an orphan when its lower-cased extension is in ``INGEST_EXTS``, no
    callable in ``extra_filename_filters`` returns truthy for its path,
    and its size is within ``MIN_FILE_SIZE``..``MAX_FILE_SIZE``; files
    whose size cannot be read are ignored.

    The check is skipped (with an INFO log) when
    ``_env_flag("HELIX_BFM_COVERAGE_CHECK")`` is falsy.

    Raises:
        RuntimeError: at least one orphan was found. The first 20 are
            logged at ERROR level and included in the message.
    """
    if not _env_flag("HELIX_BFM_COVERAGE_CHECK"):
        log.info(
            "shard coverage check skipped (HELIX_BFM_COVERAGE_CHECK=0)",
        )
        return

    # Every task covers its own root directory; only tasks that are not
    # ``rootfiles_only`` additionally cover everything beneath it.
    exact_roots: set[str] = set()
    subtree_starts: list[str] = []
    for task in tasks:
        task_root = os.path.normpath(task["root"])
        exact_roots.add(task_root)
        if not bool(task.get("rootfiles_only")):
            subtree_starts.append(task_root + os.sep)
    subtree_prefixes = tuple(subtree_starts)

    verdicts: dict[str, bool] = {}
    uncovered: list[str] = []
    for profile_root in roots:
        if not os.path.exists(profile_root):
            continue
        for here, subdirs, files in os.walk(profile_root):
            # In-place assignment so ``os.walk`` honours the pruning.
            subdirs[:] = [name for name in subdirs if name not in skip_dirs]
            key = os.path.normpath(here)
            if key not in verdicts:
                verdicts[key] = (
                    key in exact_roots or key.startswith(subtree_prefixes)
                )
            if verdicts[key]:
                continue
            # Uncovered directory: every eligible file in it is an orphan.
            for name in files:
                suffix = os.path.splitext(name)[1].lower()
                if suffix not in INGEST_EXTS:
                    continue
                candidate = os.path.join(here, name)
                if any(flt(candidate) for flt in extra_filename_filters):
                    continue
                try:
                    nbytes = os.path.getsize(candidate)
                except OSError:
                    continue
                if MIN_FILE_SIZE <= nbytes <= MAX_FILE_SIZE:
                    uncovered.append(candidate)

    if not uncovered:
        return
    preview = uncovered[:20]
    log.error(
        "shard coverage gap (#213): %d eligible file(s) under the "
        "profile roots map to no shard task and would be silently "
        "skipped — first %d: %s",
        len(uncovered), len(preview), preview,
    )
    raise RuntimeError(
        f"shard coverage gap (#213): {len(uncovered)} eligible file(s) "
        f"map to no shard task; first orphans: {preview}"
    )


def build_profile_sharded(
name: str,
profile_out_dir: str,
Expand Down Expand Up @@ -1509,7 +1689,11 @@ def build_profile_sharded(
decomposable subdirs falls back to single-shard (flat-layout
fallback). The shard label nests under ``__`` so the slack root
decomposes into e.g. ``slack__aditya_rao``, ``slack__eng_sre`` in
the main-DB ``shards`` table.
the main-DB ``shards`` table. Files living directly at a decomposed
root get a dedicated ``<root-slug>__root`` catch-all shard (#213),
and a post-construction coverage assertion verifies every eligible
file falls under some task's root (``HELIX_BFM_COVERAGE_CHECK=0``
to skip).
"""
profile = PROFILES[name]
if shard_file_workers <= 0:
Expand Down Expand Up @@ -1589,9 +1773,10 @@ def build_profile_sharded(
root, auto_subshard_threshold_bytes,
auto_subshard_threshold_files, len(sub_entries),
)
for label, sub_root in sub_entries:
catchall_idx = _catchall_indices(sub_entries)
for i, (label, sub_root) in enumerate(sub_entries):
shard_db = corpus_shard_db(sub_root, label, profile_out_dir)
tasks.append({
task = {
"label": label,
"root": sub_root,
"shard_db_path": str(shard_db),
Expand All @@ -1601,22 +1786,46 @@ def build_profile_sharded(
"shard_file_workers": shard_file_workers,
"shard_file_chunksize": 4,
"rebuild": rebuild,
})
}
if i in catchall_idx:
# Root-level catch-all (#213): this shard owns only the
# files living directly at its root — every immediate
# subdir already has (or contains) its own shard task,
# so the walk skips them all.
task["rootfiles_only"] = True
task["skip_dirs"] = _rootfiles_only_skip_dirs(
sub_root, skip_dirs,
)
tasks.append(task)
if decomposed_count:
log.info(
"auto-subshard: decomposed %d of %d profile roots into %d total shards",
decomposed_count, len(profile["roots"]), len(tasks),
)

# Builder coverage assertion (#213): every eligible file under the
# profile roots must fall under some task's root, or the build dies
# here instead of shipping a fixture with 0-indexed-anywhere docs.
try:
_assert_shard_coverage(
profile["roots"], tasks, skip_dirs, extra_filename_filters,
)
except Exception:
main_conn.close()
raise

# Pre-ingest sizing — order shards largest-first so the long pole
# gets the longest head start on the worker pool (issue #97 A.1).
# When ``shard_workers <= 1`` the order doesn't affect wall-clock,
# but we still record the estimate in the manifest for diagnostics.
if sort_largest_first and tasks:
sizing_t0 = time.perf_counter()
for task in tasks:
# Per-task skip set (#213): a ``__root`` catch-all walks only
# its root-level files, so size it the same way.
files, bytes_ = _estimate_eligible_bytes(
task["root"], skip_dirs, extra_filename_filters,
task["root"], task["skip_dirs"],
task["extra_filename_filters"],
)
task["eligible_files"] = files
task["eligible_bytes"] = bytes_
Expand Down
Loading
Loading