Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
05baf9c
Maintain peers across all sampling subnets. Make discovery requests i…
jimmygchen Aug 21, 2025
9e87e49
Clean ups.
jimmygchen Aug 21, 2025
232e685
Prioritize unsynced peers for pruning
jimmygchen Aug 21, 2025
edf8571
Remove brittle and unmaintainable test
jimmygchen Aug 21, 2025
6b1f2c8
Fix function behaviour
jimmygchen Aug 21, 2025
901208e
Merge remote-tracking branch 'origin/unstable' into maintain-custody-…
jimmygchen Aug 29, 2025
0a2fd8b
Update peer manager tests to use `MetaDataV3`
jimmygchen Sep 1, 2025
74466bd
Prioritise data column subnet uniform distribution when pruning peers.
jimmygchen Sep 1, 2025
36c5f8a
Refactor prune peer function.
jimmygchen Sep 1, 2025
daa619c
Fix lint
jimmygchen Sep 1, 2025
d34b778
Update tests
jimmygchen Sep 2, 2025
6252834
Merge remote-tracking branch 'origin/unstable' into maintain-custody-…
jimmygchen Sep 2, 2025
7ac2096
Update tests and fix some bugs
jimmygchen Sep 2, 2025
0d06a46
Add additional peer pruning tests on sync status
jimmygchen Sep 2, 2025
4a31198
Fix test flakiness due to sampling subnet computation from random Pee…
jimmygchen Sep 2, 2025
73332c2
Merge branch 'unstable' into maintain-custody-peers
jimmygchen Sep 2, 2025
63ffd1d
Merge branch 'unstable' into maintain-custody-peers
jimmygchen Sep 3, 2025
ac778c6
Update code comments from review
jimmygchen Sep 3, 2025
09d6bbe
Update typo in comment
jimmygchen Sep 4, 2025
20483f9
Fix a pruning bug and update comments.
jimmygchen Sep 4, 2025
de98f29
Merge branch 'maintain-custody-peers' of github.com:jimmygchen/lighth…
jimmygchen Sep 4, 2025
ec88397
Merge branch 'unstable' into maintain-custody-peers
jimmygchen Sep 4, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
207 changes: 188 additions & 19 deletions beacon_node/lighthouse_network/src/peer_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::{
time::{Duration, Instant},
};
use tracing::{debug, error, trace, warn};
use types::{DataColumnSubnetId, EthSpec, SyncSubnetId};
use types::{DataColumnSubnetId, EthSpec, SubnetId, SyncSubnetId};

pub use libp2p::core::Multiaddr;
pub use libp2p::identity::Keypair;
Expand Down Expand Up @@ -52,6 +52,11 @@ pub const PEER_RECONNECTION_TIMEOUT: Duration = Duration::from_secs(600);
/// lower our peer count below this number. Instead we favour a non-uniform distribution of subnet
/// peers.
pub const MIN_SYNC_COMMITTEE_PEERS: u64 = 2;
/// Minimum peer count to maintain on each data column subnet that we sample.
///
/// Pruning skips a peer when any sampling subnet it is on has a peer count
/// `<=` this value, and custody peer maintenance issues a subnet discovery
/// query for each sampling subnet that does not have this many good peers.
/// Defined as `TARGET_SUBNET_PEERS`.
pub const MIN_SAMPLING_COLUMN_SUBNET_PEERS: u64 = TARGET_SUBNET_PEERS as u64;
/// Minimum peer count to maintain on data column subnets that we do not sample.
///
/// For non sampling columns, we need to ensure there is at least one peer for
/// publishing during proposals. Pruning skips a peer when any non-sampling
/// subnet it is on has a peer count `<=` this value.
pub const MIN_NON_SAMPLING_COLUMN_SUBNET_PEERS: u64 = 1;
/// A fraction of `PeerManager::target_peers` that we allow to connect to us in excess of
/// `PeerManager::target_peers`. For clarity, if `PeerManager::target_peers` is 50 and
/// PEER_EXCESS_FACTOR = 0.1 we allow 10% more nodes, i.e 55.
Expand Down Expand Up @@ -729,7 +734,7 @@ impl<E: EthSpec> PeerManager<E> {
}
} else {
// we have no meta-data for this peer, update
debug!(%peer_id, new_seq_no = meta_data.seq_number(), "Obtained peer's metadata");
debug!(%peer_id, new_seq_no = meta_data.seq_number(), cgc=?meta_data.custody_group_count().ok(), "Obtained peer's metadata");
}

let known_custody_group_count = peer_info
Expand Down Expand Up @@ -949,6 +954,43 @@ impl<E: EthSpec> PeerManager<E> {
}
}

