Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions crates/blockchain/blockchain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,13 @@ pub struct Blockchain {
/// Set to true after initial sync completes, never reset to false.
/// Does not reflect whether an ongoing sync is in progress.
is_synced: AtomicBool,
/// Whether a deep-reorg apply is currently in flight (Section 11).
///
/// Set on entry to `reorg_apply_deep`, cleared on exit (via the
/// `ReorgGuard` drop). While set, engine API `forkchoiceUpdated` calls
/// short-circuit to `SYNCING` (matching Reth's `BackfillSyncState::Active`
/// gating at `crates/engine/tree/src/tree/mod.rs:1173-1178`).
reorg_in_progress: AtomicBool,
/// Configuration options for blockchain behavior.
pub options: BlockchainOptions,
/// Cache of recently built payloads.
Expand Down Expand Up @@ -333,23 +340,64 @@ impl Blockchain {
storage: store,
mempool: Mempool::new(blockchain_opts.max_mempool_size),
is_synced: AtomicBool::new(false),
reorg_in_progress: AtomicBool::new(false),
payloads: Arc::new(TokioMutex::new(Vec::new())),
options: blockchain_opts,
merkle_pool: Self::build_merkle_pool(),
}
}

/// Borrows the [`Store`] backing this blockchain.
///
/// Meant for callers that drive storage-side operations themselves (the
/// deep-reorg apply path is one example) while still relying on
/// `Blockchain` for block execution.
pub fn store(&self) -> &Store {
    &self.storage
}

/// Builds a `Blockchain` over `store` with default settings: a mempool
/// created with `MAX_MEMPOOL_SIZE_DEFAULT`, `BlockchainOptions::default()`,
/// an empty payload list, and both the `is_synced` and `reorg_in_progress`
/// flags initialised to `false`.
pub fn default_with_store(store: Store) -> Self {
    let mempool = Mempool::new(MAX_MEMPOOL_SIZE_DEFAULT);
    let payloads = Arc::new(TokioMutex::new(Vec::new()));
    let options = BlockchainOptions::default();
    let merkle_pool = Self::build_merkle_pool();

    Self {
        storage: store,
        mempool,
        is_synced: AtomicBool::new(false),
        reorg_in_progress: AtomicBool::new(false),
        payloads,
        options,
        merkle_pool,
    }
}

/// Reports whether a deep-reorg apply is running right now.
///
/// `apply_fork_choice_with_deep_reorg` consults this flag and, when it is
/// set, bails out with `InvalidForkChoice::Syncing` so that the engine API
/// responds `SYNCING`.
pub fn is_reorg_in_progress(&self) -> bool {
    use std::sync::atomic::Ordering;

    // Acquire load; the flag itself is written with Release stores.
    self.reorg_in_progress.load(Ordering::Acquire)
}

/// Internal: flags a deep-reorg apply as running and hands back a
/// [`ReorgGuard`]. Dropping the guard resets the flag, so holding it for
/// the whole apply covers every exit path (success, error, panic
/// unwinding).
///
/// NOTE(review): the flag is stored unconditionally, so this method does
/// not by itself reject a second concurrent entrant; it relies on callers
/// having checked `is_reorg_in_progress` beforehand. Confirm that the
/// check-then-set window is acceptable.
pub(crate) fn enter_reorg(&self) -> ReorgGuard<'_> {
    use std::sync::atomic::Ordering;

    let flag = &self.reorg_in_progress;
    flag.store(true, Ordering::Release);
    ReorgGuard { blockchain: self }
}
}

/// RAII guard that clears `Blockchain::reorg_in_progress` on drop.
///
/// The guard must be bound to a named variable for as long as the reorg is
/// running: a guard that is discarded right away is dropped right away,
/// which resets the flag while the reorg is still in flight. `#[must_use]`
/// turns that mistake into a compiler warning.
#[must_use = "dropping the guard immediately clears `reorg_in_progress`; bind it for the duration of the reorg"]
pub(crate) struct ReorgGuard<'a> {
    /// Blockchain whose `reorg_in_progress` flag is reset when this guard drops.
    blockchain: &'a Blockchain,
}

/// Resets the in-progress flag when the guard goes out of scope.
impl Drop for ReorgGuard<'_> {
    fn drop(&mut self) {
        use std::sync::atomic::Ordering;

        let flag = &self.blockchain.reorg_in_progress;
        flag.store(false, Ordering::Release);
    }
}

