Skip to content
Open
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
211 changes: 126 additions & 85 deletions anchor/validator_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,14 @@ const SYNC_SELECTION_PROOF_LOG_NAME: &str = "sync selection proof";
const SYNC_COMMITTEE_SIGNATURE_LOG_NAME: &str = "sync committee signature";
const SYNC_COMMITTEE_CONTRIBUTION_LOG_NAME: &str = "sync committee contribution";

/// Input data for a single validator's attestation within a committee batch.
///
/// Tuple layout: (validator_index, pubkey, validator_committee_position, attestation).
type AttestationEntry<E> = (
u64, // validator_index: the validator's index (returned alongside the signed attestation)
PublicKeyBytes, // pubkey: used to look up the validator's metadata and cluster
usize, // validator_committee_position: index passed to `Attestation::add_signature`
Attestation<E>, // attestation: the unsigned attestation whose data is decided by QBFT, then signed
);

pub struct AnchorValidatorStore<T: SlotClock + 'static, E: EthSpec> {
database: Arc<NetworkDatabase>,
decrypted_keys: Mutex<LruCache<[u8; ENCRYPTED_KEY_LENGTH], SecretKey>>,
Expand Down Expand Up @@ -1282,46 +1290,44 @@ impl<T: SlotClock, E: EthSpec> AnchorValidatorStore<T, E> {
.map(|signature| SignedContributionAndProof { message, signature })
}

async fn sign_attestation(
/// Sign attestations for all validators in a single SSV committee.
///
/// Runs QBFT consensus once for the committee, then collects signatures for each validator.
async fn sign_committee_attestations(
&self,
validator_pubkey: PublicKeyBytes,
validator_committee_position: usize,
attestation: &mut Attestation<E>,
current_epoch: Epoch,
) -> Result<(), Error> {
if !*self.is_synced.borrow() {
return Err(Error::SpecificError(SpecificError::NotSynced));
}
committee_id: CommitteeId,
attestations: Vec<AttestationEntry<E>>,
) -> Result<Vec<(u64, Attestation<E>, PublicKeyBytes)>, Error> {
let slot = attestations[0].3.data().slot;
let first_att_data = attestations[0].3.data();

let (validator, cluster) = self.get_validator_and_cluster(validator_pubkey)?;
let voting_context_tx = self.get_voting_context(attestation.data().slot).await?;
// All validators in the same committee share the same cluster
let (_, cluster) = self.get_validator_and_cluster(attestations[0].1)?;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assumption to Verify: This assumes all validators grouped by committee_id belong to the same SSV cluster. Is this guaranteed by the grouping logic in sign_attestations? If a committee could contain validators from different clusters, using attestations[0] would be incorrect.

Consider adding a debug assertion or comment documenting this invariant.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The claim "All validators in the same committee share the same cluster" is in fact incorrect.

But the cluster is actually not needed, if you modify decide_instance. Decide instance actually only needs the cluster members - which are in fact equal to the committee members. :)


let voting_context_tx = self.get_voting_context(slot).await?;
let validator_attestation_committees =
self.get_attesting_validators_in_committee(&voting_context_tx, cluster.committee_id());
self.get_attesting_validators_in_committee(&voting_context_tx, committee_id);

// Run QBFT consensus once for the entire committee
let timer = metrics::start_timer_vec(&metrics::CONSENSUS_TIMES, &[metrics::BEACON_VOTE]);
let timeout_mode = TimeoutMode::SlotTime {
instance_start_time: self.get_instant_in_slot(
attestation.data().slot,
Duration::from_secs(self.spec.seconds_per_slot) / 3,
)?,
instance_start_time: self
.get_instant_in_slot(slot, Duration::from_secs(self.spec.seconds_per_slot) / 3)?,
};

let completed = self
.qbft_manager
.decide_instance(
CommitteeInstanceId {
committee: cluster.committee_id(),
instance_height: attestation.data().slot.as_usize().into(),
committee: committee_id,
instance_height: slot.as_usize().into(),
},
BeaconVote {
block_root: attestation.data().beacon_block_root,
source: attestation.data().source,
target: attestation.data().target,
block_root: first_att_data.beacon_block_root,
source: first_att_data.source,
target: first_att_data.target,
},
self.create_beacon_vote_validator(
attestation.data().slot,
validator_attestation_committees,
),
self.create_beacon_vote_validator(slot, validator_attestation_committees),
timeout_mode,
&cluster,
)
Expand All @@ -1334,42 +1340,68 @@ impl<T: SlotClock, E: EthSpec> AnchorValidatorStore<T, E> {
Completed::Success(data) => data,
};
let data_hash = data.hash();
attestation.data_mut().beacon_block_root = data.block_root;
attestation.data_mut().source = data.source;
attestation.data_mut().target = data.target;

// yay - we agree! let's sign the att we agreed on
let domain_hash = self.get_domain(current_epoch, Domain::BeaconAttester);

// Calculate signature count for post-consensus committee collection
let committee_validator_indices =
self.get_committee_validator_indices(&cluster.committee_id());

// Use `voting_message_count_for_committee` for post-consensus (flat counting)
// Shared values for all validators in this committee
let domain_hash = self.get_domain(data.target.epoch, Domain::BeaconAttester);
let committee_validator_indices = self.get_committee_validator_indices(&committee_id);
let num_signatures_to_collect = voting_context_tx
.voting_assignments
.voting_message_count_for_committee(|idx| committee_validator_indices.contains(idx));

let signing_root = attestation.data().signing_root(domain_hash);
let signature = self
.collect_signature(
PartialSignatureKind::PostConsensus,
Role::Committee,
CollectionMode::Committee {
num_signatures_to_collect,
base_hash: data_hash,
},
&validator,
&cluster,
signing_root,
attestation.data().slot,
)
.await?;
attestation
.add_signature(&signature, validator_committee_position)
.map_err(Error::UnableToSignAttestation)?;
// Sign for each validator in the committee
let mut results = Vec::with_capacity(attestations.len());
for (validator_index, pubkey, validator_committee_position, mut attestation) in attestations
{
let validator = match self.get_validator_and_cluster(pubkey) {
Ok((v, _)) => v,
Err(Error::UnknownPubkey(pk)) => {
warn!(?pk, "Unknown pubkey while signing attestation, skipping");
continue;
}
Err(e) => {
error!(error = ?e, ?pubkey, "Failed to get validator metadata, skipping");
continue;
}
};

// Apply consensus result to this attestation
attestation.data_mut().beacon_block_root = data.block_root;
attestation.data_mut().source = data.source;
attestation.data_mut().target = data.target;

let signing_root = attestation.data().signing_root(domain_hash);

let signature = match self
.collect_signature(
PartialSignatureKind::PostConsensus,
Role::Committee,
CollectionMode::Committee {
num_signatures_to_collect,
base_hash: data_hash,
},
&validator,
&cluster,
signing_root,
slot,
)
.await
Copy link
Member

@dknopik dknopik Feb 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will block: each `collect_signature` call waits for the signature-collection registrations of the other validators, but those only happen in later loop iterations — so this deadlocks.

Therefore, we need a variant that can register for multiple validators.

Likely, this is why the testnet fails.

{
Ok(sig) => sig,
Err(e) => {
error!(error = ?e, ?pubkey, "Failed to collect signature, skipping");
continue;
}
};

if let Err(e) = attestation.add_signature(&signature, validator_committee_position) {
error!(error = ?e, ?pubkey, "Failed to add signature to attestation, skipping");
continue;
}

results.push((validator_index, attestation, pubkey));
}

Ok(())
Ok(results)
}

