diff --git a/beacon_node/lighthouse_network/src/peer_manager/mod.rs b/beacon_node/lighthouse_network/src/peer_manager/mod.rs index 93515ed5f6b..b9d5d4b9036 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/mod.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/mod.rs @@ -17,7 +17,7 @@ use std::{ time::{Duration, Instant}, }; use tracing::{debug, error, trace, warn}; -use types::{DataColumnSubnetId, EthSpec, SyncSubnetId}; +use types::{DataColumnSubnetId, EthSpec, SubnetId, SyncSubnetId}; pub use libp2p::core::Multiaddr; pub use libp2p::identity::Keypair; @@ -52,6 +52,11 @@ pub const PEER_RECONNECTION_TIMEOUT: Duration = Duration::from_secs(600); /// lower our peer count below this number. Instead we favour a non-uniform distribution of subnet /// peers. pub const MIN_SYNC_COMMITTEE_PEERS: u64 = 2; +/// Avoid pruning sampling peers if subnet peer count is <= TARGET_SUBNET_PEERS. +pub const MIN_SAMPLING_COLUMN_SUBNET_PEERS: u64 = TARGET_SUBNET_PEERS as u64; +/// For non sampling columns, we need to ensure there is at least one peer for +/// publishing during proposals. +pub const MIN_NON_SAMPLING_COLUMN_SUBNET_PEERS: u64 = 1; /// A fraction of `PeerManager::target_peers` that we allow to connect to us in excess of /// `PeerManager::target_peers`. For clarity, if `PeerManager::target_peers` is 50 and /// PEER_EXCESS_FACTOR = 0.1 we allow 10% more nodes, i.e 55. @@ -729,7 +734,7 @@ impl PeerManager { } } else { // we have no meta-data for this peer, update - debug!(%peer_id, new_seq_no = meta_data.seq_number(), "Obtained peer's metadata"); + debug!(%peer_id, new_seq_no = meta_data.seq_number(), cgc=?meta_data.custody_group_count().ok(), "Obtained peer's metadata"); } let known_custody_group_count = peer_info @@ -949,6 +954,43 @@ impl PeerManager { } } + /// Run discovery query for additional custody peers if we fall below `TARGET_PEERS`. + fn maintain_custody_peers(&mut self) { + let subnets_to_discover: Vec = self + .network_globals + .sampling_subnets() + .iter() + .filter_map(|custody_subnet| { + if self + .network_globals + .peers + .read() + .has_good_peers_in_custody_subnet( + custody_subnet, + MIN_SAMPLING_COLUMN_SUBNET_PEERS as usize, + ) + { + None + } else { + Some(SubnetDiscovery { + subnet: Subnet::DataColumn(*custody_subnet), + min_ttl: None, + }) + } + }) + .collect(); + + // request the subnet query from discovery + if !subnets_to_discover.is_empty() { + debug!( + subnets = ?subnets_to_discover.iter().map(|s| s.subnet).collect::>(), + "Making subnet queries for maintaining custody peers" + ); + self.events + .push(PeerManagerEvent::DiscoverSubnetPeers(subnets_to_discover)); + } + } + fn maintain_trusted_peers(&mut self) { let trusted_peers = self.trusted_peers.clone(); for trusted_peer in trusted_peers { @@ -1091,14 +1133,17 @@ impl PeerManager { // uniformly distributed, remove random peers. if peers_to_prune.len() < connected_peer_count.saturating_sub(self.target_peers) { // Of our connected peers, build a map from subnet_id -> Vec<(PeerId, PeerInfo)> - let mut subnet_to_peer: HashMap)>> = HashMap::new(); + let mut att_subnet_to_peer: HashMap)>> = + HashMap::new(); // These variables are used to track if a peer is in a long-lived sync-committee as we // may wish to retain this peer over others when pruning. let mut sync_committee_peer_count: HashMap = HashMap::new(); - let mut peer_to_sync_committee: HashMap< - PeerId, - std::collections::HashSet, - > = HashMap::new(); + let mut peer_to_sync_committee: HashMap> = HashMap::new(); + + let mut custody_subnet_peer_count: HashMap = HashMap::new(); + let mut peer_to_custody_subnet: HashMap> = + HashMap::new(); + let sampling_subnets = self.network_globals.sampling_subnets(); for (peer_id, info) in self.network_globals.peers.read().connected_peers() { // Ignore peers we trust or that we are already pruning @@ -1112,9 +1157,9 @@ impl PeerManager { // the dense sync committees. for subnet in info.long_lived_subnets() { match subnet { - Subnet::Attestation(_) => { - subnet_to_peer - .entry(subnet) + Subnet::Attestation(subnet_id) => { + att_subnet_to_peer + .entry(subnet_id) .or_default() .push((*peer_id, info.clone())); } @@ -1125,26 +1170,31 @@ impl PeerManager { .or_default() .insert(id); } - // TODO(das) to be implemented. We're not pruning data column peers yet - // because data column topics are subscribed as core topics until we - // implement recomputing data column subnets. - Subnet::DataColumn(_) => {} + Subnet::DataColumn(id) => { + *custody_subnet_peer_count.entry(id).or_default() += 1; + peer_to_custody_subnet + .entry(*peer_id) + .or_default() + .insert(id); + } } } } // Add to the peers to prune mapping while peers_to_prune.len() < connected_peer_count.saturating_sub(self.target_peers) { - if let Some((_, peers_on_subnet)) = subnet_to_peer + if let Some((_, peers_on_subnet)) = att_subnet_to_peer .iter_mut() .max_by_key(|(_, peers)| peers.len()) { // and the subnet still contains peers if !peers_on_subnet.is_empty() { // Order the peers by the number of subnets they are long-lived - // subscribed too, shuffle equal peers. + // subscribed too, shuffle equal peers. Prioritize unsynced peers for pruning. peers_on_subnet.shuffle(&mut rand::rng()); - peers_on_subnet.sort_by_key(|(_, info)| info.long_lived_subnet_count()); + peers_on_subnet.sort_by_key(|(_, info)| { + (info.long_lived_attnet_count(), info.is_synced_or_advanced()) + }); // Try and find a candidate peer to remove from the subnet. // We ignore peers that would put us below our target outbound peers @@ -1187,6 +1237,32 @@ impl PeerManager { } } + // Ensure custody subnet peers are protected based on subnet type and peer count. + if let Some(subnets) = peer_to_custody_subnet.get(candidate_peer) { + let mut should_protect = false; + for subnet_id in subnets { + if let Some(subnet_count) = + custody_subnet_peer_count.get(subnet_id).copied() + { + let threshold = if sampling_subnets.contains(subnet_id) { + MIN_SAMPLING_COLUMN_SUBNET_PEERS + } else { + MIN_NON_SAMPLING_COLUMN_SUBNET_PEERS + }; + + if subnet_count <= threshold { + should_protect = true; + break; + } + } + } + + if should_protect { + // Do not drop this peer in this pruning interval + continue; + } + } + if info.is_outbound_only() { outbound_peers_pruned += 1; } @@ -1202,7 +1278,7 @@ impl PeerManager { if let Some(index) = removed_peer_index { let (candidate_peer, _) = peers_on_subnet.remove(index); // Remove pruned peers from other subnet counts - for subnet_peers in subnet_to_peer.values_mut() { + for subnet_peers in att_subnet_to_peer.values_mut() { subnet_peers.retain(|(peer_id, _)| peer_id != &candidate_peer); } // Remove pruned peers from all sync-committee counts @@ -1218,6 +1294,19 @@ impl PeerManager { } } } + // Remove pruned peers from all custody subnet counts + if let Some(known_custody_subnets) = + peer_to_custody_subnet.get(&candidate_peer) + { + for custody_subnet in known_custody_subnets { + if let Some(custody_subnet_count) = + custody_subnet_peer_count.get_mut(custody_subnet) + { + *custody_subnet_count = + custody_subnet_count.saturating_sub(1); + } + } + } peers_to_prune.insert(candidate_peer); } else { peers_on_subnet.clear(); @@ -1271,6 +1360,9 @@ impl PeerManager { // Update peer score metrics; self.update_peer_score_metrics(); + // Maintain minimum count for custody peers. + self.maintain_custody_peers(); + // Maintain minimum count for sync committee peers. self.maintain_sync_committee_peers(); @@ -2153,6 +2245,83 @@ mod tests { assert!(!connected_peers.contains(&peers[5])); } + /// Test that custody subnet peers below threshold are protected from pruning. + /// Creates 3 peers: 2 on sampling subnet (below MIN_SAMPLING_COLUMN_SUBNET_PEERS=3), + /// 1 with no subnet. Should prune the peer with no subnet and keep the custody subnet peers. + #[tokio::test] + async fn test_peer_manager_protect_custody_subnet_peers_below_threshold() { + let target = 2; + let mut peer_manager = build_peer_manager(target).await; + + // Set up sampling subnets + let mut sampling_subnets = HashSet::new(); + sampling_subnets.insert(0.into()); + *peer_manager.network_globals.sampling_subnets.write() = sampling_subnets; + + let mut peers = Vec::new(); + + // Create 3 peers + for i in 0..3 { + let peer_id = PeerId::random(); + peer_manager.inject_connect_ingoing(&peer_id, "/ip4/0.0.0.0".parse().unwrap(), None); + + let custody_subnets = if i < 2 { + // First 2 peers on sampling subnet 0 + [0.into()].into_iter().collect() + } else { + // Last peer has no custody subnets + HashSet::new() + }; + + // Set custody subnets for the peer + peer_manager + .network_globals + .peers + .write() + .peer_info_mut(&peer_id) + .unwrap() + .set_custody_subnets(custody_subnets.clone()); + + // Add subscriptions for custody subnets + for subnet_id in custody_subnets { + peer_manager + .network_globals + .peers + .write() + .add_subscription(&peer_id, Subnet::DataColumn(subnet_id)); + } + + peers.push(peer_id); + } + + // Verify initial setup + assert_eq!(peer_manager.network_globals.connected_or_dialing_peers(), 3); + + // Perform the heartbeat to trigger pruning + peer_manager.heartbeat(); + + // Should prune down to target of 2 peers + assert_eq!( + peer_manager.network_globals.connected_or_dialing_peers(), + target + ); + + let connected_peers: std::collections::HashSet<_> = peer_manager + .network_globals + .peers + .read() + .connected_or_dialing_peers() + .cloned() + .collect(); + + // The 2 custody subnet peers should be protected + assert!(connected_peers.contains(&peers[0])); + assert!(connected_peers.contains(&peers[1])); + + // The peer with no custody subnets should be pruned + assert!(!connected_peers.contains(&peers[2])); + } + /// Test the pruning logic to prioritise peers with the most subnets, but not at the expense of /// removing our few sync-committee subnets. /// @@ -2265,7 +2434,7 @@ mod tests { /// This test is for reproducing the issue: /// https://github.com/sigp/lighthouse/pull/3236#issue-1256432659 /// - /// Whether the issue happens depends on `subnet_to_peer` (HashMap), since HashMap doesn't + /// Whether the issue happens depends on `att_subnet_to_peer` (HashMap), since HashMap doesn't /// guarantee a particular order of iteration. So we repeat the test case to try to reproduce /// the issue. #[tokio::test] diff --git a/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs b/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs index 974b41230e8..083c3f00c23 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs @@ -300,6 +300,7 @@ impl PeerDB { .filter(move |(_, info)| { // We check both the metadata and gossipsub data as we only want to count long-lived subscribed peers info.is_connected() + && info.is_synced_or_advanced() && info.on_subnet_metadata(&subnet) && info.on_subnet_gossipsub(&subnet) && info.is_good_gossipsub_peer() @@ -318,40 +319,69 @@ impl PeerDB { .filter(move |(_, info)| { // The custody_subnets hashset can be populated via enr or metadata let is_custody_subnet_peer = info.is_assigned_to_custody_subnet(&subnet); - info.is_connected() && info.is_good_gossipsub_peer() && is_custody_subnet_peer + info.is_connected() + && info.is_good_gossipsub_peer() + && is_custody_subnet_peer + && info.is_synced_or_advanced() }) .map(|(peer_id, _)| peer_id) } - /// Returns an iterator of all peers that are supposed to be custodying - /// the given subnet id. - pub fn good_range_sync_custody_subnet_peers( + /// Checks if there is at least one good peer for each specified custody subnet for the given epoch. + /// A "good" peer is one that is both connected and synced (or advanced) for the specified epoch. + pub fn has_good_custody_range_sync_peer( &self, - subnet: DataColumnSubnetId, - ) -> impl Iterator { - self.peers - .iter() - .filter(move |(_, info)| { - // The custody_subnets hashset can be populated via enr or metadata - info.is_connected() && info.is_assigned_to_custody_subnet(&subnet) - }) - .map(|(peer_id, _)| peer_id) + subnets: &HashSet, + epoch: Epoch, + ) -> bool { + let mut remaining_subnets = subnets.clone(); + + let good_sync_peers_for_epoch = self.peers.values().filter(|&info| { + info.is_connected() + && match info.sync_status() { + SyncStatus::Synced { info } | SyncStatus::Advanced { info } => { + info.has_slot(epoch.end_slot(E::slots_per_epoch())) + } + SyncStatus::IrrelevantPeer + | SyncStatus::Behind { .. } + | SyncStatus::Unknown => false, + } + }); + + for info in good_sync_peers_for_epoch { + for subnet in info.custody_subnets_iter() { + if remaining_subnets.remove(subnet) && remaining_subnets.is_empty() { + return true; + } + } + } + + false } - /// Returns `true` if the given peer is assigned to the given subnet. - /// else returns `false` - /// - /// Returns `false` if peer doesn't exist in peerdb. - pub fn is_good_range_sync_custody_subnet_peer( + /// Checks if there are sufficient good peers for a single custody subnet. + /// A "good" peer is one that is both connected and synced (or advanced). + pub fn has_good_peers_in_custody_subnet( &self, - subnet: DataColumnSubnetId, - peer: &PeerId, + subnet: &DataColumnSubnetId, + target_peers: usize, ) -> bool { - if let Some(info) = self.peers.get(peer) { - info.is_connected() && info.is_assigned_to_custody_subnet(&subnet) - } else { - false + let mut peer_count = 0usize; + for info in self + .peers + .values() + .filter(|info| info.is_connected() && info.is_synced_or_advanced()) + { + if info.is_assigned_to_custody_subnet(subnet) { + peer_count += 1; + } + + if peer_count >= target_peers { + return true; + } } + + false } /// Gives the ids of all known disconnected peers. diff --git a/beacon_node/lighthouse_network/src/peer_manager/peerdb/peer_info.rs b/beacon_node/lighthouse_network/src/peer_manager/peerdb/peer_info.rs index e643fca30fb..c2cdd5ecf73 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/peerdb/peer_info.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/peerdb/peer_info.rs @@ -174,9 +174,9 @@ impl PeerInfo { self.subnets.iter() } - /// Returns the number of long lived subnets a peer is subscribed to. - // NOTE: This currently excludes sync committee subnets - pub fn long_lived_subnet_count(&self) -> usize { + /// Returns the number of long lived attestation subnets a peer is subscribed to. + // NOTE: This currently excludes sync committee and column subnets + pub fn long_lived_attnet_count(&self) -> usize { if let Some(meta_data) = self.meta_data.as_ref() { return meta_data.attnets().num_set_bits(); } else if let Some(enr) = self.enr.as_ref() @@ -262,6 +262,17 @@ impl PeerInfo { { return true; } + + // Check if the peer has custody subnets populated and the peer is subscribed to any of + // its custody subnets + let subscribed_to_any_custody_subnets = self + .custody_subnets + .iter() + .any(|subnet_id| self.subnets.contains(&Subnet::DataColumn(*subnet_id))); + if subscribed_to_any_custody_subnets { + return true; + } + false } @@ -318,6 +329,14 @@ impl PeerInfo { ) } + /// Checks if the peer is synced or advanced. + pub fn is_synced_or_advanced(&self) -> bool { + matches!( + self.sync_status, + SyncStatus::Synced { .. } | SyncStatus::Advanced { .. } + ) + } + /// Checks if the status is connected. pub fn is_dialing(&self) -> bool { matches!(self.connection_status, PeerConnectionStatus::Dialing { .. }) @@ -645,3 +664,50 @@ impl From for PeerState { } } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::types::Subnet; + use types::{DataColumnSubnetId, MainnetEthSpec}; + + type E = MainnetEthSpec; + + fn create_test_peer_info() -> PeerInfo { + PeerInfo::default() + } + + #[test] + fn test_has_long_lived_subnet_empty_custody_subnets() { + let peer_info = create_test_peer_info(); + // peer has no custody subnets or subscribed to any subnets hence return false + assert!(!peer_info.has_long_lived_subnet()); + } + + #[test] + fn test_has_long_lived_subnet_empty_subnets_with_custody_subnets() { + let mut peer_info = create_test_peer_info(); + peer_info.custody_subnets.insert(DataColumnSubnetId::new(1)); + peer_info.custody_subnets.insert(DataColumnSubnetId::new(2)); + // Peer has custody subnets but isn't subscribed to any hence return false + assert!(!peer_info.has_long_lived_subnet()); + } + + #[test] + fn test_has_long_lived_subnet_subscribed_to_custody_subnets() { + let mut peer_info = create_test_peer_info(); + peer_info.custody_subnets.insert(DataColumnSubnetId::new(1)); + peer_info.custody_subnets.insert(DataColumnSubnetId::new(2)); + peer_info.custody_subnets.insert(DataColumnSubnetId::new(3)); + + peer_info + .subnets + .insert(Subnet::DataColumn(DataColumnSubnetId::new(1))); + peer_info + .subnets + .insert(Subnet::DataColumn(DataColumnSubnetId::new(2))); + // Missing DataColumnSubnetId::new(3) - but peer is subscribed to some custody subnets + // Peer is subscribed to any custody subnets - return true + assert!(peer_info.has_long_lived_subnet()); + } +} diff --git a/beacon_node/lighthouse_network/src/rpc/protocol.rs b/beacon_node/lighthouse_network/src/rpc/protocol.rs index 48c356fb29c..a8f0be87a39 100644 --- a/beacon_node/lighthouse_network/src/rpc/protocol.rs +++ b/beacon_node/lighthouse_network/src/rpc/protocol.rs @@ -825,8 +825,8 @@ impl RequestType { match self { // add more protocols when versions/encodings are supported RequestType::Status(_) => vec![ - ProtocolId::new(SupportedProtocol::StatusV1, Encoding::SSZSnappy), ProtocolId::new(SupportedProtocol::StatusV2, Encoding::SSZSnappy), + ProtocolId::new(SupportedProtocol::StatusV1, Encoding::SSZSnappy), ], RequestType::Goodbye(_) => vec![ProtocolId::new( SupportedProtocol::GoodbyeV1, diff --git a/beacon_node/lighthouse_network/tests/rpc_tests.rs b/beacon_node/lighthouse_network/tests/rpc_tests.rs index ad0b4c4462a..ad03b45db9c 100644 --- a/beacon_node/lighthouse_network/tests/rpc_tests.rs +++ b/beacon_node/lighthouse_network/tests/rpc_tests.rs @@ -76,21 +76,23 @@ fn test_tcp_status_rpc() { .await; // Dummy STATUS RPC message - let rpc_request = RequestType::Status(StatusMessage::V1(StatusMessageV1 { + let rpc_request = RequestType::Status(StatusMessage::V2(StatusMessageV2 { fork_digest: [0; 4], finalized_root: Hash256::zero(), finalized_epoch: Epoch::new(1), head_root: Hash256::zero(), head_slot: Slot::new(1), + earliest_available_slot: Slot::new(0), })); // Dummy STATUS RPC message - let rpc_response = Response::Status(StatusMessage::V1(StatusMessageV1 { + let rpc_response = Response::Status(StatusMessage::V2(StatusMessageV2 { fork_digest: [0; 4], finalized_root: Hash256::zero(), finalized_epoch: Epoch::new(1), head_root: Hash256::zero(), head_slot: Slot::new(1), + earliest_available_slot: Slot::new(0), })); // build the sender future @@ -1205,21 +1207,23 @@ fn test_delayed_rpc_response() { .await; // Dummy STATUS RPC message - let rpc_request = RequestType::Status(StatusMessage::V1(StatusMessageV1 { + let rpc_request = RequestType::Status(StatusMessage::V2(StatusMessageV2 { fork_digest: [0; 4], finalized_root: Hash256::from_low_u64_be(0), finalized_epoch: Epoch::new(1), head_root: Hash256::from_low_u64_be(0), head_slot: Slot::new(1), + earliest_available_slot: Slot::new(0), })); // Dummy STATUS RPC message - let rpc_response = Response::Status(StatusMessage::V1(StatusMessageV1 { + let rpc_response = Response::Status(StatusMessage::V2(StatusMessageV2 { fork_digest: [0; 4], finalized_root: Hash256::from_low_u64_be(0), finalized_epoch: Epoch::new(1), head_root: Hash256::from_low_u64_be(0), head_slot: Slot::new(1), + earliest_available_slot: Slot::new(0), })); // build the sender future @@ -1335,21 +1339,23 @@ fn test_active_requests() { .await; // Dummy STATUS RPC request. - let rpc_request = RequestType::Status(StatusMessage::V1(StatusMessageV1 { + let rpc_request = RequestType::Status(StatusMessage::V2(StatusMessageV2 { fork_digest: [0; 4], finalized_root: Hash256::from_low_u64_be(0), finalized_epoch: Epoch::new(1), head_root: Hash256::from_low_u64_be(0), head_slot: Slot::new(1), + earliest_available_slot: Slot::new(0), })); // Dummy STATUS RPC response. - let rpc_response = Response::Status(StatusMessage::V1(StatusMessageV1 { + let rpc_response = Response::Status(StatusMessage::V2(StatusMessageV2 { fork_digest: [0; 4], finalized_root: Hash256::zero(), finalized_epoch: Epoch::new(1), head_root: Hash256::zero(), head_slot: Slot::new(1), + earliest_available_slot: Slot::new(0), })); // Number of requests. diff --git a/beacon_node/network/src/sync/backfill_sync/mod.rs b/beacon_node/network/src/sync/backfill_sync/mod.rs index ae9ac2e7705..2e37691b4ba 100644 --- a/beacon_node/network/src/sync/backfill_sync/mod.rs +++ b/beacon_node/network/src/sync/backfill_sync/mod.rs @@ -1120,13 +1120,12 @@ impl BackFillSync { .sampling_subnets() .iter() .all(|subnet_id| { - let peer_count = network + let min_peer_count = 1; + network .network_globals() .peers .read() - .good_range_sync_custody_subnet_peers(*subnet_id) - .count(); - peer_count > 0 + .has_good_peers_in_custody_subnet(subnet_id, min_peer_count) }) } else { true diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index 96319f2efad..8907f7510fd 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -1132,21 +1132,12 @@ impl SyncingChain { ) -> bool { if network.chain.spec.is_peer_das_enabled_for_epoch(epoch) { // Require peers on all sampling column subnets before sending batches + let sampling_subnets = network.network_globals().sampling_subnets(); network .network_globals() - .sampling_subnets() - .iter() - .all(|subnet_id| { - let peer_db = network.network_globals().peers.read(); - let peer_count = self - .peers - .iter() - .filter(|peer| { - peer_db.is_good_range_sync_custody_subnet_peer(*subnet_id, peer) - }) - .count(); - peer_count > 0 - }) + .peers + .read() + .has_good_custody_range_sync_peer(&sampling_subnets, epoch) } else { true }