Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crosslink/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,9 @@ portable-pty = "0.9"
base64 = "0.22"
aes-gcm = "0.10"
reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls"] }
tempfile = "3"

[dev-dependencies]
tempfile = "3"
proptest = "1"
arbitrary = "1"
tower = { version = "0.5", features = ["util"] }
177 changes: 139 additions & 38 deletions crosslink/src/commands/integrity_cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,11 @@ pub fn run(action: Option<&IntegrityCommands>, crosslink_dir: &Path, db: &Databa
print_result(&result);
Ok(())
}
Some(IntegrityCommands::Hydration { repair }) => {
let result = check_hydration(crosslink_dir, db, *repair)?;
Some(IntegrityCommands::Hydration {
repair,
accept_data_loss,
}) => {
let result = check_hydration(crosslink_dir, db, *repair, *accept_data_loss)?;
print_result(&result);
Ok(())
}
Expand All @@ -79,7 +82,7 @@ fn run_all(crosslink_dir: &Path, db: &Database) -> Result<()> {
let results = vec![
check_schema(db, false)?,
check_counters(crosslink_dir, db, false)?,
check_hydration(crosslink_dir, db, false)?,
check_hydration(crosslink_dir, db, false, false)?,
check_locks(crosslink_dir, false)?,
check_layout(crosslink_dir, false),
];
Expand Down Expand Up @@ -175,7 +178,12 @@ fn check_counters(crosslink_dir: &Path, db: &Database, repair: bool) -> Result<C
})
}

