Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion beacon_node/network/src/network_beacon_processor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1218,7 +1218,7 @@ use {
};

#[cfg(test)]
type TestBeaconChainType<E> =
pub(crate) type TestBeaconChainType<E> =
Witness<ManualSlotClock, CachingEth1Backend<E>, E, MemoryStore<E>, MemoryStore<E>>;

#[cfg(test)]
Expand Down
70 changes: 70 additions & 0 deletions beacon_node/network/src/sync/backfill_sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1193,3 +1193,73 @@ enum ResetEpochError {
/// The chain has already completed.
SyncCompleted,
}

#[cfg(test)]
mod tests {
use super::*;
use beacon_chain::test_utils::BeaconChainHarness;
use bls::Hash256;
use lighthouse_network::{NetworkConfig, SyncInfo, SyncStatus};
use rand::prelude::StdRng;
use rand::SeedableRng;
use types::MinimalEthSpec;

#[test]
fn request_batches_should_not_loop_infinitely() {
let harness = BeaconChainHarness::builder(MinimalEthSpec)
.default_spec()
.deterministic_keypairs(4)
.fresh_ephemeral_store()
.build();

let beacon_chain = harness.chain.clone();
let slots_per_epoch = MinimalEthSpec::slots_per_epoch();

let network_globals = Arc::new(NetworkGlobals::new_test_globals(
vec![],
Arc::new(NetworkConfig::default()),
beacon_chain.spec.clone(),
));

{
let mut rng = StdRng::seed_from_u64(0xDEADBEEF0BAD5EEDu64);
let peer_id = network_globals
.peers
.write()
.__add_connected_peer_testing_only(
true,
&beacon_chain.spec,
k256::ecdsa::SigningKey::random(&mut rng).into(),
);

// Simulate finalized epoch and head being 2 epochs ahead
let finalized_epoch = Epoch::new(40);
let head_epoch = finalized_epoch + 2;
let head_slot = head_epoch.start_slot(slots_per_epoch) + 1;

network_globals.peers.write().update_sync_status(
&peer_id,
SyncStatus::Synced {
info: SyncInfo {
head_slot,
head_root: Hash256::random(),
finalized_epoch,
finalized_root: Hash256::random(),
},
},
);
}

let mut network = SyncNetworkContext::new_for_testing(
beacon_chain.clone(),
network_globals.clone(),
harness.runtime.task_executor.clone(),
);

let mut backfill = BackFillSync::new(beacon_chain, network_globals);
backfill.set_state(BackFillState::Syncing);

// if this ends up running into an infinite loop, the test will overflow the stack pretty quickly.
let _ = backfill.request_batches(&mut network);
}
}
35 changes: 35 additions & 0 deletions beacon_node/network/src/sync/network_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ use super::range_sync::ByRangeRequestType;
use super::SyncMessage;
use crate::metrics;
use crate::network_beacon_processor::NetworkBeaconProcessor;
#[cfg(test)]
use crate::network_beacon_processor::TestBeaconChainType;
use crate::service::NetworkMessage;
use crate::status::ToStatusMessage;
use crate::sync::block_lookups::SingleLookupId;
Expand All @@ -32,11 +34,15 @@ use requests::{
ActiveRequests, BlobsByRangeRequestItems, BlobsByRootRequestItems, BlocksByRangeRequestItems,
BlocksByRootRequestItems, DataColumnsByRangeRequestItems, DataColumnsByRootRequestItems,
};
#[cfg(test)]
use slot_clock::SlotClock;
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::fmt::Debug;
use std::sync::Arc;
use std::time::Duration;
#[cfg(test)]
use task_executor::TaskExecutor;
use tokio::sync::mpsc;
use tracing::{debug, error, span, warn, Level};
use types::blob_sidecar::FixedBlobSidecarList;
Expand Down Expand Up @@ -215,6 +221,35 @@ pub enum RangeBlockComponent<E: EthSpec> {
CustodyColumns(RpcResponseResult<Vec<Arc<DataColumnSidecar<E>>>>),
}

#[cfg(test)]
impl<E: EthSpec> SyncNetworkContext<TestBeaconChainType<E>> {
pub fn new_for_testing(
beacon_chain: Arc<BeaconChain<TestBeaconChainType<E>>>,
network_globals: Arc<NetworkGlobals<E>>,
task_executor: TaskExecutor,
) -> Self {
let fork_context = Arc::new(ForkContext::new::<E>(
beacon_chain.slot_clock.now().unwrap_or(Slot::new(0)),
beacon_chain.genesis_validators_root,
&beacon_chain.spec,
));
let (network_tx, _network_rx) = mpsc::unbounded_channel();
let (beacon_processor, _) = NetworkBeaconProcessor::null_for_testing(
network_globals,
mpsc::unbounded_channel().0,
beacon_chain.clone(),
task_executor,
);

SyncNetworkContext::new(
network_tx,
Arc::new(beacon_processor),
beacon_chain,
fork_context,
)
}
}

impl<T: BeaconChainTypes> SyncNetworkContext<T> {
pub fn new(
network_send: mpsc::UnboundedSender<NetworkMessage<T::EthSpec>>,
Expand Down
Loading