diff --git a/beacon_node/store/src/reconstruct.rs b/beacon_node/store/src/reconstruct.rs index 7aca692ef9b..04a519af020 100644 --- a/beacon_node/store/src/reconstruct.rs +++ b/beacon_node/store/src/reconstruct.rs @@ -1,7 +1,8 @@ //! Implementation of historic state reconstruction (given complete block history). +use crate::forwards_iter::FrozenForwardsIterator; use crate::hot_cold_store::{HotColdDB, HotColdDBError}; use crate::metrics; -use crate::{Error, ItemStore}; +use crate::{DBColumn, Error, ItemStore}; use itertools::{Itertools, process_results}; use state_processing::{ BlockSignatureStrategy, ConsensusContext, VerifyBlockRoot, per_block_processing, @@ -9,7 +10,7 @@ use state_processing::{ }; use std::sync::Arc; use tracing::{debug, info}; -use types::EthSpec; +use types::{EthSpec, Slot}; impl HotColdDB where @@ -35,13 +36,6 @@ where }); } - debug!( - start_slot = %anchor.state_lower_limit, - "Starting state reconstruction batch" - ); - - let _t = metrics::start_timer(&metrics::STORE_BEACON_RECONSTRUCTION_TIME); - // Iterate blocks from the state lower limit to the upper limit. let split = self.get_split_info(); let lower_limit_slot = anchor.state_lower_limit; @@ -56,20 +50,86 @@ where // If `num_blocks` is not specified iterate all blocks. Add 1 so that we end on an epoch // boundary when `num_blocks` is a multiple of an epoch boundary. We want to be *inclusive* // of the state at slot `lower_limit_slot + num_blocks`. - let block_root_iter = self - .forwards_block_roots_iterator_until(lower_limit_slot, upper_limit_slot - 1, || { - Err(Error::StateShouldNotBeRequired(upper_limit_slot - 1)) - })? - .take(num_blocks.map_or(usize::MAX, |n| n + 1)); + let to_slot = num_blocks + .map(|n| std::cmp::min(lower_limit_slot + n as u64 + 1, upper_limit_slot)) + .unwrap_or(upper_limit_slot); + + let on_commit = |slot: Slot| -> Result<(), Error> { + info!( + %slot, + remaining = %(upper_limit_slot - 1 - slot), + "State reconstruction in progress" + ); + + // Update anchor. + let old_anchor = anchor.clone(); + let reconstruction_complete = slot + 1 == upper_limit_slot; + + if reconstruction_complete { + // The two limits have met in the middle! We're done! + let new_anchor = old_anchor.as_archive_anchor(); + self.compare_and_set_anchor_info_with_write(old_anchor, new_anchor)?; + } else { + // The lower limit has been raised, store it. + anchor.state_lower_limit = slot; + self.compare_and_set_anchor_info_with_write(old_anchor, anchor.clone())?; + } - // The state to be advanced. - let mut state = self.load_cold_state_by_slot(lower_limit_slot)?; + Ok(()) + }; + + self.reconstruct_historic_states_on_range(lower_limit_slot, to_slot, on_commit)?; + + // Check that the split point wasn't mutated during the state reconstruction process. + // It shouldn't have been, due to the serialization of requests through the store migrator, + // so this is just a paranoid check. + let latest_split = self.get_split_info(); + if split != latest_split { + return Err(Error::SplitPointModified(latest_split.slot, split.slot)); + } + + Ok(()) + } + /// Reconstruct historic states for the slot range `(with_state_at_slot, to_slot)`. + /// + /// Loads the state at `with_state_at_slot` and replays blocks up to and including slot + /// `to_slot - 1`, writing all intermediate states to the freezer DB. + /// + /// The `BeaconBlockRoots` column must be populated for the range before this is called. + /// + /// `on_commit(slot)` is invoked after each atomic commit (whenever the hierarchy says to + /// commit, plus once at the final slot) so callers can update anchor metadata or log + /// progress. + pub fn reconstruct_historic_states_on_range( + self: &Arc, + with_state_at_slot: Slot, + to_slot: Slot, + mut on_commit: impl FnMut(Slot) -> Result<(), Error>, + ) -> Result<(), Error> { + debug!( + from_slot = %(with_state_at_slot + 1), + %to_slot, + "Starting state reconstruction batch" + ); + + let _t = metrics::start_timer(&metrics::STORE_BEACON_RECONSTRUCTION_TIME); + + // Iterate from `with_state_at_slot` so `tuple_windows` gives us the predecessor block + // root at each step for skip detection. + let block_root_iter = FrozenForwardsIterator::new( + self, + DBColumn::BeaconBlockRoots, + with_state_at_slot, + to_slot, + )?; + + // The state to be advanced. + let mut state = self.load_cold_state_by_slot(with_state_at_slot)?; state.build_caches(&self.spec)?; process_results(block_root_iter, |iter| -> Result<(), Error> { let mut io_batch = vec![]; - let mut prev_state_root = None; for ((prev_block_root, _), (block_root, slot)) in iter.tuple_windows() { @@ -114,32 +174,16 @@ where // Stage state for storage in freezer DB. self.store_cold_state(&state_root, &state, &mut io_batch)?; - let batch_complete = - num_blocks.is_some_and(|n_blocks| slot == lower_limit_slot + n_blocks as u64); - let reconstruction_complete = slot + 1 == upper_limit_slot; + let batch_complete = slot + 1 == to_slot; // Commit the I/O batch if: // // - The diff/snapshot for this slot is required for future slots, or - // - The reconstruction batch is complete (we are about to return), or - // - Reconstruction is complete. - if self.hierarchy.should_commit_immediately(slot)? - || batch_complete - || reconstruction_complete - { - info!( - %slot, - remaining = %(upper_limit_slot - 1 - slot), - "State reconstruction in progress" - ); - + // - The reconstruction batch is complete (we are about to return). + if self.hierarchy.should_commit_immediately(slot)? || batch_complete { self.cold_db.do_atomically(std::mem::take(&mut io_batch))?; - // Update anchor. - let old_anchor = anchor.clone(); - - if reconstruction_complete { - // The two limits have met in the middle! We're done! + if batch_complete { // Perform one last integrity check on the state reached. let computed_state_root = state.update_tree_hash_cache()?; if computed_state_root != state_root { @@ -149,23 +193,15 @@ where computed: computed_state_root, }); } - - let new_anchor = old_anchor.as_archive_anchor(); - self.compare_and_set_anchor_info_with_write(old_anchor, new_anchor)?; - - return Ok(()); - } else { - // The lower limit has been raised, store it. - anchor.state_lower_limit = slot; - - self.compare_and_set_anchor_info_with_write(old_anchor, anchor.clone())?; } + on_commit(slot)?; + // If this is the end of the batch, return Ok. The caller will run another // batch when there is idle capacity. if batch_complete { debug!( - start_slot = %lower_limit_slot, + start_slot = %(with_state_at_slot + 1), end_slot = %slot, "Finished state reconstruction batch" ); @@ -174,19 +210,10 @@ where } } - // Should always reach the `upper_limit_slot` or the end of the batch and return early - // above. + // Should always reach `to_slot` or the end of the batch and return early above. Err(Error::StateReconstructionLogicError) })??; - // Check that the split point wasn't mutated during the state reconstruction process. - // It shouldn't have been, due to the serialization of requests through the store migrator, - // so this is just a paranoid check. - let latest_split = self.get_split_info(); - if split != latest_split { - return Err(Error::SplitPointModified(latest_split.slot, split.slot)); - } - Ok(()) } }