Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 0 additions & 19 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2959,16 +2959,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
ChainSegmentResult::Successful { imported_blocks }
}

/// Updates fork-choice node into a permanent `available` state so it can become a viable head.
/// Only completed sampling results are received. Blocks are unavailable by default and should
/// be pruned on finalization, on a timeout or by a max count.
pub async fn process_sampling_completed(self: &Arc<Self>, block_root: Hash256) {
// TODO(das): update fork-choice, act on sampling result, adjust log level
// NOTE: It is possible that sampling complets before block is imported into fork choice,
// in that case we may need to update availability cache.
info!(%block_root, "Sampling completed");
}

/// Returns `Ok(GossipVerifiedBlock)` if the supplied `block` should be forwarded onto the
/// gossip network. The block is not imported into the chain, it is just partially verified.
///
Expand Down Expand Up @@ -7043,15 +7033,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
&& self.spec.is_peer_das_enabled_for_epoch(block_epoch)
}

/// Returns true if we should issue a sampling request for this block
/// TODO(das): check if the block is still within the da_window
pub fn should_sample_slot(&self, slot: Slot) -> bool {
self.config.enable_sampling
&& self
.spec
.is_peer_das_enabled_for_epoch(slot.epoch(T::EthSpec::slots_per_epoch()))
}

