From 7516535422520897a11a72c1df8526daeafed50d Mon Sep 17 00:00:00 2001 From: diego Date: Fri, 4 Jul 2025 00:26:55 +0200 Subject: [PATCH 01/16] set param for topic --- Cargo.lock | 1 + anchor/client/src/lib.rs | 1 + anchor/network/Cargo.toml | 1 + anchor/network/src/network.rs | 110 ++++++++++++++++++++++++++++++++-- 4 files changed, 107 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 83c8add7b..152644885 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5569,6 +5569,7 @@ name = "network" version = "0.1.0" dependencies = [ "async-trait", + "database", "dirs 6.0.0", "discv5", "ethereum_ssz", diff --git a/anchor/client/src/lib.rs b/anchor/client/src/lib.rs index 4adfab5dc..c1d9d1612 100644 --- a/anchor/client/src/lib.rs +++ b/anchor/client/src/lib.rs @@ -482,6 +482,7 @@ impl Client { outcome_rx, executor.clone(), &spec, + database.watch(), ) .await .map_err(|e| format!("Unable to start network: {e}"))?; diff --git a/anchor/network/Cargo.toml b/anchor/network/Cargo.toml index d31cb1690..b9556573b 100644 --- a/anchor/network/Cargo.toml +++ b/anchor/network/Cargo.toml @@ -6,6 +6,7 @@ authors = ["Sigma Prime "] [dependencies] async-trait = "0.1.85" +database = { workspace = true } dirs = { workspace = true } discv5 = { workspace = true } ethereum_ssz = { workspace = true } diff --git a/anchor/network/src/network.rs b/anchor/network/src/network.rs index 933033adb..2d7e802d7 100644 --- a/anchor/network/src/network.rs +++ b/anchor/network/src/network.rs @@ -6,8 +6,9 @@ use std::{ time::Instant, }; +use database::{NetworkState, NonUniqueIndex}; use futures::StreamExt; -use gossipsub::{IdentTopic, PublishError}; +use gossipsub::{Hasher, IdentTopic, PublishError, Topic}; use libp2p::{ Multiaddr, PeerId, Swarm, SwarmBuilder, TransportError, core::{ConnectedPoint, muxing::StreamMuxerBox, transport::Boxed}, @@ -18,17 +19,17 @@ use libp2p::{ }; use lighthouse_network::{discovery::DiscoveredPeers, prometheus_client::registry::Registry}; use message_receiver::{MessageReceiver, Outcome}; -use ssv_types::domain_type::DomainType; +use ssv_types::{CommitteeInfo, domain_type::DomainType}; use subnet_tracker::{SubnetEvent, SubnetId}; use task_executor::TaskExecutor; use thiserror::Error; -use tokio::sync::mpsc; -use tracing::{debug, error, info, trace}; +use tokio::sync::{mpsc, watch}; +use tracing::{debug, error, info, trace, warn}; use types::{ChainSpec, EthSpec}; use version::version_with_platform; use crate::{ - Config, Enr, + Config, Enr, SUBNET_COUNT, behaviour::{AnchorBehaviour, AnchorBehaviourEvent, BehaviourError}, discovery::{Discovery, DiscoveryError}, handshake, @@ -37,6 +38,7 @@ use crate::{ network::NetworkError::SwarmConfig, peer_manager, peer_manager::{ConnectActions, PeerManager}, + scoring::topic_score_config::topic_score_params_for_subnet, transport::build_transport, }; @@ -74,11 +76,13 @@ pub struct Network { outcome_rx: mpsc::Receiver, domain_type: DomainType, metrics_registry: Option, + network_state: watch::Receiver, } impl Network { // Creates an instance of the Network struct to start sending and receiving information on the // p2p network. + #[allow(clippy::too_many_arguments)] pub async fn try_new( config: &Config, subnet_event_receiver: mpsc::Receiver, @@ -87,6 +91,7 @@ impl Network { outcome_rx: mpsc::Receiver, executor: TaskExecutor, spec: &ChainSpec, + network_state: watch::Receiver, ) -> Result, Box> { let local_keypair: Keypair = load_private_key(&config.network_dir); @@ -127,6 +132,7 @@ impl Network { outcome_rx, domain_type: config.domain_type.clone(), metrics_registry: Some(metrics_registry), + network_state, }; info!(%peer_id, "Network starting"); @@ -272,10 +278,71 @@ impl Network { } } + /// Update topic score parameters for a newly joined subnet + fn update_topic_score_for_subnet( + &mut self, + subnet: SubnetId, + topic: Topic, + chain_spec: &ChainSpec, + ) { + let current_state = self.network_state.borrow(); + + // Get committee info for this subnet + let committees = get_committee_info_for_subnet(subnet, ¤t_state); + + // Calculate validator count for this subnet from committees + let validator_count = committees + .iter() + .map(|committee| committee.validator_indices.len()) + .sum::(); + + debug!( + subnet = *subnet, + topic = %topic, + committee_count = committees.len(), + validator_count = validator_count, + "Setting topic score parameters for newly joined subnet" + ); + + // Generate topic-specific score parameters using the SSV reference implementation + let topic_score_params = topic_score_params_for_subnet::( + subnet, + validator_count as u64, + SUBNET_COUNT as u64, + &committees, + chain_spec, + ); + + // Apply the score parameters to the topic + match self + .swarm + .behaviour_mut() + .gossipsub + .set_topic_params(topic.clone(), topic_score_params) + { + Ok(_) => { + debug!( + subnet = *subnet, + topic = %topic, + "Successfully updated topic score parameters" + ); + } + Err(e) => { + warn!( + subnet = *subnet, + topic = %topic, + error = %e, + "Failed to set topic score params for newly joined subnet" + ); + } + } + } + fn on_subnet_tracker_event(&mut self, event: SubnetEvent) { let (subnet, subscribed) = match event { SubnetEvent::Join(subnet) => { - if let Err(err) = self.gossipsub().subscribe(&subnet_to_topic(subnet)) { + let topic = subnet_to_topic(subnet); + if let Err(err) = self.gossipsub().subscribe(&topic) { error!(?err, subnet = *subnet, "can't subscribe"); return; } @@ -379,3 +446,34 @@ fn build_swarm( fn subnet_to_topic(subnet: SubnetId) -> IdentTopic { IdentTopic::new(format!("ssv.v2.{}", *subnet)) } + +/// Get committee info for a specific subnet from the current network state +/// +/// This function retrieves clusters for the subnet and converts them to CommitteeInfo +/// which includes both the committee members and validator indices. +pub fn get_committee_info_for_subnet( + subnet: SubnetId, + network_state: &NetworkState, +) -> Vec { + network_state + .clusters() + .values() + .filter(|cluster| { + let cluster_subnet = SubnetId::from_committee(cluster.committee_id(), SUBNET_COUNT); + cluster_subnet == subnet + }) + .map(|cluster| { + // Convert cluster to CommitteeInfo by getting validator indices + let validator_indices = network_state + .metadata() + .get_all_by(&cluster.cluster_id) + .flat_map(|metadata| metadata.index) + .collect::>(); + + CommitteeInfo { + committee_members: cluster.cluster_members.clone(), + validator_indices, + } + }) + .collect() +} From 4f2e06a8daf2ba164d89e74aca93652e073bc8a5 Mon Sep 17 00:00:00 2001 From: diego Date: Fri, 4 Jul 2025 12:49:00 +0200 Subject: [PATCH 02/16] call update_topic_score_for_subnet --- anchor/client/src/lib.rs | 4 ++-- anchor/network/src/network.rs | 25 +++++++++++++++---------- 2 files changed, 17 insertions(+), 12 deletions(-) diff --git a/anchor/client/src/lib.rs b/anchor/client/src/lib.rs index c1d9d1612..3f2b94e25 100644 --- a/anchor/client/src/lib.rs +++ b/anchor/client/src/lib.rs @@ -481,7 +481,7 @@ impl Client { Arc::new(message_receiver), outcome_rx, executor.clone(), - &spec, + spec.clone(), database.watch(), ) .await @@ -493,7 +493,7 @@ impl Client { } // Spawn the network listening task - executor.spawn(network.run(), "network"); + executor.spawn(network.run::(), "network"); let validator_store = AnchorValidatorStore::<_, E>::new( database.watch(), diff --git a/anchor/network/src/network.rs b/anchor/network/src/network.rs index 2d7e802d7..e71f4af02 100644 --- a/anchor/network/src/network.rs +++ b/anchor/network/src/network.rs @@ -8,7 +8,7 @@ use std::{ use database::{NetworkState, NonUniqueIndex}; use futures::StreamExt; -use gossipsub::{Hasher, IdentTopic, PublishError, Topic}; +use gossipsub::{IdentTopic, PublishError}; use libp2p::{ Multiaddr, PeerId, Swarm, SwarmBuilder, TransportError, core::{ConnectedPoint, muxing::StreamMuxerBox, transport::Boxed}, @@ -76,6 +76,7 @@ pub struct Network { outcome_rx: mpsc::Receiver, domain_type: DomainType, metrics_registry: Option, + spec: Arc, network_state: watch::Receiver, } @@ -90,7 +91,7 @@ impl Network { message_receiver: Arc, outcome_rx: mpsc::Receiver, executor: TaskExecutor, - spec: &ChainSpec, + spec: Arc, network_state: watch::Receiver, ) -> Result, Box> { let local_keypair: Keypair = load_private_key(&config.network_dir); @@ -100,7 +101,7 @@ impl Network { let mut metrics_registry = Registry::default(); let behaviour = - AnchorBehaviour::new::(local_keypair.clone(), config, &mut metrics_registry, spec) + AnchorBehaviour::new::(local_keypair.clone(), config, &mut metrics_registry, &spec) .await .map_err(|e| Box::new(NetworkError::Behaviour(e)))?; @@ -132,6 +133,7 @@ impl Network { outcome_rx, domain_type: config.domain_type.clone(), metrics_registry: Some(metrics_registry), + spec, network_state, }; @@ -165,7 +167,7 @@ impl Network { } /// Main loop for polling and handling swarm and channels. - pub async fn run(mut self) { + pub async fn run(mut self) { loop { tokio::select! { swarm_message = self.swarm.select_next_some() => { @@ -228,7 +230,7 @@ impl Network { } }, Some(event) = self.subnet_event_receiver.recv() => { - self.on_subnet_tracker_event(event) + self.on_subnet_tracker_event::(event) } event = self.message_rx.recv() => { match event { @@ -279,11 +281,11 @@ impl Network { } /// Update topic score parameters for a newly joined subnet - fn update_topic_score_for_subnet( + fn update_topic_score_for_subnet( &mut self, subnet: SubnetId, - topic: Topic, - chain_spec: &ChainSpec, + topic: IdentTopic, + chain_spec: Arc, ) { let current_state = self.network_state.borrow(); @@ -310,7 +312,7 @@ impl Network { validator_count as u64, SUBNET_COUNT as u64, &committees, - chain_spec, + &chain_spec, ); // Apply the score parameters to the topic @@ -338,7 +340,7 @@ impl Network { } } - fn on_subnet_tracker_event(&mut self, event: SubnetEvent) { + fn on_subnet_tracker_event(&mut self, event: SubnetEvent) { let (subnet, subscribed) = match event { SubnetEvent::Join(subnet) => { let topic = subnet_to_topic(subnet); @@ -346,6 +348,9 @@ impl Network { error!(?err, subnet = *subnet, "can't subscribe"); return; } + + self.update_topic_score_for_subnet::(subnet, topic, self.spec.clone()); + let actions = self.peer_manager().join_subnet(subnet); self.handle_connect_actions(actions); (subnet, true) From 819f2de333e1de048da5bfbbfc535576ecae0b98 Mon Sep 17 00:00:00 2001 From: diego Date: Fri, 4 Jul 2025 15:30:06 +0200 Subject: [PATCH 03/16] move message_rate.rs to subnet_tracker --- Cargo.lock | 1 + anchor/network/src/scoring/mod.rs | 1 - anchor/network/src/scoring/topic_score_config.rs | 3 +-- anchor/subnet_tracker/Cargo.toml | 1 + anchor/subnet_tracker/src/lib.rs | 2 ++ .../src/scoring => subnet_tracker/src}/message_rate.rs | 0 6 files changed, 5 insertions(+), 3 deletions(-) rename anchor/{network/src/scoring => subnet_tracker/src}/message_rate.rs (100%) diff --git a/Cargo.lock b/Cargo.lock index 152644885..372a63376 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7863,6 +7863,7 @@ dependencies = [ "task_executor", "tokio", "tracing", + "types", ] [[package]] diff --git a/anchor/network/src/scoring/mod.rs b/anchor/network/src/scoring/mod.rs index 4895af3d2..1442684e4 100644 --- a/anchor/network/src/scoring/mod.rs +++ b/anchor/network/src/scoring/mod.rs @@ -1,4 +1,3 @@ -mod message_rate; pub(crate) mod peer_score_config; pub(crate) mod topic_score_config; diff --git a/anchor/network/src/scoring/topic_score_config.rs b/anchor/network/src/scoring/topic_score_config.rs index 3082e3cb3..711ada7d1 100644 --- a/anchor/network/src/scoring/topic_score_config.rs +++ b/anchor/network/src/scoring/topic_score_config.rs @@ -7,13 +7,12 @@ use std::time::Duration; use gossipsub::TopicScoreParams; use ssv_types::CommitteeInfo; -use subnet_tracker::SubnetId; +use subnet_tracker::{SubnetId, message_rate::calculate_message_rate_for_topic}; use tracing::{debug, warn}; use types::{ChainSpec, EthSpec}; use crate::scoring::{ decay_threshold, - message_rate::calculate_message_rate_for_topic, peer_score_config::{GRAYLIST_THRESHOLD, calculate_score_decay_factor, decay_convergence}, }; diff --git a/anchor/subnet_tracker/Cargo.toml b/anchor/subnet_tracker/Cargo.toml index 26beb5192..aadfb3b29 100644 --- a/anchor/subnet_tracker/Cargo.toml +++ b/anchor/subnet_tracker/Cargo.toml @@ -13,3 +13,4 @@ ssv_types = { workspace = true } task_executor = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } +types = { workspace = true } diff --git a/anchor/subnet_tracker/src/lib.rs b/anchor/subnet_tracker/src/lib.rs index 6659d07d7..3de6505d4 100644 --- a/anchor/subnet_tracker/src/lib.rs +++ b/anchor/subnet_tracker/src/lib.rs @@ -11,6 +11,8 @@ use tokio::{ }; use tracing::{debug, error, warn}; +pub mod message_rate; + #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)] #[serde(transparent)] pub struct SubnetId(#[serde(with = "serde_utils::quoted_u64")] u64); diff --git a/anchor/network/src/scoring/message_rate.rs b/anchor/subnet_tracker/src/message_rate.rs similarity index 100% rename from anchor/network/src/scoring/message_rate.rs rename to anchor/subnet_tracker/src/message_rate.rs From 21e561be3f2bbefc60a9a4bf468b9642b7778bcd Mon Sep 17 00:00:00 2001 From: diego Date: Fri, 4 Jul 2025 15:32:27 +0200 Subject: [PATCH 04/16] rename subnet_tracker to subnet_service and move get_committee_info_for_subnet there --- Cargo.lock | 11 ++-- Cargo.toml | 4 +- anchor/client/Cargo.toml | 2 +- anchor/client/src/lib.rs | 16 ++---- anchor/fuzz/Cargo.toml | 2 +- anchor/fuzz/fuzz_targets/setup.rs | 2 +- anchor/message_sender/Cargo.toml | 2 +- anchor/message_sender/src/impostor.rs | 2 +- anchor/message_sender/src/network.rs | 2 +- anchor/network/Cargo.toml | 3 +- anchor/network/src/discovery.rs | 2 +- anchor/network/src/handshake/node_info.rs | 11 ++-- anchor/network/src/lib.rs | 3 - anchor/network/src/network.rs | 57 ++++--------------- anchor/network/src/peer_manager.rs | 2 +- .../network/src/scoring/topic_score_config.rs | 2 +- .../Cargo.toml | 2 +- .../src/lib.rs | 57 ++++++++++++++++--- .../src/message_rate.rs | 0 19 files changed, 88 insertions(+), 94 deletions(-) rename anchor/{subnet_tracker => subnet_service}/Cargo.toml (94%) rename anchor/{subnet_tracker => subnet_service}/src/lib.rs (72%) rename anchor/{subnet_tracker => subnet_service}/src/message_rate.rs (100%) diff --git a/Cargo.lock b/Cargo.lock index 372a63376..218d962d3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -771,7 +771,7 @@ dependencies = [ "signature_collector", "slot_clock", "ssv_types", - "subnet_tracker", + "subnet_service", "task_executor", "tempfile", "thiserror 2.0.12", @@ -1925,7 +1925,7 @@ dependencies = [ "ssv_network_config", "ssv_types", "strum 0.27.1", - "subnet_tracker", + "subnet_service", "task_executor", "tokio", "tracing", @@ -5247,7 +5247,7 @@ dependencies = [ "processor", "slot_clock", "ssv_types", - "subnet_tracker", + "subnet_service", "tokio", "tracing", ] @@ -5569,7 +5569,6 @@ name = "network" version = "0.1.0" dependencies = [ "async-trait", - "database", "dirs 6.0.0", "discv5", "ethereum_ssz", @@ -5587,7 +5586,7 @@ dependencies = [ "serde_json", "ssv_types", "ssz_types", - "subnet_tracker", + "subnet_service", "task_executor", "thiserror 2.0.12", "tokio", @@ -7852,7 +7851,7 @@ dependencies = [ ] [[package]] -name = "subnet_tracker" +name = "subnet_service" version = "0.1.0" dependencies = [ "alloy", diff --git a/Cargo.toml b/Cargo.toml index 9ee07ffbc..5a9a475a9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,7 +25,7 @@ members = [ "anchor/processor", "anchor/qbft_manager", "anchor/signature_collector", - "anchor/subnet_tracker", + "anchor/subnet_service", "anchor/validator_store", ] @@ -59,7 +59,7 @@ qbft_manager = { path = "anchor/qbft_manager" } signature_collector = { path = "anchor/signature_collector" } ssv_network_config = { path = "anchor/common/ssv_network_config" } ssv_types = { path = "anchor/common/ssv_types" } -subnet_tracker = { path = "anchor/subnet_tracker" } +subnet_service = { path = "anchor/subnet_service" } version = { path = "anchor/common/version" } beacon_node_fallback = { git = "https://github.com/sigp/lighthouse", rev = "9b84dac" } diff --git a/anchor/client/Cargo.toml b/anchor/client/Cargo.toml index b52cd8718..7aabf56b0 100644 --- a/anchor/client/Cargo.toml +++ b/anchor/client/Cargo.toml @@ -43,7 +43,7 @@ slot_clock = { workspace = true } ssv_network_config = { workspace = true } ssv_types = { workspace = true } strum = { workspace = true } -subnet_tracker = { workspace = true } +subnet_service = { workspace = true } task_executor = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } diff --git a/anchor/client/src/lib.rs b/anchor/client/src/lib.rs index 3f2b94e25..94d1b03cb 100644 --- a/anchor/client/src/lib.rs +++ b/anchor/client/src/lib.rs @@ -40,7 +40,7 @@ use signature_collector::SignatureCollectorManager; use slashing_protection::SlashingDatabase; use slot_clock::{SlotClock, SystemTimeSlotClock}; use ssv_types::OperatorId; -use subnet_tracker::{SubnetId, start_subnet_tracker}; +use subnet_service::{SUBNET_COUNT, SubnetId, start_subnet_service}; use task_executor::TaskExecutor; use tokio::{ net::TcpListener, @@ -197,9 +197,9 @@ impl Client { .map_err(|e| format!("Unable to open Anchor database: {e}"))?, ); - let subnet_tracker = start_subnet_tracker( + let subnet_service = start_subnet_service( database.watch(), - network::SUBNET_COUNT, + SUBNET_COUNT, config.network.subscribe_all_subnets, &executor, ); @@ -433,13 +433,10 @@ impl Client { key.clone(), operator_id, Some(message_validator.clone()), - network::SUBNET_COUNT, + SUBNET_COUNT, )?) } else { - Arc::new(ImpostorMessageSender::new( - network_tx.clone(), - network::SUBNET_COUNT, - )) + Arc::new(ImpostorMessageSender::new(network_tx.clone(), SUBNET_COUNT)) }; // Create the signature collector @@ -476,13 +473,12 @@ impl Client { // Start the p2p network let mut network = Network::try_new::( &config.network, - subnet_tracker, + subnet_service, network_rx, Arc::new(message_receiver), outcome_rx, executor.clone(), spec.clone(), - database.watch(), ) .await .map_err(|e| format!("Unable to start network: {e}"))?; diff --git a/anchor/fuzz/Cargo.toml b/anchor/fuzz/Cargo.toml index 81719b6c0..ca1fdfc92 100644 --- a/anchor/fuzz/Cargo.toml +++ b/anchor/fuzz/Cargo.toml @@ -64,7 +64,7 @@ sha2 = { workspace = true } signature_collector = { workspace = true } slot_clock = { workspace = true } ssv_types = { workspace = true, features = ["arbitrary-fuzz"] } -subnet_tracker = { workspace = true } +subnet_service = { workspace = true } task_executor = { workspace = true } tempfile = "3.14.0" thiserror = { workspace = true } diff --git a/anchor/fuzz/fuzz_targets/setup.rs b/anchor/fuzz/fuzz_targets/setup.rs index ad8ba7626..189b319f6 100644 --- a/anchor/fuzz/fuzz_targets/setup.rs +++ b/anchor/fuzz/fuzz_targets/setup.rs @@ -21,7 +21,7 @@ use slot_clock::{ManualSlotClock, SlotClock}; use ssv_types::{ OperatorId, ValidatorIndex, consensus::BeaconVote, domain_type::DomainType, msgid::MessageId, }; -use subnet_tracker::SubnetId; +use subnet_service::SubnetId; use task_executor::TaskExecutor; use tempfile::tempdir; use tokio::sync::mpsc; diff --git a/anchor/message_sender/Cargo.toml b/anchor/message_sender/Cargo.toml index 95844a3e4..913c833ce 100644 --- a/anchor/message_sender/Cargo.toml +++ b/anchor/message_sender/Cargo.toml @@ -14,6 +14,6 @@ openssl = { workspace = true } processor = { workspace = true } slot_clock = { workspace = true } ssv_types = { workspace = true } -subnet_tracker = { workspace = true } +subnet_service = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } diff --git a/anchor/message_sender/src/impostor.rs b/anchor/message_sender/src/impostor.rs index 81d818bf0..1ae23d624 100644 --- a/anchor/message_sender/src/impostor.rs +++ b/anchor/message_sender/src/impostor.rs @@ -1,5 +1,5 @@ use ssv_types::{CommitteeId, consensus::UnsignedSSVMessage, message::SignedSSVMessage}; -use subnet_tracker::SubnetId; +use subnet_service::SubnetId; use tokio::sync::mpsc; use tracing::debug; diff --git a/anchor/message_sender/src/network.rs b/anchor/message_sender/src/network.rs index 94a4d8313..ecd52eee5 100644 --- a/anchor/message_sender/src/network.rs +++ b/anchor/message_sender/src/network.rs @@ -13,7 +13,7 @@ use ssv_types::{ CommitteeId, OperatorId, consensus::UnsignedSSVMessage, message::SignedSSVMessage, }; use ssz::Encode; -use subnet_tracker::SubnetId; +use subnet_service::SubnetId; use tokio::sync::{mpsc, mpsc::error::TrySendError}; use tracing::{debug, error, warn}; diff --git a/anchor/network/Cargo.toml b/anchor/network/Cargo.toml index b9556573b..3aa666c8e 100644 --- a/anchor/network/Cargo.toml +++ b/anchor/network/Cargo.toml @@ -6,7 +6,6 @@ authors = ["Sigma Prime "] [dependencies] async-trait = "0.1.85" -database = { workspace = true } dirs = { workspace = true } discv5 = { workspace = true } ethereum_ssz = { workspace = true } @@ -34,7 +33,7 @@ serde = { workspace = true } serde_json = "1.0.137" ssv_types = { workspace = true } ssz_types = "0.10" -subnet_tracker = { workspace = true } +subnet_service = { workspace = true } task_executor = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true } diff --git a/anchor/network/src/discovery.rs b/anchor/network/src/discovery.rs index 5a7a4ddc4..059e19f52 100644 --- a/anchor/network/src/discovery.rs +++ b/anchor/network/src/discovery.rs @@ -37,7 +37,7 @@ use lighthouse_network::{ use ssv_types::domain_type::DomainType; use ssz::{Decode, Encode}; use ssz_types::{BitVector, Bitfield, length::Fixed, typenum::U128}; -use subnet_tracker::SubnetId; +use subnet_service::SubnetId; use tokio::sync::mpsc; use tracing::{debug, error, info, trace, warn}; diff --git a/anchor/network/src/handshake/node_info.rs b/anchor/network/src/handshake/node_info.rs index b99408008..9f4ce7fe4 100644 --- a/anchor/network/src/handshake/node_info.rs +++ b/anchor/network/src/handshake/node_info.rs @@ -1,15 +1,12 @@ use discv5::libp2p_identity::{Keypair, SigningError}; use serde::{Deserialize, Serialize}; use serde_json; -use subnet_tracker::SubnetId; +use subnet_service::{SubnetBits, SubnetId}; use thiserror::Error; -use crate::{ - SubnetBits, - handshake::{ - envelope::{Envelope, make_unsigned}, - node_info::Error::Validation, - }, +use crate::handshake::{ + envelope::{Envelope, make_unsigned}, + node_info::Error::Validation, }; #[derive(Debug, Error)] diff --git a/anchor/network/src/lib.rs b/anchor/network/src/lib.rs index 6e0d39b76..161b2bd1b 100644 --- a/anchor/network/src/lib.rs +++ b/anchor/network/src/lib.rs @@ -14,6 +14,3 @@ pub use config::Config; pub use lighthouse_network::{ListenAddr, ListenAddress}; pub use network::Network; pub type Enr = discv5::enr::Enr; - -pub const SUBNET_COUNT: usize = 128; -type SubnetBits = [u8; SUBNET_COUNT / 8]; diff --git a/anchor/network/src/network.rs b/anchor/network/src/network.rs index e71f4af02..943ce6304 100644 --- a/anchor/network/src/network.rs +++ b/anchor/network/src/network.rs @@ -6,7 +6,6 @@ use std::{ time::Instant, }; -use database::{NetworkState, NonUniqueIndex}; use futures::StreamExt; use gossipsub::{IdentTopic, PublishError}; use libp2p::{ @@ -20,16 +19,16 @@ use libp2p::{ use lighthouse_network::{discovery::DiscoveredPeers, prometheus_client::registry::Registry}; use message_receiver::{MessageReceiver, Outcome}; use ssv_types::{CommitteeInfo, domain_type::DomainType}; -use subnet_tracker::{SubnetEvent, SubnetId}; +use subnet_service::{SUBNET_COUNT, SubnetEvent, SubnetId}; use task_executor::TaskExecutor; use thiserror::Error; -use tokio::sync::{mpsc, watch}; +use tokio::sync::mpsc; use tracing::{debug, error, info, trace, warn}; use types::{ChainSpec, EthSpec}; use version::version_with_platform; use crate::{ - Config, Enr, SUBNET_COUNT, + Config, Enr, behaviour::{AnchorBehaviour, AnchorBehaviourEvent, BehaviourError}, discovery::{Discovery, DiscoveryError}, handshake, @@ -77,13 +76,11 @@ pub struct Network { domain_type: DomainType, metrics_registry: Option, spec: Arc, - network_state: watch::Receiver, } impl Network { // Creates an instance of the Network struct to start sending and receiving information on the // p2p network. - #[allow(clippy::too_many_arguments)] pub async fn try_new( config: &Config, subnet_event_receiver: mpsc::Receiver, @@ -92,7 +89,6 @@ impl Network { outcome_rx: mpsc::Receiver, executor: TaskExecutor, spec: Arc, - network_state: watch::Receiver, ) -> Result, Box> { let local_keypair: Keypair = load_private_key(&config.network_dir); @@ -134,7 +130,6 @@ impl Network { domain_type: config.domain_type.clone(), metrics_registry: Some(metrics_registry), spec, - network_state, }; info!(%peer_id, "Network starting"); @@ -285,13 +280,9 @@ impl Network { &mut self, subnet: SubnetId, topic: IdentTopic, + committees: Vec, chain_spec: Arc, ) { - let current_state = self.network_state.borrow(); - - // Get committee info for this subnet - let committees = get_committee_info_for_subnet(subnet, ¤t_state); - // Calculate validator count for this subnet from committees let validator_count = committees .iter() @@ -342,14 +333,19 @@ impl Network { fn on_subnet_tracker_event(&mut self, event: SubnetEvent) { let (subnet, subscribed) = match event { - SubnetEvent::Join(subnet) => { + SubnetEvent::Join(subnet, committees) => { let topic = subnet_to_topic(subnet); if let Err(err) = self.gossipsub().subscribe(&topic) { error!(?err, subnet = *subnet, "can't subscribe"); return; } - self.update_topic_score_for_subnet::(subnet, topic, self.spec.clone()); + self.update_topic_score_for_subnet::( + subnet, + topic, + committees, + self.spec.clone(), + ); let actions = self.peer_manager().join_subnet(subnet); self.handle_connect_actions(actions); @@ -451,34 +447,3 @@ fn build_swarm( fn subnet_to_topic(subnet: SubnetId) -> IdentTopic { IdentTopic::new(format!("ssv.v2.{}", *subnet)) } - -/// Get committee info for a specific subnet from the current network state -/// -/// This function retrieves clusters for the subnet and converts them to CommitteeInfo -/// which includes both the committee members and validator indices. -pub fn get_committee_info_for_subnet( - subnet: SubnetId, - network_state: &NetworkState, -) -> Vec { - network_state - .clusters() - .values() - .filter(|cluster| { - let cluster_subnet = SubnetId::from_committee(cluster.committee_id(), SUBNET_COUNT); - cluster_subnet == subnet - }) - .map(|cluster| { - // Convert cluster to CommitteeInfo by getting validator indices - let validator_indices = network_state - .metadata() - .get_all_by(&cluster.cluster_id) - .flat_map(|metadata| metadata.index) - .collect::>(); - - CommitteeInfo { - committee_members: cluster.cluster_members.clone(), - validator_indices, - } - }) - .collect() -} diff --git a/anchor/network/src/peer_manager.rs b/anchor/network/src/peer_manager.rs index 3f543667f..f400e4b62 100644 --- a/anchor/network/src/peer_manager.rs +++ b/anchor/network/src/peer_manager.rs @@ -24,7 +24,7 @@ use peer_store::{ }; use rand::seq::SliceRandom; use ssz_types::{Bitfield, length::Fixed, typenum::U128}; -use subnet_tracker::SubnetId; +use subnet_service::SubnetId; use tokio::time::{MissedTickBehavior, interval}; use tracing::{debug, info}; diff --git a/anchor/network/src/scoring/topic_score_config.rs b/anchor/network/src/scoring/topic_score_config.rs index 711ada7d1..2017d73a2 100644 --- a/anchor/network/src/scoring/topic_score_config.rs +++ b/anchor/network/src/scoring/topic_score_config.rs @@ -7,7 +7,7 @@ use std::time::Duration; use gossipsub::TopicScoreParams; use ssv_types::CommitteeInfo; -use subnet_tracker::{SubnetId, message_rate::calculate_message_rate_for_topic}; +use subnet_service::{SubnetId, message_rate::calculate_message_rate_for_topic}; use tracing::{debug, warn}; use types::{ChainSpec, EthSpec}; diff --git a/anchor/subnet_tracker/Cargo.toml b/anchor/subnet_service/Cargo.toml similarity index 94% rename from anchor/subnet_tracker/Cargo.toml rename to anchor/subnet_service/Cargo.toml index aadfb3b29..7dc158b5d 100644 --- a/anchor/subnet_tracker/Cargo.toml +++ b/anchor/subnet_service/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "subnet_tracker" +name = "subnet_service" version = "0.1.0" edition = { workspace = true } authors = ["Sigma Prime "] diff --git a/anchor/subnet_tracker/src/lib.rs b/anchor/subnet_service/src/lib.rs similarity index 72% rename from anchor/subnet_tracker/src/lib.rs rename to anchor/subnet_service/src/lib.rs index 3de6505d4..277167dc8 100644 --- a/anchor/subnet_tracker/src/lib.rs +++ b/anchor/subnet_service/src/lib.rs @@ -1,9 +1,9 @@ use std::{collections::HashSet, ops::Deref, time::Duration}; use alloy::primitives::ruint::aliases::U256; -use database::{NetworkState, UniqueIndex}; +use database::{NetworkState, NonUniqueIndex, UniqueIndex}; use serde::{Deserialize, Serialize}; -use ssv_types::CommitteeId; +use ssv_types::{CommitteeId, CommitteeInfo}; use task_executor::TaskExecutor; use tokio::{ sync::{mpsc, watch}, @@ -13,6 +13,9 @@ use tracing::{debug, error, warn}; pub mod message_rate; +pub const SUBNET_COUNT: usize = 128; +pub type SubnetBits = [u8; SUBNET_COUNT / 8]; + #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)] #[serde(transparent)] pub struct SubnetId(#[serde(with = "serde_utils::quoted_u64")] u64); @@ -48,26 +51,28 @@ impl Deref for SubnetId { } pub enum SubnetEvent { - Join(SubnetId), + Join(SubnetId, Vec), Leave(SubnetId), } -pub fn start_subnet_tracker( +pub fn start_subnet_service( db: watch::Receiver, subnet_count: usize, subscribe_all_subnets: bool, executor: &TaskExecutor, ) -> mpsc::Receiver { if !subscribe_all_subnets { - // a channel capacity of 1 is fine - the subnet_tracker does not do anything else, it can + // a channel capacity of 1 is fine - the subnet_service does not do anything else, it can // wait. let (tx, rx) = mpsc::channel(1); - executor.spawn(subnet_tracker(tx, db, subnet_count), "subnet_tracker"); + executor.spawn(subnet_tracker(tx, db, subnet_count), "subnet_service"); rx } else { let (tx, rx) = mpsc::channel(subnet_count); for subnet in (0..(subnet_count as u64)).map(SubnetId) { - if let Err(err) = tx.try_send(SubnetEvent::Join(subnet)) { + // For the "all subnets" case, we don't have specific committee info, so pass an empty + // vec + if let Err(err) = tx.try_send(SubnetEvent::Join(subnet, Vec::new())) { error!(?err, "Impossible error while subscribing to all subnets"); } } @@ -118,7 +123,12 @@ async fn subnet_tracker( // send a `Join` event. for subnet in current_subnets.difference(&previous_subnets) { debug!(?subnet, "send join"); - if tx.send(SubnetEvent::Join(*subnet)).await.is_err() { + let committees_info = get_committee_info_for_subnet(subnet, db.borrow()); + if tx + .send(SubnetEvent::Join(*subnet, committees_info)) + .await + .is_err() + { warn!("Network no longer listening for subnets"); return; } @@ -135,6 +145,37 @@ async fn subnet_tracker( } } +/// Get committee info for a specific subnet from the current network state +/// +/// This function retrieves clusters for the subnet and converts them to CommitteeInfo +/// which includes both the committee members and validator indices. +pub fn get_committee_info_for_subnet( + subnet: &SubnetId, + network_state: impl Deref, +) -> Vec { + network_state + .clusters() + .values() + .filter(|cluster| { + let cluster_subnet = SubnetId::from_committee(cluster.committee_id(), SUBNET_COUNT); + cluster_subnet == *subnet + }) + .map(|cluster| { + // Convert cluster to CommitteeInfo by getting validator indices + let validator_indices = network_state + .metadata() + .get_all_by(&cluster.cluster_id) + .flat_map(|metadata| metadata.index) + .collect::>(); + + CommitteeInfo { + committee_members: cluster.cluster_members.clone(), + validator_indices, + } + }) + .collect() +} + /// only useful for testing - introduce feature flag? pub fn test_tracker( executor: TaskExecutor, diff --git a/anchor/subnet_tracker/src/message_rate.rs b/anchor/subnet_service/src/message_rate.rs similarity index 100% rename from anchor/subnet_tracker/src/message_rate.rs rename to anchor/subnet_service/src/message_rate.rs From 9a12aa9605723284781feacbc14f527aa2b6d686 Mon Sep 17 00:00:00 2001 From: diego Date: Fri, 4 Jul 2025 18:33:59 +0200 Subject: [PATCH 05/16] update topic score on committee change --- anchor/network/src/network.rs | 19 +++++++++ anchor/subnet_service/src/lib.rs | 72 +++++++++++++++++++++++++++++++- 2 files changed, 89 insertions(+), 2 deletions(-) diff --git a/anchor/network/src/network.rs b/anchor/network/src/network.rs index 943ce6304..e3d64af0f 100644 --- a/anchor/network/src/network.rs +++ b/anchor/network/src/network.rs @@ -355,6 +355,25 @@ impl Network { self.gossipsub().unsubscribe(&subnet_to_topic(subnet)); (subnet, false) } + SubnetEvent::CommitteeUpdate(subnet, committees) => { + let topic = subnet_to_topic(subnet); + + debug!( + subnet = *subnet, + committee_count = committees.len(), + "Updating topic scores for subnet due to committee changes" + ); + + self.update_topic_score_for_subnet::( + subnet, + topic, + committees, + self.spec.clone(), + ); + + // No subscription change needed, just score update + return; + } }; // update enr and metadata to new state diff --git a/anchor/subnet_service/src/lib.rs b/anchor/subnet_service/src/lib.rs index 277167dc8..89669102a 100644 --- a/anchor/subnet_service/src/lib.rs +++ b/anchor/subnet_service/src/lib.rs @@ -1,4 +1,8 @@ -use std::{collections::HashSet, ops::Deref, time::Duration}; +use std::{ + collections::{HashMap, HashSet}, + ops::Deref, + time::Duration, +}; use alloy::primitives::ruint::aliases::U256; use database::{NetworkState, NonUniqueIndex, UniqueIndex}; @@ -53,6 +57,8 @@ impl Deref for SubnetId { pub enum SubnetEvent { Join(SubnetId, Vec), Leave(SubnetId), + /// Committee information has changed for an already-joined subnet + CommitteeUpdate(SubnetId, Vec), } pub fn start_subnet_service( @@ -84,6 +90,7 @@ pub fn start_subnet_service( /// - Gathers the current subnets from `NetworkState`. /// - Compares them to the previously-seen subnets. /// - Emits `Join` events for newly-added subnets and `Leave` events for removed subnets. +/// - Emits `CommitteeUpdate` events when committee information changes for existing subnets. async fn subnet_tracker( tx: mpsc::Sender, mut db: watch::Receiver, @@ -91,10 +98,13 @@ async fn subnet_tracker( ) { // `previous_subnets` tracks which subnets were joined in the last iteration. let mut previous_subnets = HashSet::new(); + // Track committee info for each subnet to detect changes + let mut previous_committee_info: HashMap> = HashMap::new(); loop { // Build the `current_subnets` set by examining the clusters we own. let mut current_subnets = HashSet::new(); + let mut current_committee_info = HashMap::new(); // do not await while holding lock! // explicit scope needed because rustc cant handle equivalent drop(state) @@ -105,6 +115,10 @@ async fn subnet_tracker( if let Some(cluster) = state.clusters().get_by(cluster_id) { let subnet_id = SubnetId::from_committee(cluster.committee_id(), subnet_count); current_subnets.insert(subnet_id); + + // Get committee info for this subnet + let committees = get_committee_info_for_subnet(&subnet_id, &*state); + current_committee_info.insert(subnet_id, committees); } } } @@ -123,7 +137,10 @@ async fn subnet_tracker( // send a `Join` event. for subnet in current_subnets.difference(&previous_subnets) { debug!(?subnet, "send join"); - let committees_info = get_committee_info_for_subnet(subnet, db.borrow()); + let committees_info = current_committee_info + .get(subnet) + .cloned() + .unwrap_or_default(); if tx .send(SubnetEvent::Join(*subnet, committees_info)) .await @@ -134,8 +151,34 @@ async fn subnet_tracker( } } + // Check for updates in committee information for already-joined subnets + for subnet in current_subnets.intersection(&previous_subnets) { + let current_committees = current_committee_info + .get(subnet) + .cloned() + .unwrap_or_default(); + let previous_committees = previous_committee_info + .get(subnet) + .cloned() + .unwrap_or_default(); + + // If committee info has changed, send a CommitteeUpdate event + if committees_have_changed(¤t_committees, &previous_committees) { + debug!(?subnet, "send committee update"); + if tx + .send(SubnetEvent::CommitteeUpdate(*subnet, current_committees)) + .await + .is_err() + { + warn!("Network no longer listening for subnets"); + return; + } + } + } + // Update `previous_subnets` to reflect the current snapshot for the next iteration. previous_subnets = current_subnets; + previous_committee_info = current_committee_info; // Wait for the watch channel to signal a changed value before re-running the loop. if db.changed().await.is_err() { @@ -176,6 +219,31 @@ pub fn get_committee_info_for_subnet( .collect() } +/// Check if committee information has changed by comparing lengths and member sets +fn committees_have_changed(current: &[CommitteeInfo], previous: &[CommitteeInfo]) -> bool { + // Quick check: different number of committees + if current.len() != previous.len() { + return true; + } + + // Compare each committee + for (curr, prev) in current.iter().zip(previous.iter()) { + // Check if validator indices changed + if curr.validator_indices.len() != prev.validator_indices.len() + || curr.validator_indices != prev.validator_indices + { + return true; + } + + // Check if committee members changed + if curr.committee_members != prev.committee_members { + return true; + } + } + + false +} + /// only useful for testing - introduce feature flag? pub fn test_tracker( executor: TaskExecutor, From c11a8fd9383da5531dda55ebaa3d51dea4fcd2b3 Mon Sep 17 00:00:00 2001 From: diego Date: Fri, 4 Jul 2025 18:51:48 +0200 Subject: [PATCH 06/16] store a hash instead of a vec --- anchor/subnet_service/src/lib.rs | 92 +++++++++++++++++--------------- 1 file changed, 49 insertions(+), 43 deletions(-) diff --git a/anchor/subnet_service/src/lib.rs b/anchor/subnet_service/src/lib.rs index 89669102a..cbeb7a701 100644 --- a/anchor/subnet_service/src/lib.rs +++ b/anchor/subnet_service/src/lib.rs @@ -1,5 +1,6 @@ use std::{ collections::{HashMap, HashSet}, + hash::{Hash, Hasher}, ops::Deref, time::Duration, }; @@ -98,13 +99,13 @@ async fn subnet_tracker( ) { // `previous_subnets` tracks which subnets were joined in the last iteration. let mut previous_subnets = HashSet::new(); - // Track committee info for each subnet to detect changes - let mut previous_committee_info: HashMap> = HashMap::new(); + // Track committee info hash for each subnet to detect changes efficiently + let mut previous_committee_hashes: HashMap = HashMap::new(); loop { // Build the `current_subnets` set by examining the clusters we own. let mut current_subnets = HashSet::new(); - let mut current_committee_info = HashMap::new(); + let mut current_committee_hashes = HashMap::new(); // do not await while holding lock! // explicit scope needed because rustc cant handle equivalent drop(state) @@ -116,9 +117,10 @@ async fn subnet_tracker( let subnet_id = SubnetId::from_committee(cluster.committee_id(), subnet_count); current_subnets.insert(subnet_id); - // Get committee info for this subnet + // Get committee info for this subnet and compute its hash let committees = get_committee_info_for_subnet(&subnet_id, &*state); - current_committee_info.insert(subnet_id, committees); + let committee_hash = compute_committee_hash(&committees); + current_committee_hashes.insert(subnet_id, committee_hash); } } } @@ -137,10 +139,12 @@ async fn subnet_tracker( // send a `Join` event. for subnet in current_subnets.difference(&previous_subnets) { debug!(?subnet, "send join"); - let committees_info = current_committee_info - .get(subnet) - .cloned() - .unwrap_or_default(); + // Get current committee info for this subnet + let committees_info = { + let state = db.borrow(); + get_committee_info_for_subnet(subnet, &*state) + }; + if tx .send(SubnetEvent::Join(*subnet, committees_info)) .await @@ -153,20 +157,25 @@ async fn subnet_tracker( // Check for updates in committee information for already-joined subnets for subnet in current_subnets.intersection(&previous_subnets) { - let current_committees = current_committee_info - .get(subnet) - .cloned() - .unwrap_or_default(); - let previous_committees = previous_committee_info - .get(subnet) - .cloned() - .unwrap_or_default(); - - // If committee info has changed, send a CommitteeUpdate event - if committees_have_changed(¤t_committees, &previous_committees) { - debug!(?subnet, "send committee update"); + let current_hash = current_committee_hashes.get(subnet).copied().unwrap_or(0); + let previous_hash = previous_committee_hashes.get(subnet).copied().unwrap_or(0); + + // If committee hash has changed, send a CommitteeUpdate event + if current_hash != previous_hash { + debug!( + ?subnet, + "send committee update - hash changed from {} to {}", + previous_hash, + current_hash + ); + // Get the current committee info for the event + let committees_info = { + let state = db.borrow(); + get_committee_info_for_subnet(subnet, &*state) + }; + if tx - .send(SubnetEvent::CommitteeUpdate(*subnet, current_committees)) + .send(SubnetEvent::CommitteeUpdate(*subnet, committees_info)) .await .is_err() { @@ -176,9 +185,10 @@ async fn subnet_tracker( } } - // Update `previous_subnets` to reflect the current snapshot for the next iteration. + // Update `previous_subnets` and `previous_committee_hashes` to reflect the current snapshot + // for the next iteration. previous_subnets = current_subnets; - previous_committee_info = current_committee_info; + previous_committee_hashes = current_committee_hashes; // Wait for the watch channel to signal a changed value before re-running the loop. if db.changed().await.is_err() { @@ -219,29 +229,25 @@ pub fn get_committee_info_for_subnet( .collect() } -/// Check if committee information has changed by comparing lengths and member sets -fn committees_have_changed(current: &[CommitteeInfo], previous: &[CommitteeInfo]) -> bool { - // Quick check: different number of committees - if current.len() != previous.len() { - return true; - } +/// Compute a lightweight hash of committee information to detect changes efficiently +fn compute_committee_hash(committees: &[CommitteeInfo]) -> u64 { + let mut hasher = std::collections::hash_map::DefaultHasher::new(); - // Compare each committee - for (curr, prev) in current.iter().zip(previous.iter()) { - // Check if validator indices changed - if curr.validator_indices.len() != prev.validator_indices.len() - || curr.validator_indices != prev.validator_indices - { - return true; - } + // Hash the number of committees first + committees.len().hash(&mut hasher); - // Check if committee members changed - if curr.committee_members != prev.committee_members { - return true; - } + // Hash each committee's essential data + for committee in committees { + // Hash committee members by converting to a sorted vector + let mut members: Vec<_> = committee.committee_members.iter().collect(); + members.sort_unstable(); + members.hash(&mut hasher); + + // Hash validator indices + committee.validator_indices.hash(&mut hasher); } - false + hasher.finish() } /// only useful for testing - introduce feature flag? From 946e5a1b802a4684aa882f4b29c374de3623257e Mon Sep 17 00:00:00 2001 From: diego Date: Mon, 7 Jul 2025 19:25:51 +0200 Subject: [PATCH 07/16] recalculate topic scores for all subnets --- Cargo.lock | 1 + anchor/client/src/lib.rs | 16 +- anchor/subnet_service/Cargo.toml | 1 + anchor/subnet_service/src/lib.rs | 246 +++++++++++++++++-------------- 4 files changed, 147 insertions(+), 117 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 218d962d3..3a3788796 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7858,6 +7858,7 @@ dependencies = [ "database", "ethereum_serde_utils", "serde", + "slot_clock", "ssv_types", "task_executor", "tokio", diff --git a/anchor/client/src/lib.rs b/anchor/client/src/lib.rs index 94d1b03cb..364e70cd9 100644 --- a/anchor/client/src/lib.rs +++ b/anchor/client/src/lib.rs @@ -197,13 +197,6 @@ impl Client { .map_err(|e| format!("Unable to open Anchor database: {e}"))?, ); - let subnet_service = start_subnet_service( - database.watch(), - SUBNET_COUNT, - config.network.subscribe_all_subnets, - &executor, - ); - // Initialize slashing protection. let slashing_db_path = config.data_dir.join(SLASHING_PROTECTION_FILENAME); let slashing_protection = @@ -459,6 +452,15 @@ impl Client { ) .map_err(|e| format!("Unable to initialize qbft manager: {e:?}"))?; + // Start the subnet service now that we have slot_clock + let subnet_service = start_subnet_service::( + database.watch(), + SUBNET_COUNT, + config.network.subscribe_all_subnets, + &executor, + slot_clock.clone(), + ); + let (outcome_tx, outcome_rx) = mpsc::channel::(9000); let message_receiver = NetworkMessageReceiver::new( diff --git a/anchor/subnet_service/Cargo.toml b/anchor/subnet_service/Cargo.toml index 7dc158b5d..be91aa67b 100644 --- a/anchor/subnet_service/Cargo.toml +++ b/anchor/subnet_service/Cargo.toml @@ -9,6 +9,7 @@ alloy = { workspace = true } database = { workspace = true } ethereum_serde_utils = "0.7.0" serde = { workspace = true } +slot_clock = { workspace = true } ssv_types = { workspace = true } task_executor = { workspace = true } tokio = { workspace = true } diff --git a/anchor/subnet_service/src/lib.rs b/anchor/subnet_service/src/lib.rs index cbeb7a701..d149646e7 100644 --- a/anchor/subnet_service/src/lib.rs +++ b/anchor/subnet_service/src/lib.rs @@ -1,13 +1,9 @@ -use std::{ - collections::{HashMap, HashSet}, - hash::{Hash, Hasher}, - ops::Deref, - time::Duration, -}; +use std::{collections::HashSet, ops::Deref, time::Duration}; use alloy::primitives::ruint::aliases::U256; use database::{NetworkState, NonUniqueIndex, UniqueIndex}; use serde::{Deserialize, Serialize}; +use slot_clock::SlotClock; use ssv_types::{CommitteeId, CommitteeInfo}; use task_executor::TaskExecutor; use tokio::{ @@ -15,6 +11,7 @@ use tokio::{ time::sleep, }; use tracing::{debug, error, warn}; +use types::EthSpec; pub mod message_rate; @@ -62,17 +59,21 @@ pub enum SubnetEvent { CommitteeUpdate(SubnetId, Vec), } -pub fn start_subnet_service( +pub fn start_subnet_service( db: watch::Receiver, subnet_count: usize, subscribe_all_subnets: bool, executor: &TaskExecutor, + slot_clock: impl SlotClock + 'static, ) -> mpsc::Receiver { if !subscribe_all_subnets { // a channel capacity of 1 is fine - the subnet_service does not do anything else, it can // wait. let (tx, rx) = mpsc::channel(1); - executor.spawn(subnet_tracker(tx, db, subnet_count), "subnet_service"); + executor.spawn( + subnet_tracker::(tx, db, subnet_count, slot_clock), + "subnet_service", + ); rx } else { let (tx, rx) = mpsc::channel(subnet_count); @@ -91,108 +92,154 @@ pub fn start_subnet_service( /// - Gathers the current subnets from `NetworkState`. /// - Compares them to the previously-seen subnets. /// - Emits `Join` events for newly-added subnets and `Leave` events for removed subnets. -/// - Emits `CommitteeUpdate` events when committee information changes for existing subnets. -async fn subnet_tracker( +/// - Recalculates topic scores for all subnets at epoch boundaries. +async fn subnet_tracker( tx: mpsc::Sender, mut db: watch::Receiver, subnet_count: usize, + slot_clock: impl SlotClock, ) { // `previous_subnets` tracks which subnets were joined in the last iteration. let mut previous_subnets = HashSet::new(); - // Track committee info hash for each subnet to detect changes efficiently - let mut previous_committee_hashes: HashMap = HashMap::new(); + + // Calculate duration until the first epoch boundary + let mut next_epoch_delay = calculate_seconds_to_next_epoch::(&slot_clock); loop { - // Build the `current_subnets` set by examining the clusters we own. - let mut current_subnets = HashSet::new(); - let mut current_committee_hashes = HashMap::new(); + tokio::select! { + // Handle database changes for subnet join/leave + _ = db.changed() => { + handle_subnet_changes(&tx, &mut db, &mut previous_subnets, subnet_count).await; + } - // do not await while holding lock! - // explicit scope needed because rustc cant handle equivalent drop(state) - { - // Acquire the current snapshot of the database state (this is synchronous). - let state = db.borrow(); - for cluster_id in state.get_own_clusters() { - if let Some(cluster) = state.clusters().get_by(cluster_id) { - let subnet_id = SubnetId::from_committee(cluster.committee_id(), subnet_count); - current_subnets.insert(subnet_id); - - // Get committee info for this subnet and compute its hash - let committees = get_committee_info_for_subnet(&subnet_id, &*state); - let committee_hash = compute_committee_hash(&committees); - current_committee_hashes.insert(subnet_id, committee_hash); + // Handle scheduled epoch boundaries + _ = sleep(next_epoch_delay) => { + if let Some(current_slot) = slot_clock.now() { + let current_epoch = current_slot.epoch(E::slots_per_epoch()); + debug!( + epoch = current_epoch.as_u64(), + "Epoch boundary reached - recalculating topic scores for all subnets" + ); + handle_epoch_committee_update(&tx, &mut db, &previous_subnets).await; + + // Schedule the next epoch boundary (one full epoch from now) + let epoch_duration = slot_clock.slot_duration() * E::slots_per_epoch() as u32; + next_epoch_delay = epoch_duration; + } else { + // If we can't get current slot, recalculate the delay + warn!("Could not get current slot during epoch boundary, recalculating delay"); + next_epoch_delay = calculate_seconds_to_next_epoch::(&slot_clock); } } } + } +} - // For every subnet that was previously joined but is no longer in `current_subnets`, - // send a `Leave` event. - for subnet in previous_subnets.difference(¤t_subnets) { - debug!(?subnet, "send leave"); - if tx.send(SubnetEvent::Leave(*subnet)).await.is_err() { - warn!("Network no longer listening for subnets"); - return; +/// Calculate duration until the next epoch boundary +fn calculate_seconds_to_next_epoch(slot_clock: &impl SlotClock) -> Duration { + if let Some(current_slot) = slot_clock.now() { + let slot_duration = slot_clock.slot_duration(); + let slots_per_epoch = E::slots_per_epoch(); + + // Calculate the current position within the epoch + let current_slot_in_epoch = current_slot.as_u64() % slots_per_epoch; + let remaining_slots_in_epoch = if current_slot_in_epoch == 0 { + // We're at epoch boundary, next epoch is one full epoch away + slots_per_epoch + } else { + // Calculate slots remaining in current epoch + slots_per_epoch - current_slot_in_epoch + }; + + // Calculate time to next epoch boundary + slot_duration * remaining_slots_in_epoch as u32 + } else { + // Fallback: if we can't get current slot, use a conservative short interval + let slot_duration = slot_clock.slot_duration(); + warn!("Could not get current slot for epoch delay calculation, using fallback timing"); + slot_duration * 3 // Wait 3 slots before next check + } +} + +/// Handle subnet join/leave events when database changes +async fn handle_subnet_changes( + tx: &mpsc::Sender, + db: &mut watch::Receiver, + previous_subnets: &mut HashSet, + subnet_count: usize, +) { + // Build the `current_subnets` set by examining the clusters we own. + let mut current_subnets = HashSet::new(); + + // Get current subnets from database + { + let state = db.borrow(); + for cluster_id in state.get_own_clusters() { + if let Some(cluster) = state.clusters().get_by(cluster_id) { + let subnet_id = SubnetId::from_committee(cluster.committee_id(), subnet_count); + current_subnets.insert(subnet_id); } } + } - // For every subnet that was not previously joined but is now in `current_subnets`, - // send a `Join` event. - for subnet in current_subnets.difference(&previous_subnets) { - debug!(?subnet, "send join"); - // Get current committee info for this subnet - let committees_info = { - let state = db.borrow(); - get_committee_info_for_subnet(subnet, &*state) - }; - - if tx - .send(SubnetEvent::Join(*subnet, committees_info)) - .await - .is_err() - { - warn!("Network no longer listening for subnets"); - return; - } + // For every subnet that was previously joined but is no longer in `current_subnets`, + // send a `Leave` event. + for subnet in previous_subnets.difference(¤t_subnets) { + debug!(?subnet, "send leave"); + if tx.send(SubnetEvent::Leave(*subnet)).await.is_err() { + warn!("Network no longer listening for subnets"); + return; } + } - // Check for updates in committee information for already-joined subnets - for subnet in current_subnets.intersection(&previous_subnets) { - let current_hash = current_committee_hashes.get(subnet).copied().unwrap_or(0); - let previous_hash = previous_committee_hashes.get(subnet).copied().unwrap_or(0); - - // If committee hash has changed, send a CommitteeUpdate event - if current_hash != previous_hash { - debug!( - ?subnet, - "send committee update - hash changed from {} to {}", - previous_hash, - current_hash - ); - // Get the current committee info for the event - let committees_info = { - let state = db.borrow(); - get_committee_info_for_subnet(subnet, &*state) - }; - - if tx - .send(SubnetEvent::CommitteeUpdate(*subnet, committees_info)) - .await - .is_err() - { - warn!("Network no longer listening for subnets"); - return; - } - } + // For every subnet that was not previously joined but is now in `current_subnets`, + // send a `Join` event. + for subnet in current_subnets.difference(previous_subnets) { + debug!(?subnet, "send join"); + // Get current committee info for this subnet + let committees_info = { + let state = db.borrow(); + get_committee_info_for_subnet(subnet, &*state) + }; + + if tx + .send(SubnetEvent::Join(*subnet, committees_info)) + .await + .is_err() + { + warn!("Network no longer listening for subnets"); + return; } + } - // Update `previous_subnets` and `previous_committee_hashes` to reflect the current snapshot - // for the next iteration. - previous_subnets = current_subnets; - previous_committee_hashes = current_committee_hashes; + // Update the previous_subnets for next iteration + *previous_subnets = current_subnets; +} - // Wait for the watch channel to signal a changed value before re-running the loop. - if db.changed().await.is_err() { - warn!("Database no longer provides updates"); +/// Handle epoch-based committee updates for all currently joined subnets +async fn handle_epoch_committee_update( + tx: &mpsc::Sender, + db: &mut watch::Receiver, + current_subnets: &HashSet, +) { + debug!( + subnet_count = current_subnets.len(), + "Recalculating topic scores for all subnets" + ); + + // Recalculate topic scores for all currently joined subnets + for &subnet in current_subnets { + let committees_info = { + let state = db.borrow(); + get_committee_info_for_subnet(&subnet, &*state) + }; + + if tx + .send(SubnetEvent::CommitteeUpdate(subnet, committees_info)) + .await + .is_err() + { + warn!("Network no longer listening for subnets"); return; } } @@ -229,27 +276,6 @@ pub fn get_committee_info_for_subnet( .collect() } -/// Compute a lightweight hash of committee information to detect changes efficiently -fn compute_committee_hash(committees: &[CommitteeInfo]) -> u64 { - let mut hasher = std::collections::hash_map::DefaultHasher::new(); - - // Hash the number of committees first - committees.len().hash(&mut hasher); - - // Hash each committee's essential data - for committee in committees { - // Hash committee members by converting to a sorted vector - let mut members: Vec<_> = committee.committee_members.iter().collect(); - members.sort_unstable(); - members.hash(&mut hasher); - - // Hash validator indices - committee.validator_indices.hash(&mut hasher); - } - - hasher.finish() -} - /// only useful for testing - introduce feature flag? pub fn test_tracker( executor: TaskExecutor, From 3521b193130c1907bed90555184dc2bb65ba455f Mon Sep 17 00:00:00 2001 From: diego Date: Mon, 7 Jul 2025 19:56:32 +0200 Subject: [PATCH 08/16] pass the message rate in SubnetEvent --- anchor/client/src/lib.rs | 1 + anchor/network/src/network.rs | 60 +++++++----------- .../network/src/scoring/topic_score_config.rs | 38 ++++-------- anchor/subnet_service/src/lib.rs | 61 +++++++++++-------- 4 files changed, 71 insertions(+), 89 deletions(-) diff --git a/anchor/client/src/lib.rs b/anchor/client/src/lib.rs index 364e70cd9..1eee7e413 100644 --- a/anchor/client/src/lib.rs +++ b/anchor/client/src/lib.rs @@ -459,6 +459,7 @@ impl Client { config.network.subscribe_all_subnets, &executor, slot_clock.clone(), + spec.clone(), ); let (outcome_tx, outcome_rx) = mpsc::channel::(9000); diff --git a/anchor/network/src/network.rs b/anchor/network/src/network.rs index e3d64af0f..6912b47cb 100644 --- a/anchor/network/src/network.rs +++ b/anchor/network/src/network.rs @@ -18,7 +18,7 @@ use libp2p::{ }; use lighthouse_network::{discovery::DiscoveredPeers, prometheus_client::registry::Registry}; use message_receiver::{MessageReceiver, Outcome}; -use ssv_types::{CommitteeInfo, domain_type::DomainType}; +use ssv_types::domain_type::DomainType; use subnet_service::{SUBNET_COUNT, SubnetEvent, SubnetId}; use task_executor::TaskExecutor; use thiserror::Error; @@ -37,7 +37,7 @@ use crate::{ network::NetworkError::SwarmConfig, peer_manager, peer_manager::{ConnectActions, PeerManager}, - scoring::topic_score_config::topic_score_params_for_subnet, + scoring::topic_score_config::topic_score_params_for_subnet_with_rate, transport::build_transport, }; @@ -275,35 +275,26 @@ impl Network { } } - /// Update topic score parameters for a newly joined subnet - fn update_topic_score_for_subnet( + /// Update topic score parameters for a subnet with pre-calculated message rate + fn update_topic_score_for_subnet_with_rate( &mut self, subnet: SubnetId, topic: IdentTopic, - committees: Vec, - chain_spec: Arc, + message_rate: f64, ) { - // Calculate validator count for this subnet from committees - let validator_count = committees - .iter() - .map(|committee| committee.validator_indices.len()) - .sum::(); - debug!( subnet = *subnet, topic = %topic, - committee_count = committees.len(), - validator_count = validator_count, - "Setting topic score parameters for newly joined subnet" + message_rate = message_rate, + "Setting topic score parameters with pre-calculated message rate" ); - // Generate topic-specific score parameters using the SSV reference implementation - let topic_score_params = topic_score_params_for_subnet::( + // Generate topic-specific score parameters using pre-calculated message rate + let topic_score_params = topic_score_params_for_subnet_with_rate::( subnet, - validator_count as u64, - SUBNET_COUNT as u64, - &committees, - &chain_spec, + SUBNET_COUNT, + message_rate, + &self.spec, ); // Apply the score parameters to the topic @@ -317,7 +308,8 @@ impl Network { debug!( subnet = *subnet, topic = %topic, - "Successfully updated topic score parameters" + message_rate = message_rate, + "Successfully updated topic score parameters with pre-calculated rate" ); } Err(e) => { @@ -325,7 +317,7 @@ impl Network { subnet = *subnet, topic = %topic, error = %e, - "Failed to set topic score params for newly joined subnet" + "Failed to set topic score params with pre-calculated rate" ); } } @@ -333,19 +325,14 @@ impl Network { fn on_subnet_tracker_event(&mut self, event: SubnetEvent) { let (subnet, subscribed) = match event { - SubnetEvent::Join(subnet, committees) => { + SubnetEvent::Join(subnet, message_rate) => { let topic = subnet_to_topic(subnet); if let Err(err) = self.gossipsub().subscribe(&topic) { error!(?err, subnet = *subnet, "can't subscribe"); return; } - self.update_topic_score_for_subnet::( - subnet, - topic, - committees, - self.spec.clone(), - ); + self.update_topic_score_for_subnet_with_rate::(subnet, topic, message_rate); let actions = self.peer_manager().join_subnet(subnet); self.handle_connect_actions(actions); @@ -355,21 +342,16 @@ impl Network { self.gossipsub().unsubscribe(&subnet_to_topic(subnet)); (subnet, false) } - SubnetEvent::CommitteeUpdate(subnet, committees) => { + SubnetEvent::RateUpdate(subnet, message_rate) => { let topic = subnet_to_topic(subnet); debug!( subnet = *subnet, - committee_count = committees.len(), - "Updating topic scores for subnet due to committee changes" + message_rate = message_rate, + "Updating topic scores for subnet due to rate changes" ); - self.update_topic_score_for_subnet::( - subnet, - topic, - committees, - self.spec.clone(), - ); + self.update_topic_score_for_subnet_with_rate::(subnet, topic, message_rate); // No subscription change needed, just score update return; diff --git a/anchor/network/src/scoring/topic_score_config.rs b/anchor/network/src/scoring/topic_score_config.rs index 2017d73a2..6319fd606 100644 --- a/anchor/network/src/scoring/topic_score_config.rs +++ b/anchor/network/src/scoring/topic_score_config.rs @@ -6,8 +6,7 @@ use std::time::Duration; use gossipsub::TopicScoreParams; -use ssv_types::CommitteeInfo; -use subnet_service::{SubnetId, message_rate::calculate_message_rate_for_topic}; +use subnet_service::SubnetId; use tracing::{debug, warn}; use types::{ChainSpec, EthSpec}; @@ -41,8 +40,6 @@ const MAX_INVALID_MESSAGES_ALLOWED: usize = 20; /// Network-wide configuration options for topic scoring #[derive(Debug, Clone)] pub struct NetworkConfig { - /// Total number of active validators in the network - pub active_validators: u64, /// Number of subnets in the network pub subnets: usize, /// Duration of one epoch @@ -110,18 +107,16 @@ impl Default for TopicConfig { } impl TopicScoringOptions { - /// Create new options with the given network parameters - pub fn new( - active_validators: u64, + /// Create new options with the given network parameters and pre-calculated message rate + pub fn new_with_rate( subnets: usize, - committees: &[CommitteeInfo], + message_rate: f64, chain_spec: &ChainSpec, ) -> Self { let slot_duration = Duration::from_secs(chain_spec.seconds_per_slot); let one_epoch_duration = E::slots_per_epoch() as u32 * slot_duration; let network = NetworkConfig { - active_validators, subnets, one_epoch_duration, total_topics_weight: TOTAL_TOPICS_WEIGHT, @@ -132,7 +127,7 @@ impl TopicScoringOptions { topic_weight: network.total_topics_weight / subnets as f64, /* Set topic weight with * equal weights across * all subnets */ - expected_msg_rate: calculate_message_rate_for_topic::(committees, chain_spec), + expected_msg_rate: message_rate, ..Default::default() }; @@ -310,32 +305,24 @@ impl TopicScoringOptions { } } -/// Generate topic score parameters for a specific subnet -pub fn topic_score_params_for_subnet( +/// Generate topic score parameters for a specific subnet with pre-calculated message rate +pub fn topic_score_params_for_subnet_with_rate( subnet: SubnetId, - validator_count: u64, - subnet_count: u64, - committees: &[CommitteeInfo], + subnet_count: usize, + message_rate: f64, chain_spec: &ChainSpec, ) -> TopicScoreParams { - // Create options using committee-based calculation with the new message rate function - let opts = TopicScoringOptions::new::( - validator_count, - subnet_count as usize, - committees, - chain_spec, - ); + // Create options using pre-calculated message rate + let opts = TopicScoringOptions::new_with_rate::(subnet_count, message_rate, chain_spec); // Generate and return parameters match opts.to_topic_score_params() { Ok(params) => { debug!( subnet = *subnet, - validator_count = validator_count, - committee_count = committees.len(), expected_rate = opts.topic.expected_msg_rate, topic_weight = opts.topic.topic_weight, - "Generated topic score parameters for subnet" + "Generated topic score parameters for subnet with pre-calculated rate" ); params } @@ -345,7 +332,6 @@ pub fn topic_score_params_for_subnet( error = %e, "Failed to generate topic score parameters, using defaults" ); - // Return safe default parameters TopicScoreParams::default() } } diff --git a/anchor/subnet_service/src/lib.rs b/anchor/subnet_service/src/lib.rs index d149646e7..2691138d2 100644 --- a/anchor/subnet_service/src/lib.rs +++ b/anchor/subnet_service/src/lib.rs @@ -1,4 +1,4 @@ -use std::{collections::HashSet, ops::Deref, time::Duration}; +use std::{collections::HashSet, ops::Deref, sync::Arc, time::Duration}; use alloy::primitives::ruint::aliases::U256; use database::{NetworkState, NonUniqueIndex, UniqueIndex}; @@ -11,7 +11,7 @@ use tokio::{ time::sleep, }; use tracing::{debug, error, warn}; -use types::EthSpec; +use types::{ChainSpec, EthSpec}; pub mod message_rate; @@ -53,10 +53,10 @@ impl Deref for SubnetId { } pub enum SubnetEvent { - Join(SubnetId, Vec), + Join(SubnetId, f64), // subnet_id and message_rate Leave(SubnetId), - /// Committee information has changed for an already-joined subnet - CommitteeUpdate(SubnetId, Vec), + /// Message rate has changed for an already-joined subnet + RateUpdate(SubnetId, f64), // subnet_id and new message_rate } pub fn start_subnet_service( @@ -65,22 +65,22 @@ pub fn start_subnet_service( subscribe_all_subnets: bool, executor: &TaskExecutor, slot_clock: impl SlotClock + 'static, + chain_spec: Arc, ) -> mpsc::Receiver { if !subscribe_all_subnets { // a channel capacity of 1 is fine - the subnet_service does not do anything else, it can // wait. let (tx, rx) = mpsc::channel(1); executor.spawn( - subnet_tracker::(tx, db, subnet_count, slot_clock), + subnet_service::(tx, db, subnet_count, slot_clock, chain_spec), "subnet_service", ); rx } else { let (tx, rx) = mpsc::channel(subnet_count); for subnet in (0..(subnet_count as u64)).map(SubnetId) { - // For the "all subnets" case, we don't have specific committee info, so pass an empty - // vec - if let Err(err) = tx.try_send(SubnetEvent::Join(subnet, Vec::new())) { + // For the "all subnets" case, we don't have specific committee info, so use 0.0 rate + if let Err(err) = tx.try_send(SubnetEvent::Join(subnet, 0.0)) { error!(?err, "Impossible error while subscribing to all subnets"); } } @@ -93,11 +93,12 @@ pub fn start_subnet_service( /// - Compares them to the previously-seen subnets. /// - Emits `Join` events for newly-added subnets and `Leave` events for removed subnets. /// - Recalculates topic scores for all subnets at epoch boundaries. -async fn subnet_tracker( +async fn subnet_service( tx: mpsc::Sender, mut db: watch::Receiver, subnet_count: usize, slot_clock: impl SlotClock, + chain_spec: Arc, ) { // `previous_subnets` tracks which subnets were joined in the last iteration. let mut previous_subnets = HashSet::new(); @@ -109,7 +110,7 @@ async fn subnet_tracker( tokio::select! { // Handle database changes for subnet join/leave _ = db.changed() => { - handle_subnet_changes(&tx, &mut db, &mut previous_subnets, subnet_count).await; + handle_subnet_changes::(&tx, &mut db, &mut previous_subnets, subnet_count, &chain_spec).await; } // Handle scheduled epoch boundaries @@ -118,9 +119,9 @@ async fn subnet_tracker( let current_epoch = current_slot.epoch(E::slots_per_epoch()); debug!( epoch = current_epoch.as_u64(), - "Epoch boundary reached - recalculating topic scores for all subnets" + "Epoch boundary reached - recalculating message rates for all subnets" ); - handle_epoch_committee_update(&tx, &mut db, &previous_subnets).await; + handle_epoch_committee_update::(&tx, &mut db, &previous_subnets, &chain_spec).await; // Schedule the next epoch boundary (one full epoch from now) let epoch_duration = slot_clock.slot_duration() * E::slots_per_epoch() as u32; @@ -162,11 +163,12 @@ fn calculate_seconds_to_next_epoch(slot_clock: &impl SlotClock) -> D } /// Handle subnet join/leave events when database changes -async fn handle_subnet_changes( +async fn handle_subnet_changes( tx: &mpsc::Sender, db: &mut watch::Receiver, previous_subnets: &mut HashSet, subnet_count: usize, + chain_spec: &ChainSpec, ) { // Build the `current_subnets` set by examining the clusters we own. let mut current_subnets = HashSet::new(); @@ -196,14 +198,14 @@ async fn handle_subnet_changes( // send a `Join` event. for subnet in current_subnets.difference(previous_subnets) { debug!(?subnet, "send join"); - // Get current committee info for this subnet - let committees_info = { + // Calculate current message rate for this subnet + let message_rate = { let state = db.borrow(); - get_committee_info_for_subnet(subnet, &*state) + calculate_message_rate_for_subnet::(subnet, &*state, chain_spec) }; if tx - .send(SubnetEvent::Join(*subnet, committees_info)) + .send(SubnetEvent::Join(*subnet, message_rate)) .await .is_err() { @@ -217,25 +219,26 @@ async fn handle_subnet_changes( } /// Handle epoch-based committee updates for all currently joined subnets -async fn handle_epoch_committee_update( +async fn handle_epoch_committee_update( tx: &mpsc::Sender, db: &mut watch::Receiver, current_subnets: &HashSet, + chain_spec: &ChainSpec, ) { debug!( subnet_count = current_subnets.len(), - "Recalculating topic scores for all subnets" + "Recalculating message rates for all subnets at epoch boundary" ); - // Recalculate topic scores for all currently joined subnets + // Recalculate message rates for all currently joined subnets for &subnet in current_subnets { - let committees_info = { + let message_rate = { let state = db.borrow(); - get_committee_info_for_subnet(&subnet, &*state) + calculate_message_rate_for_subnet::(&subnet, &*state, chain_spec) }; if tx - .send(SubnetEvent::CommitteeUpdate(subnet, committees_info)) + .send(SubnetEvent::RateUpdate(subnet, message_rate)) .await .is_err() { @@ -245,6 +248,16 @@ async fn handle_epoch_committee_update( } } +/// Calculate message rate for a specific subnet from the current network state +pub fn calculate_message_rate_for_subnet( + subnet: &SubnetId, + network_state: impl Deref, + chain_spec: &ChainSpec, +) -> f64 { + let committees_info = get_committee_info_for_subnet(subnet, network_state); + message_rate::calculate_message_rate_for_topic::(&committees_info, chain_spec) +} + /// Get committee info for a specific subnet from the current network state /// /// This function retrieves clusters for the subnet and converts them to CommitteeInfo From 3ecd1a7b7570eab99a4c2a40dcf540dc2ab1c04d Mon Sep 17 00:00:00 2001 From: diego Date: Tue, 8 Jul 2025 00:12:36 +0200 Subject: [PATCH 09/16] fix msg rate when subscribed to all subnets --- anchor/subnet_service/src/lib.rs | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/anchor/subnet_service/src/lib.rs b/anchor/subnet_service/src/lib.rs index 2691138d2..ec4e2c514 100644 --- a/anchor/subnet_service/src/lib.rs +++ b/anchor/subnet_service/src/lib.rs @@ -78,10 +78,22 @@ pub fn start_subnet_service( rx } else { let (tx, rx) = mpsc::channel(subnet_count); - for subnet in (0..(subnet_count as u64)).map(SubnetId) { - // For the "all subnets" case, we don't have specific committee info, so use 0.0 rate - if let Err(err) = tx.try_send(SubnetEvent::Join(subnet, 0.0)) { - error!(?err, "Impossible error while subscribing to all subnets"); + { + let current_state = db.borrow(); + for subnet in (0..(subnet_count as u64)).map(SubnetId) { + let committees_info = get_committee_info_for_subnet(&subnet, &*current_state); + let message_rate = message_rate::calculate_message_rate_for_topic::( + &committees_info, + &chain_spec, + ); + + if let Err(err) = tx.try_send(SubnetEvent::Join(subnet, message_rate)) { + error!( + ?err, + subnet = *subnet, + "Failed to send subnet join event during initialization" + ); + } } } rx From 6643750488dab45c8e6ecc49b6bb589bb3987fae Mon Sep 17 00:00:00 2001 From: diego Date: Tue, 8 Jul 2025 15:27:24 +0200 Subject: [PATCH 10/16] simplify calculate_duration_to_next_epoch --- anchor/subnet_service/src/lib.rs | 43 ++++++-------------------------- 1 file changed, 7 insertions(+), 36 deletions(-) diff --git a/anchor/subnet_service/src/lib.rs b/anchor/subnet_service/src/lib.rs index ec4e2c514..b5cd5b522 100644 --- a/anchor/subnet_service/src/lib.rs +++ b/anchor/subnet_service/src/lib.rs @@ -115,10 +115,10 @@ async fn subnet_service( // `previous_subnets` tracks which subnets were joined in the last iteration. let mut previous_subnets = HashSet::new(); - // Calculate duration until the first epoch boundary - let mut next_epoch_delay = calculate_seconds_to_next_epoch::(&slot_clock); - loop { + // Calculate duration until the next epoch boundary + let next_epoch_delay = calculate_duration_to_next_epoch::(&slot_clock); + tokio::select! { // Handle database changes for subnet join/leave _ = db.changed() => { @@ -127,45 +127,16 @@ async fn subnet_service( // Handle scheduled epoch boundaries _ = sleep(next_epoch_delay) => { - if let Some(current_slot) = slot_clock.now() { - let current_epoch = current_slot.epoch(E::slots_per_epoch()); - debug!( - epoch = current_epoch.as_u64(), - "Epoch boundary reached - recalculating message rates for all subnets" - ); - handle_epoch_committee_update::(&tx, &mut db, &previous_subnets, &chain_spec).await; - - // Schedule the next epoch boundary (one full epoch from now) - let epoch_duration = slot_clock.slot_duration() * E::slots_per_epoch() as u32; - next_epoch_delay = epoch_duration; - } else { - // If we can't get current slot, recalculate the delay - warn!("Could not get current slot during epoch boundary, recalculating delay"); - next_epoch_delay = calculate_seconds_to_next_epoch::(&slot_clock); - } + handle_epoch_committee_update::(&tx, &mut db, &previous_subnets, &chain_spec).await; } } } } /// Calculate duration until the next epoch boundary -fn calculate_seconds_to_next_epoch(slot_clock: &impl SlotClock) -> Duration { - if let Some(current_slot) = slot_clock.now() { - let slot_duration = slot_clock.slot_duration(); - let slots_per_epoch = E::slots_per_epoch(); - - // Calculate the current position within the epoch - let current_slot_in_epoch = current_slot.as_u64() % slots_per_epoch; - let remaining_slots_in_epoch = if current_slot_in_epoch == 0 { - // We're at epoch boundary, next epoch is one full epoch away - slots_per_epoch - } else { - // Calculate slots remaining in current epoch - slots_per_epoch - current_slot_in_epoch - }; - - // Calculate time to next epoch boundary - slot_duration * remaining_slots_in_epoch as u32 +fn calculate_duration_to_next_epoch(slot_clock: &impl SlotClock) -> Duration { + if let Some(duration_to_next_epoch) = slot_clock.duration_to_next_epoch(E::slots_per_epoch()) { + duration_to_next_epoch } else { // Fallback: if we can't get current slot, use a conservative short interval let slot_duration = slot_clock.slot_duration(); From 58dda89b0a334fefd5a3e8c80ff49c1051540107 Mon Sep 17 00:00:00 2001 From: diego Date: Tue, 8 Jul 2025 16:17:30 +0200 Subject: [PATCH 11/16] fix next_epoch_delay calculation --- anchor/subnet_service/src/lib.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/anchor/subnet_service/src/lib.rs b/anchor/subnet_service/src/lib.rs index b5cd5b522..d2d65723d 100644 --- a/anchor/subnet_service/src/lib.rs +++ b/anchor/subnet_service/src/lib.rs @@ -114,11 +114,10 @@ async fn subnet_service( ) { // `previous_subnets` tracks which subnets were joined in the last iteration. let mut previous_subnets = HashSet::new(); + // Calculate duration until the first epoch boundary + let mut next_epoch_delay = calculate_duration_to_next_epoch::(&slot_clock); loop { - // Calculate duration until the next epoch boundary - let next_epoch_delay = calculate_duration_to_next_epoch::(&slot_clock); - tokio::select! { // Handle database changes for subnet join/leave _ = db.changed() => { @@ -128,6 +127,8 @@ async fn subnet_service( // Handle scheduled epoch boundaries _ = sleep(next_epoch_delay) => { handle_epoch_committee_update::(&tx, &mut db, &previous_subnets, &chain_spec).await; + // Recalculate the next epoch delay only after we've processed the epoch boundary + next_epoch_delay = calculate_duration_to_next_epoch::(&slot_clock); } } } From ea59da05ab12fb2d6ecfc38c3719808e27444120 Mon Sep 17 00:00:00 2001 From: diego Date: Tue, 8 Jul 2025 17:11:08 +0200 Subject: [PATCH 12/16] update msg rate for --subscribe-all-subnets case --- anchor/subnet_service/src/lib.rs | 91 ++++++++++++++++++++------------ 1 file changed, 58 insertions(+), 33 deletions(-) diff --git a/anchor/subnet_service/src/lib.rs b/anchor/subnet_service/src/lib.rs index d2d65723d..0e8f89b31 100644 --- a/anchor/subnet_service/src/lib.rs +++ b/anchor/subnet_service/src/lib.rs @@ -67,37 +67,25 @@ pub fn start_subnet_service( slot_clock: impl SlotClock + 'static, chain_spec: Arc, ) -> mpsc::Receiver { - if !subscribe_all_subnets { - // a channel capacity of 1 is fine - the subnet_service does not do anything else, it can - // wait. - let (tx, rx) = mpsc::channel(1); - executor.spawn( - subnet_service::(tx, db, subnet_count, slot_clock, chain_spec), - "subnet_service", - ); - rx + let (tx, rx) = mpsc::channel(if subscribe_all_subnets { + subnet_count } else { - let (tx, rx) = mpsc::channel(subnet_count); - { - let current_state = db.borrow(); - for subnet in (0..(subnet_count as u64)).map(SubnetId) { - let committees_info = get_committee_info_for_subnet(&subnet, &*current_state); - let message_rate = message_rate::calculate_message_rate_for_topic::( - &committees_info, - &chain_spec, - ); + 1 + }); - if let Err(err) = tx.try_send(SubnetEvent::Join(subnet, message_rate)) { - error!( - ?err, - subnet = *subnet, - "Failed to send subnet join event during initialization" - ); - } - } - } - rx - } + executor.spawn( + subnet_service::( + tx, + db, + subnet_count, + subscribe_all_subnets, + slot_clock, + chain_spec, + ), + "subnet_service", + ); + + rx } /// The main background task: @@ -109,22 +97,59 @@ async fn subnet_service( tx: mpsc::Sender, mut db: watch::Receiver, subnet_count: usize, + subscribe_all_subnets: bool, slot_clock: impl SlotClock, chain_spec: Arc, ) { + // If subscribe_all_subnets is true, initialize by joining all subnets + if subscribe_all_subnets { + let initial_events: Vec<_> = { + let current_state = db.borrow(); + (0..(subnet_count as u64)) + .map(SubnetId) + .map(|subnet| { + let committees_info = get_committee_info_for_subnet(&subnet, &*current_state); + let message_rate = message_rate::calculate_message_rate_for_topic::( + &committees_info, + &chain_spec, + ); + (subnet, message_rate) + }) + .collect() + }; + + for (subnet, message_rate) in initial_events { + if let Err(err) = tx.send(SubnetEvent::Join(subnet, message_rate)).await { + error!( + ?err, + subnet = *subnet, + "Failed to send subnet join event during initialization" + ); + return; // If we can't send, the receiver is dropped, so exit + } + } + } + // `previous_subnets` tracks which subnets were joined in the last iteration. - let mut previous_subnets = HashSet::new(); + // For subscribe_all_subnets, we track all subnets; otherwise, only the ones we're subscribed + // to. + let mut previous_subnets = if subscribe_all_subnets { + (0..(subnet_count as u64)).map(SubnetId).collect() + } else { + HashSet::new() + }; + // Calculate duration until the first epoch boundary let mut next_epoch_delay = calculate_duration_to_next_epoch::(&slot_clock); loop { tokio::select! { - // Handle database changes for subnet join/leave - _ = db.changed() => { + // Handle database changes for subnet join/leave (only if not subscribe_all_subnets) + _ = db.changed(), if !subscribe_all_subnets => { handle_subnet_changes::(&tx, &mut db, &mut previous_subnets, subnet_count, &chain_spec).await; } - // Handle scheduled epoch boundaries + // Handle scheduled epoch boundaries (for both modes) _ = sleep(next_epoch_delay) => { handle_epoch_committee_update::(&tx, &mut db, &previous_subnets, &chain_spec).await; // Recalculate the next epoch delay only after we've processed the epoch boundary From b2a7638a46895cc9555acbe997f4a21c6be08514 Mon Sep 17 00:00:00 2001 From: diego Date: Tue, 8 Jul 2025 19:13:43 +0200 Subject: [PATCH 13/16] add an option to disable topic scoring --- anchor/client/src/cli.rs | 4 +- anchor/client/src/config.rs | 2 +- anchor/client/src/lib.rs | 1 + anchor/network/src/behaviour.rs | 2 +- anchor/network/src/config.rs | 10 ++-- anchor/network/src/network.rs | 12 ++++- anchor/subnet_service/src/lib.rs | 85 +++++++++++++++++++++----------- 7 files changed, 77 insertions(+), 39 deletions(-) diff --git a/anchor/client/src/cli.rs b/anchor/client/src/cli.rs index 17ecec499..2d01f1b34 100644 --- a/anchor/client/src/cli.rs +++ b/anchor/client/src/cli.rs @@ -528,11 +528,11 @@ pub struct Node { #[clap( long, - help = "Disables peer scoring altogether.", + help = "Disables gossipsub peer scoring.", display_order = 0, help_heading = FLAG_HEADER )] - pub disable_peer_scoring: bool, + pub disable_gossipsub_peer_scoring: bool, #[clap(flatten)] pub logging_flags: LoggingFlags, diff --git a/anchor/client/src/config.rs b/anchor/client/src/config.rs index a00881b99..bec60db30 100644 --- a/anchor/client/src/config.rs +++ b/anchor/client/src/config.rs @@ -207,7 +207,7 @@ pub fn from_cli(cli_args: &Node) -> Result { config.network.subscribe_all_subnets = cli_args.subscribe_all_subnets; // Network related - set peer scoring configuration - config.network.disable_peer_scoring = cli_args.disable_peer_scoring; + config.network.disable_gossipsub_peer_scoring = cli_args.disable_gossipsub_peer_scoring; config.beacon_nodes_tls_certs = cli_args.beacon_nodes_tls_certs.clone(); config.execution_nodes_tls_certs = cli_args.execution_nodes_tls_certs.clone(); diff --git a/anchor/client/src/lib.rs b/anchor/client/src/lib.rs index 1eee7e413..2edd0add2 100644 --- a/anchor/client/src/lib.rs +++ b/anchor/client/src/lib.rs @@ -457,6 +457,7 @@ impl Client { database.watch(), SUBNET_COUNT, config.network.subscribe_all_subnets, + config.network.disable_gossipsub_topic_scoring, &executor, slot_clock.clone(), spec.clone(), diff --git a/anchor/network/src/behaviour.rs b/anchor/network/src/behaviour.rs index 9251e5acc..32b61b63d 100644 --- a/anchor/network/src/behaviour.rs +++ b/anchor/network/src/behaviour.rs @@ -102,7 +102,7 @@ impl AnchorBehaviour { .map_err(|e| Gossipsub(e.to_string()))?; // Add peer scoring if not disabled - if !network_config.disable_peer_scoring { + if !network_config.disable_gossipsub_peer_scoring { let slots_per_epoch = E::slots_per_epoch(); let slot_duration = Duration::from_secs(spec.seconds_per_slot); let one_epoch_duration = slot_duration * slots_per_epoch as u32; diff --git a/anchor/network/src/config.rs b/anchor/network/src/config.rs index 54b88f6c6..bc8088d22 100644 --- a/anchor/network/src/config.rs +++ b/anchor/network/src/config.rs @@ -54,8 +54,11 @@ pub struct Config { /// List of nodes to initially connect to, on Multiaddr format. pub boot_nodes_multiaddr: Vec, - /// Disables peer scoring altogether. - pub disable_peer_scoring: bool, + /// Disables gossipsub peer scoring altogether. + pub disable_gossipsub_peer_scoring: bool, + + /// Disables gossipsub topic scoring and message rate calculations. + pub disable_gossipsub_topic_scoring: bool, /// Disables the discovery protocol from starting. pub disable_discovery: bool, @@ -103,7 +106,8 @@ impl Default for Config { target_peers: 50, boot_nodes_enr: vec![], boot_nodes_multiaddr: vec![], - disable_peer_scoring: false, + disable_gossipsub_peer_scoring: false, + disable_gossipsub_topic_scoring: true, disable_discovery: false, disable_quic_support: false, subscribe_all_subnets: false, diff --git a/anchor/network/src/network.rs b/anchor/network/src/network.rs index 6912b47cb..61ea831ad 100644 --- a/anchor/network/src/network.rs +++ b/anchor/network/src/network.rs @@ -325,14 +325,22 @@ impl Network { fn on_subnet_tracker_event(&mut self, event: SubnetEvent) { let (subnet, subscribed) = match event { - SubnetEvent::Join(subnet, message_rate) => { + SubnetEvent::Join(subnet, message_rate_opt) => { let topic = subnet_to_topic(subnet); if let Err(err) = self.gossipsub().subscribe(&topic) { error!(?err, subnet = *subnet, "can't subscribe"); return; } - self.update_topic_score_for_subnet_with_rate::(subnet, topic, message_rate); + // Only set topic score parameters if message rate is provided (scoring enabled) + if let Some(message_rate) = message_rate_opt { + self.update_topic_score_for_subnet_with_rate::(subnet, topic, message_rate); + } else { + debug!( + subnet = *subnet, + "Skipping topic score parameter setup - gossipsub scoring disabled" + ); + } let actions = self.peer_manager().join_subnet(subnet); self.handle_connect_actions(actions); diff --git a/anchor/subnet_service/src/lib.rs b/anchor/subnet_service/src/lib.rs index 0e8f89b31..ba237e8c8 100644 --- a/anchor/subnet_service/src/lib.rs +++ b/anchor/subnet_service/src/lib.rs @@ -53,16 +53,18 @@ impl Deref for SubnetId { } pub enum SubnetEvent { - Join(SubnetId, f64), // subnet_id and message_rate + Join(SubnetId, Option), // subnet_id and optional message_rate Leave(SubnetId), /// Message rate has changed for an already-joined subnet - RateUpdate(SubnetId, f64), // subnet_id and new message_rate + RateUpdate(SubnetId, f64), /* subnet_id and new message_rate (only emitted when scoring is + * enabled) */ } pub fn start_subnet_service( db: watch::Receiver, subnet_count: usize, subscribe_all_subnets: bool, + disable_gossipsub_topic_scoring: bool, executor: &TaskExecutor, slot_clock: impl SlotClock + 'static, chain_spec: Arc, @@ -79,6 +81,7 @@ pub fn start_subnet_service( db, subnet_count, subscribe_all_subnets, + disable_gossipsub_topic_scoring, slot_clock, chain_spec, ), @@ -98,34 +101,51 @@ async fn subnet_service( mut db: watch::Receiver, subnet_count: usize, subscribe_all_subnets: bool, + disable_gossipsub_topic_scoring: bool, slot_clock: impl SlotClock, chain_spec: Arc, ) { // If subscribe_all_subnets is true, initialize by joining all subnets if subscribe_all_subnets { - let initial_events: Vec<_> = { - let current_state = db.borrow(); - (0..(subnet_count as u64)) - .map(SubnetId) - .map(|subnet| { - let committees_info = get_committee_info_for_subnet(&subnet, &*current_state); - let message_rate = message_rate::calculate_message_rate_for_topic::( - &committees_info, - &chain_spec, + if disable_gossipsub_topic_scoring { + // When scoring is disabled, just send Join events without message rates + for subnet in (0..(subnet_count as u64)).map(SubnetId) { + if let Err(err) = tx.send(SubnetEvent::Join(subnet, None)).await { + error!( + ?err, + subnet = *subnet, + "Failed to send subnet join event during initialization" ); - (subnet, message_rate) - }) - .collect() - }; - - for (subnet, message_rate) in initial_events { - if let Err(err) = tx.send(SubnetEvent::Join(subnet, message_rate)).await { - error!( - ?err, - subnet = *subnet, - "Failed to send subnet join event during initialization" - ); - return; // If we can't send, the receiver is dropped, so exit + return; // If we can't send, the receiver is dropped, so exit + } + } + } else { + // When scoring is enabled, calculate message rates + let initial_events: Vec<_> = { + let current_state = db.borrow(); + (0..(subnet_count as u64)) + .map(SubnetId) + .map(|subnet| { + let committees_info = + get_committee_info_for_subnet(&subnet, &*current_state); + let message_rate = message_rate::calculate_message_rate_for_topic::( + &committees_info, + &chain_spec, + ); + (subnet, Some(message_rate)) + }) + .collect() + }; + + for (subnet, message_rate) in initial_events { + if let Err(err) = tx.send(SubnetEvent::Join(subnet, message_rate)).await { + error!( + ?err, + subnet = *subnet, + "Failed to send subnet join event during initialization" + ); + return; // If we can't send, the receiver is dropped, so exit + } } } } @@ -146,11 +166,11 @@ async fn subnet_service( tokio::select! { // Handle database changes for subnet join/leave (only if not subscribe_all_subnets) _ = db.changed(), if !subscribe_all_subnets => { - handle_subnet_changes::(&tx, &mut db, &mut previous_subnets, subnet_count, &chain_spec).await; + handle_subnet_changes::(&tx, &mut db, &mut previous_subnets, subnet_count, &chain_spec, disable_gossipsub_topic_scoring).await; } - // Handle scheduled epoch boundaries (for both modes) - _ = sleep(next_epoch_delay) => { + // Handle scheduled epoch boundaries (for both modes, but only if scoring is enabled) + _ = sleep(next_epoch_delay), if !disable_gossipsub_topic_scoring => { handle_epoch_committee_update::(&tx, &mut db, &previous_subnets, &chain_spec).await; // Recalculate the next epoch delay only after we've processed the epoch boundary next_epoch_delay = calculate_duration_to_next_epoch::(&slot_clock); @@ -178,6 +198,7 @@ async fn handle_subnet_changes( previous_subnets: &mut HashSet, subnet_count: usize, chain_spec: &ChainSpec, + disable_gossipsub_topic_scoring: bool, ) { // Build the `current_subnets` set by examining the clusters we own. let mut current_subnets = HashSet::new(); @@ -207,10 +228,14 @@ async fn handle_subnet_changes( // send a `Join` event. for subnet in current_subnets.difference(previous_subnets) { debug!(?subnet, "send join"); - // Calculate current message rate for this subnet - let message_rate = { + // Calculate current message rate for this subnet (or None if scoring is disabled) + let message_rate = if disable_gossipsub_topic_scoring { + None + } else { let state = db.borrow(); - calculate_message_rate_for_subnet::(subnet, &*state, chain_spec) + Some(calculate_message_rate_for_subnet::( + subnet, &*state, chain_spec, + )) }; if tx From f343fbe011e18c8540896dec263c7855df8c634b Mon Sep 17 00:00:00 2001 From: diego Date: Tue, 8 Jul 2025 19:47:42 +0200 Subject: [PATCH 14/16] simplify message rate calculation --- anchor/subnet_service/src/lib.rs | 59 +++++++++++++------------------- 1 file changed, 24 insertions(+), 35 deletions(-) diff --git a/anchor/subnet_service/src/lib.rs b/anchor/subnet_service/src/lib.rs index ba237e8c8..1cf0ea99b 100644 --- a/anchor/subnet_service/src/lib.rs +++ b/anchor/subnet_service/src/lib.rs @@ -107,45 +107,34 @@ async fn subnet_service( ) { // If subscribe_all_subnets is true, initialize by joining all subnets if subscribe_all_subnets { - if disable_gossipsub_topic_scoring { - // When scoring is disabled, just send Join events without message rates - for subnet in (0..(subnet_count as u64)).map(SubnetId) { - if let Err(err) = tx.send(SubnetEvent::Join(subnet, None)).await { - error!( - ?err, - subnet = *subnet, - "Failed to send subnet join event during initialization" - ); - return; // If we can't send, the receiver is dropped, so exit - } - } - } else { - // When scoring is enabled, calculate message rates - let initial_events: Vec<_> = { - let current_state = db.borrow(); - (0..(subnet_count as u64)) - .map(SubnetId) - .map(|subnet| { + let initial_events: Vec<_> = { + let current_state = db.borrow(); + (0..(subnet_count as u64)) + .map(SubnetId) + .map(|subnet| { + let message_rate = if disable_gossipsub_topic_scoring { + None + } else { let committees_info = get_committee_info_for_subnet(&subnet, &*current_state); - let message_rate = message_rate::calculate_message_rate_for_topic::( + Some(message_rate::calculate_message_rate_for_topic::( &committees_info, &chain_spec, - ); - (subnet, Some(message_rate)) - }) - .collect() - }; - - for (subnet, message_rate) in initial_events { - if let Err(err) = tx.send(SubnetEvent::Join(subnet, message_rate)).await { - error!( - ?err, - subnet = *subnet, - "Failed to send subnet join event during initialization" - ); - return; // If we can't send, the receiver is dropped, so exit - } + )) + }; + (subnet, message_rate) + }) + .collect() + }; + + for (subnet, message_rate) in initial_events { + if let Err(err) = tx.send(SubnetEvent::Join(subnet, message_rate)).await { + error!( + ?err, + subnet = *subnet, + "Failed to send subnet join event during initialization" + ); + return; // If we can't send, the receiver is dropped, so exit } } } From 9e6784213ea4bf364e6f3d40f747e5772e0559d5 Mon Sep 17 00:00:00 2001 From: diego Date: Wed, 9 Jul 2025 13:24:25 +0200 Subject: [PATCH 15/16] return from subnet_service when subscribe_all_subnets disable_gossipsub_topic_scoring are true --- anchor/subnet_service/src/lib.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/anchor/subnet_service/src/lib.rs b/anchor/subnet_service/src/lib.rs index 1cf0ea99b..cfa9d61dc 100644 --- a/anchor/subnet_service/src/lib.rs +++ b/anchor/subnet_service/src/lib.rs @@ -137,6 +137,12 @@ async fn subnet_service( return; // If we can't send, the receiver is dropped, so exit } } + + // If scoring is disabled, we've sent all Join events and there's nothing more to do + if disable_gossipsub_topic_scoring { + debug!("All subnets joined and scoring disabled - subnet service task complete"); + return; + } } // `previous_subnets` tracks which subnets were joined in the last iteration. From d193578e4920714541afb5c5c655ccb9bcbdfc2c Mon Sep 17 00:00:00 2001 From: diego Date: Wed, 9 Jul 2025 14:58:10 +0200 Subject: [PATCH 16/16] add hidden param for disable_gossipsub_topic_scoring --- anchor/client/src/cli.rs | 9 +++++++++ anchor/client/src/config.rs | 1 + 2 files changed, 10 insertions(+) diff --git a/anchor/client/src/cli.rs b/anchor/client/src/cli.rs index ef1e44cdb..c92c6d535 100644 --- a/anchor/client/src/cli.rs +++ b/anchor/client/src/cli.rs @@ -564,6 +564,15 @@ pub struct Node { )] pub disable_gossipsub_peer_scoring: bool, + #[clap( + long, + help = "Disables gossipsub topic scoring.", + action = ArgAction::Set, + default_value = "true", + hide = true + )] + pub disable_gossipsub_topic_scoring: bool, + #[clap(flatten)] pub logging_flags: LoggingFlags, } diff --git a/anchor/client/src/config.rs b/anchor/client/src/config.rs index c8507fc7a..e84bae5e5 100644 --- a/anchor/client/src/config.rs +++ b/anchor/client/src/config.rs @@ -217,6 +217,7 @@ pub fn from_cli(cli_args: &Node) -> Result { // Network related - set peer scoring configuration config.network.disable_gossipsub_peer_scoring = cli_args.disable_gossipsub_peer_scoring; + config.network.disable_gossipsub_topic_scoring = cli_args.disable_gossipsub_topic_scoring; config.beacon_nodes_tls_certs = cli_args.beacon_nodes_tls_certs.clone(); config.execution_nodes_tls_certs = cli_args.execution_nodes_tls_certs.clone();