diff --git a/beacon_node/lighthouse_network/src/service/mod.rs b/beacon_node/lighthouse_network/src/service/mod.rs index 354def79b03..8586fd9cd36 100644 --- a/beacon_node/lighthouse_network/src/service/mod.rs +++ b/beacon_node/lighthouse_network/src/service/mod.rs @@ -708,11 +708,17 @@ impl Network { } // Subscribe to core topics for the new fork - for kind in fork_core_topics::(&new_fork, &self.fork_context.spec) { + for kind in fork_core_topics::( + &new_fork, + &self.fork_context.spec, + &self.network_globals.as_topic_config(), + ) { let topic = GossipTopic::new(kind, GossipEncoding::default(), new_fork_digest); self.subscribe(topic); } + // TODO(das): unsubscribe from blob topics at the Fulu fork + // Register the new topics for metrics let topics_to_keep_metrics_for = attestation_sync_committee_topics::() .map(|gossip_kind| { diff --git a/beacon_node/lighthouse_network/src/types/globals.rs b/beacon_node/lighthouse_network/src/types/globals.rs index c9e84e2dd11..2800b75133b 100644 --- a/beacon_node/lighthouse_network/src/types/globals.rs +++ b/beacon_node/lighthouse_network/src/types/globals.rs @@ -1,4 +1,5 @@ //! A collection of variables that are accessible outside of the network thread itself. +use super::TopicConfig; use crate::peer_manager::peerdb::PeerDB; use crate::rpc::{MetaData, MetaDataV3}; use crate::types::{BackFillState, SyncState}; @@ -183,6 +184,14 @@ impl NetworkGlobals { .collect::>() } + /// Returns the TopicConfig to compute the set of Gossip topics for a given fork + pub fn as_topic_config(&self) -> TopicConfig { + TopicConfig { + subscribe_all_data_column_subnets: self.config.subscribe_all_data_column_subnets, + sampling_subnets: &self.sampling_subnets, + } + } + /// TESTING ONLY. Build a dummy NetworkGlobals instance. pub fn new_test_globals( trusted_peers: Vec, diff --git a/beacon_node/lighthouse_network/src/types/mod.rs b/beacon_node/lighthouse_network/src/types/mod.rs index a1eedaef746..58ba7588b98 100644 --- a/beacon_node/lighthouse_network/src/types/mod.rs +++ b/beacon_node/lighthouse_network/src/types/mod.rs @@ -17,6 +17,6 @@ pub use subnet::{Subnet, SubnetDiscovery}; pub use sync_state::{BackFillState, SyncState}; pub use topics::{ attestation_sync_committee_topics, core_topics_to_subscribe, fork_core_topics, - subnet_from_topic_hash, GossipEncoding, GossipKind, GossipTopic, ALTAIR_CORE_TOPICS, - BASE_CORE_TOPICS, CAPELLA_CORE_TOPICS, LIGHT_CLIENT_GOSSIP_TOPICS, + subnet_from_topic_hash, GossipEncoding, GossipKind, GossipTopic, TopicConfig, + ALTAIR_CORE_TOPICS, BASE_CORE_TOPICS, CAPELLA_CORE_TOPICS, LIGHT_CLIENT_GOSSIP_TOPICS, }; diff --git a/beacon_node/lighthouse_network/src/types/topics.rs b/beacon_node/lighthouse_network/src/types/topics.rs index 2c79f934237..171dab09a35 100644 --- a/beacon_node/lighthouse_network/src/types/topics.rs +++ b/beacon_node/lighthouse_network/src/types/topics.rs @@ -1,5 +1,6 @@ use gossipsub::{IdentTopic as Topic, TopicHash}; use serde::{Deserialize, Serialize}; +use std::collections::HashSet; use strum::AsRefStr; use types::{ChainSpec, DataColumnSubnetId, EthSpec, ForkName, SubnetId, SyncSubnetId, Unsigned}; @@ -41,8 +42,18 @@ pub const LIGHT_CLIENT_GOSSIP_TOPICS: [GossipKind; 2] = [ GossipKind::LightClientOptimisticUpdate, ]; +#[derive(Debug)] +pub struct TopicConfig<'a> { + pub subscribe_all_data_column_subnets: bool, + pub sampling_subnets: &'a HashSet, +} + /// Returns the core topics associated with each fork that are new to the previous fork -pub fn fork_core_topics(fork_name: &ForkName, spec: &ChainSpec) -> Vec { +pub fn fork_core_topics( + fork_name: &ForkName, + spec: &ChainSpec, + topic_config: &TopicConfig, +) -> Vec { match fork_name { ForkName::Base => BASE_CORE_TOPICS.to_vec(), ForkName::Altair => ALTAIR_CORE_TOPICS.to_vec(), @@ -64,7 +75,21 @@ pub fn fork_core_topics(fork_name: &ForkName, spec: &ChainSpec) -> V } electra_blob_topics } - ForkName::Fulu => vec![], + ForkName::Fulu => { + let mut topics = vec![]; + if topic_config.subscribe_all_data_column_subnets { + for column_subnet in 0..spec.data_column_sidecar_subnet_count { + topics.push(GossipKind::DataColumnSidecar(DataColumnSubnetId::new( + column_subnet, + ))); + } + } else { + for column_subnet in topic_config.sampling_subnets { + topics.push(GossipKind::DataColumnSidecar(*column_subnet)); + } + } + topics + } } } @@ -84,10 +109,11 @@ pub fn attestation_sync_committee_topics() -> impl Iterator( mut current_fork: ForkName, spec: &ChainSpec, + topic_config: &TopicConfig, ) -> Vec { - let mut topics = fork_core_topics::(¤t_fork, spec); + let mut topics = fork_core_topics::(¤t_fork, spec, topic_config); while let Some(previous_fork) = current_fork.previous_fork() { - let previous_fork_topics = fork_core_topics::(&previous_fork, spec); + let previous_fork_topics = fork_core_topics::(&previous_fork, spec, topic_config); topics.extend(previous_fork_topics); current_fork = previous_fork; } @@ -475,8 +501,15 @@ mod tests { type E = MainnetEthSpec; let spec = E::default_spec(); let mut all_topics = Vec::new(); - let mut electra_core_topics = fork_core_topics::(&ForkName::Electra, &spec); - let mut deneb_core_topics = fork_core_topics::(&ForkName::Deneb, &spec); + let topic_config = TopicConfig { + subscribe_all_data_column_subnets: false, + sampling_subnets: &HashSet::from_iter([1, 2].map(DataColumnSubnetId::new)), + }; + let mut fulu_core_topics = fork_core_topics::(&ForkName::Fulu, &spec, &topic_config); + let mut electra_core_topics = + fork_core_topics::(&ForkName::Electra, &spec, &topic_config); + let mut deneb_core_topics = fork_core_topics::(&ForkName::Deneb, &spec, &topic_config); + all_topics.append(&mut fulu_core_topics); all_topics.append(&mut electra_core_topics); all_topics.append(&mut deneb_core_topics); all_topics.extend(CAPELLA_CORE_TOPICS); @@ -484,7 +517,7 @@ mod tests { all_topics.extend(BASE_CORE_TOPICS); let latest_fork = *ForkName::list_all().last().unwrap(); - let core_topics = core_topics_to_subscribe::(latest_fork, &spec); + let core_topics = core_topics_to_subscribe::(latest_fork, &spec, &topic_config); // Need to check all the topics exist in an order independent manner for topic in all_topics { assert!(core_topics.contains(&topic)); diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index 49f73bf9c8d..1b2a681c644 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -33,8 +33,8 @@ use task_executor::ShutdownReason; use tokio::sync::mpsc; use tokio::time::Sleep; use types::{ - ChainSpec, DataColumnSubnetId, EthSpec, ForkContext, Slot, SubnetId, SyncCommitteeSubscription, - SyncSubnetId, Unsigned, ValidatorSubscription, + ChainSpec, EthSpec, ForkContext, Slot, SubnetId, SyncCommitteeSubscription, SyncSubnetId, + Unsigned, ValidatorSubscription, }; mod tests; @@ -181,8 +181,6 @@ pub struct NetworkService { next_fork_subscriptions: Pin>>, /// A delay that expires when we need to unsubscribe from old fork topics. next_unsubscribe: Pin>>, - /// Subscribe to all the data column subnets. - subscribe_all_data_column_subnets: bool, /// Subscribe to all the subnets once synced. subscribe_all_subnets: bool, /// Shutdown beacon node after sync is complete. @@ -349,7 +347,6 @@ impl NetworkService { next_fork_update, next_fork_subscriptions, next_unsubscribe, - subscribe_all_data_column_subnets: config.subscribe_all_data_column_subnets, subscribe_all_subnets: config.subscribe_all_subnets, shutdown_after_sync: config.shutdown_after_sync, metrics_enabled: config.metrics_enabled, @@ -717,6 +714,7 @@ impl NetworkService { for topic_kind in core_topics_to_subscribe::( self.fork_context.current_fork(), &self.fork_context.spec, + &self.network_globals.as_topic_config(), ) { for fork_digest in self.required_gossip_fork_digests() { let topic = GossipTopic::new( @@ -751,10 +749,6 @@ impl NetworkService { } } - if self.fork_context.spec.is_peer_das_scheduled() { - self.subscribe_to_peer_das_topics(&mut subscribed_topics); - } - // If we are to subscribe to all subnets we do it here if self.subscribe_all_subnets { for subnet_id in 0..<::EthSpec as EthSpec>::SubnetBitfieldLength::to_u64() { @@ -801,37 +795,6 @@ impl NetworkService { } } - /// Keeping these separate from core topics because it has custom logic: - /// 1. Data column subscription logic depends on subscription configuration. - /// 2. Data column topic subscriptions will be dynamic based on validator balances due to - /// validator custody. - /// - /// TODO(das): The downside with not including it in core fork topic is - we subscribe to - /// PeerDAS topics on startup if Fulu is scheduled, rather than waiting until the fork. - /// If this is an issue we could potentially consider adding the logic to - /// `network.subscribe_new_fork_topics()`. - fn subscribe_to_peer_das_topics(&mut self, subscribed_topics: &mut Vec) { - let column_subnets_to_subscribe = if self.subscribe_all_data_column_subnets { - &(0..self.fork_context.spec.data_column_sidecar_subnet_count) - .map(DataColumnSubnetId::new) - .collect() - } else { - &self.network_globals.sampling_subnets - }; - - for column_subnet in column_subnets_to_subscribe.iter() { - for fork_digest in self.required_gossip_fork_digests() { - let gossip_kind = Subnet::DataColumn(*column_subnet).into(); - let topic = GossipTopic::new(gossip_kind, GossipEncoding::default(), fork_digest); - if self.libp2p.subscribe(topic.clone()) { - subscribed_topics.push(topic); - } else { - warn!(self.log, "Could not subscribe to topic"; "topic" => %topic); - } - } - } - } - /// Handle a message sent to the network service. async fn on_validator_subscription_msg(&mut self, msg: ValidatorSubscriptionMessage) { match msg { @@ -947,6 +910,7 @@ impl NetworkService { let core_topics = core_topics_to_subscribe::( self.fork_context.current_fork(), &self.fork_context.spec, + &self.network_globals.as_topic_config(), ); let core_topics: HashSet<&GossipKind> = HashSet::from_iter(&core_topics); let subscriptions = self.network_globals.gossipsub_subscriptions.read();