Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions beacon_node/beacon_processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,7 @@ pub enum Work<E: EthSpec> {
Status(BlockingFn),
BlocksByRangeRequest(AsyncFn),
BlocksByRootsRequest(AsyncFn),
BlocksByHeadRequest(AsyncFn),
PayloadEnvelopesByRangeRequest(AsyncFn),
PayloadEnvelopesByRootRequest(AsyncFn),
BlobsByRangeRequest(BlockingFn),
Expand Down Expand Up @@ -491,6 +492,7 @@ pub enum WorkType {
Status,
BlocksByRangeRequest,
BlocksByRootsRequest,
BlocksByHeadRequest,
PayloadEnvelopesByRangeRequest,
PayloadEnvelopesByRootRequest,
BlobsByRangeRequest,
Expand Down Expand Up @@ -553,6 +555,7 @@ impl<E: EthSpec> Work<E> {
Work::Status(_) => WorkType::Status,
Work::BlocksByRangeRequest(_) => WorkType::BlocksByRangeRequest,
Work::BlocksByRootsRequest(_) => WorkType::BlocksByRootsRequest,
Work::BlocksByHeadRequest(_) => WorkType::BlocksByHeadRequest,
Work::PayloadEnvelopesByRangeRequest(_) => WorkType::PayloadEnvelopesByRangeRequest,
Work::PayloadEnvelopesByRootRequest(_) => WorkType::PayloadEnvelopesByRootRequest,
Work::BlobsByRangeRequest(_) => WorkType::BlobsByRangeRequest,
Expand Down Expand Up @@ -1000,6 +1003,8 @@ impl<E: EthSpec> BeaconProcessor<E> {
Some(item)
} else if let Some(item) = work_queues.block_broots_queue.pop() {
Some(item)
} else if let Some(item) = work_queues.block_bhead_queue.pop() {
Some(item)
} else if let Some(item) = work_queues.blob_brange_queue.pop() {
Some(item)
} else if let Some(item) = work_queues.blob_broots_queue.pop() {
Expand Down Expand Up @@ -1206,6 +1211,9 @@ impl<E: EthSpec> BeaconProcessor<E> {
Work::BlocksByRootsRequest { .. } => {
work_queues.block_broots_queue.push(work, work_id)
}
Work::BlocksByHeadRequest { .. } => {
work_queues.block_bhead_queue.push(work, work_id)
}
Work::PayloadEnvelopesByRangeRequest { .. } => work_queues
.payload_envelopes_brange_queue
.push(work, work_id),
Expand Down Expand Up @@ -1331,6 +1339,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
WorkType::Status => work_queues.status_queue.len(),
WorkType::BlocksByRangeRequest => work_queues.block_brange_queue.len(),
WorkType::BlocksByRootsRequest => work_queues.block_broots_queue.len(),
WorkType::BlocksByHeadRequest => work_queues.block_bhead_queue.len(),
WorkType::PayloadEnvelopesByRangeRequest => {
work_queues.payload_envelopes_brange_queue.len()
}
Expand Down Expand Up @@ -1531,6 +1540,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
}
Work::BlocksByRangeRequest(work)
| Work::BlocksByRootsRequest(work)
| Work::BlocksByHeadRequest(work)
| Work::PayloadEnvelopesByRangeRequest(work)
| Work::PayloadEnvelopesByRootRequest(work) => task_spawner.spawn_async(work),
Work::ChainSegmentBackfill(process_fn) => {
Expand Down
5 changes: 5 additions & 0 deletions beacon_node/beacon_processor/src/scheduler/work_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ pub struct BeaconProcessorQueueLengths {
status_queue: usize,
block_brange_queue: usize,
block_broots_queue: usize,
block_bhead_queue: usize,
blob_broots_queue: usize,
blob_brange_queue: usize,
dcbroots_queue: usize,
Expand Down Expand Up @@ -206,6 +207,7 @@ impl BeaconProcessorQueueLengths {
status_queue: 1024,
block_brange_queue: 1024,
block_broots_queue: 1024,
block_bhead_queue: 1024,
blob_broots_queue: 1024,
blob_brange_queue: 1024,
dcbroots_queue: 1024,
Expand Down Expand Up @@ -263,6 +265,7 @@ pub struct WorkQueues<E: EthSpec> {
pub status_queue: FifoQueue<Work<E>>,
pub block_brange_queue: FifoQueue<Work<E>>,
pub block_broots_queue: FifoQueue<Work<E>>,
pub block_bhead_queue: FifoQueue<Work<E>>,
pub payload_envelopes_brange_queue: FifoQueue<Work<E>>,
pub payload_envelopes_broots_queue: FifoQueue<Work<E>>,
pub blob_broots_queue: FifoQueue<Work<E>>,
Expand Down Expand Up @@ -334,6 +337,7 @@ impl<E: EthSpec> WorkQueues<E> {
let status_queue = FifoQueue::new(queue_lengths.status_queue);
let block_brange_queue = FifoQueue::new(queue_lengths.block_brange_queue);
let block_broots_queue = FifoQueue::new(queue_lengths.block_broots_queue);
let block_bhead_queue = FifoQueue::new(queue_lengths.block_bhead_queue);
let blob_broots_queue = FifoQueue::new(queue_lengths.blob_broots_queue);
let blob_brange_queue = FifoQueue::new(queue_lengths.blob_brange_queue);
let dcbroots_queue = FifoQueue::new(queue_lengths.dcbroots_queue);
Expand Down Expand Up @@ -399,6 +403,7 @@ impl<E: EthSpec> WorkQueues<E> {
status_queue,
block_brange_queue,
block_broots_queue,
block_bhead_queue,
blob_broots_queue,
blob_brange_queue,
dcbroots_queue,
Expand Down
3 changes: 3 additions & 0 deletions beacon_node/lighthouse_network/src/peer_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,7 @@ impl<E: EthSpec> PeerManager<E> {
Protocol::Ping => PeerAction::MidToleranceError,
Protocol::BlocksByRange => PeerAction::MidToleranceError,
Protocol::BlocksByRoot => PeerAction::MidToleranceError,
Protocol::BlocksByHead => PeerAction::MidToleranceError,
Protocol::BlobsByRange => PeerAction::MidToleranceError,
Protocol::PayloadEnvelopesByRange => PeerAction::MidToleranceError,
Protocol::PayloadEnvelopesByRoot => PeerAction::MidToleranceError,
Expand Down Expand Up @@ -617,6 +618,7 @@ impl<E: EthSpec> PeerManager<E> {
Protocol::Ping => PeerAction::Fatal,
Protocol::BlocksByRange => return,
Protocol::BlocksByRoot => return,
Protocol::BlocksByHead => return,
Protocol::PayloadEnvelopesByRange => return,
Protocol::PayloadEnvelopesByRoot => return,
Protocol::BlobsByRange => return,
Expand All @@ -642,6 +644,7 @@ impl<E: EthSpec> PeerManager<E> {
Protocol::Ping => PeerAction::LowToleranceError,
Protocol::BlocksByRange => PeerAction::MidToleranceError,
Protocol::BlocksByRoot => PeerAction::MidToleranceError,
Protocol::BlocksByHead => PeerAction::MidToleranceError,
Protocol::PayloadEnvelopesByRange => PeerAction::MidToleranceError,
Protocol::PayloadEnvelopesByRoot => PeerAction::MidToleranceError,
Protocol::BlobsByRange => PeerAction::MidToleranceError,
Expand Down
51 changes: 50 additions & 1 deletion beacon_node/lighthouse_network/src/rpc/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use tokio_util::codec::{Decoder, Encoder};
use types::SignedExecutionPayloadEnvelope;
use types::{
BlobSidecar, ChainSpec, DataColumnSidecar, DataColumnsByRootIdentifier, EthSpec, ForkContext,
ForkName, Hash256, LightClientBootstrap, LightClientFinalityUpdate,
ForkName, ForkVersionDecode, Hash256, LightClientBootstrap, LightClientFinalityUpdate,
LightClientOptimisticUpdate, LightClientUpdate, SignedBeaconBlock, SignedBeaconBlockAltair,
SignedBeaconBlockBase, SignedBeaconBlockBellatrix, SignedBeaconBlockCapella,
SignedBeaconBlockDeneb, SignedBeaconBlockElectra, SignedBeaconBlockFulu,
Expand Down Expand Up @@ -77,6 +77,7 @@ impl<E: EthSpec> SSZSnappyInboundCodec<E> {
},
RpcSuccessResponse::BlocksByRange(res) => res.as_ssz_bytes(),
RpcSuccessResponse::BlocksByRoot(res) => res.as_ssz_bytes(),
RpcSuccessResponse::BlocksByHead(res) => res.as_ssz_bytes(),
RpcSuccessResponse::PayloadEnvelopesByRange(res) => res.as_ssz_bytes(),
RpcSuccessResponse::PayloadEnvelopesByRoot(res) => res.as_ssz_bytes(),
RpcSuccessResponse::BlobsByRange(res) => res.as_ssz_bytes(),
Expand Down Expand Up @@ -359,6 +360,7 @@ impl<E: EthSpec> Encoder<RequestType<E>> for SSZSnappyOutboundCodec<E> {
BlocksByRootRequest::V1(req) => req.block_roots.as_ssz_bytes(),
BlocksByRootRequest::V2(req) => req.block_roots.as_ssz_bytes(),
},
RequestType::BlocksByHead(req) => req.as_ssz_bytes(),
RequestType::PayloadEnvelopesByRange(req) => req.as_ssz_bytes(),
RequestType::PayloadEnvelopesByRoot(req) => req.beacon_block_roots.as_ssz_bytes(),
RequestType::BlobsByRange(req) => req.as_ssz_bytes(),
Expand Down Expand Up @@ -553,6 +555,9 @@ fn handle_rpc_request<E: EthSpec>(
)?,
}),
))),
SupportedProtocol::BlocksByHeadV1 => Ok(Some(RequestType::BlocksByHead(
BlocksByHeadRequest::from_ssz_bytes(decoded_buffer)?,
))),
SupportedProtocol::PayloadEnvelopesByRangeV1 => {
Ok(Some(RequestType::PayloadEnvelopesByRange(
PayloadEnvelopesByRangeRequest::from_ssz_bytes(decoded_buffer)?,
Expand Down Expand Up @@ -943,6 +948,18 @@ fn handle_rpc_response<E: EthSpec>(
),
)),
},
SupportedProtocol::BlocksByHeadV1 => match fork_name {
Some(fork_name) => Ok(Some(RpcSuccessResponse::BlocksByHead(Arc::new(
SignedBeaconBlock::from_ssz_bytes_by_fork(decoded_buffer, fork_name)?,
)))),
None => Err(RPCError::ErrorResponse(
RpcErrorResponse::InvalidRequest,
format!(
"No context bytes provided for {:?} response",
versioned_protocol
),
)),
},
}
}

Expand Down Expand Up @@ -1319,6 +1336,9 @@ mod tests {
RequestType::BlocksByRoot(bbroot) => {
assert_eq!(decoded, RequestType::BlocksByRoot(bbroot))
}
RequestType::BlocksByHead(bbhead) => {
assert_eq!(decoded, RequestType::BlocksByHead(bbhead))
}
RequestType::BlobsByRange(blbrange) => {
assert_eq!(decoded, RequestType::BlobsByRange(blbrange))
}
Expand Down Expand Up @@ -1867,6 +1887,31 @@ mod tests {
);
}

// BlocksByHead is introduced in Fulu but the response is just `SignedBeaconBlock`,
// so the codec must accept blocks of any fork variant — the chain a Fulu peer walks
// back may straddle the Fulu boundary and include pre-Fulu canonical blocks.
#[test]
fn test_blocks_by_head_decodes_all_forks() {
let chain_spec = spec_with_all_forks_enabled();
for (block, fork) in [
(empty_base_block(&chain_spec), ForkName::Base),
(altair_block(&chain_spec), ForkName::Altair),
(bellatrix_block_small(&chain_spec), ForkName::Bellatrix),
] {
let block_arc = Arc::new(block);
assert_eq!(
encode_then_decode_response(
SupportedProtocol::BlocksByHeadV1,
RpcResponse::Success(RpcSuccessResponse::BlocksByHead(block_arc.clone())),
fork,
&chain_spec,
),
Ok(Some(RpcSuccessResponse::BlocksByHead(block_arc))),
"BlocksByHeadV1 must round-trip a {fork} block"
);
}
}

// Test RPCResponse encoding/decoding for V2 messages
#[test]
fn test_context_bytes_v2() {
Expand Down Expand Up @@ -2063,6 +2108,10 @@ mod tests {
RequestType::BlobsByRange(blbrange_request()),
RequestType::DataColumnsByRange(dcbrange_request()),
RequestType::MetaData(MetadataRequest::new_v2()),
RequestType::BlocksByHead(BlocksByHeadRequest {
beacon_root: Hash256::zero(),
count: 32,
}),
];
for req in requests.iter() {
for fork_name in ForkName::list_all() {
Expand Down
9 changes: 9 additions & 0 deletions beacon_node/lighthouse_network/src/rpc/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ pub struct RateLimiterConfig {
pub(super) goodbye_quota: Quota,
pub(super) blocks_by_range_quota: Quota,
pub(super) blocks_by_root_quota: Quota,
pub(super) blocks_by_head_quota: Quota,
pub(super) payload_envelopes_by_range_quota: Quota,
pub(super) payload_envelopes_by_root_quota: Quota,
pub(super) blobs_by_range_quota: Quota,
Expand All @@ -113,6 +114,8 @@ impl RateLimiterConfig {
Quota::n_every(NonZeroU64::new(128).unwrap(), 10);
pub const DEFAULT_BLOCKS_BY_ROOT_QUOTA: Quota =
Quota::n_every(NonZeroU64::new(128).unwrap(), 10);
pub const DEFAULT_BLOCKS_BY_HEAD_QUOTA: Quota =
Quota::n_every(NonZeroU64::new(128).unwrap(), 10);
pub const DEFAULT_PAYLOAD_ENVELOPES_BY_RANGE_QUOTA: Quota =
Quota::n_every(NonZeroU64::new(128).unwrap(), 10);
pub const DEFAULT_PAYLOAD_ENVELOPES_BY_ROOT_QUOTA: Quota =
Expand Down Expand Up @@ -143,6 +146,7 @@ impl Default for RateLimiterConfig {
goodbye_quota: Self::DEFAULT_GOODBYE_QUOTA,
blocks_by_range_quota: Self::DEFAULT_BLOCKS_BY_RANGE_QUOTA,
blocks_by_root_quota: Self::DEFAULT_BLOCKS_BY_ROOT_QUOTA,
blocks_by_head_quota: Self::DEFAULT_BLOCKS_BY_HEAD_QUOTA,
payload_envelopes_by_range_quota: Self::DEFAULT_PAYLOAD_ENVELOPES_BY_RANGE_QUOTA,
payload_envelopes_by_root_quota: Self::DEFAULT_PAYLOAD_ENVELOPES_BY_ROOT_QUOTA,
blobs_by_range_quota: Self::DEFAULT_BLOBS_BY_RANGE_QUOTA,
Expand Down Expand Up @@ -177,6 +181,7 @@ impl Debug for RateLimiterConfig {
.field("goodbye", fmt_q!(&self.goodbye_quota))
.field("blocks_by_range", fmt_q!(&self.blocks_by_range_quota))
.field("blocks_by_root", fmt_q!(&self.blocks_by_root_quota))
.field("blocks_by_head", fmt_q!(&self.blocks_by_head_quota))
.field(
"payload_envelopes_by_range",
fmt_q!(&self.payload_envelopes_by_range_quota),
Expand Down Expand Up @@ -213,6 +218,7 @@ impl FromStr for RateLimiterConfig {
let mut goodbye_quota = None;
let mut blocks_by_range_quota = None;
let mut blocks_by_root_quota = None;
let mut blocks_by_head_quota = None;
let mut payload_envelopes_by_range_quota = None;
let mut payload_envelopes_by_root_quota = None;
let mut blobs_by_range_quota = None;
Expand All @@ -232,6 +238,7 @@ impl FromStr for RateLimiterConfig {
Protocol::Goodbye => goodbye_quota = goodbye_quota.or(quota),
Protocol::BlocksByRange => blocks_by_range_quota = blocks_by_range_quota.or(quota),
Protocol::BlocksByRoot => blocks_by_root_quota = blocks_by_root_quota.or(quota),
Protocol::BlocksByHead => blocks_by_head_quota = blocks_by_head_quota.or(quota),
Protocol::PayloadEnvelopesByRange => {
payload_envelopes_by_range_quota = payload_envelopes_by_range_quota.or(quota)
}
Expand Down Expand Up @@ -274,6 +281,8 @@ impl FromStr for RateLimiterConfig {
.unwrap_or(Self::DEFAULT_BLOCKS_BY_RANGE_QUOTA),
blocks_by_root_quota: blocks_by_root_quota
.unwrap_or(Self::DEFAULT_BLOCKS_BY_ROOT_QUOTA),
blocks_by_head_quota: blocks_by_head_quota
.unwrap_or(Self::DEFAULT_BLOCKS_BY_HEAD_QUOTA),
payload_envelopes_by_range_quota: payload_envelopes_by_range_quota
.unwrap_or(Self::DEFAULT_PAYLOAD_ENVELOPES_BY_RANGE_QUOTA),
payload_envelopes_by_root_quota: payload_envelopes_by_root_quota
Expand Down
37 changes: 36 additions & 1 deletion beacon_node/lighthouse_network/src/rpc/methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,18 @@ impl From<BlocksByRangeRequest> for OldBlocksByRangeRequest {
}
}

/// Request a contiguous range of beacon blocks by walking the parent chain of `beacon_root`.
///
/// New in Fulu (see consensus-specs PR 5181). The responder walks the parent chain of
/// `beacon_root` (inclusive) and emits up to `count` blocks in descending slot order.
#[derive(Encode, Decode, Clone, Debug, PartialEq)]
pub struct BlocksByHeadRequest {
/// The block root to start the parent walk from (inclusive).
pub beacon_root: Hash256,
/// The maximum number of blocks to return.
pub count: u64,
}

/// Request a number of beacon block bodies from a peer.
#[superstruct(variants(V1, V2), variant_attributes(derive(Clone, Debug, PartialEq)))]
#[derive(Clone, Debug, PartialEq)]
Expand Down Expand Up @@ -622,6 +634,9 @@ pub enum RpcSuccessResponse<E: EthSpec> {
/// A response to a get BLOCKS_BY_ROOT request.
BlocksByRoot(Arc<SignedBeaconBlock<E>>),

/// A response to a get BEACON_BLOCKS_BY_HEAD request.
BlocksByHead(Arc<SignedBeaconBlock<E>>),

/// A response to a get EXECUTION_PAYLOAD_ENVELOPES_BY_RANGE request. A None response signifies
/// the end of the batch.
PayloadEnvelopesByRange(Arc<SignedExecutionPayloadEnvelope<E>>),
Expand Down Expand Up @@ -669,6 +684,9 @@ pub enum ResponseTermination {
/// Blocks by root stream termination.
BlocksByRoot,

/// Blocks by head stream termination.
BlocksByHead,

/// Execution payload envelopes by range stream termination.
PayloadEnvelopesByRange,

Expand Down Expand Up @@ -696,6 +714,7 @@ impl ResponseTermination {
match self {
ResponseTermination::BlocksByRange => Protocol::BlocksByRange,
ResponseTermination::BlocksByRoot => Protocol::BlocksByRoot,
ResponseTermination::BlocksByHead => Protocol::BlocksByHead,
ResponseTermination::PayloadEnvelopesByRange => Protocol::PayloadEnvelopesByRange,
ResponseTermination::PayloadEnvelopesByRoot => Protocol::PayloadEnvelopesByRoot,
ResponseTermination::BlobsByRange => Protocol::BlobsByRange,
Expand Down Expand Up @@ -793,6 +812,7 @@ impl<E: EthSpec> RpcSuccessResponse<E> {
RpcSuccessResponse::Status(_) => Protocol::Status,
RpcSuccessResponse::BlocksByRange(_) => Protocol::BlocksByRange,
RpcSuccessResponse::BlocksByRoot(_) => Protocol::BlocksByRoot,
RpcSuccessResponse::BlocksByHead(_) => Protocol::BlocksByHead,
RpcSuccessResponse::PayloadEnvelopesByRange(_) => Protocol::PayloadEnvelopesByRange,
RpcSuccessResponse::PayloadEnvelopesByRoot(_) => Protocol::PayloadEnvelopesByRoot,
RpcSuccessResponse::BlobsByRange(_) => Protocol::BlobsByRange,
Expand All @@ -812,7 +832,9 @@ impl<E: EthSpec> RpcSuccessResponse<E> {

pub fn slot(&self) -> Option<Slot> {
match self {
Self::BlocksByRange(r) | Self::BlocksByRoot(r) => Some(r.slot()),
Self::BlocksByRange(r) | Self::BlocksByRoot(r) | Self::BlocksByHead(r) => {
Some(r.slot())
}
Self::PayloadEnvelopesByRoot(r) | Self::PayloadEnvelopesByRange(r) => Some(r.slot()),
Self::BlobsByRange(r) | Self::BlobsByRoot(r) => Some(r.slot()),
Self::DataColumnsByRange(r) | Self::DataColumnsByRoot(r) => Some(r.slot()),
Expand Down Expand Up @@ -864,6 +886,9 @@ impl<E: EthSpec> std::fmt::Display for RpcSuccessResponse<E> {
RpcSuccessResponse::BlocksByRoot(block) => {
write!(f, "BlocksByRoot: Block slot: {}", block.slot())
}
RpcSuccessResponse::BlocksByHead(block) => {
write!(f, "BlocksByHead: Block slot: {}", block.slot())
}
RpcSuccessResponse::PayloadEnvelopesByRange(envelope) => {
write!(
f,
Expand Down Expand Up @@ -975,6 +1000,16 @@ impl std::fmt::Display for OldBlocksByRangeRequest {
}
}

impl std::fmt::Display for BlocksByHeadRequest {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"BlocksByHead: beacon_root: {}, count: {}",
self.beacon_root, self.count
)
}
}

impl std::fmt::Display for BlobsByRootRequest {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
Expand Down
Loading
Loading