Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions lean_client/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion lean_client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ ethereum-types = "0.14"
futures = "0.3"
features = { git = "https://github.com/grandinetech/grandine", rev = "64afdee3c6be79fceffb66933dcb69a943f3f1ae" }
hex = "0.4.3"
http-body-util = "0.1"
http_api_utils = { git = "https://github.com/grandinetech/grandine", rev = "64afdee3c6be79fceffb66933dcb69a943f3f1ae" }
k256 = "0.13"
rec_aggregation = { git = "https://github.com/leanEthereum/leanMultisig.git", rev = "2eb4b9d983171139af36749f127dd9890c9109e6" }
Expand Down Expand Up @@ -273,6 +274,7 @@ prometheus = "0.14"
rand = "0.10"
rand_chacha = "0.10"
rayon = "1"
reqwest = { version = "0.12", default-features = false, features = ["rustls-tls", "json"] }
rstest = "0.18"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
Expand All @@ -286,6 +288,7 @@ test-generator = "0.3.1"
thiserror = "2"
tiny-keccak = "2.0.2"
tokio = { version = "1.0", features = ["full"] }
tower = { version = "0.5", features = ["util"] }
tower-http = { version = '0.6', features = ['cors', 'trace'] }
tracing = "0.1.41"
tracing-subscriber = { version = "0.3.20", features = ["env-filter"] }
Expand Down Expand Up @@ -321,4 +324,4 @@ tracing = { workspace = true }
tracing-subscriber = { workspace = true }
validator = { workspace = true }
xmss = { workspace = true }
reqwest = { version = "0.12", default-features = false, features = ["rustls-tls"] }
reqwest = { workspace = true }
135 changes: 80 additions & 55 deletions lean_client/containers/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -888,68 +888,93 @@ impl State {
}