fn check_hydration(crosslink_dir: &Path, db: &Database, repair: bool) -> Result<CheckResult> {
fn check_hydration(
crosslink_dir: &Path,
db: &Database,
repair: bool,
accept_data_loss: bool,
) -> Result<CheckResult> {
let cache_dir = crosslink_dir.join(HUB_CACHE_DIR);
if !cache_dir.exists() {
return Ok(CheckResult {
Expand All @@ -184,6 +192,116 @@ fn check_hydration(crosslink_dir: &Path, db: &Database, repair: bool) -> Result<
});
}

// ---- 1. Count check (cheap, surfaces JSON↔SQLite size differences) ----
let count_details = collect_count_mismatch(&cache_dir, db)?;

// ---- 2. Content-level drift check (catches same-count divergence) ----
let drift = super::integrity_drift::detect(&cache_dir, db)?;

// ---- 3. Decide the disposition ----
if count_details.is_empty() && drift.is_empty() {
return Ok(CheckResult {
name: "hydration".to_string(),
status: CheckStatus::Pass,
});
}

// Build a human-readable summary of every kind of divergence we
// found, for both Fail (no --repair) and Repaired result lines.
let summary = combined_drift_summary(&count_details, &drift);

if !repair {
return Ok(CheckResult {
name: "hydration".to_string(),
status: CheckStatus::Fail(summary),
});
}

// ---- 4. Repair path ----
//
// Always snapshot first, regardless of which sub-path we end up
// running. The snapshot is the recovery handle if anything goes
// wrong, and the user-visible record of what state the repair was
// applied against (#602 fix #4).
let snapshot_path = crate::db::snapshot::snapshot_to_integrity_dir(
db,
crosslink_dir,
crate::db::snapshot::HYDRATION_BACKUP_PREFIX,
)?;
let snapshot_rel = snapshot_path.strip_prefix(crosslink_dir).map_or_else(
|_| snapshot_path.to_string_lossy().into_owned(),
|p| p.to_string_lossy().into_owned(),
);

// If the drift contains rows that re-emit cannot represent in JSON
// (comments, time entries), refuse to destroy them without an
// explicit opt-in. The snapshot is already on disk for recovery
// (#602 fix #2).
if drift.has_unrecoverable_loss() && !accept_data_loss {
return Ok(CheckResult {
name: "hydration".to_string(),
status: CheckStatus::Fail(format!(
"{summary}; refusing destructive repair (would lose state with no JSON \
representation). Snapshot at {snapshot_rel}. Re-run with \
--accept-data-loss to proceed, or restore from the snapshot."
)),
});
}

// Try to re-emit SQLite-only state back to JSON when possible
// (#602 fix #3). Requires an initialized SharedWriter; if one isn't
// available (no agent.json / no hub branch), fall back to refusing
// without --accept-data-loss.
let writer = crate::shared_writer::SharedWriter::new(crosslink_dir)?;
let mut re_emit_summary: Option<String> = None;
if !drift.is_empty() {
if let Some(w) = writer.as_ref() {
let stats = super::integrity_drift::re_emit(&drift, w, db)?;
if stats.total() > 0 {
re_emit_summary = Some(format!(
"re-emitted {} label(s), {} dep(s), {} relation(s), \
{} milestone link(s) to JSON",
stats.labels, stats.dependencies, stats.relations, stats.milestone_issues
));
}
} else if !accept_data_loss && !drift.is_empty() {
return Ok(CheckResult {
name: "hydration".to_string(),
status: CheckStatus::Fail(format!(
"{summary}; cannot re-emit SQLite-only state without an initialized \
SharedWriter (missing agent.json or hub branch). Snapshot at \
{snapshot_rel}. Re-run with --accept-data-loss to proceed destructively, \
or initialize the workspace before retrying."
)),
});
}
}

// Now run the existing clear+rehydrate path. After re-emit the
// JSON event log is the union of both sides, so hydrate_to_sqlite
// restores everything that re-emit could express.
db.clear_shared_data()?;
let stats = hydrate_to_sqlite(&cache_dir, db)?;

let mut parts: Vec<String> = vec![format!(
"re-hydrated {} issues, {} comments",
stats.issues, stats.comments
)];
if let Some(s) = re_emit_summary {
parts.push(s);
}
parts.push(format!("snapshot at {snapshot_rel}"));

Ok(CheckResult {
name: "hydration".to_string(),
status: CheckStatus::Repaired(parts.join("; ")),
})
}

/// Run the legacy count comparison: returns a list of "`JSON` has N,
/// `SQLite` has M" detail strings for each category that mismatches.
/// Empty `Vec` means counts agree (content drift may still exist).
fn collect_count_mismatch(cache_dir: &Path, db: &Database) -> Result<Vec<String>> {
let issues_dir = cache_dir.join("issues");
let json_issues = read_all_issue_files(&issues_dir)?;
let json_issue_count = json_issues
Expand All @@ -192,7 +310,6 @@ fn check_hydration(crosslink_dir: &Path, db: &Database, repair: bool) -> Result<
.count() as i64;
let db_issue_count = db.get_issue_count()?;

// Count milestones: per-file first, fall back to legacy single-file
let milestones_dir = cache_dir.join("meta").join("milestones");
let json_milestone_entries = read_all_milestone_files(&milestones_dir)?;
let json_milestone_count = if json_milestone_entries.is_empty() {
Expand All @@ -204,46 +321,30 @@ fn check_hydration(crosslink_dir: &Path, db: &Database, repair: bool) -> Result<
};
let db_milestone_count = db.get_milestone_count()?;

let issues_ok = json_issue_count == db_issue_count;
let milestones_ok = json_milestone_count == db_milestone_count;

if issues_ok && milestones_ok {
return Ok(CheckResult {
name: "hydration".to_string(),
status: CheckStatus::Pass,
});
}

let mut issues = Vec::new();
if !issues_ok {
issues.push(format!(
let mut details = Vec::new();
if json_issue_count != db_issue_count {
details.push(format!(
"{json_issue_count} issues in JSON, {db_issue_count} in SQLite"
));
}
if !milestones_ok {
issues.push(format!(
if json_milestone_count != db_milestone_count {
details.push(format!(
"{json_milestone_count} milestones in JSON, {db_milestone_count} in SQLite"
));
}
let details = issues.join("; ");
Ok(details)
}

if !repair {
return Ok(CheckResult {
name: "hydration".to_string(),
status: CheckStatus::Fail(details),
});
fn combined_drift_summary(
count_details: &[String],
drift: &super::integrity_drift::HydrationDriftReport,
) -> String {
let mut parts: Vec<String> = count_details.to_vec();
let drift_summary = drift.summary();
if !drift_summary.is_empty() {
parts.push(drift_summary);
}

db.clear_shared_data()?;
let stats = hydrate_to_sqlite(&cache_dir, db)?;

Ok(CheckResult {
name: "hydration".to_string(),
status: CheckStatus::Repaired(format!(
"re-hydrated {} issues, {} comments",
stats.issues, stats.comments
)),
})
parts.join("; ")
}

fn check_locks(crosslink_dir: &Path, repair: bool) -> Result<CheckResult> {
Expand Down Expand Up @@ -886,7 +987,7 @@ mod tests {
fn test_check_hydration_skipped_no_cache() {
let (db, dir) = test_db();
let crosslink_dir = dir.path();
let result = check_hydration(crosslink_dir, &db, false).unwrap();
let result = check_hydration(crosslink_dir, &db, false, false).unwrap();
assert_eq!(result.name, "hydration");
assert!(matches!(result.status, CheckStatus::Skipped(_)));
}
Expand Down
Loading
Loading