From ebe9fe228a5633399572826eb9b587825e191cc8 Mon Sep 17 00:00:00 2001 From: dapplion <35266934+dapplion@users.noreply.github.com> Date: Wed, 22 Apr 2026 00:37:14 +0200 Subject: [PATCH] Gloas lookup sync MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Rewrites the single block lookup state machine for Gloas, where block, data (blobs/columns), and execution payload envelope are independent components that can arrive and import out of order. - Three additive-only sub-state-machines for block / data / payload streams. Peer sets start empty for data/payload and grow as children arrive — the parent lookup's completion requirement can widen over time without mutating any state machine. - `AwaitingParent` becomes a struct carrying the child's `parent_block_hash` so the parent can be classified empty/full from the child's bid reference. - Wires `PayloadEnvelopesByRoot` RPC end-to-end through `SyncNetworkContext`: request sending, response routing (`SingleLookupReqId::SinglePayloadEnvelope`), and integration into `PayloadRequest`. Envelope *processing* is still a TODO; only the download path is wired. - Test rig: serves envelopes from a `network_envelopes_by_root` cache populated from the external harness; bumps test validator count to 8 so `proposer_lookahead` can populate at the Fulu → Gloas upgrade. - Enables gloas in `TEST_NETWORK_FORKS`. - Fixes: genesis parent check, infinite retry loop on repeated download failure, no-op in `on_completed_request`, and peer sets not being cleared on disconnect. --- beacon_node/beacon_chain/src/beacon_chain.rs | 6 + .../src/service/api_types.rs | 2 + .../gossip_methods.rs | 7 + beacon_node/network/src/router.rs | 50 +- .../network/src/sync/block_lookups/common.rs | 217 --- .../network/src/sync/block_lookups/mod.rs | 608 ++++--- .../src/sync/block_lookups/parent_chain.rs | 2 +- .../sync/block_lookups/single_block_lookup.rs | 1569 ++++++++++++----- beacon_node/network/src/sync/manager.rs | 131 +- .../network/src/sync/network_context.rs | 107 +- .../src/sync/network_context/requests.rs | 4 + .../requests/payload_envelopes_by_root.rs | 54 + beacon_node/network/src/sync/tests/lookups.rs | 91 +- beacon_node/network/src/sync/tests/mod.rs | 6 +- 14 files changed, 1931 insertions(+), 923 deletions(-) delete mode 100644 beacon_node/network/src/sync/block_lookups/common.rs create mode 100644 beacon_node/network/src/sync/network_context/requests/payload_envelopes_by_root.rs diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index e14c7c047f1..ae06f8eb423 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -5895,6 +5895,12 @@ impl BeaconChain { .contains_block(root) } + // TODO(gloas): implement this once issue #8956 is resolved + pub fn envelope_is_known_to_fork_choice(&self, root: &Hash256) -> bool { + // for now just check the database + self.store.payload_envelope_exists(root).unwrap_or(false) + } + /// Determines the beacon proposer for the next slot. If that proposer is registered in the /// `execution_layer`, provide the `execution_layer` with the necessary information to produce /// `PayloadAttributes` for future calls to fork choice. diff --git a/beacon_node/lighthouse_network/src/service/api_types.rs b/beacon_node/lighthouse_network/src/service/api_types.rs index 486a4438579..4ddd58c19cf 100644 --- a/beacon_node/lighthouse_network/src/service/api_types.rs +++ b/beacon_node/lighthouse_network/src/service/api_types.rs @@ -23,6 +23,8 @@ pub enum SyncRequestId { SingleBlock { id: SingleLookupReqId }, /// Request searching for a set of blobs given a hash. SingleBlob { id: SingleLookupReqId }, + /// Request searching for a payload envelope given a hash. + SinglePayloadEnvelope { id: SingleLookupReqId }, /// Request searching for a set of data columns given a hash and list of column indices. DataColumnsByRoot(DataColumnsByRootRequestId), /// Blocks by range request diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index 2fe5aec3473..407bf77ef20 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -3666,6 +3666,13 @@ impl NetworkBeaconProcessor { "Processing payload attestation message" ); + // Trigger lookup sync by beacon block root. Treat payload attestations as unknown block + // root signals (same as attestation-style lookup trigger). + self.send_sync_message(SyncMessage::UnknownBlockHashFromAttestation( + peer_id, + payload_attestation_message.data.beacon_block_root, + )); + // For now, ignore all payload attestation messages since verification is not implemented self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); } diff --git a/beacon_node/network/src/router.rs b/beacon_node/network/src/router.rs index 3f0e329e914..e9a056a1e73 100644 --- a/beacon_node/network/src/router.rs +++ b/beacon_node/network/src/router.rs @@ -24,7 +24,10 @@ use std::sync::Arc; use tokio::sync::mpsc; use tokio_stream::wrappers::UnboundedReceiverStream; use tracing::{debug, error, trace, warn}; -use types::{BlobSidecar, DataColumnSidecar, EthSpec, ForkContext, SignedBeaconBlock}; +use types::{ + BlobSidecar, DataColumnSidecar, EthSpec, ForkContext, SignedBeaconBlock, + SignedExecutionPayloadEnvelope, +}; /// Handles messages from the network and routes them to the appropriate service to be handled. pub struct Router { @@ -327,10 +330,13 @@ impl Router { Response::DataColumnsByRange(data_column) => { self.on_data_columns_by_range_response(peer_id, app_request_id, data_column); } - // TODO(EIP-7732): implement outgoing payload envelopes by range and root - // responses once sync manager requests them. - Response::PayloadEnvelopesByRoot(_) | Response::PayloadEnvelopesByRange(_) => { - debug!("Requesting envelopes by root and by range not supported yet"); + Response::PayloadEnvelopesByRoot(envelope) => { + self.on_payload_envelopes_by_root_response(peer_id, app_request_id, envelope); + } + // TODO(EIP-7732): implement outgoing payload envelopes by range responses + // once sync manager requests them. + Response::PayloadEnvelopesByRange(_) => { + debug!("Requesting envelopes by range not supported yet"); } // Light client responses should not be received Response::LightClientBootstrap(_) @@ -795,6 +801,40 @@ impl Router { } } + /// Handle a `PayloadEnvelopesByRoot` response from the peer. + pub fn on_payload_envelopes_by_root_response( + &mut self, + peer_id: PeerId, + app_request_id: AppRequestId, + envelope: Option>>, + ) { + let sync_request_id = match app_request_id { + AppRequestId::Sync(sync_id) => match sync_id { + id @ SyncRequestId::SinglePayloadEnvelope { .. } => id, + other => { + crit!(request = ?other, "PayloadEnvelopesByRoot response on incorrect request"); + return; + } + }, + AppRequestId::Router => { + crit!(%peer_id, "All PayloadEnvelopesByRoot requests belong to sync"); + return; + } + AppRequestId::Internal => unreachable!("Handled internally"), + }; + + trace!( + %peer_id, + "Received PayloadEnvelopesByRoot Response" + ); + self.send_to_sync(SyncMessage::RpcPayloadEnvelope { + sync_request_id, + peer_id, + envelope, + seen_timestamp: self.chain.slot_clock.now_duration().unwrap_or_default(), + }); + } + fn handle_beacon_processor_send_result( &mut self, result: Result<(), crate::network_beacon_processor::Error>, diff --git a/beacon_node/network/src/sync/block_lookups/common.rs b/beacon_node/network/src/sync/block_lookups/common.rs deleted file mode 100644 index edd99345b43..00000000000 --- a/beacon_node/network/src/sync/block_lookups/common.rs +++ /dev/null @@ -1,217 +0,0 @@ -use crate::sync::block_lookups::single_block_lookup::{ - LookupRequestError, SingleBlockLookup, SingleLookupRequestState, -}; -use crate::sync::block_lookups::{ - BlobRequestState, BlockRequestState, CustodyRequestState, PeerId, -}; -use crate::sync::manager::BlockProcessType; -use crate::sync::network_context::{LookupRequestResult, SyncNetworkContext}; -use beacon_chain::BeaconChainTypes; -use lighthouse_network::service::api_types::Id; -use parking_lot::RwLock; -use std::collections::HashSet; -use std::sync::Arc; -use types::data::FixedBlobSidecarList; -use types::{DataColumnSidecarList, SignedBeaconBlock}; - -use super::SingleLookupId; -use super::single_block_lookup::{ComponentRequests, DownloadResult}; - -#[derive(Debug, Copy, Clone)] -pub enum ResponseType { - Block, - Blob, - CustodyColumn, -} - -/// This trait unifies common single block lookup functionality across blocks and blobs. This -/// includes making requests, verifying responses, and handling processing results. A -/// `SingleBlockLookup` includes both a `BlockRequestState` and a `BlobRequestState`, this trait is -/// implemented for each. -/// -/// The use of the `ResponseType` associated type gives us a degree of type -/// safety when handling a block/blob response ensuring we only mutate the correct corresponding -/// state. -pub trait RequestState { - /// The type created after validation. - type VerifiedResponseType: Clone; - - /// Request the network context to prepare a request of a component of `block_root`. If the - /// request is not necessary because the component is already known / processed, return false. - /// Return true if it sent a request and we can expect an event back from the network. - fn make_request( - &self, - id: Id, - lookup_peers: Arc>>, - expected_blobs: usize, - cx: &mut SyncNetworkContext, - ) -> Result; - - /* Response handling methods */ - - /// Send the response to the beacon processor. - fn send_for_processing( - id: Id, - result: DownloadResult, - cx: &SyncNetworkContext, - ) -> Result<(), LookupRequestError>; - - /* Utility methods */ - - /// Returns the `ResponseType` associated with this trait implementation. Useful in logging. - fn response_type() -> ResponseType; - - /// A getter for the `BlockRequestState` or `BlobRequestState` associated with this trait. - fn request_state_mut(request: &mut SingleBlockLookup) -> Result<&mut Self, &'static str>; - - /// A getter for a reference to the `SingleLookupRequestState` associated with this trait. - fn get_state(&self) -> &SingleLookupRequestState; - - /// A getter for a mutable reference to the SingleLookupRequestState associated with this trait. - fn get_state_mut(&mut self) -> &mut SingleLookupRequestState; -} - -impl RequestState for BlockRequestState { - type VerifiedResponseType = Arc>; - - fn make_request( - &self, - id: SingleLookupId, - lookup_peers: Arc>>, - _: usize, - cx: &mut SyncNetworkContext, - ) -> Result { - cx.block_lookup_request(id, lookup_peers, self.requested_block_root) - .map_err(LookupRequestError::SendFailedNetwork) - } - - fn send_for_processing( - id: SingleLookupId, - download_result: DownloadResult, - cx: &SyncNetworkContext, - ) -> Result<(), LookupRequestError> { - let DownloadResult { - value, - block_root, - seen_timestamp, - .. - } = download_result; - cx.send_block_for_processing(id, block_root, value, seen_timestamp) - .map_err(LookupRequestError::SendFailedProcessor) - } - - fn response_type() -> ResponseType { - ResponseType::Block - } - fn request_state_mut(request: &mut SingleBlockLookup) -> Result<&mut Self, &'static str> { - Ok(&mut request.block_request_state) - } - fn get_state(&self) -> &SingleLookupRequestState { - &self.state - } - fn get_state_mut(&mut self) -> &mut SingleLookupRequestState { - &mut self.state - } -} - -impl RequestState for BlobRequestState { - type VerifiedResponseType = FixedBlobSidecarList; - - fn make_request( - &self, - id: Id, - lookup_peers: Arc>>, - expected_blobs: usize, - cx: &mut SyncNetworkContext, - ) -> Result { - cx.blob_lookup_request(id, lookup_peers, self.block_root, expected_blobs) - .map_err(LookupRequestError::SendFailedNetwork) - } - - fn send_for_processing( - id: Id, - download_result: DownloadResult, - cx: &SyncNetworkContext, - ) -> Result<(), LookupRequestError> { - let DownloadResult { - value, - block_root, - seen_timestamp, - .. - } = download_result; - cx.send_blobs_for_processing(id, block_root, value, seen_timestamp) - .map_err(LookupRequestError::SendFailedProcessor) - } - - fn response_type() -> ResponseType { - ResponseType::Blob - } - fn request_state_mut(request: &mut SingleBlockLookup) -> Result<&mut Self, &'static str> { - match &mut request.component_requests { - ComponentRequests::WaitingForBlock => Err("waiting for block"), - ComponentRequests::ActiveBlobRequest(request, _) => Ok(request), - ComponentRequests::ActiveCustodyRequest { .. } => Err("expecting custody request"), - ComponentRequests::NotNeeded { .. } => Err("not needed"), - } - } - fn get_state(&self) -> &SingleLookupRequestState { - &self.state - } - fn get_state_mut(&mut self) -> &mut SingleLookupRequestState { - &mut self.state - } -} - -impl RequestState for CustodyRequestState { - type VerifiedResponseType = DataColumnSidecarList; - - fn make_request( - &self, - id: Id, - lookup_peers: Arc>>, - _: usize, - cx: &mut SyncNetworkContext, - ) -> Result { - cx.custody_lookup_request(id, self.block_root, lookup_peers) - .map_err(LookupRequestError::SendFailedNetwork) - } - - fn send_for_processing( - id: Id, - download_result: DownloadResult, - cx: &SyncNetworkContext, - ) -> Result<(), LookupRequestError> { - let DownloadResult { - value, - block_root, - seen_timestamp, - .. - } = download_result; - cx.send_custody_columns_for_processing( - id, - block_root, - value, - seen_timestamp, - BlockProcessType::SingleCustodyColumn(id), - ) - .map_err(LookupRequestError::SendFailedProcessor) - } - - fn response_type() -> ResponseType { - ResponseType::CustodyColumn - } - fn request_state_mut(request: &mut SingleBlockLookup) -> Result<&mut Self, &'static str> { - match &mut request.component_requests { - ComponentRequests::WaitingForBlock => Err("waiting for block"), - ComponentRequests::ActiveBlobRequest { .. } => Err("expecting blob request"), - ComponentRequests::ActiveCustodyRequest(request) => Ok(request), - ComponentRequests::NotNeeded { .. } => Err("not needed"), - } - } - fn get_state(&self) -> &SingleLookupRequestState { - &self.state - } - fn get_state_mut(&mut self) -> &mut SingleLookupRequestState { - &mut self.state - } -} diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index 394f2fc37d5..20482c757de 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -22,32 +22,33 @@ use self::parent_chain::{NodeChain, compute_parent_chains}; pub use self::single_block_lookup::DownloadResult; -use self::single_block_lookup::{LookupRequestError, LookupResult, SingleBlockLookup}; +use self::single_block_lookup::{ + AwaitingParent, LookupRequestError, LookupResult, PeerType, SingleBlockLookup, +}; use super::manager::{BlockProcessType, BlockProcessingResult, SLOT_IMPORT_TOLERANCE}; use super::network_context::{PeerGroup, RpcResponseError, SyncNetworkContext}; use crate::metrics; use crate::sync::SyncMessage; -use crate::sync::block_lookups::common::ResponseType; use crate::sync::block_lookups::parent_chain::find_oldest_fork_ancestor; use beacon_chain::block_verification_types::AsBlock; use beacon_chain::data_availability_checker::{ AvailabilityCheckError, AvailabilityCheckErrorCategory, }; use beacon_chain::{AvailabilityProcessingStatus, BeaconChainTypes, BlockError}; -pub use common::RequestState; use fnv::FnvHashMap; use lighthouse_network::service::api_types::SingleLookupReqId; use lighthouse_network::{PeerAction, PeerId}; use lru_cache::LRUTimeCache; -pub use single_block_lookup::{BlobRequestState, BlockRequestState, CustodyRequestState}; use std::collections::hash_map::Entry; use std::sync::Arc; use std::time::Duration; use store::Hash256; use tracing::{debug, error, warn}; -use types::{BlobSidecar, DataColumnSidecar, EthSpec, SignedBeaconBlock}; +use types::data::FixedBlobSidecarList; +use types::{ + BlobSidecar, DataColumnSidecar, EthSpec, SignedBeaconBlock, SignedExecutionPayloadEnvelope, +}; -pub mod common; pub mod parent_chain; mod single_block_lookup; @@ -77,6 +78,15 @@ const LOOKUP_MAX_DURATION_NO_PEERS_SECS: u64 = 10; /// take at most 2 GB. 200 lookups allow 3 parallel chains of depth 64 (current maximum). const MAX_LOOKUPS: usize = 200; +type BlockDownloadResponse = + Result<(Arc>, PeerGroup, Duration), RpcResponseError>; +type BlobDownloadResponse = + Result<(FixedBlobSidecarList, PeerGroup, Duration), RpcResponseError>; +type CustodyDownloadResponse = + Result<(types::DataColumnSidecarList, PeerGroup, Duration), RpcResponseError>; +type PayloadDownloadResponse = + Result<(Arc>, PeerGroup, Duration), RpcResponseError>; + pub enum BlockComponent { Block(DownloadResult>>), Blob(DownloadResult>>), @@ -106,13 +116,6 @@ impl BlockComponent { pub type SingleLookupId = u32; -enum Action { - Retry, - ParentUnknown { parent_root: Hash256 }, - Drop(/* reason: */ String), - Continue, -} - pub struct BlockLookups { /// A cache of block roots that must be ignored for some time to prevent useless searches. For /// example if a chain is too long, its lookup chain is dropped, and range sync is expected to @@ -205,8 +208,11 @@ impl BlockLookups { ) -> bool { let parent_root = block_component.parent_root(); + // We don't know the child's fork yet (no block downloaded), use PreGloas conservatively. + // The correct AwaitingParent will be set when the child's block downloads. + let awaiting = AwaitingParent::pre_gloas(parent_root); let parent_lookup_exists = - self.search_parent_of_child(parent_root, block_root, &[peer_id], cx); + self.search_parent_of_child(awaiting, block_root, &[peer_id], cx); // Only create the child lookup if the parent exists if parent_lookup_exists { // `search_parent_of_child` ensures that the parent lookup exists so we can safely wait for it @@ -218,6 +224,10 @@ impl BlockLookups { // to have the rest of the block components (refer to decoupled blob gossip). Create // the lookup with zero peers to house the block components. &[], + &PeerType { + data: false, + payload: false, + }, cx, ) } else { @@ -225,7 +235,7 @@ impl BlockLookups { } } - /// Seach a block whose parent root is unknown. + /// Search a block whose parent root is unknown. /// /// Returns true if the lookup is created or already exists #[must_use = "only reference the new lookup if returns true"] @@ -235,7 +245,41 @@ impl BlockLookups { peer_source: &[PeerId], cx: &mut SyncNetworkContext, ) -> bool { - self.new_current_lookup(block_root, None, None, peer_source, cx) + self.new_current_lookup( + block_root, + None, + None, + peer_source, + &PeerType { + data: false, + payload: false, + }, + cx, + ) + } + + /// Search for a block triggered by a Gloas data column. The peer that sent the data column + /// is a valid data source, so mark it as data-capable. + /// + /// Returns true if the lookup is created or already exists + #[must_use = "only reference the new lookup if returns true"] + pub fn search_unknown_block_with_data_peer( + &mut self, + block_root: Hash256, + peer_source: &[PeerId], + cx: &mut SyncNetworkContext, + ) -> bool { + self.new_current_lookup( + block_root, + None, + None, + peer_source, + &PeerType { + data: true, + payload: false, + }, + cx, + ) } /// A block or blob triggers the search of a parent. @@ -247,11 +291,19 @@ impl BlockLookups { #[must_use = "only reference the new lookup if returns true"] pub fn search_parent_of_child( &mut self, - block_root_to_search: Hash256, + awaiting_parent: AwaitingParent, child_block_root_trigger: Hash256, peers: &[PeerId], cx: &mut SyncNetworkContext, ) -> bool { + let block_root_to_search = awaiting_parent.parent_root(); + + // The zero hash is the parent root of the genesis block, not a real block. + if block_root_to_search == Hash256::ZERO { + debug!("Not searching for zero hash (parent of genesis)"); + return false; + } + let parent_chains = self.active_parent_lookups(); for (chain_idx, parent_chain) in parent_chains.iter().enumerate() { @@ -339,8 +391,29 @@ impl BlockLookups { } } + // Child's peers can serve block, and data + payload if the parent is full. + // In Gloas, data and payload are coupled: empty blocks have neither. + // Pre-Gloas: data is always needed with block, payload is never needed. + let peer_type = if awaiting_parent.is_post_gloas() { + let is_full = self + .single_block_lookups + .values() + .find(|l| l.is_for_block(block_root_to_search)) + .map(|parent| parent.is_full_payload(&awaiting_parent)) + .unwrap_or(false); + PeerType { + data: is_full, + payload: is_full, + } + } else { + PeerType { + data: true, + payload: false, + } + }; + // `block_root_to_search` is a failed chain check happens inside new_current_lookup - self.new_current_lookup(block_root_to_search, None, None, peers, cx) + self.new_current_lookup(block_root_to_search, None, None, peers, &peer_type, cx) } /// Searches for a single block hash. If the blocks parent is unknown, a chain of blocks is @@ -353,6 +426,7 @@ impl BlockLookups { block_component: Option>, awaiting_parent: Option, peers: &[PeerId], + peer_type: &PeerType, cx: &mut SyncNetworkContext, ) -> bool { // If this block or it's parent is part of a known ignored chain, ignore it. @@ -378,7 +452,8 @@ impl BlockLookups { } } - if let Err(e) = self.add_peers_to_lookup_and_ancestors(lookup_id, peers, cx) { + if let Err(e) = self.add_peers_to_lookup_and_ancestors(lookup_id, peers, peer_type, cx) + { warn!(error = ?e, "Error adding peers to ancestor lookup"); } @@ -405,7 +480,13 @@ impl BlockLookups { // If we know that this lookup has unknown parent (is awaiting a parent lookup to resolve), // signal here to hold processing downloaded data. - let mut lookup = SingleBlockLookup::new(block_root, peers, cx.next_id(), awaiting_parent); + let mut lookup = SingleBlockLookup::new( + block_root, + peers, + peer_type, + cx.next_id(), + awaiting_parent.map(AwaitingParent::pre_gloas), + ); let _guard = lookup.span.clone().entered(); // Add block components to the new request @@ -446,88 +527,99 @@ impl BlockLookups { /* Lookup responses */ - /// Process a block or blob response received from a single lookup request. - pub fn on_download_response>( + /// Process a block response received from a single lookup request. + pub fn on_block_download_response( &mut self, id: SingleLookupReqId, - response: Result<(R::VerifiedResponseType, PeerGroup, Duration), RpcResponseError>, + response: BlockDownloadResponse, cx: &mut SyncNetworkContext, ) { - let result = self.on_download_response_inner::(id, response, cx); - self.on_lookup_result(id.lookup_id, result, "download_response", cx); + let Some(lookup) = self.single_block_lookups.get_mut(&id.lookup_id) else { + debug!(?id, "Block returned for single block lookup not present"); + return; + }; + let block_root = lookup.block_root(); + // The downstream state machine only needs success / failure: details about RPC + // failures (peer info, error category) are logged here before being collapsed, so + // debugging still has the full context. + let response = match response { + Ok(ok) => Ok(ok), + Err(err) => { + debug!(?block_root, ?id, ?err, "Block download failed"); + Err(()) + } + }; + let result = lookup.on_block_download_response(id.req_id, response, cx); + self.on_lookup_result(id.lookup_id, result, "block_download_response", cx); } - /// Process a block or blob response received from a single lookup request. - pub fn on_download_response_inner>( + pub fn on_blob_download_response( &mut self, id: SingleLookupReqId, - response: Result<(R::VerifiedResponseType, PeerGroup, Duration), RpcResponseError>, + response: BlobDownloadResponse, cx: &mut SyncNetworkContext, - ) -> Result { - // Note: no need to downscore peers here, already downscored on network context - - let response_type = R::response_type(); + ) { let Some(lookup) = self.single_block_lookups.get_mut(&id.lookup_id) else { - // We don't have the ability to cancel in-flight RPC requests. So this can happen - // if we started this RPC request, and later saw the block/blobs via gossip. - debug!(?id, "Block returned for single block lookup not present"); - return Err(LookupRequestError::UnknownLookup); + debug!(?id, "Blob returned for single block lookup not present"); + return; }; - let block_root = lookup.block_root(); - let request_state = R::request_state_mut(lookup) - .map_err(|e| LookupRequestError::BadState(e.to_owned()))? - .get_state_mut(); - - match response { - Ok((response, peer_group, seen_timestamp)) => { - debug!( - ?block_root, - ?id, - ?peer_group, - ?response_type, - "Received lookup download success" - ); - - // Here we could check if response extends a parent chain beyond its max length. - // However we defer that check to the handling of a processing error ParentUnknown. - // - // Here we could check if there's already a lookup for parent_root of `response`. In - // that case we know that sending the response for processing will likely result in - // a `ParentUnknown` error. However, for simplicity we choose to not implement this - // optimization. - - // Register the download peer here. Once we have received some data over the wire we - // attribute it to this peer for scoring latter regardless of how the request was - // done. - request_state.on_download_success( - id.req_id, - DownloadResult { - value: response, - block_root, - seen_timestamp, - peer_group, - }, - )?; - // continue_request will send for processing as the request state is AwaitingProcessing + let response = match response { + Ok(ok) => Ok(ok), + Err(err) => { + debug!(?block_root, ?id, ?err, "Blob download failed"); + Err(()) } - Err(e) => { - // No need to log peer source here. When sending a DataColumnsByRoot request we log - // the peer and the request ID which is linked to this `id` value here. - debug!( - ?block_root, - ?id, - ?response_type, - error = ?e, - "Received lookup download failure" - ); + }; + let result = lookup.on_blob_download_response(id.req_id, response, cx); + self.on_lookup_result(id.lookup_id, result, "blob_download_response", cx); + } - request_state.on_download_failure(id.req_id)?; - // continue_request will retry a download as the request state is AwaitingDownload + pub fn on_custody_download_response( + &mut self, + id: SingleLookupReqId, + response: CustodyDownloadResponse, + cx: &mut SyncNetworkContext, + ) { + let Some(lookup) = self.single_block_lookups.get_mut(&id.lookup_id) else { + debug!(?id, "Custody returned for single block lookup not present"); + return; + }; + let block_root = lookup.block_root(); + let response = match response { + Ok(ok) => Ok(ok), + Err(err) => { + debug!(?block_root, ?id, ?err, "Custody download failed"); + Err(()) } - } + }; + let result = lookup.on_custody_download_response(id.req_id, response, cx); + self.on_lookup_result(id.lookup_id, result, "custody_download_response", cx); + } - lookup.continue_requests(cx) + pub fn on_payload_download_response( + &mut self, + id: SingleLookupReqId, + response: PayloadDownloadResponse, + cx: &mut SyncNetworkContext, + ) { + let Some(lookup) = self.single_block_lookups.get_mut(&id.lookup_id) else { + debug!( + ?id, + "Payload envelope returned for single block lookup not present" + ); + return; + }; + let block_root = lookup.block_root(); + let response = match response { + Ok(ok) => Ok(ok), + Err(err) => { + debug!(?block_root, ?id, ?err, "Payload envelope download failed"); + Err(()) + } + }; + let result = lookup.on_payload_download_response(id.req_id, response, cx); + self.on_lookup_result(id.lookup_id, result, "payload_download_response", cx); } /* Error responses */ @@ -549,21 +641,22 @@ impl BlockLookups { result: BlockProcessingResult, cx: &mut SyncNetworkContext, ) { + let lookup_id = process_type.id(); let lookup_result = match process_type { - BlockProcessType::SingleBlock { id } => { - self.on_processing_result_inner::>(id, result, cx) - } - BlockProcessType::SingleBlob { id } => { - self.on_processing_result_inner::>(id, result, cx) + BlockProcessType::SingleBlock { .. } => { + self.on_block_processing_result(lookup_id, result, cx) } - BlockProcessType::SingleCustodyColumn(id) => { - self.on_processing_result_inner::>(id, result, cx) + BlockProcessType::SingleBlob { .. } | BlockProcessType::SingleCustodyColumn(_) => { + self.on_data_processing_result(lookup_id, result, cx) } }; - self.on_lookup_result(process_type.id(), lookup_result, "processing_result", cx); + self.on_lookup_result(lookup_id, lookup_result, "processing_result", cx); } - pub fn on_processing_result_inner>( + /// Handle block processing result. The block is sent for processing alone (without data). + /// On success: marks block processing done and advances data/payload streams. + /// On error: penalizes block peer, resets all streams, retries from scratch. + fn on_block_processing_result( &mut self, lookup_id: SingleLookupId, result: BlockProcessingResult, @@ -575,180 +668,146 @@ impl BlockLookups { }; let block_root = lookup.block_root(); - let request_state = R::request_state_mut(lookup) - .map_err(|e| LookupRequestError::BadState(e.to_owned()))? - .get_state_mut(); debug!( - component = ?R::response_type(), ?block_root, id = lookup_id, ?result, - "Received lookup processing result" + "Received block processing result" ); - let action = match result { + match result { + // Block processed successfully (imported or missing components — both are ok since + // we send the block alone first, data follows independently) BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(_)) + | BlockProcessingResult::Ok(AvailabilityProcessingStatus::MissingComponents { + .. + }) | BlockProcessingResult::Err(BlockError::DuplicateFullyImported(..)) | BlockProcessingResult::Err(BlockError::GenesisBlock) => { - // Successfully imported - request_state.on_processing_success()?; - Action::Continue - } - - BlockProcessingResult::Ok(AvailabilityProcessingStatus::MissingComponents { - .. - }) => { - // `on_processing_success` is called here to ensure the request state is updated prior to checking - // if both components have been processed. - request_state.on_processing_success()?; - - if lookup.all_components_processed() { - // We don't request for other block components until being sure that the block has - // data. If we request blobs / columns to a peer we are sure those must exist. - // Therefore if all components are processed and we still receive `MissingComponents` - // it indicates an internal bug. - return Err(LookupRequestError::MissingComponentsAfterAllProcessed); - } else { - // Continue request, potentially request blobs - Action::Retry - } - } - BlockProcessingResult::Err(BlockError::DuplicateImportStatusUnknown(..)) => { - // This is unreachable because RPC blocks do not undergo gossip verification, and - // this error can *only* come from gossip verification. - error!(?block_root, "Single block lookup hit unreachable condition"); - Action::Drop("DuplicateImportStatusUnknown".to_owned()) + lookup.on_block_processing_result(true, cx) } BlockProcessingResult::Ignored => { - // Beacon processor signalled to ignore the block processing result. - // This implies that the cpu is overloaded. Drop the request. - warn!( - component = ?R::response_type(), - "Lookup component processing ignored, cpu might be overloaded" - ); - Action::Drop("Block processing ignored".to_owned()) + warn!("Block processing ignored, cpu might be overloaded"); + Err(LookupRequestError::Failed( + "Block processing ignored".to_owned(), + )) } BlockProcessingResult::Err(e) => { - match e { - BlockError::BeaconChainError(e) => { - // Internal error - error!(%block_root, error = ?e, "Beacon chain error processing lookup component"); - Action::Drop(format!("{e:?}")) - } - BlockError::ParentUnknown { parent_root, .. } => { - // Reverts the status of this request to `AwaitingProcessing` holding the - // downloaded data. A future call to `continue_requests` will re-submit it - // once there are no pending parent requests. - // Note: `BlockError::ParentUnknown` is only returned when processing - // blocks, not blobs. - request_state.revert_to_awaiting_processing()?; - Action::ParentUnknown { parent_root } - } - ref e @ BlockError::ExecutionPayloadError(ref epe) if !epe.penalize_peer() => { - // These errors indicate that the execution layer is offline - // and failed to validate the execution payload. Do not downscore peer. - debug!( - ?block_root, - error = ?e, - "Single block lookup failed. Execution layer is offline / unsynced / misconfigured" - ); - Action::Drop(format!("{e:?}")) + debug!(?block_root, error = ?e, "Block processing error, retrying"); + + match &e { + BlockError::ParentUnknown { .. } => { + return Err(LookupRequestError::InternalError( + "ParentUnknown on processing".to_string(), + )); } + // No penalization for internal / non-attributable errors + BlockError::BeaconChainError(_) + | BlockError::DuplicateImportStatusUnknown(..) => {} + BlockError::ExecutionPayloadError(epe) if !epe.penalize_peer() => {} BlockError::AvailabilityCheck(e) - if e.category() == AvailabilityCheckErrorCategory::Internal => - { - // There errors indicate internal problems and should not downscore the peer - warn!(?block_root, error = ?e, "Internal availability check failure"); - - // Here we choose *not* to call `on_processing_failure` because this could result in a bad - // lookup state transition. This error invalidates both blob and block requests, and we don't know the - // state of both requests. Blobs may have already successfullly processed for example. - // We opt to drop the lookup instead. - Action::Drop(format!("{e:?}")) - } - other => { - debug!( - ?block_root, - component = ?R::response_type(), - error = ?other, - "Invalid lookup component" - ); - let peer_group = request_state.on_processing_failure()?; - let peers_to_penalize: Vec<_> = match other { - // Note: currenlty only InvalidColumn errors have index granularity, - // but future errors may follow the same pattern. Generalize this - // pattern with https://github.com/sigp/lighthouse/pull/6321 - BlockError::AvailabilityCheck( - AvailabilityCheckError::InvalidColumn((index_opt, _)), - ) => { - match index_opt { - Some(index) => peer_group.of_index(index as usize).collect(), - // If no index supplied this is an un-attributable fault. In practice - // this should never happen. - None => vec![], - } - } - _ => peer_group.all().collect(), - }; - for peer in peers_to_penalize { + if e.category() == AvailabilityCheckErrorCategory::Internal => {} + // All other attributable errors: penalize the block peer + _ => { + if let Some(block_peer) = lookup.block_peer() { cx.report_peer( - *peer, + block_peer, PeerAction::MidToleranceError, - match R::response_type() { - ResponseType::Block => "lookup_block_processing_failure", - ResponseType::Blob => "lookup_blobs_processing_failure", - ResponseType::CustodyColumn => { - "lookup_custody_column_processing_failure" - } - }, + "lookup_block_processing_failure", ); } - - Action::Retry } } + + // Block processing failed — reset everything and retry from scratch + lookup.on_block_processing_result(false, cx) } + } + } + + /// Handle data processing result (blobs or custody columns). + /// On success: marks data processing done, may complete the lookup. + /// On error: penalizes data peers, retries data download only. + fn on_data_processing_result( + &mut self, + lookup_id: SingleLookupId, + result: BlockProcessingResult, + cx: &mut SyncNetworkContext, + ) -> Result { + let Some(lookup) = self.single_block_lookups.get_mut(&lookup_id) else { + debug!(id = lookup_id, "Unknown single block lookup"); + return Err(LookupRequestError::UnknownLookup); }; - match action { - Action::Retry => { - // Trigger download for all components in case `MissingComponents` failed the blob - // request. Also if blobs are `AwaitingProcessing` and need to be progressed - lookup.continue_requests(cx) + let block_root = lookup.block_root(); + + debug!( + ?block_root, + id = lookup_id, + ?result, + "Received data processing result" + ); + + match result { + BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(_)) + | BlockProcessingResult::Err(BlockError::DuplicateFullyImported(..)) + | BlockProcessingResult::Err(BlockError::GenesisBlock) => { + lookup.on_data_processing_result(true, cx) } - Action::ParentUnknown { parent_root } => { - let peers = lookup.all_peers(); - // Mark lookup as awaiting **before** creating the parent lookup. At this point the - // lookup maybe inconsistent. - lookup.set_awaiting_parent(parent_root); - let parent_lookup_exists = - self.search_parent_of_child(parent_root, block_root, &peers, cx); - if parent_lookup_exists { - // The parent lookup exist or has been created. It's safe for `lookup` to - // reference the parent as awaiting. - debug!( - id = lookup_id, - ?block_root, - ?parent_root, - "Marking lookup as awaiting parent" - ); - Ok(LookupResult::Pending) - } else { - // The parent lookup is faulty and was not created, we must drop the `lookup` as - // it's in an inconsistent state. We must drop all of its children too. - Err(LookupRequestError::Failed(format!( - "Parent lookup is faulty {parent_root:?}" - ))) - } + BlockProcessingResult::Ok(AvailabilityProcessingStatus::MissingComponents { + .. + }) => { + // Data sent for processing but still missing components — this can happen if + // the block hasn't been fully validated yet. Treat as success for the data + // stream; completion check will handle the rest. + lookup.on_data_processing_result(true, cx) } - Action::Drop(reason) => { - // Drop with noop - Err(LookupRequestError::Failed(reason)) + BlockProcessingResult::Ignored => { + warn!("Data processing ignored, cpu might be overloaded"); + Err(LookupRequestError::Failed( + "Data processing ignored".to_owned(), + )) } - Action::Continue => { - // Drop this completed lookup only - Ok(LookupResult::Completed) + BlockProcessingResult::Err(e) => { + debug!(?block_root, error = ?e, "Data processing error, retrying"); + + // Use the data kind to pick a penalty string the peer-scoring tests + // distinguish on (blobs vs custody columns). + let penalty_msg = match lookup.data_is_columns() { + Some(true) => "lookup_custody_column_processing_failure", + _ => "lookup_blobs_processing_failure", + }; + + match &e { + // No penalization for internal / non-attributable errors + BlockError::BeaconChainError(_) + | BlockError::DuplicateImportStatusUnknown(..) => {} + BlockError::AvailabilityCheck(e) + if e.category() == AvailabilityCheckErrorCategory::Internal => {} + // InvalidColumn: penalize only the peer(s) that served the bad column + BlockError::AvailabilityCheck(AvailabilityCheckError::InvalidColumn(( + index_opt, + _, + ))) => { + if let Some(custody_pg) = lookup.data_peer_group() + && let Some(index) = index_opt + { + for peer in custody_pg.of_index(*index as usize) { + cx.report_peer(*peer, PeerAction::MidToleranceError, penalty_msg); + } + } + } + // All other attributable errors: penalize the block peer (who also serves blobs) + _ => { + if let Some(block_peer) = lookup.block_peer() { + cx.report_peer(block_peer, PeerAction::MidToleranceError, penalty_msg); + } + } + } + + // Data processing failed — retry data download only + lookup.on_data_processing_result(false, cx) } } } @@ -771,14 +830,6 @@ impl BlockLookups { let lookup_result = if imported { Ok(LookupResult::Completed) } else { - // A lookup may be in the following state: - // - Block awaiting processing from a different source - // - Blobs downloaded processed, and inserted into the da_checker - // - // At this point the block fails processing (e.g. execution engine offline) and it is - // removed from the da_checker. Note that ALL components are removed from the da_checker - // so when we re-download and process the block we get the error - // MissingComponentsAfterAllProcessed and get stuck. lookup.reset_requests(); lookup.continue_requests(cx) }; @@ -791,7 +842,7 @@ impl BlockLookups { let mut lookup_results = vec![]; // < need to buffer lookup results to not re-borrow &mut self for (id, lookup) in self.single_block_lookups.iter_mut() { - if lookup.awaiting_parent() == Some(block_root) { + if lookup.awaiting_parent().map(|a| a.parent_root()) == Some(block_root) { lookup.resolve_awaiting_parent(); debug!( parent_root = ?block_root, @@ -827,7 +878,10 @@ impl BlockLookups { let child_lookups = self .single_block_lookups .iter() - .filter(|(_, lookup)| lookup.awaiting_parent() == Some(dropped_lookup.block_root())) + .filter(|(_, lookup)| { + lookup.awaiting_parent().map(|a| a.parent_root()) + == Some(dropped_lookup.block_root()) + }) .map(|(id, _)| *id) .collect::>(); @@ -847,7 +901,21 @@ impl BlockLookups { cx: &mut SyncNetworkContext, ) -> bool { match result { - Ok(LookupResult::Pending) => true, // no action + Ok(LookupResult::Pending) => true, + Ok(LookupResult::ParentUnknown { + awaiting_parent, + block_root, + peers, + .. + }) => { + if self.search_parent_of_child(awaiting_parent, block_root, &peers, cx) { + true + } else { + self.drop_lookup_and_children(id, "Failed"); + self.update_metrics(); + false + } + } Ok(LookupResult::Completed) => { if let Some(lookup) = self.single_block_lookups.remove(&id) { debug!( @@ -995,17 +1063,16 @@ impl BlockLookups { &'a self, lookup: &'a SingleBlockLookup, ) -> Result<&'a SingleBlockLookup, String> { - if let Some(awaiting_parent) = lookup.awaiting_parent() { + if let Some(awaiting) = lookup.awaiting_parent() { + let parent_root = awaiting.parent_root(); if let Some(lookup) = self .single_block_lookups .values() - .find(|l| l.block_root() == awaiting_parent) + .find(|l| l.block_root() == parent_root) { self.find_oldest_ancestor_lookup(lookup) } else { - Err(format!( - "Lookup references unknown parent {awaiting_parent:?}" - )) + Err(format!("Lookup references unknown parent {parent_root:?}")) } } else { Ok(lookup) @@ -1013,12 +1080,14 @@ impl BlockLookups { } /// Adds peers to a lookup and its ancestors recursively. - /// Note: Takes a `lookup_id` as argument to allow recursion on mutable lookups, without having - /// to duplicate the code to add peers to a lookup + /// - Block peers are added at each level (needed for block download). + /// - When recursing from child to parent, also adds to parent's data/payload peer sets, + /// since children arriving activates the parent's data/payload downloads. fn add_peers_to_lookup_and_ancestors( &mut self, lookup_id: SingleLookupId, peers: &[PeerId], + peer_type: &PeerType, cx: &mut SyncNetworkContext, ) -> Result<(), String> { let lookup = self @@ -1028,7 +1097,7 @@ impl BlockLookups { let mut added_some_peer = false; for peer in peers { - if lookup.add_peer(*peer) { + if lookup.add_peer(*peer, peer_type) { added_some_peer = true; debug!( block_root = ?lookup.block_root(), @@ -1038,13 +1107,26 @@ impl BlockLookups { } } - if let Some(parent_root) = lookup.awaiting_parent() { - if let Some((&child_id, _)) = self + if let Some(awaiting) = lookup.awaiting_parent() { + let parent_root = awaiting.parent_root(); + if let Some((&parent_id, parent_lookup)) = self .single_block_lookups .iter() .find(|(_, l)| l.block_root() == parent_root) { - self.add_peers_to_lookup_and_ancestors(child_id, peers, cx) + let peer_type = if awaiting.is_post_gloas() { + let is_full = parent_lookup.is_full_payload(&awaiting); + PeerType { + data: is_full, + payload: is_full, + } + } else { + PeerType { + data: true, + payload: false, + } + }; + self.add_peers_to_lookup_and_ancestors(parent_id, peers, &peer_type, cx) } else { Err(format!("Lookup references unknown parent {parent_root:?}")) } diff --git a/beacon_node/network/src/sync/block_lookups/parent_chain.rs b/beacon_node/network/src/sync/block_lookups/parent_chain.rs index 5deea1dd94e..120ce5b1cc2 100644 --- a/beacon_node/network/src/sync/block_lookups/parent_chain.rs +++ b/beacon_node/network/src/sync/block_lookups/parent_chain.rs @@ -13,7 +13,7 @@ impl From<&SingleBlockLookup> for Node { fn from(value: &SingleBlockLookup) -> Self { Self { block_root: value.block_root(), - parent_root: value.awaiting_parent(), + parent_root: value.awaiting_parent().map(|a| a.parent_root()), } } } diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index 919526c2386..a02270ed2e2 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -1,30 +1,78 @@ use super::{BlockComponent, PeerId, SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS}; -use crate::sync::block_lookups::common::RequestState; +use crate::sync::manager::BlockProcessType; use crate::sync::network_context::{ LookupRequestResult, PeerGroup, ReqId, RpcRequestSendError, SendErrorProcessor, SyncNetworkContext, }; -use beacon_chain::{BeaconChainTypes, BlockProcessStatus}; +use beacon_chain::BeaconChainTypes; +use beacon_chain::BlockProcessStatus; +use beacon_chain::block_verification_types::AsBlock; use educe::Educe; use lighthouse_network::service::api_types::Id; use parking_lot::RwLock; use std::collections::HashSet; -use std::fmt::Debug; use std::sync::Arc; use std::time::{Duration, Instant}; use store::Hash256; use strum::IntoStaticStr; -use tracing::{Span, debug_span}; +use tracing::{Span, debug, debug_span}; use types::data::FixedBlobSidecarList; -use types::{DataColumnSidecarList, EthSpec, SignedBeaconBlock, Slot}; +use types::{ + DataColumnSidecarList, EthSpec, ExecutionBlockHash, ForkName, SignedBeaconBlock, + SignedExecutionPayloadEnvelope, Slot, +}; -// Dedicated enum for LookupResult to force its usage -#[must_use = "LookupResult must be handled with on_lookup_result"] -pub enum LookupResult { - /// Lookup completed successfully - Completed, - /// Lookup is expecting some future event from the network - Pending, +// === AwaitingParent — tracks what a child lookup waits for === + +/// What a child lookup is waiting for its parent to resolve. +/// +/// `parent_hash` is `Some` only post-Gloas: the child's bid references the +/// parent's payload execution hash, which lets us determine whether the parent +/// is full (payload envelope was published) or empty. Pre-Gloas lookups never +/// need to distinguish — they always wait for the full block+data set. +#[derive(Debug, Clone, Copy)] +pub struct AwaitingParent { + parent_root: Hash256, + parent_hash: Option, +} + +impl AwaitingParent { + pub fn pre_gloas(parent_root: Hash256) -> Self { + Self { + parent_root, + parent_hash: None, + } + } + + pub fn post_gloas(parent_root: Hash256, parent_hash: ExecutionBlockHash) -> Self { + Self { + parent_root, + parent_hash: Some(parent_hash), + } + } + + pub fn parent_root(&self) -> Hash256 { + self.parent_root + } + + pub fn parent_hash(&self) -> Option { + self.parent_hash + } + + pub fn is_post_gloas(&self) -> bool { + self.parent_hash.is_some() + } +} + +// === Public types re-exported by mod.rs === + +#[derive(Debug, Clone)] +#[allow(dead_code)] +pub struct DownloadResult { + pub value: T, + pub block_root: Hash256, + pub seen_timestamp: Duration, + pub peer_group: PeerGroup, } #[derive(Debug, PartialEq, Eq, IntoStaticStr)] @@ -42,9 +90,6 @@ pub enum LookupRequestError { BadState(String), /// Lookup failed for some other reason and should be dropped Failed(/* reason: */ String), - /// Received MissingComponents when all components have been processed. This should never - /// happen, and indicates some internal bug - MissingComponentsAfterAllProcessed, /// Attempted to retrieve a not known lookup id UnknownLookup, /// Received a download result for a different request id than the in-flight request. @@ -54,42 +99,386 @@ pub enum LookupRequestError { expected_req_id: ReqId, req_id: ReqId, }, + InternalError(String), +} + +// Dedicated enum for LookupResult to force its usage +#[must_use = "LookupResult must be handled with on_lookup_result"] +pub enum LookupResult { + /// Lookup completed successfully + Completed, + /// Lookup is expecting some future event from the network + Pending, + /// Block's parent is not known to fork-choice, a parent lookup is needed + ParentUnknown { + awaiting_parent: AwaitingParent, + block_root: Hash256, + peers: Vec, + }, +} + +// === Block request: Downloading → Downloaded → Processing → Complete === + +#[derive(Educe)] +#[educe(Debug)] +enum BlockRequest { + /// Block downloading or awaiting download + Downloading { + block_root: Hash256, + state: SingleLookupRequestState>>, + }, + /// Block downloaded, waiting for parent check + send for processing + Downloaded { + #[educe(Debug(ignore))] + block: Arc>, + peer: PeerId, + }, + /// Block sent for processing, awaiting result + Processing { + #[educe(Debug(ignore))] + block: Arc>, + peer: PeerId, + }, + /// Block processing complete. `peer` is retained so data/payload processing failures + /// after the block has been imported can still be attributed back to the peer that + /// served the block (they are typically the same peer for blobs). `None` when the + /// block bypassed the download path (cache hit in the availability checker). + Complete { + #[educe(Debug(ignore))] + block: Arc>, + peer: Option, + }, +} + +impl BlockRequest { + fn new(block_root: Hash256) -> Self { + BlockRequest::Downloading { + block_root, + state: SingleLookupRequestState::new(), + } + } + + fn new_with_processing_failures(block_root: Hash256, failed_processing: u8) -> Self { + BlockRequest::Downloading { + block_root, + state: SingleLookupRequestState::new_with_processing_failures(failed_processing), + } + } + + fn peek_block(&self) -> Option<&Arc>> { + match self { + BlockRequest::Downloading { state, .. } => state.peek_downloaded_data(), + BlockRequest::Downloaded { block, .. } + | BlockRequest::Processing { block, .. } + | BlockRequest::Complete { block, .. } => Some(block), + } + } + + fn peek_slot(&self) -> Option { + self.peek_block().map(|b| b.slot()) + } + + /// Returns the block peer for error attribution. Available in Downloaded/Processing states. + fn peer(&self) -> Option { + match self { + BlockRequest::Downloaded { peer, .. } | BlockRequest::Processing { peer, .. } => { + Some(*peer) + } + BlockRequest::Downloading { state, .. } => state + .peek_downloaded_peer_group() + .and_then(|pg| pg.all().next().copied()), + BlockRequest::Complete { peer, .. } => *peer, + } + } + + fn is_awaiting_event(&self) -> bool { + match self { + BlockRequest::Downloading { state, .. } => state.is_awaiting_event(), + BlockRequest::Processing { .. } => true, + _ => false, + } + } + + fn is_complete(&self) -> bool { + matches!(self, BlockRequest::Complete { .. }) + } + + fn insert_verified_response( + &mut self, + result: DownloadResult>>, + ) -> bool { + if let BlockRequest::Downloading { state, .. } = self { + state.insert_verified_response(result) + } else { + // The block already transitioned past Downloading (e.g. a child arrived while the + // block was already being processed). Silently dropping would be hard to debug if + // we ever reach this path unexpectedly — log it. + debug!( + state = ?self, + "insert_verified_response called outside Downloading state, dropping" + ); + false + } + } +} + +// === Data request: WaitingForBlock → Downloading → Downloaded → Processing → Complete === + +#[derive(Debug)] +enum DataRequest { + /// Waiting for block to be downloaded to determine what data is needed + WaitingForBlock, + /// Data downloading or awaiting download + Downloading(DataDownload), + /// Data downloaded, waiting for block processing to complete before import + Downloaded { + data: DownloadedData, + peer_group: PeerGroup, + }, + /// Data sent for processing, awaiting result + Processing { + kind: DataDownloadKind, + peer_group: PeerGroup, + }, + /// Data processing complete (or no data needed) + Complete, +} + +impl DataRequest { + fn is_awaiting_event(&self) -> bool { + match self { + DataRequest::Downloading(dl) => dl.is_awaiting_event(), + DataRequest::Processing { .. } => true, + _ => false, + } + } + + fn peer_group(&self) -> Option<&PeerGroup> { + match self { + DataRequest::Downloading(dl) => dl.peek_downloaded_peer_group(), + DataRequest::Downloaded { peer_group, .. } + | DataRequest::Processing { peer_group, .. } => Some(peer_group), + DataRequest::WaitingForBlock | DataRequest::Complete => None, + } + } +} + +/// Fork-dependent data download state +#[derive(Debug)] +enum DataDownload { + Blobs { + block_root: Hash256, + expected_blobs: usize, + state: SingleLookupRequestState>, + }, + Columns { + block_root: Hash256, + state: SingleLookupRequestState>, + }, +} + +impl DataDownload { + fn continue_requests>( + &mut self, + id: Id, + peers: Arc>>, + cx: &mut SyncNetworkContext, + ) -> Result<(), LookupRequestError> { + match self { + DataDownload::Blobs { + block_root, + expected_blobs, + state, + } => { + let br = *block_root; + let eb = *expected_blobs; + state.make_request(|| cx.blob_lookup_request(id, peers, br, eb))?; + } + DataDownload::Columns { + block_root, state, .. + } => { + let br = *block_root; + state.make_request(|| cx.custody_lookup_request(id, br, peers))?; + } + } + Ok(()) + } + + fn is_completed(&self) -> bool { + match self { + DataDownload::Blobs { state, .. } => state.is_completed(), + DataDownload::Columns { state, .. } => state.is_completed(), + } + } + + fn take_download_result(&mut self) -> Option<(DownloadedData, PeerGroup)> { + match self { + DataDownload::Blobs { + expected_blobs, + state, + .. + } => state.take_download_result().map(|r| { + ( + DownloadedData::Blobs { + blobs: r.value, + expected_blobs: *expected_blobs, + }, + r.peer_group, + ) + }), + DataDownload::Columns { state, .. } => state + .take_download_result() + .map(|r| (DownloadedData::Columns(r.value), r.peer_group)), + } + } + + fn is_awaiting_event(&self) -> bool { + match self { + DataDownload::Blobs { state, .. } => state.is_awaiting_event(), + DataDownload::Columns { state, .. } => state.is_awaiting_event(), + } + } + + fn peek_downloaded_peer_group(&self) -> Option<&PeerGroup> { + match self { + DataDownload::Blobs { state, .. } => state.peek_downloaded_peer_group(), + DataDownload::Columns { state, .. } => state.peek_downloaded_peer_group(), + } + } +} + +/// Downloaded data, waiting to be sent for processing +#[derive(Debug)] +enum DownloadedData { + Blobs { + blobs: FixedBlobSidecarList, + expected_blobs: usize, + }, + Columns(DataColumnSidecarList), +} + +impl DownloadedData { + fn kind(&self) -> DataDownloadKind { + match self { + DownloadedData::Blobs { expected_blobs, .. } => DataDownloadKind::Blobs { + expected_blobs: *expected_blobs, + }, + DownloadedData::Columns(_) => DataDownloadKind::Columns, + } + } +} + +/// Enough info to reconstruct a fresh `DataDownload` when we need to retry data download +/// after a processing failure. We can't call `create_data_request` again from here because +/// we're past the `WaitingForBlock` state and don't have the `SyncNetworkContext` (and +/// therefore no `ChainSpec`) — so the request kind (blobs vs columns, plus the expected +/// blob count) is cached alongside the in-flight request instead. +#[derive(Debug, Clone, Copy)] +enum DataDownloadKind { + Blobs { expected_blobs: usize }, + Columns, } +impl DataDownloadKind { + fn into_fresh_download( + self, + block_root: Hash256, + failed_processing: u8, + ) -> DataDownload { + match self { + DataDownloadKind::Blobs { expected_blobs } => DataDownload::Blobs { + block_root, + expected_blobs, + state: SingleLookupRequestState::new_with_processing_failures(failed_processing), + }, + DataDownloadKind::Columns => DataDownload::Columns { + block_root, + state: SingleLookupRequestState::new_with_processing_failures(failed_processing), + }, + } + } +} + +// === Payload request: WaitingForBlock → Downloading → Downloaded → Processing → Complete === + +#[derive(Educe)] +#[educe(Debug)] +enum PayloadRequest { + /// Waiting for block to be downloaded to determine if payload is needed + WaitingForBlock, + Downloading { + block_root: Hash256, + state: SingleLookupRequestState>>, + }, + Downloaded { + peer_group: PeerGroup, + }, + Processing { + peer_group: PeerGroup, + }, + /// Payload processed, or no payload needed. + Complete, +} + +impl PayloadRequest { + fn is_awaiting_event(&self) -> bool { + match self { + PayloadRequest::Downloading { state, .. } => state.is_awaiting_event(), + PayloadRequest::Processing { .. } => true, + _ => false, + } + } +} + +// === SingleBlockLookup — three independent requests === + #[derive(Educe)] #[educe(Debug(bound(T: BeaconChainTypes)))] pub struct SingleBlockLookup { pub id: Id, - pub block_request_state: BlockRequestState, - pub component_requests: ComponentRequests, - /// Peers that claim to have imported this set of block components. This state is shared with - /// the custody request to have an updated view of the peers that claim to have imported the - /// block associated with this lookup. The peer set of a lookup can change rapidly, and faster - /// than the lifetime of a custody request. + block_root: Hash256, + + // Block request — always present + block_request: BlockRequest, + + // Data request — starts as WaitingForBlock, set after block downloaded + data_request: DataRequest, + + // Payload request — starts as WaitingForBlock, set after block downloaded + payload_request: PayloadRequest, + + // Peer sets. + // + // `Arc>` is required by `ActiveCustodyRequest` (columns only), which lives + // in `SyncNetworkContext` and needs to observe peers being added/removed at runtime + // while it's in flight. `data_peers` and `payload_peers` use the same shape purely for + // consistency so all three sets plug into the same `add_peer` / `remove_peer` surface. + /// Peers for block download (also used for data in pre-Gloas forks). #[educe(Debug(method(fmt_peer_set_as_len)))] peers: Arc>>, - block_root: Hash256, - awaiting_parent: Option, + /// Peers for data download (0 initially for Gloas, shared with block for pre-Gloas). + #[educe(Debug(method(fmt_peer_set_as_len)))] + data_peers: Arc>>, + /// Peers for payload download (0 initially, Gloas only). + #[educe(Debug(method(fmt_peer_set_as_len)))] + payload_peers: Arc>>, + + // Parent tracking + awaiting_parent: Option, created: Instant, pub(crate) span: Span, -} -#[derive(Debug)] -pub(crate) enum ComponentRequests { - WaitingForBlock, - ActiveBlobRequest(BlobRequestState, usize), - ActiveCustodyRequest(CustodyRequestState), - // When printing in debug this state display the reason why it's not needed - #[allow(dead_code)] - NotNeeded(&'static str), + // Retry tracking + failed_processing: u8, } impl SingleBlockLookup { pub fn new( requested_block_root: Hash256, peers: &[PeerId], + peer_type: &PeerType, id: Id, - awaiting_parent: Option, + awaiting_parent: Option, ) -> Self { let lookup_span = debug_span!( "lh_single_block_lookup", @@ -97,30 +486,73 @@ impl SingleBlockLookup { id = id, ); + let peer_set: HashSet = peers.iter().copied().collect(); + let data_peers = if peer_type.data { + peer_set.clone() + } else { + HashSet::new() + }; + let payload_peers = if peer_type.payload { + peer_set.clone() + } else { + HashSet::new() + }; + Self { id, - block_request_state: BlockRequestState::new(requested_block_root), - component_requests: ComponentRequests::WaitingForBlock, - peers: Arc::new(RwLock::new(HashSet::from_iter(peers.iter().copied()))), block_root: requested_block_root, + block_request: BlockRequest::new(requested_block_root), + data_request: DataRequest::WaitingForBlock, + payload_request: PayloadRequest::WaitingForBlock, + data_peers: Arc::new(RwLock::new(data_peers)), + payload_peers: Arc::new(RwLock::new(payload_peers)), + peers: Arc::new(RwLock::new(peer_set)), awaiting_parent, created: Instant::now(), + failed_processing: 0, span: lookup_span, } } - /// Reset the status of all internal requests + /// Returns whether this lookup's block was produced with a published payload envelope + /// ("full") as seen by the given child's bid reference. Always `false` pre-Gloas: the + /// empty/full distinction only exists post-Gloas. The child's bid carries the parent + /// execution hash, which we match against this block's bid `block_hash`. + pub fn is_full_payload(&self, awaiting_parent: &AwaitingParent) -> bool { + let Some(parent_hash) = awaiting_parent.parent_hash() else { + return false; + }; + let Some(block) = self.block_request.peek_block() else { + // Block not yet downloaded — we don't know what peers can serve the + // parent envelope/data yet. Treat conservatively as "not full". + // TODO(gloas): cache peers in a deferred set instead of dropping them + // so we can assign them to data/payload streams once the block arrives. + debug!( + block_root = ?self.block_root, + "is_full_payload called before block downloaded, returning false" + ); + return false; + }; + match block.message().body().signed_execution_payload_bid() { + Ok(payload) => payload.message.block_hash == parent_hash, + Err(_) => false, + } + } + + /// Reset the status of all requests (used on block processing failure) pub fn reset_requests(&mut self) { - self.block_request_state = BlockRequestState::new(self.block_root); - self.component_requests = ComponentRequests::WaitingForBlock; + // Increment processing failure counter (we're resetting due to processing error) + self.failed_processing = self.failed_processing.saturating_add(1); + // Reset to fresh Downloading state with the updated counter + self.block_request = + BlockRequest::new_with_processing_failures(self.block_root, self.failed_processing); + self.data_request = DataRequest::WaitingForBlock; + self.payload_request = PayloadRequest::WaitingForBlock; } - /// Return the slot of this lookup's block if it's currently cached as `AwaitingProcessing` + /// Return the slot of this lookup's block if it's currently cached pub fn peek_downloaded_block_slot(&self) -> Option { - self.block_request_state - .state - .peek_downloaded_data() - .map(|block| block.slot()) + self.block_request.peek_slot() } /// Get the block root that is being requested. @@ -128,16 +560,10 @@ impl SingleBlockLookup { self.block_root } - pub fn awaiting_parent(&self) -> Option { + pub fn awaiting_parent(&self) -> Option { self.awaiting_parent } - /// Mark this lookup as awaiting a parent lookup from being processed. Meanwhile don't send - /// components for processing. - pub fn set_awaiting_parent(&mut self, parent_root: Hash256) { - self.awaiting_parent = Some(parent_root) - } - /// Mark this lookup as no longer awaiting a parent lookup. Components can be sent for /// processing. pub fn resolve_awaiting_parent(&mut self) { @@ -152,15 +578,10 @@ impl SingleBlockLookup { /// Maybe insert a verified response into this lookup. Returns true if imported pub fn add_child_components(&mut self, block_component: BlockComponent) -> bool { match block_component { - BlockComponent::Block(block) => self - .block_request_state - .state - .insert_verified_response(block), + BlockComponent::Block(block) => self.block_request.insert_verified_response(block), BlockComponent::Blob(_) | BlockComponent::DataColumn(_) => { - // For now ignore single blobs and columns, as the blob request state assumes all blobs are - // attributed to the same peer = the peer serving the remaining blobs. Ignoring this - // block component has a minor effect, causing the node to re-request this blob - // once the parent chain is successfully resolved + // For now ignore single blobs and columns, as the blob request state assumes all + // blobs are attributed to the same peer = the peer serving the remaining blobs. false } } @@ -171,184 +592,602 @@ impl SingleBlockLookup { self.block_root() == block_root } - /// Returns true if the block has already been downloaded. - pub fn all_components_processed(&self) -> bool { - self.block_request_state.state.is_processed() - && match &self.component_requests { - ComponentRequests::WaitingForBlock => false, - ComponentRequests::ActiveBlobRequest(request, _) => request.state.is_processed(), - ComponentRequests::ActiveCustodyRequest(request) => request.state.is_processed(), - ComponentRequests::NotNeeded { .. } => true, - } - } - /// Returns true if this request is expecting some event to make progress pub fn is_awaiting_event(&self) -> bool { self.awaiting_parent.is_some() - || self.block_request_state.state.is_awaiting_event() - || match &self.component_requests { - // If components are waiting for the block request to complete, here we should - // check if the`block_request_state.state.is_awaiting_event(). However we already - // checked that above, so `WaitingForBlock => false` is equivalent. - ComponentRequests::WaitingForBlock => false, - ComponentRequests::ActiveBlobRequest(request, _) => { - request.state.is_awaiting_event() - } - ComponentRequests::ActiveCustodyRequest(request) => { - request.state.is_awaiting_event() - } - ComponentRequests::NotNeeded { .. } => false, + || self.block_request.is_awaiting_event() + || self.data_request.is_awaiting_event() + || self.payload_request.is_awaiting_event() + } + + /// Returns the block peer if block has been downloaded. Used for peer penalization. + pub fn block_peer(&self) -> Option { + self.block_request.peer() + } + + /// Returns custody column peer group if data has been downloaded. Used for peer penalization. + pub fn data_peer_group(&self) -> Option<&PeerGroup> { + self.data_request.peer_group() + } + + /// Returns `Some(true)` if the current data request is for custody columns (Fulu/Gloas), + /// `Some(false)` for blobs (Deneb/Electra), `None` when no active data request. Used to + /// pick the right penalty string on processing failure. + pub fn data_is_columns(&self) -> Option { + match &self.data_request { + DataRequest::Downloading(DataDownload::Columns { .. }) => Some(true), + DataRequest::Downloading(DataDownload::Blobs { .. }) => Some(false), + DataRequest::Downloaded { data, .. } => { + Some(matches!(data, DownloadedData::Columns(_))) } + DataRequest::Processing { kind, .. } => Some(matches!(kind, DataDownloadKind::Columns)), + DataRequest::WaitingForBlock | DataRequest::Complete => None, + } } + // -- Main state machine driver -- + /// Makes progress on all requests of this lookup. Any error is not recoverable and must result /// in dropping the lookup. May mark the lookup as completed. + /// + /// Each of the block / data / payload sub-state-machines is driven inside its own `loop` + /// so that synchronous state transitions (e.g. Downloading → Downloaded → Processing) run + /// without returning. Each loop `break`s when further progress requires an external event + /// (download response, processing result, or a parent lookup to resolve). pub fn continue_requests( &mut self, cx: &mut SyncNetworkContext, ) -> Result { let _guard = self.span.clone().entered(); - // TODO: Check what's necessary to download, specially for blobs - self.continue_request::>(cx, 0)?; - - if let ComponentRequests::WaitingForBlock = self.component_requests { - let downloaded_block = self - .block_request_state - .state - .peek_downloaded_data() - .cloned(); - - if let Some(block) = downloaded_block.or_else(|| { - // If the block is already being processed or fully validated, retrieve how many blobs - // it expects. Consider any stage of the block. If the block root has been validated, we - // can assert that this is the correct value of `blob_kzg_commitments_count`. - match cx.chain.get_block_process_status(&self.block_root) { - BlockProcessStatus::Unknown => None, - BlockProcessStatus::NotValidated(block, _) - | BlockProcessStatus::ExecutionValidated(block) => Some(block.clone()), + let id = self.id; + let block_root = self.block_root; + + // === Block request === + loop { + match &mut self.block_request { + BlockRequest::Downloading { state, .. } => { + let peers = self.peers.clone(); + state.make_request(|| cx.block_lookup_request(id, peers, block_root))?; + + if state.is_completed() { + // Block is fully execution-validated and cached in the availability + // checker (NoRequestNeeded). Pull it from the processing-status cache + // so the data/payload streams can continue, and mark the block stream + // complete without re-processing. + match cx.chain.get_block_process_status(&block_root) { + BlockProcessStatus::NotValidated(block, _) + | BlockProcessStatus::ExecutionValidated(block) => { + // No peer to attribute against on a cache hit. + self.block_request = BlockRequest::Complete { block, peer: None }; + continue; + } + BlockProcessStatus::Unknown => { + // Race: the block was imported into fork-choice between + // `block_lookup_request` and this check. All components must + // have landed with it, so the lookup has nothing left to do. + return Ok(LookupResult::Completed); + } + } + } else if let Some(result) = state.take_download_result() { + // Block download requests are sent to a single peer, so the returned + // PeerGroup contains exactly one entry. Take the first and only. + let peer = result.peer_group.all().next().copied().ok_or_else(|| { + LookupRequestError::BadState("block download has no peer".into()) + })?; + self.block_request = BlockRequest::Downloaded { + block: result.value, + peer, + }; + } else { + // Awaiting download + break; + } } - }) { - let expected_blobs = block.num_expected_blobs(); - let block_epoch = block.slot().epoch(T::EthSpec::slots_per_epoch()); - if expected_blobs == 0 { - self.component_requests = ComponentRequests::NotNeeded("no data"); - } else if cx.chain.should_fetch_blobs(block_epoch) { - self.component_requests = ComponentRequests::ActiveBlobRequest( - BlobRequestState::new(self.block_root), - expected_blobs, - ); - } else if cx.chain.should_fetch_custody_columns(block_epoch) { - self.component_requests = ComponentRequests::ActiveCustodyRequest( - CustodyRequestState::new(self.block_root), - ); - } else { - self.component_requests = ComponentRequests::NotNeeded("outside da window"); + BlockRequest::Downloaded { block, peer } => { + if self.awaiting_parent.is_some() { + break; + } + + let parent_root = block.parent_root(); + // Zero hash is the parent of the genesis block — not a real block. + if parent_root != Hash256::ZERO { + let parent_in_fork_choice = cx + .chain + .canonical_head + .fork_choice_read_lock() + .get_block(&parent_root) + .is_some(); + if !parent_in_fork_choice { + let awaiting_parent = if let Ok(bid) = + block.message().body().signed_execution_payload_bid() + { + AwaitingParent::post_gloas( + parent_root, + bid.message.parent_block_hash, + ) + } else { + AwaitingParent::pre_gloas(parent_root) + }; + self.awaiting_parent = Some(awaiting_parent); + return Ok(LookupResult::ParentUnknown { + awaiting_parent, + block_root: self.block_root, + peers: self.all_peers(), + }); + } + // post-gloas we need to also check if the envelope is known to fork choice + if let Ok(child_bid) = block.message().body().signed_execution_payload_bid() + { + // TODO(gloas): after fork-choice: use parent_proto_block.execution_payload_block_hash here + let parent_is_full = cx + .chain + .get_blinded_block(&parent_root) + .map(|maybe_parent_block| { + if let Some(parent_block) = maybe_parent_block { + parent_block + .message() + .body() + .signed_execution_payload_bid() + .map(|parent_bid| { + parent_bid.message.block_hash + == child_bid.message.parent_block_hash + }) + .unwrap_or(false) + } else { + false + } + }) + .unwrap_or(false); + + if parent_is_full + && !cx.chain.envelope_is_known_to_fork_choice(&parent_root) + { + let awaiting_parent = AwaitingParent::post_gloas( + parent_root, + child_bid.message.parent_block_hash, + ); + self.awaiting_parent = Some(awaiting_parent); + return Ok(LookupResult::ParentUnknown { + awaiting_parent, + block_root: self.block_root, + peers: self.all_peers(), + }); + } + } + } + + let block = block.clone(); + let peer = *peer; + cx.send_block_for_processing( + id, + self.block_root, + block.clone(), + Duration::ZERO, + ) + .map_err(LookupRequestError::SendFailedProcessor)?; + self.block_request = BlockRequest::Processing { block, peer }; + // Processing needs an async trigger (block processing result) before we + // can make progress. + break; } - } else { - // Wait to download the block before downloading blobs. Then we can be sure that the - // block has data, so there's no need to do "blind" requests for all possible blobs and - // latter handle the case where if the peer sent no blobs, penalize. - // - // Lookup sync event safety: Reaching this code means that a block is not in any pre-import - // cache nor in the request state of this lookup. Therefore, the block must either: (1) not - // be downloaded yet or (2) the block is already imported into the fork-choice. - // In case (1) the lookup must either successfully download the block or get dropped. - // In case (2) the block will be downloaded, processed, reach `DuplicateFullyImported` - // and get dropped as completed. + BlockRequest::Processing { .. } | BlockRequest::Complete { .. } => break, } } - match &self.component_requests { - ComponentRequests::WaitingForBlock => {} // do nothing - ComponentRequests::ActiveBlobRequest(_, expected_blobs) => { - self.continue_request::>(cx, *expected_blobs)? + // === Data request === + loop { + match &mut self.data_request { + DataRequest::WaitingForBlock => { + // Prefer a block downloaded by this lookup. Otherwise fall back to the + // chain's processing-status cache: the block may already be in the + // availability checker via gossip/HTTP API before this lookup downloads + // it, and we can still drive the data request in parallel. + let block_metadata = self + .block_request + .peek_block() + .map(|b| (b.slot(), b.num_expected_blobs())) + .or_else(|| match cx.chain.get_block_process_status(&block_root) { + BlockProcessStatus::NotValidated(block, _) + | BlockProcessStatus::ExecutionValidated(block) => { + Some((block.slot(), block.num_expected_blobs())) + } + BlockProcessStatus::Unknown => None, + }); + if let Some((slot, expected_blobs)) = block_metadata { + self.create_data_request(slot, expected_blobs, cx); + } else { + // Wait for block to be downloaded + break; + } + } + DataRequest::Downloading(dl) => { + // Custody column downloads dispatch against the global synced peer pool + // inside `ActiveCustodyRequest`, not against `data_peers`. Only gate on + // `data_peers` for post-Gloas, where peer sets are strictly partitioned + // and no fallback pool exists. + let has_peers = !self.data_peers.read().is_empty(); + let is_gloas = matches!(dl, DataDownload::Columns { .. }) + && self.awaiting_parent.is_some_and(|a| a.is_post_gloas()); + if has_peers || !is_gloas { + dl.continue_requests(id, self.data_peers.clone(), cx)?; + } + if dl.is_completed() { + // All data already imported (e.g. received via gossip) + self.data_request = DataRequest::Complete; + } else if let Some((data, peer_group)) = dl.take_download_result() { + self.data_request = DataRequest::Downloaded { data, peer_group }; + } else { + // Wait for data to be downloaded + break; + } + } + DataRequest::Downloaded { data, peer_group } => { + match data { + DownloadedData::Blobs { blobs, .. } => { + cx.send_blobs_for_processing( + id, + self.block_root, + blobs.clone(), + Duration::ZERO, + ) + .map_err(LookupRequestError::SendFailedProcessor)?; + } + DownloadedData::Columns(columns) => { + cx.send_custody_columns_for_processing( + id, + self.block_root, + columns.clone(), + Duration::ZERO, + BlockProcessType::SingleCustodyColumn(id), + ) + .map_err(LookupRequestError::SendFailedProcessor)?; + } + } + let kind = data.kind(); + let peer_group = peer_group.clone(); + self.data_request = DataRequest::Processing { kind, peer_group }; + // Processing needs an async trigger. + break; + } + DataRequest::Processing { .. } | DataRequest::Complete => break, } - ComponentRequests::ActiveCustodyRequest(_) => { - self.continue_request::>(cx, 0)? + } + + // === Payload request === + loop { + match &mut self.payload_request { + PayloadRequest::WaitingForBlock => { + // Same fallback as the data stream: the block may be in the availability + // checker via gossip before this lookup downloads it. + let block_metadata = self + .block_request + .peek_block() + .map(|b| (b.slot(), b.num_expected_blobs())) + .or_else(|| match cx.chain.get_block_process_status(&block_root) { + BlockProcessStatus::NotValidated(block, _) + | BlockProcessStatus::ExecutionValidated(block) => { + Some((block.slot(), block.num_expected_blobs())) + } + BlockProcessStatus::Unknown => None, + }); + if let Some((slot, expected_blobs)) = block_metadata { + self.create_payload_request(slot, expected_blobs, cx); + } else { + break; + } + } + PayloadRequest::Downloading { state, .. } => { + if !self.payload_peers.read().is_empty() { + let peers = self.payload_peers.clone(); + match cx.payload_lookup_request(id, peers, block_root) { + Ok(LookupRequestResult::RequestSent(req_id)) => { + state.on_download_start(req_id)?; + } + Ok(LookupRequestResult::NoRequestNeeded(_reason)) => { + // Envelope is already known (e.g. imported by gossip). Skip + // download and mark payload stream complete. + self.payload_request = PayloadRequest::Complete; + continue; + } + Ok(LookupRequestResult::Pending(reason)) => { + state.update_awaiting_download_status(reason); + } + Err(e) => { + return Err(LookupRequestError::SendFailedNetwork(e)); + } + } + } + if let Some(result) = state.take_download_result() { + self.payload_request = PayloadRequest::Downloaded { + peer_group: result.peer_group, + }; + } else { + break; + } + } + PayloadRequest::Downloaded { peer_group } => { + if !self.block_request.is_complete() { + break; + } + // TODO(gloas): send payload for processing + // cx.send_payload_for_processing(...) + let peer_group = peer_group.clone(); + self.payload_request = PayloadRequest::Processing { peer_group }; + // Processing needs an async trigger. + break; + } + PayloadRequest::Processing { .. } | PayloadRequest::Complete => break, } - ComponentRequests::NotNeeded { .. } => {} // do nothing } - // If all components of this lookup are already processed, there will be no future events - // that can make progress so it must be dropped. Consider the lookup completed. - // This case can happen if we receive the components from gossip during a retry. - if self.all_components_processed() { - self.span = Span::none(); - Ok(LookupResult::Completed) - } else { - Ok(LookupResult::Pending) + // === Check completion === + if self.block_request.is_complete() + && matches!(self.data_request, DataRequest::Complete) + && matches!(self.payload_request, PayloadRequest::Complete) + { + return Ok(LookupResult::Completed); } + + Ok(LookupResult::Pending) } - /// Potentially makes progress on this request if it's in a progress-able state - fn continue_request>( + /// Create data request based on the downloaded block's content and fork. + fn create_data_request( &mut self, - cx: &mut SyncNetworkContext, + slot: Slot, expected_blobs: usize, - ) -> Result<(), LookupRequestError> { - let id = self.id; - let awaiting_parent = self.awaiting_parent.is_some(); - let request = - R::request_state_mut(self).map_err(|e| LookupRequestError::BadState(e.to_owned()))?; - - // Attempt to progress awaiting downloads - if request.get_state().is_awaiting_download() { - // Verify the current request has not exceeded the maximum number of attempts. - let request_state = request.get_state(); - if request_state.failed_attempts() >= SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS { - let cannot_process = request_state.more_failed_processing_attempts(); - return Err(LookupRequestError::TooManyAttempts { cannot_process }); - } + cx: &SyncNetworkContext, + ) { + let block_fork = cx.chain.spec.fork_name_at_slot::(slot); - let peers = self.peers.clone(); - let request = R::request_state_mut(self) - .map_err(|e| LookupRequestError::BadState(e.to_owned()))?; - - match request.make_request(id, peers, expected_blobs, cx)? { - LookupRequestResult::RequestSent(req_id) => { - // Lookup sync event safety: If make_request returns `RequestSent`, we are - // guaranteed that `BlockLookups::on_download_response` will be called exactly - // with this `req_id`. - request.get_state_mut().on_download_start(req_id)? + match block_fork { + ForkName::Base | ForkName::Altair | ForkName::Bellatrix | ForkName::Capella => { + self.data_request = DataRequest::Complete; + } + ForkName::Deneb | ForkName::Electra => { + if expected_blobs > 0 { + self.data_request = DataRequest::Downloading(DataDownload::Blobs { + block_root: self.block_root, + expected_blobs, + state: SingleLookupRequestState::new(), + }); + // Pre-Gloas: data peers = block peers (always need data with block) + self.data_peers = self.peers.clone(); + } else { + self.data_request = DataRequest::Complete; } - LookupRequestResult::NoRequestNeeded(reason) => { - // Lookup sync event safety: Advances this request to the terminal `Processed` - // state. If all requests reach this state, the request is marked as completed - // in `Self::continue_requests`. - request.get_state_mut().on_completed_request(reason)? + } + ForkName::Fulu => { + if expected_blobs > 0 { + self.data_request = DataRequest::Downloading(DataDownload::Columns { + block_root: self.block_root, + state: SingleLookupRequestState::new(), + }); + // Pre-Gloas: data peers = block peers + self.data_peers = self.peers.clone(); + } else { + self.data_request = DataRequest::Complete; } - // Sync will receive a future event to make progress on the request, do nothing now - LookupRequestResult::Pending(reason) => { - // Lookup sync event safety: Refer to the code paths constructing - // `LookupRequestResult::Pending` - request - .get_state_mut() - .update_awaiting_download_status(reason); - return Ok(()); + } + ForkName::Gloas => { + if expected_blobs > 0 { + self.data_request = DataRequest::Downloading(DataDownload::Columns { + block_root: self.block_root, + state: SingleLookupRequestState::new(), + }); + // Gloas: data peers start at 0, populated when children arrive + } else { + self.data_request = DataRequest::Complete; } } + } + } + + /// Create payload request based on the downloaded block's content and fork. + fn create_payload_request( + &mut self, + slot: Slot, + expected_blobs: usize, + cx: &SyncNetworkContext, + ) { + let block_fork = cx.chain.spec.fork_name_at_slot::(slot); - // Otherwise, attempt to progress awaiting processing - // If this request is awaiting a parent lookup to be processed, do not send for processing. - // The request will be rejected with unknown parent error. - } else if !awaiting_parent { - // maybe_start_processing returns Some if state == AwaitingProcess. This pattern is - // useful to conditionally access the result data. - if let Some(result) = request.get_state_mut().maybe_start_processing() { - // Lookup sync event safety: If `send_for_processing` returns Ok() we are guaranteed - // that `BlockLookups::on_processing_result` will be called exactly once with this - // lookup_id - return R::send_for_processing(id, result, cx); + match block_fork { + ForkName::Base + | ForkName::Altair + | ForkName::Bellatrix + | ForkName::Capella + | ForkName::Deneb + | ForkName::Electra + | ForkName::Fulu => { + self.payload_request = PayloadRequest::Complete; + } + ForkName::Gloas => { + if expected_blobs > 0 { + self.payload_request = PayloadRequest::Downloading { + block_root: self.block_root, + state: SingleLookupRequestState::new(), + }; + // Payload peers start at 0, download gated until children provide peers + } else { + // Empty blocks have no payload and no data — both are Done + self.payload_request = PayloadRequest::Complete; + } } - // Lookup sync event safety: If the request is not in `AwaitingDownload` or - // `AwaitingProcessing` state it is guaranteed to receive some event to make progress. } + } - // Lookup sync event safety: If a lookup is awaiting a parent we are guaranteed to either: - // (1) attempt to make progress with `BlockLookups::continue_child_lookups` if the parent - // lookup completes, or (2) get dropped if the parent fails and is dropped. + // -- Processing result handlers -- - Ok(()) + /// Handle block processing result. Advances the lookup state machine. + pub fn on_block_processing_result( + &mut self, + result_is_ok: bool, + cx: &mut SyncNetworkContext, + ) -> Result { + let BlockRequest::Processing { block, peer } = &self.block_request else { + return Err(LookupRequestError::BadState( + "block processing result but not in Processing state".to_owned(), + )); + }; + if result_is_ok { + let block = block.clone(); + let peer = Some(*peer); + self.block_request = BlockRequest::Complete { block, peer }; + self.continue_requests(cx) + } else { + // Block processing failed — reset everything and retry from scratch + self.reset_requests(); + self.continue_requests(cx) + } + } + + /// Handle data processing result (blobs or custody columns imported). + pub fn on_data_processing_result( + &mut self, + result_is_ok: bool, + cx: &mut SyncNetworkContext, + ) -> Result { + if !matches!(self.data_request, DataRequest::Processing { .. }) { + return Err(LookupRequestError::BadState( + "data processing result but not in Processing state".to_owned(), + )); + } + if result_is_ok { + self.data_request = DataRequest::Complete; + self.continue_requests(cx) + } else { + // Data processing failed — bump the shared processing-failure counter so the + // retry is bounded against `SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS`, then reset. + self.failed_processing = self.failed_processing.saturating_add(1); + self.reset_data_request(); + self.continue_requests(cx) + } + } + + /// Handle payload processing result. + #[allow(dead_code)] + pub fn on_payload_processing_result( + &mut self, + result_is_ok: bool, + cx: &mut SyncNetworkContext, + ) -> Result { + if !matches!(self.payload_request, PayloadRequest::Processing { .. }) { + return Err(LookupRequestError::BadState( + "payload processing result but not in Processing state".to_owned(), + )); + } + if result_is_ok { + self.payload_request = PayloadRequest::Complete; + self.continue_requests(cx) + } else { + // Bump the shared processing-failure counter to bound retries. + self.failed_processing = self.failed_processing.saturating_add(1); + self.payload_request = PayloadRequest::Downloading { + block_root: self.block_root, + state: SingleLookupRequestState::new_with_processing_failures( + self.failed_processing, + ), + }; + self.continue_requests(cx) + } + } + + /// Reset data request to a fresh download, preserving the download kind. + fn reset_data_request(&mut self) { + let kind = match &self.data_request { + DataRequest::Downloading(dl) => match dl { + DataDownload::Blobs { expected_blobs, .. } => Some(DataDownloadKind::Blobs { + expected_blobs: *expected_blobs, + }), + DataDownload::Columns { .. } => Some(DataDownloadKind::Columns), + }, + DataRequest::Downloaded { data, .. } => Some(data.kind()), + DataRequest::Processing { kind, .. } => Some(*kind), + DataRequest::WaitingForBlock | DataRequest::Complete => None, + }; + if let Some(kind) = kind { + self.data_request = DataRequest::Downloading( + kind.into_fresh_download(self.block_root, self.failed_processing), + ); + } + } + + // -- Download response handlers -- + + /// Handle a block download response. Updates download state and advances the lookup. + #[allow(clippy::type_complexity)] + pub fn on_block_download_response( + &mut self, + req_id: ReqId, + result: Result<(Arc>, PeerGroup, Duration), ()>, + cx: &mut SyncNetworkContext, + ) -> Result { + let BlockRequest::Downloading { state, .. } = &mut self.block_request else { + return Err(LookupRequestError::BadState( + "block response but not downloading".to_owned(), + )); + }; + state.on_download_response(req_id, self.block_root, result)?; + self.continue_requests(cx) + } + + /// Handle a blob download response. Updates download state and advances the lookup. + pub fn on_blob_download_response( + &mut self, + req_id: ReqId, + result: Result<(FixedBlobSidecarList, PeerGroup, Duration), ()>, + cx: &mut SyncNetworkContext, + ) -> Result { + let DataRequest::Downloading(DataDownload::Blobs { state, .. }) = &mut self.data_request + else { + return Err(LookupRequestError::BadState( + "blob response but not downloading blobs".to_owned(), + )); + }; + state.on_download_response(req_id, self.block_root, result)?; + self.continue_requests(cx) + } + + /// Handle a custody columns download response. Updates download state and advances the lookup. + pub fn on_custody_download_response( + &mut self, + req_id: ReqId, + result: Result<(DataColumnSidecarList, PeerGroup, Duration), ()>, + cx: &mut SyncNetworkContext, + ) -> Result { + let DataRequest::Downloading(DataDownload::Columns { state, .. }) = &mut self.data_request + else { + return Err(LookupRequestError::BadState( + "custody response but not downloading columns".to_owned(), + )); + }; + state.on_download_response(req_id, self.block_root, result)?; + self.continue_requests(cx) + } + + /// Handle a payload envelope download response. Updates download state and advances the lookup. + #[allow(clippy::type_complexity)] + pub fn on_payload_download_response( + &mut self, + req_id: ReqId, + result: Result< + ( + Arc>, + PeerGroup, + Duration, + ), + (), + >, + cx: &mut SyncNetworkContext, + ) -> Result { + let PayloadRequest::Downloading { state, .. } = &mut self.payload_request else { + return Err(LookupRequestError::BadState( + "payload envelope response but not downloading payload".to_owned(), + )); + }; + state.on_download_response(req_id, self.block_root, result)?; + self.continue_requests(cx) } /// Get all unique peers that claim to have imported this set of block components @@ -357,14 +1196,24 @@ impl SingleBlockLookup { } /// Add peer to all request states. The peer must be able to serve this request. - /// Returns true if the peer was newly inserted into some request state. - pub fn add_peer(&mut self, peer_id: PeerId) -> bool { - self.peers.write().insert(peer_id) + /// Returns true if the peer was newly inserted into any peer set. + pub fn add_peer(&mut self, peer_id: PeerId, peer_type: &PeerType) -> bool { + let mut added = false; + if peer_type.payload { + added |= self.payload_peers.write().insert(peer_id); + } + if peer_type.data { + added |= self.data_peers.write().insert(peer_id); + } + added |= self.peers.write().insert(peer_id); + added } /// Remove peer from available peers. pub fn remove_peer(&mut self, peer_id: &PeerId) { self.peers.write().remove(peer_id); + self.data_peers.write().remove(peer_id); + self.payload_peers.write().remove(peer_id); } /// Returns true if this lookup has zero peers @@ -373,171 +1222,124 @@ impl SingleBlockLookup { } } -/// The state of the blob request component of a `SingleBlockLookup`. -#[derive(Educe)] -#[educe(Debug)] -pub struct BlobRequestState { - #[educe(Debug(ignore))] - pub block_root: Hash256, - pub state: SingleLookupRequestState>, -} - -impl BlobRequestState { - pub fn new(block_root: Hash256) -> Self { - Self { - block_root, - state: SingleLookupRequestState::new(), - } - } -} - -/// The state of the custody request component of a `SingleBlockLookup`. -#[derive(Educe)] -#[educe(Debug)] -pub struct CustodyRequestState { - #[educe(Debug(ignore))] - pub block_root: Hash256, - pub state: SingleLookupRequestState>, -} - -impl CustodyRequestState { - pub fn new(block_root: Hash256) -> Self { - Self { - block_root, - state: SingleLookupRequestState::new(), - } - } -} - -/// The state of the block request component of a `SingleBlockLookup`. -#[derive(Educe)] -#[educe(Debug)] -pub struct BlockRequestState { - #[educe(Debug(ignore))] - pub requested_block_root: Hash256, - pub state: SingleLookupRequestState>>, +pub struct PeerType { + pub data: bool, + pub payload: bool, } -impl BlockRequestState { - pub fn new(block_root: Hash256) -> Self { - Self { - requested_block_root: block_root, - state: SingleLookupRequestState::new(), - } - } -} - -#[derive(Debug, Clone)] -pub struct DownloadResult { - pub value: T, - pub block_root: Hash256, - pub seen_timestamp: Duration, - pub peer_group: PeerGroup, -} +// === Generic download state machine === #[derive(IntoStaticStr)] -pub enum State { +enum DownloadState { AwaitingDownload(/* reason */ &'static str), Downloading(ReqId), - AwaitingProcess(DownloadResult), - /// Request is processing, sent by lookup sync - Processing(DownloadResult), - /// Request is processed - Processed(/* reason */ &'static str), + Downloaded(DownloadResult), + /// Download completed with no request needed (e.g. all components already imported) + Completed(/* reason */ &'static str), } /// Object representing the state of a single block or blob lookup request. #[derive(Debug)] -pub struct SingleLookupRequestState { - /// State of this request. - state: State, - /// How many times have we attempted to process this block or blob. +struct SingleLookupRequestState { + state: DownloadState, failed_processing: u8, - /// How many times have we attempted to download this block or blob. failed_downloading: u8, } impl SingleLookupRequestState { - pub fn new() -> Self { + fn new() -> Self { Self { - state: State::AwaitingDownload("not started"), + state: DownloadState::AwaitingDownload("not started"), failed_processing: 0, failed_downloading: 0, } } - pub fn is_awaiting_download(&self) -> bool { - match self.state { - State::AwaitingDownload { .. } => true, - State::Downloading { .. } - | State::AwaitingProcess { .. } - | State::Processing { .. } - | State::Processed { .. } => false, + fn new_with_processing_failures(failed_processing: u8) -> Self { + Self { + state: DownloadState::AwaitingDownload("reset after processing failure"), + failed_processing, + failed_downloading: 0, } } - pub fn is_processed(&self) -> bool { - match self.state { - State::AwaitingDownload { .. } - | State::Downloading { .. } - | State::AwaitingProcess { .. } - | State::Processing { .. } => false, - State::Processed { .. } => true, + fn is_awaiting_download(&self) -> bool { + matches!(self.state, DownloadState::AwaitingDownload { .. }) + } + + fn is_completed(&self) -> bool { + matches!(self.state, DownloadState::Completed { .. }) + } + + /// Drive download: check max attempts, issue request, handle result. + fn make_request( + &mut self, + request_fn: impl FnOnce() -> Result, + ) -> Result<(), LookupRequestError> { + if !self.is_awaiting_download() { + return Ok(()); + } + if self.failed_attempts() >= SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS { + let cannot_process = self.more_failed_processing_attempts(); + return Err(LookupRequestError::TooManyAttempts { cannot_process }); + } + match request_fn().map_err(LookupRequestError::SendFailedNetwork)? { + LookupRequestResult::RequestSent(req_id) => self.on_download_start(req_id)?, + LookupRequestResult::NoRequestNeeded(reason) => self.on_completed_request(reason)?, + LookupRequestResult::Pending(reason) => self.update_awaiting_download_status(reason), } + Ok(()) } - /// Returns true if we can expect some future event to progress this block component request - /// specifically. - pub fn is_awaiting_event(&self) -> bool { - match self.state { - // No event will progress this request specifically, but the request may be put on hold - // due to some external event - State::AwaitingDownload { .. } => false, - // Network will emit a download success / error event - State::Downloading { .. } => true, - // Not awaiting any external event - State::AwaitingProcess { .. } => false, - // Beacon processor will emit a processing result event - State::Processing { .. } => true, - // Request complete, no future event left - State::Processed { .. } => false, - } - } - - pub fn peek_downloaded_data(&self) -> Option<&T> { + fn is_awaiting_event(&self) -> bool { + matches!(self.state, DownloadState::Downloading { .. }) + } + + fn peek_downloaded_data(&self) -> Option<&T> { match &self.state { - State::AwaitingDownload { .. } => None, - State::Downloading { .. } => None, - State::AwaitingProcess(result) => Some(&result.value), - State::Processing(result) => Some(&result.value), - State::Processed { .. } => None, + DownloadState::Downloaded(data) => Some(&data.value), + _ => None, } } - /// Switch to `AwaitingProcessing` if the request is in `AwaitingDownload` state, otherwise - /// ignore. - pub fn insert_verified_response(&mut self, result: DownloadResult) -> bool { - if let State::AwaitingDownload { .. } = &self.state { - self.state = State::AwaitingProcess(result); + fn peek_downloaded_peer_group(&self) -> Option<&PeerGroup> { + match &self.state { + DownloadState::Downloaded(data) => Some(&data.peer_group), + _ => None, + } + } + + /// Take the download result out, transitioning back to AwaitingDownload. + /// Returns None if not in Downloaded state. + fn take_download_result(&mut self) -> Option> { + let old = std::mem::replace(&mut self.state, DownloadState::AwaitingDownload("taken")); + if let DownloadState::Downloaded(result) = old { + Some(result) + } else { + self.state = old; + None + } + } + + fn insert_verified_response(&mut self, result: DownloadResult) -> bool { + if let DownloadState::AwaitingDownload { .. } = &self.state { + self.state = DownloadState::Downloaded(result); true } else { false } } - /// Append metadata on why this request is in AwaitingDownload status. Very helpful to debug - /// stuck lookups. Not fallible as it's purely informational. - pub fn update_awaiting_download_status(&mut self, new_status: &'static str) { - if let State::AwaitingDownload(status) = &mut self.state { - *status = new_status + fn update_awaiting_download_status(&mut self, new_status: &'static str) { + if let DownloadState::AwaitingDownload(status) = &mut self.state { + *status = new_status; } } - /// Switch to `Downloading` if the request is in `AwaitingDownload` state, otherwise returns None. - pub fn on_download_start(&mut self, req_id: ReqId) -> Result<(), LookupRequestError> { + fn on_download_start(&mut self, req_id: ReqId) -> Result<(), LookupRequestError> { match &self.state { - State::AwaitingDownload { .. } => { - self.state = State::Downloading(req_id); + DownloadState::AwaitingDownload { .. } => { + self.state = DownloadState::Downloading(req_id); Ok(()) } other => Err(LookupRequestError::BadState(format!( @@ -546,11 +1348,30 @@ impl SingleLookupRequestState { } } - /// Registers a failure in downloading a block. This might be a peer disconnection or a wrong - /// block. - pub fn on_download_failure(&mut self, req_id: ReqId) -> Result<(), LookupRequestError> { + /// Handle a download response: dispatch success or failure based on result. + fn on_download_response( + &mut self, + req_id: ReqId, + block_root: Hash256, + result: Result<(T, PeerGroup, Duration), ()>, + ) -> Result<(), LookupRequestError> { + match result { + Ok((value, peer_group, seen_timestamp)) => self.on_download_success( + req_id, + DownloadResult { + value, + block_root, + seen_timestamp, + peer_group, + }, + ), + Err(()) => self.on_download_failure(req_id), + } + } + + fn on_download_failure(&mut self, req_id: ReqId) -> Result<(), LookupRequestError> { match &self.state { - State::Downloading(expected_req_id) => { + DownloadState::Downloading(expected_req_id) => { if req_id != *expected_req_id { return Err(LookupRequestError::UnexpectedRequestId { expected_req_id: *expected_req_id, @@ -558,7 +1379,7 @@ impl SingleLookupRequestState { }); } self.failed_downloading = self.failed_downloading.saturating_add(1); - self.state = State::AwaitingDownload("not started"); + self.state = DownloadState::AwaitingDownload("not started"); Ok(()) } other => Err(LookupRequestError::BadState(format!( @@ -567,20 +1388,20 @@ impl SingleLookupRequestState { } } - pub fn on_download_success( + fn on_download_success( &mut self, req_id: ReqId, result: DownloadResult, ) -> Result<(), LookupRequestError> { match &self.state { - State::Downloading(expected_req_id) => { + DownloadState::Downloading(expected_req_id) => { if req_id != *expected_req_id { return Err(LookupRequestError::UnexpectedRequestId { expected_req_id: *expected_req_id, req_id, }); } - self.state = State::AwaitingProcess(result); + self.state = DownloadState::Downloaded(result); Ok(()) } other => Err(LookupRequestError::BadState(format!( @@ -589,65 +1410,10 @@ impl SingleLookupRequestState { } } - /// Switch to `Processing` if the request is in `AwaitingProcess` state, otherwise returns None. - pub fn maybe_start_processing(&mut self) -> Option> { - // For 2 lines replace state with placeholder to gain ownership of `result` - match &self.state { - State::AwaitingProcess(result) => { - let result = result.clone(); - self.state = State::Processing(result.clone()); - Some(result) - } - _ => None, - } - } - - /// Revert into `AwaitingProcessing`, if the payload if not invalid and can be submitted for - /// processing latter. - pub fn revert_to_awaiting_processing(&mut self) -> Result<(), LookupRequestError> { - match &self.state { - State::Processing(result) => { - self.state = State::AwaitingProcess(result.clone()); - Ok(()) - } - other => Err(LookupRequestError::BadState(format!( - "Bad state on revert_to_awaiting_processing expected Processing got {other}" - ))), - } - } - - /// Registers a failure in processing a block. - pub fn on_processing_failure(&mut self) -> Result { - match &self.state { - State::Processing(result) => { - let peers_source = result.peer_group.clone(); - self.failed_processing = self.failed_processing.saturating_add(1); - self.state = State::AwaitingDownload("not started"); - Ok(peers_source) - } - other => Err(LookupRequestError::BadState(format!( - "Bad state on_processing_failure expected Processing got {other}" - ))), - } - } - - pub fn on_processing_success(&mut self) -> Result<(), LookupRequestError> { - match &self.state { - State::Processing(_) => { - self.state = State::Processed("processing success"); - Ok(()) - } - other => Err(LookupRequestError::BadState(format!( - "Bad state on_processing_success expected Processing got {other}" - ))), - } - } - - /// Mark a request as complete without any download or processing - pub fn on_completed_request(&mut self, reason: &'static str) -> Result<(), LookupRequestError> { + fn on_completed_request(&mut self, reason: &'static str) -> Result<(), LookupRequestError> { match &self.state { - State::AwaitingDownload { .. } => { - self.state = State::Processed(reason); + DownloadState::AwaitingDownload { .. } => { + self.state = DownloadState::Completed(reason); Ok(()) } other => Err(LookupRequestError::BadState(format!( @@ -656,33 +1422,28 @@ impl SingleLookupRequestState { } } - /// The total number of failures, whether it be processing or downloading. - pub fn failed_attempts(&self) -> u8 { + fn failed_attempts(&self) -> u8 { self.failed_processing + self.failed_downloading } - pub fn more_failed_processing_attempts(&self) -> bool { + fn more_failed_processing_attempts(&self) -> bool { self.failed_processing >= self.failed_downloading } } -// Display is used in the BadState assertions above -impl std::fmt::Display for State { +impl std::fmt::Display for DownloadState { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "{}", Into::<&'static str>::into(self)) } } -// Debug is used in the log_stuck_lookups print to include some more info. Implements custom Debug -// to not dump an entire block or blob to terminal which don't add valuable data. -impl std::fmt::Debug for State { +impl std::fmt::Debug for DownloadState { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { Self::AwaitingDownload(reason) => write!(f, "AwaitingDownload({})", reason), Self::Downloading(req_id) => write!(f, "Downloading({:?})", req_id), - Self::AwaitingProcess(d) => write!(f, "AwaitingProcess({:?})", d.peer_group), - Self::Processing(d) => write!(f, "Processing({:?})", d.peer_group), - Self::Processed(reason) => write!(f, "Processed({})", reason), + Self::Downloaded(_) => write!(f, "Downloaded()"), + Self::Completed(reason) => write!(f, "Completed({})", reason), } } } diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 60dcc3efc7d..45a9bd919d0 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -43,9 +43,7 @@ use super::range_sync::{EPOCHS_PER_BATCH, RangeSync, RangeSyncType}; use crate::network_beacon_processor::{ChainSegmentProcessId, NetworkBeaconProcessor}; use crate::service::NetworkMessage; use crate::status::ToStatusMessage; -use crate::sync::block_lookups::{ - BlobRequestState, BlockComponent, BlockRequestState, CustodyRequestState, DownloadResult, -}; +use crate::sync::block_lookups::{BlockComponent, DownloadResult}; use crate::sync::custody_backfill_sync::CustodyBackFillSync; use crate::sync::network_context::{PeerGroup, RpcResponseResult}; use beacon_chain::block_verification_types::AsBlock; @@ -73,7 +71,8 @@ use strum::IntoStaticStr; use tokio::sync::mpsc; use tracing::{debug, error, info, trace}; use types::{ - BlobSidecar, DataColumnSidecar, EthSpec, ForkContext, Hash256, SignedBeaconBlock, Slot, + BlobSidecar, DataColumnSidecar, EthSpec, ForkContext, Hash256, SignedBeaconBlock, + SignedExecutionPayloadEnvelope, Slot, }; /// The number of slots ahead of us that is allowed before requesting a long-range (batch) Sync @@ -132,6 +131,14 @@ pub enum SyncMessage { seen_timestamp: Duration, }, + /// A payload envelope has been received from the RPC. + RpcPayloadEnvelope { + sync_request_id: SyncRequestId, + peer_id: PeerId, + envelope: Option>>, + seen_timestamp: Duration, + }, + /// A block with an unknown parent has been received. UnknownParentBlock(PeerId, Arc>, Hash256), @@ -492,6 +499,9 @@ impl SyncManager { SyncRequestId::SingleBlob { id } => { self.on_single_blob_response(id, peer_id, RpcEvent::RPCError(error)) } + SyncRequestId::SinglePayloadEnvelope { id } => { + self.on_single_payload_envelope_response(id, peer_id, RpcEvent::RPCError(error)) + } SyncRequestId::DataColumnsByRoot(req_id) => { self.on_data_columns_by_root_response(req_id, peer_id, RpcEvent::RPCError(error)) } @@ -838,6 +848,17 @@ impl SyncManager { } => { self.rpc_data_column_received(sync_request_id, peer_id, data_column, seen_timestamp) } + SyncMessage::RpcPayloadEnvelope { + sync_request_id, + peer_id, + envelope, + seen_timestamp, + } => self.rpc_payload_envelope_received( + sync_request_id, + peer_id, + envelope, + seen_timestamp, + ), SyncMessage::UnknownParentBlock(peer_id, block, block_root) => { let block_slot = block.slot(); let parent_root = block.parent_root(); @@ -897,9 +918,33 @@ impl SyncManager { }), ); } - // TODO(gloas) support gloas data column variant + // In Gloas, data columns identify the beacon block root but do not carry + // parent root. Treat as an unknown block-root trigger (attestation-style). + // The peer is marked as data-capable since it sent us a data column. DataColumnSidecar::Gloas(_) => { - error!("Gloas variant not yet supported") + match self.should_search_for_block(Some(data_column_slot), &peer_id) { + Ok(_) => { + if self.block_lookups.search_unknown_block_with_data_peer( + block_root, + &[peer_id], + &mut self.network, + ) { + debug!( + ?block_root, + "Created unknown block lookup from Gloas data column" + ); + } else { + debug!(?block_root, "No lookup created from Gloas data column"); + } + } + Err(reason) => { + debug!( + %block_root, + reason, + "Ignoring Gloas data column unknown block request" + ); + } + } } } } @@ -1140,14 +1185,13 @@ impl SyncManager { block: RpcEvent>>, ) { if let Some(resp) = self.network.on_single_block_response(id, peer_id, block) { - self.block_lookups - .on_download_response::>( - id, - resp.map(|(value, seen_timestamp)| { - (value, PeerGroup::from_single(peer_id), seen_timestamp) - }), - &mut self.network, - ) + self.block_lookups.on_block_download_response( + id, + resp.map(|(value, seen_timestamp)| { + (value, PeerGroup::from_single(peer_id), seen_timestamp) + }), + &mut self.network, + ) } } @@ -1210,14 +1254,53 @@ impl SyncManager { blob: RpcEvent>>, ) { if let Some(resp) = self.network.on_single_blob_response(id, peer_id, blob) { - self.block_lookups - .on_download_response::>( + self.block_lookups.on_blob_download_response( + id, + resp.map(|(value, seen_timestamp)| { + (value, PeerGroup::from_single(peer_id), seen_timestamp) + }), + &mut self.network, + ) + } + } + + fn rpc_payload_envelope_received( + &mut self, + sync_request_id: SyncRequestId, + peer_id: PeerId, + envelope: Option>>, + seen_timestamp: Duration, + ) { + match sync_request_id { + SyncRequestId::SinglePayloadEnvelope { id } => self + .on_single_payload_envelope_response( id, - resp.map(|(value, seen_timestamp)| { - (value, PeerGroup::from_single(peer_id), seen_timestamp) - }), - &mut self.network, - ) + peer_id, + RpcEvent::from_chunk(envelope, seen_timestamp), + ), + _ => { + crit!(%peer_id, "bad request id for payload_envelope"); + } + } + } + + fn on_single_payload_envelope_response( + &mut self, + id: SingleLookupReqId, + peer_id: PeerId, + envelope: RpcEvent>>, + ) { + if let Some(resp) = self + .network + .on_single_payload_envelope_response(id, peer_id, envelope) + { + self.block_lookups.on_payload_download_response( + id, + resp.map(|(value, seen_timestamp)| { + (value, PeerGroup::from_single(peer_id), seen_timestamp) + }), + &mut self.network, + ) } } @@ -1309,11 +1392,7 @@ impl SyncManager { response: CustodyByRootResult, ) { self.block_lookups - .on_download_response::>( - requester.0, - response, - &mut self.network, - ); + .on_custody_download_response(requester.0, response, &mut self.network); } /// Handles receiving a response for a range sync request that should have both blocks and diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index b1ba87c75d3..9c11a317b7f 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -2,7 +2,10 @@ //! channel and stores a global RPC ID to perform requests. use self::custody::{ActiveCustodyRequest, Error as CustodyRequestError}; -pub use self::requests::{BlocksByRootSingleRequest, DataColumnsByRootSingleBlockRequest}; +pub use self::requests::{ + BlocksByRootSingleRequest, DataColumnsByRootSingleBlockRequest, + PayloadEnvelopesByRootSingleRequest, +}; use super::SyncMessage; use super::block_sidecar_coupling::RangeBlockComponentsRequest; use super::manager::BlockProcessType; @@ -37,6 +40,7 @@ pub use requests::LookupVerifyError; use requests::{ ActiveRequests, BlobsByRangeRequestItems, BlobsByRootRequestItems, BlocksByRangeRequestItems, BlocksByRootRequestItems, DataColumnsByRangeRequestItems, DataColumnsByRootRequestItems, + PayloadEnvelopesByRootRequestItems, }; #[cfg(test)] use slot_clock::SlotClock; @@ -52,7 +56,7 @@ use tracing::{Span, debug, debug_span, error, warn}; use types::data::FixedBlobSidecarList; use types::{ BlobSidecar, BlockImportSource, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, EthSpec, - ForkContext, Hash256, SignedBeaconBlock, Slot, + ForkContext, Hash256, SignedBeaconBlock, SignedExecutionPayloadEnvelope, Slot, }; pub mod custody; @@ -201,6 +205,9 @@ pub struct SyncNetworkContext { ActiveRequests>, /// A mapping of active BlobsByRoot requests, including both current slot and parent lookups. blobs_by_root_requests: ActiveRequests>, + /// A mapping of active PayloadEnvelopesByRoot requests + payload_envelopes_by_root_requests: + ActiveRequests>, /// A mapping of active DataColumnsByRoot requests data_columns_by_root_requests: ActiveRequests>, @@ -294,6 +301,7 @@ impl SyncNetworkContext { request_id: 1, blocks_by_root_requests: ActiveRequests::new("blocks_by_root"), blobs_by_root_requests: ActiveRequests::new("blobs_by_root"), + payload_envelopes_by_root_requests: ActiveRequests::new("payload_envelopes_by_root"), data_columns_by_root_requests: ActiveRequests::new("data_columns_by_root"), blocks_by_range_requests: ActiveRequests::new("blocks_by_range"), blobs_by_range_requests: ActiveRequests::new("blobs_by_range"), @@ -322,6 +330,7 @@ impl SyncNetworkContext { request_id: _, blocks_by_root_requests, blobs_by_root_requests, + payload_envelopes_by_root_requests, data_columns_by_root_requests, blocks_by_range_requests, blobs_by_range_requests, @@ -345,6 +354,10 @@ impl SyncNetworkContext { .active_requests_of_peer(peer_id) .into_iter() .map(|id| SyncRequestId::SingleBlob { id: *id }); + let payload_envelopes_by_root_ids = payload_envelopes_by_root_requests + .active_requests_of_peer(peer_id) + .into_iter() + .map(|id| SyncRequestId::SinglePayloadEnvelope { id: *id }); let data_column_by_root_ids = data_columns_by_root_requests .active_requests_of_peer(peer_id) .into_iter() @@ -363,6 +376,7 @@ impl SyncNetworkContext { .map(|req_id| SyncRequestId::DataColumnsByRange(*req_id)); blocks_by_root_ids .chain(blobs_by_root_ids) + .chain(payload_envelopes_by_root_ids) .chain(data_column_by_root_ids) .chain(blocks_by_range_ids) .chain(blobs_by_range_ids) @@ -419,6 +433,7 @@ impl SyncNetworkContext { request_id: _, blocks_by_root_requests, blobs_by_root_requests, + payload_envelopes_by_root_requests, data_columns_by_root_requests, blocks_by_range_requests, blobs_by_range_requests, @@ -441,6 +456,7 @@ impl SyncNetworkContext { for peer_id in blocks_by_root_requests .iter_request_peers() .chain(blobs_by_root_requests.iter_request_peers()) + .chain(payload_envelopes_by_root_requests.iter_request_peers()) .chain(data_columns_by_root_requests.iter_request_peers()) .chain(blocks_by_range_requests.iter_request_peers()) .chain(blobs_by_range_requests.iter_request_peers()) @@ -927,6 +943,72 @@ impl SyncNetworkContext { Ok(LookupRequestResult::RequestSent(id.req_id)) } + /// Request a payload envelope for a block root via PayloadEnvelopesByRoot RPC. + pub fn payload_lookup_request( + &mut self, + lookup_id: SingleLookupId, + lookup_peers: Arc>>, + block_root: Hash256, + ) -> Result { + let active_request_count_by_peer = self.active_request_count_by_peer(); + let Some(peer_id) = lookup_peers + .read() + .iter() + .map(|peer| { + ( + active_request_count_by_peer.get(peer).copied().unwrap_or(0), + rand::random::(), + peer, + ) + }) + .min() + .map(|(_, _, peer)| *peer) + else { + return Ok(LookupRequestResult::Pending("no peers")); + }; + + let id = SingleLookupReqId { + lookup_id, + req_id: self.next_id(), + }; + + let request = PayloadEnvelopesByRootSingleRequest { block_root }; + + let network_request = RequestType::PayloadEnvelopesByRoot( + request + .clone() + .into_request(&self.fork_context) + .map_err(RpcRequestSendError::InternalError)?, + ); + self.network_send + .send(NetworkMessage::SendRequest { + peer_id, + request: network_request, + app_request_id: AppRequestId::Sync(SyncRequestId::SinglePayloadEnvelope { id }), + }) + .map_err(|_| RpcRequestSendError::InternalError("network send error".to_owned()))?; + + debug!( + method = "PayloadEnvelopesByRoot", + ?block_root, + peer = %peer_id, + %id, + "Sync RPC request sent" + ); + + self.payload_envelopes_by_root_requests.insert( + id, + peer_id, + // true = enforce that the peer returns a response. We only request a single envelope + // and the peer must have it. + true, + PayloadEnvelopesByRootRequestItems::new(request), + Span::none(), + ); + + Ok(LookupRequestResult::RequestSent(id.req_id)) + } + /// Request necessary blobs for `block_root`. Requests only the necessary blobs by checking: /// - If we have a downloaded but not yet processed block /// - If the da_checker has a pending block @@ -1464,6 +1546,27 @@ impl SyncNetworkContext { self.on_rpc_response_result(resp, peer_id) } + pub(crate) fn on_single_payload_envelope_response( + &mut self, + id: SingleLookupReqId, + peer_id: PeerId, + rpc_event: RpcEvent>>, + ) -> Option>>> { + let resp = self + .payload_envelopes_by_root_requests + .on_response(id, rpc_event); + let resp = resp.map(|res| { + res.and_then(|(mut envelopes, seen_timestamp)| { + match envelopes.pop() { + Some(envelope) => Ok((envelope, seen_timestamp)), + // Should never happen, we enforce at least 1 chunk. + None => Err(LookupVerifyError::NotEnoughResponsesReturned { actual: 0 }.into()), + } + }) + }); + self.on_rpc_response_result(resp, peer_id) + } + #[allow(clippy::type_complexity)] pub(crate) fn on_data_columns_by_root_response( &mut self, diff --git a/beacon_node/network/src/sync/network_context/requests.rs b/beacon_node/network/src/sync/network_context/requests.rs index ad60dffb455..8c091eca807 100644 --- a/beacon_node/network/src/sync/network_context/requests.rs +++ b/beacon_node/network/src/sync/network_context/requests.rs @@ -16,6 +16,9 @@ pub use data_columns_by_range::DataColumnsByRangeRequestItems; pub use data_columns_by_root::{ DataColumnsByRootRequestItems, DataColumnsByRootSingleBlockRequest, }; +pub use payload_envelopes_by_root::{ + PayloadEnvelopesByRootRequestItems, PayloadEnvelopesByRootSingleRequest, +}; use crate::metrics; @@ -27,6 +30,7 @@ mod blocks_by_range; mod blocks_by_root; mod data_columns_by_range; mod data_columns_by_root; +mod payload_envelopes_by_root; #[derive(Debug, PartialEq, Eq, IntoStaticStr)] pub enum LookupVerifyError { diff --git a/beacon_node/network/src/sync/network_context/requests/payload_envelopes_by_root.rs b/beacon_node/network/src/sync/network_context/requests/payload_envelopes_by_root.rs new file mode 100644 index 00000000000..a142d86e905 --- /dev/null +++ b/beacon_node/network/src/sync/network_context/requests/payload_envelopes_by_root.rs @@ -0,0 +1,54 @@ +use lighthouse_network::rpc::methods::PayloadEnvelopesByRootRequest; +use std::sync::Arc; +use types::{EthSpec, ForkContext, Hash256, SignedExecutionPayloadEnvelope}; + +use super::{ActiveRequestItems, LookupVerifyError}; + +#[derive(Debug, Clone)] +pub struct PayloadEnvelopesByRootSingleRequest { + pub block_root: Hash256, +} + +impl PayloadEnvelopesByRootSingleRequest { + pub fn into_request( + self, + fork_context: &ForkContext, + ) -> Result { + PayloadEnvelopesByRootRequest::new(vec![self.block_root], fork_context) + } +} + +pub struct PayloadEnvelopesByRootRequestItems { + request: PayloadEnvelopesByRootSingleRequest, + items: Vec>>, +} + +impl PayloadEnvelopesByRootRequestItems { + pub fn new(request: PayloadEnvelopesByRootSingleRequest) -> Self { + Self { + request, + items: vec![], + } + } +} + +impl ActiveRequestItems for PayloadEnvelopesByRootRequestItems { + type Item = Arc>; + + /// Append a response to the single chunk request. We expect exactly one envelope per + /// block root. Returns `true` when the single expected item has been received. + fn add(&mut self, envelope: Self::Item) -> Result { + let block_root = envelope.message.beacon_block_root; + if self.request.block_root != block_root { + return Err(LookupVerifyError::UnrequestedBlockRoot(block_root)); + } + + self.items.push(envelope); + // Always returns true, we expect a single envelope per block root + Ok(true) + } + + fn consume(&mut self) -> Vec { + std::mem::take(&mut self.items) + } +} diff --git a/beacon_node/network/src/sync/tests/lookups.rs b/beacon_node/network/src/sync/tests/lookups.rs index a26996ec5ee..8a7b6a394cf 100644 --- a/beacon_node/network/src/sync/tests/lookups.rs +++ b/beacon_node/network/src/sync/tests/lookups.rs @@ -37,12 +37,17 @@ use tokio::sync::mpsc; use tracing::info; use types::{ BlobSidecar, BlockImportSource, ColumnIndex, DataColumnSidecar, EthSpec, ForkContext, ForkName, - Hash256, MinimalEthSpec as E, SignedBeaconBlock, Slot, + Hash256, MinimalEthSpec as E, SignedBeaconBlock, SignedExecutionPayloadEnvelope, Slot, test_utils::{SeedableRng, XorShiftRng}, }; const D: Duration = Duration::new(0, 0); +/// Minimum validator set size usable across every fork this rig runs under. Pre-Gloas +/// tolerates 1; Gloas genesis needs enough validators to populate `proposer_lookahead` +/// via balance-weighted selection — 8 is enough for MinimalEthSpec. +const TEST_RIG_VALIDATOR_COUNT: usize = 8; + /// Configuration for how the test rig should respond to sync requests. /// /// Controls simulated peer behavior during lookup tests, including RPC errors, @@ -221,10 +226,11 @@ impl TestRig { Duration::from_secs(12), ); - // Initialise a new beacon chain + // Initialise a new beacon chain. Gloas genesis needs more than 1 validator so the + // `proposer_lookahead` can be populated at the Fulu → Gloas upgrade. let harness = BeaconChainHarness::>::builder(E) .spec(spec.clone()) - .deterministic_keypairs(1) + .deterministic_keypairs(TEST_RIG_VALIDATOR_COUNT) .fresh_ephemeral_store() .mock_execution_layer() .testing_slot_clock(clock.clone()) @@ -305,6 +311,7 @@ impl TestRig { fork_name, network_blocks_by_root: <_>::default(), network_blocks_by_slot: <_>::default(), + network_envelopes_by_root: <_>::default(), penalties: <_>::default(), seen_lookups: <_>::default(), requests: <_>::default(), @@ -671,6 +678,20 @@ impl TestRig { self.send_rpc_columns_response(req_id, peer_id, &columns); } + (RequestType::PayloadEnvelopesByRoot(req), AppRequestId::Sync(req_id)) => { + // The lookup-sync path always requests a single envelope per request, so + // there is exactly one block_root. Serve the cached envelope if the rig + // has one — otherwise respond with an empty stream. + let block_root = req + .beacon_block_roots + .as_slice() + .first() + .copied() + .unwrap_or_else(|| panic!("empty envelope request: {req:?}")); + let envelope = self.network_envelopes_by_root.get(&block_root).cloned(); + self.send_rpc_envelope_response(req_id, peer_id, envelope); + } + (RequestType::BlocksByRange(req), AppRequestId::Sync(req_id)) => { if self.complete_strategy.skip_by_range_routes { return; @@ -930,6 +951,37 @@ impl TestRig { }); } + fn send_rpc_envelope_response( + &mut self, + sync_request_id: SyncRequestId, + peer_id: PeerId, + envelope: Option>>, + ) { + self.log(&format!( + "Completing request {sync_request_id:?} to {peer_id} with envelope {:?}", + envelope.as_ref().map(|e| e.slot()) + )); + + self.push_sync_message(SyncMessage::RpcPayloadEnvelope { + sync_request_id, + peer_id, + envelope: envelope.clone(), + seen_timestamp: D, + }); + // Stream termination + self.push_sync_message(SyncMessage::RpcPayloadEnvelope { + sync_request_id, + peer_id, + envelope: None, + seen_timestamp: D, + }); + } + + #[allow(dead_code)] + fn is_after_gloas(&self) -> bool { + self.fork_name.gloas_enabled() + } + // Preparation steps /// Returns the block root of the tip of the built chain @@ -939,7 +991,7 @@ impl TestRig { // Initialise a new beacon chain let external_harness = BeaconChainHarness::>::builder(E) .spec(self.harness.spec.clone()) - .deterministic_keypairs(1) + .deterministic_keypairs(TEST_RIG_VALIDATOR_COUNT) .fresh_ephemeral_store() .mock_execution_layer() .testing_slot_clock(self.harness.chain.slot_clock.clone()) @@ -974,6 +1026,12 @@ impl TestRig { self.network_blocks_by_root .insert(block_root, block.clone()); self.network_blocks_by_slot.insert(block_slot, block); + // Gloas: pull the corresponding execution payload envelope from the external + // harness store so the rig can serve it when the lookup requests it. + if let Ok(Some(envelope)) = external_harness.chain.get_payload_envelope(&block_root) { + self.network_envelopes_by_root + .insert(block_root, Arc::new(envelope)); + } self.log(&format!( "Produced block {} index {i} in external harness", block_slot, @@ -2456,6 +2514,31 @@ async fn blobs_in_da_checker_skip_download() { ); } +/// Test that lookups complete when the block is already fully imported. +/// Exercises the `NoRequestNeeded` → `Completed` download state path. +/// Without the fix, `on_completed_request` left the state as `AwaitingDownload` +/// causing an infinite re-check loop. +#[tokio::test] +async fn lookup_completes_when_block_already_imported() { + let mut r = TestRig::default(); + r.build_chain(1).await; + + // Fully import block 1 (this also imports its blobs/columns if any) + let block_root = r.block_root_at_slot(1); + r.import_block_by_root(block_root).await; + + // Now trigger a lookup for the SAME block via attestation. + // block_lookup_request → ExecutionValidated → NoRequestNeeded + // Without the Completed state fix, the lookup would hang. + r.trigger_with_block_at_slot(1); + assert!( + r.created_lookups() > 0, + "lookup must be created for this test to be valid" + ); + r.simulate(SimulateConfig::happy_path()).await; + r.assert_successful_lookup_sync(); +} + macro_rules! fulu_peer_matrix_tests { ( [$($name:ident => $variant:expr),+ $(,)?] diff --git a/beacon_node/network/src/sync/tests/mod.rs b/beacon_node/network/src/sync/tests/mod.rs index 6e948e47261..ca189a4c7e8 100644 --- a/beacon_node/network/src/sync/tests/mod.rs +++ b/beacon_node/network/src/sync/tests/mod.rs @@ -22,7 +22,7 @@ use tokio::sync::mpsc; use tracing_subscriber::fmt::MakeWriter; use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::util::SubscriberInitExt; -use types::{ForkName, Hash256, MinimalEthSpec as E, Slot}; +use types::{ForkName, Hash256, MinimalEthSpec as E, SignedExecutionPayloadEnvelope, Slot}; mod lookups; mod range; @@ -79,6 +79,10 @@ struct TestRig { /// Blocks that will be used in the test but may not be known to `harness` yet. network_blocks_by_root: HashMap>, network_blocks_by_slot: HashMap>, + /// Gloas execution payload envelopes keyed by block root, populated during `build_chain` + /// from the external harness store. The rig serves these when a lookup issues a + /// `PayloadEnvelopesByRoot` request. + network_envelopes_by_root: HashMap>>, penalties: Vec, /// All seen lookups through the test run seen_lookups: HashMap,