Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions crates/blockchain/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,8 +356,8 @@ pub fn set_node_start_time() {
}

/// Increase the valid attestations counter by `count`.
pub fn inc_attestations_valid() {
LEAN_ATTESTATIONS_VALID_TOTAL.inc();
pub fn inc_attestations_valid(count: u64) {
LEAN_ATTESTATIONS_VALID_TOTAL.inc_by(count);
}

/// Increment the invalid attestations counter.
Expand Down
143 changes: 37 additions & 106 deletions crates/blockchain/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ use ethlambda_crypto::aggregate_signatures;
use ethlambda_state_transition::{
is_proposer, process_block, process_slots, slot_is_justifiable_after,
};
use ethlambda_storage::{ForkCheckpoints, SignatureKey, Store, StoredAggregatedPayload};
use ethlambda_storage::{ForkCheckpoints, Store};
use ethlambda_types::{
ShortRoot,
attestation::{
AggregatedAttestation, AggregationBits, Attestation, AttestationData,
SignedAggregatedAttestation, SignedAttestation, validator_indices,
HashedAttestationData, SignedAggregatedAttestation, SignedAttestation, validator_indices,
},
block::{
AggregatedAttestations, AggregatedSignatureProof, Block, BlockBody,
Expand Down Expand Up @@ -97,15 +97,11 @@ fn update_safe_target(store: &mut Store) {
let min_target_score = (num_validators * 2).div_ceil(3);

let blocks = store.get_live_chain();
// Merge both attestation pools (keys only — skip payload deserialization).
// Merge both attestation pools (known + new).
// At interval 3 the migration (interval 4) hasn't run yet, so attestations
// that entered "known" directly (proposer's own attestation in block body,
// node's self-attestation) would be invisible without this merge.
let all_keys: HashSet<SignatureKey> = store
.iter_known_aggregated_payload_keys()
.chain(store.iter_new_aggregated_payload_keys())
.collect();
let attestations = store.extract_latest_attestations(all_keys.into_iter());
let attestations = store.extract_latest_all_attestations();
let (safe_target, _weights) = ethlambda_fork_choice::compute_lmd_ghost_head(
store.latest_justified().root,
&blocks,
Expand All @@ -120,8 +116,8 @@ fn update_safe_target(store: &mut Store) {
/// Collects individual gossip signatures, aggregates them by attestation data,
/// and stores the resulting proofs in the new aggregated payloads buffer.
fn aggregate_committee_signatures(store: &mut Store) -> Vec<SignedAggregatedAttestation> {
let gossip_sigs: Vec<(SignatureKey, _)> = store.iter_gossip_signatures().collect();
if gossip_sigs.is_empty() {
let gossip_groups = store.iter_gossip_signatures();
if gossip_groups.is_empty() {
return Vec::new();
}
let _timing = metrics::time_committee_signatures_aggregation();
Expand All @@ -131,32 +127,18 @@ fn aggregate_committee_signatures(store: &mut Store) -> Vec<SignedAggregatedAtte
let head_state = store.head_state();
let validators = &head_state.validators;

// Group gossip signatures by data_root for batch aggregation
let mut groups: HashMap<H256, Vec<(u64, ValidatorSignature)>> = HashMap::new();
let mut keys_to_delete: Vec<SignatureKey> = Vec::new();
let mut payload_entries: Vec<(SignatureKey, StoredAggregatedPayload)> = Vec::new();

for ((validator_id, data_root), stored_sig) in &gossip_sigs {
if let Ok(sig) = stored_sig.to_validator_signature() {
groups
.entry(*data_root)
.or_default()
.push((*validator_id, sig));
}
}

for (data_root, validators_and_sigs) in groups {
let Some(data) = store.get_attestation_data_by_root(&data_root) else {
continue;
};
let mut keys_to_delete: Vec<(u64, H256)> = Vec::new();
let mut payload_entries: Vec<(HashedAttestationData, AggregatedSignatureProof)> = Vec::new();

let slot = data.slot;
for (hashed, validator_sigs) in &gossip_groups {
let data_root = hashed.root();
let slot = hashed.data().slot;

let mut sigs = vec![];
let mut pubkeys = vec![];
let mut ids = vec![];

for (vid, sig) in &validators_and_sigs {
for (vid, sig) in validator_sigs {
let Some(validator) = validators.get(*vid as usize) else {
continue;
};
Expand Down Expand Up @@ -185,16 +167,12 @@ fn aggregate_committee_signatures(store: &mut Store) -> Vec<SignedAggregatedAtte
let proof = AggregatedSignatureProof::new(participants, proof_data);

new_aggregates.push(SignedAggregatedAttestation {
data: data.clone(),
data: hashed.data().clone(),
proof: proof.clone(),
});

let payload = StoredAggregatedPayload { slot, proof };

// Collect entries for batch insert
for vid in &ids {
payload_entries.push(((*vid, data_root), payload.clone()));
}
// One entry per attestation data (not per validator)
payload_entries.push((hashed.clone(), proof));

// Only delete successfully aggregated signatures
keys_to_delete.extend(ids.iter().map(|vid| (*vid, data_root)));
Expand Down Expand Up @@ -368,7 +346,8 @@ pub fn on_gossip_attestation(
validate_attestation_data(store, &attestation.data)
.inspect_err(|_| metrics::inc_attestations_invalid())?;

let data_root = attestation.data.tree_hash_root();
let hashed = HashedAttestationData::new(attestation.data.clone());
let data_root = hashed.root();

let target = attestation.data.target;
let target_state = store
Expand All @@ -395,15 +374,12 @@ pub fn on_gossip_attestation(
}
metrics::inc_pq_sig_attestation_signatures_valid();

// Store attestation data by root (content-addressed, idempotent)
store.insert_attestation_data_by_root(data_root, attestation.data.clone());

// Store gossip signature unconditionally for later aggregation at interval 2.
// Subnet filtering is handled at the P2P subscription layer.
store.insert_gossip_signature(data_root, attestation.data.slot, validator_id, signature);
store.insert_gossip_signature(hashed, validator_id, signature);
metrics::update_gossip_signatures(store.gossip_signatures_count());

metrics::inc_attestations_valid();
metrics::inc_attestations_valid(1);

let slot = attestation.data.slot;
let target_slot = attestation.data.target.slot;
Expand Down Expand Up @@ -454,7 +430,8 @@ pub fn on_gossip_aggregated_attestation(
})
.collect::<Result<_, _>>()?;

let data_root = aggregated.data.tree_hash_root();
let hashed = HashedAttestationData::new(aggregated.data.clone());
let data_root = hashed.root();
let slot: u32 = aggregated.data.slot.try_into().expect("slot exceeds u32");

{
Expand All @@ -468,22 +445,8 @@ pub fn on_gossip_aggregated_attestation(
}
.map_err(StoreError::AggregateVerificationFailed)?;

// Store attestation data by root (content-addressed, idempotent)
store.insert_attestation_data_by_root(data_root, aggregated.data.clone());

// Store one aggregated payload per participating validator (batch insert)
let entries: Vec<_> = aggregated
.proof
.participant_indices()
.map(|validator_id| {
let payload = StoredAggregatedPayload {
slot: aggregated.data.slot,
proof: aggregated.proof.clone(),
};
((validator_id, data_root), payload)
})
.collect();
store.insert_new_aggregated_payloads_batch(entries);
// Store one proof per attestation data (not per validator)
store.insert_new_aggregated_payload(hashed, aggregated.proof.clone());
metrics::update_latest_new_aggregated_payloads(store.new_aggregated_payloads_count());

let slot = aggregated.data.slot;
Expand All @@ -497,7 +460,7 @@ pub fn on_gossip_aggregated_attestation(
"Aggregated attestation processed"
);

metrics::inc_attestations_valid();
metrics::inc_attestations_valid(1);

Ok(())
}
Expand Down Expand Up @@ -594,37 +557,23 @@ fn on_block_core(
let aggregated_attestations = &block.body.attestations;
let attestation_signatures = &signed_block.signature.attestation_signatures;

// Process block body attestations.
// Store attestation data by root and proofs in known aggregated payloads.
let mut att_data_entries: Vec<(H256, AttestationData)> = Vec::new();
let mut known_entries: Vec<(SignatureKey, StoredAggregatedPayload)> = Vec::new();
// Store one proof per attestation data in known aggregated payloads.
let mut known_entries: Vec<(HashedAttestationData, AggregatedSignatureProof)> = Vec::new();
for (att, proof) in aggregated_attestations
.iter()
.zip(attestation_signatures.iter())
{
let data_root = att.data.tree_hash_root();
att_data_entries.push((data_root, att.data.clone()));

let validator_ids: Vec<_> = validator_indices(&att.aggregation_bits).collect();
let payload = StoredAggregatedPayload {
slot: att.data.slot,
proof: proof.clone(),
};

for validator_id in &validator_ids {
known_entries.push(((*validator_id, data_root), payload.clone()));
metrics::inc_attestations_valid();
}
known_entries.push((HashedAttestationData::new(att.data.clone()), proof.clone()));
// Count each participating validator as a valid attestation
let count = validator_indices(&att.aggregation_bits).count() as u64;
metrics::inc_attestations_valid(count);
}

// Process proposer attestation as pending (enters "new" stage via gossip path)
// The proposer's attestation should NOT affect this block's fork choice position.
let proposer_vid = proposer_attestation.validator_id;
let proposer_data_root = proposer_attestation.data.tree_hash_root();
att_data_entries.push((proposer_data_root, proposer_attestation.data.clone()));
let proposer_hashed = HashedAttestationData::new(proposer_attestation.data.clone());

// Batch-insert all attestation data (body + proposer) in a single commit
store.insert_attestation_data_by_root_batch(att_data_entries);
store.insert_known_aggregated_payloads_batch(known_entries);

// Update forkchoice head based on new block and attestations
Expand All @@ -635,23 +584,15 @@ fn on_block_core(
if !verify {
// Without sig verification, insert directly with a dummy proof
let participants = aggregation_bits_from_validator_indices(&[proposer_vid]);
let payload = StoredAggregatedPayload {
slot: proposer_attestation.data.slot,
proof: AggregatedSignatureProof::empty(participants),
};
store.insert_new_aggregated_payload((proposer_vid, proposer_data_root), payload);
let proof = AggregatedSignatureProof::empty(participants);
store.insert_new_aggregated_payload(proposer_hashed, proof);
} else {
// Store the proposer's signature unconditionally for future block building.
// Subnet filtering is handled at the P2P subscription layer.
let proposer_sig =
ValidatorSignature::from_bytes(&signed_block.signature.proposer_signature)
.map_err(|_| StoreError::SignatureDecodingFailed)?;
store.insert_gossip_signature(
proposer_data_root,
proposer_attestation.data.slot,
proposer_vid,
proposer_sig,
);
store.insert_gossip_signature(proposer_hashed, proposer_vid, proposer_sig);
}

let block_total = block_start.elapsed();
Expand Down Expand Up @@ -807,20 +748,10 @@ pub fn produce_block_with_signatures(
});
}

let known_block_roots = store.get_block_roots();

// Group payloads by data_root with deduplicated proofs
let proofs_by_data_root = store.known_payloads_by_data_root();
// Get known aggregated payloads: data_root -> (AttestationData, Vec<proof>)
let aggregated_payloads = store.known_aggregated_payloads();

// Resolve AttestationData for each data_root
let aggregated_payloads: HashMap<H256, (AttestationData, Vec<AggregatedSignatureProof>)> =
proofs_by_data_root
.into_iter()
.filter_map(|(data_root, proofs)| {
let data = store.get_attestation_data_by_root(&data_root)?;
Some((data_root, (data, proofs)))
})
.collect();
let known_block_roots = store.get_block_roots();

let (block, signatures) = build_block(
&head_state,
Expand Down
11 changes: 3 additions & 8 deletions crates/blockchain/tests/forkchoice_spectests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,12 +284,8 @@ fn validate_attestation_check(
let location = check.location.as_str();

let attestations: HashMap<u64, AttestationData> = match location {
"new" => {
st.extract_latest_attestations(st.iter_new_aggregated_payloads().map(|(key, _)| key))
}
"known" => {
st.extract_latest_attestations(st.iter_known_aggregated_payloads().map(|(key, _)| key))
}
"new" => st.extract_latest_new_attestations(),
"known" => st.extract_latest_known_attestations(),
other => {
return Err(
format!("Step {}: unknown attestation location: {}", step_idx, other).into(),
Expand Down Expand Up @@ -369,8 +365,7 @@ fn validate_lexicographic_head_among(
}

let blocks = st.get_live_chain();
let known_attestations: HashMap<u64, AttestationData> =
st.extract_latest_attestations(st.iter_known_aggregated_payloads().map(|(key, _)| key));
let known_attestations: HashMap<u64, AttestationData> = st.extract_latest_known_attestations();

// Resolve all fork labels to roots and compute their weights
// Map: label -> (root, slot, weight)
Expand Down
45 changes: 43 additions & 2 deletions crates/common/types/src/attestation.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use crate::{
block::AggregatedSignatureProof,
checkpoint::Checkpoint,
primitives::ssz::{Decode, Encode, TreeHash},
primitives::{
H256,
ssz::{Decode, Encode, TreeHash},
},
signature::SignatureSize,
state::ValidatorRegistryLimit,
};
Expand All @@ -17,7 +20,7 @@ pub struct Attestation {
}

/// Attestation content describing the validator's observed chain view.
#[derive(Debug, Clone, Encode, Decode, TreeHash)]
#[derive(Debug, Clone, PartialEq, Eq, Hash, Encode, Decode, TreeHash)]
pub struct AttestationData {
/// The slot for which the attestation is made.
pub slot: u64,
Expand Down Expand Up @@ -77,3 +80,41 @@ pub struct SignedAggregatedAttestation {
pub data: AttestationData,
pub proof: AggregatedSignatureProof,
}

/// Attestation data paired with its precomputed tree hash root.
///
/// Private fields ensure that `root == data.tree_hash_root()` is always true.
/// The only way to construct this is via [`HashedAttestationData::new`] or
/// [`From<AttestationData>`], both of which compute the root from the data.
#[derive(Debug, Clone)]
pub struct HashedAttestationData {
    root: H256,
    data: AttestationData,
}

// Equality and hashing delegate to the precomputed root: the constructor
// invariant guarantees `root` is the tree hash of `data`, so comparing the
// fixed-size root is equivalent to comparing the full data (modulo hash
// collisions, which the tree hash is assumed to rule out in practice).
// This makes the type cheap to use as a map/set key — hashing 32 bytes
// instead of re-walking the whole `AttestationData`.
impl PartialEq for HashedAttestationData {
    fn eq(&self, other: &Self) -> bool {
        self.root == other.root
    }
}

impl Eq for HashedAttestationData {}

impl std::hash::Hash for HashedAttestationData {
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        // NOTE(review): assumes `H256: Hash` — true for the common fixed-hash
        // primitive types; confirm against the `primitives` module.
        std::hash::Hash::hash(&self.root, state)
    }
}

impl HashedAttestationData {
    /// Builds the pair, hashing `data` exactly once up front.
    pub fn new(data: AttestationData) -> Self {
        let root = data.tree_hash_root();
        Self { root, data }
    }

    /// The precomputed tree hash root of the attestation data.
    pub fn root(&self) -> H256 {
        self.root
    }

    /// Borrows the underlying attestation data.
    pub fn data(&self) -> &AttestationData {
        &self.data
    }

    /// Consumes `self`, yielding the root and the data.
    pub fn into_parts(self) -> (H256, AttestationData) {
        let Self { root, data } = self;
        (root, data)
    }
}

impl From<AttestationData> for HashedAttestationData {
fn from(data: AttestationData) -> Self {
Self::new(data)
}
}
13 changes: 12 additions & 1 deletion crates/common/types/src/checkpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,18 @@ use crate::primitives::{

/// Represents a checkpoint in the chain's history.
#[derive(
Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize, Encode, Decode, TreeHash,
Debug,
Clone,
Copy,
Default,
PartialEq,
Eq,
Hash,
Serialize,
Deserialize,
Encode,
Decode,
TreeHash,
)]
pub struct Checkpoint {
/// The root hash of the checkpoint's block.
Expand Down
Loading
Loading