Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions anchor/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,7 @@ impl Client {
slot_clock.clone(),
message_sender,
config.global_config.ssv_network.ssv_domain_type,
E::slots_per_epoch(),
)
.map_err(|e| format!("Unable to initialize qbft manager: {e:?}"))?;

Expand Down
9 changes: 9 additions & 0 deletions anchor/qbft_manager/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,8 @@ impl<D: QbftData<Hash = Hash256>> Initialized<D> {
pub async fn qbft_instance<D: QbftData<Hash = Hash256>>(
mut rx: UnboundedReceiver<QbftMessage<D>>,
message_sender: Arc<dyn MessageSender>,
completion_tx: mpsc::UnboundedSender<crate::InstanceId>,
instance_id: crate::InstanceId,
) {
// Signal a new instance that is uninitialized
let mut instance = QbftInstance::Uninitialized(Uninitialized::default());
Expand Down Expand Up @@ -304,13 +306,20 @@ pub async fn qbft_instance<D: QbftData<Hash = Hash256>>(
if let QbftInstance::Initialized(initialized) = instance {
initialized.complete(Completed::TimedOut);
}
// No notification - either already sent when decided, or cleaner removed us
break;
}
};

// If the instance is ongoing, check whether it is done.
if let QbftInstance::Initialized(initialized) = instance {
instance = initialized.complete_if_done(&message_sender);

// If we just transitioned to Decided, notify cleaner for immediate cleanup
if matches!(instance, QbftInstance::Decided(_)) {
let _ = completion_tx.send(instance_id);
break;
}
}

// Drop guard as late as possible to keep the processor permit.
Expand Down
173 changes: 132 additions & 41 deletions anchor/qbft_manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,25 @@ const QBFT_INSTANCE_NAME: &str = "qbft_instance";
const QBFT_MESSAGE_NAME: &str = "qbft_message";
const QBFT_CLEANER_NAME: &str = "qbft_cleaner";

/// Number of slots to keep before the current slot
const QBFT_RETAIN_SLOTS: u64 = 1;
/// Calculate the beacon chain inclusion deadline for a duty
fn calculate_deadline(role: Role, slot: types::Slot, slots_per_epoch: u64) -> types::Slot {
match role {
Role::Committee | Role::Aggregator => {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Enhance Comment for Clarity

The calculation (epoch.as_u64() + 2) * slots_per_epoch - 1 implements EIP-7045 correctly, but the "why" could be clearer:

Suggested enhancement:

Role::Committee | Role::Aggregator => {
    // Attestations can be included until end of next epoch (epoch E+1)
    // Per EIP-7045: attestation from epoch E valid until end of epoch E+1
    // 
    // Calculation explanation:
    // - Epoch E+1 starts at: (E+1) * slots_per_epoch
    // - Epoch E+1 ends at: (E+2) * slots_per_epoch - 1 (last slot of E+1)
    let epoch = slot.epoch(slots_per_epoch);
    types::Slot::new((epoch.as_u64() + 2) * slots_per_epoch - 1)
}

This makes it immediately clear why we use E+2 in the calculation.

// Attestations can be included until end of next epoch (epoch E+1)
// Per EIP-7045: attestation from epoch E valid until end of epoch E+1
let epoch = slot.epoch(slots_per_epoch);
types::Slot::new((epoch.as_u64() + 2) * slots_per_epoch - 1)
}
Role::Proposer | Role::SyncCommittee => {
// Must be in the same slot
slot
}
Role::VoluntaryExit | Role::ValidatorRegistration => {
// One epoch to complete
types::Slot::new(slot.as_u64() + slots_per_epoch)
}
}
}

// Unique Identifier for a committee and its corresponding QBFT instance
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
Expand Down Expand Up @@ -98,8 +115,20 @@ pub struct QbftInitialization<D: QbftData> {
on_completed: oneshot::Sender<Completed<D>>,
}

// Map from an identifier to a sender for the instance
type Map<I, D> = DashMap<I, UnboundedSender<QbftMessage<D>>>;
// Manager's bookkeeping for an instance
pub struct ManagedInstance<D: QbftData> {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider Adding Documentation

The ManagedInstance struct is a key part of the new architecture but lacks documentation. Consider adding:

/// Manager's bookkeeping for a QBFT instance.
/// 
/// Tracks the communication channel and beacon chain inclusion deadline
/// for each active instance. The `deadline` field determines when the
/// instance should be cleaned up if not completed earlier. Instances are
/// removed either when they complete (via completion notification) or when
/// their deadline expires (via the periodic cleaner sweep).
pub struct ManagedInstance<D: QbftData> {
    sender: UnboundedSender<QbftMessage<D>>,
    deadline: types::Slot,
}

This helps future maintainers understand the purpose and lifecycle management.

sender: UnboundedSender<QbftMessage<D>>,
deadline: types::Slot,
}

// Map from an identifier to managed instance data
type Map<I, D> = DashMap<I, ManagedInstance<D>>;

// Enum to identify which instance completed
pub enum InstanceId {
BeaconVote(CommitteeInstanceId),
ValidatorConsensus(ValidatorInstanceId),
}

// Top level QBFTManager structure
pub struct QbftManager {
Expand All @@ -115,6 +144,10 @@ pub struct QbftManager {
message_sender: Arc<dyn MessageSender>,
// Network domain to embed into messages
domain: DomainType,
// Channel to notify cleaner when instance completes
completion_tx: mpsc::UnboundedSender<InstanceId>,
// Slots per epoch for deadline calculations
slots_per_epoch: u64,
}

impl QbftManager {
Expand All @@ -125,21 +158,26 @@ impl QbftManager {
slot_clock: impl SlotClock + 'static,
message_sender: Arc<dyn MessageSender>,
domain: DomainType,
slots_per_epoch: u64,
) -> Result<Arc<Self>, QbftError> {
let (completion_tx, completion_rx) = mpsc::unbounded_channel();

let manager = Arc::new(QbftManager {
processor,
operator_id,
validator_consensus_data_instances: DashMap::new(),
beacon_vote_instances: DashMap::new(),
message_sender,
domain,
completion_tx,
slots_per_epoch,
});

// Start a long running task that will clean up old instances
manager
.processor
.permitless
.send_async(Arc::clone(&manager).cleaner(slot_clock), QBFT_CLEANER_NAME)?;
manager.processor.permitless.send_async(
Arc::clone(&manager).cleaner(slot_clock, completion_rx),
QBFT_CLEANER_NAME,
)?;

Ok(manager)
}
Expand All @@ -161,6 +199,11 @@ impl QbftManager {
let (result_sender, result_receiver) = oneshot::channel();
let message_id = D::message_id(&self.domain, &id);

// Calculate deadline for this instance
let role = message_id.role().ok_or(QbftError::InconsistentMessageId)?;
let slot = types::Slot::new(*initial.instance_height(&id) as u64);
let deadline = calculate_deadline(role, slot, self.slots_per_epoch);

// Generate the qbft configuration
let config = ConfigBuilder::new(
operator_id,
Expand All @@ -169,17 +212,12 @@ impl QbftManager {
);
let config = config
.with_quorum_size(committee.cluster_members.len() - committee.get_f() as usize)
.with_max_rounds(
message_id
.role()
.and_then(|r| r.max_round())
.ok_or(QbftError::InconsistentMessageId)? as usize,
)
.with_max_rounds(role.max_round().ok_or(QbftError::InconsistentMessageId)? as usize)
.build()?;

// Get or spawn a new qbft instance. This will return the sender that we can use to send
// new messages to the specific instance
let sender = D::get_or_spawn_instance(self, id);
let sender = D::get_or_spawn_instance(self, id.clone(), deadline);
self.processor.urgent_consensus.send_immediate(
move |drop_on_finish: DropOnFinish| {
// A message to initialize this instance
Expand Down Expand Up @@ -261,7 +299,19 @@ impl QbftManager {
id: D::Id,
data: WrappedQbftMessage,
) -> Result<(), QbftError> {
let sender = D::get_or_spawn_instance(self, id);
// Get the map for this data type
let map = D::get_map(self);

// Look up existing instance - network messages should only go to existing instances
let Some(managed) = map.get(&id) else {
// Instance doesn't exist yet - this message arrived before decide_instance was called
// This is normal during startup, just ignore it
return Ok(());
};

let sender = managed.sender.clone();
drop(managed); // Release the lock before sending

self.processor.urgent_consensus.send_immediate(
move |drop_on_finish: DropOnFinish| {
let _ = sender.send(QbftMessage {
Expand All @@ -274,51 +324,84 @@ impl QbftManager {
Ok(())
}

// Long running cleaner that will remove instances that are no longer relevant
async fn cleaner(self: Arc<Self>, slot_clock: impl SlotClock) {
while !self.processor.permitless.is_closed() {
sleep(
slot_clock
.duration_to_next_slot()
.unwrap_or(slot_clock.slot_duration()),
)
.await;
let Some(slot) = slot_clock.now() else {
continue;
};
let cutoff = slot.saturating_sub(QBFT_RETAIN_SLOTS);
self.beacon_vote_instances
.retain(|k, _| *k.instance_height >= cutoff.as_usize());
self.validator_consensus_data_instances
.retain(|k, _| *k.instance_height >= cutoff.as_usize());
/// Long running cleaner that removes instances based on completion or deadline
async fn cleaner(
self: Arc<Self>,
slot_clock: impl SlotClock,
mut completion_rx: mpsc::UnboundedReceiver<InstanceId>,
) {
loop {
tokio::select! {
// Branch 1: Instance completed - clean immediately
Some(id) = completion_rx.recv() => {
match id {
InstanceId::BeaconVote(id) => {
self.beacon_vote_instances.remove(&id);
}
InstanceId::ValidatorConsensus(id) => {
self.validator_consensus_data_instances.remove(&id);
}
}
}
Comment on lines +342 to +352
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a problem with this approach:

In theory, there might be a race condition where some tasks try to register their oneshot channel to an instance after it has completed. This might e.g. be the case if multiple validator attestation duties wait for the same committee instance. If we start the instance late (e.g. because of a struggling BN), the first thread will start the instance, which might complete immediately due to replayed messages, giving no opportunity for the other tasks to register their listeners. This is why the current code cleans up at a fixed time regardless of completion.

Instead, we could move the cleanup time in this branch - to give some time (til end of next slot?) to get the instance result. Wdyt?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I need more context to understand what's described in the first paragraph.

// Branch 2: Slot timeout - clean expired instances
_ = sleep(
slot_clock
.duration_to_next_slot()
.unwrap_or(slot_clock.slot_duration())
) => {
let Some(current_slot) = slot_clock.now() else {
continue;
};
self.beacon_vote_instances
.retain(|_, managed| managed.deadline >= current_slot);
self.validator_consensus_data_instances
.retain(|_, managed| managed.deadline >= current_slot);
}
}

if self.processor.permitless.is_closed() {
break;
}
}
}
}

// Trait that describes any data that is able to be decided upon during a qbft instance
pub trait QbftDecidable: QbftData<Hash = Hash256> + Send + Sync + 'static {
type Id: Hash + Eq + Send + Debug;
type Id: Hash + Eq + Send + Debug + Clone;

fn get_map(manager: &QbftManager) -> &Map<Self::Id, Self>;

fn wrap_id(id: Self::Id) -> InstanceId;

fn get_or_spawn_instance(
manager: &QbftManager,
id: Self::Id,
deadline: types::Slot,
) -> UnboundedSender<QbftMessage<Self>> {
let map = Self::get_map(manager);
match map.entry(id) {
dashmap::Entry::Occupied(entry) => entry.get().clone(),
match map.entry(id.clone()) {
dashmap::Entry::Occupied(entry) => entry.get().sender.clone(),
dashmap::Entry::Vacant(entry) => {
// There is not an instance running yet, store the sender and spawn a new instance
// with the reeiver
// with the receiver
let (tx, rx) = mpsc::unbounded_channel();
let span = debug_span!("qbft_instance", instance_id = ?entry.key());
let tx = entry.insert(tx);
let managed = ManagedInstance {
sender: tx,
deadline,
};
let sender = entry.insert(managed).sender.clone();
let instance_id = Self::wrap_id(id);
let completion_tx = manager.completion_tx.clone();
let message_sender = manager.message_sender.clone();
let _ = manager.processor.permitless.send_async(
Box::pin(qbft_instance(rx, manager.message_sender.clone()).instrument(span)),
Box::pin(
qbft_instance(rx, message_sender, completion_tx, instance_id)
.instrument(span),
),
QBFT_INSTANCE_NAME,
);
tx.clone()
sender
}
}
}
Expand All @@ -334,6 +417,10 @@ impl QbftDecidable for ValidatorConsensusData {
&manager.validator_consensus_data_instances
}

fn wrap_id(id: Self::Id) -> InstanceId {
InstanceId::ValidatorConsensus(id)
}

fn instance_height(&self, id: &Self::Id) -> InstanceHeight {
id.instance_height
}
Expand All @@ -354,6 +441,10 @@ impl QbftDecidable for BeaconVote {
&manager.beacon_vote_instances
}

fn wrap_id(id: Self::Id) -> InstanceId {
InstanceId::BeaconVote(id)
}

fn instance_height(&self, id: &Self::Id) -> InstanceHeight {
id.instance_height
}
Expand Down
57 changes: 57 additions & 0 deletions anchor/qbft_manager/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,7 @@ where
slot_clock.clone(),
Arc::new(MockMessageSender::new(network_tx.clone(), operator_id)),
DomainType([0; 4]),
32, // slots_per_epoch
)
.expect("Creation should not fail");

Expand Down Expand Up @@ -902,6 +903,55 @@ mod manager_tests {

context.verify_consensus().await;
}

#[tokio::test(start_paused = true)]
// Test that Committee instances can reach late rounds (9+) with max_round=12 configuration.
// This verifies that instances survive long enough to progress through many round changes
// as configured. Committee role has max_round=12, so instances should be able to reach
// round 10 before timing out at round 13.
//
// The test simulates network conditions where consensus cannot be reached early by keeping
// all but one operator offline, forcing round changes. We advance the slot to trigger
// cleanup and verify the instance survives to reach round 10.
async fn test_committee_can_reach_late_rounds() {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Documentation: Test expectations need clarification

The comment says "Currently fails" but doesn't specify:

  1. What failure mode to expect - Does it panic? Timeout? Return an error?
  2. Whether this is expected behavior - Is this test marked with #[should_panic] or #[ignore]?
  3. When it should pass - What changes need to be made for this test to pass?

Recommendations:

  1. If the test is expected to fail, use #[ignore] or #[should_panic]:
#[tokio::test(start_paused = true)]
#[ignore = "Fails due to premature cleanup - see issue #XXX"]
async fn test_committee_can_reach_late_rounds() {
  1. Or, restructure as a negative test that explicitly verifies the current behavior:
#[tokio::test(start_paused = true)]
async fn test_committee_cleanup_prevents_late_rounds() {
    // Explicitly test that instances ARE cleaned up at slot 2
    // This documents current behavior before fix is implemented
  1. Add a GitHub issue reference so the test can be tracked to a fix

This prevents CI from failing and clearly communicates test intent to future developers.

let setup = setup_test(1);
let clock = setup.clock.clone();
let mut context = TestContext::<BeaconVote>::new(
setup.clock,
setup.executor,
CommitteeSize::Four,
setup.all_data,
)
.await;

// Keep 3 operators offline initially to prevent consensus and force round changes.
// With only 1 operator online out of 4, we cannot reach quorum (need 3).
// This will cause the instance to go through multiple round changes.
context.set_operators_offline(&[2, 3, 4]);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Test Design: Consider testing the actual cleanup boundary

The test keeps 3 out of 4 operators offline to force round changes, which is good. However, it doesn't verify the specific failure mode mentioned in the PR description. Consider adding:

  1. Log inspection or metrics to verify the instance actually progresses through rounds 9-10
  2. Intermediate assertions to check the instance is still alive at critical points (e.g., after slot 2 when cleanup occurs)
  3. Test the boundary condition: What happens at exactly slot 2 when cleanup runs?

Example enhancement:

// Keep 3 operators offline to force round changes
context.set_operators_offline(&[2, 3, 4]);

// Advance to slot 2 where cleanup happens (cutoff = slot 2 - 1 = slot 1)
// At this point, the instance starting at slot 0 should be removed
clock.set_slot(1);
tokio::time::sleep(slot_duration).await;

clock.set_slot(2);
tokio::time::sleep(slot_duration).await;

// TODO: Add assertion here to verify instance is still alive
// This is where the bug manifests - instance gets cleaned up too early

This would make the test more explicitly demonstrate the issue described in the PR.


// Advance time and slots to simulate reaching round 10
// Instance starts at slot 0
let slot_duration = Duration::from_secs(12);

// Advance through multiple slots while QBFT progresses
// This triggers cleanup logic which should NOT remove the active instance
for slot in 1..=50 {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Test Logic Issue: Unrealistic loop bound

The test loops to slot 50, but breaks at slot 22. This creates confusion about the test's intent. Consider these improvements:

  1. Use a more reasonable upper bound (e.g., 1..=25) that better reflects when you expect the test to complete
  2. Add assertion after the loop to verify we actually broke at slot 22 and didn't fall through
  3. Consider making this data-driven: Calculate the expected slot based on round timeout constants rather than hard-coding slot 22
const EXPECTED_ROUND_10_SLOT: u64 = 22; // Document the calculation
for slot in 1..=EXPECTED_ROUND_10_SLOT + 3 {  // Small buffer
    clock.set_slot(slot);
    tokio::time::sleep(slot_duration).await;
    
    if slot == EXPECTED_ROUND_10_SLOT {
        context.set_operators_online(&[2, 3, 4]);
        break;
    }
}
// Verify we didn't fall through
assert!(clock.now().unwrap().as_u64() == EXPECTED_ROUND_10_SLOT);

clock.set_slot(slot);
tokio::time::sleep(slot_duration).await;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mathematical Accuracy: Comment contains calculation error

The comment states "Rounds 1-8: 16s" — the total is right, but the phrasing is ambiguous:

  • Rounds 1-8 each take 2 seconds (QUICK_TIMEOUT)
  • Total for rounds 1-8: 8 * 2s = 16s
  • But the comment should clarify this is cumulative, not per-round

Correct breakdown to round 10:

  • Rounds 1-8: 8 × 2s = 16s
  • Round 9: 120s
  • Round 10: 120s
  • Total: 256s = 21.33 slots

At 12s per slot:

  • 256s ÷ 12s/slot = 21.33 slots
  • So slot 22 (264s) is actually 8 seconds after round 10 ends (at 256s), not "around round 10"

Suggest updating comment to:

// At slot 22 (264 seconds = 22 * 12s):
// - Rounds 1-8: 8 * 2s = 16s
// - Round 9: 120s  
// - Total to complete round 9: 136s (11.33 slots)
// - Round 10 starts at 136s (slot 11), so at slot 22 (264s) we're 128s into round 10
if slot == 22 {

This makes the test's timing expectations explicit and verifiable.

// At slot 22 (256 seconds = 16s + 240s), we should be around round 10
// Rounds 1-8: 16s, Rounds 9-10: 240s = 256s total

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Math Error in Comment

The comment's calculation is incorrect. Here's the accurate breakdown:

Correct Timing:

  • Rounds 1-8: 8 × 2s = 16s (completes at ~slot 1.33)
  • Round 9: 120s (completes at 136s / ~slot 11.33)
  • Round 10: 120s (starts at 136s)
  • At slot 22 (264s): We're 128 seconds past round 10's start (136s) — i.e. 8 seconds beyond its 120s window

Suggested fix:

// At slot 22 (264 seconds = 22 * 12s):
// - Rounds 1-8: 8 * 2s = 16s (completes at slot 1.33)
// - Round 9: 120s (completes at 136s / slot 11.33)
// - Round 10: started at 136s, now 128s into it
if slot == 22 {

This makes the timing expectations explicit and verifiable.

if slot == 22 {
// Bring operators back online during round 10 to allow consensus
context.set_operators_online(&[2, 3, 4]);
break;
}
}

// Verify that consensus is reached successfully, proving the instance
// survived cleanup and was able to reach round 10
context.verify_consensus().await;
}
}

// very important: set paused to true for deterministic timer
Expand All @@ -916,10 +966,17 @@ async fn test_timeout(round_timeout_to_test: usize) {
let (sender_tx, _sender_rx) = unbounded_channel();
let (message_tx, message_rx) = unbounded_channel();
let (result_tx, result_rx) = oneshot::channel();
let (completion_tx, _completion_rx) = unbounded_channel();
let message_sender = MockMessageSender::new(sender_tx, OperatorId(1));
let instance_id = super::InstanceId::BeaconVote(CommitteeInstanceId {
committee: CommitteeId::default(),
instance_height: 0.into(),
});
let _handle = tokio::spawn(qbft_instance::<BeaconVote>(
message_rx,
Arc::new(message_sender),
completion_tx,
instance_id,
));

// create a slot clock at slot 0 with a slot duration of 12 seconds
Expand Down
Loading