-
Notifications
You must be signed in to change notification settings - Fork 971
Allow AwaitingDownload to be a valid in-between state #7984
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
d412edb
ce61602
2241d9a
b650fc9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -350,7 +350,10 @@ impl<T: BeaconChainTypes> SyncingChain<T> { | |
| return Ok(KeepChain); | ||
| } | ||
| BatchState::Poisoned => unreachable!("Poisoned batch"), | ||
| BatchState::Processing(_) | BatchState::AwaitingDownload | BatchState::Failed => { | ||
| // Batches can be in `AwaitingDownload` state if there weren't good data column subnet | ||
| // peers to send the request to. | ||
| BatchState::AwaitingDownload => return Ok(KeepChain), | ||
| BatchState::Processing(_) | BatchState::Failed => { | ||
| // these are all inconsistent states: | ||
| // - Processing -> `self.current_processing_batch` is None | ||
| // - Failed -> non recoverable batch. For an optimistic batch, it should | ||
|
|
@@ -384,7 +387,10 @@ impl<T: BeaconChainTypes> SyncingChain<T> { | |
| // Batch is not ready, nothing to process | ||
| } | ||
| BatchState::Poisoned => unreachable!("Poisoned batch"), | ||
| BatchState::Failed | BatchState::AwaitingDownload | BatchState::Processing(_) => { | ||
| // Batches can be in `AwaitingDownload` state if there weren't good data column subnet | ||
| // peers to send the request to. | ||
| BatchState::AwaitingDownload => return Ok(KeepChain), | ||
| BatchState::Failed | BatchState::Processing(_) => { | ||
| // these are all inconsistent states: | ||
| // - Failed -> non recoverable batch. Chain should have been removed | ||
| // - AwaitingDownload -> A recoverable failed batch should have been | ||
|
|
@@ -582,8 +588,8 @@ impl<T: BeaconChainTypes> SyncingChain<T> { | |
| BatchProcessResult::NonFaultyFailure => { | ||
| batch.processing_completed(BatchProcessingResult::NonFaultyFailure)?; | ||
|
|
||
| // Simply re-download the batch. | ||
| self.send_batch(network, batch_id) | ||
| // Simply re-download all batches in `AwaitingDownload` state. | ||
| self.attempt_send_awaiting_download_batches(network, "non-faulty-failure") | ||
| } | ||
| } | ||
| } | ||
|
|
@@ -717,6 +723,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> { | |
| previous_start = %old_start, | ||
| new_start = %self.start_epoch, | ||
| processing_target = %self.processing_target, | ||
| id=%self.id, | ||
| "Chain advanced" | ||
| ); | ||
| } | ||
|
|
@@ -753,7 +760,6 @@ impl<T: BeaconChainTypes> SyncingChain<T> { | |
| } | ||
| // this is our robust `processing_target`. All previous batches must be awaiting | ||
| // validation | ||
| let mut redownload_queue = Vec::new(); | ||
|
|
||
| for (id, batch) in self.batches.range_mut(..batch_id) { | ||
| if let BatchOperationOutcome::Failed { blacklist } = batch.validation_failed()? { | ||
|
|
@@ -763,18 +769,14 @@ impl<T: BeaconChainTypes> SyncingChain<T> { | |
| failing_batch: *id, | ||
| }); | ||
| } | ||
| redownload_queue.push(*id); | ||
| } | ||
|
|
||
| // no batch maxed out its process attempts, so now the chain's volatile progress must be | ||
| // reset | ||
| self.processing_target = self.start_epoch; | ||
|
|
||
| for id in redownload_queue { | ||
| self.send_batch(network, id)?; | ||
| } | ||
| // finally, re-request the failed batch. | ||
| self.send_batch(network, batch_id) | ||
| // finally, re-request the failed batch and all other batches in `AwaitingDownload` state. | ||
| self.attempt_send_awaiting_download_batches(network, "handle_invalid_batch") | ||
| } | ||
|
|
||
| pub fn stop_syncing(&mut self) { | ||
|
|
@@ -810,6 +812,9 @@ impl<T: BeaconChainTypes> SyncingChain<T> { | |
|
|
||
| // advance the chain to the new validating epoch | ||
| self.advance_chain(network, validating_epoch); | ||
| // attempt to download any batches stuck in the `AwaitingDownload` state because of | ||
| // a lack of peers earlier | ||
| self.attempt_send_awaiting_download_batches(network, "start_syncing")?; | ||
| if self.optimistic_start.is_none() | ||
| && optimistic_epoch > self.processing_target | ||
| && !self.attempted_optimistic_starts.contains(&optimistic_epoch) | ||
|
|
@@ -939,6 +944,41 @@ impl<T: BeaconChainTypes> SyncingChain<T> { | |
| } | ||
| } | ||
|
|
||
| /// Attempts to send every batch that is currently in the `AwaitingDownload` state. | ||
| /// | ||
| /// Post-PeerDAS, batches might get stuck in `AwaitingDownload` because of a lack of peers | ||
| /// in the required subnets. We need to progress them if peers are available at a later point. | ||
| /// | ||
| /// `src` names the call site and is only used to tag the debug log emitted here. | ||
| /// | ||
| /// Returns `Ok(KeepChain)` once every awaiting batch has been considered; an error returned | ||
| /// by `send_batch` is propagated immediately and the remaining batches are not attempted. | ||
| pub fn attempt_send_awaiting_download_batches( | ||
| &mut self, | ||
| network: &mut SyncNetworkContext<T>, | ||
| src: &str, | ||
| ) -> ProcessingResult { | ||
| // Collect the ids first: `send_batch` needs `&mut self`, so we cannot keep borrowing | ||
| // `self.batches` while sending. | ||
| let awaiting_downloads: Vec<_> = self | ||
| .batches | ||
| .iter() | ||
| .filter(|(_, batch)| matches!(batch.state(), BatchState::AwaitingDownload)) | ||
| .map(|(batch_id, _)| batch_id) | ||
| .copied() | ||
| .collect(); | ||
| debug!( | ||
| ?awaiting_downloads, | ||
| src, "Attempting to send batches awaiting download" | ||
| ); | ||
|
|
||
| for batch_id in awaiting_downloads { | ||
| if self.good_peers_on_sampling_subnets(batch_id, network) { | ||
| self.send_batch(network, batch_id)?; | ||
| } else { | ||
| // Leave the batch in `AwaitingDownload`; a later call retries it. | ||
| debug!( | ||
| ?batch_id, | ||
| src = "attempt_send_awaiting_download_batches", | ||
| "Waiting for peers to be available on sampling column subnets" | ||
| ); | ||
| } | ||
| } | ||
| Ok(KeepChain) | ||
| } | ||
|
|
||
| /// Requests the batch assigned to the given id from a given peer. | ||
| pub fn send_batch( | ||
| &mut self, | ||
|
|
@@ -1089,14 +1129,16 @@ impl<T: BeaconChainTypes> SyncingChain<T> { | |
| if !matches!(self.state, ChainSyncingState::Syncing) { | ||
| return Ok(KeepChain); | ||
| } | ||
|
|
||
| // find the next pending batch and request it from the peer | ||
|
|
||
| // check if we have the batch for our optimistic start. If not, request it first. | ||
| // We wait for this batch before requesting any other batches. | ||
| if let Some(epoch) = self.optimistic_start { | ||
| if !self.good_peers_on_sampling_subnets(epoch, network) { | ||
| debug!("Waiting for peers to be available on sampling column subnets"); | ||
| debug!( | ||
| src = "request_batches_optimistic", | ||
| "Waiting for peers to be available on sampling column subnets" | ||
| ); | ||
| return Ok(KeepChain); | ||
| } | ||
|
|
||
|
|
@@ -1105,6 +1147,8 @@ impl<T: BeaconChainTypes> SyncingChain<T> { | |
| let optimistic_batch = BatchInfo::new(&epoch, EPOCHS_PER_BATCH, batch_type); | ||
| entry.insert(optimistic_batch); | ||
| self.send_batch(network, epoch)?; | ||
| } else { | ||
| self.attempt_send_awaiting_download_batches(network, "request_batches_optimistic")?; | ||
| } | ||
| return Ok(KeepChain); | ||
| } | ||
|
|
@@ -1119,6 +1163,28 @@ impl<T: BeaconChainTypes> SyncingChain<T> { | |
| self.send_batch(network, batch_id)?; | ||
| } | ||
|
|
||
| // Force-request the batch at `processing_target` to progress sync if required | ||
| if !self.batches.contains_key(&self.processing_target) { | ||
| debug!(?self.processing_target,"Forcing requesting processing_target to progress sync"); | ||
| if !self.good_peers_on_sampling_subnets(self.processing_target, network) { | ||
| debug!( | ||
| src = "request_batches_processing", | ||
| "Waiting for peers to be available on sampling column subnets" | ||
| ); | ||
| return Ok(KeepChain); | ||
| } | ||
|
|
||
| if let Entry::Vacant(entry) = self.batches.entry(self.processing_target) { | ||
| let batch_type = network.batch_type(self.processing_target); | ||
| let processing_batch = | ||
| BatchInfo::new(&self.processing_target, EPOCHS_PER_BATCH, batch_type); | ||
| entry.insert(processing_batch); | ||
| self.send_batch(network, self.processing_target)?; | ||
| } else { | ||
| self.attempt_send_awaiting_download_batches(network, "request_batches_processing")?; | ||
| } | ||
| } | ||
|
||
|
|
||
| // No more batches, simply stop | ||
| Ok(KeepChain) | ||
| } | ||
|
|
@@ -1188,7 +1254,10 @@ impl<T: BeaconChainTypes> SyncingChain<T> { | |
| // block and data column requests are currently coupled. This can be removed once we find a | ||
| // way to decouple the requests and do retries individually, see issue #6258. | ||
| if !self.good_peers_on_sampling_subnets(self.to_be_downloaded, network) { | ||
| debug!("Waiting for peers to be available on custody column subnets"); | ||
| debug!( | ||
| src = "include_next_batch", | ||
| "Waiting for peers to be available on custody column subnets" | ||
| ); | ||
| return None; | ||
| } | ||
|
|
||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.