diff --git a/Cargo.lock b/Cargo.lock
index f59965734..d8b8b3d64 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3288,6 +3288,7 @@ dependencies = [
  "hex",
  "httpmock",
  "launcher-interface",
+ "libc",
  "mpc-node-config",
  "mpc-primitives",
  "near-account-id 2.6.0",
diff --git a/crates/e2e-tests/Cargo.toml b/crates/e2e-tests/Cargo.toml
index 13b79e5f6..a91106bdc 100644
--- a/crates/e2e-tests/Cargo.toml
+++ b/crates/e2e-tests/Cargo.toml
@@ -15,6 +15,7 @@ futures = { workspace = true }
 hex = { workspace = true }
 httpmock = { workspace = true }
 launcher-interface = { workspace = true }
+libc = "0.2"
 mpc-node-config = { workspace = true }
 near-account-id = { workspace = true }
 near-indexer-primitives = { workspace = true }
diff --git a/crates/e2e-tests/src/cluster.rs b/crates/e2e-tests/src/cluster.rs
index 1155dc835..3ba34bb8b 100644
--- a/crates/e2e-tests/src/cluster.rs
+++ b/crates/e2e-tests/src/cluster.rs
@@ -344,6 +344,39 @@ impl MpcCluster {
         Ok(())
     }
 
+    /// Send SIGTERM to a running node and wait up to `grace` for it to exit on
+    /// its own. Returns the exit status — `status.code().is_some()` indicates
+    /// the process exited cleanly via its own main() (i.e. our SIGTERM handler
+    /// ran), while `status.signal().is_some()` indicates the OS terminated it
+    /// (i.e. there was no handler).
+    ///
+    /// # Errors
+    ///
+    /// Fails if `idx` is out of bounds, if the node is already stopped, or if
+    /// signalling or waiting for the node fails. In that last case the node
+    /// has already been taken out of `self.nodes` and is not put back.
+    pub fn terminate_node_with_sigterm(
+        &mut self,
+        idx: usize,
+        grace: std::time::Duration,
+    ) -> anyhow::Result<std::process::ExitStatus> {
+        anyhow::ensure!(
+            idx < self.nodes.len(),
+            "node index {idx} out of bounds (have {} nodes)",
+            self.nodes.len()
+        );
+        let state = self.nodes.remove(idx);
+        let (status, setup) = match state {
+            MpcNodeState::Running(node) => node.terminate_with_sigterm(grace)?,
+            MpcNodeState::Stopped(setup) => {
+                self.nodes.insert(idx, MpcNodeState::Stopped(setup));
+                anyhow::bail!("node {idx} already stopped; cannot SIGTERM");
+            }
+        };
+        self.nodes.insert(idx, MpcNodeState::Stopped(setup));
+        Ok(status)
+    }
+
     pub fn start_nodes(&mut self, indices: &[usize]) -> anyhow::Result<()> {
         for &idx in indices {
             let state = self.nodes.remove(idx);
diff --git a/crates/e2e-tests/src/mpc_node.rs b/crates/e2e-tests/src/mpc_node.rs
index 603ef002e..da549e3d0 100644
--- a/crates/e2e-tests/src/mpc_node.rs
+++ b/crates/e2e-tests/src/mpc_node.rs
@@ -39,6 +39,42 @@ impl MpcNode {
         self.setup
     }
 
+    /// Send SIGTERM to the node and wait up to `grace` for it to exit on its
+    /// own. Returns the child's `ExitStatus` on graceful exit; errors if the
+    /// grace period elapses (in which case `self.process`'s Drop will SIGKILL
+    /// it as a fallback). Used to exercise the production SIGTERM handler.
+    pub fn terminate_with_sigterm(
+        mut self,
+        grace: std::time::Duration,
+    ) -> anyhow::Result<(std::process::ExitStatus, MpcNodeSetup)> {
+        let pid = libc::pid_t::try_from(self.process.0.id())?;
+        // SAFETY: `libc::kill` with `SIGTERM` and a pid Rust's Child reports
+        // for a live child is well-defined. We make no assumptions beyond
+        // that the pid is the child we spawned.
+        let rc = unsafe { libc::kill(pid, libc::SIGTERM) };
+        anyhow::ensure!(
+            rc == 0,
+            "libc::kill({pid}, SIGTERM) failed: {}",
+            std::io::Error::last_os_error()
+        );
+
+        let start = std::time::Instant::now();
+        let poll_interval = std::time::Duration::from_millis(100);
+        loop {
+            match self.process.0.try_wait()? {
+                Some(status) => return Ok((status, self.setup)),
+                None => {
+                    if start.elapsed() >= grace {
+                        anyhow::bail!(
+                            "mpc-node pid {pid} did not exit within {grace:?} after SIGTERM"
+                        );
+                    }
+                    std::thread::sleep(poll_interval);
+                }
+            }
+        }
+    }
+
     /// Kill then start. New process, same config and data directory.
     pub fn restart(self) -> anyhow::Result {
         self.kill().start()
diff --git a/crates/e2e-tests/tests/common.rs b/crates/e2e-tests/tests/common.rs
index c11b72a47..eb81da0c9 100644
--- a/crates/e2e-tests/tests/common.rs
+++ b/crates/e2e-tests/tests/common.rs
@@ -35,6 +35,7 @@ pub const CONTRACT_UPGRADE_COMPATIBILITY_MAINNET_PORT_SEED: u16 = 18;
 pub const CONTRACT_UPGRADE_COMPATIBILITY_TESTNET_PORT_SEED: u16 = 19;
 pub const TIMEOUT_METRIC_PORT_SEED: u16 = 20;
 pub const MIGRATION_BACK_PORT_SEED: u16 = 21;
+pub const SIGTERM_HANDLER_PORT_SEED: u16 = 22;
 
 /// Start a cluster, wait for Running state and presignatures to buffer.
 ///
diff --git a/crates/e2e-tests/tests/e2e.rs b/crates/e2e-tests/tests/e2e.rs
index f0546057d..287fc7100 100644
--- a/crates/e2e-tests/tests/e2e.rs
+++ b/crates/e2e-tests/tests/e2e.rs
@@ -12,6 +12,7 @@ mod migration_service;
 mod parallel_sign_calls;
 mod request_during_resharing;
 mod request_lifecycle;
+mod sigterm_handler;
 mod submit_participant_info;
 mod timeout_metric;
 mod web_endpoints;
diff --git a/crates/e2e-tests/tests/sigterm_handler.rs b/crates/e2e-tests/tests/sigterm_handler.rs
new file mode 100644
index 000000000..b48b501f7
--- /dev/null
+++ b/crates/e2e-tests/tests/sigterm_handler.rs
@@ -0,0 +1,33 @@
+use std::os::unix::process::ExitStatusExt;
+use std::time::Duration;
+
+use crate::common;
+
+/// Verifies that mpc-node's SIGTERM handler initiates a graceful shutdown
+/// instead of letting the OS default-terminate the process.
+///
+/// Would fail under revert: without the handler in `crates/node/src/run.rs`,
+/// SIGTERM hits the process with no handler installed and the OS terminates
+/// the process directly. `status.signal()` is then `Some(15)` (SIGTERM) and
+/// `status.code()` is `None`, which the assertion below catches. With the
+/// handler installed the process exits via `main`'s normal return path,
+/// `status.code()` is `Some(_)`, and the assertion passes.
+#[tokio::test(flavor = "multi_thread")]
+#[expect(non_snake_case)]
+async fn sigterm_handler__should_exit_cleanly_instead_of_default_terminating() {
+    // Given: a running cluster.
+    let (mut cluster, _running) =
+        common::must_setup_cluster(common::SIGTERM_HANDLER_PORT_SEED, |_| {}).await;
+
+    // When: SIGTERM is delivered to node 0 with a 30s grace period.
+    let status = cluster
+        .terminate_node_with_sigterm(0, Duration::from_secs(30))
+        .expect("node did not exit within the SIGTERM grace period");
+
+    // Then: the process exited via its own main(), not by OS signal.
+    assert!(
+        status.code().is_some(),
+        "mpc-node was terminated by signal {:?} instead of exiting cleanly via the SIGTERM handler",
+        status.signal()
+    );
+}
diff --git a/crates/node/src/run.rs b/crates/node/src/run.rs
index 57e1bfd82..fea22ab4d 100644
--- a/crates/node/src/run.rs
+++ b/crates/node/src/run.rs
@@ -30,6 +30,7 @@ use std::{
     time::Duration,
 };
 use tee_authority::tee_authority::TeeAuthority;
+use tokio::signal::unix::{Signal, SignalKind, signal};
 use tokio::sync::{RwLock, broadcast, mpsc, oneshot, watch};
 use tokio_util::sync::CancellationToken;
 use tracing::info;
@@ -41,6 +42,64 @@ use crate::tee::{
 
 pub const ATTESTATION_RESUBMISSION_INTERVAL: Duration = Duration::from_secs(60 * 60); // 1 hour
 
+/// Await a signal if its handle is present; if the handle is `None` (its
+/// install failed), park forever so the `tokio::select!` arm is effectively
+/// dead and the other (successfully-installed) signals still work.
+async fn await_optional_signal(handle: Option<&mut Signal>) {
+    match handle {
+        Some(h) => {
+            h.recv().await;
+        }
+        None => std::future::pending().await,
+    }
+}
+
+/// Listen for SIGTERM / SIGINT / SIGHUP and forward the first arrival into
+/// `sender`. Operators stopping the node via dstack CVM stop / `docker stop`
+/// / `kubectl delete` / `systemctl stop` (or a dev pressing Ctrl-C) hit this
+/// path; routing into the same channel that `monitor_allowed_image_hashes`
+/// uses lets the main `select!` exit gracefully.
+///
+/// Logs a per-signal warning if any individual install fails; if none can be
+/// installed (rare), logs an error and exits without ever forwarding —
+/// graceful shutdown via signals is disabled in that case but the rest of
+/// the node keeps running.
+///
+/// Spawned as the first thing after the tokio runtime is built so signals
+/// arriving during early startup (indexer bootstrap, contract state fetch,
+/// attestation generation) are also handled gracefully — otherwise they'd
+/// hit the process with no handler installed and the OS would terminate it
+/// immediately, functionally identical to SIGKILL.
+async fn await_and_forward_shutdown_signal(sender: mpsc::Sender<()>) {
+    let install = |kind: SignalKind, name: &'static str| {
+        signal(kind)
+            .inspect_err(|e| tracing::warn!(error = %e, signal = name, "failed to install shutdown-signal handler"))
+            .ok()
+    };
+    let mut sigterm = install(SignalKind::terminate(), "SIGTERM");
+    let mut sigint = install(SignalKind::interrupt(), "SIGINT");
+    let mut sighup = install(SignalKind::hangup(), "SIGHUP");
+    if sigterm.is_none() && sigint.is_none() && sighup.is_none() {
+        tracing::error!(
+            "no shutdown-signal handlers could be installed; graceful shutdown disabled"
+        );
+        return;
+    }
+    let signal_name = tokio::select! {
+        _ = await_optional_signal(sigterm.as_mut()) => "SIGTERM",
+        _ = await_optional_signal(sigint.as_mut()) => "SIGINT",
+        _ = await_optional_signal(sighup.as_mut()) => "SIGHUP",
+    };
+    tracing::warn!(
+        signal = signal_name,
+        "shutdown signal received, initiating graceful shutdown"
+    );
+    // `send` returns Err only if the receiver has been dropped (the main
+    // `select!` already exited via a different arm); we're shutting down
+    // anyway, so it's fine to ignore.
+    let _ = sender.send(()).await;
+}
+
 pub async fn run_mpc_node(config: StartConfig) -> anyhow::Result<()> {
     init_logging(&config.log);
 
@@ -77,6 +136,11 @@ pub async fn run_mpc_node(config: StartConfig) -> anyhow::Result<()> {
 
     let _tokio_enter_guard = root_runtime.enter();
 
+    let (shutdown_signal_sender, mut shutdown_signal_receiver) = mpsc::channel(1);
+    root_runtime.spawn(await_and_forward_shutdown_signal(
+        shutdown_signal_sender.clone(),
+    ));
+
     // Load configuration and initialize persistent secrets
     let node_config = config.node.clone();
     let persistent_secrets = PersistentSecrets::generate_or_get_existing(
@@ -180,7 +244,6 @@ pub async fn run_mpc_node(config: StartConfig) -> anyhow::Result<()> {
         node_config.foreign_chains.clone(),
     );
 
-    let (shutdown_signal_sender, mut shutdown_signal_receiver) = mpsc::channel(1);
     let cancellation_token = CancellationToken::new();
 
     let allowed_hashes_in_contract = indexer_api.allowed_docker_images_receiver.clone();
@@ -228,6 +291,22 @@ pub async fn run_mpc_node(config: StartConfig) -> anyhow::Result<()> {
     let exit_result = image_hash_watcher_handle.await;
     info!(?exit_result, "Image hash watcher exited.");
 
+    // Stop nearcore's actor system so its tasks have a chance to commit any
+    // in-flight RocksDB batches before the process exits. We deliberately do
+    // NOT then call `near_store::db::RocksDB::block_until_all_instances_are_dropped()`
+    // (which neard's standalone binary does next) — our embedded indexer
+    // runs in a separate `std::thread::spawn`'d closure whose `block_on`
+    // never returns, because the spawned monitor tasks hold
+    // chained `Arc` references (which keep RocksDB open) that nothing
+    // currently cancels. Calling `block_until_all_instances_are_dropped`
+    // here would hang the SIGTERM path past any reasonable grace period;
+    // the orchestrator would then SIGKILL us and we'd land in the exact
+    // uncommitted-RocksDB state this handler was supposed to prevent.
+    // RocksDB's WAL still guarantees committed data survives a kill; this
+    // call mostly closes a smaller flush window.
+    info!("Stopping nearcore actor system.");
+    near_async::shutdown_all_actors();
+
     exit_reason
 }
 