/// Gets the `LightClientBootstrap` object for a requested block root.
///
/// Returns `None` when the state or block is not found in the database.
Expand Down
3 changes: 0 additions & 3 deletions beacon_node/beacon_chain/src/chain_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,6 @@ pub struct ChainConfig {
pub enable_light_client_server: bool,
/// The number of data columns to withhold / exclude from publishing when proposing a block.
pub malicious_withhold_count: usize,
/// Enable peer sampling on blocks.
pub enable_sampling: bool,
/// Number of batches that the node splits blobs or data columns into during publication.
/// This doesn't apply if the node is the block proposer. For PeerDAS only.
pub blob_publication_batches: usize,
Expand Down Expand Up @@ -148,7 +146,6 @@ impl Default for ChainConfig {
epochs_per_migration: crate::migrate::DEFAULT_EPOCHS_PER_MIGRATION,
enable_light_client_server: true,
malicious_withhold_count: 0,
enable_sampling: false,
blob_publication_batches: 4,
blob_publication_batch_interval: Duration::from_millis(300),
sync_tolerance_epochs: DEFAULT_SYNC_TOLERANCE_EPOCHS,
Expand Down
54 changes: 2 additions & 52 deletions beacon_node/beacon_processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,11 @@ use types::{
BeaconState, ChainSpec, EthSpec, Hash256, RelativeEpoch, SignedAggregateAndProof,
SingleAttestation, Slot, SubnetId,
};
use work_reprocessing_queue::IgnoredRpcBlock;
use work_reprocessing_queue::{
spawn_reprocess_scheduler, QueuedAggregate, QueuedLightClientUpdate, QueuedRpcBlock,
QueuedUnaggregate, ReadyWork,
};
use work_reprocessing_queue::{IgnoredRpcBlock, QueuedSamplingRequest};

mod metrics;
pub mod scheduler;
Expand Down Expand Up @@ -112,12 +112,9 @@ pub struct BeaconProcessorQueueLengths {
gossip_proposer_slashing_queue: usize,
gossip_attester_slashing_queue: usize,
unknown_light_client_update_queue: usize,
unknown_block_sampling_request_queue: usize,
rpc_block_queue: usize,
rpc_blob_queue: usize,
rpc_custody_column_queue: usize,
rpc_verify_data_column_queue: usize,
sampling_result_queue: usize,
column_reconstruction_queue: usize,
chain_segment_queue: usize,
backfill_chain_segment: usize,
Expand Down Expand Up @@ -183,9 +180,6 @@ impl BeaconProcessorQueueLengths {
rpc_blob_queue: 1024,
// TODO(das): Placeholder values
rpc_custody_column_queue: 1000,
rpc_verify_data_column_queue: 1000,
unknown_block_sampling_request_queue: 16384,
sampling_result_queue: 1000,
column_reconstruction_queue: 64,
chain_segment_queue: 64,
backfill_chain_segment: 64,
Expand Down Expand Up @@ -487,10 +481,6 @@ impl<E: EthSpec> From<ReadyWork> for WorkEvent<E> {
process_fn,
},
},
ReadyWork::SamplingRequest(QueuedSamplingRequest { process_fn, .. }) => Self {
drop_during_sync: true,
work: Work::UnknownBlockSamplingRequest { process_fn },
},
ReadyWork::BackfillSync(QueuedBackfillBatch(process_fn)) => Self {
drop_during_sync: false,
work: Work::ChainSegmentBackfill(process_fn),
Expand Down Expand Up @@ -582,9 +572,6 @@ pub enum Work<E: EthSpec> {
parent_root: Hash256,
process_fn: BlockingFn,
},
UnknownBlockSamplingRequest {
process_fn: BlockingFn,
},
GossipAggregateBatch {
aggregates: Vec<GossipAggregatePackage<E>>,
process_batch: Box<dyn FnOnce(Vec<GossipAggregatePackage<E>>) + Send + Sync>,
Expand All @@ -611,8 +598,6 @@ pub enum Work<E: EthSpec> {
process_fn: AsyncFn,
},
RpcCustodyColumn(AsyncFn),
RpcVerifyDataColumn(AsyncFn),
SamplingResult(AsyncFn),
ColumnReconstruction(AsyncFn),
IgnoredRpcBlock {
process_fn: BlockingFn,
Expand Down Expand Up @@ -652,7 +637,6 @@ pub enum WorkType {
GossipAggregate,
UnknownBlockAggregate,
UnknownLightClientOptimisticUpdate,
UnknownBlockSamplingRequest,
GossipAggregateBatch,
GossipBlock,
GossipBlobSidecar,
Expand All @@ -668,8 +652,6 @@ pub enum WorkType {
RpcBlock,
RpcBlobs,
RpcCustodyColumn,
RpcVerifyDataColumn,
SamplingResult,
ColumnReconstruction,
IgnoredRpcBlock,
ChainSegment,
Expand Down Expand Up @@ -720,8 +702,6 @@ impl<E: EthSpec> Work<E> {
Work::RpcBlock { .. } => WorkType::RpcBlock,
Work::RpcBlobs { .. } => WorkType::RpcBlobs,
Work::RpcCustodyColumn { .. } => WorkType::RpcCustodyColumn,
Work::RpcVerifyDataColumn { .. } => WorkType::RpcVerifyDataColumn,
Work::SamplingResult { .. } => WorkType::SamplingResult,
Work::ColumnReconstruction(_) => WorkType::ColumnReconstruction,
Work::IgnoredRpcBlock { .. } => WorkType::IgnoredRpcBlock,
Work::ChainSegment { .. } => WorkType::ChainSegment,
Expand All @@ -741,7 +721,6 @@ impl<E: EthSpec> Work<E> {
Work::LightClientUpdatesByRangeRequest(_) => WorkType::LightClientUpdatesByRangeRequest,
Work::UnknownBlockAttestation { .. } => WorkType::UnknownBlockAttestation,
Work::UnknownBlockAggregate { .. } => WorkType::UnknownBlockAggregate,
Work::UnknownBlockSamplingRequest { .. } => WorkType::UnknownBlockSamplingRequest,
Work::UnknownLightClientOptimisticUpdate { .. } => {
WorkType::UnknownLightClientOptimisticUpdate
}
Expand Down Expand Up @@ -884,14 +863,8 @@ impl<E: EthSpec> BeaconProcessor<E> {
let mut rpc_block_queue = FifoQueue::new(queue_lengths.rpc_block_queue);
let mut rpc_blob_queue = FifoQueue::new(queue_lengths.rpc_blob_queue);
let mut rpc_custody_column_queue = FifoQueue::new(queue_lengths.rpc_custody_column_queue);
let mut rpc_verify_data_column_queue =
FifoQueue::new(queue_lengths.rpc_verify_data_column_queue);
// TODO(das): the sampling_request_queue is never read
let mut sampling_result_queue = FifoQueue::new(queue_lengths.sampling_result_queue);
let mut column_reconstruction_queue =
FifoQueue::new(queue_lengths.column_reconstruction_queue);
let mut unknown_block_sampling_request_queue =
FifoQueue::new(queue_lengths.unknown_block_sampling_request_queue);
let mut chain_segment_queue = FifoQueue::new(queue_lengths.chain_segment_queue);
let mut backfill_chain_segment = FifoQueue::new(queue_lengths.backfill_chain_segment);
let mut gossip_block_queue = FifoQueue::new(queue_lengths.gossip_block_queue);
Expand Down Expand Up @@ -1058,13 +1031,8 @@ impl<E: EthSpec> BeaconProcessor<E> {
Some(item)
} else if let Some(item) = rpc_custody_column_queue.pop() {
Some(item)
// TODO(das): decide proper prioritization for sampling columns
} else if let Some(item) = rpc_custody_column_queue.pop() {
Some(item)
} else if let Some(item) = rpc_verify_data_column_queue.pop() {
Some(item)
} else if let Some(item) = sampling_result_queue.pop() {
Some(item)
// Check delayed blocks before gossip blocks, the gossip blocks might rely
// on the delayed ones.
} else if let Some(item) = delayed_block_queue.pop() {
Expand Down Expand Up @@ -1224,9 +1192,6 @@ impl<E: EthSpec> BeaconProcessor<E> {
Some(item)
} else if let Some(item) = dcbrange_queue.pop() {
Some(item)
// Prioritize sampling requests after block syncing requests
} else if let Some(item) = unknown_block_sampling_request_queue.pop() {
Some(item)
// Check slashings after all other consensus messages so we prioritize
// following head.
//
Expand Down Expand Up @@ -1379,10 +1344,6 @@ impl<E: EthSpec> BeaconProcessor<E> {
Work::RpcCustodyColumn { .. } => {
rpc_custody_column_queue.push(work, work_id)
}
Work::RpcVerifyDataColumn(_) => {
rpc_verify_data_column_queue.push(work, work_id)
}
Work::SamplingResult(_) => sampling_result_queue.push(work, work_id),
Work::ColumnReconstruction(_) => {
column_reconstruction_queue.push(work, work_id)
}
Expand Down Expand Up @@ -1425,9 +1386,6 @@ impl<E: EthSpec> BeaconProcessor<E> {
Work::UnknownLightClientOptimisticUpdate { .. } => {
unknown_light_client_update_queue.push(work, work_id)
}
Work::UnknownBlockSamplingRequest { .. } => {
unknown_block_sampling_request_queue.push(work, work_id)
}
Work::ApiRequestP0 { .. } => api_request_p0_queue.push(work, work_id),
Work::ApiRequestP1 { .. } => api_request_p1_queue.push(work, work_id),
};
Expand All @@ -1451,9 +1409,6 @@ impl<E: EthSpec> BeaconProcessor<E> {
WorkType::UnknownLightClientOptimisticUpdate => {
unknown_light_client_update_queue.len()
}
WorkType::UnknownBlockSamplingRequest => {
unknown_block_sampling_request_queue.len()
}
WorkType::GossipAggregateBatch => 0, // No queue
WorkType::GossipBlock => gossip_block_queue.len(),
WorkType::GossipBlobSidecar => gossip_blob_queue.len(),
Expand All @@ -1473,8 +1428,6 @@ impl<E: EthSpec> BeaconProcessor<E> {
WorkType::RpcBlock => rpc_block_queue.len(),
WorkType::RpcBlobs | WorkType::IgnoredRpcBlock => rpc_blob_queue.len(),
WorkType::RpcCustodyColumn => rpc_custody_column_queue.len(),
WorkType::RpcVerifyDataColumn => rpc_verify_data_column_queue.len(),
WorkType::SamplingResult => sampling_result_queue.len(),
WorkType::ColumnReconstruction => column_reconstruction_queue.len(),
WorkType::ChainSegment => chain_segment_queue.len(),
WorkType::ChainSegmentBackfill => backfill_chain_segment.len(),
Expand Down Expand Up @@ -1600,8 +1553,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
}),
Work::UnknownBlockAttestation { process_fn }
| Work::UnknownBlockAggregate { process_fn }
| Work::UnknownLightClientOptimisticUpdate { process_fn, .. }
| Work::UnknownBlockSamplingRequest { process_fn } => {
| Work::UnknownLightClientOptimisticUpdate { process_fn, .. } => {
task_spawner.spawn_blocking(process_fn)
}
Work::DelayedImportBlock {
Expand All @@ -1612,8 +1564,6 @@ impl<E: EthSpec> BeaconProcessor<E> {
Work::RpcBlock { process_fn }
| Work::RpcBlobs { process_fn }
| Work::RpcCustodyColumn(process_fn)
| Work::RpcVerifyDataColumn(process_fn)
| Work::SamplingResult(process_fn)
| Work::ColumnReconstruction(process_fn) => task_spawner.spawn_async(process_fn),
Work::IgnoredRpcBlock { process_fn } => task_spawner.spawn_blocking(process_fn),
Work::GossipBlock(work)
Expand Down
9 changes: 0 additions & 9 deletions beacon_node/beacon_processor/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,15 +98,6 @@ pub static BEACON_PROCESSOR_REPROCESSING_QUEUE_MATCHED_ATTESTATIONS: LazyLock<Re
"Number of queued attestations where as matching block has been imported.",
)
});
// TODO: This should be labeled instead of N single metrics
pub static BEACON_PROCESSOR_REPROCESSING_QUEUE_MATCHED_SAMPLING_REQUESTS: LazyLock<
Result<IntCounter>,
> = LazyLock::new(|| {
try_create_int_counter(
"beacon_processor_reprocessing_queue_matched_sampling_requests",
"Number of queued sampling requests where a matching block has been imported.",
)
});

/*
* Light client update reprocessing queue metrics.
Expand Down
Loading
Loading