Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions anchor/network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ pub use config::{Config, DEFAULT_DISC_PORT, DEFAULT_QUIC_PORT, DEFAULT_TCP_PORT}
pub use network::Network;
pub use network_utils::listen_addr::{ListenAddr, ListenAddress};
pub type Enr = discv5::enr::Enr<discv5::enr::CombinedKey>;
pub use peer_manager::types::{ClientType, PeerInfo};
8 changes: 8 additions & 0 deletions anchor/network/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,11 @@ use metrics::*;
pub static PEERS_CONNECTED: LazyLock<Result<IntGauge>> = LazyLock::new(|| {
try_create_int_gauge("libp2p_peers", "Count of libp2p peers currently connected")
});

pub static PEERS_BY_CLIENT: LazyLock<Result<IntGaugeVec>> = LazyLock::new(|| {
try_create_int_gauge_vec(
"libp2p_peers_by_client",
"Count of connected peers by client type (anchor, go-ssv, unknown)",
&["client_type"],
)
});
48 changes: 42 additions & 6 deletions anchor/network/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,16 @@ use types::{ChainSpec, EthSpec};
use version::version_with_platform;

use crate::{
Config, Enr,
ClientType, Config, Enr,
behaviour::{AnchorBehaviour, AnchorBehaviourEvent, BehaviourError},
discovery::{DiscoveredPeers, Discovery, DiscoveryError},
handshake,
handshake::node_info::{NodeInfo, NodeMetadata},
handshake::{
self,
node_info::{NodeInfo, NodeMetadata},
},
keypair_utils::load_private_key,
network::NetworkError::SwarmConfig,
peer_manager,
peer_manager::{ConnectActions, PeerManager},
peer_manager::{self, ConnectActions, PeerManager},
scoring::topic_score_config::topic_score_params_for_subnet_with_rate,
transport::build_transport,
};
Expand Down Expand Up @@ -512,7 +513,42 @@ impl<R: MessageReceiver> Network<R> {
their_info,
}) => {
debug!(%peer_id, ?their_info, "Handshake completed");
// Update peer store with their_info

if let Some(metadata) = their_info.metadata {
let client_type = ClientType::from(metadata.node_version);

let peer_info_opt = self
.swarm
.behaviour()
.peer_manager
.peer_store
.store()
.get_custom_data(&peer_id)
.cloned();

if let Some(mut peer_info) = peer_info_opt {
// Update the client type
peer_info.client_type = Some(client_type);

// Insert back into peer store
{
let behaviour = self.swarm.behaviour_mut();
behaviour
.peer_manager
.peer_store
.store_mut()
.insert_custom_data(&peer_id, peer_info);
}

// Trigger metric recalculation
let behaviour = self.swarm.behaviour();
let store_ref = behaviour.peer_manager.peer_store.store();
behaviour
.peer_manager
.connection_manager
.update_metrics_if_changed(true, Some(store_ref));
}
}
}
Err(handshake::Failed { peer_id, error }) => {
debug!(%peer_id, ?error, "Handshake failed");
Expand Down
51 changes: 41 additions & 10 deletions anchor/network/src/peer_manager/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use ssz_types::{Bitfield, length::Fixed, typenum::U128};
use subnet_service::SubnetId;
use thiserror::Error;

use crate::{Config, Enr, discovery, metrics::PEERS_CONNECTED};
use crate::{ClientType, Config, PeerInfo, discovery, metrics::PEERS_CONNECTED};

/// A fraction of `target_peers` that we allow to connect to us in excess of
/// `target_peers`. For clarity, if `target_peers` is 50 and
Expand Down Expand Up @@ -106,7 +106,7 @@ impl ConnectionManager {
pub fn should_dial_peer(
&self,
peer_id: &PeerId,
peer_store: &MemoryStore<Enr>,
peer_store: &MemoryStore<PeerInfo>,
needed_subnets: &HashSet<SubnetId>,
blocked_peers: &HashSet<PeerId>,
) -> bool {
Expand All @@ -125,7 +125,7 @@ impl ConnectionManager {
pub fn qualifies_for_priority_connection(
&self,
peer_id: &PeerId,
peer_store: &MemoryStore<Enr>,
peer_store: &MemoryStore<PeerInfo>,
needed_subnets: &HashSet<SubnetId>,
) -> bool {
let Some(subnets) = self.get_peer_subnets_with_enr_fallback(peer_id, peer_store) else {
Expand Down Expand Up @@ -194,7 +194,7 @@ impl ConnectionManager {
pub fn peer_offers_needed_subnets_with_enr_fallback(
&self,
peer: &PeerId,
peer_store: &MemoryStore<Enr>,
peer_store: &MemoryStore<PeerInfo>,
needed: &HashSet<SubnetId>,
) -> bool {
if needed.is_empty() {
Expand Down Expand Up @@ -232,11 +232,11 @@ impl ConnectionManager {
fn get_peer_subnets_with_enr_fallback(
&self,
peer: &PeerId,
peer_store: &MemoryStore<Enr>,
peer_store: &MemoryStore<PeerInfo>,
) -> Option<Bitfield<Fixed<U128>>> {
self.get_peer_subnets_observed_only(peer).or_else(|| {
// Fallback to ENR
let enr = peer_store.get_custom_data(peer)?;
let enr = &peer_store.get_custom_data(peer)?.enr;
discovery::committee_bitfield(enr).ok()
})
}
Expand All @@ -257,12 +257,43 @@ impl ConnectionManager {
}

/// Update metrics if connection state changed
pub fn update_metrics_if_changed(&self, changed: bool) {
pub fn update_metrics_if_changed(
&self,
changed: bool,
peer_store: Option<&MemoryStore<PeerInfo>>,
) {
if changed {
metrics::set_gauge(
&PEERS_CONNECTED,
self.connected.len().try_into().unwrap_or(0),
);

if let Some(peer_store) = peer_store {
let mut anchor_count = 0;
let mut go_ssv_count = 0;
let mut unknown_count = 0;

// Count all connected peers by client type
for peer_id in self.connected.iter() {
if let Some(data) = peer_store.get_custom_data(peer_id) {
match data.client_type {
Some(ClientType::Anchor) => anchor_count += 1,
Some(ClientType::GoSSV) => go_ssv_count += 1,
None => unknown_count += 1,
}
} else {
unknown_count += 1;
}
}

metrics::set_gauge_vec(&crate::metrics::PEERS_BY_CLIENT, &["anchor"], anchor_count);
metrics::set_gauge_vec(&crate::metrics::PEERS_BY_CLIENT, &["go-ssv"], go_ssv_count);
metrics::set_gauge_vec(
&crate::metrics::PEERS_BY_CLIENT,
&["unknown"],
unknown_count,
);
}
}
}

Expand All @@ -285,7 +316,7 @@ impl ConnectionManager {
&self,
limit_result: Result<(), ConnectionDenied>,
peer: PeerId,
peer_store: &MemoryStore<Enr>,
peer_store: &MemoryStore<PeerInfo>,
needed_subnets: &HashSet<SubnetId>,
) -> Result<(), ConnectionDenied> {
match limit_result {
Expand Down Expand Up @@ -325,7 +356,7 @@ impl ConnectionManager {
peer: PeerId,
local_addr: &Multiaddr,
remote_addr: &Multiaddr,
peer_store: &MemoryStore<Enr>,
peer_store: &MemoryStore<PeerInfo>,
needed_subnets: &HashSet<SubnetId>,
) -> Result<(), ConnectionDenied> {
let limit_result = self
Expand Down Expand Up @@ -361,7 +392,7 @@ impl ConnectionManager {
addr: &Multiaddr,
role_override: Endpoint,
port_use: PortUse,
peer_store: &MemoryStore<Enr>,
peer_store: &MemoryStore<PeerInfo>,
needed_subnets: &HashSet<SubnetId>,
) -> Result<(), ConnectionDenied> {
let limit_result = self
Expand Down
28 changes: 17 additions & 11 deletions anchor/network/src/peer_manager/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use subnet_service::SubnetId;
use tracing::debug;

use super::{connection::ConnectionManager, types::ConnectActions};
use crate::{Enr, discovery};
use crate::{Enr, PeerInfo, discovery};

/// Number of times we overdial when we need peers for a subnet
const PEER_OVERDIAL_FACTOR: usize = 2;
Expand All @@ -33,7 +33,7 @@ impl PeerDiscovery {
/// Process a discovered peer and return dial options if we should connect
pub fn process_discovered_peer(
enr: Enr,
peer_store: &mut MemoryStore<Enr>,
peer_store: &mut MemoryStore<PeerInfo>,
connection_manager: &ConnectionManager,
needed_subnets: &HashSet<SubnetId>,
blocked_peers: &HashSet<PeerId>,
Expand All @@ -49,7 +49,13 @@ impl PeerDiscovery {
addr: multiaddr,
}));
}
peer_store.insert_custom_data(&id, enr.clone());
peer_store.insert_custom_data(
&id,
PeerInfo {
enr,
client_type: None,
},
);

// Check if we should dial this peer
let should_dial =
Expand All @@ -66,7 +72,7 @@ impl PeerDiscovery {
pub fn track_subnet_peers(
subnet_id: SubnetId,
needed_subnets: &mut HashSet<SubnetId>,
peer_store: &MemoryStore<Enr>,
peer_store: &MemoryStore<PeerInfo>,
connection_manager: &ConnectionManager,
blocked_peers: &HashSet<PeerId>,
) -> ConnectActions {
Expand All @@ -83,7 +89,7 @@ impl PeerDiscovery {
/// Determine what actions to take for the given subnets
pub fn determine_actions_for_subnets(
subnets: &[SubnetId],
peer_store: &MemoryStore<Enr>,
peer_store: &MemoryStore<PeerInfo>,
connection_manager: &ConnectionManager,
blocked_peers: &HashSet<PeerId>,
) -> ConnectActions {
Expand All @@ -104,11 +110,11 @@ impl PeerDiscovery {
continue;
}

let Some(enr) = record.get_custom_data() else {
let Some(peer_info) = record.get_custom_data() else {
continue;
};

let subnets = discovery::committee_bitfield(enr).unwrap_or_default();
let subnets = discovery::committee_bitfield(&peer_info.enr).unwrap_or_default();

let mut relevant = false;
for subnet in subnets
Expand Down Expand Up @@ -140,7 +146,7 @@ impl PeerDiscovery {
/// Check if any subnets need more peers and return dial/discovery actions
pub fn check_subnet_peers(
needed_subnets: &HashSet<SubnetId>,
peer_store: &MemoryStore<Enr>,
peer_store: &MemoryStore<PeerInfo>,
connection_manager: &ConnectionManager,
blocked_peers: &HashSet<PeerId>,
) -> Option<ConnectActions> {
Expand All @@ -160,9 +166,9 @@ impl PeerDiscovery {

/// Get candidate peers that we could potentially dial
fn candidate_peers<'a>(
peer_store: &'a MemoryStore<Enr>,
peer_store: &'a MemoryStore<PeerInfo>,
connected: &HashSet<PeerId>,
) -> Vec<(&'a PeerId, &'a PeerRecord<Enr>)> {
) -> Vec<(&'a PeerId, &'a PeerRecord<PeerInfo>)> {
let mut peers = peer_store
.record_iter()
.filter(|(peer, record)| {
Expand All @@ -174,7 +180,7 @@ impl PeerDiscovery {
}

/// Convert a peer ID to dial options
fn peer_to_dial_opts(peer: &PeerId, peer_store: &MemoryStore<Enr>) -> DialOpts {
fn peer_to_dial_opts(peer: &PeerId, peer_store: &MemoryStore<PeerInfo>) -> DialOpts {
let addresses = peer_store
.addresses_of_peer(peer)
.into_iter()
Expand Down
8 changes: 4 additions & 4 deletions anchor/network/src/peer_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use peer_store::memory_store::{self, MemoryStore};
use subnet_service::SubnetId;
use tracing::info;

use crate::{Config, Enr};
use crate::{Config, Enr, peer_manager::types::PeerInfo};

pub mod blocking;
pub mod connection;
Expand All @@ -34,8 +34,8 @@ pub use types::{ConnectActions, Event};

/// Main peer manager that coordinates all peer management functionality
pub struct PeerManager {
peer_store: peer_store::Behaviour<MemoryStore<Enr>>,
connection_manager: ConnectionManager,
pub peer_store: peer_store::Behaviour<MemoryStore<PeerInfo>>,
pub connection_manager: ConnectionManager,
heartbeat_manager: HeartbeatManager,
blocking_manager: BlockingManager,
needed_subnets: HashSet<SubnetId>,
Expand Down Expand Up @@ -304,7 +304,7 @@ impl NetworkBehaviour for PeerManager {

// Update metrics if connection state changed
self.connection_manager
.update_metrics_if_changed(changed_connected);
.update_metrics_if_changed(changed_connected, Some(self.peer_store.store()));

// Delegate to sub-components
self.blocking_manager.on_swarm_event(event);
Expand Down
34 changes: 34 additions & 0 deletions anchor/network/src/peer_manager/types.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use libp2p::swarm::dial_opts::DialOpts;
use subnet_service::SubnetId;

use crate::Enr;

/// Actions that the peer manager can request from the network
#[derive(Debug)]
pub struct ConnectActions {
Expand All @@ -27,3 +29,35 @@ pub enum Event {
PeerStore(peer_store::memory_store::Event),
Heartbeat(crate::peer_manager::heartbeat::Event),
}

/// Different types of clients
#[derive(Debug, Clone, Copy)]
pub enum ClientType {
Anchor,
GoSSV,
}

// Not sure how to get ClientType properly from Handshake data
impl From<String> for ClientType {
fn from(value: String) -> Self {
if value.starts_with("Anchor/") {
Self::Anchor
} else {
Self::GoSSV
}
}
}
#[derive(Clone)]
pub struct PeerInfo {
pub enr: Enr,
pub client_type: Option<ClientType>,
}

impl PeerInfo {
pub fn set_client_type(&mut self, client_type: ClientType) -> Self {
PeerInfo {
enr: self.enr.clone(),
client_type: Some(client_type),
}
}
}