diff --git a/Cargo.lock b/Cargo.lock index 2fea87b2d..a741b69fa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3195,6 +3195,7 @@ name = "fork" version = "0.1.0" dependencies = [ "async-broadcast", + "parking_lot", "serde", "slot_clock", "ssv_types", diff --git a/anchor/client/src/lib.rs b/anchor/client/src/lib.rs index 07d88aa88..b68209129 100644 --- a/anchor/client/src/lib.rs +++ b/anchor/client/src/lib.rs @@ -386,6 +386,12 @@ impl Client { // Create fork phase channel for fork transition events let (fork_phase_tx, fork_phase_rx) = async_broadcast::broadcast(16); + // Create shared fork lifecycle state for cross-component fork awareness + let fork_lifecycle = fork::SharedForkLifecycle::new(fork::ForkLifecycle::Normal { + current: initial_fork_config.fork, + domain_type: initial_fork_config.domain_type, + }); + // Start fork monitor to log fork transitions and send ForkPhase events fork::monitor::spawn( fork_schedule.clone(), @@ -394,6 +400,7 @@ impl Client { spec.seconds_per_slot, executor.clone(), fork_phase_tx, + fork_lifecycle.clone(), ); // Start validator index syncer @@ -560,6 +567,7 @@ impl Client { executor.clone(), spec.clone(), fork_phase_rx, + fork_lifecycle, ) .await .map_err(|e| format!("Unable to start network: {e}"))?; diff --git a/anchor/common/fork/Cargo.toml b/anchor/common/fork/Cargo.toml index 2238a2d58..2183fcd63 100644 --- a/anchor/common/fork/Cargo.toml +++ b/anchor/common/fork/Cargo.toml @@ -6,6 +6,7 @@ edition = { workspace = true } [dependencies] async-broadcast = { workspace = true } +parking_lot = { workspace = true } serde = { workspace = true } slot_clock = { workspace = true } ssv_types = { workspace = true } diff --git a/anchor/common/fork/src/lib.rs b/anchor/common/fork/src/lib.rs index c10ed0e3a..b9d5db133 100644 --- a/anchor/common/fork/src/lib.rs +++ b/anchor/common/fork/src/lib.rs @@ -20,9 +20,11 @@ //! 
- **"What"**: Each subsystem queries the active fork to determine behavior mod fork; +mod lifecycle; pub mod monitor; mod schedule; pub use fork::{ALAN_TOPIC_PREFIX, Fork}; +pub use lifecycle::{ForkLifecycle, SharedForkLifecycle}; pub use monitor::{ForkPhase, ForkPhaseSender}; pub use schedule::{FORK_PREPARATION_EPOCHS, ForkConfig, ForkSchedule, SUBSEQUENT_WINDOW_SLOTS}; diff --git a/anchor/common/fork/src/lifecycle.rs b/anchor/common/fork/src/lifecycle.rs new file mode 100644 index 000000000..29ddf28bc --- /dev/null +++ b/anchor/common/fork/src/lifecycle.rs @@ -0,0 +1,142 @@ +//! Fork lifecycle state management. +//! +//! Provides [`ForkLifecycle`] and [`SharedForkLifecycle`] for tracking the current +//! fork transition state across all components. The [`ForkMonitor`](crate::monitor) +//! is the sole writer; all other components read via [`SharedForkLifecycle`]. + +use std::sync::Arc; + +use parking_lot::RwLock; +use ssv_types::domain_type::DomainType; + +use crate::Fork; + +/// Fork lifecycle state. Updated only by ForkMonitor. +#[derive(Clone, Debug, PartialEq, Eq)] +pub enum ForkLifecycle { + /// Operating on a single fork. No transition in progress. + /// + /// Used in two scenarios: + /// - Pre-fork: only one fork exists (e.g., Alan at genesis). + /// - Post-grace-period: the fork transition is complete and only the current fork's context is + /// relevant. + Normal { + current: Fork, + domain_type: DomainType, + }, + + /// Preparing for an upcoming fork. Dual-subscribing to new topics. + /// + /// Both forks' contexts are relevant — peers subscribed to either + /// the current or upcoming fork are useful. + WarmUp { + current: Fork, + upcoming: Fork, + domain_type: DomainType, + }, + + /// Fork activated but grace period still active. Keeping old subscriptions + /// to catch late messages from the previous fork. + /// + /// Both forks' contexts are relevant — peers subscribed to either + /// the current or previous fork are still useful. 
+    GracePeriod {
+        current: Fork,
+        previous: Fork,
+        domain_type: DomainType,
+    },
+}
+
+impl ForkLifecycle {
+    /// Returns the current active fork.
+    pub fn current_fork(&self) -> Fork {
+        match self {
+            Self::Normal { current, .. }
+            | Self::WarmUp { current, .. }
+            | Self::GracePeriod { current, .. } => *current,
+        }
+    }
+
+    /// Returns the domain type for the current fork.
+    pub fn domain_type(&self) -> DomainType {
+        match self {
+            Self::Normal { domain_type, .. }
+            | Self::WarmUp { domain_type, .. }
+            | Self::GracePeriod { domain_type, .. } => *domain_type,
+        }
+    }
+}
+
+/// Shared fork lifecycle state, readable by all components.
+///
+/// Updated only by [`ForkMonitor`](crate::monitor). Uses [`parking_lot::RwLock`]
+/// for interior mutability. Writes are brief and rare (only on fork transitions),
+/// so contention is negligible.
+#[derive(Clone, Debug)]
+pub struct SharedForkLifecycle(Arc<RwLock<ForkLifecycle>>);
+
+impl SharedForkLifecycle {
+    /// Create a new shared lifecycle with the given initial state.
+    pub fn new(lifecycle: ForkLifecycle) -> Self {
+        Self(Arc::new(RwLock::new(lifecycle)))
+    }
+
+    /// Read the full lifecycle state.
+    pub fn get(&self) -> ForkLifecycle {
+        self.0.read().clone()
+    }
+
+    /// Update the lifecycle state. Called only by ForkMonitor.
+    pub fn set(&self, lifecycle: ForkLifecycle) {
+        *self.0.write() = lifecycle;
+    }
+
+    /// Convenience: get current domain type without cloning full enum.
+    pub fn domain_type(&self) -> DomainType {
+        self.0.read().domain_type()
+    }
+
+    /// Convenience: get current fork without cloning full enum.
+ pub fn current_fork(&self) -> Fork { + self.0.read().current_fork() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + const ALAN_DOMAIN: DomainType = DomainType([0, 0, 0, 1]); + const BOOLE_DOMAIN: DomainType = DomainType([0, 0, 0, 2]); + + #[test] + fn shared_fork_lifecycle_get_set_roundtrip() { + let shared = SharedForkLifecycle::new(ForkLifecycle::Normal { + current: Fork::Alan, + domain_type: ALAN_DOMAIN, + }); + + assert_eq!( + shared.get(), + ForkLifecycle::Normal { + current: Fork::Alan, + domain_type: ALAN_DOMAIN, + } + ); + + shared.set(ForkLifecycle::GracePeriod { + current: Fork::Boole, + previous: Fork::Alan, + domain_type: BOOLE_DOMAIN, + }); + + assert_eq!( + shared.get(), + ForkLifecycle::GracePeriod { + current: Fork::Boole, + previous: Fork::Alan, + domain_type: BOOLE_DOMAIN, + } + ); + } +} diff --git a/anchor/common/fork/src/monitor.rs b/anchor/common/fork/src/monitor.rs index 8f73281a8..8cd946de4 100644 --- a/anchor/common/fork/src/monitor.rs +++ b/anchor/common/fork/src/monitor.rs @@ -22,7 +22,10 @@ use task_executor::TaskExecutor; use tracing::{debug, info, warn}; use types::{Epoch, Slot}; -use crate::{FORK_PREPARATION_EPOCHS, Fork, ForkConfig, ForkSchedule, SUBSEQUENT_WINDOW_SLOTS}; +use crate::{ + FORK_PREPARATION_EPOCHS, Fork, ForkConfig, ForkLifecycle, ForkSchedule, + SUBSEQUENT_WINDOW_SLOTS, SharedForkLifecycle, +}; /// Fork transition events sent to Network and other components. /// @@ -325,7 +328,6 @@ impl ForkMonitorState { } /// Returns the current fork being monitored. 
- #[cfg(test)] pub fn current_fork(&self) -> Fork { self.current_fork } @@ -420,6 +422,7 @@ pub async fn run( slots_per_epoch: u64, seconds_per_slot: u64, phase_sender: ForkPhaseSender, + fork_lifecycle: SharedForkLifecycle, ) -> MonitorResult { // Get initial state let Some(current_slot) = slot_clock.now() else { @@ -433,6 +436,10 @@ pub async fn run( return MonitorResult::Completed; } + if let Some(phase) = &initial_phase { + update_lifecycle_for_phase(&fork_lifecycle, phase, &fork_schedule, state.current_fork()); + } + if let Some(phase) = initial_phase { let _ = phase_sender.broadcast_direct(phase).await; } @@ -457,6 +464,14 @@ pub async fn run( }; let phases = state.check_slot(slot); + for phase in &phases { + update_lifecycle_for_phase( + &fork_lifecycle, + phase, + &fork_schedule, + state.current_fork(), + ); + } for phase in phases { let _ = phase_sender.broadcast_direct(phase).await; } @@ -471,6 +486,48 @@ pub async fn run( MonitorResult::Completed } +/// Update the shared fork lifecycle based on a fork phase event. +/// +/// Called right before broadcasting each phase event so readers see the new state +/// at the same time or before listeners process the event. 
+fn update_lifecycle_for_phase( + fork_lifecycle: &SharedForkLifecycle, + phase: &ForkPhase, + fork_schedule: &ForkSchedule, + current_fork: Fork, +) { + match phase { + ForkPhase::Preparing { upcoming } => { + // During preparation, current fork's domain type is still active + let Some(domain_type) = fork_schedule.domain_type(current_fork) else { + tracing::error!( + fork = %current_fork, + "Missing domain type for current fork during preparation" + ); + return; + }; + fork_lifecycle.set(ForkLifecycle::WarmUp { + current: current_fork, + upcoming: upcoming.fork, + domain_type, + }); + } + ForkPhase::Activated { current, previous } => { + fork_lifecycle.set(ForkLifecycle::GracePeriod { + current: current.fork, + previous: previous.fork, + domain_type: current.domain_type, + }); + } + ForkPhase::GracePeriodEnded { current, .. } => { + fork_lifecycle.set(ForkLifecycle::Normal { + current: current.fork, + domain_type: current.domain_type, + }); + } + } +} + /// Spawns a standalone task that monitors and logs fork transitions. /// /// The monitor will exit automatically when all scheduled forks have activated, @@ -485,6 +542,7 @@ pub fn spawn( seconds_per_slot: u64, executor: TaskExecutor, phase_sender: ForkPhaseSender, + fork_lifecycle: SharedForkLifecycle, ) { executor.spawn( async move { @@ -494,6 +552,7 @@ pub fn spawn( slots_per_epoch, seconds_per_slot, phase_sender, + fork_lifecycle, ) .await; @@ -588,6 +647,14 @@ mod tests { tx } + /// Create a test SharedForkLifecycle starting on Alan. + fn test_fork_lifecycle() -> SharedForkLifecycle { + SharedForkLifecycle::new(ForkLifecycle::Normal { + current: Fork::Alan, + domain_type: TEST_BASELINE_DOMAIN, + }) + } + /// Convert epoch to slot for testing. 
fn epoch_to_slot(epoch: u64) -> Slot { Slot::new(epoch * slots_per_epoch()) @@ -825,6 +892,7 @@ mod tests { let schedule = make_schedule_no_future_forks(); let clock = clock_at_epoch(CURRENT_EPOCH); let sender = test_phase_sender(); + let lifecycle = test_fork_lifecycle(); // Act let result = run( @@ -833,6 +901,7 @@ mod tests { slots_per_epoch(), seconds_per_slot(), sender, + lifecycle, ) .await; @@ -846,6 +915,7 @@ mod tests { let schedule = make_schedule_with_boole(BOOLE_FORK_EPOCH); let clock = clock_at_epoch(AFTER_FORK_EPOCH); let sender = test_phase_sender(); + let lifecycle = test_fork_lifecycle(); // Act let result = run( @@ -854,6 +924,7 @@ mod tests { slots_per_epoch(), seconds_per_slot(), sender, + lifecycle, ) .await; @@ -869,6 +940,7 @@ mod tests { let schedule = make_schedule_with_boole(ASYNC_BOOLE_FORK_EPOCH); let clock = clock_at_epoch(ASYNC_START_EPOCH); let (sender, mut receiver) = async_broadcast::broadcast(16); + let lifecycle = test_fork_lifecycle(); // Act: Spawn monitor and advance time through fork activation and grace period let monitor = tokio::spawn({ @@ -880,6 +952,7 @@ mod tests { slots_per_epoch(), seconds_per_slot(), sender, + lifecycle, ) .await } diff --git a/anchor/network/src/behaviour.rs b/anchor/network/src/behaviour.rs index 3c4deeb18..4c9e91089 100644 --- a/anchor/network/src/behaviour.rs +++ b/anchor/network/src/behaviour.rs @@ -1,5 +1,6 @@ use std::{hash::Hasher, time::Duration}; +use fork::SharedForkLifecycle; use gossipsub::{ConfigBuilderError, MessageAuthenticity, ValidationMode}; use libp2p::{ identify, ping, @@ -13,7 +14,7 @@ use types::{ChainSpec, EthSpec}; use version::version_with_platform; use crate::{ - Config, SharedDomainType, + Config, behaviour::BehaviourError::Gossipsub, discovery::{Discovery, FIND_NODE_QUERY_CLOSEST_PEERS}, handshake, @@ -92,7 +93,7 @@ impl AnchorBehaviour { network_config: &Config, metrics_registry: &mut Registry, spec: &ChainSpec, - domain_type: SharedDomainType, + fork_lifecycle: 
SharedForkLifecycle, ) -> Result { let identify = { let local_public_key = local_keypair.public(); @@ -156,8 +157,12 @@ impl AnchorBehaviour { let discovery = { // Build and start the discovery sub-behaviour - let mut discovery = - Discovery::new(local_keypair.clone(), network_config, domain_type.clone()).await?; + let mut discovery = Discovery::new( + local_keypair.clone(), + network_config, + fork_lifecycle.clone(), + ) + .await?; // start searching for peers discovery.discover_peers(FIND_NODE_QUERY_CLOSEST_PEERS); discovery @@ -167,7 +172,7 @@ impl AnchorBehaviour { let slots_per_epoch = E::slots_per_epoch(); let slot_duration = Duration::from_secs(spec.seconds_per_slot); let one_epoch_duration = slot_duration * slots_per_epoch as u32; - PeerManager::new(network_config, one_epoch_duration) + PeerManager::new(network_config, one_epoch_duration, fork_lifecycle.clone()) }; let handshake = { @@ -177,7 +182,7 @@ impl AnchorBehaviour { consensus_node: "lighthouse/v1.5.0".to_string(), subnets: "00000000000000000000000000000000".to_string(), }; - handshake::Behaviour::new(local_keypair, domain_type, metadata) + handshake::Behaviour::new(local_keypair, fork_lifecycle, metadata) }; let upnp = Toggle::from( diff --git a/anchor/network/src/discovery.rs b/anchor/network/src/discovery.rs index 0269f3464..37fa5f9fa 100644 --- a/anchor/network/src/discovery.rs +++ b/anchor/network/src/discovery.rs @@ -15,6 +15,7 @@ use discv5::{ libp2p_identity::{Keypair, PeerId}, multiaddr::Multiaddr, }; +use fork::SharedForkLifecycle; use futures::{FutureExt, StreamExt, stream::FuturesUnordered}; use libp2p::{ bytes::Bytes, @@ -35,7 +36,7 @@ use tracing::{debug, error, info, trace, warn}; use typenum::U128; use crate::{ - Config, SharedDomainType, + Config, discovery::DiscoveryError::{Discv5Init, Discv5Start, EnrKey}, }; @@ -152,7 +153,7 @@ pub struct Discovery { /// been started update_ports: UpdatePorts, - domain_type: SharedDomainType, + fork_lifecycle: SharedForkLifecycle, enr_file_path: 
PathBuf, } @@ -161,7 +162,7 @@ impl Discovery { pub async fn new( local_keypair: Keypair, network_config: &Config, - domain_type: SharedDomainType, + fork_lifecycle: SharedForkLifecycle, ) -> Result { let protocol_identity = ProtocolIdentity { protocol_id: *b"ssvdv5", @@ -307,7 +308,7 @@ impl Discovery { discv5, event_stream, started: !network_config.disable_discovery, - domain_type, + fork_lifecycle, update_ports, enr_file_path, }) @@ -447,12 +448,13 @@ impl Discovery { // predicate for finding nodes with a valid tcp port let tcp_predicate = move |enr: &Enr| enr.tcp4().is_some() || enr.tcp6().is_some(); - // Clone the shared domain type so the closure can read the current value at query time. - let shared_domain_type = self.domain_type.clone(); + // Clone the shared fork lifecycle so the closure can read the current domain type at query + // time. + let shared_lifecycle = self.fork_lifecycle.clone(); let domain_type_predicate = move |enr: &Enr| { if let Some(Ok(domain_type)) = enr.get_decodable::<[u8; 4]>("domaintype") { - shared_domain_type.get().0 == domain_type + shared_lifecycle.domain_type().0 == domain_type } else { trace!(?enr, "Rejecting ENR with missing domaintype"); false diff --git a/anchor/network/src/handshake/mod.rs b/anchor/network/src/handshake/mod.rs index 86592c16d..44ad39332 100644 --- a/anchor/network/src/handshake/mod.rs +++ b/anchor/network/src/handshake/mod.rs @@ -8,6 +8,7 @@ use std::{ }; use discv5::libp2p_identity::Keypair; +use fork::SharedForkLifecycle; use libp2p::{ PeerId, StreamProtocol, request_response::{ @@ -18,10 +19,7 @@ use libp2p::{ }; use tracing::{debug, trace}; -use crate::{ - SharedDomainType, - handshake::{codec::Codec, node_info::NodeInfo}, -}; +use crate::handshake::{codec::Codec, node_info::NodeInfo}; /// Event emitted on handshake completion or failure. #[derive(Debug)] @@ -40,7 +38,7 @@ pub enum Event { /// Automatically initiates handshakes on outbound connections. 
pub struct Behaviour { inner: RequestResponseBehaviour, - domain_type: SharedDomainType, + fork_lifecycle: SharedForkLifecycle, metadata: node_info::NodeMetadata, events: VecDeque, } @@ -62,7 +60,7 @@ impl Behaviour { /// The behaviour automatically initiates handshakes on outbound connections. pub fn new( keypair: Keypair, - domain_type: SharedDomainType, + fork_lifecycle: SharedForkLifecycle, metadata: node_info::NodeMetadata, ) -> Self { let protocol = StreamProtocol::new("/ssv/info/0.0.1"); @@ -73,7 +71,7 @@ impl Behaviour { ); Self { inner, - domain_type, + fork_lifecycle, metadata, events: VecDeque::new(), } @@ -82,7 +80,7 @@ impl Behaviour { /// Construct our [`NodeInfo`] from the current shared domain type and metadata. fn our_node_info(&self) -> NodeInfo { NodeInfo { - domain_type: self.domain_type.get().into(), + domain_type: self.fork_lifecycle.domain_type().into(), metadata: Some(self.metadata.clone()), } } @@ -350,16 +348,24 @@ mod tests { use std::sync::LazyLock; use discv5::libp2p_identity::Keypair; + use fork::{Fork, ForkLifecycle, SharedForkLifecycle}; use libp2p::swarm::Swarm; use libp2p_swarm_test::{SwarmExt, drive}; use ssv_types::domain_type::DomainType; use super::*; - use crate::{SharedDomainType, handshake::node_info::NodeMetadata}; + use crate::handshake::node_info::NodeMetadata; const DOMAIN_A: DomainType = DomainType([0, 0, 0, 1]); const DOMAIN_B: DomainType = DomainType([0, 0, 0, 2]); + fn lifecycle_normal(domain_type: DomainType) -> SharedForkLifecycle { + SharedForkLifecycle::new(ForkLifecycle::Normal { + current: Fork::Alan, + domain_type, + }) + } + fn test_metadata(version: &str) -> NodeMetadata { NodeMetadata { node_version: version.to_string(), @@ -374,7 +380,7 @@ mod tests { domain_type: DomainType, version: &str, ) -> Swarm { - let shared = SharedDomainType::new(domain_type); + let shared = lifecycle_normal(domain_type); let metadata = test_metadata(version); Swarm::new_ephemeral_tokio(|_| Behaviour::new(keypair, shared, 
metadata)) } @@ -515,25 +521,25 @@ mod tests { .expect("test completed"); } - fn create_test_swarm_with_shared_domain( + fn create_test_swarm_with_shared_lifecycle( keypair: Keypair, - shared: SharedDomainType, + shared: SharedForkLifecycle, version: &str, ) -> Swarm { let metadata = test_metadata(version); Swarm::new_ephemeral_tokio(|_| Behaviour::new(keypair, shared, metadata)) } - /// Tests that updates to `SharedDomainType` propagate to subsequent handshakes. + /// Tests that updates to `SharedForkLifecycle` propagate to subsequent handshakes. /// /// This validates the core fix from PR #814: when a fork activates and updates the - /// shared domain type, all future handshakes use the new value. The test runs through - /// three phases: + /// shared fork lifecycle, all future handshakes use the new domain type. The test + /// runs through three phases: /// 1. Both peers on DOMAIN_A - handshake succeeds /// 2. Only local updates to DOMAIN_B - handshake fails with NetworkMismatch /// 3. 
Both peers update to DOMAIN_B - handshake succeeds again #[tokio::test] - async fn shared_domain_type_updates_propagate_to_handshakes() { + async fn shared_fork_lifecycle_updates_propagate_to_handshakes() { use futures::future::Either; use libp2p::swarm::SwarmEvent; @@ -542,17 +548,20 @@ mod tests { let domain_a_hex: String = DOMAIN_A.into(); let domain_b_hex: String = DOMAIN_B.into(); - // Arrange: Create SharedDomainType instances externally so we can update them - let local_shared = SharedDomainType::new(DOMAIN_A); - let remote_shared = SharedDomainType::new(DOMAIN_A); + // Arrange: Create SharedForkLifecycle instances externally so we can update them + let local_shared = lifecycle_normal(DOMAIN_A); + let remote_shared = lifecycle_normal(DOMAIN_A); let local_keypair = Keypair::generate_ed25519(); let remote_keypair = Keypair::generate_ed25519(); let mut local_swarm = - create_test_swarm_with_shared_domain(local_keypair, local_shared.clone(), "local"); - let mut remote_swarm = - create_test_swarm_with_shared_domain(remote_keypair, remote_shared.clone(), "remote"); + create_test_swarm_with_shared_lifecycle(local_keypair, local_shared.clone(), "local"); + let mut remote_swarm = create_test_swarm_with_shared_lifecycle( + remote_keypair, + remote_shared.clone(), + "remote", + ); tokio::spawn(async move { // ==================== Phase 1: Both on DOMAIN_A - handshake succeeds @@ -571,8 +580,11 @@ mod tests { // ==================== Phase 2: Only local updates to DOMAIN_B - mismatch // ==================== - // Act: Update only the local shared domain type - local_shared.set(DOMAIN_B); + // Act: Update only the local shared fork lifecycle (simulates fork activation) + local_shared.set(ForkLifecycle::Normal { + current: Fork::Boole, + domain_type: DOMAIN_B, + }); // Disconnect both peers let remote_peer = *remote_swarm.local_peer_id(); @@ -620,8 +632,11 @@ mod tests { // ==================== Phase 3: Both update to DOMAIN_B - handshake succeeds // ==================== - 
// Act: Update remote's shared domain type to match - remote_shared.set(DOMAIN_B); + // Act: Update remote's shared fork lifecycle to match + remote_shared.set(ForkLifecycle::Normal { + current: Fork::Boole, + domain_type: DOMAIN_B, + }); // Disconnect both peers again local_swarm diff --git a/anchor/network/src/lib.rs b/anchor/network/src/lib.rs index 1893478dc..507bf7f18 100644 --- a/anchor/network/src/lib.rs +++ b/anchor/network/src/lib.rs @@ -11,47 +11,8 @@ mod peer_manager; mod scoring; mod transport; -use std::sync::Arc; - pub use config::{Config, DEFAULT_DISC_PORT, DEFAULT_QUIC_PORT, DEFAULT_TCP_PORT}; pub use network::Network; pub use network_utils::listen_addr::{ListenAddr, ListenAddress}; -use parking_lot::RwLock; -use ssv_types::domain_type::DomainType; pub type Enr = discv5::enr::Enr; pub use peer_manager::types::{ClientType, PeerInfo}; - -/// A shared, thread-safe domain type that serves as a single source of truth. -/// -/// # Lifecycle -/// -/// Created by [`Network::try_new`] with the initial domain type from config. Clones are -/// passed to `Discovery` and `handshake::Behaviour` so all three components share the -/// same underlying value. -/// -/// # Updates -/// -/// When a fork activates, [`Network::on_fork_phase`] calls [`SharedDomainType::set`] once. -/// All components reading via [`SharedDomainType::get`] see the new domain type immediately, -/// eliminating the need for per-component update methods. -/// -/// # Concurrency -/// -/// Uses [`parking_lot::RwLock`] for interior mutability. Writes are brief (single `Copy` -/// assignment) and rare (only on fork activation), so contention is negligible. 
-#[derive(Clone, Debug)] -pub(crate) struct SharedDomainType(Arc>); - -impl SharedDomainType { - pub fn new(domain_type: DomainType) -> Self { - Self(Arc::new(RwLock::new(domain_type))) - } - - pub fn get(&self) -> DomainType { - *self.0.read() - } - - pub fn set(&self, domain_type: DomainType) { - *self.0.write() = domain_type; - } -} diff --git a/anchor/network/src/network.rs b/anchor/network/src/network.rs index 5cffbfc8d..4d79f5fb0 100644 --- a/anchor/network/src/network.rs +++ b/anchor/network/src/network.rs @@ -7,6 +7,7 @@ use std::{ time::Duration, }; +use fork::SharedForkLifecycle; use futures::StreamExt; use gossipsub::{IdentTopic, PublishError}; use libp2p::{ @@ -32,7 +33,7 @@ use tracing::{debug, error, info, trace, warn}; use types::{ChainSpec, EthSpec}; use crate::{ - Config, Enr, SharedDomainType, + Config, Enr, behaviour::{AnchorBehaviour, AnchorBehaviourEvent, BehaviourError}, discovery::{DiscoveredPeers, Discovery, DiscoveryError}, handshake, @@ -76,7 +77,6 @@ pub struct Network { peer_id: PeerId, message_receiver: Arc, outcome_rx: mpsc::Receiver, - domain_type: SharedDomainType, metrics_registry: Option, spec: Arc, is_dynamic_target_peers: bool, @@ -99,6 +99,7 @@ impl Network { executor: TaskExecutor, spec: Arc, fork_phase_rx: async_broadcast::Receiver, + fork_lifecycle: SharedForkLifecycle, ) -> Result, Box> { let local_keypair: Keypair = load_private_key(&config.network_dir.key_file()); @@ -110,14 +111,12 @@ impl Network { let mut metrics_registry = Registry::default(); - let domain_type = SharedDomainType::new(config.domain_type); - let behaviour = AnchorBehaviour::new::( local_keypair.clone(), config, &mut metrics_registry, &spec, - domain_type.clone(), + fork_lifecycle, ) .await .map_err(|e| Box::new(NetworkError::Behaviour(e)))?; @@ -137,7 +136,6 @@ impl Network { peer_id, message_receiver, outcome_rx, - domain_type, metrics_registry: Some(metrics_registry), spec, is_dynamic_target_peers, @@ -427,18 +425,11 @@ impl Network { /// Handle fork 
phase transition events. /// - /// - `Activated`: Update shared domain type and ENR. + /// Domain type updates are handled by `SharedForkLifecycle` (updated by ForkMonitor). + /// This method only handles ENR updates that require direct discv5 interaction. fn on_fork_phase(&mut self, phase: ForkPhase) { - if let ForkPhase::Activated { current, previous } = phase { - info!( - current_fork = %current.fork, - previous_fork = %previous.fork, - "Fork activated, updating domain type" - ); - - // Update the shared domain type — all components (discovery, handshake) see the - // new value immediately. - self.domain_type.set(current.domain_type); + if let ForkPhase::Activated { current, .. } = phase { + info!(current_fork = %current.fork, "Fork activated, updating ENR"); // Update ENR so other nodes can discover us with the new fork's domain if let Err(e) = self.discovery().update_enr_domain_type(current.domain_type) { diff --git a/anchor/network/src/peer_manager/connection.rs b/anchor/network/src/peer_manager/connection.rs index 7740988b9..3d7fce778 100644 --- a/anchor/network/src/peer_manager/connection.rs +++ b/anchor/network/src/peer_manager/connection.rs @@ -4,7 +4,7 @@ use std::{ }; use discv5::libp2p_identity::PeerId; -use fork::Fork; +use fork::{Fork, ForkLifecycle, SharedForkLifecycle}; use libp2p::{ Multiaddr, connection_limits::{self, ConnectionLimits}, @@ -41,17 +41,38 @@ pub enum PeerConnectionError { MissingNeededSubnets, } +/// Observed gossipsub subnet state for a peer. +/// +/// This distinguishes "no observed evidence yet" from "observed and empty", which +/// controls whether ENR fallback is allowed. +#[derive(Debug, Clone)] +enum ObservedPeerSubnets { + /// No gossipsub subscription events observed for this peer yet. + Unknown, + /// At least one subscription event observed and currently no subnet bits are set. + KnownEmpty, + /// At least one subnet bit observed as set (tracked per fork). 
+ Known(HashMap>>), +} + /// Manages peer connections and connection limits pub struct ConnectionManager { pub connection_limits: connection_limits::Behaviour, pub connected: HashSet, pub target_peers: usize, pub max_with_priority_peers: usize, - // Per-fork observed gossipsub subscriptions per peer. Prefer this over ENR claims. - // Tracked per fork so that unsubscribing from one fork's topic doesn't clear the - // bit for the same subnet on another fork's topic. + // Observed gossipsub subscriptions per peer with explicit state: + // - Unknown: no observed subscription evidence yet (ENR fallback allowed) + // - KnownEmpty: observed but currently no subscribed subnets + // - Known: per-fork bitmaps + // + // Per-fork tracking prevents unsubscribing from one fork's topic from clearing + // the bit for the same subnet on another fork's topic. // See: https://github.com/sigp/anchor/issues/818 - observed_peer_subnets: HashMap>>>, + observed_peer_subnets: HashMap, + // Shared fork lifecycle state for fork-aware peer selection. + // After the grace period ends, only peers on the current fork are considered useful. + fork_lifecycle: SharedForkLifecycle, // Track inbound vs outbound connection counts inbound_count: usize, outbound_count: usize, @@ -78,8 +99,8 @@ impl ConnectionManager { connection_limits::Behaviour::new(limits) } - /// Initialize ConnectionManager with a target peer count. - pub fn new(target_peers: usize) -> Self { + /// Initialize ConnectionManager with a target peer count and shared fork lifecycle. 
+ pub fn new(target_peers: usize, fork_lifecycle: SharedForkLifecycle) -> Self { let connection_limits = Self::create_connection_limits(target_peers); let max_priority_peers = (target_peers as f32 @@ -92,6 +113,7 @@ impl ConnectionManager { target_peers, max_with_priority_peers: max_priority_peers, observed_peer_subnets: HashMap::new(), + fork_lifecycle, inbound_count: 0, outbound_count: 0, } @@ -133,27 +155,52 @@ impl ConnectionManager { subscribed: bool, ) { let idx = *subnet.deref() as usize; - let fork_map = self.observed_peer_subnets.entry(peer).or_default(); - let bitfield = fork_map.entry(fork).or_default(); + let bitfield_len = Bitfield::>::default().len(); - if idx < bitfield.len() { - let _ = bitfield.set(idx, subscribed); - } else { + if idx >= bitfield_len { tracing::warn!( %peer, subnet = idx, - max = bitfield.len(), + max = bitfield_len, "Subnet ID exceeds bitfield capacity" ); + return; } - // Clean up empty entries to keep maps small - if !subscribed { - if bitfield.is_zero() { - fork_map.remove(&fork); + let state = self + .observed_peer_subnets + .entry(peer) + .or_insert(ObservedPeerSubnets::Unknown); + + if subscribed { + if !matches!(state, ObservedPeerSubnets::Known(_)) { + *state = ObservedPeerSubnets::Known(HashMap::new()); } - if fork_map.is_empty() { - self.observed_peer_subnets.remove(&peer); + + if let ObservedPeerSubnets::Known(fork_map) = state { + let bitfield = fork_map.entry(fork).or_default(); + let _ = bitfield.set(idx, true); + } + return; + } + + match state { + ObservedPeerSubnets::Unknown => { + // We have now observed explicit subscription state, but no positive bits. 
+ *state = ObservedPeerSubnets::KnownEmpty; + } + ObservedPeerSubnets::KnownEmpty => {} + ObservedPeerSubnets::Known(fork_map) => { + if let Some(bitfield) = fork_map.get_mut(&fork) { + let _ = bitfield.set(idx, false); + if bitfield.is_zero() { + fork_map.remove(&fork); + } + } + + if fork_map.is_empty() { + *state = ObservedPeerSubnets::KnownEmpty; + } } } } @@ -216,9 +263,11 @@ impl ConnectionManager { /// gossipsub. This only counts peers we've observed via gossipsub, no ENR fallback. /// Used for making decisions about existing connections and subnet health. pub fn count_observed_peers_for_subnets(&self, subnet_ids: &[SubnetId]) -> Vec { + // Read the fork lifecycle once for the entire iteration rather than per-peer. + let lifecycle = self.fork_lifecycle.get(); let mut peer_subnet_counts = vec![0; subnet_ids.len()]; for peer in self.connected.iter() { - let Some(subnets) = self.get_peer_subnets_observed_only(peer) else { + let Some(subnets) = self.get_peer_subnets_for_lifecycle(peer, &lifecycle) else { continue; }; for (&subnet_id, count) in subnet_ids.iter().zip(&mut peer_subnet_counts) { @@ -242,7 +291,7 @@ impl ConnectionManager { return true; } - // Only use observed subscriptions (aggregated across forks), no ENR fallback + // Only use observed subscriptions, no ENR fallback let Some(observed) = self.get_peer_subnets_observed_only(peer) else { return false; }; @@ -288,11 +337,42 @@ impl ConnectionManager { false } - /// Get subnets a peer claims to support from observed gossipsub only, - /// aggregated across all forks (union of per-fork bitmaps). + /// Get subnets a peer claims to support from observed gossipsub subscriptions. + /// + /// Convenience wrapper that reads the fork lifecycle once per call. For bulk + /// operations over many peers, prefer [`get_peer_subnets_for_lifecycle`] with + /// a pre-read lifecycle value to avoid repeated lock acquisition. 
fn get_peer_subnets_observed_only(&self, peer: &PeerId) -> Option>> { - let fork_map = self.observed_peer_subnets.get(peer)?; - Some(Self::aggregate_fork_bitmaps(fork_map)) + let lifecycle = self.fork_lifecycle.get(); + self.get_peer_subnets_for_lifecycle(peer, &lifecycle) + } + + /// Fork-aware peer subnet lookup with a pre-read lifecycle value. + /// + /// In `Normal` state (post-grace-period), only the current fork's bitmap is + /// returned. During `WarmUp` or `GracePeriod`, bitmaps from both relevant + /// forks are aggregated (union). This prevents peers subscribed only to a + /// defunct fork from appearing useful for subnet coverage. + fn get_peer_subnets_for_lifecycle( + &self, + peer: &PeerId, + lifecycle: &ForkLifecycle, + ) -> Option>> { + match self.observed_peer_subnets.get(peer)? { + ObservedPeerSubnets::Unknown => None, + ObservedPeerSubnets::KnownEmpty => Some(Bitfield::default()), + ObservedPeerSubnets::Known(fork_map) => match lifecycle { + ForkLifecycle::Normal { current, .. } => Some( + fork_map + .get(current) + .cloned() + .unwrap_or_else(Bitfield::default), + ), + ForkLifecycle::WarmUp { .. } | ForkLifecycle::GracePeriod { .. } => { + Some(Self::aggregate_fork_bitmaps(fork_map)) + } + }, + } } /// OR all per-fork bitmaps together into a single aggregate bitfield. @@ -322,9 +402,10 @@ impl ConnectionManager { /// Handle connection established event pub fn on_connection_established(&mut self, peer_id: PeerId, is_outbound: bool) -> bool { - // Initialize with empty fork map to indicate we're now observing this peer. - // If they never subscribe to anything, we'll know they offer no subnets. - self.observed_peer_subnets.entry(peer_id).or_default(); + // Initialize as Unknown: no observed gossipsub evidence yet. 
+ self.observed_peer_subnets + .entry(peer_id) + .or_insert(ObservedPeerSubnets::Unknown); // Track connection direction counter let is_new = self.connected.insert(peer_id); @@ -543,12 +624,21 @@ mod tests { // ==================== Helper functions ==================== /// Creates a `ConnectionManager` with default target peers for testing. + /// Defaults to `ForkLifecycle::Normal { current: Alan }` for backward compatibility. fn create_test_manager() -> ConnectionManager { - ConnectionManager::new(TARGET_PEERS) + create_test_manager_with_lifecycle(ForkLifecycle::Normal { + current: Fork::Alan, + domain_type: ssv_types::domain_type::DomainType([0, 0, 0, 1]), + }) + } + + /// Creates a `ConnectionManager` with a specific fork lifecycle state. + fn create_test_manager_with_lifecycle(lifecycle: ForkLifecycle) -> ConnectionManager { + ConnectionManager::new(TARGET_PEERS, SharedForkLifecycle::new(lifecycle)) } - /// Connects a peer to the manager (adds to `connected` set and initializes - /// `observed_peer_subnets`), returning the generated `PeerId`. + /// Connects a peer to the manager (adds to `connected` and marks observed + /// subscription state as Unknown), returning the generated `PeerId`. fn connect_random_peer(mgr: &mut ConnectionManager) -> PeerId { let peer = PeerId::random(); mgr.on_connection_established(peer, /* is_outbound = */ true); @@ -606,6 +696,18 @@ mod tests { } // ==================== Multi-fork bug scenario (issue #818) ==================== + // + // These tests use GracePeriod lifecycle because they test cross-fork aggregation + // behavior, which only occurs during WarmUp or GracePeriod states. + + /// Creates a `ConnectionManager` in GracePeriod lifecycle for multi-fork tests. 
+ fn create_grace_period_manager() -> ConnectionManager { + create_test_manager_with_lifecycle(ForkLifecycle::GracePeriod { + current: Fork::Boole, + previous: Fork::Alan, + domain_type: ssv_types::domain_type::DomainType([0, 0, 0, 2]), + }) + } /// Regression test for https://github.com/sigp/anchor/issues/818 /// @@ -614,8 +716,8 @@ mod tests { /// aggregated view because the other fork still holds it. #[test] fn test_unsubscribe_one_fork_retains_subnet_from_other_fork() { - // Arrange - let mut mgr = create_test_manager(); + // Arrange: grace period aggregates across forks + let mut mgr = create_grace_period_manager(); let peer = connect_random_peer(&mut mgr); mgr.set_peer_subscribed(peer, Fork::Alan, subnet(SUBNET_B), true); @@ -633,8 +735,8 @@ mod tests { #[test] fn test_unsubscribe_both_forks_clears_subnet() { - // Arrange - let mut mgr = create_test_manager(); + // Arrange: grace period aggregates across forks + let mut mgr = create_grace_period_manager(); let peer = connect_random_peer(&mut mgr); mgr.set_peer_subscribed(peer, Fork::Alan, subnet(SUBNET_B), true); @@ -655,8 +757,8 @@ mod tests { #[test] fn test_aggregation_unions_subnets_across_forks() { - // Arrange - let mut mgr = create_test_manager(); + // Arrange: grace period aggregates across forks + let mut mgr = create_grace_period_manager(); let peer = connect_random_peer(&mut mgr); // Act: subscribe to different subnets on different forks @@ -679,10 +781,10 @@ mod tests { ); } - // ==================== Cleanup on full unsubscribe ==================== + // ==================== Explicit observed-state transitions ==================== #[test] - fn test_full_unsubscribe_removes_peer_entry() { + fn test_full_unsubscribe_marks_peer_known_empty() { // Arrange let mut mgr = create_test_manager(); let peer = connect_random_peer(&mut mgr); @@ -693,11 +795,17 @@ mod tests { mgr.set_peer_subscribed(peer, Fork::Alan, subnet(SUBNET_A), false); mgr.set_peer_subscribed(peer, Fork::Boole, subnet(SUBNET_E), false); - 
// Assert: the peer should be completely removed from observed_peer_subnets + // Assert: the peer remains tracked as explicitly known-empty (not Unknown) + assert!( + matches!( + mgr.observed_peer_subnets.get(&peer), + Some(ObservedPeerSubnets::KnownEmpty) + ), + "Peer should transition to KnownEmpty after unsubscribing from all observed subnets" + ); assert!( - !mgr.observed_peer_subnets.contains_key(&peer), - "Peer entry must be removed from observed_peer_subnets when all \ - per-fork bitmaps are empty" + aggregated_bitfield(&mgr, &peer).is_some(), + "KnownEmpty peers should return an explicit zero bitfield" ); } @@ -719,12 +827,57 @@ mod tests { ); } + #[test] + fn test_newly_connected_peer_is_unknown_and_returns_none_in_transition() { + // Arrange: WarmUp state + let mut mgr = create_test_manager_with_lifecycle(ForkLifecycle::WarmUp { + current: Fork::Alan, + upcoming: Fork::Boole, + domain_type: ssv_types::domain_type::DomainType([0, 0, 0, 1]), + }); + let peer = connect_random_peer(&mut mgr); + + // Assert: Unknown state yields None (allows ENR fallback) + assert!( + aggregated_bitfield(&mgr, &peer).is_none(), + "Newly connected peers should start in Unknown state" + ); + } + + #[test] + fn test_known_empty_blocks_enr_fallback_path() { + // Arrange: WarmUp state with no peer ENR available in store + let mut mgr = create_test_manager_with_lifecycle(ForkLifecycle::WarmUp { + current: Fork::Alan, + upcoming: Fork::Boole, + domain_type: ssv_types::domain_type::DomainType([0, 0, 0, 1]), + }); + let peer = connect_random_peer(&mut mgr); + let peer_store = MemoryStore::new(peer_store::memory_store::Config::default()); + let needed = HashSet::from([subnet(SUBNET_A)]); + + // Unknown -> no observed evidence yet, so fallback path is lenient + assert!( + mgr.peer_offers_needed_subnets_with_enr_fallback(&peer, &peer_store, &needed), + "Unknown peers should be treated as no-observation-yet in fallback path" + ); + + // Act: observe explicit unsubscribe, transitioning to 
KnownEmpty + mgr.set_peer_subscribed(peer, Fork::Alan, subnet(SUBNET_A), false); + + // Assert: KnownEmpty returns explicit zero bitfield and no fallback is applied + assert!( + !mgr.peer_offers_needed_subnets_with_enr_fallback(&peer, &peer_store, &needed), + "KnownEmpty peers should not use ENR fallback" + ); + } + // ==================== count_observed_peers_for_subnets with multi-fork ==================== #[test] fn test_count_observed_peers_counts_across_forks() { - // Arrange - let mut mgr = create_test_manager(); + // Arrange: grace period aggregates across forks + let mut mgr = create_grace_period_manager(); let peer_a = connect_random_peer(&mut mgr); let peer_b = connect_random_peer(&mut mgr); @@ -777,8 +930,8 @@ mod tests { #[test] fn test_peer_offers_needed_subnets_across_forks() { - // Arrange - let mut mgr = create_test_manager(); + // Arrange: grace period aggregates across forks + let mut mgr = create_grace_period_manager(); let peer = connect_random_peer(&mut mgr); mgr.set_peer_subscribed(peer, Fork::Boole, subnet(SUBNET_D), true); @@ -811,8 +964,8 @@ mod tests { /// the subnet via the remaining fork. 
#[test] fn test_peer_offers_needed_subnets_after_partial_unsubscribe() { - // Arrange - let mut mgr = create_test_manager(); + // Arrange: grace period aggregates across forks + let mut mgr = create_grace_period_manager(); let peer = connect_random_peer(&mut mgr); mgr.set_peer_subscribed(peer, Fork::Alan, subnet(SUBNET_B), true); mgr.set_peer_subscribed(peer, Fork::Boole, subnet(SUBNET_B), true); @@ -848,4 +1001,102 @@ mod tests { "A single unsubscribe should clear the bit regardless of duplicate subscribes" ); } + + // ==================== Fork-aware peer selection ==================== + + #[test] + fn test_normal_alan_only_shows_alan_bitmap() { + // Arrange: Normal state on Alan + let mut mgr = create_test_manager(); + let peer = connect_random_peer(&mut mgr); + + mgr.set_peer_subscribed(peer, Fork::Alan, subnet(SUBNET_A), true); + mgr.set_peer_subscribed(peer, Fork::Boole, subnet(SUBNET_B), true); + + // Assert: only Alan bitmap is visible + assert!(is_subnet_set(&mgr, &peer, SUBNET_A)); + assert!( + !is_subnet_set(&mgr, &peer, SUBNET_B), + "Boole subnet should not be visible in Normal(Alan) state" + ); + } + + #[test] + fn test_warmup_aggregates_both_forks() { + // Arrange: WarmUp state (preparing for Boole) + let mut mgr = create_test_manager_with_lifecycle(ForkLifecycle::WarmUp { + current: Fork::Alan, + upcoming: Fork::Boole, + domain_type: ssv_types::domain_type::DomainType([0, 0, 0, 1]), + }); + let peer = connect_random_peer(&mut mgr); + + mgr.set_peer_subscribed(peer, Fork::Alan, subnet(SUBNET_A), true); + mgr.set_peer_subscribed(peer, Fork::Boole, subnet(SUBNET_B), true); + + // Assert: both forks' bitmaps are aggregated + assert!(is_subnet_set(&mgr, &peer, SUBNET_A)); + assert!(is_subnet_set(&mgr, &peer, SUBNET_B)); + } + + #[test] + fn test_grace_period_aggregates_both_forks() { + // Arrange: GracePeriod state + let mut mgr = create_grace_period_manager(); + let peer = connect_random_peer(&mut mgr); + + mgr.set_peer_subscribed(peer, Fork::Alan, 
subnet(SUBNET_A), true); + mgr.set_peer_subscribed(peer, Fork::Boole, subnet(SUBNET_B), true); + + // Assert: both forks' bitmaps are aggregated + assert!(is_subnet_set(&mgr, &peer, SUBNET_A)); + assert!(is_subnet_set(&mgr, &peer, SUBNET_B)); + } + + #[test] + fn test_normal_boole_only_shows_boole_bitmap() { + // Arrange: Normal state on Boole (post-grace-period) + let mut mgr = create_test_manager_with_lifecycle(ForkLifecycle::Normal { + current: Fork::Boole, + domain_type: ssv_types::domain_type::DomainType([0, 0, 0, 2]), + }); + let peer = connect_random_peer(&mut mgr); + + mgr.set_peer_subscribed(peer, Fork::Alan, subnet(SUBNET_A), true); + mgr.set_peer_subscribed(peer, Fork::Boole, subnet(SUBNET_B), true); + + // Assert: only Boole bitmap is visible + assert!( + !is_subnet_set(&mgr, &peer, SUBNET_A), + "Alan subnet should not be visible in Normal(Boole) state" + ); + assert!(is_subnet_set(&mgr, &peer, SUBNET_B)); + } + + #[test] + fn test_peer_only_on_alan_becomes_invisible_after_boole_normal() { + // Arrange: peer only subscribed to Alan subnets + let mut mgr = create_test_manager_with_lifecycle(ForkLifecycle::Normal { + current: Fork::Boole, + domain_type: ssv_types::domain_type::DomainType([0, 0, 0, 2]), + }); + let peer = connect_random_peer(&mut mgr); + + mgr.set_peer_subscribed(peer, Fork::Alan, subnet(SUBNET_A), true); + + // Assert: observed state is known, but no current-fork bits are set + let bf = aggregated_bitfield(&mgr, &peer) + .expect("Peer with known old-fork subscriptions should return zero bitfield"); + assert!( + !bf.get(SUBNET_A as usize).unwrap_or(false), + "Alan-only subscriptions should not be visible in Normal(Boole) state" + ); + + // Also verify it doesn't offer needed subnets + let needed = HashSet::from([subnet(SUBNET_A)]); + assert!( + !mgr.peer_offers_needed_subnets_observed_only(&peer, &needed), + "Peer only on Alan should not offer subnets in Normal(Boole) state" + ); + } } diff --git a/anchor/network/src/peer_manager/mod.rs 
b/anchor/network/src/peer_manager/mod.rs index 564a0c49c..8c7ab7d15 100644 --- a/anchor/network/src/peer_manager/mod.rs +++ b/anchor/network/src/peer_manager/mod.rs @@ -5,6 +5,7 @@ use std::{ }; use discv5::libp2p_identity::PeerId; +use fork::SharedForkLifecycle; use libp2p::{ Multiaddr, core::{Endpoint, transport::PortUse}, @@ -72,7 +73,12 @@ impl PeerManager { /// # Arguments /// * `config` - Network configuration (may contain user-provided target_peers) /// * `one_epoch_duration` - Duration of one epoch for blocking calculations - pub fn new(config: &Config, one_epoch_duration: Duration) -> Self { + /// * `fork_lifecycle` - Shared fork lifecycle for fork-aware peer selection + pub fn new( + config: &Config, + one_epoch_duration: Duration, + fork_lifecycle: SharedForkLifecycle, + ) -> Self { let peer_store = peer_store::Behaviour::new(MemoryStore::new(memory_store::Config::default())); @@ -81,7 +87,7 @@ impl PeerManager { // subnet_service. let target_peers = config.target_peers.unwrap_or(BASE_PEER_COUNT); - let connection_manager = ConnectionManager::new(target_peers); + let connection_manager = ConnectionManager::new(target_peers, fork_lifecycle); let heartbeat_manager = HeartbeatManager::new(); let blocking_manager = BlockingManager::new(one_epoch_duration);