/// Requests discovery of additional custody peers for under-populated sampling subnets.
///
/// Every subnet returned by `network_globals.sampling_subnets()` is checked with
/// `has_good_peers_in_custody_subnet` against `MIN_SAMPLING_COLUMN_SUBNET_PEERS`. The
/// subnets that fail the check are batched into a single
/// `PeerManagerEvent::DiscoverSubnetPeers` event. Nothing is queued when every sampling
/// subnet passes.
fn maintain_custody_peers(&mut self) {
    let min_peers = MIN_SAMPLING_COLUMN_SUBNET_PEERS as usize;

    let mut subnets_to_discover: Vec<SubnetDiscovery> = Vec::new();
    for custody_subnet in self.network_globals.sampling_subnets().iter() {
        // The `peers` read guard is acquired per subnet and dropped at the end of this
        // statement, so it is never held while building the discovery request.
        let has_enough_peers = self
            .network_globals
            .peers
            .read()
            .has_good_peers_in_custody_subnet(custody_subnet, min_peers);

        if !has_enough_peers {
            subnets_to_discover.push(SubnetDiscovery {
                subnet: Subnet::DataColumn(*custody_subnet),
                min_ttl: None,
            });
        }
    }

    // Nothing to ask discovery for.
    if subnets_to_discover.is_empty() {
        return;
    }

    // request the subnet query from discovery
    debug!(
        subnets = ?subnets_to_discover.iter().map(|query| query.subnet).collect::<Vec<_>>(),
        "Making subnet queries for maintaining custody peers"
    );
    self.events
        .push(PeerManagerEvent::DiscoverSubnetPeers(subnets_to_discover));
}

