From 1ad3e85f843ab4e3b84b7b23c1d5995e3f65bcc1 Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Tue, 22 Jul 2025 13:33:18 +1000 Subject: [PATCH 1/3] Remove peer sampling code. --- beacon_node/beacon_chain/src/beacon_chain.rs | 19 - beacon_node/beacon_chain/src/chain_config.rs | 3 - beacon_node/beacon_processor/src/lib.rs | 54 +- beacon_node/beacon_processor/src/metrics.rs | 9 - .../src/scheduler/work_reprocessing_queue.rs | 97 --- .../src/service/api_types.rs | 47 +- beacon_node/network/src/metrics.rs | 35 - .../gossip_methods.rs | 15 - .../src/network_beacon_processor/mod.rs | 38 - .../network_beacon_processor/sync_methods.rs | 43 +- beacon_node/network/src/sync/manager.rs | 87 +-- beacon_node/network/src/sync/mod.rs | 2 - beacon_node/network/src/sync/peer_sampling.rs | 735 ------------------ beacon_node/network/src/sync/tests/lookups.rs | 272 +------ beacon_node/src/cli.rs | 9 - beacon_node/src/config.rs | 4 - lighthouse/tests/beacon_node.rs | 13 - 17 files changed, 9 insertions(+), 1473 deletions(-) delete mode 100644 beacon_node/network/src/sync/peer_sampling.rs diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 01075ae4a4c..12e565b197e 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -2959,16 +2959,6 @@ impl BeaconChain { ChainSegmentResult::Successful { imported_blocks } } - /// Updates fork-choice node into a permanent `available` state so it can become a viable head. - /// Only completed sampling results are received. Blocks are unavailable by default and should - /// be pruned on finalization, on a timeout or by a max count. - pub async fn process_sampling_completed(self: &Arc, block_root: Hash256) { - // TODO(das): update fork-choice, act on sampling result, adjust log level - // NOTE: It is possible that sampling complets before block is imported into fork choice, - // in that case we may need to update availability cache. - info!(%block_root, "Sampling completed"); - } - /// Returns `Ok(GossipVerifiedBlock)` if the supplied `block` should be forwarded onto the /// gossip network. The block is not imported into the chain, it is just partially verified. /// @@ -7062,15 +7052,6 @@ impl BeaconChain { && self.spec.is_peer_das_enabled_for_epoch(block_epoch) } - /// Returns true if we should issue a sampling request for this block - /// TODO(das): check if the block is still within the da_window - pub fn should_sample_slot(&self, slot: Slot) -> bool { - self.config.enable_sampling - && self - .spec - .is_peer_das_enabled_for_epoch(slot.epoch(T::EthSpec::slots_per_epoch())) - } - /// Gets the `LightClientBootstrap` object for a requested block root. /// /// Returns `None` when the state or block is not found in the database. diff --git a/beacon_node/beacon_chain/src/chain_config.rs b/beacon_node/beacon_chain/src/chain_config.rs index 808c96d9650..08f17c6c6bb 100644 --- a/beacon_node/beacon_chain/src/chain_config.rs +++ b/beacon_node/beacon_chain/src/chain_config.rs @@ -96,8 +96,6 @@ pub struct ChainConfig { pub enable_light_client_server: bool, /// The number of data columns to withhold / exclude from publishing when proposing a block. pub malicious_withhold_count: usize, - /// Enable peer sampling on blocks. - pub enable_sampling: bool, /// Number of batches that the node splits blobs or data columns into during publication. /// This doesn't apply if the node is the block proposer. For PeerDAS only. pub blob_publication_batches: usize, @@ -148,7 +146,6 @@ impl Default for ChainConfig { epochs_per_migration: crate::migrate::DEFAULT_EPOCHS_PER_MIGRATION, enable_light_client_server: true, malicious_withhold_count: 0, - enable_sampling: false, blob_publication_batches: 4, blob_publication_batch_interval: Duration::from_millis(300), sync_tolerance_epochs: DEFAULT_SYNC_TOLERANCE_EPOCHS, diff --git a/beacon_node/beacon_processor/src/lib.rs b/beacon_node/beacon_processor/src/lib.rs index 0f324071a1e..ae785e51279 100644 --- a/beacon_node/beacon_processor/src/lib.rs +++ b/beacon_node/beacon_processor/src/lib.rs @@ -67,11 +67,11 @@ use types::{ BeaconState, ChainSpec, EthSpec, Hash256, RelativeEpoch, SignedAggregateAndProof, SingleAttestation, Slot, SubnetId, }; +use work_reprocessing_queue::IgnoredRpcBlock; use work_reprocessing_queue::{ spawn_reprocess_scheduler, QueuedAggregate, QueuedLightClientUpdate, QueuedRpcBlock, QueuedUnaggregate, ReadyWork, }; -use work_reprocessing_queue::{IgnoredRpcBlock, QueuedSamplingRequest}; mod metrics; pub mod scheduler; @@ -112,12 +112,9 @@ pub struct BeaconProcessorQueueLengths { gossip_proposer_slashing_queue: usize, gossip_attester_slashing_queue: usize, unknown_light_client_update_queue: usize, - unknown_block_sampling_request_queue: usize, rpc_block_queue: usize, rpc_blob_queue: usize, rpc_custody_column_queue: usize, - rpc_verify_data_column_queue: usize, - sampling_result_queue: usize, column_reconstruction_queue: usize, chain_segment_queue: usize, backfill_chain_segment: usize, @@ -183,9 +180,6 @@ impl BeaconProcessorQueueLengths { rpc_blob_queue: 1024, // TODO(das): Placeholder values rpc_custody_column_queue: 1000, - rpc_verify_data_column_queue: 1000, - unknown_block_sampling_request_queue: 16384, - sampling_result_queue: 1000, column_reconstruction_queue: 64, chain_segment_queue: 64, backfill_chain_segment: 64, @@ -487,10 +481,6 @@ impl From for WorkEvent { process_fn, }, }, - ReadyWork::SamplingRequest(QueuedSamplingRequest { process_fn, .. }) => Self { - drop_during_sync: true, - work: Work::UnknownBlockSamplingRequest { process_fn }, - }, ReadyWork::BackfillSync(QueuedBackfillBatch(process_fn)) => Self { drop_during_sync: false, work: Work::ChainSegmentBackfill(process_fn), @@ -582,9 +572,6 @@ pub enum Work { parent_root: Hash256, process_fn: BlockingFn, }, - UnknownBlockSamplingRequest { - process_fn: BlockingFn, - }, GossipAggregateBatch { aggregates: Vec>, process_batch: Box>) + Send + Sync>, @@ -611,8 +598,6 @@ pub enum Work { process_fn: AsyncFn, }, RpcCustodyColumn(AsyncFn), - RpcVerifyDataColumn(AsyncFn), - SamplingResult(AsyncFn), ColumnReconstruction(AsyncFn), IgnoredRpcBlock { process_fn: BlockingFn, @@ -652,7 +637,6 @@ pub enum WorkType { GossipAggregate, UnknownBlockAggregate, UnknownLightClientOptimisticUpdate, - UnknownBlockSamplingRequest, GossipAggregateBatch, GossipBlock, GossipBlobSidecar, @@ -668,8 +652,6 @@ pub enum WorkType { RpcBlock, RpcBlobs, RpcCustodyColumn, - RpcVerifyDataColumn, - SamplingResult, ColumnReconstruction, IgnoredRpcBlock, ChainSegment, @@ -720,8 +702,6 @@ impl Work { Work::RpcBlock { .. } => WorkType::RpcBlock, Work::RpcBlobs { .. } => WorkType::RpcBlobs, Work::RpcCustodyColumn { .. } => WorkType::RpcCustodyColumn, - Work::RpcVerifyDataColumn { .. } => WorkType::RpcVerifyDataColumn, - Work::SamplingResult { .. } => WorkType::SamplingResult, Work::ColumnReconstruction(_) => WorkType::ColumnReconstruction, Work::IgnoredRpcBlock { .. } => WorkType::IgnoredRpcBlock, Work::ChainSegment { .. } => WorkType::ChainSegment, @@ -741,7 +721,6 @@ impl Work { Work::LightClientUpdatesByRangeRequest(_) => WorkType::LightClientUpdatesByRangeRequest, Work::UnknownBlockAttestation { .. } => WorkType::UnknownBlockAttestation, Work::UnknownBlockAggregate { .. } => WorkType::UnknownBlockAggregate, - Work::UnknownBlockSamplingRequest { .. } => WorkType::UnknownBlockSamplingRequest, Work::UnknownLightClientOptimisticUpdate { .. } => { WorkType::UnknownLightClientOptimisticUpdate } @@ -884,14 +863,8 @@ impl BeaconProcessor { let mut rpc_block_queue = FifoQueue::new(queue_lengths.rpc_block_queue); let mut rpc_blob_queue = FifoQueue::new(queue_lengths.rpc_blob_queue); let mut rpc_custody_column_queue = FifoQueue::new(queue_lengths.rpc_custody_column_queue); - let mut rpc_verify_data_column_queue = - FifoQueue::new(queue_lengths.rpc_verify_data_column_queue); - // TODO(das): the sampling_request_queue is never read - let mut sampling_result_queue = FifoQueue::new(queue_lengths.sampling_result_queue); let mut column_reconstruction_queue = FifoQueue::new(queue_lengths.column_reconstruction_queue); - let mut unknown_block_sampling_request_queue = - FifoQueue::new(queue_lengths.unknown_block_sampling_request_queue); let mut chain_segment_queue = FifoQueue::new(queue_lengths.chain_segment_queue); let mut backfill_chain_segment = FifoQueue::new(queue_lengths.backfill_chain_segment); let mut gossip_block_queue = FifoQueue::new(queue_lengths.gossip_block_queue); @@ -1058,13 +1031,8 @@ impl BeaconProcessor { Some(item) } else if let Some(item) = rpc_custody_column_queue.pop() { Some(item) - // TODO(das): decide proper prioritization for sampling columns } else if let Some(item) = rpc_custody_column_queue.pop() { Some(item) - } else if let Some(item) = rpc_verify_data_column_queue.pop() { - Some(item) - } else if let Some(item) = sampling_result_queue.pop() { - Some(item) // Check delayed blocks before gossip blocks, the gossip blocks might rely // on the delayed ones. } else if let Some(item) = delayed_block_queue.pop() { @@ -1224,9 +1192,6 @@ impl BeaconProcessor { Some(item) } else if let Some(item) = dcbrange_queue.pop() { Some(item) - // Prioritize sampling requests after block syncing requests - } else if let Some(item) = unknown_block_sampling_request_queue.pop() { - Some(item) // Check slashings after all other consensus messages so we prioritize // following head. // @@ -1379,10 +1344,6 @@ impl BeaconProcessor { Work::RpcCustodyColumn { .. } => { rpc_custody_column_queue.push(work, work_id) } - Work::RpcVerifyDataColumn(_) => { - rpc_verify_data_column_queue.push(work, work_id) - } - Work::SamplingResult(_) => sampling_result_queue.push(work, work_id), Work::ColumnReconstruction(_) => { column_reconstruction_queue.push(work, work_id) } @@ -1425,9 +1386,6 @@ impl BeaconProcessor { Work::UnknownLightClientOptimisticUpdate { .. } => { unknown_light_client_update_queue.push(work, work_id) } - Work::UnknownBlockSamplingRequest { .. } => { - unknown_block_sampling_request_queue.push(work, work_id) - } Work::ApiRequestP0 { .. } => api_request_p0_queue.push(work, work_id), Work::ApiRequestP1 { .. } => api_request_p1_queue.push(work, work_id), }; @@ -1451,9 +1409,6 @@ impl BeaconProcessor { WorkType::UnknownLightClientOptimisticUpdate => { unknown_light_client_update_queue.len() } - WorkType::UnknownBlockSamplingRequest => { - unknown_block_sampling_request_queue.len() - } WorkType::GossipAggregateBatch => 0, // No queue WorkType::GossipBlock => gossip_block_queue.len(), WorkType::GossipBlobSidecar => gossip_blob_queue.len(), @@ -1473,8 +1428,6 @@ impl BeaconProcessor { WorkType::RpcBlock => rpc_block_queue.len(), WorkType::RpcBlobs | WorkType::IgnoredRpcBlock => rpc_blob_queue.len(), WorkType::RpcCustodyColumn => rpc_custody_column_queue.len(), - WorkType::RpcVerifyDataColumn => rpc_verify_data_column_queue.len(), - WorkType::SamplingResult => sampling_result_queue.len(), WorkType::ColumnReconstruction => column_reconstruction_queue.len(), WorkType::ChainSegment => chain_segment_queue.len(), WorkType::ChainSegmentBackfill => backfill_chain_segment.len(), @@ -1600,8 +1553,7 @@ impl BeaconProcessor { }), Work::UnknownBlockAttestation { process_fn } | Work::UnknownBlockAggregate { process_fn } - | Work::UnknownLightClientOptimisticUpdate { process_fn, .. } - | Work::UnknownBlockSamplingRequest { process_fn } => { + | Work::UnknownLightClientOptimisticUpdate { process_fn, .. } => { task_spawner.spawn_blocking(process_fn) } Work::DelayedImportBlock { @@ -1612,8 +1564,6 @@ impl BeaconProcessor { Work::RpcBlock { process_fn } | Work::RpcBlobs { process_fn } | Work::RpcCustodyColumn(process_fn) - | Work::RpcVerifyDataColumn(process_fn) - | Work::SamplingResult(process_fn) | Work::ColumnReconstruction(process_fn) => task_spawner.spawn_async(process_fn), Work::IgnoredRpcBlock { process_fn } => task_spawner.spawn_blocking(process_fn), Work::GossipBlock(work) diff --git a/beacon_node/beacon_processor/src/metrics.rs b/beacon_node/beacon_processor/src/metrics.rs index fc8c712f4e7..275875b1a48 100644 --- a/beacon_node/beacon_processor/src/metrics.rs +++ b/beacon_node/beacon_processor/src/metrics.rs @@ -98,15 +98,6 @@ pub static BEACON_PROCESSOR_REPROCESSING_QUEUE_MATCHED_ATTESTATIONS: LazyLock, -> = LazyLock::new(|| { - try_create_int_counter( - "beacon_processor_reprocessing_queue_matched_sampling_requests", - "Number of queued sampling requests where a matching block has been imported.", - ) -}); /* * Light client update reprocessing queue metrics. diff --git a/beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs b/beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs index 855342d8bda..07d540050f9 100644 --- a/beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs +++ b/beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs @@ -69,10 +69,6 @@ const MAXIMUM_QUEUED_ATTESTATIONS: usize = 16_384; /// How many light client updates we keep before new ones get dropped. const MAXIMUM_QUEUED_LIGHT_CLIENT_UPDATES: usize = 128; -/// How many sampling requests we queue before new ones get dropped. -/// TODO(das): choose a sensible value -const MAXIMUM_QUEUED_SAMPLING_REQUESTS: usize = 16_384; - // Process backfill batch 50%, 60%, 80% through each slot. // // Note: use caution to set these fractions in a way that won't cause panic-y @@ -109,8 +105,6 @@ pub enum ReprocessQueueMessage { UnknownBlockAggregate(QueuedAggregate), /// A light client optimistic update that references a parent root that has not been seen as a parent. UnknownLightClientOptimisticUpdate(QueuedLightClientUpdate), - /// A sampling request that references an unknown block. - UnknownBlockSamplingRequest(QueuedSamplingRequest), /// A new backfill batch that needs to be scheduled for processing. BackfillSync(QueuedBackfillBatch), /// A delayed column reconstruction that needs checking @@ -125,7 +119,6 @@ pub enum ReadyWork { Unaggregate(QueuedUnaggregate), Aggregate(QueuedAggregate), LightClientUpdate(QueuedLightClientUpdate), - SamplingRequest(QueuedSamplingRequest), BackfillSync(QueuedBackfillBatch), ColumnReconstruction(QueuedColumnReconstruction), } @@ -151,12 +144,6 @@ pub struct QueuedLightClientUpdate { pub process_fn: BlockingFn, } -/// A sampling request for which the corresponding block is not known while processing. -pub struct QueuedSamplingRequest { - pub beacon_block_root: Hash256, - pub process_fn: BlockingFn, -} - /// A block that arrived early and has been queued for later import. pub struct QueuedGossipBlock { pub beacon_block_slot: Slot, @@ -246,8 +233,6 @@ struct ReprocessQueue { attestations_delay_queue: DelayQueue, /// Queue to manage scheduled light client updates. lc_updates_delay_queue: DelayQueue, - /// Queue to manage scheduled sampling requests - sampling_requests_delay_queue: DelayQueue, /// Queue to manage scheduled column reconstructions. column_reconstructions_delay_queue: DelayQueue, @@ -264,10 +249,6 @@ struct ReprocessQueue { queued_lc_updates: FnvHashMap, /// Light Client Updates per parent_root. awaiting_lc_updates_per_parent_root: HashMap>, - /// Queued sampling requests. - queued_sampling_requests: FnvHashMap, - /// Sampling requests per block root. - awaiting_sampling_requests_per_block_root: HashMap>, /// Column reconstruction per block root. queued_column_reconstructions: HashMap, /// Queued backfill batches @@ -277,18 +258,15 @@ struct ReprocessQueue { /// Next attestation id, used for both aggregated and unaggregated attestations next_attestation: usize, next_lc_update: usize, - next_sampling_request_update: usize, early_block_debounce: TimeLatch, rpc_block_debounce: TimeLatch, attestation_delay_debounce: TimeLatch, lc_update_delay_debounce: TimeLatch, - sampling_request_delay_debounce: TimeLatch, next_backfill_batch_event: Option>>, slot_clock: Arc, } pub type QueuedLightClientUpdateId = usize; -pub type QueuedSamplingRequestId = usize; #[derive(Debug, Clone, Copy, PartialEq, Eq)] enum QueuedAttestationId { @@ -436,26 +414,21 @@ impl ReprocessQueue { rpc_block_delay_queue: DelayQueue::new(), attestations_delay_queue: DelayQueue::new(), lc_updates_delay_queue: DelayQueue::new(), - sampling_requests_delay_queue: <_>::default(), column_reconstructions_delay_queue: DelayQueue::new(), queued_gossip_block_roots: HashSet::new(), queued_lc_updates: FnvHashMap::default(), queued_aggregates: FnvHashMap::default(), queued_unaggregates: FnvHashMap::default(), - queued_sampling_requests: <_>::default(), awaiting_attestations_per_root: HashMap::new(), awaiting_lc_updates_per_parent_root: HashMap::new(), - awaiting_sampling_requests_per_block_root: <_>::default(), queued_backfill_batches: Vec::new(), queued_column_reconstructions: HashMap::new(), next_attestation: 0, next_lc_update: 0, - next_sampling_request_update: 0, early_block_debounce: TimeLatch::default(), rpc_block_debounce: TimeLatch::default(), attestation_delay_debounce: TimeLatch::default(), lc_update_delay_debounce: TimeLatch::default(), - sampling_request_delay_debounce: <_>::default(), next_backfill_batch_event: None, slot_clock, } @@ -664,34 +637,6 @@ impl ReprocessQueue { self.next_lc_update += 1; } - InboundEvent::Msg(UnknownBlockSamplingRequest(queued_sampling_request)) => { - if self.sampling_requests_delay_queue.len() >= MAXIMUM_QUEUED_SAMPLING_REQUESTS { - if self.sampling_request_delay_debounce.elapsed() { - error!( - queue_size = MAXIMUM_QUEUED_SAMPLING_REQUESTS, - "Sampling requests delay queue is full" - ); - } - // Drop the inbound message. - return; - } - - let id: QueuedSamplingRequestId = self.next_sampling_request_update; - self.next_sampling_request_update += 1; - - // Register the delay. - let delay_key = self - .sampling_requests_delay_queue - .insert(id, QUEUED_SAMPLING_REQUESTS_DELAY); - - self.awaiting_sampling_requests_per_block_root - .entry(queued_sampling_request.beacon_block_root) - .or_default() - .push(id); - - self.queued_sampling_requests - .insert(id, (queued_sampling_request, delay_key)); - } InboundEvent::Msg(BlockImported { block_root, parent_root, @@ -751,48 +696,6 @@ impl ReprocessQueue { ); } } - // Unqueue the sampling requests we have for this root, if any. - if let Some(queued_ids) = self - .awaiting_sampling_requests_per_block_root - .remove(&block_root) - { - let mut sent_count = 0; - let mut failed_to_send_count = 0; - - for id in queued_ids { - metrics::inc_counter( - &metrics::BEACON_PROCESSOR_REPROCESSING_QUEUE_MATCHED_SAMPLING_REQUESTS, - ); - - if let Some((queued, delay_key)) = self.queued_sampling_requests.remove(&id) - { - // Remove the delay. - self.sampling_requests_delay_queue.remove(&delay_key); - - // Send the work. - let work = ReadyWork::SamplingRequest(queued); - - if self.ready_work_tx.try_send(work).is_err() { - failed_to_send_count += 1; - } else { - sent_count += 1; - } - } else { - // This should never happen. - error!(?block_root, ?id, "Unknown sampling request for block root"); - } - } - - if failed_to_send_count > 0 { - error!( - hint = "system may be overloaded", - ?block_root, - failed_to_send_count, - sent_count, - "Ignored scheduled sampling requests for block" - ); - } - } } InboundEvent::Msg(NewLightClientOptimisticUpdate { parent_root }) => { // Unqueue the light client optimistic updates we have for this root, if any. diff --git a/beacon_node/lighthouse_network/src/service/api_types.rs b/beacon_node/lighthouse_network/src/service/api_types.rs index 3013596f9f7..2bc5388a302 100644 --- a/beacon_node/lighthouse_network/src/service/api_types.rs +++ b/beacon_node/lighthouse_network/src/service/api_types.rs @@ -3,7 +3,7 @@ use libp2p::PeerId; use std::fmt::{Display, Formatter}; use std::sync::Arc; use types::{ - BlobSidecar, DataColumnSidecar, Epoch, EthSpec, Hash256, LightClientBootstrap, + BlobSidecar, DataColumnSidecar, Epoch, EthSpec, LightClientBootstrap, LightClientFinalityUpdate, LightClientOptimisticUpdate, LightClientUpdate, SignedBeaconBlock, }; @@ -87,9 +87,9 @@ pub enum RangeRequestId { BackfillSync { batch_id: Epoch }, } +// TODO review enum #[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] pub enum DataColumnsByRootRequester { - Sampling(SamplingId), Custody(CustodyId), } @@ -99,21 +99,6 @@ pub enum RangeRequester { BackfillSync { batch_id: Epoch }, } -#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] -pub struct SamplingId { - pub id: SamplingRequester, - pub sampling_request_id: SamplingRequestId, -} - -#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] -pub enum SamplingRequester { - ImportedBlock(Hash256), -} - -/// Identifier of sampling requests. -#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] -pub struct SamplingRequestId(pub usize); - #[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] pub struct CustodyId { pub requester: CustodyRequester, @@ -231,13 +216,11 @@ impl_display!(ComponentsByRangeRequestId, "{}/{}", id, requester); impl_display!(DataColumnsByRootRequestId, "{}/{}", id, requester); impl_display!(SingleLookupReqId, "{}/Lookup/{}", req_id, lookup_id); impl_display!(CustodyId, "{}", requester); -impl_display!(SamplingId, "{}/{}", sampling_request_id, id); impl Display for DataColumnsByRootRequester { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { match self { Self::Custody(id) => write!(f, "Custody/{id}"), - Self::Sampling(id) => write!(f, "Sampling/{id}"), } } } @@ -257,20 +240,6 @@ impl Display for RangeRequestId { } } -impl Display for SamplingRequestId { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "{}", self.0) - } -} - -impl Display for SamplingRequester { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - match self { - Self::ImportedBlock(block) => write!(f, "ImportedBlock/{block}"), - } - } -} - #[cfg(test)] mod tests { use super::*; @@ -289,18 +258,6 @@ mod tests { assert_eq!(format!("{id}"), "123/Custody/121/Lookup/101"); } - #[test] - fn display_id_data_columns_by_root_sampling() { - let id = DataColumnsByRootRequestId { - id: 123, - requester: DataColumnsByRootRequester::Sampling(SamplingId { - id: SamplingRequester::ImportedBlock(Hash256::ZERO), - sampling_request_id: SamplingRequestId(101), - }), - }; - assert_eq!(format!("{id}"), "123/Sampling/101/ImportedBlock/0x0000000000000000000000000000000000000000000000000000000000000000"); - } - #[test] fn display_id_data_columns_by_range() { let id = DataColumnsByRangeRequestId { diff --git a/beacon_node/network/src/metrics.rs b/beacon_node/network/src/metrics.rs index 05c7dc287b0..24a179fa807 100644 --- a/beacon_node/network/src/metrics.rs +++ b/beacon_node/network/src/metrics.rs @@ -17,9 +17,6 @@ use strum::IntoEnumIterator; use types::DataColumnSubnetId; use types::EthSpec; -pub const SUCCESS: &str = "SUCCESS"; -pub const FAILURE: &str = "FAILURE"; - #[derive(Debug, AsRefStr)] pub(crate) enum BlockSource { Gossip, @@ -611,31 +608,6 @@ pub static BEACON_PROCESSOR_REPROCESSING_QUEUE_SENT_OPTIMISTIC_UPDATES: LazyLock ) }); -/* - * Sampling - */ -pub static SAMPLE_DOWNLOAD_RESULT: LazyLock> = LazyLock::new(|| { - try_create_int_counter_vec( - "beacon_sampling_sample_verify_result_total", - "Total count of individual sample download results", - &["result"], - ) -}); -pub static SAMPLE_VERIFY_RESULT: LazyLock> = LazyLock::new(|| { - try_create_int_counter_vec( - "beacon_sampling_sample_verify_result_total", - "Total count of individual sample verify results", - &["result"], - ) -}); -pub static SAMPLING_REQUEST_RESULT: LazyLock> = LazyLock::new(|| { - try_create_int_counter_vec( - "beacon_sampling_request_result_total", - "Total count of sample request results", - &["result"], - ) -}); - pub fn register_finality_update_error(error: &LightClientFinalityUpdateError) { inc_counter_vec(&GOSSIP_FINALITY_UPDATE_ERRORS_PER_TYPE, &[error.as_ref()]); } @@ -683,13 +655,6 @@ pub(crate) fn register_process_result_metrics( } } -pub fn from_result(result: &std::result::Result) -> &str { - match result { - Ok(_) => SUCCESS, - Err(_) => FAILURE, - } -} - pub fn update_gossip_metrics( gossipsub: &Gossipsub, network_globals: &Arc>, diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index 0b17965f3cb..47d15465063 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -1436,21 +1436,6 @@ impl NetworkBeaconProcessor { let block = verified_block.block.block_cloned(); let block_root = verified_block.block_root; - // Note: okay to issue sampling request before the block is execution verified. If the - // proposer sends us a block with invalid blob transactions it can trigger us to issue - // sampling queries that will never resolve. This attack is equivalent to withholding data. - // Dismissed proposal to move this block to post-execution: https://github.com/sigp/lighthouse/pull/6492 - if block.num_expected_blobs() > 0 { - // Trigger sampling for block not yet execution valid. At this point column custodials are - // unlikely to have received their columns. Triggering sampling so early is only viable with - // either: - // - Sync delaying sampling until some latter window - // - Re-processing early sampling requests: https://github.com/sigp/lighthouse/pull/5569 - if self.chain.should_sample_slot(block.slot()) { - self.send_sync_message(SyncMessage::SampleBlock(block_root, block.slot())); - } - } - // Block is gossip valid. Attempt to fetch blobs from the EL using versioned hashes derived // from kzg commitments, without having to wait for all blobs to be sent from the peers. let publish_blobs = true; diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index f7c3a1bf8db..19305e05ffd 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -1,5 +1,4 @@ use crate::sync::manager::BlockProcessType; -use crate::sync::SamplingId; use crate::{service::NetworkMessage, sync::manager::SyncMessage}; use beacon_chain::blob_verification::{GossipBlobError, GossipVerifiedBlob}; use beacon_chain::block_verification_types::RpcBlock; @@ -498,43 +497,6 @@ impl NetworkBeaconProcessor { }) } - /// Create a new `Work` event for some sampling columns, and reports the verification result - /// back to sync. - pub fn send_rpc_validate_data_columns( - self: &Arc, - block_root: Hash256, - data_columns: Vec>>, - seen_timestamp: Duration, - id: SamplingId, - ) -> Result<(), Error> { - let s = self.clone(); - self.try_send(BeaconWorkEvent { - drop_during_sync: false, - work: Work::RpcVerifyDataColumn(Box::pin(async move { - let result = s - .clone() - .validate_rpc_data_columns(block_root, data_columns, seen_timestamp) - .await; - // Sync handles these results - s.send_sync_message(SyncMessage::SampleVerified { id, result }); - })), - }) - } - - /// Create a new `Work` event with a block sampling completed result - pub fn send_sampling_completed( - self: &Arc, - block_root: Hash256, - ) -> Result<(), Error> { - let nbp = self.clone(); - self.try_send(BeaconWorkEvent { - drop_during_sync: false, - work: Work::SamplingResult(Box::pin(async move { - nbp.process_sampling_completed(block_root).await; - })), - }) - } - /// Create a new work event to import `blocks` as a beacon chain segment. pub fn send_chain_segment( self: &Arc, diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs index cff6e26165b..32c4705ea89 100644 --- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs @@ -8,7 +8,6 @@ use crate::sync::{ use beacon_chain::block_verification_types::{AsBlock, RpcBlock}; use beacon_chain::data_availability_checker::AvailabilityCheckError; use beacon_chain::data_availability_checker::MaybeAvailableBlock; -use beacon_chain::data_column_verification::verify_kzg_for_data_column_list; use beacon_chain::{ validator_monitor::get_slot_delay_ms, AvailabilityProcessingStatus, BeaconChainTypes, BlockError, ChainSegmentResult, HistoricalBlockError, NotifyExecutionLayer, @@ -25,7 +24,7 @@ use store::KzgCommitment; use tracing::{debug, error, info, warn}; use types::beacon_block_body::format_kzg_commitments; use types::blob_sidecar::FixedBlobSidecarList; -use types::{BlockImportSource, DataColumnSidecar, DataColumnSidecarList, Epoch, Hash256}; +use types::{BlockImportSource, DataColumnSidecarList, Epoch, Hash256}; /// Id associated to a batch processing request, either a sync batch or a parent lookup. #[derive(Clone, Debug, PartialEq)] @@ -142,7 +141,6 @@ impl NetworkBeaconProcessor { }; let slot = block.slot(); - let block_has_data = block.as_block().num_expected_blobs() > 0; let parent_root = block.message().parent_root(); let commitments_formatted = block.as_block().commitments_formatted(); @@ -215,17 +213,6 @@ impl NetworkBeaconProcessor { _ => {} } - // RPC block imported or execution validated. If the block was already imported by gossip we - // receive Err(BlockError::AlreadyKnown). - if result.is_ok() && - // Block has at least one blob, so it produced columns - block_has_data && - // Block slot is within the DA boundary (should always be the case) and PeerDAS is activated - self.chain.should_sample_slot(slot) - { - self.send_sync_message(SyncMessage::SampleBlock(block_root, slot)); - } - // Sync handles these results self.send_sync_message(SyncMessage::BlockComponentProcessed { process_type, @@ -426,25 +413,6 @@ impl NetworkBeaconProcessor { }); } - /// Validate a list of data columns received from RPC requests - pub async fn validate_rpc_data_columns( - self: Arc>, - _block_root: Hash256, - data_columns: Vec>>, - _seen_timestamp: Duration, - ) -> Result<(), String> { - verify_kzg_for_data_column_list(data_columns.iter(), &self.chain.kzg) - .map_err(|err| format!("{err:?}")) - } - - /// Process a sampling completed event, inserting it into fork-choice - pub async fn process_sampling_completed( - self: Arc>, - block_root: Hash256, - ) { - self.chain.process_sampling_completed(block_root).await; - } - /// Attempt to import the chain segment (`blocks`) to the beacon chain, informing the sync /// thread if more blocks are needed to process it. pub async fn process_chain_segment( @@ -570,15 +538,6 @@ impl NetworkBeaconProcessor { metrics::inc_counter(&metrics::BEACON_PROCESSOR_CHAIN_SEGMENT_SUCCESS_TOTAL); if !imported_blocks.is_empty() { self.chain.recompute_head_at_current_slot().await; - - for (block_root, block_slot) in &imported_blocks { - if self.chain.should_sample_slot(*block_slot) { - self.send_sync_message(SyncMessage::SampleBlock( - *block_root, - *block_slot, - )); - } - } } (imported_blocks.len(), Ok(())) } diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 81b22b99e89..944f55dba1e 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -38,7 +38,6 @@ use super::block_lookups::BlockLookups; use super::network_context::{ CustodyByRootResult, RangeBlockComponent, RangeRequestId, RpcEvent, SyncNetworkContext, }; -use super::peer_sampling::{Sampling, SamplingConfig, SamplingResult}; use super::peer_sync_info::{remote_sync_type, PeerSyncType}; use super::range_sync::{RangeSync, RangeSyncType, EPOCHS_PER_BATCH}; use crate::network_beacon_processor::{ChainSegmentProcessId, NetworkBeaconProcessor}; @@ -58,7 +57,7 @@ use lighthouse_network::rpc::RPCError; use lighthouse_network::service::api_types::{ BlobsByRangeRequestId, BlocksByRangeRequestId, ComponentsByRangeRequestId, CustodyRequester, DataColumnsByRangeRequestId, DataColumnsByRootRequestId, DataColumnsByRootRequester, Id, - SamplingId, SamplingRequester, SingleLookupReqId, SyncRequestId, + SingleLookupReqId, SyncRequestId, }; use lighthouse_network::types::{NetworkGlobals, SyncState}; use lighthouse_network::SyncInfo; @@ -69,14 +68,11 @@ use std::ops::Sub; use std::sync::Arc; use std::time::Duration; use tokio::sync::mpsc; -use tracing::{debug, error, info, info_span, trace, warn, Instrument}; +use tracing::{debug, error, info, info_span, trace, Instrument}; use types::{ BlobSidecar, DataColumnSidecar, EthSpec, ForkContext, Hash256, SignedBeaconBlock, Slot, }; -#[cfg(test)] -use types::ColumnIndex; - /// The number of slots ahead of us that is allowed before requesting a long-range (batch) Sync /// from a peer. If a peer is within this tolerance (forwards or backwards), it is treated as a /// fully sync'd peer. @@ -146,10 +142,6 @@ pub enum SyncMessage { /// manager to attempt to find the block matching the unknown hash. UnknownBlockHashFromAttestation(PeerId, Hash256), - /// Request to start sampling a block. Caller should ensure that block has data before sending - /// the request. - SampleBlock(Hash256, Slot), - /// A peer has disconnected. Disconnect(PeerId), @@ -172,12 +164,6 @@ pub enum SyncMessage { result: BlockProcessingResult, }, - /// Sample data column verified - SampleVerified { - id: SamplingId, - result: Result<(), String>, - }, - /// A block from gossip has completed processing, GossipBlockProcessResult { block_root: Hash256, imported: bool }, } @@ -248,8 +234,6 @@ pub struct SyncManager { /// may forward us thousands of a attestations, each one triggering an individual event. Only /// one event is useful, the rest generating log noise and wasted cycles notified_unknown_roots: LRUTimeCache<(PeerId, Hash256)>, - - sampling: Sampling, } /// Spawns a new `SyncManager` thread which has a weak reference to underlying beacon @@ -274,7 +258,6 @@ pub fn spawn( network_send, beacon_processor, sync_recv, - SamplingConfig::Default, fork_context, ); @@ -296,7 +279,6 @@ impl SyncManager { network_send: mpsc::UnboundedSender>, beacon_processor: Arc>, sync_recv: mpsc::UnboundedReceiver>, - sampling_config: SamplingConfig, fork_context: Arc, ) -> Self { let network_globals = beacon_processor.network_globals.clone(); @@ -315,7 +297,6 @@ impl SyncManager { notified_unknown_roots: LRUTimeCache::new(Duration::from_secs( NOTIFIED_UNKNOWN_ROOT_EXPIRY_SECONDS, )), - sampling: Sampling::new(sampling_config), } } @@ -360,20 +341,6 @@ impl SyncManager { self.block_lookups.insert_failed_chain(block_root); } - #[cfg(test)] - pub(crate) fn active_sampling_requests(&self) -> Vec { - self.sampling.active_sampling_requests() - } - - #[cfg(test)] - pub(crate) fn get_sampling_request_status( - &self, - block_root: Hash256, - index: &ColumnIndex, - ) -> Option { - self.sampling.get_request_status(block_root, index) - } - #[cfg(test)] pub(crate) fn update_execution_engine_state(&mut self, state: EngineState) { self.handle_new_execution_engine_state(state); @@ -853,15 +820,6 @@ impl SyncManager { self.handle_unknown_block_root(peer_id, block_root); } } - SyncMessage::SampleBlock(block_root, block_slot) => { - debug!(%block_root, slot = %block_slot, "Received SampleBlock message"); - if let Some((requester, result)) = self - .sampling - .on_new_sample_request(block_root, &mut self.network) - { - self.on_sampling_result(requester, result) - } - } SyncMessage::Disconnect(peer_id) => { debug!(%peer_id, "Received disconnected message"); self.peer_disconnect(&peer_id); @@ -911,14 +869,6 @@ impl SyncManager { } } }, - SyncMessage::SampleVerified { id, result } => { - if let Some((requester, result)) = - self.sampling - .on_sample_verified(id, result, &mut self.network) - { - self.on_sampling_result(requester, result) - } - } } } @@ -1175,14 +1125,6 @@ impl SyncManager { .on_data_columns_by_root_response(req_id, peer_id, data_column) { match req_id.requester { - DataColumnsByRootRequester::Sampling(id) => { - if let Some((requester, result)) = - self.sampling - .on_sample_downloaded(id, peer_id, resp, &mut self.network) - { - self.on_sampling_result(requester, result) - } - } DataColumnsByRootRequester::Custody(custody_id) => { if let Some(result) = self .network @@ -1256,31 +1198,6 @@ impl SyncManager { ); } - fn on_sampling_result(&mut self, requester: SamplingRequester, result: SamplingResult) { - match requester { - SamplingRequester::ImportedBlock(block_root) => { - debug!(%block_root, ?result, "Sampling result"); - - match result { - Ok(_) => { - // Notify the fork-choice of a successful sampling result to mark the block - // branch as safe. - if let Err(e) = self - .network - .beacon_processor() - .send_sampling_completed(block_root) - { - warn!(?block_root, reason = ?e, "Error sending sampling result"); - } - } - Err(e) => { - warn!(?block_root, reason = ?e, "Sampling failed"); - } - } - } - } - } - /// Handles receiving a response for a range sync request that should have both blocks and /// blobs. fn on_range_components_response( diff --git a/beacon_node/network/src/sync/mod.rs b/beacon_node/network/src/sync/mod.rs index 0f5fd6fb9f1..4dab2e17d3f 100644 --- a/beacon_node/network/src/sync/mod.rs +++ b/beacon_node/network/src/sync/mod.rs @@ -6,12 +6,10 @@ mod block_lookups; mod block_sidecar_coupling; pub mod manager; mod network_context; -mod peer_sampling; mod peer_sync_info; mod range_sync; #[cfg(test)] mod tests; -pub use lighthouse_network::service::api_types::SamplingId; pub use manager::{BatchProcessResult, SyncMessage}; pub use range_sync::{BatchOperationOutcome, ChainId}; diff --git a/beacon_node/network/src/sync/peer_sampling.rs b/beacon_node/network/src/sync/peer_sampling.rs deleted file mode 100644 index 4ad77176aae..00000000000 --- a/beacon_node/network/src/sync/peer_sampling.rs +++ /dev/null @@ -1,735 +0,0 @@ -use self::request::ActiveColumnSampleRequest; -#[cfg(test)] -pub(crate) use self::request::Status; -use super::network_context::{ - DataColumnsByRootSingleBlockRequest, RpcResponseError, SyncNetworkContext, -}; -use crate::metrics; -use beacon_chain::BeaconChainTypes; -use fnv::FnvHashMap; -use lighthouse_network::service::api_types::{ - DataColumnsByRootRequester, SamplingId, SamplingRequestId, SamplingRequester, -}; -use lighthouse_network::{PeerAction, PeerId}; -use rand::{seq::SliceRandom, thread_rng}; -use std::{ - collections::hash_map::Entry, collections::HashMap, marker::PhantomData, sync::Arc, - time::Duration, -}; -use tracing::{debug, error, instrument, warn}; -use types::{data_column_sidecar::ColumnIndex, ChainSpec, DataColumnSidecar, Hash256}; - -pub type SamplingResult = Result<(), SamplingError>; - -type DataColumnSidecarList = Vec>>; - -pub struct Sampling { - requests: HashMap>, - sampling_config: SamplingConfig, -} - -impl Sampling { - #[instrument(parent = None, fields(service = "sampling"), name = "sampling")] - pub fn new(sampling_config: SamplingConfig) -> Self { - Self { - requests: <_>::default(), - sampling_config, - } - } - - #[cfg(test)] - #[instrument(parent = None, - fields(service = "sampling"), - name = "sampling", - skip_all - )] - pub fn active_sampling_requests(&self) -> Vec { - self.requests.values().map(|r| r.block_root).collect() - } - - #[cfg(test)] - #[instrument(parent = None, - fields(service = "sampling"), - name = "sampling", - skip_all - )] - pub fn get_request_status( - &self, - block_root: Hash256, - index: &ColumnIndex, - ) -> Option { - let requester = SamplingRequester::ImportedBlock(block_root); - self.requests - .get(&requester) - .and_then(|req| req.get_request_status(index)) - } - - /// Create a new sampling request for a known block - /// - /// ### Returns - /// - /// - `Some`: Request completed, won't make more progress. Expect requester to act on the result. - /// - `None`: Request still active, requester should do no action - #[instrument(parent = None, - fields(service = "sampling"), - name = "sampling", - skip_all - )] - pub fn on_new_sample_request( - &mut self, - block_root: Hash256, - cx: &mut SyncNetworkContext, - ) -> Option<(SamplingRequester, SamplingResult)> { - let id = SamplingRequester::ImportedBlock(block_root); - - let request = match self.requests.entry(id) { - Entry::Vacant(e) => e.insert(ActiveSamplingRequest::new( - block_root, - id, - &self.sampling_config, - &cx.chain.spec, - )), - Entry::Occupied(_) => { - // Sampling is triggered from multiple sources, duplicate sampling requests are - // likely (gossip block + gossip data column) - // TODO(das): Should track failed sampling request for some time? Otherwise there's - // a risk of a loop with multiple triggers creating the request, then failing, - // and repeat. - debug!(?id, "Ignoring duplicate sampling request"); - return None; - } - }; - - debug!( - ?id, - column_selection = ?request.column_selection(), - "Created new sample request" - ); - - // TOOD(das): If a node has very little peers, continue_sampling() will attempt to find enough - // to sample here, immediately failing the sampling request. There should be some grace - // period to allow the peer manager to find custody peers. - let result = request.continue_sampling(cx); - self.handle_sampling_result(result, &id) - } - - /// Insert a downloaded column into an active sampling request. Then make progress on the - /// entire request. - /// - /// ### Returns - /// - /// - `Some`: Request completed, won't make more progress. Expect requester to act on the result. - /// - `None`: Request still active, requester should do no action - #[instrument(parent = None, - fields(service = "sampling"), - name = "sampling", - skip_all - )] - pub fn on_sample_downloaded( - &mut self, - id: SamplingId, - peer_id: PeerId, - resp: Result<(DataColumnSidecarList, Duration), RpcResponseError>, - cx: &mut SyncNetworkContext, - ) -> Option<(SamplingRequester, SamplingResult)> { - let Some(request) = self.requests.get_mut(&id.id) else { - // TOOD(das): This log can happen if the request is error'ed early and dropped - debug!(?id, "Sample downloaded event for unknown request"); - return None; - }; - - let result = request.on_sample_downloaded(peer_id, id.sampling_request_id, resp, cx); - self.handle_sampling_result(result, &id.id) - } - - /// Insert a downloaded column into an active sampling request. Then make progress on the - /// entire request. - /// - /// ### Returns - /// - /// - `Some`: Request completed, won't make more progress. Expect requester to act on the result. - /// - `None`: Request still active, requester should do no action - #[instrument(parent = None, - fields(service = "sampling"), - name = "sampling", - skip_all - )] - pub fn on_sample_verified( - &mut self, - id: SamplingId, - result: Result<(), String>, - cx: &mut SyncNetworkContext, - ) -> Option<(SamplingRequester, SamplingResult)> { - let Some(request) = self.requests.get_mut(&id.id) else { - // TOOD(das): This log can happen if the request is error'ed early and dropped - debug!(?id, "Sample verified event for unknown request"); - return None; - }; - - let result = request.on_sample_verified(id.sampling_request_id, result, cx); - self.handle_sampling_result(result, &id.id) - } - - /// Converts a result from the internal format of `ActiveSamplingRequest` (error first to use ? - /// conveniently), to an Option first format to use an `if let Some() { act on result }` pattern - /// in the sync manager. - #[instrument(parent = None, - fields(service = "sampling"), - name = "sampling", - skip_all - )] - fn handle_sampling_result( - &mut self, - result: Result, SamplingError>, - id: &SamplingRequester, - ) -> Option<(SamplingRequester, SamplingResult)> { - let result = result.transpose(); - if let Some(result) = result { - debug!(?id, ?result, "Sampling request completed, removing"); - metrics::inc_counter_vec( - &metrics::SAMPLING_REQUEST_RESULT, - &[metrics::from_result(&result)], - ); - self.requests.remove(id); - Some((*id, result)) - } else { - None - } - } -} - -pub struct ActiveSamplingRequest { - block_root: Hash256, - requester_id: SamplingRequester, - column_requests: FnvHashMap, - /// Mapping of column indexes for a sampling request. - column_indexes_by_sampling_request: FnvHashMap>, - /// Sequential ID for sampling requests. - current_sampling_request_id: SamplingRequestId, - column_shuffle: Vec, - required_successes: Vec, - _phantom: PhantomData, -} - -#[derive(Debug)] -pub enum SamplingError { - SendFailed(#[allow(dead_code)] &'static str), - ProcessorUnavailable, - TooManyFailures, - BadState(#[allow(dead_code)] String), - ColumnIndexOutOfBounds, -} - -/// Required success index by current failures, with p_target=5.00E-06 -/// Ref: https://colab.research.google.com/drive/18uUgT2i-m3CbzQ5TyP9XFKqTn1DImUJD#scrollTo=E82ITcgB5ATh -const REQUIRED_SUCCESSES: [usize; 11] = [16, 20, 23, 26, 29, 32, 34, 37, 39, 42, 44]; - -#[derive(Debug, Clone)] -pub enum SamplingConfig { - Default, - #[allow(dead_code)] - Custom { - required_successes: Vec, - }, -} - -impl ActiveSamplingRequest { - fn new( - block_root: Hash256, - requester_id: SamplingRequester, - sampling_config: &SamplingConfig, - spec: &ChainSpec, - ) -> Self { - // Select ahead of time the full list of to-sample columns - let mut column_shuffle = - (0..spec.number_of_columns as ColumnIndex).collect::>(); - let mut rng = thread_rng(); - column_shuffle.shuffle(&mut rng); - - Self { - block_root, - requester_id, - column_requests: <_>::default(), - column_indexes_by_sampling_request: <_>::default(), - current_sampling_request_id: SamplingRequestId(0), - column_shuffle, - required_successes: match sampling_config { - SamplingConfig::Default => REQUIRED_SUCCESSES.to_vec(), - SamplingConfig::Custom { required_successes } => required_successes.clone(), - }, - _phantom: PhantomData, - } - } - - #[cfg(test)] - pub fn get_request_status(&self, index: &ColumnIndex) -> Option { - self.column_requests.get(index).map(|req| req.status()) - } - - /// Return the current ordered list of columns that this requests has to sample to succeed - pub(crate) fn column_selection(&self) -> Vec { - self.column_shuffle - .iter() - .take(REQUIRED_SUCCESSES[0]) - .copied() - .collect() - } - - /// Insert a downloaded column into an active sampling request. Then make progress on the - /// entire request. - /// - /// ### Returns - /// - /// - `Err`: Sampling request has failed and will be dropped - /// - `Ok(Some)`: Sampling request has successfully completed and will be dropped - /// - `Ok(None)`: Sampling request still active - pub(crate) fn on_sample_downloaded( - &mut self, - _peer_id: PeerId, - sampling_request_id: SamplingRequestId, - resp: Result<(DataColumnSidecarList, Duration), RpcResponseError>, - cx: &mut SyncNetworkContext, - ) -> Result, SamplingError> { - // Select columns to sample - // Create individual request per column - // Progress requests - // If request fails retry or expand search - // If all good return - let Some(column_indexes) = self - .column_indexes_by_sampling_request - .get(&sampling_request_id) - else { - error!( - ?sampling_request_id, - "Column indexes for the sampling request ID not found" - ); - return Ok(None); - }; - - match resp { - Ok((mut resp_data_columns, seen_timestamp)) => { - let resp_column_indexes = resp_data_columns - .iter() - .map(|r| r.index) - .collect::>(); - debug!( - block_root = %self.block_root, - column_indexes = ?resp_column_indexes, - count = resp_data_columns.len(), - "Sample download success" - ); - metrics::inc_counter_vec(&metrics::SAMPLE_DOWNLOAD_RESULT, &[metrics::SUCCESS]); - - // Filter the data received in the response using the requested column indexes. - let mut data_columns = vec![]; - for column_index in column_indexes { - let Some(request) = self.column_requests.get_mut(column_index) else { - warn!( - block_root = %self.block_root, - column_index, - "Active column sample request not found" - ); - continue; - }; - - let Some(data_pos) = resp_data_columns - .iter() - .position(|data| &data.index == column_index) - else { - // Peer does not have the requested data, mark peer as "dont have" and try - // again with a different peer. - debug!( - block_root = %self.block_root, - column_index, - "Sampling peer claims to not have the data" - ); - request.on_sampling_error()?; - continue; - }; - - data_columns.push(resp_data_columns.swap_remove(data_pos)); - } - - if !resp_data_columns.is_empty() { - let resp_column_indexes = resp_data_columns - .iter() - .map(|d| d.index) - .collect::>(); - debug!( - block_root = %self.block_root, - column_indexes = ?resp_column_indexes, - "Received data that was not requested" - ); - } - - // Handle the downloaded data columns. - if data_columns.is_empty() { - debug!(block_root = %self.block_root, "Received empty response"); - self.column_indexes_by_sampling_request - .remove(&sampling_request_id); - } else { - // Overwrite `column_indexes` with the column indexes received in the response. - let column_indexes = data_columns.iter().map(|d| d.index).collect::>(); - self.column_indexes_by_sampling_request - .insert(sampling_request_id, column_indexes.clone()); - // Peer has data column, send to verify - let Some(beacon_processor) = cx.beacon_processor_if_enabled() else { - // If processor is not available, error the entire sampling - debug!( - block = %self.block_root, - reason = "beacon processor unavailable", - "Dropping sampling" - ); - return Err(SamplingError::ProcessorUnavailable); - }; - debug!( - block = ?self.block_root, - ?column_indexes, - "Sending data_column for verification" - ); - if let Err(e) = beacon_processor.send_rpc_validate_data_columns( - self.block_root, - data_columns, - seen_timestamp, - SamplingId { - id: self.requester_id, - sampling_request_id, - }, - ) { - // Beacon processor is overloaded, drop sampling attempt. Failing to sample - // is not a permanent state so we should recover once the node has capacity - // and receives a descendant block. - error!( - block = %self.block_root, - reason = e.to_string(), - "Dropping sampling" - ); - return Err(SamplingError::SendFailed("beacon processor send failure")); - } - } - } - Err(err) => { - debug!( - block_root = %self.block_root, - ?column_indexes, - error = ?err, - "Sample download error" - ); - metrics::inc_counter_vec(&metrics::SAMPLE_DOWNLOAD_RESULT, &[metrics::FAILURE]); - - // Error downloading, malicious network errors are already penalized before - // reaching this function. Mark the peer as failed and try again with another. - for column_index in column_indexes { - let Some(request) = self.column_requests.get_mut(column_index) else { - warn!( - block_root = %self.block_root, - column_index, - "Active column sample request not found" - ); - continue; - }; - request.on_sampling_error()?; - } - } - }; - - self.continue_sampling(cx) - } - - /// Insert a column verification result into an active sampling request. Then make progress - /// on the entire request. - /// - /// ### Returns - /// - /// - `Err`: Sampling request has failed and will be dropped - /// - `Ok(Some)`: Sampling request has successfully completed and will be dropped - /// - `Ok(None)`: Sampling request still active - pub(crate) fn on_sample_verified( - &mut self, - sampling_request_id: SamplingRequestId, - result: Result<(), String>, - cx: &mut SyncNetworkContext, - ) -> Result, SamplingError> { - let Some(column_indexes) = self - .column_indexes_by_sampling_request - .get(&sampling_request_id) - else { - error!( - ?sampling_request_id, - "Column indexes for the sampling request ID not found" - ); - return Ok(None); - }; - - match result { - Ok(_) => { - debug!(block_root = %self.block_root,?column_indexes, "Sample verification success"); - metrics::inc_counter_vec(&metrics::SAMPLE_VERIFY_RESULT, &[metrics::SUCCESS]); - - // Valid, continue_sampling will maybe consider sampling succees - for column_index in column_indexes { - let Some(request) = self.column_requests.get_mut(column_index) else { - warn!( - block_root = %self.block_root, column_index, - "Active column sample request not found" - ); - continue; - }; - request.on_sampling_success()?; - } - } - Err(err) => { - debug!(block_root = %self.block_root, ?column_indexes, reason = ?err, "Sample verification failure"); - metrics::inc_counter_vec(&metrics::SAMPLE_VERIFY_RESULT, &[metrics::FAILURE]); - - // Peer sent invalid data, penalize and try again from different peer - // TODO(das): Count individual failures - for column_index in column_indexes { - let Some(request) = self.column_requests.get_mut(column_index) else { - warn!( - block_root = %self.block_root, - column_index, - "Active column sample request not found" - ); - continue; - }; - let peer_id = request.on_sampling_error()?; - cx.report_peer( - peer_id, - PeerAction::LowToleranceError, - "invalid data column", - ); - } - } - } - - self.continue_sampling(cx) - } - - pub(crate) fn continue_sampling( - &mut self, - cx: &mut SyncNetworkContext, - ) -> Result, SamplingError> { - // First check if sampling is completed, by computing `required_successes` - let mut successes = 0; - let mut failures = 0; - let mut ongoings = 0; - - for request in self.column_requests.values() { - if request.is_completed() { - successes += 1; - } - if request.is_failed() { - failures += 1; - } - if request.is_ongoing() { - ongoings += 1; - } - } - - // If there are too many failures, consider the sampling failed - let Some(required_successes) = self.required_successes.get(failures) else { - return Err(SamplingError::TooManyFailures); - }; - - // If there are enough successes, consider the sampling complete - if successes >= *required_successes { - return Ok(Some(())); - } - - // First, attempt to progress sampling by requesting more columns, so that request failures - // are accounted for below. - - // Group the requested column indexes by the destination peer to batch sampling requests. - let mut column_indexes_to_request = FnvHashMap::default(); - for idx in 0..*required_successes { - // Re-request columns. Note: out of bounds error should never happen, inputs are hardcoded - let column_index = *self - .column_shuffle - .get(idx) - .ok_or(SamplingError::ColumnIndexOutOfBounds)?; - let request = self - .column_requests - .entry(column_index) - .or_insert(ActiveColumnSampleRequest::new(column_index)); - - if request.is_ready_to_request() { - if let Some(peer_id) = request.choose_peer(cx) { - let indexes = column_indexes_to_request.entry(peer_id).or_insert(vec![]); - indexes.push(column_index); - } - } - } - - // Send requests. - let mut sent_request = false; - for (peer_id, column_indexes) in column_indexes_to_request { - cx.data_column_lookup_request( - DataColumnsByRootRequester::Sampling(SamplingId { - id: self.requester_id, - sampling_request_id: self.current_sampling_request_id, - }), - peer_id, - DataColumnsByRootSingleBlockRequest { - block_root: self.block_root, - indices: column_indexes.clone(), - }, - // false = We issue request to custodians who may or may not have received the - // samples yet. We don't any signal (like an attestation or status messages that the - // custodian has received data). - false, - ) - .map_err(SamplingError::SendFailed)?; - self.column_indexes_by_sampling_request - .insert(self.current_sampling_request_id, column_indexes.clone()); - self.current_sampling_request_id.0 += 1; - sent_request = true; - - // Update request status. - for column_index in column_indexes { - let Some(request) = self.column_requests.get_mut(&column_index) else { - continue; - }; - request.on_start_sampling(peer_id)?; - } - } - - // Make sure that sampling doesn't stall, by ensuring that this sampling request will - // receive a new event of some type. If there are no ongoing requests, and no new - // request was sent, loop to increase the required_successes until the sampling fails if - // there are no peers. - if ongoings == 0 && !sent_request { - debug!(block_root = %self.block_root, "Sampling request stalled"); - } - - Ok(None) - } -} - -mod request { - use super::SamplingError; - use crate::sync::network_context::SyncNetworkContext; - use beacon_chain::BeaconChainTypes; - use lighthouse_network::PeerId; - use rand::seq::SliceRandom; - use rand::thread_rng; - use std::collections::HashSet; - use types::data_column_sidecar::ColumnIndex; - - pub(crate) struct ActiveColumnSampleRequest { - column_index: ColumnIndex, - status: Status, - // TODO(das): Should downscore peers that claim to not have the sample? - peers_dont_have: HashSet, - } - - // Exposed only for testing assertions in lookup tests - #[derive(Debug, Clone)] - pub(crate) enum Status { - NoPeers, - NotStarted, - Sampling(PeerId), - Verified, - } - - impl ActiveColumnSampleRequest { - pub(crate) fn new(column_index: ColumnIndex) -> Self { - Self { - column_index, - status: Status::NotStarted, - peers_dont_have: <_>::default(), - } - } - - pub(crate) fn is_completed(&self) -> bool { - match self.status { - Status::NoPeers | Status::NotStarted | Status::Sampling(_) => false, - Status::Verified => true, - } - } - - pub(crate) fn is_failed(&self) -> bool { - match self.status { - Status::NotStarted | Status::Sampling(_) | Status::Verified => false, - Status::NoPeers => true, - } - } - - pub(crate) fn is_ongoing(&self) -> bool { - match self.status { - Status::NotStarted | Status::NoPeers | Status::Verified => false, - Status::Sampling(_) => true, - } - } - - pub(crate) fn is_ready_to_request(&self) -> bool { - match self.status { - Status::NoPeers | Status::NotStarted => true, - Status::Sampling(_) | Status::Verified => false, - } - } - - #[cfg(test)] - pub(crate) fn status(&self) -> Status { - self.status.clone() - } - - pub(crate) fn choose_peer( - &mut self, - cx: &SyncNetworkContext, - ) -> Option { - // TODO: When is a fork and only a subset of your peers know about a block, sampling should only - // be queried on the peers on that fork. Should this case be handled? How to handle it? - let mut peer_ids = cx.get_custodial_peers(self.column_index); - - peer_ids.retain(|peer_id| !self.peers_dont_have.contains(peer_id)); - - if let Some(peer_id) = peer_ids.choose(&mut thread_rng()) { - Some(*peer_id) - } else { - self.status = Status::NoPeers; - None - } - } - - pub(crate) fn on_start_sampling(&mut self, peer_id: PeerId) -> Result<(), SamplingError> { - match self.status.clone() { - Status::NoPeers | Status::NotStarted => { - self.status = Status::Sampling(peer_id); - Ok(()) - } - other => Err(SamplingError::BadState(format!( - "bad state on_start_sampling expected NoPeers|NotStarted got {other:?}. column_index:{}", - self.column_index - ))), - } - } - - pub(crate) fn on_sampling_error(&mut self) -> Result { - match self.status.clone() { - Status::Sampling(peer_id) => { - self.peers_dont_have.insert(peer_id); - self.status = Status::NotStarted; - Ok(peer_id) - } - other => Err(SamplingError::BadState(format!( - "bad state on_sampling_error expected Sampling got {other:?}. column_index:{}", - self.column_index - ))), - } - } - - pub(crate) fn on_sampling_success(&mut self) -> Result<(), SamplingError> { - match &self.status { - Status::Sampling(_) => { - self.status = Status::Verified; - Ok(()) - } - other => Err(SamplingError::BadState(format!( - "bad state on_sampling_success expected Sampling got {other:?}. column_index:{}", - self.column_index - ))), - } - } - } -} diff --git a/beacon_node/network/src/sync/tests/lookups.rs b/beacon_node/network/src/sync/tests/lookups.rs index a2c359c87e7..90c669e52ff 100644 --- a/beacon_node/network/src/sync/tests/lookups.rs +++ b/beacon_node/network/src/sync/tests/lookups.rs @@ -4,8 +4,7 @@ use crate::sync::block_lookups::{ }; use crate::sync::{ manager::{BlockProcessType, BlockProcessingResult, SyncManager}, - peer_sampling::SamplingConfig, - SamplingId, SyncMessage, + SyncMessage, }; use crate::NetworkMessage; use std::sync::Arc; @@ -33,7 +32,7 @@ use lighthouse_network::{ rpc::{RPCError, RequestType, RpcErrorResponse}, service::api_types::{ AppRequestId, DataColumnsByRootRequestId, DataColumnsByRootRequester, Id, - SamplingRequester, SingleLookupReqId, SyncRequestId, + SingleLookupReqId, SyncRequestId, }, types::SyncState, NetworkConfig, NetworkGlobals, PeerId, @@ -50,7 +49,6 @@ use types::{ const D: Duration = Duration::new(0, 0); const PARENT_FAIL_TOLERANCE: u8 = SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS; -const SAMPLING_REQUIRED_SUCCESSES: usize = 2; type DCByRootIds = Vec; type DCByRootId = (SyncRequestId, Vec); @@ -124,9 +122,6 @@ impl TestRig { beacon_processor.into(), // Pass empty recv not tied to any tx mpsc::unbounded_channel().1, - SamplingConfig::Custom { - required_successes: vec![SAMPLING_REQUIRED_SUCCESSES], - }, fork_context, ), harness, @@ -180,10 +175,6 @@ impl TestRig { )); } - fn trigger_sample_block(&mut self, block_root: Hash256, block_slot: Slot) { - self.send_sync_message(SyncMessage::SampleBlock(block_root, block_slot)) - } - /// Drain all sync messages in the sync_rx attached to the beacon processor fn drain_sync_rx(&mut self) { while let Ok(sync_message) = self.sync_rx.try_recv() { @@ -260,27 +251,6 @@ impl TestRig { ); } - fn expect_no_active_sampling(&mut self) { - assert_eq!( - self.sync_manager.active_sampling_requests(), - Vec::::new(), - "expected no active sampling" - ); - } - - fn expect_active_sampling(&mut self, block_root: &Hash256) { - assert!(self - .sync_manager - .active_sampling_requests() - .contains(block_root)); - } - - fn expect_clean_finished_sampling(&mut self) { - self.expect_empty_network(); - self.expect_sampling_result_work(); - self.expect_no_active_sampling(); - } - fn assert_parent_lookups_count(&self, count: usize) { assert_eq!( self.active_parent_lookups_count(), @@ -672,51 +642,6 @@ impl TestRig { ) } - fn complete_valid_sampling_column_requests( - &mut self, - ids: DCByRootIds, - data_columns: Vec>>, - ) { - for id in ids { - self.log(&format!("return valid data column for {id:?}")); - let indices = &id.1; - let columns_to_send = indices - .iter() - .map(|&i| data_columns[i as usize].clone()) - .collect::>(); - self.complete_valid_sampling_column_request(id, &columns_to_send); - } - } - - fn complete_valid_sampling_column_request( - &mut self, - id: DCByRootId, - data_columns: &[Arc>], - ) { - let first_dc = data_columns.first().unwrap(); - let block_root = first_dc.block_root(); - let sampling_request_id = match id.0 { - SyncRequestId::DataColumnsByRoot(DataColumnsByRootRequestId { - requester: DataColumnsByRootRequester::Sampling(sampling_id), - .. - }) => sampling_id.sampling_request_id, - _ => unreachable!(), - }; - self.complete_data_columns_by_root_request(id, data_columns); - - // Expect work event - self.expect_rpc_sample_verify_work_event(); - - // Respond with valid result - self.send_sync_message(SyncMessage::SampleVerified { - id: SamplingId { - id: SamplingRequester::ImportedBlock(block_root), - sampling_request_id, - }, - result: Ok(()), - }) - } - fn complete_valid_custody_request( &mut self, ids: DCByRootIds, @@ -1047,28 +972,6 @@ impl TestRig { .unwrap_or_else(|e| panic!("Expected RPC custody column work: {e}")) } - fn expect_rpc_sample_verify_work_event(&mut self) { - self.pop_received_processor_event(|ev| { - if ev.work_type() == beacon_processor::WorkType::RpcVerifyDataColumn { - Some(()) - } else { - None - } - }) - .unwrap_or_else(|e| panic!("Expected sample verify work: {e}")) - } - - fn expect_sampling_result_work(&mut self) { - self.pop_received_processor_event(|ev| { - if ev.work_type() == beacon_processor::WorkType::SamplingResult { - Some(()) - } else { - None - } - }) - .unwrap_or_else(|e| panic!("Expected sampling result work: {e}")) - } - fn expect_no_work_event(&mut self) { self.drain_processor_rx(); assert!(self.network_rx_queue.is_empty()); @@ -1280,46 +1183,6 @@ impl TestRig { imported: false, }); } - - fn assert_sampling_request_ongoing(&self, block_root: Hash256, indices: &[ColumnIndex]) { - for index in indices { - let status = self - .sync_manager - .get_sampling_request_status(block_root, index) - .unwrap_or_else(|| panic!("No request state for {index}")); - if !matches!(status, crate::sync::peer_sampling::Status::Sampling { .. }) { - panic!("expected {block_root} {index} request to be on going: {status:?}"); - } - } - } - - fn assert_sampling_request_nopeers(&self, block_root: Hash256, indices: &[ColumnIndex]) { - for index in indices { - let status = self - .sync_manager - .get_sampling_request_status(block_root, index) - .unwrap_or_else(|| panic!("No request state for {index}")); - if !matches!(status, crate::sync::peer_sampling::Status::NoPeers) { - panic!("expected {block_root} {index} request to be no peers: {status:?}"); - } - } - } - - fn log_sampling_requests(&self, block_root: Hash256, indices: &[ColumnIndex]) { - let statuses = indices - .iter() - .map(|index| { - let status = self - .sync_manager - .get_sampling_request_status(block_root, index) - .unwrap_or_else(|| panic!("No request state for {index}")); - (index, status) - }) - .collect::>(); - self.log(&format!( - "Sampling request status for {block_root}: {statuses:?}" - )); - } } #[test] @@ -2074,137 +1937,6 @@ fn blobs_in_da_checker_skip_download() { r.expect_no_active_lookups(); } -#[test] -fn sampling_happy_path() { - let Some(mut r) = TestRig::test_setup_after_fulu() else { - return; - }; - r.new_connected_peers_for_peerdas(); - let (block, data_columns) = r.rand_block_and_data_columns(); - let block_root = block.canonical_root(); - r.trigger_sample_block(block_root, block.slot()); - // Retrieve all outgoing sample requests for random column indexes - let sampling_ids = - r.expect_only_data_columns_by_root_requests(block_root, SAMPLING_REQUIRED_SUCCESSES); - // Resolve all of them one by one - r.complete_valid_sampling_column_requests(sampling_ids, data_columns); - r.expect_clean_finished_sampling(); -} - -#[test] -fn sampling_with_retries() { - let Some(mut r) = TestRig::test_setup_after_fulu() else { - return; - }; - r.new_connected_peers_for_peerdas(); - // Add another supernode to ensure that the node can retry. - r.new_connected_supernode_peer(); - let (block, data_columns) = r.rand_block_and_data_columns(); - let block_root = block.canonical_root(); - r.trigger_sample_block(block_root, block.slot()); - // Retrieve all outgoing sample requests for random column indexes, and return empty responses - let sampling_ids = - r.expect_only_data_columns_by_root_requests(block_root, SAMPLING_REQUIRED_SUCCESSES); - r.return_empty_sampling_requests(sampling_ids); - // Expect retries for all of them, and resolve them - let sampling_ids = - r.expect_only_data_columns_by_root_requests(block_root, SAMPLING_REQUIRED_SUCCESSES); - r.complete_valid_sampling_column_requests(sampling_ids, data_columns); - r.expect_clean_finished_sampling(); -} - -#[test] -fn sampling_avoid_retrying_same_peer() { - let Some(mut r) = TestRig::test_setup_after_fulu() else { - return; - }; - let peer_id_1 = r.new_connected_supernode_peer(); - let peer_id_2 = r.new_connected_supernode_peer(); - let block_root = Hash256::random(); - r.trigger_sample_block(block_root, Slot::new(0)); - // Retrieve all outgoing sample requests for random column indexes, and return empty responses - let sampling_ids = - r.expect_only_data_columns_by_root_requests(block_root, SAMPLING_REQUIRED_SUCCESSES); - r.sampling_requests_failed(sampling_ids, peer_id_1, RPCError::Disconnected); - // Should retry the other peer - let sampling_ids = - r.expect_only_data_columns_by_root_requests(block_root, SAMPLING_REQUIRED_SUCCESSES); - r.sampling_requests_failed(sampling_ids, peer_id_2, RPCError::Disconnected); - // Expect no more retries - r.expect_empty_network(); -} - -#[test] -fn sampling_batch_requests() { - let Some(mut r) = TestRig::test_setup_after_fulu() else { - return; - }; - let _supernode = r.new_connected_supernode_peer(); - let (block, data_columns) = r.rand_block_and_data_columns(); - let block_root = block.canonical_root(); - r.trigger_sample_block(block_root, block.slot()); - - // Retrieve the sample request, which should be batched. - let (sync_request_id, column_indexes) = r - .expect_only_data_columns_by_root_requests(block_root, 1) - .pop() - .unwrap(); - assert_eq!(column_indexes.len(), SAMPLING_REQUIRED_SUCCESSES); - r.assert_sampling_request_ongoing(block_root, &column_indexes); - - // Resolve the request. - r.complete_valid_sampling_column_requests( - vec![(sync_request_id, column_indexes.clone())], - data_columns, - ); - r.expect_clean_finished_sampling(); -} - -#[test] -fn sampling_batch_requests_not_enough_responses_returned() { - let Some(mut r) = TestRig::test_setup_after_fulu() else { - return; - }; - let _supernode = r.new_connected_supernode_peer(); - let (block, data_columns) = r.rand_block_and_data_columns(); - let block_root = block.canonical_root(); - r.trigger_sample_block(block_root, block.slot()); - - // Retrieve the sample request, which should be batched. - let (sync_request_id, column_indexes) = r - .expect_only_data_columns_by_root_requests(block_root, 1) - .pop() - .unwrap(); - assert_eq!(column_indexes.len(), SAMPLING_REQUIRED_SUCCESSES); - - // The request status should be set to Sampling. - r.assert_sampling_request_ongoing(block_root, &column_indexes); - - // Split the indexes to simulate the case where the supernode doesn't have the requested column. - let (column_indexes_supernode_does_not_have, column_indexes_to_complete) = - column_indexes.split_at(1); - - // Complete the requests but only partially, so a NotEnoughResponsesReturned error occurs. - let data_columns_to_complete = data_columns - .iter() - .filter(|d| column_indexes_to_complete.contains(&d.index)) - .cloned() - .collect::>(); - r.complete_data_columns_by_root_request( - (sync_request_id, column_indexes.clone()), - &data_columns_to_complete, - ); - - // The request status should be set to NoPeers since the supernode, the only peer, returned not enough responses. - r.log_sampling_requests(block_root, &column_indexes); - r.assert_sampling_request_nopeers(block_root, column_indexes_supernode_does_not_have); - - // The sampling request stalls. - r.expect_empty_network(); - r.expect_no_work_event(); - r.expect_active_sampling(&block_root); -} - #[test] fn custody_lookup_happy_path() { let Some(mut r) = TestRig::test_setup_after_fulu() else { diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs index eb27a03552b..e58320010f2 100644 --- a/beacon_node/src/cli.rs +++ b/beacon_node/src/cli.rs @@ -78,15 +78,6 @@ pub fn cli_app() -> Command { .hide(true) .display_order(0) ) - .arg( - Arg::new("enable-sampling") - .long("enable-sampling") - .action(ArgAction::SetTrue) - .help_heading(FLAG_HEADER) - .help("Enable peer sampling on data columns. Disabled by default.") - .hide(true) - .display_order(0) - ) .arg( Arg::new("blob-publication-batches") .long("blob-publication-batches") diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index f55b91d58c3..1cf56ae043f 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -192,10 +192,6 @@ pub fn get_config( client_config.chain.shuffling_cache_size = cache_size; } - if cli_args.get_flag("enable-sampling") { - client_config.chain.enable_sampling = true; - } - if let Some(batches) = clap_utils::parse_optional(cli_args, "blob-publication-batches")? { client_config.chain.blob_publication_batches = batches; } diff --git a/lighthouse/tests/beacon_node.rs b/lighthouse/tests/beacon_node.rs index 884e5eddeba..e064096aec1 100644 --- a/lighthouse/tests/beacon_node.rs +++ b/lighthouse/tests/beacon_node.rs @@ -799,13 +799,6 @@ fn network_subscribe_all_data_column_subnets_flag() { .with_config(|config| assert!(config.network.subscribe_all_data_column_subnets)); } #[test] -fn network_enable_sampling_flag() { - CommandLineTest::new() - .flag("enable-sampling", None) - .run_with_zero_port() - .with_config(|config| assert!(config.chain.enable_sampling)); -} -#[test] fn blob_publication_batches() { CommandLineTest::new() .flag("blob-publication-batches", Some("3")) @@ -826,12 +819,6 @@ fn blob_publication_batch_interval() { }); } -#[test] -fn network_enable_sampling_flag_default() { - CommandLineTest::new() - .run_with_zero_port() - .with_config(|config| assert!(!config.chain.enable_sampling)); -} #[test] fn network_subscribe_all_subnets_flag() { CommandLineTest::new() From fefd40295dbaf72518edb92bd2ff45fd09bcd586 Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Tue, 22 Jul 2025 14:27:54 +1000 Subject: [PATCH 2/3] Update TODO comment. --- beacon_node/lighthouse_network/src/service/api_types.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/beacon_node/lighthouse_network/src/service/api_types.rs b/beacon_node/lighthouse_network/src/service/api_types.rs index 2bc5388a302..0f5fd99c279 100644 --- a/beacon_node/lighthouse_network/src/service/api_types.rs +++ b/beacon_node/lighthouse_network/src/service/api_types.rs @@ -87,7 +87,8 @@ pub enum RangeRequestId { BackfillSync { batch_id: Epoch }, } -// TODO review enum +// TODO(das) refactor in a separate PR. We might be able to remove this and replace +// [`DataColumnsByRootRequestId`] with a [`SingleLookupReqId`]. #[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] pub enum DataColumnsByRootRequester { Custody(CustodyId), From 6785a81c8ee03451cd21cf54f9bce18259f6c1f8 Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Tue, 22 Jul 2025 14:30:18 +1000 Subject: [PATCH 3/3] Remove unused functions. --- beacon_node/network/src/sync/tests/lookups.rs | 37 +------------------ 1 file changed, 1 insertion(+), 36 deletions(-) diff --git a/beacon_node/network/src/sync/tests/lookups.rs b/beacon_node/network/src/sync/tests/lookups.rs index 90c669e52ff..0dcc29ef586 100644 --- a/beacon_node/network/src/sync/tests/lookups.rs +++ b/beacon_node/network/src/sync/tests/lookups.rs @@ -583,39 +583,6 @@ impl TestRig { }) } - fn return_empty_sampling_requests(&mut self, ids: DCByRootIds) { - for id in ids { - self.log(&format!("return empty data column for {id:?}")); - self.return_empty_sampling_request(id) - } - } - - fn return_empty_sampling_request(&mut self, (sync_request_id, _): DCByRootId) { - let peer_id = PeerId::random(); - // Send stream termination - self.send_sync_message(SyncMessage::RpcDataColumn { - sync_request_id, - peer_id, - data_column: None, - seen_timestamp: timestamp_now(), - }); - } - - fn sampling_requests_failed( - &mut self, - sampling_ids: DCByRootIds, - peer_id: PeerId, - error: RPCError, - ) { - for (sync_request_id, _) in sampling_ids { - self.send_sync_message(SyncMessage::RpcError { - peer_id, - sync_request_id, - error: error.clone(), - }) - } - } - fn complete_valid_block_request( &mut self, id: SingleLookupReqId, @@ -972,6 +939,7 @@ impl TestRig { .unwrap_or_else(|e| panic!("Expected RPC custody column work: {e}")) } + #[allow(dead_code)] fn expect_no_work_event(&mut self) { self.drain_processor_rx(); assert!(self.network_rx_queue.is_empty()); @@ -1965,9 +1933,6 @@ fn custody_lookup_happy_path() { // - Respond with stream terminator // ^ The stream terminator should be ignored and not close the next retry -// TODO(das): Test error early a sampling request and it getting drop + then receiving responses -// from pending requests. - mod deneb_only { use super::*; use beacon_chain::{