From 36cb41523c6cf24e9d4a4242f43619798e4e89da Mon Sep 17 00:00:00 2001 From: bomanaps Date: Fri, 1 May 2026 17:31:39 +0100 Subject: [PATCH 1/5] align proposer to spec offload XMSS bound store raise libp2p timeout --- lean_client/Cargo.lock | 71 ++- lean_client/Cargo.toml | 6 +- lean_client/containers/src/attestation.rs | 40 ++ lean_client/containers/src/state.rs | 582 ++++++------------ lean_client/fork_choice/src/handlers.rs | 195 ++++-- lean_client/fork_choice/src/store.rs | 119 ++-- .../fork_choice/tests/unit_tests/validator.rs | 112 ++-- lean_client/metrics/src/metrics.rs | 163 +++++ lean_client/networking/src/network/service.rs | 22 +- lean_client/networking/src/req_resp.rs | 19 +- lean_client/networking/src/types.rs | 14 +- lean_client/src/aggregation.rs | 91 ++- lean_client/src/banner.rs | 32 + lean_client/src/main.rs | 419 ++++++++++--- lean_client/validator/Cargo.toml | 2 + lean_client/validator/src/lib.rs | 142 +++-- 16 files changed, 1349 insertions(+), 680 deletions(-) create mode 100644 lean_client/src/banner.rs diff --git a/lean_client/Cargo.lock b/lean_client/Cargo.lock index 9a194ddb..28a69003 100644 --- a/lean_client/Cargo.lock +++ b/lean_client/Cargo.lock @@ -449,7 +449,7 @@ dependencies = [ "futures-lite", "parking", "polling", - "rustix", + "rustix 1.1.4", "slab", "windows-sys 0.61.2", ] @@ -1326,6 +1326,21 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "dedicated_executor" +version = "0.1.0" +source = "git+https://github.com/bomanaps/dedicated_executor?rev=e39afad2959e6ae3f55ed56f6117dfe12ec07359#e39afad2959e6ae3f55ed56f6117dfe12ec07359" +dependencies = [ + "futures", + "libc", + "log", + "once_cell", + "parking_lot", + "pin-project", + "tokio", + "tokio-util", +] + [[package]] name = "delay_map" version = "0.4.1" @@ -2934,6 +2949,7 @@ dependencies = [ "chain", "clap", "containers", + "dedicated_executor", "ethereum-types", "features", "fork_choice", @@ -2942,6 +2958,7 @@ dependencies = [ "libp2p-identity 0.2.13", "metrics", "networking", + "num_cpus", "parking_lot", "reqwest", "ssz", @@ -3503,6 +3520,12 @@ dependencies = [ "yamux 0.13.10", ] +[[package]] +name = "linux-raw-sys" +version = "0.4.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d26c52dbd32dccf2d10cac7725f8eae5296885fb5703b261f7d0a0739ec807ab" + [[package]] name = "linux-raw-sys" version = "0.12.1" @@ -4064,6 +4087,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9f7c3e4beb33f85d45ae3e3a1792185706c8e16d043238c593331cc7cd313b50" dependencies = [ "critical-section", + "parking_lot_core", "portable-atomic", ] @@ -4440,7 +4464,7 @@ dependencies = [ "concurrent-queue", "hermit-abi", "pin-project-lite", - "rustix", + "rustix 1.1.4", "windows-sys 0.61.2", ] @@ -4616,6 +4640,28 @@ dependencies = [ "version_check", ] +[[package]] +name = "procfs" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc5b72d8145275d844d4b5f6d4e1eef00c8cd889edb6035c21675d1bb1f45c9f" +dependencies = [ + "bitflags", + "hex", + "procfs-core", + "rustix 0.38.44", +] + +[[package]] +name = "procfs-core" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "239df02d8349b06fc07398a3a1697b06418223b1c7725085e801e7c0fc6a12ec" +dependencies = [ + "bitflags", + "hex", +] + [[package]] name = "prometheus" version = "0.14.0" @@ -4625,8 +4671,10 @@ dependencies = [ "cfg-if", "fnv", "lazy_static", + "libc", "memchr", "parking_lot", + "procfs", "protobuf", "thiserror 2.0.18", ] @@ 
-5266,6 +5314,19 @@ dependencies = [ "nom", ] +[[package]] +name = "rustix" +version = "0.38.44" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fdb5bc1ae2baa591800df16c9ca78619bf65c0488b41b96ccec5d11220d8c154" +dependencies = [ + "bitflags", + "errno", + "libc", + "linux-raw-sys 0.4.15", + "windows-sys 0.52.0", +] + [[package]] name = "rustix" version = "1.1.4" @@ -5275,7 +5336,7 @@ dependencies = [ "bitflags", "errno", "libc", - "linux-raw-sys", + "linux-raw-sys 0.12.1", "windows-sys 0.61.2", ] @@ -5985,7 +6046,7 @@ dependencies = [ "fastrand", "getrandom 0.4.2", "once_cell", - "rustix", + "rustix 1.1.4", "windows-sys 0.61.2", ] @@ -6579,9 +6640,11 @@ version = "0.0.0" dependencies = [ "anyhow", "containers", + "dedicated_executor", "env-config", "ethereum-types", "fork_choice", + "futures", "metrics", "rayon", "serde", diff --git a/lean_client/Cargo.toml b/lean_client/Cargo.toml index f4bd6db7..5451615c 100644 --- a/lean_client/Cargo.toml +++ b/lean_client/Cargo.toml @@ -264,13 +264,15 @@ libp2p = { version = "0.56.0", default-features = false, features = [ ] } libp2p-identity = { version = "0.2", features = ["secp256k1"] } libp2p-mplex = "0.39" +dedicated_executor = { git = "https://github.com/bomanaps/dedicated_executor", rev = "e39afad2959e6ae3f55ed56f6117dfe12ec07359" } num-bigint = "0.4" num-traits = "0.2" +num_cpus = "1" once_cell = "1.21" parking_lot = "0.12" paste = "1.0.15" pretty_assertions = "1.4" -prometheus = "0.14" +prometheus = { version = "0.14", features = ["process"] } rand = "0.10" rand_chacha = "0.10" rayon = "1" @@ -309,6 +311,7 @@ bls = { workspace = true } chain = { workspace = true } clap = { workspace = true } containers = { workspace = true } +dedicated_executor = { workspace = true } ethereum-types = { workspace = true } features = { workspace = true } fork_choice = { workspace = true } @@ -317,6 +320,7 @@ http_api = { workspace = true } libp2p-identity = { workspace = true } metrics = { workspace = true } networking = { workspace = true } +num_cpus = { workspace = true } parking_lot = { workspace = true } ssz = { workspace = true } tokio = { workspace = true } diff --git a/lean_client/containers/src/attestation.rs b/lean_client/containers/src/attestation.rs index 3524598f..57f8623f 100644 --- a/lean_client/containers/src/attestation.rs +++ b/lean_client/containers/src/attestation.rs @@ -92,6 +92,46 @@ impl AggregatedSignatureProof { ) -> Result<()> { self.proof_data.verify(public_keys, message, slot) } + + /// Greedy set-cover over a slice of proofs, returning indices of the proofs + /// to admit in priority order. Repeatedly picks the proof covering the + /// most uncovered validators; stops when no remaining proof adds coverage. + /// Mirrors leanSpec `AggregatedSignatureProof.select_greedily`. 
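For intuition, here is the same greedy rule over plain index sets, as a minimal self-contained sketch: toy `HashSet<u64>` participant sets stand in for real `AggregatedSignatureProof` values, and the helper name is illustrative (the actual implementation follows just below).

```rust
use std::collections::HashSet;

/// Toy stand-in for `select_greedily`: pick set indices in priority order,
/// always taking the set that covers the most still-uncovered validators.
fn select_greedily_toy(sets: &[HashSet<u64>]) -> Vec<usize> {
    let mut selected = Vec::new();
    let mut covered: HashSet<u64> = HashSet::new();
    let mut remaining: HashSet<usize> = (0..sets.len()).collect();

    while !remaining.is_empty() {
        // Score each remaining set by how many *new* validators it adds.
        let best = remaining
            .iter()
            .copied()
            .map(|i| (i, sets[i].difference(&covered).count()))
            .max_by_key(|&(_, n)| n);
        match best {
            Some((i, n)) if n > 0 => {
                covered.extend(&sets[i]);
                selected.push(i);
                remaining.remove(&i);
            }
            _ => break, // nothing left adds coverage
        }
    }
    selected
}

fn main() {
    let sets = vec![
        HashSet::from([1, 2]),
        HashSet::from([1, 2, 3, 4]),
        HashSet::from([4, 5]),
    ];
    // Set 1 wins first (4 new validators), then set 2 (adds validator 5);
    // set 0 adds nothing new, so selection stops at [1, 2].
    assert_eq!(select_greedily_toy(&sets), vec![1, 2]);
}
```

Greedy set cover is the standard ln(n)-approximation for minimum cover, which is a good fit here: the doc above asks only for a priority order over proofs, not a provably minimal proof set.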
+    pub fn select_greedily(proofs: &[AggregatedSignatureProof]) -> Vec<usize> {
+        let mut selected: Vec<usize> = Vec::new();
+        let mut covered: HashSet<u64> = HashSet::new();
+        let mut remaining: HashSet<usize> = (0..proofs.len()).collect();
+
+        while !remaining.is_empty() {
+            let best = remaining
+                .iter()
+                .copied()
+                .map(|idx| {
+                    let new_count = proofs[idx]
+                        .get_participant_indices()
+                        .into_iter()
+                        .filter(|vid| !covered.contains(vid))
+                        .count();
+                    (idx, new_count)
+                })
+                .max_by_key(|(_, n)| *n);
+
+            let Some((best_idx, best_count)) = best else {
+                break;
+            };
+            if best_count == 0 {
+                break;
+            }
+
+            for vid in proofs[best_idx].get_participant_indices() {
+                covered.insert(vid);
+            }
+            selected.push(best_idx);
+            remaining.remove(&best_idx);
+        }
+
+        selected
+    }
 }
 
 /// Bitlist representing validator participation in an attestation.
diff --git a/lean_client/containers/src/state.rs b/lean_client/containers/src/state.rs
index 7a979fed..035fa21d 100644
--- a/lean_client/containers/src/state.rs
+++ b/lean_client/containers/src/state.rs
@@ -1,4 +1,4 @@
-use anyhow::{Context, Result, anyhow, ensure};
+use anyhow::{Result, anyhow, ensure};
 use bitvec::{bitvec, order::Lsb0, vec::BitVec};
 use metrics::METRICS;
 use serde::{Deserialize, Serialize};
@@ -10,8 +10,10 @@ use typenum::{Prod, U262144};
 use xmss::{PublicKey, Signature};
 
 use crate::{
-    AggregatedSignatureProof, Attestation, Checkpoint, Config, SignatureKey, Slot,
-    attestation::{AggregatedAttestation, AggregatedAttestations, AggregationBits},
+    AggregatedSignatureProof, Checkpoint, Config, Slot,
+    attestation::{
+        AggregatedAttestation, AggregatedAttestations, AggregationBits, AttestationData,
+    },
     block::{Block, BlockBody, BlockHeader, SignedBlock},
     validator::{Validator, ValidatorRegistryLimit, Validators},
 };
@@ -604,37 +606,24 @@ impl State {
     /// Build a valid block on top of this state.
     ///
-    /// Computes the post-state and creates a block with the correct state root.
-    /// If `available_attestations` and `known_block_roots` are provided,
-    /// performs fixed-point attestation collection: iteratively adds valid
-    /// attestations until no more can be included. This is necessary because
-    /// processing attestations may update the justified checkpoint, which may
-    /// make additional attestations valid.
+    /// Iterates over `aggregated_payloads` keyed by `data_root` (with the
+    /// AttestationData carried in the value), applies the spec's fixed-point
+    /// attestation selection: sort by `target.slot`, admit entries whose
+    /// `head.root` is known and whose `source` matches the current justified
+    /// checkpoint, greedily select proofs maximizing validator coverage, run
+    /// the STF, and repeat as long as the post-state's justified checkpoint
+    /// advances. Caps distinct AttestationData entries at MAX_ATTESTATIONS_DATA.
/// - /// # Arguments - /// - /// * `slot` - Target slot for the block - /// * `proposer_index` - Validator index of the proposer - /// * `parent_root` - Root of the parent block (must match state after slot processing) - /// * `initial_attestations` - Initial attestations to include - /// * `available_attestations` - Optional pool of attestations to collect from - /// * `known_block_roots` - Optional set of known block roots for attestation validation - /// * `gossip_signatures` - Optional map of individual signatures from gossip - /// * `aggregated_payloads` - Optional map of aggregated signature proofs - /// - /// # Returns - /// - /// Tuple of (Block, post-State, collected aggregated attestations, aggregated proofs) + /// Aggregator-published proofs are the sole input — no gossip-time + /// re-aggregation at proposal. Matches leanSpec build_block, ethlambda's + /// blockchain::store::build_block, and zeam's getProposalAttestations. pub fn build_block( &self, slot: Slot, proposer_index: u64, parent_root: H256, - initial_attestations: Option>, - available_attestations: Option>, - known_block_roots: Option<&HashSet>, - gossip_signatures: Option<&HashMap>, - aggregated_payloads: Option<&HashMap>>, + known_block_roots: &HashSet, + aggregated_payloads: &HashMap)>, log_inv_rate: usize, ) -> Result<( Block, @@ -642,121 +631,106 @@ impl State { Vec, Vec, )> { - // Initialize attestation set - let mut attestations = initial_attestations.unwrap_or_default(); - - // Fixed-point attestation collection loop - // Iteratively add valid attestations until no new ones can be added - loop { - // Create candidate block with current attestation set - let aggregated = AggregatedAttestation::aggregate_by_data(&attestations); - - let candidate_block = Block { - slot, - proposer_index, - parent_root, - state_root: H256::zero(), - body: BlockBody { - attestations: AggregatedAttestations::try_from_iter(aggregated.into_iter())?, - }, - }; + let mut selected: Vec<(AggregatedAttestation, AggregatedSignatureProof)> = Vec::new(); - // Apply state transition to get the post-block state - let post_state = self.process_slots(slot)?.process_block(&candidate_block)?; - - let Some(ref available_attestations) = available_attestations else { - // No attestation source provided: done after computing post_state - break; + if !aggregated_payloads.is_empty() { + // Genesis edge case: process_block_header rebinds latest_justified.root + // to parent_root when building on slot 0. Apply the same derivation + // here so attestation sources match. + let mut current_justified = if self.latest_block_header.slot == Slot(0) { + Checkpoint { + root: parent_root, + slot: self.latest_justified.slot, + } + } else { + self.latest_justified.clone() }; - let Some(known_block_roots) = known_block_roots else { - // No attestation source provided: done after computing post_state - break; - }; + // Sort by target.slot for deterministic processing order. 
+ let mut sorted_entries: Vec<(&H256, &(AttestationData, Vec))> = + aggregated_payloads.iter().collect(); + sorted_entries.sort_by_key(|(_, (data, _))| data.target.slot); - // Find new valid attestations matching post-state justification - let mut new_attestations = Vec::new(); + let mut processed_data_roots: HashSet = HashSet::new(); - // Track distinct AttestationData roots already accepted and newly added this iteration - let accepted_data_roots: HashSet = attestations - .iter() - .map(|a| a.data.hash_tree_root()) - .collect(); - let mut new_att_data_roots: HashSet = HashSet::new(); + loop { + let mut found_new = false; - for attestation in available_attestations { - let data = &attestation.data; - let validator_id = attestation.validator_id; - let data_root = data.hash_tree_root(); - let sig_key = SignatureKey::new(validator_id, data_root); + for &(data_root, (att_data, proofs)) in &sorted_entries { + if processed_data_roots.contains(data_root) { + continue; + } + if processed_data_roots.len() >= MAX_ATTESTATIONS_DATA { + break; + } + if !known_block_roots.contains(&att_data.head.root) { + continue; + } + if att_data.source != current_justified { + continue; + } - // Skip if target block is unknown - if !known_block_roots.contains(&data.head.root) { - continue; - } + processed_data_roots.insert(*data_root); + found_new = true; - // Skip if attestation source does not match post-state's latest justified - if data.source != post_state.latest_justified { - continue; + let indices = AggregatedSignatureProof::select_greedily(proofs); + for idx in indices { + let proof = proofs[idx].clone(); + selected.push(( + AggregatedAttestation { + aggregation_bits: proof.participants.clone(), + data: att_data.clone(), + }, + proof, + )); + } } - // Avoid adding duplicates of attestations already in the candidate set - if attestations.contains(attestation) { - continue; + if !found_new { + break; } - // Enforce MAX_ATTESTATIONS_DATA: only admit a new AttestationData if under the limit - let is_existing_data = accepted_data_roots.contains(&data_root) - || new_att_data_roots.contains(&data_root); - if !is_existing_data - && accepted_data_roots.len() + new_att_data_roots.len() >= MAX_ATTESTATIONS_DATA - { - continue; - } + // Run STF on a candidate block to see if justification advanced. + let candidate_attestations = AggregatedAttestations::try_from_iter( + selected.iter().map(|(att, _)| att.clone()), + )?; + let candidate_block = Block { + slot, + proposer_index, + parent_root, + state_root: H256::zero(), + body: BlockBody { + attestations: candidate_attestations, + }, + }; + let post_state = self.process_slots(slot)?.process_block(&candidate_block)?; - // We can only include an attestation if we have some way to later provide - // an aggregated proof for its group: - // - either a per validator XMSS signature from gossip, or - // - at least one aggregated proof learned from a block that references - // this validator+data. 
- let has_gossip_sig = - gossip_signatures.is_some_and(|sigs| sigs.contains_key(&sig_key)); - let has_block_proof = - aggregated_payloads.is_some_and(|payloads| payloads.contains_key(&data_root)); - - if has_gossip_sig || has_block_proof { - new_att_data_roots.insert(data_root); - new_attestations.push(attestation.clone()); + if post_state.latest_justified != current_justified { + current_justified = post_state.latest_justified; + } else { + break; } } - - // Fixed point reached: no new attestations found - if new_attestations.is_empty() { - break; - } - - // Add new attestations and continue iteration - attestations.extend(new_attestations); } - let (aggregated_attestations, aggregated_signatures) = self.compute_aggregated_signatures( - &attestations, - gossip_signatures, - aggregated_payloads, - log_inv_rate, - )?; + // Compact: merge proofs sharing the same AttestationData via recursive + // aggregation so each AttestationData appears at most once in the body. + let compacted = self.compact_proofs_by_data(selected, log_inv_rate)?; METRICS.get().map(|metrics| { metrics .lean_pq_sig_attestations_in_aggregated_signatures_total .inc_by( - aggregated_attestations + compacted .iter() - .map(|v| v.aggregation_bits.to_validator_indices().len()) + .map(|(att, _)| att.aggregation_bits.to_validator_indices().len()) .sum::() as u64, ); }); + let (aggregated_attestations, aggregated_signatures): (Vec<_>, Vec<_>) = + compacted.into_iter().unzip(); + let mut final_block = Block { slot, proposer_index, @@ -764,7 +738,7 @@ impl State { state_root: H256::zero(), body: BlockBody { attestations: AggregatedAttestations::try_from_iter( - aggregated_attestations.clone(), + aggregated_attestations.iter().cloned(), )?, }, }; @@ -781,309 +755,107 @@ impl State { )) } - pub fn compute_aggregated_signatures( + /// Merge selected (AggregatedAttestation, Proof) entries that share the + /// same AttestationData into a single recursive proof via + /// `aggregate_with_children`, preserving first-occurrence order. A block + /// body must contain at most one entry per AttestationData (spec invariant). 
+ fn compact_proofs_by_data( &self, - attestations: &[Attestation], - gossip_signatures: Option<&HashMap>, - aggregated_payloads: Option<&HashMap>>, + entries: Vec<(AggregatedAttestation, AggregatedSignatureProof)>, log_inv_rate: usize, - ) -> Result<(Vec, Vec)> { - let mut results: Vec<(AggregatedAttestation, AggregatedSignatureProof)> = Vec::new(); - - // Group individual attestations by data - for aggregated in AggregatedAttestation::aggregate_by_data(attestations) { - let data = &aggregated.data; - let data_root = data.hash_tree_root(); - let validator_ids = aggregated.aggregation_bits.to_validator_indices(); - - // Phase 1: Gossip Collection - // Try to collect individual signatures from gossip network - let mut gossip_sigs = Vec::new(); - let mut gossip_keys = Vec::new(); - let mut gossip_ids = Vec::new(); - - let mut remaining = HashSet::new(); - - if let Some(gossip_signatures) = gossip_signatures { - for vid in validator_ids { - let key = SignatureKey::new(vid, data_root); - if let Some(sig) = gossip_signatures.get(&key) { - gossip_sigs.push(sig.clone()); - gossip_keys.push( - self.validators - .get(vid) - .map(|v| v.attestation_pubkey.clone()) - .context(format!("invalid validator id {vid}"))?, - ); - gossip_ids.push(vid); - } else { - remaining.insert(vid); - } - } - } else { - // No gossip data: all validators need fallback - remaining = validator_ids.iter().copied().collect(); - } - - // If we collected any gossip signatures, create an aggregated proof - if !gossip_ids.is_empty() { - let participants = AggregationBits::from_validator_indices(&gossip_ids); - - let proof = AggregatedSignatureProof::aggregate( - participants.clone(), - gossip_keys, - gossip_sigs, - data_root, - data.slot.0 as u32, - log_inv_rate, - )?; - - results.push(( - AggregatedAttestation { - aggregation_bits: participants, - data: data.clone(), - }, - proof, - )); - } - - // Phase 2: Fallback to block proofs using greedy set-cover. - // Collect all selected proofs as children, then compress into ONE - // recursive proof via aggregate_with_children (spec intent). - if let Some(payloads) = aggregated_payloads { - if !remaining.is_empty() { - if let Some(candidates) = payloads.get(&data_root) { - if !candidates.is_empty() { - let mut phase2_children: Vec<&AggregatedSignatureProof> = Vec::new(); - - loop { - if remaining.is_empty() { - break; - } - - let Some((best_proof, covered_set)) = candidates - .iter() - .map(|proof| { - let proof_validators: HashSet = - proof.get_participant_indices().into_iter().collect(); - let intersection: HashSet = remaining - .intersection(&proof_validators) - .copied() - .collect(); - (proof, intersection) - }) - .max_by_key(|(_, intersection)| intersection.len()) - else { - break; - }; - - if covered_set.is_empty() { - break; - } - - phase2_children.push(best_proof); - for vid in &covered_set { - remaining.remove(vid); - } - } - - if !phase2_children.is_empty() { - if phase2_children.len() == 1 { - // leanSpec state.py: `if len(proofs) == 1: sig = proofs[0]` - // Single proof: pass through directly without re-aggregating. 
- let single_proof = (*phase2_children[0]).clone(); - let phase2_participants = single_proof.participants.clone(); - - info!( - slot = data.slot.0, - validators = - phase2_children[0].get_participant_indices().len(), - "Phase 2: single proof passed through directly" - ); - results.push(( - AggregatedAttestation { - aggregation_bits: phase2_participants, - data: data.clone(), - }, - single_proof, - )); - } else { - let child_pk_vecs: Vec> = phase2_children - .iter() - .map(|child| { - child - .get_participant_indices() - .into_iter() - .filter_map(|vid| { - self.validators - .get(vid) - .ok() - .map(|v| v.attestation_pubkey.clone()) - }) - .collect() - }) - .collect(); - - let children_arg: Vec<( - &[PublicKey], - &AggregatedSignatureProof, - )> = child_pk_vecs - .iter() - .zip(phase2_children.iter()) - .map(|(pks, proof)| (pks.as_slice(), *proof)) - .collect(); - - let mut phase2_validator_ids: Vec = phase2_children - .iter() - .flat_map(|child| child.get_participant_indices()) - .collect(); - phase2_validator_ids.sort(); - phase2_validator_ids.dedup(); - - let phase2_participants = - AggregationBits::from_validator_indices( - &phase2_validator_ids, - ); - - match AggregatedSignatureProof::aggregate_with_children( - phase2_participants.clone(), - &children_arg, - Vec::::new(), - Vec::::new(), - data_root, - data.slot.0 as u32, - log_inv_rate, - ) { - Ok(proof) => { - info!( - slot = data.slot.0, - children = phase2_children.len(), - validators = phase2_validator_ids.len(), - "Phase 2: recursive block proof via aggregate_with_children" - ); - results.push(( - AggregatedAttestation { - aggregation_bits: phase2_participants, - data: data.clone(), - }, - proof, - )); - } - Err(e) => { - warn!( - error = %e, - "Phase 2 recursive aggregation failed, skipping" - ); - } - } - } - } - } - } - } - } - } - - // Handle empty case - if results.is_empty() { - return Ok((Vec::new(), Vec::new())); + ) -> Result> { + if entries.len() <= 1 { + return Ok(entries); } - // Post-loop compaction: the main loop may have emitted a Phase 1 (gossip) proof - // AND a Phase 2 (children) proof for the same AttestationData. - // Per spec, each AttestationData must appear at most once in the block body — - // merge any such pairs into a single recursive proof via aggregate_with_children. - let mut proof_groups: HashMap< - H256, - Vec<(AggregatedAttestation, AggregatedSignatureProof)>, - > = HashMap::new(); - for (att, proof) in results { - proof_groups - .entry(att.data.hash_tree_root()) - .or_default() - .push((att, proof)); + let mut order: Vec = Vec::new(); + let mut groups: HashMap> = + HashMap::new(); + for (att, proof) in entries { + let dr = att.data.hash_tree_root(); + if !groups.contains_key(&dr) { + order.push(dr); + } + groups.entry(dr).or_default().push((att, proof)); } - let mut compacted: Vec<(AggregatedAttestation, AggregatedSignatureProof)> = Vec::new(); + let mut compacted: Vec<(AggregatedAttestation, AggregatedSignatureProof)> = + Vec::with_capacity(order.len()); - for (data_root, group) in proof_groups { + for data_root in order { + let group = groups.remove(&data_root).expect("group exists for data_root"); if group.len() == 1 { - // Only one proof for this data — no merge needed compacted.extend(group); - } else { - // Multiple proofs (e.g. Phase 1 gossip + Phase 2 children) — - // merge into one recursive proof so each AttestationData appears once. 
- let data = group[0].0.data.clone(); - - let child_pk_vecs: Vec> = group - .iter() - .map(|(_, proof)| { - proof - .get_participant_indices() - .into_iter() - .filter_map(|vid| { - self.validators - .get(vid) - .ok() - .map(|v| v.attestation_pubkey.clone()) - }) - .collect() - }) - .collect(); + continue; + } - let children_arg: Vec<(&[PublicKey], &AggregatedSignatureProof)> = child_pk_vecs - .iter() - .zip(group.iter()) - .map(|(pks, (_, proof))| (pks.as_slice(), proof)) - .collect(); + let data = group[0].0.data.clone(); - let mut all_validator_ids: Vec = group - .iter() - .flat_map(|(_, proof)| proof.get_participant_indices()) - .collect(); - all_validator_ids.sort(); - all_validator_ids.dedup(); - let all_participants = AggregationBits::from_validator_indices(&all_validator_ids); - - match AggregatedSignatureProof::aggregate_with_children( - all_participants.clone(), - &children_arg, - Vec::::new(), - Vec::::new(), - data_root, - data.slot.0 as u32, - log_inv_rate, - ) { - Ok(merged_proof) => { - info!( - slot = data.slot.0, - children = group.len(), - validators = all_validator_ids.len(), - "Post-loop compaction: merged proofs into recursive proof" - ); - compacted.push(( - AggregatedAttestation { - aggregation_bits: all_participants, - data, - }, - merged_proof, - )); - } - Err(e) => { - warn!( - error = %e, - "Post-loop compaction failed, keeping proofs separate" - ); - compacted.extend(group); - } + let child_pk_vecs: Vec> = group + .iter() + .map(|(_, proof)| { + proof + .get_participant_indices() + .into_iter() + .filter_map(|vid| { + self.validators + .get(vid) + .ok() + .map(|v| v.attestation_pubkey.clone()) + }) + .collect() + }) + .collect(); + + let children_arg: Vec<(&[PublicKey], &AggregatedSignatureProof)> = child_pk_vecs + .iter() + .zip(group.iter()) + .map(|(pks, (_, proof))| (pks.as_slice(), proof)) + .collect(); + + let mut all_validator_ids: Vec = group + .iter() + .flat_map(|(_, proof)| proof.get_participant_indices()) + .collect(); + all_validator_ids.sort(); + all_validator_ids.dedup(); + let all_participants = AggregationBits::from_validator_indices(&all_validator_ids); + + match AggregatedSignatureProof::aggregate_with_children( + all_participants.clone(), + &children_arg, + Vec::::new(), + Vec::::new(), + data_root, + data.slot.0 as u32, + log_inv_rate, + ) { + Ok(merged_proof) => { + info!( + slot = data.slot.0, + children = group.len(), + validators = all_validator_ids.len(), + "compact_proofs_by_data: merged proofs into recursive proof" + ); + compacted.push(( + AggregatedAttestation { + aggregation_bits: all_participants, + data, + }, + merged_proof, + )); + } + Err(e) => { + warn!( + error = %e, + "compact_proofs_by_data: merge failed, keeping proofs separate" + ); + compacted.extend(group); } } } - let (aggregated_attestations, aggregated_proofs): (Vec<_>, Vec<_>) = - compacted.into_iter().unzip(); - - Ok((aggregated_attestations, aggregated_proofs)) + Ok(compacted) } } diff --git a/lean_client/fork_choice/src/handlers.rs b/lean_client/fork_choice/src/handlers.rs index bfd66c19..e83a77b5 100644 --- a/lean_client/fork_choice/src/handlers.rs +++ b/lean_client/fork_choice/src/handlers.rs @@ -1,6 +1,9 @@ +use std::collections::HashSet; + use anyhow::{Context, Result, anyhow, bail, ensure}; use containers::{ AttestationData, SignatureKey, SignedAggregatedAttestation, SignedAttestation, SignedBlock, + State, }; use metrics::METRICS; use ssz::{H256, SszHash}; @@ -8,8 +11,8 @@ use tracing::warn; use crate::block_cache::BlockCache; use crate::store::{ - 
GOSSIP_DISPARITY_INTERVALS, INTERVALS_PER_SLOT, MILLIS_PER_INTERVAL, STATE_PRUNE_BUFFER, Store, - tick_interval, update_head, + BLOCKS_TO_KEEP, GOSSIP_DISPARITY_INTERVALS, HEAD_RETENTION_SLOTS, INTERVALS_PER_SLOT, + MILLIS_PER_INTERVAL, STATES_TO_KEEP, STATE_PRUNE_BUFFER, Store, tick_interval, update_head, }; #[inline] @@ -480,15 +483,21 @@ fn on_attestation_internal( /// 3. Processing attestations included in the block body (on-chain) /// 4. Updating the forkchoice head /// 5. Processing the proposer's attestation (as if gossiped) +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum BlockOutcome { + Applied, + AlreadyKnown, +} + pub fn on_block( store: &mut Store, cache: &mut BlockCache, signed_block: SignedBlock, -) -> Result<()> { +) -> Result { let block_root = signed_block.block.hash_tree_root(); if store.blocks.contains_key(&block_root) { - return Ok(()); + return Ok(BlockOutcome::AlreadyKnown); } let parent_root = signed_block.block.parent_root; @@ -503,48 +512,34 @@ pub fn on_block( process_block_internal(store, signed_block, block_root)?; process_pending_blocks(store, cache, vec![block_root]); - Ok(()) + Ok(BlockOutcome::Applied) } -fn process_block_internal( - store: &mut Store, - signed_block: SignedBlock, - block_root: H256, -) -> Result<()> { +/// CPU-bound portion of block processing: verify XMSS signatures against the parent state +/// and run the state transition. Safe to run on a `DedicatedExecutor` thread because it +/// touches no `Store` state. +pub fn verify_and_transition(parent_state: State, signed_block: SignedBlock) -> Result { let _timer = METRICS.get().map(|metrics| { metrics .lean_fork_choice_block_processing_time_seconds .start_timer() }); - let block = signed_block.block.clone(); - let attestations_count = block.body.attestations.len_u64(); - - // Get parent state for validation - let state = store - .states - .get(&block.parent_root) - .ok_or(anyhow!("no parent state"))?; - - // Debug: Log parent state checkpoints before transition - tracing::debug!( - block_slot = block.slot.0, - attestations_in_block = attestations_count, - parent_justified_slot = state.latest_justified.slot.0, - parent_finalized_slot = state.latest_finalized.slot.0, - justified_slots_len = state.justified_slots.0.len(), - "Processing block - parent state info" - ); - - // Verify block signatures against parent state before executing the state transition. - // If any signature is invalid the error propagates and the block is rejected; - // it never enters store.blocks or store.states. - signed_block.verify_signatures(state.clone())?; + signed_block.verify_signatures(parent_state.clone())?; + parent_state.state_transition(signed_block, true) +} - // Execute state transition to get post-state (signatures verified above) - let new_state = state.state_transition(signed_block.clone(), true)?; +/// Store-mutating portion of block processing: must run on the chain task. Inserts the +/// block + post-state, retries pending attestations, advances justification/finalization, +/// processes block-body attestations on-chain, and recomputes the head. 
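The split above is the usual owner-task pattern: CPU-heavy, store-free verification runs off-thread while every `Store` mutation stays on the single chain task. A minimal sketch of a driving loop under stated assumptions: plain `tokio::task::spawn_blocking` stands in for the `DedicatedExecutor`, toy types replace `Store`/`SignedBlock`/`State`, and all names and channels here are illustrative rather than the patch's actual main.rs wiring (the function itself continues below).

```rust
// Toy stand-ins; requires tokio = { version = "1", features = ["full"] }.
use tokio::sync::mpsc;

#[derive(Clone)]
struct ToyState(u64);
struct ToyBlock(u64);

/// CPU-bound half: a pure function of (parent state, block) with no store
/// access, so it is safe to run on any worker thread.
fn verify_and_transition_toy(parent: ToyState, block: ToyBlock) -> Result<ToyState, String> {
    // stand-in for XMSS signature verification plus the state transition
    Ok(ToyState(parent.0 + block.0))
}

#[tokio::main]
async fn main() {
    let (block_tx, mut block_rx) = mpsc::channel::<ToyBlock>(64);
    let (result_tx, mut result_rx) = mpsc::channel::<Result<ToyState, String>>(64);

    tokio::spawn(async move { block_tx.send(ToyBlock(1)).await.unwrap() });

    // The "chain task": sole owner of the store, never blocked on verification.
    let mut store_state = ToyState(0);
    loop {
        tokio::select! {
            Some(block) = block_rx.recv() => {
                // Snapshot the parent state, then verify off-thread.
                let parent = store_state.clone();
                let tx = result_tx.clone();
                tokio::task::spawn_blocking(move || {
                    let _ = tx.blocking_send(verify_and_transition_toy(parent, block));
                });
            }
            Some(result) = result_rx.recv() => {
                // Apply on the owning task; no locks needed.
                match result {
                    Ok(new_state) => store_state = new_state,
                    Err(e) => eprintln!("block rejected: {e}"),
                }
                break;
            }
            else => break,
        }
    }
    println!("final state: {}", store_state.0);
}
```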
+pub fn apply_verified_block( + store: &mut Store, + signed_block: SignedBlock, + new_state: State, + block_root: H256, +) -> Result<()> { + let block = signed_block.block.clone(); - // Debug: Log new state checkpoints after transition tracing::debug!( block_slot = block.slot.0, new_justified_slot = new_state.latest_justified.slot.0, @@ -556,6 +551,17 @@ fn process_block_internal( store.blocks.insert(block_root, block.clone()); store.states.insert(block_root, new_state.clone()); + METRICS.get().map(|m| { + m.grandine_store_blocks_size.set(store.blocks.len() as i64); + m.grandine_store_states_size.set(store.states.len() as i64); + m.grandine_store_gossip_signatures_size + .set(store.gossip_signatures.len() as i64); + m.grandine_store_known_aggregated_payloads_size + .set(store.latest_known_aggregated_payloads.len() as i64); + m.grandine_store_new_aggregated_payloads_size + .set(store.latest_new_aggregated_payloads.len() as i64); + }); + // Retry attestations that arrived before this block was known. // Drain the queue for this root and re-process each attestation. // Attestations that still reference other unknown blocks are re-queued automatically. @@ -647,7 +653,7 @@ fn process_block_internal( .retain(|_, data| data.target.slot.0 > finalized_slot); METRICS.get().map(|m| { m.grandine_attestation_data_by_root - .set(store.attestation_data_by_root.len() as i64) + .set(store.attestation_data_by_root.len() as i64); }); } @@ -723,9 +729,124 @@ fn process_block_internal( update_head(store); + prune_with_retention_bounds(store); + Ok(()) } +/// Defensive retention bounds. Runs unconditionally on every `apply_verified_block` +/// so map growth stays bounded even when `latest_finalized` does not advance — +/// the spec-mandated `prune_stale_attestation_data` is a necessary but not a +/// sufficient bound, since it never fires while finalization is stalled. +fn prune_with_retention_bounds(store: &mut Store) { + let head_slot = store + .blocks + .get(&store.head) + .map(|b| b.slot.0) + .unwrap_or(0); + let keep_min_slot = head_slot.saturating_sub(HEAD_RETENTION_SLOTS); + + let mut protected: HashSet = HashSet::with_capacity(4); + protected.insert(store.latest_finalized.root); + protected.insert(store.latest_justified.root); + protected.insert(store.head); + protected.insert(store.safe_target); + + if store.blocks.len() > BLOCKS_TO_KEEP { + let mut by_slot: Vec<(H256, u64)> = store + .blocks + .iter() + .map(|(root, block)| (*root, block.slot.0)) + .collect(); + by_slot.sort_by_key(|(_, slot)| std::cmp::Reverse(*slot)); + let evict: HashSet = by_slot + .into_iter() + .skip(BLOCKS_TO_KEEP) + .filter(|(root, _)| !protected.contains(root)) + .map(|(root, _)| root) + .collect(); + store.blocks.retain(|root, _| !evict.contains(root)); + } + + if store.states.len() > STATES_TO_KEEP { + let mut by_slot: Vec<(H256, u64)> = store + .states + .iter() + .map(|(root, state)| (*root, state.slot.0)) + .collect(); + by_slot.sort_by_key(|(_, slot)| std::cmp::Reverse(*slot)); + let evict: HashSet = by_slot + .into_iter() + .skip(STATES_TO_KEEP) + .filter(|(root, _)| !protected.contains(root)) + .map(|(root, _)| root) + .collect(); + store.states.retain(|root, _| !evict.contains(root)); + } + + // Three retain calls below read attestation_data_by_root as a secondary index; + // attestation_data_by_root must be pruned last so the lookups can resolve. 
+ let adr = &store.attestation_data_by_root; + store.gossip_signatures.retain(|key, _| { + adr.get(&key.data_root) + .is_none_or(|data| data.target.slot.0 >= keep_min_slot) + }); + store.latest_known_aggregated_payloads.retain(|data_root, _| { + adr.get(data_root) + .is_none_or(|data| data.target.slot.0 >= keep_min_slot) + }); + store.latest_new_aggregated_payloads.retain(|data_root, _| { + adr.get(data_root) + .is_none_or(|data| data.target.slot.0 >= keep_min_slot) + }); + store + .attestation_data_by_root + .retain(|_, data| data.target.slot.0 >= keep_min_slot); + + METRICS.get().map(|m| { + m.grandine_store_blocks_size.set(store.blocks.len() as i64); + m.grandine_store_states_size.set(store.states.len() as i64); + m.grandine_store_gossip_signatures_size + .set(store.gossip_signatures.len() as i64); + m.grandine_store_known_aggregated_payloads_size + .set(store.latest_known_aggregated_payloads.len() as i64); + m.grandine_store_new_aggregated_payloads_size + .set(store.latest_new_aggregated_payloads.len() as i64); + m.grandine_attestation_data_by_root + .set(store.attestation_data_by_root.len() as i64); + }); +} + +/// Synchronous wrapper retained for the cascade in `process_pending_blocks` and for tests. +/// The production path on the chain task drives `verify_and_transition` on the +/// `DedicatedExecutor` and `apply_verified_block` on the chain task directly. +fn process_block_internal( + store: &mut Store, + signed_block: SignedBlock, + block_root: H256, +) -> Result<()> { + let block = signed_block.block.clone(); + let attestations_count = block.body.attestations.len_u64(); + + let parent_state = store + .states + .get(&block.parent_root) + .ok_or(anyhow!("no parent state"))? + .clone(); + + tracing::debug!( + block_slot = block.slot.0, + attestations_in_block = attestations_count, + parent_justified_slot = parent_state.latest_justified.slot.0, + parent_finalized_slot = parent_state.latest_finalized.slot.0, + justified_slots_len = parent_state.justified_slots.0.len(), + "Processing block - parent state info" + ); + + let new_state = verify_and_transition(parent_state, signed_block.clone())?; + apply_verified_block(store, signed_block, new_state, block_root) +} + pub fn process_pending_blocks(store: &mut Store, cache: &mut BlockCache, mut roots: Vec) { while let Some(parent_root) = roots.pop() { let children: Vec<(H256, SignedBlock)> = cache diff --git a/lean_client/fork_choice/src/store.rs b/lean_client/fork_choice/src/store.rs index be2d928d..b1d2e837 100644 --- a/lean_client/fork_choice/src/store.rs +++ b/lean_client/fork_choice/src/store.rs @@ -2,12 +2,12 @@ use std::collections::{HashMap, HashSet}; use anyhow::{Result, anyhow, ensure}; use containers::{ - AggregatedSignatureProof, Attestation, AttestationData, Block, BlockHeader, Checkpoint, Config, + AggregatedSignatureProof, AttestationData, Block, BlockHeader, Checkpoint, Config, SignatureKey, SignedAggregatedAttestation, SignedAttestation, SignedBlock, Slot, State, }; use metrics::{METRICS, set_gauge_u64}; use ssz::{H256, SszHash}; -use tracing::warn; +use tracing::{info, warn}; use xmss::Signature; pub type Interval = u64; @@ -110,6 +110,19 @@ const JUSTIFICATION_LOOKBACK_SLOTS: u64 = 3; /// for an in-flight state transition. pub const STATE_PRUNE_BUFFER: u64 = 128; +/// Hard upper bound on `store.blocks` entries when finalization stalls. +/// At ~4s slots this is roughly 24h of chain history. +pub const BLOCKS_TO_KEEP: usize = 21_600; + +/// Hard upper bound on `store.states` entries when finalization stalls. 
+/// At ~4s slots this is roughly 3.3h of state history. +pub const STATES_TO_KEEP: usize = 3_000; + +/// Slot-distance retention for attestation-related maps. Entries whose +/// target.slot falls below `head_slot - HEAD_RETENTION_SLOTS` are evicted +/// regardless of finalization state. +pub const HEAD_RETENTION_SLOTS: u64 = 128; + impl Store { pub fn produce_attestation_data(&self, slot: Slot) -> Result { let head_checkpoint = Checkpoint { @@ -559,10 +572,12 @@ pub struct BlockProductionInputs { pub validator_index: u64, pub head_root: H256, pub head_state: State, - pub available_attestations: Vec, pub known_block_roots: HashSet, - pub gossip_signatures: HashMap, - pub aggregated_payloads: HashMap>, + /// Joined view of `latest_known_aggregated_payloads` keyed by `data_root`, + /// with the `AttestationData` carried in the value. Entries whose + /// `attestation_data_by_root` lookup misses are dropped from this map and + /// counted in `lean_build_block_pool_missing_att_data`. + pub aggregated_payloads: HashMap)>, pub log_inv_rate: usize, pub store_latest_justified: Checkpoint, } @@ -590,55 +605,39 @@ pub fn prepare_block_production( expected_proposer ); - // Step 1: individual attestations already tracked per-validator. - let mut available_attestations: Vec = store - .latest_known_attestations - .iter() - .map(|(validator_idx, attestation_data)| Attestation { - validator_id: *validator_idx, - data: attestation_data.clone(), - }) - .collect(); - - // Step 2: synthesize entries for validators that arrived *only* via - // on_aggregated_attestation. Those validators have proofs in - // latest_known_aggregated_payloads but were never inserted into - // latest_known_attestations, so build_block's fixed-point loop would - // otherwise silently skip them. - { - let known_validators: HashSet = - store.latest_known_attestations.keys().copied().collect(); - let mut seen_synthesized: HashSet<(u64, H256)> = HashSet::new(); - for (data_root, proofs) in &store.latest_known_aggregated_payloads { - if let Some(att_data) = store.attestation_data_by_root.get(data_root) { - for proof in proofs { - for vid in proof.participants.to_validator_indices() { - if !known_validators.contains(&vid) - && seen_synthesized.insert((vid, *data_root)) - { - available_attestations.push(Attestation { - validator_id: vid, - data: att_data.clone(), - }); - } - } - } - } + // Join `latest_known_aggregated_payloads` (proofs only, keyed by data_root) + // with `attestation_data_by_root` (the secondary index storing the + // AttestationData itself) into the spec-shaped pool unit consumed by + // build_block. Entries whose secondary-index lookup misses are dropped and + // counted; this keeps the proposer aligned with leanSpec/zeam/ethlambda + // which carry att_data inside the pool value. 
+ let mut aggregated_payloads: HashMap< + H256, + (AttestationData, Vec), + > = HashMap::with_capacity(store.latest_known_aggregated_payloads.len()); + let mut missing_att_data: u64 = 0; + for (data_root, proofs) in &store.latest_known_aggregated_payloads { + if let Some(att_data) = store.attestation_data_by_root.get(data_root) { + aggregated_payloads.insert(*data_root, (att_data.clone(), proofs.clone())); + } else { + missing_att_data += 1; + } + } + if missing_att_data > 0 { + if let Some(m) = METRICS.get() { + m.lean_build_block_pool_missing_att_data + .inc_by(missing_att_data); } } let known_block_roots: HashSet = store.blocks.keys().copied().collect(); - let gossip_signatures = store.gossip_signatures.clone(); - let aggregated_payloads = store.latest_known_aggregated_payloads.clone(); Ok(BlockProductionInputs { slot, validator_index, head_root, head_state, - available_attestations, known_block_roots, - gossip_signatures, aggregated_payloads, log_inv_rate, store_latest_justified: store.latest_justified.clone(), @@ -653,27 +652,47 @@ pub fn execute_block_production( validator_index, head_root, head_state, - available_attestations, known_block_roots, - gossip_signatures, aggregated_payloads, log_inv_rate, store_latest_justified, } = inputs; + let pool_known_payloads = aggregated_payloads.len(); + let pool_known_payloads_proofs: usize = aggregated_payloads + .values() + .map(|(_, proofs)| proofs.len()) + .sum(); + let pool_known_block_roots = known_block_roots.len(); + + info!( + slot = slot.0, + proposer = validator_index, + head_root = %head_root, + pool_known_payloads, + pool_known_payloads_proofs, + pool_known_block_roots, + "proposer pool snapshot" + ); + let (final_block, final_post_state, _aggregated_attestations, signatures) = head_state .build_block( slot, validator_index, head_root, - None, - Some(available_attestations), - Some(&known_block_roots), - Some(&gossip_signatures), - Some(&aggregated_payloads), + &known_block_roots, + &aggregated_payloads, log_inv_rate, )?; + info!( + slot = slot.0, + proposer = validator_index, + block_attestations = final_block.body.attestations.len_usize(), + block_signatures = signatures.len(), + "proposer block built" + ); + ensure!( final_post_state.latest_justified.slot >= store_latest_justified.slot, "Produced block justified={} < store justified={}. Fixed-point attestation loop did not converge.", diff --git a/lean_client/fork_choice/tests/unit_tests/validator.rs b/lean_client/fork_choice/tests/unit_tests/validator.rs index 791d4a01..dbee9fa0 100644 --- a/lean_client/fork_choice/tests/unit_tests/validator.rs +++ b/lean_client/fork_choice/tests/unit_tests/validator.rs @@ -6,8 +6,8 @@ use std::collections::HashMap; use crate::unit_tests::common::create_test_store; use containers::{ - Attestation, AttestationData, Block, BlockBody, Checkpoint, Config, SignatureKey, SignedBlock, - Slot, State, Validator, + AggregatedSignatureProof, AggregationBits, Attestation, AttestationData, Block, BlockBody, + Checkpoint, Config, SignedBlock, Slot, State, Validator, }; use fork_choice::store::{Store, get_forkchoice_store, produce_block_with_signatures, update_head}; use rand::SeedableRng; @@ -15,6 +15,65 @@ use rand_chacha::ChaChaRng; use ssz::{H256, SszHash}; use xmss::SecretKey; +/// Build an `AggregatedSignatureProof` for the given validator set on the +/// given AttestationData, then publish it into the proposer's input pool +/// (`store.latest_known_aggregated_payloads` + `store.attestation_data_by_root`). 
+/// Mirrors what an aggregator's `maybe_aggregate` would have done at runtime. +fn publish_aggregated_payload( + store: &mut Store, + data: &AttestationData, + validator_ids: &[u64], + keys: &HashMap, +) { + let data_root = data.hash_tree_root(); + let head_state = store + .states + .get(&store.head) + .expect("head state must exist"); + + let pubkeys: Vec<_> = validator_ids + .iter() + .map(|&vid| { + head_state + .validators + .get(vid) + .expect("validator index out of range") + .attestation_pubkey + .clone() + }) + .collect(); + + let signatures: Vec<_> = validator_ids + .iter() + .map(|&vid| { + keys.get(&vid) + .expect("missing secret key") + .sign(data_root, data.slot.0 as u32) + .expect("XMSS signing failed") + }) + .collect(); + + let participants = AggregationBits::from_validator_indices(validator_ids); + let proof = AggregatedSignatureProof::aggregate( + participants, + pubkeys, + signatures, + data_root, + data.slot.0 as u32, + 1, + ) + .expect("AggregatedSignatureProof::aggregate failed"); + + store + .attestation_data_by_root + .insert(data_root, data.clone()); + store + .latest_known_aggregated_payloads + .entry(data_root) + .or_default() + .push(proof); +} + fn create_test_store_with_signers() -> (Store, HashMap) { let config = Config { genesis_time: 1000 }; @@ -104,29 +163,16 @@ fn test_produce_block_with_attestations() { }; let target = store.get_attestation_target(); - // Add attestations for validators 5 and 6 - for vid in [5u64, 6] { - let data = AttestationData { - slot: head_block.slot, - head: head_checkpoint.clone(), - target: target.clone(), - source: store.latest_justified.clone(), - }; - store.latest_known_attestations.insert(vid, data.clone()); - - let data_root = data.hash_tree_root(); - let sig_key = SignatureKey { - validator_id: vid, - data_root: data_root.clone(), - }; - store.gossip_signatures.insert( - sig_key, - keys.get(&vid) - .unwrap() - .sign(data_root, head_block.slot.0 as u32) - .unwrap(), - ); - } + // Publish a single aggregated payload covering validators 5 and 6 — this + // mirrors what an aggregator's `maybe_aggregate` would publish via gossip + // and what `on_aggregated_attestation` would land in the proposer's pool. + let data = AttestationData { + slot: head_block.slot, + head: head_checkpoint, + target, + source: store.latest_justified.clone(), + }; + publish_aggregated_payload(&mut store, &data, &[5u64, 6], &keys); let slot = Slot(2); let validator_idx = 2; @@ -234,7 +280,8 @@ fn test_produce_block_empty_attestations() { fn test_produce_block_state_consistency() { let (mut store, keys) = create_test_store_with_signers(); - // Add an attestation for validator 7 + // Publish an aggregated payload for validator 7. Same shape as a real + // aggregator-published payload feeding the proposer's pool. 
let head_block = store.blocks[&store.head].clone(); let head_checkpoint = Checkpoint { root: store.head, @@ -247,18 +294,7 @@ fn test_produce_block_state_consistency() { target, source: store.latest_justified.clone(), }; - store.latest_known_attestations.insert(7, data.clone()); - let sig_key = SignatureKey { - validator_id: 7, - data_root: data.hash_tree_root(), - }; - store.gossip_signatures.insert( - sig_key, - keys.get(&7) - .unwrap() - .sign(data.hash_tree_root(), head_block.slot.0 as u32) - .unwrap(), - ); + publish_aggregated_payload(&mut store, &data, &[7u64], &keys); let slot = Slot(4); let validator_idx = 4; diff --git a/lean_client/metrics/src/metrics.rs b/lean_client/metrics/src/metrics.rs index 78362c9b..791480b7 100644 --- a/lean_client/metrics/src/metrics.rs +++ b/lean_client/metrics/src/metrics.rs @@ -166,6 +166,60 @@ pub struct Metrics { /// XMSS verifications skipped because the signature was already cached pub grandine_xmss_verify_skipped_total: IntCounter, + pub grandine_chain_message_channel_depth: IntGauge, + pub grandine_validator_chain_message_channel_depth: IntGauge, + pub grandine_verify_result_channel_depth: IntGauge, + pub grandine_cpu_normal_executor_tasks_in_flight: IntGauge, + + /// Wall-clock time of the aggregation snapshot deep-clone. After V2, the + /// clone runs on the aggregation worker thread (not the chain task), so this + /// no longer reflects chain-task wall-clock; it reflects how expensive a + /// single Store deep-clone is at the moment the aggregation worker picks up + /// a trigger. + pub lean_aggregation_snapshot_clone_seconds: Histogram, + + /// Total number of times the chain task triggered an aggregation snapshot. + /// Compare against `lean_aggregation_snapshot_clone_seconds_count` to derive + /// dropped-trigger count (watch channel overwrites unconsumed values). + pub lean_aggregation_snapshots_triggered_total: IntCounter, + + /// Number of payload entries dropped at proposal time because the + /// `attestation_data_by_root` secondary index has no AttestationData for + /// the data_root present in `latest_known_aggregated_payloads`. Drift + /// between the two maps would silently shrink the proposer's pool. + pub lean_build_block_pool_missing_att_data: IntCounter, + + /// Snapshot size at clone time, measured in total entries across the largest + /// Store maps (blocks + states + gossip_signatures + known_aggregated_payloads + /// + new_aggregated_payloads + attestation_data_by_root). Used to correlate + /// snapshot wall-clock and aggregator memory growth with chain length. + pub lean_aggregation_snapshot_size_entries: Histogram, + + /// Number of aggregation snapshots currently held in memory by the worker + /// thread (inc when `spawn_blocking` task starts, dec when it returns). + /// Should oscillate 0..=1 with the watch-channel design; sustained values + /// of 1 mean the worker is continuously busy (XMSS slower than slot rate). + pub lean_aggregation_in_flight_snapshots: IntGauge, + + /// Time the chain task spends processing one `ChainMessage` (block, + /// attestation, aggregated attestation, etc.). Captures total work done + /// inside the `chain_message_receiver.recv() => { … }` select arm body + /// regardless of message kind. Bimodal distribution expected: spawn-only + /// path for blocks vs full write-locked attestation processing. + pub lean_chain_task_chain_message_seconds: Histogram, + + /// Time the chain task spends inside the `verify_result_rx.recv() => { … }` + /// arm body — i.e. 
Phase 3 apply work (apply_verified_block + post-apply + /// bookkeeping + cascade respawn). Used to compare apply cost against the + /// snapshot/message-processing costs. + pub lean_chain_task_apply_seconds: Histogram, + pub grandine_store_blocks_size: IntGauge, + pub grandine_store_states_size: IntGauge, + pub grandine_store_gossip_signatures_size: IntGauge, + pub grandine_store_known_aggregated_payloads_size: IntGauge, + pub grandine_store_new_aggregated_payloads_size: IntGauge, + pub grandine_pending_blocks_by_root_size: IntGauge, + pub lean_block_building_time_seconds: Histogram, pub lean_block_building_payload_aggregation_time_seconds: Histogram, pub lean_block_aggregated_payloads: Histogram, @@ -431,6 +485,78 @@ impl Metrics { "grandine_xmss_verify_skipped_total", "XMSS verifications skipped (signature already cached) — root cause 4 indicator", )?, + grandine_chain_message_channel_depth: IntGauge::new( + "grandine_chain_message_channel_depth", + "Pending ChainMessage queue depth", + )?, + grandine_validator_chain_message_channel_depth: IntGauge::new( + "grandine_validator_chain_message_channel_depth", + "Pending ValidatorChainMessage queue depth", + )?, + grandine_verify_result_channel_depth: IntGauge::new( + "grandine_verify_result_channel_depth", + "Pending verify-result queue depth (verified blocks awaiting apply on chain task)", + )?, + grandine_cpu_normal_executor_tasks_in_flight: IntGauge::new( + "grandine_cpu_normal_executor_tasks_in_flight", + "Active tasks on cpu_normal DedicatedExecutor (XMSS block verify + attestation signing combined)", + )?, + lean_aggregation_snapshot_clone_seconds: Histogram::with_opts(histogram_opts!( + "lean_aggregation_snapshot_clone_seconds", + "Wall-clock time of the aggregation snapshot deep-clone (runs on aggregation worker thread)", + vec![0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.0], + ))?, + lean_aggregation_snapshots_triggered_total: IntCounter::new( + "lean_aggregation_snapshots_triggered_total", + "Total aggregation snapshot triggers issued by the chain task (aggregator nodes only)", + )?, + lean_build_block_pool_missing_att_data: IntCounter::new( + "lean_build_block_pool_missing_att_data", + "Total payload entries dropped at proposal time because attestation_data_by_root has no entry for the data_root", + )?, + lean_aggregation_snapshot_size_entries: Histogram::with_opts(histogram_opts!( + "lean_aggregation_snapshot_size_entries", + "Total entries across all major Store maps at snapshot clone time", + vec![100.0, 500.0, 1000.0, 5000.0, 10000.0, 50000.0, 100000.0, 500000.0], + ))?, + lean_aggregation_in_flight_snapshots: IntGauge::new( + "lean_aggregation_in_flight_snapshots", + "Snapshots currently held by the aggregation spawn_blocking worker (0 or 1 expected)", + )?, + lean_chain_task_chain_message_seconds: Histogram::with_opts(histogram_opts!( + "lean_chain_task_chain_message_seconds", + "Wall-clock time the chain task spends inside one ChainMessage select-arm body", + vec![0.0001, 0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0], + ))?, + lean_chain_task_apply_seconds: Histogram::with_opts(histogram_opts!( + "lean_chain_task_apply_seconds", + "Wall-clock time the chain task spends inside one verify_result (Phase 3 apply) select-arm body", + vec![0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.0], + ))?, + grandine_store_blocks_size: IntGauge::new( + "grandine_store_blocks_size", + "Entries in store.blocks", + )?, + grandine_store_states_size: IntGauge::new( + "grandine_store_states_size", + "Entries 
in store.states", + )?, + grandine_store_gossip_signatures_size: IntGauge::new( + "grandine_store_gossip_signatures_size", + "Entries in store.gossip_signatures", + )?, + grandine_store_known_aggregated_payloads_size: IntGauge::new( + "grandine_store_known_aggregated_payloads_size", + "Entries in latest_known_aggregated_payloads", + )?, + grandine_store_new_aggregated_payloads_size: IntGauge::new( + "grandine_store_new_aggregated_payloads_size", + "Entries in latest_new_aggregated_payloads", + )?, + grandine_pending_blocks_by_root_size: IntGauge::new( + "grandine_pending_blocks_by_root_size", + "In-flight BlocksByRoot requests", + )?, // Block Production Metrics lean_block_building_time_seconds: Histogram::with_opts(histogram_opts!( @@ -616,6 +742,43 @@ impl Metrics { ))?; default_registry.register(Box::new(self.grandine_fork_choice_new_attestations.clone()))?; default_registry.register(Box::new(self.grandine_xmss_verify_skipped_total.clone()))?; + default_registry + .register(Box::new(self.grandine_chain_message_channel_depth.clone()))?; + default_registry.register(Box::new( + self.grandine_validator_chain_message_channel_depth.clone(), + ))?; + default_registry + .register(Box::new(self.grandine_verify_result_channel_depth.clone()))?; + default_registry.register(Box::new( + self.grandine_cpu_normal_executor_tasks_in_flight.clone(), + ))?; + default_registry + .register(Box::new(self.lean_aggregation_snapshot_clone_seconds.clone()))?; + default_registry.register(Box::new( + self.lean_aggregation_snapshots_triggered_total.clone(), + ))?; + default_registry.register(Box::new( + self.lean_build_block_pool_missing_att_data.clone(), + ))?; + default_registry + .register(Box::new(self.lean_aggregation_snapshot_size_entries.clone()))?; + default_registry + .register(Box::new(self.lean_aggregation_in_flight_snapshots.clone()))?; + default_registry + .register(Box::new(self.lean_chain_task_chain_message_seconds.clone()))?; + default_registry.register(Box::new(self.lean_chain_task_apply_seconds.clone()))?; + default_registry.register(Box::new(self.grandine_store_blocks_size.clone()))?; + default_registry.register(Box::new(self.grandine_store_states_size.clone()))?; + default_registry + .register(Box::new(self.grandine_store_gossip_signatures_size.clone()))?; + default_registry.register(Box::new( + self.grandine_store_known_aggregated_payloads_size.clone(), + ))?; + default_registry.register(Box::new( + self.grandine_store_new_aggregated_payloads_size.clone(), + ))?; + default_registry + .register(Box::new(self.grandine_pending_blocks_by_root_size.clone()))?; // Block Production Metrics default_registry.register(Box::new(self.lean_block_building_time_seconds.clone()))?; diff --git a/lean_client/networking/src/network/service.rs b/lean_client/networking/src/network/service.rs index da247789..a0240376 100644 --- a/lean_client/networking/src/network/service.rs +++ b/lean_client/networking/src/network/service.rs @@ -49,12 +49,14 @@ use crate::{ }, }; -const MAX_BLOCKS_BY_ROOT_RETRIES: u8 = 3; +const MAX_BLOCKS_BY_ROOT_RETRIES: u8 = 10; const MAX_BLOCK_FETCH_DEPTH: u32 = 65536; const MAX_BLOCKS_PER_REQUEST: usize = 10; /// Stalled request timeout. If a peer accepts the stream but never sends a response, /// the request is cancelled and retried with a different peer after this duration. 
-const BLOCKS_BY_ROOT_REQUEST_TIMEOUT: Duration = Duration::from_secs(8); +/// Set comfortably above libp2p's default protocol timeout (10s) so the app-layer +/// gives the underlying stream room to complete under host CPU contention. +const BLOCKS_BY_ROOT_REQUEST_TIMEOUT: Duration = Duration::from_secs(30); struct PendingBlocksRequest { roots: Vec, @@ -774,6 +776,10 @@ where } => { let pending = self.pending_blocks_by_root.remove(&request_id); let request_depth = pending.as_ref().map(|p| p.depth).unwrap_or(0); + METRICS.get().map(|m| { + m.grandine_pending_blocks_by_root_size + .set(self.pending_blocks_by_root.len() as i64) + }); // Release in-flight tracking so these roots can be re-requested if needed. // Retry paths re-add them via send_blocks_by_root_request_internal. @@ -957,6 +963,10 @@ where } => { warn!(peer = %peer, ?error, "BlocksByRoot outbound request failed"); if let Some(req) = self.pending_blocks_by_root.remove(&request_id) { + METRICS.get().map(|m| { + m.grandine_pending_blocks_by_root_size + .set(self.pending_blocks_by_root.len() as i64) + }); // Release in-flight tracking before retry; retry re-adds via send_internal. for root in &req.roots { self.in_flight_roots.remove(root); @@ -1025,6 +1035,10 @@ where for request_id in timed_out { if let Some(req) = self.pending_blocks_by_root.remove(&request_id) { + METRICS.get().map(|m| { + m.grandine_pending_blocks_by_root_size + .set(self.pending_blocks_by_root.len() as i64) + }); warn!( num_roots = req.roots.len(), depth = req.depth, @@ -1417,6 +1431,10 @@ where created_at: tokio::time::Instant::now(), }, ); + METRICS.get().map(|m| { + m.grandine_pending_blocks_by_root_size + .set(self.pending_blocks_by_root.len() as i64) + }); } fn build_behaviour( diff --git a/lean_client/networking/src/req_resp.rs b/lean_client/networking/src/req_resp.rs index 7fe9c0b3..933347d4 100644 --- a/lean_client/networking/src/req_resp.rs +++ b/lean_client/networking/src/req_resp.rs @@ -1,5 +1,6 @@ use std::io; use std::io::{Read, Write}; +use std::time::Duration; use async_trait::async_trait; use containers::{SignedBlock, Status}; @@ -294,6 +295,17 @@ impl LeanCodec { // 3-byte LE length field (bytes 1..=3 of header) let chunk_len = u32::from_le_bytes([data[pos + 1], data[pos + 2], data[pos + 3], 0]) as usize; + if pos + 4 + chunk_len > data.len() { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + format!( + "snappy chunk_len {} at pos {} exceeds buffer len {}", + chunk_len, + pos, + data.len() + ), + )); + } pos += 4 + chunk_len; } @@ -560,7 +572,12 @@ pub fn build(protocols: impl IntoIterator) -> ReqResp { .map(|name| (LeanProtocol(name), ProtocolSupport::Full)) .collect::>(); - RequestResponse::with_codec(LeanCodec::default(), protocols, Config::default()) + // libp2p Config::default() sets request_timeout to 10s. Under host CPU + // contention, lean's tokio worker can be late polling the stream and the + // protocol layer kills the request before our app-level retry logic gets + // a chance to react. Raise it well above the app-layer 30s timeout. 
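+    // The invariant worth keeping: the app-layer retry timeout in service.rs
+    // (30s) must stay strictly below this protocol-layer timeout (60s). A
+    // hypothetical compile-time guard, assuming the constant were made
+    // visible to this module, could pin it:
+    //     const _: () = assert!(BLOCKS_BY_ROOT_REQUEST_TIMEOUT.as_secs() < 60);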
+ let config = Config::default().with_request_timeout(Duration::from_secs(60)); + RequestResponse::with_codec(LeanCodec::default(), protocols, config) } /// Build a RequestResponse behavior for Status protocol only diff --git a/lean_client/networking/src/types.rs b/lean_client/networking/src/types.rs index 9c13ac8a..9881ed51 100644 --- a/lean_client/networking/src/types.rs +++ b/lean_client/networking/src/types.rs @@ -250,10 +250,18 @@ pub trait ChainMessageSink: Send + Sync + Clone { } #[async_trait] -impl ChainMessageSink for mpsc::UnboundedSender { +impl ChainMessageSink for mpsc::Sender { async fn send(&self, message: M) -> Result<()> { - self.send(message) - .map_err(|err| anyhow!("failed to send message to chain: {err}")) + let result = self + .send(message) + .await + .map_err(|err| anyhow!("failed to send message to chain: {err}")); + if result.is_ok() { + METRICS.get().map(|m| { + m.grandine_chain_message_channel_depth.inc(); + }); + } + result } } diff --git a/lean_client/src/aggregation.rs b/lean_client/src/aggregation.rs index d4e527b5..7a286a50 100644 --- a/lean_client/src/aggregation.rs +++ b/lean_client/src/aggregation.rs @@ -1,8 +1,11 @@ use std::collections::HashSet; use std::sync::Arc; +use std::time::Instant; use containers::{SignedAggregatedAttestation, Slot}; use fork_choice::store::Store; +use metrics::METRICS; +use parking_lot::RwLock; use ssz::H256; use tokio::sync::{mpsc, watch}; use tokio::task; @@ -10,11 +13,18 @@ use validator::ValidatorService; /// Aggregation service that decouples XMSS aggregation from the chain task. /// -/// Owns a `watch` channel for receiving store snapshots (always latest value) -/// and an `mpsc` channel for returning results. The caller drives the service -/// through [`trigger`] and [`poll`] instead of managing raw channel ends. +/// Owns: +/// - a `watch` channel carrying just the slot for which to aggregate (always +/// latest value); +/// - a shared `Arc>` that the worker reads from briefly to clone +/// a snapshot on its own thread (V2 — keeps the chain task off the deep-clone); +/// - an `mpsc` channel for returning results. +/// +/// The caller drives the service through [`trigger`] and [`poll`] instead of +/// managing raw channel ends. pub struct AggregationService { - agg_tx: watch::Sender>, + vs: Arc, + agg_tx: watch::Sender>, res_rx: mpsc::Receiver<( u64, Option<(Vec, HashSet)>, @@ -23,43 +33,94 @@ pub struct AggregationService { impl AggregationService { /// Creates the service and spawns the background aggregation task. - pub fn new(vs: Arc, log_rate: usize) -> Self { - let (agg_tx, mut agg_rx) = watch::channel::>(None); + /// + /// The worker holds an `Arc>` reference so it can read + + /// clone the snapshot on its own thread instead of forcing the chain task + /// to do the deep-clone. 
+    pub fn new(
+        vs: Arc<ValidatorService>,
+        store: Arc<RwLock<Store>>,
+        log_rate: usize,
+    ) -> Self {
+        let (agg_tx, mut agg_rx) = watch::channel::<Option<u64>>(None);
         let (res_tx, res_rx) = mpsc::channel::<(
             u64,
             Option<(Vec<SignedAggregatedAttestation>, HashSet<H256>)>,
         )>(4);
 
+        let vs_for_worker = vs.clone();
+
         task::spawn(async move {
             loop {
                 if agg_rx.changed().await.is_err() {
                     break; // sender dropped — chain task shut down
                 }
-                let Some((slot, snapshot)) = agg_rx.borrow_and_update().clone() else {
+                let Some(slot) = *agg_rx.borrow_and_update() else {
                     continue;
                 };
 
-                let vs = vs.clone();
+                let vs = vs_for_worker.clone();
+                let store = store.clone();
+                METRICS
+                    .get()
+                    .map(|m| m.lean_aggregation_in_flight_snapshots.inc());
                 let result = task::spawn_blocking(move || {
+                    // Clone on this worker thread: brief read lock held only for
+                    // the duration of the clone, then released before XMSS work.
+                    let clone_start = Instant::now();
+                    let snapshot = {
+                        let guard = store.read();
+                        guard.clone()
+                    };
+                    let clone_elapsed = clone_start.elapsed();
+
+                    METRICS.get().map(|m| {
+                        m.lean_aggregation_snapshot_clone_seconds
+                            .observe(clone_elapsed.as_secs_f64());
+                        let entries = snapshot.blocks.len()
+                            + snapshot.states.len()
+                            + snapshot.gossip_signatures.len()
+                            + snapshot.latest_known_aggregated_payloads.len()
+                            + snapshot.latest_new_aggregated_payloads.len()
+                            + snapshot.attestation_data_by_root.len();
+                        m.lean_aggregation_snapshot_size_entries
+                            .observe(entries as f64);
+                    });
+
                     vs.maybe_aggregate(&snapshot, Slot(slot), log_rate)
                 })
                 .await
                 .unwrap_or(None);
+                METRICS
+                    .get()
+                    .map(|m| m.lean_aggregation_in_flight_snapshots.dec());
 
                 if res_tx.send((slot, result)).await.is_err() {
                     break; // chain task dropped — shut down
                 }
             }
         });
 
-        Self { agg_tx, res_rx }
+        Self {
+            vs,
+            agg_tx,
+            res_rx,
+        }
+    }
+
+    /// Returns true if this node should aggregate for the given slot. Used by
+    /// the chain task to gate the trigger so non-aggregator nodes never enqueue
+    /// snapshot work.
+    pub fn is_aggregator_for_slot(&self, slot: Slot) -> bool {
+        self.vs.is_aggregator_for_slot(slot)
     }
 
-    /// Triggers aggregation for the given slot with the provided store snapshot.
+    /// Triggers aggregation for the given slot. The worker reads + clones the
+    /// store on its own thread.
     ///
-    /// Uses watch semantics: if a previous trigger has not been consumed yet,
-    /// it is overwritten with the latest value so XMSS always works on the
-    /// most recent state.
-    pub fn trigger(&self, slot: u64, snapshot: Store) {
-        let _ = self.agg_tx.send(Some((slot, snapshot)));
+    /// Watch semantics: if a previous trigger has not been consumed yet, it is
+    /// overwritten with the latest slot so XMSS always works on the most recent
+    /// trigger.
+    pub fn trigger(&self, slot: u64) {
+        let _ = self.agg_tx.send(Some(slot));
     }
 
     /// Returns the next completed aggregation result, or `None` if none is ready.
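Note on the shape of the service above: it combines a tokio `watch` channel (latest-value-wins triggers) with a worker-side snapshot clone behind a `parking_lot::RwLock`, so the chain task never pays for the deep clone. The following is a minimal, self-contained sketch of that same pattern, not the client's code: `FakeStore` and `heavy_work` are hypothetical stand-ins for `Store` and `maybe_aggregate`, and the channel layout only mirrors `AggregationService`.

// Sketch only: latest-value trigger + worker-side snapshot clone.
use std::sync::Arc;

use parking_lot::RwLock;
use tokio::sync::{mpsc, watch};
use tokio::task;

#[derive(Clone, Default)]
struct FakeStore {
    entries: Vec<u64>,
}

fn heavy_work(snapshot: FakeStore, slot: u64) -> usize {
    // Stand-in for the expensive XMSS aggregation step.
    snapshot.entries.len() + slot as usize
}

#[tokio::main]
async fn main() {
    let store = Arc::new(RwLock::new(FakeStore::default()));
    let (trigger_tx, mut trigger_rx) = watch::channel::<Option<u64>>(None);
    let (res_tx, mut res_rx) = mpsc::channel::<(u64, usize)>(4);

    let worker_store = store.clone();
    task::spawn(async move {
        while trigger_rx.changed().await.is_ok() {
            let Some(slot) = *trigger_rx.borrow_and_update() else {
                continue;
            };
            let store = worker_store.clone();
            // The read lock is held only for the clone, never for the work.
            let result = task::spawn_blocking(move || {
                let snapshot = store.read().clone();
                heavy_work(snapshot, slot)
            })
            .await
            .unwrap_or(0);
            if res_tx.send((slot, result)).await.is_err() {
                break; // consumer gone, shut down
            }
        }
    });

    store.write().entries.extend([1, 2, 3]);
    // Two triggers before the worker wakes: watch keeps only the latest,
    // so slot 41 is silently superseded by slot 42.
    let _ = trigger_tx.send(Some(41));
    let _ = trigger_tx.send(Some(42));
    if let Some((slot, n)) = res_rx.recv().await {
        println!("aggregated slot {slot}: {n} entries");
    }
}

The design point is that `watch::Sender::send` never blocks and never queues: a slow worker observes only the newest trigger, which is exactly the "always works on the most recent trigger" behavior the doc comment above promises.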
diff --git a/lean_client/src/banner.rs b/lean_client/src/banner.rs new file mode 100644 index 00000000..dfdf04a3 --- /dev/null +++ b/lean_client/src/banner.rs @@ -0,0 +1,32 @@ +use std::io::IsTerminal; + +const TEAL: &str = "\x1b[38;2;77;182;172m"; +const DIM: &str = "\x1b[90m"; +const RESET: &str = "\x1b[0m"; + +const WORDMARK: &str = "\ + ██╗ ███████╗ █████╗ ███╗ ██╗ + ██║ ██╔════╝ ██╔══██╗ ████╗ ██║ + ██║ █████╗ ███████║ ██╔██╗ ██║ + ██║ ██╔══╝ ██╔══██║ ██║╚██╗██║ + ███████╗███████╗ ██║ ██║ ██║ ╚████║ + ╚══════╝╚══════╝ ╚═╝ ╚═╝ ╚═╝ ╚═══╝"; + +pub fn banner() -> String { + let (teal, dim, reset) = if std::io::stdout().is_terminal() { + (TEAL, DIM, RESET) + } else { + ("", "", "") + }; + + format!( + "\n{teal}{wordmark}{reset}\n\n {dim}A lean consensus client{reset}\n {dim}v{version} | {os} | {arch}{reset}\n", + teal = teal, + wordmark = WORDMARK, + reset = reset, + dim = dim, + version = env!("CARGO_PKG_VERSION"), + os = std::env::consts::OS, + arch = std::env::consts::ARCH, + ) +} diff --git a/lean_client/src/main.rs b/lean_client/src/main.rs index da2e899c..567b2f9b 100644 --- a/lean_client/src/main.rs +++ b/lean_client/src/main.rs @@ -4,12 +4,14 @@ use containers::{ Block, BlockBody, BlockHeader, BlockSignatures, Checkpoint, Config, SignedBlock, Slot, State, Status, Validator, }; +use dedicated_executor::DedicatedExecutor; use ethereum_types::H256; use features::Feature; use fork_choice::{ block_cache::BlockCache, handlers::{ - on_aggregated_attestation, on_attestation, on_block, on_gossip_attestation, on_tick, + apply_verified_block, on_aggregated_attestation, on_attestation, on_gossip_attestation, + on_tick, verify_and_transition, }, store::{ INTERVALS_PER_SLOT, MILLIS_PER_INTERVAL, Store, execute_block_production, @@ -45,6 +47,7 @@ use validator::{ValidatorConfig, ValidatorService}; use xmss::{PublicKey, Signature}; mod aggregation; +mod banner; fn load_node_key(path: &str) -> Result> { let hex_str = std::fs::read_to_string(path)?.trim().to_string(); @@ -401,6 +404,14 @@ async fn main() -> Result<()> { ) .init(); + info!("{}", banner::banner()); + info!("════════════════════════════════════════════════════════"); + info!( + "🚀 lean_client v{} started", + env!("CARGO_PKG_VERSION") + ); + info!("════════════════════════════════════════════════════════"); + let args = Args::parse(); for feature in args.features { @@ -436,13 +447,39 @@ async fn main() -> Result<()> { let (outbound_p2p_sender, outbound_p2p_receiver) = mpsc::unbounded_channel::(); let (chain_message_sender, mut chain_message_receiver) = - mpsc::unbounded_channel::(); + mpsc::channel::(1024); // Separate channel for validator task → chain task request-response messages. // Keeps ValidatorChainMessage (which carries oneshot senders) separate from // ChainMessage (which is Clone and used by the network layer). let (validator_chain_sender, mut validator_chain_receiver) = mpsc::unbounded_channel::(); + let num_cpus = num_cpus::get(); + let cpu_normal_executor = Arc::new(DedicatedExecutor::new( + "lean-cpu-normal", + (num_cpus / 2).max(2), + None, + )); + let cpu_low_executor = Arc::new(DedicatedExecutor::new( + "lean-cpu-low", + (num_cpus / 4).max(1), + Some(19), + )); + + // Verified blocks travel back from the executor to the chain task on this channel. + // Tuple: (block_root, signed_block, should_gossip, verify outcome). + // `should_gossip` is threaded so Phase 3 can rebroadcast only blocks that arrived + // with should_gossip=true AND applied successfully (Bug 1 invariant). 
+ // tx side increments `grandine_verify_result_channel_depth`; rx decrements after recv. + // + // Shutdown semantics: if the chain task exits, `verify_result_rx` is dropped and + // any in-flight `tx.send()` returns Err — the spawned forwarder rolls back the + // gauge inc and exits. The verify itself runs to completion on the executor + // (output discarded). `DedicatedExecutor::Drop` joins worker threads, so no + // verify thread leaks past process shutdown. + let (verify_result_tx, mut verify_result_rx) = + mpsc::channel::<(H256, SignedBlock, bool, Result)>(1024); + let (genesis_time, validators, genesis_log_inv_rate, genesis_attestation_committee_count) = if let Some(genesis_path) = &args.genesis { let genesis_config = containers::GenesisConfig::load_from_file(genesis_path) @@ -595,6 +632,7 @@ async fn main() -> Result<()> { config.clone(), num_validators, keys_path, + cpu_normal_executor.clone(), args.is_aggregator, ) { Ok(service) => { @@ -615,6 +653,7 @@ async fn main() -> Result<()> { Some(ValidatorService::new_with_aggregator( config, num_validators, + cpu_normal_executor.clone(), args.is_aggregator, )) } @@ -627,6 +666,7 @@ async fn main() -> Result<()> { Some(ValidatorService::new_with_aggregator( config, num_validators, + cpu_normal_executor.clone(), args.is_aggregator, )) } @@ -640,6 +680,7 @@ async fn main() -> Result<()> { Some(ValidatorService::new_with_aggregator( config, num_validators, + cpu_normal_executor.clone(), args.is_aggregator, )) } @@ -831,6 +872,7 @@ async fn main() -> Result<()> { "Chain message channel closed during anchor block wait" )); }; + METRICS.get().map(|m| m.grandine_chain_message_channel_depth.dec()); if let ChainMessage::ProcessBlock { signed_block, .. } = msg { let root = signed_block.block.hash_tree_root(); if root == expected_root { @@ -923,6 +965,15 @@ async fn main() -> Result<()> { root: s.head, slot: s.blocks.get(&s.head).map(|b| b.slot).unwrap_or(Slot(0)), }; + + METRICS.get().map(|m| { + if let Ok(j) = i64::try_from(s.latest_justified.slot.0) { + m.lean_latest_justified_slot.set(j); + } + if let Ok(f) = i64::try_from(s.latest_finalized.slot.0) { + m.lean_latest_finalized_slot.set(f); + } + }); } let chain_outbound_sender = outbound_p2p_sender.clone(); @@ -971,9 +1022,13 @@ async fn main() -> Result<()> { // watch channel = lossless latest-value semantics: send() always overwrites so // the aggregation task always sees the most recent trigger even if XMSS was // still running when a newer slot arrived. No trigger is ever silently dropped. - let has_aggregator = vs_for_chain.is_some(); - let mut aggregator = - vs_for_chain.map(|vs| aggregation::AggregationService::new(vs, chain_log_inv_rate)); + let mut aggregator = vs_for_chain + .map(|vs| aggregation::AggregationService::new(vs, store.clone(), chain_log_inv_rate)); + // Whether this node has any validator service at all. Used for sync-state + // coloring; NOT a per-slot aggregator gate. The per-slot aggregator check + // lives at the snapshot trigger site below and goes through + // `AggregationService::is_aggregator_for_slot`. + let has_aggregator = aggregator.is_some(); // Channel for the chain task to signal block arrival to the validator task. let (block_slot_tx, block_slot_rx) = watch::channel::(0); @@ -1012,6 +1067,13 @@ async fn main() -> Result<()> { let has_proposal = target_interval % INTERVALS_PER_SLOT == 0; on_tick(&mut *store.write(), now_millis, has_proposal); + // Sample the cpu_normal executor's queue depth once per tick. 
+ // Counts both XMSS block verify and attestation signing tasks. + METRICS.get().map(|m| { + m.grandine_cpu_normal_executor_tasks_in_flight + .set(cpu_normal_executor.tasks() as i64) + }); + let (current_slot, current_interval) = { let s = store.read(); (s.time / INTERVALS_PER_SLOT, s.time % INTERVALS_PER_SLOT) @@ -1059,13 +1121,25 @@ async fn main() -> Result<()> { // after a long block-processing burst) → trigger still fires for // the current slot, mirroring zeam's explicit catch-up loop. // last_agg_slot prevents double-firing within the same slot. - if has_aggregator && current_interval >= 2 && current_slot > last_agg_slot { - last_agg_slot = current_slot; - let snapshot = store.read().clone(); - if let Some(ref agg) = aggregator { - agg.trigger(current_slot, snapshot); + // Only trigger if this node is the aggregator for this slot. + // Previously gated on `has_aggregator` (any-validator-service), + // which made every validator node deep-clone the Store every + // slot — pure waste on non-aggregators. The per-slot check via + // AggregationService::is_aggregator_for_slot routes through the + // ValidatorService AtomicBool flag, so admin-API runtime toggles + // are still honored. + if let Some(ref agg) = aggregator { + if agg.is_aggregator_for_slot(Slot(current_slot)) + && current_interval >= 2 + && current_slot > last_agg_slot + { + last_agg_slot = current_slot; + METRICS.get().map(|m| { + m.lean_aggregation_snapshots_triggered_total.inc() + }); + agg.trigger(current_slot); + info!(slot = current_slot, "Aggregation phase - triggered"); } - info!(slot = current_slot, "Aggregation phase - triggered"); } match current_interval { @@ -1093,8 +1167,180 @@ async fn main() -> Result<()> { let nf = *network_finalized_slot.lock(); evaluate_sync_state(&mut sync_state, peers, head_slot, nf); } + // Phase 3 (apply) is placed ABOVE `chain_message_receiver.recv()` in the + // biased select so completed verifies always drain before we accept new + // chain messages. Without this priority, a backlog of incoming gossip + // would starve apply: chain_message would always be ready, biased polling + // would pick it every iteration, and verified blocks would never land in + // store.blocks (observed live: depth 200+ growing, store_blocks_size frozen). + result = verify_result_rx.recv() => { + let Some((block_root, signed_block, should_gossip, outcome)) = result else { break }; + METRICS + .get() + .map(|m| m.grandine_verify_result_channel_depth.dec()); + // RAII timer for Phase 3 apply wall-clock; records on Drop so + // all `continue` early-exits are captured automatically. + let _apply_timer = METRICS + .get() + .map(|m| m.lean_chain_task_apply_seconds.start_timer()); + + // Skip if another path already applied this block (e.g. cascade race). + if store.read().blocks.contains_key(&block_root) { + continue; + } + + // Skip if parent state was pruned by finalization while verify ran. + // `apply_verified_block` retains finalized + buffer states only; if + // finalization advanced past this block's parent during the verify + // window, the post-state we just received is no longer applicable. + // Drop it; if the block becomes head-relevant it will be re-fetched + // via BlocksByRoot. 
+ let parent_root = signed_block.block.parent_root; + if !store.read().states.contains_key(&parent_root) { + warn!( + block_root = %format!("0x{:x}", block_root), + "Parent state pruned during verify, dropping result" + ); + continue; + } + + let block_slot = signed_block.block.slot; + + match outcome { + Ok(new_state) => { + let apply_result = apply_verified_block( + &mut *store.write(), + signed_block.clone(), + new_state, + block_root, + ); + + match apply_result { + Ok(()) => { + info!("Block processed successfully"); + let _ = block_slot_tx.send(block_slot.0); + + { + let s = store.read(); + let mut status = status_provider.write(); + status.finalized = s.latest_finalized.clone(); + status.head = Checkpoint { + root: s.head, + slot: s.blocks.get(&s.head).map(|b| b.slot).unwrap_or(Slot(0)), + }; + } + + // Bug 1 invariant: rebroadcast only on Apply (apply_verified_block + // is reached only when the dedup guard above missed, i.e. the + // block is new to us). The original ChainMessage carried + // should_gossip; cascade respawns will set it to false. + if should_gossip { + if let Err(e) = outbound_p2p_sender.send( + OutboundP2pRequest::GossipBlock(signed_block.clone()) + ) { + warn!("Failed to gossip block: {}", e); + } else { + info!(slot = block_slot.0, "Broadcasted block"); + } + } + + let head_slot = { let s = store.read(); s.blocks.get(&s.head).map(|b| b.slot.0).unwrap_or(0) }; + let nf = *network_finalized_slot.lock(); + check_sync_complete(&mut sync_state, head_slot, block_cache.orphan_count(), nf); + + // Cascade: orphans waiting on this block can now verify. + // Spawn each cached child against the just-applied state and + // route the result back through `verify_result_tx`. The reply + // re-enters this same arm, so grandchildren are picked up + // automatically without going through `chain_message_sender` + // (avoids burst pressure on the bounded chain channel). + let cascade_children: Vec<(H256, SignedBlock)> = block_cache + .get_children(&block_root) + .into_iter() + .map(|p| (p.root, p.block.clone())) + .collect(); + + if !cascade_children.is_empty() { + if let Some(cascade_parent_state) = + store.read().states.get(&block_root).cloned() + { + for (child_root, child_block) in cascade_children { + let exec = cpu_normal_executor.clone(); + let result_tx = verify_result_tx.clone(); + let parent_state_for_child = + cascade_parent_state.clone(); + let child_for_verify = child_block.clone(); + METRICS.get().map(|m| { + m.grandine_verify_result_channel_depth.inc() + }); + tokio::spawn(async move { + let child_for_send = child_for_verify.clone(); + let job = exec.spawn(async move { + verify_and_transition( + parent_state_for_child, + child_for_verify, + ) + }); + let outcome = job.await.unwrap_or_else(|e| { + Err(anyhow::anyhow!("executor: {e}")) + }); + if result_tx + .send(( + child_root, + child_for_send, + false, // cascade: never re-gossip + outcome, + )) + .await + .is_err() + { + METRICS.get().map(|m| { + m.grandine_verify_result_channel_depth + .dec() + }); + } + }); + block_cache.remove(&child_root); + } + } + } + + METRICS + .get() + .map(|m| m.grandine_block_cache_size.set(block_cache.len() as i64)); + + // Drain block roots queued by retried attestations inside + // apply_verified_block. 
+ let missing: Vec = store.write().pending_fetch_roots.drain().collect(); + METRICS.get().map(|m| m.grandine_pending_fetch_roots.set(0)); + if !missing.is_empty() { + if let Err(e) = outbound_p2p_sender.send( + OutboundP2pRequest::RequestBlocksByRoot(missing) + ) { + warn!("Failed to request blocks missing from retried attestations: {}", e); + } + } + } + Err(e) => warn!( + block_root = %format!("0x{:x}", block_root), + "Problem applying verified block: {}", e + ), + } + } + Err(e) => warn!( + block_root = %format!("0x{:x}", block_root), + "Block verify failed: {}", e + ), + } + } message = chain_message_receiver.recv() => { let Some(message) = message else { break }; + METRICS.get().map(|m| m.grandine_chain_message_channel_depth.dec()); + // RAII timer: HistogramTimer records on Drop, so all `continue` + // exits inside the match below are captured automatically. + let _chain_message_timer = METRICS + .get() + .map(|m| m.lean_chain_task_chain_message_seconds.start_timer()); match message { ChainMessage::ProcessBlock { signed_block, @@ -1115,6 +1361,10 @@ async fn main() -> Result<()> { let block_root = signed_block.block.hash_tree_root(); let parent_root = signed_block.block.parent_root; + if store.read().blocks.contains_key(&block_root) { + continue; + } + info!( slot = block_slot.0, block_root = %format!("0x{:x}", block_root), @@ -1129,11 +1379,6 @@ async fn main() -> Result<()> { .as_millis() as u64; on_tick(&mut *store.write(), now_millis, false); - let parent_exists = { - let s = store.read(); - parent_root.is_zero() || s.states.contains_key(&parent_root) - }; - // Store block immediately so we can serve it to peers via // BlocksByRoot even if it can't be processed yet (e.g. parent // missing). This prevents STREAM_CLOSED errors when a peer @@ -1155,7 +1400,16 @@ async fn main() -> Result<()> { } } - if !parent_exists { + // Snapshot parent state for the executor. If absent, treat as + // orphan and re-queue. Genesis (parent_root.is_zero()) is loaded + // at startup, so a None here for non-zero parents means the + // parent block hasn't been processed yet. 
+ let parent_state = { + let s = store.read(); + s.states.get(&parent_root).cloned() + }; + + let Some(parent_state) = parent_state else { block_cache.add( signed_block.clone(), block_root, @@ -1192,53 +1446,41 @@ async fn main() -> Result<()> { check_sync_complete(&mut sync_state, head_slot, block_cache.orphan_count(), nf); continue; - } - - let result = {on_block(&mut *store.write(), &mut block_cache, signed_block.clone())}; - match result { - Ok(()) => { - info!("Block processed successfully"); - let _ = block_slot_tx.send(block_slot.0); - - { - let s = store.read(); - let mut status = status_provider.write(); - status.finalized = s.latest_finalized.clone(); - status.head = Checkpoint { - root: s.head, - slot: s.blocks.get(&s.head).map(|b| b.slot).unwrap_or(Slot(0)), - }; - } - - if should_gossip { - if let Err(e) = outbound_p2p_sender.send( - OutboundP2pRequest::GossipBlock(signed_block) - ) { - warn!("Failed to gossip block: {}", e); - } else { - info!(slot = block_slot.0, "Broadcasted block"); - } - } - - let head_slot = { let s = store.read(); s.blocks.get(&s.head).map(|b| b.slot.0).unwrap_or(0) }; - let nf = *network_finalized_slot.lock(); - check_sync_complete(&mut sync_state, head_slot, block_cache.orphan_count(), nf); - } - Err(e) => warn!("Problem processing block: {}", e), - } - - METRICS.get().map(|m| m.grandine_block_cache_size.set(block_cache.len() as i64)); + }; - // Drain block roots queued by retried attestations inside on_block. - let missing: Vec = store.write().pending_fetch_roots.drain().collect(); - METRICS.get().map(|m| m.grandine_pending_fetch_roots.set(0)); - if !missing.is_empty() { - if let Err(e) = outbound_p2p_sender.send( - OutboundP2pRequest::RequestBlocksByRoot(missing) - ) { - warn!("Failed to request blocks missing from retried attestations: {}", e); + // Phase 1: ship verify+state_transition to the cpu_normal executor. + // The chain task returns to the select! loop immediately and processes + // the verified outcome later via verify_result_rx (Phase 3). + // No store mutation happens here; gossip rebroadcast and post-apply + // bookkeeping are deferred to Phase 3 once we know the block applied. + let exec = cpu_normal_executor.clone(); + let result_tx = verify_result_tx.clone(); + let signed_block_for_verify = signed_block.clone(); + METRICS + .get() + .map(|m| m.grandine_verify_result_channel_depth.inc()); + tokio::spawn(async move { + let signed_block_for_send = signed_block_for_verify.clone(); + let job = exec.spawn(async move { + verify_and_transition(parent_state, signed_block_for_verify) + }); + let outcome = job + .await + .unwrap_or_else(|e| { + Err(anyhow::anyhow!("executor: {e}")) + }); + if result_tx + .send((block_root, signed_block_for_send, should_gossip, outcome)) + .await + .is_err() + { + // Receiver gone (chain task exited): roll back the inc above + // since this entry will never be drained. 
+ METRICS + .get() + .map(|m| m.grandine_verify_result_channel_depth.dec()); } - } + }); } ChainMessage::ProcessAttestation { signed_attestation, @@ -1363,6 +1605,7 @@ async fn main() -> Result<()> { } v_message = validator_chain_receiver.recv() => { let Some(v_message) = v_message else { break }; + METRICS.get().map(|m| m.grandine_validator_chain_message_channel_depth.dec()); match v_message { ValidatorChainMessage::ProduceBlock { slot, proposer_index, sender } => { let block_build_start = Instant::now(); @@ -1468,13 +1711,18 @@ async fn main() -> Result<()> { ); let (tx, rx) = oneshot::channel(); - if validator_chain_sender + let send_result = validator_chain_sender .send(ValidatorChainMessage::ProduceBlock { slot: Slot(current_slot), proposer_index: proposer_idx, sender: tx, - }) - .is_err() + }); + if send_result.is_ok() { + METRICS.get().map(|m| { + m.grandine_validator_chain_message_channel_depth.inc() + }); + } + if send_result.is_err() { warn!("Validator task: chain channel closed, stopping"); break; @@ -1505,13 +1753,19 @@ async fn main() -> Result<()> { block_root = %format!("0x{:x}", block_root), "Validator task: block signed, sending to chain" ); - if chain_msg_sender_for_validator + let send_result = chain_msg_sender_for_validator .send(ChainMessage::ProcessBlock { signed_block, is_trusted: true, should_gossip: true, }) - .is_err() + .await; + if send_result.is_ok() { + METRICS.get().map(|m| { + m.grandine_chain_message_channel_depth.inc() + }); + } + if send_result.is_err() { warn!( "Validator task: chain message channel closed, stopping" @@ -1551,12 +1805,17 @@ async fn main() -> Result<()> { } let (tx, rx) = oneshot::channel(); - if validator_chain_sender + let send_result = validator_chain_sender .send(ValidatorChainMessage::BuildAttestationData { slot: Slot(current_slot), sender: tx, - }) - .is_err() + }); + if send_result.is_ok() { + METRICS.get().map(|m| { + m.grandine_validator_chain_message_channel_depth.inc() + }); + } + if send_result.is_err() { warn!("Validator task: chain channel closed, stopping"); break; @@ -1569,10 +1828,12 @@ async fn main() -> Result<()> { } else { u64::MAX }; - let attestations = vs.create_attestations_from_data( - Slot(current_slot), - attestation_data, - ); + let attestations = vs + .create_attestations_from_data( + Slot(current_slot), + attestation_data, + ) + .await; for signed_att in attestations { if signed_att.validator_id == proposer_index { continue; @@ -1588,13 +1849,19 @@ async fn main() -> Result<()> { subnet_id = subnet_id, "Validator task: broadcasting attestation" ); - if chain_msg_sender_for_validator + let send_result = chain_msg_sender_for_validator .send(ChainMessage::ProcessAttestation { signed_attestation: signed_att, is_trusted: true, should_gossip: true, }) - .is_err() + .await; + if send_result.is_ok() { + METRICS.get().map(|m| { + m.grandine_chain_message_channel_depth.inc() + }); + } + if send_result.is_err() { warn!( "Validator task: chain message channel closed, stopping" diff --git a/lean_client/validator/Cargo.toml b/lean_client/validator/Cargo.toml index 4d584c48..01471cbf 100644 --- a/lean_client/validator/Cargo.toml +++ b/lean_client/validator/Cargo.toml @@ -5,9 +5,11 @@ edition = { workspace = true } [dependencies] anyhow = { workspace = true } containers = { workspace = true } +dedicated_executor = { workspace = true } env-config = { workspace = true } ethereum-types = { workspace = true } fork_choice = { workspace = true } +futures = { workspace = true } metrics = { workspace = true } serde = { workspace = 
true } serde_yaml = { workspace = true } diff --git a/lean_client/validator/src/lib.rs b/lean_client/validator/src/lib.rs index 96bb110e..a407ba4b 100644 --- a/lean_client/validator/src/lib.rs +++ b/lean_client/validator/src/lib.rs @@ -1,6 +1,7 @@ // Lean validator client with XMSS signing support use std::collections::{HashMap, HashSet}; use std::path::Path; +use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; use serde::Deserialize; @@ -10,7 +11,9 @@ use containers::{ AggregatedSignatureProof, AggregationBits, AttestationData, AttestationSignatures, Block, BlockSignatures, SignedAggregatedAttestation, SignedAttestation, SignedBlock, Slot, }; +use dedicated_executor::DedicatedExecutor; use fork_choice::store::Store; +use futures::stream::{FuturesUnordered, StreamExt}; use metrics::{METRICS, stop_and_discard, stop_and_record}; use rayon::prelude::*; use ssz::H256; @@ -98,7 +101,12 @@ impl ValidatorConfig { pub struct ValidatorService { pub config: ValidatorConfig, pub num_validators: u64, - key_manager: Option, + /// Wrapped in `Arc` so each per-validator XMSS signing job can be moved + /// into a `DedicatedExecutor` task without cloning the secret keys. + key_manager: Option>, + /// Shared CPU pool used for offloading XMSS attestation signing off the + /// validator task / chain task. + cpu_normal_executor: Arc, /// Whether this node performs aggregation duties (devnet-3). /// Uses `AtomicBool` for interior mutability so the admin API can toggle /// the flag at runtime without requiring `&mut self` or a write lock. @@ -151,13 +159,18 @@ fn extend_children_greedily<'a>( } impl ValidatorService { - pub fn new(config: ValidatorConfig, num_validators: u64) -> Self { - Self::new_with_aggregator(config, num_validators, false) + pub fn new( + config: ValidatorConfig, + num_validators: u64, + cpu_normal_executor: Arc, + ) -> Self { + Self::new_with_aggregator(config, num_validators, cpu_normal_executor, false) } pub fn new_with_aggregator( config: ValidatorConfig, num_validators: u64, + cpu_normal_executor: Arc, is_aggregator: bool, ) -> Self { info!( @@ -178,6 +191,7 @@ impl ValidatorService { config, num_validators, key_manager: None, + cpu_normal_executor, is_aggregator: AtomicBool::new(is_aggregator), } } @@ -186,14 +200,22 @@ impl ValidatorService { config: ValidatorConfig, num_validators: u64, keys_dir: impl AsRef, + cpu_normal_executor: Arc, ) -> Result> { - Self::new_with_keys_and_aggregator(config, num_validators, keys_dir, false) + Self::new_with_keys_and_aggregator( + config, + num_validators, + keys_dir, + cpu_normal_executor, + false, + ) } pub fn new_with_keys_and_aggregator( config: ValidatorConfig, num_validators: u64, keys_dir: impl AsRef, + cpu_normal_executor: Arc, is_aggregator: bool, ) -> Result> { let mut key_manager = KeyManager::new(&keys_dir)?; @@ -229,7 +251,8 @@ impl ValidatorService { Ok(Self { config, num_validators, - key_manager: Some(key_manager), + key_manager: Some(Arc::new(key_manager)), + cpu_normal_executor, is_aggregator: AtomicBool::new(is_aggregator), }) } @@ -567,7 +590,7 @@ impl ValidatorService { /// The validator task calls `BuildAttestationData` on the chain task first, /// receives the `AttestationData` via oneshot, then calls this method. /// This keeps XMSS signing entirely off the chain task's thread. 
- pub fn create_attestations_from_data( + pub async fn create_attestations_from_data( &self, slot: Slot, attestation_data: AttestationData, @@ -587,58 +610,81 @@ impl ValidatorService { .start_timer() }); - self.config - .validator_indices - .iter() - .filter_map(|&idx| { - let signature = if let Some(ref key_manager) = self.key_manager { - let message = attestation_data.hash_tree_root(); - let epoch = slot.0 as u32; + // No keys: return zero-signature attestations directly (test / passive mode). + let Some(key_manager) = self.key_manager.as_ref() else { + return self + .config + .validator_indices + .iter() + .map(|&idx| { + info!( + slot = slot.0, + validator = idx, + "Created attestation with zero signature" + ); + SignedAttestation { + validator_id: idx, + message: attestation_data.clone(), + signature: Signature::default(), + } + }) + .collect(); + }; + let message = attestation_data.hash_tree_root(); + let epoch = slot.0 as u32; + let target_slot = attestation_data.target.slot.0; + let source_slot = attestation_data.source.slot.0; + + // Fan out signing across the executor's worker threads so we get + // concurrent XMSS signing instead of one-at-a-time on this task. + let mut sign_jobs = FuturesUnordered::new(); + for &idx in &self.config.validator_indices { + let exec = self.cpu_normal_executor.clone(); + let km = Arc::clone(key_manager); + sign_jobs.push(async move { + let job = exec.spawn(async move { let _timer = METRICS.get().map(|metrics| { metrics .lean_pq_sig_attestation_signing_time_seconds .start_timer() }); + km.sign_attestation(idx, epoch, message) + }); + let result = job + .await + .unwrap_or_else(|e| Err(anyhow!("executor: {e}"))); + (idx, result) + }); + } - match key_manager.sign_attestation(idx, epoch, message) { - Ok(sig) => { - METRICS.get().map(|metrics| { - metrics.lean_pq_sig_attestation_signatures_total.inc(); - }); - info!( - slot = slot.0, - validator = idx, - target_slot = attestation_data.target.slot.0, - source_slot = attestation_data.source.slot.0, - "Created signed attestation" - ); - sig - } - Err(e) => { - warn!( - validator = idx, - error = %e, - "Failed to sign attestation, skipping" - ); - return None; - } - } - } else { + let mut attestations = Vec::with_capacity(self.config.validator_indices.len()); + while let Some((idx, sig_result)) = sign_jobs.next().await { + match sig_result { + Ok(signature) => { + METRICS.get().map(|metrics| { + metrics.lean_pq_sig_attestation_signatures_total.inc(); + }); info!( slot = slot.0, validator = idx, - "Created attestation with zero signature" + target_slot, + source_slot, + "Created signed attestation" ); - Signature::default() - }; - - Some(SignedAttestation { - validator_id: idx, - message: attestation_data.clone(), - signature, - }) - }) - .collect() + attestations.push(SignedAttestation { + validator_id: idx, + message: attestation_data.clone(), + signature, + }); + } + Err(e) => warn!( + validator = idx, + error = %e, + "Failed to sign attestation, skipping" + ), + } + } + attestations } } From 2c77d142ec3a24fb6a3d487f4423a81fe28a6fb3 Mon Sep 17 00:00:00 2001 From: bomanaps Date: Fri, 1 May 2026 17:35:49 +0100 Subject: [PATCH 2/5] run fmt --- lean_client/containers/src/state.rs | 10 ++++-- lean_client/fork_choice/src/handlers.rs | 18 +++++----- lean_client/fork_choice/src/store.rs | 6 ++-- lean_client/metrics/src/metrics.rs | 36 +++++++++---------- lean_client/src/aggregation.rs | 12 ++----- lean_client/src/main.rs | 46 +++++++++++-------------- lean_client/validator/src/lib.rs | 4 +-- 7 files changed, 58 
insertions(+), 74 deletions(-) diff --git a/lean_client/containers/src/state.rs b/lean_client/containers/src/state.rs index 035fa21d..c7c670ee 100644 --- a/lean_client/containers/src/state.rs +++ b/lean_client/containers/src/state.rs @@ -647,8 +647,10 @@ impl State { }; // Sort by target.slot for deterministic processing order. - let mut sorted_entries: Vec<(&H256, &(AttestationData, Vec))> = - aggregated_payloads.iter().collect(); + let mut sorted_entries: Vec<( + &H256, + &(AttestationData, Vec), + )> = aggregated_payloads.iter().collect(); sorted_entries.sort_by_key(|(_, (data, _))| data.target.slot); let mut processed_data_roots: HashSet = HashSet::new(); @@ -783,7 +785,9 @@ impl State { Vec::with_capacity(order.len()); for data_root in order { - let group = groups.remove(&data_root).expect("group exists for data_root"); + let group = groups + .remove(&data_root) + .expect("group exists for data_root"); if group.len() == 1 { compacted.extend(group); continue; diff --git a/lean_client/fork_choice/src/handlers.rs b/lean_client/fork_choice/src/handlers.rs index e83a77b5..3111eb86 100644 --- a/lean_client/fork_choice/src/handlers.rs +++ b/lean_client/fork_choice/src/handlers.rs @@ -12,7 +12,7 @@ use tracing::warn; use crate::block_cache::BlockCache; use crate::store::{ BLOCKS_TO_KEEP, GOSSIP_DISPARITY_INTERVALS, HEAD_RETENTION_SLOTS, INTERVALS_PER_SLOT, - MILLIS_PER_INTERVAL, STATES_TO_KEEP, STATE_PRUNE_BUFFER, Store, tick_interval, update_head, + MILLIS_PER_INTERVAL, STATE_PRUNE_BUFFER, STATES_TO_KEEP, Store, tick_interval, update_head, }; #[inline] @@ -739,11 +739,7 @@ pub fn apply_verified_block( /// the spec-mandated `prune_stale_attestation_data` is a necessary but not a /// sufficient bound, since it never fires while finalization is stalled. fn prune_with_retention_bounds(store: &mut Store) { - let head_slot = store - .blocks - .get(&store.head) - .map(|b| b.slot.0) - .unwrap_or(0); + let head_slot = store.blocks.get(&store.head).map(|b| b.slot.0).unwrap_or(0); let keep_min_slot = head_slot.saturating_sub(HEAD_RETENTION_SLOTS); let mut protected: HashSet = HashSet::with_capacity(4); @@ -791,10 +787,12 @@ fn prune_with_retention_bounds(store: &mut Store) { adr.get(&key.data_root) .is_none_or(|data| data.target.slot.0 >= keep_min_slot) }); - store.latest_known_aggregated_payloads.retain(|data_root, _| { - adr.get(data_root) - .is_none_or(|data| data.target.slot.0 >= keep_min_slot) - }); + store + .latest_known_aggregated_payloads + .retain(|data_root, _| { + adr.get(data_root) + .is_none_or(|data| data.target.slot.0 >= keep_min_slot) + }); store.latest_new_aggregated_payloads.retain(|data_root, _| { adr.get(data_root) .is_none_or(|data| data.target.slot.0 >= keep_min_slot) diff --git a/lean_client/fork_choice/src/store.rs b/lean_client/fork_choice/src/store.rs index b1d2e837..fd4b87f1 100644 --- a/lean_client/fork_choice/src/store.rs +++ b/lean_client/fork_choice/src/store.rs @@ -611,10 +611,8 @@ pub fn prepare_block_production( // build_block. Entries whose secondary-index lookup misses are dropped and // counted; this keeps the proposer aligned with leanSpec/zeam/ethlambda // which carry att_data inside the pool value. 
- let mut aggregated_payloads: HashMap< - H256, - (AttestationData, Vec), - > = HashMap::with_capacity(store.latest_known_aggregated_payloads.len()); + let mut aggregated_payloads: HashMap)> = + HashMap::with_capacity(store.latest_known_aggregated_payloads.len()); let mut missing_att_data: u64 = 0; for (data_root, proofs) in &store.latest_known_aggregated_payloads { if let Some(att_data) = store.attestation_data_by_root.get(data_root) { diff --git a/lean_client/metrics/src/metrics.rs b/lean_client/metrics/src/metrics.rs index 791480b7..5b72f34c 100644 --- a/lean_client/metrics/src/metrics.rs +++ b/lean_client/metrics/src/metrics.rs @@ -517,7 +517,9 @@ impl Metrics { lean_aggregation_snapshot_size_entries: Histogram::with_opts(histogram_opts!( "lean_aggregation_snapshot_size_entries", "Total entries across all major Store maps at snapshot clone time", - vec![100.0, 500.0, 1000.0, 5000.0, 10000.0, 50000.0, 100000.0, 500000.0], + vec![ + 100.0, 500.0, 1000.0, 5000.0, 10000.0, 50000.0, 100000.0, 500000.0 + ], ))?, lean_aggregation_in_flight_snapshots: IntGauge::new( "lean_aggregation_in_flight_snapshots", @@ -526,7 +528,9 @@ impl Metrics { lean_chain_task_chain_message_seconds: Histogram::with_opts(histogram_opts!( "lean_chain_task_chain_message_seconds", "Wall-clock time the chain task spends inside one ChainMessage select-arm body", - vec![0.0001, 0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0], + vec![ + 0.0001, 0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0 + ], ))?, lean_chain_task_apply_seconds: Histogram::with_opts(histogram_opts!( "lean_chain_task_apply_seconds", @@ -742,43 +746,39 @@ impl Metrics { ))?; default_registry.register(Box::new(self.grandine_fork_choice_new_attestations.clone()))?; default_registry.register(Box::new(self.grandine_xmss_verify_skipped_total.clone()))?; - default_registry - .register(Box::new(self.grandine_chain_message_channel_depth.clone()))?; + default_registry.register(Box::new(self.grandine_chain_message_channel_depth.clone()))?; default_registry.register(Box::new( self.grandine_validator_chain_message_channel_depth.clone(), ))?; - default_registry - .register(Box::new(self.grandine_verify_result_channel_depth.clone()))?; + default_registry.register(Box::new(self.grandine_verify_result_channel_depth.clone()))?; default_registry.register(Box::new( self.grandine_cpu_normal_executor_tasks_in_flight.clone(), ))?; - default_registry - .register(Box::new(self.lean_aggregation_snapshot_clone_seconds.clone()))?; + default_registry.register(Box::new( + self.lean_aggregation_snapshot_clone_seconds.clone(), + ))?; default_registry.register(Box::new( self.lean_aggregation_snapshots_triggered_total.clone(), ))?; default_registry.register(Box::new( self.lean_build_block_pool_missing_att_data.clone(), ))?; - default_registry - .register(Box::new(self.lean_aggregation_snapshot_size_entries.clone()))?; - default_registry - .register(Box::new(self.lean_aggregation_in_flight_snapshots.clone()))?; - default_registry - .register(Box::new(self.lean_chain_task_chain_message_seconds.clone()))?; + default_registry.register(Box::new( + self.lean_aggregation_snapshot_size_entries.clone(), + ))?; + default_registry.register(Box::new(self.lean_aggregation_in_flight_snapshots.clone()))?; + default_registry.register(Box::new(self.lean_chain_task_chain_message_seconds.clone()))?; default_registry.register(Box::new(self.lean_chain_task_apply_seconds.clone()))?; default_registry.register(Box::new(self.grandine_store_blocks_size.clone()))?; 
 default_registry.register(Box::new(self.grandine_store_states_size.clone()))?;
-        default_registry
-            .register(Box::new(self.grandine_store_gossip_signatures_size.clone()))?;
+        default_registry.register(Box::new(self.grandine_store_gossip_signatures_size.clone()))?;
         default_registry.register(Box::new(
             self.grandine_store_known_aggregated_payloads_size.clone(),
         ))?;
         default_registry.register(Box::new(
             self.grandine_store_new_aggregated_payloads_size.clone(),
         ))?;
-        default_registry
-            .register(Box::new(self.grandine_pending_blocks_by_root_size.clone()))?;
+        default_registry.register(Box::new(self.grandine_pending_blocks_by_root_size.clone()))?;
 
         // Block Production Metrics
         default_registry.register(Box::new(self.lean_block_building_time_seconds.clone()))?;
diff --git a/lean_client/src/aggregation.rs b/lean_client/src/aggregation.rs
index 7a286a50..4a861717 100644
--- a/lean_client/src/aggregation.rs
+++ b/lean_client/src/aggregation.rs
@@ -37,11 +37,7 @@ impl AggregationService {
     /// The worker holds an `Arc<RwLock<Store>>` reference so it can read +
     /// clone the snapshot on its own thread instead of forcing the chain task
     /// to do the deep-clone.
-    pub fn new(
-        vs: Arc<ValidatorService>,
-        store: Arc<RwLock<Store>>,
-        log_rate: usize,
-    ) -> Self {
+    pub fn new(vs: Arc<ValidatorService>, store: Arc<RwLock<Store>>, log_rate: usize) -> Self {
         let (agg_tx, mut agg_rx) = watch::channel::<Option<u64>>(None);
         let (res_tx, res_rx) = mpsc::channel::<(
             u64,
             Option<(Vec<SignedAggregatedAttestation>, HashSet<H256>)>,
@@ -99,11 +95,7 @@ impl AggregationService {
             }
         });
 
-        Self {
-            vs,
-            agg_tx,
-            res_rx,
-        }
+        Self { vs, agg_tx, res_rx }
     }
 
     /// Returns true if this node should aggregate for the given slot. Used by
diff --git a/lean_client/src/main.rs b/lean_client/src/main.rs
index 567b2f9b..2aeb5f0c 100644
--- a/lean_client/src/main.rs
+++ b/lean_client/src/main.rs
@@ -406,10 +406,7 @@ async fn main() -> Result<()> {
 
     info!("{}", banner::banner());
     info!("════════════════════════════════════════════════════════");
-    info!(
-        "🚀 lean_client v{} started",
-        env!("CARGO_PKG_VERSION")
-    );
+    info!("🚀 lean_client v{} started", env!("CARGO_PKG_VERSION"));
     info!("════════════════════════════════════════════════════════");
 
     let args = Args::parse();
@@ -446,8 +443,7 @@ async fn main() -> Result<()> {
     let (outbound_p2p_sender, outbound_p2p_receiver) =
         mpsc::unbounded_channel::<OutboundP2pRequest>();
 
-    let (chain_message_sender, mut chain_message_receiver) =
-        mpsc::channel::<ChainMessage>(1024);
+    let (chain_message_sender, mut chain_message_receiver) = mpsc::channel::<ChainMessage>(1024);
     // Separate channel for validator task → chain task request-response messages.
     // Keeps ValidatorChainMessage (which carries oneshot senders) separate from
     // ChainMessage (which is Clone and used by the network layer).
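Note on the channel change above: moving from an unbounded to a bounded chain channel is paired throughout the series with a gauge discipline of increment-after-successful-send, decrement-after-recv, so the gauge reads as queued-but-unprocessed backlog. A minimal, self-contained sketch of that discipline follows; the metric name, payload type, and capacity are illustrative, not taken from this series.

// Sketch only: gauge-tracked bounded channel.
use prometheus::{IntGauge, Registry};
use tokio::sync::mpsc;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let registry = Registry::new();
    let depth = IntGauge::new(
        "chain_message_channel_depth",
        "Messages queued for the chain task",
    )?;
    registry.register(Box::new(depth.clone()))?;

    let (tx, mut rx) = mpsc::channel::<u64>(1024);

    for msg in 0..3u64 {
        // Increment only after the send succeeds, so a closed channel
        // never inflates the gauge.
        if tx.send(msg).await.is_ok() {
            depth.inc();
        }
    }

    while let Ok(msg) = rx.try_recv() {
        // Decrement as soon as the message is dequeued.
        depth.dec();
        println!("processed {msg}, depth now {}", depth.get());
    }
    Ok(())
}

The ordering matters in both directions: incrementing before a failed send, or decrementing before the message is actually dequeued, would make the gauge drift away from the channel's real occupancy.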
@@ -1711,19 +1707,19 @@ async fn main() -> Result<()> { ); let (tx, rx) = oneshot::channel(); - let send_result = validator_chain_sender - .send(ValidatorChainMessage::ProduceBlock { + let send_result = validator_chain_sender.send( + ValidatorChainMessage::ProduceBlock { slot: Slot(current_slot), proposer_index: proposer_idx, sender: tx, - }); + }, + ); if send_result.is_ok() { METRICS.get().map(|m| { m.grandine_validator_chain_message_channel_depth.inc() }); } - if send_result.is_err() - { + if send_result.is_err() { warn!("Validator task: chain channel closed, stopping"); break; } @@ -1765,8 +1761,7 @@ async fn main() -> Result<()> { m.grandine_chain_message_channel_depth.inc() }); } - if send_result.is_err() - { + if send_result.is_err() { warn!( "Validator task: chain message channel closed, stopping" ); @@ -1805,18 +1800,18 @@ async fn main() -> Result<()> { } let (tx, rx) = oneshot::channel(); - let send_result = validator_chain_sender - .send(ValidatorChainMessage::BuildAttestationData { + let send_result = validator_chain_sender.send( + ValidatorChainMessage::BuildAttestationData { slot: Slot(current_slot), sender: tx, - }); + }, + ); if send_result.is_ok() { - METRICS.get().map(|m| { - m.grandine_validator_chain_message_channel_depth.inc() - }); + METRICS + .get() + .map(|m| m.grandine_validator_chain_message_channel_depth.inc()); } - if send_result.is_err() - { + if send_result.is_err() { warn!("Validator task: chain channel closed, stopping"); break; } @@ -1857,12 +1852,11 @@ async fn main() -> Result<()> { }) .await; if send_result.is_ok() { - METRICS.get().map(|m| { - m.grandine_chain_message_channel_depth.inc() - }); + METRICS + .get() + .map(|m| m.grandine_chain_message_channel_depth.inc()); } - if send_result.is_err() - { + if send_result.is_err() { warn!( "Validator task: chain message channel closed, stopping" ); diff --git a/lean_client/validator/src/lib.rs b/lean_client/validator/src/lib.rs index a407ba4b..771938e4 100644 --- a/lean_client/validator/src/lib.rs +++ b/lean_client/validator/src/lib.rs @@ -651,9 +651,7 @@ impl ValidatorService { }); km.sign_attestation(idx, epoch, message) }); - let result = job - .await - .unwrap_or_else(|e| Err(anyhow!("executor: {e}"))); + let result = job.await.unwrap_or_else(|e| Err(anyhow!("executor: {e}"))); (idx, result) }); } From aafbba8a80b153501f357dfc0ab3e6a8e69ccdfc Mon Sep 17 00:00:00 2001 From: bomanaps Date: Wed, 6 May 2026 13:41:57 +0100 Subject: [PATCH 3/5] address banner review comment --- lean_client/build.rs | 16 ++++++++++++++++ lean_client/src/banner.rs | 32 -------------------------------- lean_client/src/main.rs | 10 +++++----- 3 files changed, 21 insertions(+), 37 deletions(-) create mode 100644 lean_client/build.rs delete mode 100644 lean_client/src/banner.rs diff --git a/lean_client/build.rs b/lean_client/build.rs new file mode 100644 index 00000000..18001e90 --- /dev/null +++ b/lean_client/build.rs @@ -0,0 +1,16 @@ +use std::process::Command; + +fn main() { + let hash = Command::new("git") + .args(["rev-parse", "--short=8", "HEAD"]) + .output() + .ok() + .filter(|o| o.status.success()) + .map(|o| String::from_utf8_lossy(&o.stdout).trim().to_string()) + .filter(|s| !s.is_empty()) + .unwrap_or_else(|| "unknown".to_string()); + + println!("cargo:rustc-env=GRANDINE_GIT_COMMIT_HASH={hash}"); + println!("cargo:rerun-if-changed=../.git/HEAD"); + println!("cargo:rerun-if-changed=../.git/refs/heads"); +} diff --git a/lean_client/src/banner.rs b/lean_client/src/banner.rs deleted file mode 100644 index dfdf04a3..00000000 
--- a/lean_client/src/banner.rs +++ /dev/null @@ -1,32 +0,0 @@ -use std::io::IsTerminal; - -const TEAL: &str = "\x1b[38;2;77;182;172m"; -const DIM: &str = "\x1b[90m"; -const RESET: &str = "\x1b[0m"; - -const WORDMARK: &str = "\ - ██╗ ███████╗ █████╗ ███╗ ██╗ - ██║ ██╔════╝ ██╔══██╗ ████╗ ██║ - ██║ █████╗ ███████║ ██╔██╗ ██║ - ██║ ██╔══╝ ██╔══██║ ██║╚██╗██║ - ███████╗███████╗ ██║ ██║ ██║ ╚████║ - ╚══════╝╚══════╝ ╚═╝ ╚═╝ ╚═╝ ╚═══╝"; - -pub fn banner() -> String { - let (teal, dim, reset) = if std::io::stdout().is_terminal() { - (TEAL, DIM, RESET) - } else { - ("", "", "") - }; - - format!( - "\n{teal}{wordmark}{reset}\n\n {dim}A lean consensus client{reset}\n {dim}v{version} | {os} | {arch}{reset}\n", - teal = teal, - wordmark = WORDMARK, - reset = reset, - dim = dim, - version = env!("CARGO_PKG_VERSION"), - os = std::env::consts::OS, - arch = std::env::consts::ARCH, - ) -} diff --git a/lean_client/src/main.rs b/lean_client/src/main.rs index 2aeb5f0c..ac89f741 100644 --- a/lean_client/src/main.rs +++ b/lean_client/src/main.rs @@ -47,7 +47,6 @@ use validator::{ValidatorConfig, ValidatorService}; use xmss::{PublicKey, Signature}; mod aggregation; -mod banner; fn load_node_key(path: &str) -> Result> { let hex_str = std::fs::read_to_string(path)?.trim().to_string(); @@ -404,10 +403,11 @@ async fn main() -> Result<()> { ) .init(); - info!("{}", banner::banner()); - info!("════════════════════════════════════════════════════════"); - info!("🚀 lean_client v{} started", env!("CARGO_PKG_VERSION")); - info!("════════════════════════════════════════════════════════"); + info!( + "Starting grandine v{} ({})", + env!("CARGO_PKG_VERSION"), + env!("GRANDINE_GIT_COMMIT_HASH"), + ); let args = Args::parse(); From aec2787182d957dee748f5d08740ffbc7d0c3301 Mon Sep 17 00:00:00 2001 From: bomanaps Date: Thu, 7 May 2026 09:22:22 +0100 Subject: [PATCH 4/5] address review feedback --- .gitmodules | 3 +++ lean_client/Cargo.lock | 23 ++++++++++++++++++++++- lean_client/Cargo.toml | 5 ++++- lean_client/build.rs | 16 ---------------- lean_client/dedicated_executor | 1 + lean_client/src/main.rs | 4 +++- 6 files changed, 33 insertions(+), 19 deletions(-) create mode 100644 .gitmodules delete mode 100644 lean_client/build.rs create mode 160000 lean_client/dedicated_executor diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 00000000..007f8e42 --- /dev/null +++ b/.gitmodules @@ -0,0 +1,3 @@ +[submodule "lean_client/dedicated_executor"] + path = lean_client/dedicated_executor + url = https://github.com/grandinetech/dedicated_executor diff --git a/lean_client/Cargo.lock b/lean_client/Cargo.lock index 28a69003..de73c807 100644 --- a/lean_client/Cargo.lock +++ b/lean_client/Cargo.lock @@ -1329,7 +1329,6 @@ dependencies = [ [[package]] name = "dedicated_executor" version = "0.1.0" -source = "git+https://github.com/bomanaps/dedicated_executor?rev=e39afad2959e6ae3f55ed56f6117dfe12ec07359#e39afad2959e6ae3f55ed56f6117dfe12ec07359" dependencies = [ "futures", "libc", @@ -1337,6 +1336,7 @@ dependencies = [ "once_cell", "parking_lot", "pin-project", + "prometheus_metrics", "tokio", "tokio-util", ] @@ -2119,6 +2119,26 @@ dependencies = [ "polyval", ] +[[package]] +name = "git-version" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ad568aa3db0fcbc81f2f116137f263d7304f512a1209b35b85150d3ef88ad19" +dependencies = [ + "git-version-macro", +] + +[[package]] +name = "git-version-macro" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" 
+checksum = "53010ccb100b96a67bc32c0175f0ed1426b31b655d562898e57325f81c023ac0" +dependencies = [ + "proc-macro2 1.0.106", + "quote 1.0.45", + "syn 2.0.117", +] + [[package]] name = "glob" version = "0.3.3" @@ -2953,6 +2973,7 @@ dependencies = [ "ethereum-types", "features", "fork_choice", + "git-version", "hex", "http_api", "libp2p-identity 0.2.13", diff --git a/lean_client/Cargo.toml b/lean_client/Cargo.toml index 5451615c..00607668 100644 --- a/lean_client/Cargo.toml +++ b/lean_client/Cargo.toml @@ -243,6 +243,7 @@ eth_ssz = { package = "ethereum_ssz", version = "0.10.0" } ethereum-types = "0.14" futures = "0.3" features = { git = "https://github.com/grandinetech/grandine", rev = "64afdee3c6be79fceffb66933dcb69a943f3f1ae" } +git-version = "0.3" hex = "0.4.3" http-body-util = "0.1" http_api_utils = { git = "https://github.com/grandinetech/grandine", rev = "64afdee3c6be79fceffb66933dcb69a943f3f1ae" } @@ -264,7 +265,7 @@ libp2p = { version = "0.56.0", default-features = false, features = [ ] } libp2p-identity = { version = "0.2", features = ["secp256k1"] } libp2p-mplex = "0.39" -dedicated_executor = { git = "https://github.com/bomanaps/dedicated_executor", rev = "e39afad2959e6ae3f55ed56f6117dfe12ec07359" } +dedicated_executor = { path = "dedicated_executor" } num-bigint = "0.4" num-traits = "0.2" num_cpus = "1" @@ -273,6 +274,7 @@ parking_lot = "0.12" paste = "1.0.15" pretty_assertions = "1.4" prometheus = { version = "0.14", features = ["process"] } +prometheus_metrics = { git = "https://github.com/grandinetech/grandine", rev = "64afdee3c6be79fceffb66933dcb69a943f3f1ae" } rand = "0.10" rand_chacha = "0.10" rayon = "1" @@ -315,6 +317,7 @@ dedicated_executor = { workspace = true } ethereum-types = { workspace = true } features = { workspace = true } fork_choice = { workspace = true } +git-version = { workspace = true } hex = { workspace = true } http_api = { workspace = true } libp2p-identity = { workspace = true } diff --git a/lean_client/build.rs b/lean_client/build.rs deleted file mode 100644 index 18001e90..00000000 --- a/lean_client/build.rs +++ /dev/null @@ -1,16 +0,0 @@ -use std::process::Command; - -fn main() { - let hash = Command::new("git") - .args(["rev-parse", "--short=8", "HEAD"]) - .output() - .ok() - .filter(|o| o.status.success()) - .map(|o| String::from_utf8_lossy(&o.stdout).trim().to_string()) - .filter(|s| !s.is_empty()) - .unwrap_or_else(|| "unknown".to_string()); - - println!("cargo:rustc-env=GRANDINE_GIT_COMMIT_HASH={hash}"); - println!("cargo:rerun-if-changed=../.git/HEAD"); - println!("cargo:rerun-if-changed=../.git/refs/heads"); -} diff --git a/lean_client/dedicated_executor b/lean_client/dedicated_executor new file mode 160000 index 00000000..0d36fea3 --- /dev/null +++ b/lean_client/dedicated_executor @@ -0,0 +1 @@ +Subproject commit 0d36fea385dae3c576eff35e4eb7daae07a7d248 diff --git a/lean_client/src/main.rs b/lean_client/src/main.rs index ac89f741..536593f4 100644 --- a/lean_client/src/main.rs +++ b/lean_client/src/main.rs @@ -406,7 +406,7 @@ async fn main() -> Result<()> { info!( "Starting grandine v{} ({})", env!("CARGO_PKG_VERSION"), - env!("GRANDINE_GIT_COMMIT_HASH"), + git_version::git_version!(args = ["--always", "--abbrev=8"]), ); let args = Args::parse(); @@ -455,11 +455,13 @@ async fn main() -> Result<()> { "lean-cpu-normal", (num_cpus / 2).max(2), None, + None, )); let cpu_low_executor = Arc::new(DedicatedExecutor::new( "lean-cpu-low", (num_cpus / 4).max(1), Some(19), + None, )); // Verified blocks travel back from the executor to the chain task on 
this channel. From 0a2bba06b6d110a73fe98e1b58a855f0ad3419f6 Mon Sep 17 00:00:00 2001 From: bomanaps Date: Thu, 7 May 2026 09:35:42 +0100 Subject: [PATCH 5/5] fetch submodules in checkout for dedicated_executor --- .github/workflows/ci.yaml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index d27f3d34..d1070005 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -13,6 +13,8 @@ jobs: steps: - name: Checkout repository uses: actions/checkout@v6 + with: + submodules: recursive - name: Check formatting run: make check-format @@ -31,6 +33,8 @@ jobs: - name: Checkout repository uses: actions/checkout@v6 + with: + submodules: recursive - name: Build client run: make build @@ -49,6 +53,8 @@ jobs: - name: Checkout repository uses: actions/checkout@v6 + with: + submodules: recursive - name: Run tests run: make test
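A closing note on the scheduling change introduced in PATCH 1: the Phase 3 arm sits above `chain_message_receiver.recv()` in a biased `select!` precisely so that completed verifies drain before new gossip is accepted. The following is a minimal, self-contained sketch of that priority pattern, not the client's loop; channel names and payload types are illustrative.

// Sketch only: biased select! gives drain priority to verify results.
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (verify_tx, mut verify_rx) = mpsc::channel::<u32>(8);
    let (chain_tx, mut chain_rx) = mpsc::channel::<u32>(8);

    for i in 0..3u32 {
        chain_tx.send(i).await.unwrap();
        verify_tx.send(i * 100).await.unwrap();
    }
    drop(chain_tx);
    drop(verify_tx);

    loop {
        tokio::select! {
            // `biased` polls arms top-to-bottom instead of randomly, so a
            // ready verify result always wins over a ready chain message.
            biased;
            Some(v) = verify_rx.recv() => println!("apply verified block {v}"),
            Some(m) = chain_rx.recv() => println!("process chain message {m}"),
            else => break, // both channels closed and drained
        }
    }
}

Run as written, this prints all three verify results before any chain message, which is the inversion of the starvation the PATCH 1 comment describes: with random polling and a constantly ready gossip channel, the apply arm could be skipped indefinitely while the verify-result channel depth grows.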