From 1eee63696aacf13c9462333a32a707845338da2c Mon Sep 17 00:00:00 2001 From: Daniel Knopik Date: Tue, 10 Jun 2025 18:05:30 +0200 Subject: [PATCH 01/14] Wait for more columns before starting reconstruction --- .../overflow_lru_cache.rs | 30 +++++++++++++++++-- 1 file changed, 28 insertions(+), 2 deletions(-) diff --git a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs index 3478c183f34..86bbd5042d0 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs @@ -13,6 +13,8 @@ use parking_lot::RwLock; use std::cmp::Ordering; use std::num::NonZeroUsize; use std::sync::Arc; +use std::thread::sleep; +use std::time::Duration; use tracing::debug; use types::blob_sidecar::BlobIdentifier; use types::{ @@ -560,7 +562,7 @@ impl DataAvailabilityCheckerInner { // If we're sampling all columns, it means we must be custodying all columns. let total_column_count = self.spec.number_of_columns as usize; - let received_column_count = pending_components.verified_data_columns.len(); + let mut received_column_count = pending_components.verified_data_columns.len(); if pending_components.reconstruction_started { return ReconstructColumnsDecision::No("already started"); @@ -573,7 +575,31 @@ impl DataAvailabilityCheckerInner { } pending_components.reconstruction_started = true; - ReconstructColumnsDecision::Yes(pending_components.verified_data_columns.clone()) + + // Instead of starting to reconstruct immediately, wait for more columns to arrive + drop(write_lock); + loop { + sleep(Duration::from_millis(25)); + let mut write_lock = self.critical.write(); + let Some(pending_components) = write_lock.get(block_root) else { + // Block may have been imported as it does not exist in availability cache. + return ReconstructColumnsDecision::No("block already imported"); + }; + let new_received_column_count = pending_components.verified_data_columns.len(); + + // Check if there is still a need to reconstruct. + if new_received_column_count >= total_column_count { + return ReconstructColumnsDecision::No("all columns received"); + } + + // Check if no new column arrived. + if new_received_column_count == received_column_count { + return ReconstructColumnsDecision::Yes(pending_components.verified_data_columns.clone()); + } + + // Update count for next check. + received_column_count = new_received_column_count; + } } /// This could mean some invalid data columns made it through to the `DataAvailabilityChecker`. From bef3344d1ed2c060d59e0db32f40fd92b486a0c4 Mon Sep 17 00:00:00 2001 From: Daniel Knopik Date: Wed, 11 Jun 2025 09:07:07 +0200 Subject: [PATCH 02/14] Logging for testing --- .../src/data_availability_checker/overflow_lru_cache.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs index 86bbd5042d0..3594cc0cdd4 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs @@ -576,8 +576,11 @@ impl DataAvailabilityCheckerInner { pending_components.reconstruction_started = true; + debug!(received_column_count, "Starting wait for more cols..."); + // Instead of starting to reconstruct immediately, wait for more columns to arrive drop(write_lock); + let mut iter = 1; loop { sleep(Duration::from_millis(25)); let mut write_lock = self.critical.write(); @@ -589,16 +592,21 @@ impl DataAvailabilityCheckerInner { // Check if there is still a need to reconstruct. if new_received_column_count >= total_column_count { + debug!(iter, new_received_column_count, "Got all!"); return ReconstructColumnsDecision::No("all columns received"); } // Check if no new column arrived. if new_received_column_count == received_column_count { + debug!(iter, new_received_column_count, "No new cols :/"); return ReconstructColumnsDecision::Yes(pending_components.verified_data_columns.clone()); } + debug!(iter, new_received_column_count, "Waiting for more cols..."); + // Update count for next check. received_column_count = new_received_column_count; + iter += 1; } } From e2216c021fc9dc90955061ec6201c5443ef2faea Mon Sep 17 00:00:00 2001 From: Daniel Knopik Date: Wed, 11 Jun 2025 09:26:10 +0200 Subject: [PATCH 03/14] try 100ms --- .../src/data_availability_checker/overflow_lru_cache.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs index 3594cc0cdd4..a2b83622a1e 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs @@ -582,7 +582,7 @@ impl DataAvailabilityCheckerInner { drop(write_lock); let mut iter = 1; loop { - sleep(Duration::from_millis(25)); + sleep(Duration::from_millis(100)); let mut write_lock = self.critical.write(); let Some(pending_components) = write_lock.get(block_root) else { // Block may have been imported as it does not exist in availability cache. From 65e5ac42b18f67814d4458d327f26b7d34e6ba03 Mon Sep 17 00:00:00 2001 From: Daniel Knopik Date: Wed, 11 Jun 2025 14:05:50 +0200 Subject: [PATCH 04/14] Use processor --- beacon_node/beacon_chain/src/beacon_chain.rs | 33 +++-- .../src/data_availability_checker.rs | 4 + .../overflow_lru_cache.rs | 73 +++++------ beacon_node/beacon_chain/src/lib.rs | 2 +- beacon_node/beacon_processor/src/lib.rs | 20 ++- .../src/work_reprocessing_queue.rs | 35 ++++++ .../gossip_methods.rs | 2 +- .../src/network_beacon_processor/mod.rs | 117 +++++++++++------- .../network_beacon_processor/sync_methods.rs | 2 +- 9 files changed, 185 insertions(+), 103 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 990f4b6099c..bad6ed883b8 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -181,6 +181,15 @@ pub enum AvailabilityProcessingStatus { Imported(Hash256), } +pub enum ReconstructionOutcome { + Reconstructed { + availability_processing_status: AvailabilityProcessingStatus, + data_columns_to_publish: DataColumnSidecarList, + }, + Delay, + NoReconstruction, +} + impl TryInto for AvailabilityProcessingStatus { type Error = (); @@ -3240,13 +3249,7 @@ impl BeaconChain { pub async fn reconstruct_data_columns( self: &Arc, block_root: Hash256, - ) -> Result< - Option<( - AvailabilityProcessingStatus, - DataColumnSidecarList, - )>, - BlockError, - > { + ) -> Result, BlockError> { // As of now we only reconstruct data columns on supernodes, so if the block is already // available on a supernode, there's no need to reconstruct as the node must already have // all columns. @@ -3255,7 +3258,7 @@ impl BeaconChain { .fork_choice_read_lock() .contains_block(&block_root) { - return Ok(None); + return Ok(ReconstructionOutcome::NoReconstruction); } let data_availability_checker = self.data_availability_checker.clone(); @@ -3274,17 +3277,21 @@ impl BeaconChain { DataColumnReconstructionResult::Success((availability, data_columns_to_publish)) => { let Some(slot) = data_columns_to_publish.first().map(|d| d.slot()) else { // This should be unreachable because empty result would return `RecoveredColumnsNotImported` instead of success. - return Ok(None); + return Err(BlockError::InternalError("should have columns".to_string())); }; let r = self .process_availability(slot, availability, || Ok(())) .await; self.remove_notified(&block_root, r) - .map(|availability_processing_status| { - Some((availability_processing_status, data_columns_to_publish)) - }) + .map( + |availability_processing_status| ReconstructionOutcome::Reconstructed { + availability_processing_status, + data_columns_to_publish, + }, + ) } + DataColumnReconstructionResult::Reattempt => Ok(ReconstructionOutcome::Delay), DataColumnReconstructionResult::NotStarted(reason) | DataColumnReconstructionResult::RecoveredColumnsNotImported(reason) => { // We use metric here because logging this would be *very* noisy. @@ -3292,7 +3299,7 @@ impl BeaconChain { &metrics::KZG_DATA_COLUMN_RECONSTRUCTION_INCOMPLETE_TOTAL, &[reason], ); - Ok(None) + Ok(ReconstructionOutcome::NoReconstruction) } } } diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index 0fd417389b2..f59870119ee 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -82,6 +82,7 @@ pub type AvailabilityAndReconstructedColumns = (Availability, DataColumnSi #[derive(Debug)] pub enum DataColumnReconstructionResult { Success(AvailabilityAndReconstructedColumns), + Reattempt, NotStarted(&'static str), RecoveredColumnsNotImported(&'static str), } @@ -523,6 +524,9 @@ impl DataAvailabilityChecker { .check_and_set_reconstruction_started(block_root) { ReconstructColumnsDecision::Yes(verified_data_columns) => verified_data_columns, + ReconstructColumnsDecision::Wait => { + return Ok(DataColumnReconstructionResult::Reattempt) + } ReconstructColumnsDecision::No(reason) => { return Ok(DataColumnReconstructionResult::NotStarted(reason)); } diff --git a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs index a2b83622a1e..6dcc852b530 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs @@ -13,8 +13,6 @@ use parking_lot::RwLock; use std::cmp::Ordering; use std::num::NonZeroUsize; use std::sync::Arc; -use std::thread::sleep; -use std::time::Duration; use tracing::debug; use types::blob_sidecar::BlobIdentifier; use types::{ @@ -31,7 +29,7 @@ pub struct PendingComponents { pub verified_blobs: RuntimeFixedVector>>, pub verified_data_columns: Vec>, pub executed_block: Option>, - pub reconstruction_started: bool, + pub reconstruction_state: ReconstructionState, } impl PendingComponents { @@ -280,7 +278,7 @@ impl PendingComponents { verified_blobs: RuntimeFixedVector::new(vec![None; max_len]), verified_data_columns: vec![], executed_block: None, - reconstruction_started: false, + reconstruction_state: ReconstructionState::NotStarted, } } @@ -340,6 +338,12 @@ impl PendingComponents { } } +pub enum ReconstructionState { + NotStarted, + WaitingForColumns { num_last: usize }, + Started, +} + /// This is the main struct for this module. Outside methods should /// interact with the cache through this. pub struct DataAvailabilityCheckerInner { @@ -357,6 +361,7 @@ pub struct DataAvailabilityCheckerInner { #[allow(clippy::large_enum_variant)] pub(crate) enum ReconstructColumnsDecision { Yes(Vec>), + Wait, No(&'static str), } @@ -562,11 +567,8 @@ impl DataAvailabilityCheckerInner { // If we're sampling all columns, it means we must be custodying all columns. let total_column_count = self.spec.number_of_columns as usize; - let mut received_column_count = pending_components.verified_data_columns.len(); + let received_column_count = pending_components.verified_data_columns.len(); - if pending_components.reconstruction_started { - return ReconstructColumnsDecision::No("already started"); - } if received_column_count >= total_column_count { return ReconstructColumnsDecision::No("all columns received"); } @@ -574,39 +576,30 @@ impl DataAvailabilityCheckerInner { return ReconstructColumnsDecision::No("not enough columns"); } - pending_components.reconstruction_started = true; - - debug!(received_column_count, "Starting wait for more cols..."); - - // Instead of starting to reconstruct immediately, wait for more columns to arrive - drop(write_lock); - let mut iter = 1; - loop { - sleep(Duration::from_millis(100)); - let mut write_lock = self.critical.write(); - let Some(pending_components) = write_lock.get(block_root) else { - // Block may have been imported as it does not exist in availability cache. - return ReconstructColumnsDecision::No("block already imported"); - }; - let new_received_column_count = pending_components.verified_data_columns.len(); - - // Check if there is still a need to reconstruct. - if new_received_column_count >= total_column_count { - debug!(iter, new_received_column_count, "Got all!"); - return ReconstructColumnsDecision::No("all columns received"); + match pending_components.reconstruction_state { + ReconstructionState::NotStarted => { + pending_components.reconstruction_state = ReconstructionState::WaitingForColumns { + num_last: received_column_count, + }; + ReconstructColumnsDecision::Wait } - - // Check if no new column arrived. - if new_received_column_count == received_column_count { - debug!(iter, new_received_column_count, "No new cols :/"); - return ReconstructColumnsDecision::Yes(pending_components.verified_data_columns.clone()); + ReconstructionState::WaitingForColumns { num_last } => { + if num_last < received_column_count { + // We got more columns, let's wait more + pending_components.reconstruction_state = + ReconstructionState::WaitingForColumns { + num_last: received_column_count, + }; + ReconstructColumnsDecision::Wait + } else { + // We made no progress waiting for columns, let's start. + pending_components.reconstruction_state = ReconstructionState::Started; + ReconstructColumnsDecision::Yes( + pending_components.verified_data_columns.clone(), + ) + } } - - debug!(iter, new_received_column_count, "Waiting for more cols..."); - - // Update count for next check. - received_column_count = new_received_column_count; - iter += 1; + ReconstructionState::Started => ReconstructColumnsDecision::No("already started"), } } @@ -616,7 +609,7 @@ impl DataAvailabilityCheckerInner { pub fn handle_reconstruction_failure(&self, block_root: &Hash256) { if let Some(pending_components_mut) = self.critical.write().get_mut(block_root) { pending_components_mut.verified_data_columns = vec![]; - pending_components_mut.reconstruction_started = false; + pending_components_mut.reconstruction_state = ReconstructionState::NotStarted; } } diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index 5b79312d371..841783d481e 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -66,7 +66,7 @@ pub use self::beacon_chain::{ AttestationProcessingOutcome, AvailabilityProcessingStatus, BeaconBlockResponse, BeaconBlockResponseWrapper, BeaconChain, BeaconChainTypes, BeaconStore, BlockProcessStatus, ChainSegmentResult, ForkChoiceError, LightClientProducerEvent, OverrideForkchoiceUpdate, - ProduceBlockVerification, StateSkipConfig, WhenSlotSkipped, + ProduceBlockVerification, ReconstructionOutcome, StateSkipConfig, WhenSlotSkipped, INVALID_FINALIZED_MERGE_TRANSITION_BLOCK_SHUTDOWN_REASON, INVALID_JUSTIFIED_PAYLOAD_SHUTDOWN_REASON, }; diff --git a/beacon_node/beacon_processor/src/lib.rs b/beacon_node/beacon_processor/src/lib.rs index e864cb1fd91..89fdab49947 100644 --- a/beacon_node/beacon_processor/src/lib.rs +++ b/beacon_node/beacon_processor/src/lib.rs @@ -39,7 +39,7 @@ //! task. use crate::work_reprocessing_queue::{ - QueuedBackfillBatch, QueuedGossipBlock, ReprocessQueueMessage, + QueuedBackfillBatch, QueuedColumnReconstruction, QueuedGossipBlock, ReprocessQueueMessage, }; use futures::stream::{Stream, StreamExt}; use futures::task::Poll; @@ -117,6 +117,7 @@ pub struct BeaconProcessorQueueLengths { rpc_custody_column_queue: usize, rpc_verify_data_column_queue: usize, sampling_result_queue: usize, + column_reconstruction_queue: usize, chain_segment_queue: usize, backfill_chain_segment: usize, gossip_block_queue: usize, @@ -184,6 +185,7 @@ impl BeaconProcessorQueueLengths { rpc_verify_data_column_queue: 1000, unknown_block_sampling_request_queue: 16384, sampling_result_queue: 1000, + column_reconstruction_queue: 64, chain_segment_queue: 64, backfill_chain_segment: 64, gossip_block_queue: 1024, @@ -498,6 +500,10 @@ impl From for WorkEvent { drop_during_sync: false, work: Work::ChainSegmentBackfill(process_fn), }, + ReadyWork::ColumnReconstruction(QueuedColumnReconstruction(process_fn)) => Self { + drop_during_sync: true, + work: Work::ColumnReconstruction(process_fn), + }, } } } @@ -619,6 +625,7 @@ pub enum Work { RpcCustodyColumn(AsyncFn), RpcVerifyDataColumn(AsyncFn), SamplingResult(AsyncFn), + ColumnReconstruction(AsyncFn), IgnoredRpcBlock { process_fn: BlockingFn, }, @@ -674,6 +681,7 @@ pub enum WorkType { RpcCustodyColumn, RpcVerifyDataColumn, SamplingResult, + ColumnReconstruction, IgnoredRpcBlock, ChainSegment, ChainSegmentBackfill, @@ -725,6 +733,7 @@ impl Work { Work::RpcCustodyColumn { .. } => WorkType::RpcCustodyColumn, Work::RpcVerifyDataColumn { .. } => WorkType::RpcVerifyDataColumn, Work::SamplingResult { .. } => WorkType::SamplingResult, + Work::ColumnReconstruction(_) => WorkType::ColumnReconstruction, Work::IgnoredRpcBlock { .. } => WorkType::IgnoredRpcBlock, Work::ChainSegment { .. } => WorkType::ChainSegment, Work::ChainSegmentBackfill(_) => WorkType::ChainSegmentBackfill, @@ -891,6 +900,8 @@ impl BeaconProcessor { FifoQueue::new(queue_lengths.rpc_verify_data_column_queue); // TODO(das): the sampling_request_queue is never read let mut sampling_result_queue = FifoQueue::new(queue_lengths.sampling_result_queue); + let mut column_reconstruction_queue = + FifoQueue::new(queue_lengths.column_reconstruction_queue); let mut unknown_block_sampling_request_queue = FifoQueue::new(queue_lengths.unknown_block_sampling_request_queue); let mut chain_segment_queue = FifoQueue::new(queue_lengths.chain_segment_queue); @@ -1371,6 +1382,9 @@ impl BeaconProcessor { rpc_verify_data_column_queue.push(work, work_id) } Work::SamplingResult(_) => sampling_result_queue.push(work, work_id), + Work::ColumnReconstruction(_) => { + column_reconstruction_queue.push(work, work_id) + } Work::ChainSegment { .. } => chain_segment_queue.push(work, work_id), Work::ChainSegmentBackfill { .. } => { backfill_chain_segment.push(work, work_id) @@ -1460,6 +1474,7 @@ impl BeaconProcessor { WorkType::RpcCustodyColumn => rpc_custody_column_queue.len(), WorkType::RpcVerifyDataColumn => rpc_verify_data_column_queue.len(), WorkType::SamplingResult => sampling_result_queue.len(), + WorkType::ColumnReconstruction => column_reconstruction_queue.len(), WorkType::ChainSegment => chain_segment_queue.len(), WorkType::ChainSegmentBackfill => backfill_chain_segment.len(), WorkType::Status => status_queue.len(), @@ -1602,7 +1617,8 @@ impl BeaconProcessor { | Work::RpcBlobs { process_fn } | Work::RpcCustodyColumn(process_fn) | Work::RpcVerifyDataColumn(process_fn) - | Work::SamplingResult(process_fn) => task_spawner.spawn_async(process_fn), + | Work::SamplingResult(process_fn) + | Work::ColumnReconstruction(process_fn) => task_spawner.spawn_async(process_fn), Work::IgnoredRpcBlock { process_fn } => task_spawner.spawn_blocking(process_fn), Work::GossipBlock(work) | Work::GossipBlobSidecar(work) diff --git a/beacon_node/beacon_processor/src/work_reprocessing_queue.rs b/beacon_node/beacon_processor/src/work_reprocessing_queue.rs index 2b6e72ae0c3..ac4aebb8f09 100644 --- a/beacon_node/beacon_processor/src/work_reprocessing_queue.rs +++ b/beacon_node/beacon_processor/src/work_reprocessing_queue.rs @@ -54,6 +54,9 @@ pub const QUEUED_RPC_BLOCK_DELAY: Duration = Duration::from_secs(4); /// For how long to queue sampling requests for reprocessing. pub const QUEUED_SAMPLING_REQUESTS_DELAY: Duration = Duration::from_secs(12); +/// For how long to queue delayed column reconstruction. +pub const QUEUED_RECONSTRUCTION_DELAY: Duration = Duration::from_millis(150); + /// Set an arbitrary upper-bound on the number of queued blocks to avoid DoS attacks. The fact that /// we signature-verify blocks before putting them in the queue *should* protect against this, but /// it's nice to have extra protection. @@ -109,6 +112,8 @@ pub enum ReprocessQueueMessage { UnknownBlockSamplingRequest(QueuedSamplingRequest), /// A new backfill batch that needs to be scheduled for processing. BackfillSync(QueuedBackfillBatch), + /// A delayed column reconstruction that needs checking + DelayColumnReconstruction(QueuedColumnReconstruction), } /// Events sent by the scheduler once they are ready for re-processing. @@ -121,6 +126,7 @@ pub enum ReadyWork { LightClientUpdate(QueuedLightClientUpdate), SamplingRequest(QueuedSamplingRequest), BackfillSync(QueuedBackfillBatch), + ColumnReconstruction(QueuedColumnReconstruction), } /// An Attestation for which the corresponding block was not seen while processing, queued for @@ -176,6 +182,8 @@ pub struct IgnoredRpcBlock { /// A backfill batch work that has been queued for processing later. pub struct QueuedBackfillBatch(pub AsyncFn); +pub struct QueuedColumnReconstruction(pub AsyncFn); + impl TryFrom> for QueuedBackfillBatch { type Error = WorkEvent; @@ -212,6 +220,8 @@ enum InboundEvent { ReadyLightClientUpdate(QueuedLightClientUpdateId), /// A backfill batch that was queued is ready for processing. ReadyBackfillSync(QueuedBackfillBatch), + /// A column reconstruction that was queued is ready for processing. + ReadyColumnReconstruction(QueuedColumnReconstruction), /// A message sent to the `ReprocessQueue` Msg(ReprocessQueueMessage), } @@ -234,6 +244,8 @@ struct ReprocessQueue { lc_updates_delay_queue: DelayQueue, /// Queue to manage scheduled sampling requests sampling_requests_delay_queue: DelayQueue, + /// Queue to manage scheduled column reconstructions. + column_reconstructions_delay_queue: DelayQueue, /* Queued items */ /// Queued blocks. @@ -343,6 +355,13 @@ impl Stream for ReprocessQueue { Poll::Ready(None) | Poll::Pending => (), } + match self.column_reconstructions_delay_queue.poll_expired(cx) { + Poll::Ready(Some(reconstruction)) => { + return Poll::Ready(Some(InboundEvent::ReadyColumnReconstruction(reconstruction.into_inner()))); + } + Poll::Ready(None) | Poll::Pending => (), + } + if let Some(next_backfill_batch_event) = self.next_backfill_batch_event.as_mut() { match next_backfill_batch_event.as_mut().poll(cx) { Poll::Ready(_) => { @@ -410,6 +429,7 @@ impl ReprocessQueue { attestations_delay_queue: DelayQueue::new(), lc_updates_delay_queue: DelayQueue::new(), sampling_requests_delay_queue: <_>::default(), + column_reconstructions_delay_queue: DelayQueue::new(), queued_gossip_block_roots: HashSet::new(), queued_lc_updates: FnvHashMap::default(), queued_aggregates: FnvHashMap::default(), @@ -817,6 +837,10 @@ impl ReprocessQueue { self.recompute_next_backfill_batch_event(); } } + InboundEvent::Msg(DelayColumnReconstruction(request)) => { + self.column_reconstructions_delay_queue + .insert(request, QUEUED_RECONSTRUCTION_DELAY); + } // A block that was queued for later processing is now ready to be processed. InboundEvent::ReadyGossipBlock(ready_block) => { let block_root = ready_block.beacon_block_root; @@ -940,6 +964,17 @@ impl ReprocessQueue { _ => crit!("Unexpected return from try_send error"), } } + InboundEvent::ReadyColumnReconstruction(column_reconstruction) => { + if self + .ready_work_tx + .try_send(ReadyWork::ColumnReconstruction(column_reconstruction)).is_err() + { + error!( + hint = "system may be overloaded", + "Ignored scheduled column reconstruction" + ); + } + } } metrics::set_gauge_vec( diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index 8757ab43830..1a645cdaa73 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -1173,7 +1173,7 @@ impl NetworkBeaconProcessor { "Processed data column, waiting for other components" ); - self.attempt_data_column_reconstruction(block_root, true) + Arc::clone(self).attempt_data_column_reconstruction(block_root, true) .await; } }, diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index f9390a2c7b8..d283ae86f43 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -10,6 +10,7 @@ use beacon_chain::fetch_blobs::{ use beacon_chain::observed_data_sidecars::DoNotObserve; use beacon_chain::{ AvailabilityProcessingStatus, BeaconChain, BeaconChainTypes, BlockError, NotifyExecutionLayer, + ReconstructionOutcome, }; use beacon_processor::{ work_reprocessing_queue::ReprocessQueueMessage, BeaconProcessorSend, DuplicateCache, @@ -25,6 +26,7 @@ use lighthouse_network::{ Client, MessageId, NetworkGlobals, PeerId, PubsubMessage, }; use rand::prelude::SliceRandom; +use std::future::Future; use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; @@ -924,57 +926,81 @@ impl NetworkBeaconProcessor { /// /// The `publish_columns` parameter controls whether reconstructed columns should be published /// to the gossip network. - async fn attempt_data_column_reconstruction( - self: &Arc, + // rustc can't figure out `Send + Sync` on its own + #[allow(clippy::manual_async_fn)] + fn attempt_data_column_reconstruction( + self: Arc, block_root: Hash256, publish_columns: bool, - ) -> Option { - // Only supernodes attempt reconstruction - if !self.network_globals.is_supernode() { - return None; - } + ) -> impl Future> + Send + Sync { + async move { + // Only supernodes attempt reconstruction + if !self.network_globals.is_supernode() { + return None; + } - let result = self.chain.reconstruct_data_columns(block_root).await; - match result { - Ok(Some((availability_processing_status, data_columns_to_publish))) => { - if publish_columns { - self.publish_data_columns_gradually(data_columns_to_publish, block_root); - } - match &availability_processing_status { - AvailabilityProcessingStatus::Imported(hash) => { - debug!( - result = "imported block and custody columns", - block_hash = %hash, - "Block components available via reconstruction" - ); - self.chain.recompute_head_at_current_slot().await; + let result = self.chain.reconstruct_data_columns(block_root).await; + match result { + Ok(ReconstructionOutcome::Reconstructed { + availability_processing_status, + data_columns_to_publish, + }) => { + if publish_columns { + self.publish_data_columns_gradually(data_columns_to_publish, block_root); } - AvailabilityProcessingStatus::MissingComponents(_, _) => { - debug!( - result = "imported all custody columns", - block_hash = %block_root, - "Block components still missing block after reconstruction" - ); + match &availability_processing_status { + AvailabilityProcessingStatus::Imported(hash) => { + debug!( + result = "imported block and custody columns", + block_hash = %hash, + "Block components available via reconstruction" + ); + self.chain.recompute_head_at_current_slot().await; + } + AvailabilityProcessingStatus::MissingComponents(_, _) => { + debug!( + result = "imported all custody columns", + block_hash = %block_root, + "Block components still missing block after reconstruction" + ); + } } - } - Some(availability_processing_status) - } - Ok(None) => { - // reason is tracked via the `KZG_DATA_COLUMN_RECONSTRUCTION_INCOMPLETE_TOTAL` metric - trace!( - block_hash = %block_root, - "Reconstruction not required for block" - ); - None - } - Err(e) => { - error!( - %block_root, - error = ?e, - "Error during data column reconstruction" - ); - None + Some(availability_processing_status) + } + Ok(ReconstructionOutcome::Delay) => { + let cloned_self = Arc::clone(&self); + let send_result = self.reprocess_tx + .send(ReprocessQueueMessage::DelayColumnReconstruction( + QueuedColumnReconstruction(Box::pin(async move { + cloned_self + .attempt_data_column_reconstruction(block_root, publish_columns) + .await; + })), + )) + .await; + + if send_result.is_err() { + warn!("Unable to send reconstruction to reprocessing"); + } + None + } + Ok(ReconstructionOutcome::NoReconstruction) => { + // reason is tracked via the `KZG_DATA_COLUMN_RECONSTRUCTION_INCOMPLETE_TOTAL` metric + trace!( + block_hash = %block_root, + "Reconstruction not required for block" + ); + None + } + Err(e) => { + error!( + %block_root, + error = ?e, + "Error during data column reconstruction" + ); + None + } } } } @@ -1140,6 +1166,7 @@ impl NetworkBeaconProcessor { } } +use beacon_processor::work_reprocessing_queue::QueuedColumnReconstruction; #[cfg(test)] use { beacon_chain::{builder::Witness, eth1_chain::CachingEth1Backend}, diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs index 31b17a41a42..6aa28f80a91 100644 --- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs @@ -386,7 +386,7 @@ impl NetworkBeaconProcessor { // We don't publish columns reconstructed from rpc columns to the gossip network, // as these are likely historic columns. let publish_columns = false; - if let Some(availability) = self + if let Some(availability) = Arc::clone(&self) .attempt_data_column_reconstruction(block_root, publish_columns) .await { From c7f521186bfb84e4a91229751257b23f3ecc412d Mon Sep 17 00:00:00 2001 From: Daniel Knopik Date: Wed, 11 Jun 2025 14:38:26 +0200 Subject: [PATCH 05/14] Log and restructure --- .../overflow_lru_cache.rs | 34 ++++++++++++------- 1 file changed, 22 insertions(+), 12 deletions(-) diff --git a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs index 6dcc852b530..732e720ae25 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs @@ -576,31 +576,41 @@ impl DataAvailabilityCheckerInner { return ReconstructColumnsDecision::No("not enough columns"); } - match pending_components.reconstruction_state { - ReconstructionState::NotStarted => { - pending_components.reconstruction_state = ReconstructionState::WaitingForColumns { - num_last: received_column_count, - }; - ReconstructColumnsDecision::Wait - } + let descision = match pending_components.reconstruction_state { + ReconstructionState::NotStarted => ReconstructColumnsDecision::Wait, ReconstructionState::WaitingForColumns { num_last } => { if num_last < received_column_count { // We got more columns, let's wait more - pending_components.reconstruction_state = - ReconstructionState::WaitingForColumns { - num_last: received_column_count, - }; ReconstructColumnsDecision::Wait } else { // We made no progress waiting for columns, let's start. - pending_components.reconstruction_state = ReconstructionState::Started; ReconstructColumnsDecision::Yes( pending_components.verified_data_columns.clone(), ) } } ReconstructionState::Started => ReconstructColumnsDecision::No("already started"), + }; + + match descision { + ReconstructColumnsDecision::Yes(_) => { + pending_components.reconstruction_state = ReconstructionState::Started; + debug!(block_root, received_column_count, "Starting reconstruction"); + } + ReconstructColumnsDecision::Wait => { + pending_components.reconstruction_state = ReconstructionState::WaitingForColumns { + num_last: received_column_count, + }; + debug!( + block_root, + received_column_count, + "Waiting for more columns to arrive before reconstruction" + ); + } + ReconstructColumnsDecision::No(_) => {} } + + descision } /// This could mean some invalid data columns made it through to the `DataAvailabilityChecker`. From f8137674e1f3aad478d9b6fecc8e69097a9a7cfa Mon Sep 17 00:00:00 2001 From: Daniel Knopik Date: Wed, 11 Jun 2025 14:42:00 +0200 Subject: [PATCH 06/14] fmt --- .../src/data_availability_checker/overflow_lru_cache.rs | 4 ++-- .../beacon_processor/src/work_reprocessing_queue.rs | 7 +++++-- .../network/src/network_beacon_processor/gossip_methods.rs | 3 ++- beacon_node/network/src/network_beacon_processor/mod.rs | 3 ++- 4 files changed, 11 insertions(+), 6 deletions(-) diff --git a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs index 732e720ae25..217a151ad82 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs @@ -595,14 +595,14 @@ impl DataAvailabilityCheckerInner { match descision { ReconstructColumnsDecision::Yes(_) => { pending_components.reconstruction_state = ReconstructionState::Started; - debug!(block_root, received_column_count, "Starting reconstruction"); + debug!(%block_root, received_column_count, "Starting reconstruction"); } ReconstructColumnsDecision::Wait => { pending_components.reconstruction_state = ReconstructionState::WaitingForColumns { num_last: received_column_count, }; debug!( - block_root, + %block_root, received_column_count, "Waiting for more columns to arrive before reconstruction" ); diff --git a/beacon_node/beacon_processor/src/work_reprocessing_queue.rs b/beacon_node/beacon_processor/src/work_reprocessing_queue.rs index ac4aebb8f09..24f4fa79068 100644 --- a/beacon_node/beacon_processor/src/work_reprocessing_queue.rs +++ b/beacon_node/beacon_processor/src/work_reprocessing_queue.rs @@ -357,7 +357,9 @@ impl Stream for ReprocessQueue { match self.column_reconstructions_delay_queue.poll_expired(cx) { Poll::Ready(Some(reconstruction)) => { - return Poll::Ready(Some(InboundEvent::ReadyColumnReconstruction(reconstruction.into_inner()))); + return Poll::Ready(Some(InboundEvent::ReadyColumnReconstruction( + reconstruction.into_inner(), + ))); } Poll::Ready(None) | Poll::Pending => (), } @@ -967,7 +969,8 @@ impl ReprocessQueue { InboundEvent::ReadyColumnReconstruction(column_reconstruction) => { if self .ready_work_tx - .try_send(ReadyWork::ColumnReconstruction(column_reconstruction)).is_err() + .try_send(ReadyWork::ColumnReconstruction(column_reconstruction)) + .is_err() { error!( hint = "system may be overloaded", diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index 1a645cdaa73..047a53fd030 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -1173,7 +1173,8 @@ impl NetworkBeaconProcessor { "Processed data column, waiting for other components" ); - Arc::clone(self).attempt_data_column_reconstruction(block_root, true) + Arc::clone(self) + .attempt_data_column_reconstruction(block_root, true) .await; } }, diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index d283ae86f43..dd95cdcba77 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -970,7 +970,8 @@ impl NetworkBeaconProcessor { } Ok(ReconstructionOutcome::Delay) => { let cloned_self = Arc::clone(&self); - let send_result = self.reprocess_tx + let send_result = self + .reprocess_tx .send(ReprocessQueueMessage::DelayColumnReconstruction( QueuedColumnReconstruction(Box::pin(async move { cloned_self From b2f7a5befd87ca024915b92fda4f607702af26e9 Mon Sep 17 00:00:00 2001 From: Daniel Knopik Date: Wed, 11 Jun 2025 15:15:44 +0200 Subject: [PATCH 07/14] correctly reschedule reconstruction --- beacon_node/beacon_processor/src/lib.rs | 10 ++++--- .../src/work_reprocessing_queue.rs | 26 ++++++++++++++++--- .../src/network_beacon_processor/mod.rs | 18 ++++++++----- 3 files changed, 41 insertions(+), 13 deletions(-) diff --git a/beacon_node/beacon_processor/src/lib.rs b/beacon_node/beacon_processor/src/lib.rs index 89fdab49947..cb45dd26a98 100644 --- a/beacon_node/beacon_processor/src/lib.rs +++ b/beacon_node/beacon_processor/src/lib.rs @@ -500,10 +500,12 @@ impl From for WorkEvent { drop_during_sync: false, work: Work::ChainSegmentBackfill(process_fn), }, - ReadyWork::ColumnReconstruction(QueuedColumnReconstruction(process_fn)) => Self { - drop_during_sync: true, - work: Work::ColumnReconstruction(process_fn), - }, + ReadyWork::ColumnReconstruction(QueuedColumnReconstruction { process_fn, .. }) => { + Self { + drop_during_sync: true, + work: Work::ColumnReconstruction(process_fn), + } + } } } } diff --git a/beacon_node/beacon_processor/src/work_reprocessing_queue.rs b/beacon_node/beacon_processor/src/work_reprocessing_queue.rs index 24f4fa79068..855342d8bda 100644 --- a/beacon_node/beacon_processor/src/work_reprocessing_queue.rs +++ b/beacon_node/beacon_processor/src/work_reprocessing_queue.rs @@ -19,6 +19,7 @@ use itertools::Itertools; use logging::crit; use logging::TimeLatch; use slot_clock::SlotClock; +use std::collections::hash_map::Entry; use std::collections::{HashMap, HashSet}; use std::future::Future; use std::pin::Pin; @@ -182,7 +183,10 @@ pub struct IgnoredRpcBlock { /// A backfill batch work that has been queued for processing later. pub struct QueuedBackfillBatch(pub AsyncFn); -pub struct QueuedColumnReconstruction(pub AsyncFn); +pub struct QueuedColumnReconstruction { + pub block_root: Hash256, + pub process_fn: AsyncFn, +} impl TryFrom> for QueuedBackfillBatch { type Error = WorkEvent; @@ -264,6 +268,8 @@ struct ReprocessQueue { queued_sampling_requests: FnvHashMap, /// Sampling requests per block root. awaiting_sampling_requests_per_block_root: HashMap>, + /// Column reconstruction per block root. + queued_column_reconstructions: HashMap, /// Queued backfill batches queued_backfill_batches: Vec, @@ -441,6 +447,7 @@ impl ReprocessQueue { awaiting_lc_updates_per_parent_root: HashMap::new(), awaiting_sampling_requests_per_block_root: <_>::default(), queued_backfill_batches: Vec::new(), + queued_column_reconstructions: HashMap::new(), next_attestation: 0, next_lc_update: 0, next_sampling_request_update: 0, @@ -840,8 +847,19 @@ impl ReprocessQueue { } } InboundEvent::Msg(DelayColumnReconstruction(request)) => { - self.column_reconstructions_delay_queue - .insert(request, QUEUED_RECONSTRUCTION_DELAY); + match self.queued_column_reconstructions.entry(request.block_root) { + Entry::Occupied(key) => { + // Push back the reattempted reconstruction + self.column_reconstructions_delay_queue + .reset(key.get(), QUEUED_RECONSTRUCTION_DELAY) + } + Entry::Vacant(vacant) => { + let delay_key = self + .column_reconstructions_delay_queue + .insert(request, QUEUED_RECONSTRUCTION_DELAY); + vacant.insert(delay_key); + } + } } // A block that was queued for later processing is now ready to be processed. InboundEvent::ReadyGossipBlock(ready_block) => { @@ -967,6 +985,8 @@ impl ReprocessQueue { } } InboundEvent::ReadyColumnReconstruction(column_reconstruction) => { + self.queued_column_reconstructions + .remove(&column_reconstruction.block_root); if self .ready_work_tx .try_send(ReadyWork::ColumnReconstruction(column_reconstruction)) diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index dd95cdcba77..8d1264a2f46 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -12,6 +12,7 @@ use beacon_chain::{ AvailabilityProcessingStatus, BeaconChain, BeaconChainTypes, BlockError, NotifyExecutionLayer, ReconstructionOutcome, }; +use beacon_processor::work_reprocessing_queue::QueuedColumnReconstruction; use beacon_processor::{ work_reprocessing_queue::ReprocessQueueMessage, BeaconProcessorSend, DuplicateCache, GossipAggregatePackage, GossipAttestationPackage, Work, WorkEvent as BeaconWorkEvent, @@ -973,11 +974,17 @@ impl NetworkBeaconProcessor { let send_result = self .reprocess_tx .send(ReprocessQueueMessage::DelayColumnReconstruction( - QueuedColumnReconstruction(Box::pin(async move { - cloned_self - .attempt_data_column_reconstruction(block_root, publish_columns) - .await; - })), + QueuedColumnReconstruction { + block_root, + process_fn: Box::pin(async move { + cloned_self + .attempt_data_column_reconstruction( + block_root, + publish_columns, + ) + .await; + }), + }, )) .await; @@ -1167,7 +1174,6 @@ impl NetworkBeaconProcessor { } } -use beacon_processor::work_reprocessing_queue::QueuedColumnReconstruction; #[cfg(test)] use { beacon_chain::{builder::Witness, eth1_chain::CachingEth1Backend}, From 492d73ccadac1ca24d821cfc25569110d690ee53 Mon Sep 17 00:00:00 2001 From: Daniel Knopik Date: Wed, 11 Jun 2025 15:50:28 +0200 Subject: [PATCH 08/14] make sure to let only retry start reconstruction --- beacon_node/beacon_chain/src/beacon_chain.rs | 3 ++- beacon_node/beacon_chain/src/data_availability_checker.rs | 3 ++- .../src/data_availability_checker/overflow_lru_cache.rs | 3 ++- .../network/src/network_beacon_processor/gossip_methods.rs | 2 +- beacon_node/network/src/network_beacon_processor/mod.rs | 7 ++++++- .../network/src/network_beacon_processor/sync_methods.rs | 2 +- 6 files changed, 14 insertions(+), 6 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index bad6ed883b8..231aa476bdf 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -3249,6 +3249,7 @@ impl BeaconChain { pub async fn reconstruct_data_columns( self: &Arc, block_root: Hash256, + is_retry: bool, ) -> Result, BlockError> { // As of now we only reconstruct data columns on supernodes, so if the block is already // available on a supernode, there's no need to reconstruct as the node must already have @@ -3266,7 +3267,7 @@ impl BeaconChain { let result = self .task_executor .spawn_blocking_handle( - move || data_availability_checker.reconstruct_data_columns(&block_root), + move || data_availability_checker.reconstruct_data_columns(&block_root, is_retry), "reconstruct_data_columns", ) .ok_or(BeaconChainError::RuntimeShutdown)? diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index f59870119ee..84a52238cbb 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -518,10 +518,11 @@ impl DataAvailabilityChecker { pub fn reconstruct_data_columns( &self, block_root: &Hash256, + is_retry: bool, ) -> Result, AvailabilityCheckError> { let verified_data_columns = match self .availability_cache - .check_and_set_reconstruction_started(block_root) + .check_and_set_reconstruction_started(block_root, is_retry) { ReconstructColumnsDecision::Yes(verified_data_columns) => verified_data_columns, ReconstructColumnsDecision::Wait => { diff --git a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs index 217a151ad82..59b11d6c4b8 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs @@ -558,6 +558,7 @@ impl DataAvailabilityCheckerInner { pub fn check_and_set_reconstruction_started( &self, block_root: &Hash256, + is_retry: bool, ) -> ReconstructColumnsDecision { let mut write_lock = self.critical.write(); let Some(pending_components) = write_lock.get_mut(block_root) else { @@ -579,7 +580,7 @@ impl DataAvailabilityCheckerInner { let descision = match pending_components.reconstruction_state { ReconstructionState::NotStarted => ReconstructColumnsDecision::Wait, ReconstructionState::WaitingForColumns { num_last } => { - if num_last < received_column_count { + if !is_retry || num_last < received_column_count { // We got more columns, let's wait more ReconstructColumnsDecision::Wait } else { diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index 047a53fd030..926c7ecc7d0 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -1174,7 +1174,7 @@ impl NetworkBeaconProcessor { ); Arc::clone(self) - .attempt_data_column_reconstruction(block_root, true) + .attempt_data_column_reconstruction(block_root, true, false) .await; } }, diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index 8d1264a2f46..40041f05aa4 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -933,6 +933,7 @@ impl NetworkBeaconProcessor { self: Arc, block_root: Hash256, publish_columns: bool, + is_retry: bool, ) -> impl Future> + Send + Sync { async move { // Only supernodes attempt reconstruction @@ -940,7 +941,10 @@ impl NetworkBeaconProcessor { return None; } - let result = self.chain.reconstruct_data_columns(block_root).await; + let result = self + .chain + .reconstruct_data_columns(block_root, is_retry) + .await; match result { Ok(ReconstructionOutcome::Reconstructed { availability_processing_status, @@ -981,6 +985,7 @@ impl NetworkBeaconProcessor { .attempt_data_column_reconstruction( block_root, publish_columns, + true, ) .await; }), diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs index 6aa28f80a91..6bcfae5038b 100644 --- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs @@ -387,7 +387,7 @@ impl NetworkBeaconProcessor { // as these are likely historic columns. let publish_columns = false; if let Some(availability) = Arc::clone(&self) - .attempt_data_column_reconstruction(block_root, publish_columns) + .attempt_data_column_reconstruction(block_root, publish_columns, false) .await { result = Ok(availability) From 891af696b8a561dd69671daca7124ad9bb33bde1 Mon Sep 17 00:00:00 2001 From: Daniel Knopik Date: Thu, 12 Jun 2025 09:28:49 +0200 Subject: [PATCH 09/14] fmt --- .../network/src/network_beacon_processor/mod.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index ef74031a8c3..305c6e2bd4a 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -938,11 +938,11 @@ impl NetworkBeaconProcessor { async move { // Only supernodes attempt reconstruction if !self - .chain - .data_availability_checker - .custody_context() - .current_is_supernode - { + .chain + .data_availability_checker + .custody_context() + .current_is_supernode + { return None; } From 5a24f45911502289f026b72449b654d85cb178bb Mon Sep 17 00:00:00 2001 From: Daniel Knopik Date: Thu, 12 Jun 2025 16:38:15 +0200 Subject: [PATCH 10/14] refactor --- beacon_node/beacon_chain/src/beacon_chain.rs | 36 ++--- .../src/data_availability_checker.rs | 7 +- .../overflow_lru_cache.rs | 54 +------ beacon_node/beacon_chain/src/lib.rs | 2 +- .../gossip_methods.rs | 28 +++- .../src/network_beacon_processor/mod.rs | 139 +++++++----------- .../network_beacon_processor/sync_methods.rs | 2 +- 7 files changed, 101 insertions(+), 167 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 4ce0ab5ce6e..50efb367a82 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -185,15 +185,6 @@ pub enum AvailabilityProcessingStatus { Imported(Hash256), } -pub enum ReconstructionOutcome { - Reconstructed { - availability_processing_status: AvailabilityProcessingStatus, - data_columns_to_publish: DataColumnSidecarList, - }, - Delay, - NoReconstruction, -} - impl TryInto for AvailabilityProcessingStatus { type Error = (); @@ -3310,8 +3301,13 @@ impl BeaconChain { pub async fn reconstruct_data_columns( self: &Arc, block_root: Hash256, - is_retry: bool, - ) -> Result, BlockError> { + ) -> Result< + Option<( + AvailabilityProcessingStatus, + DataColumnSidecarList, + )>, + BlockError, + > { // As of now we only reconstruct data columns on supernodes, so if the block is already // available on a supernode, there's no need to reconstruct as the node must already have // all columns. @@ -3320,7 +3316,7 @@ impl BeaconChain { .fork_choice_read_lock() .contains_block(&block_root) { - return Ok(ReconstructionOutcome::NoReconstruction); + return Ok(None); } let data_availability_checker = self.data_availability_checker.clone(); @@ -3328,7 +3324,7 @@ impl BeaconChain { let result = self .task_executor .spawn_blocking_handle( - move || data_availability_checker.reconstruct_data_columns(&block_root, is_retry), + move || data_availability_checker.reconstruct_data_columns(&block_root), "reconstruct_data_columns", ) .ok_or(BeaconChainError::RuntimeShutdown)? @@ -3339,21 +3335,17 @@ impl BeaconChain { DataColumnReconstructionResult::Success((availability, data_columns_to_publish)) => { let Some(slot) = data_columns_to_publish.first().map(|d| d.slot()) else { // This should be unreachable because empty result would return `RecoveredColumnsNotImported` instead of success. - return Err(BlockError::InternalError("should have columns".to_string())); + return Ok(None); }; let r = self .process_availability(slot, availability, || Ok(())) .await; self.remove_notified(&block_root, r) - .map( - |availability_processing_status| ReconstructionOutcome::Reconstructed { - availability_processing_status, - data_columns_to_publish, - }, - ) + .map(|availability_processing_status| { + Some((availability_processing_status, data_columns_to_publish)) + }) } - DataColumnReconstructionResult::Reattempt => Ok(ReconstructionOutcome::Delay), DataColumnReconstructionResult::NotStarted(reason) | DataColumnReconstructionResult::RecoveredColumnsNotImported(reason) => { // We use metric here because logging this would be *very* noisy. @@ -3361,7 +3353,7 @@ impl BeaconChain { &metrics::KZG_DATA_COLUMN_RECONSTRUCTION_INCOMPLETE_TOTAL, &[reason], ); - Ok(ReconstructionOutcome::NoReconstruction) + Ok(None) } } } diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index 44ed0b9be71..91ff5fb644c 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -83,7 +83,6 @@ pub type AvailabilityAndReconstructedColumns = (Availability, DataColumnSi #[derive(Debug)] pub enum DataColumnReconstructionResult { Success(AvailabilityAndReconstructedColumns), - Reattempt, NotStarted(&'static str), RecoveredColumnsNotImported(&'static str), } @@ -512,16 +511,12 @@ impl DataAvailabilityChecker { pub fn reconstruct_data_columns( &self, block_root: &Hash256, - is_retry: bool, ) -> Result, AvailabilityCheckError> { let verified_data_columns = match self .availability_cache - .check_and_set_reconstruction_started(block_root, is_retry) + .check_and_set_reconstruction_started(block_root) { ReconstructColumnsDecision::Yes(verified_data_columns) => verified_data_columns, - ReconstructColumnsDecision::Wait => { - return Ok(DataColumnReconstructionResult::Reattempt) - } ReconstructColumnsDecision::No(reason) => { return Ok(DataColumnReconstructionResult::NotStarted(reason)); } diff --git a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs index 868352d6ee5..36c4f2cdc1e 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs @@ -30,7 +30,7 @@ pub struct PendingComponents { pub verified_blobs: RuntimeFixedVector>>, pub verified_data_columns: Vec>, pub executed_block: Option>, - pub reconstruction_state: ReconstructionState, + pub reconstruction_started: bool, } impl PendingComponents { @@ -278,7 +278,7 @@ impl PendingComponents { verified_blobs: RuntimeFixedVector::new(vec![None; max_len]), verified_data_columns: vec![], executed_block: None, - reconstruction_state: ReconstructionState::NotStarted, + reconstruction_started: false, } } @@ -340,12 +340,6 @@ impl PendingComponents { } } -pub enum ReconstructionState { - NotStarted, - WaitingForColumns { num_last: usize }, - Started, -} - /// This is the main struct for this module. Outside methods should /// interact with the cache through this. pub struct DataAvailabilityCheckerInner { @@ -364,7 +358,6 @@ pub struct DataAvailabilityCheckerInner { #[allow(clippy::large_enum_variant)] pub(crate) enum ReconstructColumnsDecision { Yes(Vec>), - Wait, No(&'static str), } @@ -568,7 +561,6 @@ impl DataAvailabilityCheckerInner { pub fn check_and_set_reconstruction_started( &self, block_root: &Hash256, - is_retry: bool, ) -> ReconstructColumnsDecision { let mut write_lock = self.critical.write(); let Some(pending_components) = write_lock.get_mut(block_root) else { @@ -580,6 +572,9 @@ impl DataAvailabilityCheckerInner { let total_column_count = self.spec.number_of_columns as usize; let received_column_count = pending_components.verified_data_columns.len(); + if pending_components.reconstruction_started { + return ReconstructColumnsDecision::No("already started"); + } if received_column_count >= total_column_count { return ReconstructColumnsDecision::No("all columns received"); } @@ -587,41 +582,8 @@ impl DataAvailabilityCheckerInner { return ReconstructColumnsDecision::No("not enough columns"); } - let descision = match pending_components.reconstruction_state { - ReconstructionState::NotStarted => ReconstructColumnsDecision::Wait, - ReconstructionState::WaitingForColumns { num_last } => { - if !is_retry || num_last < received_column_count { - // We got more columns, let's wait more - ReconstructColumnsDecision::Wait - } else { - // We made no progress waiting for columns, let's start. - ReconstructColumnsDecision::Yes( - pending_components.verified_data_columns.clone(), - ) - } - } - ReconstructionState::Started => ReconstructColumnsDecision::No("already started"), - }; - - match descision { - ReconstructColumnsDecision::Yes(_) => { - pending_components.reconstruction_state = ReconstructionState::Started; - debug!(%block_root, received_column_count, "Starting reconstruction"); - } - ReconstructColumnsDecision::Wait => { - pending_components.reconstruction_state = ReconstructionState::WaitingForColumns { - num_last: received_column_count, - }; - debug!( - %block_root, - received_column_count, - "Waiting for more columns to arrive before reconstruction" - ); - } - ReconstructColumnsDecision::No(_) => {} - } - - descision + pending_components.reconstruction_started = true; + ReconstructColumnsDecision::Yes(pending_components.verified_data_columns.clone()) } /// This could mean some invalid data columns made it through to the `DataAvailabilityChecker`. @@ -630,7 +592,7 @@ impl DataAvailabilityCheckerInner { pub fn handle_reconstruction_failure(&self, block_root: &Hash256) { if let Some(pending_components_mut) = self.critical.write().get_mut(block_root) { pending_components_mut.verified_data_columns = vec![]; - pending_components_mut.reconstruction_state = ReconstructionState::NotStarted; + pending_components_mut.reconstruction_started = false; } } diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index 9fd7d6dcbd7..0eec6dc770f 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -68,7 +68,7 @@ pub use self::beacon_chain::{ AttestationProcessingOutcome, AvailabilityProcessingStatus, BeaconBlockResponse, BeaconBlockResponseWrapper, BeaconChain, BeaconChainTypes, BeaconStore, BlockProcessStatus, ChainSegmentResult, ForkChoiceError, LightClientProducerEvent, OverrideForkchoiceUpdate, - ProduceBlockVerification, ReconstructionOutcome, StateSkipConfig, WhenSlotSkipped, + ProduceBlockVerification, StateSkipConfig, WhenSlotSkipped, INVALID_FINALIZED_MERGE_TRANSITION_BLOCK_SHUTDOWN_REASON, INVALID_JUSTIFIED_PAYLOAD_SHUTDOWN_REASON, }; diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index 7a83b214f14..d9aab07d5f6 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -32,6 +32,7 @@ use std::sync::Arc; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use store::hot_cold_store::HotColdDBError; use tokio::sync::mpsc; +use tokio::sync::mpsc::error::SendError; use tracing::{debug, error, info, trace, warn}; use types::{ beacon_block::BlockImportSource, Attestation, AttestationData, AttestationRef, @@ -42,6 +43,7 @@ use types::{ SyncCommitteeMessage, SyncSubnetId, }; +use beacon_processor::work_reprocessing_queue::QueuedColumnReconstruction; use beacon_processor::{ work_reprocessing_queue::{ QueuedAggregate, QueuedGossipBlock, QueuedLightClientUpdate, QueuedUnaggregate, @@ -1173,9 +1175,31 @@ impl NetworkBeaconProcessor { "Processed data column, waiting for other components" ); - Arc::clone(self) - .attempt_data_column_reconstruction(block_root, true, false) + // Instead of triggering reconstruction immediately, schedule it to be run. If + // another column arrives it either completes availability or pushes + // reconstruction back a bit. + let cloned_self = Arc::clone(self); + let send_result = self + .reprocess_tx + .send(ReprocessQueueMessage::DelayColumnReconstruction( + QueuedColumnReconstruction { + block_root, + process_fn: Box::pin(async move { + cloned_self + .attempt_data_column_reconstruction(block_root, true) + .await; + }), + }, + )) .await; + if let Err(SendError(ReprocessQueueMessage::DelayColumnReconstruction( + reconstruction, + ))) = send_result + { + warn!("Unable to send reconstruction to reprocessing"); + // Execute it immediately instead. + reconstruction.process_fn.await; + } } }, Err(BlockError::DuplicateFullyImported(_)) => { diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index 305c6e2bd4a..df9b656051b 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -10,9 +10,7 @@ use beacon_chain::fetch_blobs::{ use beacon_chain::observed_data_sidecars::DoNotObserve; use beacon_chain::{ AvailabilityProcessingStatus, BeaconChain, BeaconChainTypes, BlockError, NotifyExecutionLayer, - ReconstructionOutcome, }; -use beacon_processor::work_reprocessing_queue::QueuedColumnReconstruction; use beacon_processor::{ work_reprocessing_queue::ReprocessQueueMessage, BeaconProcessorSend, DuplicateCache, GossipAggregatePackage, GossipAttestationPackage, Work, WorkEvent as BeaconWorkEvent, @@ -27,7 +25,6 @@ use lighthouse_network::{ Client, MessageId, NetworkGlobals, PeerId, PubsubMessage, }; use rand::prelude::SliceRandom; -use std::future::Future; use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; @@ -927,98 +924,62 @@ impl NetworkBeaconProcessor { /// /// The `publish_columns` parameter controls whether reconstructed columns should be published /// to the gossip network. - // rustc can't figure out `Send + Sync` on its own - #[allow(clippy::manual_async_fn)] - fn attempt_data_column_reconstruction( - self: Arc, + async fn attempt_data_column_reconstruction( + self: &Arc, block_root: Hash256, publish_columns: bool, - is_retry: bool, - ) -> impl Future> + Send + Sync { - async move { - // Only supernodes attempt reconstruction - if !self - .chain - .data_availability_checker - .custody_context() - .current_is_supernode - { - return None; - } - - let result = self - .chain - .reconstruct_data_columns(block_root, is_retry) - .await; - match result { - Ok(ReconstructionOutcome::Reconstructed { - availability_processing_status, - data_columns_to_publish, - }) => { - if publish_columns { - self.publish_data_columns_gradually(data_columns_to_publish, block_root); - } - match &availability_processing_status { - AvailabilityProcessingStatus::Imported(hash) => { - debug!( - result = "imported block and custody columns", - block_hash = %hash, - "Block components available via reconstruction" - ); - self.chain.recompute_head_at_current_slot().await; - } - AvailabilityProcessingStatus::MissingComponents(_, _) => { - debug!( - result = "imported all custody columns", - block_hash = %block_root, - "Block components still missing block after reconstruction" - ); - } - } + ) -> Option { + // Only supernodes attempt reconstruction + if !self + .chain + .data_availability_checker + .custody_context() + .current_is_supernode + { + return None; + } - Some(availability_processing_status) + let result = self.chain.reconstruct_data_columns(block_root).await; + match result { + Ok(Some((availability_processing_status, data_columns_to_publish))) => { + if publish_columns { + self.publish_data_columns_gradually(data_columns_to_publish, block_root); } - Ok(ReconstructionOutcome::Delay) => { - let cloned_self = Arc::clone(&self); - let send_result = self - .reprocess_tx - .send(ReprocessQueueMessage::DelayColumnReconstruction( - QueuedColumnReconstruction { - block_root, - process_fn: Box::pin(async move { - cloned_self - .attempt_data_column_reconstruction( - block_root, - publish_columns, - true, - ) - .await; - }), - }, - )) - .await; - - if send_result.is_err() { - warn!("Unable to send reconstruction to reprocessing"); + match &availability_processing_status { + AvailabilityProcessingStatus::Imported(hash) => { + debug!( + result = "imported block and custody columns", + block_hash = %hash, + "Block components available via reconstruction" + ); + self.chain.recompute_head_at_current_slot().await; + } + AvailabilityProcessingStatus::MissingComponents(_, _) => { + debug!( + result = "imported all custody columns", + block_hash = %block_root, + "Block components still missing block after reconstruction" + ); } - None - } - Ok(ReconstructionOutcome::NoReconstruction) => { - // reason is tracked via the `KZG_DATA_COLUMN_RECONSTRUCTION_INCOMPLETE_TOTAL` metric - trace!( - block_hash = %block_root, - "Reconstruction not required for block" - ); - None - } - Err(e) => { - error!( - %block_root, - error = ?e, - "Error during data column reconstruction" - ); - None } + + Some(availability_processing_status) + } + Ok(None) => { + // reason is tracked via the `KZG_DATA_COLUMN_RECONSTRUCTION_INCOMPLETE_TOTAL` metric + trace!( + block_hash = %block_root, + "Reconstruction not required for block" + ); + None + } + Err(e) => { + error!( + %block_root, + error = ?e, + "Error during data column reconstruction" + ); + None } } } diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs index 6bcfae5038b..6aa28f80a91 100644 --- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs @@ -387,7 +387,7 @@ impl NetworkBeaconProcessor { // as these are likely historic columns. let publish_columns = false; if let Some(availability) = Arc::clone(&self) - .attempt_data_column_reconstruction(block_root, publish_columns, false) + .attempt_data_column_reconstruction(block_root, publish_columns) .await { result = Ok(availability) From 0ee7abfc9ec8c8abe0200d954a11af572974375d Mon Sep 17 00:00:00 2001 From: Daniel Knopik Date: Thu, 12 Jun 2025 16:44:27 +0200 Subject: [PATCH 11/14] remove obsolete change --- .../network/src/network_beacon_processor/sync_methods.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs index 6aa28f80a91..31b17a41a42 100644 --- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs @@ -386,7 +386,7 @@ impl NetworkBeaconProcessor { // We don't publish columns reconstructed from rpc columns to the gossip network, // as these are likely historic columns. let publish_columns = false; - if let Some(availability) = Arc::clone(&self) + if let Some(availability) = self .attempt_data_column_reconstruction(block_root, publish_columns) .await { From ab94e8a1c19d8f6ca9ef0a1b23b40cfff83cdfeb Mon Sep 17 00:00:00 2001 From: Daniel Knopik Date: Fri, 13 Jun 2025 14:12:28 +0200 Subject: [PATCH 12/14] maybe fix test? --- beacon_node/network/src/network_beacon_processor/tests.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/beacon_node/network/src/network_beacon_processor/tests.rs b/beacon_node/network/src/network_beacon_processor/tests.rs index f6a1069a7f4..ffa5d0e010b 100644 --- a/beacon_node/network/src/network_beacon_processor/tests.rs +++ b/beacon_node/network/src/network_beacon_processor/tests.rs @@ -729,6 +729,9 @@ async fn import_gossip_block_acceptably_early() { rig.assert_event_journal_completes(&[WorkType::GossipDataColumnSidecar]) .await; } + if num_data_columns > 0 { + rig.assert_event_journal_completes(&[WorkType::ColumnReconstruction]).await; + } // Note: this section of the code is a bit race-y. We're assuming that we can set the slot clock // and check the head in the time between the block arrived early and when its due for From 19113d17dfd077c3b0e38b81af5a214519156f26 Mon Sep 17 00:00:00 2001 From: Daniel Knopik Date: Fri, 13 Jun 2025 14:16:13 +0200 Subject: [PATCH 13/14] FORMAT? --- beacon_node/network/src/network_beacon_processor/tests.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/beacon_node/network/src/network_beacon_processor/tests.rs b/beacon_node/network/src/network_beacon_processor/tests.rs index ffa5d0e010b..9f133ea55ee 100644 --- a/beacon_node/network/src/network_beacon_processor/tests.rs +++ b/beacon_node/network/src/network_beacon_processor/tests.rs @@ -730,7 +730,8 @@ async fn import_gossip_block_acceptably_early() { .await; } if num_data_columns > 0 { - rig.assert_event_journal_completes(&[WorkType::ColumnReconstruction]).await; + rig.assert_event_journal_completes(&[WorkType::ColumnReconstruction]) + .await; } // Note: this section of the code is a bit race-y. We're assuming that we can set the slot clock From 719df16df6a74c54050b160db90ef92f6dbc6edd Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Fri, 13 Jun 2025 16:02:51 +0200 Subject: [PATCH 14/14] Pop from column_reconstruction_queue. Co-authored-by: dknopik <107140945+dknopik@users.noreply.github.com> --- beacon_node/beacon_processor/src/lib.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/beacon_node/beacon_processor/src/lib.rs b/beacon_node/beacon_processor/src/lib.rs index cb45dd26a98..3acc11b1d27 100644 --- a/beacon_node/beacon_processor/src/lib.rs +++ b/beacon_node/beacon_processor/src/lib.rs @@ -1085,6 +1085,8 @@ impl BeaconProcessor { Some(item) } else if let Some(item) = gossip_data_column_queue.pop() { Some(item) + } else if let Some(item) = column_reconstruction_queue.pop() { + Some(item) // Check the priority 0 API requests after blocks and blobs, but before attestations. } else if let Some(item) = api_request_p0_queue.pop() { Some(item)