Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions beacon_node/network/src/sync/backfill_sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -687,11 +687,12 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
// Batch is not ready, nothing to process
}
BatchState::Poisoned => unreachable!("Poisoned batch"),
BatchState::Failed | BatchState::AwaitingDownload | BatchState::Processing(_) => {
// Batches can be in `AwaitingDownload` state if there weren't good data column subnet
// peers to send the request to.
BatchState::AwaitingDownload => return Ok(ProcessResult::Successful),
BatchState::Failed | BatchState::Processing(_) => {
// these are all inconsistent states:
// - Failed -> non recoverable batch. Chain should have been removed
// - AwaitingDownload -> A recoverable failed batch should have been
// re-requested.
// - Processing -> `self.current_processing_batch` is None
self.fail_sync(BackFillError::InvalidSyncState(String::from(
"Invalid expected batch state",
Expand Down Expand Up @@ -790,7 +791,8 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
}
}
BatchState::Downloading(..) => {}
BatchState::Failed | BatchState::Poisoned | BatchState::AwaitingDownload => {
BatchState::AwaitingDownload => return,
BatchState::Failed | BatchState::Poisoned => {
crit!("batch indicates inconsistent chain state while advancing chain")
}
BatchState::AwaitingProcessing(..) => {}
Expand Down
88 changes: 65 additions & 23 deletions beacon_node/network/src/sync/range_sync/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,10 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
return Ok(KeepChain);
}
BatchState::Poisoned => unreachable!("Poisoned batch"),
BatchState::Processing(_) | BatchState::AwaitingDownload | BatchState::Failed => {
// Batches can be in `AwaitingDownload` state if there weren't good data column subnet
// peers to send the request to.
BatchState::AwaitingDownload => return Ok(KeepChain),
BatchState::Processing(_) | BatchState::Failed => {
// these are all inconsistent states:
// - Processing -> `self.current_processing_batch` is None
// - Failed -> non recoverable batch. For an optimistic batch, it should
Expand Down Expand Up @@ -384,7 +387,10 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
// Batch is not ready, nothing to process
}
BatchState::Poisoned => unreachable!("Poisoned batch"),
BatchState::Failed | BatchState::AwaitingDownload | BatchState::Processing(_) => {
// Batches can be in `AwaitingDownload` state if there weren't good data column subnet
// peers to send the request to.
BatchState::AwaitingDownload => return Ok(KeepChain),
BatchState::Failed | BatchState::Processing(_) => {
// these are all inconsistent states:
// - Failed -> non recoverable batch. Chain should have been removed
// - AwaitingDownload -> A recoverable failed batch should have been
Expand Down Expand Up @@ -582,8 +588,8 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
BatchProcessResult::NonFaultyFailure => {
batch.processing_completed(BatchProcessingResult::NonFaultyFailure)?;

// Simply re-download the batch.
self.send_batch(network, batch_id)
// Simply re-download all batches in `AwaitingDownload` state.
self.attempt_send_awaiting_download_batches(network, "non-faulty-failure")
}
}
}
Expand Down Expand Up @@ -717,6 +723,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
previous_start = %old_start,
new_start = %self.start_epoch,
processing_target = %self.processing_target,
id=%self.id,
"Chain advanced"
);
}
Expand Down Expand Up @@ -753,7 +760,6 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
}
// this is our robust `processing_target`. All previous batches must be awaiting
// validation
let mut redownload_queue = Vec::new();

for (id, batch) in self.batches.range_mut(..batch_id) {
if let BatchOperationOutcome::Failed { blacklist } = batch.validation_failed()? {
Expand All @@ -763,18 +769,14 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
failing_batch: *id,
});
}
redownload_queue.push(*id);
}

// no batch maxed out its processing attempts, so now the chain's volatile progress must be
// reset
self.processing_target = self.start_epoch;

for id in redownload_queue {
self.send_batch(network, id)?;
}
// finally, re-request the failed batch.
self.send_batch(network, batch_id)
// finally, re-request the failed batch and all other batches in `AwaitingDownload` state.
self.attempt_send_awaiting_download_batches(network, "handle_invalid_batch")
}

