diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index f618cf63217..3f2e8687e84 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -3041,7 +3041,29 @@ impl BeaconChain { // In the case of (2), skipping the block is valid since we should never import it. // However, we will potentially get a `ParentUnknown` on a later block. The sync // protocol will need to ensure this is handled gracefully. - Err(BlockError::WouldRevertFinalizedSlot { .. }) => continue, + Err(BlockError::WouldRevertFinalizedSlot { .. }) => { + // For Gloas blocks, persist the envelope even though we're skipping + // the block. This is needed after checkpoint sync: the checkpoint + // block's envelope must be in the store so that `load_parent` can + // verify it when importing the first post-checkpoint block. + if let RangeSyncBlock::Gloas { + envelope: Some(ref available_envelope), + .. + } = block + { + let (signed_envelope, _columns) = available_envelope.clone().deconstruct(); + if let Err(e) = self + .store + .put_payload_envelope(&block_root, &signed_envelope) + { + return Err(Box::new(ChainSegmentResult::Failed { + imported_blocks, + error: BlockError::BeaconChainError(Box::new(e.into())), + })); + } + } + continue; + } // The block has a known parent that does not descend from the finalized block. // There is no need to process this block or any children. Err(BlockError::NotFinalizedDescendant { block_parent_root }) => { @@ -3150,11 +3172,41 @@ impl BeaconChain { }; // Import the blocks into the chain. - for signature_verified_block in signature_verified_blocks { + for (signature_verified_block, envelope) in signature_verified_blocks { + let block_root = signature_verified_block.block_root(); let block_slot = signature_verified_block.slot(); + + // For Gloas blocks, persist the envelope and notify fork choice + // before importing the block. The next block's `load_parent` will + // check for this envelope in the store. + if let Some(available_envelope) = envelope { + let (signed_envelope, _columns) = available_envelope.deconstruct(); + if let Err(e) = self + .store + .put_payload_envelope(&block_root, &signed_envelope) + { + return ChainSegmentResult::Failed { + imported_blocks, + error: BlockError::BeaconChainError(Box::new(e.into())), + }; + } + if let Err(e) = self + .canonical_head + .fork_choice_write_lock() + .on_valid_payload_envelope_received(block_root) + { + return ChainSegmentResult::Failed { + imported_blocks, + error: BlockError::BeaconChainError(Box::new( + BeaconChainError::ForkChoiceError(e), + )), + }; + } + } + match self .process_block( - signature_verified_block.block_root(), + block_root, signature_verified_block, notify_execution_layer, BlockImportSource::RangeSync, diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 9a431472339..9b6e71ccbf8 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -60,6 +60,7 @@ use crate::execution_payload::{ }; use crate::kzg_utils::blobs_to_data_column_sidecars; use crate::observed_block_producers::SeenBlock; +use crate::payload_envelope_verification::{AvailableEnvelope, EnvelopeError}; use crate::validator_monitor::HISTORIC_EPOCHS as VALIDATOR_MONITOR_HISTORIC_EPOCHS; use crate::validator_pubkey_cache::ValidatorPubkeyCache; use crate::{ @@ -320,6 +321,20 @@ pub enum BlockError { bid_parent_root: Hash256, block_parent_root: Hash256, }, + /// The child block is known but its parent execution payload envelope has not been received yet. + /// + /// ## Peer scoring + /// + /// It's unclear if this block is valid, but it cannot be fully verified without the parent's + /// execution payload envelope. + ParentEnvelopeUnknown { parent_root: Hash256 }, + /// An error occurred while processing the execution payload envelope during range sync. + EnvelopeError(Box), + + PayloadEnvelopeError { + e: Box, + penalize_peer: bool, + }, } /// Which specific signature(s) are invalid in a SignedBeaconBlock @@ -486,6 +501,36 @@ impl From for BlockError { } } +impl From for BlockError { + fn from(e: EnvelopeError) -> Self { + let penalize_peer = match &e { + // REJECT per spec: peer sent invalid envelope data + EnvelopeError::BadSignature + | EnvelopeError::BuilderIndexMismatch { .. } + | EnvelopeError::BlockHashMismatch { .. } + | EnvelopeError::SlotMismatch { .. } + | EnvelopeError::IncorrectBlockProposer { .. } => true, + // IGNORE per spec: not the peer's fault + EnvelopeError::BlockRootUnknown { .. } + | EnvelopeError::PriorToFinalization { .. } + | EnvelopeError::UnknownValidator { .. } => false, + // Internal errors: not the peer's fault + EnvelopeError::BeaconChainError(_) + | EnvelopeError::BeaconStateError(_) + | EnvelopeError::BlockProcessingError(_) + | EnvelopeError::EnvelopeProcessingError(_) + | EnvelopeError::ExecutionPayloadError(_) + | EnvelopeError::BlockError(_) + | EnvelopeError::InternalError(_) + | EnvelopeError::OptimisticSyncNotSupported { .. } => false, + }; + BlockError::PayloadEnvelopeError { + e: Box::new(e), + penalize_peer, + } + } +} + /// Stores information about verifying a payload against an execution engine. #[derive(Debug, PartialEq, Clone, Encode, Decode)] pub struct PayloadVerificationOutcome { @@ -583,10 +628,17 @@ pub(crate) fn process_block_slash_info( mut chain_segment: Vec<(Hash256, RangeSyncBlock)>, chain: &BeaconChain, -) -> Result>, BlockError> { +) -> Result< + Vec<( + SignatureVerifiedBlock, + Option>>, + )>, + BlockError, +> { if chain_segment.is_empty() { return Ok(vec![]); } @@ -615,14 +667,29 @@ pub fn signature_verify_chain_segment( let consensus_context = ConsensusContext::new(block.slot()).set_current_block_root(block_root); - let available_block = block.into_available_block(); + let (available_block, envelope) = match block { + RangeSyncBlock::Base(ab) => (ab, None), + RangeSyncBlock::Gloas { block, envelope } => { + let ab = AvailableBlock::new( + block, + AvailableBlockData::NoData, + &chain.data_availability_checker, + chain.spec.clone(), + ) + .map_err(BlockError::AvailabilityCheck)?; + (ab, envelope) + } + }; available_blocks.push(available_block.clone()); - signature_verified_blocks.push(SignatureVerifiedBlock { - block: MaybeAvailableBlock::Available(available_block), - block_root, - parent: None, - consensus_context, - }); + signature_verified_blocks.push(( + SignatureVerifiedBlock { + block: MaybeAvailableBlock::Available(available_block), + block_root, + parent: None, + consensus_context, + }, + envelope, + )); } chain @@ -632,7 +699,7 @@ pub fn signature_verify_chain_segment( // verify signatures let pubkey_cache = get_validator_pubkey_cache(chain)?; let mut signature_verifier = get_signature_verifier(&state, &pubkey_cache, &chain.spec); - for svb in &mut signature_verified_blocks { + for (svb, _) in &mut signature_verified_blocks { signature_verifier .include_all_signatures(svb.block.as_block(), &mut svb.consensus_context)?; } @@ -643,7 +710,7 @@ pub fn signature_verify_chain_segment( drop(pubkey_cache); - if let Some(signature_verified_block) = signature_verified_blocks.first_mut() { + if let Some((signature_verified_block, _)) = signature_verified_blocks.first_mut() { signature_verified_block.parent = Some(parent); } @@ -1191,7 +1258,7 @@ impl SignatureVerifiedBlock { let result = info_span!("signature_verify").in_scope(|| signature_verifier.verify()); match result { Ok(_) => { - // gloas blocks are always available. + // Gloas blocks are always available — data arrives via the envelope. let maybe_available = if chain .spec .fork_name_at_slot::(block.slot()) @@ -1946,6 +2013,22 @@ fn load_parent>( BlockError::from(BeaconChainError::MissingBeaconBlock(block.parent_root())) })?; + // For post-Gloas parent blocks, the execution payload arrives via the envelope. + // If the parent's execution payload envelope hasn't arrived yet, + // return an unknown parent error so the block gets sent to the + // reprocess queue. + if parent_block.slot() != 0 + && chain + .spec + .fork_name_at_slot::(parent_block.slot()) + .gloas_enabled() + { + let _envelope = chain + .store + .get_payload_envelope(&root)? + .ok_or(BlockError::ParentEnvelopeUnknown { parent_root: root })?; + } + // Load the parent block's state from the database, returning an error if it is not found. // It is an error because if we know the parent block we should also know the parent state. // Retrieve any state that is advanced through to at most `block.slot()`: this is diff --git a/beacon_node/beacon_chain/src/block_verification_types.rs b/beacon_node/beacon_chain/src/block_verification_types.rs index be73ef15d73..f97e695b297 100644 --- a/beacon_node/beacon_chain/src/block_verification_types.rs +++ b/beacon_node/beacon_chain/src/block_verification_types.rs @@ -2,10 +2,11 @@ use crate::data_availability_checker::{AvailabilityCheckError, DataAvailabilityC pub use crate::data_availability_checker::{ AvailableBlock, AvailableBlockData, MaybeAvailableBlock, }; +use crate::payload_envelope_verification::AvailableEnvelope; use crate::{BeaconChainTypes, PayloadVerificationOutcome}; -use educe::Educe; use state_processing::ConsensusContext; use std::fmt::{Debug, Formatter}; +use std::hash::{Hash, Hasher}; use std::sync::Arc; use types::data::BlobIdentifier; use types::{ @@ -45,38 +46,61 @@ impl LookupBlock { /// This includes any and all blobs/columns required, including zero if /// none are required. This can happen if the block is pre-deneb or if /// it's simply past the DA boundary. -#[derive(Clone, Educe)] -#[educe(Hash(bound(E: EthSpec)))] -pub struct RangeSyncBlock { - block: AvailableBlock, +#[derive(Clone)] +pub enum RangeSyncBlock { + Base(AvailableBlock), + Gloas { + block: Arc>, + envelope: Option>>, + }, +} + +impl Hash for RangeSyncBlock { + fn hash(&self, state: &mut H) { + self.block_root().hash(state); + } } impl Debug for RangeSyncBlock { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "RpcBlock({:?})", self.block_root()) + write!(f, "RangeSyncBlock({:?})", self.block_root()) } } impl RangeSyncBlock { pub fn block_root(&self) -> Hash256 { - self.block.block_root() + match self { + RangeSyncBlock::Base(block) => block.block_root(), + RangeSyncBlock::Gloas { block, .. } => block.canonical_root(), + } } pub fn as_block(&self) -> &SignedBeaconBlock { - self.block.block() + match self { + RangeSyncBlock::Base(block) => block.block(), + RangeSyncBlock::Gloas { block, .. } => block, + } } pub fn block_cloned(&self) -> Arc> { - self.block.block_cloned() + match self { + RangeSyncBlock::Base(block) => block.block_cloned(), + RangeSyncBlock::Gloas { block, .. } => block.clone(), + } } pub fn block_data(&self) -> &AvailableBlockData { - self.block.data() + match self { + RangeSyncBlock::Base(block) => block.data(), + RangeSyncBlock::Gloas { .. } => { + unreachable!("block_data called on Gloas variant — use envelope data instead") + } + } } } impl RangeSyncBlock { - /// Constructs an `RangeSyncBlock` from a block and availability data. + /// Constructs a `RangeSyncBlock` from a block and availability data. /// /// # Errors /// @@ -94,32 +118,53 @@ impl RangeSyncBlock { T: BeaconChainTypes, { let available_block = AvailableBlock::new(block, block_data, da_checker, spec)?; - Ok(Self { - block: available_block, - }) + Ok(Self::Base(available_block)) + } + + pub fn new_gloas( + block: Arc>, + envelope: Option>>, + ) -> Self { + Self::Gloas { block, envelope } } #[allow(clippy::type_complexity)] pub fn deconstruct(self) -> (Hash256, Arc>, AvailableBlockData) { - self.block.deconstruct() + match self { + RangeSyncBlock::Base(block) => block.deconstruct(), + RangeSyncBlock::Gloas { .. } => { + unreachable!("deconstruct called on Gloas variant") + } + } } pub fn n_blobs(&self) -> usize { - match self.block_data() { - AvailableBlockData::NoData | AvailableBlockData::DataColumns(_) => 0, - AvailableBlockData::Blobs(blobs) => blobs.len(), + match self { + RangeSyncBlock::Base(block) => match block.data() { + AvailableBlockData::NoData | AvailableBlockData::DataColumns(_) => 0, + AvailableBlockData::Blobs(blobs) => blobs.len(), + }, + RangeSyncBlock::Gloas { .. } => 0, } } pub fn n_data_columns(&self) -> usize { - match self.block_data() { - AvailableBlockData::NoData | AvailableBlockData::Blobs(_) => 0, - AvailableBlockData::DataColumns(columns) => columns.len(), + match self { + RangeSyncBlock::Base(block) => match block.data() { + AvailableBlockData::NoData | AvailableBlockData::Blobs(_) => 0, + AvailableBlockData::DataColumns(columns) => columns.len(), + }, + RangeSyncBlock::Gloas { .. } => 0, } } pub fn into_available_block(self) -> AvailableBlock { - self.block + match self { + RangeSyncBlock::Base(block) => block, + RangeSyncBlock::Gloas { .. } => { + unreachable!("into_available_block called on Gloas variant") + } + } } } @@ -387,31 +432,31 @@ impl AsBlock for AvailableBlock { impl AsBlock for RangeSyncBlock { fn slot(&self) -> Slot { - self.as_block().slot() + RangeSyncBlock::as_block(self).slot() } fn epoch(&self) -> Epoch { - self.as_block().epoch() + RangeSyncBlock::as_block(self).epoch() } fn parent_root(&self) -> Hash256 { - self.as_block().parent_root() + RangeSyncBlock::as_block(self).parent_root() } fn state_root(&self) -> Hash256 { - self.as_block().state_root() + RangeSyncBlock::as_block(self).state_root() } fn signed_block_header(&self) -> SignedBeaconBlockHeader { - self.as_block().signed_block_header() + RangeSyncBlock::as_block(self).signed_block_header() } fn message(&self) -> BeaconBlockRef<'_, E> { - self.as_block().message() + RangeSyncBlock::as_block(self).message() } fn as_block(&self) -> &SignedBeaconBlock { - self.block.as_block() + RangeSyncBlock::as_block(self) } fn block_cloned(&self) -> Arc> { - self.block.block_cloned() + RangeSyncBlock::block_cloned(self) } fn canonical_root(&self) -> Hash256 { - self.block.block_root() + self.block_root() } } diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index 9d8b76aaed3..e296f1b78de 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -21,7 +21,7 @@ use tracing::{debug, error, instrument}; use types::data::{BlobIdentifier, FixedBlobSidecarList, PartialDataColumn}; use types::{ BlobSidecar, BlobSidecarList, BlockImportSource, ChainSpec, DataColumnSidecar, - DataColumnSidecarList, Epoch, EthSpec, Hash256, PartialDataColumnSidecarError, + DataColumnSidecarList, Epoch, EthSpec, ForkName, Hash256, PartialDataColumnSidecarError, PartialDataColumnSidecarRef, SignedBeaconBlock, Slot, new_non_zero_usize, }; @@ -540,6 +540,11 @@ impl DataAvailabilityChecker { self.da_check_required_for_epoch(epoch) && self.spec.is_peer_das_enabled_for_epoch(epoch) } + /// Determines if execution payload envelopes are required for an epoch (Gloas and later). + pub fn envelopes_required_for_epoch(&self, epoch: Epoch) -> bool { + self.spec.fork_name_at_epoch(epoch) >= ForkName::Gloas + } + /// See `Self::blobs_required_for_epoch` fn blobs_required_for_block(&self, block: &SignedBeaconBlock) -> bool { block.num_expected_blobs() > 0 && self.blobs_required_for_epoch(block.epoch()) diff --git a/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs b/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs index b153a3cd6a7..c0e5ff4a7c1 100644 --- a/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs +++ b/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs @@ -48,7 +48,7 @@ pub struct EnvelopeImportData { _phantom: PhantomData, } -#[derive(Debug)] +#[derive(Debug, Clone)] #[allow(dead_code)] pub struct AvailableEnvelope { execution_block_hash: ExecutionBlockHash, diff --git a/beacon_node/beacon_processor/src/lib.rs b/beacon_node/beacon_processor/src/lib.rs index ea87e9bc718..d2f70108547 100644 --- a/beacon_node/beacon_processor/src/lib.rs +++ b/beacon_node/beacon_processor/src/lib.rs @@ -417,6 +417,9 @@ pub enum Work { RpcBlobs { process_fn: AsyncFn, }, + RpcPayloadEnvelope { + process_fn: AsyncFn, + }, RpcCustodyColumn(AsyncFn), ColumnReconstruction(AsyncFn), IgnoredRpcBlock { @@ -483,6 +486,7 @@ pub enum WorkType { GossipLightClientOptimisticUpdate, RpcBlock, RpcBlobs, + RpcPayloadEnvelope, RpcCustodyColumn, ColumnReconstruction, IgnoredRpcBlock, @@ -545,6 +549,7 @@ impl Work { Work::GossipProposerPreferences(_) => WorkType::GossipProposerPreferences, Work::RpcBlock { .. } => WorkType::RpcBlock, Work::RpcBlobs { .. } => WorkType::RpcBlobs, + Work::RpcPayloadEnvelope { .. } => WorkType::RpcPayloadEnvelope, Work::RpcCustodyColumn { .. } => WorkType::RpcCustodyColumn, Work::ColumnReconstruction(_) => WorkType::ColumnReconstruction, Work::IgnoredRpcBlock { .. } => WorkType::IgnoredRpcBlock, @@ -1183,7 +1188,9 @@ impl BeaconProcessor { Work::GossipLightClientOptimisticUpdate { .. } => work_queues .lc_gossip_optimistic_update_queue .push(work, work_id), - Work::RpcBlock { .. } | Work::IgnoredRpcBlock { .. } => { + Work::RpcBlock { .. } + | Work::IgnoredRpcBlock { .. } + | Work::RpcPayloadEnvelope { .. } => { work_queues.rpc_block_queue.push(work, work_id) } Work::RpcBlobs { .. } => work_queues.rpc_blob_queue.push(work, work_id), @@ -1318,7 +1325,9 @@ impl BeaconProcessor { WorkType::GossipLightClientOptimisticUpdate => { work_queues.lc_gossip_optimistic_update_queue.len() } - WorkType::RpcBlock => work_queues.rpc_block_queue.len(), + WorkType::RpcBlock | WorkType::RpcPayloadEnvelope => { + work_queues.rpc_block_queue.len() + } WorkType::RpcBlobs | WorkType::IgnoredRpcBlock => { work_queues.rpc_blob_queue.len() } @@ -1513,6 +1522,7 @@ impl BeaconProcessor { beacon_block_root: _, } | Work::RpcBlobs { process_fn } + | Work::RpcPayloadEnvelope { process_fn } | Work::RpcCustodyColumn(process_fn) | Work::ColumnReconstruction(process_fn) => task_spawner.spawn_async(process_fn), Work::IgnoredRpcBlock { process_fn } => task_spawner.spawn_blocking(process_fn), diff --git a/beacon_node/lighthouse_network/src/service/api_types.rs b/beacon_node/lighthouse_network/src/service/api_types.rs index 486a4438579..0b1d84b7066 100644 --- a/beacon_node/lighthouse_network/src/service/api_types.rs +++ b/beacon_node/lighthouse_network/src/service/api_types.rs @@ -31,6 +31,10 @@ pub enum SyncRequestId { BlobsByRange(BlobsByRangeRequestId), /// Data columns by range request DataColumnsByRange(DataColumnsByRangeRequestId), + /// Request searching for an execution payload envelope given a block root. + SinglePayloadEnvelope { id: SingleLookupReqId }, + /// Payload envelopes by range request + PayloadEnvelopesByRange(PayloadEnvelopesByRangeRequestId), } /// Request ID for data_columns_by_root requests. Block lookups do not issue this request directly. @@ -76,6 +80,14 @@ pub enum DataColumnsByRangeRequester { CustodyBackfillSync(CustodyBackFillBatchRequestId), } +#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] +pub struct PayloadEnvelopesByRangeRequestId { + /// Id to identify this attempt at a payload_envelopes_by_range request for `parent_request_id` + pub id: Id, + /// The Id of the overall By Range request for block components. + pub parent_request_id: ComponentsByRangeRequestId, +} + /// Block components by range request for range sync. Includes an ID for downstream consumers to /// handle retries and tie all their sub requests together. #[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] @@ -252,6 +264,12 @@ macro_rules! impl_display { impl_display!(BlocksByRangeRequestId, "{}/{}", id, parent_request_id); impl_display!(BlobsByRangeRequestId, "{}/{}", id, parent_request_id); impl_display!(DataColumnsByRangeRequestId, "{}/{}", id, parent_request_id); +impl_display!( + PayloadEnvelopesByRangeRequestId, + "{}/{}", + id, + parent_request_id +); impl_display!(ComponentsByRangeRequestId, "{}/{}", id, requester); impl_display!(DataColumnsByRootRequestId, "{}/{}", id, requester); impl_display!(SingleLookupReqId, "{}/Lookup/{}", req_id, lookup_id); diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index 0135d7f5dd4..411efcf5f2e 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -1831,6 +1831,21 @@ impl NetworkBeaconProcessor { error!(error = %e, "Internal block gossip validation error"); return None; } + Err(BlockError::ParentEnvelopeUnknown { .. }) => { + // Gossip validation does not check envelope availability; this should not occur. + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); + return None; + } + Err(e @ BlockError::EnvelopeError(_)) => { + debug!(error = %e, "Gossip block envelope error"); + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); + return None; + } + Err(e @ BlockError::PayloadEnvelopeError { .. }) => { + debug!(error = %e, "Gossip block payload envelope error"); + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); + return None; + } }; metrics::inc_counter(&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_VERIFIED_TOTAL); diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs index 8f89b669485..968120b4bfc 100644 --- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs @@ -864,6 +864,16 @@ impl NetworkBeaconProcessor { peer_action: Some(PeerAction::LowToleranceError), }) } + BlockError::ParentEnvelopeUnknown { parent_root } => { + Err(ChainSegmentFailed { + message: format!( + "Block's parent envelope has not been received: {}", + parent_root + ), + // Don't penalize the peer, the envelope may arrive later. + peer_action: None, + }) + } BlockError::DuplicateFullyImported(_) | BlockError::DuplicateImportStatusUnknown(..) => { // This can happen for many reasons. Head sync's can download multiples and parent diff --git a/beacon_node/network/src/router.rs b/beacon_node/network/src/router.rs index 443fa51cc67..53267787941 100644 --- a/beacon_node/network/src/router.rs +++ b/beacon_node/network/src/router.rs @@ -26,6 +26,7 @@ use tokio_stream::wrappers::UnboundedReceiverStream; use tracing::{debug, error, trace, warn}; use types::{ BlobSidecar, DataColumnSidecar, EthSpec, ForkContext, PartialDataColumn, SignedBeaconBlock, + SignedExecutionPayloadEnvelope, }; /// Handles messages from the network and routes them to the appropriate service to be handled. @@ -341,10 +342,19 @@ impl Router { Response::DataColumnsByRange(data_column) => { self.on_data_columns_by_range_response(peer_id, app_request_id, data_column); } - // TODO(EIP-7732): implement outgoing payload envelopes by range and root - // responses once sync manager requests them. - Response::PayloadEnvelopesByRoot(_) | Response::PayloadEnvelopesByRange(_) => { - debug!("Requesting envelopes by root and by range not supported yet"); + Response::PayloadEnvelopesByRoot(payload_envelope) => { + self.on_payload_envelopes_by_root_response( + peer_id, + app_request_id, + payload_envelope, + ); + } + Response::PayloadEnvelopesByRange(payload_envelope) => { + self.on_payload_envelopes_by_range_response( + peer_id, + app_request_id, + payload_envelope, + ); } // Light client responses should not be received Response::LightClientBootstrap(_) @@ -809,6 +819,62 @@ impl Router { } } + pub fn on_payload_envelopes_by_root_response( + &mut self, + peer_id: PeerId, + app_request_id: AppRequestId, + payload_envelope: Option>>, + ) { + let sync_request_id = match app_request_id { + AppRequestId::Sync(sync_id) => match sync_id { + id @ SyncRequestId::SinglePayloadEnvelope { .. } => id, + other => { + crit!(request = ?other, "PayloadEnvelopesByRoot response on incorrect request"); + return; + } + }, + AppRequestId::Router => { + crit!(%peer_id, "All PayloadEnvelopesByRoot requests belong to sync"); + return; + } + AppRequestId::Internal => unreachable!("Handled internally"), + }; + + trace!( + %peer_id, + "Received PayloadEnvelopesByRoot Response" + ); + self.send_to_sync(SyncMessage::RpcPayloadEnvelope { + sync_request_id, + peer_id, + payload_envelope, + seen_timestamp: self.chain.slot_clock.now_duration().unwrap_or_default(), + }); + } + + pub fn on_payload_envelopes_by_range_response( + &mut self, + peer_id: PeerId, + app_request_id: AppRequestId, + payload_envelope: Option>>, + ) { + trace!( + %peer_id, + "Received PayloadEnvelopesByRange Response" + ); + + if let AppRequestId::Sync(sync_request_id) = app_request_id { + self.send_to_sync(SyncMessage::RpcPayloadEnvelope { + peer_id, + sync_request_id, + payload_envelope, + seen_timestamp: self.chain.slot_clock.now_duration().unwrap_or_default(), + }); + } else { + crit!("All payload envelopes by range responses should belong to sync"); + } + } + fn handle_beacon_processor_send_result( &mut self, result: Result<(), crate::network_beacon_processor::Error>, diff --git a/beacon_node/network/src/sync/batch.rs b/beacon_node/network/src/sync/batch.rs index 10af1bf5038..e0704e25697 100644 --- a/beacon_node/network/src/sync/batch.rs +++ b/beacon_node/network/src/sync/batch.rs @@ -33,6 +33,7 @@ pub type BatchId = Epoch; #[strum(serialize_all = "snake_case")] pub enum ByRangeRequestType { BlocksAndColumns, + BlocksAndEnvelopesAndColumns, BlocksAndBlobs, Blocks, Columns(HashSet), diff --git a/beacon_node/network/src/sync/block_sidecar_coupling.rs b/beacon_node/network/src/sync/block_sidecar_coupling.rs index 98cf3e0a1ff..4eab7f9e676 100644 --- a/beacon_node/network/src/sync/block_sidecar_coupling.rs +++ b/beacon_node/network/src/sync/block_sidecar_coupling.rs @@ -4,11 +4,13 @@ use beacon_chain::{ data_availability_checker::DataAvailabilityChecker, data_column_verification::CustodyDataColumn, get_block_root, + payload_envelope_verification::AvailableEnvelope, }; use lighthouse_network::{ PeerId, service::api_types::{ BlobsByRangeRequestId, BlocksByRangeRequestId, DataColumnsByRangeRequestId, + PayloadEnvelopesByRangeRequestId, }, }; use ssz_types::RuntimeVariableList; @@ -16,7 +18,7 @@ use std::{collections::HashMap, sync::Arc}; use tracing::{Span, debug}; use types::{ BlobSidecar, ChainSpec, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, EthSpec, - Hash256, SignedBeaconBlock, + Hash256, SignedBeaconBlock, SignedExecutionPayloadEnvelope, }; use crate::sync::network_context::MAX_COLUMN_RETRIES; @@ -35,6 +37,13 @@ use crate::sync::network_context::MAX_COLUMN_RETRIES; pub struct RangeBlockComponentsRequest { /// Blocks we have received awaiting for their corresponding sidecar. blocks_request: ByRangeRequest>>>, + /// Payload envelopes (Gloas+). None for pre-Gloas forks. + payloads_request: Option< + ByRangeRequest< + PayloadEnvelopesByRangeRequestId, + Vec>>, + >, + >, /// Sidecars we have received awaiting for their corresponding block. block_data_request: RangeBlockDataRequest, /// Span to track the range request and all children range requests. @@ -88,6 +97,7 @@ impl RangeBlockComponentsRequest { Vec<(DataColumnsByRangeRequestId, Vec)>, Vec, )>, + payloads_req_id: Option, request_span: Span, ) -> Self { let block_data_request = if let Some(blobs_req_id) = blobs_req_id { @@ -109,6 +119,7 @@ impl RangeBlockComponentsRequest { Self { blocks_request: ByRangeRequest::Active(blocks_req_id), + payloads_request: payloads_req_id.map(ByRangeRequest::Active), block_data_request, request_span, } @@ -191,6 +202,18 @@ impl RangeBlockComponentsRequest { } } + /// Adds received payload envelopes to the request. + pub fn add_payload_envelopes( + &mut self, + req_id: PayloadEnvelopesByRangeRequestId, + envelopes: Vec>>, + ) -> Result<(), String> { + match &mut self.payloads_request { + Some(req) => req.finish(req_id, envelopes), + None => Err("received payload envelopes but none expected".to_owned()), + } + } + /// Attempts to construct RPC blocks from all received components. /// /// Returns `None` if not all expected requests have completed. @@ -208,6 +231,13 @@ impl RangeBlockComponentsRequest { return None; }; + // If payloads are expected, they must also be complete before we can produce responses. + if let Some(payloads_req) = &self.payloads_request + && payloads_req.to_finished().is_none() + { + return None; + } + // Increment the attempt once this function returns the response or errors match &mut self.block_data_request { RangeBlockDataRequest::NoData => Some(Self::responses_with_blobs( @@ -254,15 +284,29 @@ impl RangeBlockComponentsRequest { } } - let resp = Self::responses_with_custody_columns( - blocks.to_vec(), - data_columns, - column_to_peer_id, - expected_custody_columns, - *attempt, - da_checker, - spec, - ); + // Gloas path: if payloads are present, produce Gloas blocks + let resp = if let Some(payloads_req) = &self.payloads_request { + let payloads = payloads_req.to_finished().expect("checked above").to_vec(); + Self::responses_with_envelopes_and_columns( + blocks.to_vec(), + payloads, + data_columns, + column_to_peer_id, + expected_custody_columns, + *attempt, + spec, + ) + } else { + Self::responses_with_custody_columns( + blocks.to_vec(), + data_columns, + column_to_peer_id, + expected_custody_columns, + *attempt, + da_checker, + spec, + ) + }; if let Err(CouplingError::DataColumnPeerFailure { error: _, @@ -364,101 +408,207 @@ impl RangeBlockComponentsRequest { where T: BeaconChainTypes, { - // Group data columns by block_root and index - let mut data_columns_by_block = - HashMap::>>>::new(); + let mut columns_by_root = Self::group_columns_by_root(data_columns); + let mut range_sync_blocks = Vec::with_capacity(blocks.len()); + let exceeded_retries = attempt >= MAX_COLUMN_RETRIES; + + for block in blocks { + let block_root = get_block_root(&block); + range_sync_blocks.push(if block.num_expected_blobs() > 0 { + // Safe to convert to `CustodyDataColumn`: we have asserted that the index of + // this column is in the set of `expects_custody_columns` and with the expected + // block root, so for the expected epoch of this batch. + let columns = Self::extract_custody_columns_for_root( + block_root, + &mut columns_by_root, + expects_custody_columns, + &column_to_peer, + exceeded_retries, + )?; + let custody_columns = columns + .into_iter() + .map(CustodyDataColumn::from_asserted_custody) + .collect::>(); + let block_data = AvailableBlockData::new_with_data_columns( + custody_columns + .iter() + .map(|c| c.as_data_column().clone()) + .collect::>(), + ); + RangeSyncBlock::new(block, block_data, &da_checker, spec.clone()) + .map_err(|e| CouplingError::InternalError(format!("{:?}", e)))? + } else { + RangeSyncBlock::new(block, AvailableBlockData::NoData, &da_checker, spec.clone()) + .map_err(|e| CouplingError::InternalError(format!("{:?}", e)))? + }); + } + + if !columns_by_root.is_empty() { + let remaining_roots = columns_by_root.keys().collect::>(); + debug!(?remaining_roots, "Not all columns consumed for block"); + } + + Ok(range_sync_blocks) + } + + /// Couples blocks with payload envelopes and custody columns for Gloas range sync. + fn responses_with_envelopes_and_columns( + blocks: Vec>>, + payloads: Vec>>, + data_columns: DataColumnSidecarList, + column_to_peer: HashMap, + expects_custody_columns: &[ColumnIndex], + attempt: usize, + spec: Arc, + ) -> Result>, CouplingError> { + let mut columns_by_root = Self::group_columns_by_root(data_columns); + let mut range_sync_blocks = Vec::with_capacity(blocks.len()); + let mut payload_iter = payloads.into_iter().peekable(); + let exceeded_retries = attempt >= MAX_COLUMN_RETRIES; + + for block in blocks { + let mut envelope_for_block = None; + if payload_iter + .peek() + .map(|e| e.message.slot() == block.slot()) + .unwrap_or(false) + { + envelope_for_block = payload_iter.next(); + } + + let block_root = get_block_root(&block); + let available_envelope = if block.num_expected_blobs() > 0 { + let envelope = envelope_for_block.ok_or_else(|| { + CouplingError::InternalError(format!( + "Missing payload envelope for block {block_root:?} with blobs" + )) + })?; + let custody_columns = Self::extract_custody_columns_for_root( + block_root, + &mut columns_by_root, + expects_custody_columns, + &column_to_peer, + exceeded_retries, + )?; + Some(Box::new(AvailableEnvelope::new( + envelope.block_hash(), + envelope, + custody_columns, + None, + spec.clone(), + ))) + } else { + envelope_for_block.map(|envelope| { + Box::new(AvailableEnvelope::new( + envelope.block_hash(), + envelope, + vec![], + None, + spec.clone(), + )) + }) + }; + + range_sync_blocks.push(RangeSyncBlock::new_gloas(block, available_envelope)); + } + + if payload_iter.next().is_some() { + let remaining = payload_iter.count() + 1; + debug!( + remaining, + "Received payload envelopes that don't pair with blocks" + ); + } + + if !columns_by_root.is_empty() { + let remaining_roots = columns_by_root.keys().collect::>(); + debug!( + ?remaining_roots, + "Not all columns consumed for Gloas blocks" + ); + } + + Ok(range_sync_blocks) + } + + /// Groups data columns by their block root, logging and skipping duplicates. + fn group_columns_by_root( + data_columns: DataColumnSidecarList, + ) -> HashMap>>> { + let mut by_root = + HashMap::>>>::new(); for column in data_columns { let block_root = column.block_root(); let index = *column.index(); - if data_columns_by_block + if by_root .entry(block_root) .or_default() .insert(index, column) .is_some() { - // `DataColumnsByRangeRequestItems` ensures that we do not request any duplicated indices across all peers - // we request the data from. - // If there are duplicated indices, its likely a peer sending us the same index multiple times. - // However we can still proceed even if there are extra columns, just log an error. + // `DataColumnsByRangeRequestItems` ensures no duplicated indices across peers. + // Duplicates are likely a peer sending the same index multiple times; log and skip. debug!(?block_root, ?index, "Repeated column for block_root"); - continue; } } + by_root + } - // Now iterate all blocks ensuring that the block roots of each block and data column match, - // plus we have columns for our custody requirements - let mut range_sync_blocks = Vec::with_capacity(blocks.len()); - - let exceeded_retries = attempt >= MAX_COLUMN_RETRIES; - for block in blocks { - let block_root = get_block_root(&block); - range_sync_blocks.push(if block.num_expected_blobs() > 0 { - let Some(mut data_columns_by_index) = data_columns_by_block.remove(&block_root) - else { - let responsible_peers = column_to_peer.iter().map(|c| (*c.0, *c.1)).collect(); - return Err(CouplingError::DataColumnPeerFailure { - error: format!("No columns for block {block_root:?} with data"), - faulty_peers: responsible_peers, - exceeded_retries, - - }); - }; - - let mut custody_columns = vec![]; - let mut naughty_peers = vec![]; - for index in expects_custody_columns { - // Safe to convert to `CustodyDataColumn`: we have asserted that the index of - // this column is in the set of `expects_custody_columns` and with the expected - // block root, so for the expected epoch of this batch. - if let Some(data_column) = data_columns_by_index.remove(index) { - custody_columns.push(CustodyDataColumn::from_asserted_custody(data_column)); - } else { - let Some(responsible_peer) = column_to_peer.get(index) else { - return Err(CouplingError::InternalError(format!("Internal error, no request made for column {}", index))); - }; - naughty_peers.push((*index, *responsible_peer)); - } - } - if !naughty_peers.is_empty() { - return Err(CouplingError::DataColumnPeerFailure { - error: format!("Peers did not return column for block_root {block_root:?} {naughty_peers:?}"), - faulty_peers: naughty_peers, - exceeded_retries - }); - } - - // Assert that there are no columns left - if !data_columns_by_index.is_empty() { - let remaining_indices = data_columns_by_index.keys().collect::>(); - // log the error but don't return an error, we can still progress with extra columns. - debug!( - ?block_root, - ?remaining_indices, - "Not all columns consumed for block" - ); - } - - let block_data = AvailableBlockData::new_with_data_columns(custody_columns.iter().map(|c| c.as_data_column().clone()).collect::>()); + /// Extracts and validates custody columns for a single block root. + /// + /// Removes the matching entry from `columns_by_root`, checks all expected indices are + /// present, and logs any extras. Returns the raw columns; callers wrap them as needed. + fn extract_custody_columns_for_root( + block_root: Hash256, + columns_by_root: &mut HashMap>>>, + expects_custody_columns: &[ColumnIndex], + column_to_peer: &HashMap, + exceeded_retries: bool, + ) -> Result>>, CouplingError> { + let Some(mut by_index) = columns_by_root.remove(&block_root) else { + let responsible_peers = column_to_peer.iter().map(|c| (*c.0, *c.1)).collect(); + return Err(CouplingError::DataColumnPeerFailure { + error: format!("No columns for block {block_root:?} with data"), + faulty_peers: responsible_peers, + exceeded_retries, + }); + }; - RangeSyncBlock::new(block, block_data, &da_checker, spec.clone()) - .map_err(|e| CouplingError::InternalError(format!("{:?}", e)))? + let mut columns = vec![]; + let mut naughty_peers = vec![]; + for index in expects_custody_columns { + if let Some(col) = by_index.remove(index) { + columns.push(col); } else { - // Block has no data, expects zero columns - RangeSyncBlock::new(block, AvailableBlockData::NoData, &da_checker, spec.clone()) - .map_err(|e| CouplingError::InternalError(format!("{:?}", e)))? + let Some(responsible_peer) = column_to_peer.get(index) else { + return Err(CouplingError::InternalError(format!( + "Internal error, no request made for column {index}" + ))); + }; + naughty_peers.push((*index, *responsible_peer)); + } + } + if !naughty_peers.is_empty() { + return Err(CouplingError::DataColumnPeerFailure { + error: format!( + "Peers did not return column for block_root {block_root:?} {naughty_peers:?}" + ), + faulty_peers: naughty_peers, + exceeded_retries, }); } - // Assert that there are no columns left for other blocks - if !data_columns_by_block.is_empty() { - let remaining_roots = data_columns_by_block.keys().collect::>(); - // log the error but don't return an error, we can still progress with responses. - // this is most likely an internal error with overrequesting or a client bug. - debug!(?remaining_roots, "Not all columns consumed for block"); + if !by_index.is_empty() { + let remaining_indices = by_index.keys().collect::>(); + debug!( + ?block_root, + ?remaining_indices, + "Not all columns consumed for block" + ); } - Ok(range_sync_blocks) + Ok(columns) } } @@ -494,6 +644,8 @@ mod tests { NumBlobs, generate_rand_block_and_blobs, generate_rand_block_and_data_columns, test_da_checker, test_spec, }; + use bls::Signature; + use lighthouse_network::service::api_types::PayloadEnvelopesByRangeRequestId; use lighthouse_network::{ PeerId, service::api_types::{ @@ -504,7 +656,11 @@ mod tests { use rand::SeedableRng; use std::{collections::HashMap, sync::Arc}; use tracing::Span; - use types::{Epoch, ForkName, MinimalEthSpec as E, SignedBeaconBlock, test_utils::XorShiftRng}; + use types::{ + Epoch, EthSpec, ExecutionBlockHash, ExecutionPayloadEnvelope, ExecutionPayloadGloas, + ExecutionRequests, ForkName, Hash256, MinimalEthSpec as E, SignedBeaconBlock, + SignedExecutionPayloadEnvelope, Slot, test_utils::XorShiftRng, + }; fn components_id() -> ComponentsByRangeRequestId { ComponentsByRangeRequestId { @@ -560,7 +716,7 @@ mod tests { let blocks_req_id = blocks_id(components_id()); let mut info = - RangeBlockComponentsRequest::::new(blocks_req_id, None, None, Span::none()); + RangeBlockComponentsRequest::::new(blocks_req_id, None, None, None, Span::none()); // Send blocks and complete terminate response info.add_blocks(blocks_req_id, blocks).unwrap(); @@ -591,6 +747,7 @@ mod tests { blocks_req_id, Some(blobs_req_id), None, + None, Span::none(), ); @@ -650,6 +807,7 @@ mod tests { blocks_req_id, None, Some((columns_req_id.clone(), expects_custody_columns.clone())), + None, Span::none(), ); // Send blocks and complete terminate response @@ -726,6 +884,7 @@ mod tests { blocks_req_id, None, Some((columns_req_id.clone(), expected_sampling_columns.clone())), + None, Span::none(), ); @@ -818,6 +977,7 @@ mod tests { blocks_req_id, None, Some((columns_req_id.clone(), expected_sampling_columns.clone())), + None, Span::none(), ); @@ -915,6 +1075,7 @@ mod tests { blocks_req_id, None, Some((columns_req_id.clone(), expected_sampling_columns.clone())), + None, Span::none(), ); @@ -1030,6 +1191,7 @@ mod tests { blocks_req_id, None, Some((columns_req_id.clone(), expected_sampling_columns.clone())), + None, Span::none(), ); @@ -1101,4 +1263,171 @@ mod tests { panic!("Expected PeerFailure error with exceeded_retries=true"); } } + + // --- Gloas tests --- + + fn make_gloas_envelope( + slot: Slot, + rng: &mut impl rand::Rng, + ) -> Arc> { + let envelope = ExecutionPayloadEnvelope { + payload: ExecutionPayloadGloas { + slot_number: slot, + block_hash: ExecutionBlockHash::from_root(Hash256::from(rng.random::<[u8; 32]>())), + ..ExecutionPayloadGloas::default() + }, + execution_requests: ExecutionRequests::default(), + builder_index: 0, + beacon_block_root: Hash256::from(rng.random::<[u8; 32]>()), + parent_beacon_block_root: Hash256::repeat_byte(0), + }; + Arc::new(SignedExecutionPayloadEnvelope { + message: envelope, + signature: Signature::empty(), + }) + } + + fn envelope_id( + parent_request_id: ComponentsByRangeRequestId, + ) -> PayloadEnvelopesByRangeRequestId { + use lighthouse_network::service::api_types::PayloadEnvelopesByRangeRequestId; + PayloadEnvelopesByRangeRequestId { + id: 99, + parent_request_id, + } + } + + #[test] + fn gloas_blocks_couple_with_envelopes() { + let mut spec = test_spec::(); + spec.deneb_fork_epoch = Some(Epoch::new(0)); + spec.fulu_fork_epoch = Some(Epoch::new(0)); + spec.gloas_fork_epoch = Some(Epoch::new(0)); + let spec = Arc::new(spec); + let da_checker = Arc::new(test_da_checker(spec.clone(), NodeCustodyType::Fullnode)); + let mut rng = XorShiftRng::from_seed([42; 16]); + + let blocks = (0..4) + .map(|_| { + let (raw_block, _) = generate_rand_block_and_data_columns::( + ForkName::Gloas, + NumBlobs::None, + &mut rng, + &spec, + ); + Arc::new(raw_block) as Arc> + }) + .collect::>(); + + // Build envelopes with slots matching each block + let envelopes: Vec>> = blocks + .iter() + .map(|b| make_gloas_envelope::(b.slot(), &mut rng)) + .collect(); + + let components_id = components_id(); + let blocks_req_id = blocks_id(components_id); + let env_req_id = envelope_id(components_id); + + let mut info = RangeBlockComponentsRequest::::new( + blocks_req_id, + None, + None, + Some(env_req_id), + Span::none(), + ); + + info.add_blocks(blocks_req_id, blocks).unwrap(); + // Not finished — envelopes still pending + assert!(!is_finished(&mut info)); + + info.add_payload_envelopes(env_req_id, envelopes).unwrap(); + + let result = info.responses(da_checker, spec).unwrap(); + assert!(result.is_ok()); + assert_eq!(result.unwrap().len(), 4); + } + + #[test] + fn gloas_blocks_without_envelopes_succeed() { + // Blocks with no blobs don't require envelopes — they should couple fine with an empty envelope response. + let mut spec = test_spec::(); + spec.deneb_fork_epoch = Some(Epoch::new(0)); + spec.fulu_fork_epoch = Some(Epoch::new(0)); + spec.gloas_fork_epoch = Some(Epoch::new(0)); + let spec = Arc::new(spec); + let da_checker = Arc::new(test_da_checker(spec.clone(), NodeCustodyType::Fullnode)); + let mut rng = XorShiftRng::from_seed([42; 16]); + + let (raw_block, _) = generate_rand_block_and_data_columns::( + ForkName::Gloas, + NumBlobs::None, + &mut rng, + &spec, + ); + let block: Arc> = Arc::new(raw_block); + + let components_id = components_id(); + let blocks_req_id = blocks_id(components_id); + let env_req_id = envelope_id(components_id); + + let mut info = RangeBlockComponentsRequest::::new( + blocks_req_id, + None, + None, + Some(env_req_id), + Span::none(), + ); + + info.add_blocks(blocks_req_id, vec![block]).unwrap(); + // No envelope for this block (peer didn't send one) + info.add_payload_envelopes(env_req_id, vec![]).unwrap(); + + let result = info.responses(da_checker, spec).unwrap(); + assert!(result.is_ok(), "expected Ok, got: {:?}", result); + assert_eq!(result.unwrap().len(), 1); + } + + #[test] + fn gloas_extra_envelopes_are_ignored() { + let mut spec = test_spec::(); + spec.deneb_fork_epoch = Some(Epoch::new(0)); + spec.fulu_fork_epoch = Some(Epoch::new(0)); + spec.gloas_fork_epoch = Some(Epoch::new(0)); + let spec = Arc::new(spec); + let da_checker = Arc::new(test_da_checker(spec.clone(), NodeCustodyType::Fullnode)); + let mut rng = XorShiftRng::from_seed([99; 16]); + + let (raw_block, _) = generate_rand_block_and_data_columns::( + ForkName::Gloas, + NumBlobs::None, + &mut rng, + &spec, + ); + let block: Arc> = Arc::new(raw_block); + let slot = block.slot(); + + let components_id = components_id(); + let blocks_req_id = blocks_id(components_id); + let env_req_id = envelope_id(components_id); + + let mut info = RangeBlockComponentsRequest::::new( + blocks_req_id, + None, + None, + Some(env_req_id), + Span::none(), + ); + + info.add_blocks(blocks_req_id, vec![block]).unwrap(); + // Two envelopes: one matching, one extra at a different slot + let env1 = make_gloas_envelope::(slot, &mut rng); + let env2 = make_gloas_envelope::(Slot::new(slot.as_u64() + 10), &mut rng); + info.add_payload_envelopes(env_req_id, vec![env1, env2]) + .unwrap(); + + let result = info.responses(da_checker, spec).unwrap(); + assert!(result.is_ok()); + assert_eq!(result.unwrap().len(), 1); + } } diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 734295ac1d3..c3b3daf4d6f 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -59,7 +59,8 @@ use lighthouse_network::service::api_types::{ BlobsByRangeRequestId, BlocksByRangeRequestId, ComponentsByRangeRequestId, CustodyBackFillBatchRequestId, CustodyBackfillBatchId, CustodyRequester, DataColumnsByRangeRequestId, DataColumnsByRangeRequester, DataColumnsByRootRequestId, - DataColumnsByRootRequester, Id, SingleLookupReqId, SyncRequestId, + DataColumnsByRootRequester, Id, PayloadEnvelopesByRangeRequestId, SingleLookupReqId, + SyncRequestId, }; use lighthouse_network::types::{NetworkGlobals, SyncState}; use lighthouse_network::{PeerAction, PeerId}; @@ -73,7 +74,8 @@ use strum::IntoStaticStr; use tokio::sync::mpsc; use tracing::{debug, error, info, trace}; use types::{ - BlobSidecar, DataColumnSidecar, EthSpec, ForkContext, Hash256, SignedBeaconBlock, Slot, + BlobSidecar, DataColumnSidecar, EthSpec, ForkContext, Hash256, SignedBeaconBlock, + SignedExecutionPayloadEnvelope, Slot, }; /// The number of slots ahead of us that is allowed before requesting a long-range (batch) Sync @@ -132,6 +134,14 @@ pub enum SyncMessage { seen_timestamp: Duration, }, + /// A payload envelope has been received from the RPC. + RpcPayloadEnvelope { + sync_request_id: SyncRequestId, + peer_id: PeerId, + payload_envelope: Option>>, + seen_timestamp: Duration, + }, + /// A block with an unknown parent has been received. UnknownParentBlock(PeerId, Arc>, Hash256), @@ -512,6 +522,11 @@ impl SyncManager { SyncRequestId::DataColumnsByRange(req_id) => { self.on_data_columns_by_range_response(req_id, peer_id, RpcEvent::RPCError(error)) } + SyncRequestId::SinglePayloadEnvelope { id } => { + self.on_single_envelope_response(id, peer_id, RpcEvent::RPCError(error)) + } + SyncRequestId::PayloadEnvelopesByRange(req_id) => self + .on_payload_envelopes_by_range_response(req_id, peer_id, RpcEvent::RPCError(error)), } } @@ -846,6 +861,17 @@ impl SyncManager { } => { self.rpc_data_column_received(sync_request_id, peer_id, data_column, seen_timestamp) } + SyncMessage::RpcPayloadEnvelope { + sync_request_id, + peer_id, + payload_envelope, + seen_timestamp, + } => self.rpc_payload_envelope_received( + sync_request_id, + peer_id, + payload_envelope, + seen_timestamp, + ), SyncMessage::UnknownParentBlock(peer_id, block, block_root) => { let block_slot = block.slot(); let parent_root = block.parent_root(); @@ -1231,6 +1257,34 @@ impl SyncManager { } } + fn rpc_payload_envelope_received( + &mut self, + sync_request_id: SyncRequestId, + peer_id: PeerId, + payload_envelope: Option>>, + seen_timestamp: Duration, + ) { + match sync_request_id { + SyncRequestId::SinglePayloadEnvelope { id } => { + self.on_single_envelope_response( + id, + peer_id, + RpcEvent::from_chunk(payload_envelope, seen_timestamp), + ); + } + SyncRequestId::PayloadEnvelopesByRange(req_id) => { + self.on_payload_envelopes_by_range_response( + req_id, + peer_id, + RpcEvent::from_chunk(payload_envelope, seen_timestamp), + ); + } + _ => { + crit!(%peer_id, "bad request id for payload_envelope"); + } + } + } + fn on_single_blob_response( &mut self, id: SingleLookupReqId, @@ -1331,6 +1385,36 @@ impl SyncManager { } } + fn on_single_envelope_response( + &mut self, + id: SingleLookupReqId, + peer_id: PeerId, + rpc_event: RpcEvent>>, + ) { + // Placeholder: by-root envelope lookup not yet implemented for range sync. + // This is called on error injection for disconnected peers. Log and ignore. + let _ = (id, peer_id, rpc_event); + debug!("on_single_envelope_response: not yet implemented"); + } + + fn on_payload_envelopes_by_range_response( + &mut self, + id: PayloadEnvelopesByRangeRequestId, + peer_id: PeerId, + envelope: RpcEvent>>, + ) { + if let Some(resp) = self + .network + .on_payload_envelopes_by_range_response(id, peer_id, envelope) + { + self.on_range_components_response( + id.parent_request_id, + peer_id, + RangeBlockComponent::PayloadEnvelope(id, resp), + ); + } + } + fn on_custody_by_root_result( &mut self, requester: CustodyRequester, diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index b1ba87c75d3..346f9c5d41d 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -22,14 +22,17 @@ use beacon_chain::block_verification_types::{AsBlock, RangeSyncBlock}; use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessStatus, EngineState}; use custody::CustodyRequestResult; use fnv::FnvHashMap; -use lighthouse_network::rpc::methods::{BlobsByRangeRequest, DataColumnsByRangeRequest}; +use lighthouse_network::rpc::methods::{ + BlobsByRangeRequest, DataColumnsByRangeRequest, PayloadEnvelopesByRangeRequest, +}; use lighthouse_network::rpc::{BlocksByRangeRequest, GoodbyeReason, RPCError, RequestType}; pub use lighthouse_network::service::api_types::RangeRequestId; use lighthouse_network::service::api_types::{ AppRequestId, BlobsByRangeRequestId, BlocksByRangeRequestId, ComponentsByRangeRequestId, CustodyBackFillBatchRequestId, CustodyBackfillBatchId, CustodyId, CustodyRequester, DataColumnsByRangeRequestId, DataColumnsByRangeRequester, DataColumnsByRootRequestId, - DataColumnsByRootRequester, Id, SingleLookupReqId, SyncRequestId, + DataColumnsByRootRequester, Id, PayloadEnvelopesByRangeRequestId, SingleLookupReqId, + SyncRequestId, }; use lighthouse_network::{Client, NetworkGlobals, PeerAction, PeerId, ReportSource}; use parking_lot::RwLock; @@ -37,6 +40,7 @@ pub use requests::LookupVerifyError; use requests::{ ActiveRequests, BlobsByRangeRequestItems, BlobsByRootRequestItems, BlocksByRangeRequestItems, BlocksByRootRequestItems, DataColumnsByRangeRequestItems, DataColumnsByRootRequestItems, + PayloadEnvelopesByRangeRequestItems, }; #[cfg(test)] use slot_clock::SlotClock; @@ -52,7 +56,7 @@ use tracing::{Span, debug, debug_span, error, warn}; use types::data::FixedBlobSidecarList; use types::{ BlobSidecar, BlockImportSource, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, EthSpec, - ForkContext, Hash256, SignedBeaconBlock, Slot, + ForkContext, Hash256, SignedBeaconBlock, SignedExecutionPayloadEnvelope, Slot, }; pub mod custody; @@ -213,6 +217,11 @@ pub struct SyncNetworkContext { /// A mapping of active DataColumnsByRange requests data_columns_by_range_requests: ActiveRequests>, + /// A mapping of active PayloadEnvelopesByRange requests + payload_envelopes_by_range_requests: ActiveRequests< + PayloadEnvelopesByRangeRequestId, + PayloadEnvelopesByRangeRequestItems, + >, /// Mapping of active custody column requests for a block root custody_by_root_requests: FnvHashMap>, @@ -250,6 +259,10 @@ pub enum RangeBlockComponent { DataColumnsByRangeRequestId, RpcResponseResult>>>, ), + PayloadEnvelope( + PayloadEnvelopesByRangeRequestId, + RpcResponseResult>>>, + ), } #[cfg(test)] @@ -298,6 +311,7 @@ impl SyncNetworkContext { blocks_by_range_requests: ActiveRequests::new("blocks_by_range"), blobs_by_range_requests: ActiveRequests::new("blobs_by_range"), data_columns_by_range_requests: ActiveRequests::new("data_columns_by_range"), + payload_envelopes_by_range_requests: ActiveRequests::new("payload_envelopes_by_range"), custody_by_root_requests: <_>::default(), components_by_range_requests: FnvHashMap::default(), custody_backfill_data_column_batch_requests: FnvHashMap::default(), @@ -326,6 +340,7 @@ impl SyncNetworkContext { blocks_by_range_requests, blobs_by_range_requests, data_columns_by_range_requests, + payload_envelopes_by_range_requests, // custody_by_root_requests is a meta request of data_columns_by_root_requests custody_by_root_requests: _, // components_by_range_requests is a meta request of various _by_range requests @@ -361,12 +376,17 @@ impl SyncNetworkContext { .active_requests_of_peer(peer_id) .into_iter() .map(|req_id| SyncRequestId::DataColumnsByRange(*req_id)); + let payload_envelope_by_range_ids = payload_envelopes_by_range_requests + .active_requests_of_peer(peer_id) + .into_iter() + .map(|req_id| SyncRequestId::PayloadEnvelopesByRange(*req_id)); blocks_by_root_ids .chain(blobs_by_root_ids) .chain(data_column_by_root_ids) .chain(blocks_by_range_ids) .chain(blobs_by_range_ids) .chain(data_column_by_range_ids) + .chain(payload_envelope_by_range_ids) .collect() } @@ -423,6 +443,7 @@ impl SyncNetworkContext { blocks_by_range_requests, blobs_by_range_requests, data_columns_by_range_requests, + payload_envelopes_by_range_requests, // custody_by_root_requests is a meta request of data_columns_by_root_requests custody_by_root_requests: _, // components_by_range_requests is a meta request of various _by_range requests @@ -445,6 +466,7 @@ impl SyncNetworkContext { .chain(blocks_by_range_requests.iter_request_peers()) .chain(blobs_by_range_requests.iter_request_peers()) .chain(data_columns_by_range_requests.iter_request_peers()) + .chain(payload_envelopes_by_range_requests.iter_request_peers()) { *active_request_count_by_peer.entry(peer_id).or_default() += 1; } @@ -577,24 +599,26 @@ impl SyncNetworkContext { }; // Attempt to find all required custody peers before sending any request or creating an ID - let columns_by_range_peers_to_request = - if matches!(batch_type, ByRangeRequestType::BlocksAndColumns) { - let epoch = Slot::new(*request.start_slot()).epoch(T::EthSpec::slots_per_epoch()); - let column_indexes = self - .chain - .sampling_columns_for_epoch(epoch) - .iter() - .cloned() - .collect(); - Some(self.select_columns_by_range_peers_to_request( - &column_indexes, - column_peers, - active_request_count_by_peer, - peers_to_deprioritize, - )?) - } else { - None - }; + let columns_by_range_peers_to_request = if matches!( + batch_type, + ByRangeRequestType::BlocksAndColumns | ByRangeRequestType::BlocksAndEnvelopesAndColumns + ) { + let epoch = Slot::new(*request.start_slot()).epoch(T::EthSpec::slots_per_epoch()); + let column_indexes = self + .chain + .sampling_columns_for_epoch(epoch) + .iter() + .cloned() + .collect(); + Some(self.select_columns_by_range_peers_to_request( + &column_indexes, + column_peers, + active_request_count_by_peer, + peers_to_deprioritize, + )?) + } else { + None + }; // Create the overall components_by_range request ID before its individual components let id = ComponentsByRangeRequestId { @@ -659,6 +683,28 @@ impl SyncNetworkContext { .transpose()?; let epoch = Slot::new(*request.start_slot()).epoch(T::EthSpec::slots_per_epoch()); + + // Send envelope request for Gloas epochs + let payloads_req_id = + if matches!(batch_type, ByRangeRequestType::BlocksAndEnvelopesAndColumns) { + Some(self.send_payload_envelopes_by_range_request( + block_peer, + PayloadEnvelopesByRangeRequest { + start_slot: *request.start_slot(), + count: *request.count(), + }, + id, + new_range_request_span!( + self, + "outgoing_envelopes_by_range", + range_request_span.clone(), + block_peer + ), + )?) + } else { + None + }; + let info = RangeBlockComponentsRequest::new( blocks_req_id, blobs_req_id, @@ -668,6 +714,7 @@ impl SyncNetworkContext { self.chain.sampling_columns_for_epoch(epoch).to_vec(), ) }), + payloads_req_id, range_request_span, ); self.components_by_range_requests.insert(id, info); @@ -770,6 +817,17 @@ impl SyncNetworkContext { }) }) } + RangeBlockComponent::PayloadEnvelope(req_id, resp) => { + resp.and_then(|(envelopes, _)| { + request + .add_payload_envelopes(req_id, envelopes) + .map_err(|e| { + RpcResponseError::BlockComponentCouplingError( + CouplingError::InternalError(e), + ) + }) + }) + } } } { entry.remove(); @@ -1288,6 +1346,57 @@ impl SyncNetworkContext { Ok((id, requested_columns)) } + fn send_payload_envelopes_by_range_request( + &mut self, + peer_id: PeerId, + request: PayloadEnvelopesByRangeRequest, + parent_request_id: ComponentsByRangeRequestId, + request_span: Span, + ) -> Result { + let id = PayloadEnvelopesByRangeRequestId { + id: self.next_id(), + parent_request_id, + }; + + self.send_network_msg(NetworkMessage::SendRequest { + peer_id, + request: RequestType::PayloadEnvelopesByRange(request.clone()), + app_request_id: AppRequestId::Sync(SyncRequestId::PayloadEnvelopesByRange(id)), + }) + .map_err(|_| RpcRequestSendError::InternalError("network send error".to_owned()))?; + + debug!( + method = "PayloadEnvelopesByRange", + slots = request.count, + epoch = %Slot::new(request.start_slot).epoch(T::EthSpec::slots_per_epoch()), + peer = %peer_id, + %id, + "Sync RPC request sent" + ); + + self.payload_envelopes_by_range_requests.insert( + id, + peer_id, + false, + PayloadEnvelopesByRangeRequestItems::new(request), + request_span, + ); + Ok(id) + } + + #[allow(clippy::type_complexity)] + pub(crate) fn on_payload_envelopes_by_range_response( + &mut self, + id: PayloadEnvelopesByRangeRequestId, + peer_id: PeerId, + rpc_event: RpcEvent>>, + ) -> Option>>>> { + let resp = self + .payload_envelopes_by_range_requests + .on_response(id, rpc_event); + self.on_rpc_response_result(resp, peer_id) + } + pub fn is_execution_engine_online(&self) -> bool { self.execution_engine_state == EngineState::Online } @@ -1369,6 +1478,12 @@ impl SyncNetworkContext { ); if self + .chain + .data_availability_checker + .envelopes_required_for_epoch(epoch) + { + ByRangeRequestType::BlocksAndEnvelopesAndColumns + } else if self .chain .data_availability_checker .data_columns_required_for_epoch(epoch) @@ -1788,6 +1903,10 @@ impl SyncNetworkContext { "data_columns_by_range", self.data_columns_by_range_requests.len(), ), + ( + "payload_envelopes_by_range", + self.payload_envelopes_by_range_requests.len(), + ), ("custody_by_root", self.custody_by_root_requests.len()), ( "components_by_range", diff --git a/beacon_node/network/src/sync/network_context/requests.rs b/beacon_node/network/src/sync/network_context/requests.rs index ad60dffb455..b6361a2ed14 100644 --- a/beacon_node/network/src/sync/network_context/requests.rs +++ b/beacon_node/network/src/sync/network_context/requests.rs @@ -16,6 +16,7 @@ pub use data_columns_by_range::DataColumnsByRangeRequestItems; pub use data_columns_by_root::{ DataColumnsByRootRequestItems, DataColumnsByRootSingleBlockRequest, }; +pub use payload_envelopes_by_range::PayloadEnvelopesByRangeRequestItems; use crate::metrics; @@ -27,6 +28,7 @@ mod blocks_by_range; mod blocks_by_root; mod data_columns_by_range; mod data_columns_by_root; +mod payload_envelopes_by_range; #[derive(Debug, PartialEq, Eq, IntoStaticStr)] pub enum LookupVerifyError { diff --git a/beacon_node/network/src/sync/network_context/requests/payload_envelopes_by_range.rs b/beacon_node/network/src/sync/network_context/requests/payload_envelopes_by_range.rs new file mode 100644 index 00000000000..3d4ea8248b2 --- /dev/null +++ b/beacon_node/network/src/sync/network_context/requests/payload_envelopes_by_range.rs @@ -0,0 +1,42 @@ +use super::{ActiveRequestItems, LookupVerifyError}; +use lighthouse_network::rpc::methods::PayloadEnvelopesByRangeRequest; +use std::sync::Arc; +use types::{EthSpec, SignedExecutionPayloadEnvelope}; + +/// Accumulates results of a payload_envelopes_by_range request. Only returns items after +/// receiving the stream termination. +pub struct PayloadEnvelopesByRangeRequestItems { + request: PayloadEnvelopesByRangeRequest, + items: Vec>>, +} + +impl PayloadEnvelopesByRangeRequestItems { + pub fn new(request: PayloadEnvelopesByRangeRequest) -> Self { + Self { + request, + items: vec![], + } + } +} + +impl ActiveRequestItems for PayloadEnvelopesByRangeRequestItems { + type Item = Arc>; + + fn add(&mut self, envelope: Self::Item) -> Result { + let slot = envelope.slot(); + if slot < self.request.start_slot || slot >= self.request.start_slot + self.request.count { + return Err(LookupVerifyError::UnrequestedSlot(slot)); + } + if self.items.iter().any(|existing| existing.slot() == slot) { + return Err(LookupVerifyError::DuplicatedData(slot, 0)); + } + + self.items.push(envelope); + + Ok(false) + } + + fn consume(&mut self) -> Vec { + std::mem::take(&mut self.items) + } +} diff --git a/beacon_node/network/src/sync/range_sync/range.rs b/beacon_node/network/src/sync/range_sync/range.rs index 6509ac3cb3c..80fee1c5caf 100644 --- a/beacon_node/network/src/sync/range_sync/range.rs +++ b/beacon_node/network/src/sync/range_sync/range.rs @@ -93,6 +93,22 @@ where } } + /// For Gloas, start range sync one epoch earlier so the first batch fetches the + /// parent block's payload envelope. Without this, the first block in the batch + /// fails `load_parent` because the preceding block's envelope isn't in the store. + fn gloas_adjusted_start_epoch(&self, epoch: Epoch) -> Epoch { + if self + .beacon_chain + .spec + .gloas_fork_epoch + .is_some_and(|gloas_epoch| epoch > gloas_epoch) + { + epoch.saturating_sub(1_u64) + } else { + epoch + } + } + #[cfg(test)] pub(crate) fn __failed_chains(&mut self) -> Vec { self.failed_chains.keys().copied().collect() @@ -156,8 +172,13 @@ where // Note: We keep current head chains. These can continue syncing whilst we complete // this new finalized chain. + // Start one epoch earlier for Gloas so the first batch includes + // the parent block's envelope. Without this, the first block in the + // batch fails because `load_parent` can't find the parent's envelope. + let start_epoch = self.gloas_adjusted_start_epoch(local_info.finalized_epoch); + self.chains.add_peer_or_create_chain( - local_info.finalized_epoch, + start_epoch, remote_info.finalized_root, target_head_slot, peer_id, @@ -188,6 +209,7 @@ where let start_epoch = std::cmp::min(local_info.head_slot, remote_finalized_slot) .epoch(T::EthSpec::slots_per_epoch()); + let start_epoch = self.gloas_adjusted_start_epoch(start_epoch); self.chains.add_peer_or_create_chain( start_epoch, remote_info.head_root,