From b29121a3f6269180db6781bd9679fa809b330b47 Mon Sep 17 00:00:00 2001 From: dapplion <35266934+dapplion@users.noreply.github.com> Date: Thu, 30 Apr 2026 00:15:28 +0200 Subject: [PATCH 01/11] Add `BeaconBlocksByHead` v1 req/resp protocol Implements consensus-specs PR 5181: a new Fulu-only req/resp route `/eth2/beacon_chain/req/beacon_blocks_by_head/1/`. The request is `(beacon_root: Hash256, count: u64)`; the responder walks the parent chain of `beacon_root` (inclusive) and emits up to `min(count, MAX_REQUEST_BLOCKS_DENEB)` blocks in descending slot order, one block per `response_chunk` (same shape as `BeaconBlocksByRange v2`). The walk stops when `count` blocks have been emitted or when the parent chain becomes locally unavailable. Returns `ResourceUnavailable` if `beacon_root` itself is unknown. Wired through: - `Protocol::BlocksByHead` / `SupportedProtocol::BlocksByHeadV1`, registered in `currently_supported()` only when Fulu is scheduled. - `BlocksByHeadRequest` SSZ container (40 bytes, fixed). - Codec encode/decode with fork-context dispatch (Fulu, Gloas). - Per-protocol rate limit (`DEFAULT_BLOCKS_BY_HEAD_QUOTA = 128/10s`). - New `Work::BlocksByHeadRequest` async work item with its own queue, scheduled alongside existing block requests. - Inbound handler `handle_blocks_by_head_request` in `network_beacon_processor`, wired through `router.rs`. The parent walk runs on a blocking thread via the dedicated `get_block_roots_ancestor_of_head` helper; streaming, error handling, and `Ok(None)` semantics mirror `BlocksByRange`. Outbound (sync-side) consumption is intentionally out of scope. Codec round-trip and protocol-registration tests cover the new variant. 
--- beacon_node/beacon_processor/src/lib.rs | 10 + .../src/scheduler/work_queue.rs | 5 + .../src/peer_manager/mod.rs | 3 + .../lighthouse_network/src/rpc/codec.rs | 41 ++++ .../lighthouse_network/src/rpc/config.rs | 9 + .../lighthouse_network/src/rpc/methods.rs | 37 +++- .../lighthouse_network/src/rpc/protocol.rs | 32 +++ .../src/rpc/rate_limiter.rs | 15 ++ .../src/service/api_types.rs | 7 + .../lighthouse_network/src/service/mod.rs | 12 ++ .../src/network_beacon_processor/mod.rs | 24 ++- .../network_beacon_processor/rpc_methods.rs | 198 +++++++++++++++++- beacon_node/network/src/router.rs | 12 ++ 13 files changed, 400 insertions(+), 5 deletions(-) diff --git a/beacon_node/beacon_processor/src/lib.rs b/beacon_node/beacon_processor/src/lib.rs index ea87e9bc718..25944bcf8a5 100644 --- a/beacon_node/beacon_processor/src/lib.rs +++ b/beacon_node/beacon_processor/src/lib.rs @@ -431,6 +431,7 @@ pub enum Work { Status(BlockingFn), BlocksByRangeRequest(AsyncFn), BlocksByRootsRequest(AsyncFn), + BlocksByHeadRequest(AsyncFn), PayloadEnvelopesByRangeRequest(AsyncFn), PayloadEnvelopesByRootRequest(AsyncFn), BlobsByRangeRequest(BlockingFn), @@ -491,6 +492,7 @@ pub enum WorkType { Status, BlocksByRangeRequest, BlocksByRootsRequest, + BlocksByHeadRequest, PayloadEnvelopesByRangeRequest, PayloadEnvelopesByRootRequest, BlobsByRangeRequest, @@ -553,6 +555,7 @@ impl Work { Work::Status(_) => WorkType::Status, Work::BlocksByRangeRequest(_) => WorkType::BlocksByRangeRequest, Work::BlocksByRootsRequest(_) => WorkType::BlocksByRootsRequest, + Work::BlocksByHeadRequest(_) => WorkType::BlocksByHeadRequest, Work::PayloadEnvelopesByRangeRequest(_) => WorkType::PayloadEnvelopesByRangeRequest, Work::PayloadEnvelopesByRootRequest(_) => WorkType::PayloadEnvelopesByRootRequest, Work::BlobsByRangeRequest(_) => WorkType::BlobsByRangeRequest, @@ -1000,6 +1003,8 @@ impl BeaconProcessor { Some(item) } else if let Some(item) = work_queues.block_broots_queue.pop() { Some(item) + } else if let Some(item) 
= work_queues.block_bhead_queue.pop() { + Some(item) } else if let Some(item) = work_queues.blob_brange_queue.pop() { Some(item) } else if let Some(item) = work_queues.blob_broots_queue.pop() { @@ -1206,6 +1211,9 @@ impl BeaconProcessor { Work::BlocksByRootsRequest { .. } => { work_queues.block_broots_queue.push(work, work_id) } + Work::BlocksByHeadRequest { .. } => { + work_queues.block_bhead_queue.push(work, work_id) + } Work::PayloadEnvelopesByRangeRequest { .. } => work_queues .payload_envelopes_brange_queue .push(work, work_id), @@ -1331,6 +1339,7 @@ impl BeaconProcessor { WorkType::Status => work_queues.status_queue.len(), WorkType::BlocksByRangeRequest => work_queues.block_brange_queue.len(), WorkType::BlocksByRootsRequest => work_queues.block_broots_queue.len(), + WorkType::BlocksByHeadRequest => work_queues.block_bhead_queue.len(), WorkType::PayloadEnvelopesByRangeRequest => { work_queues.payload_envelopes_brange_queue.len() } @@ -1531,6 +1540,7 @@ impl BeaconProcessor { } Work::BlocksByRangeRequest(work) | Work::BlocksByRootsRequest(work) + | Work::BlocksByHeadRequest(work) | Work::PayloadEnvelopesByRangeRequest(work) | Work::PayloadEnvelopesByRootRequest(work) => task_spawner.spawn_async(work), Work::ChainSegmentBackfill(process_fn) => { diff --git a/beacon_node/beacon_processor/src/scheduler/work_queue.rs b/beacon_node/beacon_processor/src/scheduler/work_queue.rs index f7163d538b5..eb57b97df28 100644 --- a/beacon_node/beacon_processor/src/scheduler/work_queue.rs +++ b/beacon_node/beacon_processor/src/scheduler/work_queue.rs @@ -132,6 +132,7 @@ pub struct BeaconProcessorQueueLengths { status_queue: usize, block_brange_queue: usize, block_broots_queue: usize, + block_bhead_queue: usize, blob_broots_queue: usize, blob_brange_queue: usize, dcbroots_queue: usize, @@ -206,6 +207,7 @@ impl BeaconProcessorQueueLengths { status_queue: 1024, block_brange_queue: 1024, block_broots_queue: 1024, + block_bhead_queue: 1024, blob_broots_queue: 1024, blob_brange_queue: 
1024, dcbroots_queue: 1024, @@ -263,6 +265,7 @@ pub struct WorkQueues { pub status_queue: FifoQueue>, pub block_brange_queue: FifoQueue>, pub block_broots_queue: FifoQueue>, + pub block_bhead_queue: FifoQueue>, pub payload_envelopes_brange_queue: FifoQueue>, pub payload_envelopes_broots_queue: FifoQueue>, pub blob_broots_queue: FifoQueue>, @@ -334,6 +337,7 @@ impl WorkQueues { let status_queue = FifoQueue::new(queue_lengths.status_queue); let block_brange_queue = FifoQueue::new(queue_lengths.block_brange_queue); let block_broots_queue = FifoQueue::new(queue_lengths.block_broots_queue); + let block_bhead_queue = FifoQueue::new(queue_lengths.block_bhead_queue); let blob_broots_queue = FifoQueue::new(queue_lengths.blob_broots_queue); let blob_brange_queue = FifoQueue::new(queue_lengths.blob_brange_queue); let dcbroots_queue = FifoQueue::new(queue_lengths.dcbroots_queue); @@ -399,6 +403,7 @@ impl WorkQueues { status_queue, block_brange_queue, block_broots_queue, + block_bhead_queue, blob_broots_queue, blob_brange_queue, dcbroots_queue, diff --git a/beacon_node/lighthouse_network/src/peer_manager/mod.rs b/beacon_node/lighthouse_network/src/peer_manager/mod.rs index d7285c5c8e3..6b5144fa6fd 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/mod.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/mod.rs @@ -589,6 +589,7 @@ impl PeerManager { Protocol::Ping => PeerAction::MidToleranceError, Protocol::BlocksByRange => PeerAction::MidToleranceError, Protocol::BlocksByRoot => PeerAction::MidToleranceError, + Protocol::BlocksByHead => PeerAction::MidToleranceError, Protocol::BlobsByRange => PeerAction::MidToleranceError, Protocol::PayloadEnvelopesByRange => PeerAction::MidToleranceError, Protocol::PayloadEnvelopesByRoot => PeerAction::MidToleranceError, @@ -617,6 +618,7 @@ impl PeerManager { Protocol::Ping => PeerAction::Fatal, Protocol::BlocksByRange => return, Protocol::BlocksByRoot => return, + Protocol::BlocksByHead => return, 
Protocol::PayloadEnvelopesByRange => return, Protocol::PayloadEnvelopesByRoot => return, Protocol::BlobsByRange => return, @@ -642,6 +644,7 @@ impl PeerManager { Protocol::Ping => PeerAction::LowToleranceError, Protocol::BlocksByRange => PeerAction::MidToleranceError, Protocol::BlocksByRoot => PeerAction::MidToleranceError, + Protocol::BlocksByHead => PeerAction::MidToleranceError, Protocol::PayloadEnvelopesByRange => PeerAction::MidToleranceError, Protocol::PayloadEnvelopesByRoot => PeerAction::MidToleranceError, Protocol::BlobsByRange => PeerAction::MidToleranceError, diff --git a/beacon_node/lighthouse_network/src/rpc/codec.rs b/beacon_node/lighthouse_network/src/rpc/codec.rs index 75e035ae82d..e2a0cfc3d69 100644 --- a/beacon_node/lighthouse_network/src/rpc/codec.rs +++ b/beacon_node/lighthouse_network/src/rpc/codec.rs @@ -77,6 +77,7 @@ impl SSZSnappyInboundCodec { }, RpcSuccessResponse::BlocksByRange(res) => res.as_ssz_bytes(), RpcSuccessResponse::BlocksByRoot(res) => res.as_ssz_bytes(), + RpcSuccessResponse::BlocksByHead(res) => res.as_ssz_bytes(), RpcSuccessResponse::PayloadEnvelopesByRange(res) => res.as_ssz_bytes(), RpcSuccessResponse::PayloadEnvelopesByRoot(res) => res.as_ssz_bytes(), RpcSuccessResponse::BlobsByRange(res) => res.as_ssz_bytes(), @@ -359,6 +360,7 @@ impl Encoder> for SSZSnappyOutboundCodec { BlocksByRootRequest::V1(req) => req.block_roots.as_ssz_bytes(), BlocksByRootRequest::V2(req) => req.block_roots.as_ssz_bytes(), }, + RequestType::BlocksByHead(req) => req.as_ssz_bytes(), RequestType::PayloadEnvelopesByRange(req) => req.as_ssz_bytes(), RequestType::PayloadEnvelopesByRoot(req) => req.beacon_block_roots.as_ssz_bytes(), RequestType::BlobsByRange(req) => req.as_ssz_bytes(), @@ -553,6 +555,9 @@ fn handle_rpc_request( )?, }), ))), + SupportedProtocol::BlocksByHeadV1 => Ok(Some(RequestType::BlocksByHead( + BlocksByHeadRequest::from_ssz_bytes(decoded_buffer)?, + ))), SupportedProtocol::PayloadEnvelopesByRangeV1 => { 
Ok(Some(RequestType::PayloadEnvelopesByRange( PayloadEnvelopesByRangeRequest::from_ssz_bytes(decoded_buffer)?, @@ -943,6 +948,35 @@ fn handle_rpc_response( ), )), }, + SupportedProtocol::BlocksByHeadV1 => match fork_name { + Some(fork_name) if fork_name.fulu_enabled() => match fork_name { + ForkName::Fulu => Ok(Some(RpcSuccessResponse::BlocksByHead(Arc::new( + SignedBeaconBlock::Fulu(SignedBeaconBlockFulu::from_ssz_bytes(decoded_buffer)?), + )))), + ForkName::Gloas => Ok(Some(RpcSuccessResponse::BlocksByHead(Arc::new( + SignedBeaconBlock::Gloas(SignedBeaconBlockGloas::from_ssz_bytes( + decoded_buffer, + )?), + )))), + // `fulu_enabled()` returns true only for Fulu and later forks; the matches + // above cover those exhaustively. + _ => Err(RPCError::ErrorResponse( + RpcErrorResponse::InvalidRequest, + "Unexpected fork variant for blocks by head".to_string(), + )), + }, + Some(_) => Err(RPCError::ErrorResponse( + RpcErrorResponse::InvalidRequest, + "Invalid fork name for blocks by head".to_string(), + )), + None => Err(RPCError::ErrorResponse( + RpcErrorResponse::InvalidRequest, + format!( + "No context bytes provided for {:?} response", + versioned_protocol + ), + )), + }, } } @@ -1319,6 +1353,9 @@ mod tests { RequestType::BlocksByRoot(bbroot) => { assert_eq!(decoded, RequestType::BlocksByRoot(bbroot)) } + RequestType::BlocksByHead(bbhead) => { + assert_eq!(decoded, RequestType::BlocksByHead(bbhead)) + } RequestType::BlobsByRange(blbrange) => { assert_eq!(decoded, RequestType::BlobsByRange(blbrange)) } @@ -2063,6 +2100,10 @@ mod tests { RequestType::BlobsByRange(blbrange_request()), RequestType::DataColumnsByRange(dcbrange_request()), RequestType::MetaData(MetadataRequest::new_v2()), + RequestType::BlocksByHead(BlocksByHeadRequest { + beacon_root: Hash256::zero(), + count: 32, + }), ]; for req in requests.iter() { for fork_name in ForkName::list_all() { diff --git a/beacon_node/lighthouse_network/src/rpc/config.rs b/beacon_node/lighthouse_network/src/rpc/config.rs 
index 9e1c6541ec8..59f0b8e9a2f 100644 --- a/beacon_node/lighthouse_network/src/rpc/config.rs +++ b/beacon_node/lighthouse_network/src/rpc/config.rs @@ -89,6 +89,7 @@ pub struct RateLimiterConfig { pub(super) goodbye_quota: Quota, pub(super) blocks_by_range_quota: Quota, pub(super) blocks_by_root_quota: Quota, + pub(super) blocks_by_head_quota: Quota, pub(super) payload_envelopes_by_range_quota: Quota, pub(super) payload_envelopes_by_root_quota: Quota, pub(super) blobs_by_range_quota: Quota, @@ -113,6 +114,8 @@ impl RateLimiterConfig { Quota::n_every(NonZeroU64::new(128).unwrap(), 10); pub const DEFAULT_BLOCKS_BY_ROOT_QUOTA: Quota = Quota::n_every(NonZeroU64::new(128).unwrap(), 10); + pub const DEFAULT_BLOCKS_BY_HEAD_QUOTA: Quota = + Quota::n_every(NonZeroU64::new(128).unwrap(), 10); pub const DEFAULT_PAYLOAD_ENVELOPES_BY_RANGE_QUOTA: Quota = Quota::n_every(NonZeroU64::new(128).unwrap(), 10); pub const DEFAULT_PAYLOAD_ENVELOPES_BY_ROOT_QUOTA: Quota = @@ -143,6 +146,7 @@ impl Default for RateLimiterConfig { goodbye_quota: Self::DEFAULT_GOODBYE_QUOTA, blocks_by_range_quota: Self::DEFAULT_BLOCKS_BY_RANGE_QUOTA, blocks_by_root_quota: Self::DEFAULT_BLOCKS_BY_ROOT_QUOTA, + blocks_by_head_quota: Self::DEFAULT_BLOCKS_BY_HEAD_QUOTA, payload_envelopes_by_range_quota: Self::DEFAULT_PAYLOAD_ENVELOPES_BY_RANGE_QUOTA, payload_envelopes_by_root_quota: Self::DEFAULT_PAYLOAD_ENVELOPES_BY_ROOT_QUOTA, blobs_by_range_quota: Self::DEFAULT_BLOBS_BY_RANGE_QUOTA, @@ -177,6 +181,7 @@ impl Debug for RateLimiterConfig { .field("goodbye", fmt_q!(&self.goodbye_quota)) .field("blocks_by_range", fmt_q!(&self.blocks_by_range_quota)) .field("blocks_by_root", fmt_q!(&self.blocks_by_root_quota)) + .field("blocks_by_head", fmt_q!(&self.blocks_by_head_quota)) .field( "payload_envelopes_by_range", fmt_q!(&self.payload_envelopes_by_range_quota), @@ -213,6 +218,7 @@ impl FromStr for RateLimiterConfig { let mut goodbye_quota = None; let mut blocks_by_range_quota = None; let mut blocks_by_root_quota = None; 
+ let mut blocks_by_head_quota = None; let mut payload_envelopes_by_range_quota = None; let mut payload_envelopes_by_root_quota = None; let mut blobs_by_range_quota = None; @@ -232,6 +238,7 @@ impl FromStr for RateLimiterConfig { Protocol::Goodbye => goodbye_quota = goodbye_quota.or(quota), Protocol::BlocksByRange => blocks_by_range_quota = blocks_by_range_quota.or(quota), Protocol::BlocksByRoot => blocks_by_root_quota = blocks_by_root_quota.or(quota), + Protocol::BlocksByHead => blocks_by_head_quota = blocks_by_head_quota.or(quota), Protocol::PayloadEnvelopesByRange => { payload_envelopes_by_range_quota = payload_envelopes_by_range_quota.or(quota) } @@ -274,6 +281,8 @@ impl FromStr for RateLimiterConfig { .unwrap_or(Self::DEFAULT_BLOCKS_BY_RANGE_QUOTA), blocks_by_root_quota: blocks_by_root_quota .unwrap_or(Self::DEFAULT_BLOCKS_BY_ROOT_QUOTA), + blocks_by_head_quota: blocks_by_head_quota + .unwrap_or(Self::DEFAULT_BLOCKS_BY_HEAD_QUOTA), payload_envelopes_by_range_quota: payload_envelopes_by_range_quota .unwrap_or(Self::DEFAULT_PAYLOAD_ENVELOPES_BY_RANGE_QUOTA), payload_envelopes_by_root_quota: payload_envelopes_by_root_quota diff --git a/beacon_node/lighthouse_network/src/rpc/methods.rs b/beacon_node/lighthouse_network/src/rpc/methods.rs index baabf486838..f3f294d9135 100644 --- a/beacon_node/lighthouse_network/src/rpc/methods.rs +++ b/beacon_node/lighthouse_network/src/rpc/methods.rs @@ -488,6 +488,18 @@ impl From for OldBlocksByRangeRequest { } } +/// Request a contiguous range of beacon blocks by walking the parent chain of `beacon_root`. +/// +/// New in Fulu (see consensus-specs PR 5181). The responder walks the parent chain of +/// `beacon_root` (inclusive) and emits up to `count` blocks in descending slot order. +#[derive(Encode, Decode, Clone, Debug, PartialEq)] +pub struct BlocksByHeadRequest { + /// The block root to start the parent walk from (inclusive). + pub beacon_root: Hash256, + /// The maximum number of blocks to return. 
+ pub count: u64, +} + /// Request a number of beacon block bodies from a peer. #[superstruct(variants(V1, V2), variant_attributes(derive(Clone, Debug, PartialEq)))] #[derive(Clone, Debug, PartialEq)] @@ -622,6 +634,9 @@ pub enum RpcSuccessResponse { /// A response to a get BLOCKS_BY_ROOT request. BlocksByRoot(Arc>), + /// A response to a get BEACON_BLOCKS_BY_HEAD request. + BlocksByHead(Arc>), + /// A response to a get EXECUTION_PAYLOAD_ENVELOPES_BY_RANGE request. A None response signifies /// the end of the batch. PayloadEnvelopesByRange(Arc>), @@ -669,6 +684,9 @@ pub enum ResponseTermination { /// Blocks by root stream termination. BlocksByRoot, + /// Blocks by head stream termination. + BlocksByHead, + /// Execution payload envelopes by range stream termination. PayloadEnvelopesByRange, @@ -696,6 +714,7 @@ impl ResponseTermination { match self { ResponseTermination::BlocksByRange => Protocol::BlocksByRange, ResponseTermination::BlocksByRoot => Protocol::BlocksByRoot, + ResponseTermination::BlocksByHead => Protocol::BlocksByHead, ResponseTermination::PayloadEnvelopesByRange => Protocol::PayloadEnvelopesByRange, ResponseTermination::PayloadEnvelopesByRoot => Protocol::PayloadEnvelopesByRoot, ResponseTermination::BlobsByRange => Protocol::BlobsByRange, @@ -793,6 +812,7 @@ impl RpcSuccessResponse { RpcSuccessResponse::Status(_) => Protocol::Status, RpcSuccessResponse::BlocksByRange(_) => Protocol::BlocksByRange, RpcSuccessResponse::BlocksByRoot(_) => Protocol::BlocksByRoot, + RpcSuccessResponse::BlocksByHead(_) => Protocol::BlocksByHead, RpcSuccessResponse::PayloadEnvelopesByRange(_) => Protocol::PayloadEnvelopesByRange, RpcSuccessResponse::PayloadEnvelopesByRoot(_) => Protocol::PayloadEnvelopesByRoot, RpcSuccessResponse::BlobsByRange(_) => Protocol::BlobsByRange, @@ -812,7 +832,9 @@ impl RpcSuccessResponse { pub fn slot(&self) -> Option { match self { - Self::BlocksByRange(r) | Self::BlocksByRoot(r) => Some(r.slot()), + Self::BlocksByRange(r) | 
Self::BlocksByRoot(r) | Self::BlocksByHead(r) => { + Some(r.slot()) + } Self::PayloadEnvelopesByRoot(r) | Self::PayloadEnvelopesByRange(r) => Some(r.slot()), Self::BlobsByRange(r) | Self::BlobsByRoot(r) => Some(r.slot()), Self::DataColumnsByRange(r) | Self::DataColumnsByRoot(r) => Some(r.slot()), @@ -864,6 +886,9 @@ impl std::fmt::Display for RpcSuccessResponse { RpcSuccessResponse::BlocksByRoot(block) => { write!(f, "BlocksByRoot: Block slot: {}", block.slot()) } + RpcSuccessResponse::BlocksByHead(block) => { + write!(f, "BlocksByHead: Block slot: {}", block.slot()) + } RpcSuccessResponse::PayloadEnvelopesByRange(envelope) => { write!( f, @@ -975,6 +1000,16 @@ impl std::fmt::Display for OldBlocksByRangeRequest { } } +impl std::fmt::Display for BlocksByHeadRequest { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "BlocksByHead: beacon_root: {}, count: {}", + self.beacon_root, self.count + ) + } +} + impl std::fmt::Display for BlobsByRootRequest { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( diff --git a/beacon_node/lighthouse_network/src/rpc/protocol.rs b/beacon_node/lighthouse_network/src/rpc/protocol.rs index c949dfe17d8..c18485624e9 100644 --- a/beacon_node/lighthouse_network/src/rpc/protocol.rs +++ b/beacon_node/lighthouse_network/src/rpc/protocol.rs @@ -262,6 +262,9 @@ pub enum Protocol { /// The `BlocksByRoot` protocol name. #[strum(serialize = "beacon_blocks_by_root")] BlocksByRoot, + /// The `BlocksByHead` protocol name. + #[strum(serialize = "beacon_blocks_by_head")] + BlocksByHead, /// The `BlobsByRange` protocol name. 
#[strum(serialize = "blob_sidecars_by_range")] BlobsByRange, @@ -306,6 +309,7 @@ impl Protocol { Protocol::Goodbye => None, Protocol::BlocksByRange => Some(ResponseTermination::BlocksByRange), Protocol::BlocksByRoot => Some(ResponseTermination::BlocksByRoot), + Protocol::BlocksByHead => Some(ResponseTermination::BlocksByHead), Protocol::PayloadEnvelopesByRange => Some(ResponseTermination::PayloadEnvelopesByRange), Protocol::PayloadEnvelopesByRoot => Some(ResponseTermination::PayloadEnvelopesByRoot), Protocol::BlobsByRange => Some(ResponseTermination::BlobsByRange), @@ -338,6 +342,7 @@ pub enum SupportedProtocol { BlocksByRangeV2, BlocksByRootV1, BlocksByRootV2, + BlocksByHeadV1, PayloadEnvelopesByRangeV1, PayloadEnvelopesByRootV1, BlobsByRangeV1, @@ -366,6 +371,7 @@ impl SupportedProtocol { SupportedProtocol::PayloadEnvelopesByRootV1 => "1", SupportedProtocol::BlocksByRootV1 => "1", SupportedProtocol::BlocksByRootV2 => "2", + SupportedProtocol::BlocksByHeadV1 => "1", SupportedProtocol::BlobsByRangeV1 => "1", SupportedProtocol::BlobsByRootV1 => "1", SupportedProtocol::DataColumnsByRootV1 => "1", @@ -390,6 +396,7 @@ impl SupportedProtocol { SupportedProtocol::BlocksByRangeV2 => Protocol::BlocksByRange, SupportedProtocol::BlocksByRootV1 => Protocol::BlocksByRoot, SupportedProtocol::BlocksByRootV2 => Protocol::BlocksByRoot, + SupportedProtocol::BlocksByHeadV1 => Protocol::BlocksByHead, SupportedProtocol::PayloadEnvelopesByRangeV1 => Protocol::PayloadEnvelopesByRange, SupportedProtocol::PayloadEnvelopesByRootV1 => Protocol::PayloadEnvelopesByRoot, SupportedProtocol::BlobsByRangeV1 => Protocol::BlobsByRange, @@ -458,6 +465,13 @@ impl SupportedProtocol { ), ]); } + // BeaconBlocksByHead is new in Fulu (consensus-specs PR 5181). 
+ if fork_context.fork_exists(ForkName::Fulu) { + supported.push(ProtocolId::new( + SupportedProtocol::BlocksByHeadV1, + Encoding::SSZSnappy, + )); + } supported } } @@ -564,6 +578,10 @@ impl ProtocolId { ::ssz_fixed_len(), ), Protocol::BlocksByRoot => RpcLimits::new(0, spec.max_blocks_by_root_request), + Protocol::BlocksByHead => RpcLimits::new( + ::ssz_fixed_len(), + ::ssz_fixed_len(), + ), Protocol::PayloadEnvelopesByRange => RpcLimits::new( ::ssz_fixed_len(), ::ssz_fixed_len(), @@ -609,6 +627,7 @@ impl ProtocolId { Protocol::Goodbye => RpcLimits::new(0, 0), // Goodbye request has no response Protocol::BlocksByRange => rpc_block_limits_by_fork(fork_context.current_fork_name()), Protocol::BlocksByRoot => rpc_block_limits_by_fork(fork_context.current_fork_name()), + Protocol::BlocksByHead => rpc_block_limits_by_fork(fork_context.current_fork_name()), Protocol::PayloadEnvelopesByRange => rpc_payload_limits(), Protocol::PayloadEnvelopesByRoot => rpc_payload_limits(), Protocol::BlobsByRange => rpc_blob_limits::(), @@ -648,6 +667,7 @@ impl ProtocolId { match self.versioned_protocol { SupportedProtocol::BlocksByRangeV2 | SupportedProtocol::BlocksByRootV2 + | SupportedProtocol::BlocksByHeadV1 | SupportedProtocol::PayloadEnvelopesByRangeV1 | SupportedProtocol::PayloadEnvelopesByRootV1 | SupportedProtocol::BlobsByRangeV1 @@ -801,6 +821,7 @@ pub enum RequestType { Goodbye(GoodbyeReason), BlocksByRange(OldBlocksByRangeRequest), BlocksByRoot(BlocksByRootRequest), + BlocksByHead(BlocksByHeadRequest), PayloadEnvelopesByRange(PayloadEnvelopesByRangeRequest), PayloadEnvelopesByRoot(PayloadEnvelopesByRootRequest), BlobsByRange(BlobsByRangeRequest), @@ -826,6 +847,7 @@ impl RequestType { RequestType::Goodbye(_) => 0, RequestType::BlocksByRange(req) => *req.count(), RequestType::BlocksByRoot(req) => req.block_roots().len() as u64, + RequestType::BlocksByHead(req) => req.count, RequestType::PayloadEnvelopesByRange(req) => req.count, RequestType::PayloadEnvelopesByRoot(req) => 
req.beacon_block_roots.len() as u64, RequestType::BlobsByRange(req) => req.max_blobs_requested(digest_epoch, spec), @@ -857,6 +879,7 @@ impl RequestType { BlocksByRootRequest::V1(_) => SupportedProtocol::BlocksByRootV1, BlocksByRootRequest::V2(_) => SupportedProtocol::BlocksByRootV2, }, + RequestType::BlocksByHead(_) => SupportedProtocol::BlocksByHeadV1, RequestType::PayloadEnvelopesByRange(_) => SupportedProtocol::PayloadEnvelopesByRangeV1, RequestType::PayloadEnvelopesByRoot(_) => SupportedProtocol::PayloadEnvelopesByRootV1, RequestType::BlobsByRange(_) => SupportedProtocol::BlobsByRangeV1, @@ -890,6 +913,7 @@ impl RequestType { // variants that have `multiple_responses()` can have values. RequestType::BlocksByRange(_) => ResponseTermination::BlocksByRange, RequestType::BlocksByRoot(_) => ResponseTermination::BlocksByRoot, + RequestType::BlocksByHead(_) => ResponseTermination::BlocksByHead, RequestType::PayloadEnvelopesByRange(_) => ResponseTermination::PayloadEnvelopesByRange, RequestType::PayloadEnvelopesByRoot(_) => ResponseTermination::PayloadEnvelopesByRoot, RequestType::BlobsByRange(_) => ResponseTermination::BlobsByRange, @@ -926,6 +950,10 @@ impl RequestType { ProtocolId::new(SupportedProtocol::BlocksByRootV2, Encoding::SSZSnappy), ProtocolId::new(SupportedProtocol::BlocksByRootV1, Encoding::SSZSnappy), ], + RequestType::BlocksByHead(_) => vec![ProtocolId::new( + SupportedProtocol::BlocksByHeadV1, + Encoding::SSZSnappy, + )], RequestType::PayloadEnvelopesByRange(_) => vec![ProtocolId::new( SupportedProtocol::PayloadEnvelopesByRangeV1, Encoding::SSZSnappy, @@ -984,6 +1012,7 @@ impl RequestType { RequestType::Goodbye(_) => false, RequestType::BlocksByRange(_) => false, RequestType::BlocksByRoot(_) => false, + RequestType::BlocksByHead(_) => false, RequestType::BlobsByRange(_) => false, RequestType::PayloadEnvelopesByRange(_) => false, RequestType::PayloadEnvelopesByRoot(_) => false, @@ -1097,6 +1126,7 @@ impl std::fmt::Display for RequestType { 
RequestType::Goodbye(reason) => write!(f, "Goodbye: {}", reason), RequestType::BlocksByRange(req) => write!(f, "Blocks by range: {}", req), RequestType::BlocksByRoot(req) => write!(f, "Blocks by root: {:?}", req), + RequestType::BlocksByHead(req) => write!(f, "{}", req), RequestType::PayloadEnvelopesByRange(req) => { write!(f, "Payload envelopes by range: {:?}", req) } @@ -1171,6 +1201,8 @@ mod tests { fork_context.fork_exists(ForkName::Gloas) } + BlocksByHeadV1 => fork_context.fork_exists(ForkName::Fulu), + // Light client protocols are not in currently_supported() LightClientBootstrapV1 | LightClientOptimisticUpdateV1 diff --git a/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs b/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs index ebdca386d88..a5c98a4d309 100644 --- a/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs +++ b/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs @@ -105,6 +105,8 @@ pub struct RPCRateLimiter { bbrange_rl: Limiter, /// BlocksByRoot rate limiter. bbroots_rl: Limiter, + /// BlocksByHead rate limiter. + bbhead_rl: Limiter, /// BlobsByRange rate limiter. blbrange_rl: Limiter, /// BlobsByRoot rate limiter. @@ -152,6 +154,8 @@ pub struct RPCRateLimiterBuilder { bbrange_quota: Option, /// Quota for the BlocksByRoot protocol. bbroots_quota: Option, + /// Quota for the BlocksByHead protocol. + bbhead_quota: Option, /// Quota for the ExecutionPayloadEnvelopesByRange protocol. perange_quota: Option, /// Quota for the ExecutionPayloadEnvelopesByRoot protocol. 
@@ -185,6 +189,7 @@ impl RPCRateLimiterBuilder { Protocol::Goodbye => self.goodbye_quota = q, Protocol::BlocksByRange => self.bbrange_quota = q, Protocol::BlocksByRoot => self.bbroots_quota = q, + Protocol::BlocksByHead => self.bbhead_quota = q, Protocol::PayloadEnvelopesByRange => self.perange_quota = q, Protocol::PayloadEnvelopesByRoot => self.peroots_quota = q, Protocol::BlobsByRange => self.blbrange_quota = q, @@ -211,6 +216,9 @@ impl RPCRateLimiterBuilder { let bbrange_quota = self .bbrange_quota .ok_or("BlocksByRange quota not specified")?; + let bbhead_quota = self + .bbhead_quota + .ok_or("BlocksByHead quota not specified")?; let perange_quota = self .perange_quota .ok_or("PayloadEnvelopesByRange quota not specified")?; @@ -252,6 +260,7 @@ impl RPCRateLimiterBuilder { let goodbye_rl = Limiter::from_quota(goodbye_quota)?; let bbroots_rl = Limiter::from_quota(bbroots_quota)?; let bbrange_rl = Limiter::from_quota(bbrange_quota)?; + let bbhead_rl = Limiter::from_quota(bbhead_quota)?; let envrange_rl = Limiter::from_quota(perange_quota)?; let envroots_rl = Limiter::from_quota(peroots_quota)?; let blbrange_rl = Limiter::from_quota(blbrange_quota)?; @@ -277,6 +286,7 @@ impl RPCRateLimiterBuilder { goodbye_rl, bbroots_rl, bbrange_rl, + bbhead_rl, envrange_rl, envroots_rl, blbrange_rl, @@ -332,6 +342,7 @@ impl RPCRateLimiter { goodbye_quota, blocks_by_range_quota, blocks_by_root_quota, + blocks_by_head_quota, payload_envelopes_by_range_quota, payload_envelopes_by_root_quota, blobs_by_range_quota, @@ -351,6 +362,7 @@ impl RPCRateLimiter { .set_quota(Protocol::Goodbye, goodbye_quota) .set_quota(Protocol::BlocksByRange, blocks_by_range_quota) .set_quota(Protocol::BlocksByRoot, blocks_by_root_quota) + .set_quota(Protocol::BlocksByHead, blocks_by_head_quota) .set_quota( Protocol::PayloadEnvelopesByRange, payload_envelopes_by_range_quota, @@ -406,6 +418,7 @@ impl RPCRateLimiter { Protocol::Goodbye => &mut self.goodbye_rl, Protocol::BlocksByRange => &mut self.bbrange_rl, 
Protocol::BlocksByRoot => &mut self.bbroots_rl, + Protocol::BlocksByHead => &mut self.bbhead_rl, Protocol::PayloadEnvelopesByRange => &mut self.envrange_rl, Protocol::PayloadEnvelopesByRoot => &mut self.envroots_rl, Protocol::BlobsByRange => &mut self.blbrange_rl, @@ -432,6 +445,7 @@ impl RPCRateLimiter { status_rl, bbrange_rl, bbroots_rl, + bbhead_rl, envrange_rl, envroots_rl, blbrange_rl, @@ -451,6 +465,7 @@ impl RPCRateLimiter { status_rl.prune(time_since_start); bbrange_rl.prune(time_since_start); bbroots_rl.prune(time_since_start); + bbhead_rl.prune(time_since_start); envrange_rl.prune(time_since_start); envroots_rl.prune(time_since_start); blbrange_rl.prune(time_since_start); diff --git a/beacon_node/lighthouse_network/src/service/api_types.rs b/beacon_node/lighthouse_network/src/service/api_types.rs index 486a4438579..f598f59aee5 100644 --- a/beacon_node/lighthouse_network/src/service/api_types.rs +++ b/beacon_node/lighthouse_network/src/service/api_types.rs @@ -161,6 +161,9 @@ pub enum Response { DataColumnsByRange(Option>>), /// A response to a get BLOCKS_BY_ROOT request. BlocksByRoot(Option>>), + /// A response to a get BEACON_BLOCKS_BY_HEAD request. A None response signals the end of the + /// batch. + BlocksByHead(Option>>), /// A response to a get `EXECUTION_PAYLOAD_ENVELOPES_BY_ROOT` request. PayloadEnvelopesByRoot(Option>>), /// A response to a get `EXECUTION_PAYLOAD_ENVELOPES_BY_RANGE` request. 
@@ -186,6 +189,10 @@ impl std::convert::From> for RpcResponse { Some(b) => RpcResponse::Success(RpcSuccessResponse::BlocksByRoot(b)), None => RpcResponse::StreamTermination(ResponseTermination::BlocksByRoot), }, + Response::BlocksByHead(r) => match r { + Some(b) => RpcResponse::Success(RpcSuccessResponse::BlocksByHead(b)), + None => RpcResponse::StreamTermination(ResponseTermination::BlocksByHead), + }, Response::BlocksByRange(r) => match r { Some(b) => RpcResponse::Success(RpcSuccessResponse::BlocksByRange(b)), None => RpcResponse::StreamTermination(ResponseTermination::BlocksByRange), diff --git a/beacon_node/lighthouse_network/src/service/mod.rs b/beacon_node/lighthouse_network/src/service/mod.rs index f0c1567cb04..41d937e3245 100644 --- a/beacon_node/lighthouse_network/src/service/mod.rs +++ b/beacon_node/lighthouse_network/src/service/mod.rs @@ -1691,6 +1691,14 @@ impl Network { request_type, }) } + RequestType::BlocksByHead(_) => { + metrics::inc_counter_vec(&metrics::TOTAL_RPC_REQUESTS, &["blocks_by_head"]); + Some(NetworkEvent::RequestReceived { + peer_id, + inbound_request_id, + request_type, + }) + } RequestType::PayloadEnvelopesByRange(_) => { metrics::inc_counter_vec( &metrics::TOTAL_RPC_REQUESTS, @@ -1827,6 +1835,9 @@ impl Network { RpcSuccessResponse::BlocksByRoot(resp) => { self.build_response(id, peer_id, Response::BlocksByRoot(Some(resp))) } + RpcSuccessResponse::BlocksByHead(resp) => { + self.build_response(id, peer_id, Response::BlocksByHead(Some(resp))) + } RpcSuccessResponse::PayloadEnvelopesByRange(resp) => self.build_response( id, peer_id, @@ -1871,6 +1882,7 @@ impl Network { let response = match termination { ResponseTermination::BlocksByRange => Response::BlocksByRange(None), ResponseTermination::BlocksByRoot => Response::BlocksByRoot(None), + ResponseTermination::BlocksByHead => Response::BlocksByHead(None), ResponseTermination::PayloadEnvelopesByRange => { Response::PayloadEnvelopesByRange(None) } diff --git 
a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index bfcff2088b7..b8d1cfd12e1 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -14,8 +14,8 @@ use beacon_processor::{ }; use lighthouse_network::rpc::InboundRequestId; use lighthouse_network::rpc::methods::{ - BlobsByRangeRequest, BlobsByRootRequest, DataColumnsByRangeRequest, DataColumnsByRootRequest, - LightClientUpdatesByRangeRequest, PayloadEnvelopesByRangeRequest, + BlobsByRangeRequest, BlobsByRootRequest, BlocksByHeadRequest, DataColumnsByRangeRequest, + DataColumnsByRootRequest, LightClientUpdatesByRangeRequest, PayloadEnvelopesByRangeRequest, PayloadEnvelopesByRootRequest, }; use lighthouse_network::service::api_types::CustodyBackfillBatchId; @@ -703,6 +703,26 @@ impl NetworkBeaconProcessor { }) } + /// Create a new work event to process `BlocksByHeadRequest`s from the RPC network. + pub fn send_blocks_by_head_request( + self: &Arc, + peer_id: PeerId, + inbound_request_id: InboundRequestId, + request: BlocksByHeadRequest, + ) -> Result<(), Error> { + let processor = self.clone(); + let process_fn = async move { + processor + .handle_blocks_by_head_request(peer_id, inbound_request_id, request) + .await; + }; + + self.try_send(BeaconWorkEvent { + drop_during_sync: false, + work: Work::BlocksByHeadRequest(Box::pin(process_fn)), + }) + } + /// Create a new work event to process `BlocksByRootRequest`s from the RPC network. 
pub fn send_blocks_by_roots_request( self: &Arc, diff --git a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs index 8b31b67acbd..85f3ad1df5d 100644 --- a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs @@ -7,8 +7,8 @@ use beacon_chain::payload_envelope_streamer::EnvelopeRequestSource; use beacon_chain::{BeaconChainError, BeaconChainTypes, BlockProcessStatus, WhenSlotSkipped}; use itertools::{Itertools, process_results}; use lighthouse_network::rpc::methods::{ - BlobsByRangeRequest, BlobsByRootRequest, DataColumnsByRangeRequest, DataColumnsByRootRequest, - PayloadEnvelopesByRangeRequest, PayloadEnvelopesByRootRequest, + BlobsByRangeRequest, BlobsByRootRequest, BlocksByHeadRequest, DataColumnsByRangeRequest, + DataColumnsByRootRequest, PayloadEnvelopesByRangeRequest, PayloadEnvelopesByRootRequest, }; use lighthouse_network::rpc::*; use lighthouse_network::{PeerId, ReportSource, Response, SyncInfo}; @@ -256,6 +256,200 @@ impl NetworkBeaconProcessor { Ok(()) } + /// Handle a `BeaconBlocksByHead` request from the peer. + /// + /// Walks the parent chain of `request.beacon_root` (inclusive) and emits up to + /// `min(request.count, MAX_REQUEST_BLOCKS_DENEB)` blocks in descending slot order. + /// See consensus-specs PR 5181. 
+ #[instrument( + name = "lh_handle_blocks_by_head_request", + parent = None, + level = "debug", + skip_all, + fields(peer_id = %peer_id, client = tracing::field::Empty) + )] + pub async fn handle_blocks_by_head_request( + self: Arc, + peer_id: PeerId, + inbound_request_id: InboundRequestId, + request: BlocksByHeadRequest, + ) { + let client = self.network_globals.client(&peer_id); + Span::current().record("client", field::display(client.kind)); + + self.terminate_response_stream( + peer_id, + inbound_request_id, + self.clone() + .handle_blocks_by_head_request_inner(peer_id, inbound_request_id, request) + .await, + Response::BlocksByHead, + ); + } + + async fn handle_blocks_by_head_request_inner( + self: Arc, + peer_id: PeerId, + inbound_request_id: InboundRequestId, + request: BlocksByHeadRequest, + ) -> Result<(), (RpcErrorResponse, &'static str)> { + let spec = &self.chain.spec; + // Cap the response at MAX_REQUEST_BLOCKS_DENEB regardless of what the peer asked for, + // matching the spec. + let max_request_blocks = spec.max_request_blocks(types::ForkName::Deneb) as u64; + let cap = request.count.min(max_request_blocks); + let beacon_root = request.beacon_root; + + debug!( + %peer_id, + beacon_root = ?beacon_root, + count = request.count, + cap, + "Received BlocksByHead Request" + ); + + if cap == 0 { + return Ok(()); + } + + // Walk the parent chain on a blocking thread because `get_blinded_block` hits the store + // synchronously and we may walk up to MAX_REQUEST_BLOCKS_DENEB ancestors. + let network_beacon_processor = self.clone(); + let block_roots = self + .executor + .spawn_blocking_handle( + move || network_beacon_processor.get_block_roots_ancestor_of_head(beacon_root, cap), + "get_block_roots_ancestor_of_head", + ) + .ok_or((RpcErrorResponse::ServerError, "shutting down"))? 
+ .await + .map_err(|_| (RpcErrorResponse::ServerError, "tokio join"))??; + + let requested_blocks = block_roots.len(); + + let log_results = |peer_id, blocks_sent| { + debug!( + %peer_id, + requested = requested_blocks, + returned = blocks_sent, + "BlocksByHead outgoing response processed" + ); + }; + + let mut block_stream = match self.chain.get_blocks(block_roots) { + Ok(block_stream) => block_stream, + Err(e) => { + error!(error = ?e, "Error getting block stream"); + return Err((RpcErrorResponse::ServerError, "Iterator error")); + } + }; + + // Fetching blocks is async because it may have to hit the execution layer for payloads. + let mut blocks_sent = 0; + while let Some((root, result)) = block_stream.next().await { + match result.as_ref() { + Ok(Some(block)) => { + blocks_sent += 1; + self.send_network_message(NetworkMessage::SendResponse { + peer_id, + inbound_request_id, + response: Response::BlocksByHead(Some(block.clone())), + }); + } + Ok(None) => { + error!( + %peer_id, + request_root = ?root, + "Block in the chain is not in the store" + ); + log_results(peer_id, blocks_sent); + return Err((RpcErrorResponse::ServerError, "Database inconsistency")); + } + Err(BeaconChainError::BlockHashMissingFromExecutionLayer(_)) => { + debug!( + block_root = ?root, + reason = "execution layer not synced", + "Failed to fetch execution payload for blocks by head request" + ); + log_results(peer_id, blocks_sent); + return Err(( + RpcErrorResponse::ResourceUnavailable, + "Execution layer not synced", + )); + } + Err(e) => { + if matches!( + e, + BeaconChainError::ExecutionLayerErrorPayloadReconstruction(_block_hash, boxed_error) + if matches!(**boxed_error, execution_layer::Error::EngineError(_)) + ) { + warn!( + info = "this may occur occasionally when the EE is busy", + block_root = ?root, + error = ?e, + "Error rebuilding payload for peer" + ); + } else { + error!( + block_root = ?root, + error = ?e, + "Error fetching block for peer" + ); + } + log_results(peer_id, 
blocks_sent);
+ return Err((RpcErrorResponse::ServerError, "Failed fetching blocks"));
+ }
+ }
+ }
+
+ log_results(peer_id, blocks_sent);
+ Ok(())
+ }
+
+ /// Walks the parent chain of `head_root` (inclusive) and returns up to `count` block roots
+ /// in descending slot order. Synchronous so it can be run on a blocking thread.
+ ///
+ /// Returns `ResourceUnavailable` if `head_root` itself is unknown. Stops walking (without
+ /// erroring) once the next ancestor is not locally available, returning whatever was
+ /// collected.
+ fn get_block_roots_ancestor_of_head(
+ &self,
+ head_root: Hash256,
+ count: u64,
+ ) -> Result<Vec<Hash256>, (RpcErrorResponse, &'static str)> {
+ let mut roots = Vec::with_capacity(count as usize);
+ let mut current_root = head_root;
+ let mut walked = 0u64;
+ loop {
+ if walked == count {
+ break;
+ }
+ match self.chain.get_blinded_block(&current_root) {
+ Ok(Some(block)) => {
+ roots.push(current_root);
+ walked = walked.saturating_add(1);
+ let parent = block.parent_root();
+ if parent.is_zero() {
+ // Reached the genesis block (genesis has parent_root = 0x0..0).
+ break;
+ }
+ current_root = parent;
+ }
+ Ok(None) => {
+ if walked == 0 {
+ return Err((RpcErrorResponse::ResourceUnavailable, "Unknown beacon_root"));
+ }
+ break;
+ }
+ Err(e) => {
+ error!(error = ?e, "Error walking parent chain for BlocksByHead");
+ return Err((RpcErrorResponse::ServerError, "Error walking parent chain"));
+ }
+ }
+ }
+ Ok(roots)
+ }
+
 /// Handle a `ExecutionPayloadEnvelopesByRoot` request from the peer.
#[instrument( name = "lh_handle_payload_envelopes_by_root_request", diff --git a/beacon_node/network/src/router.rs b/beacon_node/network/src/router.rs index 443fa51cc67..a718997e0af 100644 --- a/beacon_node/network/src/router.rs +++ b/beacon_node/network/src/router.rs @@ -243,6 +243,13 @@ impl Router { request, ), ), + RequestType::BlocksByHead(request) => self.handle_beacon_processor_send_result( + self.network_beacon_processor.send_blocks_by_head_request( + peer_id, + inbound_request_id, + request, + ), + ), RequestType::PayloadEnvelopesByRoot(request) => self .handle_beacon_processor_send_result( self.network_beacon_processor @@ -346,6 +353,11 @@ impl Router { Response::PayloadEnvelopesByRoot(_) | Response::PayloadEnvelopesByRange(_) => { debug!("Requesting envelopes by root and by range not supported yet"); } + // Lighthouse currently only serves BlocksByHead and does not issue it as a client, + // so receiving a response is unexpected. Drop it without crashing. + Response::BlocksByHead(_) => { + debug!("BlocksByHead response received but not requested by lighthouse"); + } // Light client responses should not be received Response::LightClientBootstrap(_) | Response::LightClientOptimisticUpdate(_) From e2ccc982cdae630603e3ec6b70d4be403a5411c1 Mon Sep 17 00:00:00 2001 From: dapplion <35266934+dapplion@users.noreply.github.com> Date: Thu, 30 Apr 2026 00:51:24 +0200 Subject: [PATCH 02/11] Resolve BlocksByHead roots via fork-choice + freezer index MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously `get_block_roots_ancestor_of_head` walked the parent chain by calling `get_blinded_block` per ancestor — N store reads just to extract `parent_root`/`slot`. The streamer then re-fetched each block, yielding 2× store reads per request. Switch to: - `fork_choice_read_lock().proto_array().iter_block_roots(&head_root)` for the in-memory parent walk (zero DB reads). 
- `chain.forwards_iter_block_roots(start_slot)` (the freezer's slot→root index, no block bodies loaded) for spillover below the proto-array boundary; parents of finalized blocks are canonical so the canonical freezer iter is correct. `chain.get_blocks(roots)` then performs the only DB load per block, batched + pipelined via `BeaconBlockStreamer`'s `getPayloadBodiesByRangeV1` path. Skip-slot duplicates from the forwards iter are deduped before truncating to `remaining`. --- .../network_beacon_processor/rpc_methods.rs | 104 ++++++++++++------ 1 file changed, 73 insertions(+), 31 deletions(-) diff --git a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs index 85f3ad1df5d..c761bfe80e9 100644 --- a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs @@ -409,45 +409,87 @@ impl NetworkBeaconProcessor { /// Walks the parent chain of `head_root` (inclusive) and returns up to `count` block roots /// in descending slot order. Synchronous so it can be run on a blocking thread. /// - /// Returns `ResourceUnavailable` if `head_root` itself is unknown. Stops walking (without - /// erroring) once the next ancestor is not locally available, returning whatever was - /// collected. + /// Sources roots from fork-choice's in-memory proto-array first (no DB reads), then falls + /// back to the freezer's slot→root index for the portion below the finalized boundary. + /// Returns `ResourceUnavailable` if `head_root` is not in proto-array. fn get_block_roots_ancestor_of_head( &self, head_root: Hash256, count: u64, ) -> Result, (RpcErrorResponse, &'static str)> { - let mut roots = Vec::with_capacity(count as usize); - let mut current_root = head_root; - let mut walked = 0u64; - loop { - if walked == count { - break; + if count == 0 { + return Ok(vec![]); + } + + // 1. Walk ancestors in proto-array (in-memory, zero DB reads). 
+ let mut roots_with_slots: Vec<(Hash256, Slot)> = {
+ let fork_choice = self.chain.canonical_head.fork_choice_read_lock();
+ fork_choice
+ .proto_array()
+ .iter_block_roots(&head_root)
+ .take(count as usize)
+ .collect()
+ };
+
+ if roots_with_slots.is_empty() {
+ return Err((RpcErrorResponse::ResourceUnavailable, "Unknown beacon_root"));
+ }
+
+ let collected = roots_with_slots.len() as u64;
+ if collected >= count {
+ return Ok(roots_with_slots.into_iter().map(|(r, _)| r).collect());
+ }
+
+ // 2. Spillover below the proto-array boundary. Parents of finalized blocks are
+ // canonical, so the freezer's `slot → root` index is sufficient — no block
+ // bodies are loaded.
+ let oldest_slot = roots_with_slots.last().expect("non-empty checked above").1;
+ let remaining = count - collected;
+ let oldest_slot_u64 = oldest_slot.as_u64();
+ let start_slot = oldest_slot_u64.saturating_sub(remaining);
+ if start_slot >= oldest_slot_u64 {
+ return Ok(roots_with_slots.into_iter().map(|(r, _)| r).collect());
+ }
+
+ let iter = match self.chain.forwards_iter_block_roots(Slot::from(start_slot)) {
+ Ok(iter) => iter,
+ Err(BeaconChainError::HistoricalBlockOutOfRange { .. }) => {
+ // Below the oldest available block (e.g. backfilling); return what we have.
+ return Ok(roots_with_slots.into_iter().map(|(r, _)| r).collect());
}
- match self.chain.get_blinded_block(&current_root) {
- Ok(Some(block)) => {
- roots.push(current_root);
- walked = walked.saturating_add(1);
- let parent = block.parent_root();
- if parent.is_zero() {
- // Reached the genesis block (genesis has parent_root = 0x0..0).
- break; - } - current_root = parent; - } - Ok(None) => { - if walked == 0 { - return Err((RpcErrorResponse::ResourceUnavailable, "Unknown beacon_root")); - } - break; - } - Err(e) => { - error!(error = ?e, "Error walking parent chain for BlocksByHead"); - return Err((RpcErrorResponse::ServerError, "Error walking parent chain")); - } + Err(e) => { + error!(error = ?e, "Error opening forwards block roots iterator"); + return Err((RpcErrorResponse::ServerError, "Database error")); + } + }; + + // Collect canonical roots in [start_slot, oldest_slot), ascending. Skip slots + // make `forwards_iter_block_roots` yield the same root for multiple consecutive + // slots, so dedup adjacent duplicates afterwards. + let collected_below: Vec<(Hash256, Slot)> = match process_results(iter, |it| { + it.take_while(|(_, slot)| *slot < oldest_slot) + .collect::>() + }) { + Ok(v) => v, + Err(e) => { + error!(error = ?e, "Error iterating forward block roots"); + return Err((RpcErrorResponse::ServerError, "Iteration error")); + } + }; + + let mut deduped: Vec<(Hash256, Slot)> = Vec::with_capacity(collected_below.len()); + for entry in collected_below { + if deduped.last().map(|(r, _)| *r) != Some(entry.0) { + deduped.push(entry); } } - Ok(roots) + + deduped.reverse(); // descending + deduped.truncate(remaining as usize); + + roots_with_slots.extend(deduped); + + Ok(roots_with_slots.into_iter().map(|(r, _)| r).collect()) } /// Handle a `ExecutionPayloadEnvelopesByRoot` request from the peer. 
From e65e5281cb5414ea35eeec2f49a21ac1b3e0e877 Mon Sep 17 00:00:00 2001 From: dapplion <35266934+dapplion@users.noreply.github.com> Date: Thu, 30 Apr 2026 01:35:06 +0200 Subject: [PATCH 03/11] Walk head state block_roots for BlocksByHead spillover MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The previous spillover computed `start_slot = oldest_slot - remaining` and asked the freezer for that range — wrong with skip slots: a sparsely-filled window yields fewer unique blocks than `remaining`, so we silently returned a short response. Switch to walking the head state's `block_roots` field (the 8192-slot in-memory circular buffer carried in every `BeaconState`) backward slot-by-slot, deduping adjacent duplicates produced by skip slots, and stopping exactly when `count` blocks are collected. Zero DB reads for the spillover — `block_roots` is already in memory on the head snapshot. For pathological requests whose ancestors fall outside the 8192-slot window we simply stop walking; BlocksByHead's `count <= 128` cap means this can't happen in practice. --- .../network_beacon_processor/rpc_methods.rs | 80 +++++++++---------- 1 file changed, 36 insertions(+), 44 deletions(-) diff --git a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs index c761bfe80e9..afecc66e8d1 100644 --- a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs @@ -441,53 +441,45 @@ impl NetworkBeaconProcessor { } // 2. Spillover below the proto-array boundary. Parents of finalized blocks are - // canonical, so the freezer's `slot → root` index is sufficient — no block - // bodies are loaded. + // canonical, so the head state's `block_roots` bucket (the 8192-slot circular + // buffer carried in every `BeaconState`) gives us canonical roots for free — + // purely in-memory, no DB reads. 
This naturally handles skip slots (consecutive + // skipped slots yield the same root, deduped on insert) and lets us walk slot + // by slot until we have `count` blocks regardless of skip density. let oldest_slot = roots_with_slots.last().expect("non-empty checked above").1; - let remaining = count - collected; - let oldest_slot_u64 = oldest_slot.as_u64(); - let start_slot = oldest_slot_u64.saturating_sub(remaining); - if start_slot >= oldest_slot_u64 { - return Ok(roots_with_slots.into_iter().map(|(r, _)| r).collect()); - } - - let iter = match self.chain.forwards_iter_block_roots(Slot::from(start_slot)) { - Ok(iter) => iter, - Err(BeaconChainError::HistoricalBlockOutOfRange { .. }) => { - // Below the oldest available block (e.g. backfilling); return what we have. - return Ok(roots_with_slots.into_iter().map(|(r, _)| r).collect()); - } - Err(e) => { - error!(error = ?e, "Error opening forwards block roots iterator"); - return Err((RpcErrorResponse::ServerError, "Database error")); - } - }; - - // Collect canonical roots in [start_slot, oldest_slot), ascending. Skip slots - // make `forwards_iter_block_roots` yield the same root for multiple consecutive - // slots, so dedup adjacent duplicates afterwards. 
- let collected_below: Vec<(Hash256, Slot)> = match process_results(iter, |it| { - it.take_while(|(_, slot)| *slot < oldest_slot) - .collect::>() - }) { - Ok(v) => v, - Err(e) => { - error!(error = ?e, "Error iterating forward block roots"); - return Err((RpcErrorResponse::ServerError, "Iteration error")); - } - }; - - let mut deduped: Vec<(Hash256, Slot)> = Vec::with_capacity(collected_below.len()); - for entry in collected_below { - if deduped.last().map(|(r, _)| *r) != Some(entry.0) { - deduped.push(entry); + let oldest_block_slot = self.chain.store.get_oldest_block_slot(); + + let head = self.chain.head_snapshot(); + let state = &head.beacon_state; + let mut last_root = roots_with_slots.last().map(|(r, _)| *r); + let mut current = oldest_slot; + while (roots_with_slots.len() as u64) < count && current > oldest_block_slot { + current = match current.as_u64().checked_sub(1) { + Some(s) => Slot::from(s), + None => break, + }; + match state.get_block_root(current) { + Ok(root) => { + let root = *root; + // Skip-slot entries reuse the prior block's root; dedup adjacent. + if Some(root) != last_root { + roots_with_slots.push((root, current)); + last_root = Some(root); + } + } + Err(types::BeaconStateError::SlotOutOfBounds) => { + // Slot is older than the head state's `block_roots` window (~8192 + // slots). For BlocksByHead's `count <= 128` cap this shouldn't + // happen in practice; stop walking. 
+ break; + } + Err(e) => { + error!(error = ?e, "Error walking head state block_roots"); + return Err((RpcErrorResponse::ServerError, "State read error")); + } } } - - deduped.reverse(); // descending - deduped.truncate(remaining as usize); - - roots_with_slots.extend(deduped); + drop(head); Ok(roots_with_slots.into_iter().map(|(r, _)| r).collect()) } From 7f5d3d2c6ad1e08ba6dc414ed5de329d07f31ad9 Mon Sep 17 00:00:00 2001 From: dapplion <35266934+dapplion@users.noreply.github.com> Date: Thu, 30 Apr 2026 01:52:30 +0200 Subject: [PATCH 04/11] Handle finalized beacon_root in BlocksByHead MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously `get_block_roots_ancestor_of_head` returned `ResourceUnavailable` whenever `head_root` was not in fork-choice's proto-array — including any canonical block at or below the latest finalized checkpoint. The spec requires us to serve at least one block if we have the block at `beacon_root`, so this was non-compliant for any finalized root. Now three cases are handled: 1. All ancestors in fork-choice → proto-array iter (existing path). 2. Mixed → proto-array yields the above-finalized portion, head state's `block_roots` bucket fills the rest (existing path). 3. `head_root` below finalized → fall back to one `get_blinded_block` to fetch its slot, verify it is canonical at that slot via `state.block_roots`, then walk the bucket for ancestors. If verification fails (non-canonical or outside the 8192-slot window) we still return the single block we found, satisfying the spec MUST. 
--- .../network_beacon_processor/rpc_methods.rs | 69 ++++++++++++++----- 1 file changed, 52 insertions(+), 17 deletions(-) diff --git a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs index afecc66e8d1..164edbef111 100644 --- a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs @@ -409,9 +409,17 @@ impl NetworkBeaconProcessor { /// Walks the parent chain of `head_root` (inclusive) and returns up to `count` block roots /// in descending slot order. Synchronous so it can be run on a blocking thread. /// - /// Sources roots from fork-choice's in-memory proto-array first (no DB reads), then falls - /// back to the freezer's slot→root index for the portion below the finalized boundary. - /// Returns `ResourceUnavailable` if `head_root` is not in proto-array. + /// Three regimes are handled: + /// 1. `head_root` and ≥`count` ancestors live in fork-choice's in-memory proto-array → + /// served entirely from proto-array, zero DB reads. + /// 2. `head_root` is in proto-array but the requested chain crosses the finalized + /// boundary → proto-array yields the above-finalized portion, the head state's + /// `block_roots` bucket fills the rest (still in-memory). + /// 3. `head_root` is below finalized (proto-array iter empty) → a single + /// `get_blinded_block` looks up its slot, then `state.block_roots` provides the + /// canonical ancestors. + /// + /// Returns `ResourceUnavailable` if `head_root` is not known to the node. fn get_block_roots_ancestor_of_head( &self, head_root: Hash256, @@ -431,26 +439,54 @@ impl NetworkBeaconProcessor { .collect() }; - if roots_with_slots.is_empty() { - return Err((RpcErrorResponse::ResourceUnavailable, "Unknown beacon_root")); + let from_proto_array = !roots_with_slots.is_empty(); + + // 2. Fallback for case 3: `head_root` not in proto-array. 
Look it up in the store + // to get its slot so we can still serve a partial response from `state.block_roots`. + if !from_proto_array { + let block = self + .chain + .get_blinded_block(&head_root) + .map_err(|e| { + error!(error = ?e, "Error reading blinded block for BlocksByHead beacon_root"); + (RpcErrorResponse::ServerError, "Database error") + })? + .ok_or((RpcErrorResponse::ResourceUnavailable, "Unknown beacon_root"))?; + roots_with_slots.push((head_root, block.slot())); } - let collected = roots_with_slots.len() as u64; - if collected >= count { + if (roots_with_slots.len() as u64) >= count { return Ok(roots_with_slots.into_iter().map(|(r, _)| r).collect()); } - // 2. Spillover below the proto-array boundary. Parents of finalized blocks are - // canonical, so the head state's `block_roots` bucket (the 8192-slot circular - // buffer carried in every `BeaconState`) gives us canonical roots for free — - // purely in-memory, no DB reads. This naturally handles skip slots (consecutive - // skipped slots yield the same root, deduped on insert) and lets us walk slot - // by slot until we have `count` blocks regardless of skip density. - let oldest_slot = roots_with_slots.last().expect("non-empty checked above").1; + // 3. Spillover via the head state's `block_roots` bucket (the 8192-slot circular + // buffer carried in every `BeaconState`). Below the proto-array boundary parents + // of finalized blocks are canonical, so this gives us the right roots for free. + // Skip slots reuse the prior block's root; dedup on insert. + let oldest_slot = roots_with_slots.last().expect("non-empty above").1; let oldest_block_slot = self.chain.store.get_oldest_block_slot(); let head = self.chain.head_snapshot(); let state = &head.beacon_state; + + // For the case-3 fallback, verify `head_root` is canonical at its slot before + // walking `state.block_roots` — otherwise we'd return a different chain's + // ancestors. 
For non-canonical or out-of-window heads, return what we already + // collected (which is at least the head_root itself, satisfying the spec's + // "MUST return at least one block if you have it" clause). + if !from_proto_array { + match state.get_block_root(oldest_slot) { + Ok(r) if *r == head_root => {} // canonical, OK to walk + Ok(_) | Err(types::BeaconStateError::SlotOutOfBounds) => { + return Ok(roots_with_slots.into_iter().map(|(r, _)| r).collect()); + } + Err(e) => { + error!(error = ?e, "Error reading head state block_roots"); + return Err((RpcErrorResponse::ServerError, "State read error")); + } + } + } + let mut last_root = roots_with_slots.last().map(|(r, _)| *r); let mut current = oldest_slot; while (roots_with_slots.len() as u64) < count && current > oldest_block_slot { @@ -461,7 +497,6 @@ impl NetworkBeaconProcessor { match state.get_block_root(current) { Ok(root) => { let root = *root; - // Skip-slot entries reuse the prior block's root; dedup adjacent. if Some(root) != last_root { roots_with_slots.push((root, current)); last_root = Some(root); @@ -469,8 +504,8 @@ impl NetworkBeaconProcessor { } Err(types::BeaconStateError::SlotOutOfBounds) => { // Slot is older than the head state's `block_roots` window (~8192 - // slots). For BlocksByHead's `count <= 128` cap this shouldn't - // happen in practice; stop walking. + // slots). For BlocksByHead's `count <= 128` cap this shouldn't happen + // in practice; stop walking. break; } Err(e) => { From 71b2a6d58b2e599f2ad37aa11ef795a5a8e08f7f Mon Sep 17 00:00:00 2001 From: dapplion <35266934+dapplion@users.noreply.github.com> Date: Thu, 30 Apr 2026 02:30:17 +0200 Subject: [PATCH 05/11] Read finalized roots from freezer index in BlocksByHead MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `get_block_roots_ancestor_of_head` previously walked ancestors below the proto-array boundary by indexing into the head state's `block_roots` circular buffer. 
That bucket only spans ~8192 slots back from head, so deeper walks were silently truncated, and using head-state lookups to verify the canonicity of a sub-finalized `head_root` is the wrong source of truth: it requires snapshotting and cloning the head state, and a non-canonical hot-DB block at the same slot as a finalized canonical block can shadow the freezer's canonical root. Switch the spillover (and the case-2 canonicity check) to `store.get_cold_block_root(slot)`, which reads the freezer DB's `BeaconBlockRoots` column — the canonical slot→root index for finalized blocks, populated for `[oldest_block_slot, split.slot)` with skip slots reusing the prior block's root (same semantics as `state.block_roots`). This collapses the prior three regimes into two: above-finalization is served by proto-array (in-memory, no DB reads); at-or-below-finalization is served by the freezer index. The head state is no longer consulted, the walk-back window now extends all the way to `oldest_block_slot`, and freezer holes (e.g. below `oldest_block_slot` on a checkpoint-synced node) terminate cleanly instead of erroring. --- .../network_beacon_processor/rpc_methods.rs | 80 +++++++++---------- 1 file changed, 38 insertions(+), 42 deletions(-) diff --git a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs index 164edbef111..9190be9604f 100644 --- a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs @@ -409,15 +409,15 @@ impl NetworkBeaconProcessor { /// Walks the parent chain of `head_root` (inclusive) and returns up to `count` block roots /// in descending slot order. Synchronous so it can be run on a blocking thread. /// - /// Three regimes are handled: - /// 1. `head_root` and ≥`count` ancestors live in fork-choice's in-memory proto-array → - /// served entirely from proto-array, zero DB reads. - /// 2. 
`head_root` is in proto-array but the requested chain crosses the finalized - /// boundary → proto-array yields the above-finalized portion, the head state's - /// `block_roots` bucket fills the rest (still in-memory). - /// 3. `head_root` is below finalized (proto-array iter empty) → a single - /// `get_blinded_block` looks up its slot, then `state.block_roots` provides the - /// canonical ancestors. + /// Two regimes are handled: + /// 1. Above finalization → fork-choice's in-memory proto-array supplies the roots + /// (zero DB reads). + /// 2. At or below finalization → the freezer DB's `BeaconBlockRoots` column (the + /// canonical slot→root index for finalized blocks, populated for + /// `[oldest_block_slot, split.slot)` with skip slots reusing the prior block's + /// root) supplies the roots. The head state is never consulted: its 8192-slot + /// `block_roots` bucket would silently truncate deep walks and is the wrong + /// source of truth for canonical history below finalization. /// /// Returns `ResourceUnavailable` if `head_root` is not known to the node. fn get_block_roots_ancestor_of_head( @@ -441,8 +441,9 @@ impl NetworkBeaconProcessor { let from_proto_array = !roots_with_slots.is_empty(); - // 2. Fallback for case 3: `head_root` not in proto-array. Look it up in the store - // to get its slot so we can still serve a partial response from `state.block_roots`. + // 2. Fallback: `head_root` is at or below finalization (proto-array doesn't + // track it). Look it up in the store to learn its slot so we can still serve + // a partial response from the freezer's block-root index. if !from_proto_array { let block = self .chain @@ -459,62 +460,57 @@ impl NetworkBeaconProcessor { return Ok(roots_with_slots.into_iter().map(|(r, _)| r).collect()); } - // 3. Spillover via the head state's `block_roots` bucket (the 8192-slot circular - // buffer carried in every `BeaconState`). 
Below the proto-array boundary parents - // of finalized blocks are canonical, so this gives us the right roots for free. - // Skip slots reuse the prior block's root; dedup on insert. - let oldest_slot = roots_with_slots.last().expect("non-empty above").1; - let oldest_block_slot = self.chain.store.get_oldest_block_slot(); - - let head = self.chain.head_snapshot(); - let state = &head.beacon_state; - - // For the case-3 fallback, verify `head_root` is canonical at its slot before - // walking `state.block_roots` — otherwise we'd return a different chain's - // ancestors. For non-canonical or out-of-window heads, return what we already - // collected (which is at least the head_root itself, satisfying the spec's - // "MUST return at least one block if you have it" clause). + // 3. Spillover via the freezer DB's `BeaconBlockRoots` index (the canonical + // slot→root mapping for finalized blocks). Skip slots reuse the prior + // block's root; dedup on insert. + let store = &self.chain.store; + let oldest_block_slot = store.get_oldest_block_slot(); + let mut current = roots_with_slots.last().expect("non-empty above").1; + + // For the case-2 fallback, verify `head_root` is the canonical block at its + // slot before walking back — otherwise we'd return a different chain's + // ancestors. A non-canonical hot-DB block at slot < split.slot can shadow the + // finalized chain at the same slot. If verification fails (or the slot is + // outside the freezer's range) we return what we already collected, which is + // at least the head_root itself, satisfying the spec's "MUST return at least + // one block if you have it" clause. 
if !from_proto_array { - match state.get_block_root(oldest_slot) { - Ok(r) if *r == head_root => {} // canonical, OK to walk - Ok(_) | Err(types::BeaconStateError::SlotOutOfBounds) => { + match store.get_cold_block_root(current) { + Ok(Some(r)) if r == head_root => {} // canonical, OK to walk + Ok(_) => { return Ok(roots_with_slots.into_iter().map(|(r, _)| r).collect()); } Err(e) => { - error!(error = ?e, "Error reading head state block_roots"); - return Err((RpcErrorResponse::ServerError, "State read error")); + error!(error = ?e, "Error reading freezer block_root for BlocksByHead"); + return Err((RpcErrorResponse::ServerError, "Database error")); } } } let mut last_root = roots_with_slots.last().map(|(r, _)| *r); - let mut current = oldest_slot; while (roots_with_slots.len() as u64) < count && current > oldest_block_slot { current = match current.as_u64().checked_sub(1) { Some(s) => Slot::from(s), None => break, }; - match state.get_block_root(current) { - Ok(root) => { - let root = *root; + match store.get_cold_block_root(current) { + Ok(Some(root)) => { if Some(root) != last_root { roots_with_slots.push((root, current)); last_root = Some(root); } } - Err(types::BeaconStateError::SlotOutOfBounds) => { - // Slot is older than the head state's `block_roots` window (~8192 - // slots). For BlocksByHead's `count <= 128` cap this shouldn't happen - // in practice; stop walking. + Ok(None) => { + // Hole in the freezer index (e.g. before `oldest_block_slot` on a + // checkpoint-synced node). Stop walking. 
break; } Err(e) => { - error!(error = ?e, "Error walking head state block_roots"); - return Err((RpcErrorResponse::ServerError, "State read error")); + error!(error = ?e, "Error walking freezer block_roots"); + return Err((RpcErrorResponse::ServerError, "Database error")); } } } - drop(head); Ok(roots_with_slots.into_iter().map(|(r, _)| r).collect()) } From 88254f4eeded6a91706b27d4461b4a9f4ca12a99 Mon Sep 17 00:00:00 2001 From: dapplion <35266934+dapplion@users.noreply.github.com> Date: Thu, 30 Apr 2026 02:48:35 +0200 Subject: [PATCH 06/11] Rename `from_proto_array` to `head_below_finalization` MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The previous name forced the case-2 branches to read `if !from_proto_array` — a negated check on a negated boolean. Inverting the variable lets the branches read positively. --- .../network/src/network_beacon_processor/rpc_methods.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs index 9190be9604f..2e20109d542 100644 --- a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs @@ -439,12 +439,12 @@ impl NetworkBeaconProcessor { .collect() }; - let from_proto_array = !roots_with_slots.is_empty(); + let head_below_finalization = roots_with_slots.is_empty(); // 2. Fallback: `head_root` is at or below finalization (proto-array doesn't // track it). Look it up in the store to learn its slot so we can still serve // a partial response from the freezer's block-root index. 
- if !from_proto_array { + if head_below_finalization { let block = self .chain .get_blinded_block(&head_root) @@ -474,7 +474,7 @@ impl NetworkBeaconProcessor { // outside the freezer's range) we return what we already collected, which is // at least the head_root itself, satisfying the spec's "MUST return at least // one block if you have it" clause. - if !from_proto_array { + if head_below_finalization { match store.get_cold_block_root(current) { Ok(Some(r)) if r == head_root => {} // canonical, OK to walk Ok(_) => { From 5984f52e32b180e8ae5621d0b63caabbbfe5f5fa Mon Sep 17 00:00:00 2001 From: dapplion <35266934+dapplion@users.noreply.github.com> Date: Thu, 30 Apr 2026 03:12:35 +0200 Subject: [PATCH 07/11] Simplify ancestor walk in BlocksByHead Drop the `roots_with_slots: Vec<(Hash256, Slot)>` accumulator and the `head_below_finalization` boolean. Now the result is built directly as `Vec`, with a `current_slot: Slot` scalar tracking where the freezer walk picks up. The case-2 fallback (head_root at/below finalization) does its canonicity check inline against the freezer index before falling through to the spillover loop, which removes the second `if` on the boolean. No behavior change; just less collection-then-discard and a clearer flow. --- .../network_beacon_processor/rpc_methods.rs | 83 ++++++++++--------- 1 file changed, 42 insertions(+), 41 deletions(-) diff --git a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs index 2e20109d542..37a6f3779ae 100644 --- a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs @@ -429,22 +429,34 @@ impl NetworkBeaconProcessor { return Ok(vec![]); } - // 1. Walk ancestors in proto-array (in-memory, zero DB reads). - let mut roots_with_slots: Vec<(Hash256, Slot)> = { + // 1. Walk ancestors in proto-array (in-memory, zero DB reads). 
Track the
+        // deepest slot we collected — that's where the freezer walk picks up.
+        let mut roots: Vec<Hash256> = Vec::with_capacity(count as usize);
+        let mut deepest_slot: Option<Slot> = None;
+        {
             let fork_choice = self.chain.canonical_head.fork_choice_read_lock();
-            fork_choice
+            for (root, slot) in fork_choice
                 .proto_array()
                 .iter_block_roots(&head_root)
                 .take(count as usize)
-                .collect()
-        };
+            {
+                roots.push(root);
+                deepest_slot = Some(slot);
+            }
+        }

-        let head_below_finalization = roots_with_slots.is_empty();
+        let store = &self.chain.store;

         // 2. Fallback: `head_root` is at or below finalization (proto-array doesn't
-        // track it). Look it up in the store to learn its slot so we can still serve
-        // a partial response from the freezer's block-root index.
-        if head_below_finalization {
+        // track it). Look up its slot in the store, then verify it is the canonical
+        // block at that slot via the freezer index — a non-canonical hot-DB block at
+        // slot < split.slot can shadow the finalized chain. If the freezer
+        // disagrees (or doesn't have that slot), serve just the single block we
+        // found, satisfying the spec's "MUST return at least one block if you have
+        // it" clause.
+        let mut current_slot = if let Some(slot) = deepest_slot {
+            slot
+        } else {
             let block = self
                 .chain
                 .get_blinded_block(&head_root)
@@ -453,50 +465,39 @@
                     (RpcErrorResponse::ServerError, "Database error")
                 })?
                 .ok_or((RpcErrorResponse::ResourceUnavailable, "Unknown beacon_root"))?;
-            roots_with_slots.push((head_root, block.slot()));
-        }
-
-        if (roots_with_slots.len() as u64) >= count {
-            return Ok(roots_with_slots.into_iter().map(|(r, _)| r).collect());
-        }
+            let block_slot = block.slot();
+            roots.push(head_root);

-        // 3. Spillover via the freezer DB's `BeaconBlockRoots` index (the canonical
-        // slot→root mapping for finalized blocks). Skip slots reuse the prior
-        // block's root; dedup on insert.
- let store = &self.chain.store; - let oldest_block_slot = store.get_oldest_block_slot(); - let mut current = roots_with_slots.last().expect("non-empty above").1; - - // For the case-2 fallback, verify `head_root` is the canonical block at its - // slot before walking back — otherwise we'd return a different chain's - // ancestors. A non-canonical hot-DB block at slot < split.slot can shadow the - // finalized chain at the same slot. If verification fails (or the slot is - // outside the freezer's range) we return what we already collected, which is - // at least the head_root itself, satisfying the spec's "MUST return at least - // one block if you have it" clause. - if head_below_finalization { - match store.get_cold_block_root(current) { - Ok(Some(r)) if r == head_root => {} // canonical, OK to walk - Ok(_) => { - return Ok(roots_with_slots.into_iter().map(|(r, _)| r).collect()); - } + match store.get_cold_block_root(block_slot) { + Ok(Some(r)) if r == head_root => {} // canonical, OK to walk back + Ok(_) => return Ok(roots), Err(e) => { error!(error = ?e, "Error reading freezer block_root for BlocksByHead"); return Err((RpcErrorResponse::ServerError, "Database error")); } } + + block_slot + }; + + if (roots.len() as u64) >= count { + return Ok(roots); } - let mut last_root = roots_with_slots.last().map(|(r, _)| *r); - while (roots_with_slots.len() as u64) < count && current > oldest_block_slot { - current = match current.as_u64().checked_sub(1) { + // 3. Spillover via the freezer DB's `BeaconBlockRoots` index (the canonical + // slot→root mapping for finalized blocks). Skip slots reuse the prior + // block's root; dedup on insert. 
+ let oldest_block_slot = store.get_oldest_block_slot(); + let mut last_root = roots.last().copied(); + while (roots.len() as u64) < count && current_slot > oldest_block_slot { + current_slot = match current_slot.as_u64().checked_sub(1) { Some(s) => Slot::from(s), None => break, }; - match store.get_cold_block_root(current) { + match store.get_cold_block_root(current_slot) { Ok(Some(root)) => { if Some(root) != last_root { - roots_with_slots.push((root, current)); + roots.push(root); last_root = Some(root); } } @@ -512,7 +513,7 @@ impl NetworkBeaconProcessor { } } - Ok(roots_with_slots.into_iter().map(|(r, _)| r).collect()) + Ok(roots) } /// Handle a `ExecutionPayloadEnvelopesByRoot` request from the peer. From dc534c34dcd77774f710852a422c60f844b680e5 Mon Sep 17 00:00:00 2001 From: dapplion <35266934+dapplion@users.noreply.github.com> Date: Thu, 30 Apr 2026 03:12:47 +0200 Subject: [PATCH 08/11] Add BlocksByHead handler tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three end-to-end tests in the `TestRig` harness, exercising the `get_block_roots_ancestor_of_head` paths the previous agents kept getting wrong: - `test_blocks_by_head_spillover_into_freezer`: 4-epoch chain so finalization migrates state to the freezer; request walks all the way back to slot 1, crossing the proto-array → freezer boundary. - `test_blocks_by_head_finalized_root`: uses the finalized checkpoint's block root as `beacon_root` (case-2 fallback), verifying the `get_blinded_block` + freezer canonicity check + freezer walk path. - `test_blocks_by_head_unknown_root`: a random root yields `ResourceUnavailable`. A new `enqueue_blocks_by_head_request` helper mirrors the existing `enqueue_blobs_by_*` helpers, and a small `drain_blocks_by_head_response` utility reads the response stream up to its `None` terminator. 
--- .../src/network_beacon_processor/tests.rs | 164 +++++++++++++++++- 1 file changed, 162 insertions(+), 2 deletions(-) diff --git a/beacon_node/network/src/network_beacon_processor/tests.rs b/beacon_node/network/src/network_beacon_processor/tests.rs index c4e7f8f8d1f..f13815f7b66 100644 --- a/beacon_node/network/src/network_beacon_processor/tests.rs +++ b/beacon_node/network/src/network_beacon_processor/tests.rs @@ -24,8 +24,8 @@ use itertools::Itertools; use libp2p::gossipsub::MessageAcceptance; use lighthouse_network::rpc::InboundRequestId; use lighthouse_network::rpc::methods::{ - BlobsByRangeRequest, BlobsByRootRequest, DataColumnsByRangeRequest, MetaDataV3, - PayloadEnvelopesByRangeRequest, PayloadEnvelopesByRootRequest, + BlobsByRangeRequest, BlobsByRootRequest, BlocksByHeadRequest, DataColumnsByRangeRequest, + MetaDataV3, PayloadEnvelopesByRangeRequest, PayloadEnvelopesByRootRequest, }; use lighthouse_network::{ Client, MessageId, NetworkConfig, NetworkGlobals, PeerId, Response, @@ -501,6 +501,16 @@ impl TestRig { .unwrap(); } + pub fn enqueue_blocks_by_head_request(&self, beacon_root: Hash256, count: u64) { + self.network_beacon_processor + .send_blocks_by_head_request( + PeerId::random(), + InboundRequestId::new_unchecked(42, 24), + BlocksByHeadRequest { beacon_root, count }, + ) + .unwrap(); + } + pub fn enqueue_blobs_by_root_request(&self, blob_ids: RuntimeVariableList) { self.network_beacon_processor .send_blobs_by_roots_request( @@ -2346,3 +2356,153 @@ async fn test_payload_envelopes_by_range_no_duplicates_with_skip_slots() { // 1. Gossip envelope arrives before its block → queued via UnknownBlockForEnvelope // 2. Block imported → envelope released and processed successfully // 3. Timeout path → envelope released and re-verified + +/// Drain `network_rx` collecting `Response::BlocksByHead(Some(_))` block roots until the +/// stream terminator (`None`) arrives. Panics on any other message type so tests fail +/// loudly if an error response sneaks in. 
+async fn drain_blocks_by_head_response(rig: &mut TestRig) -> Vec<Hash256> {
+    let mut roots = Vec::new();
+    while let Some(msg) = rig.network_rx.recv().await {
+        match msg {
+            NetworkMessage::SendResponse {
+                response: Response::BlocksByHead(Some(block)),
+                ..
+            } => roots.push(block.canonical_root()),
+            NetworkMessage::SendResponse {
+                response: Response::BlocksByHead(None),
+                ..
+            } => return roots,
+            other => panic!("unexpected message: {:?}", other),
+        }
+    }
+    roots
+}
+
+// `BlocksByHead` request that crosses the finalized boundary: proto-array supplies
+// the unfinalized head + ancestors down to the finalized root, then the freezer's
+// `BeaconBlockRoots` index supplies the rest. Verifies the spillover path
+// `get_block_roots_ancestor_of_head` takes when count > proto-array depth.
+#[tokio::test]
+async fn test_blocks_by_head_spillover_into_freezer() {
+    // Long enough for finalization + state migration to populate the freezer.
+    let mut rig = TestRig::new(SLOTS_PER_EPOCH * 4).await;
+
+    // Sanity-check the precondition: finalization advanced past genesis and the split
+    // slot is non-zero, so the freezer's `BeaconBlockRoots` column has entries.
+    assert!(
+        rig.chain
+            .canonical_head
+            .cached_head()
+            .finalized_checkpoint()
+            .epoch
+            > Epoch::new(0),
+        "test precondition: chain must have finalized past epoch 0",
+    );
+    assert!(
+        rig.chain.store.get_split_slot() > Slot::new(0),
+        "test precondition: state migration must have populated the freezer",
+    );
+
+    let head_slot = rig.chain.canonical_head.cached_head().head_slot();
+    let head_root = rig.chain.canonical_head.cached_head().head_block_root();
+
+    // Walk all the way back to slot 1: exercises both proto-array (above finalization)
+    // and freezer (at/below finalization).
+    let count = head_slot.as_u64();
+    rig.enqueue_blocks_by_head_request(head_root, count);
+    let actual = drain_blocks_by_head_response(&mut rig).await;
+
+    // Build the canonical descending root list independently.
The harness has no skip
+    // slots so every slot in [1, head_slot] has a unique block, but we still dedup
+    // defensively to mirror the function under test.
+    let mut expected: Vec<Hash256> = Vec::new();
+    let mut last: Option<Hash256> = None;
+    for offset in 0..count {
+        let slot = Slot::new(head_slot.as_u64() - offset);
+        if let Some(root) = rig
+            .chain
+            .block_root_at_slot(slot, WhenSlotSkipped::Prev)
+            .unwrap()
+            && Some(root) != last
+        {
+            expected.push(root);
+            last = Some(root);
+        }
+    }
+
+    assert_eq!(
+        actual, expected,
+        "BlocksByHead must serve the full canonical parent chain across the finalized boundary",
+    );
+    assert_eq!(actual.first(), Some(&head_root), "first root must be head");
+}
+
+// `BlocksByHead` with `beacon_root` set to a finalized block root (case-2 fallback in
+// `get_block_roots_ancestor_of_head`): proto-array doesn't track it, so we
+// `get_blinded_block` for its slot, verify canonicity via the freezer index, and walk
+// back from there.
+#[tokio::test]
+async fn test_blocks_by_head_finalized_root() {
+    let mut rig = TestRig::new(SLOTS_PER_EPOCH * 4).await;
+
+    let finalized_root = rig
+        .chain
+        .canonical_head
+        .cached_head()
+        .finalized_checkpoint()
+        .root;
+    let finalized_slot = rig
+        .chain
+        .get_blinded_block(&finalized_root)
+        .unwrap()
+        .expect("finalized block exists in store")
+        .slot();
+    assert!(
+        finalized_slot > Slot::new(0),
+        "test precondition: finalized block must not be genesis",
+    );
+
+    let count = 8u64.min(finalized_slot.as_u64());
+    rig.enqueue_blocks_by_head_request(finalized_root, count);
+    let actual = drain_blocks_by_head_response(&mut rig).await;
+
+    let mut expected: Vec<Hash256> = Vec::new();
+    let mut last: Option<Hash256> = None;
+    for offset in 0..count {
+        let slot = Slot::new(finalized_slot.as_u64() - offset);
+        if let Some(root) = rig
+            .chain
+            .block_root_at_slot(slot, WhenSlotSkipped::Prev)
+            .unwrap()
+            && Some(root) != last
+        {
+            expected.push(root);
+            last = Some(root);
+        }
+    }
+
+    assert_eq!(actual, expected);
+
assert_eq!( + actual.first(), + Some(&finalized_root), + "first root must be the requested finalized root", + ); +} + +// `BlocksByHead` for a `beacon_root` we don't have. Spec says we MUST return an error +// (we map this to `ResourceUnavailable`). +#[tokio::test] +async fn test_blocks_by_head_unknown_root() { + let mut rig = TestRig::new(SLOTS_PER_EPOCH).await; + rig.enqueue_blocks_by_head_request(Hash256::repeat_byte(0xab), 4); + + match rig.network_rx.recv().await.expect("a network message") { + NetworkMessage::SendErrorResponse { error, .. } => { + assert_matches!( + error, + lighthouse_network::rpc::RpcErrorResponse::ResourceUnavailable + ); + } + other => panic!("expected SendErrorResponse, got {:?}", other), + } +} From 7626b3aa70d90c3911743e119882bc4ea9416eb1 Mon Sep 17 00:00:00 2001 From: Lion - dapplion <35266934+dapplion@users.noreply.github.com> Date: Fri, 1 May 2026 10:37:58 +0200 Subject: [PATCH 09/11] Update beacon_node/lighthouse_network/src/rpc/protocol.rs Co-authored-by: Jimmy Chen --- beacon_node/lighthouse_network/src/rpc/protocol.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/beacon_node/lighthouse_network/src/rpc/protocol.rs b/beacon_node/lighthouse_network/src/rpc/protocol.rs index c18485624e9..056ffc03b85 100644 --- a/beacon_node/lighthouse_network/src/rpc/protocol.rs +++ b/beacon_node/lighthouse_network/src/rpc/protocol.rs @@ -1126,7 +1126,7 @@ impl std::fmt::Display for RequestType { RequestType::Goodbye(reason) => write!(f, "Goodbye: {}", reason), RequestType::BlocksByRange(req) => write!(f, "Blocks by range: {}", req), RequestType::BlocksByRoot(req) => write!(f, "Blocks by root: {:?}", req), - RequestType::BlocksByHead(req) => write!(f, "{}", req), + RequestType::BlocksByHead(req) => write!(f, "Blocks by head: {}", req), RequestType::PayloadEnvelopesByRange(req) => { write!(f, "Payload envelopes by range: {:?}", req) } From 184a50e3b4ecb6cde588d9842e9e9a3aa9ffe37f Mon Sep 17 00:00:00 2001 From: dapplion 
<35266934+dapplion@users.noreply.github.com> Date: Fri, 1 May 2026 10:58:53 +0200 Subject: [PATCH 10/11] Accept any-fork blocks in BlocksByHead codec MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `BlocksByHeadV1` previously rejected pre-Fulu blocks at the response codec with `InvalidRequest`. The protocol is new in Fulu but the wire shape is just `SignedBeaconBlock`, and a Fulu peer's parent walk naturally crosses the Fulu fork boundary — the server has the older canonical blocks and should be able to serve them, mirroring how `BlocksByRangeV2` and `BlocksByRootV2` accept all eight fork variants. Replace the Fulu-only arm with the same Base→Gloas fan-out used by `BlocksByRootV2`. The server-side handler is already fork-agnostic (`chain.get_blocks(...)` streams whichever variant the block is), so relaxing the wire codec is the only change needed. Adds a small `test_blocks_by_head_decodes_all_forks` round-trip test to guard the new arms against regressions. 
--- .../lighthouse_network/src/rpc/codec.rs | 75 ++++++++++++++----- 1 file changed, 55 insertions(+), 20 deletions(-) diff --git a/beacon_node/lighthouse_network/src/rpc/codec.rs b/beacon_node/lighthouse_network/src/rpc/codec.rs index e2a0cfc3d69..8040425c3ca 100644 --- a/beacon_node/lighthouse_network/src/rpc/codec.rs +++ b/beacon_node/lighthouse_network/src/rpc/codec.rs @@ -949,26 +949,36 @@ fn handle_rpc_response( )), }, SupportedProtocol::BlocksByHeadV1 => match fork_name { - Some(fork_name) if fork_name.fulu_enabled() => match fork_name { - ForkName::Fulu => Ok(Some(RpcSuccessResponse::BlocksByHead(Arc::new( - SignedBeaconBlock::Fulu(SignedBeaconBlockFulu::from_ssz_bytes(decoded_buffer)?), - )))), - ForkName::Gloas => Ok(Some(RpcSuccessResponse::BlocksByHead(Arc::new( - SignedBeaconBlock::Gloas(SignedBeaconBlockGloas::from_ssz_bytes( - decoded_buffer, - )?), - )))), - // `fulu_enabled()` returns true only for Fulu and later forks; the matches - // above cover those exhaustively. - _ => Err(RPCError::ErrorResponse( - RpcErrorResponse::InvalidRequest, - "Unexpected fork variant for blocks by head".to_string(), - )), - }, - Some(_) => Err(RPCError::ErrorResponse( - RpcErrorResponse::InvalidRequest, - "Invalid fork name for blocks by head".to_string(), - )), + Some(ForkName::Base) => Ok(Some(RpcSuccessResponse::BlocksByHead(Arc::new( + SignedBeaconBlock::Base(SignedBeaconBlockBase::from_ssz_bytes(decoded_buffer)?), + )))), + Some(ForkName::Altair) => Ok(Some(RpcSuccessResponse::BlocksByHead(Arc::new( + SignedBeaconBlock::Altair(SignedBeaconBlockAltair::from_ssz_bytes(decoded_buffer)?), + )))), + Some(ForkName::Bellatrix) => Ok(Some(RpcSuccessResponse::BlocksByHead(Arc::new( + SignedBeaconBlock::Bellatrix(SignedBeaconBlockBellatrix::from_ssz_bytes( + decoded_buffer, + )?), + )))), + Some(ForkName::Capella) => Ok(Some(RpcSuccessResponse::BlocksByHead(Arc::new( + SignedBeaconBlock::Capella(SignedBeaconBlockCapella::from_ssz_bytes( + decoded_buffer, + )?), + )))), + 
Some(ForkName::Deneb) => Ok(Some(RpcSuccessResponse::BlocksByHead(Arc::new( + SignedBeaconBlock::Deneb(SignedBeaconBlockDeneb::from_ssz_bytes(decoded_buffer)?), + )))), + Some(ForkName::Electra) => Ok(Some(RpcSuccessResponse::BlocksByHead(Arc::new( + SignedBeaconBlock::Electra(SignedBeaconBlockElectra::from_ssz_bytes( + decoded_buffer, + )?), + )))), + Some(ForkName::Fulu) => Ok(Some(RpcSuccessResponse::BlocksByHead(Arc::new( + SignedBeaconBlock::Fulu(SignedBeaconBlockFulu::from_ssz_bytes(decoded_buffer)?), + )))), + Some(ForkName::Gloas) => Ok(Some(RpcSuccessResponse::BlocksByHead(Arc::new( + SignedBeaconBlock::Gloas(SignedBeaconBlockGloas::from_ssz_bytes(decoded_buffer)?), + )))), None => Err(RPCError::ErrorResponse( RpcErrorResponse::InvalidRequest, format!( @@ -1904,6 +1914,31 @@ mod tests { ); } + // BlocksByHead is introduced in Fulu but the response is just `SignedBeaconBlock`, + // so the codec must accept blocks of any fork variant — the chain a Fulu peer walks + // back may straddle the Fulu boundary and include pre-Fulu canonical blocks. 
+ #[test] + fn test_blocks_by_head_decodes_all_forks() { + let chain_spec = spec_with_all_forks_enabled(); + for (block, fork) in [ + (empty_base_block(&chain_spec), ForkName::Base), + (altair_block(&chain_spec), ForkName::Altair), + (bellatrix_block_small(&chain_spec), ForkName::Bellatrix), + ] { + let block_arc = Arc::new(block); + assert_eq!( + encode_then_decode_response( + SupportedProtocol::BlocksByHeadV1, + RpcResponse::Success(RpcSuccessResponse::BlocksByHead(block_arc.clone())), + fork, + &chain_spec, + ), + Ok(Some(RpcSuccessResponse::BlocksByHead(block_arc))), + "BlocksByHeadV1 must round-trip a {fork} block" + ); + } + } + // Test RPCResponse encoding/decoding for V2 messages #[test] fn test_context_bytes_v2() { From 8adc7261c668547cb0cc737fcfda6f53d94c3364 Mon Sep 17 00:00:00 2001 From: dapplion <35266934+dapplion@users.noreply.github.com> Date: Fri, 1 May 2026 12:09:58 +0200 Subject: [PATCH 11/11] Use SignedBeaconBlock::from_ssz_bytes_by_fork in BlocksByHead codec MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Per Jimmy's review: collapse the eight-arm fork match to a single `from_ssz_bytes_by_fork(bytes, fork_name)` call. Net −30/+3 lines, same behavior, easier to keep in sync if a future fork is added. 
--- .../lighthouse_network/src/rpc/codec.rs | 33 ++----------------- 1 file changed, 3 insertions(+), 30 deletions(-) diff --git a/beacon_node/lighthouse_network/src/rpc/codec.rs b/beacon_node/lighthouse_network/src/rpc/codec.rs index 8040425c3ca..ba95fff5e8e 100644 --- a/beacon_node/lighthouse_network/src/rpc/codec.rs +++ b/beacon_node/lighthouse_network/src/rpc/codec.rs @@ -18,7 +18,7 @@ use tokio_util::codec::{Decoder, Encoder}; use types::SignedExecutionPayloadEnvelope; use types::{ BlobSidecar, ChainSpec, DataColumnSidecar, DataColumnsByRootIdentifier, EthSpec, ForkContext, - ForkName, Hash256, LightClientBootstrap, LightClientFinalityUpdate, + ForkName, ForkVersionDecode, Hash256, LightClientBootstrap, LightClientFinalityUpdate, LightClientOptimisticUpdate, LightClientUpdate, SignedBeaconBlock, SignedBeaconBlockAltair, SignedBeaconBlockBase, SignedBeaconBlockBellatrix, SignedBeaconBlockCapella, SignedBeaconBlockDeneb, SignedBeaconBlockElectra, SignedBeaconBlockFulu, @@ -949,35 +949,8 @@ fn handle_rpc_response( )), }, SupportedProtocol::BlocksByHeadV1 => match fork_name { - Some(ForkName::Base) => Ok(Some(RpcSuccessResponse::BlocksByHead(Arc::new( - SignedBeaconBlock::Base(SignedBeaconBlockBase::from_ssz_bytes(decoded_buffer)?), - )))), - Some(ForkName::Altair) => Ok(Some(RpcSuccessResponse::BlocksByHead(Arc::new( - SignedBeaconBlock::Altair(SignedBeaconBlockAltair::from_ssz_bytes(decoded_buffer)?), - )))), - Some(ForkName::Bellatrix) => Ok(Some(RpcSuccessResponse::BlocksByHead(Arc::new( - SignedBeaconBlock::Bellatrix(SignedBeaconBlockBellatrix::from_ssz_bytes( - decoded_buffer, - )?), - )))), - Some(ForkName::Capella) => Ok(Some(RpcSuccessResponse::BlocksByHead(Arc::new( - SignedBeaconBlock::Capella(SignedBeaconBlockCapella::from_ssz_bytes( - decoded_buffer, - )?), - )))), - Some(ForkName::Deneb) => Ok(Some(RpcSuccessResponse::BlocksByHead(Arc::new( - SignedBeaconBlock::Deneb(SignedBeaconBlockDeneb::from_ssz_bytes(decoded_buffer)?), - )))), - 
Some(ForkName::Electra) => Ok(Some(RpcSuccessResponse::BlocksByHead(Arc::new( - SignedBeaconBlock::Electra(SignedBeaconBlockElectra::from_ssz_bytes( - decoded_buffer, - )?), - )))), - Some(ForkName::Fulu) => Ok(Some(RpcSuccessResponse::BlocksByHead(Arc::new( - SignedBeaconBlock::Fulu(SignedBeaconBlockFulu::from_ssz_bytes(decoded_buffer)?), - )))), - Some(ForkName::Gloas) => Ok(Some(RpcSuccessResponse::BlocksByHead(Arc::new( - SignedBeaconBlock::Gloas(SignedBeaconBlockGloas::from_ssz_bytes(decoded_buffer)?), + Some(fork_name) => Ok(Some(RpcSuccessResponse::BlocksByHead(Arc::new( + SignedBeaconBlock::from_ssz_bytes_by_fork(decoded_buffer, fork_name)?, )))), None => Err(RPCError::ErrorResponse( RpcErrorResponse::InvalidRequest,