Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/e2e-tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ futures = { workspace = true }
hex = { workspace = true }
httpmock = { workspace = true }
launcher-interface = { workspace = true }
libc = "0.2"
mpc-node-config = { workspace = true }
near-account-id = { workspace = true }
near-indexer-primitives = { workspace = true }
Expand Down
27 changes: 27 additions & 0 deletions crates/e2e-tests/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,33 @@ impl MpcCluster {
Ok(())
}

/// Send SIGTERM to a running node and wait up to `grace` for it to exit on
/// its own. Returns the exit status — `status.code().is_some()` indicates
/// the process exited cleanly via its own main() (i.e. our SIGTERM handler
/// ran), while `status.signal().is_some()` indicates the OS terminated it
/// (i.e. there was no handler).
pub fn terminate_node_with_sigterm(
&mut self,
idx: usize,
grace: std::time::Duration,
) -> anyhow::Result<std::process::ExitStatus> {
anyhow::ensure!(
idx < self.nodes.len(),
"node index {idx} out of bounds (have {} nodes)",
self.nodes.len()
);
let state = self.nodes.remove(idx);
let (status, setup) = match state {
MpcNodeState::Running(node) => node.terminate_with_sigterm(grace)?,
MpcNodeState::Stopped(setup) => {
self.nodes.insert(idx, MpcNodeState::Stopped(setup));
anyhow::bail!("node {idx} already stopped; cannot SIGTERM");
}
};
self.nodes.insert(idx, MpcNodeState::Stopped(setup));
Ok(status)
}

