Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions beacon_node/network/src/sync/backfill_sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -687,11 +687,12 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
// Batch is not ready, nothing to process
}
BatchState::Poisoned => unreachable!("Poisoned batch"),
BatchState::Failed | BatchState::AwaitingDownload | BatchState::Processing(_) => {
// Batches can be in `AwaitingDownload` state if there weren't good data column subnet
// peers to send the request to.
BatchState::AwaitingDownload => return Ok(ProcessResult::Successful),
BatchState::Failed | BatchState::Processing(_) => {
// these are all inconsistent states:
// - Failed -> non recoverable batch. Chain should have been removed
// - AwaitingDownload -> A recoverable failed batch should have been
// re-requested.
// - Processing -> `self.current_processing_batch` is None
self.fail_sync(BackFillError::InvalidSyncState(String::from(
"Invalid expected batch state",
Expand Down Expand Up @@ -790,7 +791,8 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
}
}
BatchState::Downloading(..) => {}
BatchState::Failed | BatchState::Poisoned | BatchState::AwaitingDownload => {
BatchState::AwaitingDownload => return,
BatchState::Failed | BatchState::Poisoned => {
crit!("batch indicates inconsistent chain state while advancing chain")
}
BatchState::AwaitingProcessing(..) => {}
Expand Down
97 changes: 83 additions & 14 deletions beacon_node/network/src/sync/range_sync/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,10 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
return Ok(KeepChain);
}
BatchState::Poisoned => unreachable!("Poisoned batch"),
BatchState::Processing(_) | BatchState::AwaitingDownload | BatchState::Failed => {
// Batches can be in `AwaitingDownload` state if there weren't good data column subnet
// peers to send the request to.
BatchState::AwaitingDownload => return Ok(KeepChain),
BatchState::Processing(_) | BatchState::Failed => {
// these are all inconsistent states:
// - Processing -> `self.current_processing_batch` is None
// - Failed -> non recoverable batch. For an optimistic batch, it should
Expand Down Expand Up @@ -384,7 +387,10 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
// Batch is not ready, nothing to process
}
BatchState::Poisoned => unreachable!("Poisoned batch"),
BatchState::Failed | BatchState::AwaitingDownload | BatchState::Processing(_) => {
// Batches can be in `AwaitingDownload` state if there weren't good data column subnet
// peers to send the request to.
BatchState::AwaitingDownload => return Ok(KeepChain),
BatchState::Failed | BatchState::Processing(_) => {
// these are all inconsistent states:
// - Failed -> non recoverable batch. Chain should have been removed
// - AwaitingDownload -> A recoverable failed batch should have been
Expand Down Expand Up @@ -582,8 +588,8 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
BatchProcessResult::NonFaultyFailure => {
batch.processing_completed(BatchProcessingResult::NonFaultyFailure)?;

// Simply re-download the batch.
self.send_batch(network, batch_id)
// Simply re-download all batches in `AwaitingDownload` state.
self.attempt_send_awaiting_download_batches(network, "non-faulty-failure")
}
}
}
Expand Down Expand Up @@ -717,6 +723,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
previous_start = %old_start,
new_start = %self.start_epoch,
processing_target = %self.processing_target,
id=%self.id,
"Chain advanced"
);
}
Expand Down Expand Up @@ -753,7 +760,6 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
}
// this is our robust `processing_target`. All previous batches must be awaiting
// validation
let mut redownload_queue = Vec::new();

for (id, batch) in self.batches.range_mut(..batch_id) {
if let BatchOperationOutcome::Failed { blacklist } = batch.validation_failed()? {
Expand All @@ -763,18 +769,14 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
failing_batch: *id,
});
}
redownload_queue.push(*id);
}

// no batch maxed out it process attempts, so now the chain's volatile progress must be
// reset
self.processing_target = self.start_epoch;

for id in redownload_queue {
self.send_batch(network, id)?;
}
// finally, re-request the failed batch.
self.send_batch(network, batch_id)
// finally, re-request the failed batch and all other batches in `AwaitingDownload` state.
self.attempt_send_awaiting_download_batches(network, "handle_invalid_batch")
}

pub fn stop_syncing(&mut self) {
Expand Down Expand Up @@ -810,6 +812,9 @@ impl<T: BeaconChainTypes> SyncingChain<T> {

// advance the chain to the new validating epoch
self.advance_chain(network, validating_epoch);
// attempt to download any batches stuck in the `AwaitingDownload` state because of
// a lack of peers earlier
self.attempt_send_awaiting_download_batches(network, "start_syncing")?;
if self.optimistic_start.is_none()
&& optimistic_epoch > self.processing_target
&& !self.attempted_optimistic_starts.contains(&optimistic_epoch)
Expand Down Expand Up @@ -939,6 +944,41 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
}
}