fn maintain_trusted_peers(&mut self) {
let trusted_peers = self.trusted_peers.clone();
for trusted_peer in trusted_peers {
Expand Down Expand Up @@ -1091,14 +1133,17 @@ impl<E: EthSpec> PeerManager<E> {
// uniformly distributed, remove random peers.
if peers_to_prune.len() < connected_peer_count.saturating_sub(self.target_peers) {
// Of our connected peers, build a map from subnet_id -> Vec<(PeerId, PeerInfo)>
let mut subnet_to_peer: HashMap<Subnet, Vec<(PeerId, PeerInfo<E>)>> = HashMap::new();
let mut att_subnet_to_peer: HashMap<SubnetId, Vec<(PeerId, PeerInfo<E>)>> =
HashMap::new();
// These variables are used to track if a peer is in a long-lived sync-committee as we
// may wish to retain this peer over others when pruning.
let mut sync_committee_peer_count: HashMap<SyncSubnetId, u64> = HashMap::new();
let mut peer_to_sync_committee: HashMap<
PeerId,
std::collections::HashSet<SyncSubnetId>,
> = HashMap::new();
let mut peer_to_sync_committee: HashMap<PeerId, HashSet<SyncSubnetId>> = HashMap::new();

let mut custody_subnet_peer_count: HashMap<DataColumnSubnetId, u64> = HashMap::new();
let mut peer_to_custody_subnet: HashMap<PeerId, HashSet<DataColumnSubnetId>> =
HashMap::new();
let sampling_subnets = self.network_globals.sampling_subnets();

for (peer_id, info) in self.network_globals.peers.read().connected_peers() {
// Ignore peers we trust or that we are already pruning
Expand All @@ -1112,9 +1157,9 @@ impl<E: EthSpec> PeerManager<E> {
// the dense sync committees.
for subnet in info.long_lived_subnets() {
match subnet {
Subnet::Attestation(_) => {
subnet_to_peer
.entry(subnet)
Subnet::Attestation(subnet_id) => {
att_subnet_to_peer
.entry(subnet_id)
.or_default()
.push((*peer_id, info.clone()));
}
Expand All @@ -1125,26 +1170,31 @@ impl<E: EthSpec> PeerManager<E> {
.or_default()
.insert(id);
}
// TODO(das) to be implemented. We're not pruning data column peers yet
// because data column topics are subscribed as core topics until we
// implement recomputing data column subnets.
Subnet::DataColumn(_) => {}
Subnet::DataColumn(id) => {
*custody_subnet_peer_count.entry(id).or_default() += 1;
peer_to_custody_subnet
.entry(*peer_id)
.or_default()
.insert(id);
}
}
}
}

// Add to the peers to prune mapping
while peers_to_prune.len() < connected_peer_count.saturating_sub(self.target_peers) {
if let Some((_, peers_on_subnet)) = subnet_to_peer
if let Some((_, peers_on_subnet)) = att_subnet_to_peer
.iter_mut()
.max_by_key(|(_, peers)| peers.len())
{
// and the subnet still contains peers
if !peers_on_subnet.is_empty() {
// Order the peers by the number of subnets they are long-lived
// subscribed too, shuffle equal peers.
// subscribed to, shuffle equal peers. Prioritize unsynced peers for pruning.
peers_on_subnet.shuffle(&mut rand::rng());
peers_on_subnet.sort_by_key(|(_, info)| info.long_lived_subnet_count());
peers_on_subnet.sort_by_key(|(_, info)| {
(info.long_lived_attnet_count(), info.is_synced_or_advanced())
});

// Try and find a candidate peer to remove from the subnet.
// We ignore peers that would put us below our target outbound peers
Expand Down Expand Up @@ -1187,6 +1237,32 @@ impl<E: EthSpec> PeerManager<E> {
}
}

// Ensure custody subnet peers are protected based on subnet type and peer count.
if let Some(subnets) = peer_to_custody_subnet.get(candidate_peer) {
let mut should_protect = false;
for subnet_id in subnets {
if let Some(subnet_count) =
custody_subnet_peer_count.get(subnet_id).copied()
{
let threshold = if sampling_subnets.contains(subnet_id) {
MIN_SAMPLING_COLUMN_SUBNET_PEERS
} else {
MIN_NON_SAMPLING_COLUMN_SUBNET_PEERS
};

if subnet_count <= threshold {
should_protect = true;
break;
}
}
}

if should_protect {
// Do not drop this peer in this pruning interval
continue;
}
}

if info.is_outbound_only() {
outbound_peers_pruned += 1;
}
Expand All @@ -1202,7 +1278,7 @@ impl<E: EthSpec> PeerManager<E> {
if let Some(index) = removed_peer_index {
let (candidate_peer, _) = peers_on_subnet.remove(index);
// Remove pruned peers from other subnet counts
for subnet_peers in subnet_to_peer.values_mut() {
for subnet_peers in att_subnet_to_peer.values_mut() {
subnet_peers.retain(|(peer_id, _)| peer_id != &candidate_peer);
}
// Remove pruned peers from all sync-committee counts
Expand All @@ -1218,6 +1294,19 @@ impl<E: EthSpec> PeerManager<E> {
}
}
}
// Remove pruned peers from all custody subnet counts
if let Some(known_custody_subnets) =
peer_to_custody_subnet.get(&candidate_peer)
{
for custody_subnet in known_custody_subnets {
if let Some(custody_subnet_count) =
custody_subnet_peer_count.get_mut(custody_subnet)
{
*custody_subnet_count =
custody_subnet_count.saturating_sub(1);
}
}
}
peers_to_prune.insert(candidate_peer);
} else {
peers_on_subnet.clear();
Expand Down Expand Up @@ -1271,6 +1360,9 @@ impl<E: EthSpec> PeerManager<E> {
// Update peer score metrics;
self.update_peer_score_metrics();

// Maintain minimum count for custody peers.
self.maintain_custody_peers();

// Maintain minimum count for sync committee peers.
self.maintain_sync_committee_peers();

Expand Down Expand Up @@ -2153,6 +2245,83 @@ mod tests {
assert!(!connected_peers.contains(&peers[5]));
}

/// Test that custody subnet peers below threshold are protected from pruning.
/// Creates 3 peers: 2 on sampling subnet (below MIN_SAMPLING_COLUMN_SUBNET_PEERS=3),
/// 1 with no subnet. Should prune the peer with no subnet and keep the custody subnet peers.
#[tokio::test]
async fn test_peer_manager_protect_custody_subnet_peers_below_threshold() {
let target = 2;
let mut peer_manager = build_peer_manager(target).await;

// Set up sampling subnets
let mut sampling_subnets = HashSet::new();
sampling_subnets.insert(0.into());
*peer_manager.network_globals.sampling_subnets.write() = sampling_subnets;

let mut peers = Vec::new();

// Create 3 peers
for i in 0..3 {
let peer_id = PeerId::random();
peer_manager.inject_connect_ingoing(&peer_id, "/ip4/0.0.0.0".parse().unwrap(), None);

let custody_subnets = if i < 2 {
// First 2 peers on sampling subnet 0
[0.into()].into_iter().collect()
} else {
// Last peer has no custody subnets
HashSet::new()
};

// Set custody subnets for the peer
peer_manager
.network_globals
.peers
.write()
.peer_info_mut(&peer_id)
.unwrap()
.set_custody_subnets(custody_subnets.clone());

// Add subscriptions for custody subnets
for subnet_id in custody_subnets {
peer_manager
.network_globals
.peers
.write()
.add_subscription(&peer_id, Subnet::DataColumn(subnet_id));
}

peers.push(peer_id);
}

// Verify initial setup
assert_eq!(peer_manager.network_globals.connected_or_dialing_peers(), 3);

// Perform the heartbeat to trigger pruning
peer_manager.heartbeat();

// Should prune down to target of 2 peers
assert_eq!(
peer_manager.network_globals.connected_or_dialing_peers(),
target
);

let connected_peers: std::collections::HashSet<_> = peer_manager
.network_globals
.peers
.read()
.connected_or_dialing_peers()
.cloned()
.collect();

// The 2 custody subnet peers should be protected
assert!(connected_peers.contains(&peers[0]));
assert!(connected_peers.contains(&peers[1]));

// The peer with no custody subnets should be pruned
assert!(!connected_peers.contains(&peers[2]));
}

/// Test the pruning logic to prioritise peers with the most subnets, but not at the expense of
/// removing our few sync-committee subnets.
///
Expand Down Expand Up @@ -2265,7 +2434,7 @@ mod tests {
/// This test is for reproducing the issue:
/// https://github.com/sigp/lighthouse/pull/3236#issue-1256432659
///
/// Whether the issue happens depends on `subnet_to_peer` (HashMap), since HashMap doesn't
/// Whether the issue happens depends on `att_subnet_to_peer` (HashMap), since HashMap doesn't
/// guarantee a particular order of iteration. So we repeat the test case to try to reproduce
/// the issue.
#[tokio::test]
Expand Down
Loading
Loading