pub fn start_nodes(&mut self, indices: &[usize]) -> anyhow::Result<()> {
for &idx in indices {
let state = self.nodes.remove(idx);
Expand Down
36 changes: 36 additions & 0 deletions crates/e2e-tests/src/mpc_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,42 @@ impl MpcNode {
self.setup
}

/// Send SIGTERM to the node and wait up to `grace` for it to exit on its
/// own. Returns the child's `ExitStatus` on graceful exit; errors if the
/// grace period elapses (in which case `self.process`'s Drop will SIGKILL
/// it as a fallback). Used to exercise the production SIGTERM handler.
pub fn terminate_with_sigterm(
mut self,
grace: std::time::Duration,
) -> anyhow::Result<(std::process::ExitStatus, MpcNodeSetup)> {
let pid = self.process.0.id() as libc::pid_t;
// SAFETY: `libc::kill` with `SIGTERM` and a pid Rust's Child reports
// for a live child is well-defined. We make no assumptions beyond
// that the pid is the child we spawned.
let rc = unsafe { libc::kill(pid, libc::SIGTERM) };
anyhow::ensure!(
rc == 0,
"libc::kill({pid}, SIGTERM) failed: {}",
std::io::Error::last_os_error()
);

let start = std::time::Instant::now();
let poll_interval = std::time::Duration::from_millis(100);
loop {
match self.process.0.try_wait()? {
Some(status) => return Ok((status, self.setup)),
None => {
if start.elapsed() >= grace {
anyhow::bail!(
"mpc-node pid {pid} did not exit within {grace:?} after SIGTERM"
);
}
std::thread::sleep(poll_interval);
}
}
}
}

/// Kill then start. New process, same config and data directory.
pub fn restart(self) -> anyhow::Result<MpcNode> {
self.kill().start()
Expand Down
1 change: 1 addition & 0 deletions crates/e2e-tests/tests/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ pub const CONTRACT_UPGRADE_COMPATIBILITY_MAINNET_PORT_SEED: u16 = 18;
pub const CONTRACT_UPGRADE_COMPATIBILITY_TESTNET_PORT_SEED: u16 = 19;
pub const TIMEOUT_METRIC_PORT_SEED: u16 = 20;
pub const MIGRATION_BACK_PORT_SEED: u16 = 21;
pub const SIGTERM_HANDLER_PORT_SEED: u16 = 22;

/// Start a cluster, wait for Running state and presignatures to buffer.
///
Expand Down
1 change: 1 addition & 0 deletions crates/e2e-tests/tests/e2e.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ mod migration_service;
mod parallel_sign_calls;
mod request_during_resharing;
mod request_lifecycle;
mod sigterm_handler;
mod submit_participant_info;
mod timeout_metric;
mod web_endpoints;
33 changes: 33 additions & 0 deletions crates/e2e-tests/tests/sigterm_handler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
use std::os::unix::process::ExitStatusExt;
use std::time::Duration;

use crate::common;

/// Verifies that mpc-node's SIGTERM handler initiates a graceful shutdown
/// instead of letting the OS default-terminate the process.
///
/// Would fail under revert: without the handler in `crates/node/src/run.rs`,
/// SIGTERM hits the process with no handler installed and the OS terminates
/// the process directly. `status.signal()` is then `Some(15)` (SIGTERM) and
/// `status.code()` is `None`, which the assertion below catches. With the
/// handler installed the process exits via `main`'s normal return path,
/// `status.code()` is `Some(_)`, and the assertion passes.
#[tokio::test(flavor = "multi_thread")]
#[expect(non_snake_case)]
async fn sigterm_handler__should_exit_cleanly_instead_of_default_terminating() {
// Given: a running cluster.
let (mut cluster, _running) =
common::must_setup_cluster(common::SIGTERM_HANDLER_PORT_SEED, |_| {}).await;

// When: SIGTERM is delivered to node 0 with a 30s grace period.
let status = cluster
.terminate_node_with_sigterm(0, Duration::from_secs(30))
.expect("node did not exit within the SIGTERM grace period");

// Then: the process exited via its own main(), not by OS signal.
assert!(
status.code().is_some(),
"mpc-node was terminated by signal {:?} instead of exiting cleanly via the SIGTERM handler",
status.signal()
);
}
81 changes: 80 additions & 1 deletion crates/node/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use std::{
time::Duration,
};
use tee_authority::tee_authority::TeeAuthority;
use tokio::signal::unix::{Signal, SignalKind, signal};
use tokio::sync::{RwLock, broadcast, mpsc, oneshot, watch};
use tokio_util::sync::CancellationToken;
use tracing::info;
Expand All @@ -41,6 +42,64 @@ use crate::tee::{

pub const ATTESTATION_RESUBMISSION_INTERVAL: Duration = Duration::from_secs(60 * 60); // 1 hour

/// Await a signal if its handle is present; if the handle is `None` (its
/// install failed), park forever so the `tokio::select!` arm is effectively
/// dead and the other (successfully-installed) signals still work.
async fn await_optional_signal(handle: Option<&mut Signal>) {
match handle {
Some(h) => {
h.recv().await;
}
None => std::future::pending().await,
}
}

/// Listen for SIGTERM / SIGINT / SIGHUP and forward the first arrival into
/// `sender`. Operators stopping the node via dstack CVM stop / `docker stop`
/// / `kubectl delete` / `systemctl stop` (or a dev pressing Ctrl-C) hit this
/// path; routing into the same channel that `monitor_allowed_image_hashes`
/// uses lets the main `select!` exit gracefully.
///
/// Logs a per-signal warning if any individual install fails; if none can be
/// installed (rare), logs an error and exits without ever forwarding —
/// graceful shutdown via signals is disabled in that case but the rest of
/// the node keeps running.
///
/// Spawned as the first thing after the tokio runtime is built so signals
/// arriving during early startup (indexer bootstrap, contract state fetch,
/// attestation generation) are also handled gracefully — otherwise they'd
/// hit the process with no handler installed and the OS would terminate it
/// immediately, functionally identical to SIGKILL.
async fn await_and_forward_shutdown_signal(sender: mpsc::Sender<()>) {
let install = |kind: SignalKind, name: &'static str| {
signal(kind)
.inspect_err(|e| tracing::warn!(error = %e, signal = name, "failed to install shutdown-signal handler"))
.ok()
};
let mut sigterm = install(SignalKind::terminate(), "SIGTERM");
let mut sigint = install(SignalKind::interrupt(), "SIGINT");
let mut sighup = install(SignalKind::hangup(), "SIGHUP");
if sigterm.is_none() && sigint.is_none() && sighup.is_none() {
tracing::error!(
"no shutdown-signal handlers could be installed; graceful shutdown disabled"
);
return;
}
let signal_name = tokio::select! {
_ = await_optional_signal(sigterm.as_mut()) => "SIGTERM",
_ = await_optional_signal(sigint.as_mut()) => "SIGINT",
_ = await_optional_signal(sighup.as_mut()) => "SIGHUP",
};
tracing::warn!(
signal = signal_name,
"shutdown signal received, initiating graceful shutdown"
);
// `send` returns Err only if the receiver has been dropped (the main
// `select!` already exited via a different arm); we're shutting down
// anyway, so it's fine to ignore.
let _ = sender.send(()).await;
}

pub async fn run_mpc_node(config: StartConfig) -> anyhow::Result<()> {
init_logging(&config.log);

Expand Down Expand Up @@ -77,6 +136,11 @@ pub async fn run_mpc_node(config: StartConfig) -> anyhow::Result<()> {

let _tokio_enter_guard = root_runtime.enter();

let (shutdown_signal_sender, mut shutdown_signal_receiver) = mpsc::channel(1);
root_runtime.spawn(await_and_forward_shutdown_signal(
shutdown_signal_sender.clone(),
));

// Load configuration and initialize persistent secrets
let node_config = config.node.clone();
let persistent_secrets = PersistentSecrets::generate_or_get_existing(
Expand Down Expand Up @@ -180,7 +244,6 @@ pub async fn run_mpc_node(config: StartConfig) -> anyhow::Result<()> {
node_config.foreign_chains.clone(),
);

let (shutdown_signal_sender, mut shutdown_signal_receiver) = mpsc::channel(1);
let cancellation_token = CancellationToken::new();

let allowed_hashes_in_contract = indexer_api.allowed_docker_images_receiver.clone();
Expand Down Expand Up @@ -228,6 +291,22 @@ pub async fn run_mpc_node(config: StartConfig) -> anyhow::Result<()> {
let exit_result = image_hash_watcher_handle.await;
info!(?exit_result, "Image hash watcher exited.");

// Stop nearcore's actor system so its tasks have a chance to commit any
// in-flight RocksDB batches before the process exits. We deliberately do
// NOT then call `near_store::db::RocksDB::block_until_all_instances_are_dropped()`
// (which neard's standalone binary does next) — our embedded indexer
// runs in a separate `std::thread::spawn`'d closure whose `block_on`
// never returns, because the spawned monitor tasks hold
// `Arc<IndexerState>` → `Arc<RocksDB>` references that nothing
// currently cancels. Calling `block_until_all_instances_are_dropped`
// here would hang the SIGTERM path past any reasonable grace period;
// the orchestrator would then SIGKILL us and we'd land in the exact
// uncommitted-RocksDB state this handler was supposed to prevent.
// RocksDB's WAL still guarantees committed data survives a kill; this
// call mostly closes a smaller flush window.
info!("Stopping nearcore actor system.");
near_async::shutdown_all_actors();

exit_reason
}

Expand Down
Loading