From 1014ad7c1b9f3db1ddd3ec53733a6f371f706096 Mon Sep 17 00:00:00 2001 From: Farukest Date: Thu, 15 Jan 2026 01:53:45 +0300 Subject: [PATCH] feat(node/actor): abstract UnsafeHeadPublisher from EngineActor Introduce UnsafeHeadPublisher trait to decouple unsafe head publishing from EngineActor. This allows different publishing strategies to be injected into EngineProcessor. - Add UnsafeHeadPublisher trait with publish_if_modified method - Add WatchUnsafeHeadPublisher implementation using watch::Sender - Make EngineProcessor generic over UnsafeHeadPublisher - Add MockUnsafeHeadPublisher for testing Closes #3214 --- .../actors/engine/engine_request_processor.rs | 41 ++++++------ crates/node/service/src/actors/engine/mod.rs | 6 ++ .../actors/engine/unsafe_head_publisher.rs | 67 +++++++++++++++++++ crates/node/service/src/actors/mod.rs | 5 +- crates/node/service/src/lib.rs | 6 +- crates/node/service/src/service/node.rs | 10 ++- 6 files changed, 109 insertions(+), 26 deletions(-) create mode 100644 crates/node/service/src/actors/engine/unsafe_head_publisher.rs diff --git a/crates/node/service/src/actors/engine/engine_request_processor.rs b/crates/node/service/src/actors/engine/engine_request_processor.rs index 85b7c88a07..8dc9eddf6b 100644 --- a/crates/node/service/src/actors/engine/engine_request_processor.rs +++ b/crates/node/service/src/actors/engine/engine_request_processor.rs @@ -1,5 +1,6 @@ use crate::{ - BuildRequest, EngineClientError, EngineDerivationClient, EngineError, ResetRequest, SealRequest, + BuildRequest, EngineClientError, EngineDerivationClient, EngineError, ResetRequest, + SealRequest, UnsafeHeadPublisher, }; use kona_derive::{ResetSignal, Signal}; use kona_engine::{ @@ -10,10 +11,7 @@ use kona_genesis::RollupConfig; use kona_protocol::L2BlockInfo; use op_alloy_rpc_types_engine::OpExecutionPayloadEnvelope; use std::sync::Arc; -use tokio::{ - sync::{mpsc, watch}, - task::JoinHandle, -}; +use tokio::{sync::mpsc, task::JoinHandle}; /// Requires that the implementor handles [`EngineProcessingRequest`]s via the provided channel. /// Note: this exists to facilitate unit testing rather than consolidate multiple implementations @@ -47,10 +45,11 @@ pub enum EngineProcessingRequest { /// this, it uses the [`Engine`] task queue to order Engine API interactions based off of /// the [`Ord`] implementation of [`EngineTask`]. #[derive(Debug)] -pub struct EngineProcessor +pub struct EngineProcessor where EngineClient_: EngineClient, DerivationClient: EngineDerivationClient, + UnsafeHeadPublisher_: UnsafeHeadPublisher, { /// The client used to send messages to the [`crate::DerivationActor`]. derivation_client: DerivationClient, @@ -58,12 +57,11 @@ where el_sync_complete: bool, /// The last safe head update sent. last_safe_head_sent: L2BlockInfo, - /// The [`RollupConfig`] . - /// A channel to use to relay the current unsafe head. - /// ## Note + /// The unsafe head publisher used to relay the current unsafe head. + /// /// This is `Some` when the node is in sequencer mode, and `None` when the node is in validator /// mode. - unsafe_head_tx: Option>, + unsafe_head_publisher: Option, /// The [`RollupConfig`] used to build tasks. rollup: Arc, @@ -73,10 +71,12 @@ where engine: Engine, } -impl EngineProcessor +impl + EngineProcessor where EngineClient_: EngineClient + 'static, DerivationClient: EngineDerivationClient + 'static, + UnsafeHeadPublisher_: UnsafeHeadPublisher + 'static, { /// Constructs a new [`EngineProcessor`] from the params. pub fn new( @@ -84,7 +84,7 @@ where config: Arc, derivation_client: DerivationClient, engine: Engine, - unsafe_head_tx: Option>, + unsafe_head_publisher: Option, ) -> Self { Self { client, @@ -93,7 +93,7 @@ where engine, last_safe_head_sent: L2BlockInfo::default(), rollup: config, - unsafe_head_tx, + unsafe_head_publisher, } } @@ -211,11 +211,12 @@ where } } -impl EngineRequestReceiver - for EngineProcessor +impl EngineRequestReceiver + for EngineProcessor where EngineClient_: EngineClient + 'static, DerivationClient: EngineDerivationClient + 'static, + UnsafeHeadPublisher_: UnsafeHeadPublisher + 'static, { fn start( mut self, @@ -229,12 +230,10 @@ where |err| error!(target: "engine", ?err, "Failed to drain engine tasks"), )?; - // If the unsafe head has updated, propagate it to the outbound channels. - if let Some(unsafe_head_tx) = self.unsafe_head_tx.as_ref() { - unsafe_head_tx.send_if_modified(|val| { - let new_head = self.engine.state().sync_state.unsafe_head(); - (*val != new_head).then(|| *val = new_head).is_some() - }); + // If the unsafe head has updated, propagate it via the publisher. + if let Some(publisher) = self.unsafe_head_publisher.as_ref() { + let new_head = self.engine.state().sync_state.unsafe_head(); + publisher.publish_if_modified(new_head); } // Wait for the next processing request. diff --git a/crates/node/service/src/actors/engine/mod.rs b/crates/node/service/src/actors/engine/mod.rs index 51acbe6e16..2b65259385 100644 --- a/crates/node/service/src/actors/engine/mod.rs +++ b/crates/node/service/src/actors/engine/mod.rs @@ -25,3 +25,9 @@ pub use engine_request_processor::{ mod rpc_request_processor; pub use rpc_request_processor::{EngineRpcProcessor, EngineRpcRequestReceiver}; + +mod unsafe_head_publisher; +pub use unsafe_head_publisher::{UnsafeHeadPublisher, WatchUnsafeHeadPublisher}; + +#[cfg(test)] +pub use unsafe_head_publisher::MockUnsafeHeadPublisher; diff --git a/crates/node/service/src/actors/engine/unsafe_head_publisher.rs b/crates/node/service/src/actors/engine/unsafe_head_publisher.rs new file mode 100644 index 0000000000..12c875bedb --- /dev/null +++ b/crates/node/service/src/actors/engine/unsafe_head_publisher.rs @@ -0,0 +1,67 @@ +//! Unsafe head publisher trait and implementations. + +use derive_more::Constructor; +use kona_protocol::L2BlockInfo; +use std::fmt::Debug; +use tokio::sync::watch; + +/// Trait for publishing unsafe head updates. +/// +/// This abstracts the mechanism for publishing unsafe head updates from the [`EngineProcessor`], +/// allowing for different publishing strategies to be injected. +/// +/// [`EngineProcessor`]: crate::EngineProcessor +#[cfg_attr(test, mockall::automock)] +pub trait UnsafeHeadPublisher: Debug + Send + Sync { + /// Publishes a new unsafe head if it has changed from the previous value. + /// + /// Returns `true` if the value was modified. + fn publish_if_modified(&self, new_head: L2BlockInfo) -> bool; +} + +/// A [`watch::Sender`]-based implementation of [`UnsafeHeadPublisher`]. +/// +/// This implementation uses a [`watch::Sender`] to broadcast unsafe head updates +/// to subscribers. It only sends updates when the new head differs from the current value. +#[derive(Debug, Constructor)] +pub struct WatchUnsafeHeadPublisher { + /// The watch sender used to broadcast unsafe head updates. + sender: watch::Sender, +} + +impl UnsafeHeadPublisher for WatchUnsafeHeadPublisher { + fn publish_if_modified(&self, new_head: L2BlockInfo) -> bool { + self.sender.send_if_modified(|val| (*val != new_head).then(|| *val = new_head).is_some()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use kona_protocol::BlockInfo; + + #[test] + fn test_publish_if_modified_returns_true_when_changed() { + let (tx, rx) = watch::channel(L2BlockInfo::default()); + let publisher = WatchUnsafeHeadPublisher::new(tx); + + let new_head = L2BlockInfo { + block_info: BlockInfo { number: 1, ..Default::default() }, + ..Default::default() + }; + assert!(publisher.publish_if_modified(new_head)); + assert_eq!(*rx.borrow(), new_head); + } + + #[test] + fn test_publish_if_modified_returns_false_when_unchanged() { + let initial = L2BlockInfo { + block_info: BlockInfo { number: 1, ..Default::default() }, + ..Default::default() + }; + let (tx, _rx) = watch::channel(initial); + let publisher = WatchUnsafeHeadPublisher::new(tx); + + assert!(!publisher.publish_if_modified(initial)); + } +} diff --git a/crates/node/service/src/actors/mod.rs b/crates/node/service/src/actors/mod.rs index eb63a0b280..e696f2b838 100644 --- a/crates/node/service/src/actors/mod.rs +++ b/crates/node/service/src/actors/mod.rs @@ -10,7 +10,8 @@ pub use engine::{ BuildRequest, EngineActor, EngineActorRequest, EngineClientError, EngineClientResult, EngineConfig, EngineDerivationClient, EngineError, EngineProcessingRequest, EngineProcessor, EngineRequestReceiver, EngineRpcProcessor, EngineRpcRequest, EngineRpcRequestReceiver, - QueuedEngineDerivationClient, ResetRequest, SealRequest, + QueuedEngineDerivationClient, ResetRequest, SealRequest, UnsafeHeadPublisher, + WatchUnsafeHeadPublisher, }; mod rpc; @@ -50,6 +51,8 @@ pub use sequencer::{ SequencerEngineClient, }; +#[cfg(test)] +pub use engine::MockUnsafeHeadPublisher; #[cfg(test)] pub use network::MockUnsafePayloadGossipClient; #[cfg(test)] diff --git a/crates/node/service/src/lib.rs b/crates/node/service/src/lib.rs index bc73912f78..b3ce0d691d 100644 --- a/crates/node/service/src/lib.rs +++ b/crates/node/service/src/lib.rs @@ -34,7 +34,8 @@ pub use actors::{ QueuedSequencerEngineClient, QueuedUnsafePayloadGossipClient, ResetRequest, RollupBoostAdminApiClient, RollupBoostHealthRpcClient, RpcActor, RpcActorError, RpcContext, SealRequest, SequencerActor, SequencerActorError, SequencerAdminQuery, SequencerConfig, - SequencerEngineClient, UnsafePayloadGossipClient, UnsafePayloadGossipClientError, + SequencerEngineClient, UnsafeHeadPublisher, UnsafePayloadGossipClient, + UnsafePayloadGossipClientError, WatchUnsafeHeadPublisher, }; mod metrics; @@ -42,5 +43,6 @@ pub use metrics::Metrics; #[cfg(test)] pub use actors::{ - MockConductor, MockOriginSelector, MockSequencerEngineClient, MockUnsafePayloadGossipClient, + MockConductor, MockOriginSelector, MockSequencerEngineClient, MockUnsafeHeadPublisher, + MockUnsafePayloadGossipClient, }; diff --git a/crates/node/service/src/service/node.rs b/crates/node/service/src/service/node.rs index 3690858eae..291ff1bf5f 100644 --- a/crates/node/service/src/service/node.rs +++ b/crates/node/service/src/service/node.rs @@ -7,7 +7,7 @@ use crate::{ QueuedEngineDerivationClient, QueuedEngineRpcClient, QueuedL1WatcherDerivationClient, QueuedNetworkEngineClient, QueuedSequencerAdminAPIClient, QueuedSequencerEngineClient, RollupBoostAdminApiClient, RollupBoostHealthRpcClient, RpcActor, RpcContext, SequencerActor, - SequencerConfig, + SequencerConfig, WatchUnsafeHeadPublisher, actors::{BlockStream, NetworkInboundData, QueuedUnsafePayloadGossipClient}, }; use alloy_eips::BlockNumberOrTag; @@ -193,6 +193,7 @@ impl RollupNode { EngineProcessor< OpEngineClient>, QueuedEngineDerivationClient, + WatchUnsafeHeadPublisher, >, EngineRpcProcessor>>, >, @@ -208,12 +209,17 @@ impl RollupNode { format!("Engine client build failed: {e:?}") })?); + let unsafe_head_publisher = if self.mode().is_sequencer() { + Some(WatchUnsafeHeadPublisher::new(unsafe_head_tx)) + } else { + None + }; let engine_processor = EngineProcessor::new( engine_client.clone(), self.config.clone(), derivation_client, engine, - if self.mode().is_sequencer() { Some(unsafe_head_tx) } else { None }, + unsafe_head_publisher, ); let engine_rpc_processor = EngineRpcProcessor::new(