From d412edb85aa2ed70d3637e09aa444cb86b3e4e49 Mon Sep 17 00:00:00 2001 From: Pawan Dhananjay Date: Tue, 2 Sep 2025 20:39:07 -0700 Subject: [PATCH 1/3] Allow batch to be in AwaitingDownload state --- .../network/src/sync/backfill_sync/mod.rs | 10 +- .../network/src/sync/range_sync/chain.rs | 97 ++++++++++++++++--- 2 files changed, 89 insertions(+), 18 deletions(-) diff --git a/beacon_node/network/src/sync/backfill_sync/mod.rs b/beacon_node/network/src/sync/backfill_sync/mod.rs index 2f5eb3f6894..24d57a4a3f1 100644 --- a/beacon_node/network/src/sync/backfill_sync/mod.rs +++ b/beacon_node/network/src/sync/backfill_sync/mod.rs @@ -687,11 +687,12 @@ impl BackFillSync { // Batch is not ready, nothing to process } BatchState::Poisoned => unreachable!("Poisoned batch"), - BatchState::Failed | BatchState::AwaitingDownload | BatchState::Processing(_) => { + // Batches can be in `AwaitingDownload` state if there weren't good data column subnet + // peers to send the request to. + BatchState::AwaitingDownload => return Ok(ProcessResult::Successful), + BatchState::Failed | BatchState::Processing(_) => { // these are all inconsistent states: // - Failed -> non recoverable batch. Chain should have been removed - // - AwaitingDownload -> A recoverable failed batch should have been - // re-requested. // - Processing -> `self.current_processing_batch` is None self.fail_sync(BackFillError::InvalidSyncState(String::from( "Invalid expected batch state", @@ -790,7 +791,8 @@ impl BackFillSync { } } BatchState::Downloading(..) => {} - BatchState::Failed | BatchState::Poisoned | BatchState::AwaitingDownload => { + BatchState::AwaitingDownload => return, + BatchState::Failed | BatchState::Poisoned => { crit!("batch indicates inconsistent chain state while advancing chain") } BatchState::AwaitingProcessing(..) => {} diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index 96319f2efad..8b426b664b7 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -350,7 +350,10 @@ impl SyncingChain { return Ok(KeepChain); } BatchState::Poisoned => unreachable!("Poisoned batch"), - BatchState::Processing(_) | BatchState::AwaitingDownload | BatchState::Failed => { + // Batches can be in `AwaitingDownload` state if there weren't good data column subnet + // peers to send the request to. + BatchState::AwaitingDownload => return Ok(KeepChain), + BatchState::Processing(_) | BatchState::Failed => { // these are all inconsistent states: // - Processing -> `self.current_processing_batch` is None // - Failed -> non recoverable batch. For an optimistic batch, it should @@ -384,7 +387,10 @@ impl SyncingChain { // Batch is not ready, nothing to process } BatchState::Poisoned => unreachable!("Poisoned batch"), - BatchState::Failed | BatchState::AwaitingDownload | BatchState::Processing(_) => { + // Batches can be in `AwaitingDownload` state if there weren't good data column subnet + // peers to send the request to. + BatchState::AwaitingDownload => return Ok(KeepChain), + BatchState::Failed | BatchState::Processing(_) => { // these are all inconsistent states: // - Failed -> non recoverable batch. Chain should have been removed // - AwaitingDownload -> A recoverable failed batch should have been @@ -582,8 +588,8 @@ impl SyncingChain { BatchProcessResult::NonFaultyFailure => { batch.processing_completed(BatchProcessingResult::NonFaultyFailure)?; - // Simply re-download the batch. - self.send_batch(network, batch_id) + // Simply re-download all batches in `AwaitingDownload` state. + self.attempt_send_awaiting_download_batches(network, "non-faulty-failure") } } } @@ -717,6 +723,7 @@ impl SyncingChain { previous_start = %old_start, new_start = %self.start_epoch, processing_target = %self.processing_target, + id=%self.id, "Chain advanced" ); } @@ -753,7 +760,6 @@ impl SyncingChain { } // this is our robust `processing_target`. All previous batches must be awaiting // validation - let mut redownload_queue = Vec::new(); for (id, batch) in self.batches.range_mut(..batch_id) { if let BatchOperationOutcome::Failed { blacklist } = batch.validation_failed()? { @@ -763,18 +769,14 @@ impl SyncingChain { failing_batch: *id, }); } - redownload_queue.push(*id); } // no batch maxed out it process attempts, so now the chain's volatile progress must be // reset self.processing_target = self.start_epoch; - for id in redownload_queue { - self.send_batch(network, id)?; - } - // finally, re-request the failed batch. - self.send_batch(network, batch_id) + // finally, re-request the failed batch and all other batches in `AwaitingDownload` state. + self.attempt_send_awaiting_download_batches(network, "handle_invalid_batch") } pub fn stop_syncing(&mut self) { @@ -810,6 +812,9 @@ impl SyncingChain { // advance the chain to the new validating epoch self.advance_chain(network, validating_epoch); + // attempt to download any batches stuck in the `AwaitingDownload` state because of + // a lack of peers earlier + self.attempt_send_awaiting_download_batches(network, "start_syncing")?; if self.optimistic_start.is_none() && optimistic_epoch > self.processing_target && !self.attempted_optimistic_starts.contains(&optimistic_epoch) @@ -939,6 +944,41 @@ impl SyncingChain { } } + /// Attempts to send all batches that are in `AwaitingDownload` state. + /// + /// Batches might get stuck in `AwaitingDownload` post peerdas because of lack of peers + /// in required subnets. We need to progress them if peers are available at a later point. + pub fn attempt_send_awaiting_download_batches( + &mut self, + network: &mut SyncNetworkContext, + src: &str, + ) -> ProcessingResult { + // Collect all batches in AwaitingDownload state and see if they can be sent + let awaiting_downloads: Vec<_> = self + .batches + .iter() + .filter(|(_, batch)| matches!(batch.state(), BatchState::AwaitingDownload)) + .map(|(batch_id, _)| batch_id) + .copied() + .collect(); + debug!( + ?awaiting_downloads, + src, "Attempting to send batches awaiting downlaod" + ); + + for batch_id in awaiting_downloads { + if self.good_peers_on_sampling_subnets(batch_id, network) { + self.send_batch(network, batch_id)?; + } else { + debug!( + src = "attempt_send_awaiting_download_batches", + "Waiting for peers to be available on sampling column subnets" + ); + } + } + Ok(KeepChain) + } + /// Requests the batch assigned to the given id from a given peer. pub fn send_batch( &mut self, @@ -1089,14 +1129,16 @@ impl SyncingChain { if !matches!(self.state, ChainSyncingState::Syncing) { return Ok(KeepChain); } - // find the next pending batch and request it from the peer // check if we have the batch for our optimistic start. If not, request it first. // We wait for this batch before requesting any other batches. if let Some(epoch) = self.optimistic_start { if !self.good_peers_on_sampling_subnets(epoch, network) { - debug!("Waiting for peers to be available on sampling column subnets"); + debug!( + src = "request_batches_optimistic", + "Waiting for peers to be available on sampling column subnets" + ); return Ok(KeepChain); } @@ -1105,6 +1147,8 @@ impl SyncingChain { let optimistic_batch = BatchInfo::new(&epoch, EPOCHS_PER_BATCH, batch_type); entry.insert(optimistic_batch); self.send_batch(network, epoch)?; + } else { + self.attempt_send_awaiting_download_batches(network, "request_batches_optimistic")?; } return Ok(KeepChain); } @@ -1119,6 +1163,28 @@ impl SyncingChain { self.send_batch(network, batch_id)?; } + // Force requesting the `processing_batch` to progress sync if required + if !self.batches.contains_key(&self.processing_target) { + debug!(?self.processing_target,"Forcing requesting processing_target to progress sync"); + if !self.good_peers_on_sampling_subnets(self.processing_target, network) { + debug!( + src = "request_batches_processing", + "Waiting for peers to be available on sampling column subnets" + ); + return Ok(KeepChain); + } + + if let Entry::Vacant(entry) = self.batches.entry(self.processing_target) { + let batch_type = network.batch_type(self.processing_target); + let processing_batch = + BatchInfo::new(&self.processing_target, EPOCHS_PER_BATCH, batch_type); + entry.insert(processing_batch); + self.send_batch(network, self.processing_target)?; + } else { + self.attempt_send_awaiting_download_batches(network, "request_batches_processing")?; + } + } + // No more batches, simply stop Ok(KeepChain) } @@ -1188,7 +1254,10 @@ impl SyncingChain { // block and data column requests are currently coupled. This can be removed once we find a // way to decouple the requests and do retries individually, see issue #6258. if !self.good_peers_on_sampling_subnets(self.to_be_downloaded, network) { - debug!("Waiting for peers to be available on custody column subnets"); + debug!( + src = "include_next_batch", + "Waiting for peers to be available on custody column subnets" + ); return None; } From ce616022457c68686ea5ee9735f4755a210843a5 Mon Sep 17 00:00:00 2001 From: Age Manning Date: Wed, 3 Sep 2025 17:00:02 +1000 Subject: [PATCH 2/3] Improve some rust code that I saw --- beacon_node/network/src/sync/range_sync/chain.rs | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index 8b426b664b7..1afeae0e736 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -1198,20 +1198,15 @@ impl SyncingChain { ) -> bool { if network.chain.spec.is_peer_das_enabled_for_epoch(epoch) { // Require peers on all sampling column subnets before sending batches + let peer_db = network.network_globals().peers.read(); network .network_globals() .sampling_subnets() .iter() .all(|subnet_id| { - let peer_db = network.network_globals().peers.read(); - let peer_count = self - .peers - .iter() - .filter(|peer| { - peer_db.is_good_range_sync_custody_subnet_peer(*subnet_id, peer) - }) - .count(); - peer_count > 0 + self.peers.iter().any(|peer| { + peer_db.is_good_range_sync_custody_subnet_peer(*subnet_id, peer) + }) }) } else { true From 2241d9a86d56577053607475a7e23690aa5da2d2 Mon Sep 17 00:00:00 2001 From: Pawan Dhananjay Date: Wed, 3 Sep 2025 12:54:17 -0700 Subject: [PATCH 3/3] Remove unnecessary processing_target batch request code --- .../network/src/sync/range_sync/chain.rs | 22 ------------------- 1 file changed, 22 deletions(-) diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index 1afeae0e736..38a834ee4d3 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -1163,28 +1163,6 @@ impl SyncingChain { self.send_batch(network, batch_id)?; } - // Force requesting the `processing_batch` to progress sync if required - if !self.batches.contains_key(&self.processing_target) { - debug!(?self.processing_target,"Forcing requesting processing_target to progress sync"); - if !self.good_peers_on_sampling_subnets(self.processing_target, network) { - debug!( - src = "request_batches_processing", - "Waiting for peers to be available on sampling column subnets" - ); - return Ok(KeepChain); - } - - if let Entry::Vacant(entry) = self.batches.entry(self.processing_target) { - let batch_type = network.batch_type(self.processing_target); - let processing_batch = - BatchInfo::new(&self.processing_target, EPOCHS_PER_BATCH, batch_type); - entry.insert(processing_batch); - self.send_batch(network, self.processing_target)?; - } else { - self.attempt_send_awaiting_download_batches(network, "request_batches_processing")?; - } - } - // No more batches, simply stop Ok(KeepChain) }