Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 15 additions & 11 deletions beacon_node/http_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,14 @@ use tokio_stream::{
wrappers::{errors::BroadcastStreamRecvError, BroadcastStream},
StreamExt,
};
use types::AttestationData;
use types::{
fork_versioned_response::EmptyMetadata, Attestation, AttestationData, AttestationShufflingId,
AttesterSlashing, BeaconStateError, ChainSpec, Checkpoint, CommitteeCache, ConfigAndPreset,
Epoch, EthSpec, ForkName, ForkVersionedResponse, Hash256, ProposerPreparationData,
ProposerSlashing, RelativeEpoch, SignedAggregateAndProof, SignedBlindedBeaconBlock,
SignedBlsToExecutionChange, SignedContributionAndProof, SignedValidatorRegistrationData,
SignedVoluntaryExit, Slot, SyncCommitteeMessage, SyncContributionData,
fork_versioned_response::EmptyMetadata, Attestation, AttestationShufflingId, AttesterSlashing,
BeaconStateError, ChainSpec, Checkpoint, CommitteeCache, ConfigAndPreset, Epoch, EthSpec,
ForkName, ForkVersionedResponse, Hash256, ProposerPreparationData, ProposerSlashing,
RelativeEpoch, SignedAggregateAndProof, SignedBlindedBeaconBlock, SignedBlsToExecutionChange,
SignedContributionAndProof, SignedValidatorRegistrationData, SignedVoluntaryExit, Slot,
SyncCommitteeMessage, SyncContributionData,
};
use validator::pubkey_to_validator_index;
use version::{
Expand Down Expand Up @@ -1999,11 +2000,14 @@ pub fn serve<T: BeaconChainTypes>(
chain: Arc<BeaconChain<T>>,
query: api_types::AttestationPoolQuery| {
task_spawner.blocking_response_task(Priority::P1, move || {
let query_filter = |data: &AttestationData| {
let query_filter = |data: &AttestationData, committee_index: Option<u64>| {
query.slot.is_none_or(|slot| slot == data.slot)
&& query
.committee_index
.is_none_or(|index| index == data.index)
&& query.committee_index.is_none_or(|index| {
if let Some(committee_index) = committee_index {
return index == committee_index;
}
false
})
};

let mut attestations = chain.op_pool.get_filtered_attestations(query_filter);
Expand All @@ -2012,7 +2016,7 @@ pub fn serve<T: BeaconChainTypes>(
.naive_aggregation_pool
.read()
.iter()
.filter(|&att| query_filter(att.data()))
.filter(|&att| query_filter(att.data(), att.committee_index()))
.cloned(),
);
// Use the current slot to find the fork version, and convert all messages to the
Expand Down
39 changes: 39 additions & 0 deletions beacon_node/http_api/tests/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2111,8 +2111,47 @@ impl ApiTester {
.await
.unwrap()
.data;

assert_eq!(result, expected);

let result_committee_index_filtered = self
.client
.get_beacon_pool_attestations_v1(None, Some(1))
.await
.unwrap()
.data;

let expected_committee_index_filtered = expected
.clone()
.into_iter()
.filter(|att| att.committee_index() == Some(1))
.collect::<Vec<_>>();

assert_eq!(
result_committee_index_filtered,
expected_committee_index_filtered
);
assert_ne!(result_committee_index_filtered, expected);

let result_committee_index_filtered = self
.client
.get_beacon_pool_attestations_v2(None, Some(2))
.await
.unwrap()
.data;

let expected_committee_index_filtered = expected
.clone()
.into_iter()
.filter(|att| att.committee_index() == Some(2))
.collect::<Vec<_>>();

assert_eq!(
result_committee_index_filtered,
expected_committee_index_filtered
);
assert_ne!(result_committee_index_filtered, expected);

self
}

Expand Down
18 changes: 17 additions & 1 deletion beacon_node/operation_pool/src/attestation_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,18 @@ impl<E: EthSpec> CompactAttestationRef<'_, E> {
}
}

pub fn committee_index(&self) -> Option<u64> {
match self.indexed {
CompactIndexedAttestation::Base(_) => Some(self.data.index),
CompactIndexedAttestation::Electra(indexed_att) => indexed_att
.committee_bits
.iter()
.enumerate()
.find(|&(_, bit)| bit)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to account for aggregation here. Attestations in the op pool can cover multiple committee indices and if any of these match our query index, we should return that attestation.

The aggregation happens in aggregate_across_committees, which persists the aggregates in the attestation storage:

pub fn aggregate_across_committees(&mut self, checkpoint_key: CheckpointKey) {

We could use the committee_indices vec and check committee_indices.contains(&query.committee_index)? We should also add a test covering this case (might need to call aggregate_across_committees to prompt aggregation).

(We should maybe remove the committee_index methods here so we don't accidentally misuse them)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One of the issues with out test harness is that we only have attestations across a single committee. so the aggregation isnt doing much. I've added it anyways, but I had to make a field public which isnt the best solution

We should clean this up and get the test harness to create attestations across more than a single committee. I just didnt want to block v7 testing because of this. Let me know what you think

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh yeah good point. We can come back to test this more thoroughly, I've opened an issue:

.map(|(index, _)| index as u64),
}
}

pub fn clone_as_attestation(&self) -> Attestation<E> {
match self.indexed {
CompactIndexedAttestation::Base(indexed_att) => Attestation::Base(AttestationBase {
Expand Down Expand Up @@ -268,7 +280,11 @@ impl<E: EthSpec> CompactIndexedAttestationElectra<E> {
}

pub fn committee_index(&self) -> Option<u64> {
self.get_committee_indices().first().copied()
self.committee_bits
.iter()
.enumerate()
.find(|&(_, bit)| bit)
.map(|(index, _)| index as u64)
}

pub fn get_committee_indices(&self) -> Vec<u64> {
Expand Down
4 changes: 2 additions & 2 deletions beacon_node/operation_pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -673,12 +673,12 @@ impl<E: EthSpec> OperationPool<E> {
/// This method may return objects that are invalid for block inclusion.
pub fn get_filtered_attestations<F>(&self, filter: F) -> Vec<Attestation<E>>
where
F: Fn(&AttestationData) -> bool,
F: Fn(&AttestationData, Option<u64>) -> bool,
{
self.attestations
.read()
.iter()
.filter(|att| filter(&att.attestation_data()))
.filter(|att| filter(&att.attestation_data(), att.committee_index()))
.map(|att| att.clone_as_attestation())
.collect()
}
Expand Down
6 changes: 5 additions & 1 deletion consensus/types/src/attestation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,11 @@ impl<E: EthSpec> AttestationRef<'_, E> {

impl<E: EthSpec> AttestationElectra<E> {
pub fn committee_index(&self) -> Option<u64> {
self.get_committee_indices().first().cloned()
self.committee_bits
.iter()
.enumerate()
.find(|&(_, bit)| bit)
.map(|(index, _)| index as u64)
}

pub fn get_aggregation_bits(&self) -> Vec<u64> {
Expand Down