From 2a7f0f8dafafee4fd08d73fe32d2cab0fe1f994e Mon Sep 17 00:00:00 2001 From: dapplion <35266934+dapplion@users.noreply.github.com> Date: Thu, 23 Jan 2025 22:20:21 +0800 Subject: [PATCH 1/3] Subscribe to peerdas topics on Fulu fork --- .../lighthouse_network/src/service/mod.rs | 8 +++- .../lighthouse_network/src/types/globals.rs | 9 ++++ .../lighthouse_network/src/types/mod.rs | 4 +- .../lighthouse_network/src/types/topics.rs | 43 +++++++++++++++--- beacon_node/network/src/service.rs | 44 ++----------------- 5 files changed, 58 insertions(+), 50 deletions(-) diff --git a/beacon_node/lighthouse_network/src/service/mod.rs b/beacon_node/lighthouse_network/src/service/mod.rs index 354def79b03..7268dca26f8 100644 --- a/beacon_node/lighthouse_network/src/service/mod.rs +++ b/beacon_node/lighthouse_network/src/service/mod.rs @@ -708,11 +708,17 @@ impl Network { } // Subscribe to core topics for the new fork - for kind in fork_core_topics::(&new_fork, &self.fork_context.spec) { + for kind in fork_core_topics::( + &new_fork, + &self.fork_context.spec, + &self.network_globals.topic_config(), + ) { let topic = GossipTopic::new(kind, GossipEncoding::default(), new_fork_digest); self.subscribe(topic); } + // TODO(das): unsubscribe from blob topics at the Fulu fork + // Register the new topics for metrics let topics_to_keep_metrics_for = attestation_sync_committee_topics::() .map(|gossip_kind| { diff --git a/beacon_node/lighthouse_network/src/types/globals.rs b/beacon_node/lighthouse_network/src/types/globals.rs index c9e84e2dd11..d2db8a2b856 100644 --- a/beacon_node/lighthouse_network/src/types/globals.rs +++ b/beacon_node/lighthouse_network/src/types/globals.rs @@ -1,4 +1,5 @@ //! A collection of variables that are accessible outside of the network thread itself. +use super::TopicConfig; use crate::peer_manager::peerdb::PeerDB; use crate::rpc::{MetaData, MetaDataV3}; use crate::types::{BackFillState, SyncState}; @@ -183,6 +184,14 @@ impl NetworkGlobals { .collect::>() } + /// Returns the TopicConfig to compute the set of Gossip topics for a given fork + pub fn topic_config(&self) -> TopicConfig { + TopicConfig { + subscribe_all_data_column_subnets: self.config.subscribe_all_data_column_subnets, + sampling_subnets: self.sampling_subnets.iter().copied().collect(), + } + } + /// TESTING ONLY. Build a dummy NetworkGlobals instance. pub fn new_test_globals( trusted_peers: Vec, diff --git a/beacon_node/lighthouse_network/src/types/mod.rs b/beacon_node/lighthouse_network/src/types/mod.rs index a1eedaef746..58ba7588b98 100644 --- a/beacon_node/lighthouse_network/src/types/mod.rs +++ b/beacon_node/lighthouse_network/src/types/mod.rs @@ -17,6 +17,6 @@ pub use subnet::{Subnet, SubnetDiscovery}; pub use sync_state::{BackFillState, SyncState}; pub use topics::{ attestation_sync_committee_topics, core_topics_to_subscribe, fork_core_topics, - subnet_from_topic_hash, GossipEncoding, GossipKind, GossipTopic, ALTAIR_CORE_TOPICS, - BASE_CORE_TOPICS, CAPELLA_CORE_TOPICS, LIGHT_CLIENT_GOSSIP_TOPICS, + subnet_from_topic_hash, GossipEncoding, GossipKind, GossipTopic, TopicConfig, + ALTAIR_CORE_TOPICS, BASE_CORE_TOPICS, CAPELLA_CORE_TOPICS, LIGHT_CLIENT_GOSSIP_TOPICS, }; diff --git a/beacon_node/lighthouse_network/src/types/topics.rs b/beacon_node/lighthouse_network/src/types/topics.rs index 2c79f934237..62cac2ae34d 100644 --- a/beacon_node/lighthouse_network/src/types/topics.rs +++ b/beacon_node/lighthouse_network/src/types/topics.rs @@ -41,8 +41,17 @@ pub const LIGHT_CLIENT_GOSSIP_TOPICS: [GossipKind; 2] = [ GossipKind::LightClientOptimisticUpdate, ]; +pub struct TopicConfig { + pub subscribe_all_data_column_subnets: bool, + pub sampling_subnets: Vec, +} + /// Returns the core topics associated with each fork that are new to the previous fork -pub fn fork_core_topics(fork_name: &ForkName, spec: &ChainSpec) -> Vec { +pub fn fork_core_topics( + fork_name: &ForkName, + spec: &ChainSpec, + topic_config: &TopicConfig, +) -> Vec { match fork_name { ForkName::Base => BASE_CORE_TOPICS.to_vec(), ForkName::Altair => ALTAIR_CORE_TOPICS.to_vec(), @@ -64,7 +73,21 @@ pub fn fork_core_topics(fork_name: &ForkName, spec: &ChainSpec) -> V } electra_blob_topics } - ForkName::Fulu => vec![], + ForkName::Fulu => { + let mut topics = vec![]; + if topic_config.subscribe_all_data_column_subnets { + for column_subnet in 0..spec.data_column_sidecar_subnet_count { + topics.push(GossipKind::DataColumnSidecar(DataColumnSubnetId::new( + column_subnet, + ))); + } + } else { + for column_subnet in &topic_config.sampling_subnets { + topics.push(GossipKind::DataColumnSidecar(*column_subnet)); + } + } + topics + } } } @@ -84,10 +107,11 @@ pub fn attestation_sync_committee_topics() -> impl Iterator( mut current_fork: ForkName, spec: &ChainSpec, + topic_config: &TopicConfig, ) -> Vec { - let mut topics = fork_core_topics::(¤t_fork, spec); + let mut topics = fork_core_topics::(¤t_fork, spec, topic_config); while let Some(previous_fork) = current_fork.previous_fork() { - let previous_fork_topics = fork_core_topics::(&previous_fork, spec); + let previous_fork_topics = fork_core_topics::(&previous_fork, spec, topic_config); topics.extend(previous_fork_topics); current_fork = previous_fork; } @@ -475,8 +499,13 @@ mod tests { type E = MainnetEthSpec; let spec = E::default_spec(); let mut all_topics = Vec::new(); - let mut electra_core_topics = fork_core_topics::(&ForkName::Electra, &spec); - let mut deneb_core_topics = fork_core_topics::(&ForkName::Deneb, &spec); + let topic_config = TopicConfig { + subscribe_all_data_column_subnets: false, + sampling_subnets: vec![], + }; + let mut electra_core_topics = + fork_core_topics::(&ForkName::Electra, &spec, &topic_config); + let mut deneb_core_topics = fork_core_topics::(&ForkName::Deneb, &spec, &topic_config); all_topics.append(&mut electra_core_topics); all_topics.append(&mut deneb_core_topics); all_topics.extend(CAPELLA_CORE_TOPICS); @@ -484,7 +513,7 @@ mod tests { all_topics.extend(BASE_CORE_TOPICS); let latest_fork = *ForkName::list_all().last().unwrap(); - let core_topics = core_topics_to_subscribe::(latest_fork, &spec); + let core_topics = core_topics_to_subscribe::(latest_fork, &spec, &topic_config); // Need to check all the topics exist in an order independent manner for topic in all_topics { assert!(core_topics.contains(&topic)); diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index 49f73bf9c8d..cad0c7495b2 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -33,8 +33,8 @@ use task_executor::ShutdownReason; use tokio::sync::mpsc; use tokio::time::Sleep; use types::{ - ChainSpec, DataColumnSubnetId, EthSpec, ForkContext, Slot, SubnetId, SyncCommitteeSubscription, - SyncSubnetId, Unsigned, ValidatorSubscription, + ChainSpec, EthSpec, ForkContext, Slot, SubnetId, SyncCommitteeSubscription, SyncSubnetId, + Unsigned, ValidatorSubscription, }; mod tests; @@ -181,8 +181,6 @@ pub struct NetworkService { next_fork_subscriptions: Pin>>, /// A delay that expires when we need to unsubscribe from old fork topics. next_unsubscribe: Pin>>, - /// Subscribe to all the data column subnets. - subscribe_all_data_column_subnets: bool, /// Subscribe to all the subnets once synced. subscribe_all_subnets: bool, /// Shutdown beacon node after sync is complete. @@ -349,7 +347,6 @@ impl NetworkService { next_fork_update, next_fork_subscriptions, next_unsubscribe, - subscribe_all_data_column_subnets: config.subscribe_all_data_column_subnets, subscribe_all_subnets: config.subscribe_all_subnets, shutdown_after_sync: config.shutdown_after_sync, metrics_enabled: config.metrics_enabled, @@ -717,6 +714,7 @@ impl NetworkService { for topic_kind in core_topics_to_subscribe::( self.fork_context.current_fork(), &self.fork_context.spec, + &self.network_globals.topic_config(), ) { for fork_digest in self.required_gossip_fork_digests() { let topic = GossipTopic::new( @@ -751,10 +749,6 @@ impl NetworkService { } } - if self.fork_context.spec.is_peer_das_scheduled() { - self.subscribe_to_peer_das_topics(&mut subscribed_topics); - } - // If we are to subscribe to all subnets we do it here if self.subscribe_all_subnets { for subnet_id in 0..<::EthSpec as EthSpec>::SubnetBitfieldLength::to_u64() { @@ -801,37 +795,6 @@ impl NetworkService { } } - /// Keeping these separate from core topics because it has custom logic: - /// 1. Data column subscription logic depends on subscription configuration. - /// 2. Data column topic subscriptions will be dynamic based on validator balances due to - /// validator custody. - /// - /// TODO(das): The downside with not including it in core fork topic is - we subscribe to - /// PeerDAS topics on startup if Fulu is scheduled, rather than waiting until the fork. - /// If this is an issue we could potentially consider adding the logic to - /// `network.subscribe_new_fork_topics()`. - fn subscribe_to_peer_das_topics(&mut self, subscribed_topics: &mut Vec) { - let column_subnets_to_subscribe = if self.subscribe_all_data_column_subnets { - &(0..self.fork_context.spec.data_column_sidecar_subnet_count) - .map(DataColumnSubnetId::new) - .collect() - } else { - &self.network_globals.sampling_subnets - }; - - for column_subnet in column_subnets_to_subscribe.iter() { - for fork_digest in self.required_gossip_fork_digests() { - let gossip_kind = Subnet::DataColumn(*column_subnet).into(); - let topic = GossipTopic::new(gossip_kind, GossipEncoding::default(), fork_digest); - if self.libp2p.subscribe(topic.clone()) { - subscribed_topics.push(topic); - } else { - warn!(self.log, "Could not subscribe to topic"; "topic" => %topic); - } - } - } - } - /// Handle a message sent to the network service. async fn on_validator_subscription_msg(&mut self, msg: ValidatorSubscriptionMessage) { match msg { @@ -947,6 +910,7 @@ impl NetworkService { let core_topics = core_topics_to_subscribe::( self.fork_context.current_fork(), &self.fork_context.spec, + &self.network_globals.topic_config(), ); let core_topics: HashSet<&GossipKind> = HashSet::from_iter(&core_topics); let subscriptions = self.network_globals.gossipsub_subscriptions.read(); From 521cdd5dd132d9d2441d2bb7c5626e5cc3ac6ed1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveira?= Date: Mon, 27 Jan 2025 19:26:29 +0000 Subject: [PATCH 2/3] avoid double iteration by using lifetimes --- beacon_node/lighthouse_network/src/service/mod.rs | 2 +- beacon_node/lighthouse_network/src/types/globals.rs | 4 ++-- beacon_node/lighthouse_network/src/types/topics.rs | 11 +++++++---- beacon_node/network/src/service.rs | 4 ++-- 4 files changed, 12 insertions(+), 9 deletions(-) diff --git a/beacon_node/lighthouse_network/src/service/mod.rs b/beacon_node/lighthouse_network/src/service/mod.rs index 7268dca26f8..8586fd9cd36 100644 --- a/beacon_node/lighthouse_network/src/service/mod.rs +++ b/beacon_node/lighthouse_network/src/service/mod.rs @@ -711,7 +711,7 @@ impl Network { for kind in fork_core_topics::( &new_fork, &self.fork_context.spec, - &self.network_globals.topic_config(), + &self.network_globals.as_topic_config(), ) { let topic = GossipTopic::new(kind, GossipEncoding::default(), new_fork_digest); self.subscribe(topic); diff --git a/beacon_node/lighthouse_network/src/types/globals.rs b/beacon_node/lighthouse_network/src/types/globals.rs index d2db8a2b856..2800b75133b 100644 --- a/beacon_node/lighthouse_network/src/types/globals.rs +++ b/beacon_node/lighthouse_network/src/types/globals.rs @@ -185,10 +185,10 @@ impl NetworkGlobals { } /// Returns the TopicConfig to compute the set of Gossip topics for a given fork - pub fn topic_config(&self) -> TopicConfig { + pub fn as_topic_config(&self) -> TopicConfig { TopicConfig { subscribe_all_data_column_subnets: self.config.subscribe_all_data_column_subnets, - sampling_subnets: self.sampling_subnets.iter().copied().collect(), + sampling_subnets: &self.sampling_subnets, } } diff --git a/beacon_node/lighthouse_network/src/types/topics.rs b/beacon_node/lighthouse_network/src/types/topics.rs index 62cac2ae34d..454eea246a7 100644 --- a/beacon_node/lighthouse_network/src/types/topics.rs +++ b/beacon_node/lighthouse_network/src/types/topics.rs @@ -1,5 +1,6 @@ use gossipsub::{IdentTopic as Topic, TopicHash}; use serde::{Deserialize, Serialize}; +use std::collections::HashSet; use strum::AsRefStr; use types::{ChainSpec, DataColumnSubnetId, EthSpec, ForkName, SubnetId, SyncSubnetId, Unsigned}; @@ -41,9 +42,10 @@ pub const LIGHT_CLIENT_GOSSIP_TOPICS: [GossipKind; 2] = [ GossipKind::LightClientOptimisticUpdate, ]; -pub struct TopicConfig { +#[derive(Debug)] +pub struct TopicConfig<'a> { pub subscribe_all_data_column_subnets: bool, - pub sampling_subnets: Vec, + pub sampling_subnets: &'a HashSet, } /// Returns the core topics associated with each fork that are new to the previous fork @@ -82,7 +84,7 @@ pub fn fork_core_topics( ))); } } else { - for column_subnet in &topic_config.sampling_subnets { + for column_subnet in topic_config.sampling_subnets { topics.push(GossipKind::DataColumnSidecar(*column_subnet)); } } @@ -499,9 +501,10 @@ mod tests { type E = MainnetEthSpec; let spec = E::default_spec(); let mut all_topics = Vec::new(); + let sampling_subnets = HashSet::new(); let topic_config = TopicConfig { subscribe_all_data_column_subnets: false, - sampling_subnets: vec![], + sampling_subnets: &sampling_subnets, }; let mut electra_core_topics = fork_core_topics::(&ForkName::Electra, &spec, &topic_config); diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index cad0c7495b2..1b2a681c644 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -714,7 +714,7 @@ impl NetworkService { for topic_kind in core_topics_to_subscribe::( self.fork_context.current_fork(), &self.fork_context.spec, - &self.network_globals.topic_config(), + &self.network_globals.as_topic_config(), ) { for fork_digest in self.required_gossip_fork_digests() { let topic = GossipTopic::new( @@ -910,7 +910,7 @@ impl NetworkService { let core_topics = core_topics_to_subscribe::( self.fork_context.current_fork(), &self.fork_context.spec, - &self.network_globals.topic_config(), + &self.network_globals.as_topic_config(), ); let core_topics: HashSet<&GossipKind> = HashSet::from_iter(&core_topics); let subscriptions = self.network_globals.gossipsub_subscriptions.read(); From 43ae790dfd473af10629dd1a1905c52f241e911f Mon Sep 17 00:00:00 2001 From: dapplion <35266934+dapplion@users.noreply.github.com> Date: Fri, 31 Jan 2025 17:33:11 -0300 Subject: [PATCH 3/3] Add fulu test --- beacon_node/lighthouse_network/src/types/topics.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/beacon_node/lighthouse_network/src/types/topics.rs b/beacon_node/lighthouse_network/src/types/topics.rs index 454eea246a7..171dab09a35 100644 --- a/beacon_node/lighthouse_network/src/types/topics.rs +++ b/beacon_node/lighthouse_network/src/types/topics.rs @@ -501,14 +501,15 @@ mod tests { type E = MainnetEthSpec; let spec = E::default_spec(); let mut all_topics = Vec::new(); - let sampling_subnets = HashSet::new(); let topic_config = TopicConfig { subscribe_all_data_column_subnets: false, - sampling_subnets: &sampling_subnets, + sampling_subnets: &HashSet::from_iter([1, 2].map(DataColumnSubnetId::new)), }; + let mut fulu_core_topics = fork_core_topics::(&ForkName::Fulu, &spec, &topic_config); let mut electra_core_topics = fork_core_topics::(&ForkName::Electra, &spec, &topic_config); let mut deneb_core_topics = fork_core_topics::(&ForkName::Deneb, &spec, &topic_config); + all_topics.append(&mut fulu_core_topics); all_topics.append(&mut electra_core_topics); all_topics.append(&mut deneb_core_topics); all_topics.extend(CAPELLA_CORE_TOPICS);