diff --git a/.devague/current b/.devague/current index d3365f0..63d0030 100644 --- a/.devague/current +++ b/.devague/current @@ -1 +1 @@ -data-refinery-cli-ships-the-storage-data-quality-i +data-refinery-now-owns-store-file-migration-a-cons diff --git a/.devague/frames/data-refinery-now-owns-store-file-migration-a-cons.json b/.devague/frames/data-refinery-now-owns-store-file-migration-a-cons.json new file mode 100644 index 0000000..a47072c --- /dev/null +++ b/.devague/frames/data-refinery-now-owns-store-file-migration-a-cons.json @@ -0,0 +1,181 @@ +{ + "slug": "data-refinery-now-owns-store-file-migration-a-cons", + "title": "data-refinery now owns store-file migration: a consumer upgrades an on-disk store to the current Envelope format by supplying only a transform, never constructing a filesystem write path \u2014 files granularity first", + "schema_version": 1, + "status": "exported", + "created": "2026-06-21T14:46:10Z", + "updated": "2026-06-21T14:57:43Z", + "claims": [ + { + "id": "c1", + "kind": "announcement", + "text": "data-refinery now owns store-file migration: a consumer upgrades an on-disk store to the current Envelope format by supplying only a transform, never constructing a filesystem write path \u2014 files granularity first", + "origin": "user", + "status": "confirmed", + "honesty_conditions": [ + { + "id": "h4", + "text": "the endpoint ships BOTH importable (store.migrate) and as a CLI verb (store migrate), both documented in the pinnable contract.md with a version bump", + "status": "confirmed" + } + ], + "hard_questions": [], + "links": [] + }, + { + "id": "c2", + "kind": "audience", + "text": "eidetic-cli (first consumer over the import + subprocess boundary) and any future consumer of data-refinery's store boundary", + "origin": "user", + "status": "confirmed", + "honesty_conditions": [ + { + "id": "h5", + "text": "eidetic can reach the endpoint over BOTH the import boundary (callable transform) and the subprocess boundary (self-canonicalize); no third component is needed", + "status": "confirmed" + } + ], + "hard_questions": [], + "links": [] + }, + { + "id": "c3", + "kind": "after_state", + "text": "a consumer upgrades a populated legacy on-disk store to the current Envelope-JSONL format by calling data_refinery.store.migrate(transform) (import) or 'data-refinery store migrate' (subprocess), supplying only a transform/target format \u2014 data-refinery resolves the store root and owns the atomic per-file rewrite", + "origin": "user", + "status": "confirmed", + "honesty_conditions": [ + { + "id": "h6", + "text": "in the eidetic call site, the only argument eidetic supplies is a transform callable (and optionally the store root it already owns) \u2014 never a constructed per-file *.jsonl.tmp path", + "status": "confirmed" + } + ], + "hard_questions": [], + "links": [] + }, + { + "id": "c4", + "kind": "before_state", + "text": "eidetic's migrate_store.py globs the operator-supplied store dir, writes *.jsonl.tmp then os.replace; SonarCloud flags that consumer-side write sink as pythonsecurity:S2083 BLOCKER, which is structurally unsatisfiable for a local CLI and fails eidetic's gate", + "origin": "user", + "status": "confirmed", + "honesty_conditions": [ + { + "id": "h7", + "text": "the S2083 BLOCKER is on eidetic's write sink in migrate_store.py and is unsatisfiable there because writing into the operator's chosen dir IS the feature", + "status": "confirmed" + } + ], + "hard_questions": [], + "links": [] + }, + { + "id": "c5", + "kind": "why_it_matters", + "text": "the path-construction concern (and the S2083 sink) belongs to the component that OWNS the storage layout; moving it behind data-refinery's boundary lets eidetic delete migrate_store.py and go green without any in-repo rule suppression", + "origin": "user", + "status": "confirmed", + "honesty_conditions": [ + { + "id": "h8", + "text": "after the cutover eidetic's gate clears with zero in-repo suppression (no # NOSONAR, no sonar exclusion entry for migrate_store.py)", + "status": "confirmed" + } + ], + "hard_questions": [], + "links": [] + }, + { + "id": "c6", + "kind": "success_signal", + "text": "eidetic deletes migrate_store.py + its tests and replaces 'eidetic migrate store' with a thin call into data-refinery; eidetic's S2083 BLOCKER disappears and its gate goes green with no rule suppression; re-running migrate converts nothing (idempotent); an interrupted run is safe to resume (atomic per file)", + "origin": "user", + "status": "confirmed", + "honesty_conditions": [ + { + "id": "h9", + "text": "all four issue-#8 acceptance criteria are demonstrably met by a live test: upgrade-without-path, idempotent, atomic-per-file, eidetic deletes the module", + "status": "confirmed" + } + ], + "hard_questions": [], + "links": [] + }, + { + "id": "c7", + "kind": "boundary", + "text": "no eidetic Record/memory semantics leak into data-refinery; files backend granularity FIRST (mongo/vectors then neo4j/graph are later granularities); not a general ETL framework", + "origin": "user", + "status": "confirmed", + "honesty_conditions": [ + { + "id": "h10", + "text": "mongo/neo4j migration get a clean extension seam (a backend-level hook) but only the files backend actually rewrites now; data-refinery never imports eidetic's Record schema", + "status": "confirmed" + } + ], + "hard_questions": [], + "links": [] + }, + { + "id": "c8", + "kind": "decision", + "text": "the importable store.migrate(transform) takes a Python callable Callable[[dict], Envelope|None]; the 'data-refinery store migrate' CLI verb canNOT cross a callable over argv, so it only re-canonicalizes data-refinery's OWN Envelope-JSONL (re-validate + re-fill hash + atomic rewrite) \u2014 a self-heal/format-version bump, never a consumer transform", + "origin": "user", + "status": "confirmed", + "honesty_conditions": [], + "hard_questions": [], + "links": [] + }, + { + "id": "c9", + "kind": "requirement", + "text": "the rewrite is atomic per file (tmp sibling in the same dir + os.replace) and idempotent (a file already in target format is left byte-identical; a re-run converts nothing)", + "origin": "user", + "status": "confirmed", + "honesty_conditions": [ + { + "id": "h1", + "text": "running migrate twice over the same store yields a byte-identical store on the second run, and killing the process mid-rewrite leaves either the old or the new file intact (never a partial/truncated file), because os.replace is atomic on POSIX", + "status": "confirmed" + } + ], + "hard_questions": [], + "links": [] + }, + { + "id": "c10", + "kind": "requirement", + "text": "data-refinery resolves and validates the store root internally (canonicalize via os.path.realpath + containment-check via os.path.commonpath against an owner-controlled root); the consumer supplies a root directory or a transform, never a constructed per-file write path", + "origin": "user", + "status": "confirmed", + "honesty_conditions": [ + { + "id": "h2", + "text": "a migrate call whose resolved per-file path escapes the canonicalized store root (e.g. via a symlink) is refused with a structured code-2 CliError, and Sonar's S2083 taint is satisfiable here because the sink reasons against an owner-canonicalized root rather than a raw consumer arg", + "status": "confirmed" + } + ], + "hard_questions": [], + "links": [] + }, + { + "id": "c11", + "kind": "requirement", + "text": "every transformed line is validated against the Envelope shape and the public/private scope no-leak (can_serve) before being written; an unparseable/invalid legacy line fails the migration with a structured CliError, never a traceback", + "origin": "user", + "status": "confirmed", + "honesty_conditions": [ + { + "id": "h3", + "text": "a legacy line that does not transform into a valid Envelope (bad shape, or an unrecognised scope.visibility) aborts the file's migration before any os.replace, leaving the original file untouched, and emits error:/hint: on stderr with no traceback", + "status": "confirmed" + } + ], + "hard_questions": [], + "links": [] + } + ], + "open_vagueness": [] +} diff --git a/.gitignore b/.gitignore index 67d57a5..857d6ee 100644 --- a/.gitignore +++ b/.gitignore @@ -234,3 +234,5 @@ skills.local.yaml # data-refinery stack bind-mount data (docker-compose volumes) .data/ + +.devague/reviews/ diff --git a/AGENTS.colleague.md b/AGENTS.colleague.md index 2406362..3b0dbf4 100644 --- a/AGENTS.colleague.md +++ b/AGENTS.colleague.md @@ -22,7 +22,14 @@ files/mongo/neo4j `Backend`, also importable as `data_refinery.store`), and a (`{id, hash, content, scope, metadata}`) and never interprets them as "memories" — that semantics stays in eidetic, the first consumer over a subprocess-not-import boundary. Waves 1 (stack) and 2 (store + quality) are -built; Wave 3 (the pinned verb contract + eidetic consumption) is open. +built; Wave 3's first slice (issue #8) — the **store-migration endpoint** +`data_refinery.store.migrate(transform, …)` + `data-refinery store migrate` — is +built: a consumer upgrades a populated store to the current Envelope format by +supplying only a *transform* (never a filesystem write path), so the rewrite — +and its path-construction concern — lives behind data-refinery's boundary. It is +**atomic per file** (temp sibling + `os.replace`) and **idempotent** +(byte-identical 2nd run); **files granularity only** today (mongo/neo4j raise). +The rest of Wave 3 (the pinned verb contract + eidetic consumption) is open. ## Names (keep them straight) diff --git a/CHANGELOG.md b/CHANGELOG.md index 739e275..3bb9a84 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,20 @@ All notable changes to this project will be documented in this file. Format follows [Keep a Changelog](https://keepachangelog.com/). This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [0.6.0] - 2026-06-21 + +### Added + +- store migration endpoint (issue #8): importable data_refinery.store.migrate(transform, *, backend, base_dir, dry_run) mirrored by the data-refinery store migrate CLI verb — a consumer upgrades a populated on-disk store to the current Envelope format by supplying only a transform, never constructing a filesystem write path (the rewrite, and its path-construction concern, lives behind data-refinery's boundary). +- store migrate is atomic per file (temp sibling + os.replace) and idempotent (a second run rewrites nothing, byte-identical); files granularity only today (mongo/neo4j raise a structured CliError as the files-first seam). + +### Changed + +- FilesBackend writes are now atomic via a shared _atomic_write helper (temp sibling + os.replace), hardening the day-to-day upsert/delete path against truncate-on-crash, not just migration. +- store migrate validation is now whole-store: every scope file is transformed and validated before any write, so a corrupt line / invalid transform output / symlink escape in any file aborts the whole migration before it touches disk (was per-file). Orphan temp-file reaping moved to the start of the run. (Folded from a colleague review pass.) +- store I/O faults now obey the exit-code contract: an unreadable/unwritable scope file (permissions, full disk, a failed os.replace) surfaces as a structured CliError with exit code 2 and an actionable remediation, and a valid-JSON-but-non-object line or a record missing its id surfaces as a code-2 "corrupt line" — instead of a raw OSError/AttributeError/KeyError wrapped by the dispatcher as a generic code-1 "unexpected" error. Applied to both the migration and the day-to-day load path via a shared _atomic_write / _corrupt_line. (Folded from a Qodo review pass on PR #9.) +- docs/contract.md is now contract version 3 (adds the store-migration endpoint). + ## [0.5.2] - 2026-06-21 ### Changed diff --git a/CLAUDE.md b/CLAUDE.md index 009c964..60dc4ff 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -27,10 +27,21 @@ Protocol; plus the **data-quality verbs** (`validate` / `dedup` / `integrity` / lazy-imported behind the optional `[store]` extra. The remaining code is the inherited *agent-first introspection scaffold* (`whoami` / `learn` / `explain` / `overview` / `doctor` + a `cli` noun), cited from -[teken](https://github.com/agentculture/teken)'s `python-cli` reference. **Wave -3** (the full pinnable verb-JSON contract + eidetic consuming the surface over -the subprocess boundary) is still open; the build order lives in **issue #1** / -**issue #3** (see "Domain roadmap"). +[teken](https://github.com/agentculture/teken)'s `python-cli` reference. *Wave 3, +first slice* (issue #8): the **store-migration endpoint** — the importable +`data_refinery.store.migrate(transform, *, backend, base_dir, dry_run)` +(`data_refinery/store/migrate.py`) mirrored by `data-refinery store migrate`, so a +consumer (eidetic-cli) upgrades a populated store to the current Envelope format +by supplying only a *transform* and **never constructing a filesystem write +path** — moving the path-construction (and eidetic's `S2083`) sink to the +storage owner. The rewrite is atomic per file (temp sibling + `os.replace`, a +shared `_atomic_write` that also hardened the day-to-day `upsert`) and idempotent +(byte-identical 2nd run). **Files granularity only** today; `mongo` (vectors) / +`neo4j` (graph) raise a structured `CliError` (the files-first seam). The CLI +verb self-canonicalises data-refinery's own format (no callable crosses argv). +The rest of **Wave 3** (freezing the full pinnable verb-JSON contract + eidetic +consuming the surface over the subprocess boundary) is still open; the build +order lives in **issue #1** / **issue #3** / **issue #8** (see "Domain roadmap"). ## Names: there are three, and they differ on purpose @@ -254,11 +265,20 @@ the default), the importable `data_refinery.store.put/get/list` mirrored by the (`data_refinery/quality/`). Idempotent dedup + the public/private scope no-leak (`can_serve`, enforced by every backend's `get`/`list`) are the load-bearing invariants. README, `AGENTS.colleague.md`, `learn`, `overview`, the explain -catalog, and `docs/contract.md` (now contract version 2) were updated for the -surface. What is still open: +catalog, and `docs/contract.md` (now contract version 3) were updated for the +surface. *Wave 3, first slice* (issue #8) = the **store-migration endpoint**: +`data_refinery.store.migrate(transform, *, backend, base_dir, dry_run)` + +`data-refinery store migrate`, so a consumer upgrades a populated store to the +current Envelope format supplying only a transform (never a write path), behind +data-refinery's boundary — which lets eidetic delete its path-constructing +`migrate_store.py` and clears its `S2083` BLOCKER. Atomic per file +(`_atomic_write`, also applied to `upsert`) + idempotent (byte-identical 2nd run) +are the new load-bearing invariants; files granularity only (mongo/neo4j raise). +What is still open: 1. **Wave 3 — the full pinnable verb contract + eidetic consumption** over the - subprocess boundary (eidetic drops/thins `neo4j`+`pymongo`). The verb-JSON + subprocess boundary (eidetic drops/thins `neo4j`+`pymongo`, and replaces + `eidetic migrate store` with a thin call into `store.migrate`). The verb-JSON shapes are documented in `docs/contract.md`; Wave 3 freezes them as the pinned surface eidetic consumes process-to-process. diff --git a/README.md b/README.md index 5a1e917..91f0fab 100644 --- a/README.md +++ b/README.md @@ -45,6 +45,7 @@ uv run teken cli doctor . --strict # the agent-first rubric gate CI runs echo '{"id":"a","content":"hello"}' | uv run data-refinery store put --json uv run data-refinery store get a --json uv run data-refinery store list --json +uv run data-refinery store migrate --json # re-canonicalise own JSONL (idempotent) uv run data-refinery integrity --json # hash matches content? uv run data-refinery dedup --json # collapse same-hash dups (idempotent) echo '{"id":"a","content":"x"}' | uv run data-refinery validate --json @@ -57,6 +58,10 @@ import data_refinery.store as store store.put(store.Envelope(id="a", content="hello")) store.get("a") # -> Envelope | None store.list() # -> list[Envelope] + +# Upgrade a populated legacy store to the current Envelope format — the consumer +# supplies only a transform, never a filesystem write path (data-refinery owns it): +store.migrate(record_to_envelope, base_dir="/path/to/store") ``` ## CLI @@ -65,6 +70,7 @@ store.list() # -> list[Envelope] |------|--------------| | `stack up\|down\|status` | Manage the storage substrate (mongo + neo4j) via docker compose. | | `store put\|get\|list` | Put/get/list opaque envelopes (`--backend files\|mongo\|neo4j`). | +| `store migrate` | Re-canonicalise the store's own Envelope-JSONL (atomic, idempotent); consumers import `store.migrate(transform)` to upgrade a legacy store. | | `validate` | Check envelope shape for JSON piped on stdin. | | `dedup` | Collapse same-hash-same-scope duplicates in the store (idempotent). | | `integrity` | Check every stored hash matches `sha256(content)`. | diff --git a/data_refinery/cli/_commands/store.py b/data_refinery/cli/_commands/store.py index 3dd0fbf..c11503d 100644 --- a/data_refinery/cli/_commands/store.py +++ b/data_refinery/cli/_commands/store.py @@ -19,6 +19,7 @@ from data_refinery.cli._errors import EXIT_USER_ERROR, CliError from data_refinery.cli._output import emit_result from data_refinery.store import get_backend +from data_refinery.store import migrate as store_migrate from data_refinery.store.envelope import DEFAULT_SCOPE, Envelope, Scope _BACKENDS = ("files", "mongo", "neo4j") @@ -125,6 +126,31 @@ def cmd_store_list(args: argparse.Namespace) -> int: return 0 +def cmd_store_migrate(args: argparse.Namespace) -> int: + """Re-canonicalise the store's own Envelope-JSONL (self-heal / format bump). + + The CLI boundary cannot carry a Python transform, so this verb only + normalises data-refinery's **own** format — re-validate each line, re-fill a + missing hash, rewrite atomically per file. A consumer upgrading a *legacy* + format imports ``data_refinery.store.migrate(transform)`` and supplies the + transform there; over the CLI there is no write path for a consumer to + construct. The store root comes from ``DR_DATA_DIR`` (never a flag), so the + owner — not the caller — resolves where the rewrite lands. + """ + json_mode = bool(getattr(args, "json", False)) + result = store_migrate(transform=None, backend=args.backend, dry_run=args.dry_run) + if json_mode: + emit_result(result, json_mode=True) + else: + verb = "would migrate" if args.dry_run else "migrated" + emit_result( + f"{verb} {result['migrated']}/{result['files']} file(s); " + f"{result['skipped']} already canonical", + json_mode=False, + ) + return 0 + + def _store_overview(args: argparse.Namespace) -> int: """`data-refinery store` with no sub-verb prints the noun's overview.""" from data_refinery.cli._commands.overview import emit_overview @@ -136,6 +162,7 @@ def _store_overview(args: argparse.Namespace) -> int: "store put — upsert an envelope (JSON on stdin, or --id/--content)", "store get — fetch an envelope visible to a scope", "store list — list envelopes visible to a scope", + "store migrate — re-canonicalise the store's own Envelope-JSONL (self-heal)", ], }, { @@ -216,6 +243,19 @@ def register(sub: argparse._SubParsersAction) -> None: _add_json_flag(list_p) list_p.set_defaults(func=cmd_store_list) + mig = verb.add_parser( + "migrate", + help="Re-canonicalise the store's own Envelope-JSONL (self-heal / format bump).", + ) + _add_backend_flag(mig) + mig.add_argument( + "--dry-run", + action="store_true", + help="Report what would change without writing.", + ) + _add_json_flag(mig) + mig.set_defaults(func=cmd_store_migrate) + ov = verb.add_parser("overview", help="Describe the store noun.") _add_json_flag(ov) ov.set_defaults(func=_store_overview) diff --git a/data_refinery/explain/catalog.py b/data_refinery/explain/catalog.py index b0d42dc..0183d3b 100644 --- a/data_refinery/explain/catalog.py +++ b/data_refinery/explain/catalog.py @@ -174,6 +174,10 @@ - `data-refinery store get ` — fetch an envelope visible to a scope. Returns `{...,"found":true}` or `{"id":…,"found":false}`. - `data-refinery store list` — list envelopes visible to a scope. +- `data-refinery store migrate` — re-canonicalise the store's **own** + Envelope-JSONL (re-validate each line, re-fill a missing hash, rewrite + atomically per file). A self-heal / format-version pass; idempotent (a second + run rewrites nothing). `--dry-run` reports without writing. ## Backends & scope @@ -183,11 +187,26 @@ - `--scope`/`--visibility` select the scope. A **private**-scope document is never returned by a **public**-scope fetch (`can_serve`). +## Migration (consumers) + +data-refinery **owns** the on-disk layout, so it owns the rewrite. A consumer +upgrading a populated *legacy* store imports the library and supplies only a +transform — never a filesystem write path: + + from data_refinery.store import migrate + migrate(record_to_envelope, base_dir="/path/to/store") # files backend + +The rewrite is atomic per file (temp sibling + `os.replace`) and idempotent. The +CLI `store migrate` verb cannot carry a Python transform, so it self-canonicalises +data-refinery's own format only. Only the `files` backend migrates today; +`mongo`/`neo4j` are a later granularity and exit `1` with a `hint:`. + ## Usage echo '{"id":"a","content":"hello"}' | data-refinery store put --json data-refinery store get a --json data-refinery store list --scope vault --visibility private --json + data-refinery store migrate --dry-run --json """ _VALIDATE = """\ @@ -268,6 +287,7 @@ ("store", "put"): _STORE, ("store", "get"): _STORE, ("store", "list"): _STORE, + ("store", "migrate"): _STORE, ("store", "overview"): _STORE, ("validate",): _VALIDATE, ("dedup",): _DEDUP, diff --git a/data_refinery/store/__init__.py b/data_refinery/store/__init__.py index 9cce9b4..ed1fe1f 100644 --- a/data_refinery/store/__init__.py +++ b/data_refinery/store/__init__.py @@ -26,6 +26,7 @@ can_serve, content_hash, ) +from data_refinery.store.migrate import migrate __all__ = [ "Envelope", @@ -39,6 +40,7 @@ "put", "get", "list", + "migrate", ] diff --git a/data_refinery/store/backends/files.py b/data_refinery/store/backends/files.py index 13bc234..63b4b27 100644 --- a/data_refinery/store/backends/files.py +++ b/data_refinery/store/backends/files.py @@ -11,13 +11,22 @@ import json import os from pathlib import Path +from typing import Any, Callable, get_args -from data_refinery.cli._errors import EXIT_ENV_ERROR, CliError +from data_refinery.cli._errors import EXIT_ENV_ERROR, EXIT_USER_ERROR, CliError from data_refinery.store.backend import Backend -from data_refinery.store.envelope import Envelope, Scope, can_serve +from data_refinery.store.envelope import Envelope, Scope, Visibility, can_serve _ENV_DIR = "DR_DATA_DIR" _JSONL_GLOB = "*.jsonl" # one scope file per (name, visibility) +_TMP_SUFFIX = ".tmp" # atomic-write temp sibling: ".jsonl.tmp" +# Re-derived from the public `Visibility` type so it never drifts from it. +_VISIBILITIES: tuple[str, ...] = get_args(Visibility) + +# A consumer-supplied converter: one decoded legacy line -> an Envelope (or None +# to drop the record). The consumer owns its legacy schema; data-refinery never +# imports it. ``None`` (in place of a transform) means "self-canonicalise". +Transform = Callable[[dict[str, Any]], Envelope | None] class FilesBackend: @@ -79,6 +88,145 @@ def delete(self, id: str) -> bool: removed = True return removed + # -- migration ------------------------------------------------------- + + def migrate( + self, transform: Transform | None = None, *, dry_run: bool = False + ) -> dict[str, Any]: + """Rewrite every scope file through *transform*, atomically per file. + + With ``transform=None`` this re-canonicalises data-refinery's **own** + Envelope-JSONL: re-validate every line, re-fill a missing hash, normalise + the on-disk form (the self-heal / format-version path the ``store + migrate`` CLI verb uses). With a *transform* the consumer converts each + decoded legacy line into an :class:`Envelope` (return ``None`` to drop a + record); the consumer supplies only the transform — never a write path. + + The rewrite is **atomic per file** (a temp sibling + ``os.replace``) and + **idempotent**: a file whose canonical re-serialisation already equals + its current bytes is left untouched, so a second run rewrites nothing. An + interrupted run leaves either the old or the new file intact (never a + partial file) and is safe to resume. Validation is **whole-store**: every + scope file is transformed and validated *before any write*, so a corrupt + line, an invalid transform output, or a symlink escape in **any** file + aborts the whole migration before it touches disk (not merely before it + touches that one file). Returns a summary dict. + """ + root = self._base.resolve() # canonicalise once; harden the write sink + self._reap_orphan_tmp(root) # clear a prior crash's debris before planning + # Pass 1 — plan + validate EVERY file before writing one byte. Any + # CliError (corrupt line, unknown visibility, symlink escape) raised here + # aborts the whole migration with the store untouched (whole-store + # abort-safety, strictly stronger than per-file). + plan: list[tuple[Path, str]] = [] # (path, new_text) for files that change + files = 0 + skipped = 0 + for path in sorted(root.glob(_JSONL_GLOB)): + files += 1 + self._assert_contained(path, root) + try: + original = path.read_text(encoding="utf-8") + except OSError as exc: # unreadable scope file is an environment fault + raise CliError( + code=EXIT_ENV_ERROR, + message=f"could not read {path.name}: {exc}", + remediation=f"check permissions on {path}", + ) from exc + new_text = _serialize(self._migrate_lines(original, transform, path)) + if new_text == original: + skipped += 1 + else: + plan.append((path, new_text)) + # Pass 2 — apply. Every file above validated cleanly; writes are atomic + # per file (temp sibling + os.replace), so a crash here still leaves each + # file either fully old or fully new and the run is safe to resume. + if not dry_run: + for path, new_text in plan: + self._atomic_write(path, new_text) + return { + "backend": "files", + "files": files, + "migrated": len(plan), + "migrated_files": [p.name for p, _ in plan], + "skipped": skipped, + "dry_run": dry_run, + } + + def _migrate_lines(self, text: str, transform: Transform | None, path: Path) -> list[Envelope]: + out: list[Envelope] = [] + for line in text.splitlines(): + line = line.strip() + if not line: + continue + try: + obj = json.loads(line) + except json.JSONDecodeError as exc: + raise self._corrupt_line(path, exc) from exc + if not isinstance(obj, dict): # valid JSON but not an object: [] "x" 1 + raise self._corrupt_line(path, f"expected a JSON object, got {type(obj).__name__}") + try: + env = _to_envelope(obj, transform) + except CliError: + raise # already structured (e.g. unknown visibility) — keep its code + except (KeyError, TypeError, AttributeError, ValueError) as exc: + # A dict missing required keys (e.g. no ``id``) is a corrupt line, + # not a code-1 "unexpected" wrap — surface it as such, with code 2. + raise self._corrupt_line(path, exc) from exc + if env is None: # transform dropped the record (e.g. a tombstone) + continue + out.append(_validate(env)) + return out + + @staticmethod + def _corrupt_line(path: Path, detail: object) -> CliError: + """Build the structured ``corrupt line`` error (code 2) for *path*. + + Shared by the migration and the day-to-day load path so a malformed + scope line always surfaces as an environment fault with a repair + remediation — never a generic code-1 "unexpected" wrap. + """ + return CliError( + code=EXIT_ENV_ERROR, + message=f"corrupt line in {path.name}: {detail}", + remediation=f"remove or repair the corrupt line in {path}", + ) + + @staticmethod + def _assert_contained(path: Path, root: Path) -> None: + """Refuse a scope file that resolves outside the canonical store root. + + ``glob`` already constrains the listing to *root*, but a symlinked scope + file could still point elsewhere; resolving and containment-checking each + path keeps the write sink reasoning against an owner-controlled root (not + an attacker-reachable target) — the defensible posture for the component + that *owns* the storage layout. + """ + resolved = path.resolve() + try: + contained = os.path.commonpath([str(resolved), str(root)]) == str(root) + except ValueError: # pragma: no cover - different drives / mixed roots (Windows) + contained = False + if not contained: + raise CliError( + code=EXIT_ENV_ERROR, + message=f"{path.name} resolves outside the store root {root}", + remediation="remove the symlink or point DR_DATA_DIR at the real store directory", + ) + + @staticmethod + def _reap_orphan_tmp(root: Path) -> None: + """Remove ``*.jsonl.tmp`` left by a prior interrupted rewrite. + + ``os.replace`` consumes the temp on success, so a surviving temp is the + residue of a crash *before* the swap — the real file is intact. Reaping + keeps the store dir tidy and the ``*.jsonl`` glob unambiguous. + """ + for tmp in root.glob(_JSONL_GLOB + _TMP_SUFFIX): + try: + tmp.unlink() + except OSError: # pragma: no cover - best effort + pass + # -- internal helpers ------------------------------------------------ def _visible(self, scope: Scope) -> list[Envelope]: @@ -102,20 +250,99 @@ def _load(self, path: Path) -> list[Envelope]: if not line: continue try: - out.append(Envelope.from_dict(json.loads(line))) - except (json.JSONDecodeError, KeyError) as exc: - raise CliError( - code=EXIT_ENV_ERROR, - message=f"corrupt line in {path.name}: {exc}", - remediation=f"remove or repair the corrupt line in {path}", - ) from exc + obj = json.loads(line) + except json.JSONDecodeError as exc: + raise self._corrupt_line(path, exc) from exc + if not isinstance(obj, dict): # valid JSON but not an object + raise self._corrupt_line(path, f"expected a JSON object, got {type(obj).__name__}") + try: + out.append(Envelope.from_dict(obj)) + except CliError: + raise # already structured (e.g. unknown visibility) — keep its code + except (KeyError, TypeError, AttributeError, ValueError) as exc: + raise self._corrupt_line(path, exc) from exc return out def _save(self, path: Path, records: list[Envelope]) -> None: - path.parent.mkdir(parents=True, exist_ok=True) - with open(path, "w", encoding="utf-8") as f: - for r in records: - f.write(json.dumps(r.to_dict()) + "\n") + self._atomic_write(path, _serialize(records)) + + def _atomic_write(self, path: Path, text: str) -> None: + """Write *text* to *path* atomically (temp sibling + ``os.replace``). + + The temp is a sibling in the same directory, so ``os.replace`` is a + same-filesystem atomic rename: a crash leaves either the old file or the + new one — never a half-written file. Shared by ``upsert``/``delete`` and + the migration rewrite, so every write to a scope file is durable. + + A write fault (full disk, denied permission, cross-device temp) deletes + the temp and surfaces as a structured ``CliError`` with **exit code 2** + (environment fault) — never a raw ``OSError`` that the dispatcher would + wrap as a generic code-1 "unexpected" error. + """ + tmp = path.with_name(path.name + _TMP_SUFFIX) + try: + path.parent.mkdir(parents=True, exist_ok=True) + tmp.write_text(text, encoding="utf-8") + os.replace(tmp, path) + except OSError as exc: + try: + tmp.unlink() + except OSError: # pragma: no cover - best effort cleanup + pass + raise CliError( + code=EXIT_ENV_ERROR, + message=f"could not write {path.name}: {exc}", + remediation=f"check free space and permissions on {path.parent}", + ) from exc + + +def _serialize(records: list[Envelope]) -> str: + """Canonical Envelope-JSONL: one ``to_dict()`` per line, trailing newline.""" + return "".join(json.dumps(r.to_dict()) + "\n" for r in records) + + +def _validate(env: Envelope) -> Envelope: + """Fail closed on an envelope whose scope visibility is unrecognised. + + The no-leak invariant (:func:`can_serve`) only holds for a known visibility; + a transform that produced an unknown one must abort the migration **before** + any write rather than persist an unservable record. + """ + if env.scope.visibility not in _VISIBILITIES: + raise CliError( + code=EXIT_USER_ERROR, + message=( + f"transformed envelope {env.id!r} has unknown " + f"scope.visibility {env.scope.visibility!r}" + ), + remediation='the transform must set scope.visibility to "public" or "private"', + ) + return env + + +def _to_envelope(obj: dict[str, Any], transform: Transform | None) -> Envelope | None: + """Map one decoded line (guaranteed a dict by the caller) to an Envelope. + + ``transform=None`` self-canonicalises (every line is already data-refinery's + own form). With a *transform*, an already-canonical line is kept **verbatim** + so a re-run never re-applies the consumer's transform to migrated data — that + is what makes a second run a byte-for-byte no-op without data-refinery ever + knowing the consumer's legacy schema. The "already-canonical" test + (``already.to_dict() == obj``) is exact because the Envelope round-trip is a + stable fixpoint, so the consumer's transform need not itself be idempotent. + + A shape error (missing ``id``, etc.) propagates to the caller, which maps it + to a structured code-2 "corrupt line" error. + """ + if transform is None: + return Envelope.from_dict(obj) + try: + already = Envelope.from_dict(obj) + except (KeyError, TypeError, AttributeError, ValueError, CliError): + already = None + if already is not None and already.to_dict() == obj: + return already + return transform(obj) def build(**_kwargs: object) -> Backend: diff --git a/data_refinery/store/migrate.py b/data_refinery/store/migrate.py new file mode 100644 index 0000000..36929cf --- /dev/null +++ b/data_refinery/store/migrate.py @@ -0,0 +1,63 @@ +"""Store migration — upgrade an on-disk store to the current Envelope format. + +data-refinery owns the storage layout, so it owns the **rewrite**: a consumer +upgrading a populated store supplies only a *transform* (each decoded legacy line +-> an :class:`Envelope`, or ``None`` to drop it) and never constructs a +filesystem write path itself. data-refinery resolves the store root, walks it, +validates every produced envelope, and rewrites **atomically per file**. This is +the endpoint that lets a consumer (eidetic-cli first) delete its own +path-constructing rewrite — moving the write sink to the component that *owns*, +and can reason about, the store directory. + +Files granularity ships first; ``mongo`` (vectors) and ``neo4j`` (graph) are a +later granularity and raise a structured error today. +""" + +from __future__ import annotations + +from typing import Any + +from data_refinery.cli._errors import EXIT_USER_ERROR, CliError +from data_refinery.store.backends.files import FilesBackend, Transform + +DEFAULT_BACKEND = "files" + + +def migrate( + transform: Transform | None = None, + *, + backend: str = DEFAULT_BACKEND, + base_dir: str | None = None, + dry_run: bool = False, +) -> dict[str, Any]: + """Upgrade an on-disk store to the current Envelope format. + + With ``transform=None`` this re-canonicalises data-refinery's **own** + Envelope-JSONL (the self-heal / format-version path the ``store migrate`` CLI + verb uses). With a *transform* the consumer converts each decoded legacy line + into an :class:`Envelope`; the consumer supplies only the transform (and + optionally the store root it already owns via *base_dir*) — never a per-file + write path. + + Idempotent: a second run rewrites nothing. The consumer's transform need + **not** itself be idempotent — after the first run every line is a canonical + Envelope, and the files backend keeps an already-canonical line **verbatim** + (it is never fed back through the transform), so even a non-idempotent + transform (one that, say, stamps a timestamp) is applied exactly once. This + rests on data-refinery's own Envelope round-trip being a stable fixpoint + (``Envelope.from_dict(e.to_dict()).to_dict() == e.to_dict()``), which is + guaranteed and test-covered — it is *not* a constraint the consumer must + satisfy. + + Returns a summary ``{backend, files, migrated, migrated_files, skipped, + dry_run}``. Idempotent (a second run rewrites nothing) and atomic per file. + Only the ``files`` backend is supported today; ``mongo``/``neo4j`` raise a + structured :class:`CliError`. + """ + if backend == "files": + return FilesBackend(base_dir).migrate(transform, dry_run=dry_run) + raise CliError( + code=EXIT_USER_ERROR, + message=f"store migration is not yet supported for backend {backend!r}", + remediation="migrate the 'files' backend today; mongo/neo4j are a later granularity", + ) diff --git a/docs/contract.md b/docs/contract.md index 81e0270..187b417 100644 --- a/docs/contract.md +++ b/docs/contract.md @@ -6,8 +6,8 @@ shape, exit-code meaning, or the image tag scheme requires a **version bump** in [`pyproject.toml`](../pyproject.toml) (the `version-check` CI job enforces a bump on every PR). -- **Contract version:** `2` (Wave 2 — adds the store + data-quality surface to - the Wave 1 infrastructure surface). +- **Contract version:** `3` (Wave 3 — adds the store-migration endpoint to the + Wave 2 store + data-quality surface). - **Package version pinned by a consumer:** see `pyproject.toml` `version`. ## Wave 1 — the storage stack (stable) @@ -90,6 +90,11 @@ are **no** memory fields: a consumer's `lifecycle` / `signal` / `recall_count` / - `store get --json` → the envelope plus `"found": true`, or `{"id": "...", "found": false}`. Scope-filtered (`--scope`/`--visibility`). - `store list --json` → a JSON array of envelopes visible to the scope. +- `store migrate --json` → `{"backend", "files", "migrated", "migrated_files": + [...], "skipped", "dry_run"}`. Re-canonicalises data-refinery's **own** + Envelope-JSONL (self-heal / format-version pass); `--dry-run` reports without + writing. **Idempotent** — a second run rewrites nothing (`migrated: 0`). + Files-only today; `--backend mongo|neo4j` exits `1` with a `hint:`. ### `data-refinery` data-quality verbs @@ -126,6 +131,46 @@ A store/quality verb selecting `--backend mongo|neo4j` without the extra exits returned by a public-scope `get`/`list` (`can_serve` is enforced by every backend, not just the consumer). +## Wave 3 — the store-migration endpoint (stable) + +data-refinery **owns** the on-disk store layout, so it owns the **rewrite** that +upgrades a populated store to the current Envelope format. A consumer upgrading a +legacy store supplies only a **transform** and never constructs a filesystem +write path itself — the path-construction concern (and any path-safety review) +lives with the component that owns, and can reason about, the store directory. + +### Importable primitive + +```python +from data_refinery.store import migrate + +# Upgrade a populated legacy store. The consumer owns its legacy schema and +# passes a converter; data-refinery resolves the root and owns the atomic rewrite. +summary = migrate(record_to_envelope, base_dir="/path/to/store") # -> dict +# record_to_envelope: Callable[[dict], Envelope | None] (None drops a record) +``` + +`migrate(transform=None, *, backend="files", base_dir=None, dry_run=False)` +returns `{backend, files, migrated, migrated_files, skipped, dry_run}`. With +`transform=None` it re-canonicalises data-refinery's own Envelope-JSONL (the +self-heal path the `store migrate` CLI verb uses). The consumer supplies a +transform (and optionally the store root it already owns) — **never** a per-file +write path. + +### Invariants the consumer can rely on + +- **Idempotent** — a second run rewrites nothing (`migrated: 0`); a file whose + canonical re-serialisation already equals its bytes is left untouched. +- **Atomic per file** — each file is rewritten via a temp sibling + `os.replace`; + an interrupted run leaves either the old or the new file intact (never a + partial file) and is safe to resume. +- **Validated before write** — every produced envelope is checked (a transform + that yields an unknown `scope.visibility` aborts that file's migration before + any write, exit `1` with a `hint:`); a corrupt source line aborts with exit `2` + and leaves the original file untouched. Never a traceback. +- **Files granularity only** today — `mongo` (vectors) / `neo4j` (graph) + migration are a later granularity and exit `1` with a `hint:`. + ## Versioning policy | Change | Requires | diff --git a/docs/specs/2026-06-21-data-refinery-now-owns-store-file-migration-a-cons.md b/docs/specs/2026-06-21-data-refinery-now-owns-store-file-migration-a-cons.md new file mode 100644 index 0000000..3249b20 --- /dev/null +++ b/docs/specs/2026-06-21-data-refinery-now-owns-store-file-migration-a-cons.md @@ -0,0 +1,47 @@ +# data-refinery now owns store-file migration: a consumer upgrades an on-disk store to the current Envelope format by supplying only a transform, never constructing a filesystem write path — files granularity first + +> data-refinery now owns store-file migration: a consumer upgrades an on-disk store to the current Envelope format by supplying only a transform, never constructing a filesystem write path — files granularity first + +## Audience + +- eidetic-cli (first consumer over the import + subprocess boundary) and any future consumer of data-refinery's store boundary + +## Before → After + +- Before: eidetic's migrate_store.py globs the operator-supplied store dir, writes *.jsonl.tmp then os.replace; SonarCloud flags that consumer-side write sink as pythonsecurity:S2083 BLOCKER, which is structurally unsatisfiable for a local CLI and fails eidetic's gate +- After: a consumer upgrades a populated legacy on-disk store to the current Envelope-JSONL format by calling data_refinery.store.migrate(transform) (import) or 'data-refinery store migrate' (subprocess), supplying only a transform/target format — data-refinery resolves the store root and owns the atomic per-file rewrite + +## Why it matters + +- the path-construction concern (and the S2083 sink) belongs to the component that OWNS the storage layout; moving it behind data-refinery's boundary lets eidetic delete migrate_store.py and go green without any in-repo rule suppression + +## Requirements + +- the rewrite is atomic per file (tmp sibling in the same dir + os.replace) and idempotent (a file already in target format is left byte-identical; a re-run converts nothing) + - honesty: running migrate twice over the same store yields a byte-identical store on the second run, and killing the process mid-rewrite leaves either the old or the new file intact (never a partial/truncated file), because os.replace is atomic on POSIX +- data-refinery resolves and validates the store root internally (canonicalize via os.path.realpath + containment-check via os.path.commonpath against an owner-controlled root); the consumer supplies a root directory or a transform, never a constructed per-file write path + - honesty: a migrate call whose resolved per-file path escapes the canonicalized store root (e.g. via a symlink) is refused with a structured code-2 CliError, and Sonar's S2083 taint is satisfiable here because the sink reasons against an owner-canonicalized root rather than a raw consumer arg +- every transformed line is validated against the Envelope shape and the public/private scope no-leak (can_serve) before being written; an unparseable/invalid legacy line fails the migration with a structured CliError, never a traceback + - honesty: a legacy line that does not transform into a valid Envelope (bad shape, or an unrecognised scope.visibility) aborts the file's migration before any os.replace, leaving the original file untouched, and emits error:/hint: on stderr with no traceback + +## Honesty conditions + +- the endpoint ships BOTH importable (store.migrate) and as a CLI verb (store migrate), both documented in the pinnable contract.md with a version bump +- eidetic can reach the endpoint over BOTH the import boundary (callable transform) and the subprocess boundary (self-canonicalize); no third component is needed +- in the eidetic call site, the only argument eidetic supplies is a transform callable (and optionally the store root it already owns) — never a constructed per-file *.jsonl.tmp path +- the S2083 BLOCKER is on eidetic's write sink in migrate_store.py and is unsatisfiable there because writing into the operator's chosen dir IS the feature +- after the cutover eidetic's gate clears with zero in-repo suppression (no # NOSONAR, no sonar exclusion entry for migrate_store.py) +- all four issue-#8 acceptance criteria are demonstrably met by a live test: upgrade-without-path, idempotent, atomic-per-file, eidetic deletes the module +- mongo/neo4j migration get a clean extension seam (a backend-level hook) but only the files backend actually rewrites now; data-refinery never imports eidetic's Record schema + +## Success signals + +- eidetic deletes migrate_store.py + its tests and replaces 'eidetic migrate store' with a thin call into data-refinery; eidetic's S2083 BLOCKER disappears and its gate goes green with no rule suppression; re-running migrate converts nothing (idempotent); an interrupted run is safe to resume (atomic per file) + +## Scope / boundaries + +- no eidetic Record/memory semantics leak into data-refinery; files backend granularity FIRST (mongo/vectors then neo4j/graph are later granularities); not a general ETL framework + +## Decisions + +- the importable store.migrate(transform) takes a Python callable Callable[[dict], Envelope|None]; the 'data-refinery store migrate' CLI verb canNOT cross a callable over argv, so it only re-canonicalizes data-refinery's OWN Envelope-JSONL (re-validate + re-fill hash + atomic rewrite) — a self-heal/format-version bump, never a consumer transform diff --git a/pyproject.toml b/pyproject.toml index e156c05..e6da794 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "data-refinery-cli" -version = "0.5.2" +version = "0.6.0" description = "Agent and CLI for data quality in storage and retrieval — validating, deduplicating, and checking the integrity and freshness of data as it is stored and fetched. Split out of eidetic-cli so eidetic keeps agent-memory; sibling to daria, the Data Refinery Intelligent Agent." readme = "README.md" license = "Apache-2.0" diff --git a/tests/test_store_migrate.py b/tests/test_store_migrate.py new file mode 100644 index 0000000..0b342f0 --- /dev/null +++ b/tests/test_store_migrate.py @@ -0,0 +1,433 @@ +"""Store migration endpoint — files granularity (issue #8). + +data-refinery owns the on-disk layout, so it owns the rewrite. These tests prove +the load-bearing guarantees: a consumer supplies only a *transform* (never a +write path), the rewrite is **idempotent** (a 2nd run is byte-identical) and +**atomic per file** (an abort leaves the original intact), every produced +envelope is validated before any write, and the files-first seam holds +(`mongo`/`neo4j` raise today). +""" + +from __future__ import annotations + +import json +from pathlib import Path + +import pytest + +import data_refinery.store as store +import data_refinery.store.backends.files as files_mod +from data_refinery.cli import main +from data_refinery.cli._errors import CliError +from data_refinery.store.backends.files import FilesBackend +from data_refinery.store.envelope import Envelope, Scope + +# A line already in data-refinery's own canonical-ish shape (but missing `hash`). +_NEEDS_HASH = { + "id": "a", + "content": "hello", + "scope": {"name": "default", "visibility": "public"}, + "metadata": {}, +} + + +def _write_lines(path: Path, objs: list[dict]) -> None: + path.write_text("".join(json.dumps(o) + "\n" for o in objs), encoding="utf-8") + + +def _read_lines(path: Path) -> list[dict]: + return [json.loads(ln) for ln in path.read_text(encoding="utf-8").splitlines() if ln.strip()] + + +def _seed(tmp_path: Path, objs: list[dict], name: str = "default__public.jsonl") -> Path: + base = tmp_path / "store" + base.mkdir(exist_ok=True) + path = base / name + _write_lines(path, objs) + return base + + +# --- self-canonicalise (transform=None) ------------------------------------- + + +def test_self_canonicalize_fills_missing_hash(tmp_path: Path) -> None: + base = _seed(tmp_path, [_NEEDS_HASH]) + result = FilesBackend(base_dir=str(base)).migrate() # transform=None + assert result == { + "backend": "files", + "files": 1, + "migrated": 1, + "migrated_files": ["default__public.jsonl"], + "skipped": 0, + "dry_run": False, + } + assert _read_lines(base / "default__public.jsonl")[0]["hash"] == store.content_hash("hello") + + +def test_migrate_is_idempotent(tmp_path: Path) -> None: + base = _seed(tmp_path, [_NEEDS_HASH]) + backend = FilesBackend(base_dir=str(base)) + backend.migrate() + canonical = (base / "default__public.jsonl").read_bytes() + second = backend.migrate() + assert second["migrated"] == 0 and second["skipped"] == 1 + assert (base / "default__public.jsonl").read_bytes() == canonical # byte-identical + + +def test_already_canonical_store_is_left_untouched(tmp_path: Path) -> None: + # A file written by the backend itself is already canonical -> 0 migrated. + base = tmp_path / "store" + backend = FilesBackend(base_dir=str(base)) + backend.upsert(Envelope(id="a", content="hello")) + before = (base / "default__public.jsonl").read_bytes() + result = backend.migrate() + assert result["migrated"] == 0 and result["skipped"] == 1 + assert (base / "default__public.jsonl").read_bytes() == before + + +# --- consumer transform path ------------------------------------------------ + + +def test_transform_converts_legacy_then_is_idempotent(tmp_path: Path) -> None: + # An arbitrary legacy shape data-refinery knows nothing about. + base = _seed(tmp_path, [{"key": "a", "value": "hello", "vis": "public"}], name="legacy.jsonl") + + def transform(raw: dict) -> Envelope: + return Envelope(id=raw["key"], content=raw["value"], scope=Scope("default", raw["vis"])) + + backend = FilesBackend(base_dir=str(base)) + first = backend.migrate(transform) + assert first["migrated"] == 1 + row = _read_lines(base / "legacy.jsonl")[0] + assert row["id"] == "a" and row["content"] == "hello" + assert row["hash"] == store.content_hash("hello") + + # 2nd run: the line is now a canonical Envelope and is kept verbatim — the + # transform (which would KeyError on raw["key"]) is never re-applied. + second = backend.migrate(transform) + assert second["migrated"] == 0 and second["skipped"] == 1 + + +def test_envelope_round_trip_is_a_fixpoint() -> None: + # The whole idempotency contract rests on this: re-serialising an already- + # canonical line reproduces it byte-for-byte, so `_to_envelope` recognises it + # and keeps it verbatim instead of re-running the transform. Guard it directly + # across varied shapes (default/private scope, present/absent metadata+hash). + for env in [ + Envelope(id="a", content="hello"), + Envelope(id="b", content="x", scope=Scope("vault", "private")), + Envelope(id="c", content="y", metadata={"k": 1, "nested": {"z": [1, 2]}}), + Envelope(id="d", content="z", hash="deadbeef"), # mismatched hash preserved + ]: + d = env.to_dict() + assert Envelope.from_dict(d).to_dict() == d + + +def test_non_idempotent_transform_is_applied_exactly_once(tmp_path: Path) -> None: + # A transform that is NOT a fixpoint (it stamps a marker every call). Because + # an already-canonical line is kept verbatim, the marker is written once and a + # 2nd run never stamps it again — idempotency holds for any transform. + base = _seed(tmp_path, [{"key": "a", "value": "hello"}], name="legacy.jsonl") + calls = {"n": 0} + + def transform(raw: dict) -> Envelope: + calls["n"] += 1 + return Envelope(id=raw["key"], content=raw["value"], metadata={"stamped": calls["n"]}) + + backend = FilesBackend(base_dir=str(base)) + backend.migrate(transform) + after_first = (base / "legacy.jsonl").read_bytes() + assert _read_lines(base / "legacy.jsonl")[0]["metadata"] == {"stamped": 1} + + backend.migrate(transform) # 2nd run: canonical line kept verbatim + assert (base / "legacy.jsonl").read_bytes() == after_first # marker not doubled + assert calls["n"] == 1 # transform never called a second time + + +def test_transform_returning_none_drops_the_record(tmp_path: Path) -> None: + base = _seed( + tmp_path, + [{"key": "keep", "value": "x"}, {"key": "drop", "value": "y", "tombstone": True}], + name="legacy.jsonl", + ) + + def transform(raw: dict) -> Envelope | None: + if raw.get("tombstone"): + return None + return Envelope(id=raw["key"], content=raw["value"]) + + FilesBackend(base_dir=str(base)).migrate(transform) + assert [r["id"] for r in _read_lines(base / "legacy.jsonl")] == ["keep"] + + +def test_consumer_supplies_only_a_transform_via_library(tmp_path: Path) -> None: + base = _seed(tmp_path, [_NEEDS_HASH]) + # The public library entry point — consumer passes a root it owns, no path. + result = store.migrate(base_dir=str(base)) + assert result["migrated"] == 1 + assert _read_lines(base / "default__public.jsonl")[0]["hash"] == store.content_hash("hello") + + +# --- validation / abort-safety ---------------------------------------------- + + +def test_unknown_visibility_aborts_before_any_write(tmp_path: Path) -> None: + base = _seed(tmp_path, [{"key": "a", "value": "hello"}], name="legacy.jsonl") + before = (base / "legacy.jsonl").read_bytes() + + def transform(raw: dict) -> Envelope: + return Envelope(id=raw["key"], content=raw["value"], scope=Scope("default", "secret")) + + with pytest.raises(CliError) as exc: + FilesBackend(base_dir=str(base)).migrate(transform) + assert exc.value.code == 1 + assert (base / "legacy.jsonl").read_bytes() == before # untouched + + +def test_corrupt_source_line_aborts_with_code_2(tmp_path: Path) -> None: + base = tmp_path / "store" + base.mkdir() + path = base / "default__public.jsonl" + path.write_text( + json.dumps(_NEEDS_HASH) + "\n" + "{not json\n", + encoding="utf-8", + ) + before = path.read_bytes() + with pytest.raises(CliError) as exc: + FilesBackend(base_dir=str(base)).migrate() + assert exc.value.code == 2 + assert path.read_bytes() == before # untouched + + +def test_whole_store_validation_aborts_before_any_write(tmp_path: Path) -> None: + # Two files: the first (sorted first) WOULD migrate; the second has a corrupt + # line. Validation is whole-store, so the corrupt second file aborts the run + # BEFORE the first file is rewritten — not merely before the second. + base = tmp_path / "store" + base.mkdir() + first = base / "a__public.jsonl" + first.write_text( + json.dumps(_NEEDS_HASH) + "\n", encoding="utf-8" + ) # needs a hash -> would change + (base / "b__public.jsonl").write_text("{corrupt\n", encoding="utf-8") + first_before = first.read_bytes() + + with pytest.raises(CliError) as exc: + FilesBackend(base_dir=str(base)).migrate() + assert exc.value.code == 2 + assert first.read_bytes() == first_before # untouched despite sorting first + assert list(base.glob("*.tmp")) == [] # nothing half-written + + +def test_atomic_write_failure_surfaces_code_2_and_leaves_original_intact( + tmp_path: Path, monkeypatch +) -> None: + base = _seed(tmp_path, [_NEEDS_HASH]) + path = base / "default__public.jsonl" + before = path.read_bytes() + + def boom(_src, _dst): + raise OSError("simulated crash before the atomic swap") + + monkeypatch.setattr(files_mod.os, "replace", boom) + with pytest.raises(CliError) as exc: + FilesBackend(base_dir=str(base)).migrate() + assert exc.value.code == 2 # environment fault, not a generic code-1 wrap + assert path.read_bytes() == before # original intact — os.replace never ran + assert list(base.glob("*.tmp")) == [] # temp sibling cleaned up + + +def test_non_object_json_line_aborts_with_code_2(tmp_path: Path) -> None: + # Valid JSON but not an object ([], "x", 1) must be a structured "corrupt + # line" (code 2), not an AttributeError wrapped as a generic code-1 error. + base = tmp_path / "store" + base.mkdir() + path = base / "default__public.jsonl" + path.write_text("[1, 2, 3]\n", encoding="utf-8") + before = path.read_bytes() + with pytest.raises(CliError) as exc: + FilesBackend(base_dir=str(base)).migrate() + assert exc.value.code == 2 + assert "expected a JSON object" in exc.value.message + assert path.read_bytes() == before # untouched + + +def test_object_missing_id_aborts_with_code_2(tmp_path: Path) -> None: + # A dict missing the required ``id`` raises KeyError inside Envelope.from_dict; + # it must surface as a code-2 corrupt line, not a code-1 "unexpected" wrap. + base = tmp_path / "store" + base.mkdir() + path = base / "default__public.jsonl" + path.write_text(json.dumps({"content": "x"}) + "\n", encoding="utf-8") + with pytest.raises(CliError) as exc: + FilesBackend(base_dir=str(base)).migrate() + assert exc.value.code == 2 + + +def test_unreadable_scope_file_surfaces_code_2(tmp_path: Path) -> None: + # A read fault (here: a directory where a scope file is expected -> the glob + # matches it but read_text raises IsADirectoryError) is an environment fault. + base = tmp_path / "store" + base.mkdir() + (base / "weird__public.jsonl").mkdir() + with pytest.raises(CliError) as exc: + FilesBackend(base_dir=str(base)).migrate() + assert exc.value.code == 2 + assert "could not read" in exc.value.message + + +def test_self_canonicalize_bad_visibility_source_propagates_cli_error(tmp_path: Path) -> None: + # A self-canonicalise source line with an unknown visibility: Envelope.from_dict + # raises a structured CliError that migrate passes through unchanged (not a + # corrupt-line re-wrap, not a generic code-1 "unexpected"). + base = tmp_path / "store" + base.mkdir() + (base / "default__public.jsonl").write_text( + json.dumps({"id": "a", "content": "x", "scope": {"name": "d", "visibility": "secret"}}) + + "\n", + encoding="utf-8", + ) + with pytest.raises(CliError) as exc: + FilesBackend(base_dir=str(base)).migrate() + assert exc.value.code == 1 # unknown visibility kept verbatim + + +# --- read-path parity: _load shares the same corrupt-line contract ---------- + + +@pytest.mark.parametrize( + "line, needle", + [ + ("{not json\n", "corrupt line"), + ("[1, 2]\n", "expected a JSON object"), + (json.dumps({"content": "x"}) + "\n", "corrupt line"), # missing id + ], +) +def test_load_read_path_rejects_malformed_lines(tmp_path: Path, line: str, needle: str) -> None: + base = tmp_path / "store" + base.mkdir() + (base / "default__public.jsonl").write_text(line, encoding="utf-8") + with pytest.raises(CliError) as exc: + FilesBackend(base_dir=str(base)).all() # all() -> _load + assert exc.value.code == 2 and needle in exc.value.message + + +def test_load_read_path_bad_visibility_propagates_cli_error(tmp_path: Path) -> None: + base = tmp_path / "store" + base.mkdir() + (base / "default__public.jsonl").write_text( + json.dumps({"id": "a", "content": "x", "scope": {"visibility": "secret"}}) + "\n", + encoding="utf-8", + ) + with pytest.raises(CliError) as exc: + FilesBackend(base_dir=str(base)).all() + assert exc.value.code == 1 + + +def test_load_skips_blank_lines(tmp_path: Path) -> None: + base = tmp_path / "store" + base.mkdir() + (base / "default__public.jsonl").write_text( + json.dumps(_NEEDS_HASH) + "\n\n \n", encoding="utf-8" + ) + assert [e.id for e in FilesBackend(base_dir=str(base)).all()] == ["a"] + + +def test_symlinked_scope_file_outside_root_is_refused(tmp_path: Path) -> None: + base = tmp_path / "store" + base.mkdir() + outside = tmp_path / "outside.jsonl" + _write_lines(outside, [_NEEDS_HASH]) + (base / "default__public.jsonl").symlink_to(outside) + with pytest.raises(CliError) as exc: + FilesBackend(base_dir=str(base)).migrate() + assert exc.value.code == 2 + assert "store root" in exc.value.message + + +# --- dry-run + files-first seam --------------------------------------------- + + +def test_dry_run_reports_without_writing(tmp_path: Path) -> None: + base = _seed(tmp_path, [_NEEDS_HASH]) + before = (base / "default__public.jsonl").read_bytes() + result = FilesBackend(base_dir=str(base)).migrate(dry_run=True) + assert result["migrated"] == 1 and result["dry_run"] is True + assert (base / "default__public.jsonl").read_bytes() == before # nothing written + + +@pytest.mark.parametrize("backend", ["mongo", "neo4j"]) +def test_unsupported_backend_raises(tmp_path: Path, backend: str) -> None: + with pytest.raises(CliError) as exc: + store.migrate(backend=backend, base_dir=str(tmp_path)) + assert exc.value.code == 1 + assert backend in exc.value.message + + +# --- atomic upsert (the shared helper also fixes the day-to-day write) ------- + + +def test_upsert_is_atomic_and_leaves_no_temp(tmp_path: Path) -> None: + backend = FilesBackend(base_dir=str(tmp_path / "store")) + backend.upsert(Envelope(id="a", content="x")) + backend.upsert(Envelope(id="b", content="y")) + assert list((tmp_path / "store").glob("*.tmp")) == [] + assert {e.id for e in backend.all()} == {"a", "b"} + + +# --- CLI verb ---------------------------------------------------------------- + + +def test_cli_store_migrate_json_then_idempotent(files_env: str, capsys) -> None: + base = Path(files_env) + base.mkdir(parents=True, exist_ok=True) + _write_lines(base / "default__public.jsonl", [_NEEDS_HASH]) + + assert main(["store", "migrate", "--json"]) == 0 + out = json.loads(capsys.readouterr().out) + assert out["backend"] == "files" and out["migrated"] == 1 + + assert main(["store", "migrate", "--json"]) == 0 # idempotent + assert json.loads(capsys.readouterr().out)["migrated"] == 0 + + +def test_cli_store_migrate_dry_run_text(files_env: str, capsys) -> None: + base = Path(files_env) + base.mkdir(parents=True, exist_ok=True) + _write_lines(base / "default__public.jsonl", [_NEEDS_HASH]) + assert main(["store", "migrate", "--dry-run"]) == 0 + assert "would migrate" in capsys.readouterr().out + + +def test_blank_lines_are_skipped(tmp_path: Path) -> None: + base = tmp_path / "store" + base.mkdir() + path = base / "default__public.jsonl" + path.write_text(json.dumps(_NEEDS_HASH) + "\n\n \n", encoding="utf-8") # trailing blanks + result = FilesBackend(base_dir=str(base)).migrate() + assert result["migrated"] == 1 + assert len(_read_lines(path)) == 1 # blank lines dropped, not preserved + + +def test_orphan_temp_from_a_prior_crash_is_reaped(tmp_path: Path) -> None: + # An already-canonical store (nothing to rewrite) plus an orphan temp under a + # *different* name — the residue of a prior interrupted run on another file. + backend = FilesBackend(base_dir=str(tmp_path / "store")) + backend.upsert(Envelope(id="a", content="hello")) + orphan = (tmp_path / "store") / "stale__public.jsonl.tmp" + orphan.write_text("half-written garbage from a crash\n", encoding="utf-8") + result = backend.migrate() + assert result["migrated"] == 0 # nothing to rewrite + assert not orphan.exists() # but the orphan temp was reaped + assert list((tmp_path / "store").glob("*.tmp")) == [] + + +def test_cli_store_migrate_unsupported_backend_exits_1(files_env: str, capsys) -> None: + # JSON mode: a structured error line on stderr (never a traceback). + assert main(["store", "migrate", "--backend", "neo4j", "--json"]) == 1 + err = json.loads(capsys.readouterr().err) + assert err["code"] == 1 and "neo4j" in err["message"] and err["remediation"] + + # Text mode: the same error renders the load-bearing `hint:` prefix. + assert main(["store", "migrate", "--backend", "mongo"]) == 1 + assert "hint:" in capsys.readouterr().err diff --git a/uv.lock b/uv.lock index 709bbd0..14157a2 100644 --- a/uv.lock +++ b/uv.lock @@ -156,7 +156,7 @@ wheels = [ [[package]] name = "data-refinery-cli" -version = "0.5.2" +version = "0.6.0" source = { editable = "." } [package.optional-dependencies]