From ee6392a5b3fa7b7f403fd0f2472a73f864b72688 Mon Sep 17 00:00:00 2001 From: Farukest Date: Thu, 15 Jan 2026 02:12:24 +0300 Subject: [PATCH] feat(node/rpc): implement admin state persistence Implement the rpc.admin-state CLI flag functionality to persist admin API state changes (sequencer active/stopped, recovery mode) across node restarts. - Add AdminState struct with serde serialization support - Add AdminStatePersistence helper for loading/saving state to disk - Add admin_state_persistence field to SequencerActor - Persist state on start_sequencer, stop_sequencer, set_recovery_mode calls - Load persisted state on node startup if configured When --rpc.admin-state is set to a file path, the node will: 1. Load saved state on startup (if file exists) 2. Save state changes whenever admin API modifies sequencer state Closes #3124 --- Cargo.lock | 1 + crates/node/rpc/Cargo.toml | 3 +- crates/node/rpc/src/admin_state.rs | 191 ++++++++++++++++++ crates/node/rpc/src/lib.rs | 6 + .../service/src/actors/sequencer/actor.rs | 3 + .../src/actors/sequencer/admin_api_impl.rs | 13 +- .../src/actors/sequencer/tests/test_util.rs | 2 + crates/node/service/src/service/node.rs | 34 +++- 8 files changed, 248 insertions(+), 5 deletions(-) create mode 100644 crates/node/rpc/src/admin_state.rs diff --git a/Cargo.lock b/Cargo.lock index 25e1ceac38..93b581df7e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5487,6 +5487,7 @@ dependencies = [ "rollup-boost", "serde", "serde_json", + "tempfile", "thiserror 2.0.17", "tokio", "tracing", diff --git a/crates/node/rpc/Cargo.toml b/crates/node/rpc/Cargo.toml index dadfbeb43c..2cc0d16dc3 100644 --- a/crates/node/rpc/Cargo.toml +++ b/crates/node/rpc/Cargo.toml @@ -49,6 +49,7 @@ backon = { workspace = true } # `serde` serde = { workspace = true, features = ["std"] } +serde_json.workspace = true # `jsonrpsee` jsonrpsee = { workspace = true, features = ["macros", "server"] } @@ -65,7 +66,7 @@ metrics = { workspace = true, optional = true } rollup-boost.workspace = true [dev-dependencies] -serde_json.workspace = true +tempfile.workspace = true [features] default = [] diff --git a/crates/node/rpc/src/admin_state.rs b/crates/node/rpc/src/admin_state.rs new file mode 100644 index 0000000000..ab9614981b --- /dev/null +++ b/crates/node/rpc/src/admin_state.rs @@ -0,0 +1,191 @@ +//! Admin State Persistence +//! +//! This module provides functionality to persist admin API state changes +//! so they survive node restarts. + +use serde::{Deserialize, Serialize}; +use std::{ + io, + path::{Path, PathBuf}, +}; + +/// The persisted admin state. +/// +/// This struct represents the state that can be modified via the admin API +/// and should persist across node restarts. +#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)] +pub struct AdminState { + /// Whether the sequencer is active (started/stopped). + #[serde(default)] + pub sequencer_active: bool, + /// Whether the sequencer is in recovery mode. + #[serde(default)] + pub recovery_mode: bool, +} + +impl AdminState { + /// Creates a new [`AdminState`] with the given values. + pub const fn new(sequencer_active: bool, recovery_mode: bool) -> Self { + Self { sequencer_active, recovery_mode } + } +} + +/// Error type for admin state persistence operations. +#[derive(Debug, thiserror::Error)] +pub enum AdminStatePersistenceError { + /// IO error while reading or writing the state file. + #[error("IO error: {0}")] + Io(#[from] io::Error), + /// JSON serialization/deserialization error. + #[error("JSON error: {0}")] + Json(#[from] serde_json::Error), +} + +/// Loads the admin state from the given file path. +/// +/// If the file does not exist, returns `None`. +/// If the file exists but cannot be parsed, returns an error. +pub fn load_admin_state(path: &Path) -> Result, AdminStatePersistenceError> { + if !path.exists() { + return Ok(None); + } + + let contents = std::fs::read_to_string(path)?; + let state: AdminState = serde_json::from_str(&contents)?; + Ok(Some(state)) +} + +/// Saves the admin state to the given file path. +/// +/// Creates parent directories if they don't exist. +pub fn save_admin_state(path: &Path, state: &AdminState) -> Result<(), AdminStatePersistenceError> { + // Create parent directories if needed + if let Some(parent) = path.parent() { + if !parent.exists() { + std::fs::create_dir_all(parent)?; + } + } + + let contents = serde_json::to_string_pretty(state)?; + std::fs::write(path, contents)?; + Ok(()) +} + +/// A helper struct that wraps an optional persistence path and provides +/// convenient methods for loading and saving admin state. +#[derive(Debug, Clone, Default)] +pub struct AdminStatePersistence { + /// The path to the admin state file, if persistence is enabled. + path: Option, +} + +impl AdminStatePersistence { + /// Creates a new [`AdminStatePersistence`] with the given path. + pub const fn new(path: Option) -> Self { + Self { path } + } + + /// Returns whether persistence is enabled. + pub const fn is_enabled(&self) -> bool { + self.path.is_some() + } + + /// Loads the admin state from the configured path. + /// + /// Returns `None` if persistence is disabled or the file doesn't exist. + pub fn load(&self) -> Result, AdminStatePersistenceError> { + match &self.path { + Some(path) => load_admin_state(path), + None => Ok(None), + } + } + + /// Saves the admin state to the configured path. + /// + /// Does nothing if persistence is disabled. + pub fn save(&self, state: &AdminState) -> Result<(), AdminStatePersistenceError> { + if let Some(path) = &self.path { + save_admin_state(path, state)?; + } + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use tempfile::tempdir; + + #[test] + fn test_admin_state_default() { + let state = AdminState::default(); + assert!(!state.sequencer_active); + assert!(!state.recovery_mode); + } + + #[test] + fn test_admin_state_serialization() { + let state = AdminState::new(true, false); + let json = serde_json::to_string(&state).unwrap(); + let deserialized: AdminState = serde_json::from_str(&json).unwrap(); + assert_eq!(state, deserialized); + } + + #[test] + fn test_load_nonexistent_file() { + let result = load_admin_state(Path::new("/nonexistent/path/state.json")); + assert!(result.is_ok()); + assert!(result.unwrap().is_none()); + } + + #[test] + fn test_save_and_load_admin_state() { + let dir = tempdir().unwrap(); + let path = dir.path().join("admin_state.json"); + + let state = AdminState::new(true, true); + save_admin_state(&path, &state).unwrap(); + + let loaded = load_admin_state(&path).unwrap(); + assert_eq!(loaded, Some(state)); + } + + #[test] + fn test_save_creates_parent_directories() { + let dir = tempdir().unwrap(); + let path = dir.path().join("nested").join("dir").join("state.json"); + + let state = AdminState::new(false, true); + save_admin_state(&path, &state).unwrap(); + + assert!(path.exists()); + let loaded = load_admin_state(&path).unwrap(); + assert_eq!(loaded, Some(state)); + } + + #[test] + fn test_persistence_disabled() { + let persistence = AdminStatePersistence::new(None); + assert!(!persistence.is_enabled()); + assert!(persistence.load().unwrap().is_none()); + + // Save should succeed (no-op) when disabled + let state = AdminState::new(true, true); + assert!(persistence.save(&state).is_ok()); + } + + #[test] + fn test_persistence_enabled() { + let dir = tempdir().unwrap(); + let path = dir.path().join("admin_state.json"); + let persistence = AdminStatePersistence::new(Some(path)); + + assert!(persistence.is_enabled()); + + let state = AdminState::new(true, false); + persistence.save(&state).unwrap(); + + let loaded = persistence.load().unwrap(); + assert_eq!(loaded, Some(state)); + } +} diff --git a/crates/node/rpc/src/lib.rs b/crates/node/rpc/src/lib.rs index 0e9ccd5814..d2ba0d63f4 100644 --- a/crates/node/rpc/src/lib.rs +++ b/crates/node/rpc/src/lib.rs @@ -12,6 +12,12 @@ extern crate tracing; mod admin; pub use admin::{AdminRpc, NetworkAdminQuery, RollupBoostAdminQuery}; +mod admin_state; +pub use admin_state::{ + AdminState, AdminStatePersistence, AdminStatePersistenceError, load_admin_state, + save_admin_state, +}; + mod client; pub use client::{ EngineRpcClient, RollupBoostAdminClient, SequencerAdminAPIClient, SequencerAdminAPIError, diff --git a/crates/node/service/src/actors/sequencer/actor.rs b/crates/node/service/src/actors/sequencer/actor.rs index 59b4b8efd7..63ab45f894 100644 --- a/crates/node/service/src/actors/sequencer/actor.rs +++ b/crates/node/service/src/actors/sequencer/actor.rs @@ -23,6 +23,7 @@ use kona_derive::{AttributesBuilder, PipelineErrorKind}; use kona_engine::{InsertTaskError, SealTaskError, SynchronizeTaskError}; use kona_genesis::RollupConfig; use kona_protocol::{BlockInfo, L2BlockInfo, OpAttributesWithParent}; +use kona_rpc::AdminStatePersistence; use op_alloy_rpc_types_engine::OpPayloadAttributes; use std::{ sync::Arc, @@ -69,6 +70,8 @@ pub struct SequencerActor< { /// Receiver for admin API requests. pub admin_api_rx: mpsc::Receiver, + /// The admin state persistence handler. + pub admin_state_persistence: AdminStatePersistence, /// The attributes builder used for block building. pub attributes_builder: AttributesBuilder_, /// The cancellation token, shared between all tasks. diff --git a/crates/node/service/src/actors/sequencer/admin_api_impl.rs b/crates/node/service/src/actors/sequencer/admin_api_impl.rs index b9bc76c1fe..d29d4b5abc 100644 --- a/crates/node/service/src/actors/sequencer/admin_api_impl.rs +++ b/crates/node/service/src/actors/sequencer/admin_api_impl.rs @@ -2,7 +2,7 @@ use super::SequencerActor; use crate::{Conductor, OriginSelector, SequencerEngineClient, UnsafePayloadGossipClient}; use alloy_primitives::B256; use kona_derive::AttributesBuilder; -use kona_rpc::SequencerAdminAPIError; +use kona_rpc::{AdminState, SequencerAdminAPIError}; use tokio::sync::oneshot; /// The query types to the sequencer actor for the admin api. @@ -120,6 +120,7 @@ where info!(target: "sequencer", "Starting sequencer"); self.is_active = true; + self.persist_admin_state(); self.update_metrics(); Ok(()) @@ -130,6 +131,7 @@ where info!(target: "sequencer", "Stopping sequencer"); self.is_active = false; + self.persist_admin_state(); self.update_metrics(); self.engine_client.get_unsafe_head().await @@ -148,6 +150,7 @@ where self.in_recovery_mode = is_active; info!(target: "sequencer", is_active, "Updated recovery mode"); + self.persist_admin_state(); self.update_metrics(); Ok(()) @@ -180,4 +183,12 @@ where SequencerAdminAPIError::RequestError(format!("Failed to reset engine: {e}")) }) } + + /// Persists the current admin state to disk, if persistence is enabled. + fn persist_admin_state(&self) { + let state = AdminState::new(self.is_active, self.in_recovery_mode); + if let Err(e) = self.admin_state_persistence.save(&state) { + warn!(target: "sequencer", err=?e, "Failed to persist admin state"); + } + } } diff --git a/crates/node/service/src/actors/sequencer/tests/test_util.rs b/crates/node/service/src/actors/sequencer/tests/test_util.rs index 3a6a08b4e5..81c74073a4 100644 --- a/crates/node/service/src/actors/sequencer/tests/test_util.rs +++ b/crates/node/service/src/actors/sequencer/tests/test_util.rs @@ -6,6 +6,7 @@ use crate::{ }; use kona_derive::test_utils::TestAttributesBuilder; use kona_genesis::RollupConfig; +use kona_rpc::AdminStatePersistence; use std::sync::Arc; use tokio::sync::mpsc; use tokio_util::sync::CancellationToken; @@ -23,6 +24,7 @@ pub(crate) fn test_actor() -> SequencerActor< let (_admin_api_tx, admin_api_rx) = mpsc::channel(20); SequencerActor { admin_api_rx, + admin_state_persistence: AdminStatePersistence::new(None), attributes_builder: TestAttributesBuilder { attributes: vec![] }, cancellation_token: CancellationToken::new(), conductor: None, diff --git a/crates/node/service/src/service/node.rs b/crates/node/service/src/service/node.rs index 3690858eae..b340ae8f3a 100644 --- a/crates/node/service/src/service/node.rs +++ b/crates/node/service/src/service/node.rs @@ -20,7 +20,7 @@ use kona_providers_alloy::{ AlloyChainProvider, AlloyL2ChainProvider, OnlineBeaconClient, OnlineBlobProvider, OnlinePipeline, }; -use kona_rpc::RpcBuilder; +use kona_rpc::{AdminStatePersistence, RpcBuilder}; use op_alloy_network::Optimism; use std::{ops::Not as _, sync::Arc, time::Duration}; use tokio::sync::{mpsc, watch}; @@ -367,15 +367,43 @@ impl RollupNode { let queued_gossip_client = QueuedUnsafePayloadGossipClient::new(gossip_payload_tx.clone()); + // Load persisted admin state if configured + let admin_persistence_path = + self.rpc_builder.as_ref().and_then(|b| b.admin_persistence.clone()); + let admin_state_persistence = AdminStatePersistence::new(admin_persistence_path); + + // Determine initial state from persisted state (if available) or CLI config + let (is_active, in_recovery_mode) = match admin_state_persistence.load() { + Ok(Some(state)) => { + info!(target: "service", sequencer_active = state.sequencer_active, recovery_mode = state.recovery_mode, "Loaded persisted admin state"); + (state.sequencer_active, state.recovery_mode) + } + Ok(None) => { + // No persisted state, use CLI config + ( + self.sequencer_config.sequencer_stopped.not(), + self.sequencer_config.sequencer_recovery_mode, + ) + } + Err(e) => { + warn!(target: "service", err = ?e, "Failed to load persisted admin state, using CLI config"); + ( + self.sequencer_config.sequencer_stopped.not(), + self.sequencer_config.sequencer_recovery_mode, + ) + } + }; + ( Some(SequencerActor { admin_api_rx: sequencer_admin_api_rx, + admin_state_persistence, attributes_builder: self.create_attributes_builder(), cancellation_token: cancellation.clone(), conductor, engine_client: sequencer_engine_client, - is_active: self.sequencer_config.sequencer_stopped.not(), - in_recovery_mode: self.sequencer_config.sequencer_recovery_mode, + is_active, + in_recovery_mode, origin_selector: delayed_origin_selector, rollup_config: self.config.clone(), unsafe_payload_gossip_client: queued_gossip_client,