-
Notifications
You must be signed in to change notification settings - Fork 28
fix: optimize the signing of attestations #834
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: unstable
Are you sure you want to change the base?
Changes from 2 commits
c45ddb4
83bb6ef
36df4ff
6ed73e3
28f148d
ba9b498
6d91e03
fec6d95
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -89,6 +89,14 @@ const SYNC_SELECTION_PROOF_LOG_NAME: &str = "sync selection proof"; | |
| const SYNC_COMMITTEE_SIGNATURE_LOG_NAME: &str = "sync committee signature"; | ||
| const SYNC_COMMITTEE_CONTRIBUTION_LOG_NAME: &str = "sync committee contribution"; | ||
|
|
||
| /// Input data for a single validator's attestation within a committee batch. | ||
| type AttestationData<E> = ( | ||
| u64, // validator_index | ||
| PublicKeyBytes, // pubkey | ||
| usize, // validator_committee_position | ||
| Attestation<E>, // attestation | ||
| ); | ||
|
|
||
| pub struct AnchorValidatorStore<T: SlotClock + 'static, E: EthSpec> { | ||
| database: Arc<NetworkDatabase>, | ||
| decrypted_keys: Mutex<LruCache<[u8; ENCRYPTED_KEY_LENGTH], SecretKey>>, | ||
|
|
@@ -198,6 +206,19 @@ impl<T: SlotClock, E: EthSpec> AnchorValidatorStore<T, E> { | |
| )) | ||
| } | ||
|
|
||
| /// Get validator metadata by public key. | ||
| fn get_validator_metadata( | ||
petarjuki7 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| &self, | ||
| validator_pubkey: PublicKeyBytes, | ||
| ) -> Result<ValidatorMetadata, Error> { | ||
| self.database | ||
| .state() | ||
| .metadata() | ||
| .get_by(&validator_pubkey) | ||
| .ok_or(Error::UnknownPubkey(validator_pubkey)) | ||
| .cloned() | ||
| } | ||
|
|
||
| fn get_domain(&self, epoch: Epoch, domain: Domain) -> Hash256 { | ||
| self.spec.get_domain( | ||
| epoch, | ||
|
|
@@ -1282,46 +1303,44 @@ impl<T: SlotClock, E: EthSpec> AnchorValidatorStore<T, E> { | |
| .map(|signature| SignedContributionAndProof { message, signature }) | ||
| } | ||
|
|
||
| async fn sign_attestation( | ||
| /// Sign attestations for all validators in a single SSV committee. | ||
| /// | ||
| /// Runs QBFT consensus once for the committee, then collects signatures for each validator. | ||
| async fn sign_committee_attestations( | ||
| &self, | ||
| validator_pubkey: PublicKeyBytes, | ||
| validator_committee_position: usize, | ||
| attestation: &mut Attestation<E>, | ||
| current_epoch: Epoch, | ||
| ) -> Result<(), Error> { | ||
| if !*self.is_synced.borrow() { | ||
| return Err(Error::SpecificError(SpecificError::NotSynced)); | ||
| } | ||
| committee_id: CommitteeId, | ||
| attestations: Vec<AttestationData<E>>, | ||
| ) -> Result<Vec<(u64, Attestation<E>, PublicKeyBytes)>, Error> { | ||
| let slot = attestations[0].3.data().slot; | ||
petarjuki7 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| let first_att_data = attestations[0].3.data(); | ||
|
|
||
| let (validator, cluster) = self.get_validator_and_cluster(validator_pubkey)?; | ||
| let voting_context_tx = self.get_voting_context(attestation.data().slot).await?; | ||
| // All validators in the same committee share the same cluster | ||
| let (_, cluster) = self.get_validator_and_cluster(attestations[0].1)?; | ||
|
||
|
|
||
| let voting_context_tx = self.get_voting_context(slot).await?; | ||
| let validator_attestation_committees = | ||
| self.get_attesting_validators_in_committee(&voting_context_tx, cluster.committee_id()); | ||
| self.get_attesting_validators_in_committee(&voting_context_tx, committee_id); | ||
|
|
||
| // Run QBFT consensus once for the entire committee | ||
| let timer = metrics::start_timer_vec(&metrics::CONSENSUS_TIMES, &[metrics::BEACON_VOTE]); | ||
| let timeout_mode = TimeoutMode::SlotTime { | ||
| instance_start_time: self.get_instant_in_slot( | ||
| attestation.data().slot, | ||
| Duration::from_secs(self.spec.seconds_per_slot) / 3, | ||
| )?, | ||
| instance_start_time: self | ||
| .get_instant_in_slot(slot, Duration::from_secs(self.spec.seconds_per_slot) / 3)?, | ||
| }; | ||
|
|
||
| let completed = self | ||
| .qbft_manager | ||
| .decide_instance( | ||
| CommitteeInstanceId { | ||
| committee: cluster.committee_id(), | ||
| instance_height: attestation.data().slot.as_usize().into(), | ||
| committee: committee_id, | ||
| instance_height: slot.as_usize().into(), | ||
| }, | ||
| BeaconVote { | ||
| block_root: attestation.data().beacon_block_root, | ||
| source: attestation.data().source, | ||
| target: attestation.data().target, | ||
| block_root: first_att_data.beacon_block_root, | ||
| source: first_att_data.source, | ||
| target: first_att_data.target, | ||
| }, | ||
| self.create_beacon_vote_validator( | ||
| attestation.data().slot, | ||
| validator_attestation_committees, | ||
| ), | ||
| self.create_beacon_vote_validator(slot, validator_attestation_committees), | ||
| timeout_mode, | ||
| &cluster, | ||
| ) | ||
|
|
@@ -1334,42 +1353,49 @@ impl<T: SlotClock, E: EthSpec> AnchorValidatorStore<T, E> { | |
| Completed::Success(data) => data, | ||
| }; | ||
| let data_hash = data.hash(); | ||
| attestation.data_mut().beacon_block_root = data.block_root; | ||
| attestation.data_mut().source = data.source; | ||
| attestation.data_mut().target = data.target; | ||
|
|
||
| // yay - we agree! let's sign the att we agreed on | ||
| let domain_hash = self.get_domain(current_epoch, Domain::BeaconAttester); | ||
|
|
||
| // Calculate signature count for post-consensus committee collection | ||
| let committee_validator_indices = | ||
| self.get_committee_validator_indices(&cluster.committee_id()); | ||
|
|
||
| // Use `voting_message_count_for_committee` for post-consensus (flat counting) | ||
| // Shared values for all validators in this committee | ||
| let domain_hash = self.get_domain(data.target.epoch, Domain::BeaconAttester); | ||
| let committee_validator_indices = self.get_committee_validator_indices(&committee_id); | ||
| let num_signatures_to_collect = voting_context_tx | ||
| .voting_assignments | ||
| .voting_message_count_for_committee(|idx| committee_validator_indices.contains(idx)); | ||
| let signing_root = first_att_data.signing_root(domain_hash); | ||
|
|
||
| let signing_root = attestation.data().signing_root(domain_hash); | ||
| let signature = self | ||
| .collect_signature( | ||
| PartialSignatureKind::PostConsensus, | ||
| Role::Committee, | ||
| CollectionMode::Committee { | ||
| num_signatures_to_collect, | ||
| base_hash: data_hash, | ||
| }, | ||
| &validator, | ||
| &cluster, | ||
| signing_root, | ||
| attestation.data().slot, | ||
| ) | ||
| .await?; | ||
| attestation | ||
| .add_signature(&signature, validator_committee_position) | ||
| .map_err(Error::UnableToSignAttestation)?; | ||
| // Sign for each validator in the committee | ||
| let mut results = Vec::with_capacity(attestations.len()); | ||
| for (validator_index, pubkey, validator_committee_position, mut attestation) in attestations | ||
| { | ||
| let validator = self.get_validator_metadata(pubkey)?; | ||
petarjuki7 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| // Apply consensus result to this attestation | ||
| attestation.data_mut().beacon_block_root = data.block_root; | ||
| attestation.data_mut().source = data.source; | ||
| attestation.data_mut().target = data.target; | ||
petarjuki7 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| Ok(()) | ||
| let signature = self | ||
| .collect_signature( | ||
| PartialSignatureKind::PostConsensus, | ||
| Role::Committee, | ||
| CollectionMode::Committee { | ||
| num_signatures_to_collect, | ||
| base_hash: data_hash, | ||
| }, | ||
| &validator, | ||
| &cluster, | ||
| signing_root, | ||
| slot, | ||
| ) | ||
| .await?; | ||
petarjuki7 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| attestation | ||
| .add_signature(&signature, validator_committee_position) | ||
| .map_err(Error::UnableToSignAttestation)?; | ||
petarjuki7 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| results.push((validator_index, attestation, pubkey)); | ||
| } | ||
|
|
||
| Ok(results) | ||
| } | ||
|
|
||
| /// Provide slashing protection for attestations, safely updating the slashing protection DB. | ||
|
|
@@ -2561,46 +2587,45 @@ impl<T: SlotClock, E: EthSpec> ValidatorStore for AnchorValidatorStore<T, E> { | |
|
|
||
| async fn sign_attestations( | ||
| self: &Arc<Self>, | ||
| mut attestations: Vec<(u64, PublicKeyBytes, usize, Attestation<E>)>, | ||
| attestations: Vec<AttestationData<E>>, | ||
| ) -> Result<Vec<(u64, Attestation<E>)>, Error> { | ||
| if !*self.is_synced.borrow() { | ||
| return Err(Error::SpecificError(SpecificError::NotSynced)); | ||
| } | ||
|
|
||
| let signing_futures = attestations.iter_mut().map( | ||
| |(_, pubkey, validator_committee_index, attestation)| async move { | ||
| self.sign_attestation( | ||
| *pubkey, | ||
| *validator_committee_index, | ||
| attestation, | ||
| attestation.data().target.epoch, | ||
| ) | ||
| .await | ||
| }, | ||
| ); | ||
| // Group attestations by SSV committee | ||
| let mut committee_mapping: HashMap<CommitteeId, Vec<AttestationData<E>>> = HashMap::new(); | ||
|
|
||
| let results = join_all(signing_futures).await; | ||
| for attestation_data in attestations { | ||
| let (_, cluster) = self.get_validator_and_cluster(attestation_data.1)?; | ||
| committee_mapping | ||
| .entry(cluster.committee_id()) | ||
| .or_default() | ||
| .push(attestation_data); | ||
| } | ||
|
|
||
| let mut signed_attestations = Vec::with_capacity(results.len()); | ||
| for (result, (validator_index, pubkey, _, attestation)) in | ||
| results.into_iter().zip(attestations.into_iter()) | ||
| { | ||
| // Process all committees in parallel | ||
| let committee_futures: Vec<_> = committee_mapping | ||
| .into_iter() | ||
| .map(|(committee_id, attestations)| async move { | ||
| let result = self | ||
| .sign_committee_attestations(committee_id, attestations) | ||
| .await; | ||
| (committee_id, result) | ||
| }) | ||
| .collect(); | ||
|
|
||
| let results = join_all(committee_futures).await; | ||
|
|
||
| // Collect successful results, log failures | ||
| let mut signed_attestations = Vec::new(); | ||
| for (committee_id, result) in results { | ||
| match result { | ||
| Ok(()) => { | ||
| signed_attestations.push((validator_index, attestation, pubkey)); | ||
| } | ||
| Err(ValidatorStoreError::UnknownPubkey(pubkey)) => { | ||
| warn!( | ||
| info = "a validator may have recently been removed from this VC", | ||
| ?pubkey, | ||
| "Missing pubkey for attestation" | ||
| ); | ||
| Ok(committee_attestations) => { | ||
| signed_attestations.extend(committee_attestations); | ||
| } | ||
| Err(e) => { | ||
petarjuki7 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| error!( | ||
| error = ?e, | ||
| "Failed to sign attestation" | ||
| ); | ||
| error!(?committee_id, error = ?e, "Failed to sign attestations for committee"); | ||
| } | ||
| } | ||
| } | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.