Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions lean_client/fork_choice/src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,9 @@ pub fn on_gossip_attestation(
.set(store.gossip_signatures.len() as i64);
});
} else {
METRICS.get().map(|m| m.grandine_xmss_verify_skipped_total.inc());
METRICS
.get()
.map(|m| m.grandine_xmss_verify_skipped_total.inc());
}

store
Expand Down Expand Up @@ -644,7 +646,9 @@ fn process_block_internal(
adr.get(&key.data_root)
.map_or(true, |data| data.target.slot.0 > finalized_slot)
});
store.attestation_data_by_root.retain(|_, data| data.target.slot.0 > finalized_slot);
store
.attestation_data_by_root
.retain(|_, data| data.target.slot.0 > finalized_slot);
METRICS.get().map(|m| {
m.grandine_attestation_data_by_root
.set(store.attestation_data_by_root.len() as i64)
Expand Down
103 changes: 72 additions & 31 deletions lean_client/fork_choice/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ use std::collections::{HashMap, HashSet};

use anyhow::{Result, anyhow, ensure};
use containers::{
AggregatedSignatureProof, Attestation, AttestationData, Block, BlockHeader,
Checkpoint, Config, SignatureKey, SignedAggregatedAttestation, SignedAttestation,
SignedBlockWithAttestation, Slot, State,
AggregatedSignatureProof, Attestation, AttestationData, Block, BlockHeader, Checkpoint, Config,
SignatureKey, SignedAggregatedAttestation, SignedAttestation, SignedBlockWithAttestation, Slot,
State,
};
use metrics::{METRICS, set_gauge_u64};
use tracing::warn;
use ssz::{H256, SszHash};
use tracing::warn;
use xmss::Signature;

pub type Interval = u64;
Expand Down Expand Up @@ -111,11 +111,25 @@ impl Store {

let target_checkpoint = self.get_attestation_target();

let head_state = self
.states
.get(&self.head)
.ok_or_else(|| anyhow!("head state not found"))?;

let source = if head_state.latest_justified.root.is_zero() {
Checkpoint {
root: self.head,
slot: head_state.latest_justified.slot,
}
} else {
head_state.latest_justified.clone()
};

Ok(AttestationData {
slot,
head: head_checkpoint,
target: target_checkpoint,
source: self.latest_justified.clone(),
source,
})
}

Expand Down Expand Up @@ -543,32 +557,22 @@ pub fn get_proposal_head(store: &mut Store, slot: Slot) -> H256 {
store.head
}

/// Produce a block and aggregated signature proofs for the target slot per devnet-2.
///
/// The proposer returns the block and `MultisigAggregatedSignature` proofs aligned
/// with `block.body.attestations` so it can craft `SignedBlockWithAttestation`.
///
/// # Algorithm Overview
/// 1. **Get Proposal Head**: Retrieve current chain head as parent
/// 2. **Collect Attestations**: Convert known attestations to plain attestations
/// 3. **Build Block**: Use State.build_block with signature caches
///
/// The block and state are NOT inserted here. The caller signs the block and sends
/// it back via `ChainMessage::ProcessBlock`, which runs the full `on_block` path:
/// state transition, `update_head`, checkpoint updates, and proposer attestation.
///
/// # Arguments
/// * `store` - Mutable reference to the fork choice store
/// * `slot` - Target slot number for block production
/// * `validator_index` - Index of validator authorized to propose this block
///
/// # Returns
/// Tuple of (block root, finalized Block, attestation signature proofs)
pub fn produce_block_with_signatures(
pub struct BlockProductionInputs {
pub slot: Slot,
pub validator_index: u64,
pub head_root: H256,
pub head_state: State,
pub available_attestations: Vec<Attestation>,
pub known_block_roots: HashSet<H256>,
pub gossip_signatures: HashMap<SignatureKey, Signature>,
pub aggregated_payloads: HashMap<SignatureKey, Vec<AggregatedSignatureProof>>,
}

pub fn prepare_block_production(
store: &mut Store,
slot: Slot,
validator_index: u64,
) -> Result<(H256, Block, Vec<AggregatedSignatureProof>)> {
) -> Result<BlockProductionInputs> {
let head_root = get_proposal_head(store, slot);
let head_state = store
.states
Expand All @@ -595,7 +599,35 @@ pub fn produce_block_with_signatures(
})
.collect();

let known_block_roots: std::collections::HashSet<H256> = store.blocks.keys().copied().collect();
let known_block_roots: HashSet<H256> = store.blocks.keys().copied().collect();
let gossip_signatures = store.gossip_signatures.clone();
let aggregated_payloads = store.latest_known_aggregated_payloads.clone();

Ok(BlockProductionInputs {
slot,
validator_index,
head_root,
head_state,
available_attestations,
known_block_roots,
gossip_signatures,
aggregated_payloads,
})
}

pub fn execute_block_production(
inputs: BlockProductionInputs,
) -> Result<(H256, Block, Vec<AggregatedSignatureProof>)> {
let BlockProductionInputs {
slot,
validator_index,
head_root,
head_state,
available_attestations,
known_block_roots,
gossip_signatures,
aggregated_payloads,
} = inputs;

let (final_block, _final_post_state, _aggregated_attestations, signatures) = head_state
.build_block(
Expand All @@ -605,11 +637,20 @@ pub fn produce_block_with_signatures(
None,
Some(available_attestations),
Some(&known_block_roots),
Some(&store.gossip_signatures),
Some(&store.latest_known_aggregated_payloads),
Some(&gossip_signatures),
Some(&aggregated_payloads),
)?;

let block_root = final_block.hash_tree_root();

Ok((block_root, final_block, signatures))
}

pub fn produce_block_with_signatures(
store: &mut Store,
slot: Slot,
validator_index: u64,
) -> Result<(H256, Block, Vec<AggregatedSignatureProof>)> {
let inputs = prepare_block_production(store, slot, validator_index)?;
execute_block_production(inputs)
}
47 changes: 45 additions & 2 deletions lean_client/fork_choice/tests/unit_tests/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -330,8 +330,19 @@ fn test_block_production_then_attestation() {
assert_eq!(attestation.validator_id, attestor_idx);
assert_eq!(attestation.data.slot, Slot(2));

// The attestation should be consistent with current forkchoice state
assert_eq!(attestation.data.source, store.latest_justified);
let head_state = store
.states
.get(&store.head)
.expect("head state must exist");
let expected_source = if head_state.latest_justified.root.is_zero() {
Checkpoint {
root: store.head,
slot: head_state.latest_justified.slot,
}
} else {
head_state.latest_justified.clone()
};
assert_eq!(attestation.data.source, expected_source);
}

#[test]
Expand Down Expand Up @@ -528,3 +539,35 @@ fn test_validator_operations_invalid_parameters() {
};
assert_eq!(attestation.validator_id, large_validator);
}

#[test]
fn test_produce_attestation_data_uses_head_state_justified() {
let mut store = create_test_store();

// Simulate a minority-fork block advancing store.latest_justified
// past what the head chain has seen.
store.latest_justified = Checkpoint {
root: H256::from_slice(&[0xff; 32]),
slot: Slot(5),
};

let attestation_data = store
.produce_attestation_data(Slot(1))
.expect("produce_attestation_data failed");

let head_state = store
.states
.get(&store.head)
.expect("head state must exist");
let expected_source = if head_state.latest_justified.root.is_zero() {
Checkpoint {
root: store.head,
slot: head_state.latest_justified.slot,
}
} else {
head_state.latest_justified.clone()
};

assert_eq!(attestation_data.source, expected_source);
assert_ne!(attestation_data.source, store.latest_justified);
}
5 changes: 3 additions & 2 deletions lean_client/metrics/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,6 @@ pub struct Metrics {
pub lean_attestation_committee_count: IntGauge,

// OOM Detection Metrics

/// Number of entries in the attestation_data_by_root secondary index
pub grandine_attestation_data_by_root: IntGauge,

Expand Down Expand Up @@ -520,7 +519,9 @@ impl Metrics {
default_registry.register(Box::new(self.grandine_pending_fetch_roots.clone()))?;
default_registry.register(Box::new(self.grandine_block_cache_size.clone()))?;
default_registry.register(Box::new(self.grandine_slots_behind.clone()))?;
default_registry.register(Box::new(self.grandine_fork_choice_known_attestations.clone()))?;
default_registry.register(Box::new(
self.grandine_fork_choice_known_attestations.clone(),
))?;
default_registry.register(Box::new(self.grandine_fork_choice_new_attestations.clone()))?;
default_registry.register(Box::new(self.grandine_xmss_verify_skipped_total.clone()))?;

Expand Down
5 changes: 1 addition & 4 deletions lean_client/networking/src/network/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1076,10 +1076,7 @@ where
&& peer_id != self.local_peer_id()
{
let current_state = self.peer_table.lock().get(&peer_id).cloned();
if !matches!(
current_state,
Some(ConnectionState::Disconnected) | None
) {
if !matches!(current_state, Some(ConnectionState::Disconnected) | None) {
trace!(?peer_id, "Already connected or connecting");
continue;
}
Expand Down
6 changes: 1 addition & 5 deletions lean_client/networking/src/types.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,4 @@
use std::{
collections::HashMap,
fmt::Display,
sync::Arc,
};
use std::{collections::HashMap, fmt::Display, sync::Arc};

use anyhow::{Result, anyhow};
use async_trait::async_trait;
Expand Down
26 changes: 20 additions & 6 deletions lean_client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ use fork_choice::{
block_cache::BlockCache,
handlers::{on_aggregated_attestation, on_attestation, on_block, on_tick},
store::{
INTERVALS_PER_SLOT, MILLIS_PER_INTERVAL, Store, get_forkchoice_store,
produce_block_with_signatures,
INTERVALS_PER_SLOT, MILLIS_PER_INTERVAL, Store, execute_block_production,
get_forkchoice_store, prepare_block_production,
},
sync_state::SyncState,
};
Expand Down Expand Up @@ -1280,10 +1280,24 @@ async fn main() -> Result<()> {
let Some(v_message) = v_message else { break };
match v_message {
ValidatorChainMessage::ProduceBlock { slot, proposer_index, sender } => {
let result = produce_block_with_signatures(
&mut *store.write(), slot, proposer_index
).map(|(_, block, sigs)| (block, sigs));
let _ = sender.send(result);
let prepare_result = {
let mut w = store.write();
prepare_block_production(&mut *w, slot, proposer_index)
};

match prepare_result {
Err(e) => { let _ = sender.send(Err(e)); }
Ok(inputs) => {
let result = task::spawn_blocking(move || {
execute_block_production(inputs)
.map(|(_, block, sigs)| (block, sigs))
})
.await
.unwrap_or_else(|e| Err(anyhow::anyhow!("block production task panicked: {e}")));

let _ = sender.send(result);
}
}
}
ValidatorChainMessage::BuildAttestationData { slot, sender } => {
let store_read = store.read();
Expand Down
Loading