Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions anchor/network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ pub use config::{Config, DEFAULT_DISC_PORT, DEFAULT_QUIC_PORT, DEFAULT_TCP_PORT}
pub use network::Network;
pub use network_utils::listen_addr::{ListenAddr, ListenAddress};
pub type Enr = discv5::enr::Enr<discv5::enr::CombinedKey>;
pub use peer_manager::types::{ClientType, PeerInfo};
8 changes: 8 additions & 0 deletions anchor/network/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,14 @@ pub static PEERS_CONNECTED: LazyLock<Result<IntGauge>> = LazyLock::new(|| {
try_create_int_gauge("libp2p_peers", "Count of libp2p peers currently connected")
});

pub static PEERS_BY_CLIENT: LazyLock<Result<IntGaugeVec>> = LazyLock::new(|| {
try_create_int_gauge_vec(
"libp2p_peers_by_client",
"Count of connected peers by client type (anchor, go-ssv, unknown)",
&["client_type"],
)
});

pub static HANDSHAKE_SUCCESSFUL: LazyLock<Result<IntCounter>> = LazyLock::new(|| {
try_create_int_counter(
"libp2p_handshake_successful_total",
Expand Down
67 changes: 41 additions & 26 deletions anchor/network/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,13 @@ use crate::{
Config, Enr,
behaviour::{AnchorBehaviour, AnchorBehaviourEvent, BehaviourError},
discovery::{DiscoveredPeers, Discovery, DiscoveryError},
handshake,
handshake::node_info::{NodeInfo, NodeMetadata},
handshake::{
self,
node_info::{NodeInfo, NodeMetadata},
},
keypair_utils::load_private_key,
network::NetworkError::SwarmConfig,
peer_manager,
peer_manager::{ConnectActions, PeerManager},
peer_manager::{self, ConnectActions, PeerManager},
scoring::topic_score_config::topic_score_params_for_subnet_with_rate,
transport::build_transport,
};
Expand Down Expand Up @@ -565,6 +566,36 @@ impl<R: MessageReceiver> Network<R> {
}
}

/// Record metrics about subnet overlap after successful handshake.
fn record_handshake_subnet_match_metrics(
&self,
peer_id: PeerId,
their_metadata: &NodeMetadata,
) {
if let Some(our_metadata) = &self.node_info.metadata {
let matching_count =
count_matching_subnets(&our_metadata.subnets, &their_metadata.subnets);

debug!(
%peer_id,
our_subnets = %our_metadata.subnets,
their_subnets = %their_metadata.subnets,
matching_subnets = matching_count,
"Handshake completed"
);

// Record subnet match count metric
if let Ok(gauge_vec) = crate::metrics::HANDSHAKE_SUBNET_MATCHES.as_ref() {
let label = &matching_count.to_string();
if let Ok(gauge) = gauge_vec.get_metric_with_label_values(&[label]) {
gauge.inc();
}
}
} else {
debug!(%peer_id, "Handshake completed");
}
}

fn handle_handshake_result(&mut self, result: Result<handshake::Completed, handshake::Failed>) {
match result {
Ok(handshake::Completed {
Expand All @@ -576,30 +607,14 @@ impl<R: MessageReceiver> Network<R> {
counter.inc();
}

// Count and record matching subnets
if let (Some(our_metadata), Some(their_metadata)) =
(&self.node_info.metadata, &their_info.metadata)
{
let matching_count =
count_matching_subnets(&our_metadata.subnets, &their_metadata.subnets);

debug!(
%peer_id,
our_subnets = %our_metadata.subnets,
their_subnets = %their_metadata.subnets,
matching_subnets = matching_count,
"Handshake completed"
);
if let Some(metadata) = their_info.metadata {
self.peer_manager()
.handle_handshake_completed(peer_id, metadata.node_version.clone());

// Record subnet match count
if let Ok(gauge_vec) = crate::metrics::HANDSHAKE_SUBNET_MATCHES.as_ref() {
let label = &matching_count.to_string();
if let Ok(gauge) = gauge_vec.get_metric_with_label_values(&[label]) {
gauge.inc();
}
}
// Record subnet matching metrics
self.record_handshake_subnet_match_metrics(peer_id, &metadata);
} else {
debug!(%peer_id, ?their_info, "Handshake completed");
debug!(%peer_id, ?their_info, "Handshake completed without metadata");
}
}
Err(handshake::Failed { peer_id, error }) => {
Expand Down
50 changes: 39 additions & 11 deletions anchor/network/src/peer_manager/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use ssz_types::{Bitfield, length::Fixed, typenum::U128};
use subnet_service::SubnetId;
use thiserror::Error;

use crate::{Config, Enr, discovery, metrics::PEERS_CONNECTED};
use crate::{ClientType, Config, PeerInfo, discovery, metrics::PEERS_CONNECTED};

/// A fraction of `target_peers` that we allow to connect to us in excess of
/// `target_peers`. For clarity, if `target_peers` is 50 and
Expand Down Expand Up @@ -111,7 +111,7 @@ impl ConnectionManager {
pub fn should_dial_peer(
&self,
peer_id: &PeerId,
peer_store: &MemoryStore<Enr>,
peer_store: &MemoryStore<PeerInfo>,
needed_subnets: &HashSet<SubnetId>,
blocked_peers: &HashSet<PeerId>,
) -> bool {
Expand All @@ -135,7 +135,7 @@ impl ConnectionManager {
pub fn qualifies_for_priority_connection(
&self,
peer_id: &PeerId,
peer_store: &MemoryStore<Enr>,
peer_store: &MemoryStore<PeerInfo>,
needed_subnets: &HashSet<SubnetId>,
) -> bool {
let Some(subnets) = self.get_peer_subnets_with_enr_fallback(peer_id, peer_store) else {
Expand Down Expand Up @@ -204,7 +204,7 @@ impl ConnectionManager {
pub fn peer_offers_needed_subnets_with_enr_fallback(
&self,
peer: &PeerId,
peer_store: &MemoryStore<Enr>,
peer_store: &MemoryStore<PeerInfo>,
needed: &HashSet<SubnetId>,
) -> bool {
if needed.is_empty() {
Expand Down Expand Up @@ -246,12 +246,15 @@ impl ConnectionManager {
fn get_peer_subnets_with_enr_fallback(
&self,
peer: &PeerId,
peer_store: &MemoryStore<Enr>,
peer_store: &MemoryStore<PeerInfo>,
) -> Option<Bitfield<Fixed<U128>>> {
self.get_peer_subnets_observed_only(peer).or_else(|| {
// Fallback to ENR
let enr = peer_store.get_custom_data(peer)?;
discovery::committee_bitfield(enr).ok()
peer_store
.get_custom_data(peer)?
.enr
.as_ref()
.and_then(|enr| discovery::committee_bitfield(enr).ok())
})
}

Expand Down Expand Up @@ -303,12 +306,37 @@ impl ConnectionManager {
}

/// Update metrics if connection state changed
pub fn update_metrics_if_changed(&self, changed: bool) {
pub fn update_metrics_if_changed(&self, changed: bool, peer_store: &MemoryStore<PeerInfo>) {
if changed {
metrics::set_gauge(
&PEERS_CONNECTED,
self.connected.len().try_into().unwrap_or(0),
);

let mut anchor_count = 0;
let mut go_ssv_count = 0;
let mut unknown_count = 0;

// Count all connected peers by client type
for peer_id in self.connected.iter() {
if let Some(data) = peer_store.get_custom_data(peer_id) {
match data.client_type {
Some(ClientType::Anchor) => anchor_count += 1,
Some(ClientType::GoSSV) => go_ssv_count += 1,
None => unknown_count += 1,
}
} else {
unknown_count += 1;
}
}

metrics::set_gauge_vec(&crate::metrics::PEERS_BY_CLIENT, &["anchor"], anchor_count);
metrics::set_gauge_vec(&crate::metrics::PEERS_BY_CLIENT, &["go-ssv"], go_ssv_count);
metrics::set_gauge_vec(
&crate::metrics::PEERS_BY_CLIENT,
&["unknown"],
unknown_count,
);
}
}

Expand All @@ -331,7 +359,7 @@ impl ConnectionManager {
&self,
limit_result: Result<(), ConnectionDenied>,
peer: PeerId,
peer_store: &MemoryStore<Enr>,
peer_store: &MemoryStore<PeerInfo>,
needed_subnets: &HashSet<SubnetId>,
) -> Result<(), ConnectionDenied> {
match limit_result {
Expand Down Expand Up @@ -371,7 +399,7 @@ impl ConnectionManager {
peer: PeerId,
local_addr: &Multiaddr,
remote_addr: &Multiaddr,
peer_store: &MemoryStore<Enr>,
peer_store: &MemoryStore<PeerInfo>,
needed_subnets: &HashSet<SubnetId>,
) -> Result<(), ConnectionDenied> {
let limit_result = self
Expand Down Expand Up @@ -407,7 +435,7 @@ impl ConnectionManager {
addr: &Multiaddr,
role_override: Endpoint,
port_use: PortUse,
peer_store: &MemoryStore<Enr>,
peer_store: &MemoryStore<PeerInfo>,
needed_subnets: &HashSet<SubnetId>,
) -> Result<(), ConnectionDenied> {
let limit_result = self
Expand Down
34 changes: 24 additions & 10 deletions anchor/network/src/peer_manager/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use subnet_service::SubnetId;
use tracing::debug;

use super::{connection::ConnectionManager, types::ConnectActions};
use crate::{Enr, discovery};
use crate::{Enr, PeerInfo, discovery};

/// Number of times we overdial when we need peers for a subnet
const PEER_OVERDIAL_FACTOR: usize = 2;
Expand All @@ -33,7 +33,7 @@ impl PeerDiscovery {
/// Process a discovered peer and return dial options if we should connect
pub fn process_discovered_peer(
enr: Enr,
peer_store: &mut MemoryStore<Enr>,
peer_store: &mut MemoryStore<PeerInfo>,
connection_manager: &ConnectionManager,
needed_subnets: &HashSet<SubnetId>,
blocked_peers: &HashSet<PeerId>,
Expand All @@ -51,7 +51,18 @@ impl PeerDiscovery {
addr: multiaddr,
}));
}
peer_store.insert_custom_data(&id, enr.clone());

if let Some(peer_info) = peer_store.get_custom_data_mut(&id) {
peer_info.set_enr(enr);
} else {
peer_store.insert_custom_data(
&id,
PeerInfo {
enr: Some(enr),
client_type: None,
},
)
}

// Check if we should dial this peer
let should_dial =
Expand All @@ -68,7 +79,7 @@ impl PeerDiscovery {
pub fn track_subnet_peers(
subnet_id: SubnetId,
needed_subnets: &mut HashSet<SubnetId>,
peer_store: &MemoryStore<Enr>,
peer_store: &MemoryStore<PeerInfo>,
connection_manager: &ConnectionManager,
blocked_peers: &HashSet<PeerId>,
) -> ConnectActions {
Expand All @@ -85,7 +96,7 @@ impl PeerDiscovery {
/// Determine what actions to take for the given subnets
pub fn determine_actions_for_subnets(
subnets: &[SubnetId],
peer_store: &MemoryStore<Enr>,
peer_store: &MemoryStore<PeerInfo>,
connection_manager: &ConnectionManager,
blocked_peers: &HashSet<PeerId>,
) -> ConnectActions {
Expand All @@ -106,7 +117,10 @@ impl PeerDiscovery {
continue;
}

let Some(enr) = record.get_custom_data() else {
let Some(enr) = record
.get_custom_data()
.and_then(|peer_info| peer_info.enr.as_ref())
else {
continue;
};

Expand Down Expand Up @@ -142,7 +156,7 @@ impl PeerDiscovery {
/// Check if any subnets need more peers and return dial/discovery actions
pub fn check_subnet_peers(
needed_subnets: &HashSet<SubnetId>,
peer_store: &MemoryStore<Enr>,
peer_store: &MemoryStore<PeerInfo>,
connection_manager: &ConnectionManager,
blocked_peers: &HashSet<PeerId>,
) -> Option<ConnectActions> {
Expand All @@ -162,9 +176,9 @@ impl PeerDiscovery {

/// Get candidate peers that we could potentially dial
fn candidate_peers<'a>(
peer_store: &'a MemoryStore<Enr>,
peer_store: &'a MemoryStore<PeerInfo>,
connected: &HashSet<PeerId>,
) -> Vec<(&'a PeerId, &'a PeerRecord<Enr>)> {
) -> Vec<(&'a PeerId, &'a PeerRecord<PeerInfo>)> {
let mut peers = peer_store
.record_iter()
.filter(|(peer, record)| {
Expand All @@ -176,7 +190,7 @@ impl PeerDiscovery {
}

/// Convert a peer ID to dial options
fn peer_to_dial_opts(peer: &PeerId, peer_store: &MemoryStore<Enr>) -> DialOpts {
fn peer_to_dial_opts(peer: &PeerId, peer_store: &MemoryStore<PeerInfo>) -> DialOpts {
let addresses = peer_store
.addresses_of_peer(peer)
.into_iter()
Expand Down
28 changes: 25 additions & 3 deletions anchor/network/src/peer_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use peer_store::memory_store::{self, MemoryStore};
use subnet_service::SubnetId;
use tracing::info;

use crate::{Config, Enr};
use crate::{ClientType, Config, Enr, peer_manager::types::PeerInfo};

pub mod blocking;
pub mod connection;
Expand All @@ -34,7 +34,7 @@ pub use types::{ConnectActions, Event};

/// Main peer manager that coordinates all peer management functionality
pub struct PeerManager {
peer_store: peer_store::Behaviour<MemoryStore<Enr>>,
peer_store: peer_store::Behaviour<MemoryStore<PeerInfo>>,
connection_manager: ConnectionManager,
heartbeat_manager: HeartbeatManager,
blocking_manager: BlockingManager,
Expand Down Expand Up @@ -154,6 +154,28 @@ impl PeerManager {
.set_peer_subscribed(peer, subnet, subscribed);
}

/// Handle a completed handshake by updating peer client type and triggering metrics update.
pub fn handle_handshake_completed(&mut self, peer_id: PeerId, node_version: String) {
let client_type = ClientType::from(node_version);

// Update client type in peer store
if let Some(peer_info) = self.peer_store.store_mut().get_custom_data_mut(&peer_id) {
peer_info.set_client_type(client_type);
} else {
self.peer_store.store_mut().insert_custom_data(
&peer_id,
PeerInfo {
enr: None,
client_type: Some(client_type),
},
);
}

// Trigger metric recalculation
self.connection_manager
.update_metrics_if_changed(true, self.peer_store.store());
}

/// Returns true if a connected peer should be disconnected because it doesn't offer any needed
/// subnets based on observed gossipsub subscriptions (no ENR fallback)
pub fn should_disconnect_due_to_subnets(&self, peer: &PeerId) -> bool {
Expand Down Expand Up @@ -334,7 +356,7 @@ impl NetworkBehaviour for PeerManager {

// Update metrics if connection state changed
self.connection_manager
.update_metrics_if_changed(changed_connected);
.update_metrics_if_changed(changed_connected, self.peer_store.store());

// Delegate to sub-components
self.blocking_manager.on_swarm_event(event);
Expand Down
Loading
Loading