pub fn stop_syncing(&mut self) {
Expand Down Expand Up @@ -810,6 +812,9 @@ impl<T: BeaconChainTypes> SyncingChain<T> {

// advance the chain to the new validating epoch
self.advance_chain(network, validating_epoch);
// attempt to download any batches stuck in the `AwaitingDownload` state because of
// a lack of peers earlier
self.attempt_send_awaiting_download_batches(network, "start_syncing")?;
if self.optimistic_start.is_none()
&& optimistic_epoch > self.processing_target
&& !self.attempted_optimistic_starts.contains(&optimistic_epoch)
Expand Down Expand Up @@ -939,6 +944,41 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
}
}

/// Attempts to send every batch that is currently in the `AwaitingDownload` state.
///
/// Batches can get stuck in `AwaitingDownload` post-PeerDAS when there are no peers on the
/// required sampling column subnets. Calling this again at a later point lets those batches
/// progress once such peers are available.
///
/// `src` labels the call site and is only used for logging.
///
/// # Errors
///
/// Propagates the first error returned by `send_batch`; remaining batches are not attempted
/// in that case. Otherwise returns `Ok(KeepChain)`, including when no batch could be sent.
pub fn attempt_send_awaiting_download_batches(
    &mut self,
    network: &mut SyncNetworkContext<T>,
    src: &str,
) -> ProcessingResult {
    // Snapshot the ids up front: `send_batch` takes `&mut self`, so `self.batches` must not
    // remain borrowed while requests are being sent.
    let awaiting_downloads: Vec<_> = self
        .batches
        .iter()
        .filter(|(_, batch)| matches!(batch.state(), BatchState::AwaitingDownload))
        .map(|(batch_id, _)| *batch_id)
        .collect();
    debug!(
        ?awaiting_downloads,
        src, "Attempting to send batches awaiting download"
    );

    for batch_id in awaiting_downloads {
        if self.good_peers_on_sampling_subnets(batch_id, network) {
            self.send_batch(network, batch_id)?;
        } else {
            // Not an error: this function leaves the batch untouched, so a later call can
            // retry it. Log which batch is blocked to make stalls diagnosable.
            debug!(
                src = "attempt_send_awaiting_download_batches",
                ?batch_id,
                "Waiting for peers to be available on sampling column subnets"
            );
        }
    }
    Ok(KeepChain)
}

/// Requests the batch assigned to the given id from a given peer.
pub fn send_batch(
&mut self,
Expand Down Expand Up @@ -1089,14 +1129,16 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
if !matches!(self.state, ChainSyncingState::Syncing) {
return Ok(KeepChain);
}

// find the next pending batch and request it from the peer

// check if we have the batch for our optimistic start. If not, request it first.
// We wait for this batch before requesting any other batches.
if let Some(epoch) = self.optimistic_start {
if !self.good_peers_on_sampling_subnets(epoch, network) {
debug!("Waiting for peers to be available on sampling column subnets");
debug!(
src = "request_batches_optimistic",
"Waiting for peers to be available on sampling column subnets"
);
return Ok(KeepChain);
}

Expand All @@ -1105,6 +1147,8 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
let optimistic_batch = BatchInfo::new(&epoch, EPOCHS_PER_BATCH, batch_type);
entry.insert(optimistic_batch);
self.send_batch(network, epoch)?;
} else {
self.attempt_send_awaiting_download_batches(network, "request_batches_optimistic")?;
}
return Ok(KeepChain);
}
Expand Down Expand Up @@ -1132,20 +1176,15 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
) -> bool {
if network.chain.spec.is_peer_das_enabled_for_epoch(epoch) {
// Require peers on all sampling column subnets before sending batches
let peer_db = network.network_globals().peers.read();
network
.network_globals()
.sampling_subnets()
.iter()
.all(|subnet_id| {
let peer_db = network.network_globals().peers.read();
let peer_count = self
.peers
.iter()
.filter(|peer| {
peer_db.is_good_range_sync_custody_subnet_peer(*subnet_id, peer)
})
.count();
peer_count > 0
self.peers.iter().any(|peer| {
peer_db.is_good_range_sync_custody_subnet_peer(*subnet_id, peer)
})
})
} else {
true
Expand Down Expand Up @@ -1188,7 +1227,10 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
// block and data column requests are currently coupled. This can be removed once we find a
// way to decouple the requests and do retries individually, see issue #6258.
if !self.good_peers_on_sampling_subnets(self.to_be_downloaded, network) {
debug!("Waiting for peers to be available on custody column subnets");
debug!(
src = "include_next_batch",
"Waiting for peers to be available on custody column subnets"
);
return None;
}

Expand Down
Loading