refactor: use beacon chain deadlines for QBFT instance cleanup #719
base: unstable
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -39,8 +39,25 @@ const QBFT_INSTANCE_NAME: &str = "qbft_instance"; | |
| const QBFT_MESSAGE_NAME: &str = "qbft_message"; | ||
| const QBFT_CLEANER_NAME: &str = "qbft_cleaner"; | ||
|
|
||
| /// Number of slots to keep before the current slot | ||
| const QBFT_RETAIN_SLOTS: u64 = 1; | ||
| /// Calculate the beacon chain inclusion deadline for a duty | ||
| fn calculate_deadline(role: Role, slot: types::Slot, slots_per_epoch: u64) -> types::Slot { | ||
| match role { | ||
| Role::Committee | Role::Aggregator => { | ||
| // Attestations can be included until end of next epoch (epoch E+1) | ||
| // Per EIP-7045: attestation from epoch E valid until end of epoch E+1 | ||
| let epoch = slot.epoch(slots_per_epoch); | ||
| types::Slot::new((epoch.as_u64() + 2) * slots_per_epoch - 1) | ||
| } | ||
| Role::Proposer | Role::SyncCommittee => { | ||
| // Must be in the same slot | ||
| slot | ||
| } | ||
| Role::VoluntaryExit | Role::ValidatorRegistration => { | ||
| // One epoch to complete | ||
| types::Slot::new(slot.as_u64() + slots_per_epoch) | ||
| } | ||
| } | ||
| } | ||
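For intuition, here is a minimal standalone sketch of the attestation-deadline arithmetic above, assuming mainnet's 32 slots per epoch and using plain `u64` slots in place of `types::Slot`:

```rust
// Deadline for Committee / Aggregator duties, mirroring the branch above.
fn attestation_deadline(slot: u64, slots_per_epoch: u64) -> u64 {
    let epoch = slot / slots_per_epoch;
    // Per EIP-7045: valid through the end of epoch E+1, i.e. the last slot
    // before epoch E+2 begins.
    (epoch + 2) * slots_per_epoch - 1
}

fn main() {
    // A committee/aggregator duty at slot 10 (epoch 0) can be included up to
    // slot 63, the last slot of epoch 1.
    assert_eq!(attestation_deadline(10, 32), 63);
    // A duty in the last slot of epoch 0 still gets all of epoch 1.
    assert_eq!(attestation_deadline(31, 32), 63);
    // Proposer / SyncCommittee duties are only valid in their own slot, and
    // VoluntaryExit / ValidatorRegistration get one extra epoch (slot + 32).
}
```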
|
|
||
| // Unique Identifier for a committee and its corresponding QBFT instance | ||
| #[derive(Debug, Clone, Hash, PartialEq, Eq)] | ||
|
|
@@ -98,8 +115,20 @@ pub struct QbftInitialization<D: QbftData> { | |
| on_completed: oneshot::Sender<Completed<D>>, | ||
| } | ||
|
|
||
| // Map from an identifier to a sender for the instance | ||
| type Map<I, D> = DashMap<I, UnboundedSender<QbftMessage<D>>>; | ||
| // Manager's bookkeeping for an instance | ||
| pub struct ManagedInstance<D: QbftData> { | ||
|
Consider Adding Documentation: The new `ManagedInstance` struct could use a doc comment, for example:

/// Manager's bookkeeping for a QBFT instance.
///
/// Tracks the communication channel and beacon chain inclusion deadline
/// for each active instance. The `deadline` field determines when the
/// instance should be cleaned up if not completed earlier. Instances are
/// removed either when they complete (via completion notification) or when
/// their deadline expires (via the periodic cleaner sweep).
pub struct ManagedInstance<D: QbftData> {
    sender: UnboundedSender<QbftMessage<D>>,
    deadline: types::Slot,
}

This helps future maintainers understand the purpose and lifecycle management.
||
| sender: UnboundedSender<QbftMessage<D>>, | ||
| deadline: types::Slot, | ||
| } | ||
|
|
||
| // Map from an identifier to managed instance data | ||
| type Map<I, D> = DashMap<I, ManagedInstance<D>>; | ||
|
|
||
| // Enum to identify which instance completed | ||
| pub enum InstanceId { | ||
| BeaconVote(CommitteeInstanceId), | ||
| ValidatorConsensus(ValidatorInstanceId), | ||
| } | ||
|
|
||
| // Top level QBFTManager structure | ||
| pub struct QbftManager { | ||
|
|
@@ -115,6 +144,10 @@ pub struct QbftManager { | |
| message_sender: Arc<dyn MessageSender>, | ||
| // Network domain to embed into messages | ||
| domain: DomainType, | ||
| // Channel to notify cleaner when instance completes | ||
| completion_tx: mpsc::UnboundedSender<InstanceId>, | ||
| // Slots per epoch for deadline calculations | ||
| slots_per_epoch: u64, | ||
| } | ||
|
|
||
| impl QbftManager { | ||
|
|
@@ -125,21 +158,26 @@ impl QbftManager { | |
| slot_clock: impl SlotClock + 'static, | ||
| message_sender: Arc<dyn MessageSender>, | ||
| domain: DomainType, | ||
| slots_per_epoch: u64, | ||
| ) -> Result<Arc<Self>, QbftError> { | ||
| let (completion_tx, completion_rx) = mpsc::unbounded_channel(); | ||
|
|
||
| let manager = Arc::new(QbftManager { | ||
| processor, | ||
| operator_id, | ||
| validator_consensus_data_instances: DashMap::new(), | ||
| beacon_vote_instances: DashMap::new(), | ||
| message_sender, | ||
| domain, | ||
| completion_tx, | ||
| slots_per_epoch, | ||
| }); | ||
|
|
||
| // Start a long running task that will clean up old instances | ||
| manager | ||
| .processor | ||
| .permitless | ||
| .send_async(Arc::clone(&manager).cleaner(slot_clock), QBFT_CLEANER_NAME)?; | ||
| manager.processor.permitless.send_async( | ||
| Arc::clone(&manager).cleaner(slot_clock, completion_rx), | ||
| QBFT_CLEANER_NAME, | ||
| )?; | ||
|
|
||
| Ok(manager) | ||
| } | ||
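The corresponding change inside `qbft_instance` (sending on `completion_tx` when an instance finishes) is not shown in this hunk. The sketch below only illustrates the completion-notification pattern the manager now relies on, with simplified `u64` ids and no slot clock; it is an assumption, not the PR's actual code:

```rust
// Sketch of the completion-notification pattern (simplified): a finished
// instance reports its id on an unbounded channel, and the cleaner task
// removes the corresponding map entry right away instead of waiting for the
// next slot-based sweep. In the real code the receive sits inside a
// tokio::select! next to the per-slot deadline sweep.
use std::sync::Arc;

use dashmap::DashMap;
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let instances: Arc<DashMap<u64, &'static str>> = Arc::new(DashMap::new());
    instances.insert(1, "qbft instance #1");

    let (completion_tx, mut completion_rx) = mpsc::unbounded_channel::<u64>();

    // Cleaner task: drop entries as soon as their completion is reported.
    let map = Arc::clone(&instances);
    let cleaner = tokio::spawn(async move {
        while let Some(id) = completion_rx.recv().await {
            map.remove(&id);
        }
    });

    // A completed instance notifies the cleaner with its id.
    completion_tx.send(1).expect("cleaner is alive");
    drop(completion_tx); // close the channel so the cleaner loop exits
    cleaner.await.unwrap();

    assert!(instances.get(&1).is_none());
}
```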
|
|
@@ -161,6 +199,11 @@ impl QbftManager { | |
| let (result_sender, result_receiver) = oneshot::channel(); | ||
| let message_id = D::message_id(&self.domain, &id); | ||
|
|
||
| // Calculate deadline for this instance | ||
| let role = message_id.role().ok_or(QbftError::InconsistentMessageId)?; | ||
| let slot = types::Slot::new(*initial.instance_height(&id) as u64); | ||
| let deadline = calculate_deadline(role, slot, self.slots_per_epoch); | ||
|
|
||
| // General the qbft configuration | ||
| let config = ConfigBuilder::new( | ||
| operator_id, | ||
|
|
@@ -169,17 +212,12 @@ impl QbftManager { | |
| ); | ||
| let config = config | ||
| .with_quorum_size(committee.cluster_members.len() - committee.get_f() as usize) | ||
| .with_max_rounds( | ||
| message_id | ||
| .role() | ||
| .and_then(|r| r.max_round()) | ||
| .ok_or(QbftError::InconsistentMessageId)? as usize, | ||
| ) | ||
| .with_max_rounds(role.max_round().ok_or(QbftError::InconsistentMessageId)? as usize) | ||
| .build()?; | ||
|
|
||
| // Get or spawn a new qbft instance. This will return the sender that we can use to send | ||
| // new messages to the specific instance | ||
| let sender = D::get_or_spawn_instance(self, id); | ||
| let sender = D::get_or_spawn_instance(self, id.clone(), deadline); | ||
| self.processor.urgent_consensus.send_immediate( | ||
| move |drop_on_finish: DropOnFinish| { | ||
| // A message to initialize this instance | ||
|
|
@@ -261,7 +299,19 @@ impl QbftManager { | |
| id: D::Id, | ||
| data: WrappedQbftMessage, | ||
| ) -> Result<(), QbftError> { | ||
| let sender = D::get_or_spawn_instance(self, id); | ||
| // Get the map for this data type | ||
| let map = D::get_map(self); | ||
|
|
||
| // Look up existing instance - network messages should only go to existing instances | ||
| let Some(managed) = map.get(&id) else { | ||
| // Instance doesn't exist yet - this message arrived before decide_instance was called | ||
| // This is normal during startup, just ignore it | ||
| return Ok(()); | ||
| }; | ||
|
|
||
| let sender = managed.sender.clone(); | ||
| drop(managed); // Release the lock before sending | ||
|
|
||
| self.processor.urgent_consensus.send_immediate( | ||
| move |drop_on_finish: DropOnFinish| { | ||
| let _ = sender.send(QbftMessage { | ||
|
|
@@ -274,51 +324,84 @@ impl QbftManager { | |
| Ok(()) | ||
| } | ||
|
|
||
| // Long running cleaner that will remove instances that are no longer relevant | ||
| async fn cleaner(self: Arc<Self>, slot_clock: impl SlotClock) { | ||
| while !self.processor.permitless.is_closed() { | ||
| sleep( | ||
| slot_clock | ||
| .duration_to_next_slot() | ||
| .unwrap_or(slot_clock.slot_duration()), | ||
| ) | ||
| .await; | ||
| let Some(slot) = slot_clock.now() else { | ||
| continue; | ||
| }; | ||
| let cutoff = slot.saturating_sub(QBFT_RETAIN_SLOTS); | ||
| self.beacon_vote_instances | ||
| .retain(|k, _| *k.instance_height >= cutoff.as_usize()); | ||
| self.validator_consensus_data_instances | ||
| .retain(|k, _| *k.instance_height >= cutoff.as_usize()); | ||
| /// Long running cleaner that removes instances based on completion or deadline | ||
| async fn cleaner( | ||
| self: Arc<Self>, | ||
| slot_clock: impl SlotClock, | ||
| mut completion_rx: mpsc::UnboundedReceiver<InstanceId>, | ||
| ) { | ||
| loop { | ||
| tokio::select! { | ||
| // Branch 1: Instance completed - clean immediately | ||
| Some(id) = completion_rx.recv() => { | ||
| match id { | ||
| InstanceId::BeaconVote(id) => { | ||
| self.beacon_vote_instances.remove(&id); | ||
| } | ||
| InstanceId::ValidatorConsensus(id) => { | ||
| self.validator_consensus_data_instances.remove(&id); | ||
| } | ||
| } | ||
| } | ||
|
Comment on lines +342 to +352

Member: There is a problem with this approach: in theory, there might be a race condition where some tasks try to register their oneshot channel with an instance after it has already completed. This might be the case, for example, if multiple validator attestation duties wait for the same committee instance. If we start the instance late (e.g. because of a struggling BN), the first thread will start the instance, which might complete immediately due to replayed messages, giving the other tasks no opportunity to register their listeners. This is why the current code cleans up at a fixed time regardless of completion. Instead, we could move the cleanup time in this branch, to give some time (until the end of the next slot?) to get the instance result. Wdyt?

Author: I need more context to understand what's described in the first paragraph.
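One way to picture the suggestion above, as a hedged sketch with plain `u64` slots and assumed names (the one-slot grace window is an assumption, not part of this PR): on completion, rather than removing the entry immediately, the cleaner tightens its deadline to the end of the next slot, so late-registering tasks still have a chance to reach the instance before the periodic sweep drops it.

```rust
// Sketch of a "grace period" cleanup: completion does not remove the entry;
// it caps the deadline at the end of the next slot, and the existing periodic
// sweep removes it afterwards. Plain u64 slots stand in for types::Slot.
use dashmap::DashMap;

struct ManagedInstance {
    deadline: u64, // last slot at which the periodic sweep keeps the entry
}

fn on_instance_completed(map: &DashMap<u64, ManagedInstance>, id: u64, current_slot: u64) {
    if let Some(mut managed) = map.get_mut(&id) {
        let grace_deadline = current_slot + 1; // keep until the end of the next slot
        managed.deadline = managed.deadline.min(grace_deadline);
    }
}

fn sweep_expired(map: &DashMap<u64, ManagedInstance>, current_slot: u64) {
    // Same retain predicate as the cleaner in this PR.
    map.retain(|_, managed| managed.deadline >= current_slot);
}

fn main() {
    let map = DashMap::new();
    map.insert(7, ManagedInstance { deadline: 63 });

    // The instance completes at slot 10: it survives the slot-11 sweep ...
    on_instance_completed(&map, 7, 10);
    sweep_expired(&map, 11);
    assert!(map.get(&7).is_some());

    // ... and is dropped by the slot-12 sweep, after the grace window.
    sweep_expired(&map, 12);
    assert!(map.get(&7).is_none());
}
```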
||
| // Branch 2: Slot timeout - clean expired instances | ||
| _ = sleep( | ||
| slot_clock | ||
| .duration_to_next_slot() | ||
| .unwrap_or(slot_clock.slot_duration()) | ||
| ) => { | ||
| let Some(current_slot) = slot_clock.now() else { | ||
| continue; | ||
| }; | ||
| self.beacon_vote_instances | ||
| .retain(|_, managed| managed.deadline >= current_slot); | ||
| self.validator_consensus_data_instances | ||
| .retain(|_, managed| managed.deadline >= current_slot); | ||
| } | ||
| } | ||
|
|
||
| if self.processor.permitless.is_closed() { | ||
| break; | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // Trait that describes any data that is able to be decided upon during a qbft instance | ||
| pub trait QbftDecidable: QbftData<Hash = Hash256> + Send + Sync + 'static { | ||
| type Id: Hash + Eq + Send + Debug; | ||
| type Id: Hash + Eq + Send + Debug + Clone; | ||
|
|
||
| fn get_map(manager: &QbftManager) -> &Map<Self::Id, Self>; | ||
|
|
||
| fn wrap_id(id: Self::Id) -> InstanceId; | ||
|
|
||
| fn get_or_spawn_instance( | ||
| manager: &QbftManager, | ||
| id: Self::Id, | ||
| deadline: types::Slot, | ||
| ) -> UnboundedSender<QbftMessage<Self>> { | ||
| let map = Self::get_map(manager); | ||
| match map.entry(id) { | ||
| dashmap::Entry::Occupied(entry) => entry.get().clone(), | ||
| match map.entry(id.clone()) { | ||
| dashmap::Entry::Occupied(entry) => entry.get().sender.clone(), | ||
| dashmap::Entry::Vacant(entry) => { | ||
| // There is not an instance running yet, store the sender and spawn a new instance | ||
| // with the reeiver | ||
| // with the receiver | ||
| let (tx, rx) = mpsc::unbounded_channel(); | ||
| let span = debug_span!("qbft_instance", instance_id = ?entry.key()); | ||
| let tx = entry.insert(tx); | ||
| let managed = ManagedInstance { | ||
| sender: tx, | ||
| deadline, | ||
| }; | ||
| let sender = entry.insert(managed).sender.clone(); | ||
| let instance_id = Self::wrap_id(id); | ||
| let completion_tx = manager.completion_tx.clone(); | ||
| let message_sender = manager.message_sender.clone(); | ||
| let _ = manager.processor.permitless.send_async( | ||
| Box::pin(qbft_instance(rx, manager.message_sender.clone()).instrument(span)), | ||
| Box::pin( | ||
| qbft_instance(rx, message_sender, completion_tx, instance_id) | ||
| .instrument(span), | ||
| ), | ||
| QBFT_INSTANCE_NAME, | ||
| ); | ||
| tx.clone() | ||
| sender | ||
| } | ||
| } | ||
| } | ||
|
|
@@ -334,6 +417,10 @@ impl QbftDecidable for ValidatorConsensusData { | |
| &manager.validator_consensus_data_instances | ||
| } | ||
|
|
||
| fn wrap_id(id: Self::Id) -> InstanceId { | ||
| InstanceId::ValidatorConsensus(id) | ||
| } | ||
|
|
||
| fn instance_height(&self, id: &Self::Id) -> InstanceHeight { | ||
| id.instance_height | ||
| } | ||
|
|
@@ -354,6 +441,10 @@ impl QbftDecidable for BeaconVote { | |
| &manager.beacon_vote_instances | ||
| } | ||
|
|
||
| fn wrap_id(id: Self::Id) -> InstanceId { | ||
| InstanceId::BeaconVote(id) | ||
| } | ||
|
|
||
| fn instance_height(&self, id: &Self::Id) -> InstanceHeight { | ||
| id.instance_height | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -304,6 +304,7 @@ where | |
| slot_clock.clone(), | ||
| Arc::new(MockMessageSender::new(network_tx.clone(), operator_id)), | ||
| DomainType([0; 4]), | ||
| 32, // slots_per_epoch | ||
| ) | ||
| .expect("Creation should not fail"); | ||
|
|
||
|
|
@@ -902,6 +903,55 @@ mod manager_tests { | |
|
|
||
| context.verify_consensus().await; | ||
| } | ||
|
|
||
| #[tokio::test(start_paused = true)] | ||
| // Test that Committee instances can reach late rounds (9+) with max_round=12 configuration. | ||
| // This verifies that instances survive long enough to progress through many round changes | ||
| // as configured. Committee role has max_round=12, so instances should be able to reach | ||
| // round 10 before timing out at round 13. | ||
| // | ||
| // The test simulates network conditions where consensus cannot be reached early by keeping | ||
| // all but one operator offline, forcing round changes. We advance the slot to trigger | ||
| // cleanup and verify the instance survives to reach round 10. | ||
| async fn test_committee_can_reach_late_rounds() { | ||
|
Documentation: Test expectations need clarification. The comment says "Currently fails" but doesn't specify the details.
Recommendations: either ignore the test until the fix lands,

#[tokio::test(start_paused = true)]
#[ignore = "Fails due to premature cleanup - see issue #XXX"]
async fn test_committee_can_reach_late_rounds() {

or make it explicitly test the current behavior:

#[tokio::test(start_paused = true)]
async fn test_committee_cleanup_prevents_late_rounds() {
    // Explicitly test that instances ARE cleaned up at slot 2
    // This documents current behavior before fix is implemented

This prevents CI from failing and clearly communicates test intent to future developers.
||
| let setup = setup_test(1); | ||
| let clock = setup.clock.clone(); | ||
| let mut context = TestContext::<BeaconVote>::new( | ||
| setup.clock, | ||
| setup.executor, | ||
| CommitteeSize::Four, | ||
| setup.all_data, | ||
| ) | ||
| .await; | ||
|
|
||
| // Keep 3 operators offline initially to prevent consensus and force round changes. | ||
| // With only 1 operator online out of 4, we cannot reach quorum (need 3). | ||
| // This will cause the instance to go through multiple round changes. | ||
| context.set_operators_offline(&[2, 3, 4]); | ||
|
Test Design: Consider testing the actual cleanup boundary. The test keeps 3 out of 4 operators offline to force round changes, which is good. However, it doesn't verify the specific failure mode mentioned in the PR description. Example enhancement:

// Keep 3 operators offline to force round changes
context.set_operators_offline(&[2, 3, 4]);
// Advance to slot 2 where cleanup happens (cutoff = slot 2 - 1 = slot 1)
// At this point, the instance starting at slot 0 should be removed
clock.set_slot(1);
tokio::time::sleep(slot_duration).await;
clock.set_slot(2);
tokio::time::sleep(slot_duration).await;
// TODO: Add assertion here to verify instance is still alive
// This is where the bug manifests - instance gets cleaned up too early

This would make the test more explicitly demonstrate the issue described in the PR.
||
|
|
||
| // Advance time and slots to simulate reaching round 10 | ||
| // Instance starts at slot 0 | ||
| let slot_duration = Duration::from_secs(12); | ||
|
|
||
| // Advance through multiple slots while QBFT progresses | ||
| // This triggers cleanup logic which should NOT remove the active instance | ||
| for slot in 1..=50 { | ||
|
||
| clock.set_slot(slot); | ||
| tokio::time::sleep(slot_duration).await; | ||
|
|
||
|
Mathematical Accuracy: The comment contains a calculation error. It states "Rounds 1-8: 16s" but the overall breakdown is incorrect; the correct breakdown to round 10, at 12s per slot, is shown below. Suggest updating the comment to:

// At slot 22 (264 seconds = 22 * 12s):
// - Rounds 1-8: 8 * 2s = 16s
// - Round 9: 120s
// - Total to complete round 9: 136s (11.33 slots)
// - Round 10 starts at 136s (slot 11), so at slot 22 (264s) we're 128s into round 10
if slot == 22 {

This makes the test's timing expectations explicit and verifiable.
||
| // At slot 22 (256 seconds = 16s + 240s), we should be around round 10 | ||
| // Rounds 1-8: 16s, Rounds 9-10: 240s = 256s total | ||
|
||
| if slot == 22 { | ||
| // Bring operators back online during round 10 to allow consensus | ||
| context.set_operators_online(&[2, 3, 4]); | ||
| break; | ||
| } | ||
| } | ||
|
|
||
| // Verify that consensus is reached successfully, proving the instance | ||
| // survived cleanup and was able to reach round 10 | ||
| context.verify_consensus().await; | ||
| } | ||
| } | ||
|
|
||
| // very important: set paused to true for deterministic timer | ||
|
|
@@ -916,10 +966,17 @@ async fn test_timeout(round_timeout_to_test: usize) { | |
| let (sender_tx, _sender_rx) = unbounded_channel(); | ||
| let (message_tx, message_rx) = unbounded_channel(); | ||
| let (result_tx, result_rx) = oneshot::channel(); | ||
| let (completion_tx, _completion_rx) = unbounded_channel(); | ||
| let message_sender = MockMessageSender::new(sender_tx, OperatorId(1)); | ||
| let instance_id = super::InstanceId::BeaconVote(CommitteeInstanceId { | ||
| committee: CommitteeId::default(), | ||
| instance_height: 0.into(), | ||
| }); | ||
| let _handle = tokio::spawn(qbft_instance::<BeaconVote>( | ||
| message_rx, | ||
| Arc::new(message_sender), | ||
| completion_tx, | ||
| instance_id, | ||
| )); | ||
|
|
||
| // create a slot clock at slot 0 with a slot duration of 12 seconds | ||
|
|
||
Enhance Comment for Clarity: The calculation `(epoch.as_u64() + 2) * slots_per_epoch - 1` implements EIP-7045 correctly, but the "why" could be clearer; a suggested enhancement to the comment would make it immediately clear why we use E+2 in the calculation.
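The reviewer's suggested snippet is not preserved above. Purely as an illustration of the kind of comment they may have had in mind (an editorial guess, not the reviewer's actual wording), the attestation branch could spell out the arithmetic like this:

```rust
// Illustrative only: an expanded comment for the attestation branch (an
// editorial guess at the reviewer's intent, not their actual suggestion).
Role::Committee | Role::Aggregator => {
    // Per EIP-7045, an attestation from epoch E can be included through the
    // end of epoch E+1. Epoch E+2 begins at slot (E + 2) * slots_per_epoch,
    // so the last valid inclusion slot is the one just before it:
    //   (E + 2) * slots_per_epoch - 1  ==  last slot of epoch E+1
    let epoch = slot.epoch(slots_per_epoch);
    types::Slot::new((epoch.as_u64() + 2) * slots_per_epoch - 1)
}
```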