From 82eaa98a44e29666e4a55827a8292d2e6259a4c9 Mon Sep 17 00:00:00 2001 From: diego Date: Sat, 25 Oct 2025 18:08:46 +0200 Subject: [PATCH 1/5] test: add test for Committee instances reaching late rounds Adds test to verify QBFT Committee instances can reach late rounds (9+) as configured with max_round=12. The test creates a Committee instance, forces round changes by keeping operators offline, then advances through multiple slots while verifying the instance survives to reach round 10. Currently fails - instance is cleaned up after 2 slots, reaching round 9 but unable to complete it (needs 120s, gets 8s). --- anchor/qbft_manager/src/tests.rs | 49 ++++++++++++++++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/anchor/qbft_manager/src/tests.rs b/anchor/qbft_manager/src/tests.rs index 11a2b7c79..6c889eb0f 100644 --- a/anchor/qbft_manager/src/tests.rs +++ b/anchor/qbft_manager/src/tests.rs @@ -902,6 +902,55 @@ mod manager_tests { context.verify_consensus().await; } + + #[tokio::test(start_paused = true)] + // Test that Committee instances can reach late rounds (9+) with max_round=12 configuration. + // This verifies that instances survive long enough to progress through many round changes + // as configured. Committee role has max_round=12, so instances should be able to reach + // round 10 before timing out at round 13. + // + // The test simulates network conditions where consensus cannot be reached early by keeping + // all but one operator offline, forcing round changes. We advance the slot to trigger + // cleanup and verify the instance survives to reach round 10. + async fn test_committee_can_reach_late_rounds() { + let setup = setup_test(1); + let clock = setup.clock.clone(); + let mut context = TestContext::::new( + setup.clock, + setup.executor, + CommitteeSize::Four, + setup.all_data, + ) + .await; + + // Keep 3 operators offline initially to prevent consensus and force round changes. + // With only 1 operator online out of 4, we cannot reach quorum (need 3). + // This will cause the instance to go through multiple round changes. + context.set_operators_offline(&[2, 3, 4]); + + // Advance time and slots to simulate reaching round 10 + // Instance starts at slot 0 + let slot_duration = Duration::from_secs(12); + + // Advance through multiple slots while QBFT progresses + // This triggers cleanup logic which should NOT remove the active instance + for slot in 1..=50 { + clock.set_slot(slot); + tokio::time::sleep(slot_duration).await; + + // At slot 22 (256 seconds = 16s + 240s), we should be around round 10 + // Rounds 1-8: 16s, Rounds 9-10: 240s = 256s total + if slot == 22 { + // Bring operators back online during round 10 to allow consensus + context.set_operators_online(&[2, 3, 4]); + break; + } + } + + // Verify that consensus is reached successfully, proving the instance + // survived cleanup and was able to reach round 10 + context.verify_consensus().await; + } } // very important: set paused to true for deterministic timer From 4767f9df7bb0ae0a21d30454a407ddb11311f969 Mon Sep 17 00:00:00 2001 From: diego Date: Tue, 28 Oct 2025 21:22:07 +0100 Subject: [PATCH 2/5] refactor: improve QBFT instance cleanup with duty-aware deadlines Replace slot-based cleanup with duty-specific beacon chain inclusion deadlines. This allows QBFT instances to progress through all configured rounds without premature removal. Key changes: - Separate instance identity from manager metadata using ManagedInstance wrapper - Calculate duty-specific deadlines per EIP-7045 (attestations valid until end of epoch E+1) - Add slots_per_epoch configuration parameter - Implement dual-trigger cleaner (completion notification + deadline timeout) Fixes instances being cleaned after 2 slots, now properly respecting beacon chain inclusion windows (32-63 slots for attestations). --- anchor/client/src/lib.rs | 1 + anchor/qbft_manager/src/instance.rs | 9 ++ anchor/qbft_manager/src/lib.rs | 173 +++++++++++++++++++++------- anchor/qbft_manager/src/tests.rs | 8 ++ 4 files changed, 150 insertions(+), 41 deletions(-) diff --git a/anchor/client/src/lib.rs b/anchor/client/src/lib.rs index 2580d7b55..03582f32a 100644 --- a/anchor/client/src/lib.rs +++ b/anchor/client/src/lib.rs @@ -450,6 +450,7 @@ impl Client { slot_clock.clone(), message_sender, config.global_config.ssv_network.ssv_domain_type, + E::slots_per_epoch(), ) .map_err(|e| format!("Unable to initialize qbft manager: {e:?}"))?; diff --git a/anchor/qbft_manager/src/instance.rs b/anchor/qbft_manager/src/instance.rs index c4429ec27..0257135b8 100644 --- a/anchor/qbft_manager/src/instance.rs +++ b/anchor/qbft_manager/src/instance.rs @@ -259,6 +259,8 @@ impl> Initialized { pub async fn qbft_instance>( mut rx: UnboundedReceiver>, message_sender: Arc, + completion_tx: mpsc::UnboundedSender, + instance_id: crate::InstanceId, ) { // Signal a new instance that is uninitialized let mut instance = QbftInstance::Uninitialized(Uninitialized::default()); @@ -304,6 +306,7 @@ pub async fn qbft_instance>( if let QbftInstance::Initialized(initialized) = instance { initialized.complete(Completed::TimedOut); } + // No notification - either already sent when decided, or cleaner removed us break; } }; @@ -311,6 +314,12 @@ pub async fn qbft_instance>( // If the instance is ongoing, check whether it is done. if let QbftInstance::Initialized(initialized) = instance { instance = initialized.complete_if_done(&message_sender); + + // If we just transitioned to Decided, notify cleaner for immediate cleanup + if matches!(instance, QbftInstance::Decided(_)) { + let _ = completion_tx.send(instance_id); + break; + } } // Drop guard as late as possible to keep the processor permit. diff --git a/anchor/qbft_manager/src/lib.rs b/anchor/qbft_manager/src/lib.rs index 3dcf4b7ea..2ef98d79d 100644 --- a/anchor/qbft_manager/src/lib.rs +++ b/anchor/qbft_manager/src/lib.rs @@ -39,8 +39,25 @@ const QBFT_INSTANCE_NAME: &str = "qbft_instance"; const QBFT_MESSAGE_NAME: &str = "qbft_message"; const QBFT_CLEANER_NAME: &str = "qbft_cleaner"; -/// Number of slots to keep before the current slot -const QBFT_RETAIN_SLOTS: u64 = 1; +/// Calculate the beacon chain inclusion deadline for a duty +fn calculate_deadline(role: Role, slot: types::Slot, slots_per_epoch: u64) -> types::Slot { + match role { + Role::Committee | Role::Aggregator => { + // Attestations can be included until end of next epoch (epoch E+1) + // Per EIP-7045: attestation from epoch E valid until end of epoch E+1 + let epoch = slot.epoch(slots_per_epoch); + types::Slot::new((epoch.as_u64() + 2) * slots_per_epoch - 1) + } + Role::Proposer | Role::SyncCommittee => { + // Must be in the same slot + slot + } + Role::VoluntaryExit | Role::ValidatorRegistration => { + // One epoch to complete + types::Slot::new(slot.as_u64() + slots_per_epoch) + } + } +} // Unique Identifier for a committee and its corresponding QBFT instance #[derive(Debug, Clone, Hash, PartialEq, Eq)] @@ -98,8 +115,20 @@ pub struct QbftInitialization { on_completed: oneshot::Sender>, } -// Map from an identifier to a sender for the instance -type Map = DashMap>>; +// Manager's bookkeeping for an instance +pub struct ManagedInstance { + sender: UnboundedSender>, + deadline: types::Slot, +} + +// Map from an identifier to managed instance data +type Map = DashMap>; + +// Enum to identify which instance completed +pub enum InstanceId { + BeaconVote(CommitteeInstanceId), + ValidatorConsensus(ValidatorInstanceId), +} // Top level QBFTManager structure pub struct QbftManager { @@ -115,6 +144,10 @@ pub struct QbftManager { message_sender: Arc, // Network domain to embed into messages domain: DomainType, + // Channel to notify cleaner when instance completes + completion_tx: mpsc::UnboundedSender, + // Slots per epoch for deadline calculations + slots_per_epoch: u64, } impl QbftManager { @@ -125,7 +158,10 @@ impl QbftManager { slot_clock: impl SlotClock + 'static, message_sender: Arc, domain: DomainType, + slots_per_epoch: u64, ) -> Result, QbftError> { + let (completion_tx, completion_rx) = mpsc::unbounded_channel(); + let manager = Arc::new(QbftManager { processor, operator_id, @@ -133,13 +169,15 @@ impl QbftManager { beacon_vote_instances: DashMap::new(), message_sender, domain, + completion_tx, + slots_per_epoch, }); // Start a long running task that will clean up old instances - manager - .processor - .permitless - .send_async(Arc::clone(&manager).cleaner(slot_clock), QBFT_CLEANER_NAME)?; + manager.processor.permitless.send_async( + Arc::clone(&manager).cleaner(slot_clock, completion_rx), + QBFT_CLEANER_NAME, + )?; Ok(manager) } @@ -161,6 +199,11 @@ impl QbftManager { let (result_sender, result_receiver) = oneshot::channel(); let message_id = D::message_id(&self.domain, &id); + // Calculate deadline for this instance + let role = message_id.role().ok_or(QbftError::InconsistentMessageId)?; + let slot = types::Slot::new(*initial.instance_height(&id) as u64); + let deadline = calculate_deadline(role, slot, self.slots_per_epoch); + // General the qbft configuration let config = ConfigBuilder::new( operator_id, @@ -169,17 +212,12 @@ impl QbftManager { ); let config = config .with_quorum_size(committee.cluster_members.len() - committee.get_f() as usize) - .with_max_rounds( - message_id - .role() - .and_then(|r| r.max_round()) - .ok_or(QbftError::InconsistentMessageId)? as usize, - ) + .with_max_rounds(role.max_round().ok_or(QbftError::InconsistentMessageId)? as usize) .build()?; // Get or spawn a new qbft instance. This will return the sender that we can use to send // new messages to the specific instance - let sender = D::get_or_spawn_instance(self, id); + let sender = D::get_or_spawn_instance(self, id.clone(), deadline); self.processor.urgent_consensus.send_immediate( move |drop_on_finish: DropOnFinish| { // A message to initialize this instance @@ -261,7 +299,19 @@ impl QbftManager { id: D::Id, data: WrappedQbftMessage, ) -> Result<(), QbftError> { - let sender = D::get_or_spawn_instance(self, id); + // Get the map for this data type + let map = D::get_map(self); + + // Look up existing instance - network messages should only go to existing instances + let Some(managed) = map.get(&id) else { + // Instance doesn't exist yet - this message arrived before decide_instance was called + // This is normal during startup, just ignore it + return Ok(()); + }; + + let sender = managed.sender.clone(); + drop(managed); // Release the lock before sending + self.processor.urgent_consensus.send_immediate( move |drop_on_finish: DropOnFinish| { let _ = sender.send(QbftMessage { @@ -274,51 +324,84 @@ impl QbftManager { Ok(()) } - // Long running cleaner that will remove instances that are no longer relevant - async fn cleaner(self: Arc, slot_clock: impl SlotClock) { - while !self.processor.permitless.is_closed() { - sleep( - slot_clock - .duration_to_next_slot() - .unwrap_or(slot_clock.slot_duration()), - ) - .await; - let Some(slot) = slot_clock.now() else { - continue; - }; - let cutoff = slot.saturating_sub(QBFT_RETAIN_SLOTS); - self.beacon_vote_instances - .retain(|k, _| *k.instance_height >= cutoff.as_usize()); - self.validator_consensus_data_instances - .retain(|k, _| *k.instance_height >= cutoff.as_usize()); + /// Long running cleaner that removes instances based on completion or deadline + async fn cleaner( + self: Arc, + slot_clock: impl SlotClock, + mut completion_rx: mpsc::UnboundedReceiver, + ) { + loop { + tokio::select! { + // Branch 1: Instance completed - clean immediately + Some(id) = completion_rx.recv() => { + match id { + InstanceId::BeaconVote(id) => { + self.beacon_vote_instances.remove(&id); + } + InstanceId::ValidatorConsensus(id) => { + self.validator_consensus_data_instances.remove(&id); + } + } + } + // Branch 2: Slot timeout - clean expired instances + _ = sleep( + slot_clock + .duration_to_next_slot() + .unwrap_or(slot_clock.slot_duration()) + ) => { + let Some(current_slot) = slot_clock.now() else { + continue; + }; + self.beacon_vote_instances + .retain(|_, managed| managed.deadline >= current_slot); + self.validator_consensus_data_instances + .retain(|_, managed| managed.deadline >= current_slot); + } + } + + if self.processor.permitless.is_closed() { + break; + } } } } -// Trait that describes any data that is able to be decided upon during a qbft instance pub trait QbftDecidable: QbftData + Send + Sync + 'static { - type Id: Hash + Eq + Send + Debug; + type Id: Hash + Eq + Send + Debug + Clone; fn get_map(manager: &QbftManager) -> &Map; + fn wrap_id(id: Self::Id) -> InstanceId; + fn get_or_spawn_instance( manager: &QbftManager, id: Self::Id, + deadline: types::Slot, ) -> UnboundedSender> { let map = Self::get_map(manager); - match map.entry(id) { - dashmap::Entry::Occupied(entry) => entry.get().clone(), + match map.entry(id.clone()) { + dashmap::Entry::Occupied(entry) => entry.get().sender.clone(), dashmap::Entry::Vacant(entry) => { // There is not an instance running yet, store the sender and spawn a new instance - // with the reeiver + // with the receiver let (tx, rx) = mpsc::unbounded_channel(); let span = debug_span!("qbft_instance", instance_id = ?entry.key()); - let tx = entry.insert(tx); + let managed = ManagedInstance { + sender: tx, + deadline, + }; + let sender = entry.insert(managed).sender.clone(); + let instance_id = Self::wrap_id(id); + let completion_tx = manager.completion_tx.clone(); + let message_sender = manager.message_sender.clone(); let _ = manager.processor.permitless.send_async( - Box::pin(qbft_instance(rx, manager.message_sender.clone()).instrument(span)), + Box::pin( + qbft_instance(rx, message_sender, completion_tx, instance_id) + .instrument(span), + ), QBFT_INSTANCE_NAME, ); - tx.clone() + sender } } } @@ -334,6 +417,10 @@ impl QbftDecidable for ValidatorConsensusData { &manager.validator_consensus_data_instances } + fn wrap_id(id: Self::Id) -> InstanceId { + InstanceId::ValidatorConsensus(id) + } + fn instance_height(&self, id: &Self::Id) -> InstanceHeight { id.instance_height } @@ -354,6 +441,10 @@ impl QbftDecidable for BeaconVote { &manager.beacon_vote_instances } + fn wrap_id(id: Self::Id) -> InstanceId { + InstanceId::BeaconVote(id) + } + fn instance_height(&self, id: &Self::Id) -> InstanceHeight { id.instance_height } diff --git a/anchor/qbft_manager/src/tests.rs b/anchor/qbft_manager/src/tests.rs index 6c889eb0f..89dd4261c 100644 --- a/anchor/qbft_manager/src/tests.rs +++ b/anchor/qbft_manager/src/tests.rs @@ -304,6 +304,7 @@ where slot_clock.clone(), Arc::new(MockMessageSender::new(network_tx.clone(), operator_id)), DomainType([0; 4]), + 32, // slots_per_epoch ) .expect("Creation should not fail"); @@ -965,10 +966,17 @@ async fn test_timeout(round_timeout_to_test: usize) { let (sender_tx, _sender_rx) = unbounded_channel(); let (message_tx, message_rx) = unbounded_channel(); let (result_tx, result_rx) = oneshot::channel(); + let (completion_tx, _completion_rx) = unbounded_channel(); let message_sender = MockMessageSender::new(sender_tx, OperatorId(1)); + let instance_id = super::InstanceId::BeaconVote(CommitteeInstanceId { + committee: CommitteeId::default(), + instance_height: 0.into(), + }); let _handle = tokio::spawn(qbft_instance::( message_rx, Arc::new(message_sender), + completion_tx, + instance_id, )); // create a slot clock at slot 0 with a slot duration of 12 seconds From d181178a6ae542d83b3b3a58163e85e8a597eea6 Mon Sep 17 00:00:00 2001 From: diego Date: Wed, 29 Oct 2025 23:46:18 +0100 Subject: [PATCH 3/5] test: refactor qbft_manager tests for readability MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Improve test readability by applying Setup/Execute/Assert structure: - Replace magic numbers with named constants (SINGLE_INSTANCE, TWO_INSTANCES, etc.) - Add mandatory section comments (// SETUP, // EXECUTE, // ASSERT) to all new tests - Split oversized test_role_based_deadline_calculations into 6 focused tests (one per role) - Add descriptive assertion messages explaining what must be true - Named all literals in new tests (OLD_CLEANUP_SLOT, BEACON_DEADLINE_SLOT, etc.) All 23 tests pass (up from 18 due to role deadline test split). 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- anchor/qbft_manager/src/tests.rs | 425 +++++++++++++++++++++++++++++-- 1 file changed, 409 insertions(+), 16 deletions(-) diff --git a/anchor/qbft_manager/src/tests.rs b/anchor/qbft_manager/src/tests.rs index 89dd4261c..6f413cdba 100644 --- a/anchor/qbft_manager/src/tests.rs +++ b/anchor/qbft_manager/src/tests.rs @@ -628,6 +628,11 @@ pub struct ConsensusResult { mod manager_tests { use super::*; + // Test constants for number of QBFT instances + const SINGLE_INSTANCE: usize = 1; + const TWO_INSTANCES: usize = 2; + const FIVE_INSTANCES: usize = 5; + // Provides test setup struct Setup { executor: TaskExecutor, @@ -694,7 +699,7 @@ mod manager_tests { #[tokio::test] // Test running a single instance and confirm that it reaches consensus async fn test_basic_run() { - let setup = setup_test(1); + let setup = setup_test(SINGLE_INSTANCE); let mut context = TestContext::::new( setup.clock, setup.executor, @@ -709,7 +714,7 @@ mod manager_tests { #[tokio::test] // Take the leader offline to test a round change async fn test_round_change() { - let setup = setup_test(1); + let setup = setup_test(SINGLE_INSTANCE); let mut context = TestContext::::new( setup.clock, setup.executor, @@ -725,7 +730,7 @@ mod manager_tests { #[tokio::test] // Test one offline operator async fn test_fault_operator() { - let setup = setup_test(1); + let setup = setup_test(SINGLE_INSTANCE); let mut context = TestContext::::new( setup.clock, setup.executor, @@ -741,7 +746,7 @@ mod manager_tests { #[tokio::test] // Go through all committee sizes and confirm that we can reach consensus with f faulty async fn test_consensus_f_faulty() { - let setup = setup_test(1); + let setup = setup_test(SINGLE_INSTANCE); let sizes = vec![ (CommitteeSize::Four, vec![1]), (CommitteeSize::Seven, vec![1, 3]), @@ -766,7 +771,7 @@ mod manager_tests { #[tokio::test] // Test running concurrent instances and confirm that they reach consensus async fn test_concurrent_runs() { - let setup = setup_test(2); + let setup = setup_test(TWO_INSTANCES); let mut context = TestContext::::new( setup.clock, setup.executor, @@ -781,7 +786,7 @@ mod manager_tests { #[tokio::test(start_paused = true)] // Start with > f fault and then recover them. This should reach consensus async fn test_recovery() { - let setup = setup_test(1); + let setup = setup_test(SINGLE_INSTANCE); let mut context = TestContext::::new( setup.clock, setup.executor, @@ -801,7 +806,7 @@ mod manager_tests { #[tokio::test] // Test commit message suppression for an operator async fn test_commit_suppression() { - let setup = setup_test(1); + let setup = setup_test(SINGLE_INSTANCE); let mut context = TestContext::::new( setup.clock, setup.executor, @@ -820,7 +825,7 @@ mod manager_tests { #[tokio::test] // Test sending double messages async fn test_send_double() { - let setup = setup_test(1); + let setup = setup_test(SINGLE_INSTANCE); let mut context = TestContext::::new( setup.clock, setup.executor, @@ -836,7 +841,7 @@ mod manager_tests { #[tokio::test] // Test one of the nodes sending invalid messages async fn test_invalid_message() { - let setup = setup_test(1); + let setup = setup_test(SINGLE_INSTANCE); let mut context = TestContext::::new( setup.clock, setup.executor, @@ -853,7 +858,7 @@ mod manager_tests { // Test network partition scenarios // This simulates temporary network partitions by taking nodes offline and bringing them back async fn test_network_partition() { - let setup = setup_test(1); + let setup = setup_test(SINGLE_INSTANCE); let mut context = TestContext::::new( setup.clock, setup.executor, @@ -885,7 +890,7 @@ mod manager_tests { // This is different compared to network partition because here, messages are delayed instead // of dropped. async fn test_late_initialization() { - let setup = setup_test(1); + let setup = setup_test(SINGLE_INSTANCE); let initialization_delays = HashMap::from([ (OperatorId(2), Duration::from_secs(3)), // Middle of round 2 @@ -914,7 +919,7 @@ mod manager_tests { // all but one operator offline, forcing round changes. We advance the slot to trigger // cleanup and verify the instance survives to reach round 10. async fn test_committee_can_reach_late_rounds() { - let setup = setup_test(1); + let setup = setup_test(SINGLE_INSTANCE); let clock = setup.clock.clone(); let mut context = TestContext::::new( setup.clock, @@ -935,14 +940,19 @@ mod manager_tests { // Advance through multiple slots while QBFT progresses // This triggers cleanup logic which should NOT remove the active instance - for slot in 1..=50 { + for slot in 1..=25 { clock.set_slot(slot); tokio::time::sleep(slot_duration).await; - // At slot 22 (256 seconds = 16s + 240s), we should be around round 10 - // Rounds 1-8: 16s, Rounds 9-10: 240s = 256s total + // Round timeout calculation: + // - Rounds 1-8: 2s each = 16s total + // - Round 9: 120s (ends at 136s) + // - Round 10: 120s (ends at 256s ≈ 21.3 slots) + // At slot 22 (264s), we're in round 11, verifying the instance + // survived past round 10 as required for max_round=12 configuration if slot == 22 { - // Bring operators back online during round 10 to allow consensus + // Bring operators back online to allow consensus after verifying + // the instance survived past round 10 context.set_operators_online(&[2, 3, 4]); break; } @@ -952,6 +962,389 @@ mod manager_tests { // survived cleanup and was able to reach round 10 context.verify_consensus().await; } + + #[tokio::test(start_paused = true)] + // Test that cleanup uses beacon chain deadlines, not slot-based timeouts + // This verifies instances with longer deadlines survive past old 2-slot timeout + async fn test_cleanup_removes_only_expired_instances() { + // SETUP: Create instance at slot 1 with beacon chain deadline = 63 (end of epoch E+1) + // Under old system, would be cleaned at slot 3 (slot + 2) + const OLD_CLEANUP_SLOT: u64 = 3; + const BEACON_DEADLINE_SLOT: u64 = 63; + const SLOT_AFTER_DEADLINE: u64 = 64; + const EXPECTED_INSTANCES_BEFORE_DEADLINE: usize = 1; + const EXPECTED_INSTANCES_AFTER_DEADLINE: usize = 0; + + let setup = setup_test(SINGLE_INSTANCE); + let clock = setup.clock.clone(); + let slot_duration = Duration::from_secs(12); + + let context = TestContext::::new( + setup.clock, + setup.executor, + CommitteeSize::Four, + setup.all_data, + ) + .await; + + // Keep operators offline to prevent completion so we can test deadline-based cleanup + context.set_operators_offline(&[1, 2, 3, 4]); + let manager = context.tester.managers.get(&OperatorId(1)).unwrap(); + + // EXECUTE: Advance past old 2-slot deadline to slot 3 + for slot in 1..=OLD_CLEANUP_SLOT { + clock.set_slot(slot); + sleep(slot_duration).await; + } + sleep(Duration::from_millis(100)).await; + + // ASSERT: Instance should still exist (new deadline is 63, not 3) + assert_eq!( + manager.beacon_vote_instances.len(), + EXPECTED_INSTANCES_BEFORE_DEADLINE, + "Instance should survive past old slot {} cleanup with new beacon deadline of {}", + OLD_CLEANUP_SLOT, + BEACON_DEADLINE_SLOT + ); + + // EXECUTE: Advance past actual beacon chain deadline + for slot in (OLD_CLEANUP_SLOT + 1)..=SLOT_AFTER_DEADLINE { + clock.set_slot(slot); + tokio::time::sleep(slot_duration).await; + } + tokio::time::sleep(Duration::from_millis(100)).await; + + // ASSERT: Instance should now be cleaned after its beacon chain deadline + assert_eq!( + manager.beacon_vote_instances.len(), + EXPECTED_INSTANCES_AFTER_DEADLINE, + "Instance should be cleaned after beacon deadline {}", + BEACON_DEADLINE_SLOT + ); + } + + #[tokio::test] + // Test that instance completing successfully is cleaned immediately via completion notification + // Verifies that completion notification cleanup happens before deadline-based cleanup + async fn test_instance_completion_notification() { + // SETUP: Create instance at slot 0 with beacon chain deadline = 63 + // All operators online so consensus completes quickly + const EXPECTED_INSTANCES_AFTER_COMPLETION: usize = 0; + const CONSENSUS_COMPLETION_TIME: Duration = Duration::from_millis(100); + + let setup = setup_test(SINGLE_INSTANCE); + let context = TestContext::::new( + setup.clock, + setup.executor, + CommitteeSize::Four, + setup.all_data, + ) + .await; + + let manager = context.tester.managers.get(&OperatorId(1)).unwrap(); + + // EXECUTE: Wait for consensus to complete + tokio::time::sleep(CONSENSUS_COMPLETION_TIME).await; + + // ASSERT: Instance should be cleaned via completion notification, not deadline + assert_eq!( + manager.beacon_vote_instances.len(), + EXPECTED_INSTANCES_AFTER_COMPLETION, + "Instance should be cleaned immediately after completion via notification, not waiting for deadline" + ); + } + + #[test] + // Test deadline calculation for Committee role + fn test_committee_role_deadline_calculation() { + use ssv_types::msgid::Role; + use types::Slot; + + // SETUP: Define test constants for Committee role (deadline = end of epoch E+1) + const SLOTS_PER_EPOCH: u64 = 32; + const SLOT_ZERO: u64 = 0; + const SLOT_IN_EPOCH_ONE: u64 = 32; + const EXPECTED_DEADLINE_EPOCH_ZERO: u64 = 63; // (epoch 0 + 2) * 32 - 1 + const EXPECTED_DEADLINE_EPOCH_ONE: u64 = 95; // (epoch 1 + 2) * 32 - 1 + + // EXECUTE: Calculate deadline for slot 0 + let deadline = super::super::calculate_deadline( + Role::Committee, + Slot::new(SLOT_ZERO), + SLOTS_PER_EPOCH, + ); + + // ASSERT: Committee at slot 0 should have deadline per EIP-7045 + assert_eq!( + deadline, + Slot::new(EXPECTED_DEADLINE_EPOCH_ZERO), + "Committee at slot {} (epoch 0) should have deadline {} per EIP-7045", + SLOT_ZERO, + EXPECTED_DEADLINE_EPOCH_ZERO + ); + + // EXECUTE: Calculate deadline for slot 32 (epoch 1) + let deadline = super::super::calculate_deadline( + Role::Committee, + Slot::new(SLOT_IN_EPOCH_ONE), + SLOTS_PER_EPOCH, + ); + + // ASSERT: Committee at slot 32 should have correct deadline + assert_eq!( + deadline, + Slot::new(EXPECTED_DEADLINE_EPOCH_ONE), + "Committee at slot {} (epoch 1) should have deadline {}", + SLOT_IN_EPOCH_ONE, + EXPECTED_DEADLINE_EPOCH_ONE + ); + } + + #[test] + // Test deadline calculation for Aggregator role + fn test_aggregator_role_deadline_calculation() { + use ssv_types::msgid::Role; + use types::Slot; + + // SETUP: Define test constants for Aggregator role (same as Committee) + const SLOTS_PER_EPOCH: u64 = 32; + const SLOT_ZERO: u64 = 0; + const EXPECTED_DEADLINE: u64 = 63; // (epoch 0 + 2) * 32 - 1 + + // EXECUTE: Calculate deadline + let deadline = super::super::calculate_deadline( + Role::Aggregator, + Slot::new(SLOT_ZERO), + SLOTS_PER_EPOCH, + ); + + // ASSERT: Aggregator should have same deadline as Committee + assert_eq!( + deadline, + Slot::new(EXPECTED_DEADLINE), + "Aggregator at slot {} should have same deadline as Committee", + SLOT_ZERO + ); + } + + #[test] + // Test deadline calculation for Proposer role + fn test_proposer_role_deadline_calculation() { + use ssv_types::msgid::Role; + use types::Slot; + + // SETUP: Define test constants for Proposer role (deadline = same slot) + const SLOTS_PER_EPOCH: u64 = 32; + const SLOT_ZERO: u64 = 0; + const SLOT_ARBITRARY: u64 = 100; + + // EXECUTE: Calculate deadline for slot 0 + let deadline = + super::super::calculate_deadline(Role::Proposer, Slot::new(SLOT_ZERO), SLOTS_PER_EPOCH); + + // ASSERT: Proposer deadline should be same slot for immediate inclusion + assert_eq!( + deadline, + Slot::new(SLOT_ZERO), + "Proposer deadline should be same slot for immediate inclusion" + ); + + // EXECUTE: Calculate deadline for arbitrary slot + let deadline = super::super::calculate_deadline( + Role::Proposer, + Slot::new(SLOT_ARBITRARY), + SLOTS_PER_EPOCH, + ); + + // ASSERT: Proposer at arbitrary slot should have deadline at that slot + assert_eq!( + deadline, + Slot::new(SLOT_ARBITRARY), + "Proposer at slot {} should have deadline {}", + SLOT_ARBITRARY, + SLOT_ARBITRARY + ); + } + + #[test] + // Test deadline calculation for SyncCommittee role + fn test_sync_committee_role_deadline_calculation() { + use ssv_types::msgid::Role; + use types::Slot; + + // SETUP: Define test constants for SyncCommittee role (deadline = same slot) + const SLOTS_PER_EPOCH: u64 = 32; + const SLOT_MID_EPOCH: u64 = 50; + + // EXECUTE: Calculate deadline + let deadline = super::super::calculate_deadline( + Role::SyncCommittee, + Slot::new(SLOT_MID_EPOCH), + SLOTS_PER_EPOCH, + ); + + // ASSERT: SyncCommittee deadline should be same slot for immediate inclusion + assert_eq!( + deadline, + Slot::new(SLOT_MID_EPOCH), + "SyncCommittee deadline should be same slot for immediate inclusion" + ); + } + + #[test] + // Test deadline calculation for VoluntaryExit role + fn test_voluntary_exit_role_deadline_calculation() { + use ssv_types::msgid::Role; + use types::Slot; + + // SETUP: Define test constants for VoluntaryExit role (deadline = slot + epoch) + const SLOTS_PER_EPOCH: u64 = 32; + const SLOT_TEN: u64 = 10; + const EXPECTED_DEADLINE: u64 = 42; // 10 + 32 + + // EXECUTE: Calculate deadline + let deadline = super::super::calculate_deadline( + Role::VoluntaryExit, + Slot::new(SLOT_TEN), + SLOTS_PER_EPOCH, + ); + + // ASSERT: VoluntaryExit should have one epoch to complete + assert_eq!( + deadline, + Slot::new(EXPECTED_DEADLINE), + "VoluntaryExit at slot {} should have one epoch to complete", + SLOT_TEN + ); + } + + #[test] + // Test deadline calculation for ValidatorRegistration role + fn test_validator_registration_role_deadline_calculation() { + use ssv_types::msgid::Role; + use types::Slot; + + // SETUP: Define test constants for ValidatorRegistration role (deadline = slot + epoch) + const SLOTS_PER_EPOCH: u64 = 32; + const SLOT_ZERO: u64 = 0; + const EXPECTED_DEADLINE: u64 = 32; // 0 + 32 + + // EXECUTE: Calculate deadline + let deadline = super::super::calculate_deadline( + Role::ValidatorRegistration, + Slot::new(SLOT_ZERO), + SLOTS_PER_EPOCH, + ); + + // ASSERT: ValidatorRegistration should have one epoch to complete + assert_eq!( + deadline, + Slot::new(EXPECTED_DEADLINE), + "ValidatorRegistration at slot {} should have one epoch to complete", + SLOT_ZERO + ); + } + + #[tokio::test(start_paused = true)] + // Test instance cleanup across epoch boundary + // Verifies that deadline calculation and cleanup work correctly when crossing epochs + async fn test_cleanup_across_epoch_boundary() { + // SETUP: Create instance at slot 30 (near end of epoch 0) with deadline at slot 63 + // Epoch 0 ends at slot 31, epoch 1 spans slots 32-63 + // Deadline for Committee at slot 30: (epoch 0 + 2) * 32 - 1 = 63 + const INSTANCE_SLOT: usize = 30; + const EPOCH_BOUNDARY_SLOT: u64 = 32; + const DEADLINE_SLOT: u64 = 63; + const SLOT_AFTER_DEADLINE: u64 = 64; + const SLOT_DURATION_SECS: u64 = 12; + const EXPECTED_INSTANCES_BEFORE_DEADLINE: usize = 1; + const EXPECTED_INSTANCES_AFTER_DEADLINE: usize = 0; + const STABILIZATION_DELAY_MS: u64 = 100; + + let setup = setup_test(0); + let clock = setup.clock.clone(); + let test_data = vec![generate_test_data(INSTANCE_SLOT)]; + + let context = TestContext::::new( + setup.clock, + setup.executor, + CommitteeSize::Four, + test_data, + ) + .await; + + // Keep operators offline to prevent completion so we can test deadline-based cleanup + context.set_operators_offline(&[1, 2, 3, 4]); + + let manager = context.tester.managers.get(&OperatorId(1)).unwrap(); + let slot_duration = Duration::from_secs(SLOT_DURATION_SECS); + + // EXECUTE: Advance through epoch boundary to slot 32 + for slot in 31..=EPOCH_BOUNDARY_SLOT { + clock.set_slot(slot); + tokio::time::sleep(slot_duration).await; + } + tokio::time::sleep(Duration::from_millis(STABILIZATION_DELAY_MS)).await; + + // ASSERT: Instance should survive across epoch boundary + assert_eq!( + manager.beacon_vote_instances.len(), + EXPECTED_INSTANCES_BEFORE_DEADLINE, + "Instance should survive across epoch boundary" + ); + + // EXECUTE: Advance to slot 64 (past deadline of 63) + for slot in (EPOCH_BOUNDARY_SLOT + 1)..=SLOT_AFTER_DEADLINE { + clock.set_slot(slot); + tokio::time::sleep(slot_duration).await; + } + tokio::time::sleep(Duration::from_millis(STABILIZATION_DELAY_MS)).await; + + // ASSERT: Instance should be cleaned after deadline + assert_eq!( + manager.beacon_vote_instances.len(), + EXPECTED_INSTANCES_AFTER_DEADLINE, + "Instance should be cleaned after deadline {}", + DEADLINE_SLOT + ); + } + + #[tokio::test] + // Test multiple instances completing in rapid succession + // Verifies completion notification channel handles burst of completions + async fn test_multiple_instances_completing_rapidly() { + // SETUP: Create 5 instances that will all complete rapidly + const EXPECTED_INSTANCES_AFTER_COMPLETION: usize = 0; + const CLEANUP_PROCESSING_TIME: u64 = 200; + + let setup = setup_test(FIVE_INSTANCES); + + let context = TestContext::::new( + setup.clock, + setup.executor, + CommitteeSize::Four, + setup.all_data, + ) + .await; + + // All operators online - instances should complete quickly + // No artificial delays, all instances racing to consensus + + // EXECUTE: Wait for all instances to complete and cleanup to process + // This implicitly tests that: + // 1. All completion notifications are sent + // 2. Channel doesn't saturate or drop notifications + // 3. Cleanup processes all notifications correctly + tokio::time::sleep(Duration::from_millis(CLEANUP_PROCESSING_TIME)).await; + + // ASSERT: All instances should be cleaned after rapid completion + let manager = context.tester.managers.get(&OperatorId(1)).unwrap(); + assert_eq!( + manager.beacon_vote_instances.len(), + EXPECTED_INSTANCES_AFTER_COMPLETION, + "All instances should be cleaned after rapid completion" + ); + } } // very important: set paused to true for deterministic timer From c1f7ff6c512a9c9e9373bb582092a35fad2835d8 Mon Sep 17 00:00:00 2001 From: diego Date: Wed, 29 Oct 2025 23:46:39 +0100 Subject: [PATCH 4/5] docs: add mandatory test structure guidelines MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add Setup/Execute/Assert pattern as mandatory requirement for all tests: CLAUDE.md: - Add "Writing Tests" section requiring tester-subagent usage before writing any test code - Ensures consistent test structure across codebase tester-subagent.md: - Add section 4 "Test Structure Requirements" with generic example - Define 4 mandatory requirements: section comments, named constants, assertion messages, one behavior per test - List anti-patterns to reject (no section comments, magic numbers, silent assertions, mixed code) - Update description to emphasize "MUST BE USED before writing ANY test code" This ensures all new tests follow readable, maintainable patterns. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- .claude/agents/tester-subagent.md | 42 +++++++++++++++++++++++++++++-- CLAUDE.md | 12 +++++++++ 2 files changed, 52 insertions(+), 2 deletions(-) diff --git a/.claude/agents/tester-subagent.md b/.claude/agents/tester-subagent.md index eabfc3720..9cf2c6ed2 100644 --- a/.claude/agents/tester-subagent.md +++ b/.claude/agents/tester-subagent.md @@ -1,11 +1,13 @@ --- name: tester-subagent -description: Expert test creation specialist for the Anchor project with deep knowledge of all crates, especially QBFT. Specializes in bug reproduction tests, message construction, compilation debugging, and comprehensive test coverage patterns specific to Anchor's architecture. Use immediately when creating any tests, especially for bug reproduction or QBFT consensus scenarios. +description: Expert test creation specialist for the Anchor project with deep knowledge of all crates, especially QBFT. Specializes in bug reproduction tests, message construction, compilation debugging, and comprehensive test coverage patterns specific to Anchor's architecture. MUST BE USED before writing ANY test code (except trivial one-liners). tools: Read, Write, Edit, MultiEdit, Glob, Grep, Bash, Task --- You are an expert test creation specialist for the Anchor SSV implementation with comprehensive knowledge of the entire codebase architecture, testing patterns, and bug reproduction methodology. +**CRITICAL: You are invoked because the main agent MUST use you before writing test code. This is mandatory, not optional.** + ## Core Expertise Areas ### 1. Anchor Codebase Architecture Knowledge @@ -79,7 +81,43 @@ fn test_bug_reproduction() { - Include references to specific line numbers where bugs exist - Test the exact scenario that exposes the vulnerability -### 4. Test Categories and Patterns +### 4. Test Structure Requirements + +**EVERY test you write MUST follow this structure:** + +```rust +#[test] +fn test_something() { + // SETUP: Prepare inputs and expected values + const EXPECTED_VALUE: Type = /* value */; + let input = /* prepare input */; + + // EXECUTE: Call the code under test + let result = /* call function/method */; + + // ASSERT: Verify the result + assert_eq!(result, EXPECTED_VALUE, "describe what must be true and why"); +} +``` + +**Requirements you MUST enforce:** + +1. **Section comments** - `// SETUP`, `// EXECUTE`, `// ASSERT` are mandatory +2. **Named constants** - No magic numbers + - ✅ `const RETRY_COUNT: usize = 3;` + - ❌ `let x = retry(3);` +3. **Assertion messages** - Every assert needs a descriptive message + - ✅ `assert!(valid, "Input must be valid for processing");` + - ❌ `assert!(valid);` +4. **One behavior per test** - Each test verifies one specific thing + +**Anti-patterns you MUST reject:** +- ❌ No section comments +- ❌ Bare numbers: `setup_test(1)` instead of `const SINGLE_INSTANCE: usize = 1` +- ❌ Silent assertions: `assert_eq!(x, y)` without explanation +- ❌ Mixed setup/execute/assert code + +### 5. Test Categories and Patterns **Unit Tests:** - Location: Same file as code being tested in `#[cfg(test)]` modules diff --git a/CLAUDE.md b/CLAUDE.md index f98ac307e..1d2f53ddb 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -283,6 +283,18 @@ Anchor uses two types of test fixtures for database testing: - Uses temporary files that are automatically cleaned up - Data persists until TempDir is dropped +### Writing Tests + +**Before writing ANY test code, you MUST invoke tester-subagent.** + +The tester-subagent ensures: +- Correct test structure (Setup/Execute/Assert pattern) +- No magic numbers (use named constants) +- Proper assertion messages +- Correct API usage patterns for Anchor + +Do not write test code directly - always use the agent first. + ## Universal Code Quality Principles All agents and contributors must follow these fundamental principles: From 739902c0e0f5d932e93a8da8647de2dcca0b8f7f Mon Sep 17 00:00:00 2001 From: diego Date: Wed, 29 Oct 2025 23:47:10 +0100 Subject: [PATCH 5/5] docs: improve qbft_manager deadline calculation comments MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add detailed explanation for Committee/Aggregator deadline calculation: - Document the calculation formula: (E+2) * slots_per_epoch - 1 - Explain that this represents the last slot for on-chain inclusion - Reference EIP-7045 specification Enhance ManagedInstance documentation: - Convert to doc comment for better API documentation - Clarify that it tracks both channel and beacon chain deadline - Explain its role in the cleanup task 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- anchor/qbft_manager/src/lib.rs | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/anchor/qbft_manager/src/lib.rs b/anchor/qbft_manager/src/lib.rs index 2ef98d79d..fcd8300cd 100644 --- a/anchor/qbft_manager/src/lib.rs +++ b/anchor/qbft_manager/src/lib.rs @@ -45,6 +45,10 @@ fn calculate_deadline(role: Role, slot: types::Slot, slots_per_epoch: u64) -> ty Role::Committee | Role::Aggregator => { // Attestations can be included until end of next epoch (epoch E+1) // Per EIP-7045: attestation from epoch E valid until end of epoch E+1 + // + // Calculation for duty at slot S in epoch E: + // - Epoch E+1 ends at slot: (E+2) * slots_per_epoch - 1 + // - This is the last slot where the attestation can be included on-chain let epoch = slot.epoch(slots_per_epoch); types::Slot::new((epoch.as_u64() + 2) * slots_per_epoch - 1) } @@ -115,7 +119,10 @@ pub struct QbftInitialization { on_completed: oneshot::Sender>, } -// Manager's bookkeeping for an instance +/// Manager's bookkeeping for a QBFT instance. +/// +/// Tracks the communication channel for sending messages to the instance +/// and the beacon chain inclusion deadline used by the cleanup task. pub struct ManagedInstance { sender: UnboundedSender>, deadline: types::Slot,