diff --git a/beacon_node/network/src/sync/backfill_sync/mod.rs b/beacon_node/network/src/sync/backfill_sync/mod.rs index f00503ec634..d5a4e9b73a8 100644 --- a/beacon_node/network/src/sync/backfill_sync/mod.rs +++ b/beacon_node/network/src/sync/backfill_sync/mod.rs @@ -687,11 +687,12 @@ impl BackFillSync { // Batch is not ready, nothing to process } BatchState::Poisoned => unreachable!("Poisoned batch"), - BatchState::Failed | BatchState::AwaitingDownload | BatchState::Processing(_) => { + // Batches can be in `AwaitingDownload` state if there weren't good data column subnet + // peers to send the request to. + BatchState::AwaitingDownload => return Ok(ProcessResult::Successful), + BatchState::Failed | BatchState::Processing(_) => { // these are all inconsistent states: // - Failed -> non recoverable batch. Chain should have been removed - // - AwaitingDownload -> A recoverable failed batch should have been - // re-requested. // - Processing -> `self.current_processing_batch` is None self.fail_sync(BackFillError::InvalidSyncState(String::from( "Invalid expected batch state", @@ -790,7 +791,8 @@ impl BackFillSync { } } BatchState::Downloading(..) => {} - BatchState::Failed | BatchState::Poisoned | BatchState::AwaitingDownload => { + BatchState::AwaitingDownload => return, + BatchState::Failed | BatchState::Poisoned => { crit!("batch indicates inconsistent chain state while advancing chain") } BatchState::AwaitingProcessing(..) => {} diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index 8907f7510fd..a8c85e44d26 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -350,7 +350,10 @@ impl SyncingChain { return Ok(KeepChain); } BatchState::Poisoned => unreachable!("Poisoned batch"), - BatchState::Processing(_) | BatchState::AwaitingDownload | BatchState::Failed => { + // Batches can be in `AwaitingDownload` state if there weren't good data column subnet + // peers to send the request to. + BatchState::AwaitingDownload => return Ok(KeepChain), + BatchState::Processing(_) | BatchState::Failed => { // these are all inconsistent states: // - Processing -> `self.current_processing_batch` is None // - Failed -> non recoverable batch. For an optimistic batch, it should @@ -384,7 +387,10 @@ impl SyncingChain { // Batch is not ready, nothing to process } BatchState::Poisoned => unreachable!("Poisoned batch"), - BatchState::Failed | BatchState::AwaitingDownload | BatchState::Processing(_) => { + // Batches can be in `AwaitingDownload` state if there weren't good data column subnet + // peers to send the request to. + BatchState::AwaitingDownload => return Ok(KeepChain), + BatchState::Failed | BatchState::Processing(_) => { // these are all inconsistent states: // - Failed -> non recoverable batch. Chain should have been removed // - AwaitingDownload -> A recoverable failed batch should have been @@ -582,8 +588,8 @@ impl SyncingChain { BatchProcessResult::NonFaultyFailure => { batch.processing_completed(BatchProcessingResult::NonFaultyFailure)?; - // Simply re-download the batch. - self.send_batch(network, batch_id) + // Simply re-download all batches in `AwaitingDownload` state. + self.attempt_send_awaiting_download_batches(network, "non-faulty-failure") } } } @@ -717,6 +723,7 @@ impl SyncingChain { previous_start = %old_start, new_start = %self.start_epoch, processing_target = %self.processing_target, + id=%self.id, "Chain advanced" ); } @@ -753,7 +760,6 @@ impl SyncingChain { } // this is our robust `processing_target`. All previous batches must be awaiting // validation - let mut redownload_queue = Vec::new(); for (id, batch) in self.batches.range_mut(..batch_id) { if let BatchOperationOutcome::Failed { blacklist } = batch.validation_failed()? { @@ -763,18 +769,14 @@ impl SyncingChain { failing_batch: *id, }); } - redownload_queue.push(*id); } // no batch maxed out it process attempts, so now the chain's volatile progress must be // reset self.processing_target = self.start_epoch; - for id in redownload_queue { - self.send_batch(network, id)?; - } - // finally, re-request the failed batch. - self.send_batch(network, batch_id) + // finally, re-request the failed batch and all other batches in `AwaitingDownload` state. + self.attempt_send_awaiting_download_batches(network, "handle_invalid_batch") } pub fn stop_syncing(&mut self) { @@ -810,6 +812,9 @@ impl SyncingChain { // advance the chain to the new validating epoch self.advance_chain(network, validating_epoch); + // attempt to download any batches stuck in the `AwaitingDownload` state because of + // a lack of peers earlier + self.attempt_send_awaiting_download_batches(network, "start_syncing")?; if self.optimistic_start.is_none() && optimistic_epoch > self.processing_target && !self.attempted_optimistic_starts.contains(&optimistic_epoch) @@ -939,6 +944,41 @@ impl SyncingChain { } } + /// Attempts to send all batches that are in `AwaitingDownload` state. + /// + /// Batches might get stuck in `AwaitingDownload` post peerdas because of lack of peers + /// in required subnets. We need to progress them if peers are available at a later point. + pub fn attempt_send_awaiting_download_batches( + &mut self, + network: &mut SyncNetworkContext, + src: &str, + ) -> ProcessingResult { + // Collect all batches in AwaitingDownload state and see if they can be sent + let awaiting_downloads: Vec<_> = self + .batches + .iter() + .filter(|(_, batch)| matches!(batch.state(), BatchState::AwaitingDownload)) + .map(|(batch_id, _)| batch_id) + .copied() + .collect(); + debug!( + ?awaiting_downloads, + src, "Attempting to send batches awaiting downlaod" + ); + + for batch_id in awaiting_downloads { + if self.good_peers_on_sampling_subnets(batch_id, network) { + self.send_batch(network, batch_id)?; + } else { + debug!( + src = "attempt_send_awaiting_download_batches", + "Waiting for peers to be available on sampling column subnets" + ); + } + } + Ok(KeepChain) + } + /// Requests the batch assigned to the given id from a given peer. pub fn send_batch( &mut self, @@ -1089,14 +1129,16 @@ impl SyncingChain { if !matches!(self.state, ChainSyncingState::Syncing) { return Ok(KeepChain); } - // find the next pending batch and request it from the peer // check if we have the batch for our optimistic start. If not, request it first. // We wait for this batch before requesting any other batches. if let Some(epoch) = self.optimistic_start { if !self.good_peers_on_sampling_subnets(epoch, network) { - debug!("Waiting for peers to be available on sampling column subnets"); + debug!( + src = "request_batches_optimistic", + "Waiting for peers to be available on sampling column subnets" + ); return Ok(KeepChain); } @@ -1105,6 +1147,8 @@ impl SyncingChain { let optimistic_batch = BatchInfo::new(&epoch, EPOCHS_PER_BATCH, batch_type); entry.insert(optimistic_batch); self.send_batch(network, epoch)?; + } else { + self.attempt_send_awaiting_download_batches(network, "request_batches_optimistic")?; } return Ok(KeepChain); } @@ -1179,7 +1223,10 @@ impl SyncingChain { // block and data column requests are currently coupled. This can be removed once we find a // way to decouple the requests and do retries individually, see issue #6258. if !self.good_peers_on_sampling_subnets(self.to_be_downloaded, network) { - debug!("Waiting for peers to be available on custody column subnets"); + debug!( + src = "include_next_batch", + "Waiting for peers to be available on custody column subnets" + ); return None; }