diff --git a/beacon_node/lighthouse_network/src/discovery/mod.rs b/beacon_node/lighthouse_network/src/discovery/mod.rs index 2d471538093..a245e830b9d 100644 --- a/beacon_node/lighthouse_network/src/discovery/mod.rs +++ b/beacon_node/lighthouse_network/src/discovery/mod.rs @@ -1223,7 +1223,7 @@ impl Discovery { #[cfg(test)] mod tests { use super::*; - use crate::rpc::methods::{MetaData, MetaDataV2}; + use crate::rpc::methods::{MetaData, MetaDataV3}; use libp2p::identity::secp256k1; use types::{BitVector, MinimalEthSpec, SubnetId}; @@ -1248,10 +1248,11 @@ mod tests { .unwrap(); let globals = NetworkGlobals::new( enr, - MetaData::V2(MetaDataV2 { + MetaData::V3(MetaDataV3 { seq_number: 0, attnets: Default::default(), syncnets: Default::default(), + custody_group_count: spec.custody_requirement, }), vec![], false, diff --git a/beacon_node/lighthouse_network/src/peer_manager/mod.rs b/beacon_node/lighthouse_network/src/peer_manager/mod.rs index 93515ed5f6b..efb86a5feb7 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/mod.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/mod.rs @@ -17,7 +17,7 @@ use std::{ time::{Duration, Instant}, }; use tracing::{debug, error, trace, warn}; -use types::{DataColumnSubnetId, EthSpec, SyncSubnetId}; +use types::{DataColumnSubnetId, EthSpec, SubnetId, SyncSubnetId}; pub use libp2p::core::Multiaddr; pub use libp2p::identity::Keypair; @@ -26,9 +26,7 @@ pub mod peerdb; use crate::peer_manager::peerdb::client::ClientKind; use libp2p::multiaddr; -pub use peerdb::peer_info::{ - ConnectionDirection, PeerConnectionStatus, PeerConnectionStatus::*, PeerInfo, -}; +pub use peerdb::peer_info::{ConnectionDirection, PeerConnectionStatus, PeerInfo}; use peerdb::score::{PeerAction, ReportSource}; pub use peerdb::sync_status::{SyncInfo, SyncStatus}; use std::collections::{HashMap, HashSet, hash_map::Entry}; @@ -38,6 +36,14 @@ use types::data_column_custody_group::{ CustodyIndex, compute_subnets_from_custody_group, get_custody_groups, }; +/// Unified peer subnet information structure for pruning logic. +struct PeerSubnetInfo { + info: PeerInfo, + attestation_subnets: HashSet, + sync_committees: HashSet, + custody_subnets: HashSet, +} + pub mod config; mod network_behaviour; @@ -52,6 +58,8 @@ pub const PEER_RECONNECTION_TIMEOUT: Duration = Duration::from_secs(600); /// lower our peer count below this number. Instead we favour a non-uniform distribution of subnet /// peers. pub const MIN_SYNC_COMMITTEE_PEERS: u64 = 2; +/// Avoid pruning sampling peers if subnet peer count is below this number. +pub const MIN_SAMPLING_COLUMN_SUBNET_PEERS: u64 = 2; /// A fraction of `PeerManager::target_peers` that we allow to connect to us in excess of /// `PeerManager::target_peers`. For clarity, if `PeerManager::target_peers` is 50 and /// PEER_EXCESS_FACTOR = 0.1 we allow 10% more nodes, i.e 55. @@ -161,7 +169,7 @@ impl PeerManager { } = cfg; // Set up the peer manager heartbeat interval - let heartbeat = tokio::time::interval(tokio::time::Duration::from_secs(HEARTBEAT_INTERVAL)); + let heartbeat = tokio::time::interval(Duration::from_secs(HEARTBEAT_INTERVAL)); // Compute subnets for all custody groups let subnets_by_custody_group = if network_globals.spec.is_peer_das_scheduled() { @@ -729,7 +737,16 @@ impl PeerManager { } } else { // we have no meta-data for this peer, update - debug!(%peer_id, new_seq_no = meta_data.seq_number(), "Obtained peer's metadata"); + let cgc = meta_data + .custody_group_count() + .map(|&count| count.to_string()) + .unwrap_or_else(|_| "unknown".to_string()); + debug!( + %peer_id, + new_seq_no = meta_data.seq_number(), + cgc, + "Obtained peer's metadata" + ); } let known_custody_group_count = peer_info @@ -949,6 +966,43 @@ impl PeerManager { } } + /// Run discovery query for additional custody peers if we fall below `MIN_SAMPLING_COLUMN_SUBNET_PEERS`. + fn maintain_custody_peers(&mut self) { + let subnets_to_discover: Vec = self + .network_globals + .sampling_subnets() + .iter() + .filter_map(|custody_subnet| { + if self + .network_globals + .peers + .read() + .has_good_peers_in_custody_subnet( + custody_subnet, + MIN_SAMPLING_COLUMN_SUBNET_PEERS as usize, + ) + { + None + } else { + Some(SubnetDiscovery { + subnet: Subnet::DataColumn(*custody_subnet), + min_ttl: None, + }) + } + }) + .collect(); + + // request the subnet query from discovery + if !subnets_to_discover.is_empty() { + debug!( + subnets = ?subnets_to_discover.iter().map(|s| s.subnet).collect::>(), + "Making subnet queries for maintaining custody peers" + ); + self.events + .push(PeerManagerEvent::DiscoverSubnetPeers(subnets_to_discover)); + } + } + fn maintain_trusted_peers(&mut self) { let trusted_peers = self.trusted_peers.clone(); for trusted_peer in trusted_peers { @@ -991,9 +1045,204 @@ impl PeerManager { } } + /// Build unified peer subnet information from connected peers. + /// + /// This creates a unified structure containing all subnet information for each peer, + /// excluding trusted peers and peers already marked for pruning. + fn build_peer_subnet_info( + &self, + peers_to_prune: &HashSet, + ) -> HashMap> { + let mut peer_subnet_info: HashMap> = HashMap::new(); + + for (peer_id, info) in self.network_globals.peers.read().connected_peers() { + // Ignore peers we trust or that we are already pruning + if info.is_trusted() || peers_to_prune.contains(peer_id) { + continue; + } + + let mut peer_info = PeerSubnetInfo { + info: info.clone(), + attestation_subnets: HashSet::new(), + sync_committees: HashSet::new(), + custody_subnets: HashSet::new(), + }; + + // Populate subnet information from long-lived subnets + for subnet in info.long_lived_subnets() { + match subnet { + Subnet::Attestation(subnet_id) => { + peer_info.attestation_subnets.insert(subnet_id); + } + Subnet::SyncCommittee(id) => { + peer_info.sync_committees.insert(id); + } + Subnet::DataColumn(id) => { + peer_info.custody_subnets.insert(id); + } + } + } + + peer_subnet_info.insert(*peer_id, peer_info); + } + + peer_subnet_info + } + + /// Build reverse lookup from custody subnets to peer lists. + fn build_custody_subnet_lookup( + peer_subnet_info: &HashMap>, + ) -> HashMap> { + let mut custody_subnet_to_peers: HashMap> = HashMap::new(); + + for (peer_id, peer_info) in peer_subnet_info { + for &custody_subnet in &peer_info.custody_subnets { + custody_subnet_to_peers + .entry(custody_subnet) + .or_default() + .push(*peer_id); + } + } + + custody_subnet_to_peers + } + + /// Determine if a peer should be protected from pruning based on various criteria. + /// + /// Protection criteria: + /// - Outbound peers: don't prune if it would drop below target outbound peer count + /// - Data column sampling: ≤ MIN_SAMPLING_COLUMN_SUBNET_PEERS (2) peers per subnet + /// - Sync committees: ≤ MIN_SYNC_COMMITTEE_PEERS (2) peers per committee + /// - Attestation subnets: protect peers on the scarcest attestation subnets + /// + /// Returns true if the peer should be protected (not pruned). + fn should_protect_peer( + &self, + candidate_info: &PeerSubnetInfo, + sampling_subnets: &HashSet, + custody_subnet_to_peers: &HashMap>, + peer_subnet_info: &HashMap>, + connected_outbound_peer_count: usize, + outbound_peers_pruned: usize, + ) -> bool { + // Ensure we don't remove too many outbound peers + if candidate_info.info.is_outbound_only() + && self.target_outbound_peers() + >= connected_outbound_peer_count.saturating_sub(outbound_peers_pruned) + { + return true; + } + + // Check data column sampling subnets + // If the peer exists in a sampling subnet that is less than or equal to MIN_SAMPLING_COLUMN_SUBNET_PEERS, we keep it + let should_protect_sampling = candidate_info + .custody_subnets + .iter() + .filter(|subnet| sampling_subnets.contains(subnet)) + .any(|subnet| { + let count = custody_subnet_to_peers + .get(subnet) + .map(|peers| peers.len()) + .unwrap_or(0); + count <= MIN_SAMPLING_COLUMN_SUBNET_PEERS as usize + }); + + if should_protect_sampling { + return true; + } + + // Check sync committee protection + let should_protect_sync = candidate_info.sync_committees.iter().any(|sync_committee| { + let count = peer_subnet_info + .values() + .filter(|p| p.sync_committees.contains(sync_committee)) + .count(); + count <= MIN_SYNC_COMMITTEE_PEERS as usize + }); + + if should_protect_sync { + return true; + } + + // Check attestation subnet to avoid pruning from subnets with the lowest peer count + let attestation_subnet_counts: HashMap = peer_subnet_info + .values() + .flat_map(|p| &p.attestation_subnets) + .fold(HashMap::new(), |mut acc, &subnet| { + *acc.entry(subnet).or_insert(0) += 1; + acc + }); + + if let Some(&least_dense_size) = attestation_subnet_counts.values().min() { + let is_on_least_dense = candidate_info + .attestation_subnets + .iter() + .any(|subnet| attestation_subnet_counts.get(subnet) == Some(&least_dense_size)); + + if is_on_least_dense { + return true; + } + } + + false + } + + /// Find the best candidate for removal from the densest custody subnet. + /// + /// Returns the PeerId of the candidate to remove, or None if no suitable candidate found. + fn find_prune_candidate( + &self, + column_subnet: DataColumnSubnetId, + column_subnet_to_peers: &HashMap>, + peer_subnet_info: &HashMap>, + sampling_subnets: &HashSet, + connected_outbound_peer_count: usize, + outbound_peers_pruned: usize, + ) -> Option { + let peers_on_subnet_clone = column_subnet_to_peers.get(&column_subnet)?.clone(); + + // Create a sorted list of peers prioritized for removal + let mut sorted_peers = peers_on_subnet_clone; + sorted_peers.shuffle(&mut rand::rng()); + sorted_peers.sort_by_key(|peer_id| { + if let Some(peer_info) = peer_subnet_info.get(peer_id) { + ( + peer_info.info.custody_subnet_count(), + peer_info.info.is_synced_or_advanced(), + ) + } else { + (0, false) + } + }); + + // Try and find a candidate peer to remove from the subnet + for candidate_peer in &sorted_peers { + let Some(candidate_info) = peer_subnet_info.get(candidate_peer) else { + continue; + }; + + // Check if this peer should be protected + if self.should_protect_peer( + candidate_info, + sampling_subnets, + column_subnet_to_peers, + peer_subnet_info, + connected_outbound_peer_count, + outbound_peers_pruned, + ) { + continue; + } + + // Found a suitable candidate + return Some(*candidate_peer); + } + + None + } + /// Remove excess peers back down to our target values. /// This prioritises peers with a good score and uniform distribution of peers across - /// subnets. + /// data column subnets. /// /// The logic for the peer pruning is as follows: /// @@ -1023,9 +1272,12 @@ impl PeerManager { /// Prune peers in the following order: /// 1. Remove worst scoring peers /// 2. Remove peers that are not subscribed to a subnet (they have less value) - /// 3. Remove peers that we have many on any particular subnet - /// 4. Randomly remove peers if all the above are satisfied - /// + /// 3. Remove peers that we have many on any particular subnet, with some exceptions + /// - Don't remove peers needed for data column sampling (≥ MIN_SAMPLING_COLUMN_SUBNET_PEERS) + /// - Don't remove peers needed for sync committees (>=MIN_SYNC_COMMITTEE_PEERS) + /// - Don't remove peers from the lowest density attestation subnets + /// 4. Randomly remove peers if all the above are satisfied until we reach `target_peers`, or + /// until we can't prune any more peers due to the above constraints. fn prune_excess_peers(&mut self) { // The current number of connected peers. let connected_peer_count = self.network_globals.connected_peers(); @@ -1035,7 +1287,7 @@ impl PeerManager { } // Keep a list of peers we are pruning. - let mut peers_to_prune = std::collections::HashSet::new(); + let mut peers_to_prune = HashSet::new(); let connected_outbound_peer_count = self.network_globals.connected_outbound_only_peers(); // Keep track of the number of outbound peers we are pruning. @@ -1087,146 +1339,57 @@ impl PeerManager { prune_peers!(|info: &PeerInfo| { !info.has_long_lived_subnet() }); } - // 3. and 4. Remove peers that are too grouped on any given subnet. If all subnets are + // 3. and 4. Remove peers that are too grouped on any given data column subnet. If all subnets are // uniformly distributed, remove random peers. if peers_to_prune.len() < connected_peer_count.saturating_sub(self.target_peers) { - // Of our connected peers, build a map from subnet_id -> Vec<(PeerId, PeerInfo)> - let mut subnet_to_peer: HashMap)>> = HashMap::new(); - // These variables are used to track if a peer is in a long-lived sync-committee as we - // may wish to retain this peer over others when pruning. - let mut sync_committee_peer_count: HashMap = HashMap::new(); - let mut peer_to_sync_committee: HashMap< - PeerId, - std::collections::HashSet, - > = HashMap::new(); - - for (peer_id, info) in self.network_globals.peers.read().connected_peers() { - // Ignore peers we trust or that we are already pruning - if info.is_trusted() || peers_to_prune.contains(peer_id) { - continue; - } - - // Count based on long-lived subnets not short-lived subnets - // NOTE: There are only 4 sync committees. These are likely to be denser than the - // subnets, so our priority here to make the subnet peer count uniform, ignoring - // the dense sync committees. - for subnet in info.long_lived_subnets() { - match subnet { - Subnet::Attestation(_) => { - subnet_to_peer - .entry(subnet) - .or_default() - .push((*peer_id, info.clone())); - } - Subnet::SyncCommittee(id) => { - *sync_committee_peer_count.entry(id).or_default() += 1; - peer_to_sync_committee - .entry(*peer_id) - .or_default() - .insert(id); - } - // TODO(das) to be implemented. We're not pruning data column peers yet - // because data column topics are subscribed as core topics until we - // implement recomputing data column subnets. - Subnet::DataColumn(_) => {} - } - } - } + let sampling_subnets = self.network_globals.sampling_subnets(); + let mut peer_subnet_info = self.build_peer_subnet_info(&peers_to_prune); + let mut custody_subnet_to_peers = Self::build_custody_subnet_lookup(&peer_subnet_info); - // Add to the peers to prune mapping + // Attempt to prune peers to `target_peers`, or until we run out of peers to prune. while peers_to_prune.len() < connected_peer_count.saturating_sub(self.target_peers) { - if let Some((_, peers_on_subnet)) = subnet_to_peer - .iter_mut() + let custody_subnet_with_most_peers = custody_subnet_to_peers + .iter() + .filter(|(_, peers)| !peers.is_empty()) .max_by_key(|(_, peers)| peers.len()) - { - // and the subnet still contains peers - if !peers_on_subnet.is_empty() { - // Order the peers by the number of subnets they are long-lived - // subscribed too, shuffle equal peers. - peers_on_subnet.shuffle(&mut rand::rng()); - peers_on_subnet.sort_by_key(|(_, info)| info.long_lived_subnet_count()); - - // Try and find a candidate peer to remove from the subnet. - // We ignore peers that would put us below our target outbound peers - // and we currently ignore peers that would put us below our - // sync-committee threshold, if we can avoid it. - - let mut removed_peer_index = None; - for (index, (candidate_peer, info)) in peers_on_subnet.iter().enumerate() { - // Ensure we don't remove too many outbound peers - if info.is_outbound_only() - && self.target_outbound_peers() - >= connected_outbound_peer_count - .saturating_sub(outbound_peers_pruned) - { - // Restart the main loop with the outbound peer removed from - // the list. This will lower the peers per subnet count and - // potentially a new subnet may be chosen to remove peers. This - // can occur recursively until we have no peers left to choose - // from. - continue; - } - - // Check the sync committee - if let Some(subnets) = peer_to_sync_committee.get(candidate_peer) { - // The peer is subscribed to some long-lived sync-committees - // Of all the subnets this peer is subscribed too, the minimum - // peer count of all of them is min_subnet_count - if let Some(min_subnet_count) = subnets - .iter() - .filter_map(|v| sync_committee_peer_count.get(v).copied()) - .min() - { - // If the minimum count is our target or lower, we - // shouldn't remove this peer, because it drops us lower - // than our target - if min_subnet_count <= MIN_SYNC_COMMITTEE_PEERS { - // Do not drop this peer in this pruning interval - continue; - } - } - } - - if info.is_outbound_only() { - outbound_peers_pruned += 1; - } - // This peer is suitable to be pruned - removed_peer_index = Some(index); - break; + .map(|(subnet_id, _)| *subnet_id); + + if let Some(densest_subnet) = custody_subnet_with_most_peers { + // If we have successfully found a candidate peer to prune, prune it, + // otherwise all peers on this subnet should not be removed due to our + // outbound limit or min_subnet_count. In this case, we remove all + // peers from the pruning logic and try another subnet. + if let Some(candidate_peer) = self.find_prune_candidate( + densest_subnet, + &custody_subnet_to_peers, + &peer_subnet_info, + &sampling_subnets, + connected_outbound_peer_count, + outbound_peers_pruned, + ) { + // Update outbound peer count if needed + if let Some(candidate_info) = peer_subnet_info.get(&candidate_peer) + && candidate_info.info.is_outbound_only() + { + outbound_peers_pruned += 1; } - // If we have successfully found a candidate peer to prune, prune it, - // otherwise all peers on this subnet should not be removed due to our - // outbound limit or min_subnet_count. In this case, we remove all - // peers from the pruning logic and try another subnet. - if let Some(index) = removed_peer_index { - let (candidate_peer, _) = peers_on_subnet.remove(index); - // Remove pruned peers from other subnet counts - for subnet_peers in subnet_to_peer.values_mut() { - subnet_peers.retain(|(peer_id, _)| peer_id != &candidate_peer); - } - // Remove pruned peers from all sync-committee counts - if let Some(known_sync_committes) = - peer_to_sync_committee.get(&candidate_peer) - { - for sync_committee in known_sync_committes { - if let Some(sync_committee_count) = - sync_committee_peer_count.get_mut(sync_committee) - { - *sync_committee_count = - sync_committee_count.saturating_sub(1); - } - } - } - peers_to_prune.insert(candidate_peer); - } else { - peers_on_subnet.clear(); + // Remove the candidate peer from the maps, so we don't account for them + // when finding the next prune candidate. + for subnet_peers in custody_subnet_to_peers.values_mut() { + subnet_peers.retain(|peer_id| peer_id != &candidate_peer); } - continue; + peer_subnet_info.remove(&candidate_peer); + + peers_to_prune.insert(candidate_peer); + } else if let Some(peers) = custody_subnet_to_peers.get_mut(&densest_subnet) { + // If we can't find a prune candidate in this subnet, remove peers in this subnet + peers.clear() } + } else { + // If there are no peers left to prune, exit. + break; } - // If there are no peers left to prune exit. - break; } } @@ -1271,6 +1434,9 @@ impl PeerManager { // Update peer score metrics; self.update_peer_score_metrics(); + // Maintain minimum count for custody peers. + self.maintain_custody_peers(); + // Maintain minimum count for sync committee peers. self.maintain_sync_committee_peers(); @@ -1561,6 +1727,22 @@ mod tests { PeerManager::new(config, Arc::new(globals)).unwrap() } + fn empty_synced_status() -> SyncStatus { + SyncStatus::Synced { + info: empty_sync_info(), + } + } + + fn empty_sync_info() -> SyncInfo { + SyncInfo { + head_slot: Default::default(), + head_root: Default::default(), + finalized_epoch: Default::default(), + finalized_root: Default::default(), + earliest_available_slot: None, + } + } + #[tokio::test] async fn test_peer_manager_disconnects_correctly_during_heartbeat() { // Create 6 peers to connect to with a target of 3. @@ -1805,6 +1987,7 @@ mod tests { /// a priority over all else. async fn test_peer_manager_remove_non_subnet_peers_when_all_healthy() { let mut peer_manager = build_peer_manager(3).await; + let spec = peer_manager.network_globals.spec.clone(); // Create 5 peers to connect to. let peer0 = PeerId::random(); @@ -1828,10 +2011,11 @@ mod tests { // Have some of the peers be on a long-lived subnet let mut attnets = crate::types::EnrAttestationBitfield::::new(); attnets.set(1, true).unwrap(); - let metadata = crate::rpc::MetaDataV2 { + let metadata = MetaDataV3 { seq_number: 0, attnets, syncnets: Default::default(), + custody_group_count: spec.custody_requirement, }; peer_manager .network_globals @@ -1839,7 +2023,7 @@ mod tests { .write() .peer_info_mut(&peer0) .unwrap() - .set_meta_data(MetaData::V2(metadata)); + .set_meta_data(MetaData::V3(metadata)); peer_manager .network_globals .peers @@ -1848,10 +2032,11 @@ mod tests { let mut attnets = crate::types::EnrAttestationBitfield::::new(); attnets.set(10, true).unwrap(); - let metadata = crate::rpc::MetaDataV2 { + let metadata = MetaDataV3 { seq_number: 0, attnets, syncnets: Default::default(), + custody_group_count: spec.custody_requirement, }; peer_manager .network_globals @@ -1859,7 +2044,7 @@ mod tests { .write() .peer_info_mut(&peer2) .unwrap() - .set_meta_data(MetaData::V2(metadata)); + .set_meta_data(MetaData::V3(metadata)); peer_manager .network_globals .peers @@ -1868,10 +2053,11 @@ mod tests { let mut syncnets = crate::types::EnrSyncCommitteeBitfield::::new(); syncnets.set(3, true).unwrap(); - let metadata = crate::rpc::MetaDataV2 { + let metadata = MetaDataV3 { seq_number: 0, attnets: Default::default(), syncnets, + custody_group_count: spec.custody_requirement, }; peer_manager .network_globals @@ -1879,7 +2065,7 @@ mod tests { .write() .peer_info_mut(&peer4) .unwrap() - .set_meta_data(MetaData::V2(metadata)); + .set_meta_data(MetaData::V3(metadata)); peer_manager .network_globals .peers @@ -1893,7 +2079,7 @@ mod tests { assert_eq!(peer_manager.network_globals.connected_or_dialing_peers(), 3); // Check that we removed the peers that were not subscribed to any subnet - let mut peers_should_have_removed = std::collections::HashSet::new(); + let mut peers_should_have_removed = HashSet::new(); peers_should_have_removed.insert(peer1); peers_should_have_removed.insert(peer3); for (peer, _) in peer_manager @@ -1954,12 +2140,14 @@ mod tests { } #[tokio::test] - /// Test the pruning logic to remove grouped subnet peers - async fn test_peer_manager_prune_grouped_subnet_peers() { + /// Test the pruning logic to remove grouped data column subnet peers + async fn test_peer_manager_prune_grouped_data_column_subnet_peers() { let target = 9; let mut peer_manager = build_peer_manager(target).await; + // Override sampling subnets to prevent sampling peer protection from interfering with this test. + *peer_manager.network_globals.sampling_subnets.write() = HashSet::new(); - // Create 5 peers to connect to. + // Create 20 peers to connect to. let mut peers = Vec::new(); for x in 0..20 { // Make 20 peers and group peers as: @@ -1972,25 +2160,18 @@ mod tests { peer_manager.inject_connect_ingoing(&peer, "/ip4/0.0.0.0".parse().unwrap(), None); // Have some of the peers be on a long-lived subnet - let mut attnets = crate::types::EnrAttestationBitfield::::new(); - attnets.set(subnet as usize, true).unwrap(); - let metadata = crate::rpc::MetaDataV2 { - seq_number: 0, - attnets, - syncnets: Default::default(), - }; - peer_manager - .network_globals - .peers - .write() - .peer_info_mut(&peer) - .unwrap() - .set_meta_data(MetaData::V2(metadata)); + { + let mut peers_db = peer_manager.network_globals.peers.write(); + let peer_info = peers_db.peer_info_mut(&peer).unwrap(); + peer_info.set_custody_subnets(HashSet::from([DataColumnSubnetId::new(subnet)])); + peer_info.update_sync_status(empty_synced_status()); + } + peer_manager .network_globals .peers .write() - .add_subscription(&peer, Subnet::Attestation(subnet.into())); + .add_subscription(&peer, Subnet::DataColumn(subnet.into())); println!("{},{},{}", x, subnet, peer); peers.push(peer); } @@ -2062,7 +2243,7 @@ mod tests { /// most peers and have the least subscribed long-lived subnets. And peer 0 because it has no /// long-lived subnet. #[tokio::test] - async fn test_peer_manager_prune_subnet_peers_most_subscribed() { + async fn test_peer_manager_prune_data_column_subnet_peers_most_subscribed() { let target = 3; let mut peer_manager = build_peer_manager(target).await; @@ -2073,43 +2254,27 @@ mod tests { peer_manager.inject_connect_ingoing(&peer, "/ip4/0.0.0.0".parse().unwrap(), None); // Have some of the peers be on a long-lived subnet - let mut attnets = crate::types::EnrAttestationBitfield::::new(); - - match x { - 0 => {} - 1 => { - attnets.set(1, true).unwrap(); - attnets.set(2, true).unwrap(); - attnets.set(3, true).unwrap(); - } - 2 => { - attnets.set(1, true).unwrap(); - attnets.set(2, true).unwrap(); - } - 3 => { - attnets.set(3, true).unwrap(); - } - 4 => { - attnets.set(1, true).unwrap(); - } - 5 => { - attnets.set(2, true).unwrap(); - } + let custody_subnets = match x { + 0 => HashSet::new(), + 1 => HashSet::from([ + DataColumnSubnetId::new(1), + DataColumnSubnetId::new(2), + DataColumnSubnetId::new(3), + ]), + 2 => HashSet::from([DataColumnSubnetId::new(1), DataColumnSubnetId::new(2)]), + 3 => HashSet::from([DataColumnSubnetId::new(3)]), + 4 => HashSet::from([DataColumnSubnetId::new(1)]), + 5 => HashSet::from([DataColumnSubnetId::new(2)]), _ => unreachable!(), + }; + + { + let mut peer_db = peer_manager.network_globals.peers.write(); + let peer_info = peer_db.peer_info_mut(&peer).unwrap(); + peer_info.set_custody_subnets(custody_subnets); + peer_info.update_sync_status(empty_synced_status()); } - let metadata = crate::rpc::MetaDataV2 { - seq_number: 0, - attnets, - syncnets: Default::default(), - }; - peer_manager - .network_globals - .peers - .write() - .peer_info_mut(&peer) - .unwrap() - .set_meta_data(MetaData::V2(metadata)); let long_lived_subnets = peer_manager .network_globals .peers @@ -2153,22 +2318,24 @@ mod tests { assert!(!connected_peers.contains(&peers[5])); } - /// Test the pruning logic to prioritise peers with the most subnets, but not at the expense of - /// removing our few sync-committee subnets. + /// Test the pruning logic to prioritise peers with the most data column subnets, but not at + /// the expense of removing our few sync-committee subnets. /// /// Create 6 peers. /// Peer0: None - /// Peer1 : Subnet 1,2,3, - /// Peer2 : Subnet 1,2, - /// Peer3 : Subnet 3 - /// Peer4 : Subnet 1,2, Sync-committee-1 - /// Peer5 : Subnet 1,2, Sync-committee-2 + /// Peer1 : Column subnet 1,2,3, + /// Peer2 : Column subnet 1,2, + /// Peer3 : Column subnet 3 + /// Peer4 : Column subnet 1,2, Sync-committee-1 + /// Peer5 : Column subnet 1,2, Sync-committee-2 /// /// Prune 3 peers: Should be Peer0, Peer1 and Peer2 because (4 and 5 are on a sync-committee) #[tokio::test] async fn test_peer_manager_prune_subnet_peers_sync_committee() { let target = 3; let mut peer_manager = build_peer_manager(target).await; + // Override sampling subnets to prevent sampling peer protection from interfering with this test. + *peer_manager.network_globals.sampling_subnets.write() = HashSet::new(); // Create 6 peers to connect to. let mut peers = Vec::new(); @@ -2177,48 +2344,40 @@ mod tests { peer_manager.inject_connect_ingoing(&peer, "/ip4/0.0.0.0".parse().unwrap(), None); // Have some of the peers be on a long-lived subnet - let mut attnets = crate::types::EnrAttestationBitfield::::new(); let mut syncnets = crate::types::EnrSyncCommitteeBitfield::::new(); - - match x { - 0 => {} - 1 => { - attnets.set(1, true).unwrap(); - attnets.set(2, true).unwrap(); - attnets.set(3, true).unwrap(); - } - 2 => { - attnets.set(1, true).unwrap(); - attnets.set(2, true).unwrap(); - } - 3 => { - attnets.set(3, true).unwrap(); - } + let custody_subnets = match x { + 0 => HashSet::new(), + 1 => HashSet::from([ + DataColumnSubnetId::new(1), + DataColumnSubnetId::new(2), + DataColumnSubnetId::new(3), + ]), + 2 => HashSet::from([DataColumnSubnetId::new(1), DataColumnSubnetId::new(2)]), + 3 => HashSet::from([DataColumnSubnetId::new(3)]), 4 => { - attnets.set(1, true).unwrap(); - attnets.set(2, true).unwrap(); syncnets.set(1, true).unwrap(); + HashSet::from([DataColumnSubnetId::new(1), DataColumnSubnetId::new(2)]) } 5 => { - attnets.set(1, true).unwrap(); - attnets.set(2, true).unwrap(); syncnets.set(2, true).unwrap(); + HashSet::from([DataColumnSubnetId::new(1), DataColumnSubnetId::new(2)]) } _ => unreachable!(), + }; + + { + let mut peer_db = peer_manager.network_globals.peers.write(); + let peer_info = peer_db.peer_info_mut(&peer).unwrap(); + peer_info.set_meta_data(MetaData::V3(MetaDataV3 { + seq_number: 0, + attnets: Default::default(), + syncnets, + custody_group_count: 0, // unused in this test, as pruning logic uses `custody_subnets` + })); + peer_info.set_custody_subnets(custody_subnets); + peer_info.update_sync_status(empty_synced_status()); } - let metadata = crate::rpc::MetaDataV2 { - seq_number: 0, - attnets, - syncnets, - }; - peer_manager - .network_globals - .peers - .write() - .peer_info_mut(&peer) - .unwrap() - .set_meta_data(MetaData::V2(metadata)); let long_lived_subnets = peer_manager .network_globals .peers @@ -2262,10 +2421,111 @@ mod tests { assert!(!connected_peers.contains(&peers[2])); } + /// Test that custody subnet peer count below the `MIN_SAMPLING_COLUMN_SUBNET_PEERS`(2) + /// threshold are protected from pruning. + /// + /// Create 8 peers. + /// Peer0: None (can be pruned) + /// Peer1: Subnet 1,4,5 + /// Peer2: Subnet 1,4 + /// Peer3: Subnet 2 + /// Peer4: Subnet 2 + /// Peer5: Subnet 1 (can be pruned) + /// Peer6: Subnet 3 + /// Peer7: Subnet 5 (can be pruned) + /// + /// Sampling subnets: 1, 2 + /// + /// Prune 3 peers: Should be Peer0, Peer 5 and Peer 7 because + /// - Peer 0 because it has no long-lived subnet. + /// - Peer 5 is on the subnet with the most peers and have the least subscribed long-lived subnets. + /// - Peer 7 because it's on a non-sampling subnet and have the least subscribed long-lived subnets. + #[tokio::test] + async fn test_peer_manager_protect_sampling_subnet_peers_below_threshold() { + let target = 5; + let mut peer_manager = build_peer_manager(target).await; + + *peer_manager.network_globals.sampling_subnets.write() = + HashSet::from([DataColumnSubnetId::new(1), DataColumnSubnetId::new(2)]); + + // Create 8 peers to connect to. + let mut peers = Vec::new(); + for peer_idx in 0..8 { + let peer = PeerId::random(); + peer_manager.inject_connect_ingoing(&peer, "/ip4/0.0.0.0".parse().unwrap(), None); + + // Have some of the peers be on a long-lived subnet + let custody_subnets = match peer_idx { + 0 => HashSet::new(), + 1 => HashSet::from([ + DataColumnSubnetId::new(1), + DataColumnSubnetId::new(4), + DataColumnSubnetId::new(5), + ]), + 2 => HashSet::from([DataColumnSubnetId::new(1), DataColumnSubnetId::new(4)]), + 3 => HashSet::from([DataColumnSubnetId::new(2)]), + 4 => HashSet::from([DataColumnSubnetId::new(2)]), + 5 => HashSet::from([DataColumnSubnetId::new(1)]), + 6 => HashSet::from([DataColumnSubnetId::new(3)]), + 7 => HashSet::from([DataColumnSubnetId::new(5)]), + _ => unreachable!(), + }; + + { + let mut peer_db = peer_manager.network_globals.peers.write(); + let peer_info = peer_db.peer_info_mut(&peer).unwrap(); + peer_info.set_custody_subnets(custody_subnets); + peer_info.update_sync_status(empty_synced_status()); + } + + let long_lived_subnets = peer_manager + .network_globals + .peers + .read() + .peer_info(&peer) + .unwrap() + .long_lived_subnets(); + for subnet in long_lived_subnets { + println!("Subnet: {:?}", subnet); + peer_manager + .network_globals + .peers + .write() + .add_subscription(&peer, subnet); + } + println!("{},{}", peer_idx, peer); + peers.push(peer); + } + + // Perform the heartbeat. + peer_manager.heartbeat(); + + // Tests that when we are over the target peer limit, after disconnecting an unhealthy peer, + // the number of connected peers updates and we will not remove too many peers. + assert_eq!( + peer_manager.network_globals.connected_or_dialing_peers(), + target + ); + + // Check that we removed peers 0, 5 and 7 + let connected_peers: std::collections::HashSet<_> = peer_manager + .network_globals + .peers + .read() + .connected_or_dialing_peers() + .cloned() + .collect(); + + println!("Connected peers: {:?}", connected_peers); + assert!(!connected_peers.contains(&peers[0])); + assert!(!connected_peers.contains(&peers[5])); + assert!(!connected_peers.contains(&peers[7])); + } + /// This test is for reproducing the issue: /// https://github.com/sigp/lighthouse/pull/3236#issue-1256432659 /// - /// Whether the issue happens depends on `subnet_to_peer` (HashMap), since HashMap doesn't + /// Whether the issue happens depends on `custody_subnet_to_peers` (HashMap), since HashMap doesn't /// guarantee a particular order of iteration. So we repeat the test case to try to reproduce /// the issue. #[tokio::test] @@ -2275,41 +2535,42 @@ mod tests { } } - /// Test the pruning logic to prioritize peers with the most subnets. This test specifies + /// Test the pruning logic to prioritize peers with the most column subnets. This test specifies /// the connection direction for the peers. /// Either Peer 4 or 5 is expected to be removed in this test case. /// /// Create 8 peers. - /// Peer0 (out) : Subnet 1, Sync-committee-1 - /// Peer1 (out) : Subnet 1, Sync-committee-1 - /// Peer2 (out) : Subnet 2, Sync-committee-2 - /// Peer3 (out) : Subnet 2, Sync-committee-2 - /// Peer4 (out) : Subnet 3 - /// Peer5 (out) : Subnet 3 - /// Peer6 (in) : Subnet 4 - /// Peer7 (in) : Subnet 5 + /// Peer0 (out) : Column subnet 1, Sync-committee-1 + /// Peer1 (out) : Column subnet 1, Sync-committee-1 + /// Peer2 (out) : Column subnet 2, Sync-committee-2 + /// Peer3 (out) : Column subnet 2, Sync-committee-2 + /// Peer4 (out) : Column subnet 3 + /// Peer5 (out) : Column subnet 3 + /// Peer6 (in) : Column subnet 4 + /// Peer7 (in) : Column subnet 5 async fn test_peer_manager_prune_based_on_subnet_count() { let target = 7; let mut peer_manager = build_peer_manager(target).await; + // Override sampling subnets to prevent sampling peer protection from interfering with this test. + *peer_manager.network_globals.sampling_subnets.write() = HashSet::new(); // Create 8 peers to connect to. let mut peers = Vec::new(); - for x in 0..8 { + for peer_idx in 0..8 { let peer = PeerId::random(); // Have some of the peers be on a long-lived subnet - let mut attnets = crate::types::EnrAttestationBitfield::::new(); let mut syncnets = crate::types::EnrSyncCommitteeBitfield::::new(); - match x { + let custody_subnets = match peer_idx { 0 => { peer_manager.inject_connect_outgoing( &peer, "/ip4/0.0.0.0".parse().unwrap(), None, ); - attnets.set(1, true).unwrap(); syncnets.set(1, true).unwrap(); + HashSet::from([DataColumnSubnetId::new(1)]) } 1 => { peer_manager.inject_connect_outgoing( @@ -2317,8 +2578,8 @@ mod tests { "/ip4/0.0.0.0".parse().unwrap(), None, ); - attnets.set(1, true).unwrap(); syncnets.set(1, true).unwrap(); + HashSet::from([DataColumnSubnetId::new(1)]) } 2 => { peer_manager.inject_connect_outgoing( @@ -2326,8 +2587,8 @@ mod tests { "/ip4/0.0.0.0".parse().unwrap(), None, ); - attnets.set(2, true).unwrap(); syncnets.set(2, true).unwrap(); + HashSet::from([DataColumnSubnetId::new(2)]) } 3 => { peer_manager.inject_connect_outgoing( @@ -2335,8 +2596,8 @@ mod tests { "/ip4/0.0.0.0".parse().unwrap(), None, ); - attnets.set(2, true).unwrap(); syncnets.set(2, true).unwrap(); + HashSet::from([DataColumnSubnetId::new(2)]) } 4 => { peer_manager.inject_connect_outgoing( @@ -2344,7 +2605,7 @@ mod tests { "/ip4/0.0.0.0".parse().unwrap(), None, ); - attnets.set(3, true).unwrap(); + HashSet::from([DataColumnSubnetId::new(3)]) } 5 => { peer_manager.inject_connect_outgoing( @@ -2352,7 +2613,7 @@ mod tests { "/ip4/0.0.0.0".parse().unwrap(), None, ); - attnets.set(3, true).unwrap(); + HashSet::from([DataColumnSubnetId::new(3)]) } 6 => { peer_manager.inject_connect_ingoing( @@ -2360,7 +2621,7 @@ mod tests { "/ip4/0.0.0.0".parse().unwrap(), None, ); - attnets.set(4, true).unwrap(); + HashSet::from([DataColumnSubnetId::new(4)]) } 7 => { peer_manager.inject_connect_ingoing( @@ -2368,23 +2629,26 @@ mod tests { "/ip4/0.0.0.0".parse().unwrap(), None, ); - attnets.set(5, true).unwrap(); + HashSet::from([DataColumnSubnetId::new(5)]) } _ => unreachable!(), - } + }; - let metadata = crate::rpc::MetaDataV2 { + let metadata = MetaDataV3 { seq_number: 0, - attnets, + attnets: Default::default(), syncnets, + custody_group_count: 0, // unused in this test, as pruning logic uses `custody_subnets` }; - peer_manager - .network_globals - .peers - .write() - .peer_info_mut(&peer) - .unwrap() - .set_meta_data(MetaData::V2(metadata)); + + { + let mut peer_db_write = peer_manager.network_globals.peers.write(); + let peer_info = peer_db_write.peer_info_mut(&peer).unwrap(); + peer_info.set_meta_data(MetaData::V3(metadata)); + peer_info.set_custody_subnets(custody_subnets); + peer_info.update_sync_status(empty_synced_status()); + } + let long_lived_subnets = peer_manager .network_globals .peers @@ -2392,7 +2656,7 @@ mod tests { .peer_info(&peer) .unwrap() .long_lived_subnets(); - println!("{},{}", x, peer); + println!("{},{}", peer_idx, peer); for subnet in long_lived_subnets { println!("Subnet: {:?}", subnet); peer_manager @@ -2428,17 +2692,286 @@ mod tests { assert!(connected_peers.contains(&peers[7])); } + /// Test that peers with the sparsest attestation subnets are protected from pruning. + /// + /// Create 7 peers: + /// - 4 on attnet 0 + /// - 1 on attnet 1 (least dense) + /// - 2 on attnet 2 + /// + /// Prune 3 peers: 2 peers from subnet 0 and 1 from either subnet 0 or 2, BUT never from attnet 1. + #[tokio::test] + async fn test_peer_manager_not_prune_sparsest_attestation_subnet() { + let target = 4; + let mut peer_manager = build_peer_manager(target).await; + let spec = peer_manager.network_globals.spec.clone(); + let mut peers = Vec::new(); + + let subnet_assignments = [0, 0, 0, 0, 1, 2, 2]; + + for &subnet in subnet_assignments.iter() { + let peer = PeerId::random(); + peer_manager.inject_connect_ingoing(&peer, "/ip4/0.0.0.0".parse().unwrap(), None); + + let mut attnets = crate::types::EnrAttestationBitfield::::new(); + attnets.set(subnet, true).unwrap(); + + let metadata = MetaDataV3 { + seq_number: 0, + attnets, + syncnets: Default::default(), + custody_group_count: spec.custody_requirement, + }; + peer_manager + .network_globals + .peers + .write() + .peer_info_mut(&peer) + .unwrap() + .set_meta_data(MetaData::V3(metadata)); + + peer_manager + .network_globals + .peers + .write() + .add_subscription(&peer, Subnet::Attestation((subnet as u64).into())); + + peers.push(peer); + } + + peer_manager.heartbeat(); + + // Check attestation subnet to avoid pruning from subnets with lowest peer count: + // Peer 4 (on least dense subnet 1) should be protected + // Should preferentially remove from subnet 0 (most dense) rather than subnet 1 (least dense) + let connected_peers: HashSet<_> = peer_manager + .network_globals + .peers + .read() + .connected_or_dialing_peers() + .cloned() + .collect(); + + // Peer 4 (on least dense attestation subnet 1) should be kept + assert!(connected_peers.contains(&peers[4])); + + // Attestation subnet uniformity should protect peers on least dense subnets + // Count peers on subnet 1 (least dense) + let subnet_1_count = peers + .iter() + .filter(|&peer| connected_peers.contains(peer)) + .filter(|&peer| { + peer_manager + .network_globals + .peers + .read() + .peer_info(peer) + .unwrap() + .long_lived_subnets() + .iter() + .any(|subnet| matches!(subnet, Subnet::Attestation(id) if id == &1u64.into())) + }) + .count(); + + assert!(subnet_1_count > 0, "Least dense subnet should be protected"); + } + + /// Test the pruning logic prioritizes synced and advanced peers over behind/unknown peers. + /// + /// Create 6 peers with different sync statuses: + /// Peer0: Behind + /// Peer1: Unknown + /// Peer2: Synced + /// Peer3: Advanced + /// Peer4: Synced + /// Peer5: Unknown + /// + /// Target: 3 peers. Should prune peers 0, 1, 5 (behind/unknown) and keep 2, 3, 4 (synced/advanced). + #[tokio::test] + async fn test_peer_manager_prune_should_prioritize_synced_advanced_peers() { + let target = 3; + let mut peer_manager = build_peer_manager(target).await; + // Override sampling subnets to prevent sampling peer protection from interfering with this test. + *peer_manager.network_globals.sampling_subnets.write() = HashSet::new(); + + let mut peers = Vec::new(); + let current_peer_count = 6; + for i in 0..current_peer_count { + let peer = PeerId::random(); + peer_manager.inject_connect_ingoing(&peer, "/ip4/0.0.0.0".parse().unwrap(), None); + + let sync_status = match i { + 0 => SyncStatus::Behind { + info: empty_sync_info(), + }, + 1 | 5 => SyncStatus::Unknown, + 2 | 4 => SyncStatus::Synced { + info: empty_sync_info(), + }, + 3 => SyncStatus::Advanced { + info: empty_sync_info(), + }, + _ => unreachable!(), + }; + + { + let mut peer_db = peer_manager.network_globals.peers.write(); + let peer_info = peer_db.peer_info_mut(&peer).unwrap(); + peer_info.update_sync_status(sync_status); + // make sure all the peers have some long live subnets that are not protected + peer_info.set_custody_subnets(HashSet::from([DataColumnSubnetId::new(2)])) + } + + let long_lived_subnets = peer_manager + .network_globals + .peers + .read() + .peer_info(&peer) + .unwrap() + .long_lived_subnets(); + for subnet in long_lived_subnets { + println!("Subnet: {:?}", subnet); + peer_manager + .network_globals + .peers + .write() + .add_subscription(&peer, subnet); + } + + peers.push(peer); + } + + // Perform the heartbeat to trigger pruning + peer_manager.heartbeat(); + + // Should have exactly target number of peers + assert_eq!( + peer_manager.network_globals.connected_or_dialing_peers(), + target + ); + + let connected_peers: std::collections::HashSet<_> = peer_manager + .network_globals + .peers + .read() + .connected_or_dialing_peers() + .cloned() + .collect(); + + // Count how many synced/advanced peers are kept vs behind/unknown peers + let synced_advanced_kept = [&peers[2], &peers[3], &peers[4]] + .iter() + .filter(|peer| connected_peers.contains(peer)) + .count(); + + let behind_unknown_kept = [&peers[0], &peers[1], &peers[5]] + .iter() + .filter(|peer| connected_peers.contains(peer)) + .count(); + + assert_eq!(synced_advanced_kept, target); + assert_eq!(behind_unknown_kept, 0); + } + + /// Test that `peer_subnet_info` is properly cleaned up during pruning iterations. + /// + /// Without proper cleanup, stale peer data affects protection logic for sync committees and we + /// may end up pruning more than expected. + #[tokio::test] + async fn test_peer_manager_prune_mixed_custody_subnet_protection() { + let target = 6; + let mut peer_manager = build_peer_manager(target).await; + // Override sampling subnets to prevent sampling peer protection from interfering. + *peer_manager.network_globals.sampling_subnets.write() = HashSet::new(); + + // Create 12 peers: + // - 4 on custody subnet 0 + // - 3 on subnet 1 + //- 2 on subnet 2 + // - 3 scattered. + // Every 4th peer (0,4,8) is on sync committee 0. + let mut peers = Vec::new(); + for i in 0..12 { + let peer = PeerId::random(); + peer_manager.inject_connect_ingoing(&peer, "/ip4/0.0.0.0".parse().unwrap(), None); + + let custody_subnet = match i { + ..4 => 0, + 4..7 => 1, + 7..9 => 2, + _ => i - 6, + }; + let on_sync_committee = i % 4 == 0; + + { + let mut peers_db = peer_manager.network_globals.peers.write(); + let peer_info = peers_db.peer_info_mut(&peer).unwrap(); + peer_info + .set_custody_subnets(HashSet::from([DataColumnSubnetId::new(custody_subnet)])); + peer_info.update_sync_status(empty_synced_status()); + + if on_sync_committee { + let mut syncnets = crate::types::EnrSyncCommitteeBitfield::::new(); + syncnets.set(0, true).unwrap(); + peer_info.set_meta_data(MetaData::V3(MetaDataV3 { + seq_number: 0, + attnets: Default::default(), + syncnets, + custody_group_count: 0, + })); + } + + for subnet in peer_info.long_lived_subnets() { + peers_db.add_subscription(&peer, subnet); + } + + peers.push(peer); + } + } + + assert_eq!( + peer_manager.network_globals.connected_or_dialing_peers(), + 12 + ); + + peer_manager.heartbeat(); + + assert_eq!( + peer_manager.network_globals.connected_or_dialing_peers(), + target + ); + + let connected_peers: HashSet = peer_manager + .network_globals + .peers + .read() + .connected_or_dialing_peers() + .cloned() + .collect(); + + let sync_committee_peers = [&peers[0], &peers[4], &peers[8]]; + let remaining_sync_peers = connected_peers + .iter() + .filter(|peer| sync_committee_peers.contains(peer)) + .count(); + assert_eq!( + remaining_sync_peers, 2, + "Sync committee protection should preserve exactly MIN_SYNC_COMMITTEE_PEERS (2)" + ); + } + // Test properties PeerManager should have using randomly generated input. #[cfg(test)] mod property_based_tests { use crate::peer_manager::config::DEFAULT_TARGET_PEERS; use crate::peer_manager::tests::build_peer_manager_with_trusted_peers; - use crate::rpc::MetaData; + use crate::rpc::{MetaData, MetaDataV3}; use libp2p::PeerId; use quickcheck::{Arbitrary, Gen, TestResult}; use quickcheck_macros::quickcheck; + use std::collections::HashSet; use tokio::runtime::Runtime; - use types::Unsigned; + use types::{DataColumnSubnetId, Unsigned}; use types::{EthSpec, MainnetEthSpec as E}; #[derive(Clone, Debug)] @@ -2450,6 +2983,7 @@ mod tests { score: f64, trusted: bool, gossipsub_score: f64, + custody_subnets: HashSet, } impl Arbitrary for PeerCondition { @@ -2472,6 +3006,17 @@ mod tests { bitfield }; + let spec = E::default_spec(); + let custody_subnets = { + let total_subnet_count = spec.data_column_sidecar_subnet_count; + let custody_subnet_count = u64::arbitrary(g) % (total_subnet_count + 1); // 0 to 128 + (spec.custody_requirement..total_subnet_count) + .filter(|_| bool::arbitrary(g)) + .map(DataColumnSubnetId::new) + .take(custody_subnet_count as usize) + .collect() + }; + PeerCondition { peer_id: PeerId::random(), outgoing: bool::arbitrary(g), @@ -2480,6 +3025,7 @@ mod tests { score: f64::arbitrary(g), trusted: bool::arbitrary(g), gossipsub_score: f64::arbitrary(g), + custody_subnets, } } } @@ -2487,6 +3033,7 @@ mod tests { #[quickcheck] fn prune_excess_peers(peer_conditions: Vec) -> TestResult { let target_peer_count = DEFAULT_TARGET_PEERS; + let spec = E::default_spec(); if peer_conditions.len() < target_peer_count { return TestResult::discard(); } @@ -2533,17 +3080,22 @@ mod tests { syncnets.set(i, *value).unwrap(); } - let metadata = crate::rpc::MetaDataV2 { + let subnets_per_custody_group = + spec.data_column_sidecar_subnet_count / spec.number_of_custody_groups; + let metadata = MetaDataV3 { seq_number: 0, attnets, syncnets, + custody_group_count: condition.custody_subnets.len() as u64 + / subnets_per_custody_group, }; let mut peer_db = peer_manager.network_globals.peers.write(); let peer_info = peer_db.peer_info_mut(&condition.peer_id).unwrap(); - peer_info.set_meta_data(MetaData::V2(metadata)); + peer_info.set_meta_data(MetaData::V3(metadata)); peer_info.set_gossipsub_score(condition.gossipsub_score); peer_info.add_to_score(condition.score); + peer_info.set_custody_subnets(condition.custody_subnets.clone()); for subnet in peer_info.long_lived_subnets() { peer_db.add_subscription(&condition.peer_id, subnet); diff --git a/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs b/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs index 974b41230e8..083c3f00c23 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs @@ -300,6 +300,7 @@ impl PeerDB { .filter(move |(_, info)| { // We check both the metadata and gossipsub data as we only want to count long-lived subscribed peers info.is_connected() + && info.is_synced_or_advanced() && info.on_subnet_metadata(&subnet) && info.on_subnet_gossipsub(&subnet) && info.is_good_gossipsub_peer() @@ -318,40 +319,69 @@ impl PeerDB { .filter(move |(_, info)| { // The custody_subnets hashset can be populated via enr or metadata let is_custody_subnet_peer = info.is_assigned_to_custody_subnet(&subnet); - info.is_connected() && info.is_good_gossipsub_peer() && is_custody_subnet_peer + info.is_connected() + && info.is_good_gossipsub_peer() + && is_custody_subnet_peer + && info.is_synced_or_advanced() }) .map(|(peer_id, _)| peer_id) } - /// Returns an iterator of all peers that are supposed to be custodying - /// the given subnet id. - pub fn good_range_sync_custody_subnet_peers( + /// Checks if there is at least one good peer for each specified custody subnet for the given epoch. + /// A "good" peer is one that is both connected and synced (or advanced) for the specified epoch. + pub fn has_good_custody_range_sync_peer( &self, - subnet: DataColumnSubnetId, - ) -> impl Iterator { - self.peers - .iter() - .filter(move |(_, info)| { - // The custody_subnets hashset can be populated via enr or metadata - info.is_connected() && info.is_assigned_to_custody_subnet(&subnet) - }) - .map(|(peer_id, _)| peer_id) + subnets: &HashSet, + epoch: Epoch, + ) -> bool { + let mut remaining_subnets = subnets.clone(); + + let good_sync_peers_for_epoch = self.peers.values().filter(|&info| { + info.is_connected() + && match info.sync_status() { + SyncStatus::Synced { info } | SyncStatus::Advanced { info } => { + info.has_slot(epoch.end_slot(E::slots_per_epoch())) + } + SyncStatus::IrrelevantPeer + | SyncStatus::Behind { .. } + | SyncStatus::Unknown => false, + } + }); + + for info in good_sync_peers_for_epoch { + for subnet in info.custody_subnets_iter() { + if remaining_subnets.remove(subnet) && remaining_subnets.is_empty() { + return true; + } + } + } + + false } - /// Returns `true` if the given peer is assigned to the given subnet. - /// else returns `false` - /// - /// Returns `false` if peer doesn't exist in peerdb. - pub fn is_good_range_sync_custody_subnet_peer( + /// Checks if there are sufficient good peers for a single custody subnet. + /// A "good" peer is one that is both connected and synced (or advanced). + pub fn has_good_peers_in_custody_subnet( &self, - subnet: DataColumnSubnetId, - peer: &PeerId, + subnet: &DataColumnSubnetId, + target_peers: usize, ) -> bool { - if let Some(info) = self.peers.get(peer) { - info.is_connected() && info.is_assigned_to_custody_subnet(&subnet) - } else { - false + let mut peer_count = 0usize; + for info in self + .peers + .values() + .filter(|info| info.is_connected() && info.is_synced_or_advanced()) + { + if info.is_assigned_to_custody_subnet(subnet) { + peer_count += 1; + } + + if peer_count >= target_peers { + return true; + } } + + false } /// Gives the ids of all known disconnected peers. diff --git a/beacon_node/lighthouse_network/src/peer_manager/peerdb/peer_info.rs b/beacon_node/lighthouse_network/src/peer_manager/peerdb/peer_info.rs index e643fca30fb..c289cb9a69c 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/peerdb/peer_info.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/peerdb/peer_info.rs @@ -174,19 +174,6 @@ impl PeerInfo { self.subnets.iter() } - /// Returns the number of long lived subnets a peer is subscribed to. - // NOTE: This currently excludes sync committee subnets - pub fn long_lived_subnet_count(&self) -> usize { - if let Some(meta_data) = self.meta_data.as_ref() { - return meta_data.attnets().num_set_bits(); - } else if let Some(enr) = self.enr.as_ref() - && let Ok(attnets) = enr.attestation_bitfield::() - { - return attnets.num_set_bits(); - } - 0 - } - /// Returns an iterator over the long-lived subnets if it has any. pub fn long_lived_subnets(&self) -> Vec { let mut long_lived_subnets = Vec::new(); @@ -222,6 +209,13 @@ impl PeerInfo { } } } + + long_lived_subnets.extend( + self.custody_subnets + .iter() + .map(|&id| Subnet::DataColumn(id)), + ); + long_lived_subnets } @@ -240,6 +234,11 @@ impl PeerInfo { self.custody_subnets.iter() } + /// Returns the number of custody subnets this peer is assigned to. + pub fn custody_subnet_count(&self) -> usize { + self.custody_subnets.len() + } + /// Returns true if the peer is connected to a long-lived subnet. pub fn has_long_lived_subnet(&self) -> bool { // Check the meta_data @@ -262,6 +261,17 @@ impl PeerInfo { { return true; } + + // Check if the peer has custody subnets populated and the peer is subscribed to any of + // its custody subnets + let subscribed_to_any_custody_subnets = self + .custody_subnets + .iter() + .any(|subnet_id| self.subnets.contains(&Subnet::DataColumn(*subnet_id))); + if subscribed_to_any_custody_subnets { + return true; + } + false } @@ -318,6 +328,14 @@ impl PeerInfo { ) } + /// Checks if the peer is synced or advanced. + pub fn is_synced_or_advanced(&self) -> bool { + matches!( + self.sync_status, + SyncStatus::Synced { .. } | SyncStatus::Advanced { .. } + ) + } + /// Checks if the status is connected. pub fn is_dialing(&self) -> bool { matches!(self.connection_status, PeerConnectionStatus::Dialing { .. }) @@ -645,3 +663,50 @@ impl From for PeerState { } } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::types::Subnet; + use types::{DataColumnSubnetId, MainnetEthSpec}; + + type E = MainnetEthSpec; + + fn create_test_peer_info() -> PeerInfo { + PeerInfo::default() + } + + #[test] + fn test_has_long_lived_subnet_empty_custody_subnets() { + let peer_info = create_test_peer_info(); + // peer has no custody subnets or subscribed to any subnets hence return false + assert!(!peer_info.has_long_lived_subnet()); + } + + #[test] + fn test_has_long_lived_subnet_empty_subnets_with_custody_subnets() { + let mut peer_info = create_test_peer_info(); + peer_info.custody_subnets.insert(DataColumnSubnetId::new(1)); + peer_info.custody_subnets.insert(DataColumnSubnetId::new(2)); + // Peer has custody subnets but isn't subscribed to any hence return false + assert!(!peer_info.has_long_lived_subnet()); + } + + #[test] + fn test_has_long_lived_subnet_subscribed_to_custody_subnets() { + let mut peer_info = create_test_peer_info(); + peer_info.custody_subnets.insert(DataColumnSubnetId::new(1)); + peer_info.custody_subnets.insert(DataColumnSubnetId::new(2)); + peer_info.custody_subnets.insert(DataColumnSubnetId::new(3)); + + peer_info + .subnets + .insert(Subnet::DataColumn(DataColumnSubnetId::new(1))); + peer_info + .subnets + .insert(Subnet::DataColumn(DataColumnSubnetId::new(2))); + // Missing DataColumnSubnetId::new(3) - but peer is subscribed to some custody subnets + // Peer is subscribed to any custody subnets - return true + assert!(peer_info.has_long_lived_subnet()); + } +} diff --git a/beacon_node/network/src/sync/backfill_sync/mod.rs b/beacon_node/network/src/sync/backfill_sync/mod.rs index 2f5eb3f6894..f00503ec634 100644 --- a/beacon_node/network/src/sync/backfill_sync/mod.rs +++ b/beacon_node/network/src/sync/backfill_sync/mod.rs @@ -1120,13 +1120,12 @@ impl BackFillSync { .sampling_subnets() .iter() .all(|subnet_id| { - let peer_count = network + let min_peer_count = 1; + network .network_globals() .peers .read() - .good_range_sync_custody_subnet_peers(*subnet_id) - .count(); - peer_count > 0 + .has_good_peers_in_custody_subnet(subnet_id, min_peer_count) }) } else { true diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index 96319f2efad..8907f7510fd 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -1132,21 +1132,12 @@ impl SyncingChain { ) -> bool { if network.chain.spec.is_peer_das_enabled_for_epoch(epoch) { // Require peers on all sampling column subnets before sending batches + let sampling_subnets = network.network_globals().sampling_subnets(); network .network_globals() - .sampling_subnets() - .iter() - .all(|subnet_id| { - let peer_db = network.network_globals().peers.read(); - let peer_count = self - .peers - .iter() - .filter(|peer| { - peer_db.is_good_range_sync_custody_subnet_peer(*subnet_id, peer) - }) - .count(); - peer_count > 0 - }) + .peers + .read() + .has_good_custody_range_sync_peer(&sampling_subnets, epoch) } else { true }