From 5b22c176a660673bf635fdac8732f02ef0e5e093 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Arjovsky?= Date: Fri, 1 May 2026 15:07:32 +0200 Subject: [PATCH 1/3] first reorg until finality iteration --- crates/blockchain/blockchain.rs | 49 + crates/blockchain/fork_choice.rs | 190 +++- crates/networking/rpc/engine/fork_choice.rs | 6 +- crates/networking/rpc/engine/payload.rs | 26 + crates/networking/rpc/types/payload.rs | 14 + crates/storage/api/mod.rs | 11 + crates/storage/api/tables.rs | 10 +- crates/storage/backend/in_memory.rs | 20 + crates/storage/backend/rocksdb.rs | 108 ++- crates/storage/journal.rs | 422 +++++++++ crates/storage/layering.rs | 624 ++++++++++++- crates/storage/lib.rs | 1 + crates/storage/store.rs | 957 +++++++++++++++++++- 13 files changed, 2413 insertions(+), 25 deletions(-) create mode 100644 crates/storage/journal.rs diff --git a/crates/blockchain/blockchain.rs b/crates/blockchain/blockchain.rs index 078e12a87a9..37872103255 100644 --- a/crates/blockchain/blockchain.rs +++ b/crates/blockchain/blockchain.rs @@ -201,6 +201,13 @@ pub struct Blockchain { /// Set to true after initial sync completes, never reset to false. /// Does not reflect whether an ongoing sync is in progress. is_synced: AtomicBool, + /// Whether a deep-reorg apply is currently in flight (Section 11). + /// + /// Set on entry to `reorg_apply_deep`, cleared on exit (via the + /// `ReorgGuard` drop). While set, engine API `forkchoiceUpdated` calls + /// short-circuit to `SYNCING` (matching Reth's `BackfillSyncState::Active` + /// gating at `crates/engine/tree/src/tree/mod.rs:1173-1178`). + reorg_in_progress: AtomicBool, /// Configuration options for blockchain behavior. pub options: BlockchainOptions, /// Cache of recently built payloads. @@ -333,23 +340,65 @@ impl Blockchain { storage: store, mempool: Mempool::new(blockchain_opts.max_mempool_size), is_synced: AtomicBool::new(false), + reorg_in_progress: AtomicBool::new(false), payloads: Arc::new(TokioMutex::new(Vec::new())), options: blockchain_opts, merkle_pool: Self::build_merkle_pool(), } } + /// Returns a reference to the underlying [`Store`]. Used by code that + /// orchestrates storage-side operations (e.g. the deep-reorg apply path) + /// alongside `Blockchain`'s execution capabilities. + pub fn store(&self) -> &Store { + &self.storage + } + pub fn default_with_store(store: Store) -> Self { Self { storage: store, mempool: Mempool::new(MAX_MEMPOOL_SIZE_DEFAULT), is_synced: AtomicBool::new(false), + reorg_in_progress: AtomicBool::new(false), payloads: Arc::new(TokioMutex::new(Vec::new())), options: BlockchainOptions::default(), merkle_pool: Self::build_merkle_pool(), } } + /// Returns `true` if a deep-reorg apply is currently in flight. + /// While true, `apply_fork_choice_with_deep_reorg` short-circuits to + /// `InvalidForkChoice::Syncing` and the engine API responds `SYNCING`. + pub fn is_reorg_in_progress(&self) -> bool { + self.reorg_in_progress + .load(std::sync::atomic::Ordering::Acquire) + } + + /// Internal: marks a deep-reorg apply as in flight. Returns a guard whose + /// `Drop` clears the flag — use this to ensure the flag is cleared on + /// every exit path (success, error, panic). + pub(crate) fn enter_reorg(&self) -> ReorgGuard<'_> { + self.reorg_in_progress + .store(true, std::sync::atomic::Ordering::Release); + ReorgGuard { blockchain: self } + } +} + +/// RAII guard that clears `Blockchain::reorg_in_progress` on drop. 
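+///
+/// A minimal usage sketch (mirrors how `reorg_apply_deep` in
+/// `fork_choice.rs` holds the guard; the flag's lifetime is the guard's scope):
+///
+/// ```ignore
+/// let _guard = blockchain.enter_reorg(); // flag set here
+/// // ... replay the side chain; early returns and panics unwind ...
+/// // guard dropped at end of scope -> flag cleared
+/// ```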
+pub(crate) struct ReorgGuard<'a> {
+    blockchain: &'a Blockchain,
+}
+
+impl<'a> Drop for ReorgGuard<'a> {
+    fn drop(&mut self) {
+        self.blockchain
+            .reorg_in_progress
+            .store(false, std::sync::atomic::Ordering::Release);
+    }
+}
+
+impl Blockchain {
+    /// Executes a block within a new vm instance and state
     fn execute_block(
         &self,
diff --git a/crates/blockchain/fork_choice.rs b/crates/blockchain/fork_choice.rs
index fa68bee8bf6..f3c7087bc24 100644
--- a/crates/blockchain/fork_choice.rs
+++ b/crates/blockchain/fork_choice.rs
@@ -4,10 +4,12 @@ use ethrex_common::{
 };
 use ethrex_metrics::metrics;
 use ethrex_storage::{Store, error::StoreError};
-use tracing::{error, warn};
+use std::collections::HashMap;
+use tracing::{error, info, warn};
 
 use crate::{
-    error::{self, InvalidForkChoice},
+    Blockchain,
+    error::{self, ChainError, InvalidForkChoice},
     is_canonical,
 };
@@ -205,3 +207,187 @@ async fn find_link_with_canonical_chain(
 
     Ok(None)
 }
+
+// ===========================================================================
+// Deep-reorg apply path (Section 8 orchestration).
+// ===========================================================================
+
+/// Wrapper around [`apply_fork_choice`] that handles the deep-reorg case:
+/// when the head's state is not directly reachable from the on-disk trie,
+/// build an in-memory overlay from the journal, replay the side chain
+/// against it, and reconcile on the first new-chain commit.
+///
+/// Falls through to the simpler `apply_fork_choice` for shallow reorgs and
+/// no-op cases.
+///
+/// Pre-condition: the journal contains entries down to the deepest required
+/// pivot. If finalization has pruned past the pivot, the call returns
+/// [`InvalidForkChoice::StateNotReachable`] (the engine API responds with
+/// `SYNCING`, matching today's behavior pre-deep-reorg).
+pub async fn apply_fork_choice_with_deep_reorg(
+    blockchain: &Blockchain,
+    head_hash: H256,
+    safe_hash: H256,
+    finalized_hash: H256,
+) -> Result<BlockHeader, InvalidForkChoice> {
+    // Section 11 — short-circuit when a previous deep-reorg apply is still in
+    // flight. The CL retries on SYNCING; once the in-progress reorg completes,
+    // the next FCU is processed normally. Reth's pattern at
+    // `crates/engine/tree/src/tree/mod.rs:1173-1178`.
+    if blockchain.is_reorg_in_progress() {
+        return Err(InvalidForkChoice::Syncing);
+    }
+
+    let store = blockchain.store();
+    match apply_fork_choice(store, head_hash, safe_hash, finalized_hash).await {
+        Ok(header) => Ok(header),
+        Err(InvalidForkChoice::StateNotReachable) => {
+            info!(%head_hash, "head state not directly reachable; attempting deep-reorg apply");
+            reorg_apply_deep(blockchain, head_hash, safe_hash, finalized_hash).await
+        }
+        Err(e) => Err(e),
+    }
+}
+
+/// Drives the deep-reorg apply pass:
+///
+/// 1. Walk back through `HEADERS` to find the pivot — the highest block on
+///    the OLD canonical chain that is also an ancestor of the new head.
+/// 2. Look up the cache edge `D` from `STATE_HISTORY` (the highest journal
+///    entry's block number).
+/// 3. Build the OLD canonical chain's hash chain in `[pivot+1, D]` so the
+///    overlay constructor can verify each journal entry's `block_hash`.
+/// 4. Install the overlay (storage primitive — Section 8.3-8.5).
+/// 5. Execute the side-chain blocks `[pivot+1 .. new_head]` in chain order
+///    via `Blockchain::add_block`. The first such block's commit triggers
+///    the Section 9 reconciliation that folds overlay + layer_T into a
+///    single atomic disk write.
+/// 6. Update `CANONICAL_BLOCK_HASHES` via `forkchoice_update`.
+async fn reorg_apply_deep(
+    blockchain: &Blockchain,
+    head_hash: H256,
+    safe_hash: H256,
+    finalized_hash: H256,
+) -> Result<BlockHeader, InvalidForkChoice> {
+    // Mark the reorg in progress for the duration of this call. The guard
+    // clears the flag on every exit path (success, early return, panic via
+    // unwinding). Concurrent FCUs from the engine API will see the flag set
+    // and short-circuit to SYNCING (see `apply_fork_choice_with_deep_reorg`).
+    let _reorg_guard = blockchain.enter_reorg();
+
+    let store = blockchain.store();
+
+    let head = store
+        .get_block_header_by_hash(head_hash)?
+        .ok_or(InvalidForkChoice::Syncing)?;
+
+    // Branch is the side-fork chain in DESCENDING order (new_head's parent
+    // first, then deeper). The deepest entry's `(number-1)` is the pivot.
+    let new_canonical_blocks = find_link_with_canonical_chain(store, &head)
+        .await?
+        .ok_or(InvalidForkChoice::UnlinkedHead)?;
+
+    // Pivot = parent of the deepest side-fork entry, or head's direct parent
+    // if the branch is empty (head's parent is canonical, no real reorg).
+    let pivot_number = match new_canonical_blocks.last() {
+        Some((n, _)) => n.saturating_sub(1),
+        None => head.number.saturating_sub(1),
+    };
+
+    // The overlay's range is `[pivot+1, edge]` where edge is the highest
+    // committed block (= highest journal entry).
+    let edge = store
+        .highest_state_history_block_number()?
+        .ok_or(InvalidForkChoice::StateNotReachable)?;
+    let to_block = pivot_number.saturating_add(1);
+    if edge < to_block {
+        // The pivot is above the cache edge — apply_fork_choice should have
+        // handled this as a shallow reorg. If we reach here, something is
+        // off; punt.
+        warn!(
+            edge, to_block,
+            "deep-reorg path entered but pivot is above cache edge"
+        );
+        return Err(InvalidForkChoice::StateNotReachable);
+    }
+
+    // Pre-build the OLD canonical chain's hash lookup for journal verification.
+    // This must reflect the chain BEFORE we update CANONICAL_BLOCK_HASHES below.
+    let mut canonical_hashes: HashMap<BlockNumber, H256> = HashMap::new();
+    for n in to_block..=edge {
+        if let Some(hash) = store.get_canonical_block_hash_sync(n)? {
+            canonical_hashes.insert(n, hash);
+        }
+    }
+
+    // Install overlay. Errors abort cleanly; the existing cache stays intact.
+    store
+        .install_overlay_for_reorg(edge, to_block, |n| canonical_hashes.get(&n).copied())
+        .map_err(|e| {
+            error!(error = %e, "deep-reorg: overlay install failed");
+            InvalidForkChoice::StateNotReachable
+        })?;
+
+    // Execute the side-chain blocks in CHAIN order (oldest first). The
+    // existing `add_block` path handles execution + storage; layer cache
+    // reads cascade through the freshly-installed overlay.
+    for (number, block_hash) in new_canonical_blocks.iter().rev() {
+        let block = match store.get_block_by_hash(*block_hash).await? {
+            Some(b) => b,
+            None => {
+                warn!(%number, %block_hash, "deep-reorg: side-chain block body missing");
+                return Err(InvalidForkChoice::UnlinkedHead);
+            }
+        };
+        if let Err(e) = blockchain.add_block(block) {
+            error!(%number, %block_hash, error = %e, "deep-reorg: side-chain block execution failed");
+            return Err(map_chain_error_for_fcu(e));
+        }
+    }
+
+    // Resolve safe / finalized for the canonical-hash update.
+    let safe_res = if !safe_hash.is_zero() {
+        store.get_block_header_by_hash(safe_hash)?
+    } else {
+        None
+    };
+    let finalized_res = if !finalized_hash.is_zero() {
+        store.get_block_header_by_hash(finalized_hash)?
+ } else { + None + }; + + store + .forkchoice_update( + new_canonical_blocks, + head.number, + head_hash, + safe_res.map(|h| h.number), + finalized_res.map(|h| h.number), + ) + .await?; + + metrics!( + use ethrex_metrics::blocks::METRICS_BLOCKS; + METRICS_BLOCKS.set_head_height(head.number); + ); + + info!( + head_number = head.number, + pivot_number, + side_chain_len = head.number.saturating_sub(pivot_number), + "deep-reorg apply succeeded" + ); + + Ok(head) +} + +/// Maps a `ChainError` from a side-chain block execution into the +/// closest-fitting [`InvalidForkChoice`] variant. Most chain errors during +/// side-chain replay indicate the new chain is invalid, so we collapse them +/// to a generic `StateNotReachable` (engine API responds `SYNCING`) — a more +/// specific `InvalidAncestor` could be emitted in a follow-up that walks +/// back to find the exact bad block. +fn map_chain_error_for_fcu(_: ChainError) -> InvalidForkChoice { + InvalidForkChoice::StateNotReachable +} diff --git a/crates/networking/rpc/engine/fork_choice.rs b/crates/networking/rpc/engine/fork_choice.rs index 65f3889ee9d..575ebe2ff32 100644 --- a/crates/networking/rpc/engine/fork_choice.rs +++ b/crates/networking/rpc/engine/fork_choice.rs @@ -1,6 +1,6 @@ use ethrex_blockchain::{ error::{ChainError, InvalidForkChoice}, - fork_choice::apply_fork_choice, + fork_choice::apply_fork_choice_with_deep_reorg, payload::{BuildPayloadArgs, create_payload}, }; use ethrex_common::types::{BlockHeader, ELASTICITY_MULTIPLIER}; @@ -270,8 +270,8 @@ async fn handle_forkchoice( return Ok((None, PayloadStatus::syncing().into())); } - match apply_fork_choice( - &context.storage, + match apply_fork_choice_with_deep_reorg( + &context.blockchain, fork_choice_state.head_block_hash, fork_choice_state.safe_block_hash, fork_choice_state.finalized_block_hash, diff --git a/crates/networking/rpc/engine/payload.rs b/crates/networking/rpc/engine/payload.rs index d8dcf15a853..b6fbbc94cfd 100644 --- a/crates/networking/rpc/engine/payload.rs +++ b/crates/networking/rpc/engine/payload.rs @@ -1050,6 +1050,32 @@ async fn try_execute_payload( return Ok(PayloadStatus::valid_with_hash(block_hash)); } + // Section 10 — defer eager execution when the parent block is known but + // its state is not currently materialized (parent state neither in the + // layer cache nor on disk). Stash the block in HEADERS+BODIES and return + // ACCEPTED; a later forkchoiceUpdated pointing at this block (or a + // descendant) will drive the deep-reorg apply path. Matches Geth's + // `eth/catalyst/api.go:863-867` (`HasBlockAndState` predicate). + // + // If the parent is itself unknown, fall through to add_block which + // returns `ChainError::ParentNotFound` and stashes the block — handled + // below as `SYNCING`, preserving existing behavior. + if let Some(parent_header) = storage.get_block_header_by_hash(block.header.parent_hash)? 
{ + let parent_state = parent_header.state_root; + let in_cache = storage.is_state_in_layer_cache(parent_state)?; + let on_disk = !in_cache && storage.has_state_root(parent_state)?; + if !in_cache && !on_disk { + debug!( + %block_hash, + %block_number, + parent_hash = %block.header.parent_hash, + "Parent state not materialized; stashing payload as ACCEPTED" + ); + storage.add_block(block).await?; + return Ok(PayloadStatus::accepted()); + } + } + // Execute and store the block debug!(%block_hash, %block_number, "Executing payload"); diff --git a/crates/networking/rpc/types/payload.rs b/crates/networking/rpc/types/payload.rs index 287a77c3c30..9ca6222ec2e 100644 --- a/crates/networking/rpc/types/payload.rs +++ b/crates/networking/rpc/types/payload.rs @@ -260,6 +260,20 @@ impl PayloadStatus { validation_error: None, } } + + /// Creates a PayloadStatus with `ACCEPTED` status. Used when the EL has + /// the payload and its parent block is known, but the parent's state is + /// not currently materialized in memory and the EL declines to + /// speculatively rebuild it. Geth uses this for the same case + /// (`eth/catalyst/api.go:863-867`); a follow-up FCU pointing at this + /// block (or a descendant) will trigger the deep-reorg apply path. + pub fn accepted() -> Self { + PayloadStatus { + status: PayloadValidationStatus::Accepted, + latest_valid_hash: None, + validation_error: None, + } + } } #[derive(Clone, Debug, Serialize, Deserialize)] diff --git a/crates/storage/api/mod.rs b/crates/storage/api/mod.rs index ca5ce8104df..0719b3906c1 100644 --- a/crates/storage/api/mod.rs +++ b/crates/storage/api/mod.rs @@ -88,6 +88,17 @@ pub trait StorageWriteBatch: Send { /// Removes a key-value pair from the specified table. fn delete(&mut self, table: &'static str, key: &[u8]) -> Result<(), StoreError>; + /// Removes every key in `[start_key, end_key)` from the specified table. + /// The range is half-open: `start_key` is inclusive, `end_key` is exclusive. + /// Backends that natively support range deletion (e.g. RocksDB) SHOULD use it + /// so the operation participates in the surrounding atomic transaction. + fn delete_range( + &mut self, + table: &'static str, + start_key: &[u8], + end_key: &[u8], + ) -> Result<(), StoreError>; + /// Commits all changes made in this transaction. fn commit(&mut self) -> Result<(), StoreError>; } diff --git a/crates/storage/api/tables.rs b/crates/storage/api/tables.rs index fa59620b186..4fd1a798ad9 100644 --- a/crates/storage/api/tables.rs +++ b/crates/storage/api/tables.rs @@ -102,7 +102,14 @@ pub const MISC_VALUES: &str = "misc_values"; /// - [`Vec`] = `serde_json::to_vec(&witness)` pub const EXECUTION_WITNESSES: &str = "execution_witnesses"; -pub const TABLES: [&str; 19] = [ +/// State history column family: per-block reverse-diffs for deep reorg support. +/// Written on every layer commit during regular block-by-block execution, +/// pruned at finality. Skipped during full-sync batch execution. 
+/// - [`u8; 8`] = `block_number.to_be_bytes()` (big-endian for chain-ordered iteration) +/// - [`Vec`] = serialized `JournalEntry` (custom compact codec, version-prefixed) +pub const STATE_HISTORY: &str = "state_history"; + +pub const TABLES: [&str; 20] = [ CHAIN_DATA, ACCOUNT_CODES, ACCOUNT_CODE_METADATA, @@ -122,4 +129,5 @@ pub const TABLES: [&str; 19] = [ STORAGE_FLATKEYVALUE, MISC_VALUES, EXECUTION_WITNESSES, + STATE_HISTORY, ]; diff --git a/crates/storage/backend/in_memory.rs b/crates/storage/backend/in_memory.rs index c6c71c301de..b104f901d13 100644 --- a/crates/storage/backend/in_memory.rs +++ b/crates/storage/backend/in_memory.rs @@ -181,6 +181,26 @@ impl StorageWriteBatch for InMemoryWriteTx { Ok(()) } + fn delete_range( + &mut self, + table: &'static str, + start_key: &[u8], + end_key: &[u8], + ) -> Result<(), StoreError> { + let mut db = self + .backend + .write() + .map_err(|_| StoreError::Custom("Failed to acquire write lock".to_string()))?; + + let db_mut = Arc::make_mut(&mut *db); + if let Some(table_ref) = db_mut.get_mut(table) { + table_ref.retain(|key, _| { + key.as_slice() < start_key || key.as_slice() >= end_key + }); + } + Ok(()) + } + fn commit(&mut self) -> Result<(), StoreError> { // FIXME: in-memory writes aren't atomic Ok(()) diff --git a/crates/storage/backend/rocksdb.rs b/crates/storage/backend/rocksdb.rs index 1672fffb07d..14fb452416d 100644 --- a/crates/storage/backend/rocksdb.rs +++ b/crates/storage/backend/rocksdb.rs @@ -1,7 +1,7 @@ use crate::api::tables::{ ACCOUNT_CODES, ACCOUNT_FLATKEYVALUE, ACCOUNT_TRIE_NODES, BLOCK_NUMBERS, BODIES, - CANONICAL_BLOCK_HASHES, FULLSYNC_HEADERS, HEADERS, RECEIPTS, STORAGE_FLATKEYVALUE, - STORAGE_TRIE_NODES, TRANSACTION_LOCATIONS, + CANONICAL_BLOCK_HASHES, FULLSYNC_HEADERS, HEADERS, RECEIPTS, STATE_HISTORY, + STORAGE_FLATKEYVALUE, STORAGE_TRIE_NODES, TRANSACTION_LOCATIONS, }; use crate::api::{ PrefixResult, StorageBackend, StorageLockedView, StorageReadView, StorageWriteBatch, @@ -175,6 +175,20 @@ impl RocksDBBackend { block_opts.set_block_cache(&block_cache); cf_opts.set_block_based_table_factory(&block_opts); } + STATE_HISTORY => { + // Small CF bounded by finality depth. Sequential big-endian keys + // (one per block), heavy use of range deletion at finality. Reads + // are rare (only during deep reorgs) but bursty. 
+ cf_opts.set_write_buffer_size(64 * 1024 * 1024); // 64MB + cf_opts.set_max_write_buffer_number(3); + cf_opts.set_target_file_size_base(128 * 1024 * 1024); // 128MB + + let mut block_opts = BlockBasedOptions::default(); + block_opts.set_block_size(16 * 1024); // 16KB + block_opts.set_bloom_filter(10.0, false); + block_opts.set_block_cache(&block_cache); + cf_opts.set_block_based_table_factory(&block_opts); + } _ => { // Default for other CFs cf_opts.set_write_buffer_size(64 * 1024 * 1024); // 64MB @@ -365,6 +379,21 @@ impl StorageWriteBatch for RocksDBWriteTx { Ok(()) } + fn delete_range( + &mut self, + table: &'static str, + start_key: &[u8], + end_key: &[u8], + ) -> Result<(), StoreError> { + let cf = self + .db + .cf_handle(table) + .ok_or_else(|| StoreError::Custom(format!("Table {} not found", table)))?; + + self.batch.delete_range_cf(&cf, start_key, end_key); + Ok(()) + } + fn commit(&mut self) -> Result<(), StoreError> { // Take ownership of the batch (replaces it with an empty one) since db.write() consumes it let batch = std::mem::take(&mut self.batch); @@ -403,3 +432,78 @@ impl Drop for RocksDBLocked { } } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::api::tables::STATE_HISTORY; + + #[test] + fn delete_range_only_applies_after_commit() { + let dir = tempfile::tempdir().unwrap(); + let backend = RocksDBBackend::open(dir.path()).unwrap(); + + // Seed five keys. + let mut tx = backend.begin_write().unwrap(); + for i in 0u64..5 { + tx.put(STATE_HISTORY, &i.to_be_bytes(), &[i as u8]).unwrap(); + } + tx.commit().unwrap(); + + // Stage a range delete of [1, 4) but do NOT commit. + let mut tx = backend.begin_write().unwrap(); + tx.delete_range(STATE_HISTORY, &1u64.to_be_bytes(), &4u64.to_be_bytes()) + .unwrap(); + + // A fresh read view must still see all five entries — the range delete + // is buffered in the write batch, not yet visible. + let read = backend.begin_read().unwrap(); + for i in 0u64..5 { + assert!( + read.get(STATE_HISTORY, &i.to_be_bytes()).unwrap().is_some(), + "key {i} should still be visible before commit" + ); + } + + tx.commit().unwrap(); + + // After commit: keys 1,2,3 are gone; 0 and 4 remain. + let read = backend.begin_read().unwrap(); + assert!(read.get(STATE_HISTORY, &0u64.to_be_bytes()).unwrap().is_some()); + assert!(read.get(STATE_HISTORY, &1u64.to_be_bytes()).unwrap().is_none()); + assert!(read.get(STATE_HISTORY, &2u64.to_be_bytes()).unwrap().is_none()); + assert!(read.get(STATE_HISTORY, &3u64.to_be_bytes()).unwrap().is_none()); + assert!(read.get(STATE_HISTORY, &4u64.to_be_bytes()).unwrap().is_some()); + } + + #[test] + fn delete_range_atomic_with_other_writes() { + let dir = tempfile::tempdir().unwrap(); + let backend = RocksDBBackend::open(dir.path()).unwrap(); + + let mut tx = backend.begin_write().unwrap(); + tx.put(STATE_HISTORY, &10u64.to_be_bytes(), b"v10").unwrap(); + tx.put(STATE_HISTORY, &20u64.to_be_bytes(), b"v20").unwrap(); + tx.commit().unwrap(); + + // Single transaction: insert 30, delete range [10, 20), update 20. + // All four effects must land together. 
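+        // Ordering note (grounded in RocksDB WriteBatch semantics: staged
+        // operations apply in insertion order on commit): the put of key 20
+        // is staged AFTER the range delete, so it must survive; key 10 is
+        // covered by [10, 20) with no later put, so it must disappear.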
+        let mut tx = backend.begin_write().unwrap();
+        tx.put(STATE_HISTORY, &30u64.to_be_bytes(), b"v30").unwrap();
+        tx.delete_range(STATE_HISTORY, &10u64.to_be_bytes(), &20u64.to_be_bytes())
+            .unwrap();
+        tx.put(STATE_HISTORY, &20u64.to_be_bytes(), b"v20-new").unwrap();
+        tx.commit().unwrap();
+
+        let read = backend.begin_read().unwrap();
+        assert_eq!(read.get(STATE_HISTORY, &10u64.to_be_bytes()).unwrap(), None);
+        assert_eq!(
+            read.get(STATE_HISTORY, &20u64.to_be_bytes()).unwrap().as_deref(),
+            Some(&b"v20-new"[..])
+        );
+        assert_eq!(
+            read.get(STATE_HISTORY, &30u64.to_be_bytes()).unwrap().as_deref(),
+            Some(&b"v30"[..])
+        );
+    }
+}
diff --git a/crates/storage/journal.rs b/crates/storage/journal.rs
new file mode 100644
index 00000000000..44679128ae4
--- /dev/null
+++ b/crates/storage/journal.rs
@@ -0,0 +1,422 @@
+//! # State-history journal
+//!
+//! Per-block reverse-diff entries persisted to RocksDB so reorgs deeper than the
+//! in-memory `TrieLayerCache` become possible up to the finalized boundary.
+//!
+//! Each entry captures the previous on-disk values (or absence markers) for every
+//! account-trie node, storage-trie node, account flat-key-value, and storage
+//! flat-key-value path that a single layer commit overwrites. Codes are
+//! content-addressed and not journaled.
+//!
+//! Entries are keyed by `block_number.to_be_bytes()` in the
+//! [`STATE_HISTORY`](crate::api::tables::STATE_HISTORY) column family.
+//!
+//! ## Codec
+//!
+//! Entries use a hand-rolled compact format (see design.md §D15): a version
+//! byte at offset 0, then `block_hash` (32 bytes), `parent_state_root`
+//! (32 bytes), then four varint-prefixed reverse-diff sections in order:
+//! account-trie, storage-trie, account flat-KV, storage flat-KV. RLP, bincode,
+//! and postcard are deliberately avoided — RLP has proven slow and clunky
+//! for nested optional payloads of this shape elsewhere in the codebase, and
+//! the access pattern (write-once, read-on-reorg, large volume) makes
+//! encode/decode cost matter.
+
+use ethrex_common::H256;
+
+/// Current version of the journal entry codec.
+pub const JOURNAL_VERSION: u8 = 1;
+
+/// A single reverse-diff entry: `(on_disk_key, previous_value_or_none)`.
+///
+/// `on_disk_key` is the exact key written to its column family — for storage
+/// CFs this includes the nibble-encoded account-hash prefix. `Some(prev)`
+/// means the key existed on disk with `prev` before the commit; `None` means
+/// the key did not exist on disk (i.e., the commit added it, and a rollback
+/// should remove it).
+pub type ReverseDiffEntry = (Vec<u8>, Option<Vec<u8>>);
+
+/// A flat list of reverse-diff entries.
+pub type FlatDiff = Vec<ReverseDiffEntry>;
+
+/// A single journal entry covering one block's commit.
+///
+/// All four diff sections are flat lists of `(on_disk_key, prev_value)` tuples.
+/// On rollback, each entry can be applied directly to its column family
+/// without further interpretation: a `Some(prev)` becomes a `put`, a `None`
+/// becomes a `delete`.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct JournalEntry {
+    /// Hash of the block whose commit this entry reverses.
+    pub block_hash: H256,
+    /// Post-state root of the parent block (the state we'd return to on rollback).
+    pub parent_state_root: H256,
+    /// Reverse diff for `ACCOUNT_TRIE_NODES`.
+    pub account_trie_diff: FlatDiff,
+    /// Reverse diff for `STORAGE_TRIE_NODES`. Keys carry the nibble-encoded
+    /// account-hash prefix as written on disk; no separate grouping is needed.
+    pub storage_trie_diff: FlatDiff,
+    /// Reverse diff for `ACCOUNT_FLATKEYVALUE`.
+    pub account_flat_diff: FlatDiff,
+    /// Reverse diff for `STORAGE_FLATKEYVALUE`. Keys carry the nibble-encoded
+    /// account-hash prefix as written on disk.
+    pub storage_flat_diff: FlatDiff,
+}
+
+/// Errors that can occur when decoding a journal entry from disk.
+#[derive(Debug, thiserror::Error, PartialEq, Eq)]
+pub enum JournalDecodeError {
+    #[error("journal entry truncated: expected {expected} more bytes at offset {offset}")]
+    Truncated { offset: usize, expected: usize },
+    #[error("journal entry version mismatch: expected {expected}, found {found}")]
+    VersionMismatch { expected: u8, found: u8 },
+    #[error("journal entry varint overflow at offset {offset}")]
+    VarintOverflow { offset: usize },
+    #[error("journal entry presence byte invalid: expected 0 or 1, found {found} at offset {offset}")]
+    InvalidPresenceByte { offset: usize, found: u8 },
+}
+
+impl JournalEntry {
+    /// Encode this entry into its on-disk byte representation.
+    pub fn encode(&self) -> Vec<u8> {
+        // Capacity hint: 65 fixed header bytes plus per-section estimates.
+        let approx = 1 + 32 + 32
+            + diff_byte_estimate(&self.account_trie_diff)
+            + diff_byte_estimate(&self.storage_trie_diff)
+            + diff_byte_estimate(&self.account_flat_diff)
+            + diff_byte_estimate(&self.storage_flat_diff);
+        let mut out = Vec::with_capacity(approx);
+
+        out.push(JOURNAL_VERSION);
+        out.extend_from_slice(self.block_hash.as_bytes());
+        out.extend_from_slice(self.parent_state_root.as_bytes());
+
+        encode_flat_diff(&mut out, &self.account_trie_diff);
+        encode_flat_diff(&mut out, &self.storage_trie_diff);
+        encode_flat_diff(&mut out, &self.account_flat_diff);
+        encode_flat_diff(&mut out, &self.storage_flat_diff);
+
+        out
+    }
+
+    /// Decode an entry from its on-disk byte representation.
+    ///
+    /// Returns [`JournalDecodeError::VersionMismatch`] if the version byte is
+    /// not [`JOURNAL_VERSION`]. The current binary deliberately refuses to
+    /// interpret entries written by a future codec version rather than silently
+    /// producing a malformed reverse-diff.
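+    ///
+    /// Illustrative byte layout (derived from `encode` above, hypothetical
+    /// path bytes) for a minimal entry whose only diff is one account-trie
+    /// key that was absent on disk before the commit:
+    ///
+    /// ```text
+    /// 01              version
+    /// <32 bytes>      block_hash
+    /// <32 bytes>      parent_state_root
+    /// 01              account-trie count = 1 (varint)
+    /// 02 AB CD        path_len = 2, path bytes
+    /// 00              presence = 0 (key absent pre-commit; rollback deletes it)
+    /// 00 00 00        storage-trie / account-flat / storage-flat counts = 0
+    /// ```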
+    pub fn decode(bytes: &[u8]) -> Result<Self, JournalDecodeError> {
+        let mut cur = Cursor::new(bytes);
+
+        let version = cur.read_byte()?;
+        if version != JOURNAL_VERSION {
+            return Err(JournalDecodeError::VersionMismatch {
+                expected: JOURNAL_VERSION,
+                found: version,
+            });
+        }
+
+        let block_hash = cur.read_h256()?;
+        let parent_state_root = cur.read_h256()?;
+
+        let account_trie_diff = decode_flat_diff(&mut cur)?;
+        let storage_trie_diff = decode_flat_diff(&mut cur)?;
+        let account_flat_diff = decode_flat_diff(&mut cur)?;
+        let storage_flat_diff = decode_flat_diff(&mut cur)?;
+
+        Ok(Self {
+            block_hash,
+            parent_state_root,
+            account_trie_diff,
+            storage_trie_diff,
+            account_flat_diff,
+            storage_flat_diff,
+        })
+    }
+}
+
+// --- varint (LEB128 unsigned) ---------------------------------------------
+
+fn encode_varint(out: &mut Vec<u8>, mut value: u64) {
+    while value >= 0x80 {
+        out.push((value as u8 & 0x7f) | 0x80);
+        value >>= 7;
+    }
+    out.push(value as u8);
+}
+
+// --- diff section encoders -------------------------------------------------
+
+fn encode_flat_diff(out: &mut Vec<u8>, diff: &[ReverseDiffEntry]) {
+    encode_varint(out, diff.len() as u64);
+    for (path, value) in diff {
+        encode_varint(out, path.len() as u64);
+        out.extend_from_slice(path);
+        match value {
+            None => out.push(0),
+            Some(v) => {
+                out.push(1);
+                encode_varint(out, v.len() as u64);
+                out.extend_from_slice(v);
+            }
+        }
+    }
+}
+
+fn diff_byte_estimate(diff: &[ReverseDiffEntry]) -> usize {
+    diff.iter()
+        .map(|(p, v)| 2 + p.len() + 1 + v.as_ref().map_or(0, |v| 2 + v.len()))
+        .sum::<usize>()
+        + 2
+}
+
+// --- cursor / decoders -----------------------------------------------------
+
+struct Cursor<'a> {
+    bytes: &'a [u8],
+    offset: usize,
+}
+
+impl<'a> Cursor<'a> {
+    fn new(bytes: &'a [u8]) -> Self {
+        Self { bytes, offset: 0 }
+    }
+
+    fn read_byte(&mut self) -> Result<u8, JournalDecodeError> {
+        if self.offset >= self.bytes.len() {
+            return Err(JournalDecodeError::Truncated {
+                offset: self.offset,
+                expected: 1,
+            });
+        }
+        let b = self.bytes[self.offset];
+        self.offset += 1;
+        Ok(b)
+    }
+
+    fn read_slice(&mut self, n: usize) -> Result<&'a [u8], JournalDecodeError> {
+        if self.bytes.len() - self.offset < n {
+            return Err(JournalDecodeError::Truncated {
+                offset: self.offset,
+                expected: n,
+            });
+        }
+        let s = &self.bytes[self.offset..self.offset + n];
+        self.offset += n;
+        Ok(s)
+    }
+
+    fn read_h256(&mut self) -> Result<H256, JournalDecodeError> {
+        let s = self.read_slice(32)?;
+        Ok(H256::from_slice(s))
+    }
+
+    fn read_varint(&mut self) -> Result<u64, JournalDecodeError> {
+        let mut result: u64 = 0;
+        let mut shift: u32 = 0;
+        loop {
+            let b = self.read_byte()?;
+            // A u64 fits in at most ten 7-bit groups; any byte arriving at
+            // shift >= 64 would overflow, so reject the encoding.
+            if shift >= 64 {
+                return Err(JournalDecodeError::VarintOverflow {
+                    offset: self.offset - 1,
+                });
+            }
+            result |= ((b & 0x7f) as u64) << shift;
+            if b & 0x80 == 0 {
+                return Ok(result);
+            }
+            shift += 7;
+        }
+    }
+}
+
+fn decode_flat_diff(cur: &mut Cursor<'_>) -> Result<FlatDiff, JournalDecodeError> {
+    let count = cur.read_varint()? as usize;
+    let mut out = Vec::with_capacity(count);
+    for _ in 0..count {
+        let path_len = cur.read_varint()? as usize;
+        let path = cur.read_slice(path_len)?.to_vec();
+        let presence_offset = cur.offset;
+        let presence = cur.read_byte()?;
+        let value = match presence {
+            0 => None,
+            1 => {
+                let value_len = cur.read_varint()?
as usize; + Some(cur.read_slice(value_len)?.to_vec()) + } + other => { + return Err(JournalDecodeError::InvalidPresenceByte { + offset: presence_offset, + found: other, + }); + } + }; + out.push((path, value)); + } + Ok(out) +} + +#[cfg(test)] +mod tests { + use super::*; + + fn h(b: u8) -> H256 { + let mut x = [0u8; 32]; + x.fill(b); + H256::from(x) + } + + fn round_trip(entry: &JournalEntry) { + let bytes = entry.encode(); + let decoded = JournalEntry::decode(&bytes).unwrap(); + assert_eq!(&decoded, entry); + } + + #[test] + fn empty_entry_round_trips() { + let entry = JournalEntry { + block_hash: h(0xaa), + parent_state_root: h(0xbb), + account_trie_diff: vec![], + storage_trie_diff: vec![], + account_flat_diff: vec![], + storage_flat_diff: vec![], + }; + round_trip(&entry); + // Encoded shape: 1 (version) + 32 + 32 + 1 (count=0) × 4 = 69 bytes. + assert_eq!(entry.encode().len(), 69); + } + + #[test] + fn typical_entry_round_trips() { + let entry = JournalEntry { + block_hash: h(0x11), + parent_state_root: h(0x22), + account_trie_diff: vec![ + (vec![0x00, 0x01], Some(vec![0xde, 0xad, 0xbe, 0xef])), + (vec![0x02], None), + ], + storage_trie_diff: vec![ + (vec![0x0a; 67], Some(vec![0xff])), + (vec![0x0b; 68], None), + ], + account_flat_diff: vec![(vec![0xaa; 65], Some(vec![0x01, 0x02, 0x03]))], + storage_flat_diff: vec![(vec![0xbb; 131], None)], + }; + round_trip(&entry); + } + + #[test] + fn entry_with_empty_sections_round_trips() { + let entry = JournalEntry { + block_hash: h(0x33), + parent_state_root: h(0x44), + account_trie_diff: vec![(vec![0x00], Some(vec![0xff]))], + storage_trie_diff: vec![], + account_flat_diff: vec![], + storage_flat_diff: vec![(vec![0xbb; 67], None)], + }; + round_trip(&entry); + } + + #[test] + fn entry_with_only_absences_round_trips() { + // All values are None: the rollback would only delete keys, never restore them. + let entry = JournalEntry { + block_hash: h(0x55), + parent_state_root: h(0x66), + account_trie_diff: vec![ + (vec![0x00], None), + (vec![0x01], None), + (vec![0x02], None), + ], + storage_trie_diff: vec![], + account_flat_diff: vec![(vec![0xaa; 32], None)], + storage_flat_diff: vec![], + }; + round_trip(&entry); + } + + #[test] + fn large_entry_round_trips() { + // 10k entries to exercise allocations and varint widths. 
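+        // (Value lengths run up to 199 bytes, so lengths >= 128 take two-byte
+        // varints, and the 10_000-entry section count itself needs two bytes.)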
+ let mut account_trie_diff = Vec::with_capacity(10_000); + for i in 0u32..10_000 { + let path = i.to_be_bytes().to_vec(); + let value = if i % 7 == 0 { + None + } else { + Some(vec![(i & 0xff) as u8; (i % 200) as usize]) + }; + account_trie_diff.push((path, value)); + } + let entry = JournalEntry { + block_hash: h(0xee), + parent_state_root: h(0xff), + account_trie_diff, + storage_trie_diff: vec![], + account_flat_diff: vec![], + storage_flat_diff: vec![], + }; + round_trip(&entry); + } + + #[test] + fn rejects_unknown_version() { + let mut bytes = vec![0xff]; // bogus version + bytes.extend_from_slice(&[0; 32]); // block_hash + bytes.extend_from_slice(&[0; 32]); // parent_state_root + bytes.extend_from_slice(&[0, 0, 0, 0]); // four empty diff sections + let err = JournalEntry::decode(&bytes).unwrap_err(); + assert_eq!( + err, + JournalDecodeError::VersionMismatch { + expected: JOURNAL_VERSION, + found: 0xff, + } + ); + } + + #[test] + fn rejects_truncated_input() { + let entry = JournalEntry { + block_hash: h(0x77), + parent_state_root: h(0x88), + account_trie_diff: vec![(vec![0x00], Some(vec![0xff]))], + storage_trie_diff: vec![], + account_flat_diff: vec![], + storage_flat_diff: vec![], + }; + let bytes = entry.encode(); + // Chop off the last byte — decode should report truncation. + let err = JournalEntry::decode(&bytes[..bytes.len() - 1]).unwrap_err(); + assert!(matches!(err, JournalDecodeError::Truncated { .. })); + } + + #[test] + fn rejects_invalid_presence_byte() { + // Manually craft a payload with an invalid presence marker (2). + let mut bytes = Vec::new(); + bytes.push(JOURNAL_VERSION); + bytes.extend_from_slice(&[0; 32]); + bytes.extend_from_slice(&[0; 32]); + bytes.push(1); // account_trie_diff count = 1 + bytes.push(1); // path_len = 1 + bytes.push(0xab); // path + bytes.push(2); // presence = 2 (invalid) + let err = JournalEntry::decode(&bytes).unwrap_err(); + assert!(matches!(err, JournalDecodeError::InvalidPresenceByte { found: 2, .. })); + } + + #[test] + fn varint_round_trip() { + // Hit a few interesting widths. + for &v in &[0u64, 1, 127, 128, 16_383, 16_384, u32::MAX as u64, u64::MAX] { + let mut buf = Vec::new(); + encode_varint(&mut buf, v); + let mut cur = Cursor::new(&buf); + assert_eq!(cur.read_varint().unwrap(), v); + } + } +} diff --git a/crates/storage/layering.rs b/crates/storage/layering.rs index 7d205c15ceb..4da33240b92 100644 --- a/crates/storage/layering.rs +++ b/crates/storage/layering.rs @@ -1,4 +1,4 @@ -use ethrex_common::H256; +use ethrex_common::{H256, types::BlockNumber}; use fastbloom::AtomicBloomFilter; use rayon::prelude::*; use rustc_hash::{FxBuildHasher, FxHashMap}; @@ -6,6 +6,15 @@ use std::{fmt, sync::Arc}; use ethrex_trie::{Nibbles, TrieDB, TrieError}; +use crate::{ + api::{ + StorageBackend, + tables::{ACCOUNT_FLATKEYVALUE, ACCOUNT_TRIE_NODES, STATE_HISTORY, STORAGE_FLATKEYVALUE, STORAGE_TRIE_NODES}, + }, + error::StoreError, + journal::{JournalDecodeError, JournalEntry}, +}; + const BLOOM_SIZE: usize = 1_000_000; const FALSE_POSITIVE_RATE: f64 = 0.02; @@ -14,6 +23,13 @@ struct TrieLayer { nodes: FxHashMap, Vec>, parent: H256, id: usize, + /// Number of the block whose post-state this layer represents. Used by the + /// journal write path so a commit can record the entry under the correct + /// block number (not the in-flight block whose insertion *triggered* the + /// commit). + block_number: BlockNumber, + /// Hash of the block whose post-state this layer represents. 
+    block_hash: H256,
 }
 
 /// In-memory cache of trie diff-layers, one per block (or per batch of blocks in full sync).
@@ -50,6 +66,10 @@ pub struct TrieLayerCache {
     /// Used to avoid looking up all layers when the given path doesn't exist in any
     /// layer, thus going directly to the database.
     bloom: AtomicBloomFilter,
+    /// Optional in-memory overlay bridging the on-disk state at the cache edge to
+    /// a deeper pivot during a deep reorg. Reads that miss the layer cache consult
+    /// the overlay before falling through to disk. `None` in steady state.
+    overlay: Option<Arc<Overlay>>,
 }
 
 impl fmt::Debug for TrieLayerCache {
@@ -71,6 +91,7 @@ impl Default for TrieLayerCache {
             layers: Default::default(),
             // TODO (issue #6345): this is coupled with DB_COMMIT_THRESHOLD in store.rs — unify them.
             commit_threshold: 128,
+            overlay: None,
         }
     }
 }
@@ -85,9 +106,50 @@ impl TrieLayerCache {
             last_id: 0,
             layers: Default::default(),
             commit_threshold,
+            overlay: None,
         }
     }
 
+    /// Installs an overlay on this cache. Subsequent reads that miss the layer
+    /// chain will consult the overlay before falling through to disk. Replaces
+    /// any previously-installed overlay.
+    #[allow(dead_code, reason = "consumed by Section 8 (deep-reorg apply path)")]
+    pub fn set_overlay(&mut self, overlay: Arc<Overlay>) {
+        self.overlay = Some(overlay);
+    }
+
+    /// Removes any installed overlay. Called after the first new-chain commit
+    /// reconciles the overlay into disk (Section 9).
+    #[allow(dead_code, reason = "consumed by Section 9 (overlay reconciliation)")]
+    pub fn clear_overlay(&mut self) {
+        self.overlay = None;
+    }
+
+    /// Returns a reference to the installed overlay, if any. Used by tests
+    /// and by the reconciliation path to fold the overlay into the first
+    /// new-chain commit.
+    #[allow(dead_code, reason = "consumed by Section 9 (overlay reconciliation) and tests")]
+    pub fn overlay(&self) -> Option<&Arc<Overlay>> {
+        self.overlay.as_ref()
+    }
+
+    /// Looks up `key` in the installed overlay. Returns:
+    /// - `None` if no overlay is installed, or the overlay does not contain the key.
+    ///   Caller should fall through to on-disk state.
+    /// - `Some(None)` if the overlay holds the key with absence — the key did not
+    ///   exist at the pivot. Caller should treat as missing without consulting disk.
+    /// - `Some(Some(v))` if the overlay holds the key with value `v`. Caller should
+    ///   return `v` without consulting disk.
+    ///
+    /// The CF is determined by the key's length, matching `BackendTrieDB::table_for_key`:
+    /// `len == 65` → account flat-KV; `len == 131` → storage flat-KV;
+    /// `len < 65` → account trie node; otherwise storage trie node.
+    pub fn lookup_overlay(&self, key: &[u8]) -> Option<Option<Vec<u8>>> {
+        let overlay = self.overlay.as_ref()?;
+        let cf = OverlayCf::classify_by_key_length(key.len());
+        overlay.lookup(cf, key)
+    }
+
     fn create_filter(expected_items: usize) -> AtomicBloomFilter {
         AtomicBloomFilter::with_false_pos(FALSE_POSITIVE_RATE)
             .hasher(FxBuildHasher)
@@ -128,6 +190,21 @@ impl TrieLayerCache {
         None
     }
 
+    /// Returns `true` if a layer with the given `state_root` is present in the cache.
+    /// Used by the engine API to decide whether a `newPayload`'s parent state is
+    /// reachable through forward execution (eager path) or whether the payload must be
+    /// stashed pending a deeper reorg (deferred path returning `ACCEPTED`).
+    pub fn contains(&self, state_root: H256) -> bool {
+        self.layers.contains_key(&state_root)
+    }
+
+    /// Returns the commit threshold of this cache.
Used by the deep-reorg
+    /// path so a freshly-constructed replacement cache inherits the same
+    /// threshold (carrying batch-mode vs regular-mode configuration).
+    pub fn commit_threshold(&self) -> usize {
+        self.commit_threshold
+    }
+
     /// Returns the state root from which to start a disk commit, using the cache's
     /// default `commit_threshold`.
     ///
@@ -176,6 +253,8 @@ impl TrieLayerCache {
         &mut self,
         parent: H256,
         state_root: H256,
+        block_number: BlockNumber,
+        block_hash: H256,
         key_values: Vec<(Nibbles, Vec<u8>)>,
     ) {
         if parent == state_root && key_values.is_empty() {
@@ -207,6 +286,8 @@ impl TrieLayerCache {
             nodes,
             parent,
             id: self.last_id,
+            block_number,
+            block_hash,
         };
         self.layers.insert(state_root, Arc::new(entry));
    }
@@ -232,7 +313,8 @@ impl TrieLayerCache {
     }
 
     /// Removes the layer at `state_root` and all its ancestors from the cache, returning
-    /// their merged trie node diffs in oldest-first order (suitable for sequential disk write).
+    /// the committed block's identity plus their merged trie node diffs in oldest-first order
+    /// (suitable for sequential disk write).
     ///
     /// `state_root` must be a key in `self.layers` (as returned by
     /// [`get_commitable`](Self::get_commitable) /
@@ -241,7 +323,16 @@ impl TrieLayerCache {
     ///
     /// After removal, any orphaned layers (older than the committed ones) are pruned, and
     /// the bloom filter is rebuilt to remove stale entries.
-    pub fn commit(&mut self, state_root: H256) -> Option<Vec<(Vec<u8>, Vec<u8>)>> {
+    ///
+    /// Returns `(block_number, block_hash, parent_state_root, merged_nodes)` of the layer at
+    /// `state_root`. `parent_state_root` is the state root we'd return to on rollback (the
+    /// committed block's pre-state). In normal operation only one layer is removed; ancestors
+    /// are evicted as orphans without contributing to the merged nodes (caught by the `id`
+    /// retain below).
+    pub fn commit(
+        &mut self,
+        state_root: H256,
+    ) -> Option<CommitResult> {
         let mut layers_to_commit = vec![];
         let mut current_state_root = state_root;
         while let Some(layer) = self.layers.remove(&current_state_root) {
@@ -249,7 +340,11 @@ impl TrieLayerCache {
             current_state_root = layer.parent;
             layers_to_commit.push(layer);
         }
-        let top_layer_id = layers_to_commit.first()?.id;
+        let top_layer = layers_to_commit.first()?;
+        let top_layer_id = top_layer.id;
+        let committed_block_number = top_layer.block_number;
+        let committed_block_hash = top_layer.block_hash;
+        let committed_parent_state_root = top_layer.parent;
         // older layers are useless
         self.layers.retain(|_, item| item.id > top_layer_id);
         self.rebuild_bloom(); // layers removed, rebuild global bloom filter.
@@ -258,10 +353,25 @@ impl TrieLayerCache {
             .rev()
             .flat_map(|layer| layer.nodes)
             .collect();
-        Some(nodes_to_commit)
+        Some(CommitResult {
+            block_number: committed_block_number,
+            block_hash: committed_block_hash,
+            parent_state_root: committed_parent_state_root,
+            nodes: nodes_to_commit,
+        })
     }
 }
 
+/// Output of [`TrieLayerCache::commit`]: the identity of the committed block plus the merged
+/// trie node updates to write to disk.
+#[derive(Debug, Default)]
+pub struct CommitResult {
+    pub block_number: BlockNumber,
+    pub block_hash: H256,
+    pub parent_state_root: H256,
+    pub nodes: Vec<(Vec<u8>, Vec<u8>)>,
+}
+
 /// [`TrieDB`] adapter that checks in-memory diff-layers ([`TrieLayerCache`]) first,
 /// falling back to the on-disk trie only for keys not found in any layer.
/// @@ -322,9 +432,19 @@ impl TrieDB for TrieWrapper { Some(prefix) => prefix.concat(&key), None => key, }; + // Read cascade: layer cache (forward layers above the pivot) → overlay + // (reverse-diff bridging disk → pivot during deep reorgs) → disk. + // A layer-cache hit pre-empts the overlay because side-chain writes + // shadow the pivot value for any key the new chain has touched. if let Some(value) = self.inner.get(self.state_root, key.as_ref()) { return Ok(Some(value)); } + if let Some(overlay_result) = self.inner.lookup_overlay(key.as_ref()) { + // Overlay says: key had value `v` at pivot, OR key was absent at + // pivot. Either way, do NOT consult disk (disk holds the OLD + // chain's value, not the pivot value). + return Ok(overlay_result); + } self.db.get(key) } @@ -333,3 +453,497 @@ impl TrieDB for TrieWrapper { unimplemented!("This function should not be called"); } } + +// =========================================================================== +// Overlay — in-memory aggregated reverse-diff used during deep reorgs. +// =========================================================================== + +/// Identifier of which on-disk column family an [`Overlay`] entry targets. +/// Returned by classifier helpers; used by callers to route a key to the right +/// internal map without re-doing the length classification. +#[allow(dead_code, reason = "consumed by the read cascade in Section 7 / reorg apply in Section 8")] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum OverlayCf { + AccountTrie, + StorageTrie, + AccountFlat, + StorageFlat, +} + +impl OverlayCf { + /// Maps `OverlayCf` to its column-family name. + #[allow(dead_code, reason = "consumed by Section 9 (overlay reconciliation)")] + pub fn table(self) -> &'static str { + match self { + OverlayCf::AccountTrie => ACCOUNT_TRIE_NODES, + OverlayCf::StorageTrie => STORAGE_TRIE_NODES, + OverlayCf::AccountFlat => ACCOUNT_FLATKEYVALUE, + OverlayCf::StorageFlat => STORAGE_FLATKEYVALUE, + } + } + + /// Classifies an on-disk key into its CF based on length, matching the + /// rules in `BackendTrieDB::table_for_key`: + /// - `len == 65` → `AccountFlat` (account leaf) + /// - `len == 131` → `StorageFlat` (storage leaf, including 32-byte account prefix) + /// - `len < 65` → `AccountTrie` (non-leaf state-trie node) + /// - otherwise → `StorageTrie` (non-leaf storage-trie node) + pub fn classify_by_key_length(len: usize) -> Self { + let is_leaf = len == 65 || len == 131; + let is_account = len <= 65; + match (is_leaf, is_account) { + (true, true) => OverlayCf::AccountFlat, + (true, false) => OverlayCf::StorageFlat, + (false, true) => OverlayCf::AccountTrie, + (false, false) => OverlayCf::StorageTrie, + } + } +} + +/// Errors produced while constructing an [`Overlay`] from the on-disk journal. +#[allow(dead_code, reason = "consumed by Section 8 (deep-reorg apply path)")] +#[derive(Debug, thiserror::Error)] +pub enum OverlayError { + #[error("missing journal entry for block {0}")] + MissingEntry(BlockNumber), + #[error("journal block_hash mismatch at block {block_number}: expected {expected:?}, found {found:?}")] + HashMismatch { + block_number: BlockNumber, + expected: H256, + found: H256, + }, + #[error("journal decode error: {0}")] + Decode(#[from] JournalDecodeError), + #[error("storage error: {0}")] + Store(#[from] StoreError), +} + +/// In-memory aggregated reverse-diff bridging the on-disk state at the cache +/// edge `D` to the virtual state at the deep-reorg pivot `T-1`. 
+///
+/// Built once per deep reorg by replaying [`STATE_HISTORY`] entries for blocks
+/// `D, D-1, ..., T` in descending order. Subsequent state reads during
+/// side-chain execution cascade as: new layer cache → overlay → on-disk state.
+/// On-disk state is NOT mutated while the overlay is alive — disk stays at `D`
+/// until the first new-chain commit folds the overlay and the new layer
+/// together into a single atomic write.
+pub struct Overlay {
+    account_trie: FxHashMap<Vec<u8>, Option<Vec<u8>>>,
+    storage_trie: FxHashMap<Vec<u8>, Option<Vec<u8>>>,
+    account_flat: FxHashMap<Vec<u8>, Option<Vec<u8>>>,
+    storage_flat: FxHashMap<Vec<u8>, Option<Vec<u8>>>,
+    /// Bloom filter shared across all four CFs. Populated as entries are added.
+    /// A miss here lets readers skip overlay lookup and fall through to disk.
+    bloom: AtomicBloomFilter,
+    /// Highest block number covered by the overlay (= cache edge `D` at install time).
+    /// Used by Section 9's reconciliation to issue `delete_range` for obsolete
+    /// old-chain journal entries.
+    from_block: BlockNumber,
+    /// Lowest block number covered by the overlay (= pivot + 1). The first
+    /// new-chain block sits at this height; reconciliation uses this bound for
+    /// the same range computation.
+    to_block: BlockNumber,
+}
+
+impl fmt::Debug for Overlay {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("Overlay")
+            .field("account_trie_len", &self.account_trie.len())
+            .field("storage_trie_len", &self.storage_trie.len())
+            .field("account_flat_len", &self.account_flat.len())
+            .field("storage_flat_len", &self.storage_flat.len())
+            .field("bloom", &"AtomicBloomFilter")
+            .finish()
+    }
+}
+
+impl Default for Overlay {
+    fn default() -> Self {
+        Self {
+            account_trie: FxHashMap::default(),
+            storage_trie: FxHashMap::default(),
+            account_flat: FxHashMap::default(),
+            storage_flat: FxHashMap::default(),
+            bloom: AtomicBloomFilter::with_false_pos(FALSE_POSITIVE_RATE)
+                .hasher(FxBuildHasher)
+                .expected_items(Self::BLOOM_INITIAL_CAPACITY),
+            from_block: 0,
+            to_block: 0,
+        }
+    }
+}
+
+impl Overlay {
+    /// Number of expected items used to size the bloom filter on construction.
+    const BLOOM_INITIAL_CAPACITY: usize = 64 * 1024;
+
+    /// Build an overlay by replaying journal entries for blocks
+    /// `[from_block, to_block]` (inclusive on both ends) in descending order. Each
+    /// loaded entry's `block_hash` is verified against the canonical hash
+    /// returned by `expected_hash` for that height; a mismatch aborts and
+    /// returns [`OverlayError::HashMismatch`].
+    ///
+    /// `expected_hash` is a callback that maps a height to the hash of the
+    /// canonical block at that height on the chain being unwound. This lets
+    /// the caller drive verification from `CANONICAL_BLOCK_HASHES` without
+    /// pre-materializing the full chain. Returning `None` from the callback
+    /// for a height means "skip verification at this height" (used by tests).
+    ///
+    /// Within a single key, the OLDEST recorded `prev` value wins (because
+    /// later applications overwrite earlier ones in the descending walk):
+    /// that's exactly what we want — the value at `to_block - 1` is whatever
+    /// the oldest in-range journal entry recorded as the pre-image.
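+    ///
+    /// Worked example (hypothetical values): unwinding `[5, 3]` where the
+    /// block-3 entry recorded `prev(K) = X` and the block-5 entry recorded
+    /// `prev(K) = Y`, the descending walk absorbs block 5 first and block 3
+    /// last, so the overlay ends holding `K -> X`, K's value as of block 2
+    /// (= `to_block - 1`).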
+    pub fn from_journal(
+        backend: &dyn StorageBackend,
+        from_block: BlockNumber,
+        to_block: BlockNumber,
+        expected_hash: impl Fn(BlockNumber) -> Option<H256>,
+    ) -> Result<Self, OverlayError> {
+        debug_assert!(from_block >= to_block, "from must be >= to (descending)");
+        let mut overlay = Overlay {
+            from_block,
+            to_block,
+            ..Default::default()
+        };
+
+        let read = backend.begin_read()?;
+        let mut n = from_block;
+        loop {
+            let bytes = read
+                .get(STATE_HISTORY, &n.to_be_bytes())?
+                .ok_or(OverlayError::MissingEntry(n))?;
+            let entry = JournalEntry::decode(&bytes)?;
+            if let Some(expected) = expected_hash(n)
+                && entry.block_hash != expected
+            {
+                return Err(OverlayError::HashMismatch {
+                    block_number: n,
+                    expected,
+                    found: entry.block_hash,
+                });
+            }
+            overlay.absorb(entry);
+            if n == to_block {
+                break;
+            }
+            n -= 1;
+        }
+        Ok(overlay)
+    }
+
+    /// Absorbs one journal entry into the overlay. Later inserts overwrite
+    /// earlier ones — combined with a descending walk in `from_journal`, this
+    /// makes the OLDEST in-range entry's `prev` win, which is the correct
+    /// value at the pivot.
+    fn absorb(&mut self, entry: JournalEntry) {
+        for (k, v) in entry.account_trie_diff {
+            self.bloom.insert(&k);
+            self.account_trie.insert(k, v);
+        }
+        for (k, v) in entry.storage_trie_diff {
+            self.bloom.insert(&k);
+            self.storage_trie.insert(k, v);
+        }
+        for (k, v) in entry.account_flat_diff {
+            self.bloom.insert(&k);
+            self.account_flat.insert(k, v);
+        }
+        for (k, v) in entry.storage_flat_diff {
+            self.bloom.insert(&k);
+            self.storage_flat.insert(k, v);
+        }
+    }
+
+    /// Look up `key` in the overlay's `cf` slot.
+    ///
+    /// Returns:
+    /// - `None` if the key is not in the overlay (caller should consult disk).
+    /// - `Some(None)` if the key was overwritten and previously didn't exist
+    ///   on disk (caller should treat as absent — a rollback would delete it).
+    /// - `Some(Some(v))` if the key was overwritten and previously had value
+    ///   `v` on disk (caller should treat as `v` — a rollback would restore it).
+    #[allow(dead_code, reason = "consumed by the read cascade in Section 7")]
+    pub fn lookup(&self, cf: OverlayCf, key: &[u8]) -> Option<Option<Vec<u8>>> {
+        if !self.bloom.contains(key) {
+            return None;
+        }
+        let map = match cf {
+            OverlayCf::AccountTrie => &self.account_trie,
+            OverlayCf::StorageTrie => &self.storage_trie,
+            OverlayCf::AccountFlat => &self.account_flat,
+            OverlayCf::StorageFlat => &self.storage_flat,
+        };
+        map.get(key).cloned()
+    }
+
+    /// Total number of overlay entries across all four CFs. Mostly useful for
+    /// tests and metrics.
+    #[allow(dead_code, reason = "exposed for tests and Section 14 metrics")]
+    pub fn len(&self) -> usize {
+        self.account_trie.len()
+            + self.storage_trie.len()
+            + self.account_flat.len()
+            + self.storage_flat.len()
+    }
+
+    /// Whether the overlay holds any entries.
+    #[allow(dead_code, reason = "exposed for tests and Section 14 metrics")]
+    pub fn is_empty(&self) -> bool {
+        self.len() == 0
+    }
+
+    /// Highest block number covered by the overlay (= the cache edge `D` at install time).
+    #[allow(clippy::wrong_self_convention, reason = "field accessor: name matches struct field")]
+    pub fn from_block(&self) -> BlockNumber {
+        self.from_block
+    }
+
+    /// Lowest block number covered by the overlay (= `pivot + 1`).
+    pub fn to_block(&self) -> BlockNumber {
+        self.to_block
+    }
+
+    /// Iterates every overlay entry across the four CFs as
+    /// `(cf, key, value)` triples. Used by Section 9's reconciliation to fold
+    /// overlay-only entries into the first new-chain commit.
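+    ///
+    /// Sketch of how a reconciliation pass might consume this (hedged: the
+    /// Section 9 fold is not part of this patch; `batch` stands in for any
+    /// [`StorageWriteBatch`], and entries shadowed by the new chain's layer
+    /// would be skipped):
+    ///
+    /// ```ignore
+    /// for (cf, key, prev) in overlay.iter_all_entries() {
+    ///     match prev {
+    ///         Some(v) => batch.put(cf.table(), key, v)?, // restore pivot value
+    ///         None => batch.delete(cf.table(), key)?,    // key absent at pivot
+    ///     }
+    /// }
+    /// ```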
+    pub fn iter_all_entries(
+        &self,
+    ) -> impl Iterator<Item = (OverlayCf, &Vec<u8>, &Option<Vec<u8>>)> {
+        self.account_trie
+            .iter()
+            .map(|(k, v)| (OverlayCf::AccountTrie, k, v))
+            .chain(
+                self.storage_trie
+                    .iter()
+                    .map(|(k, v)| (OverlayCf::StorageTrie, k, v)),
+            )
+            .chain(
+                self.account_flat
+                    .iter()
+                    .map(|(k, v)| (OverlayCf::AccountFlat, k, v)),
+            )
+            .chain(
+                self.storage_flat
+                    .iter()
+                    .map(|(k, v)| (OverlayCf::StorageFlat, k, v)),
+            )
+    }
+}
+
+#[cfg(test)]
+mod overlay_tests {
+    use super::*;
+    use crate::backend::in_memory::InMemoryBackend;
+    use std::sync::Arc;
+
+    fn h(b: u8) -> H256 {
+        H256::repeat_byte(b)
+    }
+
+    /// Seed N journal entries directly into STATE_HISTORY. Each entry's
+    /// `account_trie_diff` carries one (path, value) pair so we can verify
+    /// "older entry wins" semantics across multiple blocks.
+    fn seed(
+        backend: &Arc<dyn StorageBackend>,
+        per_block: &[(BlockNumber, H256, crate::journal::FlatDiff)],
+    ) {
+        let mut tx = backend.begin_write().unwrap();
+        for (n, block_hash, diff) in per_block {
+            let entry = JournalEntry {
+                block_hash: *block_hash,
+                parent_state_root: H256::zero(),
+                account_trie_diff: diff.clone(),
+                storage_trie_diff: vec![],
+                account_flat_diff: vec![],
+                storage_flat_diff: vec![],
+            };
+            tx.put(STATE_HISTORY, &n.to_be_bytes(), &entry.encode())
+                .unwrap();
+        }
+        tx.commit().unwrap();
+    }
+
+    #[test]
+    fn from_journal_loads_descending_range() {
+        let backend: Arc<dyn StorageBackend> = Arc::new(InMemoryBackend::open().unwrap());
+        seed(
+            &backend,
+            &[
+                (3, h(0x03), vec![(vec![0xa], Some(vec![0x33]))]),
+                (4, h(0x04), vec![(vec![0xb], Some(vec![0x44]))]),
+                (5, h(0x05), vec![(vec![0xc], Some(vec![0x55]))]),
+            ],
+        );
+        let overlay = Overlay::from_journal(backend.as_ref(), 5, 3, |n| {
+            Some(H256::repeat_byte(n as u8))
+        })
+        .unwrap();
+        assert_eq!(overlay.len(), 3);
+        assert_eq!(
+            overlay.lookup(OverlayCf::AccountTrie, &[0xa]),
+            Some(Some(vec![0x33]))
+        );
+        assert_eq!(
+            overlay.lookup(OverlayCf::AccountTrie, &[0xb]),
+            Some(Some(vec![0x44]))
+        );
+        assert_eq!(
+            overlay.lookup(OverlayCf::AccountTrie, &[0xc]),
+            Some(Some(vec![0x55]))
+        );
+    }
+
+    #[test]
+    fn older_entry_wins_when_key_repeats() {
+        // Block 3 (oldest) overwrote K and recorded prev X; blocks 4 and 5
+        // recorded prevs Y3 and Y4. The overlay should expose K=X, the value
+        // at block 2 (= to_block - 1).
+        let backend: Arc<dyn StorageBackend> = Arc::new(InMemoryBackend::open().unwrap());
+        seed(
+            &backend,
+            &[
+                (3, h(0x03), vec![(vec![0xaa], Some(b"X".to_vec()))]),
+                (4, h(0x04), vec![(vec![0xaa], Some(b"Y3".to_vec()))]),
+                (5, h(0x05), vec![(vec![0xaa], Some(b"Y4".to_vec()))]),
+            ],
+        );
+        let overlay =
+            Overlay::from_journal(backend.as_ref(), 5, 3, |n| Some(H256::repeat_byte(n as u8)))
+                .unwrap();
+        assert_eq!(
+            overlay.lookup(OverlayCf::AccountTrie, &[0xaa]),
+            Some(Some(b"X".to_vec())),
+            "oldest reverse-diff value should win after descending walk"
+        );
+    }
+
+    #[test]
+    fn absent_key_passes_through_bloom() {
+        let backend: Arc<dyn StorageBackend> = Arc::new(InMemoryBackend::open().unwrap());
+        seed(
+            &backend,
+            &[(3, h(0x03), vec![(vec![0xaa], Some(vec![0x11]))])],
+        );
+        let overlay =
+            Overlay::from_journal(backend.as_ref(), 3, 3, |n| Some(H256::repeat_byte(n as u8)))
+                .unwrap();
+        assert_eq!(overlay.lookup(OverlayCf::AccountTrie, &[0xff]), None);
+    }
+
+    #[test]
+    fn hash_mismatch_aborts() {
+        let backend: Arc<dyn StorageBackend> = Arc::new(InMemoryBackend::open().unwrap());
+        seed(&backend, &[(7, h(0x07), vec![(vec![0xaa], None)])]);
+        // Caller supplies the WRONG expected hash for height 7.
+        let err = Overlay::from_journal(backend.as_ref(), 7, 7, |_| Some(h(0xff))).unwrap_err();
+        match err {
+            OverlayError::HashMismatch { block_number, .. } => assert_eq!(block_number, 7),
+            other => panic!("expected HashMismatch, got {other:?}"),
+        }
+    }
+
+    #[test]
+    fn missing_entry_aborts() {
+        let backend: Arc<dyn StorageBackend> = Arc::new(InMemoryBackend::open().unwrap());
+        // Seed only block 5; ask for [5, 3] — blocks 4 and 3 are missing.
+        seed(&backend, &[(5, h(0x05), vec![])]);
+        let err = Overlay::from_journal(backend.as_ref(), 5, 3, |_| None).unwrap_err();
+        match err {
+            OverlayError::MissingEntry(n) => assert_eq!(n, 4),
+            other => panic!("expected MissingEntry, got {other:?}"),
+        }
+    }
+
+    /// Verifies the read cascade precedence on `TrieLayerCache::lookup_overlay`:
+    ///
+    ///   layer cache (top) ─── covered separately by `TrieLayerCache::get`
+    ///   overlay     (mid) ─── this method
+    ///   on-disk     (bot) ─── `BackendTrieDB`
+    ///
+    /// `lookup_overlay` answers only the overlay tier with three possible
+    /// outcomes — caller (TrieWrapper::get) uses the result to decide whether
+    /// to skip disk.
+    #[test]
+    fn overlay_lookup_returns_none_when_no_overlay_installed() {
+        let cache = TrieLayerCache::new(1);
+        // No overlay installed — must short-circuit to None for any key length.
+        for key_len in [4usize, 65, 67, 131] {
+            let key = vec![0xab; key_len];
+            assert_eq!(
+                cache.lookup_overlay(&key),
+                None,
+                "no overlay installed → outer None at length {key_len}"
+            );
+        }
+    }
+
+    #[test]
+    fn overlay_lookup_classifies_cf_by_key_length() {
+        // Construct an overlay with one entry per CF, each at the canonical
+        // length, then assert lookup hits the right bucket.
+        let backend: Arc<dyn StorageBackend> = Arc::new(InMemoryBackend::open().unwrap());
+        // Build entries directly (bypass from_journal so we control all four CFs).
+        let entry = JournalEntry {
+            block_hash: h(0x01),
+            parent_state_root: H256::zero(),
+            account_trie_diff: vec![(vec![0x10; 4], Some(b"acct-trie".to_vec()))],
+            storage_trie_diff: vec![(vec![0x20; 67], Some(b"stor-trie".to_vec()))],
+            account_flat_diff: vec![(vec![0x30; 65], Some(b"acct-flat".to_vec()))],
+            storage_flat_diff: vec![(vec![0x40; 131], None)],
+        };
+        let mut tx = backend.begin_write().unwrap();
+        tx.put(STATE_HISTORY, &1u64.to_be_bytes(), &entry.encode())
+            .unwrap();
+        tx.commit().unwrap();
+        let overlay = Overlay::from_journal(backend.as_ref(), 1, 1, |_| None).unwrap();
+
+        let mut cache = TrieLayerCache::new(1);
+        cache.set_overlay(Arc::new(overlay));
+
+        // Each key must route to its correct CF and produce the right value.
+        assert_eq!(
+            cache.lookup_overlay(&[0x10; 4]),
+            Some(Some(b"acct-trie".to_vec()))
+        );
+        assert_eq!(
+            cache.lookup_overlay(&[0x20; 67]),
+            Some(Some(b"stor-trie".to_vec()))
+        );
+        assert_eq!(
+            cache.lookup_overlay(&[0x30; 65]),
+            Some(Some(b"acct-flat".to_vec()))
+        );
+        assert_eq!(
+            cache.lookup_overlay(&[0x40; 131]),
+            Some(None),
+            "overlay with None means key was absent at pivot"
+        );
+        // A different key at the same length must miss the overlay.
+        assert_eq!(cache.lookup_overlay(&[0xee; 4]), None);
+    }
+
+    #[test]
+    fn classify_by_key_length_matches_backend_table_routing() {
+        // Spot-check the boundaries: the rules must agree with
+        // BackendTrieDB::table_for_key (account leaf at 65, storage leaf at 131,
+        // anything else routed by length comparison to 65).
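+        // For reference, the routing the asserts below pin down:
+        //   len == 65         → AccountFlat  (account leaf)
+        //   len == 131        → StorageFlat  (storage leaf)
+        //   len <  65         → AccountTrie
+        //   len >  65 (≠ 131) → StorageTrie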
+        assert_eq!(OverlayCf::classify_by_key_length(0), OverlayCf::AccountTrie);
+        assert_eq!(OverlayCf::classify_by_key_length(64), OverlayCf::AccountTrie);
+        assert_eq!(OverlayCf::classify_by_key_length(65), OverlayCf::AccountFlat);
+        assert_eq!(OverlayCf::classify_by_key_length(66), OverlayCf::StorageTrie);
+        assert_eq!(OverlayCf::classify_by_key_length(130), OverlayCf::StorageTrie);
+        assert_eq!(OverlayCf::classify_by_key_length(131), OverlayCf::StorageFlat);
+        assert_eq!(OverlayCf::classify_by_key_length(132), OverlayCf::StorageTrie);
+    }
+
+    #[test]
+    fn skip_verification_when_callback_returns_none() {
+        // expected_hash returning None means "don't verify this height";
+        // overlay loads regardless of what's on disk.
+        let backend: Arc<dyn StorageBackend> = Arc::new(InMemoryBackend::open().unwrap());
+        seed(&backend, &[(7, h(0xab), vec![(vec![0x01], None)])]);
+        let overlay = Overlay::from_journal(backend.as_ref(), 7, 7, |_| None).unwrap();
+        assert_eq!(
+            overlay.lookup(OverlayCf::AccountTrie, &[0x01]),
+            Some(None)
+        );
+    }
+}
diff --git a/crates/storage/lib.rs b/crates/storage/lib.rs
index 876911539e5..5c246cc0808 100644
--- a/crates/storage/lib.rs
+++ b/crates/storage/lib.rs
@@ -67,6 +67,7 @@
 pub mod api;
 pub mod backend;
 pub mod error;
+pub mod journal;
 mod layering;
 pub mod migrations;
 pub mod rlp;
diff --git a/crates/storage/store.rs b/crates/storage/store.rs
index 56158c8ce6a..d6fbd4c3116 100644
--- a/crates/storage/store.rs
+++ b/crates/storage/store.rs
@@ -8,13 +8,15 @@ use crate::{
             ACCOUNT_CODE_METADATA, ACCOUNT_CODES, ACCOUNT_FLATKEYVALUE, ACCOUNT_TRIE_NODES,
             BLOCK_NUMBERS, BODIES, CANONICAL_BLOCK_HASHES, CHAIN_DATA, EXECUTION_WITNESSES,
             FULLSYNC_HEADERS, HEADERS, INVALID_CHAINS, MISC_VALUES, PENDING_BLOCKS, RECEIPTS,
-            SNAP_STATE, STORAGE_FLATKEYVALUE, STORAGE_TRIE_NODES, TRANSACTION_LOCATIONS,
+            SNAP_STATE, STATE_HISTORY, STORAGE_FLATKEYVALUE, STORAGE_TRIE_NODES,
+            TRANSACTION_LOCATIONS,
         },
     },
     apply_prefix,
     backend::in_memory::InMemoryBackend,
     error::StoreError,
-    layering::{TrieLayerCache, TrieWrapper},
+    journal::{FlatDiff, JournalEntry},
+    layering::{CommitResult, TrieLayerCache, TrieWrapper},
     rlp::{BlockBodyRLP, BlockHeaderRLP, BlockRLP},
     trie::{BackendTrieDB, BackendTrieDBLocked},
     utils::{ChainDataIndex, SnapStateIndex},
@@ -1046,7 +1048,31 @@
 
         if let Some(finalized) = finalized {
             let finalized_key = chain_data_key(ChainDataIndex::FinalizedBlockNumber);
+
+            // Read the previous finalized number before overwriting it, so
+            // we can decide whether to prune the state-history journal.
+            // Pre-merge or fresh chains have no entry; treat as 0.
+            let prev_finalized = db
+                .begin_read()?
+                .get(CHAIN_DATA, &finalized_key)?
+                .and_then(|bytes| {
+                    bytes
+                        .try_into()
+                        .ok()
+                        .map(|arr: [u8; 8]| BlockNumber::from_le_bytes(arr))
+                })
+                .unwrap_or(0);
+
             txn.put(CHAIN_DATA, &finalized_key, &finalized.to_le_bytes())?;
+
+            // If finality advanced, prune every STATE_HISTORY entry at or
+            // below the new finalized number in the same atomic txn.
+            // delete_range is half-open: [start, end), so end is F+1.
+            if finalized > prev_finalized {
+                let start = 0u64.to_be_bytes();
+                let end = finalized.saturating_add(1).to_be_bytes();
+                txn.delete_range(STATE_HISTORY, &start, &end)?;
+            }
         }
 
         txn.commit()
@@ -1370,12 +1396,13 @@
             )?
             .map(|header| header.state_root)
             .unwrap_or_default();
-        let last_state_root = update_batch
+        let last_block = update_batch
             .blocks
             .last()
-            .ok_or(StoreError::UpdateBatchNoBlocks)?
-            .header
-            .state_root;
+            .ok_or(StoreError::UpdateBatchNoBlocks)?;
+        let last_state_root = last_block.header.state_root;
+        let last_block_number = last_block.header.number;
+        let last_block_hash = last_block.hash();
 
         let trie_upd_worker_tx = self.trie_update_worker_tx.clone();
         let is_batch = update_batch.batch_mode;
@@ -1396,6 +1423,8 @@
             result_sender: notify_tx,
             child_state_root: last_state_root,
             is_batch,
+            block_number: last_block_number,
+            block_hash: last_block_hash,
         };
         trie_upd_worker_tx.send(trie_update).map_err(|e| {
             StoreError::Custom(format!("failed to read new trie layer notification: {e}"))
@@ -2749,6 +2778,114 @@
         Ok(state_root == root_hash)
     }
 
+    // ===========================================================================
+    // Deep-reorg primitives (Section 8 storage side).
+    // ===========================================================================
+
+    /// Returns `true` if the in-memory layer cache currently has a layer with
+    /// the given `state_root`. Used by the engine API and the deep-reorg
+    /// dispatcher to decide whether the head's state can be reached through
+    /// forward execution (cache pivot or cache hit) or whether a deep-reorg
+    /// path with overlay construction is required.
+    pub fn is_state_in_layer_cache(&self, state_root: H256) -> Result<bool, StoreError> {
+        let trie = self
+            .trie_cache
+            .read()
+            .map_err(|_| StoreError::LockError)?
+            .clone();
+        Ok(trie.contains(state_root))
+    }
+
+    /// Atomically prepares the store for a deep-reorg apply pass.
+    ///
+    /// Builds an [`Overlay`] from `STATE_HISTORY` entries for blocks
+    /// `[to_block, from_block]` (descending), verifies each entry's
+    /// `block_hash` against `expected_hash`, then swaps the in-memory layer
+    /// cache for a fresh one with the overlay installed. After this call:
+    ///
+    /// - The layer cache contains zero forward layers.
+    /// - The overlay is in place; subsequent `TrieWrapper::get` calls cascade
+    ///   layer cache → overlay → disk.
+    /// - The on-disk trie/flat-KV state is **unchanged** (still at the old
+    ///   chain's edge).
+    ///
+    /// Side-chain blocks `[pivot+1 .. new_head]` should now be executed in
+    /// order via the existing `Blockchain::add_block` path; each block's
+    /// state reads will see the overlay, and each block's commit produces a
+    /// new forward layer in the (initially empty) cache.
+    ///
+    /// Errors abort the swap: if overlay construction fails (missing entry,
+    /// hash mismatch, decode error), the existing layer cache is left intact
+    /// and the caller can fall back to other recovery (e.g. `SYNCING`).
+    pub fn install_overlay_for_reorg(
+        &self,
+        from_block: BlockNumber,
+        to_block: BlockNumber,
+        expected_hash: impl Fn(BlockNumber) -> Option<H256>,
+    ) -> Result<(), StoreError> {
+        // Build the overlay first so any error aborts before mutating the
+        // trie cache.
+        let overlay = crate::layering::Overlay::from_journal(
+            self.backend.as_ref(),
+            from_block,
+            to_block,
+            expected_hash,
+        )
+        .map_err(|e| StoreError::Custom(format!("overlay construction failed: {e}")))?;
+
+        // Atomically swap the layer cache. The lock excludes any in-flight
+        // trie worker activity; the caller is responsible for ensuring no new
+        // `store_block_updates` are dispatched until side-chain execution
+        // begins (the engine API's `ReorgInProgress` flag, Section 11).
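+        // Sequence: read the current commit threshold, build a fresh cache
+        // that carries it over with the overlay installed, then swap the
+        // shared Arc under the write lock.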
+        let threshold = {
+            let current = self.trie_cache.read().map_err(|_| StoreError::LockError)?;
+            current.commit_threshold()
+        };
+        let mut fresh = TrieLayerCache::new(threshold);
+        fresh.set_overlay(Arc::new(overlay));
+
+        let mut guard = self.trie_cache.write().map_err(|_| StoreError::LockError)?;
+        *guard = Arc::new(fresh);
+        Ok(())
+    }
+
+    /// Returns the highest block number with a `STATE_HISTORY` entry, i.e.,
+    /// the cache edge `D` (the deepest block whose post-state is on disk).
+    /// Returns `None` if the journal is empty (pre-finality fresh chain or
+    /// post-finality fully-pruned).
+    ///
+    /// Used by the deep-reorg orchestrator to determine the overlay's
+    /// `from_block` parameter.
+    pub fn highest_state_history_block_number(&self) -> Result<Option<BlockNumber>, StoreError> {
+        let read = self.backend.begin_read()?;
+        // STATE_HISTORY uses big-endian keys, so prefix-iterating from the
+        // empty prefix yields entries in ascending block-number order. We
+        // want the max — collect all and take the last. For the bounded
+        // depths we expect (≤ finality, mainnet ~64 entries), this is fine.
+        let mut max: Option<BlockNumber> = None;
+        for entry in read.prefix_iterator(STATE_HISTORY, &[])? {
+            let (key, _) = entry?;
+            if let Ok(arr) = <[u8; 8]>::try_from(key.as_ref()) {
+                let n = BlockNumber::from_be_bytes(arr);
+                max = Some(max.map_or(n, |m| m.max(n)));
+            }
+        }
+        Ok(max)
+    }
+
+    /// Removes any installed overlay from the layer cache.
+    ///
+    /// Called by Section 9's reconciliation path after the first new-chain
+    /// commit folds the overlay into disk. Idempotent: a no-op if no overlay
+    /// is currently installed.
+    pub fn clear_reorg_overlay(&self) -> Result<(), StoreError> {
+        let mut guard = self.trie_cache.write().map_err(|_| StoreError::LockError)?;
+        let mut updated = (**guard).clone();
+        updated.clear_overlay();
+        *guard = Arc::new(updated);
+        Ok(())
+    }
+
     /// Takes a block hash and returns an iterator to its ancestors. Block headers are returned
     /// in reverse order, starting from the given block and going up to the genesis block.
     pub fn ancestors(&self, block_hash: BlockHash) -> AncestorIterator {
@@ -2864,6 +3001,14 @@ struct TrieUpdate {
     account_updates: TrieNodesUpdate,
     storage_updates: Vec<(H256, TrieNodesUpdate)>,
     is_batch: bool,
+    /// Number of the block whose layer this update represents.
+    /// For a non-batch update this is exactly the single block being committed;
+    /// for a batch update it is the last block in the batch (matching
+    /// `child_state_root`). Used by the journal write path; harmless for batch
+    /// updates since journal writes are skipped when `is_batch == true`.
+    block_number: BlockNumber,
+    /// Hash of the block whose layer this update represents (see `block_number`).
+    block_hash: H256,
 }
 
 // NOTE: we don't receive `Store` here to avoid cyclic dependencies
@@ -2881,6 +3026,8 @@ fn apply_trie_updates(
         account_updates,
         storage_updates,
         is_batch,
+        block_number,
+        block_hash,
     } = trie_update;
 
     // Phase 1: update the in-memory diff-layers only, then notify block production.
@@ -2899,7 +3046,13 @@ fn apply_trie_updates(
         .map_err(|_| StoreError::LockError)?
         .clone();
     let mut trie_mut = (*trie).clone();
-    trie_mut.put_batch(parent_state_root, child_state_root, new_layer);
+    trie_mut.put_batch(
+        parent_state_root,
+        child_state_root,
+        block_number,
+        block_hash,
+        new_layer,
+    );
     let trie = Arc::new(trie_mut);
     *trie_cache.write().map_err(|_| StoreError::LockError)? = trie.clone();
     // Update finished, signal block processing.
@@ -2924,8 +3077,8 @@ fn apply_trie_updates(
 
     // RCU to remove the bottom layer: update step needs to happen after disk layer is updated.
     let mut trie_mut = (*trie).clone();
-    let last_written = backend
-        .begin_read()?
+    let read_view = backend.begin_read()?;
+    let last_written = read_view
         .get(MISC_VALUES, "last_written".as_bytes())?
         .unwrap_or_default();
 
@@ -2934,10 +3087,70 @@ fn apply_trie_updates(
     // Before encoding, accounts have only the account address as their path, while storage keys have
     // the account address (32 bytes) + storage path (up to 32 bytes).
 
-    // Commit removes the bottom layer and returns it, this is the mutation step.
-    let nodes = trie_mut.commit(root).unwrap_or_default();
+    // Reverse-diff accumulators for the journal entry, one per CF. Populated
+    // only when `!is_batch`. Each entry stores the on-disk key as-is, so
+    // rollback can apply diffs directly without further interpretation. For
+    // full sync (`is_batch == true`), no journal entry is written — reorgs
+    // aren't supported during full sync, and journaling would slow it down.
+    let mut journal_account_trie: FlatDiff = Vec::new();
+    let mut journal_storage_trie: FlatDiff = Vec::new();
+    let mut journal_account_flat: FlatDiff = Vec::new();
+    let mut journal_storage_flat: FlatDiff = Vec::new();
+
+    // Commit removes the bottom layer and returns the committed block's
+    // identity plus the merged k/v. In normal operation this is the block one
+    // commit-cadence behind the just-added block, NOT the just-added block.
+    let CommitResult {
+        block_number: committed_block_number,
+        block_hash: committed_block_hash,
+        parent_state_root: committed_parent_state_root,
+        nodes,
+    } = trie_mut.commit(root).unwrap_or_default();
+
+    // Section 9 — overlay reconciliation on first new-chain commit.
+    //
+    // If an overlay is installed AND we're committing in non-batch mode, this
+    // is the first new-chain commit after a deep reorg. The overlay holds the
+    // reverse-diff bridging on-disk state (at the OLD chain's edge `D`) to
+    // the pivot at `to_block - 1`. We must fold overlay-only entries into the
+    // commit so disk advances directly from D-old to T-new in a single
+    // atomic write (no intermediate "rolled back" disk state). After the
+    // commit succeeds, the overlay is cleared and obsolete old-chain journal
+    // entries in `[T, D]` are deleted via `delete_range`.
+    let overlay_for_reconciliation = if !is_batch { trie.overlay().cloned() } else { None };
+
     let mut result = Ok(());
-    for (key, value) in nodes {
+
+    // Build a key set for fast "is this key in layer_T?" lookups so overlay
+    // entries that the new chain has ALSO touched can be skipped (layer_T
+    // wins — its value is the post-T state, which is what we want on disk).
+    let layer_keys: rustc_hash::FxHashSet<Vec<u8>> = if overlay_for_reconciliation.is_some() {
+        nodes.iter().map(|(k, _)| k.clone()).collect()
+    } else {
+        rustc_hash::FxHashSet::default()
+    };
+
+    // Combined write iterator: layer_T first, then overlay-only entries.
+    // Overlay entries are converted to layer-style format (empty value = delete).
+    let extra_writes: Vec<(Vec<u8>, Vec<u8>)> = match &overlay_for_reconciliation {
+        Some(overlay) => overlay
+            .iter_all_entries()
+            .filter(|(_, key, _)| !layer_keys.contains(*key))
+            .map(|(_, key, value)| {
+                // None (absent at pivot) → empty Vec → "delete" downstream.
+                // Some(v) → use v as-is. Empty values written through this path
+                // are extremely rare; treat them consistently with layer values.
+ let v = value.clone().unwrap_or_default(); + (key.clone(), v) + }) + .collect(), + None => Vec::new(), + }; + + let combined_writes_len = nodes.len() + extra_writes.len(); + let combined_writes = nodes.into_iter().chain(extra_writes); + + for (key, value) in combined_writes { let is_leaf = key.len() == 65 || key.len() == 131; let is_account = key.len() <= 65; @@ -2955,6 +3168,22 @@ fn apply_trie_updates( } else { &STORAGE_TRIE_NODES }; + + // Read the pre-image before overwriting. The read view was opened + // before the write transaction so it sees the on-disk state we are + // about to mutate. Skipped for batch (full-sync) commits. + let prev_value = if !is_batch { + match read_view.get(table, &key) { + Ok(v) => Some(v), + Err(e) => { + result = Err(e); + break; + } + } + } else { + None + }; + if value.is_empty() { result = write_tx.delete(table, &key); } else { @@ -2963,13 +3192,78 @@ fn apply_trie_updates( if result.is_err() { break; } + + // Record the reverse-diff entry. Done after the put/delete so that on + // a write error we don't accumulate state we won't actually persist. + // Keys are stored as-is (with their nibble-encoded prefix for storage + // CFs); rollback applies them directly without interpretation. + if let Some(prev) = prev_value { + let bucket = match (is_leaf, is_account) { + (false, true) => &mut journal_account_trie, + (false, false) => &mut journal_storage_trie, + (true, true) => &mut journal_account_flat, + (true, false) => &mut journal_storage_flat, + }; + bucket.push((key, prev)); + } + } + // Silence the unused-variable warning when the size hint isn't consumed by + // a tracing log; kept available for future observability. + let _ = combined_writes_len; + + // Stage the journal entry into the same write batch as the trie/flat-KV + // overwrites. `delete_range_cf` and `put_cf` are both buffered until + // `commit`, so all four CFs land atomically (or none do on commit failure). + // The entry is keyed and identified by the COMMITTED block (not the + // in-flight block whose insertion triggered this commit) — the in-flight + // block's own commit happens later when the next block's insertion pushes + // it past the threshold. + // Section 9 reconciliation: stage `delete_range` for obsolete old-chain + // journal entries `[T, D]` BEFORE the new T-new entry put, so the new + // entry isn't clobbered by the range delete. + if result.is_ok() + && let Some(overlay) = &overlay_for_reconciliation + { + let t = overlay.to_block(); + let d = overlay.from_block(); + debug_assert_eq!( + committed_block_number, t, + "first new-chain commit must be at the pivot's T height (overlay.to_block)" + ); + let start = t.to_be_bytes(); + let end = d.saturating_add(1).to_be_bytes(); + result = write_tx.delete_range(STATE_HISTORY, &start, &end); + } + + if result.is_ok() && !is_batch { + let entry = JournalEntry { + block_hash: committed_block_hash, + parent_state_root: committed_parent_state_root, + account_trie_diff: journal_account_trie, + storage_trie_diff: journal_storage_trie, + account_flat_diff: journal_account_flat, + storage_flat_diff: journal_storage_flat, + }; + result = write_tx.put( + STATE_HISTORY, + &committed_block_number.to_be_bytes(), + &entry.encode(), + ); } + if result.is_ok() { result = write_tx.commit(); } // We want to send this message even if there was an error during the batch write let _ = fkv_ctl.send(FKVGeneratorControlMessage::Continue); result?; + + // Section 9: on successful commit of the reconciliation, clear the overlay + // from the cache. 
+    // Subsequent commits revert to the normal one-block path.
+    if overlay_for_reconciliation.is_some() {
+        trie_mut.clear_overlay();
+    }
+
     // Phase 3: update diff layers with the removal of bottom layer.
     *trie_cache.write().map_err(|_| StoreError::LockError)? = Arc::new(trie_mut);
     Ok(())
@@ -3352,3 +3646,642 @@ pub fn read_chain_id_from_db(path: &Path) -> Option<u64> {
         None
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::api::tables::STATE_HISTORY;
+    use crate::backend::in_memory::InMemoryBackend;
+    use crate::journal::JournalEntry;
+    use ethrex_common::types::{BlockBody, BlockHeader};
+    use ethrex_trie::Nibbles;
+    use std::time::{Duration, Instant};
+
+    fn make_block(number: BlockNumber, parent_hash: H256, state_root: H256) -> Block {
+        let header = BlockHeader {
+            number,
+            parent_hash,
+            state_root,
+            ..Default::default()
+        };
+        Block::new(header, BlockBody::default())
+    }
+
+    /// Polls `STATE_HISTORY` for an entry at the given block number, up to
+    /// `timeout`. The trie worker commits to disk asynchronously after
+    /// `store_block_updates` returns, so a small wait window is required.
+    fn await_journal_entry(
+        backend: &Arc<dyn StorageBackend>,
+        block_number: BlockNumber,
+        timeout: Duration,
+    ) -> Option<Vec<u8>> {
+        let key = block_number.to_be_bytes();
+        let deadline = Instant::now() + timeout;
+        loop {
+            let read = backend.begin_read().ok()?;
+            if let Ok(Some(v)) = read.get(STATE_HISTORY, &key) {
+                return Some(v);
+            }
+            if Instant::now() >= deadline {
+                return None;
+            }
+            std::thread::sleep(Duration::from_millis(10));
+        }
+    }
+
+    fn assert_no_journal_entry(backend: &Arc<dyn StorageBackend>, block_number: BlockNumber) {
+        // Give the worker a generous window to confirm absence isn't just a race.
+        std::thread::sleep(Duration::from_millis(50));
+        let key = block_number.to_be_bytes();
+        let read = backend.begin_read().expect("read view");
+        let v = read.get(STATE_HISTORY, &key).expect("get");
+        assert!(
+            v.is_none(),
+            "expected no STATE_HISTORY entry for block {block_number}, got {v:?}"
+        );
+    }
+
+    /// With `commit_threshold = 1`, the first batch seeds the layer cache and
+    /// the second batch commits the first layer to disk. So pushing N batches
+    /// produces N-1 journal entries. We verify entries for block 1 and block 2
+    /// after pushing 3 batches.
+    #[test]
+    fn journal_entry_written_per_block_in_regular_mode() {
+        let backend: Arc<dyn StorageBackend> = Arc::new(InMemoryBackend::open().unwrap());
+        let dir = tempfile::tempdir().unwrap();
+        let store = Store::from_backend(backend.clone(), dir.path().to_path_buf(), 1).unwrap();
+
+        // Block 1: distinct state_root, single non-leaf trie node update.
+        let state_root_1 = H256::repeat_byte(0x11);
+        let block1 = make_block(1, H256::zero(), state_root_1);
+        let block1_hash = block1.hash();
+        let update_1 = UpdateBatch {
+            account_updates: vec![(Nibbles::from_raw(&[0x00, 0x01], false), vec![0xab, 0xcd])],
+            storage_updates: vec![],
+            blocks: vec![block1],
+            receipts: vec![],
+            code_updates: vec![],
+            batch_mode: false,
+        };
+        store.store_block_updates(update_1).unwrap();
+
+        // Block 2: parented to block 1. This is what triggers the commit of
+        // block 1's layer.
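+        // (General cadence with commit_threshold = 1: pushing block N+1
+        // flushes block N's layer to disk and writes STATE_HISTORY[N] in the
+        // same atomic batch.)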
+        let state_root_2 = H256::repeat_byte(0x22);
+        let block2 = make_block(2, block1_hash, state_root_2);
+        let block2_hash = block2.hash();
+        let update_2 = UpdateBatch {
+            account_updates: vec![(Nibbles::from_raw(&[0x00, 0x02], false), vec![0xef, 0x11])],
+            storage_updates: vec![],
+            blocks: vec![block2],
+            receipts: vec![],
+            code_updates: vec![],
+            batch_mode: false,
+        };
+        store.store_block_updates(update_2).unwrap();
+
+        let bytes = await_journal_entry(&backend, 1, Duration::from_secs(2))
+            .expect("STATE_HISTORY entry for block 1 should appear after block 2 commits it");
+        let entry = JournalEntry::decode(&bytes).unwrap();
+        assert_eq!(
+            entry.block_hash, block1_hash,
+            "journal block_hash must match the committed block"
+        );
+        assert_eq!(
+            entry.parent_state_root, H256::zero(),
+            "first block's parent state root is zero (no header for parent_hash=0)"
+        );
+        assert!(
+            !entry.account_trie_diff.is_empty(),
+            "non-empty trie update must produce non-empty account_trie_diff"
+        );
+        // The pre-image is None because the path didn't exist on disk before this commit.
+        let (path, prev) = &entry.account_trie_diff[0];
+        assert_eq!(prev, &None, "first-time write means previous value is None");
+        // Path length is small (non-leaf), classified as ACCOUNT_TRIE_NODES.
+        assert!(path.len() < 65);
+
+        // Block 3 commits block 2's layer.
+        let state_root_3 = H256::repeat_byte(0x33);
+        let block3 = make_block(3, block2_hash, state_root_3);
+        let update_3 = UpdateBatch {
+            account_updates: vec![(Nibbles::from_raw(&[0x00, 0x03], false), vec![0x77])],
+            storage_updates: vec![],
+            blocks: vec![block3],
+            receipts: vec![],
+            code_updates: vec![],
+            batch_mode: false,
+        };
+        store.store_block_updates(update_3).unwrap();
+
+        let bytes = await_journal_entry(&backend, 2, Duration::from_secs(2))
+            .expect("STATE_HISTORY entry for block 2 should appear after block 3 commits it");
+        let entry = JournalEntry::decode(&bytes).unwrap();
+        assert_eq!(entry.block_hash, block2_hash);
+        // Block 2's parent state root is block 1's post-state root.
+        assert_eq!(entry.parent_state_root, state_root_1);
+        assert!(!entry.account_trie_diff.is_empty());
+    }
+
+    /// `batch_mode = true` SHALL skip the journal entirely (full sync does not
+    /// support reorgs and the read-pre-image cost would slow it down). To
+    /// actually exercise the gating we need to push enough batches to trigger
+    /// a commit under `BATCH_COMMIT_THRESHOLD = 4`, then verify no
+    /// STATE_HISTORY entry materializes despite the commit happening.
+    #[test]
+    fn journal_skipped_in_batch_mode() {
+        let backend: Arc<dyn StorageBackend> = Arc::new(InMemoryBackend::open().unwrap());
+        let dir = tempfile::tempdir().unwrap();
+        let store = Store::from_backend(backend.clone(), dir.path().to_path_buf(), 1).unwrap();
+
+        // 5 batches in batch mode → commit fires for block 1 on the 5th call.
+        let mut prev_hash = H256::zero();
+        for n in 1..=5u64 {
+            let state_root = H256::repeat_byte(0xa0 | (n as u8));
+            let block = make_block(n, prev_hash, state_root);
+            prev_hash = block.hash();
+            let update = UpdateBatch {
+                account_updates: vec![(
+                    Nibbles::from_raw(&[(n as u8)], false),
+                    vec![0xde, 0xad, n as u8],
+                )],
+                storage_updates: vec![],
+                blocks: vec![block],
+                receipts: vec![],
+                code_updates: vec![],
+                batch_mode: true,
+            };
+            store.store_block_updates(update).unwrap();
+        }
+
+        // Despite a commit having fired (block 1's layer was flushed), no
+        // STATE_HISTORY entries should exist for any block.
+        for n in 1..=5u64 {
+            assert_no_journal_entry(&backend, n);
+        }
+    }
+
+    /// Helper: seed the journal with synthetic entries at the given block
+    /// numbers so we can test pruning without driving real layer commits.
+    fn seed_journal_entries(backend: &Arc<dyn StorageBackend>, block_numbers: &[BlockNumber]) {
+        let mut tx = backend.begin_write().unwrap();
+        for n in block_numbers {
+            let entry = JournalEntry {
+                block_hash: H256::repeat_byte(*n as u8),
+                parent_state_root: H256::zero(),
+                account_trie_diff: vec![(vec![*n as u8], None)],
+                storage_trie_diff: vec![],
+                account_flat_diff: vec![],
+                storage_flat_diff: vec![],
+            };
+            tx.put(STATE_HISTORY, &n.to_be_bytes(), &entry.encode())
+                .unwrap();
+        }
+        tx.commit().unwrap();
+    }
+
+    fn journal_entry_exists(backend: &Arc<dyn StorageBackend>, block_number: BlockNumber) -> bool {
+        backend
+            .begin_read()
+            .unwrap()
+            .get(STATE_HISTORY, &block_number.to_be_bytes())
+            .unwrap()
+            .is_some()
+    }
+
+    /// Finality advance SHALL prune every STATE_HISTORY entry at or below the
+    /// new finalized number, in the same atomic txn as the finalized-number
+    /// update.
+    #[tokio::test]
+    async fn finality_advance_prunes_journal_below_boundary() {
+        let backend: Arc<dyn StorageBackend> = Arc::new(InMemoryBackend::open().unwrap());
+        let dir = tempfile::tempdir().unwrap();
+        let store = Store::from_backend(backend.clone(), dir.path().to_path_buf(), 1).unwrap();
+
+        // Seed journal entries for blocks 1..=10.
+        seed_journal_entries(&backend, &(1..=10).collect::<Vec<_>>());
+        for n in 1..=10 {
+            assert!(journal_entry_exists(&backend, n), "seed entry {n} present");
+        }
+
+        // Advance finalized to 5. Entries 1..=5 should be pruned; 6..=10 retained.
+        store
+            .forkchoice_update_inner(vec![], 100, H256::zero(), None, Some(5))
+            .await
+            .unwrap();
+
+        for n in 1..=5 {
+            assert!(
+                !journal_entry_exists(&backend, n),
+                "entry {n} should have been pruned (≤ finalized)"
+            );
+        }
+        for n in 6..=10 {
+            assert!(
+                journal_entry_exists(&backend, n),
+                "entry {n} should remain (> finalized)"
+            );
+        }
+    }
+
+    /// Forkchoice updates that don't advance finalized SHALL NOT prune the
+    /// journal. Re-asserting the same finalized value is a no-op.
+    #[tokio::test]
+    async fn finality_no_op_does_not_prune() {
+        let backend: Arc<dyn StorageBackend> = Arc::new(InMemoryBackend::open().unwrap());
+        let dir = tempfile::tempdir().unwrap();
+        let store = Store::from_backend(backend.clone(), dir.path().to_path_buf(), 1).unwrap();
+
+        // Establish prev_finalized = 5.
+        store
+            .forkchoice_update_inner(vec![], 100, H256::zero(), None, Some(5))
+            .await
+            .unwrap();
+
+        // Now seed entries for 6..=10 (representing post-finality history that
+        // accumulated since the last advance).
+        seed_journal_entries(&backend, &(6..=10).collect::<Vec<_>>());
+
+        // FCU re-asserting finalized = 5: must not prune anything.
+        store
+            .forkchoice_update_inner(vec![], 100, H256::zero(), None, Some(5))
+            .await
+            .unwrap();
+
+        for n in 6..=10 {
+            assert!(
+                journal_entry_exists(&backend, n),
+                "entry {n} should still exist after no-op finality update"
+            );
+        }
+
+        // FCU with finalized = None: also a no-op for pruning.
+        store
+            .forkchoice_update_inner(vec![], 100, H256::zero(), None, None)
+            .await
+            .unwrap();
+
+        for n in 6..=10 {
+            assert!(
+                journal_entry_exists(&backend, n),
+                "entry {n} should still exist when finalized is None"
+            );
+        }
+    }
+
+    /// `is_state_in_layer_cache` returns true for state_roots that the trie
+    /// cache currently has a layer for, and false otherwise. Used by the
+    /// engine API and the deep-reorg dispatcher.
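+    /// (The positive check polls briefly: layers are applied by the async
+    /// trie worker after `store_block_updates` returns.)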
+    #[tokio::test]
+    async fn is_state_in_layer_cache_reflects_current_cache_contents() {
+        let backend: Arc<dyn StorageBackend> = Arc::new(InMemoryBackend::open().unwrap());
+        let dir = tempfile::tempdir().unwrap();
+        // Use a high commit threshold (3) so the layers stay in cache after
+        // the first call — we want to inspect cache contents, not flushed state.
+        let store = Store::from_backend(backend.clone(), dir.path().to_path_buf(), 3).unwrap();
+
+        let state_root_1 = H256::repeat_byte(0x11);
+        let block1 = make_block(1, H256::zero(), state_root_1);
+
+        // Before any updates, no state roots are in the cache.
+        assert!(!store.is_state_in_layer_cache(state_root_1).unwrap());
+
+        store
+            .store_block_updates(UpdateBatch {
+                account_updates: vec![(Nibbles::from_raw(&[0x00], false), vec![0xab])],
+                storage_updates: vec![],
+                blocks: vec![block1],
+                receipts: vec![],
+                code_updates: vec![],
+                batch_mode: false,
+            })
+            .unwrap();
+
+        // After the update, the layer for state_root_1 is in the cache.
+        // Poll briefly because the trie worker is asynchronous.
+        let deadline = std::time::Instant::now() + Duration::from_secs(1);
+        loop {
+            if store.is_state_in_layer_cache(state_root_1).unwrap() {
+                break;
+            }
+            assert!(std::time::Instant::now() < deadline, "layer never appeared");
+            std::thread::sleep(Duration::from_millis(10));
+        }
+        // A different state root remains absent.
+        assert!(!store.is_state_in_layer_cache(H256::repeat_byte(0xee)).unwrap());
+    }
+
+    /// `install_overlay_for_reorg` SHALL atomically swap the in-memory layer
+    /// cache for a fresh empty one with the constructed overlay installed.
+    /// On-disk state SHALL remain untouched, and reads through the new cache
+    /// SHALL cascade to the overlay (then disk).
+    #[tokio::test]
+    async fn install_overlay_replaces_cache_with_fresh_one() {
+        use crate::layering::OverlayCf;
+
+        let backend: Arc<dyn StorageBackend> = Arc::new(InMemoryBackend::open().unwrap());
+        let dir = tempfile::tempdir().unwrap();
+        let store = Store::from_backend(backend.clone(), dir.path().to_path_buf(), 1).unwrap();
+
+        // Seed two journal entries so the overlay constructor has data to load.
+        seed_journal_entries(&backend, &[3, 4]);
+
+        // Pre-condition: layer cache is empty (no put_batch happened) and no
+        // overlay installed.
+        assert!(store
+            .trie_cache
+            .read()
+            .unwrap()
+            .clone()
+            .overlay()
+            .is_none());
+
+        store
+            .install_overlay_for_reorg(4, 3, |_| None)
+            .expect("overlay install should succeed");
+
+        // Post-condition: an overlay is installed and contains the seeded
+        // entries. Verify via the cache's own lookup_overlay path.
+        let cache = store.trie_cache.read().unwrap().clone();
+        assert!(cache.overlay().is_some(), "overlay must be installed");
+        // The seed_journal_entries helper writes one account_trie entry per
+        // block at path [block_number as u8] — verify both are visible.
+        assert_eq!(
+            cache.lookup_overlay(&[3u8]),
+            Some(None),
+            "block 3's seeded path should be in the overlay (with None pre-image)"
+        );
+        assert_eq!(cache.lookup_overlay(&[4u8]), Some(None));
+        // OverlayCf::AccountTrie classification is correct for these short paths.
+        assert_eq!(
+            OverlayCf::classify_by_key_length(1),
+            OverlayCf::AccountTrie
+        );
+    }
+
+    /// `install_overlay_for_reorg` SHALL leave the existing cache intact when
+    /// overlay construction fails (e.g. missing journal entry).
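+    /// (Construction failures surface as `StoreError::Custom` wrapping the
+    /// `OverlayError` text, e.g. `MissingEntry`.)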
+    #[tokio::test]
+    async fn install_overlay_aborts_cleanly_on_missing_entry() {
+        let backend: Arc<dyn StorageBackend> = Arc::new(InMemoryBackend::open().unwrap());
+        let dir = tempfile::tempdir().unwrap();
+        let store = Store::from_backend(backend.clone(), dir.path().to_path_buf(), 1).unwrap();
+
+        // Only seed block 5; ask for [5, 3]. Blocks 4 and 3 are missing.
+        seed_journal_entries(&backend, &[5]);
+
+        let err = store
+            .install_overlay_for_reorg(5, 3, |_| None)
+            .expect_err("missing-entry must abort");
+        let msg = format!("{err}");
+        assert!(
+            msg.contains("MissingEntry") || msg.contains("missing"),
+            "error message should mention missing entry: {msg}"
+        );
+
+        // Cache and overlay unchanged.
+        let cache = store.trie_cache.read().unwrap().clone();
+        assert!(cache.overlay().is_none(), "no overlay should be installed");
+    }
+
+    /// First-new-chain commit reconciliation: overlay-only entries SHALL be
+    /// folded into the disk write, the new T journal entry SHALL be written,
+    /// obsolete old-chain entries `[T, D]` SHALL be deleted, and the overlay
+    /// SHALL be cleared after the commit succeeds.
+    #[tokio::test]
+    async fn first_new_chain_commit_reconciles_overlay() {
+        let backend: Arc<dyn StorageBackend> = Arc::new(InMemoryBackend::open().unwrap());
+        let dir = tempfile::tempdir().unwrap();
+        let store = Store::from_backend(backend.clone(), dir.path().to_path_buf(), 1).unwrap();
+
+        // Seed STATE_HISTORY with synthetic old-chain entries at blocks 5..=10.
+        // Each entry's account_trie_diff has one path-only entry so the
+        // overlay's `iter_all_entries` produces something to fold.
+        seed_journal_entries(&backend, &(5u64..=10).collect::<Vec<_>>());
+        for n in 5..=10 {
+            assert!(journal_entry_exists(&backend, n));
+        }
+
+        // We need the pivot block (number 4) header in HEADERS so apply_updates
+        // can resolve parent_state_root for the first side-chain block (number 5).
+        let pivot_state_root = H256::repeat_byte(0xee);
+        let pivot = make_block(4, H256::zero(), pivot_state_root);
+        let pivot_hash = pivot.hash();
+        // Use the regular put-header path. We can do this by storing block 4
+        // through store_block_updates with batch_mode=true (which skips the
+        // journal). This populates HEADERS / BODIES without producing a
+        // STATE_HISTORY entry that would conflict with our seeded ones.
+        // BUT — store_block_updates with batch_mode=true at threshold=1 would
+        // also try to commit. To keep this surgical, we just write to HEADERS
+        // directly via the backend.
+        let mut tx = backend.begin_write().unwrap();
+        use ethrex_rlp::encode::RLPEncode;
+        tx.put(
+            crate::api::tables::HEADERS,
+            &pivot_hash.encode_to_vec(),
+            crate::rlp::BlockHeaderRLP::from(pivot.header).bytes(),
+        )
+        .unwrap();
+        tx.commit().unwrap();
+
+        // Install the overlay. from_block=10, to_block=5.
+        store
+            .install_overlay_for_reorg(10, 5, |_| None)
+            .expect("overlay install");
+
+        // The overlay was built; confirm before any side-chain commit.
+        {
+            let cache = store.trie_cache.read().unwrap().clone();
+            let overlay = cache.overlay().expect("overlay installed");
+            assert_eq!(overlay.from_block(), 10);
+            assert_eq!(overlay.to_block(), 5);
+        }
+
+        // Push side-chain block T = 5. This adds a layer to the new (fresh)
+        // cache but does NOT commit yet (with threshold=1, get_commitable
+        // checks the parent's state root, which is pivot_state_root and is
+        // NOT in the fresh cache, so returns None).
+        let t_state_root = H256::repeat_byte(0x55);
+        let block_t = make_block(5, pivot_hash, t_state_root);
+        let block_t_hash = block_t.hash();
+        store
+            .store_block_updates(UpdateBatch {
+                account_updates: vec![(
+                    Nibbles::from_raw(&[0xab], false),
+                    vec![0x77],
+                )],
+                storage_updates: vec![],
+                blocks: vec![block_t],
+                receipts: vec![],
+                code_updates: vec![],
+                batch_mode: false,
+            })
+            .unwrap();
+
+        // No commit fired yet → overlay still installed, T-new entry not yet
+        // written, old-chain entries still present.
+        std::thread::sleep(Duration::from_millis(30));
+        for n in 5..=10 {
+            assert!(
+                journal_entry_exists(&backend, n),
+                "old-chain entry at {n} should still exist before commit fires"
+            );
+        }
+
+        // Push side-chain block T+1 = 6. Its parent is T's state root, which
+        // IS in the fresh cache. get_commitable(t_state_root) walks 1 step
+        // (counter=1>=threshold=1) and returns t_state_root → commits T's
+        // layer → triggers reconciliation.
+        let t1_state_root = H256::repeat_byte(0x56);
+        let block_t1 = make_block(6, block_t_hash, t1_state_root);
+        store
+            .store_block_updates(UpdateBatch {
+                account_updates: vec![(
+                    Nibbles::from_raw(&[0xab, 0xcd], false),
+                    vec![0x88],
+                )],
+                storage_updates: vec![],
+                blocks: vec![block_t1],
+                receipts: vec![],
+                code_updates: vec![],
+                batch_mode: false,
+            })
+            .unwrap();
+
+        // Wait for the reconciliation commit to land. The trie worker runs
+        // asynchronously after store_block_updates returns. Sync on the
+        // delete_range firing — when block 10's seeded entry is gone, the
+        // reconciliation has written the new T-new entry too (same atomic
+        // write batch).
+        let deadline = std::time::Instant::now() + Duration::from_secs(2);
+        loop {
+            if !journal_entry_exists(&backend, 10) {
+                break;
+            }
+            assert!(
+                std::time::Instant::now() < deadline,
+                "delete_range never fired (reconciliation didn't complete)"
+            );
+            std::thread::sleep(Duration::from_millis(10));
+        }
+
+        let bytes = backend
+            .begin_read()
+            .unwrap()
+            .get(STATE_HISTORY, &5u64.to_be_bytes())
+            .unwrap()
+            .expect("STATE_HISTORY[5] should still exist (overwritten by T-new put)");
+        let entry = JournalEntry::decode(&bytes).unwrap();
+
+        // The new T-new entry's block_hash matches the new chain's block T.
+        assert_eq!(entry.block_hash, block_t_hash);
+        assert_eq!(entry.parent_state_root, pivot_state_root);
+
+        // Old-chain entries for blocks 6..=10 are deleted by the
+        // `delete_range(STATE_HISTORY, &5.to_be_bytes(), &11.to_be_bytes())`
+        // staged in the same write batch. Block 5 itself was overwritten by
+        // the new T-new entry put.
+        for n in 6..=10 {
+            assert!(
+                !journal_entry_exists(&backend, n),
+                "old-chain entry at {n} should be deleted by reconciliation"
+            );
+        }
+
+        // The overlay is cleared after the successful commit.
+        let cache = store.trie_cache.read().unwrap().clone();
+        assert!(
+            cache.overlay().is_none(),
+            "overlay must be cleared after reconciliation"
+        );
+    }
+
+    /// `clear_reorg_overlay` SHALL remove an installed overlay; it SHALL be
+    /// a no-op when no overlay is installed.
+    #[tokio::test]
+    async fn clear_reorg_overlay_removes_overlay_and_is_idempotent() {
+        let backend: Arc<dyn StorageBackend> = Arc::new(InMemoryBackend::open().unwrap());
+        let dir = tempfile::tempdir().unwrap();
+        let store = Store::from_backend(backend.clone(), dir.path().to_path_buf(), 1).unwrap();
+
+        // No-op when nothing installed.
+        store.clear_reorg_overlay().unwrap();
+        assert!(store.trie_cache.read().unwrap().overlay().is_none());
+
+        // Install and clear.
+        seed_journal_entries(&backend, &[7]);
+        store.install_overlay_for_reorg(7, 7, |_| None).unwrap();
+        assert!(store.trie_cache.read().unwrap().overlay().is_some());
+        store.clear_reorg_overlay().unwrap();
+        assert!(store.trie_cache.read().unwrap().overlay().is_none());
+
+        // A second clear is a no-op.
+        store.clear_reorg_overlay().unwrap();
+        assert!(store.trie_cache.read().unwrap().overlay().is_none());
+    }
+
+    /// Storage trie updates SHALL appear in `storage_trie_diff` (not
+    /// `account_trie_diff`), with their on-disk keys as written. The keys
+    /// carry the nibble-encoded account-hash prefix; we only verify that two
+    /// distinct accounts produce two distinct entries with first-time `None`
+    /// pre-images.
+    #[test]
+    fn journal_storage_updates_appear_in_storage_diff() {
+        let backend: Arc<dyn StorageBackend> = Arc::new(InMemoryBackend::open().unwrap());
+        let dir = tempfile::tempdir().unwrap();
+        let store = Store::from_backend(backend.clone(), dir.path().to_path_buf(), 1).unwrap();
+
+        let account_hash_a = H256::repeat_byte(0xa0);
+        let account_hash_b = H256::repeat_byte(0xb0);
+
+        // Block 1 carries the storage updates we want to verify on disk.
+        let state_root_1 = H256::repeat_byte(0x33);
+        let block1 = make_block(1, H256::zero(), state_root_1);
+        let block1_hash = block1.hash();
+        let update_1 = UpdateBatch {
+            account_updates: vec![],
+            storage_updates: vec![
+                (
+                    account_hash_a,
+                    vec![(Nibbles::from_raw(&[0x05], false), vec![0x01])],
+                ),
+                (
+                    account_hash_b,
+                    vec![(Nibbles::from_raw(&[0x06], false), vec![0x02])],
+                ),
+            ],
+            blocks: vec![block1],
+            receipts: vec![],
+            code_updates: vec![],
+            batch_mode: false,
+        };
+        store.store_block_updates(update_1).unwrap();
+
+        // Block 2 triggers commit of block 1's layer.
+        let state_root_2 = H256::repeat_byte(0x44);
+        let block2 = make_block(2, block1_hash, state_root_2);
+        let update_2 = UpdateBatch {
+            account_updates: vec![(Nibbles::from_raw(&[0xee], false), vec![0xff])],
+            storage_updates: vec![],
+            blocks: vec![block2],
+            receipts: vec![],
+            code_updates: vec![],
+            batch_mode: false,
+        };
+        store.store_block_updates(update_2).unwrap();
+
+        let bytes = await_journal_entry(&backend, 1, Duration::from_secs(2))
+            .expect("STATE_HISTORY entry for block 1");
+        let entry = JournalEntry::decode(&bytes).unwrap();
+        assert_eq!(entry.block_hash, block1_hash);
+        assert_eq!(
+            entry.storage_trie_diff.len(),
+            2,
+            "two distinct account hashes must produce two storage_trie entries"
+        );
+        for (_key, prev) in &entry.storage_trie_diff {
+            assert_eq!(prev, &None, "first-time storage write has None pre-image");
+        }
+        assert!(entry.account_trie_diff.is_empty());
+    }
+}

From 11a6e1640fc2a69ff63befc26e19948ec98e1354 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Tom=C3=A1s=20Arjovsky?=
Date: Fri, 1 May 2026 15:20:10 +0200
Subject: [PATCH 2/3] format

---
 crates/blockchain/blockchain.rs     |  1 -
 crates/blockchain/fork_choice.rs    |  4 +-
 crates/storage/backend/in_memory.rs |  4 +-
 crates/storage/backend/rocksdb.rs   | 41 +++++++++++++---
 crates/storage/journal.rs           | 24 ++++-----
 crates/storage/layering.rs          | 75 ++++++++++++++++++---------
 crates/storage/store.rs             | 38 ++++++---------
 7 files changed, 115 insertions(+), 72 deletions(-)

diff --git a/crates/blockchain/blockchain.rs b/crates/blockchain/blockchain.rs
index 37872103255..ab7459dc59d 100644
--- a/crates/blockchain/blockchain.rs
+++ b/crates/blockchain/blockchain.rs
@@ -398,7 +398,6 @@ impl<'a> Drop for ReorgGuard<'a> {
 }
 
 impl Blockchain {
-
     /// Executes a block withing a new vm instance and state
     fn execute_block(
&self, diff --git a/crates/blockchain/fork_choice.rs b/crates/blockchain/fork_choice.rs index f3c7087bc24..f3cb3b11137 100644 --- a/crates/blockchain/fork_choice.rs +++ b/crates/blockchain/fork_choice.rs @@ -305,8 +305,8 @@ async fn reorg_apply_deep( // handled this as a shallow reorg. If we reach here, something is // off; punt. warn!( - edge, to_block, - "deep-reorg path entered but pivot is above cache edge" + edge, + to_block, "deep-reorg path entered but pivot is above cache edge" ); return Err(InvalidForkChoice::StateNotReachable); } diff --git a/crates/storage/backend/in_memory.rs b/crates/storage/backend/in_memory.rs index b104f901d13..355db6b0d75 100644 --- a/crates/storage/backend/in_memory.rs +++ b/crates/storage/backend/in_memory.rs @@ -194,9 +194,7 @@ impl StorageWriteBatch for InMemoryWriteTx { let db_mut = Arc::make_mut(&mut *db); if let Some(table_ref) = db_mut.get_mut(table) { - table_ref.retain(|key, _| { - key.as_slice() < start_key || key.as_slice() >= end_key - }); + table_ref.retain(|key, _| key.as_slice() < start_key || key.as_slice() >= end_key); } Ok(()) } diff --git a/crates/storage/backend/rocksdb.rs b/crates/storage/backend/rocksdb.rs index 14fb452416d..d4cb346afa0 100644 --- a/crates/storage/backend/rocksdb.rs +++ b/crates/storage/backend/rocksdb.rs @@ -469,11 +469,31 @@ mod tests { // After commit: keys 1,2,3 are gone; 0 and 4 remain. let read = backend.begin_read().unwrap(); - assert!(read.get(STATE_HISTORY, &0u64.to_be_bytes()).unwrap().is_some()); - assert!(read.get(STATE_HISTORY, &1u64.to_be_bytes()).unwrap().is_none()); - assert!(read.get(STATE_HISTORY, &2u64.to_be_bytes()).unwrap().is_none()); - assert!(read.get(STATE_HISTORY, &3u64.to_be_bytes()).unwrap().is_none()); - assert!(read.get(STATE_HISTORY, &4u64.to_be_bytes()).unwrap().is_some()); + assert!( + read.get(STATE_HISTORY, &0u64.to_be_bytes()) + .unwrap() + .is_some() + ); + assert!( + read.get(STATE_HISTORY, &1u64.to_be_bytes()) + .unwrap() + .is_none() + ); + assert!( + read.get(STATE_HISTORY, &2u64.to_be_bytes()) + .unwrap() + .is_none() + ); + assert!( + read.get(STATE_HISTORY, &3u64.to_be_bytes()) + .unwrap() + .is_none() + ); + assert!( + read.get(STATE_HISTORY, &4u64.to_be_bytes()) + .unwrap() + .is_some() + ); } #[test] @@ -492,17 +512,22 @@ mod tests { tx.put(STATE_HISTORY, &30u64.to_be_bytes(), b"v30").unwrap(); tx.delete_range(STATE_HISTORY, &10u64.to_be_bytes(), &20u64.to_be_bytes()) .unwrap(); - tx.put(STATE_HISTORY, &20u64.to_be_bytes(), b"v20-new").unwrap(); + tx.put(STATE_HISTORY, &20u64.to_be_bytes(), b"v20-new") + .unwrap(); tx.commit().unwrap(); let read = backend.begin_read().unwrap(); assert_eq!(read.get(STATE_HISTORY, &10u64.to_be_bytes()).unwrap(), None); assert_eq!( - read.get(STATE_HISTORY, &20u64.to_be_bytes()).unwrap().as_deref(), + read.get(STATE_HISTORY, &20u64.to_be_bytes()) + .unwrap() + .as_deref(), Some(&b"v20-new"[..]) ); assert_eq!( - read.get(STATE_HISTORY, &30u64.to_be_bytes()).unwrap().as_deref(), + read.get(STATE_HISTORY, &30u64.to_be_bytes()) + .unwrap() + .as_deref(), Some(&b"v30"[..]) ); } diff --git a/crates/storage/journal.rs b/crates/storage/journal.rs index 44679128ae4..6538e586f30 100644 --- a/crates/storage/journal.rs +++ b/crates/storage/journal.rs @@ -72,7 +72,9 @@ pub enum JournalDecodeError { VersionMismatch { expected: u8, found: u8 }, #[error("journal entry varint overflow at offset {offset}")] VarintOverflow { offset: usize }, - #[error("journal entry presence byte invalid: expected 0 or 1, found {found} at offset {offset}")] + #[error( + 
"journal entry presence byte invalid: expected 0 or 1, found {found} at offset {offset}" + )] InvalidPresenceByte { offset: usize, found: u8 }, } @@ -80,7 +82,9 @@ impl JournalEntry { /// Encode this entry into its on-disk byte representation. pub fn encode(&self) -> Vec { // Heuristic: ~70 bytes overhead + ~50 bytes per typical small entry. - let approx = 1 + 32 + 32 + let approx = 1 + + 32 + + 32 + diff_byte_estimate(&self.account_trie_diff) + diff_byte_estimate(&self.storage_trie_diff) + diff_byte_estimate(&self.account_flat_diff) @@ -297,10 +301,7 @@ mod tests { (vec![0x00, 0x01], Some(vec![0xde, 0xad, 0xbe, 0xef])), (vec![0x02], None), ], - storage_trie_diff: vec![ - (vec![0x0a; 67], Some(vec![0xff])), - (vec![0x0b; 68], None), - ], + storage_trie_diff: vec![(vec![0x0a; 67], Some(vec![0xff])), (vec![0x0b; 68], None)], account_flat_diff: vec![(vec![0xaa; 65], Some(vec![0x01, 0x02, 0x03]))], storage_flat_diff: vec![(vec![0xbb; 131], None)], }; @@ -326,11 +327,7 @@ mod tests { let entry = JournalEntry { block_hash: h(0x55), parent_state_root: h(0x66), - account_trie_diff: vec![ - (vec![0x00], None), - (vec![0x01], None), - (vec![0x02], None), - ], + account_trie_diff: vec![(vec![0x00], None), (vec![0x01], None), (vec![0x02], None)], storage_trie_diff: vec![], account_flat_diff: vec![(vec![0xaa; 32], None)], storage_flat_diff: vec![], @@ -406,7 +403,10 @@ mod tests { bytes.push(0xab); // path bytes.push(2); // presence = 2 (invalid) let err = JournalEntry::decode(&bytes).unwrap_err(); - assert!(matches!(err, JournalDecodeError::InvalidPresenceByte { found: 2, .. })); + assert!(matches!( + err, + JournalDecodeError::InvalidPresenceByte { found: 2, .. } + )); } #[test] diff --git a/crates/storage/layering.rs b/crates/storage/layering.rs index 4da33240b92..cc0c2c77da4 100644 --- a/crates/storage/layering.rs +++ b/crates/storage/layering.rs @@ -9,7 +9,10 @@ use ethrex_trie::{Nibbles, TrieDB, TrieError}; use crate::{ api::{ StorageBackend, - tables::{ACCOUNT_FLATKEYVALUE, ACCOUNT_TRIE_NODES, STATE_HISTORY, STORAGE_FLATKEYVALUE, STORAGE_TRIE_NODES}, + tables::{ + ACCOUNT_FLATKEYVALUE, ACCOUNT_TRIE_NODES, STATE_HISTORY, STORAGE_FLATKEYVALUE, + STORAGE_TRIE_NODES, + }, }, error::StoreError, journal::{JournalDecodeError, JournalEntry}, @@ -128,7 +131,10 @@ impl TrieLayerCache { /// Returns a reference to the installed overlay, if any. Used by tests /// and by the reconciliation path to fold the overlay into the first /// new-chain commit. - #[allow(dead_code, reason = "consumed by Section 9 (overlay reconciliation) and tests")] + #[allow( + dead_code, + reason = "consumed by Section 9 (overlay reconciliation) and tests" + )] pub fn overlay(&self) -> Option<&Arc> { self.overlay.as_ref() } @@ -329,10 +335,7 @@ impl TrieLayerCache { /// committed block's pre-state). In normal operation only one layer is removed; ancestors /// are evicted as orphans without contributing to the merged nodes (caught by the `id` /// retain below). - pub fn commit( - &mut self, - state_root: H256, - ) -> Option { + pub fn commit(&mut self, state_root: H256) -> Option { let mut layers_to_commit = vec![]; let mut current_state_root = state_root; while let Some(layer) = self.layers.remove(¤t_state_root) { @@ -461,7 +464,10 @@ impl TrieDB for TrieWrapper { /// Identifier of which on-disk column family an [`Overlay`] entry targets. /// Returned by classifier helpers; used by callers to route a key to the right /// internal map without re-doing the length classification. 
-#[allow(dead_code, reason = "consumed by the read cascade in Section 7 / reorg apply in Section 8")]
+#[allow(
+    dead_code,
+    reason = "consumed by the read cascade in Section 7 / reorg apply in Section 8"
+)]
 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
 pub enum OverlayCf {
     AccountTrie,
@@ -506,7 +512,9 @@ impl OverlayCf {
 pub enum OverlayError {
     #[error("missing journal entry for block {0}")]
     MissingEntry(BlockNumber),
-    #[error("journal block_hash mismatch at block {block_number}: expected {expected:?}, found {found:?}")]
+    #[error(
+        "journal block_hash mismatch at block {block_number}: expected {expected:?}, found {found:?}"
+    )]
     HashMismatch {
         block_number: BlockNumber,
         expected: H256,
@@ -693,7 +701,10 @@ impl Overlay {
     }
 
     /// Highest block number covered by the overlay (= the cache edge `D` at install time).
-    #[allow(clippy::wrong_self_convention, reason = "field accessor: name matches struct field")]
+    #[allow(
+        clippy::wrong_self_convention,
+        reason = "field accessor: name matches struct field"
+    )]
     pub fn from_block(&self) -> BlockNumber {
         self.from_block
     }
@@ -706,7 +717,9 @@ impl Overlay {
     /// Iterates every overlay entry across the four CFs as
     /// `(cf, key, value)` triples. Used by Section 9's reconciliation to fold
     /// overlay-only entries into the first new-chain commit.
-    pub fn iter_all_entries(&self) -> impl Iterator<Item = (OverlayCf, &Vec<u8>, &Option<Vec<u8>>)> {
+    pub fn iter_all_entries(
+        &self,
+    ) -> impl Iterator<Item = (OverlayCf, &Vec<u8>, &Option<Vec<u8>>)> {
         self.account_trie
             .iter()
             .map(|(k, v)| (OverlayCf::AccountTrie, k, v))
@@ -772,10 +785,9 @@ mod overlay_tests {
                 (5, h(0x05), vec![(vec![0xc], Some(vec![0x55]))]),
             ],
         );
-        let overlay = Overlay::from_journal(backend.as_ref(), 5, 3, |n| {
-            Some(H256::repeat_byte(n as u8))
-        })
-        .unwrap();
+        let overlay =
+            Overlay::from_journal(backend.as_ref(), 5, 3, |n| Some(H256::repeat_byte(n as u8)))
+                .unwrap();
         assert_eq!(overlay.len(), 3);
         assert_eq!(
             overlay.lookup(OverlayCf::AccountTrie, &[0xa]),
@@ -926,12 +938,30 @@ mod overlay_tests {
         // BackendTrieDB::table_for_key (account leaf at 65, storage leaf at 131,
         // anything else routed by length comparison to 65).
assert_eq!(OverlayCf::classify_by_key_length(0), OverlayCf::AccountTrie); - assert_eq!(OverlayCf::classify_by_key_length(64), OverlayCf::AccountTrie); - assert_eq!(OverlayCf::classify_by_key_length(65), OverlayCf::AccountFlat); - assert_eq!(OverlayCf::classify_by_key_length(66), OverlayCf::StorageTrie); - assert_eq!(OverlayCf::classify_by_key_length(130), OverlayCf::StorageTrie); - assert_eq!(OverlayCf::classify_by_key_length(131), OverlayCf::StorageFlat); - assert_eq!(OverlayCf::classify_by_key_length(132), OverlayCf::StorageTrie); + assert_eq!( + OverlayCf::classify_by_key_length(64), + OverlayCf::AccountTrie + ); + assert_eq!( + OverlayCf::classify_by_key_length(65), + OverlayCf::AccountFlat + ); + assert_eq!( + OverlayCf::classify_by_key_length(66), + OverlayCf::StorageTrie + ); + assert_eq!( + OverlayCf::classify_by_key_length(130), + OverlayCf::StorageTrie + ); + assert_eq!( + OverlayCf::classify_by_key_length(131), + OverlayCf::StorageFlat + ); + assert_eq!( + OverlayCf::classify_by_key_length(132), + OverlayCf::StorageTrie + ); } #[test] @@ -941,9 +971,6 @@ mod overlay_tests { let backend: Arc = Arc::new(InMemoryBackend::open().unwrap()); seed(&backend, &[(7, h(0xab), vec![(vec![0x01], None)])]); let overlay = Overlay::from_journal(backend.as_ref(), 7, 7, |_| None).unwrap(); - assert_eq!( - overlay.lookup(OverlayCf::AccountTrie, &[0x01]), - Some(None) - ); + assert_eq!(overlay.lookup(OverlayCf::AccountTrie, &[0x01]), Some(None)); } } diff --git a/crates/storage/store.rs b/crates/storage/store.rs index d6fbd4c3116..d46aaff1497 100644 --- a/crates/storage/store.rs +++ b/crates/storage/store.rs @@ -3117,7 +3117,11 @@ fn apply_trie_updates( // atomic write (no intermediate "rolled back" disk state). After the // commit succeeds, the overlay is cleared and obsolete old-chain journal // entries in `[T, D]` are deleted via `delete_range`. - let overlay_for_reconciliation = if !is_batch { trie.overlay().cloned() } else { None }; + let overlay_for_reconciliation = if !is_batch { + trie.overlay().cloned() + } else { + None + }; let mut result = Ok(()); @@ -3748,7 +3752,8 @@ mod tests { "journal block_hash must match the committed block" ); assert_eq!( - entry.parent_state_root, H256::zero(), + entry.parent_state_root, + H256::zero(), "first block's parent state root is zero (no header for parent_hash=0)" ); assert!( @@ -3968,7 +3973,11 @@ mod tests { std::thread::sleep(Duration::from_millis(10)); } // A different state root remains absent. - assert!(!store.is_state_in_layer_cache(H256::repeat_byte(0xee)).unwrap()); + assert!( + !store + .is_state_in_layer_cache(H256::repeat_byte(0xee)) + .unwrap() + ); } /// `install_overlay_for_reorg` SHALL atomically swap the in-memory layer @@ -3988,13 +3997,7 @@ mod tests { // Pre-condition: layer cache is empty (no put_batch happened) and no // overlay installed. - assert!(store - .trie_cache - .read() - .unwrap() - .clone() - .overlay() - .is_none()); + assert!(store.trie_cache.read().unwrap().clone().overlay().is_none()); store .install_overlay_for_reorg(4, 3, |_| None) @@ -4013,10 +4016,7 @@ mod tests { ); assert_eq!(cache.lookup_overlay(&[4u8]), Some(None)); // OverlayCf::AccountTrie classification is correct for these short paths. 
-        assert_eq!(
-            OverlayCf::classify_by_key_length(1),
-            OverlayCf::AccountTrie
-        );
+        assert_eq!(OverlayCf::classify_by_key_length(1), OverlayCf::AccountTrie);
     }

     /// `install_overlay_for_reorg` SHALL leave the existing cache intact when
@@ -4106,10 +4106,7 @@
         let block_t_hash = block_t.hash();
         store
             .store_block_updates(UpdateBatch {
-                account_updates: vec![(
-                    Nibbles::from_raw(&[0xab], false),
-                    vec![0x77],
-                )],
+                account_updates: vec![(Nibbles::from_raw(&[0xab], false), vec![0x77])],
                 storage_updates: vec![],
                 blocks: vec![block_t],
                 receipts: vec![],
@@ -4136,10 +4133,7 @@
         let block_t1 = make_block(6, block_t_hash, t1_state_root);
         store
             .store_block_updates(UpdateBatch {
-                account_updates: vec![(
-                    Nibbles::from_raw(&[0xab, 0xcd], false),
-                    vec![0x88],
-                )],
+                account_updates: vec![(Nibbles::from_raw(&[0xab, 0xcd], false), vec![0x88])],
                 storage_updates: vec![],
                 blocks: vec![block_t1],
                 receipts: vec![],

From 52b52a25f7a4aa700695b043c4e3d676b867d45d Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Tom=C3=A1s=20Arjovsky?=
Date: Fri, 1 May 2026 16:05:01 +0200
Subject: [PATCH 3/3] abort guard + tests

---
 crates/blockchain/fork_choice.rs |  44 +++++++++++++
 crates/storage/store.rs          | 108 +++++++++++++++++++++++++++++++
 2 files changed, 152 insertions(+)

diff --git a/crates/blockchain/fork_choice.rs b/crates/blockchain/fork_choice.rs
index f3cb3b11137..8070f0af51f 100644
--- a/crates/blockchain/fork_choice.rs
+++ b/crates/blockchain/fork_choice.rs
@@ -328,6 +328,13 @@ async fn reorg_apply_deep(
             InvalidForkChoice::StateNotReachable
         })?;

+    // From this point on, ANY error must reset the layer cache to a fresh
+    // empty state; otherwise the installed overlay and any partially-built
+    // new-chain layers would leak into subsequent FCU evaluations. The guard
+    // fires `store.abort_reorg()` on drop unless `disarm()` is called below,
+    // after a successful fork-choice (canonical-hash) update. (Section 13.)
+    let mut abort_guard = AbortReorgGuard::new(store);
+
     // Execute the side-chain blocks in CHAIN order (oldest first). The
     // existing `add_block` path handles execution + storage; layer cache
     // reads cascade through the freshly-installed overlay.
@@ -367,6 +374,11 @@
     )
     .await?;

+    // The fork-choice update succeeded; the new chain is canonical. Disarm
+    // the abort guard so the now-correct cache is preserved on this
+    // function's return.
+    abort_guard.disarm();
+
     metrics!(
         use ethrex_metrics::blocks::METRICS_BLOCKS;
         METRICS_BLOCKS.set_head_height(head.number);
@@ -382,6 +394,38 @@
     Ok(head)
 }

+/// RAII guard that calls `Store::abort_reorg()` on drop — resetting the layer
+/// cache to a fresh empty state — UNLESS `disarm()` is called first.
+///
+/// The deep-reorg apply path arms this guard immediately after
+/// `install_overlay_for_reorg` succeeds, so any subsequent failure (side-chain
+/// execution error, missing block body, fork-choice update error, panic via
+/// unwinding) leaves the store in a recoverable state for the next FCU.
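+///
+/// Usage sketch (illustrative shape only; `replay_side_chain` stands in for
+/// the real side-chain execution loop above):
+///
+/// ```ignore
+/// let mut abort_guard = AbortReorgGuard::new(store);
+/// replay_side_chain()?;  // any `?` exit drops the guard => abort_reorg()
+/// abort_guard.disarm();  // success path: keep the rebuilt layer cache
+/// ```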
+struct AbortReorgGuard<'a> {
+    store: &'a Store,
+    armed: bool,
+}
+
+impl<'a> AbortReorgGuard<'a> {
+    fn new(store: &'a Store) -> Self {
+        Self { store, armed: true }
+    }
+
+    fn disarm(&mut self) {
+        self.armed = false;
+    }
+}
+
+impl<'a> Drop for AbortReorgGuard<'a> {
+    fn drop(&mut self) {
+        if self.armed
+            && let Err(e) = self.store.abort_reorg()
+        {
+            error!(error = %e, "AbortReorgGuard: abort_reorg failed during cleanup");
+        }
+    }
+}
+
 /// Maps a `ChainError` from a side-chain block execution into the
 /// closest-fitting [`InvalidForkChoice`] variant. Most chain errors during
 /// side-chain replay indicate the new chain is invalid, so we collapse them
diff --git a/crates/storage/store.rs b/crates/storage/store.rs
index d46aaff1497..b38e4eb1b2a 100644
--- a/crates/storage/store.rs
+++ b/crates/storage/store.rs
@@ -2886,6 +2886,19 @@ impl Store {
         Ok(())
     }

+    /// Aborts an in-progress deep reorg and resets the layer cache to a fresh
+    /// empty state with the same commit threshold. Both the overlay AND any
+    /// partially-built new-chain layers are discarded.
+    ///
+    /// On-disk state is untouched (still at the OLD chain's `D`), so subsequent
+    /// FCU evaluations start from a clean foundation. Section 13.
+    pub fn abort_reorg(&self) -> Result<(), StoreError> {
+        let mut guard = self.trie_cache.write().map_err(|_| StoreError::LockError)?;
+        let threshold = guard.commit_threshold();
+        *guard = Arc::new(TrieLayerCache::new(threshold));
+        Ok(())
+    }
+
     /// Takes a block hash and returns an iterator to its ancestors. Block headers are returned
     /// in reverse order, starting from the given block and going up to the genesis block.
     pub fn ancestors(&self, block_hash: BlockHash) -> AncestorIterator {
@@ -4190,6 +4203,101 @@
         );
     }

+    /// `abort_reorg` SHALL discard both an installed overlay and any
+    /// partially-built layers, leaving the cache fresh and empty so a
+    /// subsequent FCU can start from disk state.
+    #[tokio::test]
+    async fn abort_reorg_resets_cache_to_fresh() {
+        let backend: Arc<dyn StorageBackend> = Arc::new(InMemoryBackend::open().unwrap());
+        let dir = tempfile::tempdir().unwrap();
+        let store = Store::from_backend(backend.clone(), dir.path().to_path_buf(), 1).unwrap();
+
+        // Seed journal entries and install an overlay.
+        seed_journal_entries(&backend, &[3, 4]);
+        store.install_overlay_for_reorg(4, 3, |_| None).unwrap();
+        assert!(
+            store.trie_cache.read().unwrap().overlay().is_some(),
+            "overlay should be installed before abort"
+        );
+
+        // Push a synthetic side-chain layer to simulate partial new-chain progress.
+        let block = make_block(3, H256::zero(), H256::repeat_byte(0x33));
+        store
+            .store_block_updates(UpdateBatch {
+                account_updates: vec![(Nibbles::from_raw(&[0x01], false), vec![0x99])],
+                storage_updates: vec![],
+                blocks: vec![block],
+                receipts: vec![],
+                code_updates: vec![],
+                batch_mode: false,
+            })
+            .unwrap();
+        // Wait briefly for the trie worker.
+        std::thread::sleep(Duration::from_millis(20));
+
+        // Abort. Both the overlay AND the partial layer must be gone.
+        store.abort_reorg().unwrap();
+        let cache = store.trie_cache.read().unwrap().clone();
+        assert!(cache.overlay().is_none(), "overlay should be cleared");
+        // No layer should be reachable; lookup of any state_root returns false.
+        assert!(!cache.contains(H256::repeat_byte(0x33)));
+
+        // Subsequent overlay install on the now-fresh cache works.
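+        // (Had abort_reorg left the old overlay or a stale layer behind,
+        // this re-install would be layering reorg state on top of leaked
+        // reorg state instead of the clean, disk-backed foundation the
+        // next FCU expects.)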
+        store.install_overlay_for_reorg(4, 3, |_| None).unwrap();
+        assert!(store.trie_cache.read().unwrap().overlay().is_some());
+    }
+
+    /// LVH propagation: a rejected block records the hash of its deepest
+    /// valid ancestor as its latest-valid-hash (LVH). When a descendant
+    /// arrives whose parent is that bad block, the engine API looks up the
+    /// parent's recorded LVH and records the SAME value for the descendant,
+    /// so every block on the bad chain resolves to the deepest valid
+    /// ancestor rather than to an intermediate bad block. Verifies the
+    /// existing `set/get_latest_valid_ancestor` storage primitive is
+    /// consistent with the engine-API-level propagation logic at
+    /// `crates/networking/rpc/engine/payload.rs:865-895` and
+    /// `crates/networking/rpc/engine/fork_choice.rs:241-260`.
+    #[tokio::test]
+    async fn invalid_ancestor_lvh_propagates_through_descendant_chain() {
+        let backend: Arc<dyn StorageBackend> = Arc::new(InMemoryBackend::open().unwrap());
+        let dir = tempfile::tempdir().unwrap();
+        let store = Store::from_backend(backend, dir.path().to_path_buf(), 1).unwrap();
+
+        // Simulate the engine-API flow:
+        // 1. Block A is rejected; its LVH is its parent's hash (the deepest
+        //    valid block).
+        let bad_a = H256::repeat_byte(0xa0);
+        let valid_hash = H256::repeat_byte(0x01);
+        store
+            .set_latest_valid_ancestor(bad_a, valid_hash)
+            .await
+            .unwrap();
+
+        // 2. A descendant B (parent = bad_a) arrives. The engine API checks
+        //    the parent's recorded LVH (= valid_hash) and propagates it.
+        let bad_b = H256::repeat_byte(0xb0);
+        let parent_lvh = store.get_latest_valid_ancestor(bad_a).await.unwrap();
+        assert_eq!(parent_lvh, Some(valid_hash));
+        store
+            .set_latest_valid_ancestor(bad_b, valid_hash)
+            .await
+            .unwrap();
+
+        // 3. Grandchild C (parent = bad_b) arrives. Same propagation step.
+        let bad_c = H256::repeat_byte(0xc0);
+        let parent_lvh = store.get_latest_valid_ancestor(bad_b).await.unwrap();
+        assert_eq!(parent_lvh, Some(valid_hash));
+        store
+            .set_latest_valid_ancestor(bad_c, valid_hash)
+            .await
+            .unwrap();
+
+        // 4. All three bad blocks now resolve to the same deepest valid
+        //    ancestor; the LVH never points at an intermediate bad block.
+        for bad in [bad_a, bad_b, bad_c] {
+            let lvh = store.get_latest_valid_ancestor(bad).await.unwrap();
+            assert_eq!(lvh, Some(valid_hash));
+            assert_ne!(lvh, Some(bad), "LVH must never be the bad block itself");
+        }
+    }
+
+    /// `clear_reorg_overlay` SHALL remove an installed overlay; it SHALL be
+    /// a no-op when no overlay is installed.
+    #[tokio::test]