Skip to content
Open
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
195 changes: 110 additions & 85 deletions anchor/validator_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,14 @@ const SYNC_SELECTION_PROOF_LOG_NAME: &str = "sync selection proof";
const SYNC_COMMITTEE_SIGNATURE_LOG_NAME: &str = "sync committee signature";
const SYNC_COMMITTEE_CONTRIBUTION_LOG_NAME: &str = "sync committee contribution";

/// Input data for a single validator's attestation within a committee batch.
///
/// Tuple fields, in order:
/// - `u64`: the validator's index
/// - `PublicKeyBytes`: the validator's public key
/// - `usize`: the validator's position within its beacon committee, used
///   when adding its signature to the attestation's aggregation bits
/// - `Attestation<E>`: the (unsigned) attestation to decide on and sign
type AttestationData<E> = (
u64, // validator_index
PublicKeyBytes, // pubkey
usize, // validator_committee_position (index into the committee bitfield)
Attestation<E>, // attestation to be signed
);

pub struct AnchorValidatorStore<T: SlotClock + 'static, E: EthSpec> {
database: Arc<NetworkDatabase>,
decrypted_keys: Mutex<LruCache<[u8; ENCRYPTED_KEY_LENGTH], SecretKey>>,
Expand Down Expand Up @@ -198,6 +206,19 @@ impl<T: SlotClock, E: EthSpec> AnchorValidatorStore<T, E> {
))
}

/// Get validator metadata by public key.
fn get_validator_metadata(
&self,
validator_pubkey: PublicKeyBytes,
) -> Result<ValidatorMetadata, Error> {
self.database
.state()
.metadata()
.get_by(&validator_pubkey)
.ok_or(Error::UnknownPubkey(validator_pubkey))
.cloned()
}

