diff --git a/crates/blockchain/src/metrics.rs b/crates/blockchain/src/metrics.rs index 520edbe..20092cd 100644 --- a/crates/blockchain/src/metrics.rs +++ b/crates/blockchain/src/metrics.rs @@ -356,8 +356,8 @@ pub fn set_node_start_time() { } /// Increment the valid attestations counter. -pub fn inc_attestations_valid() { - LEAN_ATTESTATIONS_VALID_TOTAL.inc(); +pub fn inc_attestations_valid(count: u64) { + LEAN_ATTESTATIONS_VALID_TOTAL.inc_by(count); } /// Increment the invalid attestations counter. diff --git a/crates/blockchain/src/store.rs b/crates/blockchain/src/store.rs index 3d70e08..77582a1 100644 --- a/crates/blockchain/src/store.rs +++ b/crates/blockchain/src/store.rs @@ -4,12 +4,12 @@ use ethlambda_crypto::aggregate_signatures; use ethlambda_state_transition::{ is_proposer, process_block, process_slots, slot_is_justifiable_after, }; -use ethlambda_storage::{ForkCheckpoints, SignatureKey, Store, StoredAggregatedPayload}; +use ethlambda_storage::{ForkCheckpoints, Store}; use ethlambda_types::{ ShortRoot, attestation::{ AggregatedAttestation, AggregationBits, Attestation, AttestationData, - SignedAggregatedAttestation, SignedAttestation, validator_indices, + HashedAttestationData, SignedAggregatedAttestation, SignedAttestation, validator_indices, }, block::{ AggregatedAttestations, AggregatedSignatureProof, Block, BlockBody, @@ -97,15 +97,11 @@ fn update_safe_target(store: &mut Store) { let min_target_score = (num_validators * 2).div_ceil(3); let blocks = store.get_live_chain(); - // Merge both attestation pools (keys only — skip payload deserialization). + // Merge both attestation pools (known + new). // At interval 3 the migration (interval 4) hasn't run yet, so attestations // that entered "known" directly (proposer's own attestation in block body, // node's self-attestation) would be invisible without this merge. - let all_keys: HashSet = store - .iter_known_aggregated_payload_keys() - .chain(store.iter_new_aggregated_payload_keys()) - .collect(); - let attestations = store.extract_latest_attestations(all_keys.into_iter()); + let attestations = store.extract_latest_all_attestations(); let (safe_target, _weights) = ethlambda_fork_choice::compute_lmd_ghost_head( store.latest_justified().root, &blocks, @@ -120,8 +116,8 @@ fn update_safe_target(store: &mut Store) { /// Collects individual gossip signatures, aggregates them by attestation data, /// and stores the resulting proofs in the new aggregated payloads buffer. fn aggregate_committee_signatures(store: &mut Store) -> Vec { - let gossip_sigs: Vec<(SignatureKey, _)> = store.iter_gossip_signatures().collect(); - if gossip_sigs.is_empty() { + let gossip_groups = store.iter_gossip_signatures(); + if gossip_groups.is_empty() { return Vec::new(); } let _timing = metrics::time_committee_signatures_aggregation(); @@ -131,32 +127,18 @@ fn aggregate_committee_signatures(store: &mut Store) -> Vec> = HashMap::new(); - let mut keys_to_delete: Vec = Vec::new(); - let mut payload_entries: Vec<(SignatureKey, StoredAggregatedPayload)> = Vec::new(); - - for ((validator_id, data_root), stored_sig) in &gossip_sigs { - if let Ok(sig) = stored_sig.to_validator_signature() { - groups - .entry(*data_root) - .or_default() - .push((*validator_id, sig)); - } - } - - for (data_root, validators_and_sigs) in groups { - let Some(data) = store.get_attestation_data_by_root(&data_root) else { - continue; - }; + let mut keys_to_delete: Vec<(u64, H256)> = Vec::new(); + let mut payload_entries: Vec<(HashedAttestationData, AggregatedSignatureProof)> = Vec::new(); - let slot = data.slot; + for (hashed, validator_sigs) in &gossip_groups { + let data_root = hashed.root(); + let slot = hashed.data().slot; let mut sigs = vec![]; let mut pubkeys = vec![]; let mut ids = vec![]; - for (vid, sig) in &validators_and_sigs { + for (vid, sig) in validator_sigs { let Some(validator) = validators.get(*vid as usize) else { continue; }; @@ -185,16 +167,12 @@ fn aggregate_committee_signatures(store: &mut Store) -> Vec>()?; - let data_root = aggregated.data.tree_hash_root(); + let hashed = HashedAttestationData::new(aggregated.data.clone()); + let data_root = hashed.root(); let slot: u32 = aggregated.data.slot.try_into().expect("slot exceeds u32"); { @@ -468,22 +445,8 @@ pub fn on_gossip_aggregated_attestation( } .map_err(StoreError::AggregateVerificationFailed)?; - // Store attestation data by root (content-addressed, idempotent) - store.insert_attestation_data_by_root(data_root, aggregated.data.clone()); - - // Store one aggregated payload per participating validator (batch insert) - let entries: Vec<_> = aggregated - .proof - .participant_indices() - .map(|validator_id| { - let payload = StoredAggregatedPayload { - slot: aggregated.data.slot, - proof: aggregated.proof.clone(), - }; - ((validator_id, data_root), payload) - }) - .collect(); - store.insert_new_aggregated_payloads_batch(entries); + // Store one proof per attestation data (not per validator) + store.insert_new_aggregated_payload(hashed, aggregated.proof.clone()); metrics::update_latest_new_aggregated_payloads(store.new_aggregated_payloads_count()); let slot = aggregated.data.slot; @@ -497,7 +460,7 @@ pub fn on_gossip_aggregated_attestation( "Aggregated attestation processed" ); - metrics::inc_attestations_valid(); + metrics::inc_attestations_valid(1); Ok(()) } @@ -594,37 +557,23 @@ fn on_block_core( let aggregated_attestations = &block.body.attestations; let attestation_signatures = &signed_block.signature.attestation_signatures; - // Process block body attestations. - // Store attestation data by root and proofs in known aggregated payloads. - let mut att_data_entries: Vec<(H256, AttestationData)> = Vec::new(); - let mut known_entries: Vec<(SignatureKey, StoredAggregatedPayload)> = Vec::new(); + // Store one proof per attestation data in known aggregated payloads. + let mut known_entries: Vec<(HashedAttestationData, AggregatedSignatureProof)> = Vec::new(); for (att, proof) in aggregated_attestations .iter() .zip(attestation_signatures.iter()) { - let data_root = att.data.tree_hash_root(); - att_data_entries.push((data_root, att.data.clone())); - - let validator_ids: Vec<_> = validator_indices(&att.aggregation_bits).collect(); - let payload = StoredAggregatedPayload { - slot: att.data.slot, - proof: proof.clone(), - }; - - for validator_id in &validator_ids { - known_entries.push(((*validator_id, data_root), payload.clone())); - metrics::inc_attestations_valid(); - } + known_entries.push((HashedAttestationData::new(att.data.clone()), proof.clone())); + // Count each participating validator as a valid attestation + let count = validator_indices(&att.aggregation_bits).count() as u64; + metrics::inc_attestations_valid(count); } // Process proposer attestation as pending (enters "new" stage via gossip path) // The proposer's attestation should NOT affect this block's fork choice position. let proposer_vid = proposer_attestation.validator_id; - let proposer_data_root = proposer_attestation.data.tree_hash_root(); - att_data_entries.push((proposer_data_root, proposer_attestation.data.clone())); + let proposer_hashed = HashedAttestationData::new(proposer_attestation.data.clone()); - // Batch-insert all attestation data (body + proposer) in a single commit - store.insert_attestation_data_by_root_batch(att_data_entries); store.insert_known_aggregated_payloads_batch(known_entries); // Update forkchoice head based on new block and attestations @@ -635,23 +584,15 @@ fn on_block_core( if !verify { // Without sig verification, insert directly with a dummy proof let participants = aggregation_bits_from_validator_indices(&[proposer_vid]); - let payload = StoredAggregatedPayload { - slot: proposer_attestation.data.slot, - proof: AggregatedSignatureProof::empty(participants), - }; - store.insert_new_aggregated_payload((proposer_vid, proposer_data_root), payload); + let proof = AggregatedSignatureProof::empty(participants); + store.insert_new_aggregated_payload(proposer_hashed, proof); } else { // Store the proposer's signature unconditionally for future block building. // Subnet filtering is handled at the P2P subscription layer. let proposer_sig = ValidatorSignature::from_bytes(&signed_block.signature.proposer_signature) .map_err(|_| StoreError::SignatureDecodingFailed)?; - store.insert_gossip_signature( - proposer_data_root, - proposer_attestation.data.slot, - proposer_vid, - proposer_sig, - ); + store.insert_gossip_signature(proposer_hashed, proposer_vid, proposer_sig); } let block_total = block_start.elapsed(); @@ -807,20 +748,10 @@ pub fn produce_block_with_signatures( }); } - let known_block_roots = store.get_block_roots(); - - // Group payloads by data_root with deduplicated proofs - let proofs_by_data_root = store.known_payloads_by_data_root(); + // Get known aggregated payloads: data_root -> (AttestationData, Vec) + let aggregated_payloads = store.known_aggregated_payloads(); - // Resolve AttestationData for each data_root - let aggregated_payloads: HashMap)> = - proofs_by_data_root - .into_iter() - .filter_map(|(data_root, proofs)| { - let data = store.get_attestation_data_by_root(&data_root)?; - Some((data_root, (data, proofs))) - }) - .collect(); + let known_block_roots = store.get_block_roots(); let (block, signatures) = build_block( &head_state, diff --git a/crates/blockchain/tests/forkchoice_spectests.rs b/crates/blockchain/tests/forkchoice_spectests.rs index 9d1c028..5958cf5 100644 --- a/crates/blockchain/tests/forkchoice_spectests.rs +++ b/crates/blockchain/tests/forkchoice_spectests.rs @@ -284,12 +284,8 @@ fn validate_attestation_check( let location = check.location.as_str(); let attestations: HashMap = match location { - "new" => { - st.extract_latest_attestations(st.iter_new_aggregated_payloads().map(|(key, _)| key)) - } - "known" => { - st.extract_latest_attestations(st.iter_known_aggregated_payloads().map(|(key, _)| key)) - } + "new" => st.extract_latest_new_attestations(), + "known" => st.extract_latest_known_attestations(), other => { return Err( format!("Step {}: unknown attestation location: {}", step_idx, other).into(), @@ -369,8 +365,7 @@ fn validate_lexicographic_head_among( } let blocks = st.get_live_chain(); - let known_attestations: HashMap = - st.extract_latest_attestations(st.iter_known_aggregated_payloads().map(|(key, _)| key)); + let known_attestations: HashMap = st.extract_latest_known_attestations(); // Resolve all fork labels to roots and compute their weights // Map: label -> (root, slot, weight) diff --git a/crates/common/types/src/attestation.rs b/crates/common/types/src/attestation.rs index e33db5c..2df26e7 100644 --- a/crates/common/types/src/attestation.rs +++ b/crates/common/types/src/attestation.rs @@ -1,7 +1,10 @@ use crate::{ block::AggregatedSignatureProof, checkpoint::Checkpoint, - primitives::ssz::{Decode, Encode, TreeHash}, + primitives::{ + H256, + ssz::{Decode, Encode, TreeHash}, + }, signature::SignatureSize, state::ValidatorRegistryLimit, }; @@ -17,7 +20,7 @@ pub struct Attestation { } /// Attestation content describing the validator's observed chain view. -#[derive(Debug, Clone, Encode, Decode, TreeHash)] +#[derive(Debug, Clone, PartialEq, Eq, Hash, Encode, Decode, TreeHash)] pub struct AttestationData { /// The slot for which the attestation is made. pub slot: u64, @@ -77,3 +80,41 @@ pub struct SignedAggregatedAttestation { pub data: AttestationData, pub proof: AggregatedSignatureProof, } + +/// Attestation data paired with its precomputed tree hash root. +/// +/// Private fields ensure that `root == data.tree_hash_root()` is always true. +/// The only way to construct this is via [`HashedAttestationData::new`] or +/// [`From`], both of which compute the root from the data. +#[derive(Debug, Clone)] +pub struct HashedAttestationData { + root: H256, + data: AttestationData, +} + +impl HashedAttestationData { + pub fn new(data: AttestationData) -> Self { + Self { + root: data.tree_hash_root(), + data, + } + } + + pub fn root(&self) -> H256 { + self.root + } + + pub fn data(&self) -> &AttestationData { + &self.data + } + + pub fn into_parts(self) -> (H256, AttestationData) { + (self.root, self.data) + } +} + +impl From for HashedAttestationData { + fn from(data: AttestationData) -> Self { + Self::new(data) + } +} diff --git a/crates/common/types/src/checkpoint.rs b/crates/common/types/src/checkpoint.rs index 328a718..00b9f06 100644 --- a/crates/common/types/src/checkpoint.rs +++ b/crates/common/types/src/checkpoint.rs @@ -7,7 +7,18 @@ use crate::primitives::{ /// Represents a checkpoint in the chain's history. #[derive( - Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize, Encode, Decode, TreeHash, + Debug, + Clone, + Copy, + Default, + PartialEq, + Eq, + Hash, + Serialize, + Deserialize, + Encode, + Decode, + TreeHash, )] pub struct Checkpoint { /// The root hash of the checkpoint's block. diff --git a/crates/storage/src/api/tables.rs b/crates/storage/src/api/tables.rs index 35fb687..6fd972c 100644 --- a/crates/storage/src/api/tables.rs +++ b/crates/storage/src/api/tables.rs @@ -12,10 +12,6 @@ pub enum Table { BlockSignatures, /// State storage: H256 -> State States, - /// Gossip signatures: SignatureKey -> ValidatorSignature - GossipSignatures, - /// Attestation data indexed by tree hash root: H256 -> AttestationData - AttestationDataByRoot, /// Metadata: string keys -> various scalar values Metadata, /// Live chain index: (slot || root) -> parent_root @@ -27,13 +23,11 @@ pub enum Table { } /// All table variants. -pub const ALL_TABLES: [Table; 8] = [ +pub const ALL_TABLES: [Table; 6] = [ Table::BlockHeaders, Table::BlockBodies, Table::BlockSignatures, Table::States, - Table::GossipSignatures, - Table::AttestationDataByRoot, Table::Metadata, Table::LiveChain, ]; @@ -46,8 +40,6 @@ impl Table { Table::BlockBodies => "block_bodies", Table::BlockSignatures => "block_signatures", Table::States => "states", - Table::GossipSignatures => "gossip_signatures", - Table::AttestationDataByRoot => "attestation_data_by_root", Table::Metadata => "metadata", Table::LiveChain => "live_chain", } diff --git a/crates/storage/src/backend/rocksdb.rs b/crates/storage/src/backend/rocksdb.rs index 160ea4c..4853873 100644 --- a/crates/storage/src/backend/rocksdb.rs +++ b/crates/storage/src/backend/rocksdb.rs @@ -16,8 +16,6 @@ fn cf_name(table: Table) -> &'static str { Table::BlockBodies => "block_bodies", Table::BlockSignatures => "block_signatures", Table::States => "states", - Table::GossipSignatures => "gossip_signatures", - Table::AttestationDataByRoot => "attestation_data_by_root", Table::Metadata => "metadata", Table::LiveChain => "live_chain", } diff --git a/crates/storage/src/lib.rs b/crates/storage/src/lib.rs index 01ac010..9c30f9c 100644 --- a/crates/storage/src/lib.rs +++ b/crates/storage/src/lib.rs @@ -1,8 +1,6 @@ mod api; pub mod backend; mod store; -mod types; pub use api::{ALL_TABLES, StorageBackend, StorageReadView, StorageWriteBatch, Table}; -pub use store::{ForkCheckpoints, SignatureKey, Store}; -pub use types::{StoredAggregatedPayload, StoredSignature}; +pub use store::{ForkCheckpoints, Store}; diff --git a/crates/storage/src/store.rs b/crates/storage/src/store.rs index e7bed2a..d21ccde 100644 --- a/crates/storage/src/store.rs +++ b/crates/storage/src/store.rs @@ -1,5 +1,4 @@ use std::collections::{HashMap, HashSet, VecDeque}; -use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, LazyLock, Mutex}; /// The tree hash root of an empty block body. @@ -9,10 +8,9 @@ use std::sync::{Arc, LazyLock, Mutex}; static EMPTY_BODY_ROOT: LazyLock = LazyLock::new(|| BlockBody::default().tree_hash_root()); use crate::api::{StorageBackend, StorageWriteBatch, Table}; -use crate::types::{StoredAggregatedPayload, StoredSignature}; use ethlambda_types::{ - attestation::AttestationData, + attestation::{AttestationData, HashedAttestationData}, block::{ AggregatedSignatureProof, Block, BlockBody, BlockHeader, BlockSignaturesWithAttestation, BlockWithAttestation, SignedBlockWithAttestation, @@ -27,12 +25,6 @@ use ethlambda_types::{ }; use tracing::info; -/// Key for looking up individual validator signatures. -/// Used to index signature caches by (validator, message) pairs. -/// -/// Values are (validator_index, attestation_data_root). -pub type SignatureKey = (u64, H256); - /// Checkpoints to update in the forkchoice store. /// /// Used with `Store::update_checkpoints` to update head and optionally @@ -91,106 +83,149 @@ const _: () = assert!( "BLOCKS_TO_KEEP must be >= STATES_TO_KEEP" ); -/// Hard cap for the known aggregated payload buffer. -/// Matches Lantern's approach. With 9 validators, this holds -/// ~455 unique attestation messages (~30 min at 1/slot). -const AGGREGATED_PAYLOAD_CAP: usize = 4096; +/// Hard cap for the known aggregated payload buffer (number of distinct attestation messages). +/// With 1 attestation/slot, this holds ~500 messages (~33 min at 4s/slot). +const AGGREGATED_PAYLOAD_CAP: usize = 512; /// Hard cap for the new (pending) aggregated payload buffer. /// Smaller than known since new payloads are drained every interval (~4s). -/// With 9 validators at 1 attestation/slot, one interval holds ~9 entries. -const NEW_PAYLOAD_CAP: usize = 512; +const NEW_PAYLOAD_CAP: usize = 64; + +/// An entry in the payload buffer: attestation data + set of proofs. +#[derive(Clone)] +struct PayloadEntry { + data: AttestationData, + proofs: Vec, +} /// Fixed-size circular buffer for aggregated payloads. /// -/// Entries are evicted FIFO when the buffer reaches capacity. -/// This prevents unbounded memory growth when finalization stalls. +/// Groups proofs by attestation data (via data_root). Each distinct +/// attestation message stores the full `AttestationData` plus all +/// `AggregatedSignatureProof`s covering that message. +/// +/// Entries are evicted FIFO (by insertion order of the data_root) +/// when the buffer reaches capacity. #[derive(Clone)] struct PayloadBuffer { - entries: VecDeque<(SignatureKey, StoredAggregatedPayload)>, + data: HashMap, + order: VecDeque, capacity: usize, + total_proofs: usize, } impl PayloadBuffer { fn new(capacity: usize) -> Self { Self { - entries: VecDeque::with_capacity(capacity), + data: HashMap::with_capacity(capacity), + order: VecDeque::with_capacity(capacity), capacity, + total_proofs: 0, } } - /// Insert one entry, FIFO-evicting the oldest if at capacity. - fn push(&mut self, key: SignatureKey, payload: StoredAggregatedPayload) { - if self.entries.len() >= self.capacity { - self.entries.pop_front(); + /// Insert proofs for an attestation, FIFO-evicting oldest data_roots when total proofs reach capacity. + fn push(&mut self, hashed: HashedAttestationData, proof: AggregatedSignatureProof) { + let (data_root, att_data) = hashed.into_parts(); + if let Some(entry) = self.data.get_mut(&data_root) { + // Skip duplicate proofs (same participants) + if entry + .proofs + .iter() + .any(|p| p.participants == proof.participants) + { + return; + } + entry.proofs.push(proof); + self.total_proofs += 1; + } else { + // Evict oldest data_roots until under capacity + while self.total_proofs >= self.capacity { + if let Some(evicted) = self.order.pop_front() { + if let Some(removed) = self.data.remove(&evicted) { + self.total_proofs -= removed.proofs.len(); + } + } else { + break; + } + } + self.data.insert( + data_root, + PayloadEntry { + data: att_data, + proofs: vec![proof], + }, + ); + self.order.push_back(data_root); + self.total_proofs += 1; } - self.entries.push_back((key, payload)); } - /// Insert multiple entries, FIFO-evicting as needed. - fn push_batch(&mut self, entries: Vec<(SignatureKey, StoredAggregatedPayload)>) { - for (key, payload) in entries { - self.push(key, payload); + /// Insert a batch of (hashed_attestation_data, proof) entries. + fn push_batch(&mut self, entries: Vec<(HashedAttestationData, AggregatedSignatureProof)>) { + for (hashed, proof) in entries { + self.push(hashed, proof); } } /// Take all entries, leaving the buffer empty. - fn drain(&mut self) -> Vec<(SignatureKey, StoredAggregatedPayload)> { - self.entries.drain(..).collect() - } - - /// Group entries by key, preserving insertion order within each group. - fn grouped(&self) -> HashMap> { - let mut map: HashMap> = HashMap::new(); - for (key, payload) in &self.entries { - map.entry(*key).or_default().push(payload.clone()); - } - map - } - - /// Group entries by data_root, deduplicating proofs by participant bitfield. - fn grouped_by_data_root(&self) -> HashMap> { - let mut map: HashMap> = HashMap::new(); - let mut seen: HashMap>> = HashMap::new(); - for ((_vid, data_root), payload) in &self.entries { - let key_bytes = payload.proof.participants.as_ssz_bytes(); - if seen.entry(*data_root).or_default().insert(key_bytes) { - map.entry(*data_root) - .or_default() - .push(payload.proof.clone()); - } - } - map + fn drain(&mut self) -> Vec<(HashedAttestationData, AggregatedSignatureProof)> { + self.order.clear(); + self.total_proofs = 0; + self.data + .drain() + .flat_map(|(_, entry)| { + entry + .proofs + .into_iter() + .map(move |proof| (HashedAttestationData::new(entry.data.clone()), proof)) + }) + .collect() } - /// Return deduplicated keys. - fn unique_keys(&self) -> HashSet { - self.entries.iter().map(|(key, _)| *key).collect() + /// Return the number of distinct attestation messages in the buffer. + fn len(&self) -> usize { + self.data.len() } - /// Return the number of entries in the buffer. - fn len(&self) -> usize { - self.entries.len() + /// Extract per-validator latest attestations from proofs' participation bits. + fn extract_latest_attestations(&self) -> HashMap { + let mut result: HashMap = HashMap::new(); + for entry in self.data.values() { + for proof in &entry.proofs { + for vid in proof.participant_indices() { + let should_update = result + .get(&vid) + .is_none_or(|existing| existing.slot < entry.data.slot); + if should_update { + result.insert(vid, entry.data.clone()); + } + } + } + } + result } } -// ============ Key Encoding Helpers ============ - -/// Encode a SignatureKey (validator_id, root) to bytes. -/// Layout: validator_id (8 bytes SSZ) || root (32 bytes SSZ) -fn encode_signature_key(key: &SignatureKey) -> Vec { - let mut result = key.0.as_ssz_bytes(); - result.extend(key.1.as_ssz_bytes()); - result +/// Individual validator signature received via gossip. +#[derive(Clone)] +struct GossipSignatureEntry { + validator_id: u64, + signature: ValidatorSignature, } -/// Decode a SignatureKey from bytes. -fn decode_signature_key(bytes: &[u8]) -> SignatureKey { - let validator_id = u64::from_ssz_bytes(&bytes[..8]).expect("valid validator_id"); - let root = H256::from_ssz_bytes(&bytes[8..]).expect("valid root"); - (validator_id, root) +/// Gossip signatures grouped by attestation data. +struct GossipDataEntry { + data: AttestationData, + signatures: Vec, } +/// Gossip signatures grouped by attestation data (via data_root). +type GossipSignatureMap = HashMap; + +/// Gossip signatures snapshot: (hashed_attestation_data, Vec<(validator_id, signature)>). +pub type GossipSignatureSnapshot = Vec<(HashedAttestationData, Vec<(u64, ValidatorSignature)>)>; + /// Encode a LiveChain key (slot, root) to bytes. /// Layout: slot (8 bytes big-endian) || root (32 bytes) /// Big-endian ensures lexicographic ordering matches numeric ordering. @@ -227,7 +262,9 @@ pub struct Store { backend: Arc, new_payloads: Arc>, known_payloads: Arc>, - gossip_signatures_count: Arc, + /// In-memory gossip signatures: data_root → (AttestationData, Vec). + /// Transient data consumed at interval 2 aggregation. + gossip_signatures: Arc>, } impl Store { @@ -363,28 +400,14 @@ impl Store { info!(%anchor_state_root, %anchor_block_root, "Initialized store"); - let initial_gossip_count = Self::count_gossip_signatures(&*backend); Self { backend, new_payloads: Arc::new(Mutex::new(PayloadBuffer::new(NEW_PAYLOAD_CAP))), known_payloads: Arc::new(Mutex::new(PayloadBuffer::new(AGGREGATED_PAYLOAD_CAP))), - gossip_signatures_count: Arc::new(AtomicUsize::new(initial_gossip_count)), + gossip_signatures: Arc::new(Mutex::new(HashMap::new())), } } - /// Count existing gossip signatures in the database. - /// - /// Used once at startup to seed the in-memory counter. - fn count_gossip_signatures(backend: &dyn StorageBackend) -> usize { - backend - .begin_read() - .expect("read view") - .prefix_iterator(Table::GossipSignatures, &[]) - .expect("iterator") - .filter_map(|r| r.ok()) - .count() - } - // ============ Metadata Helpers ============ fn get_metadata(&self, key: &[u8]) -> T { @@ -494,12 +517,11 @@ impl Store { { let pruned_chain = self.prune_live_chain(finalized.slot); let pruned_sigs = self.prune_gossip_signatures(finalized.slot); - let pruned_att_data = self.prune_attestation_data_by_root(finalized.slot); - if pruned_chain > 0 || pruned_sigs > 0 || pruned_att_data > 0 { + if pruned_chain > 0 || pruned_sigs > 0 { info!( finalized_slot = finalized.slot, - pruned_chain, pruned_sigs, pruned_att_data, "Pruned finalized data" + pruned_chain, pruned_sigs, "Pruned finalized data" ); } } @@ -592,26 +614,12 @@ impl Store { /// Prune gossip signatures for slots <= finalized_slot. /// - /// Returns the number of signatures pruned. - pub fn prune_gossip_signatures(&mut self, finalized_slot: u64) -> usize { - let pruned = self.prune_by_slot(Table::GossipSignatures, finalized_slot, |bytes| { - StoredSignature::from_ssz_bytes(bytes).ok().map(|s| s.slot) - }); - self.gossip_signatures_count - .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| { - Some(current.saturating_sub(pruned)) - }) - .unwrap(); - pruned - } - - /// Prune attestation data by root for slots <= finalized_slot. - /// /// Returns the number of entries pruned. - pub fn prune_attestation_data_by_root(&mut self, finalized_slot: u64) -> usize { - self.prune_by_slot(Table::AttestationDataByRoot, finalized_slot, |bytes| { - AttestationData::from_ssz_bytes(bytes).ok().map(|d| d.slot) - }) + pub fn prune_gossip_signatures(&mut self, finalized_slot: u64) -> usize { + let mut gossip = self.gossip_signatures.lock().unwrap(); + let before = gossip.len(); + gossip.retain(|_, entry| entry.data.slot > finalized_slot); + before - gossip.len() } /// Prune old states beyond the retention window. @@ -820,124 +828,76 @@ impl Store { batch.commit().expect("commit"); } - // ============ Attestation Data By Root ============ - // - // Content-addressed attestation data storage. Used to reconstruct - // per-validator attestation maps from aggregated payloads. - - /// Stores attestation data indexed by its tree hash root. - pub fn insert_attestation_data_by_root(&mut self, root: H256, data: AttestationData) { - let mut batch = self.backend.begin_write().expect("write batch"); - let entries = vec![(root.as_ssz_bytes(), data.as_ssz_bytes())]; - batch - .put_batch(Table::AttestationDataByRoot, entries) - .expect("put attestation data"); - batch.commit().expect("commit"); - } + // ============ Attestation Extraction ============ - /// Batch-insert multiple attestation data entries in a single commit. - pub fn insert_attestation_data_by_root_batch(&mut self, entries: Vec<(H256, AttestationData)>) { - if entries.is_empty() { - return; - } - let mut batch = self.backend.begin_write().expect("write batch"); - let ssz_entries = entries - .into_iter() - .map(|(root, data)| (root.as_ssz_bytes(), data.as_ssz_bytes())) - .collect(); - batch - .put_batch(Table::AttestationDataByRoot, ssz_entries) - .expect("put attestation data batch"); - batch.commit().expect("commit"); + /// Extract per-validator latest attestations from known (fork-choice-active) payloads. + pub fn extract_latest_known_attestations(&self) -> HashMap { + self.known_payloads + .lock() + .unwrap() + .extract_latest_attestations() } - /// Returns attestation data for the given root hash. - pub fn get_attestation_data_by_root(&self, root: &H256) -> Option { - let view = self.backend.begin_read().expect("read view"); - view.get(Table::AttestationDataByRoot, &root.as_ssz_bytes()) - .expect("get") - .map(|bytes| AttestationData::from_ssz_bytes(&bytes).expect("valid attestation data")) + /// Extract per-validator latest attestations from new (pending) payloads. + pub fn extract_latest_new_attestations(&self) -> HashMap { + self.new_payloads + .lock() + .unwrap() + .extract_latest_attestations() } - /// Reconstruct per-validator attestation data from aggregated payloads. - /// - /// For each (validator_id, data_root) key in the payloads, looks up the - /// attestation data by root. Returns the latest attestation per validator - /// (by slot). - pub fn extract_latest_attestations( - &self, - keys: impl Iterator, - ) -> HashMap { - let mut result: HashMap = HashMap::new(); - let mut data_cache: HashMap> = HashMap::new(); - - for (validator_id, data_root) in keys { - let data = data_cache - .entry(data_root) - .or_insert_with(|| self.get_attestation_data_by_root(&data_root)); - - let Some(data) = data else { - continue; - }; - + /// Extract per-validator latest attestations from both known and new payloads. + pub fn extract_latest_all_attestations(&self) -> HashMap { + let mut result = self + .known_payloads + .lock() + .unwrap() + .extract_latest_attestations(); + for (vid, data) in self + .new_payloads + .lock() + .unwrap() + .extract_latest_attestations() + { let should_update = result - .get(&validator_id) + .get(&vid) .is_none_or(|existing| existing.slot < data.slot); - if should_update { - result.insert(validator_id, data.clone()); + result.insert(vid, data); } } - result } - /// Convenience: extract latest attestation per validator from known - /// (fork-choice-active) aggregated payloads only. - pub fn extract_latest_known_attestations(&self) -> HashMap { - let keys = self.known_payloads.lock().unwrap().unique_keys(); - self.extract_latest_attestations(keys.into_iter()) - } - // ============ Known Aggregated Payloads ============ // // "Known" aggregated payloads are active in fork choice weight calculations. // Promoted from "new" payloads at specific intervals (0 with proposal, 4). - /// Group known payloads by data_root with deduplicated proofs. - pub fn known_payloads_by_data_root(&self) -> HashMap> { - self.known_payloads.lock().unwrap().grouped_by_data_root() - } - - /// Iterates over all known aggregated payloads, grouped by key. - pub fn iter_known_aggregated_payloads( + /// Returns a snapshot of known payloads as (AttestationData, Vec) pairs. + pub fn known_aggregated_payloads( &self, - ) -> impl Iterator)> { - self.known_payloads.lock().unwrap().grouped().into_iter() - } - - /// Iterates over deduplicated keys from the known aggregated payloads. - pub fn iter_known_aggregated_payload_keys(&self) -> impl Iterator { - self.known_payloads - .lock() - .unwrap() - .unique_keys() - .into_iter() + ) -> HashMap)> { + let buf = self.known_payloads.lock().unwrap(); + buf.data + .iter() + .map(|(root, entry)| (*root, (entry.data.clone(), entry.proofs.clone()))) + .collect() } - /// Insert an aggregated payload into the known (fork-choice-active) buffer. + /// Insert a single proof into the known (fork-choice-active) buffer. pub fn insert_known_aggregated_payload( &mut self, - key: SignatureKey, - payload: StoredAggregatedPayload, + hashed: HashedAttestationData, + proof: AggregatedSignatureProof, ) { - self.known_payloads.lock().unwrap().push(key, payload); + self.known_payloads.lock().unwrap().push(hashed, proof); } - /// Batch-insert multiple aggregated payloads into the known buffer. + /// Batch-insert proofs into the known buffer. pub fn insert_known_aggregated_payloads_batch( &mut self, - entries: Vec<(SignatureKey, StoredAggregatedPayload)>, + entries: Vec<(HashedAttestationData, AggregatedSignatureProof)>, ) { self.known_payloads.lock().unwrap().push_batch(entries); } @@ -947,70 +907,25 @@ impl Store { // "New" aggregated payloads are pending — not yet counted in fork choice. // Promoted to "known" via `promote_new_aggregated_payloads`. - /// Iterates over all new (pending) aggregated payloads, grouped by key. - pub fn iter_new_aggregated_payloads( - &self, - ) -> impl Iterator)> { - self.new_payloads.lock().unwrap().grouped().into_iter() - } - - /// Iterates over deduplicated keys from the new aggregated payloads. - pub fn iter_new_aggregated_payload_keys(&self) -> impl Iterator { - self.new_payloads.lock().unwrap().unique_keys().into_iter() - } - - /// Insert an aggregated payload into the new (pending) buffer. + /// Insert a single proof into the new (pending) buffer. pub fn insert_new_aggregated_payload( &mut self, - key: SignatureKey, - payload: StoredAggregatedPayload, + hashed: HashedAttestationData, + proof: AggregatedSignatureProof, ) { - self.new_payloads.lock().unwrap().push(key, payload); + self.new_payloads.lock().unwrap().push(hashed, proof); } - /// Batch-insert multiple aggregated payloads into the new buffer. + /// Batch-insert proofs into the new buffer. pub fn insert_new_aggregated_payloads_batch( &mut self, - entries: Vec<(SignatureKey, StoredAggregatedPayload)>, + entries: Vec<(HashedAttestationData, AggregatedSignatureProof)>, ) { self.new_payloads.lock().unwrap().push_batch(entries); } // ============ Pruning Helpers ============ - /// Prune entries from a table where the slot (extracted via `get_slot`) is <= `finalized_slot`. - /// Returns the number of entries pruned. - fn prune_by_slot( - &mut self, - table: Table, - finalized_slot: u64, - get_slot: impl Fn(&[u8]) -> Option, - ) -> usize { - let view = self.backend.begin_read().expect("read view"); - let mut to_delete = vec![]; - - for (key_bytes, value_bytes) in view - .prefix_iterator(table, &[]) - .expect("iter") - .filter_map(|r| r.ok()) - { - if let Some(slot) = get_slot(&value_bytes) - && slot <= finalized_slot - { - to_delete.push(key_bytes.to_vec()); - } - } - drop(view); - - let count = to_delete.len(); - if !to_delete.is_empty() { - let mut batch = self.backend.begin_write().expect("write batch"); - batch.delete_batch(table, to_delete).expect("delete"); - batch.commit().expect("commit"); - } - count - } - /// Promotes all new aggregated payloads to known, making them active in fork choice. /// /// Drains the new buffer and pushes all entries into the known buffer. @@ -1029,9 +944,10 @@ impl Store { self.known_payloads.lock().unwrap().len() } - /// Returns the number of gossip signatures stored. + /// Returns the number of gossip signature entries stored. pub fn gossip_signatures_count(&self) -> usize { - self.gossip_signatures_count.load(Ordering::Relaxed) + let gossip = self.gossip_signatures.lock().unwrap(); + gossip.values().map(|entry| entry.signatures.len()).sum() } /// Estimated live data size in bytes for a table, as reported by the backend. @@ -1039,79 +955,68 @@ impl Store { self.backend.estimate_table_bytes(table) } - /// Delete specific gossip signatures by key. - pub fn delete_gossip_signatures(&mut self, keys: &[SignatureKey]) { + // ============ Gossip Signatures ============ + // + // Gossip signatures are individual validator signatures received via P2P. + // They're transient (consumed at interval 2 aggregation) so stored in-memory. + // Keyed by AttestationData (via data_root) matching the leanSpec structure: + // gossip_signatures: dict[AttestationData, set[GossipSignatureEntry]] + + /// Delete gossip entries for the given (validator_id, data_root) pairs. + pub fn delete_gossip_signatures(&mut self, keys: &[(u64, H256)]) { if keys.is_empty() { return; } - let count = keys.len(); - let encoded_keys: Vec<_> = keys.iter().map(encode_signature_key).collect(); - let mut batch = self.backend.begin_write().expect("write batch"); - batch - .delete_batch(Table::GossipSignatures, encoded_keys) - .expect("delete gossip signatures"); - batch.commit().expect("commit"); - self.gossip_signatures_count - .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| { - Some(current.saturating_sub(count)) - }) - .unwrap(); + let mut gossip = self.gossip_signatures.lock().unwrap(); + for &(vid, data_root) in keys { + if let Some(entry) = gossip.get_mut(&data_root) { + entry.signatures.retain(|e| e.validator_id != vid); + if entry.signatures.is_empty() { + gossip.remove(&data_root); + } + } + } } - // ============ Gossip Signatures ============ - // - // Gossip signatures are individual validator signatures received via P2P. - // They're aggregated into proofs for block signature verification. - - /// Iterates over all gossip signatures. - pub fn iter_gossip_signatures( - &self, - ) -> impl Iterator + '_ { - let view = self.backend.begin_read().expect("read view"); - let entries: Vec<_> = view - .prefix_iterator(Table::GossipSignatures, &[]) - .expect("iterator") - .filter_map(|res| res.ok()) - .filter_map(|(k, v)| { - let key = decode_signature_key(&k); - StoredSignature::from_ssz_bytes(&v) - .ok() - .map(|stored| (key, stored)) + /// Returns a snapshot of gossip signatures grouped by attestation data. + pub fn iter_gossip_signatures(&self) -> GossipSignatureSnapshot { + let gossip = self.gossip_signatures.lock().unwrap(); + gossip + .values() + .map(|entry| { + let sigs: Vec<_> = entry + .signatures + .iter() + .map(|e| (e.validator_id, e.signature.clone())) + .collect(); + (HashedAttestationData::new(entry.data.clone()), sigs) }) - .collect(); - entries.into_iter() + .collect() } /// Stores a gossip signature for later aggregation. pub fn insert_gossip_signature( &mut self, - data_root: H256, - slot: u64, + hashed: HashedAttestationData, validator_id: u64, signature: ValidatorSignature, ) { - let key = (validator_id, data_root); - let encoded_key = encode_signature_key(&key); - - // Check if key already exists to avoid inflating the counter on upsert - let is_new = self - .backend - .begin_read() - .expect("read view") - .get(Table::GossipSignatures, &encoded_key) - .expect("get") - .is_none(); - - let stored = StoredSignature::new(slot, signature); - let mut batch = self.backend.begin_write().expect("write batch"); - let entries = vec![(encoded_key, stored.as_ssz_bytes())]; - batch - .put_batch(Table::GossipSignatures, entries) - .expect("put signature"); - batch.commit().expect("commit"); - - if is_new { - self.gossip_signatures_count.fetch_add(1, Ordering::Relaxed); + let (data_root, att_data) = hashed.into_parts(); + let mut gossip = self.gossip_signatures.lock().unwrap(); + let entry = gossip.entry(data_root).or_insert_with(|| GossipDataEntry { + data: att_data, + signatures: Vec::new(), + }); + // Avoid duplicates for same validator + if !entry + .signatures + .iter() + .any(|e| e.validator_id == validator_id) + { + entry.signatures.push(GossipSignatureEntry { + validator_id, + signature, + }); } } @@ -1257,7 +1162,7 @@ mod tests { backend, new_payloads: Arc::new(Mutex::new(PayloadBuffer::new(NEW_PAYLOAD_CAP))), known_payloads: Arc::new(Mutex::new(PayloadBuffer::new(AGGREGATED_PAYLOAD_CAP))), - gossip_signatures_count: Arc::new(AtomicUsize::new(0)), + gossip_signatures: Arc::new(Mutex::new(HashMap::new())), } } @@ -1268,7 +1173,7 @@ mod tests { backend, new_payloads: Arc::new(Mutex::new(PayloadBuffer::new(NEW_PAYLOAD_CAP))), known_payloads: Arc::new(Mutex::new(PayloadBuffer::new(AGGREGATED_PAYLOAD_CAP))), - gossip_signatures_count: Arc::new(AtomicUsize::new(0)), + gossip_signatures: Arc::new(Mutex::new(HashMap::new())), } } } @@ -1563,96 +1468,139 @@ mod tests { // ============ PayloadBuffer Tests ============ - fn make_payload(slot: u64) -> StoredAggregatedPayload { + fn make_proof() -> AggregatedSignatureProof { + use ethlambda_types::attestation::AggregationBits; + AggregatedSignatureProof::empty(AggregationBits::with_capacity(0).unwrap()) + } + + /// Create a proof with a specific validator bit set (distinct participants). + fn make_proof_for_validator(vid: usize) -> AggregatedSignatureProof { use ethlambda_types::attestation::AggregationBits; - use ethlambda_types::block::AggregatedSignatureProof; + let mut bits = AggregationBits::with_capacity(vid + 1).unwrap(); + bits.set(vid, true).unwrap(); + AggregatedSignatureProof::empty(bits) + } - StoredAggregatedPayload { + fn make_att_data(slot: u64) -> AttestationData { + AttestationData { slot, - proof: AggregatedSignatureProof::empty(AggregationBits::with_capacity(0).unwrap()), + head: Checkpoint::default(), + target: Checkpoint::default(), + source: Checkpoint::default(), } } #[test] fn payload_buffer_fifo_eviction() { let mut buf = PayloadBuffer::new(3); - let key = (0u64, H256::ZERO); - buf.push(key, make_payload(1)); - buf.push(key, make_payload(2)); - buf.push(key, make_payload(3)); - assert_eq!(buf.entries.len(), 3); + // Insert 3 distinct attestation data entries (different slots → different roots) + for slot in 1..=3u64 { + let data = make_att_data(slot); + buf.push(HashedAttestationData::new(data), make_proof()); + } + assert_eq!(buf.len(), 3); + + // Pushing a 4th should evict the oldest (slot 1) + let data = make_att_data(4); + buf.push(HashedAttestationData::new(data), make_proof()); + assert_eq!(buf.len(), 3); - // Pushing a 4th entry should evict the oldest (slot 1) - buf.push(key, make_payload(4)); - assert_eq!(buf.entries.len(), 3); - let slots: Vec = buf.entries.iter().map(|(_, p)| p.slot).collect(); - assert_eq!(slots, vec![2, 3, 4]); + // The oldest (slot 1) should be gone + let att_data_1 = make_att_data(1); + assert!(!buf.data.contains_key(&att_data_1.tree_hash_root())); } #[test] - fn payload_buffer_grouped_returns_correct_groups() { + fn payload_buffer_multiple_proofs_per_data() { let mut buf = PayloadBuffer::new(10); - let key_a = (0u64, H256::ZERO); - let key_b = (1u64, H256::ZERO); + let data = make_att_data(1); + let data_root = data.tree_hash_root(); - buf.push(key_a, make_payload(1)); - buf.push(key_b, make_payload(2)); - buf.push(key_a, make_payload(3)); + // Insert 3 proofs with distinct participants for the same attestation data + buf.push( + HashedAttestationData::new(data.clone()), + make_proof_for_validator(0), + ); + buf.push( + HashedAttestationData::new(data.clone()), + make_proof_for_validator(1), + ); + buf.push( + HashedAttestationData::new(data), + make_proof_for_validator(2), + ); - let grouped = buf.grouped(); - assert_eq!(grouped.len(), 2); - assert_eq!(grouped[&key_a].len(), 2); - assert_eq!(grouped[&key_a][0].slot, 1); - assert_eq!(grouped[&key_a][1].slot, 3); - assert_eq!(grouped[&key_b].len(), 1); - assert_eq!(grouped[&key_b][0].slot, 2); + // Should be 1 distinct data entry with 3 proofs + assert_eq!(buf.len(), 1); + assert_eq!(buf.data[&data_root].proofs.len(), 3); } #[test] fn payload_buffer_drain_empties_buffer() { let mut buf = PayloadBuffer::new(10); - let key = (0u64, H256::ZERO); + let data = make_att_data(1); - buf.push(key, make_payload(1)); - buf.push(key, make_payload(2)); + buf.push( + HashedAttestationData::new(data.clone()), + make_proof_for_validator(0), + ); + buf.push( + HashedAttestationData::new(data), + make_proof_for_validator(1), + ); let drained = buf.drain(); - assert_eq!(drained.len(), 2); - assert!(buf.entries.is_empty()); + assert_eq!(drained.len(), 2); // 2 proofs flattened + assert!(buf.data.is_empty()); + assert!(buf.order.is_empty()); } #[test] fn promote_moves_new_to_known() { let mut store = Store::test_store(); + let data = make_att_data(1); + let data_root = data.tree_hash_root(); - let key = (0u64, H256::ZERO); - store.insert_new_aggregated_payload(key, make_payload(1)); - store.insert_new_aggregated_payload(key, make_payload(2)); + store.insert_new_aggregated_payload( + HashedAttestationData::new(data.clone()), + make_proof_for_validator(0), + ); + store.insert_new_aggregated_payload( + HashedAttestationData::new(data), + make_proof_for_validator(1), + ); - assert_eq!(store.new_payloads.lock().unwrap().entries.len(), 2); - assert_eq!(store.known_payloads.lock().unwrap().entries.len(), 0); + assert_eq!(store.new_payloads.lock().unwrap().len(), 1); + assert_eq!(store.known_payloads.lock().unwrap().len(), 0); store.promote_new_aggregated_payloads(); - assert_eq!(store.new_payloads.lock().unwrap().entries.len(), 0); - assert_eq!(store.known_payloads.lock().unwrap().entries.len(), 2); + assert_eq!(store.new_payloads.lock().unwrap().len(), 0); + assert_eq!(store.known_payloads.lock().unwrap().len(), 1); + // The known buffer should have 2 proofs for this data + assert_eq!( + store.known_payloads.lock().unwrap().data[&data_root] + .proofs + .len(), + 2 + ); } #[test] fn cloned_store_shares_payload_buffers() { let mut store = Store::test_store(); let cloned = store.clone(); + let data = make_att_data(1); - let key = (0u64, H256::ZERO); - store.insert_new_aggregated_payload(key, make_payload(1)); + store.insert_new_aggregated_payload(HashedAttestationData::new(data), make_proof()); // Modification on original should be visible in clone - assert_eq!(cloned.new_payloads.lock().unwrap().entries.len(), 1); + assert_eq!(cloned.new_payloads.lock().unwrap().len(), 1); store.promote_new_aggregated_payloads(); - assert_eq!(cloned.new_payloads.lock().unwrap().entries.len(), 0); - assert_eq!(cloned.known_payloads.lock().unwrap().entries.len(), 1); + assert_eq!(cloned.new_payloads.lock().unwrap().len(), 0); + assert_eq!(cloned.known_payloads.lock().unwrap().len(), 1); } } diff --git a/crates/storage/src/types.rs b/crates/storage/src/types.rs deleted file mode 100644 index b23207f..0000000 --- a/crates/storage/src/types.rs +++ /dev/null @@ -1,36 +0,0 @@ -use ethlambda_types::{ - block::AggregatedSignatureProof, primitives::ssz, signature::ValidatorSignature, -}; - -/// Gossip signature stored with slot for pruning. -/// -/// Signatures are stored alongside the slot they pertain to, enabling -/// simple slot-based pruning when blocks become finalized. -#[derive(Debug, Clone, ssz::Encode, ssz::Decode)] -pub struct StoredSignature { - pub slot: u64, - pub signature_bytes: Vec, -} - -impl StoredSignature { - pub fn new(slot: u64, signature: ValidatorSignature) -> Self { - Self { - slot, - signature_bytes: signature.to_bytes(), - } - } - - pub fn to_validator_signature(&self) -> Result { - ValidatorSignature::from_bytes(&self.signature_bytes) - } -} - -/// Aggregated payload stored with slot for pruning. -/// -/// Aggregated signature proofs are stored with their slot to enable -/// pruning when blocks become finalized. -#[derive(Debug, Clone, ssz::Encode, ssz::Decode)] -pub struct StoredAggregatedPayload { - pub slot: u64, - pub proof: AggregatedSignatureProof, -}