diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 9a431472339..84df30e75f8 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -60,6 +60,7 @@ use crate::execution_payload::{ }; use crate::kzg_utils::blobs_to_data_column_sidecars; use crate::observed_block_producers::SeenBlock; +use crate::payload_envelope_verification::EnvelopeError; use crate::validator_monitor::HISTORIC_EPOCHS as VALIDATOR_MONITOR_HISTORIC_EPOCHS; use crate::validator_pubkey_cache::ValidatorPubkeyCache; use crate::{ @@ -320,6 +321,18 @@ pub enum BlockError { bid_parent_root: Hash256, block_parent_root: Hash256, }, + /// The block is known but its parent execution payload envelope has not been received yet. + /// + /// ## Peer scoring + /// + /// It's unclear if this block is valid, but it cannot be fully verified without the parent's + /// execution payload envelope. + ParentEnvelopeUnknown { parent_root: Hash256 }, + + PayloadEnvelopeError { + e: Box, + penalize_peer: bool, + }, } /// Which specific signature(s) are invalid in a SignedBeaconBlock @@ -486,6 +499,36 @@ impl From for BlockError { } } +impl From for BlockError { + fn from(e: EnvelopeError) -> Self { + let penalize_peer = match &e { + // REJECT per spec: peer sent invalid envelope data + EnvelopeError::BadSignature + | EnvelopeError::BuilderIndexMismatch { .. } + | EnvelopeError::BlockHashMismatch { .. } + | EnvelopeError::SlotMismatch { .. } + | EnvelopeError::IncorrectBlockProposer { .. } => true, + // IGNORE per spec: not the peer's fault + EnvelopeError::BlockRootUnknown { .. } + | EnvelopeError::PriorToFinalization { .. } + | EnvelopeError::UnknownValidator { .. 
} => false, + // Internal errors: not the peer's fault + EnvelopeError::BeaconChainError(_) + | EnvelopeError::BeaconStateError(_) + | EnvelopeError::BlockProcessingError(_) + | EnvelopeError::EnvelopeProcessingError(_) + | EnvelopeError::ExecutionPayloadError(_) + | EnvelopeError::BlockError(_) + | EnvelopeError::InternalError(_) + | EnvelopeError::OptimisticSyncNotSupported { .. } => false, + }; + BlockError::PayloadEnvelopeError { + e: Box::new(e), + penalize_peer, + } + } +} + /// Stores information about verifying a payload against an execution engine. #[derive(Debug, PartialEq, Clone, Encode, Decode)] pub struct PayloadVerificationOutcome { @@ -897,12 +940,26 @@ impl GossipVerifiedBlock { }); } - // TODO(gloas) The following validation can only be completed once fork choice has been implemented: - // The block's parent execution payload (defined by bid.parent_block_hash) has been seen - // (via gossip or non-gossip sources) (a client MAY queue blocks for processing - // once the parent payload is retrieved). If execution_payload verification of block's execution - // payload parent by an execution node is complete, verify the block's execution payload - // parent (defined by bid.parent_block_hash) passes all validation. + // Check that we've received the parent envelope. If not, issue a single envelope + // lookup for the parent and queue this block in the reprocess queue. + // + // The anchor block (proto-array root) is implicitly considered to have its payload + // received: there is no envelope to fetch for the anchor (per spec, the anchor is + // never added to `store.payloads`), and the anchor is trusted by definition. 
+ let parent_is_gloas = chain + .spec + .fork_name_at_slot::(parent_block.slot) + .gloas_enabled(); + let parent_is_anchor = parent_block.parent_root.is_none(); + + if parent_is_gloas + && !parent_is_anchor + && !fork_choice_read_lock.is_payload_received(&block.message().parent_root()) + { + return Err(BlockError::ParentEnvelopeUnknown { + parent_root: block.message().parent_root(), + }); + } drop(fork_choice_read_lock); @@ -1951,7 +2008,6 @@ fn load_parent>( // Retrieve any state that is advanced through to at most `block.slot()`: this is // particularly important if `block` descends from the finalized/split block, but at a slot // prior to the finalized slot (which is invalid and inaccessible in our DB schema). - // let (parent_state_root, state) = chain .store .get_advanced_hot_state(root, block.slot(), parent_block.state_root())? diff --git a/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs b/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs index b40e8337fb4..f6cd7143a82 100644 --- a/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs +++ b/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs @@ -14,7 +14,8 @@ use super::{ }; use crate::{ AvailabilityProcessingStatus, BeaconChain, BeaconChainError, BeaconChainTypes, - NotifyExecutionLayer, block_verification_types::AvailableBlockData, metrics, + NotifyExecutionLayer, block_verification::PayloadVerificationOutcome, + block_verification_types::AvailableBlockData, metrics, payload_envelope_verification::ExecutionPendingEnvelope, validator_monitor::get_slot_delay_ms, }; @@ -99,9 +100,22 @@ impl BeaconChain { self.import_available_execution_payload_envelope(Box::new(envelope)) .await } - ExecutedEnvelope::AvailabilityPending() => Err(EnvelopeError::InternalError( - "Pending payload envelope not yet implemented".to_owned(), - )), + ExecutedEnvelope::AvailabilityPending { + signed_envelope, + import_data, + payload_verification_outcome: _, + } => 
{ + // The envelope has been executed but data columns have not yet arrived. + // Do not import — return MissingComponents so callers know to fetch columns. + // TODO(gloas): once an envelope DA checker exists, cache the envelope here + // (analogous to `data_availability_checker.put_executed_block`) so that + // import is driven automatically when columns arrive. + let slot = signed_envelope.slot(); + let block_root = import_data.block_root; + Ok(AvailabilityProcessingStatus::MissingComponents( + slot, block_root, + )) + } } }; @@ -182,10 +196,42 @@ impl BeaconChain { signed_envelope, import_data, payload_verification_outcome, - self.spec.clone(), )) } + /// Import an envelope whose data column availability has not yet been satisfied. + /// + /// Marks the block's payload as received in fork choice and persists the envelope to the + /// store, but does not write data column ops. Columns are expected to arrive separately + /// (gossip, engineGetBlobs, or reconstruction). + #[instrument(skip_all)] + pub async fn import_pending_execution_payload_envelope( + self: &Arc, + signed_envelope: Arc>, + import_data: EnvelopeImportData, + payload_verification_outcome: PayloadVerificationOutcome, + ) -> Result { + let EnvelopeImportData { + block_root, + _phantom, + } = import_data; + let block_root = { + let chain = self.clone(); + self.spawn_blocking_handle( + move || { + chain.import_execution_payload_envelope_pending_columns( + signed_envelope, + block_root, + payload_verification_outcome.payload_verification_status, + ) + }, + "payload_verification_handle", + ) + .await?? + }; + Ok(AvailabilityProcessingStatus::Imported(block_root)) + } + #[instrument(skip_all)] pub async fn import_available_execution_payload_envelope( self: &Arc, @@ -220,6 +266,50 @@ impl BeaconChain { Ok(AvailabilityProcessingStatus::Imported(block_root)) } + /// Same as `import_execution_payload_envelope` but for envelopes whose data columns + /// have not yet been received. 
Marks the payload as received in fork choice and + /// persists the envelope; columns are persisted separately as they arrive. + #[instrument(skip_all)] + fn import_execution_payload_envelope_pending_columns( + &self, + signed_envelope: Arc>, + block_root: Hash256, + payload_verification_status: PayloadVerificationStatus, + ) -> Result { + let fork_choice_reader = self.canonical_head.fork_choice_upgradable_read_lock(); + if !fork_choice_reader.contains_block(&block_root) { + return Err(EnvelopeError::BlockRootUnknown { block_root }); + } + + let mut fork_choice = parking_lot::RwLockUpgradableReadGuard::upgrade(fork_choice_reader); + fork_choice + .on_valid_payload_envelope_received(block_root) + .map_err(|e| EnvelopeError::InternalError(format!("{e:?}")))?; + + let db_write_timer = metrics::start_timer(&metrics::ENVELOPE_PROCESSING_DB_WRITE); + let ops = vec![StoreOp::PutPayloadEnvelope( + block_root, + signed_envelope.clone(), + )]; + let db_span = info_span!("persist_envelope_pending_columns").entered(); + if let Err(e) = self.store.do_atomically_with_block_and_blobs_cache(ops) { + error!(error = ?e, "Database write failed for pending-columns envelope"); + return Err(e.into()); + } + drop(db_span); + drop(fork_choice); + + let envelope_time_imported = self.slot_clock.now_duration().unwrap_or(Duration::MAX); + metrics::stop_timer(db_write_timer); + self.import_envelope_update_metrics_and_events( + signed_envelope, + block_root, + payload_verification_status, + envelope_time_imported, + ); + Ok(block_root) + } + /// Accepts a fully-verified and available envelope and imports it into the chain without performing any /// additional verification. 
/// diff --git a/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs b/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs index b153a3cd6a7..9d61025e96c 100644 --- a/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs +++ b/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs @@ -119,12 +119,16 @@ pub struct EnvelopeProcessingSnapshot { /// 1. `Available`: This envelope has been executed and also contains all data to consider it /// fully available. /// 2. `AvailabilityPending`: This envelope hasn't received all required blobs to consider it -/// fully available. -#[allow(dead_code)] +/// fully available. The envelope is still imported (fork-choice marks the block's payload +/// as received and the envelope is persisted); column persistence is handled separately +/// via gossip / engineGetBlobs as columns arrive. pub enum ExecutedEnvelope { Available(AvailableExecutedEnvelope), - // TODO(gloas): check data column availability via DA checker - AvailabilityPending(), + AvailabilityPending { + signed_envelope: Arc>, + import_data: EnvelopeImportData, + payload_verification_outcome: PayloadVerificationOutcome, + }, } impl ExecutedEnvelope { @@ -132,7 +136,6 @@ impl ExecutedEnvelope { envelope: MaybeAvailableEnvelope, import_data: EnvelopeImportData, payload_verification_outcome: PayloadVerificationOutcome, - spec: Arc, ) -> Self { match envelope { MaybeAvailableEnvelope::Available(available_envelope) => { @@ -142,15 +145,14 @@ impl ExecutedEnvelope { payload_verification_outcome, )) } - // TODO(gloas): check data column availability via DA checker MaybeAvailableEnvelope::AvailabilityPending { - block_hash, - envelope, - } => Self::Available(AvailableExecutedEnvelope::new( - AvailableEnvelope::new(block_hash, envelope, vec![], None, spec), + block_hash: _, + envelope: signed_envelope, + } => Self::AvailabilityPending { + signed_envelope, import_data, payload_verification_outcome, - )), + }, } } } diff --git 
a/beacon_node/beacon_processor/src/lib.rs b/beacon_node/beacon_processor/src/lib.rs index ea87e9bc718..d2f70108547 100644 --- a/beacon_node/beacon_processor/src/lib.rs +++ b/beacon_node/beacon_processor/src/lib.rs @@ -417,6 +417,9 @@ pub enum Work { RpcBlobs { process_fn: AsyncFn, }, + RpcPayloadEnvelope { + process_fn: AsyncFn, + }, RpcCustodyColumn(AsyncFn), ColumnReconstruction(AsyncFn), IgnoredRpcBlock { @@ -483,6 +486,7 @@ pub enum WorkType { GossipLightClientOptimisticUpdate, RpcBlock, RpcBlobs, + RpcPayloadEnvelope, RpcCustodyColumn, ColumnReconstruction, IgnoredRpcBlock, @@ -545,6 +549,7 @@ impl Work { Work::GossipProposerPreferences(_) => WorkType::GossipProposerPreferences, Work::RpcBlock { .. } => WorkType::RpcBlock, Work::RpcBlobs { .. } => WorkType::RpcBlobs, + Work::RpcPayloadEnvelope { .. } => WorkType::RpcPayloadEnvelope, Work::RpcCustodyColumn { .. } => WorkType::RpcCustodyColumn, Work::ColumnReconstruction(_) => WorkType::ColumnReconstruction, Work::IgnoredRpcBlock { .. } => WorkType::IgnoredRpcBlock, @@ -1183,7 +1188,9 @@ impl BeaconProcessor { Work::GossipLightClientOptimisticUpdate { .. } => work_queues .lc_gossip_optimistic_update_queue .push(work, work_id), - Work::RpcBlock { .. } | Work::IgnoredRpcBlock { .. } => { + Work::RpcBlock { .. } + | Work::IgnoredRpcBlock { .. } + | Work::RpcPayloadEnvelope { .. } => { work_queues.rpc_block_queue.push(work, work_id) } Work::RpcBlobs { .. 
} => work_queues.rpc_blob_queue.push(work, work_id), @@ -1318,7 +1325,9 @@ impl BeaconProcessor { WorkType::GossipLightClientOptimisticUpdate => { work_queues.lc_gossip_optimistic_update_queue.len() } - WorkType::RpcBlock => work_queues.rpc_block_queue.len(), + WorkType::RpcBlock | WorkType::RpcPayloadEnvelope => { + work_queues.rpc_block_queue.len() + } WorkType::RpcBlobs | WorkType::IgnoredRpcBlock => { work_queues.rpc_blob_queue.len() } @@ -1513,6 +1522,7 @@ impl BeaconProcessor { beacon_block_root: _, } | Work::RpcBlobs { process_fn } + | Work::RpcPayloadEnvelope { process_fn } | Work::RpcCustodyColumn(process_fn) | Work::ColumnReconstruction(process_fn) => task_spawner.spawn_async(process_fn), Work::IgnoredRpcBlock { process_fn } => task_spawner.spawn_blocking(process_fn), diff --git a/beacon_node/lighthouse_network/src/service/api_types.rs b/beacon_node/lighthouse_network/src/service/api_types.rs index 486a4438579..a190a42a80e 100644 --- a/beacon_node/lighthouse_network/src/service/api_types.rs +++ b/beacon_node/lighthouse_network/src/service/api_types.rs @@ -31,6 +31,8 @@ pub enum SyncRequestId { BlobsByRange(BlobsByRangeRequestId), /// Data columns by range request DataColumnsByRange(DataColumnsByRangeRequestId), + /// Request searching for an execution payload envelope given a block root. + SinglePayloadEnvelope { id: SingleLookupReqId }, } /// Request ID for data_columns_by_root requests. Block lookups do not issue this request directly. 
diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index 0135d7f5dd4..b0a6e51704d 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -1738,6 +1738,17 @@ impl NetworkBeaconProcessor { self.send_sync_message(SyncMessage::UnknownParentBlock(peer_id, block, block_root)); return None; } + Err(BlockError::ParentEnvelopeUnknown { parent_root }) => { + debug!( + ?block_root, + ?parent_root, + "Parent envelope not yet available for gossip block" + ); + self.send_sync_message(SyncMessage::UnknownParentEnvelope( + peer_id, block, block_root, + )); + return None; + } Err(e @ BlockError::BeaconChainError(_)) => { debug!( error = ?e, @@ -1827,7 +1838,9 @@ impl NetworkBeaconProcessor { return None; } // BlobNotRequired is unreachable. Only constructed in `process_gossip_blob` - Err(e @ BlockError::InternalError(_)) | Err(e @ BlockError::BlobNotRequired(_)) => { + Err(e @ BlockError::InternalError(_)) + | Err(e @ BlockError::BlobNotRequired(_)) + | Err(e @ BlockError::PayloadEnvelopeError { .. }) => { error!(error = %e, "Internal block gossip validation error"); return None; } @@ -2024,6 +2037,16 @@ impl NetworkBeaconProcessor { "Block with unknown parent attempted to be processed" ); } + Err(BlockError::ParentEnvelopeUnknown { parent_root }) => { + debug!( + %block_root, + ?parent_root, + "Parent envelope not yet available, need envelope lookup" + ); + // Unlike ParentUnknown, this can legitimately happen during processing + // because the parent envelope may not have arrived yet. The lookup + // system will handle retrying via Action::ParentEnvelopeUnknown. 
+ } Err(e @ BlockError::ExecutionPayloadError(epe)) if !epe.penalize_peer() => { debug!( error = %e, @@ -3978,9 +4001,17 @@ impl NetworkBeaconProcessor { // register_process_result_metrics(&result, metrics::BlockSource::Gossip, "envelope"); match &result { - Ok(AvailabilityProcessingStatus::Imported(_)) - | Ok(AvailabilityProcessingStatus::MissingComponents(_, _)) => { - // Nothing to do + Ok(AvailabilityProcessingStatus::Imported(block_root)) => { + // Notify sync so any pending child lookup awaiting this parent envelope unblocks. + self.send_sync_message(SyncMessage::GossipEnvelopeImported { + block_root: *block_root, + }); + } + Ok(AvailabilityProcessingStatus::MissingComponents(_slot, _block_root)) => { + // TODO(gloas): wire this into the envelope DA checker once it exists, analogous to + // how `process_availability` drives block import once blobs/columns arrive. Until + // then gossip envelopes with missing columns will be stuck until columns arrive via + // gossip or engineGetBlobs. } Err(e) => match e { EnvelopeError::ExecutionPayloadError(epe) if !epe.penalize_peer() => {} diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index e089159eb89..d535054d2b5 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -563,6 +563,22 @@ impl NetworkBeaconProcessor { }) } + /// Create a new `Work` event for an RPC payload envelope. 
+ pub fn send_rpc_payload_envelope( + self: &Arc, + envelope: Arc>, + seen_timestamp: Duration, + process_type: BlockProcessType, + ) -> Result<(), Error> { + let process_fn = + self.clone() + .generate_rpc_envelope_process_fn(envelope, seen_timestamp, process_type); + self.try_send(BeaconWorkEvent { + drop_during_sync: false, + work: Work::RpcPayloadEnvelope { process_fn }, + }) + } + /// Create a new `Work` event for some blobs, where the result from computation (if any) is /// sent to the other side of `result_tx`. pub fn send_rpc_blobs( diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs index 8f89b669485..e5f640b3c11 100644 --- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs @@ -4,7 +4,7 @@ use crate::sync::BatchProcessResult; use crate::sync::manager::CustodyBatchProcessResult; use crate::sync::{ ChainId, - manager::{BlockProcessType, SyncMessage}, + manager::{BlockProcessType, BlockProcessingResult, SyncMessage}, }; use beacon_chain::block_verification_types::LookupBlock; use beacon_chain::block_verification_types::{AsBlock, RangeSyncBlock}; @@ -28,7 +28,9 @@ use store::KzgCommitment; use tracing::{debug, debug_span, error, info, instrument, warn}; use types::data::FixedBlobSidecarList; use types::kzg_ext::format_kzg_commitments; -use types::{BlockImportSource, DataColumnSidecarList, Epoch, Hash256}; +use types::{ + BlockImportSource, DataColumnSidecarList, Epoch, Hash256, SignedExecutionPayloadEnvelope, +}; /// Id associated to a batch processing request, either a sync batch or a parent lookup. #[derive(Clone, Debug, PartialEq)] @@ -73,6 +75,77 @@ impl NetworkBeaconProcessor { Box::pin(process_fn) } + /// Returns an async closure which processes a payload envelope received via RPC. 
+ pub fn generate_rpc_envelope_process_fn( + self: Arc, + envelope: Arc>, + seen_timestamp: Duration, + process_type: BlockProcessType, + ) -> AsyncFn { + let process_fn = async move { + self.process_rpc_envelope(envelope, seen_timestamp, process_type) + .await; + }; + Box::pin(process_fn) + } + + /// Process an execution payload envelope received via RPC. + async fn process_rpc_envelope( + self: Arc, + envelope: Arc>, + _seen_timestamp: Duration, + process_type: BlockProcessType, + ) { + let beacon_block_root = envelope.beacon_block_root(); + + // Verify the envelope using the gossip verification path (same checks apply to RPC) + let verified_envelope = match self.chain.verify_envelope_for_gossip(envelope).await { + Ok(verified) => verified, + Err(e) => { + debug!( + error = ?e, + ?beacon_block_root, + "RPC payload envelope failed verification" + ); + self.send_sync_message(SyncMessage::BlockComponentProcessed { + process_type, + result: BlockProcessingResult::Err(e.into()), + }); + return; + } + }; + + // Process the verified envelope + let result = self + .chain + .process_execution_payload_envelope( + beacon_block_root, + verified_envelope, + NotifyExecutionLayer::Yes, + BlockImportSource::Lookup, + #[allow(clippy::result_large_err)] + || Ok(()), + ) + .await; + + let processing_result = match result { + Ok(status) => BlockProcessingResult::Ok(status), + Err(e) => { + debug!( + error = ?e, + ?beacon_block_root, + "RPC payload envelope processing failed" + ); + BlockProcessingResult::Err(e.into()) + } + }; + + self.send_sync_message(SyncMessage::BlockComponentProcessed { + process_type, + result: processing_result, + }); + } + /// Returns the `process_fn` and `ignore_fn` required when requeuing an RPC block. 
pub fn generate_lookup_beacon_block_fns( self: Arc, diff --git a/beacon_node/network/src/network_beacon_processor/tests.rs b/beacon_node/network/src/network_beacon_processor/tests.rs index c4e7f8f8d1f..8dbf0028f73 100644 --- a/beacon_node/network/src/network_beacon_processor/tests.rs +++ b/beacon_node/network/src/network_beacon_processor/tests.rs @@ -9,6 +9,7 @@ use crate::{ sync::{SyncMessage, manager::BlockProcessType}, }; use beacon_chain::block_verification_types::LookupBlock; +use beacon_chain::chain_config::ChainConfig; use beacon_chain::custody_context::NodeCustodyType; use beacon_chain::data_column_verification::validate_data_column_sidecar_for_gossip_fulu; use beacon_chain::kzg_utils::blobs_to_data_column_sidecars; @@ -134,7 +135,10 @@ impl TestRig { .fresh_ephemeral_store() .mock_execution_layer() .node_custody_type(NodeCustodyType::Fullnode) - .chain_config(<_>::default()) + .chain_config(ChainConfig { + disable_get_blobs: true, + ..ChainConfig::default() + }) .build(); harness.advance_slot(); @@ -169,7 +173,10 @@ impl TestRig { .fresh_ephemeral_store() .mock_execution_layer() .node_custody_type(node_custody_type) - .chain_config(<_>::default()) + .chain_config(ChainConfig { + disable_get_blobs: true, + ..ChainConfig::default() + }) .build(); harness.advance_slot(); @@ -1001,14 +1008,30 @@ async fn data_column_reconstruction_at_deadline() { rig.enqueue_gossip_data_columns(i); } - // Expect all gossip events + reconstruction - let mut expected_events: Vec = (0..min_columns_for_reconstruction) - .map(|_| WorkType::GossipDataColumnSidecar) - .collect(); - expected_events.push(WorkType::ColumnReconstruction); - - rig.assert_event_journal_contains_ordered(&expected_events) - .await; + // Drain the journal until we've seen all gossip events plus at least one + // reconstruction. 
Under real crypto the reprocess queue can dispatch the + // reconstruction work item more than once (the second is a no-op via + // `reconstruction_started`), so we don't pin the count — we just require >= 1. + let gsc: &str = WorkType::GossipDataColumnSidecar.into(); + let cr: &str = WorkType::ColumnReconstruction.into(); + let (mut gossip_seen, mut recon_seen) = (0usize, 0usize); + let drain = async { + while let Some(event) = rig.work_journal_rx.recv().await { + if event == gsc { + gossip_seen += 1; + } else if event == cr { + recon_seen += 1; + } + if gossip_seen == min_columns_for_reconstruction && recon_seen >= 1 { + break; + } + } + }; + if tokio::time::timeout(STANDARD_TIMEOUT, drain).await.is_err() { + panic!("timeout: gossip_seen={gossip_seen}, recon_seen={recon_seen}"); + } + assert_eq!(gossip_seen, min_columns_for_reconstruction); + assert!(recon_seen >= 1); } // Test the column reconstruction is delayed for columns that arrive for a previous slot. diff --git a/beacon_node/network/src/router.rs b/beacon_node/network/src/router.rs index 443fa51cc67..a90658a3676 100644 --- a/beacon_node/network/src/router.rs +++ b/beacon_node/network/src/router.rs @@ -19,13 +19,14 @@ use lighthouse_network::{ }; use logging::TimeLatch; use logging::crit; -use slot_clock::SlotClock; +use slot_clock::{SlotClock, timestamp_now}; use std::sync::Arc; use tokio::sync::mpsc; use tokio_stream::wrappers::UnboundedReceiverStream; use tracing::{debug, error, trace, warn}; use types::{ BlobSidecar, DataColumnSidecar, EthSpec, ForkContext, PartialDataColumn, SignedBeaconBlock, + SignedExecutionPayloadEnvelope, }; /// Handles messages from the network and routes them to the appropriate service to be handled. 
@@ -341,10 +342,13 @@ impl Router { Response::DataColumnsByRange(data_column) => { self.on_data_columns_by_range_response(peer_id, app_request_id, data_column); } - // TODO(EIP-7732): implement outgoing payload envelopes by range and root - // responses once sync manager requests them. - Response::PayloadEnvelopesByRoot(_) | Response::PayloadEnvelopesByRange(_) => { - debug!("Requesting envelopes by root and by range not supported yet"); + Response::PayloadEnvelopesByRoot(envelope) => { + self.on_payload_envelopes_by_root_response(peer_id, app_request_id, envelope); + } + // TODO(EIP-7732): implement outgoing payload envelopes by range responses once + // range sync requests them. + Response::PayloadEnvelopesByRange(_) => { + error!(%peer_id, "Unexpected PayloadEnvelopesByRange response"); } // Light client responses should not be received Response::LightClientBootstrap(_) @@ -718,6 +722,40 @@ impl Router { }); } + /// Handle a `PayloadEnvelopesByRoot` response from the peer. + pub fn on_payload_envelopes_by_root_response( + &mut self, + peer_id: PeerId, + app_request_id: AppRequestId, + envelope: Option>>, + ) { + let sync_request_id = match app_request_id { + AppRequestId::Sync(sync_id) => match sync_id { + id @ SyncRequestId::SinglePayloadEnvelope { .. } => id, + other => { + crit!(request = ?other, "PayloadEnvelopesByRoot response on incorrect request"); + return; + } + }, + AppRequestId::Router => { + crit!(%peer_id, "All PayloadEnvelopesByRoot requests belong to sync"); + return; + } + AppRequestId::Internal => unreachable!("Handled internally"), + }; + + trace!( + %peer_id, + "Received PayloadEnvelopesByRoot Response" + ); + self.send_to_sync(SyncMessage::RpcPayloadEnvelope { + peer_id, + sync_request_id, + envelope, + seen_timestamp: timestamp_now(), + }); + } + /// Handle a `BlobsByRoot` response from the peer. 
pub fn on_blobs_by_root_response( &mut self, diff --git a/beacon_node/network/src/sync/block_lookups/common.rs b/beacon_node/network/src/sync/block_lookups/common.rs index edd99345b43..bb8d81cc6e7 100644 --- a/beacon_node/network/src/sync/block_lookups/common.rs +++ b/beacon_node/network/src/sync/block_lookups/common.rs @@ -2,7 +2,7 @@ use crate::sync::block_lookups::single_block_lookup::{ LookupRequestError, SingleBlockLookup, SingleLookupRequestState, }; use crate::sync::block_lookups::{ - BlobRequestState, BlockRequestState, CustodyRequestState, PeerId, + BlobRequestState, BlockRequestState, CustodyRequestState, EnvelopeRequestState, PeerId, }; use crate::sync::manager::BlockProcessType; use crate::sync::network_context::{LookupRequestResult, SyncNetworkContext}; @@ -12,16 +12,17 @@ use parking_lot::RwLock; use std::collections::HashSet; use std::sync::Arc; use types::data::FixedBlobSidecarList; -use types::{DataColumnSidecarList, SignedBeaconBlock}; +use types::{DataColumnSidecarList, SignedBeaconBlock, SignedExecutionPayloadEnvelope}; use super::SingleLookupId; use super::single_block_lookup::{ComponentRequests, DownloadResult}; -#[derive(Debug, Copy, Clone)] +#[derive(Debug, Copy, Clone, PartialEq, Eq)] pub enum ResponseType { Block, Blob, CustodyColumn, + Envelope, } /// This trait unifies common single block lookup functionality across blocks and blobs. This @@ -151,6 +152,7 @@ impl RequestState for BlobRequestState { ComponentRequests::WaitingForBlock => Err("waiting for block"), ComponentRequests::ActiveBlobRequest(request, _) => Ok(request), ComponentRequests::ActiveCustodyRequest { .. } => Err("expecting custody request"), + ComponentRequests::ActiveEnvelopeRequest { .. } => Err("expecting envelope request"), ComponentRequests::NotNeeded { .. } => Err("not needed"), } } @@ -205,6 +207,7 @@ impl RequestState for CustodyRequestState { ComponentRequests::WaitingForBlock => Err("waiting for block"), ComponentRequests::ActiveBlobRequest { .. 
} => Err("expecting blob request"), ComponentRequests::ActiveCustodyRequest(request) => Ok(request), + ComponentRequests::ActiveEnvelopeRequest { .. } => Err("expecting envelope request"), ComponentRequests::NotNeeded { .. } => Err("not needed"), } } @@ -215,3 +218,52 @@ impl RequestState for CustodyRequestState { &mut self.state } } + +impl RequestState for EnvelopeRequestState { + type VerifiedResponseType = Arc>; + + fn make_request( + &self, + id: Id, + lookup_peers: Arc>>, + _: usize, + cx: &mut SyncNetworkContext, + ) -> Result { + cx.envelope_lookup_request(id, lookup_peers, self.block_root) + .map_err(LookupRequestError::SendFailedNetwork) + } + + fn send_for_processing( + id: Id, + download_result: DownloadResult, + cx: &SyncNetworkContext, + ) -> Result<(), LookupRequestError> { + let DownloadResult { + value, + block_root, + seen_timestamp, + .. + } = download_result; + cx.send_envelope_for_processing(id, value, seen_timestamp, block_root) + .map_err(LookupRequestError::SendFailedProcessor) + } + + fn response_type() -> ResponseType { + ResponseType::Envelope + } + + fn request_state_mut(request: &mut SingleBlockLookup) -> Result<&mut Self, &'static str> { + match &mut request.component_requests { + ComponentRequests::ActiveEnvelopeRequest(request) => Ok(request), + _ => Err("expecting envelope request"), + } + } + + fn get_state(&self) -> &SingleLookupRequestState { + &self.state + } + + fn get_state_mut(&mut self) -> &mut SingleLookupRequestState { + &mut self.state + } +} diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index 3929f74aa04..bb003dc2228 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -22,7 +22,9 @@ use self::parent_chain::{NodeChain, compute_parent_chains}; pub use self::single_block_lookup::DownloadResult; -use self::single_block_lookup::{LookupRequestError, LookupResult, SingleBlockLookup}; +use 
self::single_block_lookup::{ + AwaitingParent, LookupRequestError, LookupResult, SingleBlockLookup, +}; use super::manager::{BlockProcessType, BlockProcessingResult, SLOT_IMPORT_TOLERANCE}; use super::network_context::{PeerGroup, RpcResponseError, SyncNetworkContext}; use crate::metrics; @@ -39,7 +41,9 @@ use fnv::FnvHashMap; use lighthouse_network::service::api_types::SingleLookupReqId; use lighthouse_network::{PeerAction, PeerId}; use lru_cache::LRUTimeCache; -pub use single_block_lookup::{BlobRequestState, BlockRequestState, CustodyRequestState}; +pub use single_block_lookup::{ + BlobRequestState, BlockRequestState, CustodyRequestState, EnvelopeRequestState, +}; use std::collections::hash_map::Entry; use std::sync::Arc; use std::time::Duration; @@ -50,6 +54,7 @@ use types::{EthSpec, SignedBeaconBlock}; pub mod common; pub mod parent_chain; mod single_block_lookup; +mod single_envelope_lookup; /// The maximum depth we will search for a parent block. In principle we should have sync'd any /// canonical chain to its head once the peer connects. A chain should not appear where it's depth @@ -109,6 +114,7 @@ pub type SingleLookupId = u32; enum Action { Retry, ParentUnknown { parent_root: Hash256 }, + ParentEnvelopeUnknown { parent_root: Hash256 }, Drop(/* reason: */ String), Continue, } @@ -213,7 +219,7 @@ impl BlockLookups { self.new_current_lookup( block_root, Some(block_component), - Some(parent_root), + Some(AwaitingParent::Block(parent_root)), // On a `UnknownParentBlock` or `UnknownParentBlob` event the peer is not required // to have the rest of the block components (refer to decoupled blob gossip). Create // the lookup with zero peers to house the block components. @@ -225,7 +231,37 @@ impl BlockLookups { } } - /// Seach a block whose parent root is unknown. + /// A child block's parent envelope is missing. Create a child lookup (with the block component) + /// that waits for the parent envelope, and an envelope-only lookup for the parent. 
+ /// + /// Returns true if both lookups are created or already exist. + #[must_use = "only reference the new lookup if returns true"] + pub fn search_child_and_parent_envelope( + &mut self, + block_root: Hash256, + block_component: BlockComponent, + parent_root: Hash256, + peer_id: PeerId, + cx: &mut SyncNetworkContext, + ) -> bool { + let envelope_lookup_exists = + self.search_parent_envelope_of_child(parent_root, &[peer_id], cx); + if envelope_lookup_exists { + // Create child lookup that waits for the parent envelope. + // The child block itself has already been seen, so we pass it as a component. + self.new_current_lookup( + block_root, + Some(block_component), + Some(AwaitingParent::Envelope(parent_root)), + &[], + cx, + ) + } else { + false + } + } + + /// Search a block whose parent root is unknown. /// /// Returns true if the lookup is created or already exists #[must_use = "only reference the new lookup if returns true"] @@ -343,6 +379,57 @@ impl BlockLookups { self.new_current_lookup(block_root_to_search, None, None, peers, cx) } + /// A block triggers the search of a parent envelope. + #[must_use = "only reference the new lookup if returns true"] + pub fn search_parent_envelope_of_child( + &mut self, + parent_root: Hash256, + peers: &[PeerId], + cx: &mut SyncNetworkContext, + ) -> bool { + // Check if there's already a lookup for this root (could be a block lookup or envelope + // lookup). If so, add peers and let it handle the envelope. 
+ if let Some((&lookup_id, _lookup)) = self + .single_block_lookups + .iter_mut() + .find(|(_, lookup)| lookup.is_for_block(parent_root)) + { + if let Err(e) = self.add_peers_to_lookup_and_ancestors(lookup_id, peers, cx) { + warn!(error = ?e, "Error adding peers to envelope lookup"); + } + return true; + } + + if self.single_block_lookups.len() >= MAX_LOOKUPS { + warn!(?parent_root, "Dropping envelope lookup reached max"); + return false; + } + + let lookup = SingleBlockLookup::new_envelope_only(parent_root, peers, cx.next_id()); + let _guard = lookup.span.clone().entered(); + + let id = lookup.id; + let lookup = match self.single_block_lookups.entry(id) { + Entry::Vacant(entry) => entry.insert(lookup), + Entry::Occupied(_) => { + warn!(id, "Lookup exists with same id"); + return false; + } + }; + + debug!( + ?peers, + ?parent_root, + id = lookup.id, + "Created envelope-only lookup" + ); + metrics::inc_counter(&metrics::SYNC_LOOKUP_CREATED); + self.metrics.created_lookups += 1; + + let result = lookup.continue_requests(cx); + self.on_lookup_result(id, result, "new_envelope_lookup", cx) + } + /// Searches for a single block hash. If the blocks parent is unknown, a chain of blocks is /// constructed. 
/// Returns true if the lookup is created or already exists @@ -351,7 +438,7 @@ impl BlockLookups { &mut self, block_root: Hash256, block_component: Option>, - awaiting_parent: Option, + awaiting_parent: Option, peers: &[PeerId], cx: &mut SyncNetworkContext, ) -> bool { @@ -386,13 +473,14 @@ impl BlockLookups { } // Ensure that awaiting parent exists, otherwise this lookup won't be able to make progress - if let Some(awaiting_parent) = awaiting_parent + if let Some(AwaitingParent::Block(parent_root) | AwaitingParent::Envelope(parent_root)) = + awaiting_parent && !self .single_block_lookups .iter() - .any(|(_, lookup)| lookup.is_for_block(awaiting_parent)) + .any(|(_, lookup)| lookup.is_for_block(parent_root)) { - warn!(block_root = ?awaiting_parent, "Ignoring child lookup parent lookup not found"); + warn!(block_root = ?parent_root, "Ignoring child lookup parent lookup not found"); return false; } @@ -426,9 +514,7 @@ impl BlockLookups { debug!( ?peers, ?block_root, - awaiting_parent = awaiting_parent - .map(|root| root.to_string()) - .unwrap_or("none".to_owned()), + ?awaiting_parent, id = lookup.id, "Created block lookup" ); @@ -559,6 +645,40 @@ impl BlockLookups { BlockProcessType::SingleCustodyColumn(id) => { self.on_processing_result_inner::>(id, result, cx) } + BlockProcessType::SinglePayloadEnvelope { id, block_root } => { + // When envelope processing returns `MissingComponents`, the envelope has been + // executed but data columns are not yet available. Transition the lookup to fetch + // custody columns instead of retrying the envelope or erroring. + if matches!( + &result, + BlockProcessingResult::Ok( + AvailabilityProcessingStatus::MissingComponents { .. 
} + ) + ) && let Some(lookup) = self.single_block_lookups.get_mut(&id) + && lookup.transition_envelope_to_custody() + { + debug!( + ?block_root, + "Envelope processed, transitioning to custody column lookup" + ); + let lookup_result = lookup.continue_requests(cx); + self.on_lookup_result( + id, + lookup_result, + "envelope_to_custody_transition", + cx, + ); + return; + } + + let result = self + .on_processing_result_inner::>(id, result, cx); + // On successful envelope import, unblock child lookups waiting for this envelope + if matches!(&result, Ok(LookupResult::Completed)) { + self.continue_envelope_child_lookups(block_root, cx); + } + result + } }; self.on_lookup_result(process_type.id(), lookup_result, "processing_result", cx); } @@ -645,6 +765,12 @@ impl BlockLookups { request_state.revert_to_awaiting_processing()?; Action::ParentUnknown { parent_root } } + BlockError::ParentEnvelopeUnknown { parent_root } => { + // The parent block is known but its execution payload envelope is missing. + // Revert to awaiting processing and fetch the envelope via RPC. + request_state.revert_to_awaiting_processing()?; + Action::ParentEnvelopeUnknown { parent_root } + } ref e @ BlockError::ExecutionPayloadError(ref epe) if !epe.penalize_peer() => { // These errors indicate that the execution layer is offline // and failed to validate the execution payload. Do not downscore peer. @@ -667,6 +793,26 @@ impl BlockLookups { // We opt to drop the lookup instead. 
Action::Drop(format!("{e:?}")) } + BlockError::PayloadEnvelopeError { e, penalize_peer } => { + debug!( + ?block_root, + error = ?e, + "Payload envelope processing error" + ); + if penalize_peer { + let peer_group = request_state.on_processing_failure()?; + for peer in peer_group.all() { + cx.report_peer( + *peer, + PeerAction::MidToleranceError, + "lookup_envelope_processing_failure", + ); + } + Action::Retry + } else { + Action::Drop(format!("{e:?}")) + } + } other => { debug!( ?block_root, @@ -701,6 +847,7 @@ impl BlockLookups { ResponseType::CustodyColumn => { "lookup_custody_column_processing_failure" } + ResponseType::Envelope => "lookup_envelope_processing_failure", }, ); } @@ -742,6 +889,25 @@ impl BlockLookups { ))) } } + Action::ParentEnvelopeUnknown { parent_root } => { + let peers = lookup.all_peers(); + lookup.set_awaiting_parent_envelope(parent_root); + let envelope_lookup_exists = + self.search_parent_envelope_of_child(parent_root, &peers, cx); + if envelope_lookup_exists { + debug!( + id = lookup_id, + ?block_root, + ?parent_root, + "Marking lookup as awaiting parent envelope" + ); + Ok(LookupResult::Pending) + } else { + Err(LookupRequestError::Failed(format!( + "Envelope lookup could not be created for {parent_root:?}" + ))) + } + } Action::Drop(reason) => { // Drop with noop Err(LookupRequestError::Failed(reason)) @@ -791,7 +957,7 @@ impl BlockLookups { let mut lookup_results = vec![]; // < need to buffer lookup results to not re-borrow &mut self for (id, lookup) in self.single_block_lookups.iter_mut() { - if lookup.awaiting_parent() == Some(block_root) { + if lookup.awaiting_parent_block() == Some(block_root) { lookup.resolve_awaiting_parent(); debug!( parent_root = ?block_root, @@ -809,6 +975,33 @@ impl BlockLookups { } } + /// Makes progress on lookups that were waiting for a parent envelope to be imported. 
+ pub fn continue_envelope_child_lookups( + &mut self, + block_root: Hash256, + cx: &mut SyncNetworkContext, + ) { + let mut lookup_results = vec![]; + + for (id, lookup) in self.single_block_lookups.iter_mut() { + if lookup.awaiting_parent_envelope() == Some(block_root) { + lookup.resolve_awaiting_parent(); + debug!( + envelope_root = ?block_root, + id, + block_root = ?lookup.block_root(), + "Continuing lookup after envelope imported" + ); + let result = lookup.continue_requests(cx); + lookup_results.push((*id, result)); + } + } + + for (id, result) in lookup_results { + self.on_lookup_result(id, result, "continue_envelope_child_lookups", cx); + } + } + /// Drops `dropped_id` lookup and all its children recursively. Lookups awaiting a parent need /// the parent to make progress to resolve, therefore we must drop them if the parent is /// dropped. @@ -824,10 +1017,14 @@ impl BlockLookups { metrics::inc_counter_vec(&metrics::SYNC_LOOKUP_DROPPED, &[reason]); self.metrics.dropped_lookups += 1; + let dropped_root = dropped_lookup.block_root(); let child_lookups = self .single_block_lookups .iter() - .filter(|(_, lookup)| lookup.awaiting_parent() == Some(dropped_lookup.block_root())) + .filter(|(_, lookup)| { + lookup.awaiting_parent_block() == Some(dropped_root) + || lookup.awaiting_parent_envelope() == Some(dropped_root) + }) .map(|(id, _)| *id) .collect::>(); @@ -995,17 +1192,15 @@ impl BlockLookups { &'a self, lookup: &'a SingleBlockLookup, ) -> Result<&'a SingleBlockLookup, String> { - if let Some(awaiting_parent) = lookup.awaiting_parent() { + if let Some(parent_root) = lookup.awaiting_parent_block() { if let Some(lookup) = self .single_block_lookups .values() - .find(|l| l.block_root() == awaiting_parent) + .find(|l| l.block_root() == parent_root) { self.find_oldest_ancestor_lookup(lookup) } else { - Err(format!( - "Lookup references unknown parent {awaiting_parent:?}" - )) + Err(format!("Lookup references unknown parent {parent_root:?}")) } } else { Ok(lookup) 
@@ -1038,7 +1233,7 @@ impl BlockLookups { } } - if let Some(parent_root) = lookup.awaiting_parent() { + if let Some(parent_root) = lookup.awaiting_parent_block() { if let Some((&child_id, _)) = self .single_block_lookups .iter() diff --git a/beacon_node/network/src/sync/block_lookups/parent_chain.rs b/beacon_node/network/src/sync/block_lookups/parent_chain.rs index 5deea1dd94e..18363e9b8dc 100644 --- a/beacon_node/network/src/sync/block_lookups/parent_chain.rs +++ b/beacon_node/network/src/sync/block_lookups/parent_chain.rs @@ -13,7 +13,7 @@ impl From<&SingleBlockLookup> for Node { fn from(value: &SingleBlockLookup) -> Self { Self { block_root: value.block_root(), - parent_root: value.awaiting_parent(), + parent_root: value.awaiting_parent_block(), } } } diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index 23bfd531f0f..cdcb574219c 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -16,7 +16,9 @@ use store::Hash256; use strum::IntoStaticStr; use tracing::{Span, debug_span}; use types::data::FixedBlobSidecarList; -use types::{DataColumnSidecarList, EthSpec, SignedBeaconBlock, Slot}; +use types::{ + DataColumnSidecarList, EthSpec, SignedBeaconBlock, SignedExecutionPayloadEnvelope, Slot, +}; // Dedicated enum for LookupResult to force its usage #[must_use = "LookupResult must be handled with on_lookup_result"] @@ -56,6 +58,14 @@ pub enum LookupRequestError { }, } +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum AwaitingParent { + /// Waiting for the parent block to be imported. + Block(Hash256), + /// The parent block is imported but its execution payload envelope is missing. 
+ Envelope(Hash256), +} + #[derive(Educe)] #[educe(Debug(bound(T: BeaconChainTypes)))] pub struct SingleBlockLookup { @@ -68,8 +78,8 @@ pub struct SingleBlockLookup { /// than the lifetime of a custody request. #[educe(Debug(method(fmt_peer_set_as_len)))] peers: Arc>>, - block_root: Hash256, - awaiting_parent: Option, + pub(super) block_root: Hash256, + pub(super) awaiting_parent: Option, created: Instant, pub(crate) span: Span, } @@ -79,6 +89,7 @@ pub(crate) enum ComponentRequests { WaitingForBlock, ActiveBlobRequest(BlobRequestState, usize), ActiveCustodyRequest(CustodyRequestState), + ActiveEnvelopeRequest(EnvelopeRequestState), // When printing in debug this state display the reason why it's not needed #[allow(dead_code)] NotNeeded(&'static str), @@ -89,7 +100,7 @@ impl SingleBlockLookup { requested_block_root: Hash256, peers: &[PeerId], id: Id, - awaiting_parent: Option, + awaiting_parent: Option, ) -> Self { let lookup_span = debug_span!( "lh_single_block_lookup", @@ -109,10 +120,18 @@ impl SingleBlockLookup { } } - /// Reset the status of all internal requests pub fn reset_requests(&mut self) { self.block_request_state = BlockRequestState::new(self.block_root); - self.component_requests = ComponentRequests::WaitingForBlock; + match &self.component_requests { + ComponentRequests::ActiveEnvelopeRequest(_) => { + self.component_requests = ComponentRequests::ActiveEnvelopeRequest( + EnvelopeRequestState::new(self.block_root), + ); + } + _ => { + self.component_requests = ComponentRequests::WaitingForBlock; + } + } } /// Return the slot of this lookup's block if it's currently cached as `AwaitingProcessing` @@ -128,18 +147,24 @@ impl SingleBlockLookup { self.block_root } - pub fn awaiting_parent(&self) -> Option { + pub fn awaiting_parent(&self) -> Option { self.awaiting_parent } - /// Mark this lookup as awaiting a parent lookup from being processed. Meanwhile don't send - /// components for processing. + /// Returns the parent root if awaiting a parent block. 
+ pub fn awaiting_parent_block(&self) -> Option { + match self.awaiting_parent { + Some(AwaitingParent::Block(root)) => Some(root), + _ => None, + } + } + + /// Mark this lookup as awaiting a parent block to be imported before processing. pub fn set_awaiting_parent(&mut self, parent_root: Hash256) { - self.awaiting_parent = Some(parent_root) + self.awaiting_parent = Some(AwaitingParent::Block(parent_root)); } - /// Mark this lookup as no longer awaiting a parent lookup. Components can be sent for - /// processing. + /// Mark this lookup as no longer awaiting any parent. pub fn resolve_awaiting_parent(&mut self) { self.awaiting_parent = None; } @@ -180,6 +205,7 @@ impl SingleBlockLookup { ComponentRequests::WaitingForBlock => false, ComponentRequests::ActiveBlobRequest(request, _) => request.state.is_processed(), ComponentRequests::ActiveCustodyRequest(request) => request.state.is_processed(), + ComponentRequests::ActiveEnvelopeRequest(request) => request.state.is_processed(), ComponentRequests::NotNeeded { .. } => true, } } @@ -199,6 +225,9 @@ impl SingleBlockLookup { ComponentRequests::ActiveCustodyRequest(request) => { request.state.is_awaiting_event() } + ComponentRequests::ActiveEnvelopeRequest(request) => { + request.state.is_awaiting_event() + } ComponentRequests::NotNeeded { .. } => false, } } @@ -268,6 +297,9 @@ impl SingleBlockLookup { ComponentRequests::ActiveCustodyRequest(_) => { self.continue_request::>(cx, 0)? } + ComponentRequests::ActiveEnvelopeRequest(_) => { + self.continue_request::>(cx, 0)? + } ComponentRequests::NotNeeded { .. 
} => {} // do nothing } @@ -289,7 +321,7 @@ impl SingleBlockLookup { expected_blobs: usize, ) -> Result<(), LookupRequestError> { let id = self.id; - let awaiting_parent = self.awaiting_parent.is_some(); + let awaiting_event = self.awaiting_parent.is_some(); let request = R::request_state_mut(self).map_err(|e| LookupRequestError::BadState(e.to_owned()))?; @@ -333,7 +365,7 @@ impl SingleBlockLookup { // Otherwise, attempt to progress awaiting processing // If this request is awaiting a parent lookup to be processed, do not send for processing. // The request will be rejected with unknown parent error. - } else if !awaiting_parent { + } else if !awaiting_event { // maybe_start_processing returns Some if state == AwaitingProcess. This pattern is // useful to conditionally access the result data. if let Some(result) = request.get_state_mut().maybe_start_processing() { @@ -429,6 +461,26 @@ impl BlockRequestState { } } +/// The state of the envelope request component of a `SingleBlockLookup`. +/// Used for envelope-only lookups where the parent block is already imported +/// but its execution payload envelope is missing. +#[derive(Educe)] +#[educe(Debug)] +pub struct EnvelopeRequestState { + #[educe(Debug(ignore))] + pub block_root: Hash256, + pub state: SingleLookupRequestState>>, +} + +impl EnvelopeRequestState { + pub fn new(block_root: Hash256) -> Self { + Self { + block_root, + state: SingleLookupRequestState::new(), + } + } +} + #[derive(Debug, Clone)] pub struct DownloadResult { pub value: T, diff --git a/beacon_node/network/src/sync/block_lookups/single_envelope_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_envelope_lookup.rs new file mode 100644 index 00000000000..88fa0424397 --- /dev/null +++ b/beacon_node/network/src/sync/block_lookups/single_envelope_lookup.rs @@ -0,0 +1,62 @@ +//! Envelope-specific extensions to `SingleBlockLookup`. +//! +//! Envelope-only lookups are created when a block's parent is known and imported but its +//! 
execution payload envelope has not yet been received. The block download step is skipped +//! (marked complete immediately), and only the envelope — and possibly subsequent custody +//! columns — are fetched. + +use super::single_block_lookup::{ + AwaitingParent, ComponentRequests, CustodyRequestState, EnvelopeRequestState, SingleBlockLookup, +}; +use beacon_chain::BeaconChainTypes; +use lighthouse_network::PeerId; +use lighthouse_network::service::api_types::Id; +use store::Hash256; + +impl SingleBlockLookup { + /// Create an envelope-only lookup. The block is already imported; only the envelope (and + /// potentially custody columns) need to be fetched. + pub fn new_envelope_only(block_root: Hash256, peers: &[PeerId], id: Id) -> Self { + let mut lookup = Self::new(block_root, peers, id, None); + // Block is already imported — advance past the download step immediately. + lookup + .block_request_state + .state + .on_completed_request("block already imported") + .expect("block state starts as AwaitingDownload"); + lookup.component_requests = + ComponentRequests::ActiveEnvelopeRequest(EnvelopeRequestState::new(block_root)); + lookup + } + + /// Transition from `ActiveEnvelopeRequest` to `ActiveCustodyRequest`. + /// + /// Called when envelope processing returns `MissingComponents`: the envelope has been executed + /// but data columns have not yet arrived and must be fetched separately. + /// Returns `true` if the transition was made, `false` if state was not an envelope request. + pub fn transition_envelope_to_custody(&mut self) -> bool { + if matches!( + self.component_requests, + ComponentRequests::ActiveEnvelopeRequest(_) + ) { + self.component_requests = + ComponentRequests::ActiveCustodyRequest(CustodyRequestState::new(self.block_root)); + true + } else { + false + } + } + + /// Returns the parent root if this lookup is awaiting a parent envelope. 
+ pub fn awaiting_parent_envelope(&self) -> Option { + match self.awaiting_parent { + Some(AwaitingParent::Envelope(root)) => Some(root), + _ => None, + } + } + + /// Mark this lookup as awaiting a parent envelope before processing can resume. + pub fn set_awaiting_parent_envelope(&mut self, parent_root: Hash256) { + self.awaiting_parent = Some(AwaitingParent::Envelope(parent_root)); + } +} diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 734295ac1d3..869e7e32b08 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -45,6 +45,7 @@ use crate::service::NetworkMessage; use crate::status::ToStatusMessage; use crate::sync::block_lookups::{ BlobRequestState, BlockComponent, BlockRequestState, CustodyRequestState, DownloadResult, + EnvelopeRequestState, }; use crate::sync::custody_backfill_sync::CustodyBackFillSync; use crate::sync::network_context::{PeerGroup, RpcResponseResult}; @@ -65,7 +66,7 @@ use lighthouse_network::types::{NetworkGlobals, SyncState}; use lighthouse_network::{PeerAction, PeerId}; use logging::crit; use lru_cache::LRUTimeCache; -use slot_clock::SlotClock; +use slot_clock::{SlotClock, timestamp_now}; use std::ops::Sub; use std::sync::Arc; use std::time::Duration; @@ -73,7 +74,8 @@ use strum::IntoStaticStr; use tokio::sync::mpsc; use tracing::{debug, error, info, trace}; use types::{ - BlobSidecar, DataColumnSidecar, EthSpec, ForkContext, Hash256, SignedBeaconBlock, Slot, + BlobSidecar, DataColumnSidecar, EthSpec, ForkContext, Hash256, SignedBeaconBlock, + SignedExecutionPayloadEnvelope, Slot, }; /// The number of slots ahead of us that is allowed before requesting a long-range (batch) Sync @@ -132,6 +134,14 @@ pub enum SyncMessage { seen_timestamp: Duration, }, + /// An execution payload envelope has been received from the RPC. 
+ RpcPayloadEnvelope { + sync_request_id: SyncRequestId, + peer_id: PeerId, + envelope: Option>>, + seen_timestamp: Duration, + }, + /// A block with an unknown parent has been received. UnknownParentBlock(PeerId, Arc>, Hash256), @@ -141,6 +151,13 @@ pub enum SyncMessage { /// A data column with an unknown parent has been received. UnknownParentDataColumn(PeerId, Arc>), + /// A block's parent is known but its execution payload envelope has not been received yet. + UnknownParentEnvelope(PeerId, Arc>, Hash256), + + /// An execution payload envelope has been imported via the local gossip path. + /// Sync uses this to unblock any child lookups that were awaiting this parent envelope. + GossipEnvelopeImported { block_root: Hash256 }, + /// A partial data column with an unknown parent has been received. UnknownParentPartialDataColumn { peer_id: PeerId, @@ -191,6 +208,7 @@ pub enum BlockProcessType { SingleBlock { id: Id }, SingleBlob { id: Id }, SingleCustodyColumn(Id), + SinglePayloadEnvelope { id: Id, block_root: Hash256 }, } impl BlockProcessType { @@ -198,7 +216,8 @@ impl BlockProcessType { match self { BlockProcessType::SingleBlock { id } | BlockProcessType::SingleBlob { id } - | BlockProcessType::SingleCustodyColumn(id) => *id, + | BlockProcessType::SingleCustodyColumn(id) + | BlockProcessType::SinglePayloadEnvelope { id, .. 
} => *id, } } } @@ -512,6 +531,9 @@ impl SyncManager { SyncRequestId::DataColumnsByRange(req_id) => { self.on_data_columns_by_range_response(req_id, peer_id, RpcEvent::RPCError(error)) } + SyncRequestId::SinglePayloadEnvelope { id } => { + self.on_single_envelope_response(id, peer_id, RpcEvent::RPCError(error)) + } } } @@ -846,6 +868,17 @@ impl SyncManager { } => { self.rpc_data_column_received(sync_request_id, peer_id, data_column, seen_timestamp) } + SyncMessage::RpcPayloadEnvelope { + sync_request_id, + peer_id, + envelope, + seen_timestamp, + } => self.rpc_payload_envelope_received( + sync_request_id, + peer_id, + envelope, + seen_timestamp, + ), SyncMessage::UnknownParentBlock(peer_id, block, block_root) => { let block_slot = block.slot(); let parent_root = block.parent_root(); @@ -911,6 +944,35 @@ impl SyncManager { } } } + SyncMessage::UnknownParentEnvelope(peer_id, block, block_root) => { + let block_slot = block.slot(); + let parent_root = block.parent_root(); + debug!( + %block_root, + %parent_root, + "Parent envelope not yet available, creating envelope lookup" + ); + self.handle_unknown_parent_envelope( + peer_id, + block_root, + parent_root, + block_slot, + BlockComponent::Block(DownloadResult { + value: block.block_cloned(), + block_root, + seen_timestamp: timestamp_now(), + peer_group: PeerGroup::from_single(peer_id), + }), + ); + } + SyncMessage::GossipEnvelopeImported { block_root } => { + debug!( + %block_root, + "Gossip-imported envelope; unblocking awaiting child lookups" + ); + self.block_lookups + .continue_envelope_child_lookups(block_root, &mut self.network); + } SyncMessage::UnknownParentPartialDataColumn { peer_id, block_root, @@ -1036,6 +1098,55 @@ impl SyncManager { } } + /// Handle a block whose parent block is known but parent envelope is missing. + /// Creates an envelope-only lookup for the parent and a child lookup that waits for it. 
+ fn handle_unknown_parent_envelope( + &mut self, + peer_id: PeerId, + block_root: Hash256, + parent_root: Hash256, + slot: Slot, + block_component: BlockComponent, + ) { + // Defensive: if the parent's payload envelope was already received between when + // gossip-verification raised `ParentEnvelopeUnknown` and now, no lookup is needed. + if self + .chain + .canonical_head + .fork_choice_read_lock() + .is_payload_received(&parent_root) + { + debug!( + %block_root, + %parent_root, + "Parent envelope already received, skipping envelope lookup" + ); + return; + } + match self.should_search_for_block(Some(slot), &peer_id) { + Ok(_) => { + if self.block_lookups.search_child_and_parent_envelope( + block_root, + block_component, + parent_root, + peer_id, + &mut self.network, + ) { + // Lookups created + } else { + debug!( + ?block_root, + ?parent_root, + "No lookup created for child and parent envelope" + ); + } + } + Err(reason) => { + debug!(%block_root, %parent_root, reason, "Ignoring unknown parent envelope request"); + } + } + } + fn handle_unknown_block_root(&mut self, peer_id: PeerId, block_root: Hash256) { match self.should_search_for_block(None, &peer_id) { Ok(_) => { @@ -1231,6 +1342,46 @@ impl SyncManager { } } + fn rpc_payload_envelope_received( + &mut self, + sync_request_id: SyncRequestId, + peer_id: PeerId, + envelope: Option>>, + seen_timestamp: Duration, + ) { + match sync_request_id { + SyncRequestId::SinglePayloadEnvelope { id } => self.on_single_envelope_response( + id, + peer_id, + RpcEvent::from_chunk(envelope, seen_timestamp), + ), + _ => { + crit!(%peer_id, "bad request id for payload envelope"); + } + } + } + + fn on_single_envelope_response( + &mut self, + id: SingleLookupReqId, + peer_id: PeerId, + rpc_event: RpcEvent>>, + ) { + if let Some(resp) = self + .network + .on_single_envelope_response(id, peer_id, rpc_event) + { + self.block_lookups + .on_download_response::>( + id, + resp.map(|(value, seen_timestamp)| { + (value, 
PeerGroup::from_single(peer_id), seen_timestamp) + }), + &mut self.network, + ) + } + } + fn on_single_blob_response( &mut self, id: SingleLookupReqId, diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index b1ba87c75d3..4b6a95bf04d 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -37,6 +37,7 @@ pub use requests::LookupVerifyError; use requests::{ ActiveRequests, BlobsByRangeRequestItems, BlobsByRootRequestItems, BlocksByRangeRequestItems, BlocksByRootRequestItems, DataColumnsByRangeRequestItems, DataColumnsByRootRequestItems, + PayloadEnvelopesByRootRequestItems, PayloadEnvelopesByRootSingleRequest, }; #[cfg(test)] use slot_clock::SlotClock; @@ -52,7 +53,7 @@ use tracing::{Span, debug, debug_span, error, warn}; use types::data::FixedBlobSidecarList; use types::{ BlobSidecar, BlockImportSource, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, EthSpec, - ForkContext, Hash256, SignedBeaconBlock, Slot, + ForkContext, Hash256, SignedBeaconBlock, SignedExecutionPayloadEnvelope, Slot, }; pub mod custody; @@ -213,6 +214,9 @@ pub struct SyncNetworkContext { /// A mapping of active DataColumnsByRange requests data_columns_by_range_requests: ActiveRequests>, + /// A mapping of active PayloadEnvelopesByRoot requests + payload_envelopes_by_root_requests: + ActiveRequests>, /// Mapping of active custody column requests for a block root custody_by_root_requests: FnvHashMap>, @@ -298,6 +302,7 @@ impl SyncNetworkContext { blocks_by_range_requests: ActiveRequests::new("blocks_by_range"), blobs_by_range_requests: ActiveRequests::new("blobs_by_range"), data_columns_by_range_requests: ActiveRequests::new("data_columns_by_range"), + payload_envelopes_by_root_requests: ActiveRequests::new("payload_envelopes_by_root"), custody_by_root_requests: <_>::default(), components_by_range_requests: FnvHashMap::default(), 
custody_backfill_data_column_batch_requests: FnvHashMap::default(), @@ -326,6 +331,7 @@ impl SyncNetworkContext { blocks_by_range_requests, blobs_by_range_requests, data_columns_by_range_requests, + payload_envelopes_by_root_requests, // custody_by_root_requests is a meta request of data_columns_by_root_requests custody_by_root_requests: _, // components_by_range_requests is a meta request of various _by_range requests @@ -361,12 +367,17 @@ impl SyncNetworkContext { .active_requests_of_peer(peer_id) .into_iter() .map(|req_id| SyncRequestId::DataColumnsByRange(*req_id)); + let envelope_by_root_ids = payload_envelopes_by_root_requests + .active_requests_of_peer(peer_id) + .into_iter() + .map(|id| SyncRequestId::SinglePayloadEnvelope { id: *id }); blocks_by_root_ids .chain(blobs_by_root_ids) .chain(data_column_by_root_ids) .chain(blocks_by_range_ids) .chain(blobs_by_range_ids) .chain(data_column_by_range_ids) + .chain(envelope_by_root_ids) .collect() } @@ -423,6 +434,7 @@ impl SyncNetworkContext { blocks_by_range_requests, blobs_by_range_requests, data_columns_by_range_requests, + payload_envelopes_by_root_requests, // custody_by_root_requests is a meta request of data_columns_by_root_requests custody_by_root_requests: _, // components_by_range_requests is a meta request of various _by_range requests @@ -445,6 +457,7 @@ impl SyncNetworkContext { .chain(blocks_by_range_requests.iter_request_peers()) .chain(blobs_by_range_requests.iter_request_peers()) .chain(data_columns_by_range_requests.iter_request_peers()) + .chain(payload_envelopes_by_root_requests.iter_request_peers()) { *active_request_count_by_peer.entry(peer_id).or_default() += 1; } @@ -927,6 +940,74 @@ impl SyncNetworkContext { Ok(LookupRequestResult::RequestSent(id.req_id)) } + /// Request a payload envelope for `block_root` from a peer. 
+ pub fn envelope_lookup_request( + &mut self, + lookup_id: SingleLookupId, + lookup_peers: Arc>>, + block_root: Hash256, + ) -> Result { + let active_request_count_by_peer = self.active_request_count_by_peer(); + let Some(peer_id) = lookup_peers + .read() + .iter() + .map(|peer| { + ( + active_request_count_by_peer.get(peer).copied().unwrap_or(0), + rand::random::(), + peer, + ) + }) + .min() + .map(|(_, _, peer)| *peer) + else { + return Ok(LookupRequestResult::Pending("no peers")); + }; + + let id = SingleLookupReqId { + lookup_id, + req_id: self.next_id(), + }; + + let request = PayloadEnvelopesByRootSingleRequest(block_root); + + let network_request = RequestType::PayloadEnvelopesByRoot( + request + .into_request(&self.fork_context) + .map_err(RpcRequestSendError::InternalError)?, + ); + self.network_send + .send(NetworkMessage::SendRequest { + peer_id, + request: network_request, + app_request_id: AppRequestId::Sync(SyncRequestId::SinglePayloadEnvelope { id }), + }) + .map_err(|_| RpcRequestSendError::InternalError("network send error".to_owned()))?; + + debug!( + method = "PayloadEnvelopesByRoot", + ?block_root, + peer = %peer_id, + %id, + "Sync RPC request sent" + ); + + let request_span = debug_span!( + parent: Span::current(), + "lh_outgoing_envelope_by_root_request", + %block_root, + ); + self.payload_envelopes_by_root_requests.insert( + id, + peer_id, + true, + PayloadEnvelopesByRootRequestItems::new(request), + request_span, + ); + + Ok(LookupRequestResult::RequestSent(id.req_id)) + } + /// Request necessary blobs for `block_root`. 
Requests only the necessary blobs by checking: /// - If we have a downloaded but not yet processed block /// - If the da_checker has a pending block @@ -1435,6 +1516,27 @@ impl SyncNetworkContext { self.on_rpc_response_result(resp, peer_id) } + pub(crate) fn on_single_envelope_response( + &mut self, + id: SingleLookupReqId, + peer_id: PeerId, + rpc_event: RpcEvent>>, + ) -> Option>>> { + let resp = self + .payload_envelopes_by_root_requests + .on_response(id, rpc_event); + let resp = resp.map(|res| { + res.and_then(|(mut envelopes, seen_timestamp)| { + match envelopes.pop() { + Some(envelope) => Ok((envelope, seen_timestamp)), + // Should never happen, request items enforces at least 1 chunk. + None => Err(LookupVerifyError::NotEnoughResponsesReturned { actual: 0 }.into()), + } + }) + }); + self.on_rpc_response_result(resp, peer_id) + } + pub(crate) fn on_single_blob_response( &mut self, id: SingleLookupReqId, @@ -1610,6 +1712,33 @@ impl SyncNetworkContext { }) } + pub fn send_envelope_for_processing( + &self, + id: Id, + envelope: Arc>, + seen_timestamp: Duration, + block_root: Hash256, + ) -> Result<(), SendErrorProcessor> { + let beacon_processor = self + .beacon_processor_if_enabled() + .ok_or(SendErrorProcessor::ProcessorNotAvailable)?; + + debug!(?block_root, ?id, "Sending payload envelope for processing"); + beacon_processor + .send_rpc_payload_envelope( + envelope, + seen_timestamp, + BlockProcessType::SinglePayloadEnvelope { id, block_root }, + ) + .map_err(|e| { + error!( + error = ?e, + "Failed to send sync envelope to processor" + ); + SendErrorProcessor::SendError + }) + } + pub fn send_blobs_for_processing( &self, id: Id, @@ -1788,6 +1917,10 @@ impl SyncNetworkContext { "data_columns_by_range", self.data_columns_by_range_requests.len(), ), + ( + "payload_envelopes_by_root", + self.payload_envelopes_by_root_requests.len(), + ), ("custody_by_root", self.custody_by_root_requests.len()), ( "components_by_range", diff --git 
a/beacon_node/network/src/sync/network_context/requests.rs b/beacon_node/network/src/sync/network_context/requests.rs index ad60dffb455..8c091eca807 100644 --- a/beacon_node/network/src/sync/network_context/requests.rs +++ b/beacon_node/network/src/sync/network_context/requests.rs @@ -16,6 +16,9 @@ pub use data_columns_by_range::DataColumnsByRangeRequestItems; pub use data_columns_by_root::{ DataColumnsByRootRequestItems, DataColumnsByRootSingleBlockRequest, }; +pub use payload_envelopes_by_root::{ + PayloadEnvelopesByRootRequestItems, PayloadEnvelopesByRootSingleRequest, +}; use crate::metrics; @@ -27,6 +30,7 @@ mod blocks_by_range; mod blocks_by_root; mod data_columns_by_range; mod data_columns_by_root; +mod payload_envelopes_by_root; #[derive(Debug, PartialEq, Eq, IntoStaticStr)] pub enum LookupVerifyError { diff --git a/beacon_node/network/src/sync/network_context/requests/payload_envelopes_by_root.rs b/beacon_node/network/src/sync/network_context/requests/payload_envelopes_by_root.rs new file mode 100644 index 00000000000..7f7097971d6 --- /dev/null +++ b/beacon_node/network/src/sync/network_context/requests/payload_envelopes_by_root.rs @@ -0,0 +1,53 @@ +use lighthouse_network::rpc::methods::PayloadEnvelopesByRootRequest; +use std::sync::Arc; +use types::{EthSpec, ForkContext, Hash256, SignedExecutionPayloadEnvelope}; + +use super::{ActiveRequestItems, LookupVerifyError}; + +#[derive(Debug, Copy, Clone)] +pub struct PayloadEnvelopesByRootSingleRequest(pub Hash256); + +impl PayloadEnvelopesByRootSingleRequest { + pub fn into_request( + self, + fork_context: &ForkContext, + ) -> Result { + PayloadEnvelopesByRootRequest::new(vec![self.0], fork_context) + } +} + +pub struct PayloadEnvelopesByRootRequestItems { + request: PayloadEnvelopesByRootSingleRequest, + items: Vec>>, +} + +impl PayloadEnvelopesByRootRequestItems { + pub fn new(request: PayloadEnvelopesByRootSingleRequest) -> Self { + Self { + request, + items: vec![], + } + } +} + +impl ActiveRequestItems for 
PayloadEnvelopesByRootRequestItems<E> { + type Item = Arc<SignedExecutionPayloadEnvelope<E>>; + + /// Append a response to the single chunk request. If the chunk is valid, the request is + /// resolved immediately. + /// The active request SHOULD be dropped after `add_response` returns an error + fn add(&mut self, envelope: Self::Item) -> Result<bool, LookupVerifyError> { + let beacon_block_root = envelope.beacon_block_root(); + if self.request.0 != beacon_block_root { + return Err(LookupVerifyError::UnrequestedBlockRoot(beacon_block_root)); + } + + self.items.push(envelope); + // Always returns true, payload envelopes by root expects a single response + Ok(true) + } + + fn consume(&mut self) -> Vec<Self::Item> { + std::mem::take(&mut self.items) + } +} diff --git a/beacon_node/network/src/sync/tests/lookups.rs b/beacon_node/network/src/sync/tests/lookups.rs index a26996ec5ee..35c45eb9286 100644 --- a/beacon_node/network/src/sync/tests/lookups.rs +++ b/beacon_node/network/src/sync/tests/lookups.rs @@ -37,7 +37,7 @@ use tokio::sync::mpsc; use tracing::info; use types::{ BlobSidecar, BlockImportSource, ColumnIndex, DataColumnSidecar, EthSpec, ForkContext, ForkName, - Hash256, MinimalEthSpec as E, SignedBeaconBlock, Slot, + Hash256, MinimalEthSpec as E, SignedBeaconBlock, SignedExecutionPayloadEnvelope, Slot, test_utils::{SeedableRng, XorShiftRng}, }; @@ -85,6 +85,9 @@ pub struct SimulateConfig { ee_offline_for_n_range_responses: Option<usize>, /// Disconnect all peers after this many successful BlocksByRange responses. successful_range_responses_before_disconnect: Option<usize>, /// Number of `PayloadEnvelopesByRoot` responses that return an envelope for a + /// different block_root than requested.
+ return_wrong_envelopes_n_times: usize, } impl SimulateConfig { @@ -116,6 +119,11 @@ impl SimulateConfig { self } + fn return_wrong_envelope_once(mut self) -> Self { + self.return_wrong_envelopes_n_times = 1; + self + } + fn return_wrong_sidecar_for_block_once(mut self) -> Self { self.return_wrong_sidecar_for_block_n_times = 1; self @@ -209,6 +217,9 @@ pub(crate) struct TestRigConfig { fulu_test_type: FuluTestType, /// Override the node custody type derived from `fulu_test_type` node_custody_type_override: Option, + /// Override the number of validators in the harness genesis state. Defaults to 1. + /// Some forks (e.g. Gloas) cannot initialise a state with a single validator. + validator_count_override: Option, } impl TestRig { @@ -222,9 +233,9 @@ impl TestRig { ); // Initialise a new beacon chain - let harness = BeaconChainHarness::>::builder(E) + let mut builder = BeaconChainHarness::>::builder(E) .spec(spec.clone()) - .deterministic_keypairs(1) + .deterministic_keypairs(test_rig_config.validator_count_override.unwrap_or(1)) .fresh_ephemeral_store() .mock_execution_layer() .testing_slot_clock(clock.clone()) @@ -232,8 +243,17 @@ impl TestRig { test_rig_config .node_custody_type_override .unwrap_or_else(|| test_rig_config.fulu_test_type.we_node_custody_type()), - ) - .build(); + ); + // Post-Electra forks need validators with effective balance close to + // `max_effective_balance_electra` for balance-weighted committee + // selection (sync committee, PTC) to converge during genesis. 
+ if spec.electra_fork_epoch == Some(types::Epoch::new(0)) { + let max_eb = spec.max_effective_balance_electra; + builder = builder.with_genesis_state_builder(move |b| { + b.set_initial_balance_fn(Box::new(move |_| max_eb)) + }); + } + let harness = builder.build(); let chain = harness.chain.clone(); let fork_context = Arc::new(ForkContext::new::( @@ -305,6 +325,7 @@ impl TestRig { fork_name, network_blocks_by_root: <_>::default(), network_blocks_by_slot: <_>::default(), + network_envelopes_by_root: <_>::default(), penalties: <_>::default(), seen_lookups: <_>::default(), requests: <_>::default(), @@ -319,6 +340,7 @@ impl TestRig { Self::new(TestRigConfig { fulu_test_type: FuluTestType::WeFullnodeThemSupernode, node_custody_type_override: None, + validator_count_override: None, }) } @@ -327,6 +349,7 @@ impl TestRig { Self::new(TestRigConfig { fulu_test_type: FuluTestType::WeFullnodeThemSupernode, node_custody_type_override: Some(node_custody_type), + validator_count_override: None, }) } @@ -429,9 +452,9 @@ impl TestRig { process_fn.await } } - Work::RpcBlobs { process_fn } | Work::RpcCustodyColumn(process_fn) => { - process_fn.await - } + Work::RpcBlobs { process_fn } + | Work::RpcCustodyColumn(process_fn) + | Work::RpcPayloadEnvelope { process_fn } => process_fn.await, Work::ChainSegment { process_fn, process_id: (chain_id, batch_epoch), @@ -671,6 +694,45 @@ impl TestRig { self.send_rpc_columns_response(req_id, peer_id, &columns); } + (RequestType::PayloadEnvelopesByRoot(req), AppRequestId::Sync(req_id)) => { + if self.complete_strategy.return_no_data_n_times > 0 { + self.complete_strategy.return_no_data_n_times -= 1; + return self.send_rpc_envelopes_response(req_id, peer_id, &[]); + } + + if self.complete_strategy.return_wrong_envelopes_n_times > 0 { + self.complete_strategy.return_wrong_envelopes_n_times -= 1; + // Return any envelope that doesn't match the request, so the + // request items layer raises `UnrequestedBlockRoot`. 
+ let requested = req + .beacon_block_roots + .iter() + .copied() + .collect::>(); + let wrong = self + .network_envelopes_by_root + .iter() + .find(|(root, _)| !requested.contains(*root)) + .map(|(_, envelope)| envelope.clone()) + .expect("test fixture must produce at least one extra envelope"); + return self.send_rpc_envelopes_response(req_id, peer_id, &[wrong]); + } + + let envelopes = req + .beacon_block_roots + .iter() + .map(|block_root| { + self.network_envelopes_by_root + .get(block_root) + .unwrap_or_else(|| { + panic!("Test consumer requested unknown envelope: {block_root:?}") + }) + .clone() + }) + .collect::>(); + self.send_rpc_envelopes_response(req_id, peer_id, &envelopes); + } + (RequestType::BlocksByRange(req), AppRequestId::Sync(req_id)) => { if self.complete_strategy.skip_by_range_routes { return; @@ -894,6 +956,36 @@ impl TestRig { }); } + fn send_rpc_envelopes_response( + &mut self, + sync_request_id: SyncRequestId, + peer_id: PeerId, + envelopes: &[Arc>], + ) { + let block_roots = envelopes + .iter() + .map(|e| e.beacon_block_root()) + .collect::>(); + self.log(&format!( + "Completing request {sync_request_id:?} to {peer_id} with envelopes for {block_roots:?}" + )); + + for envelope in envelopes { + self.push_sync_message(SyncMessage::RpcPayloadEnvelope { + sync_request_id, + peer_id, + envelope: Some(envelope.clone()), + seen_timestamp: D, + }); + } + self.push_sync_message(SyncMessage::RpcPayloadEnvelope { + sync_request_id, + peer_id, + envelope: None, + seen_timestamp: D, + }); + } + fn send_rpc_columns_response( &mut self, sync_request_id: SyncRequestId, @@ -936,16 +1028,25 @@ impl TestRig { pub(super) async fn build_chain(&mut self, block_count: usize) -> Hash256 { let mut blocks = vec![]; - // Initialise a new beacon chain - let external_harness = BeaconChainHarness::>::builder(E) + // Initialise a new beacon chain. 
Match the local harness's validator count and + // balance hooks so post-Electra forks (where genesis-time committee selection is + // balance-weighted) can initialise. + let validator_count = self.harness.validator_keypairs.len(); + let mut builder = BeaconChainHarness::>::builder(E) .spec(self.harness.spec.clone()) - .deterministic_keypairs(1) + .deterministic_keypairs(validator_count) .fresh_ephemeral_store() .mock_execution_layer() .testing_slot_clock(self.harness.chain.slot_clock.clone()) // Make the external harness a supernode so all columns are available - .node_custody_type(NodeCustodyType::Supernode) - .build(); + .node_custody_type(NodeCustodyType::Supernode); + if self.harness.spec.electra_fork_epoch == Some(types::Epoch::new(0)) { + let max_eb = self.harness.spec.max_effective_balance_electra; + builder = builder.with_genesis_state_builder(move |b| { + b.set_initial_balance_fn(Box::new(move |_| max_eb)) + }); + } + let external_harness = builder.build(); // Ensure all blocks have data. Otherwise, the triggers for unknown blob parent and unknown // data column parent fail. external_harness @@ -974,6 +1075,16 @@ impl TestRig { self.network_blocks_by_root .insert(block_root, block.clone()); self.network_blocks_by_slot.insert(block_slot, block); + // Post-Gloas, also capture the execution payload envelope so peers can serve it. + if self.is_after_gloas() + && let Ok(Some(envelope)) = external_harness + .chain + .store + .get_payload_envelope(&block_root) + { + self.network_envelopes_by_root + .insert(block_root, Arc::new(envelope)); + } self.log(&format!( "Produced block {} index {i} in external harness", block_slot, @@ -1002,6 +1113,21 @@ impl TestRig { self.re_insert_block(Arc::new(block), blobs, columns); } + /// Replace the cached envelope's signature for `block_root` with one signed by an + /// unrelated key, so it fails verification against the proposer's pubkey. 
+ fn corrupt_envelope_signature_for(&mut self, block_root: Hash256) { + let envelope = self + .network_envelopes_by_root + .get(&block_root) + .expect("no envelope cached for block_root") + .as_ref() + .clone(); + let mut envelope = envelope; + envelope.signature = self.valid_signature(); + self.network_envelopes_by_root + .insert(block_root, Arc::new(envelope)); + } + fn valid_signature(&mut self) -> bls::Signature { let keypair = bls::Keypair::random(); let msg = Hash256::random(); @@ -1178,6 +1304,32 @@ impl TestRig { self.harness.chain.recompute_head_at_current_slot().await; } + /// Persist a Gloas execution payload envelope into the local chain and mark the + /// block as "payload received" in fork choice. Mimics the side-effects of the + /// gossip-import path, including the `GossipEnvelopeImported` sync notification. + /// The caller is responsible for ensuring the corresponding beacon block is + /// already imported. + async fn import_envelope_for_block_root(&mut self, block_root: Hash256) { + let envelope = self + .network_envelopes_by_root + .get(&block_root) + .unwrap_or_else(|| panic!("no envelope cached for {block_root:?}")) + .as_ref() + .clone(); + self.harness + .chain + .store + .put_payload_envelope(&block_root, &envelope) + .expect("should store envelope"); + self.harness + .chain + .canonical_head + .fork_choice_write_lock() + .on_valid_payload_envelope_received(block_root) + .expect("should update fork choice with envelope"); + self.push_sync_message(SyncMessage::GossipEnvelopeImported { block_root }); + } + /// Import a block directly into the chain without going through lookup sync async fn import_block_by_root(&mut self, block_root: Hash256) { let range_sync_block = self @@ -1444,6 +1596,7 @@ impl TestRig { Self::new(TestRigConfig { fulu_test_type, node_custody_type_override: None, + validator_count_override: None, }) }) } @@ -1460,6 +1613,22 @@ impl TestRig { self.fork_name.fulu_enabled() } + pub fn is_after_gloas(&self) -> bool { + 
self.fork_name.gloas_enabled() + } + + fn new_after_gloas() -> Option { + // Gloas requires more than 1 validator to initialise the genesis state + // (committee/sampling computations fail with `InvalidIndicesCount`). + genesis_fork().gloas_enabled().then(|| { + Self::new(TestRigConfig { + fulu_test_type: FuluTestType::WeFullnodeThemSupernode, + node_custody_type_override: None, + validator_count_override: Some(1024), + }) + }) + } + fn trigger_unknown_parent_block(&mut self, peer_id: PeerId, block: Arc>) { let block_root = block.canonical_root(); self.send_sync_message(SyncMessage::UnknownParentBlock(peer_id, block, block_root)) @@ -1483,6 +1652,18 @@ impl TestRig { )); } + /// Trigger an envelope-unknown lookup for the last block in the chain. Caller is + /// expected to have already imported the parent block (via `import_blocks_up_to_slot`) + /// without registering its envelope. + fn trigger_with_last_unknown_parent_envelope(&mut self) { + let peer_id = self.new_connected_supernode_peer(); + let last_block = self.get_last_block().block_cloned(); + let block_root = last_block.canonical_root(); + self.send_sync_message(SyncMessage::UnknownParentEnvelope( + peer_id, last_block, block_root, + )); + } + fn rand_block(&mut self) -> SignedBeaconBlock { self.rand_block_and_blobs(NumBlobs::None).0 } @@ -2639,3 +2820,172 @@ async fn crypto_on_fail_with_bad_column_kzg_proof() { r.assert_penalties_of_type("lookup_custody_column_processing_failure"); } } + +// --------------------------------------------------------------------------- +// Gloas: parent envelope unknown lookup +// --------------------------------------------------------------------------- +// +// These tests exercise the lookup-sync state machine introduced in PR #9039: +// when a gossip block's parent execution payload envelope is missing, +// `SyncManager` is expected to create two single-block lookups — an envelope-only +// lookup for the parent block_root and a "child" lookup that holds the gossip +// 
block and waits on `AwaitingParent::Envelope(parent_root)`. The envelope-only +// lookup issues a `PayloadEnvelopesByRoot` RPC; on completion it unblocks the +// child via `continue_envelope_child_lookups`. +// +// The tests below cover lookup creation, RPC routing, and drop-cascade +// behaviour. The end-to-end happy path is gated on +// `process_execution_payload_envelope` supporting `AvailabilityPending` (today +// it returns `InternalError("Pending payload envelope not yet implemented")`), +// which is tracked separately. See `process_rpc_envelope` in `sync_methods.rs`. + +/// Builds a 2-block gloas chain in the external harness and locally imports block 1 +/// (parent) WITHOUT registering its envelope, leaving `is_payload_received(parent_root)` +/// false — the precondition for `BlockError::ParentEnvelopeUnknown`. +async fn setup_unknown_parent_envelope_scenario() -> Option { + let mut r = TestRig::new_after_gloas()?; + r.build_chain(2).await; + r.import_blocks_up_to_slot(1).await; + Some(r) +} + +fn payload_envelope_request_count(rig: &TestRig) -> usize { + rig.requests + .iter() + .filter(|(request, _)| matches!(request, RequestType::PayloadEnvelopesByRoot(_))) + .count() +} + +/// Triggering `UnknownParentEnvelope` creates exactly two lookups: an envelope-only +/// lookup for the parent and a child lookup for the gossip block awaiting that envelope. +#[tokio::test] +async fn unknown_parent_envelope_creates_two_lookups() { + let Some(mut r) = setup_unknown_parent_envelope_scenario().await else { + return; + }; + r.trigger_with_last_unknown_parent_envelope(); + r.assert_single_lookups_count(2); +} + +/// Repeated `UnknownParentEnvelope` triggers for the same parent must not spawn extra +/// lookups (peers are merged into the existing envelope lookup). 
+#[tokio::test] +async fn happy_path_unknown_parent_envelope_multiple_triggers() { + let Some(mut r) = setup_unknown_parent_envelope_scenario().await else { + return; + }; + r.trigger_with_last_unknown_parent_envelope(); + r.trigger_with_last_unknown_parent_envelope(); + r.assert_single_lookups_count(2); +} + +/// The envelope-only lookup must dispatch a `PayloadEnvelopesByRoot` RPC for the +/// parent block_root. +#[tokio::test] +async fn envelope_lookup_issues_by_root_rpc() { + let Some(mut r) = setup_unknown_parent_envelope_scenario().await else { + return; + }; + r.trigger_with_last_unknown_parent_envelope(); + r.simulate(SimulateConfig::new()).await; + assert_eq!( + payload_envelope_request_count(&r), + 1, + "expected exactly one PayloadEnvelopesByRoot request" + ); +} + +/// One transient RPC error on the envelope request → lookup retries with the same peer +/// and completes successfully. Mirrors the `bad_peer_rpc_failure` shape used for blocks. +#[tokio::test] +async fn bad_peer_envelope_rpc_failure() { + let Some(mut r) = setup_unknown_parent_envelope_scenario().await else { + return; + }; + r.trigger_with_last_unknown_parent_envelope(); + r.simulate(SimulateConfig::new().return_rpc_error(RPCError::IoError("test".into()))) + .await; + r.assert_successful_lookup_sync(); + r.assert_head_slot(2); +} + +/// Peer responds once with an envelope for a different block_root than requested. +/// The request-items layer raises `UnrequestedBlockRoot`, the peer is penalised, and +/// the lookup retries successfully on the next request. 
+#[tokio::test] +async fn bad_peer_wrong_envelope_response() { + let Some(mut r) = setup_unknown_parent_envelope_scenario().await else { + return; + }; + r.trigger_with_last_unknown_parent_envelope(); + r.simulate(SimulateConfig::new().return_wrong_envelope_once()) + .await; + r.assert_penalties_of_type("UnrequestedBlockRoot"); + r.assert_successful_lookup_sync(); + r.assert_head_slot(2); +} + +/// Trigger `UnknownParentEnvelope` when the parent's payload envelope is already +/// in fork choice. Sync should treat the trigger as a no-op and create no lookups. +#[tokio::test] +async fn envelope_already_received_skips_lookup() { + let Some(mut r) = setup_unknown_parent_envelope_scenario().await else { + return; + }; + let parent_root = r.get_last_block().block_cloned().parent_root(); + r.import_envelope_for_block_root(parent_root).await; + r.trigger_with_last_unknown_parent_envelope(); + r.assert_single_lookups_count(0); +} + +/// End-to-end: an envelope-only RPC lookup completes, the cached child block is +/// processed, and the head advances to the gossip block. +#[tokio::test] +async fn happy_path_unknown_parent_envelope() { + let Some(mut r) = setup_unknown_parent_envelope_scenario().await else { + return; + }; + r.trigger_with_last_unknown_parent_envelope(); + r.simulate(SimulateConfig::happy_path()).await; + r.assert_successful_lookup_sync(); + r.assert_head_slot(2); + r.assert_no_penalties(); +} + +/// While an envelope-only RPC lookup is pending, the same envelope is imported +/// via the gossip path. The child lookup should still unblock and import. +#[tokio::test] +async fn happy_path_unknown_parent_envelope_via_gossip() { + let Some(mut r) = setup_unknown_parent_envelope_scenario().await else { + return; + }; + let parent_root = r.get_last_block().block_cloned().parent_root(); + r.trigger_with_last_unknown_parent_envelope(); + // Import the envelope via the local gossip path before any RPC response arrives. 
+ r.import_envelope_for_block_root(parent_root).await; + r.simulate(SimulateConfig::happy_path()).await; + r.assert_successful_lookup_sync(); + r.assert_head_slot(2); +} + +/// Peer returns the requested envelope but with a corrupted signature. Gossip +/// verification rejects it; the lookup retries (single peer → exhaust → drop) +/// and reports `lookup_envelope_processing_failure` against the peer. +#[tokio::test] +async fn crypto_on_fail_with_bad_envelope_signature() { + let Some(mut r) = setup_unknown_parent_envelope_scenario().await else { + return; + }; + let parent_root = r.get_last_block().block_cloned().parent_root(); + r.corrupt_envelope_signature_for(parent_root); + r.trigger_with_last_unknown_parent_envelope(); + r.simulate(SimulateConfig::happy_path()).await; + if cfg!(feature = "fake_crypto") { + // Under fake_crypto, signature checks are no-ops, so a "corrupted" + // signature still passes. Skip — analogous to the existing + // `crypto_on_fail_with_invalid_block_signature` test. + return; + } + r.assert_failed_lookup_sync(); + r.assert_penalties_of_type("lookup_envelope_processing_failure"); +} diff --git a/beacon_node/network/src/sync/tests/mod.rs b/beacon_node/network/src/sync/tests/mod.rs index 8ffe24dda51..917e11b9ff9 100644 --- a/beacon_node/network/src/sync/tests/mod.rs +++ b/beacon_node/network/src/sync/tests/mod.rs @@ -22,7 +22,7 @@ use tokio::sync::mpsc; use tracing_subscriber::fmt::MakeWriter; use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::util::SubscriberInitExt; -use types::{ForkName, Hash256, MinimalEthSpec as E, Slot}; +use types::{ForkName, Hash256, MinimalEthSpec as E, SignedExecutionPayloadEnvelope, Slot}; mod lookups; mod range; @@ -79,6 +79,8 @@ struct TestRig { /// Blocks that will be used in the test but may not be known to `harness` yet. network_blocks_by_root: HashMap>, network_blocks_by_slot: HashMap>, + /// Execution payload envelopes (Gloas) keyed by beacon block root, available to peers. 
+ network_envelopes_by_root: HashMap<Hash256, Arc<SignedExecutionPayloadEnvelope<E>>>, penalties: Vec, /// All seen lookups through the test run seen_lookups: HashMap, diff --git a/consensus/fork_choice/src/fork_choice.rs b/consensus/fork_choice/src/fork_choice.rs index 593aa27915b..4ab3df05d7c 100644 --- a/consensus/fork_choice/src/fork_choice.rs +++ b/consensus/fork_choice/src/fork_choice.rs @@ -1514,6 +1514,11 @@ where } } + /// Returns whether the payload envelope has been received for the given block. + pub fn is_payload_received(&self, block_root: &Hash256) -> bool { + self.proto_array.is_payload_received(block_root) + } + /// Returns whether the proposer should extend the execution payload chain of the given block. pub fn should_extend_payload(&self, block_root: &Hash256) -> Result<bool, Error<T::Error>> { let proposer_boost_root = self.fc_store.proposer_boost_root();