Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -977,7 +977,10 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
};

// remove all skip slots i.e. duplicated roots
Ok(block_roots.into_iter().unique().collect::<Vec<_>>())
Ok(block_roots
.into_iter()
.unique_by(|(root, _)| *root)
.collect::<Vec<_>>())
}

/// Handle a `BlobsByRange` request from the peer.
Expand Down
116 changes: 116 additions & 0 deletions beacon_node/network/src/network_beacon_processor/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,39 @@ impl TestRig {
.await
}

pub async fn new_with_skip_slots(chain_length: u64, skip_slots: &HashSet<u64>) -> Self {
let mut spec = test_spec::<E>();
spec.shard_committee_period = 2;
let spec = Arc::new(spec);
let beacon_processor_config = BeaconProcessorConfig::default();
let harness = BeaconChainHarness::builder(MainnetEthSpec)
.spec(spec.clone())
.deterministic_keypairs(VALIDATOR_COUNT)
.fresh_ephemeral_store()
.mock_execution_layer()
.node_custody_type(NodeCustodyType::Fullnode)
.chain_config(<_>::default())
.build();

harness.advance_slot();

for slot in 1..=chain_length {
if !skip_slots.contains(&slot) {
harness
.extend_chain(
1,
BlockStrategy::OnCanonicalHead,
AttestationStrategy::AllValidators,
)
.await;
}

harness.advance_slot();
}

Self::from_harness(harness, beacon_processor_config, spec).await
}

pub async fn new_parametric(
chain_length: u64,
beacon_processor_config: BeaconProcessorConfig,
Expand Down Expand Up @@ -150,6 +183,14 @@ impl TestRig {
harness.advance_slot();
}

Self::from_harness(harness, beacon_processor_config, spec).await
}

async fn from_harness(
harness: BeaconChainHarness<T>,
beacon_processor_config: BeaconProcessorConfig,
spec: Arc<ChainSpec>,
) -> Self {
let head = harness.chain.head_snapshot();

assert_eq!(
Expand Down Expand Up @@ -1986,3 +2027,78 @@ async fn test_data_columns_by_range_request_only_returns_requested_columns() {
"Should have received at least some data columns"
);
}

/// Test that DataColumnsByRange does not return duplicate data columns for skip slots.
///
/// When skip slots occur, `forwards_iter_block_roots` returns the same block root for
/// consecutive slots. The deduplication in `get_block_roots_from_store` must use
/// `unique_by` on the root (not the full `(root, slot)` tuple) to avoid serving
/// duplicate data columns for the same block.
#[tokio::test]
async fn test_data_columns_by_range_no_duplicates_with_skip_slots() {
if test_spec::<E>().fulu_fork_epoch.is_none() {
return;
};

// Build a chain of 128 slots (4 epochs) with skip slots at positions 5 and 6.
// After 4 epochs, finalized_epoch=2 (finalized_slot=64). Requesting slots 0-9
// satisfies req_start_slot + req_count <= finalized_slot (10 <= 64), which routes
// through `get_block_roots_from_store` — the code path with the bug.
let skip_slots: HashSet<u64> = [5, 6].into_iter().collect();
let mut rig = TestRig::new_with_skip_slots(128, &skip_slots).await;

let all_custody_columns = rig.chain.custody_columns_for_epoch(Some(Epoch::new(0)));
let requested_column = vec![all_custody_columns[0]];

// Request a range that spans the skip slots (slots 0 through 9).
let start_slot = 0;
let slot_count = 10;

rig.network_beacon_processor
.send_data_columns_by_range_request(
PeerId::random(),
InboundRequestId::new_unchecked(42, 24),
DataColumnsByRangeRequest {
start_slot,
count: slot_count,
columns: requested_column.clone(),
},
)
.unwrap();

// Collect block roots from all data column responses.
let mut block_roots: Vec<Hash256> = Vec::new();

while let Some(next) = rig.network_rx.recv().await {
if let NetworkMessage::SendResponse {
peer_id: _,
response: Response::DataColumnsByRange(data_column),
inbound_request_id: _,
} = next
{
if let Some(column) = data_column {
block_roots.push(column.block_root());
} else {
break;
}
} else {
panic!("unexpected message {:?}", next);
}
}

assert!(
!block_roots.is_empty(),
"Should have received at least some data columns"
);

// Before the fix, skip slots caused the same block root to appear multiple times
// (once per skip slot) because .unique() on (Hash256, Slot) tuples didn't deduplicate.
let unique_roots: HashSet<_> = block_roots.iter().collect();
assert_eq!(
block_roots.len(),
unique_roots.len(),
"Response contained duplicate block roots: got {} columns but only {} unique roots",
block_roots.len(),
unique_roots.len(),
);
}