Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 85 additions & 58 deletions beacon_node/store/src/reconstruct.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
//! Implementation of historic state reconstruction (given complete block history).
use crate::forwards_iter::FrozenForwardsIterator;
use crate::hot_cold_store::{HotColdDB, HotColdDBError};
use crate::metrics;
use crate::{Error, ItemStore};
use crate::{DBColumn, Error, ItemStore};
use itertools::{Itertools, process_results};
use state_processing::{
BlockSignatureStrategy, ConsensusContext, VerifyBlockRoot, per_block_processing,
per_slot_processing,
};
use std::sync::Arc;
use tracing::{debug, info};
use types::EthSpec;
use types::{EthSpec, Slot};

impl<E, Hot, Cold> HotColdDB<E, Hot, Cold>
where
Expand All @@ -35,13 +36,6 @@ where
});
}

debug!(
start_slot = %anchor.state_lower_limit,
"Starting state reconstruction batch"
);

let _t = metrics::start_timer(&metrics::STORE_BEACON_RECONSTRUCTION_TIME);

// Iterate blocks from the state lower limit to the upper limit.
let split = self.get_split_info();
let lower_limit_slot = anchor.state_lower_limit;
Expand All @@ -56,20 +50,86 @@ where
// If `num_blocks` is not specified, iterate all blocks. Add 1 so that we end on an epoch
// boundary when `num_blocks` is a multiple of the epoch length. We want to be *inclusive*
// of the state at slot `lower_limit_slot + num_blocks`.
let block_root_iter = self
.forwards_block_roots_iterator_until(lower_limit_slot, upper_limit_slot - 1, || {
Err(Error::StateShouldNotBeRequired(upper_limit_slot - 1))
})?
.take(num_blocks.map_or(usize::MAX, |n| n + 1));
let to_slot = num_blocks
.map(|n| std::cmp::min(lower_limit_slot + n as u64 + 1, upper_limit_slot))
.unwrap_or(upper_limit_slot);

let on_commit = |slot: Slot| -> Result<(), Error> {
info!(
%slot,
remaining = %(upper_limit_slot - 1 - slot),
"State reconstruction in progress"
);

// Update anchor.
let old_anchor = anchor.clone();
let reconstruction_complete = slot + 1 == upper_limit_slot;

if reconstruction_complete {
// The two limits have met in the middle! We're done!
let new_anchor = old_anchor.as_archive_anchor();
self.compare_and_set_anchor_info_with_write(old_anchor, new_anchor)?;
} else {
// The lower limit has been raised, store it.
anchor.state_lower_limit = slot;
self.compare_and_set_anchor_info_with_write(old_anchor, anchor.clone())?;
}

// The state to be advanced.
let mut state = self.load_cold_state_by_slot(lower_limit_slot)?;
Ok(())
};

self.reconstruct_historic_states_on_range(lower_limit_slot, to_slot, on_commit)?;

// Check that the split point wasn't mutated during the state reconstruction process.
// It shouldn't have been, due to the serialization of requests through the store migrator,
// so this is just a paranoid check.
let latest_split = self.get_split_info();
if split != latest_split {
return Err(Error::SplitPointModified(latest_split.slot, split.slot));
}

Ok(())
}

/// Reconstruct historic states for the open slot range `(with_state_at_slot, to_slot)`, i.e.
/// every slot strictly after `with_state_at_slot` and strictly before `to_slot`.
///
/// Loads the state at `with_state_at_slot` and replays blocks up to and including slot
/// `to_slot - 1`, writing all intermediate states to the freezer DB.
///
/// The `BeaconBlockRoots` column must be populated for the range before this is called.
///
/// `on_commit(slot)` is invoked after each atomic commit (whenever the hierarchy says to
/// commit, plus once at the final slot) so callers can update anchor metadata or log
/// progress.
pub fn reconstruct_historic_states_on_range(
self: &Arc<Self>,
with_state_at_slot: Slot,
to_slot: Slot,
mut on_commit: impl FnMut(Slot) -> Result<(), Error>,
) -> Result<(), Error> {
debug!(
from_slot = %(with_state_at_slot + 1),
%to_slot,
"Starting state reconstruction batch"
);

let _t = metrics::start_timer(&metrics::STORE_BEACON_RECONSTRUCTION_TIME);

// Iterate from `with_state_at_slot` so `tuple_windows` gives us the predecessor block
// root at each step for skip detection.
let block_root_iter = FrozenForwardsIterator::new(
self,
DBColumn::BeaconBlockRoots,
with_state_at_slot,
to_slot,
)?;

// The state to be advanced.
let mut state = self.load_cold_state_by_slot(with_state_at_slot)?;
state.build_caches(&self.spec)?;

process_results(block_root_iter, |iter| -> Result<(), Error> {
let mut io_batch = vec![];

let mut prev_state_root = None;

for ((prev_block_root, _), (block_root, slot)) in iter.tuple_windows() {
Expand Down Expand Up @@ -114,32 +174,16 @@ where
// Stage state for storage in freezer DB.
self.store_cold_state(&state_root, &state, &mut io_batch)?;

let batch_complete =
num_blocks.is_some_and(|n_blocks| slot == lower_limit_slot + n_blocks as u64);
let reconstruction_complete = slot + 1 == upper_limit_slot;
let batch_complete = slot + 1 == to_slot;

// Commit the I/O batch if:
//
// - The diff/snapshot for this slot is required for future slots, or
// - The reconstruction batch is complete (we are about to return), or
// - Reconstruction is complete.
if self.hierarchy.should_commit_immediately(slot)?
|| batch_complete
|| reconstruction_complete
{
info!(
%slot,
remaining = %(upper_limit_slot - 1 - slot),
"State reconstruction in progress"
);

// - The reconstruction batch is complete (we are about to return).
if self.hierarchy.should_commit_immediately(slot)? || batch_complete {
self.cold_db.do_atomically(std::mem::take(&mut io_batch))?;

// Update anchor.
let old_anchor = anchor.clone();

if reconstruction_complete {
// The two limits have met in the middle! We're done!
if batch_complete {
// Perform one last integrity check on the state reached.
let computed_state_root = state.update_tree_hash_cache()?;
if computed_state_root != state_root {
Expand All @@ -149,23 +193,15 @@ where
computed: computed_state_root,
});
}

let new_anchor = old_anchor.as_archive_anchor();
self.compare_and_set_anchor_info_with_write(old_anchor, new_anchor)?;

return Ok(());
} else {
// The lower limit has been raised, store it.
anchor.state_lower_limit = slot;

self.compare_and_set_anchor_info_with_write(old_anchor, anchor.clone())?;
}

on_commit(slot)?;

// If this is the end of the batch, return Ok. The caller will run another
// batch when there is idle capacity.
if batch_complete {
debug!(
start_slot = %lower_limit_slot,
start_slot = %(with_state_at_slot + 1),
end_slot = %slot,
"Finished state reconstruction batch"
);
Expand All @@ -174,19 +210,10 @@ where
}
}

// Should always reach the `upper_limit_slot` or the end of the batch and return early
// above.
// Should always reach `to_slot` or the end of the batch and return early above.
Err(Error::StateReconstructionLogicError)
})??;

// Check that the split point wasn't mutated during the state reconstruction process.
// It shouldn't have been, due to the serialization of requests through the store migrator,
// so this is just a paranoid check.
let latest_split = self.get_split_info();
if split != latest_split {
return Err(Error::SplitPointModified(latest_split.slot, split.slot));
}

Ok(())
}
}
Loading