Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
332 changes: 210 additions & 122 deletions beacon_node/beacon_chain/src/attestation_verification.rs

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -136,12 +136,13 @@ pub fn batch_verify_unaggregated_attestations<'a, T, I>(
) -> Result<Vec<Result<VerifiedUnaggregatedAttestation<'a, T>, Error>>, Error>
where
T: BeaconChainTypes,
I: Iterator<Item = (&'a Attestation<T::EthSpec>, Option<SubnetId>)> + ExactSizeIterator,
I: Iterator<Item = (&'a SingleAttestation, Option<SubnetId>)> + ExactSizeIterator,
{
let mut num_partially_verified = 0;
let mut num_failed = 0;

// Perform partial verification of all attestations, collecting the results.
// TODO(electra): we will refactor this so it doesn't do committee calculations
let partial_results = attestations
.map(|(attn, subnet_opt)| {
let result = IndexedUnaggregatedAttestation::verify(attn, subnet_opt, chain);
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2062,7 +2062,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
AttestationError,
>
where
I: Iterator<Item = (&'a Attestation<T::EthSpec>, Option<SubnetId>)> + ExactSizeIterator,
I: Iterator<Item = (&'a SingleAttestation, Option<SubnetId>)> + ExactSizeIterator,
{
batch_verify_unaggregated_attestations(attestations, self)
}
Expand Down
40 changes: 30 additions & 10 deletions beacon_node/beacon_processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,9 @@ use task_executor::TaskExecutor;
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::TrySendError;
use types::{
Attestation, BeaconState, ChainSpec, Hash256, RelativeEpoch, SignedAggregateAndProof, SubnetId,
Attestation, BeaconState, ChainSpec, EthSpec, Hash256, RelativeEpoch, SignedAggregateAndProof,
SingleAttestation, Slot, SubnetId,
};
use types::{EthSpec, Slot};
use work_reprocessing_queue::{
spawn_reprocess_scheduler, QueuedAggregate, QueuedLightClientUpdate, QueuedRpcBlock,
QueuedUnaggregate, ReadyWork,
Expand Down Expand Up @@ -504,10 +504,10 @@ impl<E: EthSpec> From<ReadyWork> for WorkEvent<E> {

/// Items required to verify a batch of unaggregated gossip attestations.
#[derive(Debug)]
pub struct GossipAttestationPackage<E: EthSpec> {
pub struct GossipAttestationPackage<T> {
pub message_id: MessageId,
pub peer_id: PeerId,
pub attestation: Box<Attestation<E>>,
pub attestation: Box<T>,
pub subnet_id: SubnetId,
pub should_import: bool,
pub seen_timestamp: Duration,
Expand Down Expand Up @@ -554,16 +554,23 @@ pub enum BlockingOrAsync {
/// queuing specifics.
pub enum Work<E: EthSpec> {
GossipAttestation {
attestation: Box<GossipAttestationPackage<E>>,
process_individual: Box<dyn FnOnce(GossipAttestationPackage<E>) + Send + Sync>,
process_batch: Box<dyn FnOnce(Vec<GossipAttestationPackage<E>>) + Send + Sync>,
attestation: Box<GossipAttestationPackage<SingleAttestation>>,
process_individual:
Box<dyn FnOnce(GossipAttestationPackage<SingleAttestation>) + Send + Sync>,
process_batch:
Box<dyn FnOnce(Vec<GossipAttestationPackage<SingleAttestation>>) + Send + Sync>,
},
GossipLegacyAttestation {
attestation: Box<GossipAttestationPackage<Attestation<E>>>,
process_individual: Box<dyn FnOnce(GossipAttestationPackage<Attestation<E>>) + Send + Sync>,
},
UnknownBlockAttestation {
process_fn: BlockingFn,
},
GossipAttestationBatch {
attestations: Vec<GossipAttestationPackage<E>>,
process_batch: Box<dyn FnOnce(Vec<GossipAttestationPackage<E>>) + Send + Sync>,
attestations: Vec<GossipAttestationPackage<SingleAttestation>>,
process_batch:
Box<dyn FnOnce(Vec<GossipAttestationPackage<SingleAttestation>>) + Send + Sync>,
},
GossipAggregate {
aggregate: Box<GossipAggregatePackage<E>>,
Expand Down Expand Up @@ -639,6 +646,7 @@ impl<E: EthSpec> fmt::Debug for Work<E> {
#[strum(serialize_all = "snake_case")]
pub enum WorkType {
GossipAttestation,
GossipLegacyAttestation,
UnknownBlockAttestation,
GossipAttestationBatch,
GossipAggregate,
Expand Down Expand Up @@ -690,6 +698,7 @@ impl<E: EthSpec> Work<E> {
fn to_type(&self) -> WorkType {
match self {
Work::GossipAttestation { .. } => WorkType::GossipAttestation,
Work::GossipLegacyAttestation { .. } => WorkType::GossipLegacyAttestation,
Work::GossipAttestationBatch { .. } => WorkType::GossipAttestationBatch,
Work::GossipAggregate { .. } => WorkType::GossipAggregate,
Work::GossipAggregateBatch { .. } => WorkType::GossipAggregateBatch,
Expand Down Expand Up @@ -849,6 +858,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
let mut aggregate_queue = LifoQueue::new(queue_lengths.aggregate_queue);
let mut aggregate_debounce = TimeLatch::default();
let mut attestation_queue = LifoQueue::new(queue_lengths.attestation_queue);
let mut legacy_attestation_queue = LifoQueue::new(queue_lengths.attestation_queue);
let mut attestation_debounce = TimeLatch::default();
let mut unknown_block_aggregate_queue =
LifoQueue::new(queue_lengths.unknown_block_aggregate_queue);
Expand Down Expand Up @@ -1301,6 +1311,9 @@ impl<E: EthSpec> BeaconProcessor<E> {
match work {
_ if can_spawn => self.spawn_worker(work, idle_tx),
Work::GossipAttestation { .. } => attestation_queue.push(work),
Work::GossipLegacyAttestation { .. } => {
legacy_attestation_queue.push(work)
}
// Attestation batches are formed internally within the
// `BeaconProcessor`, they are not sent from external services.
Work::GossipAttestationBatch { .. } => crit!(
Expand Down Expand Up @@ -1430,7 +1443,8 @@ impl<E: EthSpec> BeaconProcessor<E> {

if let Some(modified_queue_id) = modified_queue_id {
let queue_len = match modified_queue_id {
WorkType::GossipAttestation => aggregate_queue.len(),
WorkType::GossipAttestation => attestation_queue.len(),
WorkType::GossipLegacyAttestation => legacy_attestation_queue.len(),
WorkType::UnknownBlockAttestation => unknown_block_attestation_queue.len(),
WorkType::GossipAttestationBatch => 0, // No queue
WorkType::GossipAggregate => aggregate_queue.len(),
Expand Down Expand Up @@ -1563,6 +1577,12 @@ impl<E: EthSpec> BeaconProcessor<E> {
} => task_spawner.spawn_blocking(move || {
process_individual(*attestation);
}),
Work::GossipLegacyAttestation {
attestation,
process_individual,
} => task_spawner.spawn_blocking(move || {
process_individual(*attestation);
}),
Work::GossipAttestationBatch {
attestations,
process_batch,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {

pub fn process_gossip_attestation_batch(
self: Arc<Self>,
packages: Vec<GossipAttestationPackage<T::EthSpec>>,
packages: Vec<GossipAttestationPackage<SingleAttestation>>,
reprocess_tx: Option<mpsc::Sender<ReprocessQueueMessage>>,
) {
let attestations_and_subnets = packages
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use types::{
InconsistentFork, IndexedAttestation, IndexedAttestationRef, ProposerSlashing, PublicKey,
PublicKeyBytes, Signature, SignedAggregateAndProof, SignedBeaconBlock, SignedBeaconBlockHeader,
SignedBlsToExecutionChange, SignedContributionAndProof, SignedRoot, SignedVoluntaryExit,
SigningData, Slot, SyncAggregate, SyncAggregatorSelectionData, Unsigned,
SigningData, SingleAttestation, Slot, SyncAggregate, SyncAggregatorSelectionData, Unsigned,
};

pub type Result<T> = std::result::Result<T, Error>;
Expand Down Expand Up @@ -298,6 +298,37 @@ where
Ok(SignatureSet::multiple_pubkeys(signature, pubkeys, message))
}

/// Returns the signature set for the given `SingleAttestation`.
pub fn single_attestation_signature_set<'a, E, F>(
state: &'a BeaconState<E>,
get_pubkey: F,
single_attestation: &'a SingleAttestation,
spec: &'a ChainSpec,
) -> Result<SignatureSet<'a>>
where
E: EthSpec,
F: Fn(usize) -> Option<Cow<'a, PublicKey>>,
{
let validator_index = single_attestation.attester_index;
let pubkey =
get_pubkey(validator_index as usize).ok_or(Error::ValidatorUnknown(validator_index))?;

let domain = spec.get_domain(
single_attestation.data.target.epoch,
Domain::BeaconAttester,
&state.fork(),
state.genesis_validators_root(),
);

let message = single_attestation.data.signing_root(domain);

Ok(SignatureSet::single_pubkey(
&single_attestation.signature,
pubkey,
message,
))
}

/// Returns the signature set for the given `indexed_attestation` but pubkeys are supplied directly
/// instead of from the state.
pub fn indexed_attestation_signature_set_from_pubkeys<'a, 'b, E, F>(
Expand Down
71 changes: 56 additions & 15 deletions consensus/types/src/attestation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ use tree_hash_derive::TreeHash;

use super::{
AggregateSignature, AttestationData, BitList, ChainSpec, CommitteeIndex, Domain, EthSpec, Fork,
SecretKey, Signature, SignedRoot,
IndexedAttestation, IndexedAttestationBase, IndexedAttestationElectra, SecretKey, Signature,
SignedRoot, VariableList,
};

#[derive(Debug, PartialEq)]
Expand Down Expand Up @@ -588,7 +589,11 @@ pub struct SingleAttestation {
}

impl SingleAttestation {
pub fn to_attestation<E: EthSpec>(&self, committee: &[usize]) -> Result<Attestation<E>, Error> {
pub fn to_attestation<E: EthSpec>(
&self,
committee: &[usize],
spec: &ChainSpec,
) -> Result<Attestation<E>, Error> {
let aggregation_bit = committee
.iter()
.enumerate()
Expand All @@ -600,22 +605,58 @@ impl SingleAttestation {
})
.ok_or(Error::AttesterNotInCommittee(self.attester_index))?;

let mut committee_bits: BitVector<E::MaxCommitteesPerSlot> = BitVector::default();
committee_bits
.set(self.committee_index as usize, true)
.map_err(|_| Error::InvalidCommitteeIndex)?;
let fork_name = spec.fork_name_at_slot::<E>(self.data.slot);

let mut aggregation_bits =
BitList::with_capacity(committee.len()).map_err(|_| Error::InvalidCommitteeLength)?;
if fork_name.electra_enabled() {
let mut committee_bits: BitVector<E::MaxCommitteesPerSlot> = BitVector::default();
committee_bits
.set(self.committee_index as usize, true)
.map_err(|_| Error::InvalidCommitteeIndex)?;

aggregation_bits.set(aggregation_bit, true)?;
let mut aggregation_bits = BitList::with_capacity(committee.len())
.map_err(|_| Error::InvalidCommitteeLength)?;

Ok(Attestation::Electra(AttestationElectra {
aggregation_bits,
committee_bits,
data: self.data.clone(),
signature: self.signature.clone(),
}))
aggregation_bits.set(aggregation_bit, true)?;

Ok(Attestation::Electra(AttestationElectra {
aggregation_bits,
committee_bits,
data: self.data.clone(),
signature: self.signature.clone(),
}))
} else {
let mut aggregation_bits = BitList::with_capacity(committee.len())
.map_err(|_| Error::InvalidCommitteeLength)?;
aggregation_bits.set(aggregation_bit, true)?;

Ok(Attestation::Base(AttestationBase {
aggregation_bits,
data: self.data.clone(),
signature: self.signature.clone(),
}))
}
}

pub fn to_indexed_attestation<E: EthSpec>(
&self,
spec: &ChainSpec,
) -> Result<IndexedAttestation<E>, Error> {
let fork_name = spec.fork_name_at_slot::<E>(self.data.slot);
if fork_name.electra_enabled() {
let attesting_indices = VariableList::new(vec![self.attester_index])?;
Ok(IndexedAttestation::Electra(IndexedAttestationElectra {
attesting_indices,
data: self.data.clone(),
signature: self.signature.clone(),
}))
} else {
let attesting_indices = VariableList::new(vec![self.attester_index])?;
Ok(IndexedAttestation::Base(IndexedAttestationBase {
attesting_indices,
data: self.data.clone(),
signature: self.signature.clone(),
}))
}
}
}

Expand Down
8 changes: 4 additions & 4 deletions consensus/types/src/subnet_id.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Identifies each shard by an integer identifier.
use crate::SingleAttestation;
use crate::{AttestationRef, ChainSpec, CommitteeIndex, EthSpec, Slot};
use crate::{ChainSpec, CommitteeIndex, EthSpec, Slot};
use alloy_primitives::{bytes::Buf, U256};
use safe_arith::{ArithError, SafeArith};
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -44,14 +44,14 @@ impl SubnetId {
/// Compute the subnet for an attestation where each slot in the
/// attestation epoch contains `committee_count_per_slot` committees.
pub fn compute_subnet_for_attestation<E: EthSpec>(
attestation: AttestationRef<E>,
attestation: &SingleAttestation,
committee_count_per_slot: u64,
spec: &ChainSpec,
) -> Result<SubnetId, ArithError> {
let committee_index = attestation.committee_index().ok_or(ArithError::Overflow)?;
let committee_index = attestation.committee_index;

Self::compute_subnet::<E>(
attestation.data().slot,
attestation.data.slot,
committee_index,
committee_count_per_slot,
spec,
Expand Down
Loading