diff --git a/Cargo.lock b/Cargo.lock index 5a8e76a8a8d..0465c2cddb6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9708,6 +9708,7 @@ version = "0.1.0" dependencies = [ "bls", "eth2", + "futures", "slashing_protection", "types", ] diff --git a/testing/web3signer_tests/src/lib.rs b/testing/web3signer_tests/src/lib.rs index 4b9432b67b3..1f36f8d4ceb 100644 --- a/testing/web3signer_tests/src/lib.rs +++ b/testing/web3signer_tests/src/lib.rs @@ -25,6 +25,7 @@ mod tests { use eth2_keystore::KeystoreBuilder; use eth2_network_config::Eth2NetworkConfig; use fixed_bytes::FixedBytesExtended; + use futures::StreamExt; use initialized_validators::{ InitializedValidators, load_pem_certificate, load_pkcs12_identity, }; @@ -50,7 +51,7 @@ mod tests { use types::{attestation::AttestationBase, *}; use url::Url; use validator_store::{ - Error as ValidatorStoreError, SignedBlock, UnsignedBlock, ValidatorStore, + AttestationToSign, Error as ValidatorStoreError, SignedBlock, UnsignedBlock, ValidatorStore, }; /// If the we are unable to reach the Web3Signer HTTP API within this time out then we will @@ -654,13 +655,14 @@ mod tests { .await .assert_signatures_match("attestation", |pubkey, validator_store| async move { let attestation = get_attestation(); - validator_store - .sign_attestations(vec![(0, pubkey, 0, attestation)]) - .await - .unwrap() - .pop() - .unwrap() - .1 + let stream = validator_store.sign_attestations(vec![AttestationToSign { + validator_index: 0, + pubkey, + validator_committee_index: 0, + attestation, + }]); + tokio::pin!(stream); + stream.next().await.unwrap().unwrap().pop().unwrap().1 }) .await .assert_signatures_match("signed_aggregate", |pubkey, validator_store| async move { @@ -879,22 +881,28 @@ mod tests { .await .assert_signatures_match("first_attestation", |pubkey, validator_store| async move { let attestation = first_attestation(); - validator_store - .sign_attestations(vec![(0, pubkey, 0, attestation)]) - .await - .unwrap() - .pop() - .unwrap() - .1 + let stream = validator_store.sign_attestations(vec![AttestationToSign { + validator_index: 0, + pubkey, + validator_committee_index: 0, + attestation, + }]); + tokio::pin!(stream); + stream.next().await.unwrap().unwrap().pop().unwrap().1 }) .await .assert_slashable_attestation_should_sign( "double_vote_attestation", move |pubkey, validator_store| async move { let attestation = double_vote_attestation(); - validator_store - .sign_attestations(vec![(0, pubkey, 0, attestation)]) - .await + let stream = validator_store.sign_attestations(vec![AttestationToSign { + validator_index: 0, + pubkey, + validator_committee_index: 0, + attestation, + }]); + tokio::pin!(stream); + stream.next().await.unwrap() }, slashable_message_should_sign, ) @@ -903,9 +911,14 @@ mod tests { "surrounding_attestation", move |pubkey, validator_store| async move { let attestation = surrounding_attestation(); - validator_store - .sign_attestations(vec![(0, pubkey, 0, attestation)]) - .await + let stream = validator_store.sign_attestations(vec![AttestationToSign { + validator_index: 0, + pubkey, + validator_committee_index: 0, + attestation, + }]); + tokio::pin!(stream); + stream.next().await.unwrap() }, slashable_message_should_sign, ) @@ -914,9 +927,14 @@ mod tests { "surrounded_attestation", move |pubkey, validator_store| async move { let attestation = surrounded_attestation(); - validator_store - .sign_attestations(vec![(0, pubkey, 0, attestation)]) - .await + let stream = validator_store.sign_attestations(vec![AttestationToSign { + validator_index: 0, + pubkey, + validator_committee_index: 0, + attestation, + }]); + tokio::pin!(stream); + stream.next().await.unwrap() }, slashable_message_should_sign, ) diff --git a/validator_client/http_api/src/tests/keystores.rs b/validator_client/http_api/src/tests/keystores.rs index 601b2f16662..eb35075526e 100644 --- a/validator_client/http_api/src/tests/keystores.rs +++ b/validator_client/http_api/src/tests/keystores.rs @@ -9,6 +9,7 @@ use eth2::lighthouse_vc::{ types::Web3SignerValidatorRequest, }; use fixed_bytes::FixedBytesExtended; +use futures::StreamExt; use itertools::Itertools; use lighthouse_validator_store::DEFAULT_GAS_LIMIT; use rand::rngs::StdRng; @@ -19,6 +20,7 @@ use std::{collections::HashMap, path::Path}; use tokio::runtime::Handle; use typenum::Unsigned; use types::{Address, attestation::AttestationBase}; +use validator_store::AttestationToSign; use validator_store::ValidatorStore; use zeroize::Zeroizing; @@ -1101,11 +1103,16 @@ async fn generic_migration_test( // Sign attestations on VC1. for (validator_index, attestation) in first_vc_attestations { let public_key = keystore_pubkey(&keystores[validator_index]); - let safe_attestations = tester1 + let stream = tester1 .validator_store - .sign_attestations(vec![(0, public_key, 0, attestation.clone())]) - .await - .unwrap(); + .sign_attestations(vec![AttestationToSign { + validator_index: 0, + pubkey: public_key, + validator_committee_index: 0, + attestation: attestation.clone(), + }]); + tokio::pin!(stream); + let safe_attestations = stream.next().await.unwrap().unwrap(); assert_eq!(safe_attestations.len(), 1); // Compare data only, ignoring signatures which are added during signing. assert_eq!(safe_attestations[0].1.data(), attestation.data()); @@ -1184,10 +1191,16 @@ async fn generic_migration_test( // Sign attestations on the second VC. for (validator_index, attestation, should_succeed) in second_vc_attestations { let public_key = keystore_pubkey(&keystores[validator_index]); - let result = tester2 + let stream = tester2 .validator_store - .sign_attestations(vec![(0, public_key, 0, attestation.clone())]) - .await; + .sign_attestations(vec![AttestationToSign { + validator_index: 0, + pubkey: public_key, + validator_committee_index: 0, + attestation: attestation.clone(), + }]); + tokio::pin!(stream); + let result = stream.next().await.unwrap(); match result { Ok(safe_attestations) => { if should_succeed { @@ -1331,14 +1344,14 @@ async fn delete_concurrent_with_signing() { for j in 0..num_attestations { let att = make_attestation(j, j + 1); for (validator_index, public_key) in thread_pubkeys.iter().enumerate() { - let _ = validator_store - .sign_attestations(vec![( - validator_index as u64, - *public_key, - 0, - att.clone(), - )]) - .await; + let stream = validator_store.sign_attestations(vec![AttestationToSign { + validator_index: validator_index as u64, + pubkey: *public_key, + validator_committee_index: 0, + attestation: att.clone(), + }]); + tokio::pin!(stream); + let _ = stream.next().await; } } }); diff --git a/validator_client/lighthouse_validator_store/src/lib.rs b/validator_client/lighthouse_validator_store/src/lib.rs index 7806482ffb3..e8c1cfbc43c 100644 --- a/validator_client/lighthouse_validator_store/src/lib.rs +++ b/validator_client/lighthouse_validator_store/src/lib.rs @@ -2,7 +2,7 @@ use account_utils::validator_definitions::{PasswordStorage, ValidatorDefinition} use bls::{PublicKeyBytes, Signature}; use doppelganger_service::DoppelgangerService; use eth2::types::PublishBlockRequest; -use futures::future::join_all; +use futures::{Stream, future::join_all, stream}; use initialized_validators::InitializedValidators; use logging::crit; use parking_lot::{Mutex, RwLock}; @@ -17,7 +17,7 @@ use std::marker::PhantomData; use std::path::Path; use std::sync::Arc; use task_executor::TaskExecutor; -use tracing::{error, info, instrument, warn}; +use tracing::{Instrument, debug, error, info, info_span, instrument, warn}; use types::{ AbstractExecPayload, Address, AggregateAndProof, Attestation, BeaconBlock, BlindedPayload, ChainSpec, ContributionAndProof, Domain, Epoch, EthSpec, ExecutionPayloadEnvelope, Fork, @@ -28,7 +28,8 @@ use types::{ ValidatorRegistrationData, VoluntaryExit, graffiti::GraffitiString, }; use validator_store::{ - DoppelgangerStatus, Error as ValidatorStoreError, ProposalData, SignedBlock, UnsignedBlock, + AggregateToSign, AttestationToSign, ContributionToSign, DoppelgangerStatus, + Error as ValidatorStoreError, ProposalData, SignedBlock, SyncMessageToSign, UnsignedBlock, ValidatorStore, }; @@ -691,6 +692,119 @@ impl LighthouseValidatorStore { Ok(safe_attestations) } + + /// Signs an `AggregateAndProof` for a given validator. + /// + /// The resulting `SignedAggregateAndProof` is sent on the aggregation channel and cannot be + /// modified by actors other than the signing validator. + pub async fn produce_signed_aggregate_and_proof( + &self, + validator_pubkey: PublicKeyBytes, + aggregator_index: u64, + aggregate: Attestation, + selection_proof: SelectionProof, + ) -> Result, Error> { + let signing_epoch = aggregate.data().target.epoch; + let signing_context = self.signing_context(Domain::AggregateAndProof, signing_epoch); + + let message = + AggregateAndProof::from_attestation(aggregator_index, aggregate, selection_proof); + + let signing_method = self.doppelganger_checked_signing_method(validator_pubkey)?; + let signature = signing_method + .get_signature::>( + SignableMessage::SignedAggregateAndProof(message.to_ref()), + signing_context, + &self.spec, + &self.task_executor, + ) + .await?; + + validator_metrics::inc_counter_vec( + &validator_metrics::SIGNED_AGGREGATES_TOTAL, + &[validator_metrics::SUCCESS], + ); + + Ok(SignedAggregateAndProof::from_aggregate_and_proof( + message, signature, + )) + } + + pub async fn produce_sync_committee_signature( + &self, + slot: Slot, + beacon_block_root: Hash256, + validator_index: u64, + validator_pubkey: &PublicKeyBytes, + ) -> Result { + let signing_epoch = slot.epoch(E::slots_per_epoch()); + let signing_context = self.signing_context(Domain::SyncCommittee, signing_epoch); + + // Bypass `with_validator_signing_method`: sync committee messages are not slashable. + let signing_method = self.doppelganger_bypassed_signing_method(*validator_pubkey)?; + + let signature = signing_method + .get_signature::>( + SignableMessage::SyncCommitteeSignature { + beacon_block_root, + slot, + }, + signing_context, + &self.spec, + &self.task_executor, + ) + .await + .map_err(Error::SpecificError)?; + + validator_metrics::inc_counter_vec( + &validator_metrics::SIGNED_SYNC_COMMITTEE_MESSAGES_TOTAL, + &[validator_metrics::SUCCESS], + ); + + Ok(SyncCommitteeMessage { + slot, + beacon_block_root, + validator_index, + signature, + }) + } + + pub async fn produce_signed_contribution_and_proof( + &self, + aggregator_index: u64, + aggregator_pubkey: PublicKeyBytes, + contribution: SyncCommitteeContribution, + selection_proof: SyncSelectionProof, + ) -> Result, Error> { + let signing_epoch = contribution.slot.epoch(E::slots_per_epoch()); + let signing_context = self.signing_context(Domain::ContributionAndProof, signing_epoch); + + // Bypass `with_validator_signing_method`: sync committee messages are not slashable. + let signing_method = self.doppelganger_bypassed_signing_method(aggregator_pubkey)?; + + let message = ContributionAndProof { + aggregator_index, + contribution, + selection_proof: selection_proof.into(), + }; + + let signature = signing_method + .get_signature::>( + SignableMessage::SignedContributionAndProof(&message), + signing_context, + &self.spec, + &self.task_executor, + ) + .await + .map_err(Error::SpecificError)?; + + validator_metrics::inc_counter_vec( + &validator_metrics::SIGNED_SYNC_COMMITTEE_CONTRIBUTIONS_TOTAL, + &[validator_metrics::SUCCESS], + ); + + Ok(SignedContributionAndProof { message, signature }) + } } impl ValidatorStore for LighthouseValidatorStore { @@ -882,72 +996,83 @@ impl ValidatorStore for LighthouseValidatorS } } - async fn sign_attestations( + fn sign_attestations( self: &Arc, - mut attestations: Vec<(u64, PublicKeyBytes, usize, Attestation)>, - ) -> Result)>, Error> { - // Sign all attestations concurrently. - let signing_futures = - attestations - .iter_mut() - .map(|(_, pubkey, validator_committee_index, attestation)| { + mut attestations: Vec>, + ) -> impl Stream)>, Error>> + Send { + let store = self.clone(); + stream::once(async move { + // Sign all attestations concurrently. + let signing_futures = attestations.iter_mut().map( + |AttestationToSign { + pubkey, + validator_committee_index, + attestation, + .. + }| { let pubkey = *pubkey; let validator_committee_index = *validator_committee_index; + let store = store.clone(); async move { - self.sign_attestation_no_slashing_protection( - pubkey, - validator_committee_index, - attestation, - ) - .await + store + .sign_attestation_no_slashing_protection( + pubkey, + validator_committee_index, + attestation, + ) + .await } - }); - - // Execute all signing in parallel. - let results: Vec<_> = join_all(signing_futures).await; + }, + ); - // Collect successfully signed attestations and log errors. - let mut signed_attestations = Vec::with_capacity(attestations.len()); - for (result, (validator_index, pubkey, _, attestation)) in - results.into_iter().zip(attestations.into_iter()) - { - match result { - Ok(()) => { - signed_attestations.push((validator_index, attestation, pubkey)); - } - Err(ValidatorStoreError::UnknownPubkey(pubkey)) => { - warn!( - info = "a validator may have recently been removed from this VC", - ?pubkey, - "Missing pubkey for attestation" - ); - } - Err(e) => { - crit!( - error = ?e, - "Failed to sign attestation" - ); + // Execute all signing in parallel. + let results: Vec<_> = join_all(signing_futures).await; + + // Collect successfully signed attestations and log errors. + let mut signed_attestations = Vec::with_capacity(attestations.len()); + for (result, att) in results.into_iter().zip(attestations.into_iter()) { + match result { + Ok(()) => { + signed_attestations.push(( + att.validator_index, + att.attestation, + att.pubkey, + )); + } + Err(ValidatorStoreError::UnknownPubkey(pubkey)) => { + warn!( + info = "a validator may have recently been removed from this VC", + ?pubkey, + "Missing pubkey for attestation" + ); + } + Err(e) => { + crit!( + error = ?e, + "Failed to sign attestation" + ); + } } } - } - if signed_attestations.is_empty() { - return Ok(vec![]); - } + if signed_attestations.is_empty() { + return Ok(vec![]); + } - // Check slashing protection and insert into database. Use a dedicated blocking thread - // to avoid clogging the async executor with blocking database I/O. - let validator_store = self.clone(); - let safe_attestations = self - .task_executor - .spawn_blocking_handle( - move || validator_store.slashing_protect_attestations(signed_attestations), - "slashing_protect_attestations", - ) - .ok_or(Error::ExecutorError)? - .await - .map_err(|_| Error::ExecutorError)??; - Ok(safe_attestations) + // Check slashing protection and insert into database. Use a dedicated blocking + // thread to avoid clogging the async executor with blocking database I/O. + let validator_store = store.clone(); + let safe_attestations = store + .task_executor + .spawn_blocking_handle( + move || validator_store.slashing_protect_attestations(signed_attestations), + "slashing_protect_attestations", + ) + .ok_or(Error::ExecutorError)? + .await + .map_err(|_| Error::ExecutorError)??; + Ok(safe_attestations) + }) } async fn sign_validator_registration_data( @@ -979,43 +1104,6 @@ impl ValidatorStore for LighthouseValidatorS }) } - /// Signs an `AggregateAndProof` for a given validator. - /// - /// The resulting `SignedAggregateAndProof` is sent on the aggregation channel and cannot be - /// modified by actors other than the signing validator. - async fn produce_signed_aggregate_and_proof( - &self, - validator_pubkey: PublicKeyBytes, - aggregator_index: u64, - aggregate: Attestation, - selection_proof: SelectionProof, - ) -> Result, Error> { - let signing_epoch = aggregate.data().target.epoch; - let signing_context = self.signing_context(Domain::AggregateAndProof, signing_epoch); - - let message = - AggregateAndProof::from_attestation(aggregator_index, aggregate, selection_proof); - - let signing_method = self.doppelganger_checked_signing_method(validator_pubkey)?; - let signature = signing_method - .get_signature::>( - SignableMessage::SignedAggregateAndProof(message.to_ref()), - signing_context, - &self.spec, - &self.task_executor, - ) - .await?; - - validator_metrics::inc_counter_vec( - &validator_metrics::SIGNED_AGGREGATES_TOTAL, - &[validator_metrics::SUCCESS], - ); - - Ok(SignedAggregateAndProof::from_aggregate_and_proof( - message, signature, - )) - } - /// Produces a `SelectionProof` for the `slot`, signed by with corresponding secret key to /// `validator_pubkey`. async fn produce_selection_proof( @@ -1090,80 +1178,172 @@ impl ValidatorStore for LighthouseValidatorS Ok(signature.into()) } - async fn produce_sync_committee_signature( - &self, - slot: Slot, - beacon_block_root: Hash256, - validator_index: u64, - validator_pubkey: &PublicKeyBytes, - ) -> Result { - let signing_epoch = slot.epoch(E::slots_per_epoch()); - let signing_context = self.signing_context(Domain::SyncCommittee, signing_epoch); - - // Bypass `with_validator_signing_method`: sync committee messages are not slashable. - let signing_method = self.doppelganger_bypassed_signing_method(*validator_pubkey)?; - - let signature = signing_method - .get_signature::>( - SignableMessage::SyncCommitteeSignature { - beacon_block_root, - slot, + fn sign_aggregate_and_proofs( + self: &Arc, + aggregates: Vec>, + ) -> impl Stream>, Error>> + Send { + let store = self.clone(); + let count = aggregates.len(); + stream::once(async move { + let signing_futures = aggregates.into_iter().map( + |AggregateToSign { + pubkey, + aggregator_index, + aggregate, + selection_proof, + }| { + let store = store.clone(); + async move { + let result = store + .produce_signed_aggregate_and_proof( + pubkey, + aggregator_index, + aggregate, + selection_proof, + ) + .await; + (pubkey, result) + } }, - signing_context, - &self.spec, - &self.task_executor, - ) - .await - .map_err(Error::SpecificError)?; - - validator_metrics::inc_counter_vec( - &validator_metrics::SIGNED_SYNC_COMMITTEE_MESSAGES_TOTAL, - &[validator_metrics::SUCCESS], - ); + ); - Ok(SyncCommitteeMessage { - slot, - beacon_block_root, - validator_index, - signature, + let results = join_all(signing_futures) + .instrument(info_span!("sign_aggregates", count)) + .await; + + let mut signed = Vec::with_capacity(results.len()); + for (pubkey, result) in results { + match result { + Ok(agg) => signed.push(agg), + Err(ValidatorStoreError::UnknownPubkey(pubkey)) => { + // A pubkey can be missing when a validator was recently + // removed via the API. + debug!(?pubkey, "Missing pubkey for aggregate"); + } + Err(e) => { + crit!(error = ?e, pubkey = ?pubkey, "Failed to sign aggregate"); + } + } + } + Ok(signed) }) } - async fn produce_signed_contribution_and_proof( - &self, - aggregator_index: u64, - aggregator_pubkey: PublicKeyBytes, - contribution: SyncCommitteeContribution, - selection_proof: SyncSelectionProof, - ) -> Result, Error> { - let signing_epoch = contribution.slot.epoch(E::slots_per_epoch()); - let signing_context = self.signing_context(Domain::ContributionAndProof, signing_epoch); - - // Bypass `with_validator_signing_method`: sync committee messages are not slashable. - let signing_method = self.doppelganger_bypassed_signing_method(aggregator_pubkey)?; - - let message = ContributionAndProof { - aggregator_index, - contribution, - selection_proof: selection_proof.into(), - }; + fn sign_sync_committee_signatures( + self: &Arc, + messages: Vec, + ) -> impl Stream, Error>> + Send { + let store = self.clone(); + let count = messages.len(); + stream::once(async move { + let signing_futures = messages.into_iter().map( + |SyncMessageToSign { + slot, + beacon_block_root, + validator_index, + pubkey, + }| { + let store = store.clone(); + async move { + let result = store + .produce_sync_committee_signature( + slot, + beacon_block_root, + validator_index, + &pubkey, + ) + .await; + (pubkey, validator_index, slot, result) + } + }, + ); - let signature = signing_method - .get_signature::>( - SignableMessage::SignedContributionAndProof(&message), - signing_context, - &self.spec, - &self.task_executor, - ) - .await - .map_err(Error::SpecificError)?; + let results = join_all(signing_futures) + .instrument(info_span!("sign_sync_signatures", count)) + .await; + + let mut signed = Vec::with_capacity(results.len()); + for (_pubkey, validator_index, slot, result) in results { + match result { + Ok(sig) => signed.push(sig), + Err(ValidatorStoreError::UnknownPubkey(pubkey)) => { + // A pubkey can be missing when a validator was recently + // removed via the API. + debug!( + ?pubkey, + validator_index, + %slot, + "Missing pubkey for sync committee signature" + ); + } + Err(e) => { + crit!( + validator_index, + %slot, + error = ?e, + "Failed to sign sync committee signature" + ); + } + } + } + Ok(signed) + }) + } - validator_metrics::inc_counter_vec( - &validator_metrics::SIGNED_SYNC_COMMITTEE_CONTRIBUTIONS_TOTAL, - &[validator_metrics::SUCCESS], - ); + fn sign_sync_committee_contributions( + self: &Arc, + contributions: Vec>, + ) -> impl Stream>, Error>> + Send { + let store = self.clone(); + let count = contributions.len(); + stream::once(async move { + let signing_futures = contributions.into_iter().map( + |ContributionToSign { + aggregator_index, + aggregator_pubkey, + contribution, + selection_proof, + }| { + let store = store.clone(); + let slot = contribution.slot; + async move { + let result = store + .produce_signed_contribution_and_proof( + aggregator_index, + aggregator_pubkey, + contribution, + selection_proof, + ) + .await; + (slot, result) + } + }, + ); - Ok(SignedContributionAndProof { message, signature }) + let results = join_all(signing_futures) + .instrument(info_span!("sign_sync_contributions", count)) + .await; + + let mut signed = Vec::with_capacity(results.len()); + for (slot, result) in results { + match result { + Ok(contribution) => signed.push(contribution), + Err(ValidatorStoreError::UnknownPubkey(pubkey)) => { + // A pubkey can be missing when a validator was recently + // removed via the API. + debug!(?pubkey, %slot, "Missing pubkey for sync contribution"); + } + Err(e) => { + crit!( + %slot, + error = ?e, + "Unable to sign sync committee contribution" + ); + } + } + } + Ok(signed) + }) } /// Prune the slashing protection database so that it remains performant. diff --git a/validator_client/validator_services/src/attestation_service.rs b/validator_client/validator_services/src/attestation_service.rs index a9d52833127..ae3f7844141 100644 --- a/validator_client/validator_services/src/attestation_service.rs +++ b/validator_client/validator_services/src/attestation_service.rs @@ -1,6 +1,6 @@ use crate::duties_service::{DutiesService, DutyAndProof}; use beacon_node_fallback::{ApiTopic, BeaconNodeFallback, beacon_head_monitor::HeadEvent}; -use futures::future::join_all; +use futures::StreamExt; use logging::crit; use slot_clock::SlotClock; use std::collections::HashMap; @@ -13,7 +13,7 @@ use tokio::time::{Duration, Instant, sleep, sleep_until}; use tracing::{Instrument, debug, error, info, info_span, instrument, warn}; use tree_hash::TreeHash; use types::{Attestation, AttestationData, ChainSpec, CommitteeIndex, EthSpec, Hash256, Slot}; -use validator_store::{Error as ValidatorStoreError, ValidatorStore}; +use validator_store::{AggregateToSign, AttestationToSign, ValidatorStore}; /// Builds an `AttestationService`. #[derive(Default)] @@ -560,12 +560,12 @@ impl AttestationService AttestationService(attestation_data.slot); - let single_attestations = safe_attestations - .iter() - .filter_map(|(i, a)| { - match a.to_single_attestation_with_attester_index(*i) { - Ok(a) => Some(a), - Err(e) => { - // This shouldn't happen unless BN and VC are out of sync with - // respect to the Electra fork. - error!( - error = ?e, + // Publish each batch as it arrives from the stream. + let mut published_any = false; + while let Some(result) = attestation_stream.next().await { + match result { + Ok(batch) if !batch.is_empty() => { + published_any = true; + + let single_attestations = batch + .iter() + .filter_map(|(attester_index, attestation)| { + match attestation + .to_single_attestation_with_attester_index(*attester_index) + { + Ok(single_attestation) => Some(single_attestation), + Err(e) => { + // This shouldn't happen unless BN and VC are out of sync with + // respect to the Electra fork. + error!( + error = ?e, + committee_index = attestation_data.index, + slot = slot.as_u64(), + "type" = "unaggregated", + "Unable to convert to SingleAttestation" + ); + None + } + } + }) + .collect::>(); + let single_attestations = &single_attestations; + let validator_indices = single_attestations + .iter() + .map(|att| att.attester_index) + .collect::>(); + let published_count = single_attestations.len(); + + // Post the attestations to the BN. + match self + .beacon_nodes + .request(ApiTopic::Attestations, |beacon_node| async move { + let _timer = validator_metrics::start_timer_vec( + &validator_metrics::ATTESTATION_SERVICE_TIMES, + &[validator_metrics::ATTESTATIONS_HTTP_POST], + ); + + beacon_node + .post_beacon_pool_attestations_v2::( + single_attestations.clone(), + fork_name, + ) + .await + }) + .instrument(info_span!("publish_attestations", count = published_count)) + .await + { + Ok(()) => info!( + count = published_count, + validator_indices = ?validator_indices, + head_block = ?attestation_data.beacon_block_root, + committee_index = attestation_data.index, + slot = attestation_data.slot.as_u64(), + "type" = "unaggregated", + "Successfully published attestations" + ), + Err(e) => error!( + error = %e, committee_index = attestation_data.index, slot = slot.as_u64(), "type" = "unaggregated", - "Unable to convert to SingleAttestation" - ); - None + "Unable to publish attestations" + ), } } - }) - .collect::>(); - let single_attestations = &single_attestations; - let validator_indices = single_attestations - .iter() - .map(|att| att.attester_index) - .collect::>(); - let published_count = single_attestations.len(); - - // Post the attestations to the BN. - match self - .beacon_nodes - .request(ApiTopic::Attestations, |beacon_node| async move { - let _timer = validator_metrics::start_timer_vec( - &validator_metrics::ATTESTATION_SERVICE_TIMES, - &[validator_metrics::ATTESTATIONS_HTTP_POST], - ); + Err(e) => { + crit!(error = ?e, "Failed to sign attestations"); + } + _ => {} + } + } - beacon_node - .post_beacon_pool_attestations_v2::( - single_attestations.clone(), - fork_name, - ) - .await - }) - .instrument(info_span!("publish_attestations", count = published_count)) - .await - { - Ok(()) => info!( - count = published_count, - validator_indices = ?validator_indices, - head_block = ?attestation_data.beacon_block_root, - committee_index = attestation_data.index, - slot = attestation_data.slot.as_u64(), - "type" = "unaggregated", - "Successfully published attestations" - ), - Err(e) => error!( - error = %e, - committee_index = attestation_data.index, - slot = slot.as_u64(), - "type" = "unaggregated", - "Unable to publish attestations" - ), + if !published_any { + warn!("No attestations were published"); } Ok(()) @@ -725,113 +737,103 @@ impl AttestationService(attestation_data, &self.chain_spec) { - crit!("Inconsistent validator duties during signing"); - return None; - } + // Build the batch of aggregates to sign. + let aggregates_to_sign: Vec<_> = validator_duties + .iter() + .filter_map(|duty_and_proof| { + let duty = &duty_and_proof.duty; + let selection_proof = duty_and_proof.selection_proof.as_ref()?; - match self - .validator_store - .produce_signed_aggregate_and_proof( - duty.pubkey, - duty.validator_index, - aggregated_attestation.clone(), - selection_proof.clone(), - ) - .await - { - Ok(aggregate) => Some(aggregate), - Err(ValidatorStoreError::UnknownPubkey(pubkey)) => { - // A pubkey can be missing when a validator was recently - // removed via the API. - debug!(?pubkey, "Missing pubkey for aggregate"); - None - } - Err(e) => { - crit!( - error = ?e, - pubkey = ?duty.pubkey, - "Failed to sign aggregate" - ); - None + if !duty.match_attestation_data::(attestation_data, &self.chain_spec) { + crit!("Inconsistent validator duties during signing"); + return None; } - } - }); - - // Execute all the futures in parallel, collecting any successful results. - let aggregator_count = validator_duties - .iter() - .filter(|d| d.selection_proof.is_some()) - .count(); - let signed_aggregate_and_proofs = join_all(signing_futures) - .instrument(info_span!("sign_aggregates", count = aggregator_count)) - .await - .into_iter() - .flatten() - .collect::>(); - if !signed_aggregate_and_proofs.is_empty() { - let signed_aggregate_and_proofs_slice = signed_aggregate_and_proofs.as_slice(); - match self - .beacon_nodes - .first_success(|beacon_node| async move { - let _timer = validator_metrics::start_timer_vec( - &validator_metrics::ATTESTATION_SERVICE_TIMES, - &[validator_metrics::AGGREGATES_HTTP_POST], - ); - if fork_name.electra_enabled() { - beacon_node - .post_validator_aggregate_and_proof_v2( - signed_aggregate_and_proofs_slice, - fork_name, - ) - .await - } else { - beacon_node - .post_validator_aggregate_and_proof_v1( - signed_aggregate_and_proofs_slice, - ) - .await - } + Some(AggregateToSign { + pubkey: duty.pubkey, + aggregator_index: duty.validator_index, + aggregate: aggregated_attestation.clone(), + selection_proof: selection_proof.clone(), }) - .instrument(info_span!( - "publish_aggregates", - count = signed_aggregate_and_proofs.len() - )) - .await - { - Ok(()) => { - for signed_aggregate_and_proof in signed_aggregate_and_proofs { - let attestation = signed_aggregate_and_proof.message().aggregate(); - info!( - aggregator = signed_aggregate_and_proof.message().aggregator_index(), - signatures = attestation.num_set_aggregation_bits(), - head_block = format!("{:?}", attestation.data().beacon_block_root), - committee_index = attestation.committee_index(), - slot = attestation.data().slot.as_u64(), - "type" = "aggregated", - "Successfully published attestation" - ); + }) + .collect(); + + // Sign aggregates. Returns a stream of batches. + let aggregate_stream = self + .validator_store + .sign_aggregate_and_proofs(aggregates_to_sign); + tokio::pin!(aggregate_stream); + + // Publish each batch as it arrives from the stream. + while let Some(result) = aggregate_stream.next().await { + match result { + Ok(batch) if !batch.is_empty() => { + let signed_aggregate_and_proofs = batch.as_slice(); + match self + .beacon_nodes + .first_success(|beacon_node| async move { + let _timer = validator_metrics::start_timer_vec( + &validator_metrics::ATTESTATION_SERVICE_TIMES, + &[validator_metrics::AGGREGATES_HTTP_POST], + ); + if fork_name.electra_enabled() { + beacon_node + .post_validator_aggregate_and_proof_v2( + signed_aggregate_and_proofs, + fork_name, + ) + .await + } else { + beacon_node + .post_validator_aggregate_and_proof_v1( + signed_aggregate_and_proofs, + ) + .await + } + }) + .instrument(info_span!( + "publish_aggregates", + count = signed_aggregate_and_proofs.len() + )) + .await + { + Ok(()) => { + for signed_aggregate_and_proof in signed_aggregate_and_proofs { + let attestation = signed_aggregate_and_proof.message().aggregate(); + info!( + aggregator = + signed_aggregate_and_proof.message().aggregator_index(), + signatures = attestation.num_set_aggregation_bits(), + head_block = + format!("{:?}", attestation.data().beacon_block_root), + committee_index = attestation.committee_index(), + slot = attestation.data().slot.as_u64(), + "type" = "aggregated", + "Successfully published attestation" + ); + } + } + Err(e) => { + for signed_aggregate_and_proof in signed_aggregate_and_proofs { + let attestation = &signed_aggregate_and_proof.message().aggregate(); + crit!( + error = %e, + aggregator = signed_aggregate_and_proof + .message() + .aggregator_index(), + committee_index = attestation.committee_index(), + slot = attestation.data().slot.as_u64(), + "type" = "aggregated", + "Failed to publish attestation" + ); + } + } } } Err(e) => { - for signed_aggregate_and_proof in signed_aggregate_and_proofs { - let attestation = &signed_aggregate_and_proof.message().aggregate(); - crit!( - error = %e, - aggregator = signed_aggregate_and_proof.message().aggregator_index(), - committee_index = attestation.committee_index(), - slot = attestation.data().slot.as_u64(), - "type" = "aggregated", - "Failed to publish attestation" - ); - } + crit!(error = ?e, "Failed to sign aggregates"); } + _ => {} } } diff --git a/validator_client/validator_services/src/sync_committee_service.rs b/validator_client/validator_services/src/sync_committee_service.rs index 59e8524a1ae..e05514d0dde 100644 --- a/validator_client/validator_services/src/sync_committee_service.rs +++ b/validator_client/validator_services/src/sync_committee_service.rs @@ -2,8 +2,8 @@ use crate::duties_service::DutiesService; use beacon_node_fallback::{ApiTopic, BeaconNodeFallback}; use bls::PublicKeyBytes; use eth2::types::BlockId; +use futures::StreamExt; use futures::future::FutureExt; -use futures::future::join_all; use logging::crit; use slot_clock::SlotClock; use std::collections::HashMap; @@ -17,7 +17,7 @@ use types::{ ChainSpec, EthSpec, Hash256, Slot, SyncCommitteeSubscription, SyncContributionData, SyncDuty, SyncSelectionProof, SyncSubnetId, }; -use validator_store::{Error as ValidatorStoreError, ValidatorStore}; +use validator_store::{ContributionToSign, SyncMessageToSign, ValidatorStore}; pub const SUBSCRIPTION_LOOKAHEAD_EPOCHS: u64 = 4; @@ -247,78 +247,56 @@ impl SyncCommitteeService, ) -> Result<(), ()> { - // Create futures to produce sync committee signatures. - let signature_futures = validator_duties.iter().map(|duty| async move { - match self - .validator_store - .produce_sync_committee_signature( - slot, - beacon_block_root, - duty.validator_index, - &duty.pubkey, - ) - .await - { - Ok(signature) => Some(signature), - Err(ValidatorStoreError::UnknownPubkey(pubkey)) => { - // A pubkey can be missing when a validator was recently - // removed via the API. - debug!( - ?pubkey, - validator_index = duty.validator_index, + let messages_to_sign: Vec<_> = validator_duties + .iter() + .map(|duty| SyncMessageToSign { + slot, + beacon_block_root, + validator_index: duty.validator_index, + pubkey: duty.pubkey, + }) + .collect(); + + let signature_stream = self + .validator_store + .sign_sync_committee_signatures(messages_to_sign); + tokio::pin!(signature_stream); + + while let Some(result) = signature_stream.next().await { + match result { + Ok(committee_signatures) => { + let committee_signatures = &committee_signatures; + self.beacon_nodes + .request(ApiTopic::SyncCommittee, |beacon_node| async move { + beacon_node + .post_beacon_pool_sync_committee_signatures(committee_signatures) + .await + }) + .instrument(info_span!( + "publish_sync_signatures", + count = committee_signatures.len() + )) + .await + .map_err(|e| { + error!( + %slot, + error = %e, + "Unable to publish sync committee messages" + ); + })?; + + info!( + count = committee_signatures.len(), + head_block = ?beacon_block_root, %slot, - "Missing pubkey for sync committee signature" + "Successfully published sync committee messages" ); - None } Err(e) => { - crit!( - validator_index = duty.validator_index, - %slot, - error = ?e, - "Failed to sign sync committee signature" - ); - None + crit!(%slot, error = ?e, "Failed to sign sync committee signatures"); } } - }); - - // Execute all the futures in parallel, collecting any successful results. - let committee_signatures = &join_all(signature_futures) - .instrument(info_span!( - "sign_sync_signatures", - count = validator_duties.len() - )) - .await - .into_iter() - .flatten() - .collect::>(); - - self.beacon_nodes - .request(ApiTopic::SyncCommittee, |beacon_node| async move { - beacon_node - .post_beacon_pool_sync_committee_signatures(committee_signatures) - .await - }) - .instrument(info_span!( - "publish_sync_signatures", - count = committee_signatures.len() - )) - .await - .map_err(|e| { - error!( - %slot, - error = %e, - "Unable to publish sync committee messages" - ); - })?; - - info!( - count = committee_signatures.len(), - head_block = ?beacon_block_root, - %slot, - "Successfully published sync committee messages" - ); + } Ok(()) } @@ -389,77 +367,60 @@ impl SyncCommitteeService Some(signed_contribution), - Err(ValidatorStoreError::UnknownPubkey(pubkey)) => { - // A pubkey can be missing when a validator was recently - // removed via the API. - debug!(?pubkey, %slot, "Missing pubkey for sync contribution"); - None - } - Err(e) => { - crit!( - %slot, - error = ?e, - "Unable to sign sync committee contribution" - ); - None - } - } - }, - ); - - // Execute all the futures in parallel, collecting any successful results. - let signed_contributions = &join_all(signature_futures) - .instrument(info_span!( - "sign_sync_contributions", - count = aggregator_count - )) - .await + let contributions_to_sign: Vec<_> = subnet_aggregators .into_iter() - .flatten() - .collect::>(); - - // Publish to the beacon node. - self.beacon_nodes - .first_success(|beacon_node| async move { - beacon_node - .post_validator_contribution_and_proofs(signed_contributions) - .await - }) - .instrument(info_span!( - "publish_sync_contributions", - count = signed_contributions.len() - )) - .await - .map_err(|e| { - error!( - %slot, - error = %e, - "Unable to publish signed contributions and proofs" - ); - })?; - - info!( - subnet = %subnet_id, - beacon_block_root = %beacon_block_root, - num_signers = contribution.aggregation_bits.num_set_bits(), - %slot, - "Successfully published sync contributions" - ); + .map( + |(aggregator_index, aggregator_pk, selection_proof)| ContributionToSign { + aggregator_index, + aggregator_pubkey: aggregator_pk, + contribution: contribution.clone(), + selection_proof, + }, + ) + .collect(); + + let contribution_stream = self + .validator_store + .sign_sync_committee_contributions(contributions_to_sign); + tokio::pin!(contribution_stream); + + while let Some(result) = contribution_stream.next().await { + match result { + Ok(signed_contributions) => { + let signed_contributions = &signed_contributions; + // Publish to the beacon node. + self.beacon_nodes + .first_success(|beacon_node| async move { + beacon_node + .post_validator_contribution_and_proofs(signed_contributions) + .await + }) + .instrument(info_span!( + "publish_sync_contributions", + count = signed_contributions.len() + )) + .await + .map_err(|e| { + error!( + %slot, + error = %e, + "Unable to publish signed contributions and proofs" + ); + })?; + + info!( + subnet = %subnet_id, + beacon_block_root = %beacon_block_root, + num_signers = contribution.aggregation_bits.num_set_bits(), + %slot, + "Successfully published sync contributions" + ); + } + Err(e) => { + crit!(%slot, error = ?e, "Failed to sign sync committee contributions"); + } + } + } Ok(()) } diff --git a/validator_client/validator_store/Cargo.toml b/validator_client/validator_store/Cargo.toml index 8b1879c837c..2c6a68d4949 100644 --- a/validator_client/validator_store/Cargo.toml +++ b/validator_client/validator_store/Cargo.toml @@ -7,5 +7,6 @@ authors = ["Sigma Prime "] [dependencies] bls = { workspace = true } eth2 = { workspace = true } +futures = { workspace = true } slashing_protection = { workspace = true } types = { workspace = true } diff --git a/validator_client/validator_store/src/lib.rs b/validator_client/validator_store/src/lib.rs index 87ab669e8d2..f70e5b97f03 100644 --- a/validator_client/validator_store/src/lib.rs +++ b/validator_client/validator_store/src/lib.rs @@ -1,5 +1,6 @@ use bls::{PublicKeyBytes, Signature}; use eth2::types::{FullBlockContents, PublishBlockRequest}; +use futures::Stream; use slashing_protection::NotSafe; use std::fmt::Debug; use std::future::Future; @@ -32,6 +33,38 @@ impl From for Error { } } +/// Input for batch attestation signing +pub struct AttestationToSign { + pub validator_index: u64, + pub pubkey: PublicKeyBytes, + pub validator_committee_index: usize, + pub attestation: Attestation, +} + +/// Input for batch aggregate signing +pub struct AggregateToSign { + pub pubkey: PublicKeyBytes, + pub aggregator_index: u64, + pub aggregate: Attestation, + pub selection_proof: SelectionProof, +} + +/// Input for batch sync committee message signing +pub struct SyncMessageToSign { + pub slot: Slot, + pub beacon_block_root: Hash256, + pub validator_index: u64, + pub pubkey: PublicKeyBytes, +} + +/// Input for batch sync committee contribution signing +pub struct ContributionToSign { + pub aggregator_index: u64, + pub aggregator_pubkey: PublicKeyBytes, + pub contribution: SyncCommitteeContribution, + pub selection_proof: SyncSelectionProof, +} + /// A helper struct, used for passing data from the validator store to services. pub struct ProposalData { pub validator_index: Option, @@ -106,40 +139,25 @@ pub trait ValidatorStore: Send + Sync { /// Sign a batch of `attestations` and apply slashing protection to them. /// - /// Only successfully signed attestations that pass slashing protection are returned, along with - /// the validator index of the signer. Eventually this will be replaced by `SingleAttestation` - /// use. - /// - /// Input: - /// - /// * Vec of (validator_index, pubkey, validator_committee_index, attestation). + /// Returns a stream of batches of successfully signed attestations. Each batch contains + /// attestations that passed slashing protection, along with the validator index of the signer. + /// Eventually this will be replaced by `SingleAttestation` use. /// /// Output: /// /// * Vec of (validator_index, signed_attestation). + /// #[allow(clippy::type_complexity)] fn sign_attestations( self: &Arc, - attestations: Vec<(u64, PublicKeyBytes, usize, Attestation)>, - ) -> impl Future)>, Error>> + Send; + attestations: Vec>, + ) -> impl Stream)>, Error>> + Send; fn sign_validator_registration_data( &self, validator_registration_data: ValidatorRegistrationData, ) -> impl Future>> + Send; - /// Signs an `AggregateAndProof` for a given validator. - /// - /// The resulting `SignedAggregateAndProof` is sent on the aggregation channel and cannot be - /// modified by actors other than the signing validator. - fn produce_signed_aggregate_and_proof( - &self, - validator_pubkey: PublicKeyBytes, - aggregator_index: u64, - aggregate: Attestation, - selection_proof: SelectionProof, - ) -> impl Future, Error>> + Send; - /// Produces a `SelectionProof` for the `slot`, signed by with corresponding secret key to /// `validator_pubkey`. fn produce_selection_proof( @@ -156,21 +174,23 @@ pub trait ValidatorStore: Send + Sync { subnet_id: SyncSubnetId, ) -> impl Future>> + Send; - fn produce_sync_committee_signature( - &self, - slot: Slot, - beacon_block_root: Hash256, - validator_index: u64, - validator_pubkey: &PublicKeyBytes, - ) -> impl Future>> + Send; + /// Sign a batch of aggregate and proofs and return results as a stream of batches. + fn sign_aggregate_and_proofs( + self: &Arc, + aggregates: Vec>, + ) -> impl Stream>, Error>> + Send; - fn produce_signed_contribution_and_proof( - &self, - aggregator_index: u64, - aggregator_pubkey: PublicKeyBytes, - contribution: SyncCommitteeContribution, - selection_proof: SyncSelectionProof, - ) -> impl Future, Error>> + Send; + /// Sign a batch of sync committee messages and return results as a stream of batches. + fn sign_sync_committee_signatures( + self: &Arc, + messages: Vec, + ) -> impl Stream, Error>> + Send; + + /// Sign a batch of sync committee contributions and return results as a stream of batches. + fn sign_sync_committee_contributions( + self: &Arc, + contributions: Vec>, + ) -> impl Stream>, Error>> + Send; /// Prune the slashing protection database so that it remains performant. ///