Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 134 additions & 0 deletions crates/daemon/tests/cli_journey.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1344,5 +1344,139 @@ fn cli_journey_reply_later_flag_persists_across_daemon_restart() {
);
}

/// Backgrounded mutation jobs (`--async`) must survive a daemon restart.
/// Before job history was persisted, `mxr jobs` read an in-memory Vec, so
/// a restart erased every record a batch ever ran. This drives the full
/// CLI journey — sync, start an async archive job, then bounce the daemon
/// — and asserts the job is still listed afterward.
#[test]
fn cli_journey_async_job_history_survives_daemon_restart() {
let _guard = cli_journey_guard();
let temp = TempDir::new().expect("temp dir");
let instance = unique_instance_name();
let data_dir = temp.path().join("data");
let config_dir = temp.path().join("config");
let socket_path = instance_socket_path(&instance);
let pid_path = data_dir.join("daemon.pid");
std::fs::create_dir_all(&data_dir).expect("data dir");
std::fs::create_dir_all(&config_dir).expect("config dir");
write_fake_config(&config_dir);

let mut daemon = DaemonGuard {
socket_path: socket_path.clone(),
pid_path: pid_path.clone(),
pid: None,
};

let status = run_json(
&instance,
&data_dir,
&config_dir,
&["status", "--format", "json"],
);
daemon.pid = status["daemon_pid"].as_u64();
let original_pid = daemon.pid.expect("daemon pid");

run_status_only(
&instance,
&data_dir,
&config_dir,
&["sync", "--wait", "--wait-timeout-secs", "30"],
);

let message_id = search_results(
&run_json(
&instance,
&data_dir,
&config_dir,
&["search", "deployment", "--format", "json", "--limit", "5"],
),
"at least one fixture matches `deployment`",
)
.first()
.and_then(|hit| hit["message_id"].as_str())
.expect("at least one fixture matches `deployment`")
.to_string();

// Start a backgrounded archive job.
run_status_only(
&instance,
&data_dir,
&config_dir,
&["archive", &message_id, "--async", "--yes"],
);

// Poll until the job row appears, then capture its id.
let job_ids = |daemon_instance: &str| -> Vec<String> {
let jobs = run_json(
daemon_instance,
&data_dir,
&config_dir,
&["jobs", "--format", "json"],
);
let arr = jobs
.as_array()
.or_else(|| jobs.get("jobs").and_then(Value::as_array))
.cloned()
.unwrap_or_default();
arr.iter()
.filter_map(|job| job["job_id"].as_str().map(str::to_string))
.collect()
};

let mut job_id = None;
for _ in 0..40 {
if let Some(id) = job_ids(&instance).into_iter().next() {
job_id = Some(id);
break;
}
std::thread::sleep(std::time::Duration::from_millis(50));
}
let job_id = job_id.expect("async archive must record a job before restart");

// Bounce the daemon.
std::process::Command::new("kill")
.arg(original_pid.to_string())
.status()
.expect("kill daemon");
for _ in 0..120 {
let alive = std::process::Command::new("kill")
.arg("-0")
.arg(original_pid.to_string())
.stderr(std::process::Stdio::null())
.stdout(std::process::Stdio::null())
.status()
.is_ok_and(|status: std::process::ExitStatus| status.success());
if !alive {
break;
}
std::thread::sleep(std::time::Duration::from_millis(100));
}
daemon.pid = None;
let _ = std::fs::remove_file(&socket_path);

let status_after = run_json(
&instance,
&data_dir,
&config_dir,
&["status", "--format", "json"],
);
let new_pid = status_after["daemon_pid"]
.as_u64()
.expect("auto-started daemon should report its pid");
assert_ne!(
new_pid, original_pid,
"daemon must be a fresh process; got the same pid back"
);
daemon.pid = Some(new_pid);

// The job must still be listed after the restart.
assert!(
job_ids(&instance).contains(&job_id),
"async job {job_id} must survive a daemon restart; got jobs: {:?}",
job_ids(&instance)
);
}

// Daemon-spawning + run_* + write_fake_account_config helpers live in
// `mxr_test_support::daemon` (shared with other integration tests).
Loading