Skip to content

Commit 0cdb9bb

Browse files
authored
Commit aggregation (sigp#136)
1 parent afa6c0d commit 0cdb9bb

File tree

5 files changed

+123
-17
lines changed

5 files changed

+123
-17
lines changed

anchor/common/qbft/src/lib.rs

Lines changed: 48 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,9 @@ where
9696
/// Past prepare consensus that we have reached
9797
past_consensus: HashMap<Round, D::Hash>,
9898

99+
/// Aggregated commit message
100+
aggregated_commit: Option<SignedSSVMessage>,
101+
99102
// Network sender
100103
send_message: S,
101104
}
@@ -136,6 +139,8 @@ where
136139

137140
past_consensus: HashMap::new(),
138141

142+
aggregated_commit: None,
143+
139144
send_message,
140145
};
141146
qbft.data
@@ -160,6 +165,11 @@ where
160165
self.start_round();
161166
}
162167

168+
// Get the aggregated commit message, if it exists
169+
pub fn get_aggregated_commit(&self) -> Option<SignedSSVMessage> {
170+
self.aggregated_commit.clone()
171+
}
172+
163173
// Validation and check functions.
164174
fn check_leader(&self, operator_id: &OperatorId) -> bool {
165175
self.config.leader_fn().leader_function(
@@ -659,17 +669,48 @@ where
659669

660670
// All validation successful, make sure we are in the proper commit state
661671
if matches!(self.state, InstanceState::Commit) {
662-
// Todo!(). Commit aggregation
663-
664-
// We have come to commit consensus, mark ourself as completed and record the agreed upon
665-
// value
666-
self.state = InstanceState::Complete;
667-
self.completed = Some(Completed::Success(hash));
668-
debug!(in = ?self.config.operator_id(), state = ?self.state, "Reached a COMMIT consensus. Success!");
672+
// Aggregate all of the commit messages
673+
let commit_quorum = self.commit_container.get_quorum_of_messages(round);
674+
let aggregated_commit = self.aggregate_commit_messages(commit_quorum);
675+
if aggregated_commit.is_some() {
676+
debug!(in = ?self.config.operator_id(), state = ?self.state, "Reached a COMMIT consensus. Success!");
677+
self.state = InstanceState::Complete;
678+
self.completed = Some(Completed::Success(hash));
679+
self.aggregated_commit = aggregated_commit;
680+
} else {
681+
error!("Failed to aggregate commit quorum")
682+
}
669683
}
670684
}
671685
}
672686

