Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion anchor/database/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ impl NetworkState {
nonces.collect()
}

fn get_cluster_members(&self, committee_id: &CommitteeId) -> Option<IndexSet<OperatorId>> {
pub fn get_cluster_members(&self, committee_id: &CommitteeId) -> Option<IndexSet<OperatorId>> {
self.multi_state
.clusters
.get_all_by(committee_id)
Expand Down
1 change: 1 addition & 0 deletions anchor/qbft_manager/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ dashmap = { workspace = true }
database = { workspace = true }
ethereum_ssz = { workspace = true }
fork = { workspace = true }
indexmap = { workspace = true }
message_sender = { workspace = true }
processor = { workspace = true }
qbft = { workspace = true }
Expand Down
16 changes: 11 additions & 5 deletions anchor/qbft_manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use bls::PublicKeyBytes;
use dashmap::DashMap;
use database::OwnOperatorId;
use fork::{Fork, ForkSchedule};
use indexmap::IndexSet;
use message_sender::MessageSender;
use processor::{Error::Queue, Senders, work::DropOnFinish};
use qbft::{
Expand All @@ -12,7 +13,7 @@ use qbft::{
};
use slot_clock::SlotClock;
use ssv_types::{
Cluster, CommitteeId,
CommitteeId, OperatorId,
consensus::{
AggregatorCommitteeConsensusData, BeaconVote, ProposerConsensusData, QbftData,
QbftDataValidator,
Expand Down Expand Up @@ -195,7 +196,7 @@ impl<E: EthSpec, S: SlotClock + Clone + 'static> QbftManager<E, S> {
initial: D,
validator: Box<dyn QbftDataValidator<D>>,
timeout_mode: TimeoutMode,
committee: &Cluster,
committee_members: &IndexSet<OperatorId>,
) -> Result<Completed<D>, QbftError> {
let Some(operator_id) = self.operator_id.get() else {
return Err(QbftError::OwnOperatorIdUnknown);
Expand All @@ -213,15 +214,20 @@ impl<E: EthSpec, S: SlotClock + Clone + 'static> QbftManager<E, S> {
let include_epoch_shift = self.fork_schedule.active_fork(epoch) >= Fork::Boole;
let leader_fn = DefaultLeaderFunction::new(self.slots_per_epoch, include_epoch_shift);

// General the qbft configuration
// Calculate fault tolerance: f = (n - 1) / 3, quorum = n - f
let n = committee_members.len();
let f = (n.saturating_sub(1) / 3) as u64;
let quorum_size = n - f as usize;

// Generate the qbft configuration
let config = ConfigBuilder::new_with_leader_fn(
operator_id,
instance_height,
committee.cluster_members.iter().copied().collect(),
committee_members.iter().copied().collect(),
leader_fn,
);
let config = config
.with_quorum_size(committee.cluster_members.len() - committee.get_f() as usize)
.with_quorum_size(quorum_size)
.with_max_rounds(
message_id
.role()
Expand Down
2 changes: 1 addition & 1 deletion anchor/qbft_manager/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ where
TimeoutMode::SlotTime {
instance_start_time: Instant::now(),
},
&cluster,
&cluster.cluster_members,
)
.await;
let _ = tx_clone.send((data_clone.hash(), result));
Expand Down
126 changes: 126 additions & 0 deletions anchor/signature_collector/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,132 @@ impl<S: SlotClock + Clone + 'static> SignatureCollectorManager<S> {
Ok(result_rx.await?)
}

/// Sign messages for multiple validators and wait until all signatures are reconstructed.
/// This is more efficient than calling sign_and_collect multiple times because:
/// 1. All partial signatures are sent in a single network message
/// 2. No risk of deadlock from sequential processing in Committee mode
///
/// Returns a map from validator index to the reconstructed signature. Validators whose
/// notifier channel closes before a signature is reconstructed are logged and omitted,
/// so the returned map may contain fewer entries than `validators` — callers must not
/// assume every requested index is present.
///
/// # Errors
/// Returns `CollectionError::OwnOperatorIdUnknown` if our own operator id has not been
/// resolved yet, or the error produced by a failed processor submission (`?` on
/// `send_immediate` / `send_blocking`).
pub async fn sign_and_collect_batch(
self: &Arc<Self>,
metadata: SignatureMetadata,
validators: Vec<ValidatorSigningData>,
) -> Result<HashMap<ValidatorIndex, Arc<Signature>>, CollectionError> {
// Without our own operator id we can neither produce nor attribute partial
// signatures, so bail out before queueing any work.
let Some(signer) = self.operator_id.get() else {
return Err(CollectionError::OwnOperatorIdUnknown);
};

// Nothing to sign: return early rather than submit no-op work to the processor.
if validators.is_empty() {
trace!("sign_and_collect_batch called with empty validators");
return Ok(HashMap::new());
}

trace!(
num_validators = validators.len(),
slot = %metadata.slot,
kind = ?metadata.kind,
"sign_and_collect_batch starting"
);

// 1. Register notifiers for ALL validators
// Registration happens BEFORE any partial signature is broadcast (step 2), so a
// reconstruction cannot complete in the window between signing and listening.
let mut notifiers = Vec::with_capacity(validators.len());
for validator in &validators {
let (result_tx, result_rx) = oneshot::channel();
let manager = self.clone();
// Copy out the fields the closure needs so it can be `move` without
// capturing the whole `validator` borrow.
let root = validator.root;
let index = validator.index;
let slot = metadata.slot;
let threshold = metadata.threshold;

// NOTE(review): assumes `get_or_spawn` is idempotent per (root, index, slot),
// i.e. repeated registration reuses one collector — confirm at its definition.
self.processor.permitless.send_immediate(
move |drop_on_finish| {
let sender = manager.get_or_spawn(root, index, slot);
// A send failure means the collector already shut down; the notifier
// is then dropped and surfaces as an Err in step 3.
let _ = sender.send(CollectorMessage {
kind: CollectorMessageKind::RegisterNotifier {
notify: result_tx,
threshold,
},
_drop_on_finish: drop_on_finish,
});
},
COLLECTOR_MESSAGE_NAME,
)?;
notifiers.push((index, result_rx));
}

// 2. Sign ALL partial signatures and send ONE message
let manager = self.clone();
self.processor.urgent_consensus.send_blocking(
move || {
let mut partial_signatures = Vec::with_capacity(validators.len());

for validator in &validators {
// Validators we hold no key share for still get a placeholder entry,
// so the batch message covers every requested validator index.
let partial_signature = if let Some(share) = &validator.share {
share.sign(validator.root)
} else {
Signature::empty()
};

partial_signatures.push(PartialSignatureMessage {
partial_signature,
signing_root: validator.root,
signer,
validator_index: validator.index,
});
}

// Send ONE message with ALL partial signatures
// (the vec is cloned because it is consumed again below for local delivery)
let msg = match manager.create_message(
&metadata,
partial_signatures.clone(),
&DutyExecutor::Committee(metadata.committee_id),
) {
Ok(msg) => msg,
Err(err) => {
// Only logged: the notifiers registered in step 1 will simply never
// fire, and step 3 reports each as a receive failure.
error!(%err, "Failed to create batch partial signature message");
return;
}
};

if let Err(err) =
manager
.message_sender
.sign_and_send(msg, metadata.committee_id, None)
{
error!(?err, "Failed to send batch partial signatures");
}

// Make local instances aware of the partial signatures
// Only signatures we actually produced are delivered locally; the empty
// placeholders for share-less validators are skipped.
for (validator, message) in validators.iter().zip(partial_signatures) {
if validator.share.is_some() {
let _ = manager.receive_partial_signature(message, metadata.slot);
}
}
},
SIGNER_NAME,
)?;

// 3. Await ALL notifiers and collect results
// An Err from `rx.await` means the collector dropped the sender without ever
// reconstructing a signature; such validators are logged and left out of the map.
let total_validators = notifiers.len();
let mut results = HashMap::with_capacity(total_validators);
for (index, rx) in notifiers {
match rx.await {
Ok(sig) => {
results.insert(index, sig);
}
Err(e) => {
warn!(?index, ?e, "Failed to receive signature");
}
}
}

trace!(
successful = results.len(),
total = total_validators,
"sign_and_collect_batch completed"
);

Ok(results)
}

fn create_message(
&self,
metadata: &SignatureMetadata,
Expand Down
Loading