Skip to content
This repository was archived by the owner on Jan 16, 2026. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 20 additions & 21 deletions crates/node/service/src/actors/engine/engine_request_processor.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::{
BuildRequest, EngineClientError, EngineDerivationClient, EngineError, ResetRequest, SealRequest,
BuildRequest, EngineClientError, EngineDerivationClient, EngineError, ResetRequest,
SealRequest, UnsafeHeadPublisher,
};
use kona_derive::{ResetSignal, Signal};
use kona_engine::{
Expand All @@ -10,10 +11,7 @@ use kona_genesis::RollupConfig;
use kona_protocol::L2BlockInfo;
use op_alloy_rpc_types_engine::OpExecutionPayloadEnvelope;
use std::sync::Arc;
use tokio::{
sync::{mpsc, watch},
task::JoinHandle,
};
use tokio::{sync::mpsc, task::JoinHandle};

/// Requires that the implementor handles [`EngineProcessingRequest`]s via the provided channel.
/// Note: this exists to facilitate unit testing rather than consolidate multiple implementations
Expand Down Expand Up @@ -47,23 +45,23 @@ pub enum EngineProcessingRequest {
/// this, it uses the [`Engine`] task queue to order Engine API interactions based off of
/// the [`Ord`] implementation of [`EngineTask`].
#[derive(Debug)]
pub struct EngineProcessor<EngineClient_, DerivationClient>
pub struct EngineProcessor<EngineClient_, DerivationClient, UnsafeHeadPublisher_>
where
EngineClient_: EngineClient,
DerivationClient: EngineDerivationClient,
UnsafeHeadPublisher_: UnsafeHeadPublisher,
{
/// The client used to send messages to the [`crate::DerivationActor`].
derivation_client: DerivationClient,
/// Whether the EL sync is complete. This should only ever go from false to true.
el_sync_complete: bool,
/// The last safe head update sent.
last_safe_head_sent: L2BlockInfo,
/// The [`RollupConfig`] .
/// A channel to use to relay the current unsafe head.
/// ## Note
/// The unsafe head publisher used to relay the current unsafe head.
///
/// This is `Some` when the node is in sequencer mode, and `None` when the node is in validator
/// mode.
unsafe_head_tx: Option<watch::Sender<L2BlockInfo>>,
unsafe_head_publisher: Option<UnsafeHeadPublisher_>,

/// The [`RollupConfig`] used to build tasks.
rollup: Arc<RollupConfig>,
Expand All @@ -73,18 +71,20 @@ where
engine: Engine<EngineClient_>,
}

impl<EngineClient_, DerivationClient> EngineProcessor<EngineClient_, DerivationClient>
impl<EngineClient_, DerivationClient, UnsafeHeadPublisher_>
EngineProcessor<EngineClient_, DerivationClient, UnsafeHeadPublisher_>
where
EngineClient_: EngineClient + 'static,
DerivationClient: EngineDerivationClient + 'static,
UnsafeHeadPublisher_: UnsafeHeadPublisher + 'static,
{
/// Constructs a new [`EngineProcessor`] from the params.
pub fn new(
client: Arc<EngineClient_>,
config: Arc<RollupConfig>,
derivation_client: DerivationClient,
engine: Engine<EngineClient_>,
unsafe_head_tx: Option<watch::Sender<L2BlockInfo>>,
unsafe_head_publisher: Option<UnsafeHeadPublisher_>,
) -> Self {
Self {
client,
Expand All @@ -93,7 +93,7 @@ where
engine,
last_safe_head_sent: L2BlockInfo::default(),
rollup: config,
unsafe_head_tx,
unsafe_head_publisher,
}
}

Expand Down Expand Up @@ -211,11 +211,12 @@ where
}
}

impl<EngineClient_, DerivationClient> EngineRequestReceiver
for EngineProcessor<EngineClient_, DerivationClient>
impl<EngineClient_, DerivationClient, UnsafeHeadPublisher_> EngineRequestReceiver
for EngineProcessor<EngineClient_, DerivationClient, UnsafeHeadPublisher_>
where
EngineClient_: EngineClient + 'static,
DerivationClient: EngineDerivationClient + 'static,
UnsafeHeadPublisher_: UnsafeHeadPublisher + 'static,
{
fn start(
mut self,
Expand All @@ -229,12 +230,10 @@ where
|err| error!(target: "engine", ?err, "Failed to drain engine tasks"),
)?;

// If the unsafe head has updated, propagate it to the outbound channels.
if let Some(unsafe_head_tx) = self.unsafe_head_tx.as_ref() {
unsafe_head_tx.send_if_modified(|val| {
let new_head = self.engine.state().sync_state.unsafe_head();
(*val != new_head).then(|| *val = new_head).is_some()
});
// If the unsafe head has updated, propagate it via the publisher.
if let Some(publisher) = self.unsafe_head_publisher.as_ref() {
let new_head = self.engine.state().sync_state.unsafe_head();
publisher.publish_if_modified(new_head);
}

// Wait for the next processing request.
Expand Down
6 changes: 6 additions & 0 deletions crates/node/service/src/actors/engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,9 @@ pub use engine_request_processor::{

mod rpc_request_processor;
pub use rpc_request_processor::{EngineRpcProcessor, EngineRpcRequestReceiver};

mod unsafe_head_publisher;
pub use unsafe_head_publisher::{UnsafeHeadPublisher, WatchUnsafeHeadPublisher};

#[cfg(test)]
pub use unsafe_head_publisher::MockUnsafeHeadPublisher;
67 changes: 67 additions & 0 deletions crates/node/service/src/actors/engine/unsafe_head_publisher.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
//! Unsafe head publisher trait and implementations.

use derive_more::Constructor;
use kona_protocol::L2BlockInfo;
use std::fmt::Debug;
use tokio::sync::watch;

/// Trait for publishing unsafe head updates.
///
/// This abstracts the mechanism for publishing unsafe head updates from the [`EngineProcessor`],
/// allowing for different publishing strategies to be injected.
///
/// [`EngineProcessor`]: crate::EngineProcessor
#[cfg_attr(test, mockall::automock)]
pub trait UnsafeHeadPublisher: Debug + Send + Sync {
/// Publishes a new unsafe head if it has changed from the previous value.
///
/// Returns `true` if the value was modified.
fn publish_if_modified(&self, new_head: L2BlockInfo) -> bool;
}

/// A [`watch::Sender`]-based implementation of [`UnsafeHeadPublisher`].
///
/// This implementation uses a [`watch::Sender`] to broadcast unsafe head updates
/// to subscribers. It only sends updates when the new head differs from the current value.
#[derive(Debug, Constructor)]
pub struct WatchUnsafeHeadPublisher {
/// The watch sender used to broadcast unsafe head updates.
sender: watch::Sender<L2BlockInfo>,
}

impl UnsafeHeadPublisher for WatchUnsafeHeadPublisher {
fn publish_if_modified(&self, new_head: L2BlockInfo) -> bool {
self.sender.send_if_modified(|val| (*val != new_head).then(|| *val = new_head).is_some())
}
}

#[cfg(test)]
mod tests {
use super::*;
use kona_protocol::BlockInfo;

#[test]
fn test_publish_if_modified_returns_true_when_changed() {
let (tx, rx) = watch::channel(L2BlockInfo::default());
let publisher = WatchUnsafeHeadPublisher::new(tx);

let new_head = L2BlockInfo {
block_info: BlockInfo { number: 1, ..Default::default() },
..Default::default()
};
assert!(publisher.publish_if_modified(new_head));
assert_eq!(*rx.borrow(), new_head);
}

#[test]
fn test_publish_if_modified_returns_false_when_unchanged() {
let initial = L2BlockInfo {
block_info: BlockInfo { number: 1, ..Default::default() },
..Default::default()
};
let (tx, _rx) = watch::channel(initial);
let publisher = WatchUnsafeHeadPublisher::new(tx);

assert!(!publisher.publish_if_modified(initial));
}
}
5 changes: 4 additions & 1 deletion crates/node/service/src/actors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ pub use engine::{
BuildRequest, EngineActor, EngineActorRequest, EngineClientError, EngineClientResult,
EngineConfig, EngineDerivationClient, EngineError, EngineProcessingRequest, EngineProcessor,
EngineRequestReceiver, EngineRpcProcessor, EngineRpcRequest, EngineRpcRequestReceiver,
QueuedEngineDerivationClient, ResetRequest, SealRequest,
QueuedEngineDerivationClient, ResetRequest, SealRequest, UnsafeHeadPublisher,
WatchUnsafeHeadPublisher,
};

mod rpc;
Expand Down Expand Up @@ -50,6 +51,8 @@ pub use sequencer::{
SequencerEngineClient,
};

#[cfg(test)]
pub use engine::MockUnsafeHeadPublisher;
#[cfg(test)]
pub use network::MockUnsafePayloadGossipClient;
#[cfg(test)]
Expand Down
6 changes: 4 additions & 2 deletions crates/node/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,15 @@ pub use actors::{
QueuedSequencerEngineClient, QueuedUnsafePayloadGossipClient, ResetRequest,
RollupBoostAdminApiClient, RollupBoostHealthRpcClient, RpcActor, RpcActorError, RpcContext,
SealRequest, SequencerActor, SequencerActorError, SequencerAdminQuery, SequencerConfig,
SequencerEngineClient, UnsafePayloadGossipClient, UnsafePayloadGossipClientError,
SequencerEngineClient, UnsafeHeadPublisher, UnsafePayloadGossipClient,
UnsafePayloadGossipClientError, WatchUnsafeHeadPublisher,
};

mod metrics;
pub use metrics::Metrics;

#[cfg(test)]
pub use actors::{
MockConductor, MockOriginSelector, MockSequencerEngineClient, MockUnsafePayloadGossipClient,
MockConductor, MockOriginSelector, MockSequencerEngineClient, MockUnsafeHeadPublisher,
MockUnsafePayloadGossipClient,
};
10 changes: 8 additions & 2 deletions crates/node/service/src/service/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::{
QueuedEngineDerivationClient, QueuedEngineRpcClient, QueuedL1WatcherDerivationClient,
QueuedNetworkEngineClient, QueuedSequencerAdminAPIClient, QueuedSequencerEngineClient,
RollupBoostAdminApiClient, RollupBoostHealthRpcClient, RpcActor, RpcContext, SequencerActor,
SequencerConfig,
SequencerConfig, WatchUnsafeHeadPublisher,
actors::{BlockStream, NetworkInboundData, QueuedUnsafePayloadGossipClient},
};
use alloy_eips::BlockNumberOrTag;
Expand Down Expand Up @@ -193,6 +193,7 @@ impl RollupNode {
EngineProcessor<
OpEngineClient<RootProvider, RootProvider<Optimism>>,
QueuedEngineDerivationClient,
WatchUnsafeHeadPublisher,
>,
EngineRpcProcessor<OpEngineClient<RootProvider, RootProvider<Optimism>>>,
>,
Expand All @@ -208,12 +209,17 @@ impl RollupNode {
format!("Engine client build failed: {e:?}")
})?);

let unsafe_head_publisher = if self.mode().is_sequencer() {
Some(WatchUnsafeHeadPublisher::new(unsafe_head_tx))
} else {
None
};
let engine_processor = EngineProcessor::new(
engine_client.clone(),
self.config.clone(),
derivation_client,
engine,
if self.mode().is_sequencer() { Some(unsafe_head_tx) } else { None },
unsafe_head_publisher,
);

let engine_rpc_processor = EngineRpcProcessor::new(
Expand Down