diff --git a/lean_client/fork_choice/src/handlers.rs b/lean_client/fork_choice/src/handlers.rs index 3bfcfb83..5a0b24ff 100644 --- a/lean_client/fork_choice/src/handlers.rs +++ b/lean_client/fork_choice/src/handlers.rs @@ -204,7 +204,9 @@ pub fn on_gossip_attestation( .set(store.gossip_signatures.len() as i64); }); } else { - METRICS.get().map(|m| m.grandine_xmss_verify_skipped_total.inc()); + METRICS + .get() + .map(|m| m.grandine_xmss_verify_skipped_total.inc()); } store @@ -644,7 +646,9 @@ fn process_block_internal( adr.get(&key.data_root) .map_or(true, |data| data.target.slot.0 > finalized_slot) }); - store.attestation_data_by_root.retain(|_, data| data.target.slot.0 > finalized_slot); + store + .attestation_data_by_root + .retain(|_, data| data.target.slot.0 > finalized_slot); METRICS.get().map(|m| { m.grandine_attestation_data_by_root .set(store.attestation_data_by_root.len() as i64) diff --git a/lean_client/fork_choice/src/store.rs b/lean_client/fork_choice/src/store.rs index c8444dfa..2cf760a0 100644 --- a/lean_client/fork_choice/src/store.rs +++ b/lean_client/fork_choice/src/store.rs @@ -2,13 +2,13 @@ use std::collections::{HashMap, HashSet}; use anyhow::{Result, anyhow, ensure}; use containers::{ - AggregatedSignatureProof, Attestation, AttestationData, Block, BlockHeader, - Checkpoint, Config, SignatureKey, SignedAggregatedAttestation, SignedAttestation, - SignedBlockWithAttestation, Slot, State, + AggregatedSignatureProof, Attestation, AttestationData, Block, BlockHeader, Checkpoint, Config, + SignatureKey, SignedAggregatedAttestation, SignedAttestation, SignedBlockWithAttestation, Slot, + State, }; use metrics::{METRICS, set_gauge_u64}; -use tracing::warn; use ssz::{H256, SszHash}; +use tracing::warn; use xmss::Signature; pub type Interval = u64; @@ -111,11 +111,25 @@ impl Store { let target_checkpoint = self.get_attestation_target(); + let head_state = self + .states + .get(&self.head) + .ok_or_else(|| anyhow!("head state not found"))?; + + let source = if head_state.latest_justified.root.is_zero() { + Checkpoint { + root: self.head, + slot: head_state.latest_justified.slot, + } + } else { + head_state.latest_justified.clone() + }; + Ok(AttestationData { slot, head: head_checkpoint, target: target_checkpoint, - source: self.latest_justified.clone(), + source, }) } @@ -543,32 +557,22 @@ pub fn get_proposal_head(store: &mut Store, slot: Slot) -> H256 { store.head } -/// Produce a block and aggregated signature proofs for the target slot per devnet-2. -/// -/// The proposer returns the block and `MultisigAggregatedSignature` proofs aligned -/// with `block.body.attestations` so it can craft `SignedBlockWithAttestation`. -/// -/// # Algorithm Overview -/// 1. **Get Proposal Head**: Retrieve current chain head as parent -/// 2. **Collect Attestations**: Convert known attestations to plain attestations -/// 3. **Build Block**: Use State.build_block with signature caches -/// -/// The block and state are NOT inserted here. The caller signs the block and sends -/// it back via `ChainMessage::ProcessBlock`, which runs the full `on_block` path: -/// state transition, `update_head`, checkpoint updates, and proposer attestation. -/// -/// # Arguments -/// * `store` - Mutable reference to the fork choice store -/// * `slot` - Target slot number for block production -/// * `validator_index` - Index of validator authorized to propose this block -/// -/// # Returns -/// Tuple of (block root, finalized Block, attestation signature proofs) -pub fn produce_block_with_signatures( +pub struct BlockProductionInputs { + pub slot: Slot, + pub validator_index: u64, + pub head_root: H256, + pub head_state: State, + pub available_attestations: Vec, + pub known_block_roots: HashSet, + pub gossip_signatures: HashMap, + pub aggregated_payloads: HashMap>, +} + +pub fn prepare_block_production( store: &mut Store, slot: Slot, validator_index: u64, -) -> Result<(H256, Block, Vec)> { +) -> Result { let head_root = get_proposal_head(store, slot); let head_state = store .states @@ -595,7 +599,35 @@ pub fn produce_block_with_signatures( }) .collect(); - let known_block_roots: std::collections::HashSet = store.blocks.keys().copied().collect(); + let known_block_roots: HashSet = store.blocks.keys().copied().collect(); + let gossip_signatures = store.gossip_signatures.clone(); + let aggregated_payloads = store.latest_known_aggregated_payloads.clone(); + + Ok(BlockProductionInputs { + slot, + validator_index, + head_root, + head_state, + available_attestations, + known_block_roots, + gossip_signatures, + aggregated_payloads, + }) +} + +pub fn execute_block_production( + inputs: BlockProductionInputs, +) -> Result<(H256, Block, Vec)> { + let BlockProductionInputs { + slot, + validator_index, + head_root, + head_state, + available_attestations, + known_block_roots, + gossip_signatures, + aggregated_payloads, + } = inputs; let (final_block, _final_post_state, _aggregated_attestations, signatures) = head_state .build_block( @@ -605,11 +637,20 @@ pub fn produce_block_with_signatures( None, Some(available_attestations), Some(&known_block_roots), - Some(&store.gossip_signatures), - Some(&store.latest_known_aggregated_payloads), + Some(&gossip_signatures), + Some(&aggregated_payloads), )?; let block_root = final_block.hash_tree_root(); Ok((block_root, final_block, signatures)) } + +pub fn produce_block_with_signatures( + store: &mut Store, + slot: Slot, + validator_index: u64, +) -> Result<(H256, Block, Vec)> { + let inputs = prepare_block_production(store, slot, validator_index)?; + execute_block_production(inputs) +} diff --git a/lean_client/fork_choice/tests/unit_tests/validator.rs b/lean_client/fork_choice/tests/unit_tests/validator.rs index 876e9b6a..d945bda7 100644 --- a/lean_client/fork_choice/tests/unit_tests/validator.rs +++ b/lean_client/fork_choice/tests/unit_tests/validator.rs @@ -330,8 +330,19 @@ fn test_block_production_then_attestation() { assert_eq!(attestation.validator_id, attestor_idx); assert_eq!(attestation.data.slot, Slot(2)); - // The attestation should be consistent with current forkchoice state - assert_eq!(attestation.data.source, store.latest_justified); + let head_state = store + .states + .get(&store.head) + .expect("head state must exist"); + let expected_source = if head_state.latest_justified.root.is_zero() { + Checkpoint { + root: store.head, + slot: head_state.latest_justified.slot, + } + } else { + head_state.latest_justified.clone() + }; + assert_eq!(attestation.data.source, expected_source); } #[test] @@ -528,3 +539,35 @@ fn test_validator_operations_invalid_parameters() { }; assert_eq!(attestation.validator_id, large_validator); } + +#[test] +fn test_produce_attestation_data_uses_head_state_justified() { + let mut store = create_test_store(); + + // Simulate a minority-fork block advancing store.latest_justified + // past what the head chain has seen. + store.latest_justified = Checkpoint { + root: H256::from_slice(&[0xff; 32]), + slot: Slot(5), + }; + + let attestation_data = store + .produce_attestation_data(Slot(1)) + .expect("produce_attestation_data failed"); + + let head_state = store + .states + .get(&store.head) + .expect("head state must exist"); + let expected_source = if head_state.latest_justified.root.is_zero() { + Checkpoint { + root: store.head, + slot: head_state.latest_justified.slot, + } + } else { + head_state.latest_justified.clone() + }; + + assert_eq!(attestation_data.source, expected_source); + assert_ne!(attestation_data.source, store.latest_justified); +} diff --git a/lean_client/metrics/src/metrics.rs b/lean_client/metrics/src/metrics.rs index 5940d8da..29d5f5b0 100644 --- a/lean_client/metrics/src/metrics.rs +++ b/lean_client/metrics/src/metrics.rs @@ -142,7 +142,6 @@ pub struct Metrics { pub lean_attestation_committee_count: IntGauge, // OOM Detection Metrics - /// Number of entries in the attestation_data_by_root secondary index pub grandine_attestation_data_by_root: IntGauge, @@ -520,7 +519,9 @@ impl Metrics { default_registry.register(Box::new(self.grandine_pending_fetch_roots.clone()))?; default_registry.register(Box::new(self.grandine_block_cache_size.clone()))?; default_registry.register(Box::new(self.grandine_slots_behind.clone()))?; - default_registry.register(Box::new(self.grandine_fork_choice_known_attestations.clone()))?; + default_registry.register(Box::new( + self.grandine_fork_choice_known_attestations.clone(), + ))?; default_registry.register(Box::new(self.grandine_fork_choice_new_attestations.clone()))?; default_registry.register(Box::new(self.grandine_xmss_verify_skipped_total.clone()))?; diff --git a/lean_client/networking/src/network/service.rs b/lean_client/networking/src/network/service.rs index 54719ca6..dee08a82 100644 --- a/lean_client/networking/src/network/service.rs +++ b/lean_client/networking/src/network/service.rs @@ -1076,10 +1076,7 @@ where && peer_id != self.local_peer_id() { let current_state = self.peer_table.lock().get(&peer_id).cloned(); - if !matches!( - current_state, - Some(ConnectionState::Disconnected) | None - ) { + if !matches!(current_state, Some(ConnectionState::Disconnected) | None) { trace!(?peer_id, "Already connected or connecting"); continue; } diff --git a/lean_client/networking/src/types.rs b/lean_client/networking/src/types.rs index 6969b41a..a3690967 100644 --- a/lean_client/networking/src/types.rs +++ b/lean_client/networking/src/types.rs @@ -1,8 +1,4 @@ -use std::{ - collections::HashMap, - fmt::Display, - sync::Arc, -}; +use std::{collections::HashMap, fmt::Display, sync::Arc}; use anyhow::{Result, anyhow}; use async_trait::async_trait; diff --git a/lean_client/src/main.rs b/lean_client/src/main.rs index fc6ae90d..43463ecd 100644 --- a/lean_client/src/main.rs +++ b/lean_client/src/main.rs @@ -11,8 +11,8 @@ use fork_choice::{ block_cache::BlockCache, handlers::{on_aggregated_attestation, on_attestation, on_block, on_tick}, store::{ - INTERVALS_PER_SLOT, MILLIS_PER_INTERVAL, Store, get_forkchoice_store, - produce_block_with_signatures, + INTERVALS_PER_SLOT, MILLIS_PER_INTERVAL, Store, execute_block_production, + get_forkchoice_store, prepare_block_production, }, sync_state::SyncState, }; @@ -1280,10 +1280,24 @@ async fn main() -> Result<()> { let Some(v_message) = v_message else { break }; match v_message { ValidatorChainMessage::ProduceBlock { slot, proposer_index, sender } => { - let result = produce_block_with_signatures( - &mut *store.write(), slot, proposer_index - ).map(|(_, block, sigs)| (block, sigs)); - let _ = sender.send(result); + let prepare_result = { + let mut w = store.write(); + prepare_block_production(&mut *w, slot, proposer_index) + }; + + match prepare_result { + Err(e) => { let _ = sender.send(Err(e)); } + Ok(inputs) => { + let result = task::spawn_blocking(move || { + execute_block_production(inputs) + .map(|(_, block, sigs)| (block, sigs)) + }) + .await + .unwrap_or_else(|e| Err(anyhow::anyhow!("block production task panicked: {e}"))); + + let _ = sender.send(result); + } + } } ValidatorChainMessage::BuildAttestationData { slot, sender } => { let store_read = store.read();