/// Attempts to send all batches that are in `AwaitingDownload` state.
///
/// Batches might get stuck in `AwaitingDownload` post peerdas because of lack of peers
/// in required subnets. We need to progress them if peers are available at a later point.
pub fn attempt_send_awaiting_download_batches(
&mut self,
network: &mut SyncNetworkContext<T>,
src: &str,
) -> ProcessingResult {
// Collect all batches in AwaitingDownload state and see if they can be sent
let awaiting_downloads: Vec<_> = self
.batches
.iter()
.filter(|(_, batch)| matches!(batch.state(), BatchState::AwaitingDownload))
.map(|(batch_id, _)| batch_id)
.copied()
.collect();
debug!(
?awaiting_downloads,
src, "Attempting to send batches awaiting downlaod"
);

for batch_id in awaiting_downloads {
if self.good_peers_on_sampling_subnets(batch_id, network) {
self.send_batch(network, batch_id)?;
} else {
debug!(
src = "attempt_send_awaiting_download_batches",
"Waiting for peers to be available on sampling column subnets"
);
}
}
Ok(KeepChain)
}

/// Requests the batch assigned to the given id from a given peer.
pub fn send_batch(
&mut self,
Expand Down Expand Up @@ -1089,14 +1129,16 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
if !matches!(self.state, ChainSyncingState::Syncing) {
return Ok(KeepChain);
}

// find the next pending batch and request it from the peer

// check if we have the batch for our optimistic start. If not, request it first.
// We wait for this batch before requesting any other batches.
if let Some(epoch) = self.optimistic_start {
if !self.good_peers_on_sampling_subnets(epoch, network) {
debug!("Waiting for peers to be available on sampling column subnets");
debug!(
src = "request_batches_optimistic",
"Waiting for peers to be available on sampling column subnets"
);
return Ok(KeepChain);
}

Expand All @@ -1105,6 +1147,8 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
let optimistic_batch = BatchInfo::new(&epoch, EPOCHS_PER_BATCH, batch_type);
entry.insert(optimistic_batch);
self.send_batch(network, epoch)?;
} else {
self.attempt_send_awaiting_download_batches(network, "request_batches_optimistic")?;
}
return Ok(KeepChain);
}
Expand All @@ -1119,6 +1163,28 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
self.send_batch(network, batch_id)?;
}

// Force requesting the `processing_batch` to progress sync if required
if !self.batches.contains_key(&self.processing_target) {
debug!(?self.processing_target,"Forcing requesting processing_target to progress sync");
if !self.good_peers_on_sampling_subnets(self.processing_target, network) {
debug!(
src = "request_batches_processing",
"Waiting for peers to be available on sampling column subnets"
);
return Ok(KeepChain);
}

if let Entry::Vacant(entry) = self.batches.entry(self.processing_target) {
let batch_type = network.batch_type(self.processing_target);
let processing_batch =
BatchInfo::new(&self.processing_target, EPOCHS_PER_BATCH, batch_type);
entry.insert(processing_batch);
self.send_batch(network, self.processing_target)?;
} else {
self.attempt_send_awaiting_download_batches(network, "request_batches_processing")?;
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand the point of this. The processing target is the epoch in which we need to process next. Why do we need to download it again?

This seems to handle an edge case and makes this more complex. It seems like it's covering for an underlying issue somewhere else?

Copy link
Member Author

@pawanjay176 pawanjay176 Sep 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The issue i saw on devnet was this:
Batch 1,2,3,4,5 are sent for download. 2,3,4,5 were downloaded successfully and are now AwaitingProcessing.
Batch 1 fails downloading a couple of times and the state changes to AwaitingDownload. Now batch 6 sneaks in at some point when batch 1 is still in the AwaitingDownloading state.
So now self.batches contains 2,3,4,5,6 => AwaitingProcessing state while batch 1 can no longer get in.

I thought the simplest way to handle this would be force send the processing target.

But now I think of it, this could have happened before I added a call to attempt_send_awaiting_download_batches here https://github.com/pawanjay176/lighthouse/blob/ce616022457c68686ea5ee9735f4755a210843a5/beacon_node/network/src/sync/range_sync/chain.rs#L817

So maybe we don't need this?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have done some more testing and have concluded that we do not need this. In my testing, i probably added the call to attempt_send_awaiting_download_batches in start_syncing along with this code for sending the processing_target. The sync getting stuck issue was probably fixed by the former and not the latter.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed in 2241d9a


// No more batches, simply stop
Ok(KeepChain)
}
Expand Down Expand Up @@ -1188,7 +1254,10 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
// block and data column requests are currently coupled. This can be removed once we find a
// way to decouple the requests and do retries individually, see issue #6258.
if !self.good_peers_on_sampling_subnets(self.to_be_downloaded, network) {
debug!("Waiting for peers to be available on custody column subnets");
debug!(
src = "include_next_batch",
"Waiting for peers to be available on custody column subnets"
);
return None;
}

Expand Down
Loading