diff --git a/Cargo.lock b/Cargo.lock index a098d6eea..7094b7de4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1888,7 +1888,7 @@ dependencies = [ "ssv_network_config", "ssv_types", "strum 0.27.1", - "subnet_tracker", + "subnet_service", "task_executor", "tokio", "tracing", @@ -5196,7 +5196,7 @@ dependencies = [ "processor", "slot_clock", "ssv_types", - "subnet_tracker", + "subnet_service", "tokio", "tracing", ] @@ -5535,7 +5535,7 @@ dependencies = [ "serde_json", "ssv_types", "ssz_types", - "subnet_tracker", + "subnet_service", "task_executor", "thiserror 2.0.12", "tokio", @@ -7800,17 +7800,19 @@ dependencies = [ ] [[package]] -name = "subnet_tracker" +name = "subnet_service" version = "0.1.0" dependencies = [ "alloy", "database", "ethereum_serde_utils", "serde", + "slot_clock", "ssv_types", "task_executor", "tokio", "tracing", + "types", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index a818f5ff6..a428011ac 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,7 +24,7 @@ members = [ "anchor/processor", "anchor/qbft_manager", "anchor/signature_collector", - "anchor/subnet_tracker", + "anchor/subnet_service", "anchor/validator_store", ] @@ -58,7 +58,7 @@ qbft_manager = { path = "anchor/qbft_manager" } signature_collector = { path = "anchor/signature_collector" } ssv_network_config = { path = "anchor/common/ssv_network_config" } ssv_types = { path = "anchor/common/ssv_types" } -subnet_tracker = { path = "anchor/subnet_tracker" } +subnet_service = { path = "anchor/subnet_service" } version = { path = "anchor/common/version" } beacon_node_fallback = { git = "https://github.com/sigp/lighthouse", rev = "9b84dac" } diff --git a/anchor/client/Cargo.toml b/anchor/client/Cargo.toml index b52cd8718..7aabf56b0 100644 --- a/anchor/client/Cargo.toml +++ b/anchor/client/Cargo.toml @@ -43,7 +43,7 @@ slot_clock = { workspace = true } ssv_network_config = { workspace = true } ssv_types = { workspace = true } strum = { workspace = true } -subnet_tracker = { workspace = true } +subnet_service = { workspace = true } task_executor = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } diff --git a/anchor/client/src/cli.rs b/anchor/client/src/cli.rs index 8a11488b8..c92c6d535 100644 --- a/anchor/client/src/cli.rs +++ b/anchor/client/src/cli.rs @@ -558,11 +558,20 @@ pub struct Node { #[clap( long, - help = "Disables peer scoring altogether.", + help = "Disables gossipsub peer scoring.", display_order = 0, help_heading = FLAG_HEADER )] - pub disable_peer_scoring: bool, + pub disable_gossipsub_peer_scoring: bool, + + #[clap( + long, + help = "Disables gossipsub topic scoring.", + action = ArgAction::Set, + default_value = "true", + hide = true + )] + pub disable_gossipsub_topic_scoring: bool, #[clap(flatten)] pub logging_flags: LoggingFlags, diff --git a/anchor/client/src/config.rs b/anchor/client/src/config.rs index 096e39383..e84bae5e5 100644 --- a/anchor/client/src/config.rs +++ b/anchor/client/src/config.rs @@ -216,7 +216,8 @@ pub fn from_cli(cli_args: &Node) -> Result { config.network.subscribe_all_subnets = cli_args.subscribe_all_subnets; // Network related - set peer scoring configuration - config.network.disable_peer_scoring = cli_args.disable_peer_scoring; + config.network.disable_gossipsub_peer_scoring = cli_args.disable_gossipsub_peer_scoring; + config.network.disable_gossipsub_topic_scoring = cli_args.disable_gossipsub_topic_scoring; config.beacon_nodes_tls_certs = cli_args.beacon_nodes_tls_certs.clone(); config.execution_nodes_tls_certs = cli_args.execution_nodes_tls_certs.clone(); diff --git a/anchor/client/src/lib.rs b/anchor/client/src/lib.rs index 6474adcae..fb2c6fe53 100644 --- a/anchor/client/src/lib.rs +++ b/anchor/client/src/lib.rs @@ -40,7 +40,7 @@ use signature_collector::SignatureCollectorManager; use slashing_protection::SlashingDatabase; use slot_clock::{SlotClock, SystemTimeSlotClock}; use ssv_types::OperatorId; -use subnet_tracker::{SubnetId, start_subnet_tracker}; +use subnet_service::{SUBNET_COUNT, SubnetId, start_subnet_service}; use task_executor::TaskExecutor; use tokio::{ net::TcpListener, @@ -200,13 +200,6 @@ impl Client { .map_err(|e| format!("Unable to open Anchor database: {e}"))?, ); - let subnet_tracker = start_subnet_tracker( - database.watch(), - network::SUBNET_COUNT, - config.network.subscribe_all_subnets, - &executor, - ); - // Initialize slashing protection. let slashing_db_path = config.data_dir.join(SLASHING_PROTECTION_FILENAME); let slashing_protection = @@ -436,13 +429,10 @@ impl Client { key.clone(), operator_id, Some(message_validator.clone()), - network::SUBNET_COUNT, + SUBNET_COUNT, )?) } else { - Arc::new(ImpostorMessageSender::new( - network_tx.clone(), - network::SUBNET_COUNT, - )) + Arc::new(ImpostorMessageSender::new(network_tx.clone(), SUBNET_COUNT)) }; // Create the signature collector @@ -465,6 +455,17 @@ impl Client { ) .map_err(|e| format!("Unable to initialize qbft manager: {e:?}"))?; + // Start the subnet service now that we have slot_clock + let subnet_service = start_subnet_service::( + database.watch(), + SUBNET_COUNT, + config.network.subscribe_all_subnets, + config.network.disable_gossipsub_topic_scoring, + &executor, + slot_clock.clone(), + spec.clone(), + ); + let (outcome_tx, outcome_rx) = mpsc::channel::(9000); let message_receiver = NetworkMessageReceiver::new( @@ -479,12 +480,12 @@ impl Client { // Start the p2p network let mut network = Network::try_new::( &config.network, - subnet_tracker, + subnet_service, network_rx, Arc::new(message_receiver), outcome_rx, executor.clone(), - &spec, + spec.clone(), ) .await .map_err(|e| format!("Unable to start network: {e}"))?; @@ -495,7 +496,7 @@ impl Client { } // Spawn the network listening task - executor.spawn(network.run(), "network"); + executor.spawn(network.run::(), "network"); let validator_store = AnchorValidatorStore::<_, E>::new( database.watch(), diff --git a/anchor/message_sender/Cargo.toml b/anchor/message_sender/Cargo.toml index 95844a3e4..913c833ce 100644 --- a/anchor/message_sender/Cargo.toml +++ b/anchor/message_sender/Cargo.toml @@ -14,6 +14,6 @@ openssl = { workspace = true } processor = { workspace = true } slot_clock = { workspace = true } ssv_types = { workspace = true } -subnet_tracker = { workspace = true } +subnet_service = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } diff --git a/anchor/message_sender/src/impostor.rs b/anchor/message_sender/src/impostor.rs index 81d818bf0..1ae23d624 100644 --- a/anchor/message_sender/src/impostor.rs +++ b/anchor/message_sender/src/impostor.rs @@ -1,5 +1,5 @@ use ssv_types::{CommitteeId, consensus::UnsignedSSVMessage, message::SignedSSVMessage}; -use subnet_tracker::SubnetId; +use subnet_service::SubnetId; use tokio::sync::mpsc; use tracing::debug; diff --git a/anchor/message_sender/src/network.rs b/anchor/message_sender/src/network.rs index 94a4d8313..ecd52eee5 100644 --- a/anchor/message_sender/src/network.rs +++ b/anchor/message_sender/src/network.rs @@ -13,7 +13,7 @@ use ssv_types::{ CommitteeId, OperatorId, consensus::UnsignedSSVMessage, message::SignedSSVMessage, }; use ssz::Encode; -use subnet_tracker::SubnetId; +use subnet_service::SubnetId; use tokio::sync::{mpsc, mpsc::error::TrySendError}; use tracing::{debug, error, warn}; diff --git a/anchor/network/Cargo.toml b/anchor/network/Cargo.toml index d31cb1690..3aa666c8e 100644 --- a/anchor/network/Cargo.toml +++ b/anchor/network/Cargo.toml @@ -33,7 +33,7 @@ serde = { workspace = true } serde_json = "1.0.137" ssv_types = { workspace = true } ssz_types = "0.10" -subnet_tracker = { workspace = true } +subnet_service = { workspace = true } task_executor = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true } diff --git a/anchor/network/src/behaviour.rs b/anchor/network/src/behaviour.rs index 9251e5acc..32b61b63d 100644 --- a/anchor/network/src/behaviour.rs +++ b/anchor/network/src/behaviour.rs @@ -102,7 +102,7 @@ impl AnchorBehaviour { .map_err(|e| Gossipsub(e.to_string()))?; // Add peer scoring if not disabled - if !network_config.disable_peer_scoring { + if !network_config.disable_gossipsub_peer_scoring { let slots_per_epoch = E::slots_per_epoch(); let slot_duration = Duration::from_secs(spec.seconds_per_slot); let one_epoch_duration = slot_duration * slots_per_epoch as u32; diff --git a/anchor/network/src/config.rs b/anchor/network/src/config.rs index c2f696696..a94ecbd25 100644 --- a/anchor/network/src/config.rs +++ b/anchor/network/src/config.rs @@ -54,8 +54,11 @@ pub struct Config { /// List of nodes to initially connect to, on Multiaddr format. pub boot_nodes_multiaddr: Vec, - /// Disables peer scoring altogether. - pub disable_peer_scoring: bool, + /// Disables gossipsub peer scoring altogether. + pub disable_gossipsub_peer_scoring: bool, + + /// Disables gossipsub topic scoring and message rate calculations. + pub disable_gossipsub_topic_scoring: bool, /// Disables the discovery protocol from starting. pub disable_discovery: bool, @@ -103,7 +106,8 @@ impl Default for Config { target_peers: 50, boot_nodes_enr: vec![], boot_nodes_multiaddr: vec![], - disable_peer_scoring: false, + disable_gossipsub_peer_scoring: false, + disable_gossipsub_topic_scoring: true, disable_discovery: false, disable_quic_support: false, subscribe_all_subnets: false, diff --git a/anchor/network/src/discovery.rs b/anchor/network/src/discovery.rs index 5a7a4ddc4..059e19f52 100644 --- a/anchor/network/src/discovery.rs +++ b/anchor/network/src/discovery.rs @@ -37,7 +37,7 @@ use lighthouse_network::{ use ssv_types::domain_type::DomainType; use ssz::{Decode, Encode}; use ssz_types::{BitVector, Bitfield, length::Fixed, typenum::U128}; -use subnet_tracker::SubnetId; +use subnet_service::SubnetId; use tokio::sync::mpsc; use tracing::{debug, error, info, trace, warn}; diff --git a/anchor/network/src/handshake/node_info.rs b/anchor/network/src/handshake/node_info.rs index b99408008..9f4ce7fe4 100644 --- a/anchor/network/src/handshake/node_info.rs +++ b/anchor/network/src/handshake/node_info.rs @@ -1,15 +1,12 @@ use discv5::libp2p_identity::{Keypair, SigningError}; use serde::{Deserialize, Serialize}; use serde_json; -use subnet_tracker::SubnetId; +use subnet_service::{SubnetBits, SubnetId}; use thiserror::Error; -use crate::{ - SubnetBits, - handshake::{ - envelope::{Envelope, make_unsigned}, - node_info::Error::Validation, - }, +use crate::handshake::{ + envelope::{Envelope, make_unsigned}, + node_info::Error::Validation, }; #[derive(Debug, Error)] diff --git a/anchor/network/src/lib.rs b/anchor/network/src/lib.rs index 6e0d39b76..161b2bd1b 100644 --- a/anchor/network/src/lib.rs +++ b/anchor/network/src/lib.rs @@ -14,6 +14,3 @@ pub use config::Config; pub use lighthouse_network::{ListenAddr, ListenAddress}; pub use network::Network; pub type Enr = discv5::enr::Enr; - -pub const SUBNET_COUNT: usize = 128; -type SubnetBits = [u8; SUBNET_COUNT / 8]; diff --git a/anchor/network/src/network.rs b/anchor/network/src/network.rs index 933033adb..61ea831ad 100644 --- a/anchor/network/src/network.rs +++ b/anchor/network/src/network.rs @@ -19,11 +19,11 @@ use libp2p::{ use lighthouse_network::{discovery::DiscoveredPeers, prometheus_client::registry::Registry}; use message_receiver::{MessageReceiver, Outcome}; use ssv_types::domain_type::DomainType; -use subnet_tracker::{SubnetEvent, SubnetId}; +use subnet_service::{SUBNET_COUNT, SubnetEvent, SubnetId}; use task_executor::TaskExecutor; use thiserror::Error; use tokio::sync::mpsc; -use tracing::{debug, error, info, trace}; +use tracing::{debug, error, info, trace, warn}; use types::{ChainSpec, EthSpec}; use version::version_with_platform; @@ -37,6 +37,7 @@ use crate::{ network::NetworkError::SwarmConfig, peer_manager, peer_manager::{ConnectActions, PeerManager}, + scoring::topic_score_config::topic_score_params_for_subnet_with_rate, transport::build_transport, }; @@ -74,6 +75,7 @@ pub struct Network { outcome_rx: mpsc::Receiver, domain_type: DomainType, metrics_registry: Option, + spec: Arc, } impl Network { @@ -86,7 +88,7 @@ impl Network { message_receiver: Arc, outcome_rx: mpsc::Receiver, executor: TaskExecutor, - spec: &ChainSpec, + spec: Arc, ) -> Result, Box> { let local_keypair: Keypair = load_private_key(&config.network_dir); @@ -95,7 +97,7 @@ impl Network { let mut metrics_registry = Registry::default(); let behaviour = - AnchorBehaviour::new::(local_keypair.clone(), config, &mut metrics_registry, spec) + AnchorBehaviour::new::(local_keypair.clone(), config, &mut metrics_registry, &spec) .await .map_err(|e| Box::new(NetworkError::Behaviour(e)))?; @@ -127,6 +129,7 @@ impl Network { outcome_rx, domain_type: config.domain_type.clone(), metrics_registry: Some(metrics_registry), + spec, }; info!(%peer_id, "Network starting"); @@ -159,7 +162,7 @@ impl Network { } /// Main loop for polling and handling swarm and channels. - pub async fn run(mut self) { + pub async fn run(mut self) { loop { tokio::select! { swarm_message = self.swarm.select_next_some() => { @@ -222,7 +225,7 @@ impl Network { } }, Some(event) = self.subnet_event_receiver.recv() => { - self.on_subnet_tracker_event(event) + self.on_subnet_tracker_event::(event) } event = self.message_rx.recv() => { match event { @@ -272,13 +275,73 @@ impl Network { } } - fn on_subnet_tracker_event(&mut self, event: SubnetEvent) { + /// Update topic score parameters for a subnet with pre-calculated message rate + fn update_topic_score_for_subnet_with_rate( + &mut self, + subnet: SubnetId, + topic: IdentTopic, + message_rate: f64, + ) { + debug!( + subnet = *subnet, + topic = %topic, + message_rate = message_rate, + "Setting topic score parameters with pre-calculated message rate" + ); + + // Generate topic-specific score parameters using pre-calculated message rate + let topic_score_params = topic_score_params_for_subnet_with_rate::( + subnet, + SUBNET_COUNT, + message_rate, + &self.spec, + ); + + // Apply the score parameters to the topic + match self + .swarm + .behaviour_mut() + .gossipsub + .set_topic_params(topic.clone(), topic_score_params) + { + Ok(_) => { + debug!( + subnet = *subnet, + topic = %topic, + message_rate = message_rate, + "Successfully updated topic score parameters with pre-calculated rate" + ); + } + Err(e) => { + warn!( + subnet = *subnet, + topic = %topic, + error = %e, + "Failed to set topic score params with pre-calculated rate" + ); + } + } + } + + fn on_subnet_tracker_event(&mut self, event: SubnetEvent) { let (subnet, subscribed) = match event { - SubnetEvent::Join(subnet) => { - if let Err(err) = self.gossipsub().subscribe(&subnet_to_topic(subnet)) { + SubnetEvent::Join(subnet, message_rate_opt) => { + let topic = subnet_to_topic(subnet); + if let Err(err) = self.gossipsub().subscribe(&topic) { error!(?err, subnet = *subnet, "can't subscribe"); return; } + + // Only set topic score parameters if message rate is provided (scoring enabled) + if let Some(message_rate) = message_rate_opt { + self.update_topic_score_for_subnet_with_rate::(subnet, topic, message_rate); + } else { + debug!( + subnet = *subnet, + "Skipping topic score parameter setup - gossipsub scoring disabled" + ); + } + let actions = self.peer_manager().join_subnet(subnet); self.handle_connect_actions(actions); (subnet, true) @@ -287,6 +350,20 @@ impl Network { self.gossipsub().unsubscribe(&subnet_to_topic(subnet)); (subnet, false) } + SubnetEvent::RateUpdate(subnet, message_rate) => { + let topic = subnet_to_topic(subnet); + + debug!( + subnet = *subnet, + message_rate = message_rate, + "Updating topic scores for subnet due to rate changes" + ); + + self.update_topic_score_for_subnet_with_rate::(subnet, topic, message_rate); + + // No subscription change needed, just score update + return; + } }; // update enr and metadata to new state diff --git a/anchor/network/src/peer_manager.rs b/anchor/network/src/peer_manager.rs index 3f543667f..f400e4b62 100644 --- a/anchor/network/src/peer_manager.rs +++ b/anchor/network/src/peer_manager.rs @@ -24,7 +24,7 @@ use peer_store::{ }; use rand::seq::SliceRandom; use ssz_types::{Bitfield, length::Fixed, typenum::U128}; -use subnet_tracker::SubnetId; +use subnet_service::SubnetId; use tokio::time::{MissedTickBehavior, interval}; use tracing::{debug, info}; diff --git a/anchor/network/src/scoring/mod.rs b/anchor/network/src/scoring/mod.rs index 4895af3d2..1442684e4 100644 --- a/anchor/network/src/scoring/mod.rs +++ b/anchor/network/src/scoring/mod.rs @@ -1,4 +1,3 @@ -mod message_rate; pub(crate) mod peer_score_config; pub(crate) mod topic_score_config; diff --git a/anchor/network/src/scoring/topic_score_config.rs b/anchor/network/src/scoring/topic_score_config.rs index 3082e3cb3..6319fd606 100644 --- a/anchor/network/src/scoring/topic_score_config.rs +++ b/anchor/network/src/scoring/topic_score_config.rs @@ -6,14 +6,12 @@ use std::time::Duration; use gossipsub::TopicScoreParams; -use ssv_types::CommitteeInfo; -use subnet_tracker::SubnetId; +use subnet_service::SubnetId; use tracing::{debug, warn}; use types::{ChainSpec, EthSpec}; use crate::scoring::{ decay_threshold, - message_rate::calculate_message_rate_for_topic, peer_score_config::{GRAYLIST_THRESHOLD, calculate_score_decay_factor, decay_convergence}, }; @@ -42,8 +40,6 @@ const MAX_INVALID_MESSAGES_ALLOWED: usize = 20; /// Network-wide configuration options for topic scoring #[derive(Debug, Clone)] pub struct NetworkConfig { - /// Total number of active validators in the network - pub active_validators: u64, /// Number of subnets in the network pub subnets: usize, /// Duration of one epoch @@ -111,18 +107,16 @@ impl Default for TopicConfig { } impl TopicScoringOptions { - /// Create new options with the given network parameters - pub fn new( - active_validators: u64, + /// Create new options with the given network parameters and pre-calculated message rate + pub fn new_with_rate( subnets: usize, - committees: &[CommitteeInfo], + message_rate: f64, chain_spec: &ChainSpec, ) -> Self { let slot_duration = Duration::from_secs(chain_spec.seconds_per_slot); let one_epoch_duration = E::slots_per_epoch() as u32 * slot_duration; let network = NetworkConfig { - active_validators, subnets, one_epoch_duration, total_topics_weight: TOTAL_TOPICS_WEIGHT, @@ -133,7 +127,7 @@ impl TopicScoringOptions { topic_weight: network.total_topics_weight / subnets as f64, /* Set topic weight with * equal weights across * all subnets */ - expected_msg_rate: calculate_message_rate_for_topic::(committees, chain_spec), + expected_msg_rate: message_rate, ..Default::default() }; @@ -311,32 +305,24 @@ impl TopicScoringOptions { } } -/// Generate topic score parameters for a specific subnet -pub fn topic_score_params_for_subnet( +/// Generate topic score parameters for a specific subnet with pre-calculated message rate +pub fn topic_score_params_for_subnet_with_rate( subnet: SubnetId, - validator_count: u64, - subnet_count: u64, - committees: &[CommitteeInfo], + subnet_count: usize, + message_rate: f64, chain_spec: &ChainSpec, ) -> TopicScoreParams { - // Create options using committee-based calculation with the new message rate function - let opts = TopicScoringOptions::new::( - validator_count, - subnet_count as usize, - committees, - chain_spec, - ); + // Create options using pre-calculated message rate + let opts = TopicScoringOptions::new_with_rate::(subnet_count, message_rate, chain_spec); // Generate and return parameters match opts.to_topic_score_params() { Ok(params) => { debug!( subnet = *subnet, - validator_count = validator_count, - committee_count = committees.len(), expected_rate = opts.topic.expected_msg_rate, topic_weight = opts.topic.topic_weight, - "Generated topic score parameters for subnet" + "Generated topic score parameters for subnet with pre-calculated rate" ); params } @@ -346,7 +332,6 @@ pub fn topic_score_params_for_subnet( error = %e, "Failed to generate topic score parameters, using defaults" ); - // Return safe default parameters TopicScoreParams::default() } } diff --git a/anchor/subnet_tracker/Cargo.toml b/anchor/subnet_service/Cargo.toml similarity index 81% rename from anchor/subnet_tracker/Cargo.toml rename to anchor/subnet_service/Cargo.toml index 26beb5192..be91aa67b 100644 --- a/anchor/subnet_tracker/Cargo.toml +++ b/anchor/subnet_service/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "subnet_tracker" +name = "subnet_service" version = "0.1.0" edition = { workspace = true } authors = ["Sigma Prime "] @@ -9,7 +9,9 @@ alloy = { workspace = true } database = { workspace = true } ethereum_serde_utils = "0.7.0" serde = { workspace = true } +slot_clock = { workspace = true } ssv_types = { workspace = true } task_executor = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } +types = { workspace = true } diff --git a/anchor/subnet_service/src/lib.rs b/anchor/subnet_service/src/lib.rs new file mode 100644 index 000000000..cfa9d61dc --- /dev/null +++ b/anchor/subnet_service/src/lib.rs @@ -0,0 +1,343 @@ +use std::{collections::HashSet, ops::Deref, sync::Arc, time::Duration}; + +use alloy::primitives::ruint::aliases::U256; +use database::{NetworkState, NonUniqueIndex, UniqueIndex}; +use serde::{Deserialize, Serialize}; +use slot_clock::SlotClock; +use ssv_types::{CommitteeId, CommitteeInfo}; +use task_executor::TaskExecutor; +use tokio::{ + sync::{mpsc, watch}, + time::sleep, +}; +use tracing::{debug, error, warn}; +use types::{ChainSpec, EthSpec}; + +pub mod message_rate; + +pub const SUBNET_COUNT: usize = 128; +pub type SubnetBits = [u8; SUBNET_COUNT / 8]; + +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)] +#[serde(transparent)] +pub struct SubnetId(#[serde(with = "serde_utils::quoted_u64")] u64); + +impl SubnetId { + pub fn new(id: u64) -> Self { + id.into() + } + + pub fn from_committee(committee_id: CommitteeId, subnet_count: usize) -> Self { + // Derive a numeric "committee ID" and convert to an index in [0..subnet_count]. + let id = U256::from_be_bytes(*committee_id); + SubnetId( + (id % U256::from(subnet_count)) + .try_into() + .expect("modulo must be < subnet_count"), + ) + } +} + +impl From for SubnetId { + fn from(x: u64) -> Self { + Self(x) + } +} + +impl Deref for SubnetId { + type Target = u64; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +pub enum SubnetEvent { + Join(SubnetId, Option), // subnet_id and optional message_rate + Leave(SubnetId), + /// Message rate has changed for an already-joined subnet + RateUpdate(SubnetId, f64), /* subnet_id and new message_rate (only emitted when scoring is + * enabled) */ +} + +pub fn start_subnet_service( + db: watch::Receiver, + subnet_count: usize, + subscribe_all_subnets: bool, + disable_gossipsub_topic_scoring: bool, + executor: &TaskExecutor, + slot_clock: impl SlotClock + 'static, + chain_spec: Arc, +) -> mpsc::Receiver { + let (tx, rx) = mpsc::channel(if subscribe_all_subnets { + subnet_count + } else { + 1 + }); + + executor.spawn( + subnet_service::( + tx, + db, + subnet_count, + subscribe_all_subnets, + disable_gossipsub_topic_scoring, + slot_clock, + chain_spec, + ), + "subnet_service", + ); + + rx +} + +/// The main background task: +/// - Gathers the current subnets from `NetworkState`. +/// - Compares them to the previously-seen subnets. +/// - Emits `Join` events for newly-added subnets and `Leave` events for removed subnets. +/// - Recalculates topic scores for all subnets at epoch boundaries. +async fn subnet_service( + tx: mpsc::Sender, + mut db: watch::Receiver, + subnet_count: usize, + subscribe_all_subnets: bool, + disable_gossipsub_topic_scoring: bool, + slot_clock: impl SlotClock, + chain_spec: Arc, +) { + // If subscribe_all_subnets is true, initialize by joining all subnets + if subscribe_all_subnets { + let initial_events: Vec<_> = { + let current_state = db.borrow(); + (0..(subnet_count as u64)) + .map(SubnetId) + .map(|subnet| { + let message_rate = if disable_gossipsub_topic_scoring { + None + } else { + let committees_info = + get_committee_info_for_subnet(&subnet, &*current_state); + Some(message_rate::calculate_message_rate_for_topic::( + &committees_info, + &chain_spec, + )) + }; + (subnet, message_rate) + }) + .collect() + }; + + for (subnet, message_rate) in initial_events { + if let Err(err) = tx.send(SubnetEvent::Join(subnet, message_rate)).await { + error!( + ?err, + subnet = *subnet, + "Failed to send subnet join event during initialization" + ); + return; // If we can't send, the receiver is dropped, so exit + } + } + + // If scoring is disabled, we've sent all Join events and there's nothing more to do + if disable_gossipsub_topic_scoring { + debug!("All subnets joined and scoring disabled - subnet service task complete"); + return; + } + } + + // `previous_subnets` tracks which subnets were joined in the last iteration. + // For subscribe_all_subnets, we track all subnets; otherwise, only the ones we're subscribed + // to. + let mut previous_subnets = if subscribe_all_subnets { + (0..(subnet_count as u64)).map(SubnetId).collect() + } else { + HashSet::new() + }; + + // Calculate duration until the first epoch boundary + let mut next_epoch_delay = calculate_duration_to_next_epoch::(&slot_clock); + + loop { + tokio::select! { + // Handle database changes for subnet join/leave (only if not subscribe_all_subnets) + _ = db.changed(), if !subscribe_all_subnets => { + handle_subnet_changes::(&tx, &mut db, &mut previous_subnets, subnet_count, &chain_spec, disable_gossipsub_topic_scoring).await; + } + + // Handle scheduled epoch boundaries (for both modes, but only if scoring is enabled) + _ = sleep(next_epoch_delay), if !disable_gossipsub_topic_scoring => { + handle_epoch_committee_update::(&tx, &mut db, &previous_subnets, &chain_spec).await; + // Recalculate the next epoch delay only after we've processed the epoch boundary + next_epoch_delay = calculate_duration_to_next_epoch::(&slot_clock); + } + } + } +} + +/// Calculate duration until the next epoch boundary +fn calculate_duration_to_next_epoch(slot_clock: &impl SlotClock) -> Duration { + if let Some(duration_to_next_epoch) = slot_clock.duration_to_next_epoch(E::slots_per_epoch()) { + duration_to_next_epoch + } else { + // Fallback: if we can't get current slot, use a conservative short interval + let slot_duration = slot_clock.slot_duration(); + warn!("Could not get current slot for epoch delay calculation, using fallback timing"); + slot_duration * 3 // Wait 3 slots before next check + } +} + +/// Handle subnet join/leave events when database changes +async fn handle_subnet_changes( + tx: &mpsc::Sender, + db: &mut watch::Receiver, + previous_subnets: &mut HashSet, + subnet_count: usize, + chain_spec: &ChainSpec, + disable_gossipsub_topic_scoring: bool, +) { + // Build the `current_subnets` set by examining the clusters we own. + let mut current_subnets = HashSet::new(); + + // Get current subnets from database + { + let state = db.borrow(); + for cluster_id in state.get_own_clusters() { + if let Some(cluster) = state.clusters().get_by(cluster_id) { + let subnet_id = SubnetId::from_committee(cluster.committee_id(), subnet_count); + current_subnets.insert(subnet_id); + } + } + } + + // For every subnet that was previously joined but is no longer in `current_subnets`, + // send a `Leave` event. + for subnet in previous_subnets.difference(¤t_subnets) { + debug!(?subnet, "send leave"); + if tx.send(SubnetEvent::Leave(*subnet)).await.is_err() { + warn!("Network no longer listening for subnets"); + return; + } + } + + // For every subnet that was not previously joined but is now in `current_subnets`, + // send a `Join` event. + for subnet in current_subnets.difference(previous_subnets) { + debug!(?subnet, "send join"); + // Calculate current message rate for this subnet (or None if scoring is disabled) + let message_rate = if disable_gossipsub_topic_scoring { + None + } else { + let state = db.borrow(); + Some(calculate_message_rate_for_subnet::( + subnet, &*state, chain_spec, + )) + }; + + if tx + .send(SubnetEvent::Join(*subnet, message_rate)) + .await + .is_err() + { + warn!("Network no longer listening for subnets"); + return; + } + } + + // Update the previous_subnets for next iteration + *previous_subnets = current_subnets; +} + +/// Handle epoch-based committee updates for all currently joined subnets +async fn handle_epoch_committee_update( + tx: &mpsc::Sender, + db: &mut watch::Receiver, + current_subnets: &HashSet, + chain_spec: &ChainSpec, +) { + debug!( + subnet_count = current_subnets.len(), + "Recalculating message rates for all subnets at epoch boundary" + ); + + // Recalculate message rates for all currently joined subnets + for &subnet in current_subnets { + let message_rate = { + let state = db.borrow(); + calculate_message_rate_for_subnet::(&subnet, &*state, chain_spec) + }; + + if tx + .send(SubnetEvent::RateUpdate(subnet, message_rate)) + .await + .is_err() + { + warn!("Network no longer listening for subnets"); + return; + } + } +} + +/// Calculate message rate for a specific subnet from the current network state +pub fn calculate_message_rate_for_subnet( + subnet: &SubnetId, + network_state: impl Deref, + chain_spec: &ChainSpec, +) -> f64 { + let committees_info = get_committee_info_for_subnet(subnet, network_state); + message_rate::calculate_message_rate_for_topic::(&committees_info, chain_spec) +} + +/// Get committee info for a specific subnet from the current network state +/// +/// This function retrieves clusters for the subnet and converts them to CommitteeInfo +/// which includes both the committee members and validator indices. +pub fn get_committee_info_for_subnet( + subnet: &SubnetId, + network_state: impl Deref, +) -> Vec { + network_state + .clusters() + .values() + .filter(|cluster| { + let cluster_subnet = SubnetId::from_committee(cluster.committee_id(), SUBNET_COUNT); + cluster_subnet == *subnet + }) + .map(|cluster| { + // Convert cluster to CommitteeInfo by getting validator indices + let validator_indices = network_state + .metadata() + .get_all_by(&cluster.cluster_id) + .flat_map(|metadata| metadata.index) + .collect::>(); + + CommitteeInfo { + committee_members: cluster.cluster_members.clone(), + validator_indices, + } + }) + .collect() +} + +/// only useful for testing - introduce feature flag? +pub fn test_tracker( + executor: TaskExecutor, + events: Vec, + msg_delay: Duration, +) -> mpsc::Receiver { + let (tx, rx) = mpsc::channel(1); + + executor.spawn( + async move { + for event in events { + sleep(msg_delay).await; + tx.send(event).await.unwrap(); + } + while !tx.is_closed() { + sleep(Duration::from_millis(100)).await; + } + }, + "test_subnet_tracker", + ); + + rx +} diff --git a/anchor/network/src/scoring/message_rate.rs b/anchor/subnet_service/src/message_rate.rs similarity index 100% rename from anchor/network/src/scoring/message_rate.rs rename to anchor/subnet_service/src/message_rate.rs diff --git a/anchor/subnet_tracker/src/lib.rs b/anchor/subnet_tracker/src/lib.rs deleted file mode 100644 index 6659d07d7..000000000 --- a/anchor/subnet_tracker/src/lib.rs +++ /dev/null @@ -1,158 +0,0 @@ -use std::{collections::HashSet, ops::Deref, time::Duration}; - -use alloy::primitives::ruint::aliases::U256; -use database::{NetworkState, UniqueIndex}; -use serde::{Deserialize, Serialize}; -use ssv_types::CommitteeId; -use task_executor::TaskExecutor; -use tokio::{ - sync::{mpsc, watch}, - time::sleep, -}; -use tracing::{debug, error, warn}; - -#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)] -#[serde(transparent)] -pub struct SubnetId(#[serde(with = "serde_utils::quoted_u64")] u64); - -impl SubnetId { - pub fn new(id: u64) -> Self { - id.into() - } - - pub fn from_committee(committee_id: CommitteeId, subnet_count: usize) -> Self { - // Derive a numeric "committee ID" and convert to an index in [0..subnet_count]. - let id = U256::from_be_bytes(*committee_id); - SubnetId( - (id % U256::from(subnet_count)) - .try_into() - .expect("modulo must be < subnet_count"), - ) - } -} - -impl From for SubnetId { - fn from(x: u64) -> Self { - Self(x) - } -} - -impl Deref for SubnetId { - type Target = u64; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -pub enum SubnetEvent { - Join(SubnetId), - Leave(SubnetId), -} - -pub fn start_subnet_tracker( - db: watch::Receiver, - subnet_count: usize, - subscribe_all_subnets: bool, - executor: &TaskExecutor, -) -> mpsc::Receiver { - if !subscribe_all_subnets { - // a channel capacity of 1 is fine - the subnet_tracker does not do anything else, it can - // wait. - let (tx, rx) = mpsc::channel(1); - executor.spawn(subnet_tracker(tx, db, subnet_count), "subnet_tracker"); - rx - } else { - let (tx, rx) = mpsc::channel(subnet_count); - for subnet in (0..(subnet_count as u64)).map(SubnetId) { - if let Err(err) = tx.try_send(SubnetEvent::Join(subnet)) { - error!(?err, "Impossible error while subscribing to all subnets"); - } - } - rx - } -} - -/// The main background task: -/// - Gathers the current subnets from `NetworkState`. -/// - Compares them to the previously-seen subnets. -/// - Emits `Join` events for newly-added subnets and `Leave` events for removed subnets. -async fn subnet_tracker( - tx: mpsc::Sender, - mut db: watch::Receiver, - subnet_count: usize, -) { - // `previous_subnets` tracks which subnets were joined in the last iteration. - let mut previous_subnets = HashSet::new(); - - loop { - // Build the `current_subnets` set by examining the clusters we own. - let mut current_subnets = HashSet::new(); - - // do not await while holding lock! - // explicit scope needed because rustc cant handle equivalent drop(state) - { - // Acquire the current snapshot of the database state (this is synchronous). - let state = db.borrow(); - for cluster_id in state.get_own_clusters() { - if let Some(cluster) = state.clusters().get_by(cluster_id) { - let subnet_id = SubnetId::from_committee(cluster.committee_id(), subnet_count); - current_subnets.insert(subnet_id); - } - } - } - - // For every subnet that was previously joined but is no longer in `current_subnets`, - // send a `Leave` event. - for subnet in previous_subnets.difference(¤t_subnets) { - debug!(?subnet, "send leave"); - if tx.send(SubnetEvent::Leave(*subnet)).await.is_err() { - warn!("Network no longer listening for subnets"); - return; - } - } - - // For every subnet that was not previously joined but is now in `current_subnets`, - // send a `Join` event. - for subnet in current_subnets.difference(&previous_subnets) { - debug!(?subnet, "send join"); - if tx.send(SubnetEvent::Join(*subnet)).await.is_err() { - warn!("Network no longer listening for subnets"); - return; - } - } - - // Update `previous_subnets` to reflect the current snapshot for the next iteration. - previous_subnets = current_subnets; - - // Wait for the watch channel to signal a changed value before re-running the loop. - if db.changed().await.is_err() { - warn!("Database no longer provides updates"); - return; - } - } -} - -/// only useful for testing - introduce feature flag? -pub fn test_tracker( - executor: TaskExecutor, - events: Vec, - msg_delay: Duration, -) -> mpsc::Receiver { - let (tx, rx) = mpsc::channel(1); - - executor.spawn( - async move { - for event in events { - sleep(msg_delay).await; - tx.send(event).await.unwrap(); - } - while !tx.is_closed() { - sleep(Duration::from_millis(100)).await; - } - }, - "test_subnet_tracker", - ); - - rx -}