if !phase2_children.is_empty() {
let child_pk_vecs: Vec<Vec<PublicKey>> = phase2_children
.iter()
.map(|child| {
child
.get_participant_indices()
.into_iter()
.filter_map(|vid| {
self.validators
.get(vid)
.ok()
.map(|v| v.attestation_pubkey.clone())
})
.collect()
})
.collect();
if phase2_children.len() == 1 {
// leanSpec state.py: `if len(proofs) == 1: sig = proofs[0]`
// Single proof: pass through directly without re-aggregating.
let single_proof = (*phase2_children[0]).clone();
let phase2_participants = single_proof.participants.clone();

info!(
slot = data.slot.0,
validators =
phase2_children[0].get_participant_indices().len(),
"Phase 2: single proof passed through directly"
);
results.push((
AggregatedAttestation {
aggregation_bits: phase2_participants,
data: data.clone(),
},
single_proof,
));
} else {
let child_pk_vecs: Vec<Vec<PublicKey>> = phase2_children
.iter()
.map(|child| {
child
.get_participant_indices()
.into_iter()
.filter_map(|vid| {
self.validators
.get(vid)
.ok()
.map(|v| v.attestation_pubkey.clone())
})
.collect()
})
.collect();

let children_arg: Vec<(&[PublicKey], &AggregatedSignatureProof)> =
child_pk_vecs
let children_arg: Vec<(
&[PublicKey],
&AggregatedSignatureProof,
)> = child_pk_vecs
.iter()
.zip(phase2_children.iter())
.map(|(pks, proof)| (pks.as_slice(), *proof))
.collect();

let mut phase2_validator_ids: Vec<u64> = phase2_children
.iter()
.flat_map(|child| child.get_participant_indices())
.collect();
phase2_validator_ids.sort();
phase2_validator_ids.dedup();

let phase2_participants =
AggregationBits::from_validator_indices(&phase2_validator_ids);

match AggregatedSignatureProof::aggregate_with_children(
phase2_participants.clone(),
&children_arg,
Vec::<PublicKey>::new(),
Vec::<Signature>::new(),
data_root,
data.slot.0 as u32,
log_inv_rate,
) {
Ok(proof) => {
info!(
slot = data.slot.0,
children = phase2_children.len(),
validators = phase2_validator_ids.len(),
"Phase 2: recursive block proof via aggregate_with_children"
);
results.push((
AggregatedAttestation {
aggregation_bits: phase2_participants,
data: data.clone(),
},
proof,
));
}
Err(e) => {
warn!(
error = %e,
"Phase 2 recursive aggregation failed, skipping"
let mut phase2_validator_ids: Vec<u64> = phase2_children
.iter()
.flat_map(|child| child.get_participant_indices())
.collect();
phase2_validator_ids.sort();
phase2_validator_ids.dedup();

let phase2_participants =
AggregationBits::from_validator_indices(
&phase2_validator_ids,
);

match AggregatedSignatureProof::aggregate_with_children(
phase2_participants.clone(),
&children_arg,
Vec::<PublicKey>::new(),
Vec::<Signature>::new(),
data_root,
data.slot.0 as u32,
log_inv_rate,
) {
Ok(proof) => {
info!(
slot = data.slot.0,
children = phase2_children.len(),
validators = phase2_validator_ids.len(),
"Phase 2: recursive block proof via aggregate_with_children"
);
results.push((
AggregatedAttestation {
aggregation_bits: phase2_participants,
data: data.clone(),
},
proof,
));
}
Err(e) => {
warn!(
error = %e,
"Phase 2 recursive aggregation failed, skipping"
);
}
}
}
}
Expand Down
12 changes: 12 additions & 0 deletions lean_client/http_api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,24 @@ anyhow = { workspace = true }
axum = { workspace = true }
clap = { workspace = true }
fork_choice = { workspace = true }
validator = { workspace = true }
futures = { workspace = true }
hex = { workspace = true }
metrics = { workspace = true }
parking_lot = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
ssz = { workspace = true }
tokio = { workspace = true }
tower-http = { workspace = true }
tracing = { workspace = true }

[dev-dependencies]
fork_choice = { workspace = true }
http-body-util = { workspace = true }
parking_lot = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
test-generator = { workspace = true }
tokio = { workspace = true }
tower = { workspace = true }
100 changes: 100 additions & 0 deletions lean_client/http_api/src/aggregator_controller.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/// Runtime controller for the node's aggregator role.
///
/// Exposes get/set operations over the shared `is_aggregator` flag so the
/// admin API can rotate aggregator duties across nodes without restarting.
///
/// Toggles are serialized under a `tokio::sync::Mutex` so concurrent admin
/// requests cannot leave the store and validator service disagreeing on the
/// current role.
use std::sync::Arc;

use tokio::sync::Mutex;
use tracing::info;
use validator::ValidatorService;

use metrics::METRICS;

use crate::handlers::SharedStore;

/// Shared handle wrapped by [`SharedController`] and passed into axum handlers via `State`.
pub type AggregatorControllerHandle = Arc<AggregatorController>;

/// Shared controller passed into axum handlers via `State`.
/// NOTE(review): presumably `None` when runtime aggregator control is not
/// wired up for this node — confirm at the construction site.
pub type SharedController = Option<AggregatorControllerHandle>;

/// Runtime control over the node's aggregator role.
///
/// Operators toggle the flag to rotate aggregation duties across nodes when
/// an active aggregator becomes unhealthy, without restarting the node.
///
/// The spec-level semantics are unchanged: the store reads `is_aggregator`
/// on each gossip event and each tick, so flipping the flag takes effect
/// from the next event or tick onward.
pub struct AggregatorController {
    /// Store whose flag gates gossip-side aggregator behaviour.
    /// The store's `is_aggregator` field is the source of truth for reads.
    store: SharedStore,

    /// Validator service whose flag drives aggregation duty execution.
    /// `None` when the node has no validator identity.
    validator_service: Option<Arc<ValidatorService>>,

    /// Serializes concurrent toggle requests from admin API handlers so the
    /// store flag and the validator-service flag cannot be updated by two
    /// interleaved toggles and end up disagreeing.
    lock: Mutex<()>,
}

impl AggregatorController {
    /// Create a new controller.
    ///
    /// # Arguments
    /// * `store` — shared forkchoice store whose `is_aggregator` flag is the
    ///   source of truth
    /// * `validator_service` — optional validator service; `None` when the
    ///   node has no validator identity
    pub fn new(store: SharedStore, validator_service: Option<Arc<ValidatorService>>) -> Self {
        Self {
            store,
            validator_service,
            lock: Mutex::new(()),
        }
    }

    /// Return whether the node is currently acting as aggregator.
    ///
    /// Reads the live flag from the store (source of truth). Deliberately does
    /// not take `self.lock`: a read racing a toggle observes either the old or
    /// the new value, both of which are valid snapshots.
    pub fn is_enabled(&self) -> bool {
        self.store.read().is_aggregator
    }

    /// Update the aggregator role and return the previous value.
    ///
    /// The store and validator service are updated together under the mutex
    /// so both views remain consistent from any observer's perspective.
    ///
    /// # Arguments
    /// * `enabled` — desired aggregator state
    ///
    /// # Returns
    /// Aggregator state prior to the update.
    pub async fn set_enabled(&self, enabled: bool) -> bool {
        let _guard = self.lock.lock().await;

        // Swap the flag under a single write-lock acquisition. A separate
        // `read()` followed by `write()` takes the store lock twice, leaving a
        // window in which a concurrent store writer could change the flag
        // between observing `previous` and applying the update.
        let previous = std::mem::replace(&mut self.store.write().is_aggregator, enabled);

        if let Some(vs) = &self.validator_service {
            vs.set_is_aggregator(enabled);
        }

        // `if let` rather than `Option::map` — `map` for a side effect with a
        // discarded return trips clippy's `option_map_unit_fn` lint.
        if let Some(m) = METRICS.get() {
            m.lean_is_aggregator.set(if enabled { 1 } else { 0 });
        }

        if previous != enabled {
            info!(
                is_aggregator = enabled,
                "Aggregator role {} via admin API",
                if enabled { "activated" } else { "deactivated" },
            );
        }

        previous
    }
}
Loading
Loading