diff --git a/beacon_node/beacon_processor/src/lib.rs b/beacon_node/beacon_processor/src/lib.rs index ea87e9bc718..25944bcf8a5 100644 --- a/beacon_node/beacon_processor/src/lib.rs +++ b/beacon_node/beacon_processor/src/lib.rs @@ -431,6 +431,7 @@ pub enum Work { Status(BlockingFn), BlocksByRangeRequest(AsyncFn), BlocksByRootsRequest(AsyncFn), + BlocksByHeadRequest(AsyncFn), PayloadEnvelopesByRangeRequest(AsyncFn), PayloadEnvelopesByRootRequest(AsyncFn), BlobsByRangeRequest(BlockingFn), @@ -491,6 +492,7 @@ pub enum WorkType { Status, BlocksByRangeRequest, BlocksByRootsRequest, + BlocksByHeadRequest, PayloadEnvelopesByRangeRequest, PayloadEnvelopesByRootRequest, BlobsByRangeRequest, @@ -553,6 +555,7 @@ impl Work { Work::Status(_) => WorkType::Status, Work::BlocksByRangeRequest(_) => WorkType::BlocksByRangeRequest, Work::BlocksByRootsRequest(_) => WorkType::BlocksByRootsRequest, + Work::BlocksByHeadRequest(_) => WorkType::BlocksByHeadRequest, Work::PayloadEnvelopesByRangeRequest(_) => WorkType::PayloadEnvelopesByRangeRequest, Work::PayloadEnvelopesByRootRequest(_) => WorkType::PayloadEnvelopesByRootRequest, Work::BlobsByRangeRequest(_) => WorkType::BlobsByRangeRequest, @@ -1000,6 +1003,8 @@ impl BeaconProcessor { Some(item) } else if let Some(item) = work_queues.block_broots_queue.pop() { Some(item) + } else if let Some(item) = work_queues.block_bhead_queue.pop() { + Some(item) } else if let Some(item) = work_queues.blob_brange_queue.pop() { Some(item) } else if let Some(item) = work_queues.blob_broots_queue.pop() { @@ -1206,6 +1211,9 @@ impl BeaconProcessor { Work::BlocksByRootsRequest { .. } => { work_queues.block_broots_queue.push(work, work_id) } + Work::BlocksByHeadRequest { .. } => { + work_queues.block_bhead_queue.push(work, work_id) + } Work::PayloadEnvelopesByRangeRequest { .. } => work_queues .payload_envelopes_brange_queue .push(work, work_id), @@ -1331,6 +1339,7 @@ impl BeaconProcessor { WorkType::Status => work_queues.status_queue.len(), WorkType::BlocksByRangeRequest => work_queues.block_brange_queue.len(), WorkType::BlocksByRootsRequest => work_queues.block_broots_queue.len(), + WorkType::BlocksByHeadRequest => work_queues.block_bhead_queue.len(), WorkType::PayloadEnvelopesByRangeRequest => { work_queues.payload_envelopes_brange_queue.len() } @@ -1531,6 +1540,7 @@ impl BeaconProcessor { } Work::BlocksByRangeRequest(work) | Work::BlocksByRootsRequest(work) + | Work::BlocksByHeadRequest(work) | Work::PayloadEnvelopesByRangeRequest(work) | Work::PayloadEnvelopesByRootRequest(work) => task_spawner.spawn_async(work), Work::ChainSegmentBackfill(process_fn) => { diff --git a/beacon_node/beacon_processor/src/scheduler/work_queue.rs b/beacon_node/beacon_processor/src/scheduler/work_queue.rs index f7163d538b5..eb57b97df28 100644 --- a/beacon_node/beacon_processor/src/scheduler/work_queue.rs +++ b/beacon_node/beacon_processor/src/scheduler/work_queue.rs @@ -132,6 +132,7 @@ pub struct BeaconProcessorQueueLengths { status_queue: usize, block_brange_queue: usize, block_broots_queue: usize, + block_bhead_queue: usize, blob_broots_queue: usize, blob_brange_queue: usize, dcbroots_queue: usize, @@ -206,6 +207,7 @@ impl BeaconProcessorQueueLengths { status_queue: 1024, block_brange_queue: 1024, block_broots_queue: 1024, + block_bhead_queue: 1024, blob_broots_queue: 1024, blob_brange_queue: 1024, dcbroots_queue: 1024, @@ -263,6 +265,7 @@ pub struct WorkQueues { pub status_queue: FifoQueue>, pub block_brange_queue: FifoQueue>, pub block_broots_queue: FifoQueue>, + pub block_bhead_queue: FifoQueue>, pub payload_envelopes_brange_queue: FifoQueue>, pub payload_envelopes_broots_queue: FifoQueue>, pub blob_broots_queue: FifoQueue>, @@ -334,6 +337,7 @@ impl WorkQueues { let status_queue = FifoQueue::new(queue_lengths.status_queue); let block_brange_queue = FifoQueue::new(queue_lengths.block_brange_queue); let block_broots_queue = FifoQueue::new(queue_lengths.block_broots_queue); + let block_bhead_queue = FifoQueue::new(queue_lengths.block_bhead_queue); let blob_broots_queue = FifoQueue::new(queue_lengths.blob_broots_queue); let blob_brange_queue = FifoQueue::new(queue_lengths.blob_brange_queue); let dcbroots_queue = FifoQueue::new(queue_lengths.dcbroots_queue); @@ -399,6 +403,7 @@ impl WorkQueues { status_queue, block_brange_queue, block_broots_queue, + block_bhead_queue, blob_broots_queue, blob_brange_queue, dcbroots_queue, diff --git a/beacon_node/lighthouse_network/src/peer_manager/mod.rs b/beacon_node/lighthouse_network/src/peer_manager/mod.rs index d7285c5c8e3..6b5144fa6fd 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/mod.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/mod.rs @@ -589,6 +589,7 @@ impl PeerManager { Protocol::Ping => PeerAction::MidToleranceError, Protocol::BlocksByRange => PeerAction::MidToleranceError, Protocol::BlocksByRoot => PeerAction::MidToleranceError, + Protocol::BlocksByHead => PeerAction::MidToleranceError, Protocol::BlobsByRange => PeerAction::MidToleranceError, Protocol::PayloadEnvelopesByRange => PeerAction::MidToleranceError, Protocol::PayloadEnvelopesByRoot => PeerAction::MidToleranceError, @@ -617,6 +618,7 @@ impl PeerManager { Protocol::Ping => PeerAction::Fatal, Protocol::BlocksByRange => return, Protocol::BlocksByRoot => return, + Protocol::BlocksByHead => return, Protocol::PayloadEnvelopesByRange => return, Protocol::PayloadEnvelopesByRoot => return, Protocol::BlobsByRange => return, @@ -642,6 +644,7 @@ impl PeerManager { Protocol::Ping => PeerAction::LowToleranceError, Protocol::BlocksByRange => PeerAction::MidToleranceError, Protocol::BlocksByRoot => PeerAction::MidToleranceError, + Protocol::BlocksByHead => PeerAction::MidToleranceError, Protocol::PayloadEnvelopesByRange => PeerAction::MidToleranceError, Protocol::PayloadEnvelopesByRoot => PeerAction::MidToleranceError, Protocol::BlobsByRange => PeerAction::MidToleranceError, diff --git a/beacon_node/lighthouse_network/src/rpc/codec.rs b/beacon_node/lighthouse_network/src/rpc/codec.rs index 75e035ae82d..ba95fff5e8e 100644 --- a/beacon_node/lighthouse_network/src/rpc/codec.rs +++ b/beacon_node/lighthouse_network/src/rpc/codec.rs @@ -18,7 +18,7 @@ use tokio_util::codec::{Decoder, Encoder}; use types::SignedExecutionPayloadEnvelope; use types::{ BlobSidecar, ChainSpec, DataColumnSidecar, DataColumnsByRootIdentifier, EthSpec, ForkContext, - ForkName, Hash256, LightClientBootstrap, LightClientFinalityUpdate, + ForkName, ForkVersionDecode, Hash256, LightClientBootstrap, LightClientFinalityUpdate, LightClientOptimisticUpdate, LightClientUpdate, SignedBeaconBlock, SignedBeaconBlockAltair, SignedBeaconBlockBase, SignedBeaconBlockBellatrix, SignedBeaconBlockCapella, SignedBeaconBlockDeneb, SignedBeaconBlockElectra, SignedBeaconBlockFulu, @@ -77,6 +77,7 @@ impl SSZSnappyInboundCodec { }, RpcSuccessResponse::BlocksByRange(res) => res.as_ssz_bytes(), RpcSuccessResponse::BlocksByRoot(res) => res.as_ssz_bytes(), + RpcSuccessResponse::BlocksByHead(res) => res.as_ssz_bytes(), RpcSuccessResponse::PayloadEnvelopesByRange(res) => res.as_ssz_bytes(), RpcSuccessResponse::PayloadEnvelopesByRoot(res) => res.as_ssz_bytes(), RpcSuccessResponse::BlobsByRange(res) => res.as_ssz_bytes(), @@ -359,6 +360,7 @@ impl Encoder> for SSZSnappyOutboundCodec { BlocksByRootRequest::V1(req) => req.block_roots.as_ssz_bytes(), BlocksByRootRequest::V2(req) => req.block_roots.as_ssz_bytes(), }, + RequestType::BlocksByHead(req) => req.as_ssz_bytes(), RequestType::PayloadEnvelopesByRange(req) => req.as_ssz_bytes(), RequestType::PayloadEnvelopesByRoot(req) => req.beacon_block_roots.as_ssz_bytes(), RequestType::BlobsByRange(req) => req.as_ssz_bytes(), @@ -553,6 +555,9 @@ fn handle_rpc_request( )?, }), ))), + SupportedProtocol::BlocksByHeadV1 => Ok(Some(RequestType::BlocksByHead( + BlocksByHeadRequest::from_ssz_bytes(decoded_buffer)?, + ))), SupportedProtocol::PayloadEnvelopesByRangeV1 => { Ok(Some(RequestType::PayloadEnvelopesByRange( PayloadEnvelopesByRangeRequest::from_ssz_bytes(decoded_buffer)?, @@ -943,6 +948,18 @@ fn handle_rpc_response( ), )), }, + SupportedProtocol::BlocksByHeadV1 => match fork_name { + Some(fork_name) => Ok(Some(RpcSuccessResponse::BlocksByHead(Arc::new( + SignedBeaconBlock::from_ssz_bytes_by_fork(decoded_buffer, fork_name)?, + )))), + None => Err(RPCError::ErrorResponse( + RpcErrorResponse::InvalidRequest, + format!( + "No context bytes provided for {:?} response", + versioned_protocol + ), + )), + }, } } @@ -1319,6 +1336,9 @@ mod tests { RequestType::BlocksByRoot(bbroot) => { assert_eq!(decoded, RequestType::BlocksByRoot(bbroot)) } + RequestType::BlocksByHead(bbhead) => { + assert_eq!(decoded, RequestType::BlocksByHead(bbhead)) + } RequestType::BlobsByRange(blbrange) => { assert_eq!(decoded, RequestType::BlobsByRange(blbrange)) } @@ -1867,6 +1887,31 @@ mod tests { ); } + // BlocksByHead is introduced in Fulu but the response is just `SignedBeaconBlock`, + // so the codec must accept blocks of any fork variant — the chain a Fulu peer walks + // back may straddle the Fulu boundary and include pre-Fulu canonical blocks. + #[test] + fn test_blocks_by_head_decodes_all_forks() { + let chain_spec = spec_with_all_forks_enabled(); + for (block, fork) in [ + (empty_base_block(&chain_spec), ForkName::Base), + (altair_block(&chain_spec), ForkName::Altair), + (bellatrix_block_small(&chain_spec), ForkName::Bellatrix), + ] { + let block_arc = Arc::new(block); + assert_eq!( + encode_then_decode_response( + SupportedProtocol::BlocksByHeadV1, + RpcResponse::Success(RpcSuccessResponse::BlocksByHead(block_arc.clone())), + fork, + &chain_spec, + ), + Ok(Some(RpcSuccessResponse::BlocksByHead(block_arc))), + "BlocksByHeadV1 must round-trip a {fork} block" + ); + } + } + // Test RPCResponse encoding/decoding for V2 messages #[test] fn test_context_bytes_v2() { @@ -2063,6 +2108,10 @@ mod tests { RequestType::BlobsByRange(blbrange_request()), RequestType::DataColumnsByRange(dcbrange_request()), RequestType::MetaData(MetadataRequest::new_v2()), + RequestType::BlocksByHead(BlocksByHeadRequest { + beacon_root: Hash256::zero(), + count: 32, + }), ]; for req in requests.iter() { for fork_name in ForkName::list_all() { diff --git a/beacon_node/lighthouse_network/src/rpc/config.rs b/beacon_node/lighthouse_network/src/rpc/config.rs index 9e1c6541ec8..59f0b8e9a2f 100644 --- a/beacon_node/lighthouse_network/src/rpc/config.rs +++ b/beacon_node/lighthouse_network/src/rpc/config.rs @@ -89,6 +89,7 @@ pub struct RateLimiterConfig { pub(super) goodbye_quota: Quota, pub(super) blocks_by_range_quota: Quota, pub(super) blocks_by_root_quota: Quota, + pub(super) blocks_by_head_quota: Quota, pub(super) payload_envelopes_by_range_quota: Quota, pub(super) payload_envelopes_by_root_quota: Quota, pub(super) blobs_by_range_quota: Quota, @@ -113,6 +114,8 @@ impl RateLimiterConfig { Quota::n_every(NonZeroU64::new(128).unwrap(), 10); pub const DEFAULT_BLOCKS_BY_ROOT_QUOTA: Quota = Quota::n_every(NonZeroU64::new(128).unwrap(), 10); + pub const DEFAULT_BLOCKS_BY_HEAD_QUOTA: Quota = + Quota::n_every(NonZeroU64::new(128).unwrap(), 10); pub const DEFAULT_PAYLOAD_ENVELOPES_BY_RANGE_QUOTA: Quota = Quota::n_every(NonZeroU64::new(128).unwrap(), 10); pub const DEFAULT_PAYLOAD_ENVELOPES_BY_ROOT_QUOTA: Quota = @@ -143,6 +146,7 @@ impl Default for RateLimiterConfig { goodbye_quota: Self::DEFAULT_GOODBYE_QUOTA, blocks_by_range_quota: Self::DEFAULT_BLOCKS_BY_RANGE_QUOTA, blocks_by_root_quota: Self::DEFAULT_BLOCKS_BY_ROOT_QUOTA, + blocks_by_head_quota: Self::DEFAULT_BLOCKS_BY_HEAD_QUOTA, payload_envelopes_by_range_quota: Self::DEFAULT_PAYLOAD_ENVELOPES_BY_RANGE_QUOTA, payload_envelopes_by_root_quota: Self::DEFAULT_PAYLOAD_ENVELOPES_BY_ROOT_QUOTA, blobs_by_range_quota: Self::DEFAULT_BLOBS_BY_RANGE_QUOTA, @@ -177,6 +181,7 @@ impl Debug for RateLimiterConfig { .field("goodbye", fmt_q!(&self.goodbye_quota)) .field("blocks_by_range", fmt_q!(&self.blocks_by_range_quota)) .field("blocks_by_root", fmt_q!(&self.blocks_by_root_quota)) + .field("blocks_by_head", fmt_q!(&self.blocks_by_head_quota)) .field( "payload_envelopes_by_range", fmt_q!(&self.payload_envelopes_by_range_quota), @@ -213,6 +218,7 @@ impl FromStr for RateLimiterConfig { let mut goodbye_quota = None; let mut blocks_by_range_quota = None; let mut blocks_by_root_quota = None; + let mut blocks_by_head_quota = None; let mut payload_envelopes_by_range_quota = None; let mut payload_envelopes_by_root_quota = None; let mut blobs_by_range_quota = None; @@ -232,6 +238,7 @@ impl FromStr for RateLimiterConfig { Protocol::Goodbye => goodbye_quota = goodbye_quota.or(quota), Protocol::BlocksByRange => blocks_by_range_quota = blocks_by_range_quota.or(quota), Protocol::BlocksByRoot => blocks_by_root_quota = blocks_by_root_quota.or(quota), + Protocol::BlocksByHead => blocks_by_head_quota = blocks_by_head_quota.or(quota), Protocol::PayloadEnvelopesByRange => { payload_envelopes_by_range_quota = payload_envelopes_by_range_quota.or(quota) } @@ -274,6 +281,8 @@ impl FromStr for RateLimiterConfig { .unwrap_or(Self::DEFAULT_BLOCKS_BY_RANGE_QUOTA), blocks_by_root_quota: blocks_by_root_quota .unwrap_or(Self::DEFAULT_BLOCKS_BY_ROOT_QUOTA), + blocks_by_head_quota: blocks_by_head_quota + .unwrap_or(Self::DEFAULT_BLOCKS_BY_HEAD_QUOTA), payload_envelopes_by_range_quota: payload_envelopes_by_range_quota .unwrap_or(Self::DEFAULT_PAYLOAD_ENVELOPES_BY_RANGE_QUOTA), payload_envelopes_by_root_quota: payload_envelopes_by_root_quota diff --git a/beacon_node/lighthouse_network/src/rpc/methods.rs b/beacon_node/lighthouse_network/src/rpc/methods.rs index baabf486838..f3f294d9135 100644 --- a/beacon_node/lighthouse_network/src/rpc/methods.rs +++ b/beacon_node/lighthouse_network/src/rpc/methods.rs @@ -488,6 +488,18 @@ impl From for OldBlocksByRangeRequest { } } +/// Request a contiguous range of beacon blocks by walking the parent chain of `beacon_root`. +/// +/// New in Fulu (see consensus-specs PR 5181). The responder walks the parent chain of +/// `beacon_root` (inclusive) and emits up to `count` blocks in descending slot order. +#[derive(Encode, Decode, Clone, Debug, PartialEq)] +pub struct BlocksByHeadRequest { + /// The block root to start the parent walk from (inclusive). + pub beacon_root: Hash256, + /// The maximum number of blocks to return. + pub count: u64, +} + /// Request a number of beacon block bodies from a peer. #[superstruct(variants(V1, V2), variant_attributes(derive(Clone, Debug, PartialEq)))] #[derive(Clone, Debug, PartialEq)] @@ -622,6 +634,9 @@ pub enum RpcSuccessResponse { /// A response to a get BLOCKS_BY_ROOT request. BlocksByRoot(Arc>), + /// A response to a get BEACON_BLOCKS_BY_HEAD request. + BlocksByHead(Arc>), + /// A response to a get EXECUTION_PAYLOAD_ENVELOPES_BY_RANGE request. A None response signifies /// the end of the batch. PayloadEnvelopesByRange(Arc>), @@ -669,6 +684,9 @@ pub enum ResponseTermination { /// Blocks by root stream termination. BlocksByRoot, + /// Blocks by head stream termination. + BlocksByHead, + /// Execution payload envelopes by range stream termination. PayloadEnvelopesByRange, @@ -696,6 +714,7 @@ impl ResponseTermination { match self { ResponseTermination::BlocksByRange => Protocol::BlocksByRange, ResponseTermination::BlocksByRoot => Protocol::BlocksByRoot, + ResponseTermination::BlocksByHead => Protocol::BlocksByHead, ResponseTermination::PayloadEnvelopesByRange => Protocol::PayloadEnvelopesByRange, ResponseTermination::PayloadEnvelopesByRoot => Protocol::PayloadEnvelopesByRoot, ResponseTermination::BlobsByRange => Protocol::BlobsByRange, @@ -793,6 +812,7 @@ impl RpcSuccessResponse { RpcSuccessResponse::Status(_) => Protocol::Status, RpcSuccessResponse::BlocksByRange(_) => Protocol::BlocksByRange, RpcSuccessResponse::BlocksByRoot(_) => Protocol::BlocksByRoot, + RpcSuccessResponse::BlocksByHead(_) => Protocol::BlocksByHead, RpcSuccessResponse::PayloadEnvelopesByRange(_) => Protocol::PayloadEnvelopesByRange, RpcSuccessResponse::PayloadEnvelopesByRoot(_) => Protocol::PayloadEnvelopesByRoot, RpcSuccessResponse::BlobsByRange(_) => Protocol::BlobsByRange, @@ -812,7 +832,9 @@ impl RpcSuccessResponse { pub fn slot(&self) -> Option { match self { - Self::BlocksByRange(r) | Self::BlocksByRoot(r) => Some(r.slot()), + Self::BlocksByRange(r) | Self::BlocksByRoot(r) | Self::BlocksByHead(r) => { + Some(r.slot()) + } Self::PayloadEnvelopesByRoot(r) | Self::PayloadEnvelopesByRange(r) => Some(r.slot()), Self::BlobsByRange(r) | Self::BlobsByRoot(r) => Some(r.slot()), Self::DataColumnsByRange(r) | Self::DataColumnsByRoot(r) => Some(r.slot()), @@ -864,6 +886,9 @@ impl std::fmt::Display for RpcSuccessResponse { RpcSuccessResponse::BlocksByRoot(block) => { write!(f, "BlocksByRoot: Block slot: {}", block.slot()) } + RpcSuccessResponse::BlocksByHead(block) => { + write!(f, "BlocksByHead: Block slot: {}", block.slot()) + } RpcSuccessResponse::PayloadEnvelopesByRange(envelope) => { write!( f, @@ -975,6 +1000,16 @@ impl std::fmt::Display for OldBlocksByRangeRequest { } } +impl std::fmt::Display for BlocksByHeadRequest { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "BlocksByHead: beacon_root: {}, count: {}", + self.beacon_root, self.count + ) + } +} + impl std::fmt::Display for BlobsByRootRequest { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( diff --git a/beacon_node/lighthouse_network/src/rpc/protocol.rs b/beacon_node/lighthouse_network/src/rpc/protocol.rs index c949dfe17d8..056ffc03b85 100644 --- a/beacon_node/lighthouse_network/src/rpc/protocol.rs +++ b/beacon_node/lighthouse_network/src/rpc/protocol.rs @@ -262,6 +262,9 @@ pub enum Protocol { /// The `BlocksByRoot` protocol name. #[strum(serialize = "beacon_blocks_by_root")] BlocksByRoot, + /// The `BlocksByHead` protocol name. + #[strum(serialize = "beacon_blocks_by_head")] + BlocksByHead, /// The `BlobsByRange` protocol name. #[strum(serialize = "blob_sidecars_by_range")] BlobsByRange, @@ -306,6 +309,7 @@ impl Protocol { Protocol::Goodbye => None, Protocol::BlocksByRange => Some(ResponseTermination::BlocksByRange), Protocol::BlocksByRoot => Some(ResponseTermination::BlocksByRoot), + Protocol::BlocksByHead => Some(ResponseTermination::BlocksByHead), Protocol::PayloadEnvelopesByRange => Some(ResponseTermination::PayloadEnvelopesByRange), Protocol::PayloadEnvelopesByRoot => Some(ResponseTermination::PayloadEnvelopesByRoot), Protocol::BlobsByRange => Some(ResponseTermination::BlobsByRange), @@ -338,6 +342,7 @@ pub enum SupportedProtocol { BlocksByRangeV2, BlocksByRootV1, BlocksByRootV2, + BlocksByHeadV1, PayloadEnvelopesByRangeV1, PayloadEnvelopesByRootV1, BlobsByRangeV1, @@ -366,6 +371,7 @@ impl SupportedProtocol { SupportedProtocol::PayloadEnvelopesByRootV1 => "1", SupportedProtocol::BlocksByRootV1 => "1", SupportedProtocol::BlocksByRootV2 => "2", + SupportedProtocol::BlocksByHeadV1 => "1", SupportedProtocol::BlobsByRangeV1 => "1", SupportedProtocol::BlobsByRootV1 => "1", SupportedProtocol::DataColumnsByRootV1 => "1", @@ -390,6 +396,7 @@ impl SupportedProtocol { SupportedProtocol::BlocksByRangeV2 => Protocol::BlocksByRange, SupportedProtocol::BlocksByRootV1 => Protocol::BlocksByRoot, SupportedProtocol::BlocksByRootV2 => Protocol::BlocksByRoot, + SupportedProtocol::BlocksByHeadV1 => Protocol::BlocksByHead, SupportedProtocol::PayloadEnvelopesByRangeV1 => Protocol::PayloadEnvelopesByRange, SupportedProtocol::PayloadEnvelopesByRootV1 => Protocol::PayloadEnvelopesByRoot, SupportedProtocol::BlobsByRangeV1 => Protocol::BlobsByRange, @@ -458,6 +465,13 @@ impl SupportedProtocol { ), ]); } + // BeaconBlocksByHead is new in Fulu (consensus-specs PR 5181). + if fork_context.fork_exists(ForkName::Fulu) { + supported.push(ProtocolId::new( + SupportedProtocol::BlocksByHeadV1, + Encoding::SSZSnappy, + )); + } supported } } @@ -564,6 +578,10 @@ impl ProtocolId { ::ssz_fixed_len(), ), Protocol::BlocksByRoot => RpcLimits::new(0, spec.max_blocks_by_root_request), + Protocol::BlocksByHead => RpcLimits::new( + ::ssz_fixed_len(), + ::ssz_fixed_len(), + ), Protocol::PayloadEnvelopesByRange => RpcLimits::new( ::ssz_fixed_len(), ::ssz_fixed_len(), @@ -609,6 +627,7 @@ impl ProtocolId { Protocol::Goodbye => RpcLimits::new(0, 0), // Goodbye request has no response Protocol::BlocksByRange => rpc_block_limits_by_fork(fork_context.current_fork_name()), Protocol::BlocksByRoot => rpc_block_limits_by_fork(fork_context.current_fork_name()), + Protocol::BlocksByHead => rpc_block_limits_by_fork(fork_context.current_fork_name()), Protocol::PayloadEnvelopesByRange => rpc_payload_limits(), Protocol::PayloadEnvelopesByRoot => rpc_payload_limits(), Protocol::BlobsByRange => rpc_blob_limits::(), @@ -648,6 +667,7 @@ impl ProtocolId { match self.versioned_protocol { SupportedProtocol::BlocksByRangeV2 | SupportedProtocol::BlocksByRootV2 + | SupportedProtocol::BlocksByHeadV1 | SupportedProtocol::PayloadEnvelopesByRangeV1 | SupportedProtocol::PayloadEnvelopesByRootV1 | SupportedProtocol::BlobsByRangeV1 @@ -801,6 +821,7 @@ pub enum RequestType { Goodbye(GoodbyeReason), BlocksByRange(OldBlocksByRangeRequest), BlocksByRoot(BlocksByRootRequest), + BlocksByHead(BlocksByHeadRequest), PayloadEnvelopesByRange(PayloadEnvelopesByRangeRequest), PayloadEnvelopesByRoot(PayloadEnvelopesByRootRequest), BlobsByRange(BlobsByRangeRequest), @@ -826,6 +847,7 @@ impl RequestType { RequestType::Goodbye(_) => 0, RequestType::BlocksByRange(req) => *req.count(), RequestType::BlocksByRoot(req) => req.block_roots().len() as u64, + RequestType::BlocksByHead(req) => req.count, RequestType::PayloadEnvelopesByRange(req) => req.count, RequestType::PayloadEnvelopesByRoot(req) => req.beacon_block_roots.len() as u64, RequestType::BlobsByRange(req) => req.max_blobs_requested(digest_epoch, spec), @@ -857,6 +879,7 @@ impl RequestType { BlocksByRootRequest::V1(_) => SupportedProtocol::BlocksByRootV1, BlocksByRootRequest::V2(_) => SupportedProtocol::BlocksByRootV2, }, + RequestType::BlocksByHead(_) => SupportedProtocol::BlocksByHeadV1, RequestType::PayloadEnvelopesByRange(_) => SupportedProtocol::PayloadEnvelopesByRangeV1, RequestType::PayloadEnvelopesByRoot(_) => SupportedProtocol::PayloadEnvelopesByRootV1, RequestType::BlobsByRange(_) => SupportedProtocol::BlobsByRangeV1, @@ -890,6 +913,7 @@ impl RequestType { // variants that have `multiple_responses()` can have values. RequestType::BlocksByRange(_) => ResponseTermination::BlocksByRange, RequestType::BlocksByRoot(_) => ResponseTermination::BlocksByRoot, + RequestType::BlocksByHead(_) => ResponseTermination::BlocksByHead, RequestType::PayloadEnvelopesByRange(_) => ResponseTermination::PayloadEnvelopesByRange, RequestType::PayloadEnvelopesByRoot(_) => ResponseTermination::PayloadEnvelopesByRoot, RequestType::BlobsByRange(_) => ResponseTermination::BlobsByRange, @@ -926,6 +950,10 @@ impl RequestType { ProtocolId::new(SupportedProtocol::BlocksByRootV2, Encoding::SSZSnappy), ProtocolId::new(SupportedProtocol::BlocksByRootV1, Encoding::SSZSnappy), ], + RequestType::BlocksByHead(_) => vec![ProtocolId::new( + SupportedProtocol::BlocksByHeadV1, + Encoding::SSZSnappy, + )], RequestType::PayloadEnvelopesByRange(_) => vec![ProtocolId::new( SupportedProtocol::PayloadEnvelopesByRangeV1, Encoding::SSZSnappy, @@ -984,6 +1012,7 @@ impl RequestType { RequestType::Goodbye(_) => false, RequestType::BlocksByRange(_) => false, RequestType::BlocksByRoot(_) => false, + RequestType::BlocksByHead(_) => false, RequestType::BlobsByRange(_) => false, RequestType::PayloadEnvelopesByRange(_) => false, RequestType::PayloadEnvelopesByRoot(_) => false, @@ -1097,6 +1126,7 @@ impl std::fmt::Display for RequestType { RequestType::Goodbye(reason) => write!(f, "Goodbye: {}", reason), RequestType::BlocksByRange(req) => write!(f, "Blocks by range: {}", req), RequestType::BlocksByRoot(req) => write!(f, "Blocks by root: {:?}", req), + RequestType::BlocksByHead(req) => write!(f, "Blocks by head: {}", req), RequestType::PayloadEnvelopesByRange(req) => { write!(f, "Payload envelopes by range: {:?}", req) } @@ -1171,6 +1201,8 @@ mod tests { fork_context.fork_exists(ForkName::Gloas) } + BlocksByHeadV1 => fork_context.fork_exists(ForkName::Fulu), + // Light client protocols are not in currently_supported() LightClientBootstrapV1 | LightClientOptimisticUpdateV1 diff --git a/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs b/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs index ebdca386d88..a5c98a4d309 100644 --- a/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs +++ b/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs @@ -105,6 +105,8 @@ pub struct RPCRateLimiter { bbrange_rl: Limiter, /// BlocksByRoot rate limiter. bbroots_rl: Limiter, + /// BlocksByHead rate limiter. + bbhead_rl: Limiter, /// BlobsByRange rate limiter. blbrange_rl: Limiter, /// BlobsByRoot rate limiter. @@ -152,6 +154,8 @@ pub struct RPCRateLimiterBuilder { bbrange_quota: Option, /// Quota for the BlocksByRoot protocol. bbroots_quota: Option, + /// Quota for the BlocksByHead protocol. + bbhead_quota: Option, /// Quota for the ExecutionPayloadEnvelopesByRange protocol. perange_quota: Option, /// Quota for the ExecutionPayloadEnvelopesByRoot protocol. @@ -185,6 +189,7 @@ impl RPCRateLimiterBuilder { Protocol::Goodbye => self.goodbye_quota = q, Protocol::BlocksByRange => self.bbrange_quota = q, Protocol::BlocksByRoot => self.bbroots_quota = q, + Protocol::BlocksByHead => self.bbhead_quota = q, Protocol::PayloadEnvelopesByRange => self.perange_quota = q, Protocol::PayloadEnvelopesByRoot => self.peroots_quota = q, Protocol::BlobsByRange => self.blbrange_quota = q, @@ -211,6 +216,9 @@ impl RPCRateLimiterBuilder { let bbrange_quota = self .bbrange_quota .ok_or("BlocksByRange quota not specified")?; + let bbhead_quota = self + .bbhead_quota + .ok_or("BlocksByHead quota not specified")?; let perange_quota = self .perange_quota .ok_or("PayloadEnvelopesByRange quota not specified")?; @@ -252,6 +260,7 @@ impl RPCRateLimiterBuilder { let goodbye_rl = Limiter::from_quota(goodbye_quota)?; let bbroots_rl = Limiter::from_quota(bbroots_quota)?; let bbrange_rl = Limiter::from_quota(bbrange_quota)?; + let bbhead_rl = Limiter::from_quota(bbhead_quota)?; let envrange_rl = Limiter::from_quota(perange_quota)?; let envroots_rl = Limiter::from_quota(peroots_quota)?; let blbrange_rl = Limiter::from_quota(blbrange_quota)?; @@ -277,6 +286,7 @@ impl RPCRateLimiterBuilder { goodbye_rl, bbroots_rl, bbrange_rl, + bbhead_rl, envrange_rl, envroots_rl, blbrange_rl, @@ -332,6 +342,7 @@ impl RPCRateLimiter { goodbye_quota, blocks_by_range_quota, blocks_by_root_quota, + blocks_by_head_quota, payload_envelopes_by_range_quota, payload_envelopes_by_root_quota, blobs_by_range_quota, @@ -351,6 +362,7 @@ impl RPCRateLimiter { .set_quota(Protocol::Goodbye, goodbye_quota) .set_quota(Protocol::BlocksByRange, blocks_by_range_quota) .set_quota(Protocol::BlocksByRoot, blocks_by_root_quota) + .set_quota(Protocol::BlocksByHead, blocks_by_head_quota) .set_quota( Protocol::PayloadEnvelopesByRange, payload_envelopes_by_range_quota, @@ -406,6 +418,7 @@ impl RPCRateLimiter { Protocol::Goodbye => &mut self.goodbye_rl, Protocol::BlocksByRange => &mut self.bbrange_rl, Protocol::BlocksByRoot => &mut self.bbroots_rl, + Protocol::BlocksByHead => &mut self.bbhead_rl, Protocol::PayloadEnvelopesByRange => &mut self.envrange_rl, Protocol::PayloadEnvelopesByRoot => &mut self.envroots_rl, Protocol::BlobsByRange => &mut self.blbrange_rl, @@ -432,6 +445,7 @@ impl RPCRateLimiter { status_rl, bbrange_rl, bbroots_rl, + bbhead_rl, envrange_rl, envroots_rl, blbrange_rl, @@ -451,6 +465,7 @@ impl RPCRateLimiter { status_rl.prune(time_since_start); bbrange_rl.prune(time_since_start); bbroots_rl.prune(time_since_start); + bbhead_rl.prune(time_since_start); envrange_rl.prune(time_since_start); envroots_rl.prune(time_since_start); blbrange_rl.prune(time_since_start); diff --git a/beacon_node/lighthouse_network/src/service/api_types.rs b/beacon_node/lighthouse_network/src/service/api_types.rs index 486a4438579..f598f59aee5 100644 --- a/beacon_node/lighthouse_network/src/service/api_types.rs +++ b/beacon_node/lighthouse_network/src/service/api_types.rs @@ -161,6 +161,9 @@ pub enum Response { DataColumnsByRange(Option>>), /// A response to a get BLOCKS_BY_ROOT request. BlocksByRoot(Option>>), + /// A response to a get BEACON_BLOCKS_BY_HEAD request. A None response signals the end of the + /// batch. + BlocksByHead(Option>>), /// A response to a get `EXECUTION_PAYLOAD_ENVELOPES_BY_ROOT` request. PayloadEnvelopesByRoot(Option>>), /// A response to a get `EXECUTION_PAYLOAD_ENVELOPES_BY_RANGE` request. @@ -186,6 +189,10 @@ impl std::convert::From> for RpcResponse { Some(b) => RpcResponse::Success(RpcSuccessResponse::BlocksByRoot(b)), None => RpcResponse::StreamTermination(ResponseTermination::BlocksByRoot), }, + Response::BlocksByHead(r) => match r { + Some(b) => RpcResponse::Success(RpcSuccessResponse::BlocksByHead(b)), + None => RpcResponse::StreamTermination(ResponseTermination::BlocksByHead), + }, Response::BlocksByRange(r) => match r { Some(b) => RpcResponse::Success(RpcSuccessResponse::BlocksByRange(b)), None => RpcResponse::StreamTermination(ResponseTermination::BlocksByRange), diff --git a/beacon_node/lighthouse_network/src/service/mod.rs b/beacon_node/lighthouse_network/src/service/mod.rs index f0c1567cb04..41d937e3245 100644 --- a/beacon_node/lighthouse_network/src/service/mod.rs +++ b/beacon_node/lighthouse_network/src/service/mod.rs @@ -1691,6 +1691,14 @@ impl Network { request_type, }) } + RequestType::BlocksByHead(_) => { + metrics::inc_counter_vec(&metrics::TOTAL_RPC_REQUESTS, &["blocks_by_head"]); + Some(NetworkEvent::RequestReceived { + peer_id, + inbound_request_id, + request_type, + }) + } RequestType::PayloadEnvelopesByRange(_) => { metrics::inc_counter_vec( &metrics::TOTAL_RPC_REQUESTS, @@ -1827,6 +1835,9 @@ impl Network { RpcSuccessResponse::BlocksByRoot(resp) => { self.build_response(id, peer_id, Response::BlocksByRoot(Some(resp))) } + RpcSuccessResponse::BlocksByHead(resp) => { + self.build_response(id, peer_id, Response::BlocksByHead(Some(resp))) + } RpcSuccessResponse::PayloadEnvelopesByRange(resp) => self.build_response( id, peer_id, @@ -1871,6 +1882,7 @@ impl Network { let response = match termination { ResponseTermination::BlocksByRange => Response::BlocksByRange(None), ResponseTermination::BlocksByRoot => Response::BlocksByRoot(None), + ResponseTermination::BlocksByHead => Response::BlocksByHead(None), ResponseTermination::PayloadEnvelopesByRange => { Response::PayloadEnvelopesByRange(None) } diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index bfcff2088b7..b8d1cfd12e1 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -14,8 +14,8 @@ use beacon_processor::{ }; use lighthouse_network::rpc::InboundRequestId; use lighthouse_network::rpc::methods::{ - BlobsByRangeRequest, BlobsByRootRequest, DataColumnsByRangeRequest, DataColumnsByRootRequest, - LightClientUpdatesByRangeRequest, PayloadEnvelopesByRangeRequest, + BlobsByRangeRequest, BlobsByRootRequest, BlocksByHeadRequest, DataColumnsByRangeRequest, + DataColumnsByRootRequest, LightClientUpdatesByRangeRequest, PayloadEnvelopesByRangeRequest, PayloadEnvelopesByRootRequest, }; use lighthouse_network::service::api_types::CustodyBackfillBatchId; @@ -703,6 +703,26 @@ impl NetworkBeaconProcessor { }) } + /// Create a new work event to process `BlocksByHeadRequest`s from the RPC network. + pub fn send_blocks_by_head_request( + self: &Arc, + peer_id: PeerId, + inbound_request_id: InboundRequestId, + request: BlocksByHeadRequest, + ) -> Result<(), Error> { + let processor = self.clone(); + let process_fn = async move { + processor + .handle_blocks_by_head_request(peer_id, inbound_request_id, request) + .await; + }; + + self.try_send(BeaconWorkEvent { + drop_during_sync: false, + work: Work::BlocksByHeadRequest(Box::pin(process_fn)), + }) + } + /// Create a new work event to process `BlocksByRootRequest`s from the RPC network. pub fn send_blocks_by_roots_request( self: &Arc, diff --git a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs index 8b31b67acbd..37a6f3779ae 100644 --- a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs @@ -7,8 +7,8 @@ use beacon_chain::payload_envelope_streamer::EnvelopeRequestSource; use beacon_chain::{BeaconChainError, BeaconChainTypes, BlockProcessStatus, WhenSlotSkipped}; use itertools::{Itertools, process_results}; use lighthouse_network::rpc::methods::{ - BlobsByRangeRequest, BlobsByRootRequest, DataColumnsByRangeRequest, DataColumnsByRootRequest, - PayloadEnvelopesByRangeRequest, PayloadEnvelopesByRootRequest, + BlobsByRangeRequest, BlobsByRootRequest, BlocksByHeadRequest, DataColumnsByRangeRequest, + DataColumnsByRootRequest, PayloadEnvelopesByRangeRequest, PayloadEnvelopesByRootRequest, }; use lighthouse_network::rpc::*; use lighthouse_network::{PeerId, ReportSource, Response, SyncInfo}; @@ -256,6 +256,266 @@ impl NetworkBeaconProcessor { Ok(()) } + /// Handle a `BeaconBlocksByHead` request from the peer. + /// + /// Walks the parent chain of `request.beacon_root` (inclusive) and emits up to + /// `min(request.count, MAX_REQUEST_BLOCKS_DENEB)` blocks in descending slot order. + /// See consensus-specs PR 5181. + #[instrument( + name = "lh_handle_blocks_by_head_request", + parent = None, + level = "debug", + skip_all, + fields(peer_id = %peer_id, client = tracing::field::Empty) + )] + pub async fn handle_blocks_by_head_request( + self: Arc, + peer_id: PeerId, + inbound_request_id: InboundRequestId, + request: BlocksByHeadRequest, + ) { + let client = self.network_globals.client(&peer_id); + Span::current().record("client", field::display(client.kind)); + + self.terminate_response_stream( + peer_id, + inbound_request_id, + self.clone() + .handle_blocks_by_head_request_inner(peer_id, inbound_request_id, request) + .await, + Response::BlocksByHead, + ); + } + + async fn handle_blocks_by_head_request_inner( + self: Arc, + peer_id: PeerId, + inbound_request_id: InboundRequestId, + request: BlocksByHeadRequest, + ) -> Result<(), (RpcErrorResponse, &'static str)> { + let spec = &self.chain.spec; + // Cap the response at MAX_REQUEST_BLOCKS_DENEB regardless of what the peer asked for, + // matching the spec. + let max_request_blocks = spec.max_request_blocks(types::ForkName::Deneb) as u64; + let cap = request.count.min(max_request_blocks); + let beacon_root = request.beacon_root; + + debug!( + %peer_id, + beacon_root = ?beacon_root, + count = request.count, + cap, + "Received BlocksByHead Request" + ); + + if cap == 0 { + return Ok(()); + } + + // Walk the parent chain on a blocking thread because `get_blinded_block` hits the store + // synchronously and we may walk up to MAX_REQUEST_BLOCKS_DENEB ancestors. + let network_beacon_processor = self.clone(); + let block_roots = self + .executor + .spawn_blocking_handle( + move || network_beacon_processor.get_block_roots_ancestor_of_head(beacon_root, cap), + "get_block_roots_ancestor_of_head", + ) + .ok_or((RpcErrorResponse::ServerError, "shutting down"))? + .await + .map_err(|_| (RpcErrorResponse::ServerError, "tokio join"))??; + + let requested_blocks = block_roots.len(); + + let log_results = |peer_id, blocks_sent| { + debug!( + %peer_id, + requested = requested_blocks, + returned = blocks_sent, + "BlocksByHead outgoing response processed" + ); + }; + + let mut block_stream = match self.chain.get_blocks(block_roots) { + Ok(block_stream) => block_stream, + Err(e) => { + error!(error = ?e, "Error getting block stream"); + return Err((RpcErrorResponse::ServerError, "Iterator error")); + } + }; + + // Fetching blocks is async because it may have to hit the execution layer for payloads. + let mut blocks_sent = 0; + while let Some((root, result)) = block_stream.next().await { + match result.as_ref() { + Ok(Some(block)) => { + blocks_sent += 1; + self.send_network_message(NetworkMessage::SendResponse { + peer_id, + inbound_request_id, + response: Response::BlocksByHead(Some(block.clone())), + }); + } + Ok(None) => { + error!( + %peer_id, + request_root = ?root, + "Block in the chain is not in the store" + ); + log_results(peer_id, blocks_sent); + return Err((RpcErrorResponse::ServerError, "Database inconsistency")); + } + Err(BeaconChainError::BlockHashMissingFromExecutionLayer(_)) => { + debug!( + block_root = ?root, + reason = "execution layer not synced", + "Failed to fetch execution payload for blocks by head request" + ); + log_results(peer_id, blocks_sent); + return Err(( + RpcErrorResponse::ResourceUnavailable, + "Execution layer not synced", + )); + } + Err(e) => { + if matches!( + e, + BeaconChainError::ExecutionLayerErrorPayloadReconstruction(_block_hash, boxed_error) + if matches!(**boxed_error, execution_layer::Error::EngineError(_)) + ) { + warn!( + info = "this may occur occasionally when the EE is busy", + block_root = ?root, + error = ?e, + "Error rebuilding payload for peer" + ); + } else { + error!( + block_root = ?root, + error = ?e, + "Error fetching block for peer" + ); + } + log_results(peer_id, blocks_sent); + return Err((RpcErrorResponse::ServerError, "Failed fetching blocks")); + } + } + } + + log_results(peer_id, blocks_sent); + Ok(()) + } + + /// Walks the parent chain of `head_root` (inclusive) and returns up to `count` block roots + /// in descending slot order. Synchronous so it can be run on a blocking thread. + /// + /// Two regimes are handled: + /// 1. Above finalization → fork-choice's in-memory proto-array supplies the roots + /// (zero DB reads). + /// 2. At or below finalization → the freezer DB's `BeaconBlockRoots` column (the + /// canonical slot→root index for finalized blocks, populated for + /// `[oldest_block_slot, split.slot)` with skip slots reusing the prior block's + /// root) supplies the roots. The head state is never consulted: its 8192-slot + /// `block_roots` bucket would silently truncate deep walks and is the wrong + /// source of truth for canonical history below finalization. + /// + /// Returns `ResourceUnavailable` if `head_root` is not known to the node. + fn get_block_roots_ancestor_of_head( + &self, + head_root: Hash256, + count: u64, + ) -> Result, (RpcErrorResponse, &'static str)> { + if count == 0 { + return Ok(vec![]); + } + + // 1. Walk ancestors in proto-array (in-memory, zero DB reads). Track the + // deepest slot we collected — that's where the freezer walk picks up. + let mut roots: Vec = Vec::with_capacity(count as usize); + let mut deepest_slot: Option = None; + { + let fork_choice = self.chain.canonical_head.fork_choice_read_lock(); + for (root, slot) in fork_choice + .proto_array() + .iter_block_roots(&head_root) + .take(count as usize) + { + roots.push(root); + deepest_slot = Some(slot); + } + } + + let store = &self.chain.store; + + // 2. Fallback: `head_root` is at or below finalization (proto-array doesn't + // track it). Look up its slot in the store, then verify it is the canonical + // block at that slot via the freezer index — a non-canonical hot-DB block at + // slot < split.slot can shadow the finalized chain. If the freezer + // disagrees (or doesn't have that slot), serve just the single block we + // found, satisfying the spec's "MUST return at least one block if you have + // it" clause. + let mut current_slot = if let Some(slot) = deepest_slot { + slot + } else { + let block = self + .chain + .get_blinded_block(&head_root) + .map_err(|e| { + error!(error = ?e, "Error reading blinded block for BlocksByHead beacon_root"); + (RpcErrorResponse::ServerError, "Database error") + })? + .ok_or((RpcErrorResponse::ResourceUnavailable, "Unknown beacon_root"))?; + let block_slot = block.slot(); + roots.push(head_root); + + match store.get_cold_block_root(block_slot) { + Ok(Some(r)) if r == head_root => {} // canonical, OK to walk back + Ok(_) => return Ok(roots), + Err(e) => { + error!(error = ?e, "Error reading freezer block_root for BlocksByHead"); + return Err((RpcErrorResponse::ServerError, "Database error")); + } + } + + block_slot + }; + + if (roots.len() as u64) >= count { + return Ok(roots); + } + + // 3. Spillover via the freezer DB's `BeaconBlockRoots` index (the canonical + // slot→root mapping for finalized blocks). Skip slots reuse the prior + // block's root; dedup on insert. + let oldest_block_slot = store.get_oldest_block_slot(); + let mut last_root = roots.last().copied(); + while (roots.len() as u64) < count && current_slot > oldest_block_slot { + current_slot = match current_slot.as_u64().checked_sub(1) { + Some(s) => Slot::from(s), + None => break, + }; + match store.get_cold_block_root(current_slot) { + Ok(Some(root)) => { + if Some(root) != last_root { + roots.push(root); + last_root = Some(root); + } + } + Ok(None) => { + // Hole in the freezer index (e.g. before `oldest_block_slot` on a + // checkpoint-synced node). Stop walking. + break; + } + Err(e) => { + error!(error = ?e, "Error walking freezer block_roots"); + return Err((RpcErrorResponse::ServerError, "Database error")); + } + } + } + + Ok(roots) + } + /// Handle a `ExecutionPayloadEnvelopesByRoot` request from the peer. #[instrument( name = "lh_handle_payload_envelopes_by_root_request", diff --git a/beacon_node/network/src/network_beacon_processor/tests.rs b/beacon_node/network/src/network_beacon_processor/tests.rs index c4e7f8f8d1f..f13815f7b66 100644 --- a/beacon_node/network/src/network_beacon_processor/tests.rs +++ b/beacon_node/network/src/network_beacon_processor/tests.rs @@ -24,8 +24,8 @@ use itertools::Itertools; use libp2p::gossipsub::MessageAcceptance; use lighthouse_network::rpc::InboundRequestId; use lighthouse_network::rpc::methods::{ - BlobsByRangeRequest, BlobsByRootRequest, DataColumnsByRangeRequest, MetaDataV3, - PayloadEnvelopesByRangeRequest, PayloadEnvelopesByRootRequest, + BlobsByRangeRequest, BlobsByRootRequest, BlocksByHeadRequest, DataColumnsByRangeRequest, + MetaDataV3, PayloadEnvelopesByRangeRequest, PayloadEnvelopesByRootRequest, }; use lighthouse_network::{ Client, MessageId, NetworkConfig, NetworkGlobals, PeerId, Response, @@ -501,6 +501,16 @@ impl TestRig { .unwrap(); } + pub fn enqueue_blocks_by_head_request(&self, beacon_root: Hash256, count: u64) { + self.network_beacon_processor + .send_blocks_by_head_request( + PeerId::random(), + InboundRequestId::new_unchecked(42, 24), + BlocksByHeadRequest { beacon_root, count }, + ) + .unwrap(); + } + pub fn enqueue_blobs_by_root_request(&self, blob_ids: RuntimeVariableList) { self.network_beacon_processor .send_blobs_by_roots_request( @@ -2346,3 +2356,153 @@ async fn test_payload_envelopes_by_range_no_duplicates_with_skip_slots() { // 1. Gossip envelope arrives before its block → queued via UnknownBlockForEnvelope // 2. Block imported → envelope released and processed successfully // 3. Timeout path → envelope released and re-verified + +/// Drain `network_rx` collecting `Response::BlocksByHead(Some(_))` block roots until the +/// stream terminator (`None`) arrives. Panics on any other message type so tests fail +/// loudly if an error response sneaks in. +async fn drain_blocks_by_head_response(rig: &mut TestRig) -> Vec { + let mut roots = Vec::new(); + while let Some(msg) = rig.network_rx.recv().await { + match msg { + NetworkMessage::SendResponse { + response: Response::BlocksByHead(Some(block)), + .. + } => roots.push(block.canonical_root()), + NetworkMessage::SendResponse { + response: Response::BlocksByHead(None), + .. + } => return roots, + other => panic!("unexpected message: {:?}", other), + } + } + roots +} + +// `BlocksByHead` request that crosses the finalized boundary: proto-array supplies +// the unfinalized head + ancestors down to the finalized root, then the freezer's +// `BeaconBlockRoots` index supplies the rest. Verifies the spillover path +// `get_block_roots_ancestor_of_head` takes when count > proto-array depth. +#[tokio::test] +async fn test_blocks_by_head_spillover_into_freezer() { + // Long enough for finalization + state migration to populate the freezer. + let mut rig = TestRig::new(SLOTS_PER_EPOCH * 4).await; + + // Sanity-check the precondition: finalization advanced past genesis and the split + // slot is non-zero, so the freezer's `BeaconBlockRoots` column has entries. + assert!( + rig.chain + .canonical_head + .cached_head() + .finalized_checkpoint() + .epoch + > Epoch::new(0), + "test precondition: chain must have finalized past epoch 0", + ); + assert!( + rig.chain.store.get_split_slot() > Slot::new(0), + "test precondition: state migration must have populated the freezer", + ); + + let head_slot = rig.chain.canonical_head.cached_head().head_slot(); + let head_root = rig.chain.canonical_head.cached_head().head_block_root(); + + // Walk all the way back to slot 1: exercises both proto-array (above finalization) + // and freezer (at/below finalization). + let count = head_slot.as_u64(); + rig.enqueue_blocks_by_head_request(head_root, count); + let actual = drain_blocks_by_head_response(&mut rig).await; + + // Build the canonical descending root list independently. The harness has no skip + // slots so every slot in [1, head_slot] has a unique block, but we still dedup + // defensively to mirror the function under test. + let mut expected: Vec = Vec::new(); + let mut last: Option = None; + for offset in 0..count { + let slot = Slot::new(head_slot.as_u64() - offset); + if let Some(root) = rig + .chain + .block_root_at_slot(slot, WhenSlotSkipped::Prev) + .unwrap() + && Some(root) != last + { + expected.push(root); + last = Some(root); + } + } + + assert_eq!( + actual, expected, + "BlocksByHead must serve the full canonical parent chain across the finalized boundary", + ); + assert_eq!(actual.first(), Some(&head_root), "first root must be head"); +} + +// `BlocksByHead` with `beacon_root` set to a finalized block root (case-2 fallback in +// `get_block_roots_ancestor_of_head`): proto-array doesn't track it, so we +// `get_blinded_block` for its slot, verify canonicity via the freezer index, and walk +// back from there. +#[tokio::test] +async fn test_blocks_by_head_finalized_root() { + let mut rig = TestRig::new(SLOTS_PER_EPOCH * 4).await; + + let finalized_root = rig + .chain + .canonical_head + .cached_head() + .finalized_checkpoint() + .root; + let finalized_slot = rig + .chain + .get_blinded_block(&finalized_root) + .unwrap() + .expect("finalized block exists in store") + .slot(); + assert!( + finalized_slot > Slot::new(0), + "test precondition: finalized block must not be genesis", + ); + + let count = 8u64.min(finalized_slot.as_u64()); + rig.enqueue_blocks_by_head_request(finalized_root, count); + let actual = drain_blocks_by_head_response(&mut rig).await; + + let mut expected: Vec = Vec::new(); + let mut last: Option = None; + for offset in 0..count { + let slot = Slot::new(finalized_slot.as_u64() - offset); + if let Some(root) = rig + .chain + .block_root_at_slot(slot, WhenSlotSkipped::Prev) + .unwrap() + && Some(root) != last + { + expected.push(root); + last = Some(root); + } + } + + assert_eq!(actual, expected); + assert_eq!( + actual.first(), + Some(&finalized_root), + "first root must be the requested finalized root", + ); +} + +// `BlocksByHead` for a `beacon_root` we don't have. Spec says we MUST return an error +// (we map this to `ResourceUnavailable`). +#[tokio::test] +async fn test_blocks_by_head_unknown_root() { + let mut rig = TestRig::new(SLOTS_PER_EPOCH).await; + rig.enqueue_blocks_by_head_request(Hash256::repeat_byte(0xab), 4); + + match rig.network_rx.recv().await.expect("a network message") { + NetworkMessage::SendErrorResponse { error, .. } => { + assert_matches!( + error, + lighthouse_network::rpc::RpcErrorResponse::ResourceUnavailable + ); + } + other => panic!("expected SendErrorResponse, got {:?}", other), + } +} diff --git a/beacon_node/network/src/router.rs b/beacon_node/network/src/router.rs index 443fa51cc67..a718997e0af 100644 --- a/beacon_node/network/src/router.rs +++ b/beacon_node/network/src/router.rs @@ -243,6 +243,13 @@ impl Router { request, ), ), + RequestType::BlocksByHead(request) => self.handle_beacon_processor_send_result( + self.network_beacon_processor.send_blocks_by_head_request( + peer_id, + inbound_request_id, + request, + ), + ), RequestType::PayloadEnvelopesByRoot(request) => self .handle_beacon_processor_send_result( self.network_beacon_processor @@ -346,6 +353,11 @@ impl Router { Response::PayloadEnvelopesByRoot(_) | Response::PayloadEnvelopesByRange(_) => { debug!("Requesting envelopes by root and by range not supported yet"); } + // Lighthouse currently only serves BlocksByHead and does not issue it as a client, + // so receiving a response is unexpected. Drop it without crashing. + Response::BlocksByHead(_) => { + debug!("BlocksByHead response received but not requested by lighthouse"); + } // Light client responses should not be received Response::LightClientBootstrap(_) | Response::LightClientOptimisticUpdate(_)