fn get_domain(&self, epoch: Epoch, domain: Domain) -> Hash256 {
self.spec.get_domain(
epoch,
Expand Down Expand Up @@ -1282,46 +1303,44 @@ impl<T: SlotClock, E: EthSpec> AnchorValidatorStore<T, E> {
.map(|signature| SignedContributionAndProof { message, signature })
}

async fn sign_attestation(
/// Sign attestations for all validators in a single SSV committee.
///
/// Runs QBFT consensus once for the committee, then collects signatures for each validator.
async fn sign_committee_attestations(
&self,
validator_pubkey: PublicKeyBytes,
validator_committee_position: usize,
attestation: &mut Attestation<E>,
current_epoch: Epoch,
) -> Result<(), Error> {
if !*self.is_synced.borrow() {
return Err(Error::SpecificError(SpecificError::NotSynced));
}
committee_id: CommitteeId,
attestations: Vec<AttestationData<E>>,
) -> Result<Vec<(u64, Attestation<E>, PublicKeyBytes)>, Error> {
let slot = attestations[0].3.data().slot;
let first_att_data = attestations[0].3.data();

let (validator, cluster) = self.get_validator_and_cluster(validator_pubkey)?;
let voting_context_tx = self.get_voting_context(attestation.data().slot).await?;
// All validators in the same committee share the same cluster
let (_, cluster) = self.get_validator_and_cluster(attestations[0].1)?;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assumption to Verify: This assumes all validators grouped by committee_id belong to the same SSV cluster. Is this guaranteed by the grouping logic in sign_attestations? If a committee could contain validators from different clusters, using attestations[0] would be incorrect.

Consider adding a debug assertion or comment documenting this invariant.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// All validators in the same committee share the same cluster is in fact incorrect.

But the cluster is actually not needed, if you modify decide_instance. Decide instance actually only needs the cluster members - which are in fact equal to the committee members. :)


let voting_context_tx = self.get_voting_context(slot).await?;
let validator_attestation_committees =
self.get_attesting_validators_in_committee(&voting_context_tx, cluster.committee_id());
self.get_attesting_validators_in_committee(&voting_context_tx, committee_id);

// Run QBFT consensus once for the entire committee
let timer = metrics::start_timer_vec(&metrics::CONSENSUS_TIMES, &[metrics::BEACON_VOTE]);
let timeout_mode = TimeoutMode::SlotTime {
instance_start_time: self.get_instant_in_slot(
attestation.data().slot,
Duration::from_secs(self.spec.seconds_per_slot) / 3,
)?,
instance_start_time: self
.get_instant_in_slot(slot, Duration::from_secs(self.spec.seconds_per_slot) / 3)?,
};

let completed = self
.qbft_manager
.decide_instance(
CommitteeInstanceId {
committee: cluster.committee_id(),
instance_height: attestation.data().slot.as_usize().into(),
committee: committee_id,
instance_height: slot.as_usize().into(),
},
BeaconVote {
block_root: attestation.data().beacon_block_root,
source: attestation.data().source,
target: attestation.data().target,
block_root: first_att_data.beacon_block_root,
source: first_att_data.source,
target: first_att_data.target,
},
self.create_beacon_vote_validator(
attestation.data().slot,
validator_attestation_committees,
),
self.create_beacon_vote_validator(slot, validator_attestation_committees),
timeout_mode,
&cluster,
)
Expand All @@ -1334,42 +1353,49 @@ impl<T: SlotClock, E: EthSpec> AnchorValidatorStore<T, E> {
Completed::Success(data) => data,
};
let data_hash = data.hash();
attestation.data_mut().beacon_block_root = data.block_root;
attestation.data_mut().source = data.source;
attestation.data_mut().target = data.target;

// yay - we agree! let's sign the att we agreed on
let domain_hash = self.get_domain(current_epoch, Domain::BeaconAttester);

// Calculate signature count for post-consensus committee collection
let committee_validator_indices =
self.get_committee_validator_indices(&cluster.committee_id());

// Use `voting_message_count_for_committee` for post-consensus (flat counting)
// Shared values for all validators in this committee
let domain_hash = self.get_domain(data.target.epoch, Domain::BeaconAttester);
let committee_validator_indices = self.get_committee_validator_indices(&committee_id);
let num_signatures_to_collect = voting_context_tx
.voting_assignments
.voting_message_count_for_committee(|idx| committee_validator_indices.contains(idx));
let signing_root = first_att_data.signing_root(domain_hash);

let signing_root = attestation.data().signing_root(domain_hash);
let signature = self
.collect_signature(
PartialSignatureKind::PostConsensus,
Role::Committee,
CollectionMode::Committee {
num_signatures_to_collect,
base_hash: data_hash,
},
&validator,
&cluster,
signing_root,
attestation.data().slot,
)
.await?;
attestation
.add_signature(&signature, validator_committee_position)
.map_err(Error::UnableToSignAttestation)?;
// Sign for each validator in the committee
let mut results = Vec::with_capacity(attestations.len());
for (validator_index, pubkey, validator_committee_position, mut attestation) in attestations
{
let validator = self.get_validator_metadata(pubkey)?;

// Apply consensus result to this attestation
attestation.data_mut().beacon_block_root = data.block_root;
attestation.data_mut().source = data.source;
attestation.data_mut().target = data.target;

Ok(())
let signature = self
.collect_signature(
PartialSignatureKind::PostConsensus,
Role::Committee,
CollectionMode::Committee {
num_signatures_to_collect,
base_hash: data_hash,
},
&validator,
&cluster,
signing_root,
slot,
)
.await?;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

attestation validation is very time sensitive. in this loop over attestations, we .await each signature before moving onto the next, i.e. sequential processing, whereas it was being done in parallel via join_all before. we could turn this back into parallel futures with join_all as a fix.

I think at that point though, we'd just be recreating the pre-PR code, but with some extra steps and complexity.

other thoughts:
The gains from committee grouping are fewer calls to get_validator_and_cluster, get_voting_context, and similar, but these are in-memory lookups. And decide_instance is already engineered to handle multiple calls to the same instance gracefully.

Given that, I'm not sure the added complexity
(sign_committee_attestations, committee grouping HashMap, AttestationData type alias) justifies itself. What do you think about keeping the per-validator approach? And def lmk if i'm overlooking some solid gains we'd get from this PR

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the comment! The original idea was that, since operations like deciding on a vote and collecting the signatures are performed on a per-committee basis, it would make sense to group them by committee. Also, you wouldn't need a QBFT instance per validator, as it's the same for everyone in the committee. But I agree decide_instance is engineered to handle multiple calls to the same instance.
Although some overhead is avoided — fewer DashMap lookups, fewer oneshot channels, etc. — these are probably cheap operations.

Would like to hear @dknopik's input, if we agree the issue wasn't that significant I am okay with closing the PR.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually this made me realize that the Pre-PR code is also flawed: if even one involved SSV committee is not able to sign (e.g. due to offline operators), the other signing futures will complete but the results will not be processed until those other signings time out. :/


attestation
.add_signature(&signature, validator_committee_position)
.map_err(Error::UnableToSignAttestation)?;

results.push((validator_index, attestation, pubkey));
}

Ok(results)
}

/// Provide slashing protection for attestations, safely updating the slashing protection DB.
Expand Down Expand Up @@ -2561,46 +2587,45 @@ impl<T: SlotClock, E: EthSpec> ValidatorStore for AnchorValidatorStore<T, E> {

async fn sign_attestations(
self: &Arc<Self>,
mut attestations: Vec<(u64, PublicKeyBytes, usize, Attestation<E>)>,
attestations: Vec<AttestationData<E>>,
) -> Result<Vec<(u64, Attestation<E>)>, Error> {
if !*self.is_synced.borrow() {
return Err(Error::SpecificError(SpecificError::NotSynced));
}

let signing_futures = attestations.iter_mut().map(
|(_, pubkey, validator_committee_index, attestation)| async move {
self.sign_attestation(
*pubkey,
*validator_committee_index,
attestation,
attestation.data().target.epoch,
)
.await
},
);
// Group attestations by SSV committee
let mut committee_mapping: HashMap<CommitteeId, Vec<AttestationData<E>>> = HashMap::new();

let results = join_all(signing_futures).await;
for attestation_data in attestations {
let (_, cluster) = self.get_validator_and_cluster(attestation_data.1)?;
committee_mapping
.entry(cluster.committee_id())
.or_default()
.push(attestation_data);
}

let mut signed_attestations = Vec::with_capacity(results.len());
for (result, (validator_index, pubkey, _, attestation)) in
results.into_iter().zip(attestations.into_iter())
{
// Process all committees in parallel
let committee_futures: Vec<_> = committee_mapping
.into_iter()
.map(|(committee_id, attestations)| async move {
let result = self
.sign_committee_attestations(committee_id, attestations)
.await;
(committee_id, result)
})
.collect();

let results = join_all(committee_futures).await;

// Collect successful results, log failures
let mut signed_attestations = Vec::new();
for (committee_id, result) in results {
match result {
Ok(()) => {
signed_attestations.push((validator_index, attestation, pubkey));
}
Err(ValidatorStoreError::UnknownPubkey(pubkey)) => {
warn!(
info = "a validator may have recently been removed from this VC",
?pubkey,
"Missing pubkey for attestation"
);
Ok(committee_attestations) => {
signed_attestations.extend(committee_attestations);
}
Err(e) => {
error!(
error = ?e,
"Failed to sign attestation"
);
error!(?committee_id, error = ?e, "Failed to sign attestations for committee");
}
}
}
Expand Down
Loading