diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index e226c707a4e..9588aa7a2b4 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -941,6 +941,28 @@ impl BeaconChain { )? } + /// Returns the Pending (pre-payload) state root at the given slot in the canonical chain. + /// + /// In ePBS (Gloas+), if the canonical state at `slot` is Full (post-payload), this resolves + /// to the same-slot Pending state root. For skipped slots or pre-Gloas, returns the canonical + /// state root unchanged. + pub fn pending_state_root_at_slot(&self, request_slot: Slot) -> Result, Error> { + let Some(root) = self.state_root_at_slot(request_slot)? else { + return Ok(None); + }; + + // Pre-Gloas: all states are inherently Pending. + if !self + .spec + .fork_name_at_slot::(request_slot) + .gloas_enabled() + { + return Ok(Some(root)); + } + + Ok(Some(self.store.resolve_pending_state_root(&root)?)) + } + /// Returns the block root at the given slot, if any. Only returns roots in the canonical chain. /// /// ## Notes @@ -2820,7 +2842,15 @@ impl BeaconChain { // // Note that `check_block_relevancy` is incapable of returning // `DuplicateImportStatusUnknown` so we don't need to handle that case here. - Err(BlockError::DuplicateFullyImported(_)) => continue, + // + // Gloas: keep duplicate blocks so their envelopes can still be processed + // in `process_chain_segment`. This handles the case where a node restarts + // before an envelope was persisted to the DB. + Err(BlockError::DuplicateFullyImported(_)) => { + if block.as_block().fork_name_unchecked().gloas_enabled() { + filtered_chain_segment.push((block_root, block)); + } + } // If the block is the genesis block, simply ignore this block. Err(BlockError::GenesisBlock) => continue, // If the block is is for a finalized slot, simply ignore this block. @@ -2836,7 +2866,15 @@ impl BeaconChain { // In the case of (2), skipping the block is valid since we should never import it. // However, we will potentially get a `ParentUnknown` on a later block. The sync // protocol will need to ensure this is handled gracefully. - Err(BlockError::WouldRevertFinalizedSlot { .. }) => continue, + Err(BlockError::WouldRevertFinalizedSlot { .. }) => { + // Gloas: keep blocks at finalized slots so their envelopes can + // still be processed. This handles the checkpoint sync case where + // the checkpoint block is already finalized but its envelope hasn't + // been stored yet. + if block.as_block().fork_name_unchecked().gloas_enabled() { + filtered_chain_segment.push((block_root, block)); + } + } // The block has a known parent that does not descend from the finalized block. // There is no need to process this block or any children. Err(BlockError::NotFinalizedDescendant { block_parent_root }) => { @@ -2905,6 +2943,39 @@ impl BeaconChain { } }; + // Strip already-known blocks (e.g. the checkpoint sync anchor) from the + // front of the segment and process only their envelopes. These blocks + // can't go through signature_verify_chain_segment because their parents + // may not be available. + while let Some((root, _)) = filtered_chain_segment.first() { + if !self + .canonical_head + .fork_choice_read_lock() + .contains_block(root) + { + break; + } + let (block_root, block) = filtered_chain_segment.remove(0); + let maybe_envelope = match block { + RangeSyncBlock::Gloas { envelope, .. 
} => envelope, + _ => None, + }; + if let Some(envelope) = maybe_envelope + && let Err(error) = self + .process_range_sync_envelope( + block_root, + envelope, + notify_execution_layer, + ) + .await + { + return ChainSegmentResult::Failed { + imported_blocks, + error: BlockError::EnvelopeError(Box::new(error)), + }; + } + } + while let Some((_root, block)) = filtered_chain_segment.first() { // Determine the epoch of the first block in the remaining segment. let start_epoch = block.epoch(); @@ -2944,12 +3015,14 @@ impl BeaconChain { } }; - // Import the blocks into the chain. - for signature_verified_block in signature_verified_blocks { + // Import the blocks (and envelopes for Gloas) into the chain. + for (signature_verified_block, maybe_envelope) in signature_verified_blocks { + let block_root = signature_verified_block.block_root(); let block_slot = signature_verified_block.slot(); + match self .process_block( - signature_verified_block.block_root(), + block_root, signature_verified_block, notify_execution_layer, BlockImportSource::RangeSync, @@ -2962,6 +3035,22 @@ impl BeaconChain { AvailabilityProcessingStatus::Imported(block_root) => { // The block was imported successfully. imported_blocks.push((block_root, block_slot)); + + // Gloas: process the envelope now that the block is in fork choice. + if let Some(envelope) = maybe_envelope + && let Err(error) = self + .process_range_sync_envelope( + block_root, + envelope, + notify_execution_layer, + ) + .await + { + return ChainSegmentResult::Failed { + imported_blocks, + error: BlockError::EnvelopeError(Box::new(error)), + }; + } } AvailabilityProcessingStatus::MissingComponents(slot, block_root) => { warn!( @@ -2978,11 +3067,29 @@ impl BeaconChain { } } } - Err(BlockError::DuplicateFullyImported(block_root)) => { + Err(BlockError::DuplicateFullyImported(_)) + | Err(BlockError::WouldRevertFinalizedSlot { .. }) => { debug!( ?block_root, - "Ignoring already known blocks while processing chain segment" + "Ignoring already known block while processing chain segment" ); + // Gloas: still process the envelope for duplicate blocks. The envelope + // may not have been persisted before a restart, or the block may be the + // checkpoint sync anchor whose envelope was never stored. + if let Some(envelope) = maybe_envelope + && let Err(error) = self + .process_range_sync_envelope( + block_root, + envelope, + notify_execution_layer, + ) + .await + { + return ChainSegmentResult::Failed { + imported_blocks, + error: BlockError::EnvelopeError(Box::new(error)), + }; + } continue; } Err(error) => { @@ -7185,7 +7292,7 @@ impl BeaconChain { block_data: AvailableBlockData, ) -> Option> { match block_data { - AvailableBlockData::NoData => None, + AvailableBlockData::NoData | AvailableBlockData::DataInEnvelope => None, AvailableBlockData::Blobs(blobs) => { debug!( %block_root, diff --git a/beacon_node/beacon_chain/src/beacon_fork_choice_store.rs b/beacon_node/beacon_chain/src/beacon_fork_choice_store.rs index 95fde28f5b2..8fc771aa7d0 100644 --- a/beacon_node/beacon_chain/src/beacon_fork_choice_store.rs +++ b/beacon_node/beacon_chain/src/beacon_fork_choice_store.rs @@ -172,11 +172,13 @@ where let mut anchor_state = anchor.beacon_state; let mut anchor_block_header = anchor_state.latest_block_header().clone(); - // The anchor state MUST be on an epoch boundary (it should be advanced by the caller). 
- if !anchor_state - .slot() - .as_u64() - .is_multiple_of(E::slots_per_epoch()) + // Pre-gloas the anchor state MUST be on an epoch boundary (it should be advanced by the caller). + // Post-gloas this requirement is relaxed. + if !anchor_state.fork_name_unchecked().gloas_enabled() + && !anchor_state + .slot() + .as_u64() + .is_multiple_of(E::slots_per_epoch()) { return Err(Error::UnalignedCheckpoint { block_slot: anchor_block_header.slot, diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 1ce1137f1ea..5e6d37cd4a7 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -60,6 +60,7 @@ use crate::execution_payload::{ }; use crate::kzg_utils::blobs_to_data_column_sidecars; use crate::observed_block_producers::SeenBlock; +use crate::payload_envelope_verification::{AvailableEnvelope, EnvelopeError}; use crate::validator_monitor::HISTORIC_EPOCHS as VALIDATOR_MONITOR_HISTORIC_EPOCHS; use crate::validator_pubkey_cache::ValidatorPubkeyCache; use crate::{ @@ -321,6 +322,20 @@ pub enum BlockError { bid_parent_root: Hash256, block_parent_root: Hash256, }, + /// The child block is known but its parent execution payload envelope has not been received yet. + /// + /// ## Peer scoring + /// + /// It's unclear if this block is valid, but it cannot be fully verified without the parent's + /// execution payload envelope. + ParentEnvelopeUnknown { parent_root: Hash256 }, + /// An error occurred while processing the execution payload envelope during range sync. + EnvelopeError(Box), + + PayloadEnvelopeError { + e: Box, + penalize_peer: bool, + }, } /// Which specific signature(s) are invalid in a SignedBeaconBlock @@ -487,6 +502,36 @@ impl From for BlockError { } } +impl From for BlockError { + fn from(e: EnvelopeError) -> Self { + let penalize_peer = match &e { + // REJECT per spec: peer sent invalid envelope data + EnvelopeError::BadSignature + | EnvelopeError::BuilderIndexMismatch { .. } + | EnvelopeError::BlockHashMismatch { .. } + | EnvelopeError::SlotMismatch { .. } + | EnvelopeError::IncorrectBlockProposer { .. } => true, + // IGNORE per spec: not the peer's fault + EnvelopeError::BlockRootUnknown { .. } + | EnvelopeError::PriorToFinalization { .. } + | EnvelopeError::UnknownValidator { .. } => false, + // Internal errors: not the peer's fault + EnvelopeError::BeaconChainError(_) + | EnvelopeError::BeaconStateError(_) + | EnvelopeError::BlockProcessingError(_) + | EnvelopeError::EnvelopeProcessingError(_) + | EnvelopeError::ExecutionPayloadError(_) + | EnvelopeError::BlockError(_) + | EnvelopeError::InternalError(_) + | EnvelopeError::OptimisticSyncNotSupported { .. } => false, + }; + BlockError::PayloadEnvelopeError { + e: Box::new(e), + penalize_peer, + } + } +} + /// Stores information about verifying a payload against an execution engine. 
#[derive(Debug, PartialEq, Clone, Encode, Decode)] pub struct PayloadVerificationOutcome { @@ -584,10 +629,17 @@ pub(crate) fn process_block_slash_info( mut chain_segment: Vec<(Hash256, RangeSyncBlock)>, chain: &BeaconChain, -) -> Result>, BlockError> { +) -> Result< + Vec<( + SignatureVerifiedBlock, + Option>>, + )>, + BlockError, +> { if chain_segment.is_empty() { return Ok(vec![]); } @@ -616,14 +668,30 @@ pub fn signature_verify_chain_segment( let consensus_context = ConsensusContext::new(block.slot()).set_current_block_root(block_root); - let available_block = block.into_available_block(); + let (available_block, envelope) = match block { + RangeSyncBlock::Base(ab) => (ab, None), + RangeSyncBlock::Gloas { block, envelope } => { + let ab = AvailableBlock::new( + block, + AvailableBlockData::DataInEnvelope, + &chain.data_availability_checker, + chain.spec.clone(), + ) + .map_err(BlockError::AvailabilityCheck)?; + (ab, envelope) + } + }; + available_blocks.push(available_block.clone()); - signature_verified_blocks.push(SignatureVerifiedBlock { - block: MaybeAvailableBlock::Available(available_block), - block_root, - parent: None, - consensus_context, - }); + signature_verified_blocks.push(( + SignatureVerifiedBlock { + block: MaybeAvailableBlock::Available(available_block), + block_root, + parent: None, + consensus_context, + }, + envelope, + )); } chain @@ -633,7 +701,7 @@ pub fn signature_verify_chain_segment( // verify signatures let pubkey_cache = get_validator_pubkey_cache(chain)?; let mut signature_verifier = get_signature_verifier(&state, &pubkey_cache, &chain.spec); - for svb in &mut signature_verified_blocks { + for (svb, _) in &mut signature_verified_blocks { signature_verifier .include_all_signatures(svb.block.as_block(), &mut svb.consensus_context)?; } @@ -644,7 +712,7 @@ pub fn signature_verify_chain_segment( drop(pubkey_cache); - if let Some(signature_verified_block) = signature_verified_blocks.first_mut() { + if let Some((signature_verified_block, _)) = signature_verified_blocks.first_mut() { signature_verified_block.parent = Some(parent); } @@ -1192,7 +1260,7 @@ impl SignatureVerifiedBlock { let result = info_span!("signature_verify").in_scope(|| signature_verifier.verify()); match result { Ok(_) => { - // gloas blocks are always available. + // Gloas blocks are always available — data arrives via the envelope. let maybe_available = if chain .spec .fork_name_at_slot::(block.slot()) @@ -1201,7 +1269,7 @@ impl SignatureVerifiedBlock { MaybeAvailableBlock::Available( AvailableBlock::new( block, - AvailableBlockData::NoData, + AvailableBlockData::DataInEnvelope, &chain.data_availability_checker, chain.spec.clone(), ) @@ -1971,14 +2039,13 @@ fn load_parent>( } else if let Ok(parent_bid_block_hash) = parent_block.payload_bid_block_hash() && block.as_block().is_parent_block_full(parent_bid_block_hash) { - // Post-Gloas Full block case. - // TODO(gloas): loading the envelope here is not very efficient - let Some(envelope) = chain.store.get_payload_envelope(&root)? else { - return Err(BeaconChainError::DBInconsistent(format!( - "Missing envelope for parent block {root:?}", - )) - .into()); - }; + // If the parent's execution payload envelope hasn't arrived yet, + // return an unknown parent error so the block gets sent to the + // reprocess queue. + let envelope = chain + .store + .get_payload_envelope(&root)? 
+ .ok_or(BlockError::ParentEnvelopeUnknown { parent_root: root })?; let state_root = envelope.message.state_root; (StatePayloadStatus::Full, state_root) } else { diff --git a/beacon_node/beacon_chain/src/block_verification_types.rs b/beacon_node/beacon_chain/src/block_verification_types.rs index be73ef15d73..e94752cfc02 100644 --- a/beacon_node/beacon_chain/src/block_verification_types.rs +++ b/beacon_node/beacon_chain/src/block_verification_types.rs @@ -2,10 +2,11 @@ use crate::data_availability_checker::{AvailabilityCheckError, DataAvailabilityC pub use crate::data_availability_checker::{ AvailableBlock, AvailableBlockData, MaybeAvailableBlock, }; +use crate::payload_envelope_verification::AvailableEnvelope; use crate::{BeaconChainTypes, PayloadVerificationOutcome}; -use educe::Educe; use state_processing::ConsensusContext; use std::fmt::{Debug, Formatter}; +use std::hash::{Hash, Hasher}; use std::sync::Arc; use types::data::BlobIdentifier; use types::{ @@ -40,43 +41,63 @@ impl LookupBlock { } } -/// A fully available block that has been constructed by range sync. -/// The block contains all the data required to import into fork choice. -/// This includes any and all blobs/columns required, including zero if -/// none are required. This can happen if the block is pre-deneb or if -/// it's simply past the DA boundary. -#[derive(Clone, Educe)] -#[educe(Hash(bound(E: EthSpec)))] -pub struct RangeSyncBlock { - block: AvailableBlock, +/// A block that has been constructed by range sync with all required data. +/// +/// - `Base`: Pre-Gloas blocks bundled as an `AvailableBlock` (block + blobs/columns). +/// - `Gloas`: Post-Gloas blocks where the execution payload is a separate envelope. +#[derive(Clone)] +pub enum RangeSyncBlock { + Base(AvailableBlock), + Gloas { + block: Arc>, + envelope: Option>>, + }, +} + +impl Hash for RangeSyncBlock { + fn hash(&self, state: &mut H) { + self.block_root().hash(state); + } } impl Debug for RangeSyncBlock { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "RpcBlock({:?})", self.block_root()) + write!(f, "RangeSyncBlock({:?})", self.block_root()) } } impl RangeSyncBlock { pub fn block_root(&self) -> Hash256 { - self.block.block_root() + match self { + Self::Base(block) => block.block_root(), + Self::Gloas { block, .. } => block.canonical_root(), + } } pub fn as_block(&self) -> &SignedBeaconBlock { - self.block.block() + match self { + Self::Base(block) => block.block(), + Self::Gloas { block, .. } => block, + } } pub fn block_cloned(&self) -> Arc> { - self.block.block_cloned() + match self { + Self::Base(block) => block.block_cloned(), + Self::Gloas { block, .. } => block.clone(), + } } pub fn block_data(&self) -> &AvailableBlockData { - self.block.data() + match self { + Self::Base(block) => block.data(), + Self::Gloas { .. } => { + unreachable!("block_data called on Gloas variant — use envelope data instead") + } + } } -} -impl RangeSyncBlock { - /// Constructs an `RangeSyncBlock` from a block and availability data. + /// Constructs a `Base` variant from a block and availability data. /// /// # Errors /// @@ -94,32 +115,58 @@ impl RangeSyncBlock { T: BeaconChainTypes, { let available_block = AvailableBlock::new(block, block_data, da_checker, spec)?; - Ok(Self { - block: available_block, - }) + Ok(Self::Base(available_block)) + } + + /// Constructs a `Gloas` variant from a block and optional available envelope. 
+ pub fn new_gloas( + block: Arc>, + envelope: Option>>, + ) -> Self { + Self::Gloas { block, envelope } } #[allow(clippy::type_complexity)] pub fn deconstruct(self) -> (Hash256, Arc>, AvailableBlockData) { - self.block.deconstruct() + match self { + Self::Base(block) => block.deconstruct(), + Self::Gloas { .. } => { + unreachable!("deconstruct called on Gloas variant") + } + } } pub fn n_blobs(&self) -> usize { - match self.block_data() { - AvailableBlockData::NoData | AvailableBlockData::DataColumns(_) => 0, - AvailableBlockData::Blobs(blobs) => blobs.len(), + match self { + Self::Base(block) => match block.data() { + AvailableBlockData::NoData + | AvailableBlockData::DataInEnvelope + | AvailableBlockData::DataColumns(_) => 0, + AvailableBlockData::Blobs(blobs) => blobs.len(), + }, + Self::Gloas { .. } => 0, } } pub fn n_data_columns(&self) -> usize { - match self.block_data() { - AvailableBlockData::NoData | AvailableBlockData::Blobs(_) => 0, - AvailableBlockData::DataColumns(columns) => columns.len(), + match self { + Self::Base(block) => match block.data() { + AvailableBlockData::NoData + | AvailableBlockData::DataInEnvelope + | AvailableBlockData::Blobs(_) => 0, + AvailableBlockData::DataColumns(columns) => columns.len(), + }, + Self::Gloas { .. } => 0, } } pub fn into_available_block(self) -> AvailableBlock { - self.block + match self { + Self::Base(block) => block, + Self::Gloas { .. } => { + unreachable!("into_available_block called on Gloas variant") + } + } } } @@ -405,13 +452,13 @@ impl AsBlock for RangeSyncBlock { self.as_block().message() } fn as_block(&self) -> &SignedBeaconBlock { - self.block.as_block() + RangeSyncBlock::as_block(self) } fn block_cloned(&self) -> Arc> { - self.block.block_cloned() + RangeSyncBlock::block_cloned(self) } fn canonical_root(&self) -> Hash256 { - self.block.block_root() + self.block_root() } } diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index 11b87351b19..5920243c506 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -42,6 +42,7 @@ use store::{Error as StoreError, HotColdDB, ItemStore, KeyValueStoreOp}; use task_executor::{ShutdownReason, TaskExecutor}; use tracing::{debug, error, info, warn}; use tree_hash::TreeHash; +use types::StatePayloadStatus; use types::data::CustodyIndex; use types::{ BeaconBlock, BeaconState, BlobSidecarList, ChainSpec, ColumnIndex, DataColumnSidecarList, @@ -433,9 +434,15 @@ where .clone() .ok_or("weak_subjectivity_state requires a store")?; - // Ensure the state is advanced to an epoch boundary. + // Pre-gloas ensure the state is advanced to an epoch boundary. + // Post-gloas checkpoint states are always pending (post-block) and cannot + // be advanced across epoch boundaries without first checking for a payload + // envelope. let slots_per_epoch = E::slots_per_epoch(); - if weak_subj_state.slot() % slots_per_epoch != 0 { + + if !weak_subj_state.fork_name_unchecked().gloas_enabled() + && weak_subj_state.slot() % slots_per_epoch != 0 + { debug!( state_slot = %weak_subj_state.slot(), block_slot = %weak_subj_block.slot(), @@ -568,7 +575,7 @@ where // Write the state, block and blobs non-atomically, it doesn't matter if they're forgotten // about on a crash restart. 
store - .update_finalized_state( + .set_initial_finalized_state( weak_subj_state_root, weak_subj_block_root, weak_subj_state.clone(), @@ -617,7 +624,15 @@ where .map_err(|e| format!("Failed to initialize data column info: {:?}", e))?, ); - // TODO(gloas): add check that checkpoint state is Pending + if weak_subj_state.fork_name_unchecked().gloas_enabled() + && weak_subj_state.payload_status() != StatePayloadStatus::Pending + { + return Err(format!( + "Checkpoint sync state must be Pending (post-block) for Gloas, got {:?}", + weak_subj_state.payload_status() + )); + } + let snapshot = BeaconSnapshot { beacon_block_root: weak_subj_block_root, execution_envelope: None, diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index 4372efa8096..87c240e906a 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -20,7 +20,7 @@ use tracing::{debug, error, instrument}; use types::data::{BlobIdentifier, FixedBlobSidecarList}; use types::{ BlobSidecar, BlobSidecarList, BlockImportSource, ChainSpec, DataColumnSidecar, - DataColumnSidecarList, Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot, + DataColumnSidecarList, Epoch, EthSpec, ForkName, Hash256, SignedBeaconBlock, Slot, }; mod error; @@ -366,7 +366,7 @@ impl DataAvailabilityChecker { available_block: &AvailableBlock, ) -> Result<(), AvailabilityCheckError> { match available_block.data() { - AvailableBlockData::NoData => Ok(()), + AvailableBlockData::NoData | AvailableBlockData::DataInEnvelope => Ok(()), AvailableBlockData::Blobs(blobs) => verify_kzg_for_blob_list(blobs.iter(), &self.kzg) .map_err(AvailabilityCheckError::InvalidBlobs), AvailableBlockData::DataColumns(columns) => { @@ -388,7 +388,7 @@ impl DataAvailabilityChecker { for available_block in available_blocks { match available_block.data().to_owned() { - AvailableBlockData::NoData => {} + AvailableBlockData::NoData | AvailableBlockData::DataInEnvelope => {} AvailableBlockData::Blobs(blobs) => all_blobs.extend(blobs), AvailableBlockData::DataColumns(columns) => all_data_columns.extend(columns), } @@ -420,6 +420,11 @@ impl DataAvailabilityChecker { self.da_check_required_for_epoch(epoch) && self.spec.is_peer_das_enabled_for_epoch(epoch) } + /// Determines if execution payload envelopes are required for an epoch (Gloas and later). + pub fn envelopes_required_for_epoch(&self, epoch: Epoch) -> bool { + self.spec.fork_name_at_epoch(epoch) >= ForkName::Gloas + } + /// See `Self::blobs_required_for_epoch` fn blobs_required_for_block(&self, block: &SignedBeaconBlock) -> bool { block.num_expected_blobs() > 0 && self.blobs_required_for_epoch(block.epoch()) @@ -650,6 +655,9 @@ pub enum AvailableBlockData { Blobs(BlobSidecarList), /// Block is post-PeerDAS and has more than zero blobs DataColumns(DataColumnSidecarList), + /// Gloas: block data (payload + columns) arrives via the execution payload envelope, + /// not the block itself. 
+ DataInEnvelope, } impl AvailableBlockData { @@ -671,7 +679,7 @@ impl AvailableBlockData { pub fn blobs(&self) -> Option> { match self { - AvailableBlockData::NoData => None, + AvailableBlockData::NoData | AvailableBlockData::DataInEnvelope => None, AvailableBlockData::Blobs(blobs) => Some(blobs.clone()), AvailableBlockData::DataColumns(_) => None, } @@ -687,7 +695,7 @@ impl AvailableBlockData { pub fn data_columns(&self) -> Option> { match self { - AvailableBlockData::NoData => None, + AvailableBlockData::NoData | AvailableBlockData::DataInEnvelope => None, AvailableBlockData::Blobs(_) => None, AvailableBlockData::DataColumns(data_columns) => Some(data_columns.clone()), } @@ -752,6 +760,8 @@ impl AvailableBlock { return Err(AvailabilityCheckError::MissingBlobs); } } + // Gloas: data availability is handled by the envelope path, not the block. + AvailableBlockData::DataInEnvelope => {} AvailableBlockData::Blobs(blobs) => { if !blobs_required { return Err(AvailabilityCheckError::InvalidAvailableBlockData); @@ -830,7 +840,7 @@ impl AvailableBlock { pub fn has_blobs(&self) -> bool { match self.blob_data { - AvailableBlockData::NoData => false, + AvailableBlockData::NoData | AvailableBlockData::DataInEnvelope => false, AvailableBlockData::Blobs(..) => true, AvailableBlockData::DataColumns(_) => false, } @@ -858,6 +868,7 @@ impl AvailableBlock { AvailableBlockData::DataColumns(data_columns) => { AvailableBlockData::DataColumns(data_columns.clone()) } + AvailableBlockData::DataInEnvelope => AvailableBlockData::DataInEnvelope, }, blobs_available_timestamp: self.blobs_available_timestamp, spec: self.spec.clone(), diff --git a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs index 8f1d4464e11..3f6c6baa5b2 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs @@ -275,7 +275,7 @@ impl PendingComponents { // Block is available, construct `AvailableExecutedBlock` let blobs_available_timestamp = match blob_data { - AvailableBlockData::NoData => None, + AvailableBlockData::NoData | AvailableBlockData::DataInEnvelope => None, AvailableBlockData::Blobs(_) => self .verified_blobs .iter() diff --git a/beacon_node/beacon_chain/src/early_attester_cache.rs b/beacon_node/beacon_chain/src/early_attester_cache.rs index 752e4d1a967..644d60f2382 100644 --- a/beacon_node/beacon_chain/src/early_attester_cache.rs +++ b/beacon_node/beacon_chain/src/early_attester_cache.rs @@ -138,7 +138,7 @@ impl EarlyAttesterCache { }; let (blobs, data_columns) = match block.data() { - AvailableBlockData::NoData => (None, None), + AvailableBlockData::NoData | AvailableBlockData::DataInEnvelope => (None, None), AvailableBlockData::Blobs(blobs) => (Some(blobs.clone()), None), AvailableBlockData::DataColumns(data_columns) => (None, Some(data_columns.clone())), }; diff --git a/beacon_node/beacon_chain/src/historical_blocks.rs b/beacon_node/beacon_chain/src/historical_blocks.rs index bfda52558e4..ebb1bc4b767 100644 --- a/beacon_node/beacon_chain/src/historical_blocks.rs +++ b/beacon_node/beacon_chain/src/historical_blocks.rs @@ -157,7 +157,7 @@ impl BeaconChain { } match &block_data { - AvailableBlockData::NoData => (), + AvailableBlockData::NoData | AvailableBlockData::DataInEnvelope => (), AvailableBlockData::Blobs(_) => new_oldest_blob_slot = Some(block.slot()), AvailableBlockData::DataColumns(_) => { 
new_oldest_data_column_slot = Some(block.slot()) diff --git a/beacon_node/beacon_chain/src/payload_envelope_verification/execution_pending_envelope.rs b/beacon_node/beacon_chain/src/payload_envelope_verification/execution_pending_envelope.rs index 86f9293c8f2..a538f35689c 100644 --- a/beacon_node/beacon_chain/src/payload_envelope_verification/execution_pending_envelope.rs +++ b/beacon_node/beacon_chain/src/payload_envelope_verification/execution_pending_envelope.rs @@ -12,7 +12,7 @@ use crate::{ PayloadVerificationOutcome, block_verification::PayloadVerificationHandle, payload_envelope_verification::{ - EnvelopeError, EnvelopeImportData, MaybeAvailableEnvelope, + AvailableEnvelope, EnvelopeError, EnvelopeImportData, MaybeAvailableEnvelope, gossip_verified_envelope::GossipVerifiedEnvelope, load_snapshot_from_state_root, payload_notifier::PayloadNotifier, }, @@ -32,7 +32,6 @@ impl GossipVerifiedEnvelope { ) -> Result, EnvelopeError> { let signed_envelope = self.signed_envelope; let envelope = &signed_envelope.message; - let payload = &envelope.payload; // Define a future that will verify the execution payload with an execution engine. // @@ -91,10 +90,13 @@ impl GossipVerifiedEnvelope { )?; Ok(ExecutionPendingEnvelope { - signed_envelope: MaybeAvailableEnvelope::AvailabilityPending { - block_hash: payload.block_hash, - envelope: signed_envelope, - }, + signed_envelope: MaybeAvailableEnvelope::Available(AvailableEnvelope::new( + signed_envelope.block_hash(), + signed_envelope.clone(), + vec![], + None, + chain.spec.clone(), + )), import_data: EnvelopeImportData { block_root, post_state: Box::new(state), diff --git a/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs b/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs index 7e797993101..0922331e654 100644 --- a/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs +++ b/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs @@ -4,18 +4,25 @@ use std::time::Duration; use eth2::types::{EventKind, SseExecutionPayload}; use fork_choice::PayloadVerificationStatus; use slot_clock::SlotClock; -use store::StoreOp; +use state_processing::{ + VerifySignatures, + envelope_processing::{VerifyStateRoot, process_execution_payload_envelope}, +}; +use store::{DatabaseBlock, StoreOp}; use tracing::{debug, error, info, info_span, instrument, warn}; -use types::{BeaconState, BlockImportSource, Hash256, SignedExecutionPayloadEnvelope}; +use types::{BeaconState, BlockImportSource, EthSpec, Hash256, SignedExecutionPayloadEnvelope}; use super::{ AvailableEnvelope, AvailableExecutedEnvelope, EnvelopeError, EnvelopeImportData, - ExecutedEnvelope, gossip_verified_envelope::GossipVerifiedEnvelope, + ExecutedEnvelope, MaybeAvailableEnvelope, gossip_verified_envelope::GossipVerifiedEnvelope, + gossip_verified_envelope::verify_envelope_consistency, load_snapshot_from_state_root, + payload_notifier::PayloadNotifier, }; use crate::{ AvailabilityProcessingStatus, BeaconChain, BeaconChainError, BeaconChainTypes, - NotifyExecutionLayer, block_verification_types::AvailableBlockData, metrics, - payload_envelope_verification::ExecutionPendingEnvelope, validator_monitor::get_slot_delay_ms, + NotifyExecutionLayer, PayloadVerificationOutcome, block_verification_types::AvailableBlockData, + metrics, payload_envelope_verification::ExecutionPendingEnvelope, + validator_monitor::get_slot_delay_ms, }; const ENVELOPE_METRICS_CACHE_SLOT_LIMIT: u32 = 64; @@ -148,6 +155,125 @@ impl BeaconChain { } } + /// Process an 
`AvailableEnvelope` from range sync. Unlike the gossip path, the block has + /// already been imported into fork choice so we can skip gossip-specific checks. + /// + /// Steps: consistency checks, signature verification, EL verification (newPayload), + /// state processing, await EL result, and import. + #[instrument(skip_all, fields(block_root = ?block_root))] + pub async fn process_range_sync_envelope( + self: &Arc, + block_root: Hash256, + available_envelope: Box>, + notify_execution_layer: NotifyExecutionLayer, + ) -> Result { + let signed_envelope = available_envelope.envelope().clone(); + let block_slot = signed_envelope.slot(); + + // Load the block from store (just imported, guaranteed to exist). + let block = match self.store.try_get_full_block(&block_root)? { + Some(DatabaseBlock::Full(block)) => Arc::new(block), + Some(DatabaseBlock::Blinded(_)) | None => { + return Err(EnvelopeError::BlockRootUnknown { block_root }); + } + }; + + // Envelope consistency checks. + let execution_bid = &block + .message() + .body() + .signed_execution_payload_bid()? + .message; + let latest_finalized_slot = self + .canonical_head + .cached_head() + .finalized_checkpoint() + .epoch + .start_slot(T::EthSpec::slots_per_epoch()); + verify_envelope_consistency( + &signed_envelope.message, + &block, + execution_bid, + latest_finalized_slot, + )?; + + // Load state for signature verification and state processing. + let snapshot = + load_snapshot_from_state_root::(block_root, block.state_root(), &self.store)?; + + // Verify the envelope signature. + let is_valid = + signed_envelope.verify_signature_with_state(&snapshot.pre_state, &self.spec)?; + if !is_valid { + return Err(EnvelopeError::BadSignature); + } + + // Start EL verification (newPayload) as early as possible. + let payload_notifier = PayloadNotifier::new( + self.clone(), + signed_envelope.clone(), + block.clone(), + notify_execution_layer, + )?; + let payload_verification_future = async move { + let chain = payload_notifier.chain.clone(); + if let Some(started_execution) = chain.slot_clock.now_duration() { + chain + .envelope_times_cache + .write() + .set_time_started_execution(block_root, block_slot, started_execution); + } + let payload_verification_status = payload_notifier.notify_new_payload().await?; + Ok(PayloadVerificationOutcome { + payload_verification_status, + }) + }; + let payload_verification_handle = self + .task_executor + .spawn_handle( + payload_verification_future, + "range_sync_envelope_payload_verification", + ) + .ok_or(BeaconChainError::RuntimeShutdown)?; + + // Run state processing (signatures already verified above). + let mut state = snapshot.pre_state; + process_execution_payload_envelope( + &mut state, + Some(snapshot.state_root), + &signed_envelope, + VerifySignatures::False, + VerifyStateRoot::True, + &self.spec, + )?; + + // Build the ExecutionPendingEnvelope with Available status (columns already bundled). + let execution_pending = ExecutionPendingEnvelope { + signed_envelope: MaybeAvailableEnvelope::Available(*available_envelope), + import_data: EnvelopeImportData { + block_root, + post_state: Box::new(state), + }, + payload_verification_handle, + }; + + // Await EL verification and import. 
+ let executed_envelope = self + .clone() + .into_executed_payload_envelope(execution_pending) + .await?; + + match executed_envelope { + ExecutedEnvelope::Available(envelope) => { + self.import_available_execution_payload_envelope(Box::new(envelope)) + .await + } + ExecutedEnvelope::AvailabilityPending() => Err(EnvelopeError::InternalError( + "Pending payload envelope not yet implemented".to_owned(), + )), + } + } + /// Accepts a fully-verified payload envelope and awaits on its payload verification handle to /// get a fully `ExecutedEnvelope`. /// @@ -168,16 +294,6 @@ impl BeaconChain { .map_err(BeaconChainError::TokioJoin)? .ok_or(BeaconChainError::RuntimeShutdown)??; - // TODO(gloas): optimistic sync is not supported for Gloas, maybe we could re-add it - if payload_verification_outcome - .payload_verification_status - .is_optimistic() - { - return Err(EnvelopeError::OptimisticSyncNotSupported { - block_root: import_data.block_root, - }); - } - Ok(ExecutedEnvelope::new( signed_envelope, import_data, diff --git a/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs b/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs index 225d5a98924..1581ab325cb 100644 --- a/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs +++ b/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs @@ -47,7 +47,7 @@ pub struct EnvelopeImportData { pub post_state: Box>, } -#[derive(Debug)] +#[derive(Debug, Clone)] #[allow(dead_code)] pub struct AvailableEnvelope { execution_block_hash: ExecutionBlockHash, @@ -59,6 +59,26 @@ pub struct AvailableEnvelope { } impl AvailableEnvelope { + pub fn new( + execution_block_hash: ExecutionBlockHash, + envelope: Arc>, + columns: DataColumnSidecarList, + columns_available_timestamp: Option, + spec: Arc, + ) -> Self { + Self { + execution_block_hash, + envelope, + columns, + columns_available_timestamp, + spec, + } + } + + pub fn envelope(&self) -> &Arc> { + &self.envelope + } + pub fn message(&self) -> &ExecutionPayloadEnvelope { &self.envelope.message } diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index c6e13bd160b..b98a3f4cffb 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -5465,6 +5465,150 @@ fn check_finalization(harness: &TestHarness, expected_slot: u64) { ); } +// Verify that post-gloas checkpoint sync accepts a non-epoch aligned state and builds +// the chain. +// +// Since post-gloas checkpoint sync states are always the post block state, if the epoch boundary +// slot is skipped, we'll receive a checkpoint state that is not epoch aligned. +// +// Example: slot `n` is the epoch boundary slot and is skipped. We'll receive the post block state for +// slot `n - 1`. This is the state before the payload for slot `n - 1` was processed. +#[tokio::test] +async fn weak_subjectivity_sync_gloas_pending_non_aligned() { + if !fork_name_from_env().is_some_and(|f| f.gloas_enabled()) { + return; + } + + let spec = test_spec::(); + + // Build a chain with a skipped slot at the epoch boundary. 
+ let epoch_boundary_slot = E::slots_per_epoch(); + let num_initial_slots = E::slots_per_epoch() * 4; + let checkpoint_slot = Slot::new(epoch_boundary_slot); + + let slots = (1..num_initial_slots) + .map(Slot::new) + .filter(|&slot| { + // Skip the epoch boundary slot + slot.as_u64() != epoch_boundary_slot + }) + .collect::>(); + + let temp1 = tempdir().unwrap(); + let full_store = get_store_generic(&temp1, StoreConfig::default(), spec.clone()); + let harness = get_harness_import_all_data_columns(full_store.clone(), LOW_VALIDATOR_COUNT); + let all_validators = (0..LOW_VALIDATOR_COUNT).collect::>(); + + let (genesis_state, genesis_state_root) = harness.get_current_state_and_root(); + harness + .add_attested_blocks_at_slots( + genesis_state.clone(), + genesis_state_root, + &slots, + &all_validators, + ) + .await; + + // Extract the checkpoint block and its Pending state. + let wss_block_root = harness + .chain + .block_root_at_slot(checkpoint_slot, WhenSlotSkipped::Prev) + .unwrap() + .unwrap(); + let wss_block = harness + .chain + .store + .get_full_block(&wss_block_root) + .unwrap() + .unwrap(); + + // The block's state_root points to the Pending state in Gloas. + let wss_state_root = wss_block.state_root(); + let wss_state = full_store + .get_state( + &wss_state_root, + Some(wss_block.slot()), + CACHE_STATE_IN_TESTS, + ) + .unwrap() + .unwrap(); + + assert_eq!( + wss_state.payload_status(), + StatePayloadStatus::Pending, + "Checkpoint state should be Pending" + ); + assert_ne!( + wss_state.slot() % E::slots_per_epoch(), + 0, + "Checkpoint state is epoch-aligned, expected non-aligned" + ); + + let wss_blobs_opt = harness + .chain + .get_or_reconstruct_blobs(&wss_block_root) + .unwrap(); + + // Build a new chain from the non-aligned Pending checkpoint state. + let temp2 = tempdir().unwrap(); + let store = get_store_generic(&temp2, StoreConfig::default(), spec.clone()); + + let slot_clock = TestingSlotClock::new( + Slot::new(0), + Duration::from_secs(harness.chain.genesis_time), + spec.get_slot_duration(), + ); + slot_clock.set_slot(harness.get_current_slot().as_u64()); + + let chain_config = ChainConfig { + archive: true, + ..ChainConfig::default() + }; + + let trusted_setup = get_kzg(&spec); + let (shutdown_tx, _shutdown_rx) = futures::channel::mpsc::channel(1); + let mock = mock_execution_layer_from_parts( + harness.spec.clone(), + harness.runtime.task_executor.clone(), + ); + + let beacon_chain = BeaconChainBuilder::>::new(MinimalEthSpec, trusted_setup) + .chain_config(chain_config) + .store(store.clone()) + .custom_spec(spec.clone().into()) + .task_executor(harness.chain.task_executor.clone()) + .weak_subjectivity_state(wss_state, wss_block, wss_blobs_opt, genesis_state) + .unwrap() + .store_migrator_config(MigratorConfig::default().blocking()) + .slot_clock(slot_clock) + .shutdown_sender(shutdown_tx) + .event_handler(Some(ServerSentEventHandler::new_with_capacity(1))) + .execution_layer(Some(mock.el)) + .ordered_custody_column_indices(generate_data_column_indices_rand_order::()) + .rng(Box::new(StdRng::seed_from_u64(42))) + .build(); + + assert!( + beacon_chain.is_ok(), + "Beacon chain should build from non-aligned Gloas Pending checkpoint state. 
Error: {:?}", + beacon_chain.err() + ); + + let chain = beacon_chain.unwrap(); + + // The head state should be at the block's slot + assert_eq!( + chain.head_snapshot().beacon_state.slot(), + Slot::new(epoch_boundary_slot - 1), + "Head state should be at the checkpoint block's slot" + ); + assert_eq!( + chain.head_snapshot().beacon_state.payload_status(), + StatePayloadStatus::Pending, + "Head state should be Pending after checkpoint sync" + ); +} + // ===================== Gloas Store Tests ===================== /// Test basic Gloas block + envelope storage and retrieval. diff --git a/beacon_node/beacon_processor/src/lib.rs b/beacon_node/beacon_processor/src/lib.rs index a6c76beb317..61b46ca1e69 100644 --- a/beacon_node/beacon_processor/src/lib.rs +++ b/beacon_node/beacon_processor/src/lib.rs @@ -416,6 +416,9 @@ pub enum Work { RpcBlobs { process_fn: AsyncFn, }, + RpcPayloadEnvelope { + process_fn: AsyncFn, + }, RpcCustodyColumn(AsyncFn), ColumnReconstruction(AsyncFn), IgnoredRpcBlock { @@ -481,6 +484,7 @@ pub enum WorkType { GossipLightClientOptimisticUpdate, RpcBlock, RpcBlobs, + RpcPayloadEnvelope, RpcCustodyColumn, ColumnReconstruction, IgnoredRpcBlock, @@ -542,6 +546,7 @@ impl Work { Work::GossipProposerPreferences(_) => WorkType::GossipProposerPreferences, Work::RpcBlock { .. } => WorkType::RpcBlock, Work::RpcBlobs { .. } => WorkType::RpcBlobs, + Work::RpcPayloadEnvelope { .. } => WorkType::RpcPayloadEnvelope, Work::RpcCustodyColumn { .. } => WorkType::RpcCustodyColumn, Work::ColumnReconstruction(_) => WorkType::ColumnReconstruction, Work::IgnoredRpcBlock { .. } => WorkType::IgnoredRpcBlock, @@ -1173,7 +1178,9 @@ impl BeaconProcessor { Work::GossipLightClientOptimisticUpdate { .. } => work_queues .lc_gossip_optimistic_update_queue .push(work, work_id), - Work::RpcBlock { .. } | Work::IgnoredRpcBlock { .. } => { + Work::RpcBlock { .. } + | Work::IgnoredRpcBlock { .. } + | Work::RpcPayloadEnvelope { .. } => { work_queues.rpc_block_queue.push(work, work_id) } Work::RpcBlobs { .. 
} => work_queues.rpc_blob_queue.push(work, work_id), @@ -1305,7 +1312,9 @@ impl BeaconProcessor { WorkType::GossipLightClientOptimisticUpdate => { work_queues.lc_gossip_optimistic_update_queue.len() } - WorkType::RpcBlock => work_queues.rpc_block_queue.len(), + WorkType::RpcBlock | WorkType::RpcPayloadEnvelope => { + work_queues.rpc_block_queue.len() + } WorkType::RpcBlobs | WorkType::IgnoredRpcBlock => { work_queues.rpc_blob_queue.len() } @@ -1500,6 +1509,7 @@ impl BeaconProcessor { beacon_block_root: _, } | Work::RpcBlobs { process_fn } + | Work::RpcPayloadEnvelope { process_fn } | Work::RpcCustodyColumn(process_fn) | Work::ColumnReconstruction(process_fn) => task_spawner.spawn_async(process_fn), Work::IgnoredRpcBlock { process_fn } => task_spawner.spawn_blocking(process_fn), diff --git a/beacon_node/http_api/src/beacon/execution_payload_envelope.rs b/beacon_node/http_api/src/beacon/execution_payload_envelope.rs index 4a974c9919a..3479d62f6ad 100644 --- a/beacon_node/http_api/src/beacon/execution_payload_envelope.rs +++ b/beacon_node/http_api/src/beacon/execution_payload_envelope.rs @@ -5,7 +5,11 @@ use crate::version::{ ResponseIncludesVersion, add_consensus_version_header, add_ssz_content_type_header, execution_optimistic_finalized_beacon_response, }; -use beacon_chain::{BeaconChain, BeaconChainTypes}; +use beacon_chain::payload_envelope_verification::gossip_verified_envelope::GossipVerifiedEnvelope; +use beacon_chain::{ + BeaconChain, BeaconChainTypes, NotifyExecutionLayer, + payload_envelope_verification::EnvelopeError, +}; use bytes::Bytes; use eth2::types as api_types; use eth2::{CONTENT_TYPE_HEADER, SSZ_CONTENT_TYPE_HEADER}; @@ -15,7 +19,7 @@ use ssz::{Decode, Encode}; use std::sync::Arc; use tokio::sync::mpsc::UnboundedSender; use tracing::{info, warn}; -use types::SignedExecutionPayloadEnvelope; +use types::{BlockImportSource, SignedExecutionPayloadEnvelope}; use warp::{ Filter, Rejection, Reply, hyper::{Body, Response}, @@ -93,33 +97,66 @@ pub async fn publish_execution_payload_envelope( ) -> Result, Rejection> { let slot = envelope.message.slot; let beacon_block_root = envelope.message.beacon_block_root; + let builder_index = envelope.message.builder_index; - // TODO(gloas): Replace this check once we have gossip validation. if !chain.spec.is_gloas_scheduled() { return Err(warp_utils::reject::custom_bad_request( "Execution payload envelopes are not supported before the Gloas fork".into(), )); } - // TODO(gloas): We should probably add validation here i.e. BroadcastValidation::Gossip - info!( - %slot, - %beacon_block_root, - builder_index = envelope.message.builder_index, - "Publishing signed execution payload envelope to network" - ); + let signed_envelope = Arc::new(envelope); + + // The publish_fn is called inside process_execution_payload_envelope after consensus + // verification but before the EL call. 
+ let envelope_for_publish = signed_envelope.clone(); + let sender = network_tx.clone(); + let publish_fn = move || { + info!( + %slot, + %beacon_block_root, + builder_index, + "Publishing signed execution payload envelope to network" + ); + crate::utils::publish_pubsub_message( + &sender, + PubsubMessage::ExecutionPayload(Box::new((*envelope_for_publish).clone())), + ) + .map_err(|_| { + warn!(%slot, "Failed to publish execution payload envelope to network"); + EnvelopeError::InternalError( + "Unable to publish execution payload envelope to network".to_owned(), + ) + }) + }; + + let ctx = chain.gossip_verification_context(); + let gossip_verified_envelope = match GossipVerifiedEnvelope::new(signed_envelope, &ctx) { + Ok(envelope) => envelope, + Err(e) => { + warn!(%slot, %beacon_block_root, error = ?e, "Execution payload envelope rejected"); + return Err(warp_utils::reject::custom_bad_request(format!( + "execution payload envelope rejected: {e:?}", + ))); + } + }; - // Publish to the network - crate::utils::publish_pubsub_message( - network_tx, - PubsubMessage::ExecutionPayload(Box::new(envelope)), - ) - .map_err(|_| { - warn!(%slot, "Failed to publish execution payload envelope to network"); - warp_utils::reject::custom_server_error( - "Unable to publish execution payload envelope to network".into(), + // Import the envelope locally (runs state transition and notifies the EL). + chain + .process_execution_payload_envelope( + beacon_block_root, + gossip_verified_envelope, + NotifyExecutionLayer::Yes, + BlockImportSource::HttpApi, + publish_fn, ) - })?; + .await + .map_err(|e| { + warn!(%slot, %beacon_block_root, reason = ?e, "Execution payload envelope rejected"); + warp_utils::reject::custom_bad_request(format!( + "execution payload envelope rejected: {e:?}" + )) + })?; Ok(warp::reply().into_response()) } diff --git a/beacon_node/http_api/src/state_id.rs b/beacon_node/http_api/src/state_id.rs index 13fb9b2c585..9f9a01d48bc 100644 --- a/beacon_node/http_api/src/state_id.rs +++ b/beacon_node/http_api/src/state_id.rs @@ -43,14 +43,32 @@ impl StateId { chain.canonical_head.cached_head().finalized_checkpoint(); let (slot, execution_optimistic) = checkpoint_slot_and_execution_optimistic(chain, finalized_checkpoint)?; - (slot, execution_optimistic, true) + let root = chain + .pending_state_root_at_slot(slot) + .map_err(warp_utils::reject::unhandled_error)? + .ok_or_else(|| { + warp_utils::reject::custom_not_found(format!( + "beacon state at slot {}", + slot + )) + })?; + return Ok((root, execution_optimistic, true)); } CoreStateId::Justified => { let justified_checkpoint = chain.canonical_head.cached_head().justified_checkpoint(); let (slot, execution_optimistic) = checkpoint_slot_and_execution_optimistic(chain, justified_checkpoint)?; - (slot, execution_optimistic, false) + let root = chain + .pending_state_root_at_slot(slot) + .map_err(warp_utils::reject::unhandled_error)? + .ok_or_else(|| { + warp_utils::reject::custom_not_found(format!( + "beacon state at slot {}", + slot + )) + })?; + return Ok((root, execution_optimistic, false)); } CoreStateId::Slot(slot) => ( *slot, diff --git a/beacon_node/lighthouse_network/src/rpc/config.rs b/beacon_node/lighthouse_network/src/rpc/config.rs index 9e1c6541ec8..f2b55d01c78 100644 --- a/beacon_node/lighthouse_network/src/rpc/config.rs +++ b/beacon_node/lighthouse_network/src/rpc/config.rs @@ -110,13 +110,13 @@ impl RateLimiterConfig { // blocks and a decent syncing rate for honest nodes. 
Malicious nodes would need to // spread out their requests over the time window to max out bandwidth on the server. pub const DEFAULT_BLOCKS_BY_RANGE_QUOTA: Quota = - Quota::n_every(NonZeroU64::new(128).unwrap(), 10); + Quota::n_every(NonZeroU64::new(32).unwrap(), 10); pub const DEFAULT_BLOCKS_BY_ROOT_QUOTA: Quota = - Quota::n_every(NonZeroU64::new(128).unwrap(), 10); + Quota::n_every(NonZeroU64::new(32).unwrap(), 10); pub const DEFAULT_PAYLOAD_ENVELOPES_BY_RANGE_QUOTA: Quota = - Quota::n_every(NonZeroU64::new(128).unwrap(), 10); + Quota::n_every(NonZeroU64::new(32).unwrap(), 10); pub const DEFAULT_PAYLOAD_ENVELOPES_BY_ROOT_QUOTA: Quota = - Quota::n_every(NonZeroU64::new(128).unwrap(), 10); + Quota::n_every(NonZeroU64::new(32).unwrap(), 10); // `DEFAULT_BLOCKS_BY_RANGE_QUOTA` * (target + 1) to account for high usage pub const DEFAULT_BLOBS_BY_RANGE_QUOTA: Quota = Quota::n_every(NonZeroU64::new(896).unwrap(), 10); diff --git a/beacon_node/lighthouse_network/src/service/api_types.rs b/beacon_node/lighthouse_network/src/service/api_types.rs index 486a4438579..0b1d84b7066 100644 --- a/beacon_node/lighthouse_network/src/service/api_types.rs +++ b/beacon_node/lighthouse_network/src/service/api_types.rs @@ -31,6 +31,10 @@ pub enum SyncRequestId { BlobsByRange(BlobsByRangeRequestId), /// Data columns by range request DataColumnsByRange(DataColumnsByRangeRequestId), + /// Request searching for an execution payload envelope given a block root. + SinglePayloadEnvelope { id: SingleLookupReqId }, + /// Payload envelopes by range request + PayloadEnvelopesByRange(PayloadEnvelopesByRangeRequestId), } /// Request ID for data_columns_by_root requests. Block lookups do not issue this request directly. @@ -76,6 +80,14 @@ pub enum DataColumnsByRangeRequester { CustodyBackfillSync(CustodyBackFillBatchRequestId), } +#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] +pub struct PayloadEnvelopesByRangeRequestId { + /// Id to identify this attempt at a payload_envelopes_by_range request for `parent_request_id` + pub id: Id, + /// The Id of the overall By Range request for block components. + pub parent_request_id: ComponentsByRangeRequestId, +} + /// Block components by range request for range sync. Includes an ID for downstream consumers to /// handle retries and tie all their sub requests together. #[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] @@ -252,6 +264,12 @@ macro_rules! 
impl_display { impl_display!(BlocksByRangeRequestId, "{}/{}", id, parent_request_id); impl_display!(BlobsByRangeRequestId, "{}/{}", id, parent_request_id); impl_display!(DataColumnsByRangeRequestId, "{}/{}", id, parent_request_id); +impl_display!( + PayloadEnvelopesByRangeRequestId, + "{}/{}", + id, + parent_request_id +); impl_display!(ComponentsByRangeRequestId, "{}/{}", id, requester); impl_display!(DataColumnsByRootRequestId, "{}/{}", id, requester); impl_display!(SingleLookupReqId, "{}/Lookup/{}", req_id, lookup_id); diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index 1f55d9a8789..2d13ef8e2ab 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -1290,6 +1290,17 @@ impl NetworkBeaconProcessor { self.send_sync_message(SyncMessage::UnknownParentBlock(peer_id, block, block_root)); return None; } + Err(BlockError::ParentEnvelopeUnknown { parent_root }) => { + debug!( + ?block_root, + ?parent_root, + "Parent envelope not yet available for gossip block" + ); + self.send_sync_message(SyncMessage::UnknownParentEnvelope( + peer_id, block, block_root, + )); + return None; + } Err(e @ BlockError::BeaconChainError(_)) => { debug!( error = ?e, @@ -1379,7 +1390,11 @@ impl NetworkBeaconProcessor { return None; } // BlobNotRequired is unreachable. Only constructed in `process_gossip_blob` - Err(e @ BlockError::InternalError(_)) | Err(e @ BlockError::BlobNotRequired(_)) => { + // EnvelopeError is unreachable. Only constructed during range sync envelope processing. + Err(e @ BlockError::InternalError(_)) + | Err(e @ BlockError::BlobNotRequired(_)) + | Err(e @ BlockError::EnvelopeError(_)) + | Err(e @ BlockError::PayloadEnvelopeError { .. }) => { error!(error = %e, "Internal block gossip validation error"); return None; } @@ -1578,6 +1593,16 @@ impl NetworkBeaconProcessor { "Block with unknown parent attempted to be processed" ); } + Err(BlockError::ParentEnvelopeUnknown { parent_root }) => { + debug!( + %block_root, + ?parent_root, + "Parent envelope not yet available, need envelope lookup" + ); + // Unlike ParentUnknown, this can legitimately happen during processing + // because the parent envelope may not have arrived yet. The lookup + // system will handle retrying via Action::ParentEnvelopeUnknown. + } Err(e @ BlockError::ExecutionPayloadError(epe)) if !epe.penalize_peer() => { debug!( error = %e, diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index b3d6874b8a3..1a1d348bb0a 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -541,6 +541,22 @@ impl NetworkBeaconProcessor { }) } + /// Create a new `Work` event for an RPC payload envelope. + pub fn send_rpc_payload_envelope( + self: &Arc, + envelope: Arc>, + seen_timestamp: Duration, + process_type: BlockProcessType, + ) -> Result<(), Error> { + let process_fn = + self.clone() + .generate_rpc_envelope_process_fn(envelope, seen_timestamp, process_type); + self.try_send(BeaconWorkEvent { + drop_during_sync: false, + work: Work::RpcPayloadEnvelope { process_fn }, + }) + } + /// Create a new `Work` event for some blobs, where the result from computation (if any) is /// sent to the other side of `result_tx`. 
pub fn send_rpc_blobs( diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs index f7fbce8e568..57d3d7d2206 100644 --- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs @@ -4,7 +4,7 @@ use crate::sync::BatchProcessResult; use crate::sync::manager::CustodyBatchProcessResult; use crate::sync::{ ChainId, - manager::{BlockProcessType, SyncMessage}, + manager::{BlockProcessType, BlockProcessingResult, SyncMessage}, }; use beacon_chain::block_verification_types::LookupBlock; use beacon_chain::block_verification_types::{AsBlock, RangeSyncBlock}; @@ -28,7 +28,9 @@ use store::KzgCommitment; use tracing::{debug, debug_span, error, info, instrument, warn}; use types::data::FixedBlobSidecarList; use types::kzg_ext::format_kzg_commitments; -use types::{BlockImportSource, DataColumnSidecarList, Epoch, Hash256}; +use types::{ + BlockImportSource, DataColumnSidecarList, Epoch, Hash256, SignedExecutionPayloadEnvelope, +}; /// Id associated to a batch processing request, either a sync batch or a parent lookup. #[derive(Clone, Debug, PartialEq)] @@ -73,6 +75,77 @@ impl NetworkBeaconProcessor { Box::pin(process_fn) } + /// Returns an async closure which processes a payload envelope received via RPC. + pub fn generate_rpc_envelope_process_fn( + self: Arc, + envelope: Arc>, + seen_timestamp: Duration, + process_type: BlockProcessType, + ) -> AsyncFn { + let process_fn = async move { + self.process_rpc_envelope(envelope, seen_timestamp, process_type) + .await; + }; + Box::pin(process_fn) + } + + /// Process an execution payload envelope received via RPC. + async fn process_rpc_envelope( + self: Arc, + envelope: Arc>, + _seen_timestamp: Duration, + process_type: BlockProcessType, + ) { + let beacon_block_root = envelope.beacon_block_root(); + + // Verify the envelope using the gossip verification path (same checks apply to RPC) + let verified_envelope = match self.chain.verify_envelope_for_gossip(envelope).await { + Ok(verified) => verified, + Err(e) => { + debug!( + error = ?e, + ?beacon_block_root, + "RPC payload envelope failed verification" + ); + self.send_sync_message(SyncMessage::BlockComponentProcessed { + process_type, + result: BlockProcessingResult::Err(e.into()), + }); + return; + } + }; + + // Process the verified envelope + let result = self + .chain + .process_execution_payload_envelope( + beacon_block_root, + verified_envelope, + NotifyExecutionLayer::Yes, + BlockImportSource::Lookup, + #[allow(clippy::result_large_err)] + || Ok(()), + ) + .await; + + let processing_result = match result { + Ok(status) => BlockProcessingResult::Ok(status), + Err(e) => { + debug!( + error = ?e, + ?beacon_block_root, + "RPC payload envelope processing failed" + ); + BlockProcessingResult::Err(e.into()) + } + }; + + self.send_sync_message(SyncMessage::BlockComponentProcessed { + process_type, + result: processing_result, + }); + } + /// Returns the `process_fn` and `ignore_fn` required when requeuing an RPC block. 
pub fn generate_lookup_beacon_block_fns( self: Arc, diff --git a/beacon_node/network/src/router.rs b/beacon_node/network/src/router.rs index e6982e6a847..faa55b48bfe 100644 --- a/beacon_node/network/src/router.rs +++ b/beacon_node/network/src/router.rs @@ -24,7 +24,10 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH}; use tokio::sync::mpsc; use tokio_stream::wrappers::UnboundedReceiverStream; use tracing::{debug, error, trace, warn}; -use types::{BlobSidecar, DataColumnSidecar, EthSpec, ForkContext, SignedBeaconBlock}; +use types::{ + BlobSidecar, DataColumnSidecar, EthSpec, ForkContext, SignedBeaconBlock, + SignedExecutionPayloadEnvelope, +}; /// Handles messages from the network and routes them to the appropriate service to be handled. pub struct Router { @@ -327,10 +330,11 @@ impl Router { Response::DataColumnsByRange(data_column) => { self.on_data_columns_by_range_response(peer_id, app_request_id, data_column); } - // TODO(EIP-7732): implement outgoing payload envelopes by range and root - // responses once sync manager requests them. - Response::PayloadEnvelopesByRoot(_) | Response::PayloadEnvelopesByRange(_) => { - debug!("Requesting envelopes by root and by range not supported yet"); + Response::PayloadEnvelopesByRoot(envelope) => { + self.on_payload_envelopes_by_root_response(peer_id, app_request_id, envelope); + } + Response::PayloadEnvelopesByRange(envelope) => { + self.on_payload_envelopes_by_range_response(peer_id, app_request_id, envelope); } // Light client responses should not be received Response::LightClientBootstrap(_) @@ -703,6 +707,40 @@ impl Router { }); } + /// Handle a `PayloadEnvelopesByRoot` response from the peer. + pub fn on_payload_envelopes_by_root_response( + &mut self, + peer_id: PeerId, + app_request_id: AppRequestId, + envelope: Option>>, + ) { + let sync_request_id = match app_request_id { + AppRequestId::Sync(sync_id) => match sync_id { + id @ SyncRequestId::SinglePayloadEnvelope { .. } => id, + other => { + crit!(request = ?other, "PayloadEnvelopesByRoot response on incorrect request"); + return; + } + }, + AppRequestId::Router => { + crit!(%peer_id, "All PayloadEnvelopesByRoot requests belong to sync"); + return; + } + AppRequestId::Internal => unreachable!("Handled internally"), + }; + + trace!( + %peer_id, + "Received PayloadEnvelopesByRoot Response" + ); + self.send_to_sync(SyncMessage::RpcPayloadEnvelope { + peer_id, + sync_request_id, + envelope, + seen_timestamp: timestamp_now(), + }); + } + /// Handle a `BlobsByRoot` response from the peer. 
pub fn on_blobs_by_root_response( &mut self, @@ -794,6 +832,29 @@ impl Router { } } + pub fn on_payload_envelopes_by_range_response( + &mut self, + peer_id: PeerId, + app_request_id: AppRequestId, + envelope: Option>>, + ) { + trace!( + %peer_id, + "Received PayloadEnvelopesByRange Response" + ); + + if let AppRequestId::Sync(sync_request_id) = app_request_id { + self.send_to_sync(SyncMessage::RpcPayloadEnvelope { + peer_id, + sync_request_id, + envelope, + seen_timestamp: timestamp_now(), + }); + } else { + crit!("All payload envelopes by range responses should belong to sync"); + } + } + fn handle_beacon_processor_send_result( &mut self, result: Result<(), crate::network_beacon_processor::Error>, diff --git a/beacon_node/network/src/sync/backfill_sync/mod.rs b/beacon_node/network/src/sync/backfill_sync/mod.rs index 0f80138d240..29beb96e5a5 100644 --- a/beacon_node/network/src/sync/backfill_sync/mod.rs +++ b/beacon_node/network/src/sync/backfill_sync/mod.rs @@ -35,7 +35,7 @@ use std::marker::PhantomData; use std::sync::Arc; use strum::IntoEnumIterator; use tracing::{debug, error, info, warn}; -use types::{ColumnIndex, Epoch, EthSpec}; +use types::{ColumnIndex, Epoch, EthSpec, ForkName}; /// Blocks are downloaded in batches from peers. This constant specifies how many epochs worth of /// blocks per batch are requested _at most_. A batch may request less blocks to account for @@ -218,6 +218,14 @@ impl BackFillSync { match self.state() { BackFillState::Syncing => {} // already syncing ignore. BackFillState::Paused => { + if self + .beacon_chain + .spec + .fork_name_at_epoch(self.to_be_downloaded) + >= ForkName::Gloas + { + return Ok(SyncStart::NotSyncing); + } if self .network_globals .peers diff --git a/beacon_node/network/src/sync/batch.rs b/beacon_node/network/src/sync/batch.rs index 10af1bf5038..e0704e25697 100644 --- a/beacon_node/network/src/sync/batch.rs +++ b/beacon_node/network/src/sync/batch.rs @@ -33,6 +33,7 @@ pub type BatchId = Epoch; #[strum(serialize_all = "snake_case")] pub enum ByRangeRequestType { BlocksAndColumns, + BlocksAndEnvelopesAndColumns, BlocksAndBlobs, Blocks, Columns(HashSet), diff --git a/beacon_node/network/src/sync/block_lookups/common.rs b/beacon_node/network/src/sync/block_lookups/common.rs index edd99345b43..bb8d81cc6e7 100644 --- a/beacon_node/network/src/sync/block_lookups/common.rs +++ b/beacon_node/network/src/sync/block_lookups/common.rs @@ -2,7 +2,7 @@ use crate::sync::block_lookups::single_block_lookup::{ LookupRequestError, SingleBlockLookup, SingleLookupRequestState, }; use crate::sync::block_lookups::{ - BlobRequestState, BlockRequestState, CustodyRequestState, PeerId, + BlobRequestState, BlockRequestState, CustodyRequestState, EnvelopeRequestState, PeerId, }; use crate::sync::manager::BlockProcessType; use crate::sync::network_context::{LookupRequestResult, SyncNetworkContext}; @@ -12,16 +12,17 @@ use parking_lot::RwLock; use std::collections::HashSet; use std::sync::Arc; use types::data::FixedBlobSidecarList; -use types::{DataColumnSidecarList, SignedBeaconBlock}; +use types::{DataColumnSidecarList, SignedBeaconBlock, SignedExecutionPayloadEnvelope}; use super::SingleLookupId; use super::single_block_lookup::{ComponentRequests, DownloadResult}; -#[derive(Debug, Copy, Clone)] +#[derive(Debug, Copy, Clone, PartialEq, Eq)] pub enum ResponseType { Block, Blob, CustodyColumn, + Envelope, } /// This trait unifies common single block lookup functionality across blocks and blobs. 
This @@ -151,6 +152,7 @@ impl RequestState for BlobRequestState { ComponentRequests::WaitingForBlock => Err("waiting for block"), ComponentRequests::ActiveBlobRequest(request, _) => Ok(request), ComponentRequests::ActiveCustodyRequest { .. } => Err("expecting custody request"), + ComponentRequests::ActiveEnvelopeRequest { .. } => Err("expecting envelope request"), ComponentRequests::NotNeeded { .. } => Err("not needed"), } } @@ -205,6 +207,7 @@ impl RequestState for CustodyRequestState { ComponentRequests::WaitingForBlock => Err("waiting for block"), ComponentRequests::ActiveBlobRequest { .. } => Err("expecting blob request"), ComponentRequests::ActiveCustodyRequest(request) => Ok(request), + ComponentRequests::ActiveEnvelopeRequest { .. } => Err("expecting envelope request"), ComponentRequests::NotNeeded { .. } => Err("not needed"), } } @@ -215,3 +218,52 @@ impl RequestState for CustodyRequestState { &mut self.state } } + +impl RequestState for EnvelopeRequestState { + type VerifiedResponseType = Arc>; + + fn make_request( + &self, + id: Id, + lookup_peers: Arc>>, + _: usize, + cx: &mut SyncNetworkContext, + ) -> Result { + cx.envelope_lookup_request(id, lookup_peers, self.block_root) + .map_err(LookupRequestError::SendFailedNetwork) + } + + fn send_for_processing( + id: Id, + download_result: DownloadResult, + cx: &SyncNetworkContext, + ) -> Result<(), LookupRequestError> { + let DownloadResult { + value, + block_root, + seen_timestamp, + .. + } = download_result; + cx.send_envelope_for_processing(id, value, seen_timestamp, block_root) + .map_err(LookupRequestError::SendFailedProcessor) + } + + fn response_type() -> ResponseType { + ResponseType::Envelope + } + + fn request_state_mut(request: &mut SingleBlockLookup) -> Result<&mut Self, &'static str> { + match &mut request.component_requests { + ComponentRequests::ActiveEnvelopeRequest(request) => Ok(request), + _ => Err("expecting envelope request"), + } + } + + fn get_state(&self) -> &SingleLookupRequestState { + &self.state + } + + fn get_state_mut(&mut self) -> &mut SingleLookupRequestState { + &mut self.state + } +} diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index 394f2fc37d5..27d96de51d7 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -22,7 +22,9 @@ use self::parent_chain::{NodeChain, compute_parent_chains}; pub use self::single_block_lookup::DownloadResult; -use self::single_block_lookup::{LookupRequestError, LookupResult, SingleBlockLookup}; +use self::single_block_lookup::{ + AwaitingParent, LookupRequestError, LookupResult, SingleBlockLookup, +}; use super::manager::{BlockProcessType, BlockProcessingResult, SLOT_IMPORT_TOLERANCE}; use super::network_context::{PeerGroup, RpcResponseError, SyncNetworkContext}; use crate::metrics; @@ -39,7 +41,9 @@ use fnv::FnvHashMap; use lighthouse_network::service::api_types::SingleLookupReqId; use lighthouse_network::{PeerAction, PeerId}; use lru_cache::LRUTimeCache; -pub use single_block_lookup::{BlobRequestState, BlockRequestState, CustodyRequestState}; +pub use single_block_lookup::{ + BlobRequestState, BlockRequestState, CustodyRequestState, EnvelopeRequestState, +}; use std::collections::hash_map::Entry; use std::sync::Arc; use std::time::Duration; @@ -109,6 +113,7 @@ pub type SingleLookupId = u32; enum Action { Retry, ParentUnknown { parent_root: Hash256 }, + ParentEnvelopeUnknown { parent_root: Hash256 }, Drop(/* reason: */ String), Continue, 
} @@ -213,7 +218,7 @@ impl BlockLookups { self.new_current_lookup( block_root, Some(block_component), - Some(parent_root), + Some(AwaitingParent::Block(parent_root)), // On a `UnknownParentBlock` or `UnknownParentBlob` event the peer is not required // to have the rest of the block components (refer to decoupled blob gossip). Create // the lookup with zero peers to house the block components. @@ -225,7 +230,37 @@ impl BlockLookups { } } - /// Seach a block whose parent root is unknown. + /// A child block's parent envelope is missing. Create a child lookup (with the block component) + /// that waits for the parent envelope, and an envelope-only lookup for the parent. + /// + /// Returns true if both lookups are created or already exist. + #[must_use = "only reference the new lookup if returns true"] + pub fn search_child_and_parent_envelope( + &mut self, + block_root: Hash256, + block_component: BlockComponent, + parent_root: Hash256, + peer_id: PeerId, + cx: &mut SyncNetworkContext, + ) -> bool { + let envelope_lookup_exists = + self.search_parent_envelope_of_child(parent_root, &[peer_id], cx); + if envelope_lookup_exists { + // Create child lookup that waits for the parent envelope. + // The child block itself has already been seen, so we pass it as a component. + self.new_current_lookup( + block_root, + Some(block_component), + Some(AwaitingParent::Envelope(parent_root)), + &[], + cx, + ) + } else { + false + } + } + + /// Search a block whose parent root is unknown. /// /// Returns true if the lookup is created or already exists #[must_use = "only reference the new lookup if returns true"] @@ -343,6 +378,57 @@ impl BlockLookups { self.new_current_lookup(block_root_to_search, None, None, peers, cx) } + /// A block triggers the search of a parent envelope. + #[must_use = "only reference the new lookup if returns true"] + pub fn search_parent_envelope_of_child( + &mut self, + parent_root: Hash256, + peers: &[PeerId], + cx: &mut SyncNetworkContext, + ) -> bool { + // Check if there's already a lookup for this root (could be a block lookup or envelope + // lookup). If so, add peers and let it handle the envelope. + if let Some((&lookup_id, _lookup)) = self + .single_block_lookups + .iter_mut() + .find(|(_, lookup)| lookup.is_for_block(parent_root)) + { + if let Err(e) = self.add_peers_to_lookup_and_ancestors(lookup_id, peers, cx) { + warn!(error = ?e, "Error adding peers to envelope lookup"); + } + return true; + } + + if self.single_block_lookups.len() >= MAX_LOOKUPS { + warn!(?parent_root, "Dropping envelope lookup reached max"); + return false; + } + + let lookup = SingleBlockLookup::new_envelope_only(parent_root, peers, cx.next_id()); + let _guard = lookup.span.clone().entered(); + + let id = lookup.id; + let lookup = match self.single_block_lookups.entry(id) { + Entry::Vacant(entry) => entry.insert(lookup), + Entry::Occupied(_) => { + warn!(id, "Lookup exists with same id"); + return false; + } + }; + + debug!( + ?peers, + ?parent_root, + id = lookup.id, + "Created envelope-only lookup" + ); + metrics::inc_counter(&metrics::SYNC_LOOKUP_CREATED); + self.metrics.created_lookups += 1; + + let result = lookup.continue_requests(cx); + self.on_lookup_result(id, result, "new_envelope_lookup", cx) + } + /// Searches for a single block hash. If the blocks parent is unknown, a chain of blocks is /// constructed. 
/// Returns true if the lookup is created or already exists @@ -351,7 +437,7 @@ impl BlockLookups { &mut self, block_root: Hash256, block_component: Option>, - awaiting_parent: Option, + awaiting_parent: Option, peers: &[PeerId], cx: &mut SyncNetworkContext, ) -> bool { @@ -386,13 +472,14 @@ impl BlockLookups { } // Ensure that awaiting parent exists, otherwise this lookup won't be able to make progress - if let Some(awaiting_parent) = awaiting_parent + if let Some(AwaitingParent::Block(parent_root) | AwaitingParent::Envelope(parent_root)) = + awaiting_parent && !self .single_block_lookups .iter() - .any(|(_, lookup)| lookup.is_for_block(awaiting_parent)) + .any(|(_, lookup)| lookup.is_for_block(parent_root)) { - warn!(block_root = ?awaiting_parent, "Ignoring child lookup parent lookup not found"); + warn!(block_root = ?parent_root, "Ignoring child lookup parent lookup not found"); return false; } @@ -426,9 +513,7 @@ impl BlockLookups { debug!( ?peers, ?block_root, - awaiting_parent = awaiting_parent - .map(|root| root.to_string()) - .unwrap_or("none".to_owned()), + ?awaiting_parent, id = lookup.id, "Created block lookup" ); @@ -559,6 +644,15 @@ impl BlockLookups { BlockProcessType::SingleCustodyColumn(id) => { self.on_processing_result_inner::>(id, result, cx) } + BlockProcessType::SinglePayloadEnvelope { id, block_root } => { + let result = self + .on_processing_result_inner::>(id, result, cx); + // On successful envelope import, unblock child lookups waiting for this envelope + if matches!(&result, Ok(LookupResult::Completed)) { + self.continue_envelope_child_lookups(block_root, cx); + } + result + } }; self.on_lookup_result(process_type.id(), lookup_result, "processing_result", cx); } @@ -645,6 +739,12 @@ impl BlockLookups { request_state.revert_to_awaiting_processing()?; Action::ParentUnknown { parent_root } } + BlockError::ParentEnvelopeUnknown { parent_root } => { + // The parent block is known but its execution payload envelope is missing. + // Revert to awaiting processing and fetch the envelope via RPC. + request_state.revert_to_awaiting_processing()?; + Action::ParentEnvelopeUnknown { parent_root } + } ref e @ BlockError::ExecutionPayloadError(ref epe) if !epe.penalize_peer() => { // These errors indicate that the execution layer is offline // and failed to validate the execution payload. Do not downscore peer. @@ -667,6 +767,26 @@ impl BlockLookups { // We opt to drop the lookup instead. 
Action::Drop(format!("{e:?}")) } + BlockError::PayloadEnvelopeError { e, penalize_peer } => { + debug!( + ?block_root, + error = ?e, + "Payload envelope processing error" + ); + if penalize_peer { + let peer_group = request_state.on_processing_failure()?; + for peer in peer_group.all() { + cx.report_peer( + *peer, + PeerAction::MidToleranceError, + "lookup_envelope_processing_failure", + ); + } + Action::Retry + } else { + Action::Drop(format!("{e:?}")) + } + } other => { debug!( ?block_root, @@ -701,6 +821,7 @@ impl BlockLookups { ResponseType::CustodyColumn => { "lookup_custody_column_processing_failure" } + ResponseType::Envelope => "lookup_envelope_processing_failure", }, ); } @@ -742,6 +863,25 @@ impl BlockLookups { ))) } } + Action::ParentEnvelopeUnknown { parent_root } => { + let peers = lookup.all_peers(); + lookup.set_awaiting_parent_envelope(parent_root); + let envelope_lookup_exists = + self.search_parent_envelope_of_child(parent_root, &peers, cx); + if envelope_lookup_exists { + debug!( + id = lookup_id, + ?block_root, + ?parent_root, + "Marking lookup as awaiting parent envelope" + ); + Ok(LookupResult::Pending) + } else { + Err(LookupRequestError::Failed(format!( + "Envelope lookup could not be created for {parent_root:?}" + ))) + } + } Action::Drop(reason) => { // Drop with noop Err(LookupRequestError::Failed(reason)) @@ -791,7 +931,7 @@ impl BlockLookups { let mut lookup_results = vec![]; // < need to buffer lookup results to not re-borrow &mut self for (id, lookup) in self.single_block_lookups.iter_mut() { - if lookup.awaiting_parent() == Some(block_root) { + if lookup.awaiting_parent_block() == Some(block_root) { lookup.resolve_awaiting_parent(); debug!( parent_root = ?block_root, @@ -809,6 +949,33 @@ impl BlockLookups { } } + /// Makes progress on lookups that were waiting for a parent envelope to be imported. + pub fn continue_envelope_child_lookups( + &mut self, + block_root: Hash256, + cx: &mut SyncNetworkContext, + ) { + let mut lookup_results = vec![]; + + for (id, lookup) in self.single_block_lookups.iter_mut() { + if lookup.awaiting_parent_envelope() == Some(block_root) { + lookup.resolve_awaiting_parent(); + debug!( + envelope_root = ?block_root, + id, + block_root = ?lookup.block_root(), + "Continuing lookup after envelope imported" + ); + let result = lookup.continue_requests(cx); + lookup_results.push((*id, result)); + } + } + + for (id, result) in lookup_results { + self.on_lookup_result(id, result, "continue_envelope_child_lookups", cx); + } + } + /// Drops `dropped_id` lookup and all its children recursively. Lookups awaiting a parent need /// the parent to make progress to resolve, therefore we must drop them if the parent is /// dropped. 
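+    /// Children awaiting either the parent block or the parent's payload envelope are dropped
+    /// along with it.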
@@ -824,10 +991,14 @@ impl BlockLookups { metrics::inc_counter_vec(&metrics::SYNC_LOOKUP_DROPPED, &[reason]); self.metrics.dropped_lookups += 1; + let dropped_root = dropped_lookup.block_root(); let child_lookups = self .single_block_lookups .iter() - .filter(|(_, lookup)| lookup.awaiting_parent() == Some(dropped_lookup.block_root())) + .filter(|(_, lookup)| { + lookup.awaiting_parent_block() == Some(dropped_root) + || lookup.awaiting_parent_envelope() == Some(dropped_root) + }) .map(|(id, _)| *id) .collect::>(); @@ -995,17 +1166,15 @@ impl BlockLookups { &'a self, lookup: &'a SingleBlockLookup, ) -> Result<&'a SingleBlockLookup, String> { - if let Some(awaiting_parent) = lookup.awaiting_parent() { + if let Some(parent_root) = lookup.awaiting_parent_block() { if let Some(lookup) = self .single_block_lookups .values() - .find(|l| l.block_root() == awaiting_parent) + .find(|l| l.block_root() == parent_root) { self.find_oldest_ancestor_lookup(lookup) } else { - Err(format!( - "Lookup references unknown parent {awaiting_parent:?}" - )) + Err(format!("Lookup references unknown parent {parent_root:?}")) } } else { Ok(lookup) @@ -1038,7 +1207,7 @@ impl BlockLookups { } } - if let Some(parent_root) = lookup.awaiting_parent() { + if let Some(parent_root) = lookup.awaiting_parent_block() { if let Some((&child_id, _)) = self .single_block_lookups .iter() diff --git a/beacon_node/network/src/sync/block_lookups/parent_chain.rs b/beacon_node/network/src/sync/block_lookups/parent_chain.rs index 5deea1dd94e..18363e9b8dc 100644 --- a/beacon_node/network/src/sync/block_lookups/parent_chain.rs +++ b/beacon_node/network/src/sync/block_lookups/parent_chain.rs @@ -13,7 +13,7 @@ impl From<&SingleBlockLookup> for Node { fn from(value: &SingleBlockLookup) -> Self { Self { block_root: value.block_root(), - parent_root: value.awaiting_parent(), + parent_root: value.awaiting_parent_block(), } } } diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index 919526c2386..6687a1ec75e 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -16,7 +16,9 @@ use store::Hash256; use strum::IntoStaticStr; use tracing::{Span, debug_span}; use types::data::FixedBlobSidecarList; -use types::{DataColumnSidecarList, EthSpec, SignedBeaconBlock, Slot}; +use types::{ + DataColumnSidecarList, EthSpec, SignedBeaconBlock, SignedExecutionPayloadEnvelope, Slot, +}; // Dedicated enum for LookupResult to force its usage #[must_use = "LookupResult must be handled with on_lookup_result"] @@ -56,6 +58,14 @@ pub enum LookupRequestError { }, } +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum AwaitingParent { + /// Waiting for the parent block to be imported. + Block(Hash256), + /// The parent block is imported but its execution payload envelope is missing. 
+ Envelope(Hash256), +} + #[derive(Educe)] #[educe(Debug(bound(T: BeaconChainTypes)))] pub struct SingleBlockLookup { @@ -69,7 +79,7 @@ pub struct SingleBlockLookup { #[educe(Debug(method(fmt_peer_set_as_len)))] peers: Arc>>, block_root: Hash256, - awaiting_parent: Option, + awaiting_parent: Option, created: Instant, pub(crate) span: Span, } @@ -79,6 +89,7 @@ pub(crate) enum ComponentRequests { WaitingForBlock, ActiveBlobRequest(BlobRequestState, usize), ActiveCustodyRequest(CustodyRequestState), + ActiveEnvelopeRequest(EnvelopeRequestState), // When printing in debug this state display the reason why it's not needed #[allow(dead_code)] NotNeeded(&'static str), @@ -89,7 +100,7 @@ impl SingleBlockLookup { requested_block_root: Hash256, peers: &[PeerId], id: Id, - awaiting_parent: Option, + awaiting_parent: Option, ) -> Self { let lookup_span = debug_span!( "lh_single_block_lookup", @@ -109,10 +120,33 @@ impl SingleBlockLookup { } } + /// Create an envelope-only lookup. The block is already imported, we just need the envelope. + pub fn new_envelope_only(block_root: Hash256, peers: &[PeerId], id: Id) -> Self { + let mut lookup = Self::new(block_root, peers, id, None); + // Block is already imported, mark as completed + lookup + .block_request_state + .state + .on_completed_request("block already imported") + .expect("block state starts as AwaitingDownload"); + lookup.component_requests = + ComponentRequests::ActiveEnvelopeRequest(EnvelopeRequestState::new(block_root)); + lookup + } + /// Reset the status of all internal requests pub fn reset_requests(&mut self) { self.block_request_state = BlockRequestState::new(self.block_root); - self.component_requests = ComponentRequests::WaitingForBlock; + match &self.component_requests { + ComponentRequests::ActiveEnvelopeRequest(_) => { + self.component_requests = ComponentRequests::ActiveEnvelopeRequest( + EnvelopeRequestState::new(self.block_root), + ); + } + _ => { + self.component_requests = ComponentRequests::WaitingForBlock; + } + } } /// Return the slot of this lookup's block if it's currently cached as `AwaitingProcessing` @@ -128,18 +162,37 @@ impl SingleBlockLookup { self.block_root } - pub fn awaiting_parent(&self) -> Option { + pub fn awaiting_parent(&self) -> Option { self.awaiting_parent } - /// Mark this lookup as awaiting a parent lookup from being processed. Meanwhile don't send - /// components for processing. + /// Returns the parent root if awaiting a parent block. + pub fn awaiting_parent_block(&self) -> Option { + match self.awaiting_parent { + Some(AwaitingParent::Block(root)) => Some(root), + _ => None, + } + } + + /// Returns the parent root if awaiting a parent envelope. + pub fn awaiting_parent_envelope(&self) -> Option { + match self.awaiting_parent { + Some(AwaitingParent::Envelope(root)) => Some(root), + _ => None, + } + } + + /// Mark this lookup as awaiting a parent block to be imported before processing. pub fn set_awaiting_parent(&mut self, parent_root: Hash256) { - self.awaiting_parent = Some(parent_root) + self.awaiting_parent = Some(AwaitingParent::Block(parent_root)); } - /// Mark this lookup as no longer awaiting a parent lookup. Components can be sent for - /// processing. + /// Mark this lookup as awaiting a parent envelope to be imported before processing. + pub fn set_awaiting_parent_envelope(&mut self, parent_root: Hash256) { + self.awaiting_parent = Some(AwaitingParent::Envelope(parent_root)); + } + + /// Mark this lookup as no longer awaiting any parent. 
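+    /// Components can then be sent for processing.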
pub fn resolve_awaiting_parent(&mut self) { self.awaiting_parent = None; } @@ -178,6 +231,7 @@ impl SingleBlockLookup { ComponentRequests::WaitingForBlock => false, ComponentRequests::ActiveBlobRequest(request, _) => request.state.is_processed(), ComponentRequests::ActiveCustodyRequest(request) => request.state.is_processed(), + ComponentRequests::ActiveEnvelopeRequest(request) => request.state.is_processed(), ComponentRequests::NotNeeded { .. } => true, } } @@ -197,6 +251,9 @@ impl SingleBlockLookup { ComponentRequests::ActiveCustodyRequest(request) => { request.state.is_awaiting_event() } + ComponentRequests::ActiveEnvelopeRequest(request) => { + request.state.is_awaiting_event() + } ComponentRequests::NotNeeded { .. } => false, } } @@ -266,6 +323,9 @@ impl SingleBlockLookup { ComponentRequests::ActiveCustodyRequest(_) => { self.continue_request::>(cx, 0)? } + ComponentRequests::ActiveEnvelopeRequest(_) => { + self.continue_request::>(cx, 0)? + } ComponentRequests::NotNeeded { .. } => {} // do nothing } @@ -287,7 +347,7 @@ impl SingleBlockLookup { expected_blobs: usize, ) -> Result<(), LookupRequestError> { let id = self.id; - let awaiting_parent = self.awaiting_parent.is_some(); + let awaiting_event = self.awaiting_parent.is_some(); let request = R::request_state_mut(self).map_err(|e| LookupRequestError::BadState(e.to_owned()))?; @@ -331,7 +391,7 @@ impl SingleBlockLookup { // Otherwise, attempt to progress awaiting processing // If this request is awaiting a parent lookup to be processed, do not send for processing. // The request will be rejected with unknown parent error. - } else if !awaiting_parent { + } else if !awaiting_event { // maybe_start_processing returns Some if state == AwaitingProcess. This pattern is // useful to conditionally access the result data. if let Some(result) = request.get_state_mut().maybe_start_processing() { @@ -427,6 +487,26 @@ impl BlockRequestState { } } +/// The state of the envelope request component of a `SingleBlockLookup`. +/// Used for envelope-only lookups where the parent block is already imported +/// but its execution payload envelope is missing. 
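+/// Tracks the download of a single `SignedExecutionPayloadEnvelope` together with the root of
+/// the block it belongs to.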
+#[derive(Educe)] +#[educe(Debug)] +pub struct EnvelopeRequestState { + #[educe(Debug(ignore))] + pub block_root: Hash256, + pub state: SingleLookupRequestState>>, +} + +impl EnvelopeRequestState { + pub fn new(block_root: Hash256) -> Self { + Self { + block_root, + state: SingleLookupRequestState::new(), + } + } +} + #[derive(Debug, Clone)] pub struct DownloadResult { pub value: T, diff --git a/beacon_node/network/src/sync/block_sidecar_coupling.rs b/beacon_node/network/src/sync/block_sidecar_coupling.rs index 98cf3e0a1ff..e475b60de91 100644 --- a/beacon_node/network/src/sync/block_sidecar_coupling.rs +++ b/beacon_node/network/src/sync/block_sidecar_coupling.rs @@ -4,11 +4,13 @@ use beacon_chain::{ data_availability_checker::DataAvailabilityChecker, data_column_verification::CustodyDataColumn, get_block_root, + payload_envelope_verification::AvailableEnvelope, }; use lighthouse_network::{ PeerId, service::api_types::{ BlobsByRangeRequestId, BlocksByRangeRequestId, DataColumnsByRangeRequestId, + PayloadEnvelopesByRangeRequestId, }, }; use ssz_types::RuntimeVariableList; @@ -16,7 +18,7 @@ use std::{collections::HashMap, sync::Arc}; use tracing::{Span, debug}; use types::{ BlobSidecar, ChainSpec, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, EthSpec, - Hash256, SignedBeaconBlock, + Hash256, SignedBeaconBlock, SignedExecutionPayloadEnvelope, }; use crate::sync::network_context::MAX_COLUMN_RETRIES; @@ -35,6 +37,13 @@ use crate::sync::network_context::MAX_COLUMN_RETRIES; pub struct RangeBlockComponentsRequest { /// Blocks we have received awaiting for their corresponding sidecar. blocks_request: ByRangeRequest>>>, + /// Payload envelopes (Gloas+). None for pre-Gloas forks. + payloads_request: Option< + ByRangeRequest< + PayloadEnvelopesByRangeRequestId, + Vec>>, + >, + >, /// Sidecars we have received awaiting for their corresponding block. block_data_request: RangeBlockDataRequest, /// Span to track the range request and all children range requests. @@ -88,6 +97,7 @@ impl RangeBlockComponentsRequest { Vec<(DataColumnsByRangeRequestId, Vec)>, Vec, )>, + payloads_req_id: Option, request_span: Span, ) -> Self { let block_data_request = if let Some(blobs_req_id) = blobs_req_id { @@ -109,6 +119,7 @@ impl RangeBlockComponentsRequest { Self { blocks_request: ByRangeRequest::Active(blocks_req_id), + payloads_request: payloads_req_id.map(ByRangeRequest::Active), block_data_request, request_span, } @@ -191,6 +202,18 @@ impl RangeBlockComponentsRequest { } } + /// Adds received payload envelopes to the request. + pub fn add_payload_envelopes( + &mut self, + req_id: PayloadEnvelopesByRangeRequestId, + envelopes: Vec>>, + ) -> Result<(), String> { + match &mut self.payloads_request { + Some(req) => req.finish(req_id, envelopes), + None => Err("received payload envelopes but none expected".to_owned()), + } + } + /// Attempts to construct RPC blocks from all received components. /// /// Returns `None` if not all expected requests have completed. @@ -208,6 +231,13 @@ impl RangeBlockComponentsRequest { return None; }; + // If payloads are expected, they must also be complete before we can produce responses. 
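+        // Pre-Gloas batches never create a payloads request (`payloads_request` is `None`),
+        // so they skip this wait.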
+ if let Some(payloads_req) = &self.payloads_request + && payloads_req.to_finished().is_none() + { + return None; + } + // Increment the attempt once this function returns the response or errors match &mut self.block_data_request { RangeBlockDataRequest::NoData => Some(Self::responses_with_blobs( @@ -254,15 +284,29 @@ impl RangeBlockComponentsRequest { } } - let resp = Self::responses_with_custody_columns( - blocks.to_vec(), - data_columns, - column_to_peer_id, - expected_custody_columns, - *attempt, - da_checker, - spec, - ); + // Gloas path: if payloads are present, produce Gloas blocks + let resp = if let Some(payloads_req) = &self.payloads_request { + let payloads = payloads_req.to_finished().expect("checked above").to_vec(); + Self::responses_gloas( + blocks.to_vec(), + payloads, + data_columns, + column_to_peer_id, + expected_custody_columns, + *attempt, + spec, + ) + } else { + Self::responses_with_custody_columns( + blocks.to_vec(), + data_columns, + column_to_peer_id, + expected_custody_columns, + *attempt, + da_checker, + spec, + ) + }; if let Err(CouplingError::DataColumnPeerFailure { error: _, @@ -460,6 +504,136 @@ impl RangeBlockComponentsRequest { Ok(range_sync_blocks) } + + /// Couples blocks with payload envelopes and custody columns for Gloas. + /// In Gloas, columns are associated with the envelope (not the block directly). + fn responses_gloas( + blocks: Vec>>, + payloads: Vec>>, + data_columns: DataColumnSidecarList, + column_to_peer: HashMap, + expects_custody_columns: &[ColumnIndex], + attempt: usize, + spec: Arc, + ) -> Result>, CouplingError> { + // Group data columns by block_root + let mut data_columns_by_block = + HashMap::>>>::new(); + + for column in data_columns { + let block_root = column.block_root(); + let index = *column.index(); + if data_columns_by_block + .entry(block_root) + .or_default() + .insert(index, column) + .is_some() + { + debug!(?block_root, ?index, "Repeated column for block_root"); + } + } + + let mut range_sync_blocks = Vec::with_capacity(blocks.len()); + let mut payload_iter = payloads.into_iter().peekable(); + let exceeded_retries = attempt >= MAX_COLUMN_RETRIES; + + for block in blocks { + // Match payload envelope to block by slot + let mut envelope_for_block = None; + if payload_iter + .peek() + .map(|e| e.message.slot == block.slot()) + .unwrap_or(false) + { + envelope_for_block = payload_iter.next(); + } + + let block_root = get_block_root(&block); + + let available_envelope = if block.num_expected_blobs() > 0 { + // Block has blobs — envelope and columns are required + let envelope = envelope_for_block.ok_or_else(|| { + CouplingError::InternalError(format!( + "Missing payload envelope for block {block_root:?} with blobs" + )) + })?; + + let Some(mut data_columns_by_index) = data_columns_by_block.remove(&block_root) + else { + let responsible_peers = column_to_peer.iter().map(|c| (*c.0, *c.1)).collect(); + return Err(CouplingError::DataColumnPeerFailure { + error: format!("No columns for block {block_root:?} with data"), + faulty_peers: responsible_peers, + exceeded_retries, + }); + }; + + let mut custody_columns = vec![]; + let mut naughty_peers = vec![]; + for index in expects_custody_columns { + if let Some(data_column) = data_columns_by_index.remove(index) { + custody_columns.push(data_column); + } else { + let Some(responsible_peer) = column_to_peer.get(index) else { + return Err(CouplingError::InternalError(format!( + "Internal error, no request made for column {index}" + ))); + }; + naughty_peers.push((*index, 
*responsible_peer)); + } + } + if !naughty_peers.is_empty() { + return Err(CouplingError::DataColumnPeerFailure { + error: format!( + "Peers did not return column for block_root {block_root:?} {naughty_peers:?}" + ), + faulty_peers: naughty_peers, + exceeded_retries, + }); + } + + Some(Box::new(AvailableEnvelope::new( + envelope.block_hash(), + envelope, + custody_columns, + None, + spec.clone(), + ))) + } else { + envelope_for_block.map(|envelope| { + Box::new(AvailableEnvelope::new( + envelope.block_hash(), + envelope, + vec![], + None, + spec.clone(), + )) + }) + }; + + range_sync_blocks.push(RangeSyncBlock::new_gloas(block, available_envelope)); + } + + // Log any remaining unmatched payloads + if payload_iter.next().is_some() { + let remaining = payload_iter.count() + 1; + debug!( + remaining, + "Received payload envelopes that don't pair with blocks" + ); + } + + // Log remaining unmatched columns + if !data_columns_by_block.is_empty() { + let remaining_roots = data_columns_by_block.keys().collect::>(); + debug!( + ?remaining_roots, + "Not all columns consumed for Gloas blocks" + ); + } + + Ok(range_sync_blocks) + } } impl ByRangeRequest { @@ -560,7 +734,7 @@ mod tests { let blocks_req_id = blocks_id(components_id()); let mut info = - RangeBlockComponentsRequest::::new(blocks_req_id, None, None, Span::none()); + RangeBlockComponentsRequest::::new(blocks_req_id, None, None, None, Span::none()); // Send blocks and complete terminate response info.add_blocks(blocks_req_id, blocks).unwrap(); @@ -591,6 +765,7 @@ mod tests { blocks_req_id, Some(blobs_req_id), None, + None, Span::none(), ); @@ -650,6 +825,7 @@ mod tests { blocks_req_id, None, Some((columns_req_id.clone(), expects_custody_columns.clone())), + None, Span::none(), ); // Send blocks and complete terminate response @@ -726,6 +902,7 @@ mod tests { blocks_req_id, None, Some((columns_req_id.clone(), expected_sampling_columns.clone())), + None, Span::none(), ); @@ -818,6 +995,7 @@ mod tests { blocks_req_id, None, Some((columns_req_id.clone(), expected_sampling_columns.clone())), + None, Span::none(), ); @@ -915,6 +1093,7 @@ mod tests { blocks_req_id, None, Some((columns_req_id.clone(), expected_sampling_columns.clone())), + None, Span::none(), ); @@ -1030,6 +1209,7 @@ mod tests { blocks_req_id, None, Some((columns_req_id.clone(), expected_sampling_columns.clone())), + None, Span::none(), ); diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 7e618d89808..c1c1029446b 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -45,6 +45,7 @@ use crate::service::NetworkMessage; use crate::status::ToStatusMessage; use crate::sync::block_lookups::{ BlobRequestState, BlockComponent, BlockRequestState, CustodyRequestState, DownloadResult, + EnvelopeRequestState, }; use crate::sync::custody_backfill_sync::CustodyBackFillSync; use crate::sync::network_context::{PeerGroup, RpcResponseResult}; @@ -60,7 +61,8 @@ use lighthouse_network::service::api_types::{ BlobsByRangeRequestId, BlocksByRangeRequestId, ComponentsByRangeRequestId, CustodyBackFillBatchRequestId, CustodyBackfillBatchId, CustodyRequester, DataColumnsByRangeRequestId, DataColumnsByRangeRequester, DataColumnsByRootRequestId, - DataColumnsByRootRequester, Id, SingleLookupReqId, SyncRequestId, + DataColumnsByRootRequester, Id, PayloadEnvelopesByRangeRequestId, SingleLookupReqId, + SyncRequestId, }; use lighthouse_network::types::{NetworkGlobals, SyncState}; use lighthouse_network::{PeerAction, 
PeerId}; @@ -74,7 +76,8 @@ use strum::IntoStaticStr; use tokio::sync::mpsc; use tracing::{debug, error, info, trace}; use types::{ - BlobSidecar, DataColumnSidecar, EthSpec, ForkContext, Hash256, SignedBeaconBlock, Slot, + BlobSidecar, DataColumnSidecar, EthSpec, ForkContext, Hash256, SignedBeaconBlock, + SignedExecutionPayloadEnvelope, Slot, }; /// The number of slots ahead of us that is allowed before requesting a long-range (batch) Sync @@ -133,6 +136,14 @@ pub enum SyncMessage { seen_timestamp: Duration, }, + /// An execution payload envelope has been received from the RPC. + RpcPayloadEnvelope { + sync_request_id: SyncRequestId, + peer_id: PeerId, + envelope: Option>>, + seen_timestamp: Duration, + }, + /// A block with an unknown parent has been received. UnknownParentBlock(PeerId, Arc>, Hash256), @@ -142,6 +153,9 @@ pub enum SyncMessage { /// A data column with an unknown parent has been received. UnknownParentDataColumn(PeerId, Arc>), + /// A block's parent is known but its execution payload envelope has not been received yet. + UnknownParentEnvelope(PeerId, Arc>, Hash256), + /// A peer has sent an attestation that references a block that is unknown. This triggers the /// manager to attempt to find the block matching the unknown hash. UnknownBlockHashFromAttestation(PeerId, Hash256), @@ -184,6 +198,7 @@ pub enum BlockProcessType { SingleBlock { id: Id }, SingleBlob { id: Id }, SingleCustodyColumn(Id), + SinglePayloadEnvelope { id: Id, block_root: Hash256 }, } impl BlockProcessType { @@ -191,7 +206,8 @@ impl BlockProcessType { match self { BlockProcessType::SingleBlock { id } | BlockProcessType::SingleBlob { id } - | BlockProcessType::SingleCustodyColumn(id) => *id, + | BlockProcessType::SingleCustodyColumn(id) + | BlockProcessType::SinglePayloadEnvelope { id, .. 
} => *id, } } } @@ -505,6 +521,11 @@ impl SyncManager { SyncRequestId::DataColumnsByRange(req_id) => { self.on_data_columns_by_range_response(req_id, peer_id, RpcEvent::RPCError(error)) } + SyncRequestId::SinglePayloadEnvelope { id } => { + self.on_single_envelope_response(id, peer_id, RpcEvent::RPCError(error)) + } + SyncRequestId::PayloadEnvelopesByRange(req_id) => self + .on_payload_envelopes_by_range_response(req_id, peer_id, RpcEvent::RPCError(error)), } } @@ -839,6 +860,17 @@ impl SyncManager { } => { self.rpc_data_column_received(sync_request_id, peer_id, data_column, seen_timestamp) } + SyncMessage::RpcPayloadEnvelope { + sync_request_id, + peer_id, + envelope, + seen_timestamp, + } => self.rpc_payload_envelope_received( + sync_request_id, + peer_id, + envelope, + seen_timestamp, + ), SyncMessage::UnknownParentBlock(peer_id, block, block_root) => { let block_slot = block.slot(); let parent_root = block.parent_root(); @@ -900,6 +932,27 @@ impl SyncManager { } } } + SyncMessage::UnknownParentEnvelope(peer_id, block, block_root) => { + let block_slot = block.slot(); + let parent_root = block.parent_root(); + debug!( + %block_root, + %parent_root, + "Parent envelope not yet available, creating envelope lookup" + ); + self.handle_unknown_parent_envelope( + peer_id, + block_root, + parent_root, + block_slot, + BlockComponent::Block(DownloadResult { + value: block.block_cloned(), + block_root, + seen_timestamp: timestamp_now(), + peer_group: PeerGroup::from_single(peer_id), + }), + ); + } SyncMessage::UnknownBlockHashFromAttestation(peer_id, block_root) => { if !self.notified_unknown_roots.contains(&(peer_id, block_root)) { self.notified_unknown_roots.insert((peer_id, block_root)); @@ -1005,6 +1058,40 @@ impl SyncManager { } } + /// Handle a block whose parent block is known but parent envelope is missing. + /// Creates an envelope-only lookup for the parent and a child lookup that waits for it. 
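+    /// The already-received block is attached to the child lookup as a component, so it does not
+    /// need to be downloaded again.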
+ fn handle_unknown_parent_envelope( + &mut self, + peer_id: PeerId, + block_root: Hash256, + parent_root: Hash256, + slot: Slot, + block_component: BlockComponent, + ) { + match self.should_search_for_block(Some(slot), &peer_id) { + Ok(_) => { + if self.block_lookups.search_child_and_parent_envelope( + block_root, + block_component, + parent_root, + peer_id, + &mut self.network, + ) { + // Lookups created + } else { + debug!( + ?block_root, + ?parent_root, + "No lookup created for child and parent envelope" + ); + } + } + Err(reason) => { + debug!(%block_root, %parent_root, reason, "Ignoring unknown parent envelope request"); + } + } + } + fn handle_unknown_block_root(&mut self, peer_id: PeerId, block_root: Hash256) { match self.should_search_for_block(None, &peer_id) { Ok(_) => { @@ -1200,6 +1287,71 @@ impl SyncManager { } } + fn rpc_payload_envelope_received( + &mut self, + sync_request_id: SyncRequestId, + peer_id: PeerId, + envelope: Option>>, + seen_timestamp: Duration, + ) { + match sync_request_id { + SyncRequestId::SinglePayloadEnvelope { id } => self.on_single_envelope_response( + id, + peer_id, + RpcEvent::from_chunk(envelope, seen_timestamp), + ), + SyncRequestId::PayloadEnvelopesByRange(req_id) => { + self.on_payload_envelopes_by_range_response( + req_id, + peer_id, + RpcEvent::from_chunk(envelope, seen_timestamp), + ); + } + _ => { + crit!(%peer_id, "bad request id for payload_envelope"); + } + } + } + + fn on_single_envelope_response( + &mut self, + id: SingleLookupReqId, + peer_id: PeerId, + rpc_event: RpcEvent>>, + ) { + if let Some(resp) = self + .network + .on_single_envelope_response(id, peer_id, rpc_event) + { + self.block_lookups + .on_download_response::>( + id, + resp.map(|(value, seen_timestamp)| { + (value, PeerGroup::from_single(peer_id), seen_timestamp) + }), + &mut self.network, + ) + } + } + + fn on_payload_envelopes_by_range_response( + &mut self, + id: PayloadEnvelopesByRangeRequestId, + peer_id: PeerId, + envelope: RpcEvent>>, + ) { + if let Some(resp) = self + .network + .on_payload_envelopes_by_range_response(id, peer_id, envelope) + { + self.on_range_components_response( + id.parent_request_id, + peer_id, + RangeBlockComponent::PayloadEnvelope(id, resp), + ); + } + } + fn on_single_blob_response( &mut self, id: SingleLookupReqId, diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index ff630bb470a..b840b12f614 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -22,14 +22,17 @@ use beacon_chain::block_verification_types::{AsBlock, RangeSyncBlock}; use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessStatus, EngineState}; use custody::CustodyRequestResult; use fnv::FnvHashMap; -use lighthouse_network::rpc::methods::{BlobsByRangeRequest, DataColumnsByRangeRequest}; +use lighthouse_network::rpc::methods::{ + BlobsByRangeRequest, DataColumnsByRangeRequest, PayloadEnvelopesByRangeRequest, +}; use lighthouse_network::rpc::{BlocksByRangeRequest, GoodbyeReason, RPCError, RequestType}; pub use lighthouse_network::service::api_types::RangeRequestId; use lighthouse_network::service::api_types::{ AppRequestId, BlobsByRangeRequestId, BlocksByRangeRequestId, ComponentsByRangeRequestId, CustodyBackFillBatchRequestId, CustodyBackfillBatchId, CustodyId, CustodyRequester, DataColumnsByRangeRequestId, DataColumnsByRangeRequester, DataColumnsByRootRequestId, - DataColumnsByRootRequester, Id, SingleLookupReqId, SyncRequestId, + 
DataColumnsByRootRequester, Id, PayloadEnvelopesByRangeRequestId, SingleLookupReqId, + SyncRequestId, }; use lighthouse_network::{Client, NetworkGlobals, PeerAction, PeerId, ReportSource}; use parking_lot::RwLock; @@ -37,6 +40,8 @@ pub use requests::LookupVerifyError; use requests::{ ActiveRequests, BlobsByRangeRequestItems, BlobsByRootRequestItems, BlocksByRangeRequestItems, BlocksByRootRequestItems, DataColumnsByRangeRequestItems, DataColumnsByRootRequestItems, + PayloadEnvelopesByRangeRequestItems, PayloadEnvelopesByRootRequestItems, + PayloadEnvelopesByRootSingleRequest, }; #[cfg(test)] use slot_clock::SlotClock; @@ -52,7 +57,7 @@ use tracing::{Span, debug, debug_span, error, warn}; use types::data::FixedBlobSidecarList; use types::{ BlobSidecar, BlockImportSource, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, EthSpec, - ForkContext, Hash256, SignedBeaconBlock, Slot, + ForkContext, Hash256, SignedBeaconBlock, SignedExecutionPayloadEnvelope, Slot, }; pub mod custody; @@ -213,6 +218,14 @@ pub struct SyncNetworkContext { /// A mapping of active DataColumnsByRange requests data_columns_by_range_requests: ActiveRequests>, + /// A mapping of active PayloadEnvelopesByRoot requests + payload_envelopes_by_root_requests: + ActiveRequests>, + /// A mapping of active PayloadEnvelopesByRange requests + payload_envelopes_by_range_requests: ActiveRequests< + PayloadEnvelopesByRangeRequestId, + PayloadEnvelopesByRangeRequestItems, + >, /// Mapping of active custody column requests for a block root custody_by_root_requests: FnvHashMap>, @@ -250,6 +263,10 @@ pub enum RangeBlockComponent { DataColumnsByRangeRequestId, RpcResponseResult>>>, ), + PayloadEnvelope( + PayloadEnvelopesByRangeRequestId, + RpcResponseResult>>>, + ), } #[cfg(test)] @@ -298,6 +315,8 @@ impl SyncNetworkContext { blocks_by_range_requests: ActiveRequests::new("blocks_by_range"), blobs_by_range_requests: ActiveRequests::new("blobs_by_range"), data_columns_by_range_requests: ActiveRequests::new("data_columns_by_range"), + payload_envelopes_by_root_requests: ActiveRequests::new("payload_envelopes_by_root"), + payload_envelopes_by_range_requests: ActiveRequests::new("payload_envelopes_by_range"), custody_by_root_requests: <_>::default(), components_by_range_requests: FnvHashMap::default(), custody_backfill_data_column_batch_requests: FnvHashMap::default(), @@ -326,6 +345,8 @@ impl SyncNetworkContext { blocks_by_range_requests, blobs_by_range_requests, data_columns_by_range_requests, + payload_envelopes_by_root_requests, + payload_envelopes_by_range_requests, // custody_by_root_requests is a meta request of data_columns_by_root_requests custody_by_root_requests: _, // components_by_range_requests is a meta request of various _by_range requests @@ -361,12 +382,22 @@ impl SyncNetworkContext { .active_requests_of_peer(peer_id) .into_iter() .map(|req_id| SyncRequestId::DataColumnsByRange(*req_id)); + let envelope_by_root_ids = payload_envelopes_by_root_requests + .active_requests_of_peer(peer_id) + .into_iter() + .map(|id| SyncRequestId::SinglePayloadEnvelope { id: *id }); + let payload_envelope_by_range_ids = payload_envelopes_by_range_requests + .active_requests_of_peer(peer_id) + .into_iter() + .map(|req_id| SyncRequestId::PayloadEnvelopesByRange(*req_id)); blocks_by_root_ids .chain(blobs_by_root_ids) .chain(data_column_by_root_ids) .chain(blocks_by_range_ids) .chain(blobs_by_range_ids) .chain(data_column_by_range_ids) + .chain(envelope_by_root_ids) + .chain(payload_envelope_by_range_ids) .collect() } @@ -423,6 +454,8 @@ impl 
SyncNetworkContext { blocks_by_range_requests, blobs_by_range_requests, data_columns_by_range_requests, + payload_envelopes_by_root_requests, + payload_envelopes_by_range_requests, // custody_by_root_requests is a meta request of data_columns_by_root_requests custody_by_root_requests: _, // components_by_range_requests is a meta request of various _by_range requests @@ -445,6 +478,8 @@ impl SyncNetworkContext { .chain(blocks_by_range_requests.iter_request_peers()) .chain(blobs_by_range_requests.iter_request_peers()) .chain(data_columns_by_range_requests.iter_request_peers()) + .chain(payload_envelopes_by_root_requests.iter_request_peers()) + .chain(payload_envelopes_by_range_requests.iter_request_peers()) { *active_request_count_by_peer.entry(peer_id).or_default() += 1; } @@ -577,24 +612,26 @@ impl SyncNetworkContext { }; // Attempt to find all required custody peers before sending any request or creating an ID - let columns_by_range_peers_to_request = - if matches!(batch_type, ByRangeRequestType::BlocksAndColumns) { - let epoch = Slot::new(*request.start_slot()).epoch(T::EthSpec::slots_per_epoch()); - let column_indexes = self - .chain - .sampling_columns_for_epoch(epoch) - .iter() - .cloned() - .collect(); - Some(self.select_columns_by_range_peers_to_request( - &column_indexes, - column_peers, - active_request_count_by_peer, - peers_to_deprioritize, - )?) - } else { - None - }; + let columns_by_range_peers_to_request = if matches!( + batch_type, + ByRangeRequestType::BlocksAndColumns | ByRangeRequestType::BlocksAndEnvelopesAndColumns + ) { + let epoch = Slot::new(*request.start_slot()).epoch(T::EthSpec::slots_per_epoch()); + let column_indexes = self + .chain + .sampling_columns_for_epoch(epoch) + .iter() + .cloned() + .collect(); + Some(self.select_columns_by_range_peers_to_request( + &column_indexes, + column_peers, + active_request_count_by_peer, + peers_to_deprioritize, + )?) + } else { + None + }; // Create the overall components_by_range request ID before its individual components let id = ComponentsByRangeRequestId { @@ -659,6 +696,28 @@ impl SyncNetworkContext { .transpose()?; let epoch = Slot::new(*request.start_slot()).epoch(T::EthSpec::slots_per_epoch()); + + // Send envelope request for Gloas epochs + let payloads_req_id = + if matches!(batch_type, ByRangeRequestType::BlocksAndEnvelopesAndColumns) { + Some(self.send_payload_envelopes_by_range_request( + block_peer, + PayloadEnvelopesByRangeRequest { + start_slot: *request.start_slot(), + count: *request.count(), + }, + id, + new_range_request_span!( + self, + "outgoing_envelopes_by_range", + range_request_span.clone(), + block_peer + ), + )?) + } else { + None + }; + let info = RangeBlockComponentsRequest::new( blocks_req_id, blobs_req_id, @@ -668,6 +727,7 @@ impl SyncNetworkContext { self.chain.sampling_columns_for_epoch(epoch).to_vec(), ) }), + payloads_req_id, range_request_span, ); self.components_by_range_requests.insert(id, info); @@ -770,6 +830,17 @@ impl SyncNetworkContext { }) }) } + RangeBlockComponent::PayloadEnvelope(req_id, resp) => { + resp.and_then(|(envelopes, _)| { + request + .add_payload_envelopes(req_id, envelopes) + .map_err(|e| { + RpcResponseError::BlockComponentCouplingError( + CouplingError::InternalError(e), + ) + }) + }) + } } } { entry.remove(); @@ -927,6 +998,74 @@ impl SyncNetworkContext { Ok(LookupRequestResult::RequestSent(id.req_id)) } + /// Request a payload envelope for `block_root` from a peer. 
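+    ///
+    /// Peer selection prefers the lookup peer with the fewest active requests (ties broken
+    /// randomly). Returns `LookupRequestResult::Pending` if the lookup currently has no peers.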
+ pub fn envelope_lookup_request( + &mut self, + lookup_id: SingleLookupId, + lookup_peers: Arc>>, + block_root: Hash256, + ) -> Result { + let active_request_count_by_peer = self.active_request_count_by_peer(); + let Some(peer_id) = lookup_peers + .read() + .iter() + .map(|peer| { + ( + active_request_count_by_peer.get(peer).copied().unwrap_or(0), + rand::random::(), + peer, + ) + }) + .min() + .map(|(_, _, peer)| *peer) + else { + return Ok(LookupRequestResult::Pending("no peers")); + }; + + let id = SingleLookupReqId { + lookup_id, + req_id: self.next_id(), + }; + + let request = PayloadEnvelopesByRootSingleRequest(block_root); + + let network_request = RequestType::PayloadEnvelopesByRoot( + request + .into_request(&self.fork_context) + .map_err(RpcRequestSendError::InternalError)?, + ); + self.network_send + .send(NetworkMessage::SendRequest { + peer_id, + request: network_request, + app_request_id: AppRequestId::Sync(SyncRequestId::SinglePayloadEnvelope { id }), + }) + .map_err(|_| RpcRequestSendError::InternalError("network send error".to_owned()))?; + + debug!( + method = "PayloadEnvelopesByRoot", + ?block_root, + peer = %peer_id, + %id, + "Sync RPC request sent" + ); + + let request_span = debug_span!( + parent: Span::current(), + "lh_outgoing_envelope_by_root_request", + %block_root, + ); + self.payload_envelopes_by_root_requests.insert( + id, + peer_id, + true, + PayloadEnvelopesByRootRequestItems::new(request), + request_span, + ); + + Ok(LookupRequestResult::RequestSent(id.req_id)) + } + /// Request necessary blobs for `block_root`. Requests only the necessary blobs by checking: /// - If we have a downloaded but not yet processed block /// - If the da_checker has a pending block @@ -1288,6 +1427,57 @@ impl SyncNetworkContext { Ok((id, requested_columns)) } + fn send_payload_envelopes_by_range_request( + &mut self, + peer_id: PeerId, + request: PayloadEnvelopesByRangeRequest, + parent_request_id: ComponentsByRangeRequestId, + request_span: Span, + ) -> Result { + let id = PayloadEnvelopesByRangeRequestId { + id: self.next_id(), + parent_request_id, + }; + + self.send_network_msg(NetworkMessage::SendRequest { + peer_id, + request: RequestType::PayloadEnvelopesByRange(request.clone()), + app_request_id: AppRequestId::Sync(SyncRequestId::PayloadEnvelopesByRange(id)), + }) + .map_err(|_| RpcRequestSendError::InternalError("network send error".to_owned()))?; + + debug!( + method = "PayloadEnvelopesByRange", + slots = request.count, + epoch = %Slot::new(request.start_slot).epoch(T::EthSpec::slots_per_epoch()), + peer = %peer_id, + %id, + "Sync RPC request sent" + ); + + self.payload_envelopes_by_range_requests.insert( + id, + peer_id, + false, + PayloadEnvelopesByRangeRequestItems::new(request), + request_span, + ); + Ok(id) + } + + #[allow(clippy::type_complexity)] + pub(crate) fn on_payload_envelopes_by_range_response( + &mut self, + id: PayloadEnvelopesByRangeRequestId, + peer_id: PeerId, + rpc_event: RpcEvent>>, + ) -> Option>>>> { + let resp = self + .payload_envelopes_by_range_requests + .on_response(id, rpc_event); + self.on_rpc_response_result(resp, peer_id) + } + pub fn is_execution_engine_online(&self) -> bool { self.execution_engine_state == EngineState::Online } @@ -1369,6 +1559,12 @@ impl SyncNetworkContext { ); if self + .chain + .data_availability_checker + .envelopes_required_for_epoch(epoch) + { + ByRangeRequestType::BlocksAndEnvelopesAndColumns + } else if self .chain .data_availability_checker .data_columns_required_for_epoch(epoch) @@ -1435,6 +1631,27 @@ impl 
SyncNetworkContext { self.on_rpc_response_result(resp, peer_id) } + pub(crate) fn on_single_envelope_response( + &mut self, + id: SingleLookupReqId, + peer_id: PeerId, + rpc_event: RpcEvent>>, + ) -> Option>>> { + let resp = self + .payload_envelopes_by_root_requests + .on_response(id, rpc_event); + let resp = resp.map(|res| { + res.and_then(|(mut envelopes, seen_timestamp)| { + match envelopes.pop() { + Some(envelope) => Ok((envelope, seen_timestamp)), + // Should never happen, request items enforces at least 1 chunk. + None => Err(LookupVerifyError::NotEnoughResponsesReturned { actual: 0 }.into()), + } + }) + }); + self.on_rpc_response_result(resp, peer_id) + } + pub(crate) fn on_single_blob_response( &mut self, id: SingleLookupReqId, @@ -1610,6 +1827,33 @@ impl SyncNetworkContext { }) } + pub fn send_envelope_for_processing( + &self, + id: Id, + envelope: Arc>, + seen_timestamp: Duration, + block_root: Hash256, + ) -> Result<(), SendErrorProcessor> { + let beacon_processor = self + .beacon_processor_if_enabled() + .ok_or(SendErrorProcessor::ProcessorNotAvailable)?; + + debug!(?block_root, ?id, "Sending payload envelope for processing"); + beacon_processor + .send_rpc_payload_envelope( + envelope, + seen_timestamp, + BlockProcessType::SinglePayloadEnvelope { id, block_root }, + ) + .map_err(|e| { + error!( + error = ?e, + "Failed to send sync envelope to processor" + ); + SendErrorProcessor::SendError + }) + } + pub fn send_blobs_for_processing( &self, id: Id, @@ -1788,6 +2032,14 @@ impl SyncNetworkContext { "data_columns_by_range", self.data_columns_by_range_requests.len(), ), + ( + "payload_envelopes_by_root", + self.payload_envelopes_by_root_requests.len(), + ), + ( + "payload_envelopes_by_range", + self.payload_envelopes_by_range_requests.len(), + ), ("custody_by_root", self.custody_by_root_requests.len()), ( "components_by_range", diff --git a/beacon_node/network/src/sync/network_context/requests.rs b/beacon_node/network/src/sync/network_context/requests.rs index 8f9540693e1..8c9e1b2b34e 100644 --- a/beacon_node/network/src/sync/network_context/requests.rs +++ b/beacon_node/network/src/sync/network_context/requests.rs @@ -16,6 +16,10 @@ pub use data_columns_by_range::DataColumnsByRangeRequestItems; pub use data_columns_by_root::{ DataColumnsByRootRequestItems, DataColumnsByRootSingleBlockRequest, }; +pub use payload_envelopes_by_range::PayloadEnvelopesByRangeRequestItems; +pub use payload_envelopes_by_root::{ + PayloadEnvelopesByRootRequestItems, PayloadEnvelopesByRootSingleRequest, +}; use crate::metrics; @@ -27,6 +31,8 @@ mod blocks_by_range; mod blocks_by_root; mod data_columns_by_range; mod data_columns_by_root; +mod payload_envelopes_by_range; +mod payload_envelopes_by_root; #[derive(Debug, PartialEq, Eq, IntoStaticStr)] pub enum LookupVerifyError { diff --git a/beacon_node/network/src/sync/network_context/requests/payload_envelopes_by_range.rs b/beacon_node/network/src/sync/network_context/requests/payload_envelopes_by_range.rs new file mode 100644 index 00000000000..3d4ea8248b2 --- /dev/null +++ b/beacon_node/network/src/sync/network_context/requests/payload_envelopes_by_range.rs @@ -0,0 +1,42 @@ +use super::{ActiveRequestItems, LookupVerifyError}; +use lighthouse_network::rpc::methods::PayloadEnvelopesByRangeRequest; +use std::sync::Arc; +use types::{EthSpec, SignedExecutionPayloadEnvelope}; + +/// Accumulates results of a payload_envelopes_by_range request. Only returns items after +/// receiving the stream termination. 
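+///
+/// Each received envelope is checked to be within the requested slot range and to not duplicate
+/// a previously received slot.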
+pub struct PayloadEnvelopesByRangeRequestItems { + request: PayloadEnvelopesByRangeRequest, + items: Vec>>, +} + +impl PayloadEnvelopesByRangeRequestItems { + pub fn new(request: PayloadEnvelopesByRangeRequest) -> Self { + Self { + request, + items: vec![], + } + } +} + +impl ActiveRequestItems for PayloadEnvelopesByRangeRequestItems { + type Item = Arc>; + + fn add(&mut self, envelope: Self::Item) -> Result { + let slot = envelope.slot(); + if slot < self.request.start_slot || slot >= self.request.start_slot + self.request.count { + return Err(LookupVerifyError::UnrequestedSlot(slot)); + } + if self.items.iter().any(|existing| existing.slot() == slot) { + return Err(LookupVerifyError::DuplicatedData(slot, 0)); + } + + self.items.push(envelope); + + Ok(false) + } + + fn consume(&mut self) -> Vec { + std::mem::take(&mut self.items) + } +} diff --git a/beacon_node/network/src/sync/network_context/requests/payload_envelopes_by_root.rs b/beacon_node/network/src/sync/network_context/requests/payload_envelopes_by_root.rs new file mode 100644 index 00000000000..7f7097971d6 --- /dev/null +++ b/beacon_node/network/src/sync/network_context/requests/payload_envelopes_by_root.rs @@ -0,0 +1,53 @@ +use lighthouse_network::rpc::methods::PayloadEnvelopesByRootRequest; +use std::sync::Arc; +use types::{EthSpec, ForkContext, Hash256, SignedExecutionPayloadEnvelope}; + +use super::{ActiveRequestItems, LookupVerifyError}; + +#[derive(Debug, Copy, Clone)] +pub struct PayloadEnvelopesByRootSingleRequest(pub Hash256); + +impl PayloadEnvelopesByRootSingleRequest { + pub fn into_request( + self, + fork_context: &ForkContext, + ) -> Result { + PayloadEnvelopesByRootRequest::new(vec![self.0], fork_context) + } +} + +pub struct PayloadEnvelopesByRootRequestItems { + request: PayloadEnvelopesByRootSingleRequest, + items: Vec>>, +} + +impl PayloadEnvelopesByRootRequestItems { + pub fn new(request: PayloadEnvelopesByRootSingleRequest) -> Self { + Self { + request, + items: vec![], + } + } +} + +impl ActiveRequestItems for PayloadEnvelopesByRootRequestItems { + type Item = Arc>; + + /// Append a response to the single chunk request. If the chunk is valid, the request is + /// resolved immediately. + /// The active request SHOULD be dropped after `add_response` returns an error + fn add(&mut self, envelope: Self::Item) -> Result { + let beacon_block_root = envelope.beacon_block_root(); + if self.request.0 != beacon_block_root { + return Err(LookupVerifyError::UnrequestedBlockRoot(beacon_block_root)); + } + + self.items.push(envelope); + // Always returns true, payload envelopes by root expects a single response + Ok(true) + } + + fn consume(&mut self) -> Vec { + std::mem::take(&mut self.items) + } +} diff --git a/beacon_node/store/src/errors.rs b/beacon_node/store/src/errors.rs index a07cc838863..e403df483a4 100644 --- a/beacon_node/store/src/errors.rs +++ b/beacon_node/store/src/errors.rs @@ -101,6 +101,7 @@ pub enum Error { from_state_slot: Slot, target_slot: Slot, }, + FinalizedStateAlreadySet, } pub trait HandleUnavailable { diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 78dd69e55a2..b6564bed1b6 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -474,6 +474,25 @@ impl, Cold: ItemStore> HotColdDB } } + /// See [`StateCache::set_initial_finalized_state`](crate::state_cache::StateCache::set_initial_finalized_state). 
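// Aside — illustrative sketch, not part of the diff. The two accumulators above differ in
// when a request completes: by-range keeps collecting (add returns false) and only checks
// that each chunk's slot is inside the requested window and not a duplicate, while by-root
// resolves after a single chunk (add returns true) whose block root must match the request.
// Bare u64 slots/roots and String errors stand in for the real types.
fn accept_by_range_chunk(
    start_slot: u64,
    count: u64,
    received_slots: &mut Vec<u64>,
    chunk_slot: u64,
) -> Result<bool, String> {
    if chunk_slot < start_slot || chunk_slot >= start_slot + count {
        return Err(format!("unrequested slot {chunk_slot}"));
    }
    if received_slots.contains(&chunk_slot) {
        return Err(format!("duplicate chunk for slot {chunk_slot}"));
    }
    received_slots.push(chunk_slot);
    // Keep waiting for the stream termination before handing items back.
    Ok(false)
}

fn accept_by_root_chunk(requested_root: u64, chunk_root: u64) -> Result<bool, String> {
    if chunk_root != requested_root {
        return Err(format!("unrequested block root {chunk_root}"));
    }
    // A by-root request asks for exactly one envelope, so one valid chunk completes it.
    Ok(true)
}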
+ pub fn set_initial_finalized_state( + &self, + state_root: Hash256, + block_root: Hash256, + state: BeaconState, + ) -> Result<(), Error> { + let start_slot = self.get_anchor_info().anchor_slot; + let pre_finalized_slots_to_retain = self + .hierarchy + .closest_layer_points(state.slot(), start_slot); + self.state_cache.lock().set_initial_finalized_state( + state_root, + block_root, + state, + &pre_finalized_slots_to_retain, + ) + } + pub fn update_finalized_state( &self, state_root: Hash256, @@ -1880,14 +1899,18 @@ impl, Cold: ItemStore> HotColdDB return Ok(StatePayloadStatus::Pending); } + // If the latest block was at this slot, the state is definitively `Pending` (post-block, + // pre-payload). Check this before loading the previous summary to avoid errors when the + // previous state doesn't exist (e.g. checkpoint sync where only one state is stored). + if summary.slot == summary.latest_block_slot { + return Ok(StatePayloadStatus::Pending); + } + // Load the hot state summary for the previous state. // // If it has the same slot as this summary then we know this summary is for a `Full` state // (payload state), because they are always diffed against their same-slot `Pending` state. // - // If the previous summary has a different slot AND the latest block is from `summary.slot`, - // then this state *must* be `Pending` (it is the summary for latest block itself). - // // Otherwise, we are at a skipped slot and must traverse the graph of state summaries // backwards until we reach a summary for the latest block. This recursion could be quite // far in the case of a long skip. We could optimise this in future using the @@ -1899,8 +1922,6 @@ impl, Cold: ItemStore> HotColdDB if previous_state_summary.slot == summary.slot { Ok(StatePayloadStatus::Full) - } else if summary.slot == summary.latest_block_slot { - Ok(StatePayloadStatus::Pending) } else { self.get_hot_state_summary_payload_status(&previous_state_summary) } @@ -1951,6 +1972,66 @@ impl, Cold: ItemStore> HotColdDB } } + /// Resolve a canonical state root to the Pending (pre-payload) state root at the same slot. + /// + /// In ePBS, checkpoint states (finalized, justified) should be returned as their Pending + /// variant. This function takes a canonical state root and: + /// + /// - If the state is already Pending (or pre-Gloas), returns it unchanged. + /// - If the state is Full due to a payload applied at this slot, returns the same-slot + /// Pending state root via `previous_state_root`. + /// - If the state is at a skipped slot (inheriting Full status from a prior slot), returns + /// it unchanged — there is no distinct Pending state at a skipped slot. + pub fn resolve_pending_state_root(&self, state_root: &Hash256) -> Result { + // Fast path: split state is always Pending. + let split = self.get_split_info(); + if *state_root == split.state_root { + return Ok(split.state_root); + } + + // Try hot DB first. + if let Some(summary) = self.load_hot_state_summary(state_root)? { + // Pre-Gloas states are always Pending. + if !self + .spec + .fork_name_at_slot::(summary.slot) + .gloas_enabled() + { + return Ok(*state_root); + } + + // Genesis state is always Pending. + if summary.previous_state_root.is_zero() { + return Ok(*state_root); + } + + // Load the previous state summary. If it has the same slot, the current state is + // Full (post-payload) and the previous state is Pending (post-block). Return the + // Pending state root. + let previous_summary = self + .load_hot_state_summary(&summary.previous_state_root)? 
+ .ok_or(Error::MissingHotStateSummary(summary.previous_state_root))?; + + if previous_summary.slot == summary.slot { + // This is a Full state at a non-skipped slot. Return the Pending state root. + return Ok(summary.previous_state_root); + } + + // Either already Pending (block at this slot) or a skipped slot — return as-is. + return Ok(*state_root); + } + + // Try cold DB. + if let Some(_slot) = self.load_cold_state_slot(state_root)? { + // Cold DB states: the non-canonical payload variant is pruned during migration. + // Return whatever is stored. In practice, finalized/justified states are almost + // always in the hot DB or at the split point. + return Ok(*state_root); + } + + Err(Error::MissingHotStateSummary(*state_root)) + } + fn load_hot_hdiff_buffer(&self, state_root: Hash256) -> Result { if let Some(buffer) = self .state_cache diff --git a/beacon_node/store/src/state_cache.rs b/beacon_node/store/src/state_cache.rs index d016922adeb..288b0a7d699 100644 --- a/beacon_node/store/src/state_cache.rs +++ b/beacon_node/store/src/state_cache.rs @@ -124,6 +124,34 @@ impl StateCache { roots } + /// Used by checkpoint sync to initialize the finalized state in the state cache. + /// + /// Post-gloas the checkpoint state may not be epoch-aligned, e.g when the epoch boundary + /// slot is skipped. Regular finalization updates should use `update_finalized_state`. + pub fn set_initial_finalized_state( + &mut self, + state_root: Hash256, + block_root: Hash256, + state: BeaconState, + pre_finalized_slots_to_retain: &[Slot], + ) -> Result<(), Error> { + if self.finalized_state.is_some() { + return Err(Error::FinalizedStateAlreadySet); + } + + if !state.fork_name_unchecked().gloas_enabled() && state.slot() % E::slots_per_epoch() != 0 + { + return Err(Error::FinalizedStateUnaligned); + } + + self.update_finalized_state_inner( + state_root, + block_root, + state, + pre_finalized_slots_to_retain, + ) + } + pub fn update_finalized_state( &mut self, state_root: Hash256, @@ -135,6 +163,21 @@ impl StateCache { return Err(Error::FinalizedStateUnaligned); } + self.update_finalized_state_inner( + state_root, + block_root, + state, + pre_finalized_slots_to_retain, + ) + } + + fn update_finalized_state_inner( + &mut self, + state_root: Hash256, + block_root: Hash256, + state: BeaconState, + pre_finalized_slots_to_retain: &[Slot], + ) -> Result<(), Error> { if self .finalized_state .as_ref() diff --git a/consensus/fork_choice/src/fork_choice.rs b/consensus/fork_choice/src/fork_choice.rs index 92fd4c1faf3..5f2c3fc8615 100644 --- a/consensus/fork_choice/src/fork_choice.rs +++ b/consensus/fork_choice/src/fork_choice.rs @@ -396,8 +396,11 @@ where current_slot: Option, spec: &ChainSpec, ) -> Result> { - // Sanity check: the anchor must lie on an epoch boundary. - if anchor_state.slot() % E::slots_per_epoch() != 0 { + // Pre-gloas sanity check: the anchor must lie on an epoch boundary. 
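// Aside — illustrative sketch, not part of the diff. The core decision inside
// `resolve_pending_state_root`: a hot state is Full (post-payload) exactly when its
// hot-state summary is diffed against a same-slot predecessor, and only then does the
// canonical root get swapped for that predecessor's (Pending) root. The summary type and
// field names below are simplified stand-ins for the real `HotStateSummary`.
#[derive(Clone, Copy)]
struct SummaryView {
    slot: u64,
    previous_state_slot: Option<u64>, // None for the genesis state (zero previous root)
    gloas_enabled: bool,
}

/// Returns true when the caller should use the summary's previous (same-slot, Pending)
/// state root instead of the canonical root it started with.
fn use_previous_pending_root(summary: SummaryView) -> bool {
    // Pre-Gloas states are inherently Pending.
    if !summary.gloas_enabled {
        return false;
    }
    match summary.previous_state_slot {
        // Same-slot predecessor: this state is Full, its predecessor is the Pending
        // (post-block, pre-payload) state at the same slot.
        Some(prev_slot) => prev_slot == summary.slot,
        // Genesis, or skipped/Pending slots with an earlier predecessor: keep the root as-is.
        None => false,
    }
}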
+ // Post-gloas we relax this requirement + if !anchor_state.fork_name_unchecked().gloas_enabled() + && anchor_state.slot() % E::slots_per_epoch() != 0 + { return Err(Error::InvalidAnchor { block_slot: anchor_block.slot(), state_slot: anchor_state.slot(), diff --git a/consensus/state_processing/src/block_replayer.rs b/consensus/state_processing/src/block_replayer.rs index f5f06d1cb9d..f6eec77e26a 100644 --- a/consensus/state_processing/src/block_replayer.rs +++ b/consensus/state_processing/src/block_replayer.rs @@ -9,6 +9,7 @@ use crate::{ per_slot_processing, }; use itertools::Itertools; +use std::collections::HashMap; use std::iter::Peekable; use std::marker::PhantomData; use types::{ @@ -288,17 +289,11 @@ where payload_envelopes: Vec>, target_slot: Option, ) -> Result { - let mut envelopes_iter = payload_envelopes.into_iter(); - - let mut next_envelope_at_slot = |slot| { - if let Some(envelope) = envelopes_iter.next() - && envelope.message.slot == slot - { - Ok(envelope) - } else { - Err(BlockReplayError::MissingPayloadEnvelope { slot }) - } - }; + let mut envelopes_by_slot: HashMap> = + payload_envelopes + .into_iter() + .map(|e| (e.message.slot, e)) + .collect(); for (i, block) in blocks.iter().enumerate() { // Allow one additional block at the start which is only used for its state root. @@ -313,24 +308,41 @@ where // indicates that the parent is full (and it hasn't already been applied). state_root = if block.fork_name_unchecked().gloas_enabled() && self.state.slot() == self.state.latest_block_header().slot - && self.state.payload_status() == StatePayloadStatus::Pending { - let latest_bid_block_hash = self - .state - .latest_execution_payload_bid() - .map_err(BlockReplayError::from)? - .block_hash; - - // Similar to `is_parent_block_full`, but reading the block hash from the - // not-yet-applied `block`. The slot 0 case covers genesis (no block replay reqd). - if self.state.slot() != 0 && block.is_parent_block_full(latest_bid_block_hash) { - let envelope = next_envelope_at_slot(self.state.slot())?; - // State root for the next slot processing is now the envelope's state root. - self.apply_payload_envelope(&envelope, state_root)? + if self.state.payload_status() == StatePayloadStatus::Pending { + let latest_bid_block_hash = self + .state + .latest_execution_payload_bid() + .map_err(BlockReplayError::from)? + .block_hash; + + // Similar to `is_parent_block_full`, but reading the block hash from the + // not-yet-applied `block`. The slot 0 case covers genesis (no block replay + // reqd). + if self.state.slot() != 0 + && block.is_parent_block_full(latest_bid_block_hash) + { + let envelope = envelopes_by_slot.remove(&self.state.slot()).ok_or( + BlockReplayError::MissingPayloadEnvelope { + slot: self.state.slot(), + }, + )?; + // State root for the next slot processing is now the envelope's + // state root. + self.apply_payload_envelope(&envelope, state_root)? + } else { + // Empty payload at this slot, the state root is unchanged from + // when the beacon block was applied. + state_root + } } else { - // Empty payload at this slot, the state root is unchanged from when the - // beacon block was applied. - state_root + // Full: the envelope was already applied. Use its state_root so + // per_slot_processing stores the correct post-envelope root + // (not the pre-envelope block state root). 
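// Aside — illustrative sketch, not part of the diff. The replayer now indexes the supplied
// envelopes by slot instead of consuming them from an ordered iterator, so slots with an
// empty payload (no envelope to apply) no longer desynchronise the sequence. String values
// stand in for the real envelope type and both function names are hypothetical.
use std::collections::HashMap;

fn index_envelopes_by_slot(envelopes: Vec<(u64, String)>) -> HashMap<u64, String> {
    // Collecting into a HashMap is last-write-wins if two envelopes share a slot.
    envelopes.into_iter().collect()
}

fn take_envelope_at_slot(
    envelopes_by_slot: &mut HashMap<u64, String>,
    slot: u64,
) -> Result<String, String> {
    envelopes_by_slot
        .remove(&slot)
        // Mirrors `BlockReplayError::MissingPayloadEnvelope { slot }`.
        .ok_or_else(|| format!("missing payload envelope for slot {slot}"))
}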
+ envelopes_by_slot + .get(&self.state.slot()) + .map(|e| e.message.state_root) + .unwrap_or(state_root) } } else { // Pre-Gloas or at skipped slots post-Gloas, the state root of the parent state @@ -384,7 +396,11 @@ where let mut opt_state_root = if let StatePayloadStatus::Full = self.desired_state_payload_status && let Some(last_block) = blocks.last() { - let envelope = next_envelope_at_slot(self.state.slot())?; + let envelope = envelopes_by_slot.remove(&self.state.slot()).ok_or( + BlockReplayError::MissingPayloadEnvelope { + slot: self.state.slot(), + }, + )?; Some(self.apply_payload_envelope(&envelope, last_block.state_root())?) } else { None
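// Aside — illustrative sketch, not part of the diff. Which state root the replayer carries
// into the next round of per-slot processing when the current state is already Full: the
// correct root to store is the post-envelope root taken from the envelope itself, falling
// back to the root already in hand if the caller did not supply that envelope (it was
// applied before replay began). Hash roots are stand-in strings here.
use std::collections::HashMap;

fn state_root_to_carry_forward(
    slot: u64,
    post_block_root: String,
    envelope_state_roots: &HashMap<u64, String>,
) -> String {
    envelope_state_roots
        .get(&slot)
        .cloned()
        .unwrap_or(post_block_root)
}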