Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
203 changes: 200 additions & 3 deletions beacon_node/network/src/sync/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,9 @@ impl<E: EthSpec, B: BatchConfig, D: Hash> BatchInfo<E, B, D> {
/// After different operations over a batch, this could be in a state that allows it to
/// continue, or in failed state. When the batch has failed, we check if it did mainly due to
/// processing failures. In this case the batch is considered failed and faulty.
///
/// When failure counts are equal, `blacklist` is `false` — we assume network issues over
/// peer fault when the evidence is ambiguous.
pub fn outcome(&self) -> BatchOperationOutcome {
match self.state {
BatchState::Poisoned => unreachable!("Poisoned batch"),
Expand Down Expand Up @@ -255,8 +258,10 @@ impl<E: EthSpec, B: BatchConfig, D: Hash> BatchInfo<E, B, D> {
/// Mark the batch as failed and return whether we can attempt a re-download.
///
/// This can happen if a peer disconnects or some error occurred that was not the peer's fault.
/// The `peer` parameter, when set to None, does not increment the failed attempts of
/// this batch and register the peer, rather attempts a re-download.
/// The `peer` parameter, when set to `None`, still counts toward
/// `max_batch_download_attempts` (to prevent infinite retries on persistent failures)
/// but does not register a peer in `failed_peers()`. Use
/// [`Self::downloading_to_awaiting_download`] to retry without counting a failed attempt.
#[must_use = "Batch may have failed"]
pub fn download_failed(
&mut self,
Expand All @@ -272,7 +277,6 @@ impl<E: EthSpec, B: BatchConfig, D: Hash> BatchInfo<E, B, D> {
{
BatchState::Failed
} else {
// drop the blocks
BatchState::AwaitingDownload
};
Ok(self.outcome())
Expand Down Expand Up @@ -524,3 +528,196 @@ impl<D: Hash> BatchState<D> {
}
}
}

#[cfg(test)]
mod tests {
    //! State-machine tests for `BatchInfo`: happy-path lifecycle, download /
    //! processing / validation failure accounting, and the `blacklist` decision
    //! made by `outcome()` when a batch is exhausted.
    use super::*;
    use crate::sync::range_sync::RangeSyncBatchConfig;
    use types::MinimalEthSpec;

    type Cfg = RangeSyncBatchConfig<MinimalEthSpec>;
    type TestBatch = BatchInfo<MinimalEthSpec, Cfg, Vec<u64>>;

    /// Configured maximum download attempts before the batch fails.
    fn max_dl() -> u8 {
        Cfg::max_batch_download_attempts()
    }

    /// Configured maximum processing attempts before the batch fails.
    fn max_proc() -> u8 {
        Cfg::max_batch_processing_attempts()
    }

    /// Fresh single-epoch batch, starting in `AwaitingDownload`.
    fn new_batch() -> TestBatch {
        BatchInfo::new(&Epoch::new(0), 1, ByRangeRequestType::Blocks)
    }

    /// Random peer id; each call yields a distinct peer.
    fn peer() -> PeerId {
        PeerId::random()
    }

    /// Drive the batch from `AwaitingDownload` into `Processing` via a
    /// successful download.
    fn advance_to_processing(batch: &mut TestBatch, req_id: Id, peer_id: PeerId) {
        batch.start_downloading(req_id).unwrap();
        batch.download_completed(vec![1, 2, 3], peer_id).unwrap();
        batch.start_processing().unwrap();
    }

    /// Drive the batch through a successful processing round into
    /// `AwaitingValidation`.
    fn advance_to_awaiting_validation(batch: &mut TestBatch, req_id: Id, peer_id: PeerId) {
        advance_to_processing(batch, req_id, peer_id);
        batch
            .processing_completed(BatchProcessingResult::Success)
            .unwrap();
    }

    /// Full successful lifecycle:
    /// AwaitingDownload → Downloading → AwaitingProcessing → Processing → AwaitingValidation.
    #[test]
    fn happy_path_lifecycle() {
        let mut batch = new_batch();
        let p = peer();

        assert!(matches!(batch.state(), BatchState::AwaitingDownload));

        batch.start_downloading(1).unwrap();
        assert!(matches!(batch.state(), BatchState::Downloading(1)));

        batch.download_completed(vec![10, 20], p).unwrap();
        assert!(matches!(batch.state(), BatchState::AwaitingProcessing(..)));

        // start_processing hands back the downloaded payload intact.
        let (data, _duration) = batch.start_processing().unwrap();
        assert_eq!(data, vec![10, 20]);
        assert!(matches!(batch.state(), BatchState::Processing(..)));

        let outcome = batch
            .processing_completed(BatchProcessingResult::Success)
            .unwrap();
        assert!(matches!(outcome, BatchOperationOutcome::Continue));
        assert!(matches!(batch.state(), BatchState::AwaitingValidation(..)));
    }

    /// Each `download_failed(Some(_))` counts toward `max_batch_download_attempts`;
    /// reaching the limit fails the batch without blacklisting (downloads, not
    /// processing, were at fault).
    #[test]
    fn download_failures_count_toward_limit() {
        let mut batch = new_batch();

        // The first (max_dl - 1) failures leave room to retry.
        for i in 1..max_dl() as Id {
            batch.start_downloading(i).unwrap();
            let outcome = batch.download_failed(Some(peer())).unwrap();
            assert!(matches!(outcome, BatchOperationOutcome::Continue));
        }

        // Next failure hits the limit
        batch.start_downloading(max_dl() as Id).unwrap();
        let outcome = batch.download_failed(Some(peer())).unwrap();
        assert!(matches!(
            outcome,
            BatchOperationOutcome::Failed { blacklist: false }
        ));
    }

    /// `download_failed(None)` consumes an attempt but attributes no peer.
    #[test]
    fn download_failed_none_counts_but_does_not_blame_peer() {
        let mut batch = new_batch();

        // None still counts toward the limit (prevents infinite retry on persistent
        // network failures), but doesn't register a peer in failed_peers().
        for i in 0..max_dl() as Id {
            batch.start_downloading(i).unwrap();
            batch.download_failed(None).unwrap();
        }
        assert!(matches!(batch.state(), BatchState::Failed));
        assert!(batch.failed_peers().is_empty());
    }

    /// Faulty processing results count toward `max_batch_processing_attempts`;
    /// exhausting them fails the batch with `blacklist: true`.
    #[test]
    fn faulty_processing_failures_count_toward_limit() {
        let mut batch = new_batch();

        for i in 1..max_proc() as Id {
            advance_to_processing(&mut batch, i, peer());
            let outcome = batch
                .processing_completed(BatchProcessingResult::FaultyFailure)
                .unwrap();
            assert!(matches!(outcome, BatchOperationOutcome::Continue));
        }

        // Next faulty failure: limit reached
        advance_to_processing(&mut batch, max_proc() as Id, peer());
        let outcome = batch
            .processing_completed(BatchProcessingResult::FaultyFailure)
            .unwrap();
        assert!(matches!(
            outcome,
            BatchOperationOutcome::Failed { blacklist: true }
        ));
    }

    /// Non-faulty processing failures are free retries: they never exhaust the
    /// batch and never blame a peer.
    #[test]
    fn non_faulty_processing_failures_never_exhaust_batch() {
        let mut batch = new_batch();

        // Well past both limits — non-faulty failures should never cause failure.
        // Widen each limit to `Id` *before* adding: summing the raw `u8`s first
        // could overflow (panicking in debug builds) if the configured limits
        // were ever raised.
        let iterations = (max_dl() as Id + max_proc() as Id) * 2;
        for i in 0..iterations {
            advance_to_processing(&mut batch, i, peer());
            let outcome = batch
                .processing_completed(BatchProcessingResult::NonFaultyFailure)
                .unwrap();
            assert!(matches!(outcome, BatchOperationOutcome::Continue));
        }
        // Non-faulty failures also don't register peers as failed
        assert!(batch.failed_peers().is_empty());
    }

    /// Chain-level validation failures share the processing-attempt budget and,
    /// once exhausted, blacklist (the batch kept "succeeding" then failing
    /// validation — peer fault).
    #[test]
    fn validation_failures_count_toward_processing_limit() {
        let mut batch = new_batch();

        for i in 1..max_proc() as Id {
            advance_to_awaiting_validation(&mut batch, i, peer());
            let outcome = batch.validation_failed().unwrap();
            assert!(matches!(outcome, BatchOperationOutcome::Continue));
        }

        advance_to_awaiting_validation(&mut batch, max_proc() as Id, peer());
        let outcome = batch.validation_failed().unwrap();
        assert!(matches!(
            outcome,
            BatchOperationOutcome::Failed { blacklist: true }
        ));
    }

    /// Interleave every failure flavor, then exhaust download attempts: since
    /// download failures outnumber processing failures, `outcome()` reports
    /// `blacklist: false`.
    #[test]
    fn mixed_failure_types_interact_correctly() {
        let mut batch = new_batch();
        let mut req_id: Id = 0;
        // Monotonic request-id generator so every download uses a fresh id.
        let mut next_id = || {
            req_id += 1;
            req_id
        };

        // One download failure
        batch.start_downloading(next_id()).unwrap();
        batch.download_failed(Some(peer())).unwrap();

        // One faulty processing failure (requires a successful download first)
        advance_to_processing(&mut batch, next_id(), peer());
        batch
            .processing_completed(BatchProcessingResult::FaultyFailure)
            .unwrap();

        // One non-faulty processing failure
        advance_to_processing(&mut batch, next_id(), peer());
        batch
            .processing_completed(BatchProcessingResult::NonFaultyFailure)
            .unwrap();
        assert!(matches!(batch.state(), BatchState::AwaitingDownload));

        // Fill remaining download failures to hit the limit
        for _ in 1..max_dl() {
            batch.start_downloading(next_id()).unwrap();
            batch.download_failed(Some(peer())).unwrap();
        }

        // Download failures > processing failures → blacklist: false
        assert!(matches!(
            batch.outcome(),
            BatchOperationOutcome::Failed { blacklist: false }
        ));
    }
}
2 changes: 2 additions & 0 deletions beacon_node/network/src/sync/range_sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ mod chain_collection;
mod range;
mod sync_type;

#[cfg(test)]
pub use chain::RangeSyncBatchConfig;
pub use chain::{ChainId, EPOCHS_PER_BATCH};
#[cfg(test)]
pub use chain_collection::SyncChainStatus;
Expand Down