impl Blockchain {
/// Executes a block within a new VM instance and state
fn execute_block(
&self,
Expand Down
234 changes: 232 additions & 2 deletions crates/blockchain/fork_choice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ use ethrex_common::{
};
use ethrex_metrics::metrics;
use ethrex_storage::{Store, error::StoreError};
use tracing::{error, warn};
use std::collections::HashMap;
use tracing::{error, info, warn};

use crate::{
error::{self, InvalidForkChoice},
Blockchain,
error::{self, ChainError, InvalidForkChoice},
is_canonical,
};

Expand Down Expand Up @@ -205,3 +207,231 @@ async fn find_link_with_canonical_chain(

Ok(None)
}

// ===========================================================================
// Deep-reorg apply path (Section 8 orchestration).
// ===========================================================================

/// Fork-choice entry point that adds deep-reorg handling on top of
/// [`apply_fork_choice`].
///
/// The plain `apply_fork_choice` is tried first and its result is returned
/// unchanged for every outcome except `InvalidForkChoice::StateNotReachable`.
/// In that one case the head's state is not directly reachable, so the call
/// is handed to the deep-reorg apply pass, which builds an in-memory overlay
/// from the journal, replays the side chain against it, and reconciles on the
/// first new-chain commit.
///
/// Pre-condition: the journal must still hold entries down to the deepest
/// required pivot. If finalization has pruned past the pivot, the call
/// returns [`InvalidForkChoice::StateNotReachable`] (the engine API responds
/// with `SYNCING`, the same as before deep-reorg support existed).
///
/// Returns `Err(InvalidForkChoice::Syncing)` immediately while another
/// deep-reorg apply is still in flight.
pub async fn apply_fork_choice_with_deep_reorg(
    blockchain: &Blockchain,
    head_hash: H256,
    safe_hash: H256,
    finalized_hash: H256,
) -> Result<BlockHeader, InvalidForkChoice> {
    // Section 11 — a previous deep-reorg apply has not finished yet. The CL
    // retries on SYNCING, and the next FCU after the reorg completes is
    // processed normally. Mirrors Reth's gating at
    // `crates/engine/tree/src/tree/mod.rs:1173-1178`.
    if blockchain.is_reorg_in_progress() {
        return Err(InvalidForkChoice::Syncing);
    }

    let outcome =
        apply_fork_choice(blockchain.store(), head_hash, safe_hash, finalized_hash).await;

    // Successes and every error other than `StateNotReachable` pass straight
    // through to the caller.
    if !matches!(outcome, Err(InvalidForkChoice::StateNotReachable)) {
        return outcome;
    }

    info!(%head_hash, "head state not directly reachable; attempting deep-reorg apply");
    reorg_apply_deep(blockchain, head_hash, safe_hash, finalized_hash).await
}

/// Drives the deep-reorg apply pass:
///
/// 1. Walk back through `HEADERS` to find the pivot — the deepest block on
///    the OLD canonical chain that is also an ancestor of the new head.
/// 2. Look up the cache edge `D` from `STATE_HISTORY` (the highest journal
///    entry's block number).
/// 3. Build the OLD canonical chain's hash chain in `[pivot+1, D]` so the
///    overlay constructor can verify each journal entry's `block_hash`.
/// 4. Install the overlay (storage primitive — Section 8.3-8.5).
/// 5. Execute the side-chain blocks `pivot+1 ..= new_head` (head included)
///    in chain order via `Blockchain::add_block`. The first such block's
///    commit triggers the Section 9 reconciliation that folds overlay +
///    layer_T into a single atomic disk write.
/// 6. Update `CANONICAL_BLOCK_HASHES` via `forkchoice_update`.
///
/// # Errors
///
/// * `InvalidForkChoice::Syncing` — the head header is not in the store.
/// * `InvalidForkChoice::UnlinkedHead` — no link to the canonical chain was
///   found, or the body of a block to replay is missing.
/// * `InvalidForkChoice::StateNotReachable` — there is no journal edge, the
///   pivot lies above the edge, the overlay could not be installed, or a
///   replayed block failed to execute.
/// * Store errors are propagated through `?`.
async fn reorg_apply_deep(
    blockchain: &Blockchain,
    head_hash: H256,
    safe_hash: H256,
    finalized_hash: H256,
) -> Result<BlockHeader, InvalidForkChoice> {
    // Mark the reorg in progress for the duration of this call. The guard
    // clears the flag on every exit path (success, early return, panic via
    // unwinding). Concurrent FCUs from the engine API will see the flag set
    // and short-circuit to SYNCING (see `apply_fork_choice_with_deep_reorg`).
    let _reorg_guard = blockchain.enter_reorg();

    let store = blockchain.store();

    let head = store
        .get_block_header_by_hash(head_hash)?
        .ok_or(InvalidForkChoice::Syncing)?;

    // Branch is the side-fork chain in DESCENDING order (new_head's parent
    // first, then deeper). The head itself is NOT part of the branch. The
    // deepest entry's `(number-1)` is the pivot.
    let new_canonical_blocks = find_link_with_canonical_chain(store, &head)
        .await?
        .ok_or(InvalidForkChoice::UnlinkedHead)?;

    // Pivot = parent of the deepest side-fork entry, or head's direct parent
    // if the branch is empty (head's parent is canonical, no real reorg).
    let pivot_number = match new_canonical_blocks.last() {
        Some((n, _)) => n.saturating_sub(1),
        None => head.number.saturating_sub(1),
    };

    // The overlay's range is `[pivot+1, edge]` where edge is the highest
    // committed block (= highest journal entry).
    let edge = store
        .highest_state_history_block_number()?
        .ok_or(InvalidForkChoice::StateNotReachable)?;
    let to_block = pivot_number.saturating_add(1);
    if edge < to_block {
        // The pivot is above the cache edge — apply_fork_choice should have
        // handled this as a shallow reorg. If we reach here, something is
        // off; punt.
        warn!(
            edge,
            to_block, "deep-reorg path entered but pivot is above cache edge"
        );
        return Err(InvalidForkChoice::StateNotReachable);
    }

    // Pre-build the OLD canonical chain's hash lookup for journal verification.
    // This must reflect the chain BEFORE we update CANONICAL_BLOCK_HASHES below.
    let mut canonical_hashes: HashMap<BlockNumber, H256> = HashMap::new();
    for n in to_block..=edge {
        if let Some(hash) = store.get_canonical_block_hash_sync(n)? {
            canonical_hashes.insert(n, hash);
        }
    }

    // Install overlay. Errors abort cleanly; the existing cache stays intact.
    store
        .install_overlay_for_reorg(edge, to_block, |n| canonical_hashes.get(&n).copied())
        .map_err(|e| {
            error!(error = %e, "deep-reorg: overlay install failed");
            InvalidForkChoice::StateNotReachable
        })?;

    // From this point on, ANY error must reset the layer cache to a fresh
    // empty state — otherwise the half-installed overlay + partial new-chain
    // layers would leak into subsequent FCU evaluations. The guard fires
    // `store.abort_reorg()` on drop unless `disarm()` is called below after
    // a successful canonical-hash update. (Section 13.)
    let mut abort_guard = AbortReorgGuard::new(store);

    // Execute the side-chain blocks in CHAIN order (oldest first). The
    // existing `add_block` path handles execution + storage; layer cache
    // reads cascade through the freshly-installed overlay.
    //
    // The branch only holds the head's non-canonical ANCESTORS, so the head
    // is appended explicitly. Without it the head's own state would never be
    // produced, and an empty branch would replay nothing at all even though
    // the overlay has already been installed.
    let replay_order = new_canonical_blocks
        .iter()
        .copied()
        .rev()
        .chain(std::iter::once((head.number, head_hash)));
    for (number, block_hash) in replay_order {
        let Some(block) = store.get_block_by_hash(block_hash).await? else {
            warn!(%number, %block_hash, "deep-reorg: side-chain block body missing");
            return Err(InvalidForkChoice::UnlinkedHead);
        };
        if let Err(e) = blockchain.add_block(block) {
            error!(%number, %block_hash, error = %e, "deep-reorg: side-chain block execution failed");
            return Err(map_chain_error_for_fcu(e));
        }
    }

    // Resolve safe / finalized for the canonical-hash update. A zero hash
    // means "not provided" and resolves to `None`.
    let safe_res = if !safe_hash.is_zero() {
        store.get_block_header_by_hash(safe_hash)?
    } else {
        None
    };
    let finalized_res = if !finalized_hash.is_zero() {
        store.get_block_header_by_hash(finalized_hash)?
    } else {
        None
    };

    store
        .forkchoice_update(
            new_canonical_blocks,
            head.number,
            head_hash,
            safe_res.map(|h| h.number),
            finalized_res.map(|h| h.number),
        )
        .await?;

    // forkchoice_update succeeded — the new chain is canonical. Disarm the
    // abort guard so the now-correct cache is preserved on this function's
    // return.
    abort_guard.disarm();

    metrics!(
        use ethrex_metrics::blocks::METRICS_BLOCKS;
        METRICS_BLOCKS.set_head_height(head.number);
    );

    info!(
        head_number = head.number,
        pivot_number,
        side_chain_len = head.number.saturating_sub(pivot_number),
        "deep-reorg apply succeeded"
    );

    Ok(head)
}

/// RAII guard that calls `Store::abort_reorg()` on drop — resetting the layer
/// cache to a fresh empty state — UNLESS `disarm()` is called first.
///
/// The deep-reorg apply path arms this guard immediately after
/// `install_overlay_for_reorg` succeeds, so any subsequent failure (side-chain
/// execution error, missing block body, fork-choice update error, panic via
/// unwinding) leaves the store in a recoverable state for the next FCU.
///
/// The guard must be bound to a named variable: a guard that is discarded
/// right away is dropped right away, which runs `abort_reorg()` before any
/// replay has happened. `#[must_use]` turns that mistake into a compiler
/// warning.
#[must_use = "an unbound guard is dropped immediately and aborts the reorg right away"]
struct AbortReorgGuard<'a> {
    /// Store whose `abort_reorg()` is invoked on drop while the guard is armed.
    store: &'a Store,
    /// `true` until `disarm()` is called; only an armed guard aborts on drop.
    armed: bool,
}

impl<'a> AbortReorgGuard<'a> {
fn new(store: &'a Store) -> Self {
Self { store, armed: true }
}

fn disarm(&mut self) {
self.armed = false;
}
}

impl<'a> Drop for AbortReorgGuard<'a> {
fn drop(&mut self) {
if self.armed
&& let Err(e) = self.store.abort_reorg()
{
error!(error = %e, "AbortReorgGuard: abort_reorg failed during cleanup");
}
}
}

/// Translates a `ChainError` raised while executing a side-chain block into
/// an [`InvalidForkChoice`].
///
/// The error's details are deliberately discarded: every input collapses to
/// `StateNotReachable`, which the engine API reports as `SYNCING`. A chain
/// error during side-chain replay mostly means the new chain is invalid; a
/// follow-up could walk back to the exact bad block and emit a more specific
/// `InvalidAncestor` instead.
fn map_chain_error_for_fcu(_err: ChainError) -> InvalidForkChoice {
    InvalidForkChoice::StateNotReachable
}
6 changes: 3 additions & 3 deletions crates/networking/rpc/engine/fork_choice.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use ethrex_blockchain::{
error::{ChainError, InvalidForkChoice},
fork_choice::apply_fork_choice,
fork_choice::apply_fork_choice_with_deep_reorg,
payload::{BuildPayloadArgs, create_payload},
};
use ethrex_common::types::{BlockHeader, ELASTICITY_MULTIPLIER};
Expand Down Expand Up @@ -270,8 +270,8 @@ async fn handle_forkchoice(
return Ok((None, PayloadStatus::syncing().into()));
}

match apply_fork_choice(
&context.storage,
match apply_fork_choice_with_deep_reorg(
&context.blockchain,
fork_choice_state.head_block_hash,
fork_choice_state.safe_block_hash,
fork_choice_state.finalized_block_hash,
Expand Down
26 changes: 26 additions & 0 deletions crates/networking/rpc/engine/payload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1050,6 +1050,32 @@ async fn try_execute_payload(
return Ok(PayloadStatus::valid_with_hash(block_hash));
}

// Section 10 — defer eager execution when the parent block is known but
// its state is not currently materialized (parent state neither in the
// layer cache nor on disk). Stash the block in HEADERS+BODIES and return
// ACCEPTED; a later forkchoiceUpdated pointing at this block (or a
// descendant) will drive the deep-reorg apply path. Matches Geth's
// `eth/catalyst/api.go:863-867` (`HasBlockAndState` predicate).
//
// If the parent is itself unknown, fall through to add_block which
// returns `ChainError::ParentNotFound` and stashes the block — handled
// below as `SYNCING`, preserving existing behavior.
if let Some(parent_header) = storage.get_block_header_by_hash(block.header.parent_hash)? {
let parent_state = parent_header.state_root;
let in_cache = storage.is_state_in_layer_cache(parent_state)?;
let on_disk = !in_cache && storage.has_state_root(parent_state)?;
if !in_cache && !on_disk {
debug!(
%block_hash,
%block_number,
parent_hash = %block.header.parent_hash,
"Parent state not materialized; stashing payload as ACCEPTED"
);
storage.add_block(block).await?;
return Ok(PayloadStatus::accepted());
}
}

// Execute and store the block
debug!(%block_hash, %block_number, "Executing payload");

Expand Down
14 changes: 14 additions & 0 deletions crates/networking/rpc/types/payload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,20 @@ impl PayloadStatus {
validation_error: None,
}
}

/// Creates a PayloadStatus with `ACCEPTED` status. Used when the EL has
/// the payload and its parent block is known, but the parent's state is
/// not currently materialized in memory and the EL declines to
/// speculatively rebuild it. Geth uses this for the same case
/// (`eth/catalyst/api.go:863-867`); a follow-up FCU pointing at this
/// block (or a descendant) will trigger the deep-reorg apply path.
pub fn accepted() -> Self {
PayloadStatus {
status: PayloadValidationStatus::Accepted,
latest_valid_hash: None,
validation_error: None,
}
}
}

#[derive(Clone, Debug, Serialize, Deserialize)]
Expand Down
Loading
Loading