diff --git a/lean_client/Cargo.lock b/lean_client/Cargo.lock index 9a194dd..28a6900 100644 --- a/lean_client/Cargo.lock +++ b/lean_client/Cargo.lock @@ -449,7 +449,7 @@ dependencies = [ "futures-lite", "parking", "polling", - "rustix", + "rustix 1.1.4", "slab", "windows-sys 0.61.2", ] @@ -1326,6 +1326,21 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "dedicated_executor" +version = "0.1.0" +source = "git+https://github.com/bomanaps/dedicated_executor?rev=e39afad2959e6ae3f55ed56f6117dfe12ec07359#e39afad2959e6ae3f55ed56f6117dfe12ec07359" +dependencies = [ + "futures", + "libc", + "log", + "once_cell", + "parking_lot", + "pin-project", + "tokio", + "tokio-util", +] + [[package]] name = "delay_map" version = "0.4.1" @@ -2934,6 +2949,7 @@ dependencies = [ "chain", "clap", "containers", + "dedicated_executor", "ethereum-types", "features", "fork_choice", @@ -2942,6 +2958,7 @@ dependencies = [ "libp2p-identity 0.2.13", "metrics", "networking", + "num_cpus", "parking_lot", "reqwest", "ssz", @@ -3503,6 +3520,12 @@ dependencies = [ "yamux 0.13.10", ] +[[package]] +name = "linux-raw-sys" +version = "0.4.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d26c52dbd32dccf2d10cac7725f8eae5296885fb5703b261f7d0a0739ec807ab" + [[package]] name = "linux-raw-sys" version = "0.12.1" @@ -4064,6 +4087,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9f7c3e4beb33f85d45ae3e3a1792185706c8e16d043238c593331cc7cd313b50" dependencies = [ "critical-section", + "parking_lot_core", "portable-atomic", ] @@ -4440,7 +4464,7 @@ dependencies = [ "concurrent-queue", "hermit-abi", "pin-project-lite", - "rustix", + "rustix 1.1.4", "windows-sys 0.61.2", ] @@ -4616,6 +4640,28 @@ dependencies = [ "version_check", ] +[[package]] +name = "procfs" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc5b72d8145275d844d4b5f6d4e1eef00c8cd889edb6035c21675d1bb1f45c9f" +dependencies = [ + "bitflags", + "hex", + "procfs-core", + "rustix 0.38.44", +] + +[[package]] +name = "procfs-core" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "239df02d8349b06fc07398a3a1697b06418223b1c7725085e801e7c0fc6a12ec" +dependencies = [ + "bitflags", + "hex", +] + [[package]] name = "prometheus" version = "0.14.0" @@ -4625,8 +4671,10 @@ dependencies = [ "cfg-if", "fnv", "lazy_static", + "libc", "memchr", "parking_lot", + "procfs", "protobuf", "thiserror 2.0.18", ] @@ -5266,6 +5314,19 @@ dependencies = [ "nom", ] +[[package]] +name = "rustix" +version = "0.38.44" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fdb5bc1ae2baa591800df16c9ca78619bf65c0488b41b96ccec5d11220d8c154" +dependencies = [ + "bitflags", + "errno", + "libc", + "linux-raw-sys 0.4.15", + "windows-sys 0.52.0", +] + [[package]] name = "rustix" version = "1.1.4" @@ -5275,7 +5336,7 @@ dependencies = [ "bitflags", "errno", "libc", - "linux-raw-sys", + "linux-raw-sys 0.12.1", "windows-sys 0.61.2", ] @@ -5985,7 +6046,7 @@ dependencies = [ "fastrand", "getrandom 0.4.2", "once_cell", - "rustix", + "rustix 1.1.4", "windows-sys 0.61.2", ] @@ -6579,9 +6640,11 @@ version = "0.0.0" dependencies = [ "anyhow", "containers", + "dedicated_executor", "env-config", "ethereum-types", "fork_choice", + "futures", "metrics", "rayon", "serde", diff --git a/lean_client/Cargo.toml b/lean_client/Cargo.toml index f4bd6db..5451615 100644 --- a/lean_client/Cargo.toml +++ b/lean_client/Cargo.toml @@ -264,13 +264,15 @@ libp2p = { 
version = "0.56.0", default-features = false, features = [ ] } libp2p-identity = { version = "0.2", features = ["secp256k1"] } libp2p-mplex = "0.39" +dedicated_executor = { git = "https://github.com/bomanaps/dedicated_executor", rev = "e39afad2959e6ae3f55ed56f6117dfe12ec07359" } num-bigint = "0.4" num-traits = "0.2" +num_cpus = "1" once_cell = "1.21" parking_lot = "0.12" paste = "1.0.15" pretty_assertions = "1.4" -prometheus = "0.14" +prometheus = { version = "0.14", features = ["process"] } rand = "0.10" rand_chacha = "0.10" rayon = "1" @@ -309,6 +311,7 @@ bls = { workspace = true } chain = { workspace = true } clap = { workspace = true } containers = { workspace = true } +dedicated_executor = { workspace = true } ethereum-types = { workspace = true } features = { workspace = true } fork_choice = { workspace = true } @@ -317,6 +320,7 @@ http_api = { workspace = true } libp2p-identity = { workspace = true } metrics = { workspace = true } networking = { workspace = true } +num_cpus = { workspace = true } parking_lot = { workspace = true } ssz = { workspace = true } tokio = { workspace = true } diff --git a/lean_client/build.rs b/lean_client/build.rs new file mode 100644 index 0000000..18001e9 --- /dev/null +++ b/lean_client/build.rs @@ -0,0 +1,16 @@ +use std::process::Command; + +fn main() { + let hash = Command::new("git") + .args(["rev-parse", "--short=8", "HEAD"]) + .output() + .ok() + .filter(|o| o.status.success()) + .map(|o| String::from_utf8_lossy(&o.stdout).trim().to_string()) + .filter(|s| !s.is_empty()) + .unwrap_or_else(|| "unknown".to_string()); + + println!("cargo:rustc-env=GRANDINE_GIT_COMMIT_HASH={hash}"); + println!("cargo:rerun-if-changed=../.git/HEAD"); + println!("cargo:rerun-if-changed=../.git/refs/heads"); +} diff --git a/lean_client/containers/src/attestation.rs b/lean_client/containers/src/attestation.rs index 3524598..57f8623 100644 --- a/lean_client/containers/src/attestation.rs +++ b/lean_client/containers/src/attestation.rs @@ -92,6 +92,46 @@ impl AggregatedSignatureProof { ) -> Result<()> { self.proof_data.verify(public_keys, message, slot) } + + /// Greedy set-cover over a slice of proofs, returning indices of the proofs + /// to admit in priority order. Repeatedly picks the proof covering the + /// most uncovered validators; stops when no remaining proof adds coverage. + /// Mirrors leanSpec `AggregatedSignatureProof.select_greedily`. + pub fn select_greedily(proofs: &[AggregatedSignatureProof]) -> Vec { + let mut selected: Vec = Vec::new(); + let mut covered: HashSet = HashSet::new(); + let mut remaining: HashSet = (0..proofs.len()).collect(); + + while !remaining.is_empty() { + let best = remaining + .iter() + .copied() + .map(|idx| { + let new_count = proofs[idx] + .get_participant_indices() + .into_iter() + .filter(|vid| !covered.contains(vid)) + .count(); + (idx, new_count) + }) + .max_by_key(|(_, n)| *n); + + let Some((best_idx, best_count)) = best else { + break; + }; + if best_count == 0 { + break; + } + + for vid in proofs[best_idx].get_participant_indices() { + covered.insert(vid); + } + selected.push(best_idx); + remaining.remove(&best_idx); + } + + selected + } } /// Bitlist representing validator participation in an attestation. 
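The new `select_greedily` helper above is a straight greedy set-cover. As a rough illustration of the admission order it produces, here is a self-contained sketch that applies the same rule to plain `HashSet<u64>` participant sets instead of real `AggregatedSignatureProof` values; the toy function and the example data below are illustrative only and are not part of the diff:

```rust
use std::collections::HashSet;

/// Greedy set-cover: return indices in admission order, always picking the
/// set that covers the most still-uncovered validator ids, and stopping as
/// soon as no remaining set adds coverage.
fn select_greedily(participant_sets: &[HashSet<u64>]) -> Vec<usize> {
    let mut selected = Vec::new();
    let mut covered: HashSet<u64> = HashSet::new();
    let mut remaining: HashSet<usize> = (0..participant_sets.len()).collect();

    while !remaining.is_empty() {
        let best = remaining
            .iter()
            .copied()
            .map(|idx| (idx, participant_sets[idx].difference(&covered).count()))
            .max_by_key(|&(_, gain)| gain);

        match best {
            Some((idx, gain)) if gain > 0 => {
                covered.extend(participant_sets[idx].iter().copied());
                selected.push(idx);
                remaining.remove(&idx);
            }
            // No remaining set covers a new validator: fully redundant
            // proofs are never admitted.
            _ => break,
        }
    }
    selected
}

fn main() {
    // Proof 0 covers {1,2,3}, proof 1 covers {3,4}, proof 2 covers {2,3}.
    let sets = vec![
        HashSet::from([1, 2, 3]),
        HashSet::from([3, 4]),
        HashSet::from([2, 3]),
    ];
    // Picks proof 0 first (three new validators), then proof 1 (one new),
    // and stops because proof 2 adds no coverage.
    assert_eq!(select_greedily(&sets), vec![0, 1]);
}
```

The property the toy version shares with the real method is the early stop: once no candidate adds coverage, redundant proofs are dropped rather than padded into the block body.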
diff --git a/lean_client/containers/src/state.rs b/lean_client/containers/src/state.rs index 7a979fe..c7c670e 100644 --- a/lean_client/containers/src/state.rs +++ b/lean_client/containers/src/state.rs @@ -1,4 +1,4 @@ -use anyhow::{Context, Result, anyhow, ensure}; +use anyhow::{Result, anyhow, ensure}; use bitvec::{bitvec, order::Lsb0, vec::BitVec}; use metrics::METRICS; use serde::{Deserialize, Serialize}; @@ -10,8 +10,10 @@ use typenum::{Prod, U262144}; use xmss::{PublicKey, Signature}; use crate::{ - AggregatedSignatureProof, Attestation, Checkpoint, Config, SignatureKey, Slot, - attestation::{AggregatedAttestation, AggregatedAttestations, AggregationBits}, + AggregatedSignatureProof, Checkpoint, Config, Slot, + attestation::{ + AggregatedAttestation, AggregatedAttestations, AggregationBits, AttestationData, + }, block::{Block, BlockBody, BlockHeader, SignedBlock}, validator::{Validator, ValidatorRegistryLimit, Validators}, }; @@ -604,37 +606,24 @@ impl State { /// Build a valid block on top of this state. /// - /// Computes the post-state and creates a block with the correct state root. - /// If `available_attestations` and `known_block_roots` are provided, - /// performs fixed-point attestation collection: iteratively adds valid - /// attestations until no more can be included. This is necessary because - /// processing attestations may update the justified checkpoint, which may - /// make additional attestations valid. + /// Iterates over `aggregated_payloads` keyed by `data_root` (with the + /// AttestationData carried in the value), applies the spec's fixed-point + /// attestation selection: sort by `target.slot`, admit entries whose + /// `head.root` is known and whose `source` matches the current justified + /// checkpoint, greedily select proofs maximizing validator coverage, run + /// the STF, and repeat as long as the post-state's justified checkpoint + /// advances. Caps distinct AttestationData entries at MAX_ATTESTATIONS_DATA. /// - /// # Arguments - /// - /// * `slot` - Target slot for the block - /// * `proposer_index` - Validator index of the proposer - /// * `parent_root` - Root of the parent block (must match state after slot processing) - /// * `initial_attestations` - Initial attestations to include - /// * `available_attestations` - Optional pool of attestations to collect from - /// * `known_block_roots` - Optional set of known block roots for attestation validation - /// * `gossip_signatures` - Optional map of individual signatures from gossip - /// * `aggregated_payloads` - Optional map of aggregated signature proofs - /// - /// # Returns - /// - /// Tuple of (Block, post-State, collected aggregated attestations, aggregated proofs) + /// Aggregator-published proofs are the sole input — no gossip-time + /// re-aggregation at proposal. Matches leanSpec build_block, ethlambda's + /// blockchain::store::build_block, and zeam's getProposalAttestations. 
pub fn build_block( &self, slot: Slot, proposer_index: u64, parent_root: H256, - initial_attestations: Option>, - available_attestations: Option>, - known_block_roots: Option<&HashSet>, - gossip_signatures: Option<&HashMap>, - aggregated_payloads: Option<&HashMap>>, + known_block_roots: &HashSet, + aggregated_payloads: &HashMap)>, log_inv_rate: usize, ) -> Result<( Block, @@ -642,121 +631,108 @@ impl State { Vec, Vec, )> { - // Initialize attestation set - let mut attestations = initial_attestations.unwrap_or_default(); - - // Fixed-point attestation collection loop - // Iteratively add valid attestations until no new ones can be added - loop { - // Create candidate block with current attestation set - let aggregated = AggregatedAttestation::aggregate_by_data(&attestations); - - let candidate_block = Block { - slot, - proposer_index, - parent_root, - state_root: H256::zero(), - body: BlockBody { - attestations: AggregatedAttestations::try_from_iter(aggregated.into_iter())?, - }, - }; + let mut selected: Vec<(AggregatedAttestation, AggregatedSignatureProof)> = Vec::new(); - // Apply state transition to get the post-block state - let post_state = self.process_slots(slot)?.process_block(&candidate_block)?; - - let Some(ref available_attestations) = available_attestations else { - // No attestation source provided: done after computing post_state - break; + if !aggregated_payloads.is_empty() { + // Genesis edge case: process_block_header rebinds latest_justified.root + // to parent_root when building on slot 0. Apply the same derivation + // here so attestation sources match. + let mut current_justified = if self.latest_block_header.slot == Slot(0) { + Checkpoint { + root: parent_root, + slot: self.latest_justified.slot, + } + } else { + self.latest_justified.clone() }; - let Some(known_block_roots) = known_block_roots else { - // No attestation source provided: done after computing post_state - break; - }; + // Sort by target.slot for deterministic processing order. 
+ let mut sorted_entries: Vec<( + &H256, + &(AttestationData, Vec), + )> = aggregated_payloads.iter().collect(); + sorted_entries.sort_by_key(|(_, (data, _))| data.target.slot); - // Find new valid attestations matching post-state justification - let mut new_attestations = Vec::new(); + let mut processed_data_roots: HashSet = HashSet::new(); - // Track distinct AttestationData roots already accepted and newly added this iteration - let accepted_data_roots: HashSet = attestations - .iter() - .map(|a| a.data.hash_tree_root()) - .collect(); - let mut new_att_data_roots: HashSet = HashSet::new(); + loop { + let mut found_new = false; - for attestation in available_attestations { - let data = &attestation.data; - let validator_id = attestation.validator_id; - let data_root = data.hash_tree_root(); - let sig_key = SignatureKey::new(validator_id, data_root); + for &(data_root, (att_data, proofs)) in &sorted_entries { + if processed_data_roots.contains(data_root) { + continue; + } + if processed_data_roots.len() >= MAX_ATTESTATIONS_DATA { + break; + } + if !known_block_roots.contains(&att_data.head.root) { + continue; + } + if att_data.source != current_justified { + continue; + } - // Skip if target block is unknown - if !known_block_roots.contains(&data.head.root) { - continue; - } + processed_data_roots.insert(*data_root); + found_new = true; - // Skip if attestation source does not match post-state's latest justified - if data.source != post_state.latest_justified { - continue; + let indices = AggregatedSignatureProof::select_greedily(proofs); + for idx in indices { + let proof = proofs[idx].clone(); + selected.push(( + AggregatedAttestation { + aggregation_bits: proof.participants.clone(), + data: att_data.clone(), + }, + proof, + )); + } } - // Avoid adding duplicates of attestations already in the candidate set - if attestations.contains(attestation) { - continue; + if !found_new { + break; } - // Enforce MAX_ATTESTATIONS_DATA: only admit a new AttestationData if under the limit - let is_existing_data = accepted_data_roots.contains(&data_root) - || new_att_data_roots.contains(&data_root); - if !is_existing_data - && accepted_data_roots.len() + new_att_data_roots.len() >= MAX_ATTESTATIONS_DATA - { - continue; - } + // Run STF on a candidate block to see if justification advanced. + let candidate_attestations = AggregatedAttestations::try_from_iter( + selected.iter().map(|(att, _)| att.clone()), + )?; + let candidate_block = Block { + slot, + proposer_index, + parent_root, + state_root: H256::zero(), + body: BlockBody { + attestations: candidate_attestations, + }, + }; + let post_state = self.process_slots(slot)?.process_block(&candidate_block)?; - // We can only include an attestation if we have some way to later provide - // an aggregated proof for its group: - // - either a per validator XMSS signature from gossip, or - // - at least one aggregated proof learned from a block that references - // this validator+data. 
- let has_gossip_sig = - gossip_signatures.is_some_and(|sigs| sigs.contains_key(&sig_key)); - let has_block_proof = - aggregated_payloads.is_some_and(|payloads| payloads.contains_key(&data_root)); - - if has_gossip_sig || has_block_proof { - new_att_data_roots.insert(data_root); - new_attestations.push(attestation.clone()); + if post_state.latest_justified != current_justified { + current_justified = post_state.latest_justified; + } else { + break; } } - - // Fixed point reached: no new attestations found - if new_attestations.is_empty() { - break; - } - - // Add new attestations and continue iteration - attestations.extend(new_attestations); } - let (aggregated_attestations, aggregated_signatures) = self.compute_aggregated_signatures( - &attestations, - gossip_signatures, - aggregated_payloads, - log_inv_rate, - )?; + // Compact: merge proofs sharing the same AttestationData via recursive + // aggregation so each AttestationData appears at most once in the body. + let compacted = self.compact_proofs_by_data(selected, log_inv_rate)?; METRICS.get().map(|metrics| { metrics .lean_pq_sig_attestations_in_aggregated_signatures_total .inc_by( - aggregated_attestations + compacted .iter() - .map(|v| v.aggregation_bits.to_validator_indices().len()) + .map(|(att, _)| att.aggregation_bits.to_validator_indices().len()) .sum::() as u64, ); }); + let (aggregated_attestations, aggregated_signatures): (Vec<_>, Vec<_>) = + compacted.into_iter().unzip(); + let mut final_block = Block { slot, proposer_index, @@ -764,7 +740,7 @@ impl State { state_root: H256::zero(), body: BlockBody { attestations: AggregatedAttestations::try_from_iter( - aggregated_attestations.clone(), + aggregated_attestations.iter().cloned(), )?, }, }; @@ -781,309 +757,109 @@ impl State { )) } - pub fn compute_aggregated_signatures( + /// Merge selected (AggregatedAttestation, Proof) entries that share the + /// same AttestationData into a single recursive proof via + /// `aggregate_with_children`, preserving first-occurrence order. A block + /// body must contain at most one entry per AttestationData (spec invariant). 
+ fn compact_proofs_by_data( &self, - attestations: &[Attestation], - gossip_signatures: Option<&HashMap>, - aggregated_payloads: Option<&HashMap>>, + entries: Vec<(AggregatedAttestation, AggregatedSignatureProof)>, log_inv_rate: usize, - ) -> Result<(Vec, Vec)> { - let mut results: Vec<(AggregatedAttestation, AggregatedSignatureProof)> = Vec::new(); - - // Group individual attestations by data - for aggregated in AggregatedAttestation::aggregate_by_data(attestations) { - let data = &aggregated.data; - let data_root = data.hash_tree_root(); - let validator_ids = aggregated.aggregation_bits.to_validator_indices(); - - // Phase 1: Gossip Collection - // Try to collect individual signatures from gossip network - let mut gossip_sigs = Vec::new(); - let mut gossip_keys = Vec::new(); - let mut gossip_ids = Vec::new(); - - let mut remaining = HashSet::new(); - - if let Some(gossip_signatures) = gossip_signatures { - for vid in validator_ids { - let key = SignatureKey::new(vid, data_root); - if let Some(sig) = gossip_signatures.get(&key) { - gossip_sigs.push(sig.clone()); - gossip_keys.push( - self.validators - .get(vid) - .map(|v| v.attestation_pubkey.clone()) - .context(format!("invalid validator id {vid}"))?, - ); - gossip_ids.push(vid); - } else { - remaining.insert(vid); - } - } - } else { - // No gossip data: all validators need fallback - remaining = validator_ids.iter().copied().collect(); - } - - // If we collected any gossip signatures, create an aggregated proof - if !gossip_ids.is_empty() { - let participants = AggregationBits::from_validator_indices(&gossip_ids); - - let proof = AggregatedSignatureProof::aggregate( - participants.clone(), - gossip_keys, - gossip_sigs, - data_root, - data.slot.0 as u32, - log_inv_rate, - )?; - - results.push(( - AggregatedAttestation { - aggregation_bits: participants, - data: data.clone(), - }, - proof, - )); - } - - // Phase 2: Fallback to block proofs using greedy set-cover. - // Collect all selected proofs as children, then compress into ONE - // recursive proof via aggregate_with_children (spec intent). - if let Some(payloads) = aggregated_payloads { - if !remaining.is_empty() { - if let Some(candidates) = payloads.get(&data_root) { - if !candidates.is_empty() { - let mut phase2_children: Vec<&AggregatedSignatureProof> = Vec::new(); - - loop { - if remaining.is_empty() { - break; - } - - let Some((best_proof, covered_set)) = candidates - .iter() - .map(|proof| { - let proof_validators: HashSet = - proof.get_participant_indices().into_iter().collect(); - let intersection: HashSet = remaining - .intersection(&proof_validators) - .copied() - .collect(); - (proof, intersection) - }) - .max_by_key(|(_, intersection)| intersection.len()) - else { - break; - }; - - if covered_set.is_empty() { - break; - } - - phase2_children.push(best_proof); - for vid in &covered_set { - remaining.remove(vid); - } - } - - if !phase2_children.is_empty() { - if phase2_children.len() == 1 { - // leanSpec state.py: `if len(proofs) == 1: sig = proofs[0]` - // Single proof: pass through directly without re-aggregating. 
- let single_proof = (*phase2_children[0]).clone(); - let phase2_participants = single_proof.participants.clone(); - - info!( - slot = data.slot.0, - validators = - phase2_children[0].get_participant_indices().len(), - "Phase 2: single proof passed through directly" - ); - results.push(( - AggregatedAttestation { - aggregation_bits: phase2_participants, - data: data.clone(), - }, - single_proof, - )); - } else { - let child_pk_vecs: Vec> = phase2_children - .iter() - .map(|child| { - child - .get_participant_indices() - .into_iter() - .filter_map(|vid| { - self.validators - .get(vid) - .ok() - .map(|v| v.attestation_pubkey.clone()) - }) - .collect() - }) - .collect(); - - let children_arg: Vec<( - &[PublicKey], - &AggregatedSignatureProof, - )> = child_pk_vecs - .iter() - .zip(phase2_children.iter()) - .map(|(pks, proof)| (pks.as_slice(), *proof)) - .collect(); - - let mut phase2_validator_ids: Vec = phase2_children - .iter() - .flat_map(|child| child.get_participant_indices()) - .collect(); - phase2_validator_ids.sort(); - phase2_validator_ids.dedup(); - - let phase2_participants = - AggregationBits::from_validator_indices( - &phase2_validator_ids, - ); - - match AggregatedSignatureProof::aggregate_with_children( - phase2_participants.clone(), - &children_arg, - Vec::::new(), - Vec::::new(), - data_root, - data.slot.0 as u32, - log_inv_rate, - ) { - Ok(proof) => { - info!( - slot = data.slot.0, - children = phase2_children.len(), - validators = phase2_validator_ids.len(), - "Phase 2: recursive block proof via aggregate_with_children" - ); - results.push(( - AggregatedAttestation { - aggregation_bits: phase2_participants, - data: data.clone(), - }, - proof, - )); - } - Err(e) => { - warn!( - error = %e, - "Phase 2 recursive aggregation failed, skipping" - ); - } - } - } - } - } - } - } - } - } - - // Handle empty case - if results.is_empty() { - return Ok((Vec::new(), Vec::new())); + ) -> Result> { + if entries.len() <= 1 { + return Ok(entries); } - // Post-loop compaction: the main loop may have emitted a Phase 1 (gossip) proof - // AND a Phase 2 (children) proof for the same AttestationData. - // Per spec, each AttestationData must appear at most once in the block body — - // merge any such pairs into a single recursive proof via aggregate_with_children. - let mut proof_groups: HashMap< - H256, - Vec<(AggregatedAttestation, AggregatedSignatureProof)>, - > = HashMap::new(); - for (att, proof) in results { - proof_groups - .entry(att.data.hash_tree_root()) - .or_default() - .push((att, proof)); + let mut order: Vec = Vec::new(); + let mut groups: HashMap> = + HashMap::new(); + for (att, proof) in entries { + let dr = att.data.hash_tree_root(); + if !groups.contains_key(&dr) { + order.push(dr); + } + groups.entry(dr).or_default().push((att, proof)); } - let mut compacted: Vec<(AggregatedAttestation, AggregatedSignatureProof)> = Vec::new(); + let mut compacted: Vec<(AggregatedAttestation, AggregatedSignatureProof)> = + Vec::with_capacity(order.len()); - for (data_root, group) in proof_groups { + for data_root in order { + let group = groups + .remove(&data_root) + .expect("group exists for data_root"); if group.len() == 1 { - // Only one proof for this data — no merge needed compacted.extend(group); - } else { - // Multiple proofs (e.g. Phase 1 gossip + Phase 2 children) — - // merge into one recursive proof so each AttestationData appears once. 
- let data = group[0].0.data.clone(); - - let child_pk_vecs: Vec> = group - .iter() - .map(|(_, proof)| { - proof - .get_participant_indices() - .into_iter() - .filter_map(|vid| { - self.validators - .get(vid) - .ok() - .map(|v| v.attestation_pubkey.clone()) - }) - .collect() - }) - .collect(); + continue; + } - let children_arg: Vec<(&[PublicKey], &AggregatedSignatureProof)> = child_pk_vecs - .iter() - .zip(group.iter()) - .map(|(pks, (_, proof))| (pks.as_slice(), proof)) - .collect(); + let data = group[0].0.data.clone(); - let mut all_validator_ids: Vec = group - .iter() - .flat_map(|(_, proof)| proof.get_participant_indices()) - .collect(); - all_validator_ids.sort(); - all_validator_ids.dedup(); - let all_participants = AggregationBits::from_validator_indices(&all_validator_ids); - - match AggregatedSignatureProof::aggregate_with_children( - all_participants.clone(), - &children_arg, - Vec::::new(), - Vec::::new(), - data_root, - data.slot.0 as u32, - log_inv_rate, - ) { - Ok(merged_proof) => { - info!( - slot = data.slot.0, - children = group.len(), - validators = all_validator_ids.len(), - "Post-loop compaction: merged proofs into recursive proof" - ); - compacted.push(( - AggregatedAttestation { - aggregation_bits: all_participants, - data, - }, - merged_proof, - )); - } - Err(e) => { - warn!( - error = %e, - "Post-loop compaction failed, keeping proofs separate" - ); - compacted.extend(group); - } + let child_pk_vecs: Vec> = group + .iter() + .map(|(_, proof)| { + proof + .get_participant_indices() + .into_iter() + .filter_map(|vid| { + self.validators + .get(vid) + .ok() + .map(|v| v.attestation_pubkey.clone()) + }) + .collect() + }) + .collect(); + + let children_arg: Vec<(&[PublicKey], &AggregatedSignatureProof)> = child_pk_vecs + .iter() + .zip(group.iter()) + .map(|(pks, (_, proof))| (pks.as_slice(), proof)) + .collect(); + + let mut all_validator_ids: Vec = group + .iter() + .flat_map(|(_, proof)| proof.get_participant_indices()) + .collect(); + all_validator_ids.sort(); + all_validator_ids.dedup(); + let all_participants = AggregationBits::from_validator_indices(&all_validator_ids); + + match AggregatedSignatureProof::aggregate_with_children( + all_participants.clone(), + &children_arg, + Vec::::new(), + Vec::::new(), + data_root, + data.slot.0 as u32, + log_inv_rate, + ) { + Ok(merged_proof) => { + info!( + slot = data.slot.0, + children = group.len(), + validators = all_validator_ids.len(), + "compact_proofs_by_data: merged proofs into recursive proof" + ); + compacted.push(( + AggregatedAttestation { + aggregation_bits: all_participants, + data, + }, + merged_proof, + )); + } + Err(e) => { + warn!( + error = %e, + "compact_proofs_by_data: merge failed, keeping proofs separate" + ); + compacted.extend(group); } } } - let (aggregated_attestations, aggregated_proofs): (Vec<_>, Vec<_>) = - compacted.into_iter().unzip(); - - Ok((aggregated_attestations, aggregated_proofs)) + Ok(compacted) } } diff --git a/lean_client/fork_choice/src/handlers.rs b/lean_client/fork_choice/src/handlers.rs index 744e30d..3111eb8 100644 --- a/lean_client/fork_choice/src/handlers.rs +++ b/lean_client/fork_choice/src/handlers.rs @@ -1,6 +1,9 @@ +use std::collections::HashSet; + use anyhow::{Context, Result, anyhow, bail, ensure}; use containers::{ AttestationData, SignatureKey, SignedAggregatedAttestation, SignedAttestation, SignedBlock, + State, }; use metrics::METRICS; use ssz::{H256, SszHash}; @@ -8,7 +11,8 @@ use tracing::warn; use crate::block_cache::BlockCache; use crate::store::{ - 
INTERVALS_PER_SLOT, MILLIS_PER_INTERVAL, STATE_PRUNE_BUFFER, Store, tick_interval, update_head, + BLOCKS_TO_KEEP, GOSSIP_DISPARITY_INTERVALS, HEAD_RETENTION_SLOTS, INTERVALS_PER_SLOT, + MILLIS_PER_INTERVAL, STATE_PRUNE_BUFFER, STATES_TO_KEEP, Store, tick_interval, update_head, }; #[inline] @@ -74,15 +78,19 @@ fn validate_attestation_data(store: &Store, data: &AttestationData) -> Result<() head_block.slot.0 ); - // Validate attestation is not too far in the future - // We allow a small margin for clock disparity (1 slot), but no further. - let current_slot = store.time / INTERVALS_PER_SLOT; + // Honest validators emit votes only after their slot has begun. Allow exactly + // one interval (~800 ms) of clock skew between peers; a whole-slot margin would + // let an adversary pre-publish next-slot aggregates ahead of any honest + // validator. Lean analogue of mainnet's MAXIMUM_GOSSIP_CLOCK_DISPARITY. + let attestation_start_interval = data.slot.0 * INTERVALS_PER_SLOT; ensure!( - data.slot.0 <= current_slot + 1, - "Attestation too far in future: attestation slot {} > current slot {} + 1", + attestation_start_interval <= store.time + GOSSIP_DISPARITY_INTERVALS, + "Attestation too far in future: data slot {} (start interval {}) > store time {} + {}", data.slot.0, - current_slot + attestation_start_interval, + store.time, + GOSSIP_DISPARITY_INTERVALS, ); Ok(()) @@ -475,15 +483,21 @@ fn on_attestation_internal( /// 3. Processing attestations included in the block body (on-chain) /// 4. Updating the forkchoice head /// 5. Processing the proposer's attestation (as if gossiped) +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum BlockOutcome { + Applied, + AlreadyKnown, +} + pub fn on_block( store: &mut Store, cache: &mut BlockCache, signed_block: SignedBlock, -) -> Result<()> { +) -> Result { let block_root = signed_block.block.hash_tree_root(); if store.blocks.contains_key(&block_root) { - return Ok(()); + return Ok(BlockOutcome::AlreadyKnown); } let parent_root = signed_block.block.parent_root; @@ -498,48 +512,34 @@ pub fn on_block( process_block_internal(store, signed_block, block_root)?; process_pending_blocks(store, cache, vec![block_root]); - Ok(()) + Ok(BlockOutcome::Applied) } -fn process_block_internal( - store: &mut Store, - signed_block: SignedBlock, - block_root: H256, -) -> Result<()> { +/// CPU-bound portion of block processing: verify XMSS signatures against the parent state +/// and run the state transition. Safe to run on a `DedicatedExecutor` thread because it +/// touches no `Store` state. +pub fn verify_and_transition(parent_state: State, signed_block: SignedBlock) -> Result { let _timer = METRICS.get().map(|metrics| { metrics .lean_fork_choice_block_processing_time_seconds .start_timer() }); - let block = signed_block.block.clone(); - let attestations_count = block.body.attestations.len_u64(); - - // Get parent state for validation - let state = store - .states - .get(&block.parent_root) - .ok_or(anyhow!("no parent state"))?; - - // Debug: Log parent state checkpoints before transition - tracing::debug!( - block_slot = block.slot.0, - attestations_in_block = attestations_count, - parent_justified_slot = state.latest_justified.slot.0, - parent_finalized_slot = state.latest_finalized.slot.0, - justified_slots_len = state.justified_slots.0.len(), - "Processing block - parent state info" - ); - - // Verify block signatures against parent state before executing the state transition. 
- // If any signature is invalid the error propagates and the block is rejected; - // it never enters store.blocks or store.states. - signed_block.verify_signatures(state.clone())?; + signed_block.verify_signatures(parent_state.clone())?; + parent_state.state_transition(signed_block, true) +} - // Execute state transition to get post-state (signatures verified above) - let new_state = state.state_transition(signed_block.clone(), true)?; +/// Store-mutating portion of block processing: must run on the chain task. Inserts the +/// block + post-state, retries pending attestations, advances justification/finalization, +/// processes block-body attestations on-chain, and recomputes the head. +pub fn apply_verified_block( + store: &mut Store, + signed_block: SignedBlock, + new_state: State, + block_root: H256, +) -> Result<()> { + let block = signed_block.block.clone(); - // Debug: Log new state checkpoints after transition tracing::debug!( block_slot = block.slot.0, new_justified_slot = new_state.latest_justified.slot.0, @@ -551,6 +551,17 @@ fn process_block_internal( store.blocks.insert(block_root, block.clone()); store.states.insert(block_root, new_state.clone()); + METRICS.get().map(|m| { + m.grandine_store_blocks_size.set(store.blocks.len() as i64); + m.grandine_store_states_size.set(store.states.len() as i64); + m.grandine_store_gossip_signatures_size + .set(store.gossip_signatures.len() as i64); + m.grandine_store_known_aggregated_payloads_size + .set(store.latest_known_aggregated_payloads.len() as i64); + m.grandine_store_new_aggregated_payloads_size + .set(store.latest_new_aggregated_payloads.len() as i64); + }); + // Retry attestations that arrived before this block was known. // Drain the queue for this root and re-process each attestation. // Attestations that still reference other unknown blocks are re-queued automatically. @@ -642,7 +653,7 @@ fn process_block_internal( .retain(|_, data| data.target.slot.0 > finalized_slot); METRICS.get().map(|m| { m.grandine_attestation_data_by_root - .set(store.attestation_data_by_root.len() as i64) + .set(store.attestation_data_by_root.len() as i64); }); } @@ -718,9 +729,122 @@ fn process_block_internal( update_head(store); + prune_with_retention_bounds(store); + Ok(()) } +/// Defensive retention bounds. Runs unconditionally on every `apply_verified_block` +/// so map growth stays bounded even when `latest_finalized` does not advance — +/// the spec-mandated `prune_stale_attestation_data` is a necessary but not a +/// sufficient bound, since it never fires while finalization is stalled. 
+fn prune_with_retention_bounds(store: &mut Store) { + let head_slot = store.blocks.get(&store.head).map(|b| b.slot.0).unwrap_or(0); + let keep_min_slot = head_slot.saturating_sub(HEAD_RETENTION_SLOTS); + + let mut protected: HashSet = HashSet::with_capacity(4); + protected.insert(store.latest_finalized.root); + protected.insert(store.latest_justified.root); + protected.insert(store.head); + protected.insert(store.safe_target); + + if store.blocks.len() > BLOCKS_TO_KEEP { + let mut by_slot: Vec<(H256, u64)> = store + .blocks + .iter() + .map(|(root, block)| (*root, block.slot.0)) + .collect(); + by_slot.sort_by_key(|(_, slot)| std::cmp::Reverse(*slot)); + let evict: HashSet = by_slot + .into_iter() + .skip(BLOCKS_TO_KEEP) + .filter(|(root, _)| !protected.contains(root)) + .map(|(root, _)| root) + .collect(); + store.blocks.retain(|root, _| !evict.contains(root)); + } + + if store.states.len() > STATES_TO_KEEP { + let mut by_slot: Vec<(H256, u64)> = store + .states + .iter() + .map(|(root, state)| (*root, state.slot.0)) + .collect(); + by_slot.sort_by_key(|(_, slot)| std::cmp::Reverse(*slot)); + let evict: HashSet = by_slot + .into_iter() + .skip(STATES_TO_KEEP) + .filter(|(root, _)| !protected.contains(root)) + .map(|(root, _)| root) + .collect(); + store.states.retain(|root, _| !evict.contains(root)); + } + + // Three retain calls below read attestation_data_by_root as a secondary index; + // attestation_data_by_root must be pruned last so the lookups can resolve. + let adr = &store.attestation_data_by_root; + store.gossip_signatures.retain(|key, _| { + adr.get(&key.data_root) + .is_none_or(|data| data.target.slot.0 >= keep_min_slot) + }); + store + .latest_known_aggregated_payloads + .retain(|data_root, _| { + adr.get(data_root) + .is_none_or(|data| data.target.slot.0 >= keep_min_slot) + }); + store.latest_new_aggregated_payloads.retain(|data_root, _| { + adr.get(data_root) + .is_none_or(|data| data.target.slot.0 >= keep_min_slot) + }); + store + .attestation_data_by_root + .retain(|_, data| data.target.slot.0 >= keep_min_slot); + + METRICS.get().map(|m| { + m.grandine_store_blocks_size.set(store.blocks.len() as i64); + m.grandine_store_states_size.set(store.states.len() as i64); + m.grandine_store_gossip_signatures_size + .set(store.gossip_signatures.len() as i64); + m.grandine_store_known_aggregated_payloads_size + .set(store.latest_known_aggregated_payloads.len() as i64); + m.grandine_store_new_aggregated_payloads_size + .set(store.latest_new_aggregated_payloads.len() as i64); + m.grandine_attestation_data_by_root + .set(store.attestation_data_by_root.len() as i64); + }); +} + +/// Synchronous wrapper retained for the cascade in `process_pending_blocks` and for tests. +/// The production path on the chain task drives `verify_and_transition` on the +/// `DedicatedExecutor` and `apply_verified_block` on the chain task directly. +fn process_block_internal( + store: &mut Store, + signed_block: SignedBlock, + block_root: H256, +) -> Result<()> { + let block = signed_block.block.clone(); + let attestations_count = block.body.attestations.len_u64(); + + let parent_state = store + .states + .get(&block.parent_root) + .ok_or(anyhow!("no parent state"))? 
+ .clone(); + + tracing::debug!( + block_slot = block.slot.0, + attestations_in_block = attestations_count, + parent_justified_slot = parent_state.latest_justified.slot.0, + parent_finalized_slot = parent_state.latest_finalized.slot.0, + justified_slots_len = parent_state.justified_slots.0.len(), + "Processing block - parent state info" + ); + + let new_state = verify_and_transition(parent_state, signed_block.clone())?; + apply_verified_block(store, signed_block, new_state, block_root) +} + pub fn process_pending_blocks(store: &mut Store, cache: &mut BlockCache, mut roots: Vec) { while let Some(parent_root) = roots.pop() { let children: Vec<(H256, SignedBlock)> = cache diff --git a/lean_client/fork_choice/src/store.rs b/lean_client/fork_choice/src/store.rs index 4ed3ea0..fd4b87f 100644 --- a/lean_client/fork_choice/src/store.rs +++ b/lean_client/fork_choice/src/store.rs @@ -2,12 +2,12 @@ use std::collections::{HashMap, HashSet}; use anyhow::{Result, anyhow, ensure}; use containers::{ - AggregatedSignatureProof, Attestation, AttestationData, Block, BlockHeader, Checkpoint, Config, + AggregatedSignatureProof, AttestationData, Block, BlockHeader, Checkpoint, Config, SignatureKey, SignedAggregatedAttestation, SignedAttestation, SignedBlock, Slot, State, }; use metrics::{METRICS, set_gauge_u64}; use ssz::{H256, SszHash}; -use tracing::warn; +use tracing::{info, warn}; use xmss::Signature; pub type Interval = u64; @@ -16,6 +16,11 @@ pub const SECONDS_PER_SLOT: u64 = 4; /// Milliseconds per interval: (4 * 1000) / 5 = 800ms /// Using milliseconds avoids integer division truncation (4/5 = 0 in integer math) pub const MILLIS_PER_INTERVAL: u64 = (SECONDS_PER_SLOT * 1000) / INTERVALS_PER_SLOT; +/// Future-slot tolerance for attestation gossip, in intervals. +/// Bounds the clock skew the time check absorbs when admitting a vote whose +/// slot has not yet started locally. One interval is ~800 ms — the lean +/// analogue of mainnet's MAXIMUM_GOSSIP_CLOCK_DISPARITY. +pub const GOSSIP_DISPARITY_INTERVALS: u64 = 1; /// Forkchoice store tracking chain state and validator attestations @@ -105,6 +110,19 @@ const JUSTIFICATION_LOOKBACK_SLOTS: u64 = 3; /// for an in-flight state transition. pub const STATE_PRUNE_BUFFER: u64 = 128; +/// Hard upper bound on `store.blocks` entries when finalization stalls. +/// At ~4s slots this is roughly 24h of chain history. +pub const BLOCKS_TO_KEEP: usize = 21_600; + +/// Hard upper bound on `store.states` entries when finalization stalls. +/// At ~4s slots this is roughly 3.3h of state history. +pub const STATES_TO_KEEP: usize = 3_000; + +/// Slot-distance retention for attestation-related maps. Entries whose +/// target.slot falls below `head_slot - HEAD_RETENTION_SLOTS` are evicted +/// regardless of finalization state. +pub const HEAD_RETENTION_SLOTS: u64 = 128; + impl Store { pub fn produce_attestation_data(&self, slot: Slot) -> Result { let head_checkpoint = Checkpoint { @@ -554,10 +572,12 @@ pub struct BlockProductionInputs { pub validator_index: u64, pub head_root: H256, pub head_state: State, - pub available_attestations: Vec, pub known_block_roots: HashSet, - pub gossip_signatures: HashMap, - pub aggregated_payloads: HashMap>, + /// Joined view of `latest_known_aggregated_payloads` keyed by `data_root`, + /// with the `AttestationData` carried in the value. Entries whose + /// `attestation_data_by_root` lookup misses are dropped from this map and + /// counted in `lean_build_block_pool_missing_att_data`. 
+ pub aggregated_payloads: HashMap)>, pub log_inv_rate: usize, pub store_latest_justified: Checkpoint, } @@ -585,55 +605,37 @@ pub fn prepare_block_production( expected_proposer ); - // Step 1: individual attestations already tracked per-validator. - let mut available_attestations: Vec = store - .latest_known_attestations - .iter() - .map(|(validator_idx, attestation_data)| Attestation { - validator_id: *validator_idx, - data: attestation_data.clone(), - }) - .collect(); - - // Step 2: synthesize entries for validators that arrived *only* via - // on_aggregated_attestation. Those validators have proofs in - // latest_known_aggregated_payloads but were never inserted into - // latest_known_attestations, so build_block's fixed-point loop would - // otherwise silently skip them. - { - let known_validators: HashSet = - store.latest_known_attestations.keys().copied().collect(); - let mut seen_synthesized: HashSet<(u64, H256)> = HashSet::new(); - for (data_root, proofs) in &store.latest_known_aggregated_payloads { - if let Some(att_data) = store.attestation_data_by_root.get(data_root) { - for proof in proofs { - for vid in proof.participants.to_validator_indices() { - if !known_validators.contains(&vid) - && seen_synthesized.insert((vid, *data_root)) - { - available_attestations.push(Attestation { - validator_id: vid, - data: att_data.clone(), - }); - } - } - } - } + // Join `latest_known_aggregated_payloads` (proofs only, keyed by data_root) + // with `attestation_data_by_root` (the secondary index storing the + // AttestationData itself) into the spec-shaped pool unit consumed by + // build_block. Entries whose secondary-index lookup misses are dropped and + // counted; this keeps the proposer aligned with leanSpec/zeam/ethlambda + // which carry att_data inside the pool value. 
+ let mut aggregated_payloads: HashMap)> = + HashMap::with_capacity(store.latest_known_aggregated_payloads.len()); + let mut missing_att_data: u64 = 0; + for (data_root, proofs) in &store.latest_known_aggregated_payloads { + if let Some(att_data) = store.attestation_data_by_root.get(data_root) { + aggregated_payloads.insert(*data_root, (att_data.clone(), proofs.clone())); + } else { + missing_att_data += 1; + } + } + if missing_att_data > 0 { + if let Some(m) = METRICS.get() { + m.lean_build_block_pool_missing_att_data + .inc_by(missing_att_data); } } let known_block_roots: HashSet = store.blocks.keys().copied().collect(); - let gossip_signatures = store.gossip_signatures.clone(); - let aggregated_payloads = store.latest_known_aggregated_payloads.clone(); Ok(BlockProductionInputs { slot, validator_index, head_root, head_state, - available_attestations, known_block_roots, - gossip_signatures, aggregated_payloads, log_inv_rate, store_latest_justified: store.latest_justified.clone(), @@ -648,27 +650,47 @@ pub fn execute_block_production( validator_index, head_root, head_state, - available_attestations, known_block_roots, - gossip_signatures, aggregated_payloads, log_inv_rate, store_latest_justified, } = inputs; + let pool_known_payloads = aggregated_payloads.len(); + let pool_known_payloads_proofs: usize = aggregated_payloads + .values() + .map(|(_, proofs)| proofs.len()) + .sum(); + let pool_known_block_roots = known_block_roots.len(); + + info!( + slot = slot.0, + proposer = validator_index, + head_root = %head_root, + pool_known_payloads, + pool_known_payloads_proofs, + pool_known_block_roots, + "proposer pool snapshot" + ); + let (final_block, final_post_state, _aggregated_attestations, signatures) = head_state .build_block( slot, validator_index, head_root, - None, - Some(available_attestations), - Some(&known_block_roots), - Some(&gossip_signatures), - Some(&aggregated_payloads), + &known_block_roots, + &aggregated_payloads, log_inv_rate, )?; + info!( + slot = slot.0, + proposer = validator_index, + block_attestations = final_block.body.attestations.len_usize(), + block_signatures = signatures.len(), + "proposer block built" + ); + ensure!( final_post_state.latest_justified.slot >= store_latest_justified.slot, "Produced block justified={} < store justified={}. Fixed-point attestation loop did not converge.", diff --git a/lean_client/fork_choice/tests/unit_tests/validator.rs b/lean_client/fork_choice/tests/unit_tests/validator.rs index 791d4a0..dbee9fa 100644 --- a/lean_client/fork_choice/tests/unit_tests/validator.rs +++ b/lean_client/fork_choice/tests/unit_tests/validator.rs @@ -6,8 +6,8 @@ use std::collections::HashMap; use crate::unit_tests::common::create_test_store; use containers::{ - Attestation, AttestationData, Block, BlockBody, Checkpoint, Config, SignatureKey, SignedBlock, - Slot, State, Validator, + AggregatedSignatureProof, AggregationBits, Attestation, AttestationData, Block, BlockBody, + Checkpoint, Config, SignedBlock, Slot, State, Validator, }; use fork_choice::store::{Store, get_forkchoice_store, produce_block_with_signatures, update_head}; use rand::SeedableRng; @@ -15,6 +15,65 @@ use rand_chacha::ChaChaRng; use ssz::{H256, SszHash}; use xmss::SecretKey; +/// Build an `AggregatedSignatureProof` for the given validator set on the +/// given AttestationData, then publish it into the proposer's input pool +/// (`store.latest_known_aggregated_payloads` + `store.attestation_data_by_root`). +/// Mirrors what an aggregator's `maybe_aggregate` would have done at runtime. 
+fn publish_aggregated_payload( + store: &mut Store, + data: &AttestationData, + validator_ids: &[u64], + keys: &HashMap, +) { + let data_root = data.hash_tree_root(); + let head_state = store + .states + .get(&store.head) + .expect("head state must exist"); + + let pubkeys: Vec<_> = validator_ids + .iter() + .map(|&vid| { + head_state + .validators + .get(vid) + .expect("validator index out of range") + .attestation_pubkey + .clone() + }) + .collect(); + + let signatures: Vec<_> = validator_ids + .iter() + .map(|&vid| { + keys.get(&vid) + .expect("missing secret key") + .sign(data_root, data.slot.0 as u32) + .expect("XMSS signing failed") + }) + .collect(); + + let participants = AggregationBits::from_validator_indices(validator_ids); + let proof = AggregatedSignatureProof::aggregate( + participants, + pubkeys, + signatures, + data_root, + data.slot.0 as u32, + 1, + ) + .expect("AggregatedSignatureProof::aggregate failed"); + + store + .attestation_data_by_root + .insert(data_root, data.clone()); + store + .latest_known_aggregated_payloads + .entry(data_root) + .or_default() + .push(proof); +} + fn create_test_store_with_signers() -> (Store, HashMap) { let config = Config { genesis_time: 1000 }; @@ -104,29 +163,16 @@ fn test_produce_block_with_attestations() { }; let target = store.get_attestation_target(); - // Add attestations for validators 5 and 6 - for vid in [5u64, 6] { - let data = AttestationData { - slot: head_block.slot, - head: head_checkpoint.clone(), - target: target.clone(), - source: store.latest_justified.clone(), - }; - store.latest_known_attestations.insert(vid, data.clone()); - - let data_root = data.hash_tree_root(); - let sig_key = SignatureKey { - validator_id: vid, - data_root: data_root.clone(), - }; - store.gossip_signatures.insert( - sig_key, - keys.get(&vid) - .unwrap() - .sign(data_root, head_block.slot.0 as u32) - .unwrap(), - ); - } + // Publish a single aggregated payload covering validators 5 and 6 — this + // mirrors what an aggregator's `maybe_aggregate` would publish via gossip + // and what `on_aggregated_attestation` would land in the proposer's pool. + let data = AttestationData { + slot: head_block.slot, + head: head_checkpoint, + target, + source: store.latest_justified.clone(), + }; + publish_aggregated_payload(&mut store, &data, &[5u64, 6], &keys); let slot = Slot(2); let validator_idx = 2; @@ -234,7 +280,8 @@ fn test_produce_block_empty_attestations() { fn test_produce_block_state_consistency() { let (mut store, keys) = create_test_store_with_signers(); - // Add an attestation for validator 7 + // Publish an aggregated payload for validator 7. Same shape as a real + // aggregator-published payload feeding the proposer's pool. 
let head_block = store.blocks[&store.head].clone(); let head_checkpoint = Checkpoint { root: store.head, @@ -247,18 +294,7 @@ fn test_produce_block_state_consistency() { target, source: store.latest_justified.clone(), }; - store.latest_known_attestations.insert(7, data.clone()); - let sig_key = SignatureKey { - validator_id: 7, - data_root: data.hash_tree_root(), - }; - store.gossip_signatures.insert( - sig_key, - keys.get(&7) - .unwrap() - .sign(data.hash_tree_root(), head_block.slot.0 as u32) - .unwrap(), - ); + publish_aggregated_payload(&mut store, &data, &[7u64], &keys); let slot = Slot(4); let validator_idx = 4; diff --git a/lean_client/metrics/src/metrics.rs b/lean_client/metrics/src/metrics.rs index 78362c9..5b72f34 100644 --- a/lean_client/metrics/src/metrics.rs +++ b/lean_client/metrics/src/metrics.rs @@ -166,6 +166,60 @@ pub struct Metrics { /// XMSS verifications skipped because the signature was already cached pub grandine_xmss_verify_skipped_total: IntCounter, + pub grandine_chain_message_channel_depth: IntGauge, + pub grandine_validator_chain_message_channel_depth: IntGauge, + pub grandine_verify_result_channel_depth: IntGauge, + pub grandine_cpu_normal_executor_tasks_in_flight: IntGauge, + + /// Wall-clock time of the aggregation snapshot deep-clone. After V2, the + /// clone runs on the aggregation worker thread (not the chain task), so this + /// no longer reflects chain-task wall-clock; it reflects how expensive a + /// single Store deep-clone is at the moment the aggregation worker picks up + /// a trigger. + pub lean_aggregation_snapshot_clone_seconds: Histogram, + + /// Total number of times the chain task triggered an aggregation snapshot. + /// Compare against `lean_aggregation_snapshot_clone_seconds_count` to derive + /// dropped-trigger count (watch channel overwrites unconsumed values). + pub lean_aggregation_snapshots_triggered_total: IntCounter, + + /// Number of payload entries dropped at proposal time because the + /// `attestation_data_by_root` secondary index has no AttestationData for + /// the data_root present in `latest_known_aggregated_payloads`. Drift + /// between the two maps would silently shrink the proposer's pool. + pub lean_build_block_pool_missing_att_data: IntCounter, + + /// Snapshot size at clone time, measured in total entries across the largest + /// Store maps (blocks + states + gossip_signatures + known_aggregated_payloads + /// + new_aggregated_payloads + attestation_data_by_root). Used to correlate + /// snapshot wall-clock and aggregator memory growth with chain length. + pub lean_aggregation_snapshot_size_entries: Histogram, + + /// Number of aggregation snapshots currently held in memory by the worker + /// thread (inc when `spawn_blocking` task starts, dec when it returns). + /// Should oscillate 0..=1 with the watch-channel design; sustained values + /// of 1 mean the worker is continuously busy (XMSS slower than slot rate). + pub lean_aggregation_in_flight_snapshots: IntGauge, + + /// Time the chain task spends processing one `ChainMessage` (block, + /// attestation, aggregated attestation, etc.). Captures total work done + /// inside the `chain_message_receiver.recv() => { … }` select arm body + /// regardless of message kind. Bimodal distribution expected: spawn-only + /// path for blocks vs full write-locked attestation processing. + pub lean_chain_task_chain_message_seconds: Histogram, + + /// Time the chain task spends inside the `verify_result_rx.recv() => { … }` + /// arm body — i.e. 
Phase 3 apply work (apply_verified_block + post-apply + /// bookkeeping + cascade respawn). Used to compare apply cost against the + /// snapshot/message-processing costs. + pub lean_chain_task_apply_seconds: Histogram, + pub grandine_store_blocks_size: IntGauge, + pub grandine_store_states_size: IntGauge, + pub grandine_store_gossip_signatures_size: IntGauge, + pub grandine_store_known_aggregated_payloads_size: IntGauge, + pub grandine_store_new_aggregated_payloads_size: IntGauge, + pub grandine_pending_blocks_by_root_size: IntGauge, + pub lean_block_building_time_seconds: Histogram, pub lean_block_building_payload_aggregation_time_seconds: Histogram, pub lean_block_aggregated_payloads: Histogram, @@ -431,6 +485,82 @@ impl Metrics { "grandine_xmss_verify_skipped_total", "XMSS verifications skipped (signature already cached) — root cause 4 indicator", )?, + grandine_chain_message_channel_depth: IntGauge::new( + "grandine_chain_message_channel_depth", + "Pending ChainMessage queue depth", + )?, + grandine_validator_chain_message_channel_depth: IntGauge::new( + "grandine_validator_chain_message_channel_depth", + "Pending ValidatorChainMessage queue depth", + )?, + grandine_verify_result_channel_depth: IntGauge::new( + "grandine_verify_result_channel_depth", + "Pending verify-result queue depth (verified blocks awaiting apply on chain task)", + )?, + grandine_cpu_normal_executor_tasks_in_flight: IntGauge::new( + "grandine_cpu_normal_executor_tasks_in_flight", + "Active tasks on cpu_normal DedicatedExecutor (XMSS block verify + attestation signing combined)", + )?, + lean_aggregation_snapshot_clone_seconds: Histogram::with_opts(histogram_opts!( + "lean_aggregation_snapshot_clone_seconds", + "Wall-clock time of the aggregation snapshot deep-clone (runs on aggregation worker thread)", + vec![0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.0], + ))?, + lean_aggregation_snapshots_triggered_total: IntCounter::new( + "lean_aggregation_snapshots_triggered_total", + "Total aggregation snapshot triggers issued by the chain task (aggregator nodes only)", + )?, + lean_build_block_pool_missing_att_data: IntCounter::new( + "lean_build_block_pool_missing_att_data", + "Total payload entries dropped at proposal time because attestation_data_by_root has no entry for the data_root", + )?, + lean_aggregation_snapshot_size_entries: Histogram::with_opts(histogram_opts!( + "lean_aggregation_snapshot_size_entries", + "Total entries across all major Store maps at snapshot clone time", + vec![ + 100.0, 500.0, 1000.0, 5000.0, 10000.0, 50000.0, 100000.0, 500000.0 + ], + ))?, + lean_aggregation_in_flight_snapshots: IntGauge::new( + "lean_aggregation_in_flight_snapshots", + "Snapshots currently held by the aggregation spawn_blocking worker (0 or 1 expected)", + )?, + lean_chain_task_chain_message_seconds: Histogram::with_opts(histogram_opts!( + "lean_chain_task_chain_message_seconds", + "Wall-clock time the chain task spends inside one ChainMessage select-arm body", + vec![ + 0.0001, 0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0 + ], + ))?, + lean_chain_task_apply_seconds: Histogram::with_opts(histogram_opts!( + "lean_chain_task_apply_seconds", + "Wall-clock time the chain task spends inside one verify_result (Phase 3 apply) select-arm body", + vec![0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.0], + ))?, + grandine_store_blocks_size: IntGauge::new( + "grandine_store_blocks_size", + "Entries in store.blocks", + )?, + grandine_store_states_size: IntGauge::new( + 
"grandine_store_states_size", + "Entries in store.states", + )?, + grandine_store_gossip_signatures_size: IntGauge::new( + "grandine_store_gossip_signatures_size", + "Entries in store.gossip_signatures", + )?, + grandine_store_known_aggregated_payloads_size: IntGauge::new( + "grandine_store_known_aggregated_payloads_size", + "Entries in latest_known_aggregated_payloads", + )?, + grandine_store_new_aggregated_payloads_size: IntGauge::new( + "grandine_store_new_aggregated_payloads_size", + "Entries in latest_new_aggregated_payloads", + )?, + grandine_pending_blocks_by_root_size: IntGauge::new( + "grandine_pending_blocks_by_root_size", + "In-flight BlocksByRoot requests", + )?, // Block Production Metrics lean_block_building_time_seconds: Histogram::with_opts(histogram_opts!( @@ -616,6 +746,39 @@ impl Metrics { ))?; default_registry.register(Box::new(self.grandine_fork_choice_new_attestations.clone()))?; default_registry.register(Box::new(self.grandine_xmss_verify_skipped_total.clone()))?; + default_registry.register(Box::new(self.grandine_chain_message_channel_depth.clone()))?; + default_registry.register(Box::new( + self.grandine_validator_chain_message_channel_depth.clone(), + ))?; + default_registry.register(Box::new(self.grandine_verify_result_channel_depth.clone()))?; + default_registry.register(Box::new( + self.grandine_cpu_normal_executor_tasks_in_flight.clone(), + ))?; + default_registry.register(Box::new( + self.lean_aggregation_snapshot_clone_seconds.clone(), + ))?; + default_registry.register(Box::new( + self.lean_aggregation_snapshots_triggered_total.clone(), + ))?; + default_registry.register(Box::new( + self.lean_build_block_pool_missing_att_data.clone(), + ))?; + default_registry.register(Box::new( + self.lean_aggregation_snapshot_size_entries.clone(), + ))?; + default_registry.register(Box::new(self.lean_aggregation_in_flight_snapshots.clone()))?; + default_registry.register(Box::new(self.lean_chain_task_chain_message_seconds.clone()))?; + default_registry.register(Box::new(self.lean_chain_task_apply_seconds.clone()))?; + default_registry.register(Box::new(self.grandine_store_blocks_size.clone()))?; + default_registry.register(Box::new(self.grandine_store_states_size.clone()))?; + default_registry.register(Box::new(self.grandine_store_gossip_signatures_size.clone()))?; + default_registry.register(Box::new( + self.grandine_store_known_aggregated_payloads_size.clone(), + ))?; + default_registry.register(Box::new( + self.grandine_store_new_aggregated_payloads_size.clone(), + ))?; + default_registry.register(Box::new(self.grandine_pending_blocks_by_root_size.clone()))?; // Block Production Metrics default_registry.register(Box::new(self.lean_block_building_time_seconds.clone()))?; diff --git a/lean_client/networking/src/network/service.rs b/lean_client/networking/src/network/service.rs index da24778..a024037 100644 --- a/lean_client/networking/src/network/service.rs +++ b/lean_client/networking/src/network/service.rs @@ -49,12 +49,14 @@ use crate::{ }, }; -const MAX_BLOCKS_BY_ROOT_RETRIES: u8 = 3; +const MAX_BLOCKS_BY_ROOT_RETRIES: u8 = 10; const MAX_BLOCK_FETCH_DEPTH: u32 = 65536; const MAX_BLOCKS_PER_REQUEST: usize = 10; /// Stalled request timeout. If a peer accepts the stream but never sends a response, /// the request is cancelled and retried with a different peer after this duration. 
-const BLOCKS_BY_ROOT_REQUEST_TIMEOUT: Duration = Duration::from_secs(8); +/// Set comfortably above libp2p's default protocol timeout (10s) so the app-layer +/// gives the underlying stream room to complete under host CPU contention. +const BLOCKS_BY_ROOT_REQUEST_TIMEOUT: Duration = Duration::from_secs(30); struct PendingBlocksRequest { roots: Vec, @@ -774,6 +776,10 @@ where } => { let pending = self.pending_blocks_by_root.remove(&request_id); let request_depth = pending.as_ref().map(|p| p.depth).unwrap_or(0); + METRICS.get().map(|m| { + m.grandine_pending_blocks_by_root_size + .set(self.pending_blocks_by_root.len() as i64) + }); // Release in-flight tracking so these roots can be re-requested if needed. // Retry paths re-add them via send_blocks_by_root_request_internal. @@ -957,6 +963,10 @@ where } => { warn!(peer = %peer, ?error, "BlocksByRoot outbound request failed"); if let Some(req) = self.pending_blocks_by_root.remove(&request_id) { + METRICS.get().map(|m| { + m.grandine_pending_blocks_by_root_size + .set(self.pending_blocks_by_root.len() as i64) + }); // Release in-flight tracking before retry; retry re-adds via send_internal. for root in &req.roots { self.in_flight_roots.remove(root); @@ -1025,6 +1035,10 @@ where for request_id in timed_out { if let Some(req) = self.pending_blocks_by_root.remove(&request_id) { + METRICS.get().map(|m| { + m.grandine_pending_blocks_by_root_size + .set(self.pending_blocks_by_root.len() as i64) + }); warn!( num_roots = req.roots.len(), depth = req.depth, @@ -1417,6 +1431,10 @@ where created_at: tokio::time::Instant::now(), }, ); + METRICS.get().map(|m| { + m.grandine_pending_blocks_by_root_size + .set(self.pending_blocks_by_root.len() as i64) + }); } fn build_behaviour( diff --git a/lean_client/networking/src/req_resp.rs b/lean_client/networking/src/req_resp.rs index 7fe9c0b..933347d 100644 --- a/lean_client/networking/src/req_resp.rs +++ b/lean_client/networking/src/req_resp.rs @@ -1,5 +1,6 @@ use std::io; use std::io::{Read, Write}; +use std::time::Duration; use async_trait::async_trait; use containers::{SignedBlock, Status}; @@ -294,6 +295,17 @@ impl LeanCodec { // 3-byte LE length field (bytes 1..=3 of header) let chunk_len = u32::from_le_bytes([data[pos + 1], data[pos + 2], data[pos + 3], 0]) as usize; + if pos + 4 + chunk_len > data.len() { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + format!( + "snappy chunk_len {} at pos {} exceeds buffer len {}", + chunk_len, + pos, + data.len() + ), + )); + } pos += 4 + chunk_len; } @@ -560,7 +572,12 @@ pub fn build(protocols: impl IntoIterator) -> ReqResp { .map(|name| (LeanProtocol(name), ProtocolSupport::Full)) .collect::>(); - RequestResponse::with_codec(LeanCodec::default(), protocols, Config::default()) + // libp2p Config::default() sets request_timeout to 10s. Under host CPU + // contention, lean's tokio worker can be late polling the stream and the + // protocol layer kills the request before our app-level retry logic gets + // a chance to react. Raise it well above the app-layer 30s timeout. 
+ let config = Config::default().with_request_timeout(Duration::from_secs(60)); + RequestResponse::with_codec(LeanCodec::default(), protocols, config) } /// Build a RequestResponse behavior for Status protocol only diff --git a/lean_client/networking/src/types.rs b/lean_client/networking/src/types.rs index 9c13ac8..9881ed5 100644 --- a/lean_client/networking/src/types.rs +++ b/lean_client/networking/src/types.rs @@ -250,10 +250,18 @@ pub trait ChainMessageSink: Send + Sync + Clone { } #[async_trait] -impl ChainMessageSink for mpsc::UnboundedSender { +impl ChainMessageSink for mpsc::Sender { async fn send(&self, message: M) -> Result<()> { - self.send(message) - .map_err(|err| anyhow!("failed to send message to chain: {err}")) + let result = self + .send(message) + .await + .map_err(|err| anyhow!("failed to send message to chain: {err}")); + if result.is_ok() { + METRICS.get().map(|m| { + m.grandine_chain_message_channel_depth.inc(); + }); + } + result } } diff --git a/lean_client/src/aggregation.rs b/lean_client/src/aggregation.rs index d4e527b..4a86171 100644 --- a/lean_client/src/aggregation.rs +++ b/lean_client/src/aggregation.rs @@ -1,8 +1,11 @@ use std::collections::HashSet; use std::sync::Arc; +use std::time::Instant; use containers::{SignedAggregatedAttestation, Slot}; use fork_choice::store::Store; +use metrics::METRICS; +use parking_lot::RwLock; use ssz::H256; use tokio::sync::{mpsc, watch}; use tokio::task; @@ -10,11 +13,18 @@ use validator::ValidatorService; /// Aggregation service that decouples XMSS aggregation from the chain task. /// -/// Owns a `watch` channel for receiving store snapshots (always latest value) -/// and an `mpsc` channel for returning results. The caller drives the service -/// through [`trigger`] and [`poll`] instead of managing raw channel ends. +/// Owns: +/// - a `watch` channel carrying just the slot for which to aggregate (always +/// latest value); +/// - a shared `Arc>` that the worker reads from briefly to clone +/// a snapshot on its own thread (V2 — keeps the chain task off the deep-clone); +/// - an `mpsc` channel for returning results. +/// +/// The caller drives the service through [`trigger`] and [`poll`] instead of +/// managing raw channel ends. pub struct AggregationService { - agg_tx: watch::Sender>, + vs: Arc, + agg_tx: watch::Sender>, res_rx: mpsc::Receiver<( u64, Option<(Vec, HashSet)>, @@ -23,43 +33,86 @@ pub struct AggregationService { impl AggregationService { /// Creates the service and spawns the background aggregation task. - pub fn new(vs: Arc, log_rate: usize) -> Self { - let (agg_tx, mut agg_rx) = watch::channel::>(None); + /// + /// The worker holds an `Arc>` reference so it can read + + /// clone the snapshot on its own thread instead of forcing the chain task + /// to do the deep-clone. 
+ pub fn new(vs: Arc, store: Arc>, log_rate: usize) -> Self { + let (agg_tx, mut agg_rx) = watch::channel::>(None); let (res_tx, res_rx) = mpsc::channel::<( u64, Option<(Vec, HashSet)>, )>(4); + let vs_for_worker = vs.clone(); + task::spawn(async move { loop { if agg_rx.changed().await.is_err() { break; // sender dropped — chain task shut down } - let Some((slot, snapshot)) = agg_rx.borrow_and_update().clone() else { + let Some(slot) = *agg_rx.borrow_and_update() else { continue; }; - let vs = vs.clone(); + let vs = vs_for_worker.clone(); + let store = store.clone(); + METRICS + .get() + .map(|m| m.lean_aggregation_in_flight_snapshots.inc()); let result = task::spawn_blocking(move || { + // Clone on this worker thread: brief read lock held only for + // the duration of the clone, then released before XMSS work. + let clone_start = Instant::now(); + let snapshot = { + let guard = store.read(); + guard.clone() + }; + let clone_elapsed = clone_start.elapsed(); + + METRICS.get().map(|m| { + m.lean_aggregation_snapshot_clone_seconds + .observe(clone_elapsed.as_secs_f64()); + let entries = snapshot.blocks.len() + + snapshot.states.len() + + snapshot.gossip_signatures.len() + + snapshot.latest_known_aggregated_payloads.len() + + snapshot.latest_new_aggregated_payloads.len() + + snapshot.attestation_data_by_root.len(); + m.lean_aggregation_snapshot_size_entries + .observe(entries as f64); + }); + vs.maybe_aggregate(&snapshot, Slot(slot), log_rate) }) .await .unwrap_or(None); + METRICS + .get() + .map(|m| m.lean_aggregation_in_flight_snapshots.dec()); if res_tx.send((slot, result)).await.is_err() { break; // chain task dropped — shut down } } }); - Self { agg_tx, res_rx } + Self { vs, agg_tx, res_rx } + } + + /// Returns true if this node should aggregate for the given slot. Used by + /// the chain task to gate the trigger so non-aggregator nodes never enqueue + /// snapshot work. + pub fn is_aggregator_for_slot(&self, slot: Slot) -> bool { + self.vs.is_aggregator_for_slot(slot) } - /// Triggers aggregation for the given slot with the provided store snapshot. + /// Triggers aggregation for the given slot. The worker reads + clones the + /// store on its own thread. /// - /// Uses watch semantics: if a previous trigger has not been consumed yet, - /// it is overwritten with the latest value so XMSS always works on the - /// most recent state. - pub fn trigger(&self, slot: u64, snapshot: Store) { - let _ = self.agg_tx.send(Some((slot, snapshot))); + /// Watch semantics: if a previous trigger has not been consumed yet, it is + /// overwritten with the latest slot so XMSS always works on the most recent + /// trigger. + pub fn trigger(&self, slot: u64) { + let _ = self.agg_tx.send(Some(slot)); } /// Returns the next completed aggregation result, or `None` if none is ready. 
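A minimal sketch of the trigger pattern the reworked AggregationService relies on, for readers skimming the diff: a watch channel that carries only the slot (latest value wins) plus an Arc<RwLock<...>> that the worker deep-clones on its own blocking thread, so the sender never pays for the clone. FakeStore and aggregate below are placeholder names standing in for the real Store and ValidatorService::maybe_aggregate, and the sketch omits the clone-duration and snapshot-size metrics the real service records.

use std::sync::Arc;

use parking_lot::RwLock;
use tokio::sync::{mpsc, watch};
use tokio::task;

// Placeholder for the real fork-choice Store; only Clone matters here.
#[derive(Clone)]
struct FakeStore {
    entries: Vec<u64>,
}

// Stand-in for ValidatorService::maybe_aggregate (the CPU-heavy XMSS work).
fn aggregate(snapshot: &FakeStore, slot: u64) -> usize {
    snapshot.entries.iter().filter(|&&s| s <= slot).count()
}

fn spawn_worker(
    store: Arc<RwLock<FakeStore>>,
    mut trigger_rx: watch::Receiver<Option<u64>>,
    result_tx: mpsc::Sender<(u64, usize)>,
) {
    task::spawn(async move {
        // changed() wakes once per *new* value; intermediate triggers are
        // overwritten, so the worker only ever sees the latest slot.
        while trigger_rx.changed().await.is_ok() {
            let Some(slot) = *trigger_rx.borrow_and_update() else { continue };
            let store = store.clone();
            // Deep-clone on a blocking thread: the read lock is held only for
            // the clone, never across the aggregation itself.
            let result = task::spawn_blocking(move || {
                let snapshot = store.read().clone();
                aggregate(&snapshot, slot)
            })
            .await
            .unwrap_or(0);
            if result_tx.send((slot, result)).await.is_err() {
                break; // consumer gone — shut down
            }
        }
    });
}

#[tokio::main]
async fn main() {
    let store = Arc::new(RwLock::new(FakeStore { entries: vec![1, 2, 3] }));
    let (trigger_tx, trigger_rx) = watch::channel(None);
    let (result_tx, mut result_rx) = mpsc::channel(4);
    spawn_worker(store.clone(), trigger_rx, result_tx);

    // Overwriting semantics: if the worker is still busy, only the newest
    // trigger survives, which is what the per-slot catch-up logic relies on.
    let _ = trigger_tx.send(Some(2));
    let _ = trigger_tx.send(Some(3));
    if let Some((slot, count)) = result_rx.recv().await {
        println!("aggregated {count} entries for slot {slot}");
    }
}

The overwrite-on-send behaviour of the watch channel is also what makes the interval-based catch-up in main.rs safe: a worker that wakes late simply observes the newest slot instead of draining a backlog of stale triggers.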
diff --git a/lean_client/src/main.rs b/lean_client/src/main.rs index da2e899..ac89f74 100644 --- a/lean_client/src/main.rs +++ b/lean_client/src/main.rs @@ -4,12 +4,14 @@ use containers::{ Block, BlockBody, BlockHeader, BlockSignatures, Checkpoint, Config, SignedBlock, Slot, State, Status, Validator, }; +use dedicated_executor::DedicatedExecutor; use ethereum_types::H256; use features::Feature; use fork_choice::{ block_cache::BlockCache, handlers::{ - on_aggregated_attestation, on_attestation, on_block, on_gossip_attestation, on_tick, + apply_verified_block, on_aggregated_attestation, on_attestation, on_gossip_attestation, + on_tick, verify_and_transition, }, store::{ INTERVALS_PER_SLOT, MILLIS_PER_INTERVAL, Store, execute_block_production, @@ -401,6 +403,12 @@ async fn main() -> Result<()> { ) .init(); + info!( + "Starting grandine v{} ({})", + env!("CARGO_PKG_VERSION"), + env!("GRANDINE_GIT_COMMIT_HASH"), + ); + let args = Args::parse(); for feature in args.features { @@ -435,14 +443,39 @@ async fn main() -> Result<()> { let (outbound_p2p_sender, outbound_p2p_receiver) = mpsc::unbounded_channel::(); - let (chain_message_sender, mut chain_message_receiver) = - mpsc::unbounded_channel::(); + let (chain_message_sender, mut chain_message_receiver) = mpsc::channel::(1024); // Separate channel for validator task → chain task request-response messages. // Keeps ValidatorChainMessage (which carries oneshot senders) separate from // ChainMessage (which is Clone and used by the network layer). let (validator_chain_sender, mut validator_chain_receiver) = mpsc::unbounded_channel::(); + let num_cpus = num_cpus::get(); + let cpu_normal_executor = Arc::new(DedicatedExecutor::new( + "lean-cpu-normal", + (num_cpus / 2).max(2), + None, + )); + let cpu_low_executor = Arc::new(DedicatedExecutor::new( + "lean-cpu-low", + (num_cpus / 4).max(1), + Some(19), + )); + + // Verified blocks travel back from the executor to the chain task on this channel. + // Tuple: (block_root, signed_block, should_gossip, verify outcome). + // `should_gossip` is threaded so Phase 3 can rebroadcast only blocks that arrived + // with should_gossip=true AND applied successfully (Bug 1 invariant). + // tx side increments `grandine_verify_result_channel_depth`; rx decrements after recv. + // + // Shutdown semantics: if the chain task exits, `verify_result_rx` is dropped and + // any in-flight `tx.send()` returns Err — the spawned forwarder rolls back the + // gauge inc and exits. The verify itself runs to completion on the executor + // (output discarded). `DedicatedExecutor::Drop` joins worker threads, so no + // verify thread leaks past process shutdown. 
+ let (verify_result_tx, mut verify_result_rx) = + mpsc::channel::<(H256, SignedBlock, bool, Result)>(1024); + let (genesis_time, validators, genesis_log_inv_rate, genesis_attestation_committee_count) = if let Some(genesis_path) = &args.genesis { let genesis_config = containers::GenesisConfig::load_from_file(genesis_path) @@ -595,6 +628,7 @@ async fn main() -> Result<()> { config.clone(), num_validators, keys_path, + cpu_normal_executor.clone(), args.is_aggregator, ) { Ok(service) => { @@ -615,6 +649,7 @@ async fn main() -> Result<()> { Some(ValidatorService::new_with_aggregator( config, num_validators, + cpu_normal_executor.clone(), args.is_aggregator, )) } @@ -627,6 +662,7 @@ async fn main() -> Result<()> { Some(ValidatorService::new_with_aggregator( config, num_validators, + cpu_normal_executor.clone(), args.is_aggregator, )) } @@ -640,6 +676,7 @@ async fn main() -> Result<()> { Some(ValidatorService::new_with_aggregator( config, num_validators, + cpu_normal_executor.clone(), args.is_aggregator, )) } @@ -831,6 +868,7 @@ async fn main() -> Result<()> { "Chain message channel closed during anchor block wait" )); }; + METRICS.get().map(|m| m.grandine_chain_message_channel_depth.dec()); if let ChainMessage::ProcessBlock { signed_block, .. } = msg { let root = signed_block.block.hash_tree_root(); if root == expected_root { @@ -923,6 +961,15 @@ async fn main() -> Result<()> { root: s.head, slot: s.blocks.get(&s.head).map(|b| b.slot).unwrap_or(Slot(0)), }; + + METRICS.get().map(|m| { + if let Ok(j) = i64::try_from(s.latest_justified.slot.0) { + m.lean_latest_justified_slot.set(j); + } + if let Ok(f) = i64::try_from(s.latest_finalized.slot.0) { + m.lean_latest_finalized_slot.set(f); + } + }); } let chain_outbound_sender = outbound_p2p_sender.clone(); @@ -971,9 +1018,13 @@ async fn main() -> Result<()> { // watch channel = lossless latest-value semantics: send() always overwrites so // the aggregation task always sees the most recent trigger even if XMSS was // still running when a newer slot arrived. No trigger is ever silently dropped. - let has_aggregator = vs_for_chain.is_some(); - let mut aggregator = - vs_for_chain.map(|vs| aggregation::AggregationService::new(vs, chain_log_inv_rate)); + let mut aggregator = vs_for_chain + .map(|vs| aggregation::AggregationService::new(vs, store.clone(), chain_log_inv_rate)); + // Whether this node has any validator service at all. Used for sync-state + // coloring; NOT a per-slot aggregator gate. The per-slot aggregator check + // lives at the snapshot trigger site below and goes through + // `AggregationService::is_aggregator_for_slot`. + let has_aggregator = aggregator.is_some(); // Channel for the chain task to signal block arrival to the validator task. let (block_slot_tx, block_slot_rx) = watch::channel::(0); @@ -1012,6 +1063,13 @@ async fn main() -> Result<()> { let has_proposal = target_interval % INTERVALS_PER_SLOT == 0; on_tick(&mut *store.write(), now_millis, has_proposal); + // Sample the cpu_normal executor's queue depth once per tick. + // Counts both XMSS block verify and attestation signing tasks. + METRICS.get().map(|m| { + m.grandine_cpu_normal_executor_tasks_in_flight + .set(cpu_normal_executor.tasks() as i64) + }); + let (current_slot, current_interval) = { let s = store.read(); (s.time / INTERVALS_PER_SLOT, s.time % INTERVALS_PER_SLOT) @@ -1059,13 +1117,25 @@ async fn main() -> Result<()> { // after a long block-processing burst) → trigger still fires for // the current slot, mirroring zeam's explicit catch-up loop. 
// last_agg_slot prevents double-firing within the same slot. - if has_aggregator && current_interval >= 2 && current_slot > last_agg_slot { - last_agg_slot = current_slot; - let snapshot = store.read().clone(); - if let Some(ref agg) = aggregator { - agg.trigger(current_slot, snapshot); + // Only trigger if this node is the aggregator for this slot. + // Previously gated on `has_aggregator` (any-validator-service), + // which made every validator node deep-clone the Store every + // slot — pure waste on non-aggregators. The per-slot check via + // AggregationService::is_aggregator_for_slot routes through the + // ValidatorService AtomicBool flag, so admin-API runtime toggles + // are still honored. + if let Some(ref agg) = aggregator { + if agg.is_aggregator_for_slot(Slot(current_slot)) + && current_interval >= 2 + && current_slot > last_agg_slot + { + last_agg_slot = current_slot; + METRICS.get().map(|m| { + m.lean_aggregation_snapshots_triggered_total.inc() + }); + agg.trigger(current_slot); + info!(slot = current_slot, "Aggregation phase - triggered"); } - info!(slot = current_slot, "Aggregation phase - triggered"); } match current_interval { @@ -1093,8 +1163,180 @@ async fn main() -> Result<()> { let nf = *network_finalized_slot.lock(); evaluate_sync_state(&mut sync_state, peers, head_slot, nf); } + // Phase 3 (apply) is placed ABOVE `chain_message_receiver.recv()` in the + // biased select so completed verifies always drain before we accept new + // chain messages. Without this priority, a backlog of incoming gossip + // would starve apply: chain_message would always be ready, biased polling + // would pick it every iteration, and verified blocks would never land in + // store.blocks (observed live: depth 200+ growing, store_blocks_size frozen). + result = verify_result_rx.recv() => { + let Some((block_root, signed_block, should_gossip, outcome)) = result else { break }; + METRICS + .get() + .map(|m| m.grandine_verify_result_channel_depth.dec()); + // RAII timer for Phase 3 apply wall-clock; records on Drop so + // all `continue` early-exits are captured automatically. + let _apply_timer = METRICS + .get() + .map(|m| m.lean_chain_task_apply_seconds.start_timer()); + + // Skip if another path already applied this block (e.g. cascade race). + if store.read().blocks.contains_key(&block_root) { + continue; + } + + // Skip if parent state was pruned by finalization while verify ran. + // `apply_verified_block` retains finalized + buffer states only; if + // finalization advanced past this block's parent during the verify + // window, the post-state we just received is no longer applicable. + // Drop it; if the block becomes head-relevant it will be re-fetched + // via BlocksByRoot. 
+ let parent_root = signed_block.block.parent_root; + if !store.read().states.contains_key(&parent_root) { + warn!( + block_root = %format!("0x{:x}", block_root), + "Parent state pruned during verify, dropping result" + ); + continue; + } + + let block_slot = signed_block.block.slot; + + match outcome { + Ok(new_state) => { + let apply_result = apply_verified_block( + &mut *store.write(), + signed_block.clone(), + new_state, + block_root, + ); + + match apply_result { + Ok(()) => { + info!("Block processed successfully"); + let _ = block_slot_tx.send(block_slot.0); + + { + let s = store.read(); + let mut status = status_provider.write(); + status.finalized = s.latest_finalized.clone(); + status.head = Checkpoint { + root: s.head, + slot: s.blocks.get(&s.head).map(|b| b.slot).unwrap_or(Slot(0)), + }; + } + + // Bug 1 invariant: rebroadcast only on Apply (apply_verified_block + // is reached only when the dedup guard above missed, i.e. the + // block is new to us). The original ChainMessage carried + // should_gossip; cascade respawns will set it to false. + if should_gossip { + if let Err(e) = outbound_p2p_sender.send( + OutboundP2pRequest::GossipBlock(signed_block.clone()) + ) { + warn!("Failed to gossip block: {}", e); + } else { + info!(slot = block_slot.0, "Broadcasted block"); + } + } + + let head_slot = { let s = store.read(); s.blocks.get(&s.head).map(|b| b.slot.0).unwrap_or(0) }; + let nf = *network_finalized_slot.lock(); + check_sync_complete(&mut sync_state, head_slot, block_cache.orphan_count(), nf); + + // Cascade: orphans waiting on this block can now verify. + // Spawn each cached child against the just-applied state and + // route the result back through `verify_result_tx`. The reply + // re-enters this same arm, so grandchildren are picked up + // automatically without going through `chain_message_sender` + // (avoids burst pressure on the bounded chain channel). + let cascade_children: Vec<(H256, SignedBlock)> = block_cache + .get_children(&block_root) + .into_iter() + .map(|p| (p.root, p.block.clone())) + .collect(); + + if !cascade_children.is_empty() { + if let Some(cascade_parent_state) = + store.read().states.get(&block_root).cloned() + { + for (child_root, child_block) in cascade_children { + let exec = cpu_normal_executor.clone(); + let result_tx = verify_result_tx.clone(); + let parent_state_for_child = + cascade_parent_state.clone(); + let child_for_verify = child_block.clone(); + METRICS.get().map(|m| { + m.grandine_verify_result_channel_depth.inc() + }); + tokio::spawn(async move { + let child_for_send = child_for_verify.clone(); + let job = exec.spawn(async move { + verify_and_transition( + parent_state_for_child, + child_for_verify, + ) + }); + let outcome = job.await.unwrap_or_else(|e| { + Err(anyhow::anyhow!("executor: {e}")) + }); + if result_tx + .send(( + child_root, + child_for_send, + false, // cascade: never re-gossip + outcome, + )) + .await + .is_err() + { + METRICS.get().map(|m| { + m.grandine_verify_result_channel_depth + .dec() + }); + } + }); + block_cache.remove(&child_root); + } + } + } + + METRICS + .get() + .map(|m| m.grandine_block_cache_size.set(block_cache.len() as i64)); + + // Drain block roots queued by retried attestations inside + // apply_verified_block. 
+ let missing: Vec = store.write().pending_fetch_roots.drain().collect(); + METRICS.get().map(|m| m.grandine_pending_fetch_roots.set(0)); + if !missing.is_empty() { + if let Err(e) = outbound_p2p_sender.send( + OutboundP2pRequest::RequestBlocksByRoot(missing) + ) { + warn!("Failed to request blocks missing from retried attestations: {}", e); + } + } + } + Err(e) => warn!( + block_root = %format!("0x{:x}", block_root), + "Problem applying verified block: {}", e + ), + } + } + Err(e) => warn!( + block_root = %format!("0x{:x}", block_root), + "Block verify failed: {}", e + ), + } + } message = chain_message_receiver.recv() => { let Some(message) = message else { break }; + METRICS.get().map(|m| m.grandine_chain_message_channel_depth.dec()); + // RAII timer: HistogramTimer records on Drop, so all `continue` + // exits inside the match below are captured automatically. + let _chain_message_timer = METRICS + .get() + .map(|m| m.lean_chain_task_chain_message_seconds.start_timer()); match message { ChainMessage::ProcessBlock { signed_block, @@ -1115,6 +1357,10 @@ async fn main() -> Result<()> { let block_root = signed_block.block.hash_tree_root(); let parent_root = signed_block.block.parent_root; + if store.read().blocks.contains_key(&block_root) { + continue; + } + info!( slot = block_slot.0, block_root = %format!("0x{:x}", block_root), @@ -1129,11 +1375,6 @@ async fn main() -> Result<()> { .as_millis() as u64; on_tick(&mut *store.write(), now_millis, false); - let parent_exists = { - let s = store.read(); - parent_root.is_zero() || s.states.contains_key(&parent_root) - }; - // Store block immediately so we can serve it to peers via // BlocksByRoot even if it can't be processed yet (e.g. parent // missing). This prevents STREAM_CLOSED errors when a peer @@ -1155,7 +1396,16 @@ async fn main() -> Result<()> { } } - if !parent_exists { + // Snapshot parent state for the executor. If absent, treat as + // orphan and re-queue. Genesis (parent_root.is_zero()) is loaded + // at startup, so a None here for non-zero parents means the + // parent block hasn't been processed yet. 
+ let parent_state = { + let s = store.read(); + s.states.get(&parent_root).cloned() + }; + + let Some(parent_state) = parent_state else { block_cache.add( signed_block.clone(), block_root, @@ -1192,53 +1442,41 @@ async fn main() -> Result<()> { check_sync_complete(&mut sync_state, head_slot, block_cache.orphan_count(), nf); continue; - } - - let result = {on_block(&mut *store.write(), &mut block_cache, signed_block.clone())}; - match result { - Ok(()) => { - info!("Block processed successfully"); - let _ = block_slot_tx.send(block_slot.0); - - { - let s = store.read(); - let mut status = status_provider.write(); - status.finalized = s.latest_finalized.clone(); - status.head = Checkpoint { - root: s.head, - slot: s.blocks.get(&s.head).map(|b| b.slot).unwrap_or(Slot(0)), - }; - } - - if should_gossip { - if let Err(e) = outbound_p2p_sender.send( - OutboundP2pRequest::GossipBlock(signed_block) - ) { - warn!("Failed to gossip block: {}", e); - } else { - info!(slot = block_slot.0, "Broadcasted block"); - } - } - - let head_slot = { let s = store.read(); s.blocks.get(&s.head).map(|b| b.slot.0).unwrap_or(0) }; - let nf = *network_finalized_slot.lock(); - check_sync_complete(&mut sync_state, head_slot, block_cache.orphan_count(), nf); - } - Err(e) => warn!("Problem processing block: {}", e), - } - - METRICS.get().map(|m| m.grandine_block_cache_size.set(block_cache.len() as i64)); + }; - // Drain block roots queued by retried attestations inside on_block. - let missing: Vec = store.write().pending_fetch_roots.drain().collect(); - METRICS.get().map(|m| m.grandine_pending_fetch_roots.set(0)); - if !missing.is_empty() { - if let Err(e) = outbound_p2p_sender.send( - OutboundP2pRequest::RequestBlocksByRoot(missing) - ) { - warn!("Failed to request blocks missing from retried attestations: {}", e); + // Phase 1: ship verify+state_transition to the cpu_normal executor. + // The chain task returns to the select! loop immediately and processes + // the verified outcome later via verify_result_rx (Phase 3). + // No store mutation happens here; gossip rebroadcast and post-apply + // bookkeeping are deferred to Phase 3 once we know the block applied. + let exec = cpu_normal_executor.clone(); + let result_tx = verify_result_tx.clone(); + let signed_block_for_verify = signed_block.clone(); + METRICS + .get() + .map(|m| m.grandine_verify_result_channel_depth.inc()); + tokio::spawn(async move { + let signed_block_for_send = signed_block_for_verify.clone(); + let job = exec.spawn(async move { + verify_and_transition(parent_state, signed_block_for_verify) + }); + let outcome = job + .await + .unwrap_or_else(|e| { + Err(anyhow::anyhow!("executor: {e}")) + }); + if result_tx + .send((block_root, signed_block_for_send, should_gossip, outcome)) + .await + .is_err() + { + // Receiver gone (chain task exited): roll back the inc above + // since this entry will never be drained. 
+ METRICS + .get() + .map(|m| m.grandine_verify_result_channel_depth.dec()); } - } + }); } ChainMessage::ProcessAttestation { signed_attestation, @@ -1363,6 +1601,7 @@ async fn main() -> Result<()> { } v_message = validator_chain_receiver.recv() => { let Some(v_message) = v_message else { break }; + METRICS.get().map(|m| m.grandine_validator_chain_message_channel_depth.dec()); match v_message { ValidatorChainMessage::ProduceBlock { slot, proposer_index, sender } => { let block_build_start = Instant::now(); @@ -1468,14 +1707,19 @@ async fn main() -> Result<()> { ); let (tx, rx) = oneshot::channel(); - if validator_chain_sender - .send(ValidatorChainMessage::ProduceBlock { + let send_result = validator_chain_sender.send( + ValidatorChainMessage::ProduceBlock { slot: Slot(current_slot), proposer_index: proposer_idx, sender: tx, - }) - .is_err() - { + }, + ); + if send_result.is_ok() { + METRICS.get().map(|m| { + m.grandine_validator_chain_message_channel_depth.inc() + }); + } + if send_result.is_err() { warn!("Validator task: chain channel closed, stopping"); break; } @@ -1505,14 +1749,19 @@ async fn main() -> Result<()> { block_root = %format!("0x{:x}", block_root), "Validator task: block signed, sending to chain" ); - if chain_msg_sender_for_validator + let send_result = chain_msg_sender_for_validator .send(ChainMessage::ProcessBlock { signed_block, is_trusted: true, should_gossip: true, }) - .is_err() - { + .await; + if send_result.is_ok() { + METRICS.get().map(|m| { + m.grandine_chain_message_channel_depth.inc() + }); + } + if send_result.is_err() { warn!( "Validator task: chain message channel closed, stopping" ); @@ -1551,13 +1800,18 @@ async fn main() -> Result<()> { } let (tx, rx) = oneshot::channel(); - if validator_chain_sender - .send(ValidatorChainMessage::BuildAttestationData { + let send_result = validator_chain_sender.send( + ValidatorChainMessage::BuildAttestationData { slot: Slot(current_slot), sender: tx, - }) - .is_err() - { + }, + ); + if send_result.is_ok() { + METRICS + .get() + .map(|m| m.grandine_validator_chain_message_channel_depth.inc()); + } + if send_result.is_err() { warn!("Validator task: chain channel closed, stopping"); break; } @@ -1569,10 +1823,12 @@ async fn main() -> Result<()> { } else { u64::MAX }; - let attestations = vs.create_attestations_from_data( - Slot(current_slot), - attestation_data, - ); + let attestations = vs + .create_attestations_from_data( + Slot(current_slot), + attestation_data, + ) + .await; for signed_att in attestations { if signed_att.validator_id == proposer_index { continue; @@ -1588,14 +1844,19 @@ async fn main() -> Result<()> { subnet_id = subnet_id, "Validator task: broadcasting attestation" ); - if chain_msg_sender_for_validator + let send_result = chain_msg_sender_for_validator .send(ChainMessage::ProcessAttestation { signed_attestation: signed_att, is_trusted: true, should_gossip: true, }) - .is_err() - { + .await; + if send_result.is_ok() { + METRICS + .get() + .map(|m| m.grandine_chain_message_channel_depth.inc()); + } + if send_result.is_err() { warn!( "Validator task: chain message channel closed, stopping" ); diff --git a/lean_client/test_vectors/fork_choice/devnet/fc/test_gossip_attestation_validation/test_attestation_one_slot_in_future_allowed.json b/lean_client/test_vectors/fork_choice/devnet/fc/test_gossip_attestation_validation/test_attestation_one_slot_in_future_allowed.json deleted file mode 100644 index 352863d..0000000 --- 
a/lean_client/test_vectors/fork_choice/devnet/fc/test_gossip_attestation_validation/test_attestation_one_slot_in_future_allowed.json +++ /dev/null @@ -1,146 +0,0 @@ -{ - "tests/consensus/devnet/fc/test_gossip_attestation_validation.py::test_attestation_one_slot_in_future_allowed[fork_Devnet][fork_devnet-fork_choice_test]": { - "network": "Devnet", - "leanEnv": "prod", - "anchorState": { - "config": { - "genesisTime": 0 - }, - "slot": 0, - "latestBlockHeader": { - "slot": 0, - "proposerIndex": 0, - "parentRoot": "0x0000000000000000000000000000000000000000000000000000000000000000", - "stateRoot": "0x0000000000000000000000000000000000000000000000000000000000000000", - "bodyRoot": "0xdba9671bac9513c9482f1416a53aabd2c6ce90d5a5f865ce5a55c775325c9136" - }, - "latestJustified": { - "root": "0x0000000000000000000000000000000000000000000000000000000000000000", - "slot": 0 - }, - "latestFinalized": { - "root": "0x0000000000000000000000000000000000000000000000000000000000000000", - "slot": 0 - }, - "historicalBlockHashes": { - "data": [] - }, - "justifiedSlots": { - "data": [] - }, - "validators": { - "data": [ - { - "attestationPubkey": "0x1865a50c6478a3173882ff01f2b7904a2b96b64a8ad63d04ea53b34a539bfd672ffffd40f2e93c2900311f068cf9641b81fd364d", - "proposalPubkey": "0xef30711e272be66f8850f13c393f5c7dc302f6112d3d04672b715b0caef8b823cccac138b5ab9644845cc6262dd8076f49bacf3d", - "index": 0 - }, - { - "attestationPubkey": "0xce45954436b2b4038330ba17071d9c482f56e43cfd3c21414a0f326635cc6132523b995e346e4e2df535e6742a5bb506e0c91365", - "proposalPubkey": "0x2fdc4e5caacbdf3117a6687b09e7f46a132bd54f0e9b8730d3e57c655a60557a488ae4078e248244fadeab6d506a79355b563142", - "index": 1 - }, - { - "attestationPubkey": "0xa6cece462ea48c5e5a31d1576d6d7c7367fdf851a94dd738442c93570f88556ca04991462fedb17191054f65eeb9eb3777a8d27c", - "proposalPubkey": "0xcfe9915524913f40da7ad5649bf69f450d72b952d2296627072e50524f7e8b775c61c17c36283b210521db7517ef097ca996de10", - "index": 2 - }, - { - "attestationPubkey": "0xa9536c0b365edf64b9f2a50b7e0c84157697d444bcfeaa72f966070ca539090b00ac6d5e154aa3279c5f624bfa96fd025b66ba09", - "proposalPubkey": "0x951ee23a3065d322d64174463ac89b4fa863ed648a44c02e331e7511da875b4c4fb27c3c2689272634fed634d67b256868f3be59", - "index": 3 - } - ] - }, - "justificationsRoots": { - "data": [] - }, - "justificationsValidators": { - "data": [] - } - }, - "anchorBlock": { - "slot": 0, - "proposerIndex": 0, - "parentRoot": "0x0000000000000000000000000000000000000000000000000000000000000000", - "stateRoot": "0xde06920f4007f9d3925ba77bfdc1e0fa970537181be1d99a96e7c3880c7d4c3f", - "body": { - "attestations": { - "data": [] - } - } - }, - "steps": [ - { - "valid": true, - "checks": { - "headSlot": 1 - }, - "stepType": "block", - "block": { - "slot": 1, - "proposerIndex": 1, - "parentRoot": "0xd123d3d19ba32a08df9b3bf9e55e4447d1a3a3b4f905583d013b8f05c77d585e", - "stateRoot": "0xf9adceb604bbff70c983cb92794e97000925712faa73c1fd33818d3ad6488b7b", - "body": { - "attestations": { - "data": [] - } - }, - "blockRootLabel": "block_1" - } - }, - { - "valid": true, - "checks": { - "headSlot": 2 - }, - "stepType": "block", - "block": { - "slot": 2, - "proposerIndex": 2, - "parentRoot": "0x6214b969cc3f585a85432ed9dcd3884d4842fb561a3b303a35a771475d58aa88", - "stateRoot": "0x326030ed14199de4db3fcc7d9daae64fbbd00deb176d7dee3ea13ddab89f8ffd", - "body": { - "attestations": { - "data": [] - } - }, - "blockRootLabel": "block_2" - } - }, - { - "valid": true, - "stepType": "attestation", - "attestation": { - "validatorId": 1, - "data": { - "slot": 
3, - "head": { - "root": "0x6b7c8df5d3628c2425caf72e487418f09c4b34b2693dcccddcba29e28219d5ab", - "slot": 2 - }, - "target": { - "root": "0xd123d3d19ba32a08df9b3bf9e55e4447d1a3a3b4f905583d013b8f05c77d585e", - "slot": 0 - }, - "source": { - "root": "0xd123d3d19ba32a08df9b3bf9e55e4447d1a3a3b4f905583d013b8f05c77d585e", - "slot": 0 - } - }, - "signature": "0x240000003926892abe3d527ef1c06c10af567f30b328f2378f852009f95aaf3a2804000004000000efcba50e7fba6c1d6b2f224da9ad9813c6c57313b224e001aefe71052669cb084e51025a129de5703bb88508badcd13aa7fcac594fb9ae2cb280ec1cbf78373f5133065a009f4e576338de6754669c1232898a56b94bd220b0221e2d8669e6731b65c9494366f870e3d48b5dfaa30c4065bbd625b989f21f2229cc7297ad2713459c547c6b43676804e8091b6033cb00caf3b222934fc7740cf61c5b5e748049a8f13e524c591b619776e60da8b5023f5b00772523b8482ed6a98e327e970d0fe915a3577d7f5f350bd13f0ef73a182cbc10877e9064db2ae7630c0c90cbd961a9b7044393392902c0e4de6587ec1155f441790faa97c405f6f25021afaef021143adc728329c93ee432872e37192845fa34ad647d44f86131a9a20dad3f0056540f301570925b441b8ce01d25fc5c4457bc0c51d60f9a1328ce9e316b03e30980579762939f221841fa522852c2921cf81f87161b55f74e9507e9629e98c457a983a83d44f44f20dcd57f4c207887251e478733a9fd6e2b2988b060ace72e49a0166d13edbdc01ef61df16ec8bf072c2578b808d2df224f129ff204e932be586ede116fde6d4275f60f643e3a2a034c8500b16b1bdee52961469d6a5717f23e46519f44d78abf2cbb150e647d84196f0d09fe4acb230f13665d622e4919ea5d8362171f4ceba83a189932764621895db6319e6315a2ed6707b7ae65a9f0016863cdd618cc7b8d3c8060b9322fbb4b554c21da37e988822191303c6d940383016b82c96739972a2490e2831c2cc37f4782cd064393eecc2309e9e9595d37882fb030563e92e3c54ab101907da8f2483c27b26c3459c5524a7efe981a5149103023bc0e215a60c342806d491a9d20587b05a788220a1ffb63ec332a236f8d6c0bb53c8b726799cf5ee9402b3f00b6a365e7bbbd15092b4f6cbc63a421f9cf391ca4702d441b85e4014f0e5720e4629208188dcd6ed6474a33ad05e43444376828c19fbe5fed3314275f46cb4e1fc3e479cc57121c315a8e07f1c3e124cc58ad29863f4a0657cdb42add4b974301f07129037c4430a2409309e89c886e88b96121f2d3c11415cf5e0ea0aec224f3d229192d1b754a1580e17cfff0b21810f49748cd896255029b2361f022115bfae0f714d033582e6b6ce7393371943de88c836e17a22c44ea38ce4ea822bf65d6c9124101cfb8154f75de2ab52494200bbb0a65b8d8cb6a942d135aa0f3bd6d947fee005720a8280d59e272798f404889889240bee25c051d20526b1e0e2e45a3dd0d11053c6c489eafa61f8e5d2f7a0c8c7b20ed889f63b26d687af3600e7854a39d457bbd002ee368164943418a2b5a13fb013b79a9038ef0b00d5faf155ff5894762d07247205649e8617c554d2ea618fa778effcb3e6b7fe4792396ec0f7e8a18470a5c727278e79d09c27a366021423c6b46a5657316918e62c345d910d038e922c3265526b03f8c63f11eda1f4dade402f096bf74c8eb3d322dedfb3d82050f1b0fcdf82b79664b521dba1a7384c54169b186b819f91bbc50b97bb11a753e116b327e0b79e81ca308aabe4329f68bce6932ea627c819da95edaa9110bf3f88c3fda8f5870bff81e1048e1207c10bf0d4a856fa85eca2e5d542f17a5356d4205124d74433b5cc51414e9711b72e8cc3b570e525f2700dd2c51a352e7317f7f981b731eec0bedc77c4a9e46414916675616ac94f60ee7d9da699863712d5e29dc477a62ed73f483ac2571a9c171c91f2043379a9939a9b61c0ee7e63741c41adb24445e0649b9b5815d9cdb3d0505b59a2cf855831a1e76f05d0c9b8939509c7405612a68613c97094d4da0060ffd9d0f298f0c8c5c6c6b6b7b9205bd3d97ef790404458c2b9f6e703912017a56a6e7bd231594254f1bde6113b6c94e22a82db33713aa9a1070019873f8fdc405e2f8064385358440b0ae1731dfa5a3099394da3624518c65722dae1650a70232f5bb157ebcb2856b69d941594433c13a88708e57161a0879ef5ca12974cc3577f5fdea451722026f1ce75247172dcd1477db0c4a23db6c4fed556774f07c61088650ff7955a9366ca8679e6e10c5b71c3f4d145c137dfb666b5f2d57422b45637821322f96293b6eb2f8cf268548fe6d9d6f481209306029771cb07da8c27b66bcca4f25c5f0dd0dbcd8245c9557c579a0b0975c0773a92760990e6
1e278830286a57a735e29c3768de50e67c5e61432810a6010733c3960d4bf55065ae4b604699a0559c7a2de45dcdf8e291f603f45b10a0c2eae7ad03350908c56adb78c20dbd9be5ee6158a350162f6063ae15b273d1d9d3f5d65ca62393c0e1e1ea90d1f2c908f29402e7f748a80e706addb6e6267149b403232012df586620e859aa26e3b4b2e0a6dc59b559e54483fafff4e63dfbe3f02d5879c3ba37d12216fcf0e0d106dca769f5b8c291b12017dee2173034f43935c4a38791e001d5407825feb394f30b00aeb03f710edda2f3aec6d3f6aa8293e7be69a22095a45832296ff8f144cb364553bfe8e4200f3e9736789053356d30840d996947a90ecc0533daa18435586ff189c4c4831884b752e23c88a2514727074bf18bc679a6d913164bcd40c9d3eca54feb4d557b91ddc76fbd9d9670310e31c850c280dec5b0218a32e5e731f63fb444e0df56722fba6131454690ab6e1b64b7b6d02003d321354d7964f5d8826f813346c5d0056adcb20971dfc0705b7763d39490d1019a9be52a8338866add588668e49527ac3bd420e618c2d0293986451e5b1d82e22b5f845b443990aa0aa1e16ee97f61ca515d66a8b2672122d582a12f8373b2caed9a2539d193b16c6a78d1f090523345c3885542e80532fa1144f2596a9654a0c890d306feee43827e5e522d111c6631cacf62b65b9e44264c7971fcbc4a5414bf1ed600758ca0749ee7b4601d97721575abe1a5970232e12a49c5c194c4a4d5c65c436d9130171e4b23e04f7712125dd7c5e3df93c5471de4f1273fd57387694d0c060a92238746ba8c859693cb6564c5a8d0b9e3ba935d8f2e42cea760515c804c5024a1d2b599e23465e94d5f8071902b41f91c55e0b4b4a6b5b21df5a02b0a52974faacfe4c41cedf34524b9c3ff08f04343b41c85e0768cd76e1a3e72c35b93d6071c2b94a5d33746facfcfa04d601c110d89b195a09570236be150445b76a400d5200d64e7b0ed25d5ca5be5ccf2ba6055685d267a8d3745f1ee4c64169a1c12e10089b57678c6e46e3e47615099b955681fee25741d55b0598a335058344cf7b10bac02d08441566e8c8f05ddd6a71650319c745f269eb240d32c303b5baf768315c175f2eac1a68da0c9e627075e908eccdfa4b0307ba6af1f5485a5721ef1e4870794155f1e10e4ad7641fd829cd0ef71fa473d3c1e166095a1b7122b55b68156f8a776e343a44609fb97b4aafb5569f262b54751fe232663a923c2f40cb5f759760596d800200bd567a6c8ca5e256c83d6f651179457436f49b7d178dad68c0203478e625557d" - }, - "isAggregator": false - } - ], - "maxSlot": 3, - "_info": { - "hash": "0xd87c2574acbd6a52d023c761b7f6c30fbc2396d4503015a8ffee528db8a8f903", - "comment": "`leanSpec` generated test", - "testId": "tests/consensus/devnet/fc/test_gossip_attestation_validation.py::test_attestation_one_slot_in_future_allowed[fork_Devnet]", - "description": "Attestation exactly one slot in the future is allowed.\n\n Scenario\n --------\n Build a chain with blocks at slots 1 and 2.\n Submit attestation for slot 3 (one slot in future, allowed margin).\n\n Expected:\n - Attestation is validated successfully", - "fixtureFormat": "fork_choice_test" - } - } -} \ No newline at end of file diff --git a/lean_client/validator/Cargo.toml b/lean_client/validator/Cargo.toml index 4d584c4..01471cb 100644 --- a/lean_client/validator/Cargo.toml +++ b/lean_client/validator/Cargo.toml @@ -5,9 +5,11 @@ edition = { workspace = true } [dependencies] anyhow = { workspace = true } containers = { workspace = true } +dedicated_executor = { workspace = true } env-config = { workspace = true } ethereum-types = { workspace = true } fork_choice = { workspace = true } +futures = { workspace = true } metrics = { workspace = true } serde = { workspace = true } serde_yaml = { workspace = true } diff --git a/lean_client/validator/src/lib.rs b/lean_client/validator/src/lib.rs index 96bb110..771938e 100644 --- a/lean_client/validator/src/lib.rs +++ b/lean_client/validator/src/lib.rs @@ -1,6 +1,7 @@ // Lean validator client with XMSS signing support use std::collections::{HashMap, HashSet}; use std::path::Path; +use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; use serde::Deserialize; @@ 
-10,7 +11,9 @@ use containers::{ AggregatedSignatureProof, AggregationBits, AttestationData, AttestationSignatures, Block, BlockSignatures, SignedAggregatedAttestation, SignedAttestation, SignedBlock, Slot, }; +use dedicated_executor::DedicatedExecutor; use fork_choice::store::Store; +use futures::stream::{FuturesUnordered, StreamExt}; use metrics::{METRICS, stop_and_discard, stop_and_record}; use rayon::prelude::*; use ssz::H256; @@ -98,7 +101,12 @@ impl ValidatorConfig { pub struct ValidatorService { pub config: ValidatorConfig, pub num_validators: u64, - key_manager: Option, + /// Wrapped in `Arc` so each per-validator XMSS signing job can be moved + /// into a `DedicatedExecutor` task without cloning the secret keys. + key_manager: Option>, + /// Shared CPU pool used for offloading XMSS attestation signing off the + /// validator task / chain task. + cpu_normal_executor: Arc, /// Whether this node performs aggregation duties (devnet-3). /// Uses `AtomicBool` for interior mutability so the admin API can toggle /// the flag at runtime without requiring `&mut self` or a write lock. @@ -151,13 +159,18 @@ fn extend_children_greedily<'a>( } impl ValidatorService { - pub fn new(config: ValidatorConfig, num_validators: u64) -> Self { - Self::new_with_aggregator(config, num_validators, false) + pub fn new( + config: ValidatorConfig, + num_validators: u64, + cpu_normal_executor: Arc, + ) -> Self { + Self::new_with_aggregator(config, num_validators, cpu_normal_executor, false) } pub fn new_with_aggregator( config: ValidatorConfig, num_validators: u64, + cpu_normal_executor: Arc, is_aggregator: bool, ) -> Self { info!( @@ -178,6 +191,7 @@ impl ValidatorService { config, num_validators, key_manager: None, + cpu_normal_executor, is_aggregator: AtomicBool::new(is_aggregator), } } @@ -186,14 +200,22 @@ impl ValidatorService { config: ValidatorConfig, num_validators: u64, keys_dir: impl AsRef, + cpu_normal_executor: Arc, ) -> Result> { - Self::new_with_keys_and_aggregator(config, num_validators, keys_dir, false) + Self::new_with_keys_and_aggregator( + config, + num_validators, + keys_dir, + cpu_normal_executor, + false, + ) } pub fn new_with_keys_and_aggregator( config: ValidatorConfig, num_validators: u64, keys_dir: impl AsRef, + cpu_normal_executor: Arc, is_aggregator: bool, ) -> Result> { let mut key_manager = KeyManager::new(&keys_dir)?; @@ -229,7 +251,8 @@ impl ValidatorService { Ok(Self { config, num_validators, - key_manager: Some(key_manager), + key_manager: Some(Arc::new(key_manager)), + cpu_normal_executor, is_aggregator: AtomicBool::new(is_aggregator), }) } @@ -567,7 +590,7 @@ impl ValidatorService { /// The validator task calls `BuildAttestationData` on the chain task first, /// receives the `AttestationData` via oneshot, then calls this method. /// This keeps XMSS signing entirely off the chain task's thread. - pub fn create_attestations_from_data( + pub async fn create_attestations_from_data( &self, slot: Slot, attestation_data: AttestationData, @@ -587,58 +610,79 @@ impl ValidatorService { .start_timer() }); - self.config - .validator_indices - .iter() - .filter_map(|&idx| { - let signature = if let Some(ref key_manager) = self.key_manager { - let message = attestation_data.hash_tree_root(); - let epoch = slot.0 as u32; + // No keys: return zero-signature attestations directly (test / passive mode). 
+ let Some(key_manager) = self.key_manager.as_ref() else { + return self + .config + .validator_indices + .iter() + .map(|&idx| { + info!( + slot = slot.0, + validator = idx, + "Created attestation with zero signature" + ); + SignedAttestation { + validator_id: idx, + message: attestation_data.clone(), + signature: Signature::default(), + } + }) + .collect(); + }; + let message = attestation_data.hash_tree_root(); + let epoch = slot.0 as u32; + let target_slot = attestation_data.target.slot.0; + let source_slot = attestation_data.source.slot.0; + + // Fan out signing across the executor's worker threads so we get + // concurrent XMSS signing instead of one-at-a-time on this task. + let mut sign_jobs = FuturesUnordered::new(); + for &idx in &self.config.validator_indices { + let exec = self.cpu_normal_executor.clone(); + let km = Arc::clone(key_manager); + sign_jobs.push(async move { + let job = exec.spawn(async move { let _timer = METRICS.get().map(|metrics| { metrics .lean_pq_sig_attestation_signing_time_seconds .start_timer() }); + km.sign_attestation(idx, epoch, message) + }); + let result = job.await.unwrap_or_else(|e| Err(anyhow!("executor: {e}"))); + (idx, result) + }); + } - match key_manager.sign_attestation(idx, epoch, message) { - Ok(sig) => { - METRICS.get().map(|metrics| { - metrics.lean_pq_sig_attestation_signatures_total.inc(); - }); - info!( - slot = slot.0, - validator = idx, - target_slot = attestation_data.target.slot.0, - source_slot = attestation_data.source.slot.0, - "Created signed attestation" - ); - sig - } - Err(e) => { - warn!( - validator = idx, - error = %e, - "Failed to sign attestation, skipping" - ); - return None; - } - } - } else { + let mut attestations = Vec::with_capacity(self.config.validator_indices.len()); + while let Some((idx, sig_result)) = sign_jobs.next().await { + match sig_result { + Ok(signature) => { + METRICS.get().map(|metrics| { + metrics.lean_pq_sig_attestation_signatures_total.inc(); + }); info!( slot = slot.0, validator = idx, - "Created attestation with zero signature" + target_slot, + source_slot, + "Created signed attestation" ); - Signature::default() - }; - - Some(SignedAttestation { - validator_id: idx, - message: attestation_data.clone(), - signature, - }) - }) - .collect() + attestations.push(SignedAttestation { + validator_id: idx, + message: attestation_data.clone(), + signature, + }); + } + Err(e) => warn!( + validator = idx, + error = %e, + "Failed to sign attestation, skipping" + ), + } + } + attestations } }
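To close, a compact sketch of the fan-out signing pattern create_attestations_from_data now uses: one job per validator index pushed into a FuturesUnordered, results drained in completion order, failures logged and skipped. tokio::task::spawn_blocking stands in here for the DedicatedExecutor, and Keys::sign for KeyManager::sign_attestation; both are illustrative assumptions, not the real APIs.

use std::sync::Arc;

use anyhow::{anyhow, Result};
use futures::stream::{FuturesUnordered, StreamExt};
use tokio::task;

// Toy stand-in for KeyManager: pretends to produce a per-validator signature.
struct Keys;

impl Keys {
    fn sign(&self, validator: u64, message: u64) -> Result<Vec<u8>> {
        if validator == 2 {
            return Err(anyhow!("missing key for validator {validator}"));
        }
        Ok(message.to_le_bytes().iter().chain(&[validator as u8]).copied().collect())
    }
}

#[tokio::main]
async fn main() {
    let keys = Arc::new(Keys);
    let message = 42u64;
    let validator_indices = [0u64, 1, 2, 3];

    // Fan out one blocking job per validator so signatures are produced
    // concurrently; FuturesUnordered yields them in completion order.
    let mut jobs = FuturesUnordered::new();
    for &idx in &validator_indices {
        let keys = Arc::clone(&keys);
        jobs.push(async move {
            let result = task::spawn_blocking(move || keys.sign(idx, message))
                .await
                .unwrap_or_else(|e| Err(anyhow!("signing worker panicked: {e}")));
            (idx, result)
        });
    }

    // Collect whatever succeeds; a single missing key no longer blocks the rest.
    let mut signatures = Vec::with_capacity(validator_indices.len());
    while let Some((idx, result)) = jobs.next().await {
        match result {
            Ok(sig) => signatures.push((idx, sig)),
            Err(e) => eprintln!("skipping validator {idx}: {e}"),
        }
    }
    println!("collected {} signatures", signatures.len());
}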