Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions anchor/network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ pub use config::{Config, DEFAULT_DISC_PORT, DEFAULT_QUIC_PORT, DEFAULT_TCP_PORT}
pub use network::Network;
pub use network_utils::listen_addr::{ListenAddr, ListenAddress};
pub type Enr = discv5::enr::Enr<discv5::enr::CombinedKey>;
pub use peer_manager::types::{ClientType, PeerInfo};
8 changes: 8 additions & 0 deletions anchor/network/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,14 @@ pub static PEERS_CONNECTED: LazyLock<Result<IntGauge>> = LazyLock::new(|| {
try_create_int_gauge("libp2p_peers", "Count of libp2p peers currently connected")
});

pub static PEERS_BY_CLIENT: LazyLock<Result<IntGaugeVec>> = LazyLock::new(|| {
try_create_int_gauge_vec(
"libp2p_peers_by_client",
"Count of connected peers by client type (anchor, go-ssv, unknown)",
&["client_type"],
)
});

pub static HANDSHAKE_SUCCESSFUL: LazyLock<Result<IntCounter>> = LazyLock::new(|| {
try_create_int_counter(
"libp2p_handshake_successful_total",
Expand Down
84 changes: 59 additions & 25 deletions anchor/network/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,16 @@ use types::{ChainSpec, EthSpec};
use version::version_with_platform;

use crate::{
Config, Enr,
ClientType, Config, Enr, PeerInfo,
behaviour::{AnchorBehaviour, AnchorBehaviourEvent, BehaviourError},
discovery::{DiscoveredPeers, Discovery, DiscoveryError},
handshake,
handshake::node_info::{NodeInfo, NodeMetadata},
handshake::{
self,
node_info::{NodeInfo, NodeMetadata},
},
keypair_utils::load_private_key,
network::NetworkError::SwarmConfig,
peer_manager,
peer_manager::{ConnectActions, PeerManager},
peer_manager::{self, ConnectActions, PeerManager},
scoring::topic_score_config::topic_score_params_for_subnet_with_rate,
transport::build_transport,
};
Expand Down Expand Up @@ -576,30 +577,63 @@ impl<R: MessageReceiver> Network<R> {
counter.inc();
}

// Count and record matching subnets
if let (Some(our_metadata), Some(their_metadata)) =
(&self.node_info.metadata, &their_info.metadata)
{
let matching_count =
count_matching_subnets(&our_metadata.subnets, &their_metadata.subnets);

debug!(
%peer_id,
our_subnets = %our_metadata.subnets,
their_subnets = %their_metadata.subnets,
matching_subnets = matching_count,
"Handshake completed"
);
let store_ref = self
.swarm
.behaviour_mut()
.peer_manager
.peer_store
.store_mut();

if let Some(metadata) = their_info.metadata {
let client_type = ClientType::from(metadata.node_version.clone());

// Update client type in peer store
if let Some(peer_info) = store_ref.get_custom_data_mut(&peer_id) {
peer_info.set_client_type(client_type);
} else {
// If no peer info yet, create new peer info
store_ref.insert_custom_data(
&peer_id,
PeerInfo {
enr: None,
client_type: Some(client_type),
},
);
}

// Record subnet match count
if let Ok(gauge_vec) = crate::metrics::HANDSHAKE_SUBNET_MATCHES.as_ref() {
let label = &matching_count.to_string();
if let Ok(gauge) = gauge_vec.get_metric_with_label_values(&[label]) {
gauge.inc();
// Count and record matching subnets
if let Some(our_metadata) = &self.node_info.metadata {
let matching_count =
count_matching_subnets(&our_metadata.subnets, &metadata.subnets);

debug!(
%peer_id,
our_subnets = %our_metadata.subnets,
their_subnets = %metadata.subnets,
matching_subnets = matching_count,
"Handshake completed"
);

// Record subnet match count
if let Ok(gauge_vec) = crate::metrics::HANDSHAKE_SUBNET_MATCHES.as_ref() {
let label = &matching_count.to_string();
if let Ok(gauge) = gauge_vec.get_metric_with_label_values(&[label]) {
gauge.inc();
}
}
} else {
debug!(%peer_id, "Handshake completed");
}

// Trigger metric recalculation
let behaviour = self.swarm.behaviour();
let store_ref = behaviour.peer_manager.peer_store.store();
behaviour
.peer_manager
.connection_manager
.update_metrics_if_changed(true, store_ref);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, what do you think about moving everything inside this if into the peer manager and/or the connection manager within it? This would allow us to avoid making the peer_store and conncection_manager pub, for better encapsulation

} else {
debug!(%peer_id, ?their_info, "Handshake completed");
debug!(%peer_id, ?their_info, "Handshake completed without metadata");
}
}
Err(handshake::Failed { peer_id, error }) => {
Expand Down
50 changes: 39 additions & 11 deletions anchor/network/src/peer_manager/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use ssz_types::{Bitfield, length::Fixed, typenum::U128};
use subnet_service::SubnetId;
use thiserror::Error;

use crate::{Config, Enr, discovery, metrics::PEERS_CONNECTED};
use crate::{ClientType, Config, PeerInfo, discovery, metrics::PEERS_CONNECTED};

/// A fraction of `target_peers` that we allow to connect to us in excess of
/// `target_peers`. For clarity, if `target_peers` is 50 and
Expand Down Expand Up @@ -111,7 +111,7 @@ impl ConnectionManager {
pub fn should_dial_peer(
&self,
peer_id: &PeerId,
peer_store: &MemoryStore<Enr>,
peer_store: &MemoryStore<PeerInfo>,
needed_subnets: &HashSet<SubnetId>,
blocked_peers: &HashSet<PeerId>,
) -> bool {
Expand All @@ -135,7 +135,7 @@ impl ConnectionManager {
pub fn qualifies_for_priority_connection(
&self,
peer_id: &PeerId,
peer_store: &MemoryStore<Enr>,
peer_store: &MemoryStore<PeerInfo>,
needed_subnets: &HashSet<SubnetId>,
) -> bool {
let Some(subnets) = self.get_peer_subnets_with_enr_fallback(peer_id, peer_store) else {
Expand Down Expand Up @@ -204,7 +204,7 @@ impl ConnectionManager {
pub fn peer_offers_needed_subnets_with_enr_fallback(
&self,
peer: &PeerId,
peer_store: &MemoryStore<Enr>,
peer_store: &MemoryStore<PeerInfo>,
needed: &HashSet<SubnetId>,
) -> bool {
if needed.is_empty() {
Expand Down Expand Up @@ -246,12 +246,15 @@ impl ConnectionManager {
fn get_peer_subnets_with_enr_fallback(
&self,
peer: &PeerId,
peer_store: &MemoryStore<Enr>,
peer_store: &MemoryStore<PeerInfo>,
) -> Option<Bitfield<Fixed<U128>>> {
self.get_peer_subnets_observed_only(peer).or_else(|| {
// Fallback to ENR
let enr = peer_store.get_custom_data(peer)?;
discovery::committee_bitfield(enr).ok()
peer_store
.get_custom_data(peer)?
.enr
.as_ref()
.and_then(|enr| discovery::committee_bitfield(enr).ok())
})
}

Expand Down Expand Up @@ -303,12 +306,37 @@ impl ConnectionManager {
}

/// Update metrics if connection state changed
pub fn update_metrics_if_changed(&self, changed: bool) {
pub fn update_metrics_if_changed(&self, changed: bool, peer_store: &MemoryStore<PeerInfo>) {
if changed {
metrics::set_gauge(
&PEERS_CONNECTED,
self.connected.len().try_into().unwrap_or(0),
);

let mut anchor_count = 0;
let mut go_ssv_count = 0;
let mut unknown_count = 0;

// Count all connected peers by client type
for peer_id in self.connected.iter() {
if let Some(data) = peer_store.get_custom_data(peer_id) {
match data.client_type {
Some(ClientType::Anchor) => anchor_count += 1,
Some(ClientType::GoSSV) => go_ssv_count += 1,
None => unknown_count += 1,
}
} else {
unknown_count += 1;
}
}

metrics::set_gauge_vec(&crate::metrics::PEERS_BY_CLIENT, &["anchor"], anchor_count);
metrics::set_gauge_vec(&crate::metrics::PEERS_BY_CLIENT, &["go-ssv"], go_ssv_count);
metrics::set_gauge_vec(
&crate::metrics::PEERS_BY_CLIENT,
&["unknown"],
unknown_count,
);
}
}

Expand All @@ -331,7 +359,7 @@ impl ConnectionManager {
&self,
limit_result: Result<(), ConnectionDenied>,
peer: PeerId,
peer_store: &MemoryStore<Enr>,
peer_store: &MemoryStore<PeerInfo>,
needed_subnets: &HashSet<SubnetId>,
) -> Result<(), ConnectionDenied> {
match limit_result {
Expand Down Expand Up @@ -371,7 +399,7 @@ impl ConnectionManager {
peer: PeerId,
local_addr: &Multiaddr,
remote_addr: &Multiaddr,
peer_store: &MemoryStore<Enr>,
peer_store: &MemoryStore<PeerInfo>,
needed_subnets: &HashSet<SubnetId>,
) -> Result<(), ConnectionDenied> {
let limit_result = self
Expand Down Expand Up @@ -407,7 +435,7 @@ impl ConnectionManager {
addr: &Multiaddr,
role_override: Endpoint,
port_use: PortUse,
peer_store: &MemoryStore<Enr>,
peer_store: &MemoryStore<PeerInfo>,
needed_subnets: &HashSet<SubnetId>,
) -> Result<(), ConnectionDenied> {
let limit_result = self
Expand Down
34 changes: 24 additions & 10 deletions anchor/network/src/peer_manager/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use subnet_service::SubnetId;
use tracing::debug;

use super::{connection::ConnectionManager, types::ConnectActions};
use crate::{Enr, discovery};
use crate::{Enr, PeerInfo, discovery};

/// Number of times we overdial when we need peers for a subnet
const PEER_OVERDIAL_FACTOR: usize = 2;
Expand All @@ -33,7 +33,7 @@ impl PeerDiscovery {
/// Process a discovered peer and return dial options if we should connect
pub fn process_discovered_peer(
enr: Enr,
peer_store: &mut MemoryStore<Enr>,
peer_store: &mut MemoryStore<PeerInfo>,
connection_manager: &ConnectionManager,
needed_subnets: &HashSet<SubnetId>,
blocked_peers: &HashSet<PeerId>,
Expand All @@ -51,7 +51,18 @@ impl PeerDiscovery {
addr: multiaddr,
}));
}
peer_store.insert_custom_data(&id, enr.clone());

if let Some(peer_info) = peer_store.get_custom_data_mut(&id) {
peer_info.set_enr(enr);
} else {
peer_store.insert_custom_data(
&id,
PeerInfo {
enr: Some(enr),
client_type: None,
},
)
}

// Check if we should dial this peer
let should_dial =
Expand All @@ -68,7 +79,7 @@ impl PeerDiscovery {
pub fn track_subnet_peers(
subnet_id: SubnetId,
needed_subnets: &mut HashSet<SubnetId>,
peer_store: &MemoryStore<Enr>,
peer_store: &MemoryStore<PeerInfo>,
connection_manager: &ConnectionManager,
blocked_peers: &HashSet<PeerId>,
) -> ConnectActions {
Expand All @@ -85,7 +96,7 @@ impl PeerDiscovery {
/// Determine what actions to take for the given subnets
pub fn determine_actions_for_subnets(
subnets: &[SubnetId],
peer_store: &MemoryStore<Enr>,
peer_store: &MemoryStore<PeerInfo>,
connection_manager: &ConnectionManager,
blocked_peers: &HashSet<PeerId>,
) -> ConnectActions {
Expand All @@ -106,7 +117,10 @@ impl PeerDiscovery {
continue;
}

let Some(enr) = record.get_custom_data() else {
let Some(enr) = record
.get_custom_data()
.and_then(|peer_info| peer_info.enr.as_ref())
else {
continue;
};

Expand Down Expand Up @@ -142,7 +156,7 @@ impl PeerDiscovery {
/// Check if any subnets need more peers and return dial/discovery actions
pub fn check_subnet_peers(
needed_subnets: &HashSet<SubnetId>,
peer_store: &MemoryStore<Enr>,
peer_store: &MemoryStore<PeerInfo>,
connection_manager: &ConnectionManager,
blocked_peers: &HashSet<PeerId>,
) -> Option<ConnectActions> {
Expand All @@ -162,9 +176,9 @@ impl PeerDiscovery {

/// Get candidate peers that we could potentially dial
fn candidate_peers<'a>(
peer_store: &'a MemoryStore<Enr>,
peer_store: &'a MemoryStore<PeerInfo>,
connected: &HashSet<PeerId>,
) -> Vec<(&'a PeerId, &'a PeerRecord<Enr>)> {
) -> Vec<(&'a PeerId, &'a PeerRecord<PeerInfo>)> {
let mut peers = peer_store
.record_iter()
.filter(|(peer, record)| {
Expand All @@ -176,7 +190,7 @@ impl PeerDiscovery {
}

/// Convert a peer ID to dial options
fn peer_to_dial_opts(peer: &PeerId, peer_store: &MemoryStore<Enr>) -> DialOpts {
fn peer_to_dial_opts(peer: &PeerId, peer_store: &MemoryStore<PeerInfo>) -> DialOpts {
let addresses = peer_store
.addresses_of_peer(peer)
.into_iter()
Expand Down
8 changes: 4 additions & 4 deletions anchor/network/src/peer_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use peer_store::memory_store::{self, MemoryStore};
use subnet_service::SubnetId;
use tracing::info;

use crate::{Config, Enr};
use crate::{Config, Enr, peer_manager::types::PeerInfo};

pub mod blocking;
pub mod connection;
Expand All @@ -34,8 +34,8 @@ pub use types::{ConnectActions, Event};

/// Main peer manager that coordinates all peer management functionality
pub struct PeerManager {
peer_store: peer_store::Behaviour<MemoryStore<Enr>>,
connection_manager: ConnectionManager,
pub peer_store: peer_store::Behaviour<MemoryStore<PeerInfo>>,
pub connection_manager: ConnectionManager,
heartbeat_manager: HeartbeatManager,
blocking_manager: BlockingManager,
needed_subnets: HashSet<SubnetId>,
Expand Down Expand Up @@ -334,7 +334,7 @@ impl NetworkBehaviour for PeerManager {

// Update metrics if connection state changed
self.connection_manager
.update_metrics_if_changed(changed_connected);
.update_metrics_if_changed(changed_connected, self.peer_store.store());

// Delegate to sub-components
self.blocking_manager.on_swarm_event(event);
Expand Down
Loading
Loading