diff --git a/.claude/agents/tester-subagent.md b/.claude/agents/tester-subagent.md index eabfc3720..9cf2c6ed2 100644 --- a/.claude/agents/tester-subagent.md +++ b/.claude/agents/tester-subagent.md @@ -1,11 +1,13 @@ --- name: tester-subagent -description: Expert test creation specialist for the Anchor project with deep knowledge of all crates, especially QBFT. Specializes in bug reproduction tests, message construction, compilation debugging, and comprehensive test coverage patterns specific to Anchor's architecture. Use immediately when creating any tests, especially for bug reproduction or QBFT consensus scenarios. +description: Expert test creation specialist for the Anchor project with deep knowledge of all crates, especially QBFT. Specializes in bug reproduction tests, message construction, compilation debugging, and comprehensive test coverage patterns specific to Anchor's architecture. MUST BE USED before writing ANY test code (except trivial one-liners). tools: Read, Write, Edit, MultiEdit, Glob, Grep, Bash, Task --- You are an expert test creation specialist for the Anchor SSV implementation with comprehensive knowledge of the entire codebase architecture, testing patterns, and bug reproduction methodology. +**CRITICAL: You are invoked because the main agent MUST use you before writing test code. This is mandatory, not optional.** + ## Core Expertise Areas ### 1. Anchor Codebase Architecture Knowledge @@ -79,7 +81,43 @@ fn test_bug_reproduction() { - Include references to specific line numbers where bugs exist - Test the exact scenario that exposes the vulnerability -### 4. Test Categories and Patterns +### 4. Test Structure Requirements + +**EVERY test you write MUST follow this structure:** + +```rust +#[test] +fn test_something() { + // SETUP: Prepare inputs and expected values + const EXPECTED_VALUE: Type = /* value */; + let input = /* prepare input */; + + // EXECUTE: Call the code under test + let result = /* call function/method */; + + // ASSERT: Verify the result + assert_eq!(result, EXPECTED_VALUE, "describe what must be true and why"); +} +``` + +**Requirements you MUST enforce:** + +1. **Section comments** - `// SETUP`, `// EXECUTE`, `// ASSERT` are mandatory +2. **Named constants** - No magic numbers + - ✅ `const RETRY_COUNT: usize = 3;` + - ❌ `let x = retry(3);` +3. **Assertion messages** - Every assert needs a descriptive message + - ✅ `assert!(valid, "Input must be valid for processing");` + - ❌ `assert!(valid);` +4. **One behavior per test** - Each test verifies one specific thing + +**Anti-patterns you MUST reject:** +- ❌ No section comments +- ❌ Bare numbers: `setup_test(1)` instead of `const SINGLE_INSTANCE: usize = 1` +- ❌ Silent assertions: `assert_eq!(x, y)` without explanation +- ❌ Mixed setup/execute/assert code + +### 5. Test Categories and Patterns **Unit Tests:** - Location: Same file as code being tested in `#[cfg(test)]` modules diff --git a/CLAUDE.md b/CLAUDE.md index f98ac307e..1d2f53ddb 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -283,6 +283,18 @@ Anchor uses two types of test fixtures for database testing: - Uses temporary files that are automatically cleaned up - Data persists until TempDir is dropped +### Writing Tests + +**Before writing ANY test code, you MUST invoke tester-subagent.** + +The tester-subagent ensures: +- Correct test structure (Setup/Execute/Assert pattern) +- No magic numbers (use named constants) +- Proper assertion messages +- Correct API usage patterns for Anchor + +Do not write test code directly - always use the agent first. + ## Universal Code Quality Principles All agents and contributors must follow these fundamental principles: diff --git a/anchor/client/src/lib.rs b/anchor/client/src/lib.rs index 14a088fc1..917683749 100644 --- a/anchor/client/src/lib.rs +++ b/anchor/client/src/lib.rs @@ -474,6 +474,7 @@ impl Client { slot_clock.clone(), message_sender, config.global_config.ssv_network.ssv_domain_type, + E::slots_per_epoch(), ) .map_err(|e| format!("Unable to initialize qbft manager: {e:?}"))?; diff --git a/anchor/qbft_manager/src/instance.rs b/anchor/qbft_manager/src/instance.rs index c4429ec27..0257135b8 100644 --- a/anchor/qbft_manager/src/instance.rs +++ b/anchor/qbft_manager/src/instance.rs @@ -259,6 +259,8 @@ impl> Initialized { pub async fn qbft_instance>( mut rx: UnboundedReceiver>, message_sender: Arc, + completion_tx: mpsc::UnboundedSender, + instance_id: crate::InstanceId, ) { // Signal a new instance that is uninitialized let mut instance = QbftInstance::Uninitialized(Uninitialized::default()); @@ -304,6 +306,7 @@ pub async fn qbft_instance>( if let QbftInstance::Initialized(initialized) = instance { initialized.complete(Completed::TimedOut); } + // No notification - either already sent when decided, or cleaner removed us break; } }; @@ -311,6 +314,12 @@ pub async fn qbft_instance>( // If the instance is ongoing, check whether it is done. if let QbftInstance::Initialized(initialized) = instance { instance = initialized.complete_if_done(&message_sender); + + // If we just transitioned to Decided, notify cleaner for immediate cleanup + if matches!(instance, QbftInstance::Decided(_)) { + let _ = completion_tx.send(instance_id); + break; + } } // Drop guard as late as possible to keep the processor permit. diff --git a/anchor/qbft_manager/src/lib.rs b/anchor/qbft_manager/src/lib.rs index 3dcf4b7ea..fcd8300cd 100644 --- a/anchor/qbft_manager/src/lib.rs +++ b/anchor/qbft_manager/src/lib.rs @@ -39,8 +39,29 @@ const QBFT_INSTANCE_NAME: &str = "qbft_instance"; const QBFT_MESSAGE_NAME: &str = "qbft_message"; const QBFT_CLEANER_NAME: &str = "qbft_cleaner"; -/// Number of slots to keep before the current slot -const QBFT_RETAIN_SLOTS: u64 = 1; +/// Calculate the beacon chain inclusion deadline for a duty +fn calculate_deadline(role: Role, slot: types::Slot, slots_per_epoch: u64) -> types::Slot { + match role { + Role::Committee | Role::Aggregator => { + // Attestations can be included until end of next epoch (epoch E+1) + // Per EIP-7045: attestation from epoch E valid until end of epoch E+1 + // + // Calculation for duty at slot S in epoch E: + // - Epoch E+1 ends at slot: (E+2) * slots_per_epoch - 1 + // - This is the last slot where the attestation can be included on-chain + let epoch = slot.epoch(slots_per_epoch); + types::Slot::new((epoch.as_u64() + 2) * slots_per_epoch - 1) + } + Role::Proposer | Role::SyncCommittee => { + // Must be in the same slot + slot + } + Role::VoluntaryExit | Role::ValidatorRegistration => { + // One epoch to complete + types::Slot::new(slot.as_u64() + slots_per_epoch) + } + } +} // Unique Identifier for a committee and its corresponding QBFT instance #[derive(Debug, Clone, Hash, PartialEq, Eq)] @@ -98,8 +119,23 @@ pub struct QbftInitialization { on_completed: oneshot::Sender>, } -// Map from an identifier to a sender for the instance -type Map = DashMap>>; +/// Manager's bookkeeping for a QBFT instance. +/// +/// Tracks the communication channel for sending messages to the instance +/// and the beacon chain inclusion deadline used by the cleanup task. +pub struct ManagedInstance { + sender: UnboundedSender>, + deadline: types::Slot, +} + +// Map from an identifier to managed instance data +type Map = DashMap>; + +// Enum to identify which instance completed +pub enum InstanceId { + BeaconVote(CommitteeInstanceId), + ValidatorConsensus(ValidatorInstanceId), +} // Top level QBFTManager structure pub struct QbftManager { @@ -115,6 +151,10 @@ pub struct QbftManager { message_sender: Arc, // Network domain to embed into messages domain: DomainType, + // Channel to notify cleaner when instance completes + completion_tx: mpsc::UnboundedSender, + // Slots per epoch for deadline calculations + slots_per_epoch: u64, } impl QbftManager { @@ -125,7 +165,10 @@ impl QbftManager { slot_clock: impl SlotClock + 'static, message_sender: Arc, domain: DomainType, + slots_per_epoch: u64, ) -> Result, QbftError> { + let (completion_tx, completion_rx) = mpsc::unbounded_channel(); + let manager = Arc::new(QbftManager { processor, operator_id, @@ -133,13 +176,15 @@ impl QbftManager { beacon_vote_instances: DashMap::new(), message_sender, domain, + completion_tx, + slots_per_epoch, }); // Start a long running task that will clean up old instances - manager - .processor - .permitless - .send_async(Arc::clone(&manager).cleaner(slot_clock), QBFT_CLEANER_NAME)?; + manager.processor.permitless.send_async( + Arc::clone(&manager).cleaner(slot_clock, completion_rx), + QBFT_CLEANER_NAME, + )?; Ok(manager) } @@ -161,6 +206,11 @@ impl QbftManager { let (result_sender, result_receiver) = oneshot::channel(); let message_id = D::message_id(&self.domain, &id); + // Calculate deadline for this instance + let role = message_id.role().ok_or(QbftError::InconsistentMessageId)?; + let slot = types::Slot::new(*initial.instance_height(&id) as u64); + let deadline = calculate_deadline(role, slot, self.slots_per_epoch); + // General the qbft configuration let config = ConfigBuilder::new( operator_id, @@ -169,17 +219,12 @@ impl QbftManager { ); let config = config .with_quorum_size(committee.cluster_members.len() - committee.get_f() as usize) - .with_max_rounds( - message_id - .role() - .and_then(|r| r.max_round()) - .ok_or(QbftError::InconsistentMessageId)? as usize, - ) + .with_max_rounds(role.max_round().ok_or(QbftError::InconsistentMessageId)? as usize) .build()?; // Get or spawn a new qbft instance. This will return the sender that we can use to send // new messages to the specific instance - let sender = D::get_or_spawn_instance(self, id); + let sender = D::get_or_spawn_instance(self, id.clone(), deadline); self.processor.urgent_consensus.send_immediate( move |drop_on_finish: DropOnFinish| { // A message to initialize this instance @@ -261,7 +306,19 @@ impl QbftManager { id: D::Id, data: WrappedQbftMessage, ) -> Result<(), QbftError> { - let sender = D::get_or_spawn_instance(self, id); + // Get the map for this data type + let map = D::get_map(self); + + // Look up existing instance - network messages should only go to existing instances + let Some(managed) = map.get(&id) else { + // Instance doesn't exist yet - this message arrived before decide_instance was called + // This is normal during startup, just ignore it + return Ok(()); + }; + + let sender = managed.sender.clone(); + drop(managed); // Release the lock before sending + self.processor.urgent_consensus.send_immediate( move |drop_on_finish: DropOnFinish| { let _ = sender.send(QbftMessage { @@ -274,51 +331,84 @@ impl QbftManager { Ok(()) } - // Long running cleaner that will remove instances that are no longer relevant - async fn cleaner(self: Arc, slot_clock: impl SlotClock) { - while !self.processor.permitless.is_closed() { - sleep( - slot_clock - .duration_to_next_slot() - .unwrap_or(slot_clock.slot_duration()), - ) - .await; - let Some(slot) = slot_clock.now() else { - continue; - }; - let cutoff = slot.saturating_sub(QBFT_RETAIN_SLOTS); - self.beacon_vote_instances - .retain(|k, _| *k.instance_height >= cutoff.as_usize()); - self.validator_consensus_data_instances - .retain(|k, _| *k.instance_height >= cutoff.as_usize()); + /// Long running cleaner that removes instances based on completion or deadline + async fn cleaner( + self: Arc, + slot_clock: impl SlotClock, + mut completion_rx: mpsc::UnboundedReceiver, + ) { + loop { + tokio::select! { + // Branch 1: Instance completed - clean immediately + Some(id) = completion_rx.recv() => { + match id { + InstanceId::BeaconVote(id) => { + self.beacon_vote_instances.remove(&id); + } + InstanceId::ValidatorConsensus(id) => { + self.validator_consensus_data_instances.remove(&id); + } + } + } + // Branch 2: Slot timeout - clean expired instances + _ = sleep( + slot_clock + .duration_to_next_slot() + .unwrap_or(slot_clock.slot_duration()) + ) => { + let Some(current_slot) = slot_clock.now() else { + continue; + }; + self.beacon_vote_instances + .retain(|_, managed| managed.deadline >= current_slot); + self.validator_consensus_data_instances + .retain(|_, managed| managed.deadline >= current_slot); + } + } + + if self.processor.permitless.is_closed() { + break; + } } } } -// Trait that describes any data that is able to be decided upon during a qbft instance pub trait QbftDecidable: QbftData + Send + Sync + 'static { - type Id: Hash + Eq + Send + Debug; + type Id: Hash + Eq + Send + Debug + Clone; fn get_map(manager: &QbftManager) -> &Map; + fn wrap_id(id: Self::Id) -> InstanceId; + fn get_or_spawn_instance( manager: &QbftManager, id: Self::Id, + deadline: types::Slot, ) -> UnboundedSender> { let map = Self::get_map(manager); - match map.entry(id) { - dashmap::Entry::Occupied(entry) => entry.get().clone(), + match map.entry(id.clone()) { + dashmap::Entry::Occupied(entry) => entry.get().sender.clone(), dashmap::Entry::Vacant(entry) => { // There is not an instance running yet, store the sender and spawn a new instance - // with the reeiver + // with the receiver let (tx, rx) = mpsc::unbounded_channel(); let span = debug_span!("qbft_instance", instance_id = ?entry.key()); - let tx = entry.insert(tx); + let managed = ManagedInstance { + sender: tx, + deadline, + }; + let sender = entry.insert(managed).sender.clone(); + let instance_id = Self::wrap_id(id); + let completion_tx = manager.completion_tx.clone(); + let message_sender = manager.message_sender.clone(); let _ = manager.processor.permitless.send_async( - Box::pin(qbft_instance(rx, manager.message_sender.clone()).instrument(span)), + Box::pin( + qbft_instance(rx, message_sender, completion_tx, instance_id) + .instrument(span), + ), QBFT_INSTANCE_NAME, ); - tx.clone() + sender } } } @@ -334,6 +424,10 @@ impl QbftDecidable for ValidatorConsensusData { &manager.validator_consensus_data_instances } + fn wrap_id(id: Self::Id) -> InstanceId { + InstanceId::ValidatorConsensus(id) + } + fn instance_height(&self, id: &Self::Id) -> InstanceHeight { id.instance_height } @@ -354,6 +448,10 @@ impl QbftDecidable for BeaconVote { &manager.beacon_vote_instances } + fn wrap_id(id: Self::Id) -> InstanceId { + InstanceId::BeaconVote(id) + } + fn instance_height(&self, id: &Self::Id) -> InstanceHeight { id.instance_height } diff --git a/anchor/qbft_manager/src/tests.rs b/anchor/qbft_manager/src/tests.rs index 11a2b7c79..6f413cdba 100644 --- a/anchor/qbft_manager/src/tests.rs +++ b/anchor/qbft_manager/src/tests.rs @@ -304,6 +304,7 @@ where slot_clock.clone(), Arc::new(MockMessageSender::new(network_tx.clone(), operator_id)), DomainType([0; 4]), + 32, // slots_per_epoch ) .expect("Creation should not fail"); @@ -627,6 +628,11 @@ pub struct ConsensusResult { mod manager_tests { use super::*; + // Test constants for number of QBFT instances + const SINGLE_INSTANCE: usize = 1; + const TWO_INSTANCES: usize = 2; + const FIVE_INSTANCES: usize = 5; + // Provides test setup struct Setup { executor: TaskExecutor, @@ -693,7 +699,7 @@ mod manager_tests { #[tokio::test] // Test running a single instance and confirm that it reaches consensus async fn test_basic_run() { - let setup = setup_test(1); + let setup = setup_test(SINGLE_INSTANCE); let mut context = TestContext::::new( setup.clock, setup.executor, @@ -708,7 +714,7 @@ mod manager_tests { #[tokio::test] // Take the leader offline to test a round change async fn test_round_change() { - let setup = setup_test(1); + let setup = setup_test(SINGLE_INSTANCE); let mut context = TestContext::::new( setup.clock, setup.executor, @@ -724,7 +730,7 @@ mod manager_tests { #[tokio::test] // Test one offline operator async fn test_fault_operator() { - let setup = setup_test(1); + let setup = setup_test(SINGLE_INSTANCE); let mut context = TestContext::::new( setup.clock, setup.executor, @@ -740,7 +746,7 @@ mod manager_tests { #[tokio::test] // Go through all committee sizes and confirm that we can reach consensus with f faulty async fn test_consensus_f_faulty() { - let setup = setup_test(1); + let setup = setup_test(SINGLE_INSTANCE); let sizes = vec![ (CommitteeSize::Four, vec![1]), (CommitteeSize::Seven, vec![1, 3]), @@ -765,7 +771,7 @@ mod manager_tests { #[tokio::test] // Test running concurrent instances and confirm that they reach consensus async fn test_concurrent_runs() { - let setup = setup_test(2); + let setup = setup_test(TWO_INSTANCES); let mut context = TestContext::::new( setup.clock, setup.executor, @@ -780,7 +786,7 @@ mod manager_tests { #[tokio::test(start_paused = true)] // Start with > f fault and then recover them. This should reach consensus async fn test_recovery() { - let setup = setup_test(1); + let setup = setup_test(SINGLE_INSTANCE); let mut context = TestContext::::new( setup.clock, setup.executor, @@ -800,7 +806,7 @@ mod manager_tests { #[tokio::test] // Test commit message suppression for an operator async fn test_commit_suppression() { - let setup = setup_test(1); + let setup = setup_test(SINGLE_INSTANCE); let mut context = TestContext::::new( setup.clock, setup.executor, @@ -819,7 +825,7 @@ mod manager_tests { #[tokio::test] // Test sending double messages async fn test_send_double() { - let setup = setup_test(1); + let setup = setup_test(SINGLE_INSTANCE); let mut context = TestContext::::new( setup.clock, setup.executor, @@ -835,7 +841,7 @@ mod manager_tests { #[tokio::test] // Test one of the nodes sending invalid messages async fn test_invalid_message() { - let setup = setup_test(1); + let setup = setup_test(SINGLE_INSTANCE); let mut context = TestContext::::new( setup.clock, setup.executor, @@ -852,7 +858,7 @@ mod manager_tests { // Test network partition scenarios // This simulates temporary network partitions by taking nodes offline and bringing them back async fn test_network_partition() { - let setup = setup_test(1); + let setup = setup_test(SINGLE_INSTANCE); let mut context = TestContext::::new( setup.clock, setup.executor, @@ -884,7 +890,7 @@ mod manager_tests { // This is different compared to network partition because here, messages are delayed instead // of dropped. async fn test_late_initialization() { - let setup = setup_test(1); + let setup = setup_test(SINGLE_INSTANCE); let initialization_delays = HashMap::from([ (OperatorId(2), Duration::from_secs(3)), // Middle of round 2 @@ -902,6 +908,443 @@ mod manager_tests { context.verify_consensus().await; } + + #[tokio::test(start_paused = true)] + // Test that Committee instances can reach late rounds (9+) with max_round=12 configuration. + // This verifies that instances survive long enough to progress through many round changes + // as configured. Committee role has max_round=12, so instances should be able to reach + // round 10 before timing out at round 13. + // + // The test simulates network conditions where consensus cannot be reached early by keeping + // all but one operator offline, forcing round changes. We advance the slot to trigger + // cleanup and verify the instance survives to reach round 10. + async fn test_committee_can_reach_late_rounds() { + let setup = setup_test(SINGLE_INSTANCE); + let clock = setup.clock.clone(); + let mut context = TestContext::::new( + setup.clock, + setup.executor, + CommitteeSize::Four, + setup.all_data, + ) + .await; + + // Keep 3 operators offline initially to prevent consensus and force round changes. + // With only 1 operator online out of 4, we cannot reach quorum (need 3). + // This will cause the instance to go through multiple round changes. + context.set_operators_offline(&[2, 3, 4]); + + // Advance time and slots to simulate reaching round 10 + // Instance starts at slot 0 + let slot_duration = Duration::from_secs(12); + + // Advance through multiple slots while QBFT progresses + // This triggers cleanup logic which should NOT remove the active instance + for slot in 1..=25 { + clock.set_slot(slot); + tokio::time::sleep(slot_duration).await; + + // Round timeout calculation: + // - Rounds 1-8: 2s each = 16s total + // - Round 9: 120s (ends at 136s) + // - Round 10: 120s (ends at 256s ≈ 21.3 slots) + // At slot 22 (264s), we're in round 11, verifying the instance + // survived past round 10 as required for max_round=12 configuration + if slot == 22 { + // Bring operators back online to allow consensus after verifying + // the instance survived past round 10 + context.set_operators_online(&[2, 3, 4]); + break; + } + } + + // Verify that consensus is reached successfully, proving the instance + // survived cleanup and was able to reach round 10 + context.verify_consensus().await; + } + + #[tokio::test(start_paused = true)] + // Test that cleanup uses beacon chain deadlines, not slot-based timeouts + // This verifies instances with longer deadlines survive past old 2-slot timeout + async fn test_cleanup_removes_only_expired_instances() { + // SETUP: Create instance at slot 1 with beacon chain deadline = 63 (end of epoch E+1) + // Under old system, would be cleaned at slot 3 (slot + 2) + const OLD_CLEANUP_SLOT: u64 = 3; + const BEACON_DEADLINE_SLOT: u64 = 63; + const SLOT_AFTER_DEADLINE: u64 = 64; + const EXPECTED_INSTANCES_BEFORE_DEADLINE: usize = 1; + const EXPECTED_INSTANCES_AFTER_DEADLINE: usize = 0; + + let setup = setup_test(SINGLE_INSTANCE); + let clock = setup.clock.clone(); + let slot_duration = Duration::from_secs(12); + + let context = TestContext::::new( + setup.clock, + setup.executor, + CommitteeSize::Four, + setup.all_data, + ) + .await; + + // Keep operators offline to prevent completion so we can test deadline-based cleanup + context.set_operators_offline(&[1, 2, 3, 4]); + let manager = context.tester.managers.get(&OperatorId(1)).unwrap(); + + // EXECUTE: Advance past old 2-slot deadline to slot 3 + for slot in 1..=OLD_CLEANUP_SLOT { + clock.set_slot(slot); + sleep(slot_duration).await; + } + sleep(Duration::from_millis(100)).await; + + // ASSERT: Instance should still exist (new deadline is 63, not 3) + assert_eq!( + manager.beacon_vote_instances.len(), + EXPECTED_INSTANCES_BEFORE_DEADLINE, + "Instance should survive past old slot {} cleanup with new beacon deadline of {}", + OLD_CLEANUP_SLOT, + BEACON_DEADLINE_SLOT + ); + + // EXECUTE: Advance past actual beacon chain deadline + for slot in (OLD_CLEANUP_SLOT + 1)..=SLOT_AFTER_DEADLINE { + clock.set_slot(slot); + tokio::time::sleep(slot_duration).await; + } + tokio::time::sleep(Duration::from_millis(100)).await; + + // ASSERT: Instance should now be cleaned after its beacon chain deadline + assert_eq!( + manager.beacon_vote_instances.len(), + EXPECTED_INSTANCES_AFTER_DEADLINE, + "Instance should be cleaned after beacon deadline {}", + BEACON_DEADLINE_SLOT + ); + } + + #[tokio::test] + // Test that instance completing successfully is cleaned immediately via completion notification + // Verifies that completion notification cleanup happens before deadline-based cleanup + async fn test_instance_completion_notification() { + // SETUP: Create instance at slot 0 with beacon chain deadline = 63 + // All operators online so consensus completes quickly + const EXPECTED_INSTANCES_AFTER_COMPLETION: usize = 0; + const CONSENSUS_COMPLETION_TIME: Duration = Duration::from_millis(100); + + let setup = setup_test(SINGLE_INSTANCE); + let context = TestContext::::new( + setup.clock, + setup.executor, + CommitteeSize::Four, + setup.all_data, + ) + .await; + + let manager = context.tester.managers.get(&OperatorId(1)).unwrap(); + + // EXECUTE: Wait for consensus to complete + tokio::time::sleep(CONSENSUS_COMPLETION_TIME).await; + + // ASSERT: Instance should be cleaned via completion notification, not deadline + assert_eq!( + manager.beacon_vote_instances.len(), + EXPECTED_INSTANCES_AFTER_COMPLETION, + "Instance should be cleaned immediately after completion via notification, not waiting for deadline" + ); + } + + #[test] + // Test deadline calculation for Committee role + fn test_committee_role_deadline_calculation() { + use ssv_types::msgid::Role; + use types::Slot; + + // SETUP: Define test constants for Committee role (deadline = end of epoch E+1) + const SLOTS_PER_EPOCH: u64 = 32; + const SLOT_ZERO: u64 = 0; + const SLOT_IN_EPOCH_ONE: u64 = 32; + const EXPECTED_DEADLINE_EPOCH_ZERO: u64 = 63; // (epoch 0 + 2) * 32 - 1 + const EXPECTED_DEADLINE_EPOCH_ONE: u64 = 95; // (epoch 1 + 2) * 32 - 1 + + // EXECUTE: Calculate deadline for slot 0 + let deadline = super::super::calculate_deadline( + Role::Committee, + Slot::new(SLOT_ZERO), + SLOTS_PER_EPOCH, + ); + + // ASSERT: Committee at slot 0 should have deadline per EIP-7045 + assert_eq!( + deadline, + Slot::new(EXPECTED_DEADLINE_EPOCH_ZERO), + "Committee at slot {} (epoch 0) should have deadline {} per EIP-7045", + SLOT_ZERO, + EXPECTED_DEADLINE_EPOCH_ZERO + ); + + // EXECUTE: Calculate deadline for slot 32 (epoch 1) + let deadline = super::super::calculate_deadline( + Role::Committee, + Slot::new(SLOT_IN_EPOCH_ONE), + SLOTS_PER_EPOCH, + ); + + // ASSERT: Committee at slot 32 should have correct deadline + assert_eq!( + deadline, + Slot::new(EXPECTED_DEADLINE_EPOCH_ONE), + "Committee at slot {} (epoch 1) should have deadline {}", + SLOT_IN_EPOCH_ONE, + EXPECTED_DEADLINE_EPOCH_ONE + ); + } + + #[test] + // Test deadline calculation for Aggregator role + fn test_aggregator_role_deadline_calculation() { + use ssv_types::msgid::Role; + use types::Slot; + + // SETUP: Define test constants for Aggregator role (same as Committee) + const SLOTS_PER_EPOCH: u64 = 32; + const SLOT_ZERO: u64 = 0; + const EXPECTED_DEADLINE: u64 = 63; // (epoch 0 + 2) * 32 - 1 + + // EXECUTE: Calculate deadline + let deadline = super::super::calculate_deadline( + Role::Aggregator, + Slot::new(SLOT_ZERO), + SLOTS_PER_EPOCH, + ); + + // ASSERT: Aggregator should have same deadline as Committee + assert_eq!( + deadline, + Slot::new(EXPECTED_DEADLINE), + "Aggregator at slot {} should have same deadline as Committee", + SLOT_ZERO + ); + } + + #[test] + // Test deadline calculation for Proposer role + fn test_proposer_role_deadline_calculation() { + use ssv_types::msgid::Role; + use types::Slot; + + // SETUP: Define test constants for Proposer role (deadline = same slot) + const SLOTS_PER_EPOCH: u64 = 32; + const SLOT_ZERO: u64 = 0; + const SLOT_ARBITRARY: u64 = 100; + + // EXECUTE: Calculate deadline for slot 0 + let deadline = + super::super::calculate_deadline(Role::Proposer, Slot::new(SLOT_ZERO), SLOTS_PER_EPOCH); + + // ASSERT: Proposer deadline should be same slot for immediate inclusion + assert_eq!( + deadline, + Slot::new(SLOT_ZERO), + "Proposer deadline should be same slot for immediate inclusion" + ); + + // EXECUTE: Calculate deadline for arbitrary slot + let deadline = super::super::calculate_deadline( + Role::Proposer, + Slot::new(SLOT_ARBITRARY), + SLOTS_PER_EPOCH, + ); + + // ASSERT: Proposer at arbitrary slot should have deadline at that slot + assert_eq!( + deadline, + Slot::new(SLOT_ARBITRARY), + "Proposer at slot {} should have deadline {}", + SLOT_ARBITRARY, + SLOT_ARBITRARY + ); + } + + #[test] + // Test deadline calculation for SyncCommittee role + fn test_sync_committee_role_deadline_calculation() { + use ssv_types::msgid::Role; + use types::Slot; + + // SETUP: Define test constants for SyncCommittee role (deadline = same slot) + const SLOTS_PER_EPOCH: u64 = 32; + const SLOT_MID_EPOCH: u64 = 50; + + // EXECUTE: Calculate deadline + let deadline = super::super::calculate_deadline( + Role::SyncCommittee, + Slot::new(SLOT_MID_EPOCH), + SLOTS_PER_EPOCH, + ); + + // ASSERT: SyncCommittee deadline should be same slot for immediate inclusion + assert_eq!( + deadline, + Slot::new(SLOT_MID_EPOCH), + "SyncCommittee deadline should be same slot for immediate inclusion" + ); + } + + #[test] + // Test deadline calculation for VoluntaryExit role + fn test_voluntary_exit_role_deadline_calculation() { + use ssv_types::msgid::Role; + use types::Slot; + + // SETUP: Define test constants for VoluntaryExit role (deadline = slot + epoch) + const SLOTS_PER_EPOCH: u64 = 32; + const SLOT_TEN: u64 = 10; + const EXPECTED_DEADLINE: u64 = 42; // 10 + 32 + + // EXECUTE: Calculate deadline + let deadline = super::super::calculate_deadline( + Role::VoluntaryExit, + Slot::new(SLOT_TEN), + SLOTS_PER_EPOCH, + ); + + // ASSERT: VoluntaryExit should have one epoch to complete + assert_eq!( + deadline, + Slot::new(EXPECTED_DEADLINE), + "VoluntaryExit at slot {} should have one epoch to complete", + SLOT_TEN + ); + } + + #[test] + // Test deadline calculation for ValidatorRegistration role + fn test_validator_registration_role_deadline_calculation() { + use ssv_types::msgid::Role; + use types::Slot; + + // SETUP: Define test constants for ValidatorRegistration role (deadline = slot + epoch) + const SLOTS_PER_EPOCH: u64 = 32; + const SLOT_ZERO: u64 = 0; + const EXPECTED_DEADLINE: u64 = 32; // 0 + 32 + + // EXECUTE: Calculate deadline + let deadline = super::super::calculate_deadline( + Role::ValidatorRegistration, + Slot::new(SLOT_ZERO), + SLOTS_PER_EPOCH, + ); + + // ASSERT: ValidatorRegistration should have one epoch to complete + assert_eq!( + deadline, + Slot::new(EXPECTED_DEADLINE), + "ValidatorRegistration at slot {} should have one epoch to complete", + SLOT_ZERO + ); + } + + #[tokio::test(start_paused = true)] + // Test instance cleanup across epoch boundary + // Verifies that deadline calculation and cleanup work correctly when crossing epochs + async fn test_cleanup_across_epoch_boundary() { + // SETUP: Create instance at slot 30 (near end of epoch 0) with deadline at slot 63 + // Epoch 0 ends at slot 31, epoch 1 spans slots 32-63 + // Deadline for Committee at slot 30: (epoch 0 + 2) * 32 - 1 = 63 + const INSTANCE_SLOT: usize = 30; + const EPOCH_BOUNDARY_SLOT: u64 = 32; + const DEADLINE_SLOT: u64 = 63; + const SLOT_AFTER_DEADLINE: u64 = 64; + const SLOT_DURATION_SECS: u64 = 12; + const EXPECTED_INSTANCES_BEFORE_DEADLINE: usize = 1; + const EXPECTED_INSTANCES_AFTER_DEADLINE: usize = 0; + const STABILIZATION_DELAY_MS: u64 = 100; + + let setup = setup_test(0); + let clock = setup.clock.clone(); + let test_data = vec![generate_test_data(INSTANCE_SLOT)]; + + let context = TestContext::::new( + setup.clock, + setup.executor, + CommitteeSize::Four, + test_data, + ) + .await; + + // Keep operators offline to prevent completion so we can test deadline-based cleanup + context.set_operators_offline(&[1, 2, 3, 4]); + + let manager = context.tester.managers.get(&OperatorId(1)).unwrap(); + let slot_duration = Duration::from_secs(SLOT_DURATION_SECS); + + // EXECUTE: Advance through epoch boundary to slot 32 + for slot in 31..=EPOCH_BOUNDARY_SLOT { + clock.set_slot(slot); + tokio::time::sleep(slot_duration).await; + } + tokio::time::sleep(Duration::from_millis(STABILIZATION_DELAY_MS)).await; + + // ASSERT: Instance should survive across epoch boundary + assert_eq!( + manager.beacon_vote_instances.len(), + EXPECTED_INSTANCES_BEFORE_DEADLINE, + "Instance should survive across epoch boundary" + ); + + // EXECUTE: Advance to slot 64 (past deadline of 63) + for slot in (EPOCH_BOUNDARY_SLOT + 1)..=SLOT_AFTER_DEADLINE { + clock.set_slot(slot); + tokio::time::sleep(slot_duration).await; + } + tokio::time::sleep(Duration::from_millis(STABILIZATION_DELAY_MS)).await; + + // ASSERT: Instance should be cleaned after deadline + assert_eq!( + manager.beacon_vote_instances.len(), + EXPECTED_INSTANCES_AFTER_DEADLINE, + "Instance should be cleaned after deadline {}", + DEADLINE_SLOT + ); + } + + #[tokio::test] + // Test multiple instances completing in rapid succession + // Verifies completion notification channel handles burst of completions + async fn test_multiple_instances_completing_rapidly() { + // SETUP: Create 5 instances that will all complete rapidly + const EXPECTED_INSTANCES_AFTER_COMPLETION: usize = 0; + const CLEANUP_PROCESSING_TIME: u64 = 200; + + let setup = setup_test(FIVE_INSTANCES); + + let context = TestContext::::new( + setup.clock, + setup.executor, + CommitteeSize::Four, + setup.all_data, + ) + .await; + + // All operators online - instances should complete quickly + // No artificial delays, all instances racing to consensus + + // EXECUTE: Wait for all instances to complete and cleanup to process + // This implicitly tests that: + // 1. All completion notifications are sent + // 2. Channel doesn't saturate or drop notifications + // 3. Cleanup processes all notifications correctly + tokio::time::sleep(Duration::from_millis(CLEANUP_PROCESSING_TIME)).await; + + // ASSERT: All instances should be cleaned after rapid completion + let manager = context.tester.managers.get(&OperatorId(1)).unwrap(); + assert_eq!( + manager.beacon_vote_instances.len(), + EXPECTED_INSTANCES_AFTER_COMPLETION, + "All instances should be cleaned after rapid completion" + ); + } } // very important: set paused to true for deterministic timer @@ -916,10 +1359,17 @@ async fn test_timeout(round_timeout_to_test: usize) { let (sender_tx, _sender_rx) = unbounded_channel(); let (message_tx, message_rx) = unbounded_channel(); let (result_tx, result_rx) = oneshot::channel(); + let (completion_tx, _completion_rx) = unbounded_channel(); let message_sender = MockMessageSender::new(sender_tx, OperatorId(1)); + let instance_id = super::InstanceId::BeaconVote(CommitteeInstanceId { + committee: CommitteeId::default(), + instance_height: 0.into(), + }); let _handle = tokio::spawn(qbft_instance::( message_rx, Arc::new(message_sender), + completion_tx, + instance_id, )); // create a slot clock at slot 0 with a slot duration of 12 seconds