From 7bd525c2cf488305002a2401b8178bafd5c8ff05 Mon Sep 17 00:00:00 2001 From: diego Date: Wed, 11 Feb 2026 12:17:46 -0300 Subject: [PATCH 1/7] feat(fork): add ForkLifecycle enum and SharedForkLifecycle type Add a new lifecycle module to the fork crate that models fork transition states as a single enum with three variants: - Normal: operating on a single fork (pre-fork or post-grace-period) - WarmUp: preparing for an upcoming fork (dual-subscribing to new topics) - GracePeriod: fork activated but keeping old subscriptions for late messages SharedForkLifecycle wraps this in Arc> for cross-component sharing, following the same pattern as the SharedDomainType it will replace. Each variant carries domain_type, making invalid states unrepresentable. Co-Authored-By: Claude Opus 4.6 --- Cargo.lock | 1 + anchor/common/fork/Cargo.toml | 1 + anchor/common/fork/src/lib.rs | 2 + anchor/common/fork/src/lifecycle.rs | 200 ++++++++++++++++++++++++++++ 4 files changed, 204 insertions(+) create mode 100644 anchor/common/fork/src/lifecycle.rs diff --git a/Cargo.lock b/Cargo.lock index f8b33b194..6cf8ef9b0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3237,6 +3237,7 @@ name = "fork" version = "0.1.0" dependencies = [ "async-broadcast", + "parking_lot", "serde", "slot_clock", "ssv_types", diff --git a/anchor/common/fork/Cargo.toml b/anchor/common/fork/Cargo.toml index 2238a2d58..2183fcd63 100644 --- a/anchor/common/fork/Cargo.toml +++ b/anchor/common/fork/Cargo.toml @@ -6,6 +6,7 @@ edition = { workspace = true } [dependencies] async-broadcast = { workspace = true } +parking_lot = { workspace = true } serde = { workspace = true } slot_clock = { workspace = true } ssv_types = { workspace = true } diff --git a/anchor/common/fork/src/lib.rs b/anchor/common/fork/src/lib.rs index c10ed0e3a..b9d5db133 100644 --- a/anchor/common/fork/src/lib.rs +++ b/anchor/common/fork/src/lib.rs @@ -20,9 +20,11 @@ //! - **"What"**: Each subsystem queries the active fork to determine behavior mod fork; +mod lifecycle; pub mod monitor; mod schedule; pub use fork::{ALAN_TOPIC_PREFIX, Fork}; +pub use lifecycle::{ForkLifecycle, SharedForkLifecycle}; pub use monitor::{ForkPhase, ForkPhaseSender}; pub use schedule::{FORK_PREPARATION_EPOCHS, ForkConfig, ForkSchedule, SUBSEQUENT_WINDOW_SLOTS}; diff --git a/anchor/common/fork/src/lifecycle.rs b/anchor/common/fork/src/lifecycle.rs new file mode 100644 index 000000000..da1d2815a --- /dev/null +++ b/anchor/common/fork/src/lifecycle.rs @@ -0,0 +1,200 @@ +//! Fork lifecycle state management. +//! +//! Provides [`ForkLifecycle`] and [`SharedForkLifecycle`] for tracking the current +//! fork transition state across all components. The [`ForkMonitor`](crate::monitor) +//! is the sole writer; all other components read via [`SharedForkLifecycle`]. + +use std::sync::Arc; + +use parking_lot::RwLock; +use ssv_types::domain_type::DomainType; + +use crate::Fork; + +/// Fork lifecycle state. Updated only by ForkMonitor. +#[derive(Clone, Debug, PartialEq, Eq)] +pub enum ForkLifecycle { + /// Operating on a single fork. No transition in progress. + /// + /// Used in two scenarios: + /// - Pre-fork: only one fork exists (e.g., Alan at genesis). + /// - Post-grace-period: the fork transition is complete and only + /// the current fork's context is relevant. + Normal { + current: Fork, + domain_type: DomainType, + }, + + /// Preparing for an upcoming fork. Dual-subscribing to new topics. + /// + /// Both forks' contexts are relevant — peers subscribed to either + /// the current or upcoming fork are useful. + WarmUp { + current: Fork, + upcoming: Fork, + domain_type: DomainType, + }, + + /// Fork activated but grace period still active. Keeping old subscriptions + /// to catch late messages from the previous fork. + /// + /// Both forks' contexts are relevant — peers subscribed to either + /// the current or previous fork are still useful. + GracePeriod { + current: Fork, + previous: Fork, + domain_type: DomainType, + }, +} + +impl ForkLifecycle { + /// Returns the current active fork. + pub fn current_fork(&self) -> Fork { + match self { + Self::Normal { current, .. } + | Self::WarmUp { current, .. } + | Self::GracePeriod { current, .. } => *current, + } + } + + /// Returns the domain type for the current fork. + pub fn domain_type(&self) -> DomainType { + match self { + Self::Normal { domain_type, .. } + | Self::WarmUp { domain_type, .. } + | Self::GracePeriod { domain_type, .. } => *domain_type, + } + } +} + +/// Shared fork lifecycle state, readable by all components. +/// +/// Updated only by [`ForkMonitor`](crate::monitor). Uses [`parking_lot::RwLock`] +/// for interior mutability. Writes are brief and rare (only on fork transitions), +/// so contention is negligible. +#[derive(Clone, Debug)] +pub struct SharedForkLifecycle(Arc>); + +impl SharedForkLifecycle { + /// Create a new shared lifecycle with the given initial state. + pub fn new(lifecycle: ForkLifecycle) -> Self { + Self(Arc::new(RwLock::new(lifecycle))) + } + + /// Read the full lifecycle state. + pub fn get(&self) -> ForkLifecycle { + self.0.read().clone() + } + + /// Update the lifecycle state. Called only by ForkMonitor. + pub fn set(&self, lifecycle: ForkLifecycle) { + *self.0.write() = lifecycle; + } + + /// Convenience: get current domain type without cloning full enum. + pub fn domain_type(&self) -> DomainType { + self.0.read().domain_type() + } + + /// Convenience: get current fork without cloning full enum. + pub fn current_fork(&self) -> Fork { + self.0.read().current_fork() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + const ALAN_DOMAIN: DomainType = DomainType([0, 0, 0, 1]); + const BOOLE_DOMAIN: DomainType = DomainType([0, 0, 0, 2]); + + #[test] + fn shared_fork_lifecycle_get_set_roundtrip() { + let shared = SharedForkLifecycle::new(ForkLifecycle::Normal { + current: Fork::Alan, + domain_type: ALAN_DOMAIN, + }); + + assert_eq!( + shared.get(), + ForkLifecycle::Normal { + current: Fork::Alan, + domain_type: ALAN_DOMAIN, + } + ); + + shared.set(ForkLifecycle::GracePeriod { + current: Fork::Boole, + previous: Fork::Alan, + domain_type: BOOLE_DOMAIN, + }); + + assert_eq!( + shared.get(), + ForkLifecycle::GracePeriod { + current: Fork::Boole, + previous: Fork::Alan, + domain_type: BOOLE_DOMAIN, + } + ); + } + + #[test] + fn current_fork_returns_correct_fork_for_each_variant() { + let normal = ForkLifecycle::Normal { + current: Fork::Alan, + domain_type: ALAN_DOMAIN, + }; + assert_eq!(normal.current_fork(), Fork::Alan); + + let warmup = ForkLifecycle::WarmUp { + current: Fork::Alan, + upcoming: Fork::Boole, + domain_type: ALAN_DOMAIN, + }; + assert_eq!(warmup.current_fork(), Fork::Alan); + + let grace = ForkLifecycle::GracePeriod { + current: Fork::Boole, + previous: Fork::Alan, + domain_type: BOOLE_DOMAIN, + }; + assert_eq!(grace.current_fork(), Fork::Boole); + } + + #[test] + fn domain_type_returns_correct_type_for_each_variant() { + let normal = ForkLifecycle::Normal { + current: Fork::Alan, + domain_type: ALAN_DOMAIN, + }; + assert_eq!(normal.domain_type(), ALAN_DOMAIN); + + let warmup = ForkLifecycle::WarmUp { + current: Fork::Alan, + upcoming: Fork::Boole, + domain_type: ALAN_DOMAIN, + }; + assert_eq!(warmup.domain_type(), ALAN_DOMAIN); + + let grace = ForkLifecycle::GracePeriod { + current: Fork::Boole, + previous: Fork::Alan, + domain_type: BOOLE_DOMAIN, + }; + assert_eq!(grace.domain_type(), BOOLE_DOMAIN); + } + + #[test] + fn shared_convenience_methods_match_full_get() { + let shared = SharedForkLifecycle::new(ForkLifecycle::WarmUp { + current: Fork::Alan, + upcoming: Fork::Boole, + domain_type: ALAN_DOMAIN, + }); + + assert_eq!(shared.current_fork(), Fork::Alan); + assert_eq!(shared.domain_type(), ALAN_DOMAIN); + } +} From aeb81e68d892d05fe7dcb87a166853798d30b9fb Mon Sep 17 00:00:00 2001 From: diego Date: Wed, 11 Feb 2026 12:18:13 -0300 Subject: [PATCH 2/7] feat(fork): update ForkMonitor to manage SharedForkLifecycle MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ForkMonitor now accepts a SharedForkLifecycle and updates it right before broadcasting each ForkPhase event: - Preparing → WarmUp { current, upcoming, domain_type } - Activated → GracePeriod { current, previous, domain_type } - GracePeriodEnded → Normal { current, domain_type } This makes ForkMonitor the single writer of fork lifecycle state. Components that previously needed to react to events just to cache derived values can now read the shared state directly. Co-Authored-By: Claude Opus 4.6 --- anchor/common/fork/src/monitor.rs | 77 ++++++++++++++++++++++++++++++- 1 file changed, 75 insertions(+), 2 deletions(-) diff --git a/anchor/common/fork/src/monitor.rs b/anchor/common/fork/src/monitor.rs index 8f73281a8..8cd946de4 100644 --- a/anchor/common/fork/src/monitor.rs +++ b/anchor/common/fork/src/monitor.rs @@ -22,7 +22,10 @@ use task_executor::TaskExecutor; use tracing::{debug, info, warn}; use types::{Epoch, Slot}; -use crate::{FORK_PREPARATION_EPOCHS, Fork, ForkConfig, ForkSchedule, SUBSEQUENT_WINDOW_SLOTS}; +use crate::{ + FORK_PREPARATION_EPOCHS, Fork, ForkConfig, ForkLifecycle, ForkSchedule, + SUBSEQUENT_WINDOW_SLOTS, SharedForkLifecycle, +}; /// Fork transition events sent to Network and other components. /// @@ -325,7 +328,6 @@ impl ForkMonitorState { } /// Returns the current fork being monitored. - #[cfg(test)] pub fn current_fork(&self) -> Fork { self.current_fork } @@ -420,6 +422,7 @@ pub async fn run( slots_per_epoch: u64, seconds_per_slot: u64, phase_sender: ForkPhaseSender, + fork_lifecycle: SharedForkLifecycle, ) -> MonitorResult { // Get initial state let Some(current_slot) = slot_clock.now() else { @@ -433,6 +436,10 @@ pub async fn run( return MonitorResult::Completed; } + if let Some(phase) = &initial_phase { + update_lifecycle_for_phase(&fork_lifecycle, phase, &fork_schedule, state.current_fork()); + } + if let Some(phase) = initial_phase { let _ = phase_sender.broadcast_direct(phase).await; } @@ -457,6 +464,14 @@ pub async fn run( }; let phases = state.check_slot(slot); + for phase in &phases { + update_lifecycle_for_phase( + &fork_lifecycle, + phase, + &fork_schedule, + state.current_fork(), + ); + } for phase in phases { let _ = phase_sender.broadcast_direct(phase).await; } @@ -471,6 +486,48 @@ pub async fn run( MonitorResult::Completed } +/// Update the shared fork lifecycle based on a fork phase event. +/// +/// Called right before broadcasting each phase event so readers see the new state +/// at the same time or before listeners process the event. +fn update_lifecycle_for_phase( + fork_lifecycle: &SharedForkLifecycle, + phase: &ForkPhase, + fork_schedule: &ForkSchedule, + current_fork: Fork, +) { + match phase { + ForkPhase::Preparing { upcoming } => { + // During preparation, current fork's domain type is still active + let Some(domain_type) = fork_schedule.domain_type(current_fork) else { + tracing::error!( + fork = %current_fork, + "Missing domain type for current fork during preparation" + ); + return; + }; + fork_lifecycle.set(ForkLifecycle::WarmUp { + current: current_fork, + upcoming: upcoming.fork, + domain_type, + }); + } + ForkPhase::Activated { current, previous } => { + fork_lifecycle.set(ForkLifecycle::GracePeriod { + current: current.fork, + previous: previous.fork, + domain_type: current.domain_type, + }); + } + ForkPhase::GracePeriodEnded { current, .. } => { + fork_lifecycle.set(ForkLifecycle::Normal { + current: current.fork, + domain_type: current.domain_type, + }); + } + } +} + /// Spawns a standalone task that monitors and logs fork transitions. /// /// The monitor will exit automatically when all scheduled forks have activated, @@ -485,6 +542,7 @@ pub fn spawn( seconds_per_slot: u64, executor: TaskExecutor, phase_sender: ForkPhaseSender, + fork_lifecycle: SharedForkLifecycle, ) { executor.spawn( async move { @@ -494,6 +552,7 @@ pub fn spawn( slots_per_epoch, seconds_per_slot, phase_sender, + fork_lifecycle, ) .await; @@ -588,6 +647,14 @@ mod tests { tx } + /// Create a test SharedForkLifecycle starting on Alan. + fn test_fork_lifecycle() -> SharedForkLifecycle { + SharedForkLifecycle::new(ForkLifecycle::Normal { + current: Fork::Alan, + domain_type: TEST_BASELINE_DOMAIN, + }) + } + /// Convert epoch to slot for testing. fn epoch_to_slot(epoch: u64) -> Slot { Slot::new(epoch * slots_per_epoch()) @@ -825,6 +892,7 @@ mod tests { let schedule = make_schedule_no_future_forks(); let clock = clock_at_epoch(CURRENT_EPOCH); let sender = test_phase_sender(); + let lifecycle = test_fork_lifecycle(); // Act let result = run( @@ -833,6 +901,7 @@ mod tests { slots_per_epoch(), seconds_per_slot(), sender, + lifecycle, ) .await; @@ -846,6 +915,7 @@ mod tests { let schedule = make_schedule_with_boole(BOOLE_FORK_EPOCH); let clock = clock_at_epoch(AFTER_FORK_EPOCH); let sender = test_phase_sender(); + let lifecycle = test_fork_lifecycle(); // Act let result = run( @@ -854,6 +924,7 @@ mod tests { slots_per_epoch(), seconds_per_slot(), sender, + lifecycle, ) .await; @@ -869,6 +940,7 @@ mod tests { let schedule = make_schedule_with_boole(ASYNC_BOOLE_FORK_EPOCH); let clock = clock_at_epoch(ASYNC_START_EPOCH); let (sender, mut receiver) = async_broadcast::broadcast(16); + let lifecycle = test_fork_lifecycle(); // Act: Spawn monitor and advance time through fork activation and grace period let monitor = tokio::spawn({ @@ -880,6 +952,7 @@ mod tests { slots_per_epoch(), seconds_per_slot(), sender, + lifecycle, ) .await } From bb325382ea2b8e204e0a464b59ca098b0b32d831 Mon Sep 17 00:00:00 2001 From: diego Date: Wed, 11 Feb 2026 12:18:31 -0300 Subject: [PATCH 3/7] feat(network): replace SharedDomainType with SharedForkLifecycle Replace the SharedDomainType abstraction with SharedForkLifecycle across all network components, making peer selection fork-aware. Key changes: - Remove SharedDomainType from network crate entirely - Discovery and Handshake read domain_type via fork_lifecycle - Network::on_fork_phase simplified to only update ENR (domain type updates are now handled by ForkMonitor via SharedForkLifecycle) - ConnectionManager uses fork lifecycle to filter peer subnets: in Normal state only the current fork's bitmap is returned; during WarmUp/GracePeriod both forks are aggregated - Client creates SharedForkLifecycle and passes to both ForkMonitor and Network This prevents peers subscribed only to a defunct fork from appearing useful for subnet coverage after the grace period ends. Co-Authored-By: Claude Opus 4.6 --- anchor/client/src/lib.rs | 8 + anchor/network/src/behaviour.rs | 17 +- anchor/network/src/discovery.rs | 16 +- anchor/network/src/handshake/mod.rs | 67 ++++--- anchor/network/src/lib.rs | 39 ---- anchor/network/src/network.rs | 25 +-- anchor/network/src/peer_manager/connection.rs | 188 ++++++++++++++++-- anchor/network/src/peer_manager/mod.rs | 10 +- 8 files changed, 252 insertions(+), 118 deletions(-) diff --git a/anchor/client/src/lib.rs b/anchor/client/src/lib.rs index 21e9e7d7f..af026da91 100644 --- a/anchor/client/src/lib.rs +++ b/anchor/client/src/lib.rs @@ -378,6 +378,12 @@ impl Client { // Create fork phase channel for fork transition events let (fork_phase_tx, fork_phase_rx) = async_broadcast::broadcast(16); + // Create shared fork lifecycle state for cross-component fork awareness + let fork_lifecycle = fork::SharedForkLifecycle::new(fork::ForkLifecycle::Normal { + current: initial_fork_config.fork, + domain_type: initial_fork_config.domain_type, + }); + // Start fork monitor to log fork transitions and send ForkPhase events fork::monitor::spawn( fork_schedule.clone(), @@ -386,6 +392,7 @@ impl Client { spec.seconds_per_slot, executor.clone(), fork_phase_tx, + fork_lifecycle.clone(), ); // Start validator index syncer @@ -552,6 +559,7 @@ impl Client { executor.clone(), spec.clone(), fork_phase_rx, + fork_lifecycle, ) .await .map_err(|e| format!("Unable to start network: {e}"))?; diff --git a/anchor/network/src/behaviour.rs b/anchor/network/src/behaviour.rs index 3c4deeb18..4c9e91089 100644 --- a/anchor/network/src/behaviour.rs +++ b/anchor/network/src/behaviour.rs @@ -1,5 +1,6 @@ use std::{hash::Hasher, time::Duration}; +use fork::SharedForkLifecycle; use gossipsub::{ConfigBuilderError, MessageAuthenticity, ValidationMode}; use libp2p::{ identify, ping, @@ -13,7 +14,7 @@ use types::{ChainSpec, EthSpec}; use version::version_with_platform; use crate::{ - Config, SharedDomainType, + Config, behaviour::BehaviourError::Gossipsub, discovery::{Discovery, FIND_NODE_QUERY_CLOSEST_PEERS}, handshake, @@ -92,7 +93,7 @@ impl AnchorBehaviour { network_config: &Config, metrics_registry: &mut Registry, spec: &ChainSpec, - domain_type: SharedDomainType, + fork_lifecycle: SharedForkLifecycle, ) -> Result { let identify = { let local_public_key = local_keypair.public(); @@ -156,8 +157,12 @@ impl AnchorBehaviour { let discovery = { // Build and start the discovery sub-behaviour - let mut discovery = - Discovery::new(local_keypair.clone(), network_config, domain_type.clone()).await?; + let mut discovery = Discovery::new( + local_keypair.clone(), + network_config, + fork_lifecycle.clone(), + ) + .await?; // start searching for peers discovery.discover_peers(FIND_NODE_QUERY_CLOSEST_PEERS); discovery @@ -167,7 +172,7 @@ impl AnchorBehaviour { let slots_per_epoch = E::slots_per_epoch(); let slot_duration = Duration::from_secs(spec.seconds_per_slot); let one_epoch_duration = slot_duration * slots_per_epoch as u32; - PeerManager::new(network_config, one_epoch_duration) + PeerManager::new(network_config, one_epoch_duration, fork_lifecycle.clone()) }; let handshake = { @@ -177,7 +182,7 @@ impl AnchorBehaviour { consensus_node: "lighthouse/v1.5.0".to_string(), subnets: "00000000000000000000000000000000".to_string(), }; - handshake::Behaviour::new(local_keypair, domain_type, metadata) + handshake::Behaviour::new(local_keypair, fork_lifecycle, metadata) }; let upnp = Toggle::from( diff --git a/anchor/network/src/discovery.rs b/anchor/network/src/discovery.rs index bfbaee99f..c76e827ba 100644 --- a/anchor/network/src/discovery.rs +++ b/anchor/network/src/discovery.rs @@ -15,6 +15,7 @@ use discv5::{ libp2p_identity::{Keypair, PeerId}, multiaddr::Multiaddr, }; +use fork::SharedForkLifecycle; use futures::{FutureExt, StreamExt, stream::FuturesUnordered}; use libp2p::{ bytes::Bytes, @@ -35,7 +36,7 @@ use tracing::{debug, error, info, trace, warn}; use typenum::U128; use crate::{ - Config, SharedDomainType, + Config, discovery::DiscoveryError::{Discv5Init, Discv5Start, EnrKey}, }; @@ -152,7 +153,7 @@ pub struct Discovery { /// been started update_ports: UpdatePorts, - domain_type: SharedDomainType, + fork_lifecycle: SharedForkLifecycle, enr_file_path: PathBuf, } @@ -161,7 +162,7 @@ impl Discovery { pub async fn new( local_keypair: Keypair, network_config: &Config, - domain_type: SharedDomainType, + fork_lifecycle: SharedForkLifecycle, ) -> Result { let protocol_identity = ProtocolIdentity { protocol_id: *b"ssvdv5", @@ -307,7 +308,7 @@ impl Discovery { discv5, event_stream, started: !network_config.disable_discovery, - domain_type, + fork_lifecycle, update_ports, enr_file_path, }) @@ -447,12 +448,13 @@ impl Discovery { // predicate for finding nodes with a valid tcp port let tcp_predicate = move |enr: &Enr| enr.tcp4().is_some() || enr.tcp6().is_some(); - // Clone the shared domain type so the closure can read the current value at query time. - let shared_domain_type = self.domain_type.clone(); + // Clone the shared fork lifecycle so the closure can read the current domain type at query + // time. + let shared_lifecycle = self.fork_lifecycle.clone(); let domain_type_predicate = move |enr: &Enr| { if let Some(Ok(domain_type)) = enr.get_decodable::<[u8; 4]>("domaintype") { - shared_domain_type.get().0 == domain_type + shared_lifecycle.domain_type().0 == domain_type } else { trace!(?enr, "Rejecting ENR with missing domaintype"); false diff --git a/anchor/network/src/handshake/mod.rs b/anchor/network/src/handshake/mod.rs index 86592c16d..44ad39332 100644 --- a/anchor/network/src/handshake/mod.rs +++ b/anchor/network/src/handshake/mod.rs @@ -8,6 +8,7 @@ use std::{ }; use discv5::libp2p_identity::Keypair; +use fork::SharedForkLifecycle; use libp2p::{ PeerId, StreamProtocol, request_response::{ @@ -18,10 +19,7 @@ use libp2p::{ }; use tracing::{debug, trace}; -use crate::{ - SharedDomainType, - handshake::{codec::Codec, node_info::NodeInfo}, -}; +use crate::handshake::{codec::Codec, node_info::NodeInfo}; /// Event emitted on handshake completion or failure. #[derive(Debug)] @@ -40,7 +38,7 @@ pub enum Event { /// Automatically initiates handshakes on outbound connections. pub struct Behaviour { inner: RequestResponseBehaviour, - domain_type: SharedDomainType, + fork_lifecycle: SharedForkLifecycle, metadata: node_info::NodeMetadata, events: VecDeque, } @@ -62,7 +60,7 @@ impl Behaviour { /// The behaviour automatically initiates handshakes on outbound connections. pub fn new( keypair: Keypair, - domain_type: SharedDomainType, + fork_lifecycle: SharedForkLifecycle, metadata: node_info::NodeMetadata, ) -> Self { let protocol = StreamProtocol::new("/ssv/info/0.0.1"); @@ -73,7 +71,7 @@ impl Behaviour { ); Self { inner, - domain_type, + fork_lifecycle, metadata, events: VecDeque::new(), } @@ -82,7 +80,7 @@ impl Behaviour { /// Construct our [`NodeInfo`] from the current shared domain type and metadata. fn our_node_info(&self) -> NodeInfo { NodeInfo { - domain_type: self.domain_type.get().into(), + domain_type: self.fork_lifecycle.domain_type().into(), metadata: Some(self.metadata.clone()), } } @@ -350,16 +348,24 @@ mod tests { use std::sync::LazyLock; use discv5::libp2p_identity::Keypair; + use fork::{Fork, ForkLifecycle, SharedForkLifecycle}; use libp2p::swarm::Swarm; use libp2p_swarm_test::{SwarmExt, drive}; use ssv_types::domain_type::DomainType; use super::*; - use crate::{SharedDomainType, handshake::node_info::NodeMetadata}; + use crate::handshake::node_info::NodeMetadata; const DOMAIN_A: DomainType = DomainType([0, 0, 0, 1]); const DOMAIN_B: DomainType = DomainType([0, 0, 0, 2]); + fn lifecycle_normal(domain_type: DomainType) -> SharedForkLifecycle { + SharedForkLifecycle::new(ForkLifecycle::Normal { + current: Fork::Alan, + domain_type, + }) + } + fn test_metadata(version: &str) -> NodeMetadata { NodeMetadata { node_version: version.to_string(), @@ -374,7 +380,7 @@ mod tests { domain_type: DomainType, version: &str, ) -> Swarm { - let shared = SharedDomainType::new(domain_type); + let shared = lifecycle_normal(domain_type); let metadata = test_metadata(version); Swarm::new_ephemeral_tokio(|_| Behaviour::new(keypair, shared, metadata)) } @@ -515,25 +521,25 @@ mod tests { .expect("test completed"); } - fn create_test_swarm_with_shared_domain( + fn create_test_swarm_with_shared_lifecycle( keypair: Keypair, - shared: SharedDomainType, + shared: SharedForkLifecycle, version: &str, ) -> Swarm { let metadata = test_metadata(version); Swarm::new_ephemeral_tokio(|_| Behaviour::new(keypair, shared, metadata)) } - /// Tests that updates to `SharedDomainType` propagate to subsequent handshakes. + /// Tests that updates to `SharedForkLifecycle` propagate to subsequent handshakes. /// /// This validates the core fix from PR #814: when a fork activates and updates the - /// shared domain type, all future handshakes use the new value. The test runs through - /// three phases: + /// shared fork lifecycle, all future handshakes use the new domain type. The test + /// runs through three phases: /// 1. Both peers on DOMAIN_A - handshake succeeds /// 2. Only local updates to DOMAIN_B - handshake fails with NetworkMismatch /// 3. Both peers update to DOMAIN_B - handshake succeeds again #[tokio::test] - async fn shared_domain_type_updates_propagate_to_handshakes() { + async fn shared_fork_lifecycle_updates_propagate_to_handshakes() { use futures::future::Either; use libp2p::swarm::SwarmEvent; @@ -542,17 +548,20 @@ mod tests { let domain_a_hex: String = DOMAIN_A.into(); let domain_b_hex: String = DOMAIN_B.into(); - // Arrange: Create SharedDomainType instances externally so we can update them - let local_shared = SharedDomainType::new(DOMAIN_A); - let remote_shared = SharedDomainType::new(DOMAIN_A); + // Arrange: Create SharedForkLifecycle instances externally so we can update them + let local_shared = lifecycle_normal(DOMAIN_A); + let remote_shared = lifecycle_normal(DOMAIN_A); let local_keypair = Keypair::generate_ed25519(); let remote_keypair = Keypair::generate_ed25519(); let mut local_swarm = - create_test_swarm_with_shared_domain(local_keypair, local_shared.clone(), "local"); - let mut remote_swarm = - create_test_swarm_with_shared_domain(remote_keypair, remote_shared.clone(), "remote"); + create_test_swarm_with_shared_lifecycle(local_keypair, local_shared.clone(), "local"); + let mut remote_swarm = create_test_swarm_with_shared_lifecycle( + remote_keypair, + remote_shared.clone(), + "remote", + ); tokio::spawn(async move { // ==================== Phase 1: Both on DOMAIN_A - handshake succeeds @@ -571,8 +580,11 @@ mod tests { // ==================== Phase 2: Only local updates to DOMAIN_B - mismatch // ==================== - // Act: Update only the local shared domain type - local_shared.set(DOMAIN_B); + // Act: Update only the local shared fork lifecycle (simulates fork activation) + local_shared.set(ForkLifecycle::Normal { + current: Fork::Boole, + domain_type: DOMAIN_B, + }); // Disconnect both peers let remote_peer = *remote_swarm.local_peer_id(); @@ -620,8 +632,11 @@ mod tests { // ==================== Phase 3: Both update to DOMAIN_B - handshake succeeds // ==================== - // Act: Update remote's shared domain type to match - remote_shared.set(DOMAIN_B); + // Act: Update remote's shared fork lifecycle to match + remote_shared.set(ForkLifecycle::Normal { + current: Fork::Boole, + domain_type: DOMAIN_B, + }); // Disconnect both peers again local_swarm diff --git a/anchor/network/src/lib.rs b/anchor/network/src/lib.rs index 1893478dc..507bf7f18 100644 --- a/anchor/network/src/lib.rs +++ b/anchor/network/src/lib.rs @@ -11,47 +11,8 @@ mod peer_manager; mod scoring; mod transport; -use std::sync::Arc; - pub use config::{Config, DEFAULT_DISC_PORT, DEFAULT_QUIC_PORT, DEFAULT_TCP_PORT}; pub use network::Network; pub use network_utils::listen_addr::{ListenAddr, ListenAddress}; -use parking_lot::RwLock; -use ssv_types::domain_type::DomainType; pub type Enr = discv5::enr::Enr; pub use peer_manager::types::{ClientType, PeerInfo}; - -/// A shared, thread-safe domain type that serves as a single source of truth. -/// -/// # Lifecycle -/// -/// Created by [`Network::try_new`] with the initial domain type from config. Clones are -/// passed to `Discovery` and `handshake::Behaviour` so all three components share the -/// same underlying value. -/// -/// # Updates -/// -/// When a fork activates, [`Network::on_fork_phase`] calls [`SharedDomainType::set`] once. -/// All components reading via [`SharedDomainType::get`] see the new domain type immediately, -/// eliminating the need for per-component update methods. -/// -/// # Concurrency -/// -/// Uses [`parking_lot::RwLock`] for interior mutability. Writes are brief (single `Copy` -/// assignment) and rare (only on fork activation), so contention is negligible. -#[derive(Clone, Debug)] -pub(crate) struct SharedDomainType(Arc>); - -impl SharedDomainType { - pub fn new(domain_type: DomainType) -> Self { - Self(Arc::new(RwLock::new(domain_type))) - } - - pub fn get(&self) -> DomainType { - *self.0.read() - } - - pub fn set(&self, domain_type: DomainType) { - *self.0.write() = domain_type; - } -} diff --git a/anchor/network/src/network.rs b/anchor/network/src/network.rs index c1f332722..f24e6a036 100644 --- a/anchor/network/src/network.rs +++ b/anchor/network/src/network.rs @@ -7,6 +7,7 @@ use std::{ time::Duration, }; +use fork::SharedForkLifecycle; use futures::StreamExt; use gossipsub::{IdentTopic, PublishError}; use libp2p::{ @@ -32,7 +33,7 @@ use tracing::{debug, error, info, trace, warn}; use types::{ChainSpec, EthSpec}; use crate::{ - Config, Enr, SharedDomainType, + Config, Enr, behaviour::{AnchorBehaviour, AnchorBehaviourEvent, BehaviourError}, discovery::{DiscoveredPeers, Discovery, DiscoveryError}, handshake, @@ -76,7 +77,6 @@ pub struct Network { peer_id: PeerId, message_receiver: Arc, outcome_rx: mpsc::Receiver, - domain_type: SharedDomainType, metrics_registry: Option, spec: Arc, is_dynamic_target_peers: bool, @@ -99,6 +99,7 @@ impl Network { executor: TaskExecutor, spec: Arc, fork_phase_rx: async_broadcast::Receiver, + fork_lifecycle: SharedForkLifecycle, ) -> Result, Box> { let local_keypair: Keypair = load_private_key(&config.network_dir.key_file()); @@ -110,14 +111,12 @@ impl Network { let mut metrics_registry = Registry::default(); - let domain_type = SharedDomainType::new(config.domain_type); - let behaviour = AnchorBehaviour::new::( local_keypair.clone(), config, &mut metrics_registry, &spec, - domain_type.clone(), + fork_lifecycle, ) .await .map_err(|e| Box::new(NetworkError::Behaviour(e)))?; @@ -137,7 +136,6 @@ impl Network { peer_id, message_receiver, outcome_rx, - domain_type, metrics_registry: Some(metrics_registry), spec, is_dynamic_target_peers, @@ -427,18 +425,11 @@ impl Network { /// Handle fork phase transition events. /// - /// - `Activated`: Update shared domain type and ENR. + /// Domain type updates are handled by `SharedForkLifecycle` (updated by ForkMonitor). + /// This method only handles ENR updates that require direct discv5 interaction. fn on_fork_phase(&mut self, phase: ForkPhase) { - if let ForkPhase::Activated { current, previous } = phase { - info!( - current_fork = %current.fork, - previous_fork = %previous.fork, - "Fork activated, updating domain type" - ); - - // Update the shared domain type — all components (discovery, handshake) see the - // new value immediately. - self.domain_type.set(current.domain_type); + if let ForkPhase::Activated { current, .. } = phase { + info!(current_fork = %current.fork, "Fork activated, updating ENR"); // Update ENR so other nodes can discover us with the new fork's domain if let Err(e) = self.discovery().update_enr_domain_type(current.domain_type) { diff --git a/anchor/network/src/peer_manager/connection.rs b/anchor/network/src/peer_manager/connection.rs index 2b4a7ceb2..2ce5eacf2 100644 --- a/anchor/network/src/peer_manager/connection.rs +++ b/anchor/network/src/peer_manager/connection.rs @@ -4,7 +4,7 @@ use std::{ }; use discv5::libp2p_identity::PeerId; -use fork::Fork; +use fork::{Fork, ForkLifecycle, SharedForkLifecycle}; use libp2p::{ Multiaddr, connection_limits::{self, ConnectionLimits}, @@ -52,6 +52,9 @@ pub struct ConnectionManager { // bit for the same subnet on another fork's topic. // See: https://github.com/sigp/anchor/issues/818 observed_peer_subnets: HashMap>>>, + // Shared fork lifecycle state for fork-aware peer selection. + // After the grace period ends, only peers on the current fork are considered useful. + fork_lifecycle: SharedForkLifecycle, // Track inbound vs outbound connection counts inbound_count: usize, outbound_count: usize, @@ -78,8 +81,8 @@ impl ConnectionManager { connection_limits::Behaviour::new(limits) } - /// Initialize ConnectionManager with a target peer count. - pub fn new(target_peers: usize) -> Self { + /// Initialize ConnectionManager with a target peer count and shared fork lifecycle. + pub fn new(target_peers: usize, fork_lifecycle: SharedForkLifecycle) -> Self { let connection_limits = Self::create_connection_limits(target_peers); let max_priority_peers = (target_peers as f32 @@ -92,6 +95,7 @@ impl ConnectionManager { target_peers, max_with_priority_peers: max_priority_peers, observed_peer_subnets: HashMap::new(), + fork_lifecycle, inbound_count: 0, outbound_count: 0, } @@ -216,9 +220,11 @@ impl ConnectionManager { /// gossipsub. This only counts peers we've observed via gossipsub, no ENR fallback. /// Used for making decisions about existing connections and subnet health. pub fn count_observed_peers_for_subnets(&self, subnet_ids: &[SubnetId]) -> Vec { + // Read the fork lifecycle once for the entire iteration rather than per-peer. + let lifecycle = self.fork_lifecycle.get(); let mut peer_subnet_counts = vec![0; subnet_ids.len()]; for peer in self.connected.iter() { - let Some(subnets) = self.get_peer_subnets_observed_only(peer) else { + let Some(subnets) = self.get_peer_subnets_for_lifecycle(peer, &lifecycle) else { continue; }; for (&subnet_id, count) in subnet_ids.iter().zip(&mut peer_subnet_counts) { @@ -242,7 +248,7 @@ impl ConnectionManager { return true; } - // Only use observed subscriptions (aggregated across forks), no ENR fallback + // Only use observed subscriptions, no ENR fallback let Some(observed) = self.get_peer_subnets_observed_only(peer) else { return false; }; @@ -288,11 +294,34 @@ impl ConnectionManager { false } - /// Get subnets a peer claims to support from observed gossipsub only, - /// aggregated across all forks (union of per-fork bitmaps). + /// Get subnets a peer claims to support from observed gossipsub subscriptions. + /// + /// Convenience wrapper that reads the fork lifecycle once per call. For bulk + /// operations over many peers, prefer [`get_peer_subnets_for_lifecycle`] with + /// a pre-read lifecycle value to avoid repeated lock acquisition. fn get_peer_subnets_observed_only(&self, peer: &PeerId) -> Option>> { + let lifecycle = self.fork_lifecycle.get(); + self.get_peer_subnets_for_lifecycle(peer, &lifecycle) + } + + /// Fork-aware peer subnet lookup with a pre-read lifecycle value. + /// + /// In `Normal` state (post-grace-period), only the current fork's bitmap is + /// returned. During `WarmUp` or `GracePeriod`, bitmaps from both relevant + /// forks are aggregated (union). This prevents peers subscribed only to a + /// defunct fork from appearing useful for subnet coverage. + fn get_peer_subnets_for_lifecycle( + &self, + peer: &PeerId, + lifecycle: &ForkLifecycle, + ) -> Option>> { let fork_map = self.observed_peer_subnets.get(peer)?; - Some(Self::aggregate_fork_bitmaps(fork_map)) + match lifecycle { + ForkLifecycle::Normal { current, .. } => fork_map.get(current).cloned(), + ForkLifecycle::WarmUp { .. } | ForkLifecycle::GracePeriod { .. } => { + Some(Self::aggregate_fork_bitmaps(fork_map)) + } + } } /// OR all per-fork bitmaps together into a single aggregate bitfield. @@ -543,8 +572,17 @@ mod tests { // ==================== Helper functions ==================== /// Creates a `ConnectionManager` with default target peers for testing. + /// Defaults to `ForkLifecycle::Normal { current: Alan }` for backward compatibility. fn create_test_manager() -> ConnectionManager { - ConnectionManager::new(TARGET_PEERS) + create_test_manager_with_lifecycle(ForkLifecycle::Normal { + current: Fork::Alan, + domain_type: ssv_types::domain_type::DomainType([0, 0, 0, 1]), + }) + } + + /// Creates a `ConnectionManager` with a specific fork lifecycle state. + fn create_test_manager_with_lifecycle(lifecycle: ForkLifecycle) -> ConnectionManager { + ConnectionManager::new(TARGET_PEERS, SharedForkLifecycle::new(lifecycle)) } /// Connects a peer to the manager (adds to `connected` set and initializes @@ -606,6 +644,18 @@ mod tests { } // ==================== Multi-fork bug scenario (issue #818) ==================== + // + // These tests use GracePeriod lifecycle because they test cross-fork aggregation + // behavior, which only occurs during WarmUp or GracePeriod states. + + /// Creates a `ConnectionManager` in GracePeriod lifecycle for multi-fork tests. + fn create_grace_period_manager() -> ConnectionManager { + create_test_manager_with_lifecycle(ForkLifecycle::GracePeriod { + current: Fork::Boole, + previous: Fork::Alan, + domain_type: ssv_types::domain_type::DomainType([0, 0, 0, 2]), + }) + } /// Regression test for https://github.com/sigp/anchor/issues/818 /// @@ -614,8 +664,8 @@ mod tests { /// aggregated view because the other fork still holds it. #[test] fn test_unsubscribe_one_fork_retains_subnet_from_other_fork() { - // Arrange - let mut mgr = create_test_manager(); + // Arrange: grace period aggregates across forks + let mut mgr = create_grace_period_manager(); let peer = connect_random_peer(&mut mgr); mgr.set_peer_subscribed(peer, Fork::Alan, subnet(SUBNET_B), true); @@ -633,8 +683,8 @@ mod tests { #[test] fn test_unsubscribe_both_forks_clears_subnet() { - // Arrange - let mut mgr = create_test_manager(); + // Arrange: grace period aggregates across forks + let mut mgr = create_grace_period_manager(); let peer = connect_random_peer(&mut mgr); mgr.set_peer_subscribed(peer, Fork::Alan, subnet(SUBNET_B), true); @@ -655,8 +705,8 @@ mod tests { #[test] fn test_aggregation_unions_subnets_across_forks() { - // Arrange - let mut mgr = create_test_manager(); + // Arrange: grace period aggregates across forks + let mut mgr = create_grace_period_manager(); let peer = connect_random_peer(&mut mgr); // Act: subscribe to different subnets on different forks @@ -723,8 +773,8 @@ mod tests { #[test] fn test_count_observed_peers_counts_across_forks() { - // Arrange - let mut mgr = create_test_manager(); + // Arrange: grace period aggregates across forks + let mut mgr = create_grace_period_manager(); let peer_a = connect_random_peer(&mut mgr); let peer_b = connect_random_peer(&mut mgr); @@ -777,8 +827,8 @@ mod tests { #[test] fn test_peer_offers_needed_subnets_across_forks() { - // Arrange - let mut mgr = create_test_manager(); + // Arrange: grace period aggregates across forks + let mut mgr = create_grace_period_manager(); let peer = connect_random_peer(&mut mgr); mgr.set_peer_subscribed(peer, Fork::Boole, subnet(SUBNET_D), true); @@ -811,8 +861,8 @@ mod tests { /// the subnet via the remaining fork. #[test] fn test_peer_offers_needed_subnets_after_partial_unsubscribe() { - // Arrange - let mut mgr = create_test_manager(); + // Arrange: grace period aggregates across forks + let mut mgr = create_grace_period_manager(); let peer = connect_random_peer(&mut mgr); mgr.set_peer_subscribed(peer, Fork::Alan, subnet(SUBNET_B), true); mgr.set_peer_subscribed(peer, Fork::Boole, subnet(SUBNET_B), true); @@ -848,4 +898,100 @@ mod tests { "A single unsubscribe should clear the bit regardless of duplicate subscribes" ); } + + // ==================== Fork-aware peer selection ==================== + + #[test] + fn test_normal_alan_only_shows_alan_bitmap() { + // Arrange: Normal state on Alan + let mut mgr = create_test_manager(); + let peer = connect_random_peer(&mut mgr); + + mgr.set_peer_subscribed(peer, Fork::Alan, subnet(SUBNET_A), true); + mgr.set_peer_subscribed(peer, Fork::Boole, subnet(SUBNET_B), true); + + // Assert: only Alan bitmap is visible + assert!(is_subnet_set(&mgr, &peer, SUBNET_A)); + assert!( + !is_subnet_set(&mgr, &peer, SUBNET_B), + "Boole subnet should not be visible in Normal(Alan) state" + ); + } + + #[test] + fn test_warmup_aggregates_both_forks() { + // Arrange: WarmUp state (preparing for Boole) + let mut mgr = create_test_manager_with_lifecycle(ForkLifecycle::WarmUp { + current: Fork::Alan, + upcoming: Fork::Boole, + domain_type: ssv_types::domain_type::DomainType([0, 0, 0, 1]), + }); + let peer = connect_random_peer(&mut mgr); + + mgr.set_peer_subscribed(peer, Fork::Alan, subnet(SUBNET_A), true); + mgr.set_peer_subscribed(peer, Fork::Boole, subnet(SUBNET_B), true); + + // Assert: both forks' bitmaps are aggregated + assert!(is_subnet_set(&mgr, &peer, SUBNET_A)); + assert!(is_subnet_set(&mgr, &peer, SUBNET_B)); + } + + #[test] + fn test_grace_period_aggregates_both_forks() { + // Arrange: GracePeriod state + let mut mgr = create_grace_period_manager(); + let peer = connect_random_peer(&mut mgr); + + mgr.set_peer_subscribed(peer, Fork::Alan, subnet(SUBNET_A), true); + mgr.set_peer_subscribed(peer, Fork::Boole, subnet(SUBNET_B), true); + + // Assert: both forks' bitmaps are aggregated + assert!(is_subnet_set(&mgr, &peer, SUBNET_A)); + assert!(is_subnet_set(&mgr, &peer, SUBNET_B)); + } + + #[test] + fn test_normal_boole_only_shows_boole_bitmap() { + // Arrange: Normal state on Boole (post-grace-period) + let mut mgr = create_test_manager_with_lifecycle(ForkLifecycle::Normal { + current: Fork::Boole, + domain_type: ssv_types::domain_type::DomainType([0, 0, 0, 2]), + }); + let peer = connect_random_peer(&mut mgr); + + mgr.set_peer_subscribed(peer, Fork::Alan, subnet(SUBNET_A), true); + mgr.set_peer_subscribed(peer, Fork::Boole, subnet(SUBNET_B), true); + + // Assert: only Boole bitmap is visible + assert!( + !is_subnet_set(&mgr, &peer, SUBNET_A), + "Alan subnet should not be visible in Normal(Boole) state" + ); + assert!(is_subnet_set(&mgr, &peer, SUBNET_B)); + } + + #[test] + fn test_peer_only_on_alan_becomes_invisible_after_boole_normal() { + // Arrange: peer only subscribed to Alan subnets + let mut mgr = create_test_manager_with_lifecycle(ForkLifecycle::Normal { + current: Fork::Boole, + domain_type: ssv_types::domain_type::DomainType([0, 0, 0, 2]), + }); + let peer = connect_random_peer(&mut mgr); + + mgr.set_peer_subscribed(peer, Fork::Alan, subnet(SUBNET_A), true); + + // Assert: peer is invisible (returns None since no Boole subscriptions) + assert!( + aggregated_bitfield(&mgr, &peer).is_none(), + "Peer with only Alan subscriptions should return None in Normal(Boole) state" + ); + + // Also verify it doesn't offer needed subnets + let needed = HashSet::from([subnet(SUBNET_A)]); + assert!( + !mgr.peer_offers_needed_subnets_observed_only(&peer, &needed), + "Peer only on Alan should not offer subnets in Normal(Boole) state" + ); + } } diff --git a/anchor/network/src/peer_manager/mod.rs b/anchor/network/src/peer_manager/mod.rs index 564a0c49c..8c7ab7d15 100644 --- a/anchor/network/src/peer_manager/mod.rs +++ b/anchor/network/src/peer_manager/mod.rs @@ -5,6 +5,7 @@ use std::{ }; use discv5::libp2p_identity::PeerId; +use fork::SharedForkLifecycle; use libp2p::{ Multiaddr, core::{Endpoint, transport::PortUse}, @@ -72,7 +73,12 @@ impl PeerManager { /// # Arguments /// * `config` - Network configuration (may contain user-provided target_peers) /// * `one_epoch_duration` - Duration of one epoch for blocking calculations - pub fn new(config: &Config, one_epoch_duration: Duration) -> Self { + /// * `fork_lifecycle` - Shared fork lifecycle for fork-aware peer selection + pub fn new( + config: &Config, + one_epoch_duration: Duration, + fork_lifecycle: SharedForkLifecycle, + ) -> Self { let peer_store = peer_store::Behaviour::new(MemoryStore::new(memory_store::Config::default())); @@ -81,7 +87,7 @@ impl PeerManager { // subnet_service. let target_peers = config.target_peers.unwrap_or(BASE_PEER_COUNT); - let connection_manager = ConnectionManager::new(target_peers); + let connection_manager = ConnectionManager::new(target_peers, fork_lifecycle); let heartbeat_manager = HeartbeatManager::new(); let blocking_manager = BlockingManager::new(one_epoch_duration); From 50378e68c2d87e8e223b9f0a8ef335fac1ee89a3 Mon Sep 17 00:00:00 2001 From: diego Date: Wed, 11 Feb 2026 12:20:17 -0300 Subject: [PATCH 4/7] style(fork): fix doc comment line wrapping in lifecycle.rs Co-Authored-By: Claude Opus 4.6 --- anchor/common/fork/src/lifecycle.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/anchor/common/fork/src/lifecycle.rs b/anchor/common/fork/src/lifecycle.rs index da1d2815a..a1c62b4e9 100644 --- a/anchor/common/fork/src/lifecycle.rs +++ b/anchor/common/fork/src/lifecycle.rs @@ -18,8 +18,8 @@ pub enum ForkLifecycle { /// /// Used in two scenarios: /// - Pre-fork: only one fork exists (e.g., Alan at genesis). - /// - Post-grace-period: the fork transition is complete and only - /// the current fork's context is relevant. + /// - Post-grace-period: the fork transition is complete and only the current fork's context is + /// relevant. Normal { current: Fork, domain_type: DomainType, From e4a2dc137a245c0a4ce7a11381315bb7e35a729b Mon Sep 17 00:00:00 2001 From: diego Date: Mon, 23 Feb 2026 02:22:30 -0300 Subject: [PATCH 5/7] test(fork): remove low-value lifecycle accessor tests --- anchor/common/fork/src/lifecycle.rs | 57 ----------------------------- 1 file changed, 57 deletions(-) diff --git a/anchor/common/fork/src/lifecycle.rs b/anchor/common/fork/src/lifecycle.rs index a1c62b4e9..e7fcf5a97 100644 --- a/anchor/common/fork/src/lifecycle.rs +++ b/anchor/common/fork/src/lifecycle.rs @@ -140,61 +140,4 @@ mod tests { ); } - #[test] - fn current_fork_returns_correct_fork_for_each_variant() { - let normal = ForkLifecycle::Normal { - current: Fork::Alan, - domain_type: ALAN_DOMAIN, - }; - assert_eq!(normal.current_fork(), Fork::Alan); - - let warmup = ForkLifecycle::WarmUp { - current: Fork::Alan, - upcoming: Fork::Boole, - domain_type: ALAN_DOMAIN, - }; - assert_eq!(warmup.current_fork(), Fork::Alan); - - let grace = ForkLifecycle::GracePeriod { - current: Fork::Boole, - previous: Fork::Alan, - domain_type: BOOLE_DOMAIN, - }; - assert_eq!(grace.current_fork(), Fork::Boole); - } - - #[test] - fn domain_type_returns_correct_type_for_each_variant() { - let normal = ForkLifecycle::Normal { - current: Fork::Alan, - domain_type: ALAN_DOMAIN, - }; - assert_eq!(normal.domain_type(), ALAN_DOMAIN); - - let warmup = ForkLifecycle::WarmUp { - current: Fork::Alan, - upcoming: Fork::Boole, - domain_type: ALAN_DOMAIN, - }; - assert_eq!(warmup.domain_type(), ALAN_DOMAIN); - - let grace = ForkLifecycle::GracePeriod { - current: Fork::Boole, - previous: Fork::Alan, - domain_type: BOOLE_DOMAIN, - }; - assert_eq!(grace.domain_type(), BOOLE_DOMAIN); - } - - #[test] - fn shared_convenience_methods_match_full_get() { - let shared = SharedForkLifecycle::new(ForkLifecycle::WarmUp { - current: Fork::Alan, - upcoming: Fork::Boole, - domain_type: ALAN_DOMAIN, - }); - - assert_eq!(shared.current_fork(), Fork::Alan); - assert_eq!(shared.domain_type(), ALAN_DOMAIN); - } } From e9fdc62a9f210ee5bc2566af56519e05d83111da Mon Sep 17 00:00:00 2001 From: diego Date: Mon, 23 Feb 2026 02:23:45 -0300 Subject: [PATCH 6/7] style(fork): remove extra blank line in lifecycle tests --- anchor/common/fork/src/lifecycle.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/anchor/common/fork/src/lifecycle.rs b/anchor/common/fork/src/lifecycle.rs index e7fcf5a97..29ddf28bc 100644 --- a/anchor/common/fork/src/lifecycle.rs +++ b/anchor/common/fork/src/lifecycle.rs @@ -139,5 +139,4 @@ mod tests { } ); } - } From e21d3e4fb51c90e085d2e016602cd99744dfe7c4 Mon Sep 17 00:00:00 2001 From: diego Date: Mon, 23 Feb 2026 11:37:51 -0300 Subject: [PATCH 7/7] fix(network): model observed subnet state explicitly --- anchor/network/src/peer_manager/connection.rs | 177 ++++++++++++++---- 1 file changed, 141 insertions(+), 36 deletions(-) diff --git a/anchor/network/src/peer_manager/connection.rs b/anchor/network/src/peer_manager/connection.rs index 7783da799..3d7fce778 100644 --- a/anchor/network/src/peer_manager/connection.rs +++ b/anchor/network/src/peer_manager/connection.rs @@ -41,17 +41,35 @@ pub enum PeerConnectionError { MissingNeededSubnets, } +/// Observed gossipsub subnet state for a peer. +/// +/// This distinguishes "no observed evidence yet" from "observed and empty", which +/// controls whether ENR fallback is allowed. +#[derive(Debug, Clone)] +enum ObservedPeerSubnets { + /// No gossipsub subscription events observed for this peer yet. + Unknown, + /// At least one subscription event observed and currently no subnet bits are set. + KnownEmpty, + /// At least one subnet bit observed as set (tracked per fork). + Known(HashMap>>), +} + /// Manages peer connections and connection limits pub struct ConnectionManager { pub connection_limits: connection_limits::Behaviour, pub connected: HashSet, pub target_peers: usize, pub max_with_priority_peers: usize, - // Per-fork observed gossipsub subscriptions per peer. Prefer this over ENR claims. - // Tracked per fork so that unsubscribing from one fork's topic doesn't clear the - // bit for the same subnet on another fork's topic. + // Observed gossipsub subscriptions per peer with explicit state: + // - Unknown: no observed subscription evidence yet (ENR fallback allowed) + // - KnownEmpty: observed but currently no subscribed subnets + // - Known: per-fork bitmaps + // + // Per-fork tracking prevents unsubscribing from one fork's topic from clearing + // the bit for the same subnet on another fork's topic. // See: https://github.com/sigp/anchor/issues/818 - observed_peer_subnets: HashMap>>>, + observed_peer_subnets: HashMap, // Shared fork lifecycle state for fork-aware peer selection. // After the grace period ends, only peers on the current fork are considered useful. fork_lifecycle: SharedForkLifecycle, @@ -137,27 +155,52 @@ impl ConnectionManager { subscribed: bool, ) { let idx = *subnet.deref() as usize; - let fork_map = self.observed_peer_subnets.entry(peer).or_default(); - let bitfield = fork_map.entry(fork).or_default(); + let bitfield_len = Bitfield::>::default().len(); - if idx < bitfield.len() { - let _ = bitfield.set(idx, subscribed); - } else { + if idx >= bitfield_len { tracing::warn!( %peer, subnet = idx, - max = bitfield.len(), + max = bitfield_len, "Subnet ID exceeds bitfield capacity" ); + return; } - // Clean up empty entries to keep maps small - if !subscribed { - if bitfield.is_zero() { - fork_map.remove(&fork); + let state = self + .observed_peer_subnets + .entry(peer) + .or_insert(ObservedPeerSubnets::Unknown); + + if subscribed { + if !matches!(state, ObservedPeerSubnets::Known(_)) { + *state = ObservedPeerSubnets::Known(HashMap::new()); + } + + if let ObservedPeerSubnets::Known(fork_map) = state { + let bitfield = fork_map.entry(fork).or_default(); + let _ = bitfield.set(idx, true); } - if fork_map.is_empty() { - self.observed_peer_subnets.remove(&peer); + return; + } + + match state { + ObservedPeerSubnets::Unknown => { + // We have now observed explicit subscription state, but no positive bits. + *state = ObservedPeerSubnets::KnownEmpty; + } + ObservedPeerSubnets::KnownEmpty => {} + ObservedPeerSubnets::Known(fork_map) => { + if let Some(bitfield) = fork_map.get_mut(&fork) { + let _ = bitfield.set(idx, false); + if bitfield.is_zero() { + fork_map.remove(&fork); + } + } + + if fork_map.is_empty() { + *state = ObservedPeerSubnets::KnownEmpty; + } } } } @@ -315,12 +358,20 @@ impl ConnectionManager { peer: &PeerId, lifecycle: &ForkLifecycle, ) -> Option>> { - let fork_map = self.observed_peer_subnets.get(peer)?; - match lifecycle { - ForkLifecycle::Normal { current, .. } => fork_map.get(current).cloned(), - ForkLifecycle::WarmUp { .. } | ForkLifecycle::GracePeriod { .. } => { - Some(Self::aggregate_fork_bitmaps(fork_map)) - } + match self.observed_peer_subnets.get(peer)? { + ObservedPeerSubnets::Unknown => None, + ObservedPeerSubnets::KnownEmpty => Some(Bitfield::default()), + ObservedPeerSubnets::Known(fork_map) => match lifecycle { + ForkLifecycle::Normal { current, .. } => Some( + fork_map + .get(current) + .cloned() + .unwrap_or_else(Bitfield::default), + ), + ForkLifecycle::WarmUp { .. } | ForkLifecycle::GracePeriod { .. } => { + Some(Self::aggregate_fork_bitmaps(fork_map)) + } + }, } } @@ -351,9 +402,10 @@ impl ConnectionManager { /// Handle connection established event pub fn on_connection_established(&mut self, peer_id: PeerId, is_outbound: bool) -> bool { - // Initialize with empty fork map to indicate we're now observing this peer. - // If they never subscribe to anything, we'll know they offer no subnets. - self.observed_peer_subnets.entry(peer_id).or_default(); + // Initialize as Unknown: no observed gossipsub evidence yet. + self.observed_peer_subnets + .entry(peer_id) + .or_insert(ObservedPeerSubnets::Unknown); // Track connection direction counter let is_new = self.connected.insert(peer_id); @@ -585,8 +637,8 @@ mod tests { ConnectionManager::new(TARGET_PEERS, SharedForkLifecycle::new(lifecycle)) } - /// Connects a peer to the manager (adds to `connected` set and initializes - /// `observed_peer_subnets`), returning the generated `PeerId`. + /// Connects a peer to the manager (adds to `connected` and marks observed + /// subscription state as Unknown), returning the generated `PeerId`. fn connect_random_peer(mgr: &mut ConnectionManager) -> PeerId { let peer = PeerId::random(); mgr.on_connection_established(peer, /* is_outbound = */ true); @@ -729,10 +781,10 @@ mod tests { ); } - // ==================== Cleanup on full unsubscribe ==================== + // ==================== Explicit observed-state transitions ==================== #[test] - fn test_full_unsubscribe_removes_peer_entry() { + fn test_full_unsubscribe_marks_peer_known_empty() { // Arrange let mut mgr = create_test_manager(); let peer = connect_random_peer(&mut mgr); @@ -743,11 +795,17 @@ mod tests { mgr.set_peer_subscribed(peer, Fork::Alan, subnet(SUBNET_A), false); mgr.set_peer_subscribed(peer, Fork::Boole, subnet(SUBNET_E), false); - // Assert: the peer should be completely removed from observed_peer_subnets + // Assert: the peer remains tracked as explicitly known-empty (not Unknown) assert!( - !mgr.observed_peer_subnets.contains_key(&peer), - "Peer entry must be removed from observed_peer_subnets when all \ - per-fork bitmaps are empty" + matches!( + mgr.observed_peer_subnets.get(&peer), + Some(ObservedPeerSubnets::KnownEmpty) + ), + "Peer should transition to KnownEmpty after unsubscribing from all observed subnets" + ); + assert!( + aggregated_bitfield(&mgr, &peer).is_some(), + "KnownEmpty peers should return an explicit zero bitfield" ); } @@ -769,6 +827,51 @@ mod tests { ); } + #[test] + fn test_newly_connected_peer_is_unknown_and_returns_none_in_transition() { + // Arrange: WarmUp state + let mut mgr = create_test_manager_with_lifecycle(ForkLifecycle::WarmUp { + current: Fork::Alan, + upcoming: Fork::Boole, + domain_type: ssv_types::domain_type::DomainType([0, 0, 0, 1]), + }); + let peer = connect_random_peer(&mut mgr); + + // Assert: Unknown state yields None (allows ENR fallback) + assert!( + aggregated_bitfield(&mgr, &peer).is_none(), + "Newly connected peers should start in Unknown state" + ); + } + + #[test] + fn test_known_empty_blocks_enr_fallback_path() { + // Arrange: WarmUp state with no peer ENR available in store + let mut mgr = create_test_manager_with_lifecycle(ForkLifecycle::WarmUp { + current: Fork::Alan, + upcoming: Fork::Boole, + domain_type: ssv_types::domain_type::DomainType([0, 0, 0, 1]), + }); + let peer = connect_random_peer(&mut mgr); + let peer_store = MemoryStore::new(peer_store::memory_store::Config::default()); + let needed = HashSet::from([subnet(SUBNET_A)]); + + // Unknown -> no observed evidence yet, so fallback path is lenient + assert!( + mgr.peer_offers_needed_subnets_with_enr_fallback(&peer, &peer_store, &needed), + "Unknown peers should be treated as no-observation-yet in fallback path" + ); + + // Act: observe explicit unsubscribe, transitioning to KnownEmpty + mgr.set_peer_subscribed(peer, Fork::Alan, subnet(SUBNET_A), false); + + // Assert: KnownEmpty returns explicit zero bitfield and no fallback is applied + assert!( + !mgr.peer_offers_needed_subnets_with_enr_fallback(&peer, &peer_store, &needed), + "KnownEmpty peers should not use ENR fallback" + ); + } + // ==================== count_observed_peers_for_subnets with multi-fork ==================== #[test] @@ -981,10 +1084,12 @@ mod tests { mgr.set_peer_subscribed(peer, Fork::Alan, subnet(SUBNET_A), true); - // Assert: peer is invisible (returns None since no Boole subscriptions) + // Assert: observed state is known, but no current-fork bits are set + let bf = aggregated_bitfield(&mgr, &peer) + .expect("Peer with known old-fork subscriptions should return zero bitfield"); assert!( - aggregated_bitfield(&mgr, &peer).is_none(), - "Peer with only Alan subscriptions should return None in Normal(Boole) state" + !bf.get(SUBNET_A as usize).unwrap_or(false), + "Alan-only subscriptions should not be visible in Normal(Boole) state" ); // Also verify it doesn't offer needed subnets