From bc9b2d9fef8ffbddf2e8688948225e5c81b58f23 Mon Sep 17 00:00:00 2001 From: shane-moore Date: Thu, 19 Feb 2026 08:27:11 -0800 Subject: [PATCH 01/11] refactor: stream vc vote signing --- Cargo.lock | 1 + .../lighthouse_validator_store/src/lib.rs | 115 ++++++++- .../src/attestation_service.rs | 221 +++++++++--------- .../src/sync_committee_service.rs | 161 ++++++------- validator_client/validator_store/Cargo.toml | 1 + validator_client/validator_store/src/lib.rs | 66 +++++- 6 files changed, 359 insertions(+), 206 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5a8e76a8a8d..0465c2cddb6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9708,6 +9708,7 @@ version = "0.1.0" dependencies = [ "bls", "eth2", + "futures", "slashing_protection", "types", ] diff --git a/validator_client/lighthouse_validator_store/src/lib.rs b/validator_client/lighthouse_validator_store/src/lib.rs index 7806482ffb3..eef6696045c 100644 --- a/validator_client/lighthouse_validator_store/src/lib.rs +++ b/validator_client/lighthouse_validator_store/src/lib.rs @@ -3,6 +3,7 @@ use bls::{PublicKeyBytes, Signature}; use doppelganger_service::DoppelgangerService; use eth2::types::PublishBlockRequest; use futures::future::join_all; +use futures::stream::{self, BoxStream, StreamExt}; use initialized_validators::InitializedValidators; use logging::crit; use parking_lot::{Mutex, RwLock}; @@ -885,7 +886,7 @@ impl ValidatorStore for LighthouseValidatorS async fn sign_attestations( self: &Arc, mut attestations: Vec<(u64, PublicKeyBytes, usize, Attestation)>, - ) -> Result)>, Error> { + ) -> Result)>>, Error> { // Sign all attestations concurrently. let signing_futures = attestations @@ -932,7 +933,7 @@ impl ValidatorStore for LighthouseValidatorS } if signed_attestations.is_empty() { - return Ok(vec![]); + return Ok(stream::empty().boxed()); } // Check slashing protection and insert into database. Use a dedicated blocking thread @@ -947,7 +948,7 @@ impl ValidatorStore for LighthouseValidatorS .ok_or(Error::ExecutorError)? .await .map_err(|_| Error::ExecutorError)??; - Ok(safe_attestations) + Ok(stream::once(async move { safe_attestations }).boxed()) } async fn sign_validator_registration_data( @@ -1166,6 +1167,114 @@ impl ValidatorStore for LighthouseValidatorS Ok(SignedContributionAndProof { message, signature }) } + async fn sign_aggregate_and_proofs( + self: &Arc, + aggregates: Vec<(PublicKeyBytes, u64, Attestation, SelectionProof)>, + ) -> Result>>, Error> { + let signing_futures = aggregates.into_iter().map( + |(pubkey, aggregator_index, aggregate, selection_proof)| async move { + match self + .produce_signed_aggregate_and_proof( + pubkey, + aggregator_index, + aggregate, + selection_proof, + ) + .await + { + Ok(signed) => Some(signed), + Err(ValidatorStoreError::UnknownPubkey(pubkey)) => { + warn!( + info = "a validator may have recently been removed from this VC", + ?pubkey, + "Missing pubkey for aggregate" + ); + None + } + Err(e) => { + crit!(error = ?e, "Failed to sign aggregate"); + None + } + } + }, + ); + + let results: Vec<_> = join_all(signing_futures).await.into_iter().flatten().collect(); + Ok(stream::once(async move { results }).boxed()) + } + + async fn sign_sync_committee_signatures( + self: &Arc, + messages: Vec<(Slot, Hash256, u64, PublicKeyBytes)>, + ) -> Result>, Error> { + let signing_futures = messages.into_iter().map( + |(slot, beacon_block_root, validator_index, pubkey)| async move { + match self + .produce_sync_committee_signature( + slot, + beacon_block_root, + validator_index, + &pubkey, + ) + .await + { + Ok(sig) => Some(sig), + Err(ValidatorStoreError::UnknownPubkey(pubkey)) => { + warn!( + info = "a validator may have recently been removed from this VC", + ?pubkey, + "Missing pubkey for sync committee signature" + ); + None + } + Err(e) => { + crit!(error = ?e, "Failed to sign sync committee message"); + None + } + } + }, + ); + + let results: Vec<_> = join_all(signing_futures).await.into_iter().flatten().collect(); + Ok(stream::once(async move { results }).boxed()) + } + + async fn sign_sync_committee_contributions( + self: &Arc, + contributions: Vec<(u64, PublicKeyBytes, SyncCommitteeContribution, SyncSelectionProof)>, + ) -> Result>>, Error> { + let signing_futures = contributions.into_iter().map( + |(aggregator_index, aggregator_pubkey, contribution, selection_proof)| async move { + match self + .produce_signed_contribution_and_proof( + aggregator_index, + aggregator_pubkey, + contribution, + selection_proof, + ) + .await + { + Ok(signed) => Some(signed), + Err(ValidatorStoreError::UnknownPubkey(pubkey)) => { + warn!( + info = "a validator may have recently been removed from this VC", + ?pubkey, + "Missing pubkey for sync contribution" + ); + None + } + Err(e) => { + crit!(error = ?e, "Failed to sign sync committee contribution"); + None + } + } + }, + ); + + let results: Vec<_> = join_all(signing_futures).await.into_iter().flatten().collect(); + Ok(stream::once(async move { results }).boxed()) + } + /// Prune the slashing protection database so that it remains performant. /// /// This function will only do actual pruning periodically, so it should usually be diff --git a/validator_client/validator_services/src/attestation_service.rs b/validator_client/validator_services/src/attestation_service.rs index a9d52833127..a0ae2fb1f6b 100644 --- a/validator_client/validator_services/src/attestation_service.rs +++ b/validator_client/validator_services/src/attestation_service.rs @@ -1,6 +1,6 @@ use crate::duties_service::{DutiesService, DutyAndProof}; use beacon_node_fallback::{ApiTopic, BeaconNodeFallback, beacon_head_monitor::HeadEvent}; -use futures::future::join_all; +use futures::StreamExt; use logging::crit; use slot_clock::SlotClock; use std::collections::HashMap; @@ -12,8 +12,10 @@ use tokio::sync::mpsc; use tokio::time::{Duration, Instant, sleep, sleep_until}; use tracing::{Instrument, debug, error, info, info_span, instrument, warn}; use tree_hash::TreeHash; -use types::{Attestation, AttestationData, ChainSpec, CommitteeIndex, EthSpec, Hash256, Slot}; -use validator_store::{Error as ValidatorStoreError, ValidatorStore}; +use types::{ + Attestation, AttestationData, ChainSpec, CommitteeIndex, EthSpec, ForkName, Hash256, Slot, +}; +use validator_store::ValidatorStore; /// Builds an `AttestationService`. #[derive(Default)] @@ -573,21 +575,34 @@ impl AttestationService(attestation_data.slot); + // Publish each batch as it arrives from the stream. + while let Some(batch) = attestation_stream.next().await { + if !batch.is_empty() { + self.publish_attestation_batch(&batch, fork_name, &attestation_data, slot) + .await; + } + } + + Ok(()) + } + + async fn publish_attestation_batch( + &self, + safe_attestations: &[(u64, Attestation)], + fork_name: ForkName, + attestation_data: &AttestationData, + slot: Slot, + ) { let single_attestations = safe_attestations .iter() .filter_map(|(i, a)| { @@ -651,8 +666,6 @@ impl AttestationService AttestationService = validator_duties + .iter() + .filter_map(|duty_and_proof| { + let duty = &duty_and_proof.duty; + let selection_proof = duty_and_proof.selection_proof.as_ref()?; - if !duty.match_attestation_data::(attestation_data, &self.chain_spec) { - crit!("Inconsistent validator duties during signing"); - return None; - } + if !duty.match_attestation_data::(attestation_data, &self.chain_spec) { + crit!("Inconsistent validator duties during signing"); + return None; + } - match self - .validator_store - .produce_signed_aggregate_and_proof( + Some(( duty.pubkey, duty.validator_index, aggregated_attestation.clone(), selection_proof.clone(), - ) - .await - { - Ok(aggregate) => Some(aggregate), - Err(ValidatorStoreError::UnknownPubkey(pubkey)) => { - // A pubkey can be missing when a validator was recently - // removed via the API. - debug!(?pubkey, "Missing pubkey for aggregate"); - None - } - Err(e) => { - crit!( - error = ?e, - pubkey = ?duty.pubkey, - "Failed to sign aggregate" - ); - None - } - } - }); + )) + }) + .collect(); - // Execute all the futures in parallel, collecting any successful results. - let aggregator_count = validator_duties - .iter() - .filter(|d| d.selection_proof.is_some()) - .count(); - let signed_aggregate_and_proofs = join_all(signing_futures) - .instrument(info_span!("sign_aggregates", count = aggregator_count)) + if aggregates_to_sign.is_empty() { + return Ok(()); + } + + // Sign aggregates. Returns a stream of batches. + let mut aggregate_stream = self + .validator_store + .sign_aggregate_and_proofs(aggregates_to_sign) .await - .into_iter() - .flatten() - .collect::>(); + .map_err(|e| format!("Failed to sign aggregates: {e:?}"))?; - if !signed_aggregate_and_proofs.is_empty() { - let signed_aggregate_and_proofs_slice = signed_aggregate_and_proofs.as_slice(); - match self - .beacon_nodes - .first_success(|beacon_node| async move { - let _timer = validator_metrics::start_timer_vec( - &validator_metrics::ATTESTATION_SERVICE_TIMES, - &[validator_metrics::AGGREGATES_HTTP_POST], + // Publish each batch as it arrives from the stream. + while let Some(batch) = aggregate_stream.next().await { + if !batch.is_empty() { + self.publish_aggregate_batch(&batch, fork_name).await; + } + } + + Ok(()) + } + + async fn publish_aggregate_batch( + &self, + signed_aggregate_and_proofs: &[types::SignedAggregateAndProof], + fork_name: ForkName, + ) { + match self + .beacon_nodes + .first_success(|beacon_node| async move { + let _timer = validator_metrics::start_timer_vec( + &validator_metrics::ATTESTATION_SERVICE_TIMES, + &[validator_metrics::AGGREGATES_HTTP_POST], + ); + if fork_name.electra_enabled() { + beacon_node + .post_validator_aggregate_and_proof_v2( + signed_aggregate_and_proofs, + fork_name, + ) + .await + } else { + beacon_node + .post_validator_aggregate_and_proof_v1( + signed_aggregate_and_proofs, + ) + .await + } + }) + .instrument(info_span!( + "publish_aggregates", + count = signed_aggregate_and_proofs.len() + )) + .await + { + Ok(()) => { + for signed_aggregate_and_proof in signed_aggregate_and_proofs { + let attestation = signed_aggregate_and_proof.message().aggregate(); + info!( + aggregator = signed_aggregate_and_proof.message().aggregator_index(), + signatures = attestation.num_set_aggregation_bits(), + head_block = format!("{:?}", attestation.data().beacon_block_root), + committee_index = attestation.committee_index(), + slot = attestation.data().slot.as_u64(), + "type" = "aggregated", + "Successfully published attestation" ); - if fork_name.electra_enabled() { - beacon_node - .post_validator_aggregate_and_proof_v2( - signed_aggregate_and_proofs_slice, - fork_name, - ) - .await - } else { - beacon_node - .post_validator_aggregate_and_proof_v1( - signed_aggregate_and_proofs_slice, - ) - .await - } - }) - .instrument(info_span!( - "publish_aggregates", - count = signed_aggregate_and_proofs.len() - )) - .await - { - Ok(()) => { - for signed_aggregate_and_proof in signed_aggregate_and_proofs { - let attestation = signed_aggregate_and_proof.message().aggregate(); - info!( - aggregator = signed_aggregate_and_proof.message().aggregator_index(), - signatures = attestation.num_set_aggregation_bits(), - head_block = format!("{:?}", attestation.data().beacon_block_root), - committee_index = attestation.committee_index(), - slot = attestation.data().slot.as_u64(), - "type" = "aggregated", - "Successfully published attestation" - ); - } } - Err(e) => { - for signed_aggregate_and_proof in signed_aggregate_and_proofs { - let attestation = &signed_aggregate_and_proof.message().aggregate(); - crit!( - error = %e, - aggregator = signed_aggregate_and_proof.message().aggregator_index(), - committee_index = attestation.committee_index(), - slot = attestation.data().slot.as_u64(), - "type" = "aggregated", - "Failed to publish attestation" - ); - } + } + Err(e) => { + for signed_aggregate_and_proof in signed_aggregate_and_proofs { + let attestation = &signed_aggregate_and_proof.message().aggregate(); + crit!( + error = %e, + aggregator = signed_aggregate_and_proof.message().aggregator_index(), + committee_index = attestation.committee_index(), + slot = attestation.data().slot.as_u64(), + "type" = "aggregated", + "Failed to publish attestation" + ); } } } - - Ok(()) } /// Spawn a blocking task to run the slashing protection pruning process. diff --git a/validator_client/validator_services/src/sync_committee_service.rs b/validator_client/validator_services/src/sync_committee_service.rs index 59e8524a1ae..92f5266eb2a 100644 --- a/validator_client/validator_services/src/sync_committee_service.rs +++ b/validator_client/validator_services/src/sync_committee_service.rs @@ -2,8 +2,8 @@ use crate::duties_service::DutiesService; use beacon_node_fallback::{ApiTopic, BeaconNodeFallback}; use bls::PublicKeyBytes; use eth2::types::BlockId; +use futures::StreamExt; use futures::future::FutureExt; -use futures::future::join_all; use logging::crit; use slot_clock::SlotClock; use std::collections::HashMap; @@ -14,10 +14,10 @@ use task_executor::TaskExecutor; use tokio::time::{Duration, Instant, sleep, sleep_until}; use tracing::{Instrument, debug, error, info, info_span, instrument, trace, warn}; use types::{ - ChainSpec, EthSpec, Hash256, Slot, SyncCommitteeSubscription, SyncContributionData, SyncDuty, - SyncSelectionProof, SyncSubnetId, + ChainSpec, EthSpec, Hash256, Slot, SyncCommitteeMessage, SyncCommitteeSubscription, + SyncContributionData, SyncDuty, SyncSelectionProof, SyncSubnetId, }; -use validator_store::{Error as ValidatorStoreError, ValidatorStore}; +use validator_store::ValidatorStore; pub const SUBSCRIPTION_LOOKAHEAD_EPOCHS: u64 = 4; @@ -247,53 +247,39 @@ impl SyncCommitteeService, ) -> Result<(), ()> { - // Create futures to produce sync committee signatures. - let signature_futures = validator_duties.iter().map(|duty| async move { - match self - .validator_store - .produce_sync_committee_signature( - slot, - beacon_block_root, - duty.validator_index, - &duty.pubkey, - ) - .await - { - Ok(signature) => Some(signature), - Err(ValidatorStoreError::UnknownPubkey(pubkey)) => { - // A pubkey can be missing when a validator was recently - // removed via the API. - debug!( - ?pubkey, - validator_index = duty.validator_index, - %slot, - "Missing pubkey for sync committee signature" - ); - None - } - Err(e) => { - crit!( - validator_index = duty.validator_index, - %slot, - error = ?e, - "Failed to sign sync committee signature" - ); - None - } - } - }); + let messages_to_sign: Vec<_> = validator_duties + .iter() + .map(|duty| (slot, beacon_block_root, duty.validator_index, duty.pubkey)) + .collect(); - // Execute all the futures in parallel, collecting any successful results. - let committee_signatures = &join_all(signature_futures) - .instrument(info_span!( - "sign_sync_signatures", - count = validator_duties.len() - )) + if messages_to_sign.is_empty() { + return Ok(()); + } + + let mut signature_stream = self + .validator_store + .sign_sync_committee_signatures(messages_to_sign) .await - .into_iter() - .flatten() - .collect::>(); + .map_err(|e| { + crit!(%slot, error = ?e, "Failed to sign sync committee signatures"); + })?; + + while let Some(batch) = signature_stream.next().await { + if !batch.is_empty() { + self.publish_sync_signature_batch(&batch, slot, beacon_block_root) + .await?; + } + } + Ok(()) + } + + async fn publish_sync_signature_batch( + &self, + committee_signatures: &[SyncCommitteeMessage], + slot: Slot, + beacon_block_root: Hash256, + ) -> Result<(), ()> { self.beacon_nodes .request(ApiTopic::SyncCommittee, |beacon_node| async move { beacon_node @@ -389,50 +375,45 @@ impl SyncCommitteeService Some(signed_contribution), - Err(ValidatorStoreError::UnknownPubkey(pubkey)) => { - // A pubkey can be missing when a validator was recently - // removed via the API. - debug!(?pubkey, %slot, "Missing pubkey for sync contribution"); - None - } - Err(e) => { - crit!( - %slot, - error = ?e, - "Unable to sign sync committee contribution" - ); - None - } - } - }, - ); + let contributions_to_sign: Vec<_> = subnet_aggregators + .into_iter() + .map(|(aggregator_index, aggregator_pk, selection_proof)| { + (aggregator_index, aggregator_pk, contribution.clone(), selection_proof) + }) + .collect(); - // Execute all the futures in parallel, collecting any successful results. - let signed_contributions = &join_all(signature_futures) - .instrument(info_span!( - "sign_sync_contributions", - count = aggregator_count - )) + if contributions_to_sign.is_empty() { + return Ok(()); + } + + let mut contribution_stream = self + .validator_store + .sign_sync_committee_contributions(contributions_to_sign) .await - .into_iter() - .flatten() - .collect::>(); + .map_err(|e| { + crit!(%slot, error = ?e, "Failed to sign sync committee contributions"); + })?; + while let Some(batch) = contribution_stream.next().await { + if !batch.is_empty() { + self.publish_sync_contribution_batch( + &batch, slot, beacon_block_root, subnet_id, + contribution.aggregation_bits.num_set_bits(), + ).await?; + } + } + + Ok(()) + } + + async fn publish_sync_contribution_batch( + &self, + signed_contributions: &[types::SignedContributionAndProof], + slot: Slot, + beacon_block_root: Hash256, + subnet_id: SyncSubnetId, + num_signers: usize, + ) -> Result<(), ()> { // Publish to the beacon node. self.beacon_nodes .first_success(|beacon_node| async move { @@ -456,7 +437,7 @@ impl SyncCommitteeService"] [dependencies] bls = { workspace = true } eth2 = { workspace = true } +futures = { workspace = true } slashing_protection = { workspace = true } types = { workspace = true } diff --git a/validator_client/validator_store/src/lib.rs b/validator_client/validator_store/src/lib.rs index 87ab669e8d2..4885b40f227 100644 --- a/validator_client/validator_store/src/lib.rs +++ b/validator_client/validator_store/src/lib.rs @@ -1,5 +1,6 @@ use bls::{PublicKeyBytes, Signature}; use eth2::types::{FullBlockContents, PublishBlockRequest}; +use futures::stream::BoxStream; use slashing_protection::NotSafe; use std::fmt::Debug; use std::future::Future; @@ -106,22 +107,30 @@ pub trait ValidatorStore: Send + Sync { /// Sign a batch of `attestations` and apply slashing protection to them. /// - /// Only successfully signed attestations that pass slashing protection are returned, along with - /// the validator index of the signer. Eventually this will be replaced by `SingleAttestation` - /// use. + /// Returns a stream of batches of successfully signed attestations. Each batch contains + /// attestations that passed slashing protection, along with the validator index of the signer. + /// + /// For standard (non-distributed) operation, the stream yields a single batch containing + /// all attestations. For DVT/distributed operation, the stream may yield multiple batches + /// (e.g., one per committee) allowing incremental publishing as each committee completes. /// /// Input: /// /// * Vec of (validator_index, pubkey, validator_committee_index, attestation). /// - /// Output: + /// Output (per batch): /// /// * Vec of (validator_index, signed_attestation). #[allow(clippy::type_complexity)] fn sign_attestations( self: &Arc, attestations: Vec<(u64, PublicKeyBytes, usize, Attestation)>, - ) -> impl Future)>, Error>> + Send; + ) -> impl Future< + Output = Result< + BoxStream<'static, Vec<(u64, Attestation)>>, + Error, + >, + > + Send; fn sign_validator_registration_data( &self, @@ -172,6 +181,53 @@ pub trait ValidatorStore: Send + Sync { selection_proof: SyncSelectionProof, ) -> impl Future, Error>> + Send; + /// Sign a batch of aggregate and proofs and return results as a stream of batches. + /// + /// For standard operation, yields a single batch. For DVT, may yield multiple batches + /// (e.g., one per committee) for incremental publishing. + #[allow(clippy::type_complexity)] + fn sign_aggregate_and_proofs( + self: &Arc, + aggregates: Vec<(PublicKeyBytes, u64, Attestation, SelectionProof)>, + ) -> impl Future< + Output = Result< + BoxStream<'static, Vec>>, + Error, + >, + > + Send; + + /// Sign a batch of sync committee messages and return results as a stream of batches. + /// + /// For standard operation, yields a single batch. For DVT, may yield multiple batches + /// for incremental publishing. + #[allow(clippy::type_complexity)] + fn sign_sync_committee_signatures( + self: &Arc, + messages: Vec<(Slot, Hash256, u64, PublicKeyBytes)>, + ) -> impl Future< + Output = Result>, Error>, + > + Send; + + /// Sign a batch of sync committee contributions and return results as a stream of batches. + /// + /// For standard operation, yields a single batch. For DVT, may yield multiple batches + /// for incremental publishing. + #[allow(clippy::type_complexity)] + fn sign_sync_committee_contributions( + self: &Arc, + contributions: Vec<( + u64, + PublicKeyBytes, + SyncCommitteeContribution, + SyncSelectionProof, + )>, + ) -> impl Future< + Output = Result< + BoxStream<'static, Vec>>, + Error, + >, + > + Send; + /// Prune the slashing protection database so that it remains performant. /// /// This function will only do actual pruning periodically, so it should usually be From af533e76483f2c6a57d86fa0d60dc4f813dfeb20 Mon Sep 17 00:00:00 2001 From: shane-moore Date: Thu, 19 Feb 2026 10:01:15 -0800 Subject: [PATCH 02/11] refactor: use impl Stream for vote signing --- .../lighthouse_validator_store/src/lib.rs | 351 ++++++++++-------- .../src/attestation_service.rs | 46 ++- .../src/sync_committee_service.rs | 60 +-- validator_client/validator_store/src/lib.rs | 27 +- 4 files changed, 270 insertions(+), 214 deletions(-) diff --git a/validator_client/lighthouse_validator_store/src/lib.rs b/validator_client/lighthouse_validator_store/src/lib.rs index eef6696045c..ef1782e224b 100644 --- a/validator_client/lighthouse_validator_store/src/lib.rs +++ b/validator_client/lighthouse_validator_store/src/lib.rs @@ -3,7 +3,8 @@ use bls::{PublicKeyBytes, Signature}; use doppelganger_service::DoppelgangerService; use eth2::types::PublishBlockRequest; use futures::future::join_all; -use futures::stream::{self, BoxStream, StreamExt}; +use futures::Stream; +use futures::stream; use initialized_validators::InitializedValidators; use logging::crit; use parking_lot::{Mutex, RwLock}; @@ -18,7 +19,7 @@ use std::marker::PhantomData; use std::path::Path; use std::sync::Arc; use task_executor::TaskExecutor; -use tracing::{error, info, instrument, warn}; +use tracing::{Instrument, debug, error, info, info_span, instrument, warn}; use types::{ AbstractExecPayload, Address, AggregateAndProof, Attestation, BeaconBlock, BlindedPayload, ChainSpec, ContributionAndProof, Domain, Epoch, EthSpec, ExecutionPayloadEnvelope, Fork, @@ -883,72 +884,78 @@ impl ValidatorStore for LighthouseValidatorS } } - async fn sign_attestations( + fn sign_attestations( self: &Arc, mut attestations: Vec<(u64, PublicKeyBytes, usize, Attestation)>, - ) -> Result)>>, Error> { - // Sign all attestations concurrently. - let signing_futures = - attestations - .iter_mut() - .map(|(_, pubkey, validator_committee_index, attestation)| { - let pubkey = *pubkey; - let validator_committee_index = *validator_committee_index; - async move { - self.sign_attestation_no_slashing_protection( - pubkey, - validator_committee_index, - attestation, - ) - .await + ) -> impl Stream)>, Error>> + Send { + let store = self.clone(); + stream::once(async move { + // Sign all attestations concurrently. + let signing_futures = + attestations + .iter_mut() + .map(|(_, pubkey, validator_committee_index, attestation)| { + let pubkey = *pubkey; + let validator_committee_index = *validator_committee_index; + let store = store.clone(); + async move { + store.sign_attestation_no_slashing_protection( + pubkey, + validator_committee_index, + attestation, + ) + .await + } + }); + + // Execute all signing in parallel. + let results: Vec<_> = join_all(signing_futures).await; + + // Collect successfully signed attestations and log errors. + let mut signed_attestations = Vec::with_capacity(attestations.len()); + for (result, (validator_index, pubkey, _, attestation)) in + results.into_iter().zip(attestations.into_iter()) + { + match result { + Ok(()) => { + signed_attestations.push((validator_index, attestation, pubkey)); + } + Err(ValidatorStoreError::UnknownPubkey(pubkey)) => { + warn!( + info = "a validator may have recently been removed from this VC", + ?pubkey, + "Missing pubkey for attestation" + ); + } + Err(e) => { + crit!( + error = ?e, + "Failed to sign attestation" + ); } - }); - - // Execute all signing in parallel. - let results: Vec<_> = join_all(signing_futures).await; - - // Collect successfully signed attestations and log errors. - let mut signed_attestations = Vec::with_capacity(attestations.len()); - for (result, (validator_index, pubkey, _, attestation)) in - results.into_iter().zip(attestations.into_iter()) - { - match result { - Ok(()) => { - signed_attestations.push((validator_index, attestation, pubkey)); - } - Err(ValidatorStoreError::UnknownPubkey(pubkey)) => { - warn!( - info = "a validator may have recently been removed from this VC", - ?pubkey, - "Missing pubkey for attestation" - ); - } - Err(e) => { - crit!( - error = ?e, - "Failed to sign attestation" - ); } } - } - if signed_attestations.is_empty() { - return Ok(stream::empty().boxed()); - } + if signed_attestations.is_empty() { + return Ok(vec![]); + } - // Check slashing protection and insert into database. Use a dedicated blocking thread - // to avoid clogging the async executor with blocking database I/O. - let validator_store = self.clone(); - let safe_attestations = self - .task_executor - .spawn_blocking_handle( - move || validator_store.slashing_protect_attestations(signed_attestations), - "slashing_protect_attestations", - ) - .ok_or(Error::ExecutorError)? - .await - .map_err(|_| Error::ExecutorError)??; - Ok(stream::once(async move { safe_attestations }).boxed()) + // Check slashing protection and insert into database. Use a dedicated blocking + // thread to avoid clogging the async executor with blocking database I/O. + let validator_store = store.clone(); + let handle = store + .task_executor + .spawn_blocking_handle( + move || validator_store.slashing_protect_attestations(signed_attestations), + "slashing_protect_attestations", + ) + .ok_or(Error::ExecutorError)?; + + match handle.await { + Ok(result) => result, + Err(_) => Err(Error::ExecutorError), + } + }) } async fn sign_validator_registration_data( @@ -1167,112 +1174,154 @@ impl ValidatorStore for LighthouseValidatorS Ok(SignedContributionAndProof { message, signature }) } - async fn sign_aggregate_and_proofs( + fn sign_aggregate_and_proofs( self: &Arc, aggregates: Vec<(PublicKeyBytes, u64, Attestation, SelectionProof)>, - ) -> Result>>, Error> { - let signing_futures = aggregates.into_iter().map( - |(pubkey, aggregator_index, aggregate, selection_proof)| async move { - match self - .produce_signed_aggregate_and_proof( - pubkey, - aggregator_index, - aggregate, - selection_proof, - ) - .await - { - Ok(signed) => Some(signed), - Err(ValidatorStoreError::UnknownPubkey(pubkey)) => { - warn!( - info = "a validator may have recently been removed from this VC", - ?pubkey, - "Missing pubkey for aggregate" - ); - None - } - Err(e) => { - crit!(error = ?e, "Failed to sign aggregate"); - None + ) -> impl Stream>, Error>> + Send { + let store = self.clone(); + let count = aggregates.len(); + stream::once(async move { + let signing_futures = aggregates.into_iter().map( + |(pubkey, aggregator_index, aggregate, selection_proof)| { + let store = store.clone(); + async move { + match store + .produce_signed_aggregate_and_proof( + pubkey, + aggregator_index, + aggregate, + selection_proof, + ) + .await + { + Ok(signed) => Some(signed), + Err(ValidatorStoreError::UnknownPubkey(pubkey)) => { + // A pubkey can be missing when a validator was recently + // removed via the API. + debug!(?pubkey, "Missing pubkey for aggregate"); + None + } + Err(e) => { + crit!(error = ?e, ?pubkey, "Failed to sign aggregate"); + None + } + } } - } - }, - ); + }, + ); - let results: Vec<_> = join_all(signing_futures).await.into_iter().flatten().collect(); - Ok(stream::once(async move { results }).boxed()) + Ok(join_all(signing_futures) + .instrument(info_span!("sign_aggregates", count)) + .await + .into_iter() + .flatten() + .collect::>()) + }) } - async fn sign_sync_committee_signatures( + fn sign_sync_committee_signatures( self: &Arc, messages: Vec<(Slot, Hash256, u64, PublicKeyBytes)>, - ) -> Result>, Error> { - let signing_futures = messages.into_iter().map( - |(slot, beacon_block_root, validator_index, pubkey)| async move { - match self - .produce_sync_committee_signature( - slot, - beacon_block_root, - validator_index, - &pubkey, - ) - .await - { - Ok(sig) => Some(sig), - Err(ValidatorStoreError::UnknownPubkey(pubkey)) => { - warn!( - info = "a validator may have recently been removed from this VC", - ?pubkey, - "Missing pubkey for sync committee signature" - ); - None - } - Err(e) => { - crit!(error = ?e, "Failed to sign sync committee message"); - None + ) -> impl Stream, Error>> + Send { + let store = self.clone(); + let count = messages.len(); + stream::once(async move { + let signing_futures = messages.into_iter().map( + |(slot, beacon_block_root, validator_index, pubkey)| { + let store = store.clone(); + async move { + match store + .produce_sync_committee_signature( + slot, + beacon_block_root, + validator_index, + &pubkey, + ) + .await + { + Ok(sig) => Some(sig), + Err(ValidatorStoreError::UnknownPubkey(pubkey)) => { + // A pubkey can be missing when a validator was recently + // removed via the API. + debug!( + ?pubkey, + validator_index, + %slot, + "Missing pubkey for sync committee signature" + ); + None + } + Err(e) => { + crit!( + validator_index, + %slot, + error = ?e, + "Failed to sign sync committee signature" + ); + None + } + } } - } - }, - ); + }, + ); - let results: Vec<_> = join_all(signing_futures).await.into_iter().flatten().collect(); - Ok(stream::once(async move { results }).boxed()) + Ok(join_all(signing_futures) + .instrument(info_span!("sign_sync_signatures", count)) + .await + .into_iter() + .flatten() + .collect::>()) + }) } - async fn sign_sync_committee_contributions( + fn sign_sync_committee_contributions( self: &Arc, contributions: Vec<(u64, PublicKeyBytes, SyncCommitteeContribution, SyncSelectionProof)>, - ) -> Result>>, Error> { - let signing_futures = contributions.into_iter().map( - |(aggregator_index, aggregator_pubkey, contribution, selection_proof)| async move { - match self - .produce_signed_contribution_and_proof( - aggregator_index, - aggregator_pubkey, - contribution, - selection_proof, - ) - .await - { - Ok(signed) => Some(signed), - Err(ValidatorStoreError::UnknownPubkey(pubkey)) => { - warn!( - info = "a validator may have recently been removed from this VC", - ?pubkey, - "Missing pubkey for sync contribution" - ); - None - } - Err(e) => { - crit!(error = ?e, "Failed to sign sync committee contribution"); - None + ) -> impl Stream>, Error>> + Send { + let store = self.clone(); + let count = contributions.len(); + stream::once(async move { + let signing_futures = contributions.into_iter().map( + |(aggregator_index, aggregator_pubkey, contribution, selection_proof)| { + let store = store.clone(); + let slot = contribution.slot; + async move { + match store + .produce_signed_contribution_and_proof( + aggregator_index, + aggregator_pubkey, + contribution, + selection_proof, + ) + .await + { + Ok(signed) => Some(signed), + Err(ValidatorStoreError::UnknownPubkey(pubkey)) => { + // A pubkey can be missing when a validator was recently + // removed via the API. + debug!(?pubkey, %slot, "Missing pubkey for sync contribution"); + None + } + Err(e) => { + crit!( + %slot, + error = ?e, + "Unable to sign sync committee contribution" + ); + None + } + } } - } - }, - ); + }, + ); - let results: Vec<_> = join_all(signing_futures).await.into_iter().flatten().collect(); - Ok(stream::once(async move { results }).boxed()) + Ok(join_all(signing_futures) + .instrument(info_span!("sign_sync_contributions", count)) + .await + .into_iter() + .flatten() + .collect::>()) + }) } /// Prune the slashing protection database so that it remains performant. diff --git a/validator_client/validator_services/src/attestation_service.rs b/validator_client/validator_services/src/attestation_service.rs index a0ae2fb1f6b..3df3bcffc2b 100644 --- a/validator_client/validator_services/src/attestation_service.rs +++ b/validator_client/validator_services/src/attestation_service.rs @@ -575,24 +575,35 @@ impl AttestationService(attestation_data.slot); // Publish each batch as it arrives from the stream. - while let Some(batch) = attestation_stream.next().await { - if !batch.is_empty() { - self.publish_attestation_batch(&batch, fork_name, &attestation_data, slot) - .await; + let mut published_any = false; + while let Some(result) = attestation_stream.next().await { + match result { + Ok(batch) if !batch.is_empty() => { + published_any = true; + self.publish_attestation_batch(&batch, fork_name, &attestation_data, slot) + .await; + } + Err(e) => { + return Err(format!("Failed to sign attestations: {e:?}")); + } + _ => {} } } + if !published_any { + warn!("No attestations were published"); + } + Ok(()) } @@ -764,16 +775,21 @@ impl AttestationService { + self.publish_aggregate_batch(&batch, fork_name).await; + } + Err(e) => { + return Err(format!("Failed to sign aggregates: {e:?}")); + } + _ => {} } } diff --git a/validator_client/validator_services/src/sync_committee_service.rs b/validator_client/validator_services/src/sync_committee_service.rs index 92f5266eb2a..b71f8fffa0e 100644 --- a/validator_client/validator_services/src/sync_committee_service.rs +++ b/validator_client/validator_services/src/sync_committee_service.rs @@ -256,18 +256,22 @@ impl SyncCommitteeService { + self.publish_sync_signature_batch(&batch, slot, beacon_block_root) + .await?; + } + Err(e) => { + crit!(%slot, error = ?e, "Failed to sign sync committee signatures"); + return Err(()); + } + _ => {} } } @@ -300,9 +304,9 @@ impl SyncCommitteeService SyncCommitteeService { + self.publish_sync_contribution_batch( + &batch, slot, beacon_block_root, subnet_id, + contribution.aggregation_bits.num_set_bits(), + ).await?; + } + Err(e) => { + crit!(%slot, error = ?e, "Failed to sign sync committee contributions"); + return Err(()); + } + _ => {} } } diff --git a/validator_client/validator_store/src/lib.rs b/validator_client/validator_store/src/lib.rs index 4885b40f227..955418fdd49 100644 --- a/validator_client/validator_store/src/lib.rs +++ b/validator_client/validator_store/src/lib.rs @@ -1,6 +1,6 @@ use bls::{PublicKeyBytes, Signature}; use eth2::types::{FullBlockContents, PublishBlockRequest}; -use futures::stream::BoxStream; +use futures::Stream; use slashing_protection::NotSafe; use std::fmt::Debug; use std::future::Future; @@ -125,12 +125,7 @@ pub trait ValidatorStore: Send + Sync { fn sign_attestations( self: &Arc, attestations: Vec<(u64, PublicKeyBytes, usize, Attestation)>, - ) -> impl Future< - Output = Result< - BoxStream<'static, Vec<(u64, Attestation)>>, - Error, - >, - > + Send; + ) -> impl Stream)>, Error>> + Send; fn sign_validator_registration_data( &self, @@ -189,12 +184,7 @@ pub trait ValidatorStore: Send + Sync { fn sign_aggregate_and_proofs( self: &Arc, aggregates: Vec<(PublicKeyBytes, u64, Attestation, SelectionProof)>, - ) -> impl Future< - Output = Result< - BoxStream<'static, Vec>>, - Error, - >, - > + Send; + ) -> impl Stream>, Error>> + Send; /// Sign a batch of sync committee messages and return results as a stream of batches. /// @@ -204,9 +194,7 @@ pub trait ValidatorStore: Send + Sync { fn sign_sync_committee_signatures( self: &Arc, messages: Vec<(Slot, Hash256, u64, PublicKeyBytes)>, - ) -> impl Future< - Output = Result>, Error>, - > + Send; + ) -> impl Stream, Error>> + Send; /// Sign a batch of sync committee contributions and return results as a stream of batches. /// @@ -221,12 +209,7 @@ pub trait ValidatorStore: Send + Sync { SyncCommitteeContribution, SyncSelectionProof, )>, - ) -> impl Future< - Output = Result< - BoxStream<'static, Vec>>, - Error, - >, - > + Send; + ) -> impl Stream>, Error>> + Send; /// Prune the slashing protection database so that it remains performant. /// From 580e2372bb2cc5e1bba3f4911d21e61b3a0438ba Mon Sep 17 00:00:00 2001 From: shane-moore Date: Fri, 20 Feb 2026 07:38:39 -0800 Subject: [PATCH 03/11] refactor: move produce_* signing fns to LighthouseValidatorStore --- .../lighthouse_validator_store/src/lib.rs | 226 +++++++++--------- validator_client/validator_store/src/lib.rs | 28 --- 2 files changed, 113 insertions(+), 141 deletions(-) diff --git a/validator_client/lighthouse_validator_store/src/lib.rs b/validator_client/lighthouse_validator_store/src/lib.rs index ef1782e224b..058ba934bf1 100644 --- a/validator_client/lighthouse_validator_store/src/lib.rs +++ b/validator_client/lighthouse_validator_store/src/lib.rs @@ -693,6 +693,119 @@ impl LighthouseValidatorStore { Ok(safe_attestations) } + + /// Signs an `AggregateAndProof` for a given validator. + /// + /// The resulting `SignedAggregateAndProof` is sent on the aggregation channel and cannot be + /// modified by actors other than the signing validator. + async fn produce_signed_aggregate_and_proof( + &self, + validator_pubkey: PublicKeyBytes, + aggregator_index: u64, + aggregate: Attestation, + selection_proof: SelectionProof, + ) -> Result, Error> { + let signing_epoch = aggregate.data().target.epoch; + let signing_context = self.signing_context(Domain::AggregateAndProof, signing_epoch); + + let message = + AggregateAndProof::from_attestation(aggregator_index, aggregate, selection_proof); + + let signing_method = self.doppelganger_checked_signing_method(validator_pubkey)?; + let signature = signing_method + .get_signature::>( + SignableMessage::SignedAggregateAndProof(message.to_ref()), + signing_context, + &self.spec, + &self.task_executor, + ) + .await?; + + validator_metrics::inc_counter_vec( + &validator_metrics::SIGNED_AGGREGATES_TOTAL, + &[validator_metrics::SUCCESS], + ); + + Ok(SignedAggregateAndProof::from_aggregate_and_proof( + message, signature, + )) + } + + async fn produce_sync_committee_signature( + &self, + slot: Slot, + beacon_block_root: Hash256, + validator_index: u64, + validator_pubkey: &PublicKeyBytes, + ) -> Result { + let signing_epoch = slot.epoch(E::slots_per_epoch()); + let signing_context = self.signing_context(Domain::SyncCommittee, signing_epoch); + + // Bypass `with_validator_signing_method`: sync committee messages are not slashable. + let signing_method = self.doppelganger_bypassed_signing_method(*validator_pubkey)?; + + let signature = signing_method + .get_signature::>( + SignableMessage::SyncCommitteeSignature { + beacon_block_root, + slot, + }, + signing_context, + &self.spec, + &self.task_executor, + ) + .await + .map_err(Error::SpecificError)?; + + validator_metrics::inc_counter_vec( + &validator_metrics::SIGNED_SYNC_COMMITTEE_MESSAGES_TOTAL, + &[validator_metrics::SUCCESS], + ); + + Ok(SyncCommitteeMessage { + slot, + beacon_block_root, + validator_index, + signature, + }) + } + + async fn produce_signed_contribution_and_proof( + &self, + aggregator_index: u64, + aggregator_pubkey: PublicKeyBytes, + contribution: SyncCommitteeContribution, + selection_proof: SyncSelectionProof, + ) -> Result, Error> { + let signing_epoch = contribution.slot.epoch(E::slots_per_epoch()); + let signing_context = self.signing_context(Domain::ContributionAndProof, signing_epoch); + + // Bypass `with_validator_signing_method`: sync committee messages are not slashable. + let signing_method = self.doppelganger_bypassed_signing_method(aggregator_pubkey)?; + + let message = ContributionAndProof { + aggregator_index, + contribution, + selection_proof: selection_proof.into(), + }; + + let signature = signing_method + .get_signature::>( + SignableMessage::SignedContributionAndProof(&message), + signing_context, + &self.spec, + &self.task_executor, + ) + .await + .map_err(Error::SpecificError)?; + + validator_metrics::inc_counter_vec( + &validator_metrics::SIGNED_SYNC_COMMITTEE_CONTRIBUTIONS_TOTAL, + &[validator_metrics::SUCCESS], + ); + + Ok(SignedContributionAndProof { message, signature }) + } } impl ValidatorStore for LighthouseValidatorStore { @@ -987,43 +1100,6 @@ impl ValidatorStore for LighthouseValidatorS }) } - /// Signs an `AggregateAndProof` for a given validator. - /// - /// The resulting `SignedAggregateAndProof` is sent on the aggregation channel and cannot be - /// modified by actors other than the signing validator. - async fn produce_signed_aggregate_and_proof( - &self, - validator_pubkey: PublicKeyBytes, - aggregator_index: u64, - aggregate: Attestation, - selection_proof: SelectionProof, - ) -> Result, Error> { - let signing_epoch = aggregate.data().target.epoch; - let signing_context = self.signing_context(Domain::AggregateAndProof, signing_epoch); - - let message = - AggregateAndProof::from_attestation(aggregator_index, aggregate, selection_proof); - - let signing_method = self.doppelganger_checked_signing_method(validator_pubkey)?; - let signature = signing_method - .get_signature::>( - SignableMessage::SignedAggregateAndProof(message.to_ref()), - signing_context, - &self.spec, - &self.task_executor, - ) - .await?; - - validator_metrics::inc_counter_vec( - &validator_metrics::SIGNED_AGGREGATES_TOTAL, - &[validator_metrics::SUCCESS], - ); - - Ok(SignedAggregateAndProof::from_aggregate_and_proof( - message, signature, - )) - } - /// Produces a `SelectionProof` for the `slot`, signed by with corresponding secret key to /// `validator_pubkey`. async fn produce_selection_proof( @@ -1098,82 +1174,6 @@ impl ValidatorStore for LighthouseValidatorS Ok(signature.into()) } - async fn produce_sync_committee_signature( - &self, - slot: Slot, - beacon_block_root: Hash256, - validator_index: u64, - validator_pubkey: &PublicKeyBytes, - ) -> Result { - let signing_epoch = slot.epoch(E::slots_per_epoch()); - let signing_context = self.signing_context(Domain::SyncCommittee, signing_epoch); - - // Bypass `with_validator_signing_method`: sync committee messages are not slashable. - let signing_method = self.doppelganger_bypassed_signing_method(*validator_pubkey)?; - - let signature = signing_method - .get_signature::>( - SignableMessage::SyncCommitteeSignature { - beacon_block_root, - slot, - }, - signing_context, - &self.spec, - &self.task_executor, - ) - .await - .map_err(Error::SpecificError)?; - - validator_metrics::inc_counter_vec( - &validator_metrics::SIGNED_SYNC_COMMITTEE_MESSAGES_TOTAL, - &[validator_metrics::SUCCESS], - ); - - Ok(SyncCommitteeMessage { - slot, - beacon_block_root, - validator_index, - signature, - }) - } - - async fn produce_signed_contribution_and_proof( - &self, - aggregator_index: u64, - aggregator_pubkey: PublicKeyBytes, - contribution: SyncCommitteeContribution, - selection_proof: SyncSelectionProof, - ) -> Result, Error> { - let signing_epoch = contribution.slot.epoch(E::slots_per_epoch()); - let signing_context = self.signing_context(Domain::ContributionAndProof, signing_epoch); - - // Bypass `with_validator_signing_method`: sync committee messages are not slashable. - let signing_method = self.doppelganger_bypassed_signing_method(aggregator_pubkey)?; - - let message = ContributionAndProof { - aggregator_index, - contribution, - selection_proof: selection_proof.into(), - }; - - let signature = signing_method - .get_signature::>( - SignableMessage::SignedContributionAndProof(&message), - signing_context, - &self.spec, - &self.task_executor, - ) - .await - .map_err(Error::SpecificError)?; - - validator_metrics::inc_counter_vec( - &validator_metrics::SIGNED_SYNC_COMMITTEE_CONTRIBUTIONS_TOTAL, - &[validator_metrics::SUCCESS], - ); - - Ok(SignedContributionAndProof { message, signature }) - } - fn sign_aggregate_and_proofs( self: &Arc, aggregates: Vec<(PublicKeyBytes, u64, Attestation, SelectionProof)>, diff --git a/validator_client/validator_store/src/lib.rs b/validator_client/validator_store/src/lib.rs index 955418fdd49..170672fd661 100644 --- a/validator_client/validator_store/src/lib.rs +++ b/validator_client/validator_store/src/lib.rs @@ -132,18 +132,6 @@ pub trait ValidatorStore: Send + Sync { validator_registration_data: ValidatorRegistrationData, ) -> impl Future>> + Send; - /// Signs an `AggregateAndProof` for a given validator. - /// - /// The resulting `SignedAggregateAndProof` is sent on the aggregation channel and cannot be - /// modified by actors other than the signing validator. - fn produce_signed_aggregate_and_proof( - &self, - validator_pubkey: PublicKeyBytes, - aggregator_index: u64, - aggregate: Attestation, - selection_proof: SelectionProof, - ) -> impl Future, Error>> + Send; - /// Produces a `SelectionProof` for the `slot`, signed by with corresponding secret key to /// `validator_pubkey`. fn produce_selection_proof( @@ -160,22 +148,6 @@ pub trait ValidatorStore: Send + Sync { subnet_id: SyncSubnetId, ) -> impl Future>> + Send; - fn produce_sync_committee_signature( - &self, - slot: Slot, - beacon_block_root: Hash256, - validator_index: u64, - validator_pubkey: &PublicKeyBytes, - ) -> impl Future>> + Send; - - fn produce_signed_contribution_and_proof( - &self, - aggregator_index: u64, - aggregator_pubkey: PublicKeyBytes, - contribution: SyncCommitteeContribution, - selection_proof: SyncSelectionProof, - ) -> impl Future, Error>> + Send; - /// Sign a batch of aggregate and proofs and return results as a stream of batches. /// /// For standard operation, yields a single batch. For DVT, may yield multiple batches From 5b4dd7b8d8b95a741913566ac4c8f8f162aa0fd4 Mon Sep 17 00:00:00 2001 From: shane-moore Date: Fri, 20 Feb 2026 07:50:04 -0800 Subject: [PATCH 04/11] refactor: sign_* error handling --- .../lighthouse_validator_store/src/lib.rs | 151 +++++++++--------- 1 file changed, 77 insertions(+), 74 deletions(-) diff --git a/validator_client/lighthouse_validator_store/src/lib.rs b/validator_client/lighthouse_validator_store/src/lib.rs index 058ba934bf1..e6f5e6c825e 100644 --- a/validator_client/lighthouse_validator_store/src/lib.rs +++ b/validator_client/lighthouse_validator_store/src/lib.rs @@ -1185,37 +1185,38 @@ impl ValidatorStore for LighthouseValidatorS |(pubkey, aggregator_index, aggregate, selection_proof)| { let store = store.clone(); async move { - match store + let result = store .produce_signed_aggregate_and_proof( pubkey, aggregator_index, aggregate, selection_proof, ) - .await - { - Ok(signed) => Some(signed), - Err(ValidatorStoreError::UnknownPubkey(pubkey)) => { - // A pubkey can be missing when a validator was recently - // removed via the API. - debug!(?pubkey, "Missing pubkey for aggregate"); - None - } - Err(e) => { - crit!(error = ?e, ?pubkey, "Failed to sign aggregate"); - None - } - } + .await; + (pubkey, result) } }, ); - Ok(join_all(signing_futures) + let results = join_all(signing_futures) .instrument(info_span!("sign_aggregates", count)) - .await - .into_iter() - .flatten() - .collect::>()) + .await; + + let mut signed = Vec::with_capacity(results.len()); + for (pubkey, result) in results { + match result { + Ok(agg) => signed.push(agg), + Err(ValidatorStoreError::UnknownPubkey(pubkey)) => { + // A pubkey can be missing when a validator was recently + // removed via the API. + debug!(?pubkey, "Missing pubkey for aggregate"); + } + Err(e) => { + crit!(error = ?e, pubkey = ?pubkey, "Failed to sign aggregate"); + } + } + } + Ok(signed) }) } @@ -1230,47 +1231,48 @@ impl ValidatorStore for LighthouseValidatorS |(slot, beacon_block_root, validator_index, pubkey)| { let store = store.clone(); async move { - match store + let result = store .produce_sync_committee_signature( slot, beacon_block_root, validator_index, &pubkey, ) - .await - { - Ok(sig) => Some(sig), - Err(ValidatorStoreError::UnknownPubkey(pubkey)) => { - // A pubkey can be missing when a validator was recently - // removed via the API. - debug!( - ?pubkey, - validator_index, - %slot, - "Missing pubkey for sync committee signature" - ); - None - } - Err(e) => { - crit!( - validator_index, - %slot, - error = ?e, - "Failed to sign sync committee signature" - ); - None - } - } + .await; + (pubkey, validator_index, slot, result) } }, ); - Ok(join_all(signing_futures) + let results = join_all(signing_futures) .instrument(info_span!("sign_sync_signatures", count)) - .await - .into_iter() - .flatten() - .collect::>()) + .await; + + let mut signed = Vec::with_capacity(results.len()); + for (_pubkey, validator_index, slot, result) in results { + match result { + Ok(sig) => signed.push(sig), + Err(ValidatorStoreError::UnknownPubkey(pubkey)) => { + // A pubkey can be missing when a validator was recently + // removed via the API. + debug!( + ?pubkey, + validator_index, + %slot, + "Missing pubkey for sync committee signature" + ); + } + Err(e) => { + crit!( + validator_index, + %slot, + error = ?e, + "Failed to sign sync committee signature" + ); + } + } + } + Ok(signed) }) } @@ -1286,41 +1288,42 @@ impl ValidatorStore for LighthouseValidatorS let store = store.clone(); let slot = contribution.slot; async move { - match store + let result = store .produce_signed_contribution_and_proof( aggregator_index, aggregator_pubkey, contribution, selection_proof, ) - .await - { - Ok(signed) => Some(signed), - Err(ValidatorStoreError::UnknownPubkey(pubkey)) => { - // A pubkey can be missing when a validator was recently - // removed via the API. - debug!(?pubkey, %slot, "Missing pubkey for sync contribution"); - None - } - Err(e) => { - crit!( - %slot, - error = ?e, - "Unable to sign sync committee contribution" - ); - None - } - } + .await; + (slot, result) } }, ); - Ok(join_all(signing_futures) + let results = join_all(signing_futures) .instrument(info_span!("sign_sync_contributions", count)) - .await - .into_iter() - .flatten() - .collect::>()) + .await; + + let mut signed = Vec::with_capacity(results.len()); + for (slot, result) in results { + match result { + Ok(contribution) => signed.push(contribution), + Err(ValidatorStoreError::UnknownPubkey(pubkey)) => { + // A pubkey can be missing when a validator was recently + // removed via the API. + debug!(?pubkey, %slot, "Missing pubkey for sync contribution"); + } + Err(e) => { + crit!( + %slot, + error = ?e, + "Unable to sign sync committee contribution" + ); + } + } + } + Ok(signed) }) } From 376670c442fa80c618cd95bb045c2c6e4334b87e Mon Sep 17 00:00:00 2001 From: shane-moore Date: Fri, 20 Feb 2026 08:23:19 -0800 Subject: [PATCH 05/11] chore: publish sync votes even if empty --- .../validator_services/src/sync_committee_service.rs | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/validator_client/validator_services/src/sync_committee_service.rs b/validator_client/validator_services/src/sync_committee_service.rs index b71f8fffa0e..b6842850810 100644 --- a/validator_client/validator_services/src/sync_committee_service.rs +++ b/validator_client/validator_services/src/sync_committee_service.rs @@ -263,7 +263,7 @@ impl SyncCommitteeService { + Ok(batch) => { self.publish_sync_signature_batch(&batch, slot, beacon_block_root) .await?; } @@ -271,7 +271,6 @@ impl SyncCommitteeService {} } } @@ -304,9 +303,9 @@ impl SyncCommitteeService SyncCommitteeService { + Ok(batch) => { self.publish_sync_contribution_batch( &batch, slot, beacon_block_root, subnet_id, contribution.aggregation_bits.num_set_bits(), @@ -407,7 +406,6 @@ impl SyncCommitteeService {} } } From b57203c0dbfae83775336d3062b3d07dd5acd9fc Mon Sep 17 00:00:00 2001 From: shane-moore Date: Fri, 20 Feb 2026 09:29:35 -0800 Subject: [PATCH 06/11] fix: vc signing tests --- testing/web3signer_tests/src/lib.rs | 39 +++++------ .../http_api/src/tests/keystores.rs | 42 ++++++----- .../lighthouse_validator_store/src/lib.rs | 70 ++++++++++--------- .../src/attestation_service.rs | 8 +-- .../src/sync_committee_service.rs | 15 +++- 5 files changed, 93 insertions(+), 81 deletions(-) diff --git a/testing/web3signer_tests/src/lib.rs b/testing/web3signer_tests/src/lib.rs index 4b9432b67b3..b9e3dae37d9 100644 --- a/testing/web3signer_tests/src/lib.rs +++ b/testing/web3signer_tests/src/lib.rs @@ -25,6 +25,7 @@ mod tests { use eth2_keystore::KeystoreBuilder; use eth2_network_config::Eth2NetworkConfig; use fixed_bytes::FixedBytesExtended; + use futures::StreamExt; use initialized_validators::{ InitializedValidators, load_pem_certificate, load_pkcs12_identity, }; @@ -654,13 +655,9 @@ mod tests { .await .assert_signatures_match("attestation", |pubkey, validator_store| async move { let attestation = get_attestation(); - validator_store - .sign_attestations(vec![(0, pubkey, 0, attestation)]) - .await - .unwrap() - .pop() - .unwrap() - .1 + let stream = validator_store.sign_attestations(vec![(0, pubkey, 0, attestation)]); + tokio::pin!(stream); + stream.next().await.unwrap().unwrap().pop().unwrap().1 }) .await .assert_signatures_match("signed_aggregate", |pubkey, validator_store| async move { @@ -879,22 +876,18 @@ mod tests { .await .assert_signatures_match("first_attestation", |pubkey, validator_store| async move { let attestation = first_attestation(); - validator_store - .sign_attestations(vec![(0, pubkey, 0, attestation)]) - .await - .unwrap() - .pop() - .unwrap() - .1 + let stream = validator_store.sign_attestations(vec![(0, pubkey, 0, attestation)]); + tokio::pin!(stream); + stream.next().await.unwrap().unwrap().pop().unwrap().1 }) .await .assert_slashable_attestation_should_sign( "double_vote_attestation", move |pubkey, validator_store| async move { let attestation = double_vote_attestation(); - validator_store - .sign_attestations(vec![(0, pubkey, 0, attestation)]) - .await + let stream = validator_store.sign_attestations(vec![(0, pubkey, 0, attestation)]); + tokio::pin!(stream); + stream.next().await.unwrap() }, slashable_message_should_sign, ) @@ -903,9 +896,9 @@ mod tests { "surrounding_attestation", move |pubkey, validator_store| async move { let attestation = surrounding_attestation(); - validator_store - .sign_attestations(vec![(0, pubkey, 0, attestation)]) - .await + let stream = validator_store.sign_attestations(vec![(0, pubkey, 0, attestation)]); + tokio::pin!(stream); + stream.next().await.unwrap() }, slashable_message_should_sign, ) @@ -914,9 +907,9 @@ mod tests { "surrounded_attestation", move |pubkey, validator_store| async move { let attestation = surrounded_attestation(); - validator_store - .sign_attestations(vec![(0, pubkey, 0, attestation)]) - .await + let stream = validator_store.sign_attestations(vec![(0, pubkey, 0, attestation)]); + tokio::pin!(stream); + stream.next().await.unwrap() }, slashable_message_should_sign, ) diff --git a/validator_client/http_api/src/tests/keystores.rs b/validator_client/http_api/src/tests/keystores.rs index 601b2f16662..a06081fdf41 100644 --- a/validator_client/http_api/src/tests/keystores.rs +++ b/validator_client/http_api/src/tests/keystores.rs @@ -9,6 +9,7 @@ use eth2::lighthouse_vc::{ types::Web3SignerValidatorRequest, }; use fixed_bytes::FixedBytesExtended; +use futures::StreamExt; use itertools::Itertools; use lighthouse_validator_store::DEFAULT_GAS_LIMIT; use rand::rngs::StdRng; @@ -1101,11 +1102,14 @@ async fn generic_migration_test( // Sign attestations on VC1. for (validator_index, attestation) in first_vc_attestations { let public_key = keystore_pubkey(&keystores[validator_index]); - let safe_attestations = tester1 - .validator_store - .sign_attestations(vec![(0, public_key, 0, attestation.clone())]) - .await - .unwrap(); + let stream = tester1.validator_store.sign_attestations(vec![( + 0, + public_key, + 0, + attestation.clone(), + )]); + tokio::pin!(stream); + let safe_attestations = stream.next().await.unwrap().unwrap(); assert_eq!(safe_attestations.len(), 1); // Compare data only, ignoring signatures which are added during signing. assert_eq!(safe_attestations[0].1.data(), attestation.data()); @@ -1184,10 +1188,14 @@ async fn generic_migration_test( // Sign attestations on the second VC. for (validator_index, attestation, should_succeed) in second_vc_attestations { let public_key = keystore_pubkey(&keystores[validator_index]); - let result = tester2 - .validator_store - .sign_attestations(vec![(0, public_key, 0, attestation.clone())]) - .await; + let stream = tester2.validator_store.sign_attestations(vec![( + 0, + public_key, + 0, + attestation.clone(), + )]); + tokio::pin!(stream); + let result = stream.next().await.unwrap(); match result { Ok(safe_attestations) => { if should_succeed { @@ -1331,14 +1339,14 @@ async fn delete_concurrent_with_signing() { for j in 0..num_attestations { let att = make_attestation(j, j + 1); for (validator_index, public_key) in thread_pubkeys.iter().enumerate() { - let _ = validator_store - .sign_attestations(vec![( - validator_index as u64, - *public_key, - 0, - att.clone(), - )]) - .await; + let stream = validator_store.sign_attestations(vec![( + validator_index as u64, + *public_key, + 0, + att.clone(), + )]); + tokio::pin!(stream); + let _ = stream.next().await; } } }); diff --git a/validator_client/lighthouse_validator_store/src/lib.rs b/validator_client/lighthouse_validator_store/src/lib.rs index e6f5e6c825e..e2eb30b82ab 100644 --- a/validator_client/lighthouse_validator_store/src/lib.rs +++ b/validator_client/lighthouse_validator_store/src/lib.rs @@ -2,8 +2,8 @@ use account_utils::validator_definitions::{PasswordStorage, ValidatorDefinition} use bls::{PublicKeyBytes, Signature}; use doppelganger_service::DoppelgangerService; use eth2::types::PublishBlockRequest; -use futures::future::join_all; use futures::Stream; +use futures::future::join_all; use futures::stream; use initialized_validators::InitializedValidators; use logging::crit; @@ -698,7 +698,7 @@ impl LighthouseValidatorStore { /// /// The resulting `SignedAggregateAndProof` is sent on the aggregation channel and cannot be /// modified by actors other than the signing validator. - async fn produce_signed_aggregate_and_proof( + pub async fn produce_signed_aggregate_and_proof( &self, validator_pubkey: PublicKeyBytes, aggregator_index: u64, @@ -731,7 +731,7 @@ impl LighthouseValidatorStore { )) } - async fn produce_sync_committee_signature( + pub async fn produce_sync_committee_signature( &self, slot: Slot, beacon_block_root: Hash256, @@ -770,7 +770,7 @@ impl LighthouseValidatorStore { }) } - async fn produce_signed_contribution_and_proof( + pub async fn produce_signed_contribution_and_proof( &self, aggregator_index: u64, aggregator_pubkey: PublicKeyBytes, @@ -1004,22 +1004,22 @@ impl ValidatorStore for LighthouseValidatorS let store = self.clone(); stream::once(async move { // Sign all attestations concurrently. - let signing_futures = - attestations - .iter_mut() - .map(|(_, pubkey, validator_committee_index, attestation)| { - let pubkey = *pubkey; - let validator_committee_index = *validator_committee_index; - let store = store.clone(); - async move { - store.sign_attestation_no_slashing_protection( + let signing_futures = attestations.iter_mut().map( + |(_, pubkey, validator_committee_index, attestation)| { + let pubkey = *pubkey; + let validator_committee_index = *validator_committee_index; + let store = store.clone(); + async move { + store + .sign_attestation_no_slashing_protection( pubkey, validator_committee_index, attestation, ) .await - } - }); + } + }, + ); // Execute all signing in parallel. let results: Vec<_> = join_all(signing_futures).await; @@ -1227,22 +1227,23 @@ impl ValidatorStore for LighthouseValidatorS let store = self.clone(); let count = messages.len(); stream::once(async move { - let signing_futures = messages.into_iter().map( - |(slot, beacon_block_root, validator_index, pubkey)| { - let store = store.clone(); - async move { - let result = store - .produce_sync_committee_signature( - slot, - beacon_block_root, - validator_index, - &pubkey, - ) - .await; - (pubkey, validator_index, slot, result) - } - }, - ); + let signing_futures = + messages + .into_iter() + .map(|(slot, beacon_block_root, validator_index, pubkey)| { + let store = store.clone(); + async move { + let result = store + .produce_sync_committee_signature( + slot, + beacon_block_root, + validator_index, + &pubkey, + ) + .await; + (pubkey, validator_index, slot, result) + } + }); let results = join_all(signing_futures) .instrument(info_span!("sign_sync_signatures", count)) @@ -1278,7 +1279,12 @@ impl ValidatorStore for LighthouseValidatorS fn sign_sync_committee_contributions( self: &Arc, - contributions: Vec<(u64, PublicKeyBytes, SyncCommitteeContribution, SyncSelectionProof)>, + contributions: Vec<( + u64, + PublicKeyBytes, + SyncCommitteeContribution, + SyncSelectionProof, + )>, ) -> impl Stream>, Error>> + Send { let store = self.clone(); let count = contributions.len(); diff --git a/validator_client/validator_services/src/attestation_service.rs b/validator_client/validator_services/src/attestation_service.rs index 3df3bcffc2b..5a5e6db7df8 100644 --- a/validator_client/validator_services/src/attestation_service.rs +++ b/validator_client/validator_services/src/attestation_service.rs @@ -575,9 +575,7 @@ impl AttestationService AttestationService SyncCommitteeService = subnet_aggregators .into_iter() .map(|(aggregator_index, aggregator_pk, selection_proof)| { - (aggregator_index, aggregator_pk, contribution.clone(), selection_proof) + ( + aggregator_index, + aggregator_pk, + contribution.clone(), + selection_proof, + ) }) .collect(); @@ -398,9 +403,13 @@ impl SyncCommitteeService { self.publish_sync_contribution_batch( - &batch, slot, beacon_block_root, subnet_id, + &batch, + slot, + beacon_block_root, + subnet_id, contribution.aggregation_bits.num_set_bits(), - ).await?; + ) + .await?; } Err(e) => { crit!(%slot, error = ?e, "Failed to sign sync committee contributions"); From de46abd61832bda5332c6b3921325c37ccd543e8 Mon Sep 17 00:00:00 2001 From: shane-moore Date: Fri, 20 Feb 2026 10:07:51 -0800 Subject: [PATCH 07/11] refactor: signing tuple structs to structs --- testing/web3signer_tests/src/lib.rs | 37 ++++++-- .../http_api/src/tests/keystores.rs | 41 +++++---- .../lighthouse_validator_store/src/lib.rs | 85 +++++++++++-------- .../src/attestation_service.rs | 24 +++--- .../src/sync_committee_service.rs | 21 +++-- validator_client/validator_store/src/lib.rs | 56 +++++++----- 6 files changed, 166 insertions(+), 98 deletions(-) diff --git a/testing/web3signer_tests/src/lib.rs b/testing/web3signer_tests/src/lib.rs index b9e3dae37d9..1f36f8d4ceb 100644 --- a/testing/web3signer_tests/src/lib.rs +++ b/testing/web3signer_tests/src/lib.rs @@ -51,7 +51,7 @@ mod tests { use types::{attestation::AttestationBase, *}; use url::Url; use validator_store::{ - Error as ValidatorStoreError, SignedBlock, UnsignedBlock, ValidatorStore, + AttestationToSign, Error as ValidatorStoreError, SignedBlock, UnsignedBlock, ValidatorStore, }; /// If the we are unable to reach the Web3Signer HTTP API within this time out then we will @@ -655,7 +655,12 @@ mod tests { .await .assert_signatures_match("attestation", |pubkey, validator_store| async move { let attestation = get_attestation(); - let stream = validator_store.sign_attestations(vec![(0, pubkey, 0, attestation)]); + let stream = validator_store.sign_attestations(vec![AttestationToSign { + validator_index: 0, + pubkey, + validator_committee_index: 0, + attestation, + }]); tokio::pin!(stream); stream.next().await.unwrap().unwrap().pop().unwrap().1 }) @@ -876,7 +881,12 @@ mod tests { .await .assert_signatures_match("first_attestation", |pubkey, validator_store| async move { let attestation = first_attestation(); - let stream = validator_store.sign_attestations(vec![(0, pubkey, 0, attestation)]); + let stream = validator_store.sign_attestations(vec![AttestationToSign { + validator_index: 0, + pubkey, + validator_committee_index: 0, + attestation, + }]); tokio::pin!(stream); stream.next().await.unwrap().unwrap().pop().unwrap().1 }) @@ -885,7 +895,12 @@ mod tests { "double_vote_attestation", move |pubkey, validator_store| async move { let attestation = double_vote_attestation(); - let stream = validator_store.sign_attestations(vec![(0, pubkey, 0, attestation)]); + let stream = validator_store.sign_attestations(vec![AttestationToSign { + validator_index: 0, + pubkey, + validator_committee_index: 0, + attestation, + }]); tokio::pin!(stream); stream.next().await.unwrap() }, @@ -896,7 +911,12 @@ mod tests { "surrounding_attestation", move |pubkey, validator_store| async move { let attestation = surrounding_attestation(); - let stream = validator_store.sign_attestations(vec![(0, pubkey, 0, attestation)]); + let stream = validator_store.sign_attestations(vec![AttestationToSign { + validator_index: 0, + pubkey, + validator_committee_index: 0, + attestation, + }]); tokio::pin!(stream); stream.next().await.unwrap() }, @@ -907,7 +927,12 @@ mod tests { "surrounded_attestation", move |pubkey, validator_store| async move { let attestation = surrounded_attestation(); - let stream = validator_store.sign_attestations(vec![(0, pubkey, 0, attestation)]); + let stream = validator_store.sign_attestations(vec![AttestationToSign { + validator_index: 0, + pubkey, + validator_committee_index: 0, + attestation, + }]); tokio::pin!(stream); stream.next().await.unwrap() }, diff --git a/validator_client/http_api/src/tests/keystores.rs b/validator_client/http_api/src/tests/keystores.rs index a06081fdf41..eb35075526e 100644 --- a/validator_client/http_api/src/tests/keystores.rs +++ b/validator_client/http_api/src/tests/keystores.rs @@ -20,6 +20,7 @@ use std::{collections::HashMap, path::Path}; use tokio::runtime::Handle; use typenum::Unsigned; use types::{Address, attestation::AttestationBase}; +use validator_store::AttestationToSign; use validator_store::ValidatorStore; use zeroize::Zeroizing; @@ -1102,12 +1103,14 @@ async fn generic_migration_test( // Sign attestations on VC1. for (validator_index, attestation) in first_vc_attestations { let public_key = keystore_pubkey(&keystores[validator_index]); - let stream = tester1.validator_store.sign_attestations(vec![( - 0, - public_key, - 0, - attestation.clone(), - )]); + let stream = tester1 + .validator_store + .sign_attestations(vec![AttestationToSign { + validator_index: 0, + pubkey: public_key, + validator_committee_index: 0, + attestation: attestation.clone(), + }]); tokio::pin!(stream); let safe_attestations = stream.next().await.unwrap().unwrap(); assert_eq!(safe_attestations.len(), 1); @@ -1188,12 +1191,14 @@ async fn generic_migration_test( // Sign attestations on the second VC. for (validator_index, attestation, should_succeed) in second_vc_attestations { let public_key = keystore_pubkey(&keystores[validator_index]); - let stream = tester2.validator_store.sign_attestations(vec![( - 0, - public_key, - 0, - attestation.clone(), - )]); + let stream = tester2 + .validator_store + .sign_attestations(vec![AttestationToSign { + validator_index: 0, + pubkey: public_key, + validator_committee_index: 0, + attestation: attestation.clone(), + }]); tokio::pin!(stream); let result = stream.next().await.unwrap(); match result { @@ -1339,12 +1344,12 @@ async fn delete_concurrent_with_signing() { for j in 0..num_attestations { let att = make_attestation(j, j + 1); for (validator_index, public_key) in thread_pubkeys.iter().enumerate() { - let stream = validator_store.sign_attestations(vec![( - validator_index as u64, - *public_key, - 0, - att.clone(), - )]); + let stream = validator_store.sign_attestations(vec![AttestationToSign { + validator_index: validator_index as u64, + pubkey: *public_key, + validator_committee_index: 0, + attestation: att.clone(), + }]); tokio::pin!(stream); let _ = stream.next().await; } diff --git a/validator_client/lighthouse_validator_store/src/lib.rs b/validator_client/lighthouse_validator_store/src/lib.rs index e2eb30b82ab..d8ddf85bec4 100644 --- a/validator_client/lighthouse_validator_store/src/lib.rs +++ b/validator_client/lighthouse_validator_store/src/lib.rs @@ -30,7 +30,8 @@ use types::{ ValidatorRegistrationData, VoluntaryExit, graffiti::GraffitiString, }; use validator_store::{ - DoppelgangerStatus, Error as ValidatorStoreError, ProposalData, SignedBlock, UnsignedBlock, + AggregateToSign, AttestationToSign, ContributionToSign, DoppelgangerStatus, + Error as ValidatorStoreError, ProposalData, SignedBlock, SyncMessageToSign, UnsignedBlock, ValidatorStore, }; @@ -999,13 +1000,18 @@ impl ValidatorStore for LighthouseValidatorS fn sign_attestations( self: &Arc, - mut attestations: Vec<(u64, PublicKeyBytes, usize, Attestation)>, + mut attestations: Vec>, ) -> impl Stream)>, Error>> + Send { let store = self.clone(); stream::once(async move { // Sign all attestations concurrently. let signing_futures = attestations.iter_mut().map( - |(_, pubkey, validator_committee_index, attestation)| { + |AttestationToSign { + pubkey, + validator_committee_index, + attestation, + .. + }| { let pubkey = *pubkey; let validator_committee_index = *validator_committee_index; let store = store.clone(); @@ -1026,12 +1032,14 @@ impl ValidatorStore for LighthouseValidatorS // Collect successfully signed attestations and log errors. let mut signed_attestations = Vec::with_capacity(attestations.len()); - for (result, (validator_index, pubkey, _, attestation)) in - results.into_iter().zip(attestations.into_iter()) - { + for (result, att) in results.into_iter().zip(attestations.into_iter()) { match result { Ok(()) => { - signed_attestations.push((validator_index, attestation, pubkey)); + signed_attestations.push(( + att.validator_index, + att.attestation, + att.pubkey, + )); } Err(ValidatorStoreError::UnknownPubkey(pubkey)) => { warn!( @@ -1176,13 +1184,18 @@ impl ValidatorStore for LighthouseValidatorS fn sign_aggregate_and_proofs( self: &Arc, - aggregates: Vec<(PublicKeyBytes, u64, Attestation, SelectionProof)>, + aggregates: Vec>, ) -> impl Stream>, Error>> + Send { let store = self.clone(); let count = aggregates.len(); stream::once(async move { let signing_futures = aggregates.into_iter().map( - |(pubkey, aggregator_index, aggregate, selection_proof)| { + |AggregateToSign { + pubkey, + aggregator_index, + aggregate, + selection_proof, + }| { let store = store.clone(); async move { let result = store @@ -1222,28 +1235,32 @@ impl ValidatorStore for LighthouseValidatorS fn sign_sync_committee_signatures( self: &Arc, - messages: Vec<(Slot, Hash256, u64, PublicKeyBytes)>, + messages: Vec, ) -> impl Stream, Error>> + Send { let store = self.clone(); let count = messages.len(); stream::once(async move { - let signing_futures = - messages - .into_iter() - .map(|(slot, beacon_block_root, validator_index, pubkey)| { - let store = store.clone(); - async move { - let result = store - .produce_sync_committee_signature( - slot, - beacon_block_root, - validator_index, - &pubkey, - ) - .await; - (pubkey, validator_index, slot, result) - } - }); + let signing_futures = messages.into_iter().map( + |SyncMessageToSign { + slot, + beacon_block_root, + validator_index, + pubkey, + }| { + let store = store.clone(); + async move { + let result = store + .produce_sync_committee_signature( + slot, + beacon_block_root, + validator_index, + &pubkey, + ) + .await; + (pubkey, validator_index, slot, result) + } + }, + ); let results = join_all(signing_futures) .instrument(info_span!("sign_sync_signatures", count)) @@ -1279,18 +1296,18 @@ impl ValidatorStore for LighthouseValidatorS fn sign_sync_committee_contributions( self: &Arc, - contributions: Vec<( - u64, - PublicKeyBytes, - SyncCommitteeContribution, - SyncSelectionProof, - )>, + contributions: Vec>, ) -> impl Stream>, Error>> + Send { let store = self.clone(); let count = contributions.len(); stream::once(async move { let signing_futures = contributions.into_iter().map( - |(aggregator_index, aggregator_pubkey, contribution, selection_proof)| { + |ContributionToSign { + aggregator_index, + aggregator_pubkey, + contribution, + selection_proof, + }| { let store = store.clone(); let slot = contribution.slot; async move { diff --git a/validator_client/validator_services/src/attestation_service.rs b/validator_client/validator_services/src/attestation_service.rs index 5a5e6db7df8..92b9e5619e2 100644 --- a/validator_client/validator_services/src/attestation_service.rs +++ b/validator_client/validator_services/src/attestation_service.rs @@ -15,7 +15,7 @@ use tree_hash::TreeHash; use types::{ Attestation, AttestationData, ChainSpec, CommitteeIndex, EthSpec, ForkName, Hash256, Slot, }; -use validator_store::ValidatorStore; +use validator_store::{AggregateToSign, AttestationToSign, ValidatorStore}; /// Builds an `AttestationService`. #[derive(Default)] @@ -562,12 +562,12 @@ impl AttestationService AttestationService SyncCommitteeService Result<(), ()> { let messages_to_sign: Vec<_> = validator_duties .iter() - .map(|duty| (slot, beacon_block_root, duty.validator_index, duty.pubkey)) + .map(|duty| SyncMessageToSign { + slot, + beacon_block_root, + validator_index: duty.validator_index, + pubkey: duty.pubkey, + }) .collect(); if messages_to_sign.is_empty() { @@ -380,14 +385,14 @@ impl SyncCommitteeService = subnet_aggregators .into_iter() - .map(|(aggregator_index, aggregator_pk, selection_proof)| { - ( + .map( + |(aggregator_index, aggregator_pk, selection_proof)| ContributionToSign { aggregator_index, - aggregator_pk, - contribution.clone(), + aggregator_pubkey: aggregator_pk, + contribution: contribution.clone(), selection_proof, - ) - }) + }, + ) .collect(); if contributions_to_sign.is_empty() { diff --git a/validator_client/validator_store/src/lib.rs b/validator_client/validator_store/src/lib.rs index 170672fd661..d429b3ac86d 100644 --- a/validator_client/validator_store/src/lib.rs +++ b/validator_client/validator_store/src/lib.rs @@ -33,6 +33,38 @@ impl From for Error { } } +/// Input for batch attestation signing +pub struct AttestationToSign { + pub validator_index: u64, + pub pubkey: PublicKeyBytes, + pub validator_committee_index: usize, + pub attestation: Attestation, +} + +/// Input for batch aggregate signing +pub struct AggregateToSign { + pub pubkey: PublicKeyBytes, + pub aggregator_index: u64, + pub aggregate: Attestation, + pub selection_proof: SelectionProof, +} + +/// Input for batch sync committee message signing +pub struct SyncMessageToSign { + pub slot: Slot, + pub beacon_block_root: Hash256, + pub validator_index: u64, + pub pubkey: PublicKeyBytes, +} + +/// Input for batch sync committee contribution signing +pub struct ContributionToSign { + pub aggregator_index: u64, + pub aggregator_pubkey: PublicKeyBytes, + pub contribution: SyncCommitteeContribution, + pub selection_proof: SyncSelectionProof, +} + /// A helper struct, used for passing data from the validator store to services. pub struct ProposalData { pub validator_index: Option, @@ -113,18 +145,10 @@ pub trait ValidatorStore: Send + Sync { /// For standard (non-distributed) operation, the stream yields a single batch containing /// all attestations. For DVT/distributed operation, the stream may yield multiple batches /// (e.g., one per committee) allowing incremental publishing as each committee completes. - /// - /// Input: - /// - /// * Vec of (validator_index, pubkey, validator_committee_index, attestation). - /// - /// Output (per batch): - /// - /// * Vec of (validator_index, signed_attestation). #[allow(clippy::type_complexity)] fn sign_attestations( self: &Arc, - attestations: Vec<(u64, PublicKeyBytes, usize, Attestation)>, + attestations: Vec>, ) -> impl Stream)>, Error>> + Send; fn sign_validator_registration_data( @@ -152,35 +176,27 @@ pub trait ValidatorStore: Send + Sync { /// /// For standard operation, yields a single batch. For DVT, may yield multiple batches /// (e.g., one per committee) for incremental publishing. - #[allow(clippy::type_complexity)] fn sign_aggregate_and_proofs( self: &Arc, - aggregates: Vec<(PublicKeyBytes, u64, Attestation, SelectionProof)>, + aggregates: Vec>, ) -> impl Stream>, Error>> + Send; /// Sign a batch of sync committee messages and return results as a stream of batches. /// /// For standard operation, yields a single batch. For DVT, may yield multiple batches /// for incremental publishing. - #[allow(clippy::type_complexity)] fn sign_sync_committee_signatures( self: &Arc, - messages: Vec<(Slot, Hash256, u64, PublicKeyBytes)>, + messages: Vec, ) -> impl Stream, Error>> + Send; /// Sign a batch of sync committee contributions and return results as a stream of batches. /// /// For standard operation, yields a single batch. For DVT, may yield multiple batches /// for incremental publishing. - #[allow(clippy::type_complexity)] fn sign_sync_committee_contributions( self: &Arc, - contributions: Vec<( - u64, - PublicKeyBytes, - SyncCommitteeContribution, - SyncSelectionProof, - )>, + contributions: Vec>, ) -> impl Stream>, Error>> + Send; /// Prune the slashing protection database so that it remains performant. From 3a7c3e4bdf2d61b43a12f45173baed892f8666d6 Mon Sep 17 00:00:00 2001 From: shane-moore Date: Fri, 20 Feb 2026 12:09:05 -0800 Subject: [PATCH 08/11] chore: cleanup pr --- .../lighthouse_validator_store/src/lib.rs | 16 +- .../src/attestation_service.rs | 273 +++++++++--------- .../src/sync_committee_service.rs | 152 ++++------ 3 files changed, 195 insertions(+), 246 deletions(-) diff --git a/validator_client/lighthouse_validator_store/src/lib.rs b/validator_client/lighthouse_validator_store/src/lib.rs index d8ddf85bec4..e8c1cfbc43c 100644 --- a/validator_client/lighthouse_validator_store/src/lib.rs +++ b/validator_client/lighthouse_validator_store/src/lib.rs @@ -2,9 +2,7 @@ use account_utils::validator_definitions::{PasswordStorage, ValidatorDefinition} use bls::{PublicKeyBytes, Signature}; use doppelganger_service::DoppelgangerService; use eth2::types::PublishBlockRequest; -use futures::Stream; -use futures::future::join_all; -use futures::stream; +use futures::{Stream, future::join_all, stream}; use initialized_validators::InitializedValidators; use logging::crit; use parking_lot::{Mutex, RwLock}; @@ -1064,18 +1062,16 @@ impl ValidatorStore for LighthouseValidatorS // Check slashing protection and insert into database. Use a dedicated blocking // thread to avoid clogging the async executor with blocking database I/O. let validator_store = store.clone(); - let handle = store + let safe_attestations = store .task_executor .spawn_blocking_handle( move || validator_store.slashing_protect_attestations(signed_attestations), "slashing_protect_attestations", ) - .ok_or(Error::ExecutorError)?; - - match handle.await { - Ok(result) => result, - Err(_) => Err(Error::ExecutorError), - } + .ok_or(Error::ExecutorError)? + .await + .map_err(|_| Error::ExecutorError)??; + Ok(safe_attestations) }) } diff --git a/validator_client/validator_services/src/attestation_service.rs b/validator_client/validator_services/src/attestation_service.rs index 92b9e5619e2..db2ad95eff1 100644 --- a/validator_client/validator_services/src/attestation_service.rs +++ b/validator_client/validator_services/src/attestation_service.rs @@ -13,7 +13,7 @@ use tokio::time::{Duration, Instant, sleep, sleep_until}; use tracing::{Instrument, debug, error, info, info_span, instrument, warn}; use tree_hash::TreeHash; use types::{ - Attestation, AttestationData, ChainSpec, CommitteeIndex, EthSpec, ForkName, Hash256, Slot, + Attestation, AttestationData, ChainSpec, CommitteeIndex, EthSpec, Hash256, Slot, }; use validator_store::{AggregateToSign, AttestationToSign, ValidatorStore}; @@ -588,8 +588,73 @@ impl AttestationService { published_any = true; - self.publish_attestation_batch(&batch, fork_name, &attestation_data, slot) - .await; + + let single_attestations = batch + .iter() + .filter_map(|(attester_index, attestation)| { + match attestation.to_single_attestation_with_attester_index(*attester_index) { + Ok(single_attestation) => Some(single_attestation), + Err(e) => { + // This shouldn't happen unless BN and VC are out of sync with + // respect to the Electra fork. + error!( + error = ?e, + committee_index = attestation_data.index, + slot = slot.as_u64(), + "type" = "unaggregated", + "Unable to convert to SingleAttestation" + ); + None + } + } + }) + .collect::>(); + let single_attestations = &single_attestations; + let validator_indices = single_attestations + .iter() + .map(|att| att.attester_index) + .collect::>(); + let published_count = single_attestations.len(); + + // Post the attestations to the BN. + match self + .beacon_nodes + .request(ApiTopic::Attestations, |beacon_node| async move { + let _timer = validator_metrics::start_timer_vec( + &validator_metrics::ATTESTATION_SERVICE_TIMES, + &[validator_metrics::ATTESTATIONS_HTTP_POST], + ); + + beacon_node + .post_beacon_pool_attestations_v2::( + single_attestations.clone(), + fork_name, + ) + .await + }) + .instrument(info_span!( + "publish_attestations", + count = published_count + )) + .await + { + Ok(()) => info!( + count = published_count, + validator_indices = ?validator_indices, + head_block = ?attestation_data.beacon_block_root, + committee_index = attestation_data.index, + slot = attestation_data.slot.as_u64(), + "type" = "unaggregated", + "Successfully published attestations" + ), + Err(e) => error!( + error = %e, + committee_index = attestation_data.index, + slot = slot.as_u64(), + "type" = "unaggregated", + "Unable to publish attestations" + ), + } } Err(e) => { return Err(format!("Failed to sign attestations: {e:?}")); @@ -605,78 +670,6 @@ impl AttestationService)], - fork_name: ForkName, - attestation_data: &AttestationData, - slot: Slot, - ) { - let single_attestations = safe_attestations - .iter() - .filter_map(|(i, a)| { - match a.to_single_attestation_with_attester_index(*i) { - Ok(a) => Some(a), - Err(e) => { - // This shouldn't happen unless BN and VC are out of sync with - // respect to the Electra fork. - error!( - error = ?e, - committee_index = attestation_data.index, - slot = slot.as_u64(), - "type" = "unaggregated", - "Unable to convert to SingleAttestation" - ); - None - } - } - }) - .collect::>(); - let single_attestations = &single_attestations; - let validator_indices = single_attestations - .iter() - .map(|att| att.attester_index) - .collect::>(); - let published_count = single_attestations.len(); - - // Post the attestations to the BN. - match self - .beacon_nodes - .request(ApiTopic::Attestations, |beacon_node| async move { - let _timer = validator_metrics::start_timer_vec( - &validator_metrics::ATTESTATION_SERVICE_TIMES, - &[validator_metrics::ATTESTATIONS_HTTP_POST], - ); - - beacon_node - .post_beacon_pool_attestations_v2::( - single_attestations.clone(), - fork_name, - ) - .await - }) - .instrument(info_span!("publish_attestations", count = published_count)) - .await - { - Ok(()) => info!( - count = published_count, - validator_indices = ?validator_indices, - head_block = ?attestation_data.beacon_block_root, - committee_index = attestation_data.index, - slot = attestation_data.slot.as_u64(), - "type" = "unaggregated", - "Successfully published attestations" - ), - Err(e) => error!( - error = %e, - committee_index = attestation_data.index, - slot = slot.as_u64(), - "type" = "unaggregated", - "Unable to publish attestations" - ), - } - } - /// Performs the second step of the attesting process: downloading an aggregated `Attestation`, /// converting it into a `SignedAggregateAndProof` and returning it to the BN. /// @@ -768,10 +761,6 @@ impl AttestationService AttestationService { - self.publish_aggregate_batch(&batch, fork_name).await; + let signed_aggregate_and_proofs = batch.as_slice(); + match self + .beacon_nodes + .first_success(|beacon_node| async move { + let _timer = validator_metrics::start_timer_vec( + &validator_metrics::ATTESTATION_SERVICE_TIMES, + &[validator_metrics::AGGREGATES_HTTP_POST], + ); + if fork_name.electra_enabled() { + beacon_node + .post_validator_aggregate_and_proof_v2( + signed_aggregate_and_proofs, + fork_name, + ) + .await + } else { + beacon_node + .post_validator_aggregate_and_proof_v1( + signed_aggregate_and_proofs, + ) + .await + } + }) + .instrument(info_span!( + "publish_aggregates", + count = signed_aggregate_and_proofs.len() + )) + .await + { + Ok(()) => { + for signed_aggregate_and_proof in signed_aggregate_and_proofs { + let attestation = + signed_aggregate_and_proof.message().aggregate(); + info!( + aggregator = signed_aggregate_and_proof + .message() + .aggregator_index(), + signatures = attestation.num_set_aggregation_bits(), + head_block = + format!("{:?}", attestation.data().beacon_block_root), + committee_index = attestation.committee_index(), + slot = attestation.data().slot.as_u64(), + "type" = "aggregated", + "Successfully published attestation" + ); + } + } + Err(e) => { + for signed_aggregate_and_proof in signed_aggregate_and_proofs { + let attestation = + &signed_aggregate_and_proof.message().aggregate(); + crit!( + error = %e, + aggregator = signed_aggregate_and_proof + .message() + .aggregator_index(), + committee_index = attestation.committee_index(), + slot = attestation.data().slot.as_u64(), + "type" = "aggregated", + "Failed to publish attestation" + ); + } + } + } } Err(e) => { return Err(format!("Failed to sign aggregates: {e:?}")); @@ -794,67 +846,6 @@ impl AttestationService], - fork_name: ForkName, - ) { - match self - .beacon_nodes - .first_success(|beacon_node| async move { - let _timer = validator_metrics::start_timer_vec( - &validator_metrics::ATTESTATION_SERVICE_TIMES, - &[validator_metrics::AGGREGATES_HTTP_POST], - ); - if fork_name.electra_enabled() { - beacon_node - .post_validator_aggregate_and_proof_v2( - signed_aggregate_and_proofs, - fork_name, - ) - .await - } else { - beacon_node - .post_validator_aggregate_and_proof_v1(signed_aggregate_and_proofs) - .await - } - }) - .instrument(info_span!( - "publish_aggregates", - count = signed_aggregate_and_proofs.len() - )) - .await - { - Ok(()) => { - for signed_aggregate_and_proof in signed_aggregate_and_proofs { - let attestation = signed_aggregate_and_proof.message().aggregate(); - info!( - aggregator = signed_aggregate_and_proof.message().aggregator_index(), - signatures = attestation.num_set_aggregation_bits(), - head_block = format!("{:?}", attestation.data().beacon_block_root), - committee_index = attestation.committee_index(), - slot = attestation.data().slot.as_u64(), - "type" = "aggregated", - "Successfully published attestation" - ); - } - } - Err(e) => { - for signed_aggregate_and_proof in signed_aggregate_and_proofs { - let attestation = &signed_aggregate_and_proof.message().aggregate(); - crit!( - error = %e, - aggregator = signed_aggregate_and_proof.message().aggregator_index(), - committee_index = attestation.committee_index(), - slot = attestation.data().slot.as_u64(), - "type" = "aggregated", - "Failed to publish attestation" - ); - } - } - } - } - /// Spawn a blocking task to run the slashing protection pruning process. /// /// Start the task at `pruning_instant` to avoid interference with other tasks. diff --git a/validator_client/validator_services/src/sync_committee_service.rs b/validator_client/validator_services/src/sync_committee_service.rs index c45baa98667..3ca9de11757 100644 --- a/validator_client/validator_services/src/sync_committee_service.rs +++ b/validator_client/validator_services/src/sync_committee_service.rs @@ -14,7 +14,7 @@ use task_executor::TaskExecutor; use tokio::time::{Duration, Instant, sleep, sleep_until}; use tracing::{Instrument, debug, error, info, info_span, instrument, trace, warn}; use types::{ - ChainSpec, EthSpec, Hash256, Slot, SyncCommitteeMessage, SyncCommitteeSubscription, + ChainSpec, EthSpec, Hash256, Slot, SyncCommitteeSubscription, SyncContributionData, SyncDuty, SyncSelectionProof, SyncSubnetId, }; use validator_store::{ContributionToSign, SyncMessageToSign, ValidatorStore}; @@ -257,10 +257,6 @@ impl SyncCommitteeService SyncCommitteeService { - self.publish_sync_signature_batch(&batch, slot, beacon_block_root) - .await?; + Ok(committee_signatures) => { + let committee_signatures = &committee_signatures; + self.beacon_nodes + .request(ApiTopic::SyncCommittee, |beacon_node| async move { + beacon_node + .post_beacon_pool_sync_committee_signatures(committee_signatures) + .await + }) + .instrument(info_span!( + "publish_sync_signatures", + count = committee_signatures.len() + )) + .await + .map_err(|e| { + error!( + %slot, + error = %e, + "Unable to publish sync committee messages" + ); + })?; + + info!( + count = committee_signatures.len(), + head_block = ?beacon_block_root, + %slot, + "Successfully published sync committee messages" + ); } Err(e) => { crit!(%slot, error = ?e, "Failed to sign sync committee signatures"); @@ -282,41 +302,6 @@ impl SyncCommitteeService Result<(), ()> { - self.beacon_nodes - .request(ApiTopic::SyncCommittee, |beacon_node| async move { - beacon_node - .post_beacon_pool_sync_committee_signatures(committee_signatures) - .await - }) - .instrument(info_span!( - "publish_sync_signatures", - count = committee_signatures.len() - )) - .await - .map_err(|e| { - error!( - %slot, - error = %e, - "Unable to publish sync committee messages" - ); - })?; - - info!( - count = committee_signatures.len(), - head_block = ?beacon_block_root, - %slot, - "Successfully published sync committee messages" - ); - - Ok(()) - } - async fn publish_sync_committee_aggregates( &self, slot: Slot, @@ -395,10 +380,6 @@ impl SyncCommitteeService SyncCommitteeService { - self.publish_sync_contribution_batch( - &batch, - slot, - beacon_block_root, - subnet_id, - contribution.aggregation_bits.num_set_bits(), - ) - .await?; + Ok(signed_contributions) => { + let signed_contributions = &signed_contributions; + // Publish to the beacon node. + self.beacon_nodes + .first_success(|beacon_node| async move { + beacon_node + .post_validator_contribution_and_proofs(signed_contributions) + .await + }) + .instrument(info_span!( + "publish_sync_contributions", + count = signed_contributions.len() + )) + .await + .map_err(|e| { + error!( + %slot, + error = %e, + "Unable to publish signed contributions and proofs" + ); + })?; + + info!( + subnet = %subnet_id, + beacon_block_root = %beacon_block_root, + num_signers = contribution.aggregation_bits.num_set_bits(), + %slot, + "Successfully published sync contributions" + ); } Err(e) => { crit!(%slot, error = ?e, "Failed to sign sync committee contributions"); @@ -426,45 +427,6 @@ impl SyncCommitteeService], - slot: Slot, - beacon_block_root: Hash256, - subnet_id: SyncSubnetId, - num_signers: usize, - ) -> Result<(), ()> { - // Publish to the beacon node. - self.beacon_nodes - .first_success(|beacon_node| async move { - beacon_node - .post_validator_contribution_and_proofs(signed_contributions) - .await - }) - .instrument(info_span!( - "publish_sync_contributions", - count = signed_contributions.len() - )) - .await - .map_err(|e| { - error!( - %slot, - error = %e, - "Unable to publish signed contributions and proofs" - ); - })?; - - info!( - subnet = %subnet_id, - beacon_block_root = %beacon_block_root, - num_signers = num_signers, - %slot, - "Successfully published sync contributions" - ); - - Ok(()) - } - fn spawn_subscription_tasks(&self) { let service = self.clone(); From 9cc2a3ab0b71614e8c1adfda6a7ff13f40f0d175 Mon Sep 17 00:00:00 2001 From: shane-moore Date: Fri, 20 Feb 2026 12:48:39 -0800 Subject: [PATCH 09/11] chore: remove comments --- .../src/attestation_service.rs | 24 +++++++------------ .../src/sync_committee_service.rs | 4 ++-- validator_client/validator_store/src/lib.rs | 17 ++++--------- 3 files changed, 16 insertions(+), 29 deletions(-) diff --git a/validator_client/validator_services/src/attestation_service.rs b/validator_client/validator_services/src/attestation_service.rs index db2ad95eff1..59fc8e3582e 100644 --- a/validator_client/validator_services/src/attestation_service.rs +++ b/validator_client/validator_services/src/attestation_service.rs @@ -12,9 +12,7 @@ use tokio::sync::mpsc; use tokio::time::{Duration, Instant, sleep, sleep_until}; use tracing::{Instrument, debug, error, info, info_span, instrument, warn}; use tree_hash::TreeHash; -use types::{ - Attestation, AttestationData, ChainSpec, CommitteeIndex, EthSpec, Hash256, Slot, -}; +use types::{Attestation, AttestationData, ChainSpec, CommitteeIndex, EthSpec, Hash256, Slot}; use validator_store::{AggregateToSign, AttestationToSign, ValidatorStore}; /// Builds an `AttestationService`. @@ -592,7 +590,9 @@ impl AttestationService Some(single_attestation), Err(e) => { // This shouldn't happen unless BN and VC are out of sync with @@ -632,10 +632,7 @@ impl AttestationService info!( @@ -802,12 +799,10 @@ impl AttestationService { for signed_aggregate_and_proof in signed_aggregate_and_proofs { - let attestation = - signed_aggregate_and_proof.message().aggregate(); + let attestation = signed_aggregate_and_proof.message().aggregate(); info!( - aggregator = signed_aggregate_and_proof - .message() - .aggregator_index(), + aggregator = + signed_aggregate_and_proof.message().aggregator_index(), signatures = attestation.num_set_aggregation_bits(), head_block = format!("{:?}", attestation.data().beacon_block_root), @@ -820,8 +815,7 @@ impl AttestationService { for signed_aggregate_and_proof in signed_aggregate_and_proofs { - let attestation = - &signed_aggregate_and_proof.message().aggregate(); + let attestation = &signed_aggregate_and_proof.message().aggregate(); crit!( error = %e, aggregator = signed_aggregate_and_proof diff --git a/validator_client/validator_services/src/sync_committee_service.rs b/validator_client/validator_services/src/sync_committee_service.rs index 3ca9de11757..60df5682cef 100644 --- a/validator_client/validator_services/src/sync_committee_service.rs +++ b/validator_client/validator_services/src/sync_committee_service.rs @@ -14,8 +14,8 @@ use task_executor::TaskExecutor; use tokio::time::{Duration, Instant, sleep, sleep_until}; use tracing::{Instrument, debug, error, info, info_span, instrument, trace, warn}; use types::{ - ChainSpec, EthSpec, Hash256, Slot, SyncCommitteeSubscription, - SyncContributionData, SyncDuty, SyncSelectionProof, SyncSubnetId, + ChainSpec, EthSpec, Hash256, Slot, SyncCommitteeSubscription, SyncContributionData, SyncDuty, + SyncSelectionProof, SyncSubnetId, }; use validator_store::{ContributionToSign, SyncMessageToSign, ValidatorStore}; diff --git a/validator_client/validator_store/src/lib.rs b/validator_client/validator_store/src/lib.rs index d429b3ac86d..f70e5b97f03 100644 --- a/validator_client/validator_store/src/lib.rs +++ b/validator_client/validator_store/src/lib.rs @@ -141,10 +141,12 @@ pub trait ValidatorStore: Send + Sync { /// /// Returns a stream of batches of successfully signed attestations. Each batch contains /// attestations that passed slashing protection, along with the validator index of the signer. + /// Eventually this will be replaced by `SingleAttestation` use. + /// + /// Output: + /// + /// * Vec of (validator_index, signed_attestation). /// - /// For standard (non-distributed) operation, the stream yields a single batch containing - /// all attestations. For DVT/distributed operation, the stream may yield multiple batches - /// (e.g., one per committee) allowing incremental publishing as each committee completes. #[allow(clippy::type_complexity)] fn sign_attestations( self: &Arc, @@ -173,27 +175,18 @@ pub trait ValidatorStore: Send + Sync { ) -> impl Future>> + Send; /// Sign a batch of aggregate and proofs and return results as a stream of batches. - /// - /// For standard operation, yields a single batch. For DVT, may yield multiple batches - /// (e.g., one per committee) for incremental publishing. fn sign_aggregate_and_proofs( self: &Arc, aggregates: Vec>, ) -> impl Stream>, Error>> + Send; /// Sign a batch of sync committee messages and return results as a stream of batches. - /// - /// For standard operation, yields a single batch. For DVT, may yield multiple batches - /// for incremental publishing. fn sign_sync_committee_signatures( self: &Arc, messages: Vec, ) -> impl Stream, Error>> + Send; /// Sign a batch of sync committee contributions and return results as a stream of batches. - /// - /// For standard operation, yields a single batch. For DVT, may yield multiple batches - /// for incremental publishing. fn sign_sync_committee_contributions( self: &Arc, contributions: Vec>, From c681151c6ec959a2f5e06e41cd79d90e74aa397b Mon Sep 17 00:00:00 2001 From: shane-moore Date: Fri, 20 Feb 2026 13:25:27 -0800 Subject: [PATCH 10/11] chore: handle stream errors --- .../src/attestation_service.rs | 16 ++++++++++++++-- .../src/sync_committee_service.rs | 14 ++++++++++++-- 2 files changed, 26 insertions(+), 4 deletions(-) diff --git a/validator_client/validator_services/src/attestation_service.rs b/validator_client/validator_services/src/attestation_service.rs index 59fc8e3582e..941021b7953 100644 --- a/validator_client/validator_services/src/attestation_service.rs +++ b/validator_client/validator_services/src/attestation_service.rs @@ -582,6 +582,7 @@ impl AttestationService { @@ -654,12 +655,17 @@ impl AttestationService { - return Err(format!("Failed to sign attestations: {e:?}")); + crit!(error = ?e, "Failed to sign attestations"); + stream_error = Some(format!("Failed to sign attestations: {e:?}")); } _ => {} } } + if let Some(e) = stream_error { + return Err(e); + } + if !published_any { warn!("No attestations were published"); } @@ -765,6 +771,7 @@ impl AttestationService { @@ -831,12 +838,17 @@ impl AttestationService { - return Err(format!("Failed to sign aggregates: {e:?}")); + crit!(error = ?e, "Failed to sign aggregates"); + stream_error = Some(format!("Failed to sign aggregates: {e:?}")); } _ => {} } } + if let Some(e) = stream_error { + return Err(e); + } + Ok(()) } diff --git a/validator_client/validator_services/src/sync_committee_service.rs b/validator_client/validator_services/src/sync_committee_service.rs index 60df5682cef..bd01226d9eb 100644 --- a/validator_client/validator_services/src/sync_committee_service.rs +++ b/validator_client/validator_services/src/sync_committee_service.rs @@ -262,6 +262,7 @@ impl SyncCommitteeService { @@ -294,11 +295,15 @@ impl SyncCommitteeService { crit!(%slot, error = ?e, "Failed to sign sync committee signatures"); - return Err(()); + stream_error = true; } } } + if stream_error { + return Err(()); + } + Ok(()) } @@ -385,6 +390,7 @@ impl SyncCommitteeService { @@ -419,11 +425,15 @@ impl SyncCommitteeService { crit!(%slot, error = ?e, "Failed to sign sync committee contributions"); - return Err(()); + stream_error = true; } } } + if stream_error { + return Err(()); + } + Ok(()) } From a0a2a950a7921baffde6e041bc4e5822c0aab766 Mon Sep 17 00:00:00 2001 From: shane-moore Date: Fri, 20 Feb 2026 13:42:16 -0800 Subject: [PATCH 11/11] chore: handle stream errors with logging --- .../validator_services/src/attestation_service.rs | 12 ------------ .../validator_services/src/sync_committee_service.rs | 12 ------------ 2 files changed, 24 deletions(-) diff --git a/validator_client/validator_services/src/attestation_service.rs b/validator_client/validator_services/src/attestation_service.rs index 941021b7953..ae3f7844141 100644 --- a/validator_client/validator_services/src/attestation_service.rs +++ b/validator_client/validator_services/src/attestation_service.rs @@ -582,7 +582,6 @@ impl AttestationService { @@ -656,16 +655,11 @@ impl AttestationService { crit!(error = ?e, "Failed to sign attestations"); - stream_error = Some(format!("Failed to sign attestations: {e:?}")); } _ => {} } } - if let Some(e) = stream_error { - return Err(e); - } - if !published_any { warn!("No attestations were published"); } @@ -771,7 +765,6 @@ impl AttestationService { @@ -839,16 +832,11 @@ impl AttestationService { crit!(error = ?e, "Failed to sign aggregates"); - stream_error = Some(format!("Failed to sign aggregates: {e:?}")); } _ => {} } } - if let Some(e) = stream_error { - return Err(e); - } - Ok(()) } diff --git a/validator_client/validator_services/src/sync_committee_service.rs b/validator_client/validator_services/src/sync_committee_service.rs index bd01226d9eb..e05514d0dde 100644 --- a/validator_client/validator_services/src/sync_committee_service.rs +++ b/validator_client/validator_services/src/sync_committee_service.rs @@ -262,7 +262,6 @@ impl SyncCommitteeService { @@ -295,15 +294,10 @@ impl SyncCommitteeService { crit!(%slot, error = ?e, "Failed to sign sync committee signatures"); - stream_error = true; } } } - if stream_error { - return Err(()); - } - Ok(()) } @@ -390,7 +384,6 @@ impl SyncCommitteeService { @@ -425,15 +418,10 @@ impl SyncCommitteeService { crit!(%slot, error = ?e, "Failed to sign sync committee contributions"); - stream_error = true; } } } - if stream_error { - return Err(()); - } - Ok(()) }