687+
fn aggregate_commit_messages(
688+
&self,
689+
commit_quorum: Vec<WrappedQbftMessage>,
690+
) -> Option<SignedSSVMessage> {
691+
// We know this exists, but in favor of avoiding expect match the first element to Some.
692+
// This will be the commit message that we aggregate on top of
693+
if let Some(first_commit) = commit_quorum.first() {
694+
let mut aggregated_commit = first_commit.signed_message.clone();
695+
let aggregated_ssv = aggregated_commit.ssv_message();
696+
697+
// Sanity check that all of the messages match
698+
commit_quorum[1..]
699+
.iter()
700+
.all(|commit_msg| aggregated_ssv == commit_msg.signed_message.ssv_message())
701+
.then_some(())?;
702+
703+
// Aggregate all of the commits together
704+
let signed_commits = commit_quorum[1..]
705+
.iter()
706+
.map(|msg| msg.signed_message.clone());
707+
aggregated_commit.aggregate(signed_commits);
708+
return Some(aggregated_commit);
709+
}
710+
711+
None
712+
}
713+
673714
/// We have received a round change message.
674715
fn received_round_change(
675716
&mut self,

anchor/common/qbft/src/msg_container.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,22 @@ impl MessageContainer {
8181
.unwrap_or(0)
8282
}
8383

84+
/// If we have a quorum for the round, get all of the messages that correspond to that quorum
85+
pub fn get_quorum_of_messages(&self, round: Round) -> Vec<WrappedQbftMessage> {
86+
let mut msgs = vec![];
87+
if let Some(hash) = self.has_quorum(round) {
88+
// collect all of the messages where root = quorum hash
89+
if let Some(round_messages) = self.messages.get(&round) {
90+
for msg in round_messages.values() {
91+
if msg.qbft_message.root == hash {
92+
msgs.push(msg.clone());
93+
}
94+
}
95+
}
96+
}
97+
msgs
98+
}
99+
84100
/// Gets all messages for a specific round
85101
pub fn get_messages_for_round(&self, round: Round) -> Vec<&WrappedQbftMessage> {
86102
// If we have messages for this round in our container, return them all

anchor/common/ssv_types/src/message.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -300,6 +300,21 @@ impl SignedSSVMessage {
300300
&self.full_data
301301
}
302302

303+
/// Aggregate a set of signed ssv messages into Self
304+
pub fn aggregate<I>(&mut self, others: I)
305+
where
306+
I: IntoIterator<Item = SignedSSVMessage>,
307+
{
308+
for signed_msg in others {
309+
// These will only all have 1 signature/operator, but we call extend for safety
310+
self.signatures.extend(signed_msg.signatures);
311+
self.operator_ids.extend(signed_msg.operator_ids);
312+
}
313+
314+
self.signatures.sort();
315+
self.operator_ids.sort();
316+
}
317+
303318
// Validate the signed message to ensure that it is well formed for qbft processing
304319
pub fn validate(&self) -> bool {
305320
// OperatorID must have at least one element

anchor/qbft_manager/src/lib.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -431,6 +431,17 @@ async fn qbft_instance<D: QbftData<Hash = Hash256>>(
431431
error!("could not send qbft result");
432432
}
433433
}
434+
435+
// Send the decided message (aggregated commit)
436+
match qbft.get_aggregated_commit() {
437+
Some(msg) => {
438+
network_tx.send(msg).unwrap_or_else(|e| {
439+
error!("Failed to send signed ssv message to network: {:?}", e)
440+
});
441+
}
442+
None => error!("Aggregated commit does not exist"),
443+
}
444+
434445
instance = QbftInstance::Decided { value: completed };
435446
} else {
436447
instance = QbftInstance::Initialized {

anchor/qbft_manager/src/tests.rs

Lines changed: 33 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,17 @@ where
9292
// Helper to verify consensus is reached
9393
pub async fn verify_consensus(&mut self) {
9494
while let Some(result) = self.consensus_rx.recv().await {
95+
// Confirm that consensus was reached
9596
assert!(result.reached_consensus, "Consensus was not reached");
97+
98+
// Confirm that the aggregated message contains a quorum of signatures
99+
let aggregated_commit = result
100+
.aggregated_commit
101+
.expect("If consensus was reached, this exists");
102+
assert!(
103+
aggregated_commit.signatures().len() as u64
104+
>= (self.tester.size as u64 - self.tester.size.get_f())
105+
);
96106
}
97107
}
98108
}
@@ -129,7 +139,7 @@ where
129139
// Track mapping from operator id to the respective manager
130140
managers: HashMap<OperatorId, Arc<QbftManager<ManualSlotClock>>>,
131141
// The size of the committee
132-
size: CommitteeSize,
142+
pub size: CommitteeSize,
133143
// Mapping of the data hash to the data identifier. This is to send data to the proper instance
134144
identifiers: HashMap<u64, D::Id>,
135145
// Mapping from data to the results of the consensus
@@ -367,15 +377,6 @@ where
367377
drop(consensus_tx);
368378
}
369379

370-
fn signed_to_wrapped(&self, signed: SignedSSVMessage) -> WrappedQbftMessage {
371-
let deser_qbft = QbftMessage::from_ssz_bytes(signed.ssv_message().data())
372-
.expect("We have a valid qbft message");
373-
WrappedQbftMessage {
374-
signed_message: signed,
375-
qbft_message: deser_qbft,
376-
}
377-
}
378-
379380
// Once an instance has completed, we want to record what happened
380381
fn handle_completion(&self, hash: Hash256, msg: Result<Completed<D>, QbftError>) {
381382
// Decrement the amount of instances running for this data
@@ -415,6 +416,16 @@ where
415416
finished
416417
}
417418

419+
// Convert a signed ssv message into a wrapped ssv message
420+
fn signed_to_wrapped(&self, signed: SignedSSVMessage) -> WrappedQbftMessage {
421+
let deser_qbft = QbftMessage::from_ssz_bytes(signed.ssv_message().data())
422+
.expect("We have a valid qbft message");
423+
WrappedQbftMessage {
424+
signed_message: signed,
425+
qbft_message: deser_qbft,
426+
}
427+
}
428+
418429
// Process and send a network message to the correct instance
419430
fn process_network_message(&self, mut wrapped_msg: WrappedQbftMessage) {
420431
let sender_operator_id = wrapped_msg
@@ -424,6 +435,17 @@ where
424435
.expect("One signer");
425436
let sender_operator_id = OperatorId::from(*sender_operator_id);
426437

438+
// If this is a decided message, want to record it in the consensus results.
439+
// We know this is an aggregated commit if the number of signatures is > 1
440+
if wrapped_msg.signed_message.signatures().len() > 1 {
441+
let mut results_write = self.results.write().unwrap();
442+
let results = results_write
443+
.get_mut(&wrapped_msg.qbft_message.root)
444+
.expect("Value exists");
445+
results.aggregated_commit = Some(wrapped_msg.signed_message);
446+
return;
447+
}
448+
427449
// Now we have a message ready to be sent back into the instance. Get the id
428450
// corresponding to the message.
429451
let data_id = self
@@ -500,6 +522,7 @@ pub struct ConsensusResult {
500522
min_for_consensus: u64,
501523
successful: u64,
502524
timed_out: u64,
525+
aggregated_commit: Option<SignedSSVMessage>,
503526
}
504527

505528
#[cfg(test)]

0 commit comments

Comments
 (0)