/// Provide slashing protection for attestations, safely updating the slashing protection DB.
Expand Down Expand Up @@ -2561,46 +2593,55 @@ impl<T: SlotClock, E: EthSpec> ValidatorStore for AnchorValidatorStore<T, E> {

async fn sign_attestations(
self: &Arc<Self>,
mut attestations: Vec<(u64, PublicKeyBytes, usize, Attestation<E>)>,
attestations: Vec<AttestationEntry<E>>,
) -> Result<Vec<(u64, Attestation<E>)>, Error> {
if !*self.is_synced.borrow() {
return Err(Error::SpecificError(SpecificError::NotSynced));
}

let signing_futures = attestations.iter_mut().map(
|(_, pubkey, validator_committee_index, attestation)| async move {
self.sign_attestation(
*pubkey,
*validator_committee_index,
attestation,
attestation.data().target.epoch,
)
.await
},
);
// Group attestations by SSV committee
let mut committee_mapping: HashMap<CommitteeId, Vec<AttestationEntry<E>>> = HashMap::new();

for attestation_data in attestations {
let pubkey = attestation_data.1;
match self.get_validator_and_cluster(pubkey) {
Ok((_, cluster)) => {
committee_mapping
.entry(cluster.committee_id())
.or_default()
.push(attestation_data);
}
Err(Error::UnknownPubkey(pk)) => {
warn!(?pk, "Unknown pubkey while grouping attestations, skipping");
}
Err(e) => {
error!(error = ?e, ?pubkey, "Failed to get cluster for attestation, skipping");
}
}
}

let results = join_all(signing_futures).await;
// Process all committees in parallel
let committee_futures: Vec<_> = committee_mapping
.into_iter()
.map(|(committee_id, attestations)| async move {
let result = self
.sign_committee_attestations(committee_id, attestations)
.await;
(committee_id, result)
})
.collect();

let mut signed_attestations = Vec::with_capacity(results.len());
for (result, (validator_index, pubkey, _, attestation)) in
results.into_iter().zip(attestations.into_iter())
{
let results = join_all(committee_futures).await;

// Collect successful results, log failures
let mut signed_attestations = Vec::new();
for (committee_id, result) in results {
match result {
Ok(()) => {
signed_attestations.push((validator_index, attestation, pubkey));
}
Err(ValidatorStoreError::UnknownPubkey(pubkey)) => {
warn!(
info = "a validator may have recently been removed from this VC",
?pubkey,
"Missing pubkey for attestation"
);
Ok(committee_attestations) => {
signed_attestations.extend(committee_attestations);
}
Err(e) => {
error!(
error = ?e,
"Failed to sign attestation"
);
error!(?committee_id, error = ?e, "Failed to sign attestations for committee");
}
}
}
Expand Down
Loading