Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion beacon_node/lighthouse_network/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -708,11 +708,17 @@ impl<E: EthSpec> Network<E> {
}

// Subscribe to core topics for the new fork
for kind in fork_core_topics::<E>(&new_fork, &self.fork_context.spec) {
for kind in fork_core_topics::<E>(
&new_fork,
&self.fork_context.spec,
&self.network_globals.as_topic_config(),
) {
let topic = GossipTopic::new(kind, GossipEncoding::default(), new_fork_digest);
self.subscribe(topic);
}

// TODO(das): unsubscribe from blob topics at the Fulu fork

// Register the new topics for metrics
let topics_to_keep_metrics_for = attestation_sync_committee_topics::<E>()
.map(|gossip_kind| {
Expand Down
9 changes: 9 additions & 0 deletions beacon_node/lighthouse_network/src/types/globals.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
//! A collection of variables that are accessible outside of the network thread itself.
use super::TopicConfig;
use crate::peer_manager::peerdb::PeerDB;
use crate::rpc::{MetaData, MetaDataV3};
use crate::types::{BackFillState, SyncState};
Expand Down Expand Up @@ -183,6 +184,14 @@ impl<E: EthSpec> NetworkGlobals<E> {
.collect::<Vec<_>>()
}

/// Returns the TopicConfig to compute the set of Gossip topics for a given fork
pub fn as_topic_config(&self) -> TopicConfig {
TopicConfig {
subscribe_all_data_column_subnets: self.config.subscribe_all_data_column_subnets,
sampling_subnets: &self.sampling_subnets,
}
}

/// TESTING ONLY. Build a dummy NetworkGlobals instance.
pub fn new_test_globals(
trusted_peers: Vec<PeerId>,
Expand Down
4 changes: 2 additions & 2 deletions beacon_node/lighthouse_network/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,6 @@ pub use subnet::{Subnet, SubnetDiscovery};
pub use sync_state::{BackFillState, SyncState};
pub use topics::{
attestation_sync_committee_topics, core_topics_to_subscribe, fork_core_topics,
subnet_from_topic_hash, GossipEncoding, GossipKind, GossipTopic, ALTAIR_CORE_TOPICS,
BASE_CORE_TOPICS, CAPELLA_CORE_TOPICS, LIGHT_CLIENT_GOSSIP_TOPICS,
subnet_from_topic_hash, GossipEncoding, GossipKind, GossipTopic, TopicConfig,
ALTAIR_CORE_TOPICS, BASE_CORE_TOPICS, CAPELLA_CORE_TOPICS, LIGHT_CLIENT_GOSSIP_TOPICS,
};
47 changes: 40 additions & 7 deletions beacon_node/lighthouse_network/src/types/topics.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use gossipsub::{IdentTopic as Topic, TopicHash};
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use strum::AsRefStr;
use types::{ChainSpec, DataColumnSubnetId, EthSpec, ForkName, SubnetId, SyncSubnetId, Unsigned};

Expand Down Expand Up @@ -41,8 +42,18 @@ pub const LIGHT_CLIENT_GOSSIP_TOPICS: [GossipKind; 2] = [
GossipKind::LightClientOptimisticUpdate,
];

#[derive(Debug)]
pub struct TopicConfig<'a> {
pub subscribe_all_data_column_subnets: bool,
pub sampling_subnets: &'a HashSet<DataColumnSubnetId>,
}

/// Returns the core topics associated with each fork that are new to the previous fork
pub fn fork_core_topics<E: EthSpec>(fork_name: &ForkName, spec: &ChainSpec) -> Vec<GossipKind> {
pub fn fork_core_topics<E: EthSpec>(
fork_name: &ForkName,
spec: &ChainSpec,
topic_config: &TopicConfig,
) -> Vec<GossipKind> {
match fork_name {
ForkName::Base => BASE_CORE_TOPICS.to_vec(),
ForkName::Altair => ALTAIR_CORE_TOPICS.to_vec(),
Expand All @@ -64,7 +75,21 @@ pub fn fork_core_topics<E: EthSpec>(fork_name: &ForkName, spec: &ChainSpec) -> V
}
electra_blob_topics
}
ForkName::Fulu => vec![],
ForkName::Fulu => {
let mut topics = vec![];
if topic_config.subscribe_all_data_column_subnets {
for column_subnet in 0..spec.data_column_sidecar_subnet_count {
topics.push(GossipKind::DataColumnSidecar(DataColumnSubnetId::new(
column_subnet,
)));
}
} else {
for column_subnet in topic_config.sampling_subnets {
topics.push(GossipKind::DataColumnSidecar(*column_subnet));
}
}
topics
}
}
}

Expand All @@ -84,10 +109,11 @@ pub fn attestation_sync_committee_topics<E: EthSpec>() -> impl Iterator<Item = G
pub fn core_topics_to_subscribe<E: EthSpec>(
mut current_fork: ForkName,
spec: &ChainSpec,
topic_config: &TopicConfig,
) -> Vec<GossipKind> {
let mut topics = fork_core_topics::<E>(&current_fork, spec);
let mut topics = fork_core_topics::<E>(&current_fork, spec, topic_config);
while let Some(previous_fork) = current_fork.previous_fork() {
let previous_fork_topics = fork_core_topics::<E>(&previous_fork, spec);
let previous_fork_topics = fork_core_topics::<E>(&previous_fork, spec, topic_config);
topics.extend(previous_fork_topics);
current_fork = previous_fork;
}
Expand Down Expand Up @@ -475,16 +501,23 @@ mod tests {
type E = MainnetEthSpec;
let spec = E::default_spec();
let mut all_topics = Vec::new();
let mut electra_core_topics = fork_core_topics::<E>(&ForkName::Electra, &spec);
let mut deneb_core_topics = fork_core_topics::<E>(&ForkName::Deneb, &spec);
let topic_config = TopicConfig {
subscribe_all_data_column_subnets: false,
sampling_subnets: &HashSet::from_iter([1, 2].map(DataColumnSubnetId::new)),
};
let mut fulu_core_topics = fork_core_topics::<E>(&ForkName::Fulu, &spec, &topic_config);
let mut electra_core_topics =
fork_core_topics::<E>(&ForkName::Electra, &spec, &topic_config);
let mut deneb_core_topics = fork_core_topics::<E>(&ForkName::Deneb, &spec, &topic_config);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we add fulu_core_topics too to cover the new changes?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added fulu_core_topics

all_topics.append(&mut fulu_core_topics);
all_topics.append(&mut electra_core_topics);
all_topics.append(&mut deneb_core_topics);
all_topics.extend(CAPELLA_CORE_TOPICS);
all_topics.extend(ALTAIR_CORE_TOPICS);
all_topics.extend(BASE_CORE_TOPICS);

let latest_fork = *ForkName::list_all().last().unwrap();
let core_topics = core_topics_to_subscribe::<E>(latest_fork, &spec);
let core_topics = core_topics_to_subscribe::<E>(latest_fork, &spec, &topic_config);
// Need to check all the topics exist in an order independent manner
for topic in all_topics {
assert!(core_topics.contains(&topic));
Expand Down
44 changes: 4 additions & 40 deletions beacon_node/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ use task_executor::ShutdownReason;
use tokio::sync::mpsc;
use tokio::time::Sleep;
use types::{
ChainSpec, DataColumnSubnetId, EthSpec, ForkContext, Slot, SubnetId, SyncCommitteeSubscription,
SyncSubnetId, Unsigned, ValidatorSubscription,
ChainSpec, EthSpec, ForkContext, Slot, SubnetId, SyncCommitteeSubscription, SyncSubnetId,
Unsigned, ValidatorSubscription,
};

mod tests;
Expand Down Expand Up @@ -181,8 +181,6 @@ pub struct NetworkService<T: BeaconChainTypes> {
next_fork_subscriptions: Pin<Box<OptionFuture<Sleep>>>,
/// A delay that expires when we need to unsubscribe from old fork topics.
next_unsubscribe: Pin<Box<OptionFuture<Sleep>>>,
/// Subscribe to all the data column subnets.
subscribe_all_data_column_subnets: bool,
/// Subscribe to all the subnets once synced.
subscribe_all_subnets: bool,
/// Shutdown beacon node after sync is complete.
Expand Down Expand Up @@ -349,7 +347,6 @@ impl<T: BeaconChainTypes> NetworkService<T> {
next_fork_update,
next_fork_subscriptions,
next_unsubscribe,
subscribe_all_data_column_subnets: config.subscribe_all_data_column_subnets,
subscribe_all_subnets: config.subscribe_all_subnets,
shutdown_after_sync: config.shutdown_after_sync,
metrics_enabled: config.metrics_enabled,
Expand Down Expand Up @@ -717,6 +714,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
for topic_kind in core_topics_to_subscribe::<T::EthSpec>(
self.fork_context.current_fork(),
&self.fork_context.spec,
&self.network_globals.as_topic_config(),
) {
for fork_digest in self.required_gossip_fork_digests() {
let topic = GossipTopic::new(
Expand Down Expand Up @@ -751,10 +749,6 @@ impl<T: BeaconChainTypes> NetworkService<T> {
}
}

if self.fork_context.spec.is_peer_das_scheduled() {
self.subscribe_to_peer_das_topics(&mut subscribed_topics);
}

// If we are to subscribe to all subnets we do it here
if self.subscribe_all_subnets {
for subnet_id in 0..<<T as BeaconChainTypes>::EthSpec as EthSpec>::SubnetBitfieldLength::to_u64() {
Expand Down Expand Up @@ -801,37 +795,6 @@ impl<T: BeaconChainTypes> NetworkService<T> {
}
}

/// Keeping these separate from core topics because it has custom logic:
/// 1. Data column subscription logic depends on subscription configuration.
/// 2. Data column topic subscriptions will be dynamic based on validator balances due to
/// validator custody.
///
/// TODO(das): The downside with not including it in core fork topic is - we subscribe to
/// PeerDAS topics on startup if Fulu is scheduled, rather than waiting until the fork.
/// If this is an issue we could potentially consider adding the logic to
/// `network.subscribe_new_fork_topics()`.
fn subscribe_to_peer_das_topics(&mut self, subscribed_topics: &mut Vec<GossipTopic>) {
let column_subnets_to_subscribe = if self.subscribe_all_data_column_subnets {
&(0..self.fork_context.spec.data_column_sidecar_subnet_count)
.map(DataColumnSubnetId::new)
.collect()
} else {
&self.network_globals.sampling_subnets
};

for column_subnet in column_subnets_to_subscribe.iter() {
for fork_digest in self.required_gossip_fork_digests() {
let gossip_kind = Subnet::DataColumn(*column_subnet).into();
let topic = GossipTopic::new(gossip_kind, GossipEncoding::default(), fork_digest);
if self.libp2p.subscribe(topic.clone()) {
subscribed_topics.push(topic);
} else {
warn!(self.log, "Could not subscribe to topic"; "topic" => %topic);
}
}
}
}

/// Handle a message sent to the network service.
async fn on_validator_subscription_msg(&mut self, msg: ValidatorSubscriptionMessage) {
match msg {
Expand Down Expand Up @@ -947,6 +910,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
let core_topics = core_topics_to_subscribe::<T::EthSpec>(
self.fork_context.current_fork(),
&self.fork_context.spec,
&self.network_globals.as_topic_config(),
);
let core_topics: HashSet<&GossipKind> = HashSet::from_iter(&core_topics);
let subscriptions = self.network_globals.gossipsub_subscriptions.read();
Expand Down