From 4a1cab250d0cf1b39d34626fcf593db4e2aaf169 Mon Sep 17 00:00:00 2001 From: petarjuki7 Date: Tue, 17 Feb 2026 13:39:47 +0100 Subject: [PATCH 1/3] refactor: metadata service --- anchor/client/src/lib.rs | 8 +- ...ice.rs => aggregator_consensus_builder.rs} | 474 ++---------------- .../src/duty_input_publisher.rs | 465 +++++++++++++++++ anchor/validator_store/src/lib.rs | 21 +- 4 files changed, 540 insertions(+), 428 deletions(-) rename anchor/validator_store/src/{metadata_service.rs => aggregator_consensus_builder.rs} (76%) create mode 100644 anchor/validator_store/src/duty_input_publisher.rs diff --git a/anchor/client/src/lib.rs b/anchor/client/src/lib.rs index 29b5fe55e..dcc93cf30 100644 --- a/anchor/client/src/lib.rs +++ b/anchor/client/src/lib.rs @@ -15,7 +15,7 @@ use std::{ }; use anchor_validator_store::{ - AnchorValidatorStore, metadata_service::MetadataService, + AnchorValidatorStore, duty_input_publisher::DutyInputPublisher, registration_service::RegistrationService, }; use beacon_node_fallback::{ @@ -701,7 +701,7 @@ impl Client { executor.clone(), ); - let metadata_service = MetadataService::new( + let duty_input_publisher = DutyInputPublisher::new( duties_service.clone(), validator_store.clone(), slot_clock.clone(), @@ -731,9 +731,9 @@ impl Client { .start_update_service(&spec) .map_err(|e| format!("Unable to start sync committee service: {e}"))?; - metadata_service + duty_input_publisher .start_update_service() - .map_err(|e| format!("Unable to start metadata service: {e}"))?; + .map_err(|e| format!("Unable to start duty input publisher: {e}"))?; preparation_service .start_proposer_prepare_service(&spec) diff --git a/anchor/validator_store/src/metadata_service.rs b/anchor/validator_store/src/aggregator_consensus_builder.rs similarity index 76% rename from anchor/validator_store/src/metadata_service.rs rename to anchor/validator_store/src/aggregator_consensus_builder.rs index 64fd7ceaf..fc5f84deb 100644 --- a/anchor/validator_store/src/metadata_service.rs 
+++ b/anchor/validator_store/src/aggregator_consensus_builder.rs @@ -1,3 +1,21 @@ +//! Aggregator Consensus Data Builder +//! +//! This module handles fetching aggregated attestations and sync contributions from beacon nodes, +//! and building `AggregatorCommitteeConsensusData` for QBFT consensus (Boole+ fork). +//! +//! # Responsibilities +//! +//! - Fetch aggregated attestations from beacon node (deduplicated by committee index) +//! - Fetch sync committee contributions from beacon node (deduplicated by subnet ID) +//! - Build `AggregatorCommitteeConsensusData` per SSV committee +//! - Sort aggregators/contributors deterministically for consensus compatibility with SSV-Go +//! +//! # Separation of Concerns +//! +//! This module is intentionally separated from `DutyInputPublisher` (formerly `MetadataService`) +//! which handles the timing and publishing of duty inputs. This module focuses purely on +//! the data fetching and consensus data construction logic. + use std::{ collections::{HashMap, HashSet}, sync::Arc, @@ -7,7 +25,6 @@ use std::{ use beacon_node_fallback::BeaconNodeFallback; use bls::PublicKeyBytes; use eth2::types::SyncContributionData; -use fork::{Fork, ForkSchedule}; use futures::stream::{FuturesUnordered, StreamExt}; use slot_clock::SlotClock; use ssv_types::{ @@ -15,464 +32,81 @@ use ssv_types::{ consensus::{AggregatorCommitteeConsensusData, AssignedAggregator, BeaconVote, DataVersion}, }; use ssz::Encode; -use task_executor::TaskExecutor; -use tokio::time::{Instant, sleep, sleep_until}; -use tracing::{Instrument, error, info, info_span, trace, warn}; +use tokio::time::{Instant, sleep_until}; +use tracing::{Instrument, info_span, warn}; use tree_hash::TreeHash; use types::{ Attestation, AttestationData, ChainSpec, EthSpec, ForkName, Hash256, Slot, SyncCommitteeContribution, SyncSelectionProof, SyncSubnetId, }; -use validator_services::duties_service::{DutiesService, DutyAndProof}; +use validator_services::duties_service::DutyAndProof; -use 
crate::{ - AggregationAssignments, AnchorValidatorStore, ContributionWaiter, VotingAssignments, - VotingContext, metrics, -}; +use crate::{AnchorValidatorStore, VotingContext, metrics}; /// Data for sync committee aggregators. -struct SyncAggregatorData { - validator_index: u64, - pubkey: PublicKeyBytes, - selection_proof: SyncSelectionProof, +pub struct SyncAggregatorData { + pub validator_index: u64, + pub pubkey: PublicKeyBytes, + pub selection_proof: SyncSelectionProof, } /// Map from SSV committee to its sync aggregators grouped by subnet. -type SyncByCommitteeMap = HashMap>; - -/// Maximum time to wait for beacon node API calls to fetch aggregated attestations -/// and sync contributions. After this timeout, we return whatever partial results -/// have been collected. This is shorter than the standard 3-second Lighthouse timeout -/// because SSV has additional latency for QBFT consensus and P2P propagation. -const BEACON_API_FETCH_TIMEOUT: Duration = Duration::from_secs(2); - -#[derive(Clone)] -pub struct MetadataService { - duties_service: Arc, T>>, - validator_store: Arc>, - slot_clock: T, - beacon_nodes: Arc>, - executor: TaskExecutor, - spec: Arc, - fork_schedule: Arc, +pub type SyncByCommitteeMap = HashMap>; + +/// Builder for `AggregatorCommitteeConsensusData`. +/// +/// Handles fetching aggregated data from beacon nodes and constructing consensus data +/// structures that all SSV operators must agree on. +pub struct AggregatorConsensusBuilder<'a, E: EthSpec, T: SlotClock + 'static> { + validator_store: &'a Arc>, + beacon_nodes: &'a Arc>, + spec: &'a Arc, } -impl MetadataService { +impl<'a, E: EthSpec, T: SlotClock + 'static> AggregatorConsensusBuilder<'a, E, T> { + /// Create a new builder with references to required services. 
pub fn new( - duties_service: Arc, T>>, - validator_store: Arc>, - slot_clock: T, - beacon_nodes: Arc>, - executor: TaskExecutor, - spec: Arc, - fork_schedule: Arc, + validator_store: &'a Arc>, + beacon_nodes: &'a Arc>, + spec: &'a Arc, ) -> Self { Self { - duties_service, validator_store, - slot_clock, beacon_nodes, - executor, spec, - fork_schedule, } } - pub fn start_update_service(self) -> Result<(), String> { - let slot_duration = Duration::from_secs(self.spec.seconds_per_slot); - let duration_to_next_slot = self - .slot_clock - .duration_to_next_slot() - .ok_or("Unable to determine duration to next slot")?; - - info!( - next_update_millis = duration_to_next_slot.as_millis(), - "Metadata service started" - ); - - let executor = self.executor.clone(); - - // ═══════════════════════════════════════════════════════════════════════ - // PHASE 1: VotingAssignments (slot start) - // Caches voting assignments for use by both selection proofs AND voting context. - // Reads directly from DutiesService cache which is populated on startup and - // refreshed each slot. - // ═══════════════════════════════════════════════════════════════════════ - let self_clone_phase1 = self.clone(); - executor.spawn( - async move { - loop { - if let Some(duration_to_next_slot) = - self_clone_phase1.slot_clock.duration_to_next_slot() - { - // Sleep until slot start - sleep(duration_to_next_slot).await; - - if let Err(err) = self_clone_phase1.update_voting_assignments() { - error!(err, "Failed to update validator voting assignments"); - } - } else { - error!("Failed to read slot clock"); - sleep(slot_duration).await; - } - } - }, - "voting_assignments_service", - ); - - // ═══════════════════════════════════════════════════════════════════════ - // PHASE 2: VotingContext (1/3 slot) - // Gets cached voting assignments, fetches beacon_vote, builds VotingContext. 
- // ═══════════════════════════════════════════════════════════════════════ - let self_clone_phase2 = self.clone(); - executor.spawn( - async move { - loop { - if let Some(duration_to_next_slot) = - self_clone_phase2.slot_clock.duration_to_next_slot() - { - // Sleep until 1/3 into slot - sleep(duration_to_next_slot + slot_duration / 3).await; - - if let Err(err) = self_clone_phase2.update_voting_context().await { - error!(err, "Failed to update voting context") - } else { - trace!("Updated voting context"); - } - } else { - error!("Failed to read slot clock"); - sleep(slot_duration).await; - } - } - }, - "voting_context_service", - ); - - // ═══════════════════════════════════════════════════════════════════════ - // PHASE 3: AggregationAssignments (2/3 slot) - // Re-fetches `duties_service.attesters()` after selection proofs are computed. - // At this point, `DutyAndProof.selection_proof.is_some()` accurately indicates - // `is_aggregator` for attestation duties. - // ═══════════════════════════════════════════════════════════════════════ - let self_clone_phase3 = self.clone(); - executor.spawn( - async move { - loop { - if let Some(duration_to_next_slot) = - self_clone_phase3.slot_clock.duration_to_next_slot() - { - // Sleep until 2/3 into slot - sleep(duration_to_next_slot + slot_duration * 2 / 3).await; - - if let Err(err) = self_clone_phase3.update_aggregation_assignments().await { - error!(err, "Failed to update aggregator voting assignments"); - } - } else { - error!("Failed to read slot clock"); - sleep(slot_duration).await; - } - } - }, - "aggregation_assignments_service", - ); - - Ok(()) - } - - /// Phase 1: Build and publish `VotingAssignments` at slot start. 
- fn update_voting_assignments(&self) -> Result<(), String> { - let slot = self.slot_clock.now().ok_or("Failed to read slot clock")?; - - // Get attestation validators - let (attesting_validators, attesting_committees): (Vec<_>, HashMap<_, _>) = self - .duties_service - .attesters(slot) - .into_iter() - .map(|duty| { - ( - ValidatorIndex(duty.duty.validator_index as usize), - (duty.duty.pubkey, duty.duty.committee_index), - ) - }) - .unzip(); - - // Get sync validators by subnet - let sync_validators_by_subnet = self - .duties_service - .sync_duties - .get_duties_for_slot::(slot, &self.spec) - .as_ref() - .map(|sync_duties| { - let mut map = HashMap::>::new(); - sync_duties - .duties - .iter() - .filter_map(|duty| { - SyncSubnetId::compute_subnets_for_sync_committee::( - &duty.validator_sync_committee_indices, - ) - .map_err(|e| { - tracing::warn!( - "Failed to compute sync subnets for validator {}: {e:?}", - duty.validator_index - ); - }) - .ok() - .map(|subnet_ids| { - (ValidatorIndex(duty.validator_index as usize), subnet_ids) - }) - }) - .for_each(|(validator_index, subnet_ids)| { - map.entry(validator_index).or_default().extend(subnet_ids); - }); - map - }) - .unwrap_or_default(); - - let attester_count = attesting_validators.len(); - let sync_count = sync_validators_by_subnet.len(); - - let voting_assignments = VotingAssignments { - slot, - attesting_validators, - attesting_committees, - sync_validators_by_subnet, - }; - - self.validator_store - .update_voting_assignments(voting_assignments); - - // Record validator count metrics - metrics::set_gauge( - &metrics::METADATA_SERVICE_ATTESTING_VALIDATORS, - attester_count as i64, - ); - metrics::set_gauge( - &metrics::METADATA_SERVICE_SYNC_VALIDATORS, - sync_count as i64, - ); - if attester_count == 0 && sync_count == 0 { - metrics::inc_counter(&metrics::METADATA_SERVICE_EMPTY_ASSIGNMENTS_TOTAL); - } - - trace!(%slot, attester_count, sync_count, "Published VotingAssignments at slot start"); - Ok(()) - } - - /// 
Phase 2: Build and publish `VotingContext` at 1/3 slot. - async fn update_voting_context(&self) -> Result<(), String> { - let slot = self.slot_clock.now().ok_or("Failed to read slot clock")?; - - let voting_assignments = self - .validator_store - .get_voting_assignments(slot) - .await - .map_err(|e| format!("Failed to get cached voting assignments: {:?}", e))?; - - // Fetch beacon_vote from beacon node - let attestation_data = self - .beacon_nodes - .first_success(|beacon_node| async move { - let _timer = validator_metrics::start_timer_vec( - &validator_metrics::ATTESTATION_SERVICE_TIMES, - &[validator_metrics::ATTESTATIONS_HTTP_GET], - ); - beacon_node - .get_validator_attestation_data(slot, 0) - .await - .map_err(|e| format!("Failed to produce attestation data: {e:?}")) - .map(|result| result.data) - }) - .await - .map_err(|e| e.to_string())?; - - let beacon_vote = BeaconVote { - block_root: attestation_data.beacon_block_root, - source: attestation_data.source, - target: attestation_data.target, - }; - - let voting_context = VotingContext { - voting_assignments, - beacon_vote, - }; - - self.validator_store.update_voting_context(voting_context); - - trace!(%slot, "Published VotingContext at 1/3 slot"); - Ok(()) - } - - /// Phase 3: Build and publish `AggregationAssignments` at 2/3 slot. 
- /// - /// Uses single-pass data transformation to minimize iterations: - /// - ONE pass over attesters (those with `selection_proof`) to build all attester-related data - /// - ONE pass over `sync_aggregators` to build all sync-related data - /// - Then beacon fetches and consensus data building - async fn update_aggregation_assignments(&self) -> Result<(), String> { - let slot = self.slot_clock.now().ok_or("Failed to read slot clock")?; - - // Get selection proofs from `duties_service` - let attesters = self.duties_service.attesters(slot); - let sync_duties = self - .duties_service - .sync_duties - .get_duties_for_slot::(slot, &self.spec); - - // ═══════════════════════════════════════════════════════════════════════ - // SINGLE PASS over attesters with `selection_proof` - // Only processes validators with valid, non-liquidated SSV committees. - // Collects: `aggregator_committees`, `attesters_by_ssv_committee`, - // `attestation_committee_indexes` - // ═══════════════════════════════════════════════════════════════════════ - let mut aggregator_committees: HashMap = - HashMap::with_capacity(attesters.len()); - let mut attesters_by_ssv_committee: HashMap> = - HashMap::new(); - let mut attestation_committee_indexes: HashSet = - HashSet::with_capacity(attesters.len()); - - for attester in attesters.iter().filter(|d| d.selection_proof.is_some()) { - // Only process validators with valid, non-liquidated SSV committees - if let Some(ssv_committee_id) = self - .validator_store - .get_validator_and_cluster(attester.duty.pubkey) - .ok() - .map(|(_, cluster)| cluster.committee_id()) - { - // For AggregationAssignments output - aggregator_committees.insert(attester.duty.pubkey, attester.duty.committee_index); - - // For consensus data building - group by SSV committee - attesters_by_ssv_committee - .entry(ssv_committee_id) - .or_default() - .push(attester); - attestation_committee_indexes.insert(attester.duty.committee_index); - } - } - - // 
═══════════════════════════════════════════════════════════════════════ - // SINGLE PASS over `sync_aggregators` - // Only processes validators with valid, non-liquidated SSV committees. - // Collects: `validator_subnet_counts` (for multi_sync), `sync_by_ssv_committee`, - // `all_subnet_ids` - // ═══════════════════════════════════════════════════════════════════════ - let sync_aggregators = sync_duties.as_ref().map(|duties| &duties.aggregators); - - let mut validator_subnet_counts: HashMap = HashMap::new(); - let mut sync_by_ssv_committee: SyncByCommitteeMap = HashMap::new(); - let mut all_subnet_ids: HashSet = - HashSet::with_capacity(sync_aggregators.map(|a| a.len()).unwrap_or(0)); - - if let Some(aggregators) = sync_aggregators { - for (subnet_id, subnet_aggregators) in aggregators { - for (validator_index, pubkey, selection_proof) in subnet_aggregators { - let sync_aggregator = SyncAggregatorData { - validator_index: *validator_index, - pubkey: *pubkey, - selection_proof: selection_proof.clone(), - }; - - // Only process validators with valid, non-liquidated SSV committees - if let Some(ssv_committee_id) = self - .validator_store - .get_validator_and_cluster(sync_aggregator.pubkey) - .ok() - .map(|(_, cluster)| cluster.committee_id()) - { - // For AggregationAssignments output - *validator_subnet_counts - .entry(sync_aggregator.pubkey) - .or_insert(0) += 1; - - // For consensus data building - group by SSV committee - sync_by_ssv_committee - .entry(ssv_committee_id) - .or_default() - .push((*subnet_id, sync_aggregator)); - all_subnet_ids.insert(*subnet_id); - } - } - } - } - - // Derive multi_sync_aggregators from validator_subnet_counts - // (validators aggregating on multiple subnets need coordination) - let multi_sync_aggregators: HashMap> = - validator_subnet_counts - .into_iter() - .filter(|(_, count)| *count > 1) - .map(|(pubkey, count)| (pubkey, ContributionWaiter::new(count))) - .collect(); - - // 
═══════════════════════════════════════════════════════════════════════ - // Build consensus data for Boole+ forks - // ═══════════════════════════════════════════════════════════════════════ - let epoch = slot.epoch(E::slots_per_epoch()); - - let consensus_data_by_ssv_committee = - if self.fork_schedule.active_fork(epoch) >= Fork::Boole { - self.build_consensus_data_for_all_committees( - slot, - attesters_by_ssv_committee, - sync_by_ssv_committee, - attestation_committee_indexes, - all_subnet_ids, - ) - .await? - } else { - HashMap::new() - }; - - let aggregator_info = AggregationAssignments { - slot, - aggregator_committees, - multi_sync_aggregators, - consensus_data_by_ssv_committee, - }; - - self.validator_store - .update_aggregation_assignments(aggregator_info); - - trace!(%slot, "Published AggregationAssignments at 2/3 slot"); - Ok(()) - } - /// Build `AggregatorCommitteeConsensusData` for each committee that has aggregators. /// - /// Takes pre-grouped data from `update_aggregation_assignments` to avoid redundant iteration. - async fn build_consensus_data_for_all_committees( + /// Takes pre-grouped data from `DutyInputPublisher` to avoid redundant iteration. + #[allow(clippy::too_many_arguments)] + pub async fn build_consensus_data_for_all_committees( &self, slot: Slot, attesters_by_ssv_committee: HashMap>, sync_by_ssv_committee: SyncByCommitteeMap, attestation_committee_indexes: HashSet, all_subnet_ids: HashSet, + voting_context: &VotingContext, + timeout: Duration, ) -> Result>>, String> { - // Get `VotingContext` for `beacon_vote` (cached at 1/3 slot) - let voting_context = self - .validator_store - .get_voting_context(slot) - .await - .map_err(|e| format!("Failed to get voting context: {:?}", e))?; - // Parallel fetch from beacon node with timeout for partial results. // Uses `FuturesUnordered` internally to collect results as they complete. - // After BEACON_API_FETCH_TIMEOUT (2s), returns whatever has been collected. 
+ // After timeout, returns whatever has been collected. // This ensures we don't block on slow beacon nodes while still getting partial data. let (aggregated_attestations, sync_contributions) = tokio::join!( self.fetch_aggregated_attestations( slot, &voting_context.beacon_vote, &attestation_committee_indexes, - BEACON_API_FETCH_TIMEOUT, + timeout, ), self.fetch_sync_contributions( slot, voting_context.beacon_vote.block_root, &all_subnet_ids, - BEACON_API_FETCH_TIMEOUT, + timeout, ), ); @@ -876,7 +510,7 @@ impl MetadataService { ); } Err(e) => { - error!( + tracing::error!( %slot, ?beacon_block_root, ?subnet_id, @@ -927,9 +561,9 @@ impl MetadataService { // Pure Helper Functions for Consensus Data Building // ═══════════════════════════════════════════════════════════════════════════════════════ // -// These functions are extracted from `build_consensus_data_for_committee` to enable -// unit testing of the sorting and filtering logic without requiring beacon node mocks. -// The sorting order MUST match SSV-Go exactly for consensus compatibility. +// These functions are extracted to enable unit testing of the sorting and filtering logic +// without requiring beacon node mocks. The sorting order MUST match SSV-Go exactly for +// consensus compatibility. /// Sort aggregators by `validator_index` ascending. /// @@ -1078,7 +712,7 @@ mod tests { } } - // ═══════════════════════════════════════════════════════════════════════════════════ + // ═══════════════════════════════════════════════════════════════════════════════════ // P0 Tests: Wire Compatibility // ═══════════════════════════════════════════════════════════════════════════════════ diff --git a/anchor/validator_store/src/duty_input_publisher.rs b/anchor/validator_store/src/duty_input_publisher.rs new file mode 100644 index 000000000..8401cef4b --- /dev/null +++ b/anchor/validator_store/src/duty_input_publisher.rs @@ -0,0 +1,465 @@ +//! Duty Input Publisher +//! +//! 
This service publishes duty inputs at scheduled times during each slot: +//! +//! - **Phase 1 (slot start)**: `VotingAssignments` - caches which validators are attesting/syncing +//! - **Phase 2 (1/3 slot)**: `VotingContext` - fetches beacon_vote and combines with assignments +//! - **Phase 3 (2/3 slot)**: `AggregationAssignments` - builds consensus data for aggregation +//! duties +//! +//! # Separation of Concerns +//! +//! This service focuses on **timing and publishing** of duty inputs. The actual consensus data +//! building logic is delegated to [`AggregatorConsensusBuilder`]. +//! +//! Previously named `MetadataService`, this was renamed to better reflect its actual role +//! as a publisher of duty inputs rather than a generic "metadata" handler. + +use std::{ + collections::{HashMap, HashSet}, + sync::Arc, + time::Duration, +}; + +use beacon_node_fallback::BeaconNodeFallback; +use bls::PublicKeyBytes; +use fork::{Fork, ForkSchedule}; +use slot_clock::SlotClock; +use ssv_types::{CommitteeId, ValidatorIndex, consensus::BeaconVote}; +use task_executor::TaskExecutor; +use tokio::time::sleep; +use tracing::{error, info, trace}; +use types::{ChainSpec, EthSpec}; +use validator_services::duties_service::DutiesService; + +use crate::{ + AggregationAssignments, AnchorValidatorStore, ContributionWaiter, VotingAssignments, + VotingContext, + aggregator_consensus_builder::{ + AggregatorConsensusBuilder, SyncAggregatorData, SyncByCommitteeMap, + }, + metrics, +}; + +/// Maximum time to wait for beacon node API calls to fetch aggregated attestations +/// and sync contributions. After this timeout, we return whatever partial results +/// have been collected. This is shorter than the standard 3-second Lighthouse timeout +/// because SSV has additional latency for QBFT consensus and P2P propagation. +const BEACON_API_FETCH_TIMEOUT: Duration = Duration::from_secs(2); + +/// Publishes duty inputs at scheduled times during each slot. 
+/// +/// This service is responsible for: +/// - Caching voting assignments at slot start +/// - Fetching beacon_vote at 1/3 slot +/// - Building and publishing aggregation assignments at 2/3 slot +#[derive(Clone)] +pub struct DutyInputPublisher { + duties_service: Arc, T>>, + validator_store: Arc>, + slot_clock: T, + beacon_nodes: Arc>, + executor: TaskExecutor, + spec: Arc, + fork_schedule: Arc, +} + +impl DutyInputPublisher { + pub fn new( + duties_service: Arc, T>>, + validator_store: Arc>, + slot_clock: T, + beacon_nodes: Arc>, + executor: TaskExecutor, + spec: Arc, + fork_schedule: Arc, + ) -> Self { + Self { + duties_service, + validator_store, + slot_clock, + beacon_nodes, + executor, + spec, + fork_schedule, + } + } + + pub fn start_update_service(self) -> Result<(), String> { + let slot_duration = Duration::from_secs(self.spec.seconds_per_slot); + let duration_to_next_slot = self + .slot_clock + .duration_to_next_slot() + .ok_or("Unable to determine duration to next slot")?; + + info!( + next_update_millis = duration_to_next_slot.as_millis(), + "Duty input publisher started" + ); + + let executor = self.executor.clone(); + + // ═══════════════════════════════════════════════════════════════════════ + // PHASE 1: VotingAssignments (slot start) + // Caches voting assignments for use by both selection proofs AND voting context. + // Reads directly from DutiesService cache which is populated on startup and + // refreshed each slot. 
+ // ═══════════════════════════════════════════════════════════════════════ + let self_clone_phase1 = self.clone(); + executor.spawn( + async move { + loop { + if let Some(duration_to_next_slot) = + self_clone_phase1.slot_clock.duration_to_next_slot() + { + // Sleep until slot start + sleep(duration_to_next_slot).await; + + if let Err(err) = self_clone_phase1.publish_voting_assignments() { + error!(err, "Failed to publish voting assignments"); + } + } else { + error!("Failed to read slot clock"); + sleep(slot_duration).await; + } + } + }, + "voting_assignments_publisher", + ); + + // ═══════════════════════════════════════════════════════════════════════ + // PHASE 2: VotingContext (1/3 slot) + // Gets cached voting assignments, fetches beacon_vote, builds VotingContext. + // ═══════════════════════════════════════════════════════════════════════ + let self_clone_phase2 = self.clone(); + executor.spawn( + async move { + loop { + if let Some(duration_to_next_slot) = + self_clone_phase2.slot_clock.duration_to_next_slot() + { + // Sleep until 1/3 into slot + sleep(duration_to_next_slot + slot_duration / 3).await; + + if let Err(err) = self_clone_phase2.publish_voting_context().await { + error!(err, "Failed to publish voting context") + } else { + trace!("Published voting context"); + } + } else { + error!("Failed to read slot clock"); + sleep(slot_duration).await; + } + } + }, + "voting_context_publisher", + ); + + // ═══════════════════════════════════════════════════════════════════════ + // PHASE 3: AggregationAssignments (2/3 slot) + // Re-fetches `duties_service.attesters()` after selection proofs are computed. + // At this point, `DutyAndProof.selection_proof.is_some()` accurately indicates + // `is_aggregator` for attestation duties. 
+ // ═══════════════════════════════════════════════════════════════════════ + let self_clone_phase3 = self.clone(); + executor.spawn( + async move { + loop { + if let Some(duration_to_next_slot) = + self_clone_phase3.slot_clock.duration_to_next_slot() + { + // Sleep until 2/3 into slot + sleep(duration_to_next_slot + slot_duration * 2 / 3).await; + + if let Err(err) = self_clone_phase3.publish_aggregation_assignments().await + { + error!(err, "Failed to publish aggregation assignments"); + } + } else { + error!("Failed to read slot clock"); + sleep(slot_duration).await; + } + } + }, + "aggregation_assignments_publisher", + ); + + Ok(()) + } + + /// Phase 1: Build and publish `VotingAssignments` at slot start. + fn publish_voting_assignments(&self) -> Result<(), String> { + let slot = self.slot_clock.now().ok_or("Failed to read slot clock")?; + + // Get attestation validators + let (attesting_validators, attesting_committees): (Vec<_>, HashMap<_, _>) = self + .duties_service + .attesters(slot) + .into_iter() + .map(|duty| { + ( + ValidatorIndex(duty.duty.validator_index as usize), + (duty.duty.pubkey, duty.duty.committee_index), + ) + }) + .unzip(); + + // Get sync validators by subnet + let sync_validators_by_subnet = self + .duties_service + .sync_duties + .get_duties_for_slot::(slot, &self.spec) + .as_ref() + .map(|sync_duties| { + let mut map = HashMap::>::new(); + sync_duties + .duties + .iter() + .filter_map(|duty| { + types::SyncSubnetId::compute_subnets_for_sync_committee::( + &duty.validator_sync_committee_indices, + ) + .map_err(|e| { + tracing::warn!( + "Failed to compute sync subnets for validator {}: {e:?}", + duty.validator_index + ); + }) + .ok() + .map(|subnet_ids| { + (ValidatorIndex(duty.validator_index as usize), subnet_ids) + }) + }) + .for_each(|(validator_index, subnet_ids)| { + map.entry(validator_index).or_default().extend(subnet_ids); + }); + map + }) + .unwrap_or_default(); + + let attester_count = attesting_validators.len(); + let 
sync_count = sync_validators_by_subnet.len(); + + let voting_assignments = VotingAssignments { + slot, + attesting_validators, + attesting_committees, + sync_validators_by_subnet, + }; + + self.validator_store + .update_voting_assignments(voting_assignments); + + // Record validator count metrics + metrics::set_gauge( + &metrics::METADATA_SERVICE_ATTESTING_VALIDATORS, + attester_count as i64, + ); + metrics::set_gauge( + &metrics::METADATA_SERVICE_SYNC_VALIDATORS, + sync_count as i64, + ); + if attester_count == 0 && sync_count == 0 { + metrics::inc_counter(&metrics::METADATA_SERVICE_EMPTY_ASSIGNMENTS_TOTAL); + } + + trace!(%slot, attester_count, sync_count, "Published VotingAssignments at slot start"); + Ok(()) + } + + /// Phase 2: Build and publish `VotingContext` at 1/3 slot. + async fn publish_voting_context(&self) -> Result<(), String> { + let slot = self.slot_clock.now().ok_or("Failed to read slot clock")?; + + let voting_assignments = self + .validator_store + .get_voting_assignments(slot) + .await + .map_err(|e| format!("Failed to get cached voting assignments: {:?}", e))?; + + // Fetch beacon_vote from beacon node + let attestation_data = self + .beacon_nodes + .first_success(|beacon_node| async move { + let _timer = validator_metrics::start_timer_vec( + &validator_metrics::ATTESTATION_SERVICE_TIMES, + &[validator_metrics::ATTESTATIONS_HTTP_GET], + ); + beacon_node + .get_validator_attestation_data(slot, 0) + .await + .map_err(|e| format!("Failed to produce attestation data: {e:?}")) + .map(|result| result.data) + }) + .await + .map_err(|e| e.to_string())?; + + let beacon_vote = BeaconVote { + block_root: attestation_data.beacon_block_root, + source: attestation_data.source, + target: attestation_data.target, + }; + + let voting_context = VotingContext { + voting_assignments, + beacon_vote, + }; + + self.validator_store.update_voting_context(voting_context); + + trace!(%slot, "Published VotingContext at 1/3 slot"); + Ok(()) + } + + /// Phase 3: Build and 
publish `AggregationAssignments` at 2/3 slot. + /// + /// Uses single-pass data transformation to minimize iterations: + /// - ONE pass over attesters (those with `selection_proof`) to build all attester-related data + /// - ONE pass over `sync_aggregators` to build all sync-related data + /// - Then delegates to `AggregatorConsensusBuilder` for beacon fetches and consensus data + async fn publish_aggregation_assignments(&self) -> Result<(), String> { + let slot = self.slot_clock.now().ok_or("Failed to read slot clock")?; + + // Get selection proofs from `duties_service` + let attesters = self.duties_service.attesters(slot); + let sync_duties = self + .duties_service + .sync_duties + .get_duties_for_slot::(slot, &self.spec); + + // ═══════════════════════════════════════════════════════════════════════ + // SINGLE PASS over attesters with `selection_proof` + // Only processes validators with valid, non-liquidated SSV committees. + // Collects: `aggregator_committees`, `attesters_by_ssv_committee`, + // `attestation_committee_indexes` + // ═══════════════════════════════════════════════════════════════════════ + let mut aggregator_committees: HashMap = + HashMap::with_capacity(attesters.len()); + let mut attesters_by_ssv_committee: HashMap> = HashMap::new(); + let mut attestation_committee_indexes: HashSet = + HashSet::with_capacity(attesters.len()); + + for attester in attesters.iter().filter(|d| d.selection_proof.is_some()) { + // Only process validators with valid, non-liquidated SSV committees + if let Some(ssv_committee_id) = self + .validator_store + .get_validator_and_cluster(attester.duty.pubkey) + .ok() + .map(|(_, cluster)| cluster.committee_id()) + { + // For AggregationAssignments output + aggregator_committees.insert(attester.duty.pubkey, attester.duty.committee_index); + + // For consensus data building - group by SSV committee + attesters_by_ssv_committee + .entry(ssv_committee_id) + .or_default() + .push(attester); + 
attestation_committee_indexes.insert(attester.duty.committee_index); + } + } + + // ═══════════════════════════════════════════════════════════════════════ + // SINGLE PASS over `sync_aggregators` + // Only processes validators with valid, non-liquidated SSV committees. + // Collects: `validator_subnet_counts` (for multi_sync), `sync_by_ssv_committee`, + // `all_subnet_ids` + // ═══════════════════════════════════════════════════════════════════════ + let sync_aggregators = sync_duties.as_ref().map(|duties| &duties.aggregators); + + let mut validator_subnet_counts: HashMap = HashMap::new(); + let mut sync_by_ssv_committee: SyncByCommitteeMap = HashMap::new(); + let mut all_subnet_ids: HashSet = + HashSet::with_capacity(sync_aggregators.map(|a| a.len()).unwrap_or(0)); + + if let Some(aggregators) = sync_aggregators { + for (subnet_id, subnet_aggregators) in aggregators { + for (validator_index, pubkey, selection_proof) in subnet_aggregators { + let sync_aggregator = SyncAggregatorData { + validator_index: *validator_index, + pubkey: *pubkey, + selection_proof: selection_proof.clone(), + }; + + // Only process validators with valid, non-liquidated SSV committees + if let Some(ssv_committee_id) = self + .validator_store + .get_validator_and_cluster(sync_aggregator.pubkey) + .ok() + .map(|(_, cluster)| cluster.committee_id()) + { + // For AggregationAssignments output + *validator_subnet_counts + .entry(sync_aggregator.pubkey) + .or_insert(0) += 1; + + // For consensus data building - group by SSV committee + sync_by_ssv_committee + .entry(ssv_committee_id) + .or_default() + .push((*subnet_id, sync_aggregator)); + all_subnet_ids.insert(*subnet_id); + } + } + } + } + + // Derive multi_sync_aggregators from validator_subnet_counts + // (validators aggregating on multiple subnets need coordination) + let multi_sync_aggregators: HashMap> = + validator_subnet_counts + .into_iter() + .filter(|(_, count)| *count > 1) + .map(|(pubkey, count)| (pubkey, 
ContributionWaiter::new(count))) + .collect(); + + // ═══════════════════════════════════════════════════════════════════════ + // Build consensus data for Boole+ forks + // Delegate to AggregatorConsensusBuilder for the heavy lifting + // ═══════════════════════════════════════════════════════════════════════ + let epoch = slot.epoch(E::slots_per_epoch()); + + let consensus_data_by_ssv_committee = + if self.fork_schedule.active_fork(epoch) >= Fork::Boole { + // Get voting context for beacon_vote (cached at 1/3 slot) + let voting_context = self + .validator_store + .get_voting_context(slot) + .await + .map_err(|e| format!("Failed to get voting context: {:?}", e))?; + + let builder = AggregatorConsensusBuilder::new( + &self.validator_store, + &self.beacon_nodes, + &self.spec, + ); + + builder + .build_consensus_data_for_all_committees( + slot, + attesters_by_ssv_committee, + sync_by_ssv_committee, + attestation_committee_indexes, + all_subnet_ids, + &voting_context, + BEACON_API_FETCH_TIMEOUT, + ) + .await? 
+ } else { + HashMap::new() + }; + + let aggregator_info = AggregationAssignments { + slot, + aggregator_committees, + multi_sync_aggregators, + consensus_data_by_ssv_committee, + }; + + self.validator_store + .update_aggregation_assignments(aggregator_info); + + trace!(%slot, "Published AggregationAssignments at 2/3 slot"); + Ok(()) + } +} diff --git a/anchor/validator_store/src/lib.rs b/anchor/validator_store/src/lib.rs index adfbc5593..e1d447af0 100644 --- a/anchor/validator_store/src/lib.rs +++ b/anchor/validator_store/src/lib.rs @@ -1,7 +1,17 @@ -pub mod metadata_service; +pub mod aggregator_consensus_builder; +pub mod duty_input_publisher; mod metrics; pub mod registration_service; +/// Backwards compatibility: re-export DutyInputPublisher as MetadataService +#[deprecated( + since = "0.1.0", + note = "Use duty_input_publisher::DutyInputPublisher instead" +)] +pub mod metadata_service { + pub use crate::duty_input_publisher::DutyInputPublisher as MetadataService; +} + use std::{ collections::{HashMap, HashSet}, fmt::Debug, @@ -1540,11 +1550,14 @@ fn decrypt_key_share( .map_err(|err| error!(?err, validator = %pubkey_bytes, "Invalid secret key decrypted")) } -struct VotingContext { +/// Context for voting duties at 1/3 slot. +/// +/// Contains cached voting assignments and the beacon_vote fetched from the beacon node. +pub struct VotingContext { /// Cached voting assignments (computed at slot start, reused here) - voting_assignments: Arc, + pub voting_assignments: Arc, /// The `BeaconVote` (only available at 1/3 slot from beacon node) - beacon_vote: BeaconVote, + pub beacon_vote: BeaconVote, } /// Cached validator voting assignments for a slot. 
From f3833bf9e834d958cc97e7c424e45f3cd0e5f4e0 Mon Sep 17 00:00:00 2001 From: petarjuki7 Date: Thu, 19 Feb 2026 11:48:21 +0100 Subject: [PATCH 2/3] fix clippy --- anchor/validator_store/src/aggregator_consensus_builder.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/anchor/validator_store/src/aggregator_consensus_builder.rs b/anchor/validator_store/src/aggregator_consensus_builder.rs index fc5f84deb..e95ad44ab 100644 --- a/anchor/validator_store/src/aggregator_consensus_builder.rs +++ b/anchor/validator_store/src/aggregator_consensus_builder.rs @@ -80,7 +80,7 @@ impl<'a, E: EthSpec, T: SlotClock + 'static> AggregatorConsensusBuilder<'a, E, T /// Build `AggregatorCommitteeConsensusData` for each committee that has aggregators. /// /// Takes pre-grouped data from `DutyInputPublisher` to avoid redundant iteration. - #[allow(clippy::too_many_arguments)] + #[expect(clippy::too_many_arguments)] pub async fn build_consensus_data_for_all_committees( &self, slot: Slot, From 9049cacaa7e9835c6103e0e4d15051c5dfe0870c Mon Sep 17 00:00:00 2001 From: petarjuki7 Date: Mon, 23 Feb 2026 16:52:50 +0100 Subject: [PATCH 3/3] simplified duty_input_publisher --- .../src/aggregator_consensus_builder.rs | 1354 ++++++----------- .../src/duty_input_publisher.rs | 202 +-- anchor/validator_store/src/lib.rs | 24 +- 3 files changed, 513 insertions(+), 1067 deletions(-) diff --git a/anchor/validator_store/src/aggregator_consensus_builder.rs b/anchor/validator_store/src/aggregator_consensus_builder.rs index e95ad44ab..dc3eab372 100644 --- a/anchor/validator_store/src/aggregator_consensus_builder.rs +++ b/anchor/validator_store/src/aggregator_consensus_builder.rs @@ -1,20 +1,4 @@ -//! Aggregator Consensus Data Builder -//! -//! This module handles fetching aggregated attestations and sync contributions from beacon nodes, -//! and building `AggregatorCommitteeConsensusData` for QBFT consensus (Boole+ fork). -//! -//! # Responsibilities -//! -//! 
- Fetch aggregated attestations from beacon node (deduplicated by committee index) -//! - Fetch sync committee contributions from beacon node (deduplicated by subnet ID) -//! - Build `AggregatorCommitteeConsensusData` per SSV committee -//! - Sort aggregators/contributors deterministically for consensus compatibility with SSV-Go -//! -//! # Separation of Concerns -//! -//! This module is intentionally separated from `DutyInputPublisher` (formerly `MetadataService`) -//! which handles the timing and publishing of duty inputs. This module focuses purely on -//! the data fetching and consensus data construction logic. +//! Builds aggregation assignments for QBFT consensus (Boole+ fork). use std::{ collections::{HashMap, HashSet}, @@ -41,431 +25,428 @@ use types::{ }; use validator_services::duties_service::DutyAndProof; -use crate::{AnchorValidatorStore, VotingContext, metrics}; +use crate::{AnchorValidatorStore, ContributionWaiter, VotingAssignments, metrics}; -/// Data for sync committee aggregators. pub struct SyncAggregatorData { pub validator_index: u64, pub pubkey: PublicKeyBytes, pub selection_proof: SyncSelectionProof, } -/// Map from SSV committee to its sync aggregators grouped by subnet. pub type SyncByCommitteeMap = HashMap>; -/// Builder for `AggregatorCommitteeConsensusData`. -/// -/// Handles fetching aggregated data from beacon nodes and constructing consensus data -/// structures that all SSV operators must agree on. -pub struct AggregatorConsensusBuilder<'a, E: EthSpec, T: SlotClock + 'static> { - validator_store: &'a Arc>, - beacon_nodes: &'a Arc>, - spec: &'a Arc, -} - -impl<'a, E: EthSpec, T: SlotClock + 'static> AggregatorConsensusBuilder<'a, E, T> { - /// Create a new builder with references to required services. 
- pub fn new( - validator_store: &'a Arc>, - beacon_nodes: &'a Arc>, - spec: &'a Arc, - ) -> Self { - Self { - validator_store, - beacon_nodes, - spec, - } - } +pub(crate) type SyncAggregatorsBySubnet = + HashMap>; - /// Build `AggregatorCommitteeConsensusData` for each committee that has aggregators. - /// - /// Takes pre-grouped data from `DutyInputPublisher` to avoid redundant iteration. - #[expect(clippy::too_many_arguments)] - pub async fn build_consensus_data_for_all_committees( - &self, - slot: Slot, - attesters_by_ssv_committee: HashMap>, - sync_by_ssv_committee: SyncByCommitteeMap, - attestation_committee_indexes: HashSet, - all_subnet_ids: HashSet, - voting_context: &VotingContext, - timeout: Duration, - ) -> Result>>, String> { - // Parallel fetch from beacon node with timeout for partial results. - // Uses `FuturesUnordered` internally to collect results as they complete. - // After timeout, returns whatever has been collected. - // This ensures we don't block on slow beacon nodes while still getting partial data. - let (aggregated_attestations, sync_contributions) = tokio::join!( - self.fetch_aggregated_attestations( - slot, - &voting_context.beacon_vote, - &attestation_committee_indexes, - timeout, - ), - self.fetch_sync_contributions( - slot, - voting_context.beacon_vote.block_root, - &all_subnet_ids, - timeout, - ), - ); - - // Build consensus data per committee - // All committees with work - let ssv_committees: HashSet = attesters_by_ssv_committee - .keys() - .chain(sync_by_ssv_committee.keys()) - .copied() - .collect(); +/// Context for voting duties at 1/3 slot. 
+pub(crate) struct VotingContext { + pub voting_assignments: Arc, + pub beacon_vote: BeaconVote, +} - let mut result = HashMap::with_capacity(ssv_committees.len()); - for ssv_committee_id in ssv_committees { - let ssv_committee_attesters = attesters_by_ssv_committee.get(&ssv_committee_id); - let ssv_committee_sync = sync_by_ssv_committee.get(&ssv_committee_id); +/// Result of the single-pass grouping over duties. +pub(crate) struct GroupedDuties<'a, E: EthSpec> { + pub aggregator_committees: HashMap, + pub attesters_by_ssv_committee: HashMap>, + pub attestation_committee_indexes: HashSet, + pub multi_sync_aggregators: HashMap>, + pub sync_by_ssv_committee: SyncByCommitteeMap, + pub all_subnet_ids: HashSet, +} - let consensus_data = self.build_consensus_data_for_committee( - slot, - &ssv_committee_id, - ssv_committee_attesters, - ssv_committee_sync, - &aggregated_attestations, - &sync_contributions, - )?; - - if let Some(data) = consensus_data { - result.insert(ssv_committee_id, Arc::new(data)); - } +/// Groups attesters and sync aggregators by SSV committee in a single pass each. 
+pub(crate) fn group_duties_by_committee<'a, E: EthSpec, T: SlotClock + 'static>( + attesters: &'a [DutyAndProof], + sync_aggregators: Option<&SyncAggregatorsBySubnet>, + validator_store: &AnchorValidatorStore, +) -> GroupedDuties<'a, E> { + let mut aggregator_committees: HashMap = + HashMap::with_capacity(attesters.len()); + let mut attesters_by_ssv_committee: HashMap> = HashMap::new(); + let mut attestation_committee_indexes: HashSet = HashSet::with_capacity(attesters.len()); + + for attester in attesters.iter().filter(|d| d.selection_proof.is_some()) { + if let Some(ssv_committee_id) = validator_store + .get_validator_and_cluster(attester.duty.pubkey) + .ok() + .map(|(_, cluster)| cluster.committee_id()) + { + aggregator_committees.insert(attester.duty.pubkey, attester.duty.committee_index); + attesters_by_ssv_committee + .entry(ssv_committee_id) + .or_default() + .push(attester); + attestation_committee_indexes.insert(attester.duty.committee_index); } - - Ok(result) } - /// Build `AggregatorCommitteeConsensusData` for a single committee. - /// - /// CRITICAL REQUIREMENTS (must match SSV Go/Spec exactly for consensus): - /// 1. Aggregators: Call `is_aggregator()` on the selection proof before including - /// 2. Contributors: Call `is_sync_committee_aggregator()` on the selection proof before - /// including - /// 3. Aggregators: Sort by `validator_index` (all share same signing root) - /// 4. Contributors: Sort by (`signing_root`, `validator_index`) to match SSV Go's root-sorted - /// processing - /// 5. Committee indexes: First-seen order from sorted aggregators (not sorted separately) - /// 6. 
Subnet IDs: First-seen order from sorted contributors (not sorted separately) - fn build_consensus_data_for_committee( - &self, - slot: Slot, - ssv_committee_id: &CommitteeId, - ssv_committee_attesters: Option<&Vec<&DutyAndProof>>, - ssv_committee_sync: Option<&Vec<(SyncSubnetId, SyncAggregatorData)>>, - aggregated_attestations: &HashMap>, - sync_contributions: &HashMap>, - ) -> Result>, String> { - // === AGGREGATORS === - // Process pre-filtered attesters for this committee - // These validators are already confirmed to be in this committee - let mut aggregators: Vec = match ssv_committee_attesters { - Some(attesters) => Vec::with_capacity(attesters.len()), - None => Vec::new(), - }; - if let Some(attesters) = ssv_committee_attesters { - for duty_and_proof in attesters.iter() { - let validator_index = ValidatorIndex(duty_and_proof.duty.validator_index as usize); - - // Verify the aggregated selection proof passes beacon node is_aggregator check. - // Without this check, validators whose proofs don't meet the modulo threshold - // would be included, causing consensus hash mismatch with other operators. - // NOTE: Since we don't have direct beacon node access to is_aggregator() in - // Anchor's setup, we rely on the fact that `selection_proof.is_some()` means - // Lighthouse has already verified this validator is an aggregator. - // Defensive: this should never happen since `update_aggregation_assignments` - // filters with `.filter(|d| d.selection_proof.is_some())` before grouping. 
- let Some(selection_proof) = duty_and_proof.selection_proof.clone() else { - warn!( - %slot, - ?ssv_committee_id, - ?validator_index, - "[AggregatorCommittee] BUG: aggregator missing selection_proof despite upstream filter" - ); - continue; + let mut validator_subnet_counts: HashMap = HashMap::new(); + let mut sync_by_ssv_committee: SyncByCommitteeMap = HashMap::new(); + let mut all_subnet_ids: HashSet = + HashSet::with_capacity(sync_aggregators.map(|a| a.len()).unwrap_or(0)); + + if let Some(aggregators) = sync_aggregators { + for (subnet_id, subnet_aggregators) in aggregators { + for (validator_index, pubkey, selection_proof) in subnet_aggregators { + let sync_aggregator = SyncAggregatorData { + validator_index: *validator_index, + pubkey: *pubkey, + selection_proof: selection_proof.clone(), }; - aggregators.push(AssignedAggregator { - validator_index, - selection_proof: selection_proof.into(), - committee_index: duty_and_proof.duty.committee_index, - }); - } - } - // Sort and filter aggregators using helper functions - sort_aggregators_by_validator_index(&mut aggregators); - filter_aggregators_with_attestations(&mut aggregators, aggregated_attestations); - - // === CONTRIBUTORS === - // Process pre-filtered sync aggregators for this committee - // These validators are already confirmed to be in this committee - let mut contributors_with_roots: Vec<(Hash256, AssignedAggregator)> = - match ssv_committee_sync { - Some(sync_entries) => Vec::with_capacity(sync_entries.len()), - None => Vec::new(), - }; - if let Some(sync_entries) = ssv_committee_sync { - for (subnet_id, sync_aggregator) in sync_entries.iter() { - // Compute signing root for this subnet - let sync_selection_root = self - .validator_store - .compute_sync_selection_root(slot, (*subnet_id).into()); - - let validator_index = ValidatorIndex(sync_aggregator.validator_index as usize); - - // Lighthouse duties service already filters sync duties to only include valid - // aggregators (those whose proofs meet 
the modulo threshold), so we can proceed. - contributors_with_roots.push(( - sync_selection_root, - AssignedAggregator { - validator_index, - selection_proof: sync_aggregator.selection_proof.clone().into(), - committee_index: (*subnet_id).into(), - }, - )); + if let Some(ssv_committee_id) = validator_store + .get_validator_and_cluster(sync_aggregator.pubkey) + .ok() + .map(|(_, cluster)| cluster.committee_id()) + { + *validator_subnet_counts + .entry(sync_aggregator.pubkey) + .or_insert(0) += 1; + sync_by_ssv_committee + .entry(ssv_committee_id) + .or_default() + .push((*subnet_id, sync_aggregator)); + all_subnet_ids.insert(*subnet_id); + } } } + } - // Sort and filter contributors using helper functions - sort_contributors_by_signing_root_then_validator_index(&mut contributors_with_roots); - filter_contributors_with_contributions(&mut contributors_with_roots, sync_contributions); - - let contributors: Vec = contributors_with_roots + let multi_sync_aggregators: HashMap> = + validator_subnet_counts .into_iter() - .map(|(_, contributor)| contributor) + .filter(|(_, count)| *count > 1) + .map(|(pubkey, count)| (pubkey, ContributionWaiter::new(count))) .collect(); - // Early exit if no aggregators/contributors - if aggregators.is_empty() && contributors.is_empty() { - return Ok(None); - } + GroupedDuties { + aggregator_committees, + attesters_by_ssv_committee, + attestation_committee_indexes, + multi_sync_aggregators, + sync_by_ssv_committee, + all_subnet_ids, + } +} - // === COMMITTEE INDEXES & ATTESTATIONS === - // Extract unique committee indexes preserving first-seen order from sorted aggregators. - // `IndexSet` deduplicates while maintaining insertion order, matching SSV Go's approach - // of adding new indexes as they're encountered during iteration. - let attestation_committee_indexes: IndexSet = - aggregators.iter().map(|a| a.committee_index).collect(); +/// Builds `AggregatorCommitteeConsensusData` for all committees with aggregation duties. 
+#[expect(clippy::too_many_arguments)] +pub(crate) async fn build_consensus_data_for_all_committees( + slot: Slot, + attesters_by_ssv_committee: HashMap>, + sync_by_ssv_committee: SyncByCommitteeMap, + attestation_committee_indexes: HashSet, + all_subnet_ids: HashSet, + voting_context: &VotingContext, + timeout: Duration, + validator_store: &Arc>, + beacon_nodes: &Arc>, + spec: &Arc, +) -> Result>>, String> { + let (aggregated_attestations, sync_contributions) = tokio::join!( + fetch_aggregated_attestations( + slot, + &voting_context.beacon_vote, + &attestation_committee_indexes, + timeout, + beacon_nodes, + spec, + ), + fetch_sync_contributions( + slot, + voting_context.beacon_vote.block_root, + &all_subnet_ids, + timeout, + beacon_nodes, + ), + ); + + let ssv_committees: HashSet = attesters_by_ssv_committee + .keys() + .chain(sync_by_ssv_committee.keys()) + .copied() + .collect(); + + let mut result = HashMap::with_capacity(ssv_committees.len()); + for ssv_committee_id in ssv_committees { + let ssv_committee_attesters = attesters_by_ssv_committee.get(&ssv_committee_id); + let ssv_committee_sync = sync_by_ssv_committee.get(&ssv_committee_id); + + let consensus_data = build_consensus_data_for_committee( + slot, + &ssv_committee_id, + ssv_committee_attesters, + ssv_committee_sync, + &aggregated_attestations, + &sync_contributions, + validator_store, + spec, + )?; - // Get attestations in attestation_committee_indexes order (1:1 correspondence) - let attestations_bytes: Vec> = attestation_committee_indexes - .iter() - .filter_map(|committee_index| aggregated_attestations.get(committee_index)) - .map(|attestation| { - let bytes = attestation.as_ssz_bytes(); - VariableList::new(bytes).map_err(|e| { - warn!("Failed to create attestation bytes list: {:?}", e); - e - }) - }) - .collect::, _>>() - .map_err(|e| format!("Failed to create attestation bytes: {:?}", e))?; + if let Some(data) = consensus_data { + result.insert(ssv_committee_id, Arc::new(data)); + } + } - // === 
SUBNET IDS & CONTRIBUTIONS === - // Extract unique subnet IDs preserving first-seen order from sorted contributors. - // `IndexSet` deduplicates while maintaining insertion order, matching SSV Go's approach - // of adding new IDs as they're encountered during iteration. - let subnet_ids: IndexSet = contributors - .iter() - .map(|c| SyncSubnetId::new(c.committee_index)) - .collect(); + Ok(result) +} - let contributions: Vec> = subnet_ids - .iter() - .filter_map(|id| sync_contributions.get(id).cloned()) - .collect(); +#[expect(clippy::too_many_arguments)] +fn build_consensus_data_for_committee( + slot: Slot, + ssv_committee_id: &CommitteeId, + ssv_committee_attesters: Option<&Vec<&DutyAndProof>>, + ssv_committee_sync: Option<&Vec<(SyncSubnetId, SyncAggregatorData)>>, + aggregated_attestations: &HashMap>, + sync_contributions: &HashMap>, + validator_store: &Arc>, + spec: &Arc, +) -> Result>, String> { + let mut aggregators: Vec = match ssv_committee_attesters { + Some(attesters) => Vec::with_capacity(attesters.len()), + None => Vec::new(), + }; + if let Some(attesters) = ssv_committee_attesters { + for duty_and_proof in attesters.iter() { + let validator_index = ValidatorIndex(duty_and_proof.duty.validator_index as usize); + let Some(selection_proof) = duty_and_proof.selection_proof.clone() else { + warn!( + %slot, + ?ssv_committee_id, + ?validator_index, + "BUG: aggregator missing selection_proof despite upstream filter" + ); + continue; + }; + aggregators.push(AssignedAggregator { + validator_index, + selection_proof: selection_proof.into(), + committee_index: duty_and_proof.duty.committee_index, + }); + } + } - // Get the fork version - let epoch = slot.epoch(E::slots_per_epoch()); - let fork_name = self.spec.fork_name_at_epoch(epoch); - let version = DataVersion::from(fork_name); + sort_aggregators_by_validator_index(&mut aggregators); + filter_aggregators_with_attestations(&mut aggregators, aggregated_attestations); - Ok(Some(AggregatorCommitteeConsensusData { - 
version, - aggregators: aggregators - .try_into() - .map_err(|e| format!("aggregators: {e:?}"))?, - aggregator_committee_indexes: attestation_committee_indexes - .into_iter() - .collect::>() - .try_into() - .map_err(|e| format!("aggregator_committee_indexes: {e:?}"))?, - aggregated_attestations: attestations_bytes - .try_into() - .map_err(|e| format!("aggregated_attestations: {e:?}"))?, - contributors: contributors - .try_into() - .map_err(|e| format!("contributors: {e:?}"))?, - sync_committee_contributions: contributions - .try_into() - .map_err(|e| format!("sync_committee_contributions: {e:?}"))?, - })) + let mut contributors_with_roots: Vec<(Hash256, AssignedAggregator)> = match ssv_committee_sync { + Some(sync_entries) => Vec::with_capacity(sync_entries.len()), + None => Vec::new(), + }; + if let Some(sync_entries) = ssv_committee_sync { + for (subnet_id, sync_aggregator) in sync_entries.iter() { + let sync_selection_root = + validator_store.compute_sync_selection_root(slot, (*subnet_id).into()); + let validator_index = ValidatorIndex(sync_aggregator.validator_index as usize); + contributors_with_roots.push(( + sync_selection_root, + AssignedAggregator { + validator_index, + selection_proof: sync_aggregator.selection_proof.clone().into(), + committee_index: (*subnet_id).into(), + }, + )); + } } - /// Fetch aggregated attestations from beacon node for the given committee indexes. - /// Returns a map of `committee_index` -> `Attestation`. - /// - /// Uses `FuturesUnordered` to collect results as they complete. When the timeout is reached, - /// returns whatever results have been collected so far (partial results). This ensures - /// we don't block on slow beacon nodes while still using successful fetches. 
- async fn fetch_aggregated_attestations( - &self, - slot: Slot, - beacon_vote: &BeaconVote, - attestation_committee_indexes: &HashSet, - timeout: Duration, - ) -> HashMap> { - let _timer = metrics::start_timer_vec( - &metrics::AGGREGATOR_COMMITTEE_FETCH_TIMES, - &["aggregated_attestations"], - ); + sort_contributors_by_signing_root_then_validator_index(&mut contributors_with_roots); + filter_contributors_with_contributions(&mut contributors_with_roots, sync_contributions); - // Determine fork version to handle pre-Electra vs Electra+ attestation data format. - // In Electra+ (EIP-7549), AttestationData.index is always 0 because the committee - // index moved to Attestation.committee_bits. - // In pre-Electra, AttestationData.index must equal the committee_index. - // Use slot as the canonical source for epoch since it's the authoritative parameter. - let fork_name = self - .spec - .fork_name_at_epoch(slot.epoch(E::slots_per_epoch())); - - // Create `FuturesUnordered` for concurrent execution with partial result collection - let beacon_nodes = &self.beacon_nodes; - let mut futures: FuturesUnordered<_> = attestation_committee_indexes - .iter() - .map(|&committee_index| { - // Reconstruct the attestation data to compute its tree hash root. 
- // Pre-Electra: index = committee_index - // Electra+: index = 0 (committee info moved to Attestation.committee_bits) - let attestation_data = AttestationData { - slot, - index: if fork_name < ForkName::Electra { committee_index } else { 0 }, - beacon_block_root: beacon_vote.block_root, - source: beacon_vote.source, - target: beacon_vote.target, - }; - let attestation_data_root = attestation_data.tree_hash_root(); - - async move { - let result = beacon_nodes - .first_success(|beacon_node| async move { - let _timer = validator_metrics::start_timer_vec( - &validator_metrics::ATTESTATION_SERVICE_TIMES, - &[validator_metrics::AGGREGATES_HTTP_GET], - ); - beacon_node - .get_validator_aggregate_attestation_v2( - slot, - attestation_data_root, - committee_index, - ) - .await - .map_err(|e| { - format!("[AggregatorCommittee] Failed to produce aggregate attestation: {:?}", e) - })? - .ok_or_else(|| { - format!( - "[AggregatorCommittee] No aggregate available for slot {}, committee {}", - slot, committee_index - ) - }) - .map(|result| result.into_data()) - }) - .await; - (committee_index, result) - } + let contributors: Vec = contributors_with_roots + .into_iter() + .map(|(_, contributor)| contributor) + .collect(); + + if aggregators.is_empty() && contributors.is_empty() { + return Ok(None); + } + + let attestation_committee_indexes: IndexSet = + aggregators.iter().map(|a| a.committee_index).collect(); + + let attestations_bytes: Vec> = attestation_committee_indexes + .iter() + .filter_map(|committee_index| aggregated_attestations.get(committee_index)) + .map(|attestation| { + let bytes = attestation.as_ssz_bytes(); + VariableList::new(bytes).map_err(|e| { + warn!("Failed to create attestation bytes list: {:?}", e); + e }) - .collect(); + }) + .collect::, _>>() + .map_err(|e| format!("Failed to create attestation bytes: {:?}", e))?; + + let subnet_ids: IndexSet = contributors + .iter() + .map(|c| SyncSubnetId::new(c.committee_index)) + .collect(); + + let contributions: 
Vec> = subnet_ids + .iter() + .filter_map(|id| sync_contributions.get(id).cloned()) + .collect(); + + let epoch = slot.epoch(E::slots_per_epoch()); + let fork_name = spec.fork_name_at_epoch(epoch); + let version = DataVersion::from(fork_name); + + Ok(Some(AggregatorCommitteeConsensusData { + version, + aggregators: aggregators + .try_into() + .map_err(|e| format!("aggregators: {e:?}"))?, + aggregator_committee_indexes: attestation_committee_indexes + .into_iter() + .collect::>() + .try_into() + .map_err(|e| format!("aggregator_committee_indexes: {e:?}"))?, + aggregated_attestations: attestations_bytes + .try_into() + .map_err(|e| format!("aggregated_attestations: {e:?}"))?, + contributors: contributors + .try_into() + .map_err(|e| format!("contributors: {e:?}"))?, + sync_committee_contributions: contributions + .try_into() + .map_err(|e| format!("sync_committee_contributions: {e:?}"))?, + })) +} - let total_committees = attestation_committee_indexes.len(); - let mut aggregated_attestations = HashMap::with_capacity(total_committees); - let deadline = Instant::now() + timeout; +async fn fetch_aggregated_attestations( + slot: Slot, + beacon_vote: &BeaconVote, + attestation_committee_indexes: &HashSet, + timeout: Duration, + beacon_nodes: &Arc>, + spec: &Arc, +) -> HashMap> { + let _timer = metrics::start_timer_vec( + &metrics::AGGREGATOR_COMMITTEE_FETCH_TIMES, + &["aggregated_attestations"], + ); + + let fork_name = spec.fork_name_at_epoch(slot.epoch(E::slots_per_epoch())); + + let mut futures: FuturesUnordered<_> = attestation_committee_indexes + .iter() + .map(|&committee_index| { + let attestation_data = AttestationData { + slot, + index: if fork_name < ForkName::Electra { + committee_index + } else { + 0 + }, + beacon_block_root: beacon_vote.block_root, + source: beacon_vote.source, + target: beacon_vote.target, + }; + let attestation_data_root = attestation_data.tree_hash_root(); + let beacon_nodes = beacon_nodes.clone(); - // Collect results as they complete, 
until timeout or all done - loop { - // Exit when all futures completed - if futures.is_empty() { - break; + async move { + let result = beacon_nodes + .first_success(|beacon_node| async move { + let _timer = validator_metrics::start_timer_vec( + &validator_metrics::ATTESTATION_SERVICE_TIMES, + &[validator_metrics::AGGREGATES_HTTP_GET], + ); + beacon_node + .get_validator_aggregate_attestation_v2( + slot, + attestation_data_root, + committee_index, + ) + .await + .map_err(|e| { + format!("Failed to produce aggregate attestation: {:?}", e) + })? + .ok_or_else(|| { + format!( + "No aggregate available for slot {}, committee {}", + slot, committee_index + ) + }) + .map(|result| result.into_data()) + }) + .await; + (committee_index, result) } + }) + .collect(); - tokio::select! { - Some((committee_index, result)) = futures.next() => { - match result { - Ok(attestation) => { - aggregated_attestations.insert(committee_index, attestation); - } - Err(e) => { - warn!( - %slot, - %committee_index, - error = %e, - "[AggregatorCommittee] Failed to fetch aggregated attestation for committee" - ); - } + let total_committees = attestation_committee_indexes.len(); + let mut aggregated_attestations = HashMap::with_capacity(total_committees); + let deadline = Instant::now() + timeout; + + loop { + if futures.is_empty() { + break; + } + + tokio::select! 
{ + Some((committee_index, result)) = futures.next() => { + match result { + Ok(attestation) => { + aggregated_attestations.insert(committee_index, attestation); } - } - _ = sleep_until(deadline) => { - if aggregated_attestations.len() < total_committees { - warn!( - %slot, - collected = aggregated_attestations.len(), - total = total_committees, - "[AggregatorCommittee] Timeout fetching aggregated attestations, returning partial results" - ); - metrics::inc_counter_vec( - &metrics::AGGREGATOR_COMMITTEE_PARTIAL_RESULTS, - &["aggregated_attestations"], - ); + Err(e) => { + warn!(%slot, %committee_index, error = %e, "Failed to fetch aggregated attestation"); } - break; } } + _ = sleep_until(deadline) => { + if aggregated_attestations.len() < total_committees { + warn!( + %slot, + collected = aggregated_attestations.len(), + total = total_committees, + "Timeout fetching aggregated attestations, returning partial results" + ); + metrics::inc_counter_vec( + &metrics::AGGREGATOR_COMMITTEE_PARTIAL_RESULTS, + &["aggregated_attestations"], + ); + } + break; + } } - - // Track success/failure counts - let successful = aggregated_attestations.len(); - let total = total_committees; - let failed = total.saturating_sub(successful); - metrics::inc_counter_vec_by( - &metrics::AGGREGATOR_COMMITTEE_FETCH_SUCCESS, - &["aggregated_attestations", "success"], - successful as u64, - ); - metrics::inc_counter_vec_by( - &metrics::AGGREGATOR_COMMITTEE_FETCH_SUCCESS, - &["aggregated_attestations", "failed"], - failed as u64, - ); - - aggregated_attestations } - /// Fetch sync committee contributions from beacon node for the given subnet IDs. - /// Returns a map of subnet_id -> SyncCommitteeContribution. - /// - /// Uses `FuturesUnordered` to collect results as they complete. When the timeout is reached, - /// returns whatever results have been collected so far (partial results). This ensures - /// we don't block on slow beacon nodes while still using successful fetches. 
- async fn fetch_sync_contributions( - &self, - slot: Slot, - beacon_block_root: Hash256, - subnet_ids: &HashSet, - timeout: Duration, - ) -> HashMap> { - let _timer = metrics::start_timer_vec( - &metrics::AGGREGATOR_COMMITTEE_FETCH_TIMES, - &["sync_contributions"], - ); - // Create `FuturesUnordered` for concurrent execution with partial result collection - let beacon_nodes = &self.beacon_nodes; - let mut futures: FuturesUnordered<_> = subnet_ids - .iter() - .map(|&subnet_id| async move { + let successful = aggregated_attestations.len(); + let failed = total_committees.saturating_sub(successful); + metrics::inc_counter_vec_by( + &metrics::AGGREGATOR_COMMITTEE_FETCH_SUCCESS, + &["aggregated_attestations", "success"], + successful as u64, + ); + metrics::inc_counter_vec_by( + &metrics::AGGREGATOR_COMMITTEE_FETCH_SUCCESS, + &["aggregated_attestations", "failed"], + failed as u64, + ); + + aggregated_attestations +} + +async fn fetch_sync_contributions( + slot: Slot, + beacon_block_root: Hash256, + subnet_ids: &HashSet, + timeout: Duration, + beacon_nodes: &Arc>, +) -> HashMap> { + let _timer = metrics::start_timer_vec( + &metrics::AGGREGATOR_COMMITTEE_FETCH_TIMES, + &["sync_contributions"], + ); + + let mut futures: FuturesUnordered<_> = subnet_ids + .iter() + .map(|&subnet_id| { + let beacon_nodes = beacon_nodes.clone(); + async move { let result = beacon_nodes .first_success(|beacon_node| async move { let sync_contribution_data = SyncContributionData { @@ -473,7 +454,6 @@ impl<'a, E: EthSpec, T: SlotClock + 'static> AggregatorConsensusBuilder<'a, E, T beacon_block_root, subcommittee_index: subnet_id.into(), }; - beacon_node .get_validator_sync_committee_contribution(&sync_contribution_data) .await @@ -481,103 +461,71 @@ impl<'a, E: EthSpec, T: SlotClock + 'static> AggregatorConsensusBuilder<'a, E, T .instrument(info_span!("fetch_sync_contribution")) .await; (subnet_id, result) - }) - .collect(); + } + }) + .collect(); - let total_subnets = subnet_ids.len(); - let 
mut sync_contributions = HashMap::with_capacity(total_subnets); - let deadline = Instant::now() + timeout; + let total_subnets = subnet_ids.len(); + let mut sync_contributions = HashMap::with_capacity(total_subnets); + let deadline = Instant::now() + timeout; - // Collect results as they complete, until timeout or all done - loop { - // Exit when all futures completed - if futures.is_empty() { - break; - } + loop { + if futures.is_empty() { + break; + } - tokio::select! { - Some((subnet_id, result)) = futures.next() => { - match result { - Ok(Some(response)) => { - sync_contributions.insert(subnet_id, response.data); - } - Ok(None) => { - warn!( - %slot, - ?beacon_block_root, - ?subnet_id, - "[AggregatorCommittee] No sync contribution found for subnet" - ); - } - Err(e) => { - tracing::error!( - %slot, - ?beacon_block_root, - ?subnet_id, - error = %e, - "[AggregatorCommittee] Failed to fetch sync contribution for subnet" - ); - } + tokio::select! { + Some((subnet_id, result)) = futures.next() => { + match result { + Ok(Some(response)) => { + sync_contributions.insert(subnet_id, response.data); } - } - _ = sleep_until(deadline) => { - if sync_contributions.len() < total_subnets { - warn!( - %slot, - collected = sync_contributions.len(), - total = total_subnets, - "[AggregatorCommittee] Timeout fetching sync contributions, returning partial results" - ); - metrics::inc_counter_vec( - &metrics::AGGREGATOR_COMMITTEE_PARTIAL_RESULTS, - &["sync_contributions"], - ); + Ok(None) => { + warn!(%slot, ?beacon_block_root, ?subnet_id, "No sync contribution found"); + } + Err(e) => { + tracing::error!(%slot, ?beacon_block_root, ?subnet_id, error = %e, "Failed to fetch sync contribution"); } - break; } } + _ = sleep_until(deadline) => { + if sync_contributions.len() < total_subnets { + warn!( + %slot, + collected = sync_contributions.len(), + total = total_subnets, + "Timeout fetching sync contributions, returning partial results" + ); + metrics::inc_counter_vec( + 
&metrics::AGGREGATOR_COMMITTEE_PARTIAL_RESULTS, + &["sync_contributions"], + ); + } + break; + } } - - // Track success/failure counts - let successful = sync_contributions.len(); - let total = total_subnets; - let failed = total.saturating_sub(successful); - metrics::inc_counter_vec_by( - &metrics::AGGREGATOR_COMMITTEE_FETCH_SUCCESS, - &["sync_contributions", "success"], - successful as u64, - ); - metrics::inc_counter_vec_by( - &metrics::AGGREGATOR_COMMITTEE_FETCH_SUCCESS, - &["sync_contributions", "failed"], - failed as u64, - ); - - sync_contributions } + + let successful = sync_contributions.len(); + let failed = total_subnets.saturating_sub(successful); + metrics::inc_counter_vec_by( + &metrics::AGGREGATOR_COMMITTEE_FETCH_SUCCESS, + &["sync_contributions", "success"], + successful as u64, + ); + metrics::inc_counter_vec_by( + &metrics::AGGREGATOR_COMMITTEE_FETCH_SUCCESS, + &["sync_contributions", "failed"], + failed as u64, + ); + + sync_contributions } -// ═══════════════════════════════════════════════════════════════════════════════════════ -// Pure Helper Functions for Consensus Data Building -// ═══════════════════════════════════════════════════════════════════════════════════════ -// -// These functions are extracted to enable unit testing of the sorting and filtering logic -// without requiring beacon node mocks. The sorting order MUST match SSV-Go exactly for -// consensus compatibility. - -/// Sort aggregators by `validator_index` ascending. -/// -/// CRITICAL for consensus: All aggregators share the same signing root (attestation data), -/// so we sort purely by `validator_index`. This matches SSV-Go's behavior. pub fn sort_aggregators_by_validator_index(aggregators: &mut [AssignedAggregator]) { aggregators.sort_unstable_by_key(|a| a.validator_index.0); } -/// Sort contributors by (`signing_root`, `validator_index`). 
-/// -/// CRITICAL for consensus: Contributors may have different signing roots (different subnets), -/// so we sort first by signing root, then by `validator_index` within each root group. -/// This matches SSV-Go's root-sorted processing. pub fn sort_contributors_by_signing_root_then_validator_index( contributors_with_roots: &mut [(Hash256, AssignedAggregator)], ) { @@ -588,9 +536,6 @@ pub fn sort_contributors_by_signing_root_then_validator_index( }); } -/// Filter aggregators to only those whose attestation was successfully fetched. -/// -/// This maintains 1:1 correspondence between `aggregator_committee_indexes` and attestations. pub fn filter_aggregators_with_attestations( aggregators: &mut Vec, aggregated_attestations: &HashMap>, @@ -598,9 +543,6 @@ pub fn filter_aggregators_with_attestations( aggregators.retain(|agg| aggregated_attestations.contains_key(&agg.committee_index)); } -/// Filter contributors to only those whose sync contribution was successfully fetched. -/// -/// This maintains 1:1 correspondence between `subnet_ids` and contributions. 
pub fn filter_contributors_with_contributions( contributors_with_roots: &mut Vec<(Hash256, AssignedAggregator)>, sync_contributions: &HashMap>, @@ -629,11 +571,6 @@ mod tests { use super::*; - // ═══════════════════════════════════════════════════════════════════════════════════ - // Test Helpers - // ═══════════════════════════════════════════════════════════════════════════════════ - - /// Create a test AssignedAggregator with specified validator_index and committee_index fn create_aggregator(validator_index: usize, committee_index: u64) -> AssignedAggregator { AssignedAggregator { validator_index: ValidatorIndex(validator_index), @@ -642,7 +579,6 @@ mod tests { } } - /// Create a contributor with signing root for testing fn create_contributor_with_root( signing_root: Hash256, validator_index: usize, @@ -658,7 +594,6 @@ mod tests { ) } - /// Create a test attestation for a given committee index fn create_test_attestation(index: u64) -> Attestation { Attestation::Base(AttestationBase { aggregation_bits: BitList::with_capacity(128).expect("valid capacity"), @@ -679,7 +614,6 @@ mod tests { }) } - /// Create test attestation bytes for consensus data fn create_attestation_bytes(index: u64) -> VariableList { let attestation = AttestationBase:: { aggregation_bits: BitList::with_capacity(128).expect("valid capacity"), @@ -701,7 +635,6 @@ mod tests { VariableList::new(attestation.as_ssz_bytes()).expect("valid attestation bytes") } - /// Create a test sync committee contribution fn create_test_contribution(subnet_id: u64) -> SyncCommitteeContribution { SyncCommitteeContribution { slot: Slot::new(1000), @@ -712,46 +645,24 @@ mod tests { } } - // ══════════════════════════════════���════════════════════════════════════════════════ - // P0 Tests: Wire Compatibility - // ═══════════════════════════════════════════════════════════════════════════════════ - - /// P0-1: Verify aggregators are sorted by validator_index ascending. 
- /// - /// This is CRITICAL for consensus - all operators must produce the same hash - /// for the same input data. If aggregators are not sorted identically, - /// consensus will fail due to hash mismatch. #[test] fn test_aggregators_sorted_by_validator_index() { - // Create aggregators in unsorted order let mut aggregators = vec![ create_aggregator(500, 10), create_aggregator(100, 5), create_aggregator(300, 7), - create_aggregator(200, 5), // Same committee as validator 100 + create_aggregator(200, 5), create_aggregator(50, 3), ]; - // Sort using the production function sort_aggregators_by_validator_index(&mut aggregators); - // Verify sorted by validator_index ascending let indices: Vec = aggregators.iter().map(|a| a.validator_index.0).collect(); - assert_eq!( - indices, - vec![50, 100, 200, 300, 500], - "Aggregators must be sorted by validator_index ascending for consensus compatibility" - ); + assert_eq!(indices, vec![50, 100, 200, 300, 500]); - // Verify sorting is stable for same validator_index edge case - let mut same_index_aggregators = vec![ - create_aggregator(100, 10), - create_aggregator(100, 5), // Same validator_index, different committee - ]; + let mut same_index_aggregators = + vec![create_aggregator(100, 10), create_aggregator(100, 5)]; sort_aggregators_by_validator_index(&mut same_index_aggregators); - - // Both should have validator_index 100, order may vary but that's ok - // since SSV-Go would produce same result assert!( same_index_aggregators .iter() @@ -759,40 +670,22 @@ mod tests { ); } - /// P0-2: Verify contributors are sorted by (signing_root, validator_index). - /// - /// This is CRITICAL for consensus - sync committee contributors have different - /// signing roots per subnet, so we must sort by root first, then validator_index. - /// This matches SSV-Go's root-sorted processing. 
#[test] fn test_contributors_sorted_by_signing_root_then_validator_index() { - // Create signing roots with predictable ordering - // Hash256 comparison is lexicographic on the underlying bytes - let root_a = Hash256::from_low_u64_be(1); // Smaller hash - let root_b = Hash256::from_low_u64_be(2); // Larger hash - let root_c = Hash256::from_low_u64_be(3); // Even larger hash + let root_a = Hash256::from_low_u64_be(1); + let root_b = Hash256::from_low_u64_be(2); + let root_c = Hash256::from_low_u64_be(3); - // Create contributors in unsorted order with various roots let mut contributors = vec![ - create_contributor_with_root(root_c, 100, 2), // root_c, validator 100 - create_contributor_with_root(root_a, 300, 0), // root_a, validator 300 - create_contributor_with_root(root_b, 50, 1), // root_b, validator 50 - create_contributor_with_root(root_a, 100, 0), /* root_a, validator 100 (same root as - * above) */ - create_contributor_with_root(root_b, 200, 1), /* root_b, validator 200 (same root as - * above) */ + create_contributor_with_root(root_c, 100, 2), + create_contributor_with_root(root_a, 300, 0), + create_contributor_with_root(root_b, 50, 1), + create_contributor_with_root(root_a, 100, 0), + create_contributor_with_root(root_b, 200, 1), ]; - // Sort using the production function sort_contributors_by_signing_root_then_validator_index(&mut contributors); - // Expected order: - // 1. root_a, validator 100 (smallest root, smaller validator) - // 2. root_a, validator 300 (smallest root, larger validator) - // 3. root_b, validator 50 (middle root, smaller validator) - // 4. root_b, validator 200 (middle root, larger validator) - // 5. 
root_c, validator 100 (largest root) - let result: Vec<(Hash256, usize)> = contributors .iter() .map(|(root, agg)| (*root, agg.validator_index.0)) @@ -806,23 +699,13 @@ mod tests { (root_b, 50), (root_b, 200), (root_c, 100), - ], - "Contributors must be sorted by (signing_root, validator_index) for consensus compatibility" + ] ); } - /// P0-3: Verify same inputs produce identical hash (deterministic output). - /// - /// This test verifies that given the same input data, the consensus data - /// will always produce the same hash. This is essential for distributed - /// consensus - all operators must agree on the hash for the same data. #[test] fn test_deterministic_output_same_inputs() { - // Create identical consensus data twice using the same methodology - // that build_consensus_data_for_committee would use - let build_consensus_data = || { - // Create aggregators and sort them let mut aggregators = vec![ create_aggregator(300, 10), create_aggregator(100, 5), @@ -830,7 +713,6 @@ mod tests { ]; sort_aggregators_by_validator_index(&mut aggregators); - // Create contributors with roots and sort them let root_a = Hash256::from_low_u64_be(100); let root_b = Hash256::from_low_u64_be(200); let mut contributors_with_roots = vec![ @@ -845,18 +727,9 @@ mod tests { .map(|(_, contrib)| contrib) .collect(); - // Extract committee indexes in first-seen order from sorted aggregators let committee_indexes: IndexSet = aggregators.iter().map(|a| a.committee_index).collect(); - // Extract subnet IDs in first-seen order from sorted contributors - // (kept for illustration but unused since we create fixed contributions) - let _subnet_ids: IndexSet = contributors - .iter() - .map(|c| SyncSubnetId::new(c.committee_index)) - .collect(); - - // Build the consensus data structure AggregatorCommitteeConsensusData:: { version: DataVersion::from(ForkName::Deneb), aggregators: aggregators.try_into().expect("valid aggregators"), @@ -879,53 +752,29 @@ mod tests { } }; - // Build twice and 
compare hashes let data1 = build_consensus_data(); let data2 = build_consensus_data(); - // Use the QbftData::hash() method which is used for consensus use ssv_types::consensus::QbftData; - let hash1 = data1.hash(); - let hash2 = data2.hash(); - - assert_eq!( - hash1, hash2, - "Same inputs must produce identical hash for consensus to work" - ); - - // Also verify SSZ encoding is identical (which is what hash() uses) - assert_eq!( - data1.as_ssz_bytes(), - data2.as_ssz_bytes(), - "Same inputs must produce identical SSZ encoding" - ); + assert_eq!(data1.hash(), data2.hash()); + assert_eq!(data1.as_ssz_bytes(), data2.as_ssz_bytes()); } - /// P0-4: Verify built data passes AggregatorCommitteeDataValidator. - /// - /// This test ensures that the consensus data built using our sorting and - /// construction logic passes the validation that will be performed during - /// QBFT consensus. If our data fails validation, consensus will fail. #[test] fn test_output_passes_validator_with_both() { - // Build consensus data with both aggregators and contributors - // following the exact same logic as build_consensus_data_for_committee - - // Create and sort aggregators let mut aggregators = vec![ create_aggregator(300, 10), create_aggregator(100, 5), - create_aggregator(200, 5), // Same committee_index as validator 100 + create_aggregator(200, 5), ]; sort_aggregators_by_validator_index(&mut aggregators); - // Create and sort contributors let root_a = Hash256::from_low_u64_be(100); let root_b = Hash256::from_low_u64_be(200); let mut contributors_with_roots = vec![ create_contributor_with_root(root_b, 150, 1), create_contributor_with_root(root_a, 250, 0), - create_contributor_with_root(root_a, 50, 0), // Same subnet as validator 250 + create_contributor_with_root(root_a, 50, 0), ]; sort_contributors_by_signing_root_then_validator_index(&mut contributors_with_roots); @@ -934,30 +783,25 @@ mod tests { .map(|(_, contrib)| contrib) .collect(); - // Extract committee indexes in 
first-seen order (preserving order from sorted aggregators) let committee_indexes: IndexSet = aggregators.iter().map(|a| a.committee_index).collect(); - // Extract subnet IDs in first-seen order (preserving order from sorted contributors) let subnet_ids: IndexSet = contributors .iter() .map(|c| SyncSubnetId::new(c.committee_index)) .collect(); - // Build attestation bytes matching the committee indexes let attestation_bytes: Vec> = committee_indexes .iter() .map(|&idx| create_attestation_bytes(idx)) .collect(); - // Build contributions matching the subnet IDs let contributions: Vec> = subnet_ids .iter() .map(|id| create_test_contribution((*id).into())) .collect(); - // Build the complete consensus data let consensus_data = AggregatorCommitteeConsensusData:: { version: DataVersion::from(ForkName::Deneb), aggregators: aggregators.try_into().expect("valid aggregators"), @@ -971,31 +815,15 @@ mod tests { sync_committee_contributions: contributions.try_into().expect("valid contributions"), }; - // Create validator and run validation let validator = AggregatorCommitteeDataValidator::::new(); - - // Validate using the do_validation method for detailed error reporting let result = validator.do_validation(&consensus_data); - assert!( - result.is_ok(), - "Consensus data built with proper sorting/construction must pass validation. 
Error: {:?}", - result.err() - ); + assert!(result.is_ok(), "Error: {:?}", result.err()); - // Also test via the QbftDataValidator trait (used during actual consensus) let passes_trait_validation = QbftDataValidator::validate(&validator, &consensus_data, &consensus_data); - assert!( - passes_trait_validation, - "Consensus data must pass QbftDataValidator trait validation" - ); + assert!(passes_trait_validation); } - // ═══════════════════════════════════════════════════════════════════════════════════ - // Additional Edge Case Tests - // ═══════════════════════════════════════════════════════════════════════════════════ - - /// Test filter_aggregators_with_attestations removes aggregators without attestations #[test] fn test_filter_aggregators_removes_unfetched() { let mut aggregators = vec![ @@ -1004,24 +832,18 @@ mod tests { create_aggregator(300, 15), ]; - // Only have attestation for committee 5 and 15, not 10 let mut attestations = HashMap::new(); attestations.insert(5, create_test_attestation(5)); attestations.insert(15, create_test_attestation(15)); filter_aggregators_with_attestations(&mut aggregators, &attestations); - assert_eq!( - aggregators.len(), - 2, - "Should filter out aggregator with committee_index 10" - ); + assert_eq!(aggregators.len(), 2); let remaining_indexes: Vec = aggregators.iter().map(|a| a.validator_index.0).collect(); assert_eq!(remaining_indexes, vec![100, 300]); } - /// Test filter_contributors_with_contributions removes contributors without contributions #[test] fn test_filter_contributors_removes_unfetched() { let root = Hash256::zero(); @@ -1031,18 +853,13 @@ mod tests { create_contributor_with_root(root, 300, 2), ]; - // Only have contributions for subnet 0 and 2, not 1 let mut contributions = HashMap::new(); contributions.insert(SyncSubnetId::new(0), create_test_contribution(0)); contributions.insert(SyncSubnetId::new(2), create_test_contribution(2)); filter_contributors_with_contributions(&mut contributors, &contributions); - 
assert_eq!( - contributors.len(), - 2, - "Should filter out contributor with subnet_id 1" - ); + assert_eq!(contributors.len(), 2); let remaining_indexes: Vec = contributors .iter() .map(|(_, a)| a.validator_index.0) @@ -1050,78 +867,53 @@ mod tests { assert_eq!(remaining_indexes, vec![100, 300]); } - /// Test that IndexSet preserves first-seen order from sorted aggregators #[test] fn test_committee_indexes_preserve_first_seen_order() { - // Create aggregators with repeated committee indexes let mut aggregators = vec![ - create_aggregator(300, 10), // First occurrence of 10 - create_aggregator(100, 5), // First occurrence of 5 - create_aggregator(200, 5), // Second occurrence of 5 (should be deduped) - create_aggregator(400, 10), // Second occurrence of 10 (should be deduped) - create_aggregator(50, 3), // First occurrence of 3 + create_aggregator(300, 10), + create_aggregator(100, 5), + create_aggregator(200, 5), + create_aggregator(400, 10), + create_aggregator(50, 3), ]; - // Sort aggregators first sort_aggregators_by_validator_index(&mut aggregators); - // Extract committee indexes preserving first-seen order let committee_indexes: IndexSet = aggregators.iter().map(|a| a.committee_index).collect(); - // After sorting by validator_index: [50, 100, 200, 300, 400] - // Committee indexes in order: [3, 5, 5, 10, 10] - // First-seen unique: [3, 5, 10] let indexes: Vec = committee_indexes.into_iter().collect(); - assert_eq!( - indexes, - vec![3, 5, 10], - "Committee indexes must be in first-seen order from sorted aggregators" - ); + assert_eq!(indexes, vec![3, 5, 10]); } - /// Test that subnet IDs preserve first-seen order from sorted contributors #[test] fn test_subnet_ids_preserve_first_seen_order() { - // Create signing roots let root_a = Hash256::from_low_u64_be(1); let root_b = Hash256::from_low_u64_be(2); - // Create contributors with repeated subnet IDs let mut contributors_with_roots = vec![ - create_contributor_with_root(root_b, 100, 1), // root_b, subnet 1 - 
create_contributor_with_root(root_a, 200, 0), // root_a, subnet 0 - create_contributor_with_root(root_a, 50, 0), // root_a, subnet 0 (duplicate) - create_contributor_with_root(root_b, 150, 1), // root_b, subnet 1 (duplicate) + create_contributor_with_root(root_b, 100, 1), + create_contributor_with_root(root_a, 200, 0), + create_contributor_with_root(root_a, 50, 0), + create_contributor_with_root(root_b, 150, 1), ]; - // Sort contributors first sort_contributors_by_signing_root_then_validator_index(&mut contributors_with_roots); - // Extract just the contributors let contributors: Vec = contributors_with_roots .into_iter() .map(|(_, contrib)| contrib) .collect(); - // Extract subnet IDs preserving first-seen order let subnet_ids: IndexSet = contributors .iter() .map(|c| SyncSubnetId::new(c.committee_index)) .collect(); - // After sorting: root_a first (validators 50, 200 both with subnet 0), - // then root_b (validators 100, 150 both with subnet 1) - // First-seen unique: [subnet 0, subnet 1] let ids: Vec = subnet_ids.into_iter().map(|id| id.into()).collect(); - assert_eq!( - ids, - vec![0, 1], - "Subnet IDs must be in first-seen order from sorted contributors" - ); + assert_eq!(ids, vec![0, 1]); } - /// Test empty inputs produce valid (None) result #[test] fn test_empty_aggregators_and_contributors() { let mut aggregators: Vec = vec![]; @@ -1133,21 +925,8 @@ mod tests { assert!(contributors.is_empty()); } - // ═══════════════════════════════════════════════════════════════════════════════════ - // P1 Tests: High Priority - Failure Scenarios - // ═══════════════════════════════════════════════════════════════════════════════════ - - /// P1-1: When all attestation fetches fail AND all contribution fetches fail, - /// the result should indicate no valid data (empty aggregators AND empty contributors - /// after filtering). - /// - /// This simulates the scenario where beacon node is unavailable or returns errors - /// for all requested attestations and contributions. 
The consensus data building - /// logic should gracefully handle this by producing no consensus data (None result - /// in build_consensus_data_for_committee). #[test] fn test_all_fetches_fail_returns_none() { - // Create aggregators with various committee indexes let mut aggregators = vec![ create_aggregator(100, 5), create_aggregator(200, 10), @@ -1155,7 +934,6 @@ mod tests { ]; sort_aggregators_by_validator_index(&mut aggregators); - // Create contributors with various subnet IDs let root = Hash256::zero(); let mut contributors = vec![ create_contributor_with_root(root, 50, 0), @@ -1164,97 +942,54 @@ mod tests { ]; sort_contributors_by_signing_root_then_validator_index(&mut contributors); - // Empty attestations map - simulates all fetches failing let aggregated_attestations: HashMap> = HashMap::new(); - - // Empty contributions map - simulates all fetches failing let sync_contributions: HashMap> = HashMap::new(); - // Apply the filter functions (as done in build_consensus_data_for_committee) filter_aggregators_with_attestations(&mut aggregators, &aggregated_attestations); filter_contributors_with_contributions(&mut contributors, &sync_contributions); - // After filtering, both should be empty because no attestations/contributions were fetched - assert!( - aggregators.is_empty(), - "All aggregators should be filtered out when no attestations are fetched" - ); - assert!( - contributors.is_empty(), - "All contributors should be filtered out when no contributions are fetched" - ); - - // This matches the early exit condition in build_consensus_data_for_committee: - // if aggregators.is_empty() && contributors.is_empty() { return Ok(None); } - let should_return_none = aggregators.is_empty() && contributors.is_empty(); - assert!( - should_return_none, - "When all fetches fail, build_consensus_data_for_committee should return None" - ); + assert!(aggregators.is_empty()); + assert!(contributors.is_empty()); } - // 
═══════════════════════════════════════════════════════════════════════════════════ - // P2 Tests: Medium Priority - Valid Edge Cases - // ═══════════════════════════════════════════════════════════════════════════════════ - - /// P2-1: Valid scenario - no attestation aggregators, only sync contributors. - /// - /// Should produce valid consensus data with empty aggregators list but populated - /// contributors. This is a valid scenario when a committee only has sync committee - /// aggregation duties and no attestation aggregation duties for a given slot. #[test] fn test_empty_aggregators_only_contributors() { - // No aggregators let aggregators: Vec = vec![]; - // Create contributors with contributions let root_a = Hash256::from_low_u64_be(1); let root_b = Hash256::from_low_u64_be(2); let mut contributors_with_roots = vec![ create_contributor_with_root(root_b, 150, 1), create_contributor_with_root(root_a, 250, 0), - create_contributor_with_root(root_a, 50, 0), // Same subnet as validator 250 + create_contributor_with_root(root_a, 50, 0), ]; sort_contributors_by_signing_root_then_validator_index(&mut contributors_with_roots); - // Create contributions for all subnets let mut sync_contributions = HashMap::new(); sync_contributions.insert(SyncSubnetId::new(0), create_test_contribution(0)); sync_contributions.insert(SyncSubnetId::new(1), create_test_contribution(1)); - // Filter contributors (all should remain since we have contributions) filter_contributors_with_contributions(&mut contributors_with_roots, &sync_contributions); - // Extract contributors after filtering let contributors: Vec = contributors_with_roots .into_iter() .map(|(_, contrib)| contrib) .collect(); - assert!( - !contributors.is_empty(), - "Contributors should not be empty when contributions are fetched" - ); - assert_eq!( - contributors.len(), - 3, - "All 3 contributors should remain after filtering" - ); + assert!(!contributors.is_empty()); + assert_eq!(contributors.len(), 3); - // Extract 
subnet IDs in first-seen order let subnet_ids: IndexSet = contributors .iter() .map(|c| SyncSubnetId::new(c.committee_index)) .collect(); - // Build contributions matching subnet IDs let contributions: Vec> = subnet_ids .iter() .filter_map(|id| sync_contributions.get(id).cloned()) .collect(); - // Build the consensus data with empty aggregators but populated contributors let consensus_data = AggregatorCommitteeConsensusData:: { version: DataVersion::from(ForkName::Deneb), aggregators: aggregators.try_into().expect("valid empty aggregators"), @@ -1266,42 +1001,16 @@ mod tests { sync_committee_contributions: contributions.try_into().expect("valid contributions"), }; - // Verify structure is valid - assert!( - consensus_data.aggregators.is_empty(), - "Aggregators should be empty" - ); - assert!( - !consensus_data.contributors.is_empty(), - "Contributors should be populated" - ); - assert!( - consensus_data.aggregated_attestations.is_empty(), - "Attestations should be empty" - ); - assert!( - !consensus_data.sync_committee_contributions.is_empty(), - "Contributions should be populated" - ); + assert!(consensus_data.aggregators.is_empty()); + assert!(!consensus_data.contributors.is_empty()); - // Validate with the consensus data validator let validator = AggregatorCommitteeDataValidator::::new(); let result = validator.do_validation(&consensus_data); - assert!( - result.is_ok(), - "Consensus data with only contributors should pass validation. Error: {:?}", - result.err() - ); + assert!(result.is_ok(), "Error: {:?}", result.err()); } - /// P2-2: Valid scenario - no sync contributors, only attestation aggregators. - /// - /// Should produce valid consensus data with empty contributors list but populated - /// aggregators. This is a valid scenario when a committee only has attestation - /// aggregation duties and no sync committee aggregation duties for a given slot. 
#[test] fn test_empty_contributors_only_aggregators() { - // Create aggregators with attestations let mut aggregators = vec![ create_aggregator(300, 10), create_aggregator(100, 5), @@ -1309,40 +1018,27 @@ mod tests { ]; sort_aggregators_by_validator_index(&mut aggregators); - // Create attestations for all committees let mut aggregated_attestations = HashMap::new(); aggregated_attestations.insert(5, create_test_attestation(5)); aggregated_attestations.insert(7, create_test_attestation(7)); aggregated_attestations.insert(10, create_test_attestation(10)); - // Filter aggregators (all should remain since we have attestations) filter_aggregators_with_attestations(&mut aggregators, &aggregated_attestations); - assert!( - !aggregators.is_empty(), - "Aggregators should not be empty when attestations are fetched" - ); - assert_eq!( - aggregators.len(), - 3, - "All 3 aggregators should remain after filtering" - ); + assert!(!aggregators.is_empty()); + assert_eq!(aggregators.len(), 3); - // Extract committee indexes in first-seen order from sorted aggregators let committee_indexes: IndexSet = aggregators.iter().map(|a| a.committee_index).collect(); - // Build attestation bytes matching committee indexes let attestation_bytes: Vec> = committee_indexes .iter() .map(|&idx| create_attestation_bytes(idx)) .collect(); - // No contributors let contributors: Vec = vec![]; - // Build the consensus data with populated aggregators but empty contributors let consensus_data = AggregatorCommitteeConsensusData:: { version: DataVersion::from(ForkName::Deneb), aggregators: aggregators.try_into().expect("valid aggregators"), @@ -1356,47 +1052,16 @@ mod tests { sync_committee_contributions: VariableList::empty(), }; - // Verify structure is valid - assert!( - !consensus_data.aggregators.is_empty(), - "Aggregators should be populated" - ); - assert!( - consensus_data.contributors.is_empty(), - "Contributors should be empty" - ); - assert!( - !consensus_data.aggregated_attestations.is_empty(), - 
"Attestations should be populated" - ); - assert!( - consensus_data.sync_committee_contributions.is_empty(), - "Contributions should be empty" - ); + assert!(!consensus_data.aggregators.is_empty()); + assert!(consensus_data.contributors.is_empty()); - // Validate with the consensus data validator let validator = AggregatorCommitteeDataValidator::::new(); let result = validator.do_validation(&consensus_data); - assert!( - result.is_ok(), - "Consensus data with only aggregators should pass validation. Error: {:?}", - result.err() - ); + assert!(result.is_ok(), "Error: {:?}", result.err()); } - /// P2-3: 5 aggregators all assigned to committee_index 42. - /// - /// Should: - /// - Sort by validator_index - /// - Deduplicate committee_index to single entry in IndexSet - /// - All 5 aggregators remain in the list - /// - Only 1 committee_index in the output - /// - /// This tests the deduplication behavior of IndexSet for committee indexes - /// when multiple aggregators share the same committee. 
#[test] fn test_multiple_aggregators_same_committee() { - // Create 5 aggregators all with committee_index 42, in unsorted order let mut aggregators = vec![ create_aggregator(500, 42), create_aggregator(100, 42), @@ -1405,61 +1070,32 @@ mod tests { create_aggregator(400, 42), ]; - // Sort by validator_index sort_aggregators_by_validator_index(&mut aggregators); - // Verify sorted order let validator_indices: Vec = aggregators.iter().map(|a| a.validator_index.0).collect(); - assert_eq!( - validator_indices, - vec![100, 200, 300, 400, 500], - "Aggregators must be sorted by validator_index ascending" - ); + assert_eq!(validator_indices, vec![100, 200, 300, 400, 500]); - // Create attestation for committee 42 let mut aggregated_attestations = HashMap::new(); aggregated_attestations.insert(42, create_test_attestation(42)); - // Filter aggregators (all should remain since we have the attestation for committee 42) filter_aggregators_with_attestations(&mut aggregators, &aggregated_attestations); + assert_eq!(aggregators.len(), 5); - // All 5 aggregators should remain - assert_eq!( - aggregators.len(), - 5, - "All 5 aggregators should remain after filtering since attestation for committee 42 exists" - ); - - // Extract committee indexes in first-seen order (should deduplicate to 1 entry) let committee_indexes: IndexSet = aggregators.iter().map(|a| a.committee_index).collect(); - // Verify only 1 unique committee_index - assert_eq!( - committee_indexes.len(), - 1, - "IndexSet should deduplicate to single committee_index" - ); - assert!( - committee_indexes.contains(&42), - "The single committee_index should be 42" - ); + assert_eq!(committee_indexes.len(), 1); + assert!(committee_indexes.contains(&42)); - // Build attestation bytes (only 1 attestation for the single committee_index) let attestation_bytes: Vec> = committee_indexes .iter() .map(|&idx| create_attestation_bytes(idx)) .collect(); - assert_eq!( - attestation_bytes.len(), - 1, - "Should have exactly 1 attestation 
bytes entry for the deduplicated committee" - ); + assert_eq!(attestation_bytes.len(), 1); - // Build the consensus data let consensus_data = AggregatorCommitteeConsensusData:: { version: DataVersion::from(ForkName::Deneb), aggregators: aggregators.try_into().expect("valid aggregators"), @@ -1475,38 +1111,16 @@ mod tests { sync_committee_contributions: VariableList::empty(), }; - // Verify final structure - assert_eq!( - consensus_data.aggregators.len(), - 5, - "Should have all 5 aggregators in final consensus data" - ); - assert_eq!( - consensus_data.aggregator_committee_indexes.len(), - 1, - "Should have only 1 committee_index in final consensus data" - ); - assert_eq!( - consensus_data.aggregated_attestations.len(), - 1, - "Should have only 1 attestation in final consensus data" - ); + assert_eq!(consensus_data.aggregators.len(), 5); + assert_eq!(consensus_data.aggregator_committee_indexes.len(), 1); + assert_eq!(consensus_data.aggregated_attestations.len(), 1); - // Validate with the consensus data validator let validator = AggregatorCommitteeDataValidator::::new(); let result = validator.do_validation(&consensus_data); - assert!( - result.is_ok(), - "Consensus data with multiple aggregators same committee should pass validation. Error: {:?}", - result.err() - ); + assert!(result.is_ok(), "Error: {:?}", result.err()); - // Also verify via QbftDataValidator trait let passes_trait_validation = QbftDataValidator::validate(&validator, &consensus_data, &consensus_data); - assert!( - passes_trait_validation, - "Consensus data must pass QbftDataValidator trait validation" - ); + assert!(passes_trait_validation); } } diff --git a/anchor/validator_store/src/duty_input_publisher.rs b/anchor/validator_store/src/duty_input_publisher.rs index 8401cef4b..6523397f5 100644 --- a/anchor/validator_store/src/duty_input_publisher.rs +++ b/anchor/validator_store/src/duty_input_publisher.rs @@ -1,19 +1,4 @@ -//! Duty Input Publisher -//! -//! 
This service publishes duty inputs at scheduled times during each slot: -//! -//! - **Phase 1 (slot start)**: `VotingAssignments` - caches which validators are attesting/syncing -//! - **Phase 2 (1/3 slot)**: `VotingContext` - fetches beacon_vote and combines with assignments -//! - **Phase 3 (2/3 slot)**: `AggregationAssignments` - builds consensus data for aggregation -//! duties -//! -//! # Separation of Concerns -//! -//! This service focuses on **timing and publishing** of duty inputs. The actual consensus data -//! building logic is delegated to [`AggregatorConsensusBuilder`]. -//! -//! Previously named `MetadataService`, this was renamed to better reflect its actual role -//! as a publisher of duty inputs rather than a generic "metadata" handler. +//! Publishes duty inputs at scheduled times during each slot. use std::{ collections::{HashMap, HashSet}, @@ -22,37 +7,25 @@ use std::{ }; use beacon_node_fallback::BeaconNodeFallback; -use bls::PublicKeyBytes; use fork::{Fork, ForkSchedule}; use slot_clock::SlotClock; -use ssv_types::{CommitteeId, ValidatorIndex, consensus::BeaconVote}; +use ssv_types::{ValidatorIndex, consensus::BeaconVote}; use task_executor::TaskExecutor; use tokio::time::sleep; -use tracing::{error, info, trace}; +use tracing::{error, trace}; use types::{ChainSpec, EthSpec}; use validator_services::duties_service::DutiesService; use crate::{ - AggregationAssignments, AnchorValidatorStore, ContributionWaiter, VotingAssignments, - VotingContext, + AggregationAssignments, AnchorValidatorStore, VotingAssignments, aggregator_consensus_builder::{ - AggregatorConsensusBuilder, SyncAggregatorData, SyncByCommitteeMap, + VotingContext, build_consensus_data_for_all_committees, group_duties_by_committee, }, metrics, }; -/// Maximum time to wait for beacon node API calls to fetch aggregated attestations -/// and sync contributions. After this timeout, we return whatever partial results -/// have been collected. 
This is shorter than the standard 3-second Lighthouse timeout -/// because SSV has additional latency for QBFT consensus and P2P propagation. const BEACON_API_FETCH_TIMEOUT: Duration = Duration::from_secs(2); -/// Publishes duty inputs at scheduled times during each slot. -/// -/// This service is responsible for: -/// - Caching voting assignments at slot start -/// - Fetching beacon_vote at 1/3 slot -/// - Building and publishing aggregation assignments at 2/3 slot #[derive(Clone)] pub struct DutyInputPublisher { duties_service: Arc, T>>, @@ -92,19 +65,14 @@ impl DutyInputPublisher { .duration_to_next_slot() .ok_or("Unable to determine duration to next slot")?; - info!( + tracing::info!( next_update_millis = duration_to_next_slot.as_millis(), "Duty input publisher started" ); let executor = self.executor.clone(); - // ═══════════════════════════════════════════════════════════════════════ - // PHASE 1: VotingAssignments (slot start) - // Caches voting assignments for use by both selection proofs AND voting context. - // Reads directly from DutiesService cache which is populated on startup and - // refreshed each slot. - // ═══════════════════════════════════════════════════════════════════════ + // Phase 1: VotingAssignments (slot start) let self_clone_phase1 = self.clone(); executor.spawn( async move { @@ -112,9 +80,7 @@ impl DutyInputPublisher { if let Some(duration_to_next_slot) = self_clone_phase1.slot_clock.duration_to_next_slot() { - // Sleep until slot start sleep(duration_to_next_slot).await; - if let Err(err) = self_clone_phase1.publish_voting_assignments() { error!(err, "Failed to publish voting assignments"); } @@ -127,10 +93,7 @@ impl DutyInputPublisher { "voting_assignments_publisher", ); - // ═══════════════════════════════════════════════════════════════════════ - // PHASE 2: VotingContext (1/3 slot) - // Gets cached voting assignments, fetches beacon_vote, builds VotingContext. 
- // ═══════════════════════════════════════════════════════════════════════ + // Phase 2: VotingContext (1/3 slot) let self_clone_phase2 = self.clone(); executor.spawn( async move { @@ -138,9 +101,7 @@ impl DutyInputPublisher { if let Some(duration_to_next_slot) = self_clone_phase2.slot_clock.duration_to_next_slot() { - // Sleep until 1/3 into slot sleep(duration_to_next_slot + slot_duration / 3).await; - if let Err(err) = self_clone_phase2.publish_voting_context().await { error!(err, "Failed to publish voting context") } else { @@ -155,12 +116,7 @@ impl DutyInputPublisher { "voting_context_publisher", ); - // ═══════════════════════════════════════════════════════════════════════ - // PHASE 3: AggregationAssignments (2/3 slot) - // Re-fetches `duties_service.attesters()` after selection proofs are computed. - // At this point, `DutyAndProof.selection_proof.is_some()` accurately indicates - // `is_aggregator` for attestation duties. - // ═══════════════════════════════════════════════════════════════════════ + // Phase 3: AggregationAssignments (2/3 slot) let self_clone_phase3 = self.clone(); executor.spawn( async move { @@ -168,9 +124,7 @@ impl DutyInputPublisher { if let Some(duration_to_next_slot) = self_clone_phase3.slot_clock.duration_to_next_slot() { - // Sleep until 2/3 into slot sleep(duration_to_next_slot + slot_duration * 2 / 3).await; - if let Err(err) = self_clone_phase3.publish_aggregation_assignments().await { error!(err, "Failed to publish aggregation assignments"); @@ -187,11 +141,9 @@ impl DutyInputPublisher { Ok(()) } - /// Phase 1: Build and publish `VotingAssignments` at slot start. 
fn publish_voting_assignments(&self) -> Result<(), String> { let slot = self.slot_clock.now().ok_or("Failed to read slot clock")?; - // Get attestation validators let (attesting_validators, attesting_committees): (Vec<_>, HashMap<_, _>) = self .duties_service .attesters(slot) @@ -204,7 +156,6 @@ impl DutyInputPublisher { }) .unzip(); - // Get sync validators by subnet let sync_validators_by_subnet = self .duties_service .sync_duties @@ -250,7 +201,6 @@ impl DutyInputPublisher { self.validator_store .update_voting_assignments(voting_assignments); - // Record validator count metrics metrics::set_gauge( &metrics::METADATA_SERVICE_ATTESTING_VALIDATORS, attester_count as i64, @@ -263,11 +213,10 @@ impl DutyInputPublisher { metrics::inc_counter(&metrics::METADATA_SERVICE_EMPTY_ASSIGNMENTS_TOTAL); } - trace!(%slot, attester_count, sync_count, "Published VotingAssignments at slot start"); + trace!(%slot, attester_count, sync_count, "Published VotingAssignments"); Ok(()) } - /// Phase 2: Build and publish `VotingContext` at 1/3 slot. async fn publish_voting_context(&self) -> Result<(), String> { let slot = self.slot_clock.now().ok_or("Failed to read slot clock")?; @@ -277,7 +226,6 @@ impl DutyInputPublisher { .await .map_err(|e| format!("Failed to get cached voting assignments: {:?}", e))?; - // Fetch beacon_vote from beacon node let attestation_data = self .beacon_nodes .first_success(|beacon_node| async move { @@ -307,159 +255,61 @@ impl DutyInputPublisher { self.validator_store.update_voting_context(voting_context); - trace!(%slot, "Published VotingContext at 1/3 slot"); + trace!(%slot, "Published VotingContext"); Ok(()) } - /// Phase 3: Build and publish `AggregationAssignments` at 2/3 slot. 
- /// - /// Uses single-pass data transformation to minimize iterations: - /// - ONE pass over attesters (those with `selection_proof`) to build all attester-related data - /// - ONE pass over `sync_aggregators` to build all sync-related data - /// - Then delegates to `AggregatorConsensusBuilder` for beacon fetches and consensus data async fn publish_aggregation_assignments(&self) -> Result<(), String> { let slot = self.slot_clock.now().ok_or("Failed to read slot clock")?; - // Get selection proofs from `duties_service` let attesters = self.duties_service.attesters(slot); let sync_duties = self .duties_service .sync_duties .get_duties_for_slot::(slot, &self.spec); - // ═══════════════════════════════════════════════════════════════════════ - // SINGLE PASS over attesters with `selection_proof` - // Only processes validators with valid, non-liquidated SSV committees. - // Collects: `aggregator_committees`, `attesters_by_ssv_committee`, - // `attestation_committee_indexes` - // ═══════════════════════════════════════════════════════════════════════ - let mut aggregator_committees: HashMap = - HashMap::with_capacity(attesters.len()); - let mut attesters_by_ssv_committee: HashMap> = HashMap::new(); - let mut attestation_committee_indexes: HashSet = - HashSet::with_capacity(attesters.len()); - - for attester in attesters.iter().filter(|d| d.selection_proof.is_some()) { - // Only process validators with valid, non-liquidated SSV committees - if let Some(ssv_committee_id) = self - .validator_store - .get_validator_and_cluster(attester.duty.pubkey) - .ok() - .map(|(_, cluster)| cluster.committee_id()) - { - // For AggregationAssignments output - aggregator_committees.insert(attester.duty.pubkey, attester.duty.committee_index); - - // For consensus data building - group by SSV committee - attesters_by_ssv_committee - .entry(ssv_committee_id) - .or_default() - .push(attester); - attestation_committee_indexes.insert(attester.duty.committee_index); - } - } - - // 
═══════════════════════════════════════════════════════════════════════ - // SINGLE PASS over `sync_aggregators` - // Only processes validators with valid, non-liquidated SSV committees. - // Collects: `validator_subnet_counts` (for multi_sync), `sync_by_ssv_committee`, - // `all_subnet_ids` - // ═══════════════════════════════════════════════════════════════════════ let sync_aggregators = sync_duties.as_ref().map(|duties| &duties.aggregators); - let mut validator_subnet_counts: HashMap = HashMap::new(); - let mut sync_by_ssv_committee: SyncByCommitteeMap = HashMap::new(); - let mut all_subnet_ids: HashSet = - HashSet::with_capacity(sync_aggregators.map(|a| a.len()).unwrap_or(0)); - - if let Some(aggregators) = sync_aggregators { - for (subnet_id, subnet_aggregators) in aggregators { - for (validator_index, pubkey, selection_proof) in subnet_aggregators { - let sync_aggregator = SyncAggregatorData { - validator_index: *validator_index, - pubkey: *pubkey, - selection_proof: selection_proof.clone(), - }; - - // Only process validators with valid, non-liquidated SSV committees - if let Some(ssv_committee_id) = self - .validator_store - .get_validator_and_cluster(sync_aggregator.pubkey) - .ok() - .map(|(_, cluster)| cluster.committee_id()) - { - // For AggregationAssignments output - *validator_subnet_counts - .entry(sync_aggregator.pubkey) - .or_insert(0) += 1; - - // For consensus data building - group by SSV committee - sync_by_ssv_committee - .entry(ssv_committee_id) - .or_default() - .push((*subnet_id, sync_aggregator)); - all_subnet_ids.insert(*subnet_id); - } - } - } - } + let grouped = + group_duties_by_committee(&attesters, sync_aggregators, &self.validator_store); - // Derive multi_sync_aggregators from validator_subnet_counts - // (validators aggregating on multiple subnets need coordination) - let multi_sync_aggregators: HashMap> = - validator_subnet_counts - .into_iter() - .filter(|(_, count)| *count > 1) - .map(|(pubkey, count)| (pubkey, 
ContributionWaiter::new(count))) - .collect(); - - // ═══════════════════════════════════════════════════════════════════════ - // Build consensus data for Boole+ forks - // Delegate to AggregatorConsensusBuilder for the heavy lifting - // ═══════════════════════════════════════════════════════════════════════ let epoch = slot.epoch(E::slots_per_epoch()); - let consensus_data_by_ssv_committee = if self.fork_schedule.active_fork(epoch) >= Fork::Boole { - // Get voting context for beacon_vote (cached at 1/3 slot) let voting_context = self .validator_store .get_voting_context(slot) .await .map_err(|e| format!("Failed to get voting context: {:?}", e))?; - let builder = AggregatorConsensusBuilder::new( + build_consensus_data_for_all_committees( + slot, + grouped.attesters_by_ssv_committee, + grouped.sync_by_ssv_committee, + grouped.attestation_committee_indexes, + grouped.all_subnet_ids, + &voting_context, + BEACON_API_FETCH_TIMEOUT, &self.validator_store, &self.beacon_nodes, &self.spec, - ); - - builder - .build_consensus_data_for_all_committees( - slot, - attesters_by_ssv_committee, - sync_by_ssv_committee, - attestation_committee_indexes, - all_subnet_ids, - &voting_context, - BEACON_API_FETCH_TIMEOUT, - ) - .await? + ) + .await? 
} else { HashMap::new() }; let aggregator_info = AggregationAssignments { slot, - aggregator_committees, - multi_sync_aggregators, + aggregator_committees: grouped.aggregator_committees, + multi_sync_aggregators: grouped.multi_sync_aggregators, consensus_data_by_ssv_committee, }; self.validator_store .update_aggregation_assignments(aggregator_info); - trace!(%slot, "Published AggregationAssignments at 2/3 slot"); + trace!(%slot, "Published AggregationAssignments"); Ok(()) } } diff --git a/anchor/validator_store/src/lib.rs b/anchor/validator_store/src/lib.rs index 8219a6fd5..2d3e1b7a1 100644 --- a/anchor/validator_store/src/lib.rs +++ b/anchor/validator_store/src/lib.rs @@ -3,15 +3,6 @@ pub mod duty_input_publisher; mod metrics; pub mod registration_service; -/// Backwards compatibility: re-export DutyInputPublisher as MetadataService -#[deprecated( - since = "0.1.0", - note = "Use duty_input_publisher::DutyInputPublisher instead" -)] -pub mod metadata_service { - pub use crate::duty_input_publisher::DutyInputPublisher as MetadataService; -} - use std::{ collections::{HashMap, HashSet}, fmt::Debug, @@ -22,6 +13,7 @@ use std::{ time::Duration, }; +pub(crate) use aggregator_consensus_builder::VotingContext; use bls::{PublicKeyBytes, SecretKey, Signature}; use database::{NetworkDatabase, NonUniqueIndex, UniqueIndex}; use eth2::types::{BlockContents, FullBlockContents, PublishBlockRequest}; @@ -518,7 +510,7 @@ impl AnchorValidatorStore { } } - /// Update validator voting assignments (called by `MetadataService` at slot start). + /// Update validator voting assignments (called by `DutyInputPublisher` at slot start). /// /// This publishes the `VotingAssignments` to all subscribers via the watch channel. pub fn update_voting_assignments(&self, voting_assignments: VotingAssignments) { @@ -559,7 +551,7 @@ impl AnchorValidatorStore { } } - /// Update aggregator voting assignments (called by `MetadataService` Phase 3 at 2/3 slot). 
+ /// Update aggregation assignments (called by `DutyInputPublisher` at 2/3 slot). /// /// This publishes the `AggregationAssignments` to all subscribers via the watch channel. /// At 2/3 slot, selection proofs have been computed by Lighthouse, so @@ -1550,16 +1542,6 @@ fn decrypt_key_share( .map_err(|err| error!(?err, validator = %pubkey_bytes, "Invalid secret key decrypted")) } -/// Context for voting duties at 1/3 slot. -/// -/// Contains cached voting assignments and the beacon_vote fetched from the beacon node. -pub struct VotingContext { - /// Cached voting assignments (computed at slot start, reused here) - pub voting_assignments: Arc, - /// The `BeaconVote` (only available at 1/3 slot from beacon node) - pub beacon_vote: BeaconVote, -} - /// Cached validator voting assignments for a slot. /// /// This struct caches voting assignments computed at slot start and reuses it at 1/3 slot,