diff --git a/crosslink/Cargo.toml b/crosslink/Cargo.toml index 1dd8e7df0..359565a20 100644 --- a/crosslink/Cargo.toml +++ b/crosslink/Cargo.toml @@ -57,9 +57,9 @@ portable-pty = "0.9" base64 = "0.22" aes-gcm = "0.10" reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls"] } +tempfile = "3" [dev-dependencies] -tempfile = "3" proptest = "1" arbitrary = "1" tower = { version = "0.5", features = ["util"] } diff --git a/crosslink/src/commands/integrity_cmd.rs b/crosslink/src/commands/integrity_cmd.rs index b6b92e924..b73be65b7 100644 --- a/crosslink/src/commands/integrity_cmd.rs +++ b/crosslink/src/commands/integrity_cmd.rs @@ -52,8 +52,11 @@ pub fn run(action: Option<&IntegrityCommands>, crosslink_dir: &Path, db: &Databa print_result(&result); Ok(()) } - Some(IntegrityCommands::Hydration { repair }) => { - let result = check_hydration(crosslink_dir, db, *repair)?; + Some(IntegrityCommands::Hydration { + repair, + accept_data_loss, + }) => { + let result = check_hydration(crosslink_dir, db, *repair, *accept_data_loss)?; print_result(&result); Ok(()) } @@ -79,7 +82,7 @@ fn run_all(crosslink_dir: &Path, db: &Database) -> Result<()> { let results = vec![ check_schema(db, false)?, check_counters(crosslink_dir, db, false)?, - check_hydration(crosslink_dir, db, false)?, + check_hydration(crosslink_dir, db, false, false)?, check_locks(crosslink_dir, false)?, check_layout(crosslink_dir, false), ]; @@ -175,7 +178,12 @@ fn check_counters(crosslink_dir: &Path, db: &Database, repair: bool) -> Result Result { +fn check_hydration( + crosslink_dir: &Path, + db: &Database, + repair: bool, + accept_data_loss: bool, +) -> Result { let cache_dir = crosslink_dir.join(HUB_CACHE_DIR); if !cache_dir.exists() { return Ok(CheckResult { @@ -184,6 +192,116 @@ fn check_hydration(crosslink_dir: &Path, db: &Database, repair: bool) -> Result< }); } + // ---- 1. Count check (cheap, surfaces JSON↔SQLite size differences) ---- + let count_details = collect_count_mismatch(&cache_dir, db)?; + + // ---- 2. Content-level drift check (catches same-count divergence) ---- + let drift = super::integrity_drift::detect(&cache_dir, db)?; + + // ---- 3. Decide the disposition ---- + if count_details.is_empty() && drift.is_empty() { + return Ok(CheckResult { + name: "hydration".to_string(), + status: CheckStatus::Pass, + }); + } + + // Build a human-readable summary of every kind of divergence we + // found, for both Fail (no --repair) and Repaired result lines. + let summary = combined_drift_summary(&count_details, &drift); + + if !repair { + return Ok(CheckResult { + name: "hydration".to_string(), + status: CheckStatus::Fail(summary), + }); + } + + // ---- 4. Repair path ---- + // + // Always snapshot first, regardless of which sub-path we end up + // running. The snapshot is the recovery handle if anything goes + // wrong, and the user-visible record of what state the repair was + // applied against (#602 fix #4). + let snapshot_path = crate::db::snapshot::snapshot_to_integrity_dir( + db, + crosslink_dir, + crate::db::snapshot::HYDRATION_BACKUP_PREFIX, + )?; + let snapshot_rel = snapshot_path.strip_prefix(crosslink_dir).map_or_else( + |_| snapshot_path.to_string_lossy().into_owned(), + |p| p.to_string_lossy().into_owned(), + ); + + // If the drift contains rows that re-emit cannot represent in JSON + // (comments, time entries), refuse to destroy them without an + // explicit opt-in. The snapshot is already on disk for recovery + // (#602 fix #2). + if drift.has_unrecoverable_loss() && !accept_data_loss { + return Ok(CheckResult { + name: "hydration".to_string(), + status: CheckStatus::Fail(format!( + "{summary}; refusing destructive repair (would lose state with no JSON \ + representation). Snapshot at {snapshot_rel}. Re-run with \ + --accept-data-loss to proceed, or restore from the snapshot." + )), + }); + } + + // Try to re-emit SQLite-only state back to JSON when possible + // (#602 fix #3). Requires an initialized SharedWriter; if one isn't + // available (no agent.json / no hub branch), fall back to refusing + // without --accept-data-loss. + let writer = crate::shared_writer::SharedWriter::new(crosslink_dir)?; + let mut re_emit_summary: Option = None; + if !drift.is_empty() { + if let Some(w) = writer.as_ref() { + let stats = super::integrity_drift::re_emit(&drift, w, db)?; + if stats.total() > 0 { + re_emit_summary = Some(format!( + "re-emitted {} label(s), {} dep(s), {} relation(s), \ + {} milestone link(s) to JSON", + stats.labels, stats.dependencies, stats.relations, stats.milestone_issues + )); + } + } else if !accept_data_loss && !drift.is_empty() { + return Ok(CheckResult { + name: "hydration".to_string(), + status: CheckStatus::Fail(format!( + "{summary}; cannot re-emit SQLite-only state without an initialized \ + SharedWriter (missing agent.json or hub branch). Snapshot at \ + {snapshot_rel}. Re-run with --accept-data-loss to proceed destructively, \ + or initialize the workspace before retrying." + )), + }); + } + } + + // Now run the existing clear+rehydrate path. After re-emit the + // JSON event log is the union of both sides, so hydrate_to_sqlite + // restores everything that re-emit could express. + db.clear_shared_data()?; + let stats = hydrate_to_sqlite(&cache_dir, db)?; + + let mut parts: Vec = vec![format!( + "re-hydrated {} issues, {} comments", + stats.issues, stats.comments + )]; + if let Some(s) = re_emit_summary { + parts.push(s); + } + parts.push(format!("snapshot at {snapshot_rel}")); + + Ok(CheckResult { + name: "hydration".to_string(), + status: CheckStatus::Repaired(parts.join("; ")), + }) +} + +/// Run the legacy count comparison: returns a list of "`JSON` has N, +/// `SQLite` has M" detail strings for each category that mismatches. +/// Empty `Vec` means counts agree (content drift may still exist). +fn collect_count_mismatch(cache_dir: &Path, db: &Database) -> Result> { let issues_dir = cache_dir.join("issues"); let json_issues = read_all_issue_files(&issues_dir)?; let json_issue_count = json_issues @@ -192,7 +310,6 @@ fn check_hydration(crosslink_dir: &Path, db: &Database, repair: bool) -> Result< .count() as i64; let db_issue_count = db.get_issue_count()?; - // Count milestones: per-file first, fall back to legacy single-file let milestones_dir = cache_dir.join("meta").join("milestones"); let json_milestone_entries = read_all_milestone_files(&milestones_dir)?; let json_milestone_count = if json_milestone_entries.is_empty() { @@ -204,46 +321,30 @@ fn check_hydration(crosslink_dir: &Path, db: &Database, repair: bool) -> Result< }; let db_milestone_count = db.get_milestone_count()?; - let issues_ok = json_issue_count == db_issue_count; - let milestones_ok = json_milestone_count == db_milestone_count; - - if issues_ok && milestones_ok { - return Ok(CheckResult { - name: "hydration".to_string(), - status: CheckStatus::Pass, - }); - } - - let mut issues = Vec::new(); - if !issues_ok { - issues.push(format!( + let mut details = Vec::new(); + if json_issue_count != db_issue_count { + details.push(format!( "{json_issue_count} issues in JSON, {db_issue_count} in SQLite" )); } - if !milestones_ok { - issues.push(format!( + if json_milestone_count != db_milestone_count { + details.push(format!( "{json_milestone_count} milestones in JSON, {db_milestone_count} in SQLite" )); } - let details = issues.join("; "); + Ok(details) +} - if !repair { - return Ok(CheckResult { - name: "hydration".to_string(), - status: CheckStatus::Fail(details), - }); +fn combined_drift_summary( + count_details: &[String], + drift: &super::integrity_drift::HydrationDriftReport, +) -> String { + let mut parts: Vec = count_details.to_vec(); + let drift_summary = drift.summary(); + if !drift_summary.is_empty() { + parts.push(drift_summary); } - - db.clear_shared_data()?; - let stats = hydrate_to_sqlite(&cache_dir, db)?; - - Ok(CheckResult { - name: "hydration".to_string(), - status: CheckStatus::Repaired(format!( - "re-hydrated {} issues, {} comments", - stats.issues, stats.comments - )), - }) + parts.join("; ") } fn check_locks(crosslink_dir: &Path, repair: bool) -> Result { @@ -886,7 +987,7 @@ mod tests { fn test_check_hydration_skipped_no_cache() { let (db, dir) = test_db(); let crosslink_dir = dir.path(); - let result = check_hydration(crosslink_dir, &db, false).unwrap(); + let result = check_hydration(crosslink_dir, &db, false, false).unwrap(); assert_eq!(result.name, "hydration"); assert!(matches!(result.status, CheckStatus::Skipped(_))); } diff --git a/crosslink/src/commands/integrity_drift.rs b/crosslink/src/commands/integrity_drift.rs new file mode 100644 index 000000000..95a441399 --- /dev/null +++ b/crosslink/src/commands/integrity_drift.rs @@ -0,0 +1,692 @@ +//! Content-level drift detection between `SQLite` and `JSON`, plus re-emit +//! paths that close the gap by writing `SQLite`-only rows back to the +//! `JSON` event log via `SharedWriter` (#602). +//! +//! The existing count-based check in `integrity_cmd::check_hydration` +//! only catches divergence at the issue/milestone-count level. It misses +//! cases where the two sides have the same row counts but different +//! contents — most importantly: `SQLite` has a row (a label, a blocker, +//! a relation) that no `JSON` file represents. The repair path used to +//! silently delete those rows during the clear-then-rehydrate cycle. +//! +//! This module provides the structural primitives: +//! +//! - [`detect`] — diffs every shared table between `SQLite` and a fresh +//! hydration from `JSON`, returning a [`HydrationDriftReport`]. +//! - [`re_emit`] — for each re-emittable category, writes the +//! `SQLite`-only rows back to the `JSON` / git event log via +//! `SharedWriter`. +//! +//! Some categories (comments, time entries) have no `JSON` representation +//! and cannot be re-emitted; they are reported but require the snapshot +//! (`db::snapshot`) for recovery. + +use anyhow::{Context, Result}; +use std::path::Path; + +use crate::db::Database; +use crate::hydration::hydrate_to_sqlite; + +/// Categorized record of every `SQLite` row that is not represented in +/// the hydrated-from-`JSON` view of state. +/// +/// The two checks that operate on this report are: +/// +/// - [`HydrationDriftReport::is_empty`] — anything diverges? +/// - [`HydrationDriftReport::has_unrecoverable_loss`] — would a clear / +/// re-hydrate destroy state that re-emit cannot put back? +#[derive(Debug, Default, Clone)] +pub struct HydrationDriftReport { + /// Issues (by display id) whose UUID is not present in any JSON file. + /// The existing #427 self-heal logic in `hydrate_to_sqlite` already + /// preserves these (along with their child rows) for `created_by IS + /// NULL` issues; the field is populated for reporting only. + pub sqlite_only_issues: Vec, + + /// `(issue_display_id, label)` pairs present in `SQLite` but not in + /// `JSON`, restricted to issues that DO appear in `JSON`. + /// Re-emittable via `SharedWriter::add_label`. + pub sqlite_only_labels: Vec<(i64, String)>, + + /// `(blocker_display_id, blocked_display_id)` — blocker + /// dependencies in `SQLite` but not in `JSON`, restricted to + /// `JSON`-known issues on both sides. Re-emittable via + /// `SharedWriter::add_blocker`. + pub sqlite_only_dependencies: Vec<(i64, i64)>, + + /// `(issue_a_display_id, issue_b_display_id)` — relations in + /// `SQLite` but not in `JSON`, canonicalized as `(min, max)` + /// because `SQLite` stores both directions while `JSON` stores one. + /// Re-emittable via `SharedWriter::add_relation`. + pub sqlite_only_relations: Vec<(i64, i64)>, + + /// `(milestone_display_id, issue_display_id)` — milestone + /// assignments in `SQLite` that don't appear as `milestone_uuid` on + /// the `JSON` issue. Re-emittable via + /// `SharedWriter::set_milestone_on_issues`. + pub sqlite_only_milestone_issues: Vec<(i64, i64)>, + + /// `SQLite` comment ids whose UUIDs are not present in any `JSON` + /// comment file or embedded `issue.comments` array. NOT re-emittable + /// — re-emit would create a new comment with a fresh UUID and a + /// new event, losing the original identity. Recovery relies on the + /// snapshot file. + pub sqlite_only_comments: Vec, + + /// Time-entry ids in `SQLite` (on `JSON`-known issues) that would + /// be destroyed by `clear_shared_data`. Time entries have no `JSON` + /// representation, so they cannot be re-emitted; recovery relies on + /// the snapshot file. + pub sqlite_only_time_entries: Vec, +} + +impl HydrationDriftReport { + /// True when `SQLite` and the `JSON`-derived view agree on every row + /// of every shared table. + #[must_use] + pub const fn is_empty(&self) -> bool { + self.sqlite_only_issues.is_empty() + && self.sqlite_only_labels.is_empty() + && self.sqlite_only_dependencies.is_empty() + && self.sqlite_only_relations.is_empty() + && self.sqlite_only_milestone_issues.is_empty() + && self.sqlite_only_comments.is_empty() + && self.sqlite_only_time_entries.is_empty() + } + + /// True when running `clear_shared_data` would destroy `SQLite`-only + /// state that [`re_emit`] cannot represent in `JSON`. + /// + /// Currently: comments and time entries on `JSON`-known issues. + /// Issue rows with `created_by = NULL` are preserved by the existing + /// `hydrate_to_sqlite` self-heal path; they are not counted as + /// "unrecoverable" here. + #[must_use] + pub const fn has_unrecoverable_loss(&self) -> bool { + !self.sqlite_only_comments.is_empty() || !self.sqlite_only_time_entries.is_empty() + } + + /// True when every divergent row falls in a category that [`re_emit`] + /// can write back to `JSON` (labels, deps, relations, milestone + /// assignments). Used to decide whether `--repair` can proceed + /// without `--accept-data-loss`. + #[allow(dead_code)] // Exposed for callers reasoning about drift outside check_hydration. + #[must_use] + pub const fn is_fully_re_emittable(&self) -> bool { + !self.is_empty() + && self.sqlite_only_comments.is_empty() + && self.sqlite_only_time_entries.is_empty() + && self.sqlite_only_issues.is_empty() + } + + /// Human-readable summary suitable for the integrity check status + /// line. Empty drift produces an empty string. + #[must_use] + pub fn summary(&self) -> String { + if self.is_empty() { + return String::new(); + } + let mut parts: Vec = Vec::new(); + let push = |parts: &mut Vec, label: &str, n: usize| { + if n > 0 { + parts.push(format!("{n} sqlite-only {label}")); + } + }; + push(&mut parts, "issue(s)", self.sqlite_only_issues.len()); + push(&mut parts, "label(s)", self.sqlite_only_labels.len()); + push( + &mut parts, + "dependency(ies)", + self.sqlite_only_dependencies.len(), + ); + push(&mut parts, "relation(s)", self.sqlite_only_relations.len()); + push( + &mut parts, + "milestone assignment(s)", + self.sqlite_only_milestone_issues.len(), + ); + push(&mut parts, "comment(s)", self.sqlite_only_comments.len()); + push( + &mut parts, + "time entry(ies)", + self.sqlite_only_time_entries.len(), + ); + parts.join(", ") + } +} + +/// Diff every shared `SQLite` table against the `JSON`-derived view of +/// the same state. Returns a categorized record of every row that exists +/// in `SQLite` but not in `JSON`. +/// +/// The `JSON`-derived view is built by hydrating into an isolated temp +/// `SQLite` file (reusing the production `hydrate_to_sqlite` path), then +/// `ATTACH`-ing that file to `main_db`'s connection so the diff is a +/// set of cross-database SQL queries — no manual `JSON` walking, no +/// duplicate parsing logic. +/// +/// # Errors +/// +/// Returns an error if the temp database cannot be created, hydration +/// from JSON fails, ATTACH fails, or any diff query fails. +pub fn detect(cache_dir: &Path, main_db: &Database) -> Result { + // 1. Build the JSON-derived view in an isolated temp database. + let temp_dir = tempfile::tempdir().context("create temp dir for drift detection")?; + let temp_db_path = temp_dir.path().join("hydrated-view.sqlite"); + { + let temp_db = + Database::open(&temp_db_path).context("open temp drift-detection database")?; + hydrate_to_sqlite(cache_dir, &temp_db) + .context("hydrate JSON into temp database for drift detection")?; + // Explicit drop so the connection releases the file before ATTACH. + } + + // 2. ATTACH the temp file to the main connection. + let escaped = temp_db_path.to_string_lossy().replace('\'', "''"); + main_db + .conn + .execute(&format!("ATTACH DATABASE '{escaped}' AS json_view"), []) + .context("attach JSON-view database")?; + + // Run the diff inside a closure so we can ALWAYS detach, even on + // error from any individual query. + let result = run_diff_queries(main_db); + + // 3. DETACH — best-effort. If detach fails the connection is still + // usable for subsequent commands; tracing makes the failure visible. + if let Err(e) = main_db.conn.execute("DETACH DATABASE json_view", []) { + tracing::warn!("detach json_view database failed: {e}"); + } + + result +} + +/// Execute the per-table diff queries. Separated so the caller can wrap +/// it in a DETACH-guard. +fn run_diff_queries(main_db: &Database) -> Result { + let mut report = HydrationDriftReport::default(); + + // --- Issues (sqlite-only by uuid) --- + { + let mut stmt = main_db.conn.prepare( + "SELECT id FROM main.issues \ + WHERE uuid IS NOT NULL \ + AND uuid NOT IN (SELECT uuid FROM json_view.issues WHERE uuid IS NOT NULL) \ + ORDER BY id", + )?; + report.sqlite_only_issues = stmt + .query_map([], |row| row.get::<_, i64>(0))? + .collect::, _>>()?; + } + + // --- Labels (sqlite-only on JSON-known issues) --- + { + let mut stmt = main_db.conn.prepare( + "SELECT issue_id, label FROM main.labels \ + WHERE issue_id IN (SELECT id FROM json_view.issues) \ + AND (issue_id, label) NOT IN \ + (SELECT issue_id, label FROM json_view.labels) \ + ORDER BY issue_id, label", + )?; + report.sqlite_only_labels = stmt + .query_map([], |row| { + Ok((row.get::<_, i64>(0)?, row.get::<_, String>(1)?)) + })? + .collect::, _>>()?; + } + + // --- Dependencies (sqlite-only on JSON-known issues, both sides) --- + { + let mut stmt = main_db.conn.prepare( + "SELECT blocker_id, blocked_id FROM main.dependencies \ + WHERE blocker_id IN (SELECT id FROM json_view.issues) \ + AND blocked_id IN (SELECT id FROM json_view.issues) \ + AND (blocker_id, blocked_id) NOT IN \ + (SELECT blocker_id, blocked_id FROM json_view.dependencies) \ + ORDER BY blocker_id, blocked_id", + )?; + report.sqlite_only_dependencies = stmt + .query_map([], |row| Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?)))? + .collect::, _>>()?; + } + + // --- Relations (canonical (min, max); both directions in SQLite) --- + // The relations table stores both (a, b) and (b, a). Canonicalize + // to (min, max) on both sides so the comparison sees one row per + // logical relation. Then return canonicalized SQLite-only pairs. + // + // Note: SQLite column names are `issue_id_1` / `issue_id_2`. + { + let mut stmt = main_db.conn.prepare( + "SELECT DISTINCT \ + MIN(issue_id_1, issue_id_2) AS lo, \ + MAX(issue_id_1, issue_id_2) AS hi \ + FROM main.relations \ + WHERE issue_id_1 IN (SELECT id FROM json_view.issues) \ + AND issue_id_2 IN (SELECT id FROM json_view.issues) \ + AND (MIN(issue_id_1, issue_id_2), MAX(issue_id_1, issue_id_2)) NOT IN \ + (SELECT MIN(issue_id_1, issue_id_2), MAX(issue_id_1, issue_id_2) \ + FROM json_view.relations) \ + ORDER BY lo, hi", + )?; + report.sqlite_only_relations = stmt + .query_map([], |row| Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?)))? + .collect::, _>>()?; + } + + // --- Milestone assignments (sqlite-only on JSON-known issues) --- + { + let mut stmt = main_db.conn.prepare( + "SELECT milestone_id, issue_id FROM main.milestone_issues \ + WHERE issue_id IN (SELECT id FROM json_view.issues) \ + AND (milestone_id, issue_id) NOT IN \ + (SELECT milestone_id, issue_id FROM json_view.milestone_issues) \ + ORDER BY milestone_id, issue_id", + )?; + report.sqlite_only_milestone_issues = stmt + .query_map([], |row| Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?)))? + .collect::, _>>()?; + } + + // --- Comments (sqlite-only by UUID, on JSON-known issues) --- + // Comments without UUIDs (legacy or transient) are excluded — there + // is no stable identity to diff on, and re-emit cannot bring them + // back regardless. + { + let mut stmt = main_db.conn.prepare( + "SELECT id FROM main.comments \ + WHERE issue_id IN (SELECT id FROM json_view.issues) \ + AND uuid IS NOT NULL \ + AND uuid NOT IN \ + (SELECT uuid FROM json_view.comments WHERE uuid IS NOT NULL) \ + ORDER BY id", + )?; + report.sqlite_only_comments = stmt + .query_map([], |row| row.get::<_, i64>(0))? + .collect::, _>>()?; + } + + // --- Time entries (sqlite-only on JSON-known issues) --- + // Time entries have no JSON representation, so EVERY time entry on + // a JSON-known issue would be destroyed by clear_shared_data. + { + let mut stmt = main_db.conn.prepare( + "SELECT id FROM main.time_entries \ + WHERE issue_id IN (SELECT id FROM json_view.issues) \ + ORDER BY id", + )?; + report.sqlite_only_time_entries = stmt + .query_map([], |row| row.get::<_, i64>(0))? + .collect::, _>>()?; + } + + Ok(report) +} + +/// Summary of how many SQLite-only rows were successfully written back +/// to the JSON event log via `SharedWriter`. Returned by [`re_emit`]. +#[derive(Debug, Default, Clone)] +pub struct ReEmitStats { + pub labels: usize, + pub dependencies: usize, + pub relations: usize, + pub milestone_issues: usize, +} + +impl ReEmitStats { + #[must_use] + pub const fn total(&self) -> usize { + self.labels + self.dependencies + self.relations + self.milestone_issues + } +} + +/// Write every re-emittable SQLite-only row in `drift` back to the JSON +/// event log via `writer`. Each `add_*` call short-circuits if the row +/// is already present (per #600), so this is safe to invoke even if the +/// JSON side raced ahead between detection and re-emit. +/// +/// Categories without a JSON representation (`sqlite_only_comments`, +/// `sqlite_only_time_entries`, `sqlite_only_issues`) are NOT touched — +/// the caller is responsible for either accepting their loss or +/// recovering them from a snapshot. +/// +/// # Errors +/// +/// Returns an error from the first failing `SharedWriter` mutation. +/// Partial progress IS persisted: each mutation is its own git commit, +/// so any rows that succeeded before the failure remain in the JSON +/// event log and are no longer drift. +pub fn re_emit( + drift: &HydrationDriftReport, + writer: &crate::shared_writer::SharedWriter, + db: &Database, +) -> Result { + let mut stats = ReEmitStats::default(); + + for (issue_id, label) in &drift.sqlite_only_labels { + if writer.add_label(db, *issue_id, label)? { + stats.labels += 1; + } + } + + for (blocker_id, blocked_id) in &drift.sqlite_only_dependencies { + if writer.add_blocker(db, *blocked_id, *blocker_id)? { + stats.dependencies += 1; + } + } + + for (a, b) in &drift.sqlite_only_relations { + if writer.add_relation(db, *a, *b)? { + stats.relations += 1; + } + } + + // Group milestone_issues by milestone_id so set_milestone_on_issues + // can write a single batch per milestone (matches its existing + // call shape). + if !drift.sqlite_only_milestone_issues.is_empty() { + use std::collections::BTreeMap; + let mut by_milestone: BTreeMap> = BTreeMap::new(); + for (m_id, i_id) in &drift.sqlite_only_milestone_issues { + by_milestone.entry(*m_id).or_default().push(*i_id); + } + for (m_id, issue_ids) in by_milestone { + writer.set_milestone_on_issues(db, m_id, &issue_ids)?; + stats.milestone_issues += issue_ids.len(); + } + } + + Ok(stats) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::sync::HUB_CACHE_DIR; + + /// Minimal cache_dir setup: an empty `issues/` directory under + /// `crosslink_dir/.hub-cache/`. Enough to satisfy + /// `hydrate_to_sqlite`'s "no JSON files" early-return. + fn setup_empty_cache(crosslink_dir: &std::path::Path) { + let cache_dir = crosslink_dir.join(HUB_CACHE_DIR); + std::fs::create_dir_all(cache_dir.join("issues")).unwrap(); + std::fs::create_dir_all(cache_dir.join("meta").join("milestones")).unwrap(); + } + + #[test] + fn test_drift_report_summary_empty() { + let report = HydrationDriftReport::default(); + assert!(report.is_empty()); + assert!(!report.has_unrecoverable_loss()); + assert!(!report.is_fully_re_emittable()); + assert_eq!(report.summary(), ""); + } + + #[test] + fn test_drift_report_summary_with_labels() { + let report = HydrationDriftReport { + sqlite_only_labels: vec![(1, "bug".to_string())], + ..Default::default() + }; + assert!(!report.is_empty()); + assert!(!report.has_unrecoverable_loss()); + assert!(report.is_fully_re_emittable()); + assert_eq!(report.summary(), "1 sqlite-only label(s)"); + } + + #[test] + fn test_drift_report_unrecoverable_when_comments_present() { + let report = HydrationDriftReport { + sqlite_only_comments: vec![42], + ..Default::default() + }; + assert!(!report.is_empty()); + assert!(report.has_unrecoverable_loss()); + assert!(!report.is_fully_re_emittable()); + } + + #[test] + fn test_detect_no_drift_on_empty_state() { + let dir = tempfile::tempdir().unwrap(); + let crosslink_dir = dir.path(); + setup_empty_cache(crosslink_dir); + let db = Database::open(&dir.path().join("test.db")).unwrap(); + + let report = detect(&crosslink_dir.join(HUB_CACHE_DIR), &db).unwrap(); + assert!( + report.is_empty(), + "empty SQLite + empty JSON should report no drift, got: {report:?}" + ); + } + + fn make_issue(display_id: i64, title: &str) -> crate::issue_file::IssueFile { + crate::issue_file::IssueFile { + uuid: uuid::Uuid::new_v4(), + display_id: Some(display_id), + title: title.to_string(), + description: None, + status: crate::models::IssueStatus::Open, + priority: crate::models::Priority::Medium, + parent_uuid: None, + created_by: "test-agent".to_string(), + created_at: chrono::Utc::now(), + updated_at: chrono::Utc::now(), + closed_at: None, + scheduled_at: None, + due_at: None, + labels: Vec::new(), + comments: Vec::new(), + blockers: Vec::new(), + related: Vec::new(), + milestone_uuid: None, + time_entries: Vec::new(), + } + } + + /// Look up an issue's JSON file by display id by scanning the hub + /// cache. Used in tests so we can inspect post-re-emit JSON state + /// without depending on private `SharedWriter` helpers. + fn read_issue_by_display_id( + cache_dir: &std::path::Path, + display_id: i64, + ) -> crate::issue_file::IssueFile { + let issues = crate::issue_file::read_all_issue_files(&cache_dir.join("issues")).unwrap(); + issues + .into_iter() + .find(|i| i.display_id == Some(display_id)) + .unwrap_or_else(|| panic!("no JSON issue file with display_id = {display_id}")) + } + + /// Spin up a minimal git environment (work repo + bare remote + + /// initialized hub cache) so `SharedWriter::new()` succeeds, mirroring + /// `shared_writer::tests::integration::setup_shared_writer_env`. + /// Inlined here because `crate::commands` lives in the bin target + /// while that helper lives in the lib target. + fn setup_writer_env() -> (tempfile::TempDir, tempfile::TempDir, std::path::PathBuf) { + use crate::identity::{AgentConfig, AgentRole}; + use std::process::Command; + use tempfile::TempDir; + + let remote_dir: TempDir = tempfile::tempdir().unwrap(); + let work_dir: TempDir = tempfile::tempdir().unwrap(); + + Command::new("git") + .current_dir(remote_dir.path()) + .args(["init", "--bare", "-b", "main"]) + .output() + .unwrap(); + + Command::new("git") + .current_dir(work_dir.path()) + .args(["init", "-b", "main"]) + .output() + .unwrap(); + + for args in [ + vec!["config", "user.email", "test@test.local"], + vec!["config", "user.name", "Test"], + vec![ + "remote", + "add", + "origin", + remote_dir.path().to_str().unwrap(), + ], + ] { + Command::new("git") + .current_dir(work_dir.path()) + .args(&args) + .output() + .unwrap(); + } + + std::fs::write(work_dir.path().join("README.md"), "# test\n").unwrap(); + for args in [ + vec!["add", "."], + vec!["commit", "-m", "init", "--no-gpg-sign"], + vec!["push", "-u", "origin", "main"], + ] { + Command::new("git") + .current_dir(work_dir.path()) + .args(&args) + .output() + .unwrap(); + } + + let crosslink_dir = work_dir.path().join(".crosslink"); + std::fs::create_dir_all(&crosslink_dir).unwrap(); + std::fs::write( + crosslink_dir.join("hook-config.json"), + r#"{"remote":"origin","layout":"v2"}"#, + ) + .unwrap(); + + let agent_config = AgentConfig { + agent_id: "test-agent".to_string(), + machine_id: "test-machine".to_string(), + description: Some("integrity_drift test agent".to_string()), + role: AgentRole::Driver, + ssh_key_path: None, + ssh_fingerprint: None, + ssh_public_key: None, + }; + std::fs::write( + crosslink_dir.join("agent.json"), + serde_json::to_string_pretty(&agent_config).unwrap(), + ) + .unwrap(); + + crate::sync::SyncManager::new(&crosslink_dir) + .unwrap() + .init_cache() + .unwrap(); + + (work_dir, remote_dir, crosslink_dir) + } + + #[test] + fn test_re_emit_writes_sqlite_only_dependency_to_json() { + // End-to-end regression for the bug-report scenario: two issues + // are in JSON (created via SharedWriter), a dependency row is + // injected directly into SQLite (bypassing the writer), then + // detect + re_emit must produce a clean JSON-side state with + // the dep written back through the writer. After re_emit, drift + // detection must report no remaining divergence. + let (work_dir, _remote, crosslink_dir) = setup_writer_env(); + let writer = crate::shared_writer::SharedWriter::new(&crosslink_dir) + .unwrap() + .unwrap(); + let db = Database::open(&work_dir.path().join("issues.db")).unwrap(); + + let blocked = writer + .create_issue(&db, "needs another", None, "medium", None, None) + .unwrap(); + let blocker = writer + .create_issue(&db, "the blocker", None, "high", None, None) + .unwrap(); + + // Inject the SQLite-only dependency. + db.add_dependency(blocked, blocker).unwrap(); + + let cache_dir = crosslink_dir.join(crate::sync::HUB_CACHE_DIR); + let before = read_issue_by_display_id(&cache_dir, blocked); + assert!( + before.blockers.is_empty(), + "precondition: JSON must not yet contain the SQLite-only blocker" + ); + + let drift = detect(&cache_dir, &db).unwrap(); + assert_eq!( + drift.sqlite_only_dependencies, + vec![(blocker, blocked)], + "detect must surface the SQLite-only dependency, got: {drift:?}" + ); + assert!(drift.is_fully_re_emittable()); + assert!(!drift.has_unrecoverable_loss()); + + let stats = re_emit(&drift, &writer, &db).unwrap(); + assert_eq!(stats.dependencies, 1, "re-emit must write the dep to JSON"); + + let after = read_issue_by_display_id(&cache_dir, blocked); + assert_eq!( + after.blockers.len(), + 1, + "JSON must now contain the re-emitted blocker" + ); + + let drift_after = detect(&cache_dir, &db).unwrap(); + assert!( + drift_after.is_empty(), + "after re-emit, drift must be empty; got: {drift_after:?}" + ); + + drop(work_dir); + } + + #[test] + fn test_detect_sqlite_only_dependency_on_json_known_issues() { + // The bug-report reproducer: two issues exist in JSON, but a + // dependency row exists only in SQLite. + use crate::issue_file::write_issue_file; + + let dir = tempfile::tempdir().unwrap(); + let crosslink_dir = dir.path(); + setup_empty_cache(crosslink_dir); + let cache_dir = crosslink_dir.join(HUB_CACHE_DIR); + + let issue_a = make_issue(1, "first"); + let issue_b = make_issue(2, "second"); + write_issue_file( + &cache_dir + .join("issues") + .join(format!("{}.json", issue_a.uuid)), + &issue_a, + ) + .unwrap(); + write_issue_file( + &cache_dir + .join("issues") + .join(format!("{}.json", issue_b.uuid)), + &issue_b, + ) + .unwrap(); + + // Hydrate the JSON view into the real db, then add a SQLite-only + // dependency row that was never written through SharedWriter. + let db = Database::open(&dir.path().join("test.db")).unwrap(); + hydrate_to_sqlite(&cache_dir, &db).unwrap(); + db.add_dependency(2, 1).unwrap(); + + let report = detect(&cache_dir, &db).unwrap(); + + assert_eq!( + report.sqlite_only_dependencies, + vec![(1, 2)], + "the SQLite-only dependency must surface as drift" + ); + assert!( + report.is_fully_re_emittable(), + "dependency-only drift must be re-emittable" + ); + assert!(!report.has_unrecoverable_loss()); + } +} diff --git a/crosslink/src/commands/mod.rs b/crosslink/src/commands/mod.rs index 6c790ac1d..1ee412098 100644 --- a/crosslink/src/commands/mod.rs +++ b/crosslink/src/commands/mod.rs @@ -18,6 +18,7 @@ pub mod external_knowledge; pub mod import; pub mod init; pub mod integrity_cmd; +pub mod integrity_drift; pub mod intervene; pub mod kickoff; pub mod knowledge; diff --git a/crosslink/src/db/mod.rs b/crosslink/src/db/mod.rs index 36723513b..260588796 100644 --- a/crosslink/src/db/mod.rs +++ b/crosslink/src/db/mod.rs @@ -8,6 +8,7 @@ mod milestones; mod relations; pub mod sentinel; mod sessions; +pub mod snapshot; mod time_entries; mod token_usage; diff --git a/crosslink/src/db/snapshot.rs b/crosslink/src/db/snapshot.rs new file mode 100644 index 000000000..04825aa0a --- /dev/null +++ b/crosslink/src/db/snapshot.rs @@ -0,0 +1,132 @@ +//! Point-in-time `SQLite` snapshots for safety nets around destructive +//! operations (e.g. `integrity hydration --repair`, see #602). +//! +//! Uses `SQLite`'s native `VACUUM INTO` (`SQLite` 3.27+) so the snapshot is +//! self-contained — it bundles the main database, any WAL pages, and +//! settles all in-flight checkpoints into a single file the user can +//! drop in place to recover state. + +use std::path::{Path, PathBuf}; + +use anyhow::{Context, Result}; +use chrono::Utc; + +use super::Database; + +/// Directory (under `.crosslink/`) where snapshots are written. +pub const SNAPSHOT_DIR: &str = "integrity"; + +/// Filename prefix for hydration-repair backups. +pub const HYDRATION_BACKUP_PREFIX: &str = "hydration-backup-"; + +/// Write a point-in-time snapshot of `db` to +/// `/integrity/.sqlite`, returning the +/// absolute path of the snapshot file. +/// +/// The destination directory is created if it does not already exist. +/// Uses `VACUUM INTO` so the resulting file is self-contained regardless +/// of journal mode (WAL/DELETE/etc.). +/// +/// # Errors +/// +/// Returns an error if the destination directory cannot be created or if +/// the `VACUUM INTO` statement fails (e.g. destination already exists, +/// disk full, locked source database). +pub fn snapshot_to_integrity_dir( + db: &Database, + crosslink_dir: &Path, + prefix: &str, +) -> Result { + let snapshot_dir = crosslink_dir.join(SNAPSHOT_DIR); + std::fs::create_dir_all(&snapshot_dir).with_context(|| { + format!( + "Failed to create snapshot directory {}", + snapshot_dir.display() + ) + })?; + + // UTC ISO 8601 with colons replaced for filename safety on Windows. + let ts = Utc::now().format("%Y%m%dT%H%M%SZ").to_string(); + let filename = format!("{prefix}{ts}.sqlite"); + let dest_path = snapshot_dir.join(filename); + + // `VACUUM INTO` requires a string literal in its grammar — there is + // no parameter binding for VACUUM. The destination path is generated + // by this function (UTC timestamp under crosslink_dir), so it cannot + // contain hostile input, but SQL-escape single quotes anyway as a + // belt-and-suspenders safety measure. + let escaped = dest_path.to_string_lossy().replace('\'', "''"); + db.conn + .execute(&format!("VACUUM INTO '{escaped}'"), []) + .with_context(|| format!("Failed to write SQLite snapshot to {}", dest_path.display()))?; + + Ok(dest_path) +} + +#[cfg(test)] +mod tests { + use super::*; + use tempfile::tempdir; + + #[test] + fn test_snapshot_creates_file() { + let dir = tempdir().unwrap(); + let db = Database::open(&dir.path().join("source.db")).unwrap(); + db.create_issue("test", None, "medium").unwrap(); + + let crosslink_dir = dir.path().join(".crosslink"); + std::fs::create_dir_all(&crosslink_dir).unwrap(); + + let snap_path = + snapshot_to_integrity_dir(&db, &crosslink_dir, HYDRATION_BACKUP_PREFIX).unwrap(); + + assert!(snap_path.exists(), "snapshot file must be created"); + assert!(snap_path.starts_with(crosslink_dir.join(SNAPSHOT_DIR))); + assert!(snap_path + .file_name() + .unwrap() + .to_string_lossy() + .starts_with(HYDRATION_BACKUP_PREFIX)); + + // Snapshot should be a valid SQLite db with the same data. + let restored = Database::open(&snap_path).unwrap(); + let issues = restored.list_issues(None, None, None).unwrap(); + assert_eq!(issues.len(), 1, "snapshot must contain the source row"); + } + + #[test] + fn test_snapshot_creates_integrity_dir_if_missing() { + let dir = tempdir().unwrap(); + let db = Database::open(&dir.path().join("source.db")).unwrap(); + let crosslink_dir = dir.path().join(".crosslink"); + // Deliberately do NOT create crosslink_dir/integrity. + + let snap_path = + snapshot_to_integrity_dir(&db, &crosslink_dir, HYDRATION_BACKUP_PREFIX).unwrap(); + + assert!(snap_path.parent().unwrap().exists()); + } + + #[test] + fn test_snapshot_filename_has_utc_timestamp() { + let dir = tempdir().unwrap(); + let db = Database::open(&dir.path().join("source.db")).unwrap(); + let crosslink_dir = dir.path().join(".crosslink"); + + let snap_path = + snapshot_to_integrity_dir(&db, &crosslink_dir, HYDRATION_BACKUP_PREFIX).unwrap(); + let name = snap_path.file_name().unwrap().to_string_lossy(); + // Format: hydration-backup-YYYYMMDDTHHMMSSZ.sqlite (16 chars between + // prefix and .sqlite extension). + let stripped = name + .strip_prefix(HYDRATION_BACKUP_PREFIX) + .and_then(|s| s.strip_suffix(".sqlite")) + .expect("filename must follow .sqlite shape"); + assert_eq!( + stripped.len(), + 16, + "timestamp section must be exactly 16 chars (YYYYMMDDTHHMMSSZ), got: {stripped}" + ); + assert!(stripped.ends_with('Z'), "timestamp must be UTC (ends in Z)"); + } +} diff --git a/crosslink/src/main.rs b/crosslink/src/main.rs index 86f8c91e1..47fdcaeac 100644 --- a/crosslink/src/main.rs +++ b/crosslink/src/main.rs @@ -1534,9 +1534,17 @@ enum IntegrityCommands { }, /// Verify `SQLite` matches JSON issue files Hydration { - /// Re-hydrate `SQLite` from JSON + /// Re-hydrate `SQLite` from JSON. Tries to re-emit SQLite-only + /// state back to JSON first; falls back to refusing destructive + /// repair unless `--accept-data-loss` is also given. #[arg(long)] repair: bool, + /// Allow `--repair` to destroy `SQLite` rows that have no `JSON` + /// representation (comments, time entries). A snapshot of the + /// pre-repair database is always written under + /// `.crosslink/integrity/` regardless of this flag. + #[arg(long)] + accept_data_loss: bool, }, /// Check for stale or orphaned locks Locks {