From 8484ab205eb6f3e68378c63d35c38d16f74e64c2 Mon Sep 17 00:00:00 2001 From: diego Date: Tue, 4 Mar 2025 17:49:38 +0100 Subject: [PATCH 01/19] validation when signers > 1 --- anchor/common/qbft/src/lib.rs | 18 ++--------- anchor/message_validator/src/lib.rs | 47 ++++++++++++++++++++++++++--- 2 files changed, 46 insertions(+), 19 deletions(-) diff --git a/anchor/common/qbft/src/lib.rs b/anchor/common/qbft/src/lib.rs index dd14e9143..02428182b 100644 --- a/anchor/common/qbft/src/lib.rs +++ b/anchor/common/qbft/src/lib.rs @@ -252,21 +252,9 @@ where // The rest of the verification only pertains to messages with one signature if wrapped_msg.signed_message.operator_ids().len() != 1 { - // If there is more than one signer, we also have to check if this is a decided message. - if matches!( - wrapped_msg.qbft_message.qbft_message_type, - QbftMessageType::Commit - ) { - // Do not care about data here, just that we had a success - let valid_data = Some(ValidData::new(None, wrapped_msg.qbft_message.root)); - return Some((valid_data, OperatorId::from(0))); - } - // Otherwise, this is invalid data - warn!( - num_signers = wrapped_msg.signed_message.operator_ids().len(), - "Message only allows one signer" - ); - return None; + // Do not care about data here, just that we had a success + let valid_data = Some(ValidData::new(None, wrapped_msg.qbft_message.root)); + return Some((valid_data, OperatorId::from(0))); } // Message is not a decide message, we know there is only one signer diff --git a/anchor/message_validator/src/lib.rs b/anchor/message_validator/src/lib.rs index 2c12233ba..471683411 100644 --- a/anchor/message_validator/src/lib.rs +++ b/anchor/message_validator/src/lib.rs @@ -2,14 +2,14 @@ use libp2p::gossipsub::MessageAcceptance::{Accept, Reject}; use libp2p::gossipsub::{MessageAcceptance, MessageId}; use libp2p::PeerId; use processor::Senders; -use ssv_types::consensus::QbftMessage; +use ssv_types::consensus::{QbftMessage, QbftMessageType}; use ssv_types::message::{MsgType, SSVMessage, SignedSSVMessage}; use ssv_types::partial_sig::PartialSignatureMessages; use ssz::Decode; use std::sync::Arc; use tokio::sync::mpsc::error::TrySendError::{Closed, Full}; use tokio::sync::mpsc::Sender; -use tracing::{error, trace}; +use tracing::{error, trace, warn}; // TODO taken from go-SSV as rough guidance. feel free to adjust as needed. https://github.com/ssvlabs/ssv/blob/e12abf7dfbbd068b99612fa2ebbe7e3372e57280/message/validation/errors.go#L55 #[derive(Debug)] @@ -58,8 +58,8 @@ pub enum ValidationFailure { UnknownQBFTMessageType, InvalidPartialSignatureType, PartialSignatureTypeRoleMismatch, - NonDecidedWithMultipleSigners, - DecidedNotEnoughSigners, + NonDecidedWithMultipleSigners { got: usize, want: usize }, + DecidedNotEnoughSigners { got: usize, want: usize}, DifferentProposalData, MalformedPrepareJustifications, UnexpectedPrepareJustifications, @@ -197,6 +197,35 @@ impl Validator { } } } + + fn validate_consensus_message_semantics(&self, signed_ssvmessage: SignedSSVMessage, qbft_message: &QbftMessage) -> Result<(), ValidationFailure> { + let signers = signed_ssvmessage.operator_ids().len(); + let quorum_size = compute_quorum_size(signers.len()); + let msg_type = qbft_message.qbft_message_type; + + if signers > 1 { + // Rule: Decided msg with different type than Commit + if msg_type != QbftMessageType::Commit { + return Err(ValidationFailure::NonDecidedWithMultipleSigners { + got: signers, + want: 1, + }); + } + + // Rule: Number of signers must be >= quorum size + if signers < quorum_size { + return Err(ValidationFailure::DecidedNotEnoughSigners { + got: signers, + want: quorum_size, + }); + } + } + + if !qbft_message.validate() { + return Err(ValidationFailure::UnknownQBFTMessageType); + } + Ok(()) + } } impl ValidatorService for Validator { @@ -275,3 +304,13 @@ impl ValidatorService for Validator { )?) } } + +fn compute_quorum_size(committee_size: usize) -> usize { + let f = get_f(committee_size); + f * 2 + 1 +} + +// # TODO centralize this and the one in the qbft crate +fn get_f(committee_size: usize) -> usize { + (committee_size - 1) / 3 +} \ No newline at end of file From 746787949c09795f03e3efa48f47cfa831843be1 Mon Sep 17 00:00:00 2001 From: diego Date: Wed, 5 Mar 2025 12:28:43 +0100 Subject: [PATCH 02/19] add validate_consensus_message_semantics --- Cargo.lock | 2 + anchor/common/qbft/src/lib.rs | 6 -- anchor/common/ssv_types/src/consensus.rs | 14 ++-- anchor/message_validator/Cargo.toml | 3 + anchor/message_validator/src/lib.rs | 85 +++++++++++++++++++----- 5 files changed, 81 insertions(+), 29 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3d44e725c..63c014269 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5097,8 +5097,10 @@ name = "message_validator" version = "0.1.0" dependencies = [ "ethereum_ssz", + "hex", "libp2p", "processor", + "sha2 0.10.8", "ssv_types", "thiserror 2.0.11", "tokio", diff --git a/anchor/common/qbft/src/lib.rs b/anchor/common/qbft/src/lib.rs index 02428182b..962f68106 100644 --- a/anchor/common/qbft/src/lib.rs +++ b/anchor/common/qbft/src/lib.rs @@ -214,12 +214,6 @@ where &self, wrapped_msg: &WrappedQbftMessage, ) -> Option<(Option>, OperatorId)> { - // Validate the qbft message - if !wrapped_msg.qbft_message.validate() { - warn!("Invalid qbft_message"); - return None; - } - // Ensure that this message is for the correct round let current_round = self.current_round.get(); if (wrapped_msg.qbft_message.round < current_round as u64) diff --git a/anchor/common/ssv_types/src/consensus.rs b/anchor/common/ssv_types/src/consensus.rs index ac238e0f0..c6f78de24 100644 --- a/anchor/common/ssv_types/src/consensus.rs +++ b/anchor/common/ssv_types/src/consensus.rs @@ -1,5 +1,5 @@ use crate::message::*; -use crate::msgid::MessageId; +use crate::msgid::{MessageId, Role}; use crate::ValidatorIndex; use sha2::{Digest, Sha256}; use ssz::{Decode, DecodeError, Encode}; @@ -60,12 +60,14 @@ pub struct QbftMessage { } impl QbftMessage { - /// Do QBFTMessage specific validation - pub fn validate(&self) -> bool { - if self.qbft_message_type > QbftMessageType::RoundChange { - return false; + pub fn max_round(&self) -> Option { + match self.identifier.role() { + Some(role) => match role { + Role::Committee | Role::Aggregator => Some(12), + Role::Proposer | Role::SyncCommittee => Some(6), + }, + None => None, } - true } } diff --git a/anchor/message_validator/Cargo.toml b/anchor/message_validator/Cargo.toml index 2aa84e03d..02bfb175c 100644 --- a/anchor/message_validator/Cargo.toml +++ b/anchor/message_validator/Cargo.toml @@ -6,11 +6,14 @@ authors = ["Sigma Prime "] [dependencies] ethereum_ssz = { workspace = true } +hex = { workspace = true } libp2p = { git = "https://github.com/libp2p/rust-libp2p.git", rev = "082eb16", default-features = false, features = [ "gossipsub", ] } processor = { workspace = true } +sha2 = { workspace = true } ssv_types = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } + diff --git a/anchor/message_validator/src/lib.rs b/anchor/message_validator/src/lib.rs index 471683411..a7cc68956 100644 --- a/anchor/message_validator/src/lib.rs +++ b/anchor/message_validator/src/lib.rs @@ -2,6 +2,7 @@ use libp2p::gossipsub::MessageAcceptance::{Accept, Reject}; use libp2p::gossipsub::{MessageAcceptance, MessageId}; use libp2p::PeerId; use processor::Senders; +use sha2::{Digest, Sha256}; use ssv_types::consensus::{QbftMessage, QbftMessageType}; use ssv_types::message::{MsgType, SSVMessage, SignedSSVMessage}; use ssv_types::partial_sig::PartialSignatureMessages; @@ -9,7 +10,7 @@ use ssz::Decode; use std::sync::Arc; use tokio::sync::mpsc::error::TrySendError::{Closed, Full}; use tokio::sync::mpsc::Sender; -use tracing::{error, trace, warn}; +use tracing::{error, trace}; // TODO taken from go-SSV as rough guidance. feel free to adjust as needed. https://github.com/ssvlabs/ssv/blob/e12abf7dfbbd068b99612fa2ebbe7e3372e57280/message/validation/errors.go#L55 #[derive(Debug)] @@ -33,7 +34,7 @@ pub enum ValidationFailure { NoDuty, EstimatedRoundNotInAllowedSpread, EmptyData, - MismatchedIdentifier, + MismatchedIdentifier { got: String, want: String }, SignatureVerification, PubSubMessageHasNoData, MalformedPubSubMessage, @@ -59,7 +60,7 @@ pub enum ValidationFailure { InvalidPartialSignatureType, PartialSignatureTypeRoleMismatch, NonDecidedWithMultipleSigners { got: usize, want: usize }, - DecidedNotEnoughSigners { got: usize, want: usize}, + DecidedNotEnoughSigners { got: usize, want: usize }, DifferentProposalData, MalformedPrepareJustifications, UnexpectedPrepareJustifications, @@ -78,6 +79,7 @@ pub enum ValidationFailure { InvalidPartialSignatureTypeCount, TooManyPartialSignatureMessages, EncodeOperators, + FailedToGetMaxRound, } impl From<&ValidationFailure> for MessageAcceptance { @@ -182,13 +184,17 @@ impl Validator { fn validate_ssv_message( &self, + signed_ssv_message: &SignedSSVMessage, ssv_message: &SSVMessage, ) -> Result { match ssv_message.msg_type() { - MsgType::SSVConsensusMsgType => QbftMessage::from_ssz_bytes(ssv_message.data()) - .ok() - .map(ValidatedSSVMessage::QbftMessage) - .ok_or(ValidationFailure::UndecodableMessageData), + MsgType::SSVConsensusMsgType => { + let consensus_message = QbftMessage::from_ssz_bytes(ssv_message.data()) + .ok() + .ok_or(ValidationFailure::UndecodableMessageData)?; + self.validate_consensus_message_semantics(signed_ssv_message, &consensus_message)?; + Ok(ValidatedSSVMessage::QbftMessage(consensus_message)) + } MsgType::SSVPartialSignatureMsgType => { PartialSignatureMessages::from_ssz_bytes(ssv_message.data()) .ok() @@ -198,10 +204,14 @@ impl Validator { } } - fn validate_consensus_message_semantics(&self, signed_ssvmessage: SignedSSVMessage, qbft_message: &QbftMessage) -> Result<(), ValidationFailure> { - let signers = signed_ssvmessage.operator_ids().len(); - let quorum_size = compute_quorum_size(signers.len()); - let msg_type = qbft_message.qbft_message_type; + fn validate_consensus_message_semantics( + &self, + signed_ssv_message: &SignedSSVMessage, + consensus_message: &QbftMessage, + ) -> Result<(), ValidationFailure> { + let signers = signed_ssv_message.operator_ids().len(); + let quorum_size = compute_quorum_size(signers); + let msg_type = consensus_message.qbft_message_type; if signers > 1 { // Rule: Decided msg with different type than Commit @@ -221,9 +231,42 @@ impl Validator { } } - if !qbft_message.validate() { - return Err(ValidationFailure::UnknownQBFTMessageType); + if !signed_ssv_message.full_data().is_empty() { + // Rule: Prepare or commit messages must not have full data + if msg_type == QbftMessageType::Prepare + || (msg_type == QbftMessageType::Commit && signers == 1) + { + return Err(ValidationFailure::PrepareOrCommitWithFullData); + } + + let hashed_full_data = hash_data_root(signed_ssv_message.full_data()); + // Rule: Full data hash must match root + if hashed_full_data != consensus_message.root { + return Err(ValidationFailure::InvalidHash); + } + } + + if consensus_message.round == 0 { + return Err(ValidationFailure::ZeroRound); } + + let màx_round = match consensus_message.max_round() { + Some(max_round) => max_round, + None => return Err(ValidationFailure::FailedToGetMaxRound), + }; + + if consensus_message.round > màx_round { + return Err(ValidationFailure::RoundTooHigh); + } + + // Rule: consensus message must have the same identifier as the ssv message's identifier + if consensus_message.identifier != *signed_ssv_message.ssv_message().msg_id() { + return Err(ValidationFailure::MismatchedIdentifier { + got: hex::encode(&consensus_message.identifier), + want: hex::encode(signed_ssv_message.ssv_message().msg_id()), + }); + } + Ok(()) } } @@ -244,9 +287,10 @@ impl ValidatorService for Validator { trace!(msg = ?deserialized_message, "SignedSSVMessage deserialized"); match validator.do_validate(&deserialized_message) { Ok(()) => { - match validator - .validate_ssv_message(deserialized_message.ssv_message()) - { + match validator.validate_ssv_message( + &deserialized_message, + deserialized_message.ssv_message(), + ) { Ok(validated_ssv_message) => ( Accept, Some(ValidatedMessage::new( @@ -313,4 +357,11 @@ fn compute_quorum_size(committee_size: usize) -> usize { // # TODO centralize this and the one in the qbft crate fn get_f(committee_size: usize) -> usize { (committee_size - 1) / 3 -} \ No newline at end of file +} + +fn hash_data_root(full_data: &[u8]) -> [u8; 32] { + let mut hasher = Sha256::new(); + hasher.update(full_data); + let hash: [u8; 32] = hasher.finalize().into(); + hash +} From a41c6cb2bc23bddc249f1690ff463df9ac058060 Mon Sep 17 00:00:00 2001 From: diego Date: Wed, 5 Mar 2025 14:17:04 +0100 Subject: [PATCH 03/19] add validate_justifications --- anchor/message_validator/src/lib.rs | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/anchor/message_validator/src/lib.rs b/anchor/message_validator/src/lib.rs index a7cc68956..2138accc7 100644 --- a/anchor/message_validator/src/lib.rs +++ b/anchor/message_validator/src/lib.rs @@ -267,6 +267,32 @@ impl Validator { }); } + self.validate_justifications(consensus_message)?; + + Ok(()) + } + + fn validate_justifications( + &self, + consensus_message: &QbftMessage, + ) -> Result<(), ValidationFailure> { + // Rule: Can only exist for Proposal messages + let prepare_justifications = &consensus_message.prepare_justification; + if !prepare_justifications.is_empty() + && consensus_message.qbft_message_type != QbftMessageType::Proposal + { + return Err(ValidationFailure::UnexpectedPrepareJustifications); + } + + // Rule: Can only exist for Proposal or Round-Change messages + let round_change_justifications = &consensus_message.round_change_justification; + if !round_change_justifications.is_empty() + && consensus_message.qbft_message_type != QbftMessageType::Proposal + && consensus_message.qbft_message_type != QbftMessageType::RoundChange + { + return Err(ValidationFailure::UnexpectedRoundChangeJustifications); + } + Ok(()) } } From 6109eb79b8af746878088bfbb7129a1519ac91e9 Mon Sep 17 00:00:00 2001 From: diego Date: Thu, 6 Mar 2025 19:27:12 +0100 Subject: [PATCH 04/19] get committee members from the NetworkState --- Cargo.lock | 1 + anchor/client/src/lib.rs | 3 ++- anchor/common/ssv_types/src/lib.rs | 1 + anchor/database/src/lib.rs | 3 ++- anchor/database/src/state.rs | 22 +++++++++++++++++++-- anchor/message_validator/Cargo.toml | 2 +- anchor/message_validator/src/lib.rs | 30 +++++++++++++++++++++++++++-- 7 files changed, 55 insertions(+), 7 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 63c014269..20eb38eea 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5096,6 +5096,7 @@ dependencies = [ name = "message_validator" version = "0.1.0" dependencies = [ + "database", "ethereum_ssz", "hex", "libp2p", diff --git a/anchor/client/src/lib.rs b/anchor/client/src/lib.rs index 8f92f23ee..97ca20361 100644 --- a/anchor/client/src/lib.rs +++ b/anchor/client/src/lib.rs @@ -356,7 +356,8 @@ impl Client { )?; let (results_tx, results_rx) = mpsc::channel::(9000); - let message_validator = Validator::new(processor_senders.clone(), results_tx); + let message_validator = + Validator::new(processor_senders.clone(), results_tx, database.watch()); // Start the p2p network let network = Network::try_new( diff --git a/anchor/common/ssv_types/src/lib.rs b/anchor/common/ssv_types/src/lib.rs index 2f0f9e222..c7c17668f 100644 --- a/anchor/common/ssv_types/src/lib.rs +++ b/anchor/common/ssv_types/src/lib.rs @@ -14,4 +14,5 @@ mod share; mod sql_conversions; mod util; +pub use indexmap::IndexSet; pub use share::ENCRYPTED_KEY_LENGTH; diff --git a/anchor/database/src/lib.rs b/anchor/database/src/lib.rs index 6f1acfce4..2ed997eca 100644 --- a/anchor/database/src/lib.rs +++ b/anchor/database/src/lib.rs @@ -1,7 +1,7 @@ use openssl::{pkey::Public, rsa::Rsa}; use r2d2_sqlite::SqliteConnectionManager; use rusqlite::params; -use ssv_types::{Cluster, ClusterId, Operator, OperatorId, Share, ValidatorMetadata}; +use ssv_types::{Cluster, ClusterId, CommitteeId, Operator, OperatorId, Share, ValidatorMetadata}; use std::collections::{HashMap, HashSet}; use std::fs::File; use std::path::Path; @@ -63,6 +63,7 @@ struct MultiState { shares: ShareMultiIndexMap, validator_metadata: MetadataMultiIndexMap, clusters: ClusterMultiIndexMap, + clusters_by_committee_id: HashMap, } // General information that can be single index access diff --git a/anchor/database/src/state.rs b/anchor/database/src/state.rs index 9f18393e4..4fa3ac851 100644 --- a/anchor/database/src/state.rs +++ b/anchor/database/src/state.rs @@ -1,4 +1,6 @@ -use crate::{ClusterMultiIndexMap, MetadataMultiIndexMap, MultiIndexMap, ShareMultiIndexMap}; +use crate::{ + ClusterMultiIndexMap, MetadataMultiIndexMap, MultiIndexMap, ShareMultiIndexMap, UniqueIndex, +}; use crate::{DatabaseError, NetworkState, Pool, PoolConn}; use crate::{MultiState, SingleState}; use crate::{SqlStatement, SQL}; @@ -8,7 +10,8 @@ use openssl::rsa::Rsa; use rusqlite::{params, OptionalExtension}; use rusqlite::{types::Type, Error as SqlError}; use ssv_types::{ - Cluster, ClusterId, ClusterMember, Operator, OperatorId, Share, ValidatorMetadata, + Cluster, ClusterId, ClusterMember, CommitteeId, IndexSet, Operator, OperatorId, Share, + ValidatorMetadata, }; use std::collections::{HashMap, HashSet}; use std::str::FromStr; @@ -43,6 +46,9 @@ impl NetworkState { // 5) Owner -> Nonce (u16) let nonces = Self::fetch_nonces(&conn)?; + //ClusterId -> CommitteeId. It's populated in the loop that populates the multi-index maps + let mut clusters_by_committee_id = HashMap::new(); + // Second phase: Populate all in memory stores with data; let mut shares_multi: ShareMultiIndexMap = MultiIndexMap::new(); let mut metadata_multi: MetadataMultiIndexMap = MultiIndexMap::new(); @@ -81,6 +87,8 @@ impl NetworkState { validator.clone(), ); + clusters_by_committee_id.insert(cluster.committee_id(), *cluster_id); + // Process this validators shares if let Some(share_map) = &share_map { if let Some(shares) = share_map.get(cluster_id) { @@ -105,6 +113,7 @@ impl NetworkState { shares: shares_multi, validator_metadata: metadata_multi, clusters: cluster_multi, + clusters_by_committee_id, }, single_state, }) @@ -251,6 +260,15 @@ impl NetworkState { pub fn clusters(&self) -> &ClusterMultiIndexMap { &self.multi_state.clusters } + + pub fn get_cluster_members(&self, committee_id: &CommitteeId) -> Option> { + self.multi_state + .clusters_by_committee_id + .get(committee_id) + .and_then(|cluster_id| self.multi_state.clusters.get_by(cluster_id)) + .map(|cluster| cluster.cluster_members) + } + /// Get the ID of our Operator if it exists pub fn get_own_id(&self) -> Option { self.single_state.id diff --git a/anchor/message_validator/Cargo.toml b/anchor/message_validator/Cargo.toml index 02bfb175c..01750b87d 100644 --- a/anchor/message_validator/Cargo.toml +++ b/anchor/message_validator/Cargo.toml @@ -5,6 +5,7 @@ edition = { workspace = true } authors = ["Sigma Prime "] [dependencies] +database = { workspace = true } ethereum_ssz = { workspace = true } hex = { workspace = true } libp2p = { git = "https://github.com/libp2p/rust-libp2p.git", rev = "082eb16", default-features = false, features = [ @@ -16,4 +17,3 @@ ssv_types = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } - diff --git a/anchor/message_validator/src/lib.rs b/anchor/message_validator/src/lib.rs index 2138accc7..29414c205 100644 --- a/anchor/message_validator/src/lib.rs +++ b/anchor/message_validator/src/lib.rs @@ -1,3 +1,4 @@ +use database::NetworkState; use libp2p::gossipsub::MessageAcceptance::{Accept, Reject}; use libp2p::gossipsub::{MessageAcceptance, MessageId}; use libp2p::PeerId; @@ -5,11 +6,13 @@ use processor::Senders; use sha2::{Digest, Sha256}; use ssv_types::consensus::{QbftMessage, QbftMessageType}; use ssv_types::message::{MsgType, SSVMessage, SignedSSVMessage}; +use ssv_types::msgid::DutyExecutor; use ssv_types::partial_sig::PartialSignatureMessages; use ssz::Decode; use std::sync::Arc; use tokio::sync::mpsc::error::TrySendError::{Closed, Full}; use tokio::sync::mpsc::Sender; +use tokio::sync::watch; use tracing::{error, trace}; // TODO taken from go-SSV as rough guidance. feel free to adjust as needed. https://github.com/ssvlabs/ssv/blob/e12abf7dfbbd068b99612fa2ebbe7e3372e57280/message/validation/errors.go#L55 @@ -159,6 +162,7 @@ pub enum Error { pub struct Validator { processor: Senders, result_tx: Sender, + network_state_rxx: watch::Receiver, } pub trait ValidatorService { @@ -171,10 +175,15 @@ pub trait ValidatorService { } impl Validator { - pub fn new(processor: Senders, result_tx: Sender) -> Self { + pub fn new( + processor: Senders, + result_tx: Sender, + network_state_rxx: watch::Receiver, + ) -> Self { Self { processor, result_tx, + network_state_rxx, } } @@ -210,7 +219,24 @@ impl Validator { consensus_message: &QbftMessage, ) -> Result<(), ValidationFailure> { let signers = signed_ssv_message.operator_ids().len(); - let quorum_size = compute_quorum_size(signers); + + let db = self.network_state_rxx.borrow(); + let committee_id = match signed_ssv_message.ssv_message().msg_id().duty_executor() { + Some(DutyExecutor::Committee(id)) => id, + _ => return Err(ValidationFailure::NonExistentCommitteeID), + }; + + let committee_members = match db.get_cluster_members(&committee_id) { + Some(committee_members) => { + if committee_members.is_empty() { + return Err(ValidationFailure::NoValidators); + } + committee_members + } + None => return Err(ValidationFailure::NonExistentCommitteeID), + }; + + let quorum_size = compute_quorum_size(committee_members.len()); let msg_type = consensus_message.qbft_message_type; if signers > 1 { From 1b40fa03c46ac0a0a05e183dfa08cbe14254b0a7 Mon Sep 17 00:00:00 2001 From: diego Date: Fri, 7 Mar 2025 14:27:15 +0100 Subject: [PATCH 05/19] multi index with 4 indices --- anchor/database/src/cluster_operations.rs | 27 ++- anchor/database/src/lib.rs | 28 ++- anchor/database/src/multi_index.rs | 265 ++++++++++++++++------ anchor/database/src/state.rs | 12 +- 4 files changed, 245 insertions(+), 87 deletions(-) diff --git a/anchor/database/src/cluster_operations.rs b/anchor/database/src/cluster_operations.rs index fcfafcab9..04fe5b798 100644 --- a/anchor/database/src/cluster_operations.rs +++ b/anchor/database/src/cluster_operations.rs @@ -58,27 +58,30 @@ impl NetworkDatabase { // Save the keyshare state.multi_state.shares.insert( - &validator.public_key, // The validator this keyshare belongs to - &cluster.cluster_id, // The id of the cluster - &cluster.owner, // The owner of the cluster - share.to_owned(), // The keyshare itself + &validator.public_key, // The validator this keyshare belongs to + &cluster.cluster_id, // The id of the cluster + &cluster.owner, // The owner of the cluster + &cluster.committee_id(), // The committee id of the cluster + share.to_owned(), // The keyshare itself ); } // Save all cluster related information state.multi_state.clusters.insert( - &cluster.cluster_id, // The id of the cluster - &validator.public_key, // The public key of validator added to the cluster - &cluster.owner, // Owner of the cluster - cluster.to_owned(), // The Cluster and all containing information + &cluster.cluster_id, // The id of the cluster + &validator.public_key, // The public key of validator added to the cluster + &cluster.owner, // Owner of the cluster + &cluster.committee_id(), // The committee id of the cluster + cluster.to_owned(), // The Cluster and all containing information ); // Save the metadata for the validators state.multi_state.validator_metadata.insert( - &validator.public_key, // The public key of the validator - &cluster.cluster_id, // The id of the cluster the validator belongs to - &cluster.owner, // The owner of the cluster - validator.to_owned(), // The metadata of the validator + &validator.public_key, // The public key of the validator + &cluster.cluster_id, // The id of the cluster the validator belongs to + &cluster.owner, // The owner of the cluster + &cluster.committee_id(), // The committee id of the cluster + validator.to_owned(), // The metadata of the validator ); }); diff --git a/anchor/database/src/lib.rs b/anchor/database/src/lib.rs index 2ed997eca..153ad1307 100644 --- a/anchor/database/src/lib.rs +++ b/anchor/database/src/lib.rs @@ -36,8 +36,16 @@ type PoolConn = r2d2::PooledConnection; /// Primary: public key of validator. uniquely identifies share /// Secondary: cluster id. corresponds to a list of shares /// Tertiary: owner of the cluster. corresponds to a list of shares -pub(crate) type ShareMultiIndexMap = - MultiIndexMap; +pub(crate) type ShareMultiIndexMap = MultiIndexMap< + PublicKeyBytes, + ClusterId, + Address, + CommitteeId, + Share, + NonUniqueTag, + NonUniqueTag, + NonUniqueTag, +>; /// Metadata for all validators in the network /// Primary: public key of the validator. uniquely identifies the metadata /// Secondary: cluster id. corresponds to list of metadata for all validators @@ -46,16 +54,26 @@ pub(crate) type MetadataMultiIndexMap = MultiIndexMap< PublicKeyBytes, ClusterId, Address, + CommitteeId, ValidatorMetadata, NonUniqueTag, NonUniqueTag, + NonUniqueTag, >; /// All of the clusters in the network /// Primary: cluster id. uniquely identifies a cluster /// Secondary: public key of the validator. uniquely identifies a cluster /// Tertiary: owner of the cluster. uniquely identifies a cluster -pub(crate) type ClusterMultiIndexMap = - MultiIndexMap; +pub(crate) type ClusterMultiIndexMap = MultiIndexMap< + ClusterId, + PublicKeyBytes, + Address, + CommitteeId, + Cluster, + UniqueTag, + UniqueTag, + NonUniqueTag, +>; // Information that needs to be accessed via multiple different indicies #[derive(Debug)] @@ -63,7 +81,7 @@ struct MultiState { shares: ShareMultiIndexMap, validator_metadata: MetadataMultiIndexMap, clusters: ClusterMultiIndexMap, - clusters_by_committee_id: HashMap, + // Be careful when adding new maps here. If you really must to, it must be updated in the operations files } // General information that can be single index access diff --git a/anchor/database/src/multi_index.rs b/anchor/database/src/multi_index.rs index b119167fb..e0f2cc84e 100644 --- a/anchor/database/src/multi_index.rs +++ b/anchor/database/src/multi_index.rs @@ -2,18 +2,19 @@ use std::collections::HashMap; use std::hash::Hash; use std::marker::PhantomData; -/// Marker trait for uniquely identifying indicies +/// Marker trait for uniquely identifying indices pub trait Unique {} -/// Marker trait for non-uniquely identifiying indicies +/// Marker trait for non-uniquely identifying indices pub trait NotUnique {} /// Index type markers pub enum Primary {} pub enum Secondary {} pub enum Tertiary {} +pub enum Quaternary {} -// Type tags markers +/// Type tags markers #[derive(Debug)] pub enum UniqueTag {} impl Unique for UniqueTag {} @@ -32,54 +33,69 @@ pub trait NonUniqueIndex { fn get_all_by(&self, key: &K) -> Option>; } +/// Inner storage maps for the multi-index map, now supporting a quaternary index. +/// - K1: Primary key type (always unique) +/// - K2: Secondary key type +/// - K3: Tertiary key type +/// - K4: Quaternary key type +/// - V: Value type #[derive(Debug)] -struct InnerMaps +struct InnerMaps where K1: Eq + Hash, K2: Eq + Hash, K3: Eq + Hash, + K4: Eq + Hash, { primary: HashMap, secondary_unique: HashMap, secondary_multi: HashMap>, tertiary_unique: HashMap, tertiary_multi: HashMap>, + quaternary_unique: HashMap, + quaternary_multi: HashMap>, } -/// A concurrent multi-index map that supports up to three different access patterns. -/// The core differentiates between unique identification and non unique identification. The primary -/// index is forced to always uniquely identify the value. The secondary and tertiary indicies have -/// more flexibility. The key may non uniquely identify many different values, or uniquely identify -/// a single value +/// A concurrent multi-index map that supports up to four different access patterns. +/// The core differentiates between unique identification and non-unique identification. +/// The primary index is forced to always uniquely identify the value. The secondary, tertiary, +/// and quaternary indices have more flexibility. A key may non-uniquely identify many values, +/// or uniquely identify a single value. /// -/// Example: A share is uniquely identified by the Validators public key that it belongs too. A -/// ClusterId does not uniquely identify a share as a cluster contains multiple shares +/// Example: A share might be uniquely identified by a primary key (like a Validators public key) +/// while a secondary or tertiary index (like a ClusterId) does not uniquely identify a share. The +/// new quaternary index provides an additional access pattern. /// /// - K1: Primary key type (always unique) /// - K2: Secondary key type /// - K3: Tertiary key type +/// - K4: Quaternary key type /// - V: Value type /// - U1: Secondary index uniqueness (Unique or NotUnique) /// - U2: Tertiary index uniqueness (Unique or NotUnique) +/// - U3: Quaternary index uniqueness (Unique or NotUnique) #[derive(Debug)] -pub struct MultiIndexMap +pub struct MultiIndexMap where K1: Eq + Hash, K2: Eq + Hash, K3: Eq + Hash, + K4: Eq + Hash, { - maps: InnerMaps, - _marker: PhantomData<(U1, U2)>, + maps: InnerMaps, + _marker: PhantomData<(U1, U2, U3)>, } -impl Default for MultiIndexMap +impl Default for MultiIndexMap where K1: Eq + Hash + Clone, K2: Eq + Hash + Clone, K3: Eq + Hash + Clone, + K4: Eq + Hash + Clone, V: Clone, U1: 'static, U2: 'static, + U3: 'static, { fn default() -> Self { Self { @@ -89,22 +105,26 @@ where secondary_multi: HashMap::new(), tertiary_unique: HashMap::new(), tertiary_multi: HashMap::new(), + quaternary_unique: HashMap::new(), + quaternary_multi: HashMap::new(), }, _marker: PhantomData, } } } -impl MultiIndexMap +impl MultiIndexMap where K1: Eq + Hash + Clone, K2: Eq + Hash + Clone, K3: Eq + Hash + Clone, + K4: Eq + Hash + Clone, V: Clone, U1: 'static, U2: 'static, + U3: 'static, { - /// Creates a new empty MultiIndexMap + /// Creates a new empty MultiIndexMap. pub fn new() -> Self { Self { maps: InnerMaps { @@ -113,18 +133,22 @@ where secondary_multi: HashMap::new(), tertiary_unique: HashMap::new(), tertiary_multi: HashMap::new(), + quaternary_unique: HashMap::new(), + quaternary_multi: HashMap::new(), }, _marker: PhantomData, } } - /// Number of entires in the primary map + /// Returns the number of entries in the primary map. pub fn length(&self) -> usize { self.maps.primary.len() } - /// Insert a new value and associated keys into the map - pub fn insert(&mut self, k1: &K1, k2: &K2, k3: &K3, v: V) { + /// Inserts a new value and associated keys into the map. + /// Inserts the primary key and value first, then updates the secondary, tertiary, + /// and quaternary indices based on their uniqueness. + pub fn insert(&mut self, k1: &K1, k2: &K2, k3: &K3, k4: &K4, v: V) { // Insert into primary map first self.maps.primary.insert(k1.clone(), v); @@ -135,7 +159,7 @@ where self.maps .secondary_multi .entry(k2.clone()) - .and_modify(|v| v.push(k1.clone())) + .and_modify(|vec| vec.push(k1.clone())) .or_insert_with(|| vec![k1.clone()]); } @@ -146,61 +170,78 @@ where self.maps .tertiary_multi .entry(k3.clone()) - .and_modify(|v| v.push(k1.clone())) + .and_modify(|vec| vec.push(k1.clone())) + .or_insert_with(|| vec![k1.clone()]); + } + + // Handle quaternary index based on uniqueness + if std::any::TypeId::of::() == std::any::TypeId::of::() { + self.maps.quaternary_unique.insert(k4.clone(), k1.clone()); + } else { + self.maps + .quaternary_multi + .entry(k4.clone()) + .and_modify(|vec| vec.push(k1.clone())) .or_insert_with(|| vec![k1.clone()]); } } - /// Remove a value and all its indexes using the primary key + /// Removes a value and all its indexes using the primary key. pub fn remove(&mut self, k1: &K1) -> Option { // Remove from primary storage let removed = self.maps.primary.remove(k1)?; // Remove from secondary index if std::any::TypeId::of::() == std::any::TypeId::of::() { - // For unique indexes, just remove the entry that points to this k1 self.maps.secondary_unique.retain(|_, v| v != k1); } else { - // For non-unique indexes, remove k1 from any vectors it appears in - self.maps.secondary_multi.retain(|_, v| { - v.retain(|x| x != k1); - !v.is_empty() + self.maps.secondary_multi.retain(|_, vec| { + vec.retain(|x| x != k1); + !vec.is_empty() }); } // Remove from tertiary index if std::any::TypeId::of::() == std::any::TypeId::of::() { - // For unique indexes, just remove the entry that points to this k1 self.maps.tertiary_unique.retain(|_, v| v != k1); } else { - // For non-unique indexes, remove k1 from any vectors it appears in - self.maps.tertiary_multi.retain(|_, v| { - v.retain(|x| x != k1); - !v.is_empty() + self.maps.tertiary_multi.retain(|_, vec| { + vec.retain(|x| x != k1); + !vec.is_empty() + }); + } + + // Remove from quaternary index + if std::any::TypeId::of::() == std::any::TypeId::of::() { + self.maps.quaternary_unique.retain(|_, v| v != k1); + } else { + self.maps.quaternary_multi.retain(|_, vec| { + vec.retain(|x| x != k1); + !vec.is_empty() }); } Some(removed) } - /// Update an existing value using the primary key - /// Only updates if the primary key exists, indexes remain unchanged + /// Updates an existing value using the primary key. + /// Only updates if the primary key exists; indexes remain unchanged. pub fn update(&mut self, k1: &K1, new_value: V) -> Option { if !self.maps.primary.contains_key(k1) { return None; } - - // Only update the value in primary storage self.maps.primary.insert(k1.clone(), new_value) } } -// Implement unique access for primary key -impl UniqueIndex for MultiIndexMap +// Implement unique access for primary key. +impl UniqueIndex + for MultiIndexMap where K1: Eq + Hash + Clone, K2: Eq + Hash + Clone, K3: Eq + Hash + Clone, + K4: Eq + Hash + Clone, V: Clone, { fn get_by(&self, key: &K1) -> Option { @@ -208,12 +249,14 @@ where } } -// Implement unique access for secondary key -impl UniqueIndex for MultiIndexMap +// Implement unique access for secondary key. +impl UniqueIndex + for MultiIndexMap where K1: Eq + Hash + Clone, K2: Eq + Hash + Clone, K3: Eq + Hash + Clone, + K4: Eq + Hash + Clone, V: Clone, U1: Unique, { @@ -223,13 +266,14 @@ where } } -// Implement non-unique access for secondary key -impl NonUniqueIndex - for MultiIndexMap +// Implement non-unique access for secondary key. +impl NonUniqueIndex + for MultiIndexMap where K1: Eq + Hash + Clone, K2: Eq + Hash + Clone, K3: Eq + Hash + Clone, + K4: Eq + Hash + Clone, V: Clone, U1: NotUnique, { @@ -242,12 +286,14 @@ where } } -// Implement unique access for tertiary key -impl UniqueIndex for MultiIndexMap +// Implement unique access for tertiary key. +impl UniqueIndex + for MultiIndexMap where K1: Eq + Hash + Clone, K2: Eq + Hash + Clone, K3: Eq + Hash + Clone, + K4: Eq + Hash + Clone, V: Clone, U2: Unique, { @@ -257,12 +303,14 @@ where } } -// Implement non-unique access for tertiary key -impl NonUniqueIndex for MultiIndexMap +// Implement non-unique access for tertiary key. +impl NonUniqueIndex + for MultiIndexMap where K1: Eq + Hash + Clone, K2: Eq + Hash + Clone, K3: Eq + Hash + Clone, + K4: Eq + Hash + Clone, V: Clone, U2: NotUnique, { @@ -275,6 +323,43 @@ where } } +// Implement unique access for quaternary key. +impl UniqueIndex + for MultiIndexMap +where + K1: Eq + Hash + Clone, + K2: Eq + Hash + Clone, + K3: Eq + Hash + Clone, + K4: Eq + Hash + Clone, + V: Clone, + U3: Unique, +{ + fn get_by(&self, key: &K4) -> Option { + let primary_key = self.maps.quaternary_unique.get(key)?; + self.maps.primary.get(primary_key).cloned() + } +} + +// Implement non-unique access for quaternary key. +impl NonUniqueIndex + for MultiIndexMap +where + K1: Eq + Hash + Clone, + K2: Eq + Hash + Clone, + K3: Eq + Hash + Clone, + K4: Eq + Hash + Clone, + V: Clone, + U3: NotUnique, +{ + fn get_all_by(&self, key: &K4) -> Option> { + self.maps.quaternary_multi.get(key).map(|keys| { + keys.iter() + .filter_map(|k1| self.maps.primary.get(k1).cloned()) + .collect() + }) + } +} + #[cfg(test)] mod multi_index_tests { use super::*; @@ -287,16 +372,25 @@ mod multi_index_tests { #[test] fn test_basic_operations() { - let mut map: MultiIndexMap = - MultiIndexMap::new(); + // Using unique indices for all secondary, tertiary, and quaternary keys. + let mut map: MultiIndexMap< + i32, + String, + bool, + char, + TestValue, + UniqueTag, + UniqueTag, + UniqueTag, + > = MultiIndexMap::new(); let value = TestValue { id: 1, data: "test".to_string(), }; - // Test insertion - map.insert(&1, &"key1".to_string(), &true, value.clone()); + // Test insertion with quaternary key 'a' + map.insert(&1, &"key1".to_string(), &true, &'a', value.clone()); // Test primary key access assert_eq!(map.get_by(&1), Some(value.clone())); @@ -307,6 +401,9 @@ mod multi_index_tests { // Test tertiary key access assert_eq!(map.get_by(&true), Some(value.clone())); + // Test quaternary key access + assert_eq!(map.get_by(&'a'), Some(value.clone())); + // Test update let new_value = TestValue { id: 1, @@ -315,17 +412,27 @@ mod multi_index_tests { map.update(&1, new_value.clone()); assert_eq!(map.get_by(&1), Some(new_value.clone())); - // Test removal + // Test removal: all indices should be cleaned up assert_eq!(map.remove(&1), Some(new_value.clone())); assert_eq!(map.get_by(&1), None); assert_eq!(map.get_by(&"key1".to_string()), None); assert_eq!(map.get_by(&true), None); + assert_eq!(map.get_by(&'a'), None); } #[test] fn test_non_unique_indices() { - let mut map: MultiIndexMap = - MultiIndexMap::new(); + // Using non-unique indices for all secondary, tertiary, and quaternary keys. + let mut map: MultiIndexMap< + i32, + String, + bool, + char, + TestValue, + NonUniqueTag, + NonUniqueTag, + NonUniqueTag, + > = MultiIndexMap::new(); let value1 = TestValue { id: 1, @@ -336,9 +443,9 @@ mod multi_index_tests { data: "test2".to_string(), }; - // Insert multiple values with same secondary and tertiary keys - map.insert(&1, &"shared_key".to_string(), &true, value1.clone()); - map.insert(&2, &"shared_key".to_string(), &true, value2.clone()); + // Insert multiple values with same secondary, tertiary, and quaternary keys. + map.insert(&1, &"shared_key".to_string(), &true, &'z', value1.clone()); + map.insert(&2, &"shared_key".to_string(), &true, &'z', value2.clone()); // Test primary key access (still unique) assert_eq!(map.get_by(&1), Some(value1.clone())); @@ -356,6 +463,12 @@ mod multi_index_tests { assert!(tertiary_values.contains(&value1)); assert!(tertiary_values.contains(&value2)); + // Test quaternary key access (non-unique) + let quaternary_values = map.get_all_by(&'z').unwrap(); + assert_eq!(quaternary_values.len(), 2); + assert!(quaternary_values.contains(&value1)); + assert!(quaternary_values.contains(&value2)); + // Test removal maintains other entries map.remove(&1); assert_eq!(map.get_by(&1), None); @@ -368,8 +481,17 @@ mod multi_index_tests { #[test] fn test_mixed_uniqueness() { - let mut map: MultiIndexMap = - MultiIndexMap::new(); + // Mixed: unique secondary, non-unique tertiary, unique quaternary. + let mut map: MultiIndexMap< + i32, + String, + bool, + char, + TestValue, + UniqueTag, + NonUniqueTag, + UniqueTag, + > = MultiIndexMap::new(); let value1 = TestValue { id: 1, @@ -380,9 +502,9 @@ mod multi_index_tests { data: "test2".to_string(), }; - // Insert values with unique secondary key but shared tertiary key - map.insert(&1, &"key1".to_string(), &true, value1.clone()); - map.insert(&2, &"key2".to_string(), &true, value2.clone()); + // Insert values with unique secondary keys but shared tertiary and different quaternary keys. + map.insert(&1, &"key1".to_string(), &true, &'q', value1.clone()); + map.insert(&2, &"key2".to_string(), &true, &'r', value2.clone()); // Test unique secondary key access assert_eq!(map.get_by(&"key1".to_string()), Some(value1.clone())); @@ -393,17 +515,30 @@ mod multi_index_tests { assert_eq!(tertiary_values.len(), 2); assert!(tertiary_values.contains(&value1)); assert!(tertiary_values.contains(&value2)); + + // Test unique quaternary key access + assert_eq!(map.get_by(&'q'), Some(value1.clone())); + assert_eq!(map.get_by(&'r'), Some(value2.clone())); } #[test] fn test_empty_cases() { - let mut map: MultiIndexMap = - MultiIndexMap::new(); + let mut map: MultiIndexMap< + i32, + String, + bool, + char, + TestValue, + UniqueTag, + UniqueTag, + UniqueTag, + > = MultiIndexMap::new(); // Test access on empty map assert_eq!(map.get_by(&1), None); assert_eq!(map.get_by(&"key".to_string()), None); assert_eq!(map.get_by(&true), None); + assert_eq!(map.get_by(&'x'), None); // Test remove on empty map assert_eq!(map.remove(&1), None); diff --git a/anchor/database/src/state.rs b/anchor/database/src/state.rs index 4fa3ac851..8a47456e9 100644 --- a/anchor/database/src/state.rs +++ b/anchor/database/src/state.rs @@ -1,5 +1,5 @@ use crate::{ - ClusterMultiIndexMap, MetadataMultiIndexMap, MultiIndexMap, ShareMultiIndexMap, UniqueIndex, + ClusterMultiIndexMap, MetadataMultiIndexMap, MultiIndexMap, NonUniqueIndex, ShareMultiIndexMap, }; use crate::{DatabaseError, NetworkState, Pool, PoolConn}; use crate::{MultiState, SingleState}; @@ -78,12 +78,14 @@ impl NetworkState { cluster_id, &validator.public_key, &cluster.owner, + &cluster.committee_id(), cluster.clone(), ); metadata_multi.insert( &validator.public_key, cluster_id, &cluster.owner, + &cluster.committee_id(), validator.clone(), ); @@ -98,6 +100,7 @@ impl NetworkState { &validator.public_key, cluster_id, &cluster.owner, + &cluster.committee_id(), share.clone(), ); } @@ -113,7 +116,6 @@ impl NetworkState { shares: shares_multi, validator_metadata: metadata_multi, clusters: cluster_multi, - clusters_by_committee_id, }, single_state, }) @@ -263,9 +265,9 @@ impl NetworkState { pub fn get_cluster_members(&self, committee_id: &CommitteeId) -> Option> { self.multi_state - .clusters_by_committee_id - .get(committee_id) - .and_then(|cluster_id| self.multi_state.clusters.get_by(cluster_id)) + .clusters + .get_all_by(committee_id) + .and_then(|clusters| clusters.first().cloned()) .map(|cluster| cluster.cluster_members) } From e039197ad54a22e35d471d1f2a82f86105df6e1c Mon Sep 17 00:00:00 2001 From: diego Date: Fri, 7 Mar 2025 18:27:10 +0100 Subject: [PATCH 06/19] fix naming --- anchor/message_validator/src/lib.rs | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/anchor/message_validator/src/lib.rs b/anchor/message_validator/src/lib.rs index 29414c205..d4def5d67 100644 --- a/anchor/message_validator/src/lib.rs +++ b/anchor/message_validator/src/lib.rs @@ -13,7 +13,7 @@ use std::sync::Arc; use tokio::sync::mpsc::error::TrySendError::{Closed, Full}; use tokio::sync::mpsc::Sender; use tokio::sync::watch; -use tracing::{error, trace}; +use tracing::{error, trace, warn}; // TODO taken from go-SSV as rough guidance. feel free to adjust as needed. https://github.com/ssvlabs/ssv/blob/e12abf7dfbbd068b99612fa2ebbe7e3372e57280/message/validation/errors.go#L55 #[derive(Debug)] @@ -162,7 +162,7 @@ pub enum Error { pub struct Validator { processor: Senders, result_tx: Sender, - network_state_rxx: watch::Receiver, + network_state_rx: watch::Receiver, } pub trait ValidatorService { @@ -178,12 +178,12 @@ impl Validator { pub fn new( processor: Senders, result_tx: Sender, - network_state_rxx: watch::Receiver, + network_state_rx: watch::Receiver, ) -> Self { Self { processor, result_tx, - network_state_rxx, + network_state_rx, } } @@ -220,7 +220,7 @@ impl Validator { ) -> Result<(), ValidationFailure> { let signers = signed_ssv_message.operator_ids().len(); - let db = self.network_state_rxx.borrow(); + let db = self.network_state_rx.borrow(); let committee_id = match signed_ssv_message.ssv_message().msg_id().duty_executor() { Some(DutyExecutor::Committee(id)) => id, _ => return Err(ValidationFailure::NonExistentCommitteeID), @@ -229,7 +229,8 @@ impl Validator { let committee_members = match db.get_cluster_members(&committee_id) { Some(committee_members) => { if committee_members.is_empty() { - return Err(ValidationFailure::NoValidators); + warn!(?committee_id, "Unexpected empty committee members"); + return Err(ValidationFailure::NonExistentCommitteeID); } committee_members } From dc3bd7830a9a07b2b2accb93c8a286184533e61d Mon Sep 17 00:00:00 2001 From: diego Date: Fri, 7 Mar 2025 22:29:59 +0100 Subject: [PATCH 07/19] add tests --- Cargo.lock | 5 + anchor/client/src/lib.rs | 9 +- anchor/database/src/lib.rs | 1 + anchor/database/src/state.rs | 35 +- anchor/message_validator/Cargo.toml | 7 + anchor/message_validator/src/lib.rs | 599 +++++++++++++++++++++++++++- 6 files changed, 635 insertions(+), 21 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 20eb38eea..7048f07d2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5096,16 +5096,21 @@ dependencies = [ name = "message_validator" version = "0.1.0" dependencies = [ + "async-channel 1.9.0", + "bls", "database", "ethereum_ssz", "hex", "libp2p", + "once_cell", "processor", "sha2 0.10.8", "ssv_types", + "task_executor", "thiserror 2.0.11", "tokio", "tracing", + "types", ] [[package]] diff --git a/anchor/client/src/lib.rs b/anchor/client/src/lib.rs index 97ca20361..94662966c 100644 --- a/anchor/client/src/lib.rs +++ b/anchor/client/src/lib.rs @@ -10,7 +10,7 @@ use beacon_node_fallback::{ }; pub use cli::Anchor; use config::Config; -use database::NetworkDatabase; +use database::{NetworkDatabase, WatchableNetworkState}; use eth2::reqwest::{Certificate, ClientBuilder}; use eth2::{BeaconNodeHttpClient, Timeouts}; use message_sender::NetworkMessageSender; @@ -356,8 +356,11 @@ impl Client { )?; let (results_tx, results_rx) = mpsc::channel::(9000); - let message_validator = - Validator::new(processor_senders.clone(), results_tx, database.watch()); + let message_validator = Validator::new( + processor_senders.clone(), + results_tx, + Box::new(WatchableNetworkState::new(database.watch())), + ); // Start the p2p network let network = Network::try_new( diff --git a/anchor/database/src/lib.rs b/anchor/database/src/lib.rs index 153ad1307..58f862734 100644 --- a/anchor/database/src/lib.rs +++ b/anchor/database/src/lib.rs @@ -13,6 +13,7 @@ use types::{Address, PublicKeyBytes}; pub use crate::error::DatabaseError; pub use crate::multi_index::{MultiIndexMap, *}; use crate::sql_operations::{SqlStatement, SQL}; +pub use crate::state::{NetworkStateService, WatchableNetworkState}; mod cluster_operations; mod error; diff --git a/anchor/database/src/state.rs b/anchor/database/src/state.rs index 8a47456e9..ba632be22 100644 --- a/anchor/database/src/state.rs +++ b/anchor/database/src/state.rs @@ -15,8 +15,13 @@ use ssv_types::{ }; use std::collections::{HashMap, HashSet}; use std::str::FromStr; +use tokio::sync::watch; use types::Address; +pub trait NetworkStateService: Send + Sync { + fn get_cluster_members(&self, committee_id: &CommitteeId) -> Option>; +} + impl NetworkState { /// Build the network state from the database data pub(crate) fn new_with_state( @@ -263,14 +268,6 @@ impl NetworkState { &self.multi_state.clusters } - pub fn get_cluster_members(&self, committee_id: &CommitteeId) -> Option> { - self.multi_state - .clusters - .get_all_by(committee_id) - .and_then(|clusters| clusters.first().cloned()) - .map(|cluster| cluster.cluster_members) - } - /// Get the ID of our Operator if it exists pub fn get_own_id(&self) -> Option { self.single_state.id @@ -301,3 +298,25 @@ impl NetworkState { self.single_state.last_processed_block } } + +pub struct WatchableNetworkState { + state_rx: watch::Receiver, +} + +impl WatchableNetworkState { + pub fn new(state_rx: watch::Receiver) -> Self { + Self { state_rx } + } +} + +impl NetworkStateService for WatchableNetworkState { + fn get_cluster_members(&self, committee_id: &CommitteeId) -> Option> { + let db_state = self.state_rx.borrow(); + db_state + .multi_state + .clusters + .get_all_by(committee_id) + .and_then(|clusters| clusters.first().cloned()) + .map(|cluster| cluster.cluster_members) + } +} diff --git a/anchor/message_validator/Cargo.toml b/anchor/message_validator/Cargo.toml index 01750b87d..1b4f0a4a8 100644 --- a/anchor/message_validator/Cargo.toml +++ b/anchor/message_validator/Cargo.toml @@ -17,3 +17,10 @@ ssv_types = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } + +[dev-dependencies] +async-channel = { workspace = true } +bls = { workspace = true } +once_cell = "1.20.3" +task_executor = { workspace = true } +types = { workspace = true } diff --git a/anchor/message_validator/src/lib.rs b/anchor/message_validator/src/lib.rs index d4def5d67..13266d312 100644 --- a/anchor/message_validator/src/lib.rs +++ b/anchor/message_validator/src/lib.rs @@ -1,4 +1,4 @@ -use database::NetworkState; +use database::NetworkStateService; use libp2p::gossipsub::MessageAcceptance::{Accept, Reject}; use libp2p::gossipsub::{MessageAcceptance, MessageId}; use libp2p::PeerId; @@ -7,13 +7,15 @@ use sha2::{Digest, Sha256}; use ssv_types::consensus::{QbftMessage, QbftMessageType}; use ssv_types::message::{MsgType, SSVMessage, SignedSSVMessage}; use ssv_types::msgid::DutyExecutor; -use ssv_types::partial_sig::PartialSignatureMessages; +use ssv_types::partial_sig::{ + PartialSignatureKind, PartialSignatureMessage, PartialSignatureMessages, +}; use ssz::Decode; use std::sync::Arc; use tokio::sync::mpsc::error::TrySendError::{Closed, Full}; use tokio::sync::mpsc::Sender; -use tokio::sync::watch; use tracing::{error, trace, warn}; +use types::Slot; // TODO taken from go-SSV as rough guidance. feel free to adjust as needed. https://github.com/ssvlabs/ssv/blob/e12abf7dfbbd068b99612fa2ebbe7e3372e57280/message/validation/errors.go#L55 #[derive(Debug)] @@ -162,7 +164,7 @@ pub enum Error { pub struct Validator { processor: Senders, result_tx: Sender, - network_state_rx: watch::Receiver, + network_state_service: Arc, } pub trait ValidatorService { @@ -178,12 +180,12 @@ impl Validator { pub fn new( processor: Senders, result_tx: Sender, - network_state_rx: watch::Receiver, + network_state_service: Arc, ) -> Self { Self { processor, result_tx, - network_state_rx, + network_state_service, } } @@ -205,9 +207,16 @@ impl Validator { Ok(ValidatedSSVMessage::QbftMessage(consensus_message)) } MsgType::SSVPartialSignatureMsgType => { - PartialSignatureMessages::from_ssz_bytes(ssv_message.data()) + PartialSignatureMessage::from_ssz_bytes(ssv_message.data()) .ok() - .map(ValidatedSSVMessage::PartialSignatureMessages) + .map(|m| { + let p = PartialSignatureMessages { + kind: PartialSignatureKind::RandaoPartialSig, + slot: Slot::new(1), + messages: vec![m], + }; + ValidatedSSVMessage::PartialSignatureMessages(p) + }) .ok_or(ValidationFailure::UndecodableMessageData) } } @@ -220,13 +229,15 @@ impl Validator { ) -> Result<(), ValidationFailure> { let signers = signed_ssv_message.operator_ids().len(); - let db = self.network_state_rx.borrow(); let committee_id = match signed_ssv_message.ssv_message().msg_id().duty_executor() { Some(DutyExecutor::Committee(id)) => id, _ => return Err(ValidationFailure::NonExistentCommitteeID), }; - let committee_members = match db.get_cluster_members(&committee_id) { + let committee_members = match self + .network_state_service + .get_cluster_members(&committee_id) + { Some(committee_members) => { if committee_members.is_empty() { warn!(?committee_id, "Unexpected empty committee members"); @@ -418,3 +429,571 @@ fn hash_data_root(full_data: &[u8]) -> [u8; 32] { let hash: [u8; 32] = hasher.finalize().into(); hash } + +#[cfg(test)] +mod tests { + use super::*; + use bls::{Hash256, PublicKeyBytes}; + use once_cell::sync::Lazy; + use ssv_types::{CommitteeId, IndexSet, OperatorId}; + use std::sync::Arc; + use task_executor::TaskExecutor; + use tokio::sync::mpsc; + // Import real types from your modules. + use ssv_types::consensus::{QbftMessage, QbftMessageType}; + use ssv_types::domain_type::DomainType; + use ssv_types::message::{MsgType, SSVMessage, SignedSSVMessage, RSA_SIGNATURE_SIZE}; + use ssv_types::msgid::{DutyExecutor, MessageId, Role}; + + // Create a global task executor once for all tests. + static GLOBAL_EXECUTOR: Lazy = Lazy::new(|| { + let handle = tokio::runtime::Handle::current(); + let (_signal, exit) = async_channel::bounded(1); + let (shutdown, _) = libp2p::futures::channel::mpsc::channel(1); + TaskExecutor::new(handle, exit, shutdown) + }); + + // Create a global processor once for all tests. + static GLOBAL_PROCESSOR: Lazy = Lazy::new(|| { + let config = processor::Config::default(); + processor::spawn(config, GLOBAL_EXECUTOR.clone()) + }); + + struct MockNetworkStateService(usize); + + impl NetworkStateService for MockNetworkStateService { + fn get_cluster_members(&self, _cluster_id: &CommitteeId) -> Option> { + let mut members = IndexSet::new(); + for i in 0..self.0 { + members.insert(OperatorId(i as u64)); + } + Some(members) + } + } + + /// Real processor setup using the provided executor. + fn build_validator_and_outcome_channel( + _executor: TaskExecutor, + num_operators: usize, + ) -> (Arc, mpsc::Sender) { + let (outcome_tx, _outcome_rx) = mpsc::channel(10); + let validator = Arc::new(Validator::new( + GLOBAL_PROCESSOR.clone(), + outcome_tx.clone(), + Arc::new(MockNetworkStateService(num_operators)), + )); + (validator, outcome_tx) + } + + /// Helper: Create a valid MessageId for testing. + fn create_message_id_for_test(role: Role) -> MessageId { + let domain = DomainType([0, 0, 0, 1]); + let duty_executor = match role { + Role::Committee => DutyExecutor::Committee(CommitteeId([0u8; 32])), + _ => DutyExecutor::Validator(PublicKeyBytes::empty()), + }; + MessageId::new(&domain, role, &duty_executor) + } + + /// Helper functions for creating SSV messages. + mod test_utils { + use super::*; + use ssz::Encode; + /// Create a consensus SSVMessage from a given QbftMessage and message identifier. + pub fn create_consensus_ssv_message( + qbft_msg: QbftMessage, + msg_id: MessageId, + ) -> SSVMessage { + let qbft_bytes = qbft_msg.as_ssz_bytes(); + // The constructor now expects a MessageId (not a Vec). + SSVMessage::new(MsgType::SSVConsensusMsgType, msg_id, qbft_bytes) + .expect("SSVMessage should be created") + } + } + + /// Convenience function to build a SignedSSVMessage. + fn create_signed_ssv_message( + signatures: Vec>, + operator_ids: Vec, + ssv_message: SSVMessage, + full_data: Vec, + ) -> SignedSSVMessage { + SignedSSVMessage::new(signatures, operator_ids, ssv_message, full_data) + .expect("SignedSSVMessage should be created") + } + + /// Helper: Create a dummy SignedSSVMessage for justifications. + fn dummy_signed_ssv_message_for_justification() -> SignedSSVMessage { + let msg_id = create_message_id_for_test(Role::Proposer); + // Create a dummy consensus message; its content isn’t used. + + let dummy_qbft = QbftMessage { + qbft_message_type: QbftMessageType::Proposal, + height: 1, + round: 1, + identifier: msg_id.clone(), + root: Hash256::from([0u8; 32]), + data_round: 1, + round_change_justification: vec![], + prepare_justification: vec![], + }; + let dummy_ssv = test_utils::create_consensus_ssv_message(dummy_qbft, msg_id); + create_signed_ssv_message( + vec![vec![0xAA; RSA_SIGNATURE_SIZE]], + vec![OperatorId(42)], + dummy_ssv, + vec![], + ) + } + + /// Convenience: Quick SHA256 hash. + fn quick_hash(data: &[u8]) -> [u8; 32] { + use sha2::{Digest, Sha256}; + let mut hasher = Sha256::new(); + hasher.update(data); + hasher.finalize().into() + } + + // --------------------------------------------------------------------- + // Consensus message tests + // --------------------------------------------------------------------- + + #[tokio::test] + async fn test_successful_validation_of_consensus_message_with_single_signer() { + let (validator, _outcome_tx) = + build_validator_and_outcome_channel(GLOBAL_EXECUTOR.clone(), 1); + + let msg_id = create_message_id_for_test(Role::Committee); + let round = 1; + let qbft_msg = QbftMessage { + qbft_message_type: QbftMessageType::Prepare, + height: 1, + round, + identifier: msg_id.clone(), + root: Hash256::from([0u8; 32]), + data_round: 1, + round_change_justification: vec![], + prepare_justification: vec![], + }; + + let ssv_msg = test_utils::create_consensus_ssv_message(qbft_msg, msg_id.clone()); + let signed_msg = create_signed_ssv_message( + vec![vec![0xAA; RSA_SIGNATURE_SIZE]], + vec![OperatorId(42)], + ssv_msg, + vec![], + ); + + let result = validator.validate_ssv_message(&signed_msg, signed_msg.ssv_message()); + assert!( + result.is_ok(), + "Expected a single-signer Prepare consensus message to validate successfully" + ); + if let Ok(ValidatedSSVMessage::QbftMessage(validated_qbft)) = result { + assert_eq!( + validated_qbft.round, round, + "Unexpected round in validated QbftMessage" + ); + assert_eq!( + validated_qbft.qbft_message_type, + QbftMessageType::Prepare, + "Unexpected QbftMessageType in validated QbftMessage" + ); + assert_eq!( + validated_qbft.identifier, msg_id, + "Identifier mismatch after validation" + ); + } else { + panic!("Expected a QbftMessage variant after validation"); + } + } + + #[tokio::test] + async fn test_consensus_message_with_multiple_signers_but_not_commit() { + let (validator, _outcome_tx) = + build_validator_and_outcome_channel(GLOBAL_EXECUTOR.clone(), 1); + + // Multiple signers are only allowed for Commit messages. + let signers = vec![OperatorId(1), OperatorId(2), OperatorId(3)]; + let msg_id = create_message_id_for_test(Role::Committee); + let qbft_msg = QbftMessage { + qbft_message_type: QbftMessageType::Prepare, // Non-Commit type. + height: 1, + round: 1, + identifier: msg_id.clone(), + root: Hash256::from([0u8; 32]), + data_round: 1, + round_change_justification: vec![], + prepare_justification: vec![], + }; + + let ssv_msg = test_utils::create_consensus_ssv_message(qbft_msg, msg_id); + let signed_msg = create_signed_ssv_message( + vec![ + vec![0xAA; RSA_SIGNATURE_SIZE], + vec![0xBB; RSA_SIGNATURE_SIZE], + vec![0xCC; RSA_SIGNATURE_SIZE], + ], + signers.clone(), + ssv_msg, + vec![], + ); + + let result = validator.validate_ssv_message(&signed_msg, signed_msg.ssv_message()); + assert!( + result.is_err(), + "Expected multiple signers with non-Commit type to fail validation" + ); + match result.err().unwrap() { + ValidationFailure::NonDecidedWithMultipleSigners { got, want } => { + assert_eq!(got, signers.len(), "Unexpected number of signers in error"); + assert_eq!(want, 1, "Expected only one signer for non-Commit messages"); + } + other => panic!( + "Expected NonDecidedWithMultipleSigners error, got: {:?}", + other + ), + } + } + + #[tokio::test] + async fn test_consensus_message_with_multiple_signers_commit_but_not_enough_signers_for_quorum() + { + let (validator, _outcome_tx) = + build_validator_and_outcome_channel(GLOBAL_EXECUTOR.clone(), 4); + + // For Commit messages with multiple signers, the count must be >= quorum size. + let signers = vec![OperatorId(1), OperatorId(2)]; // Assume quorum requires at least 3. + let msg_id = create_message_id_for_test(Role::Committee); + let qbft_msg = QbftMessage { + qbft_message_type: QbftMessageType::Commit, + height: 1, + round: 1, + identifier: msg_id.clone(), + root: Hash256::from([0u8; 32]), + data_round: 1, + round_change_justification: vec![], + prepare_justification: vec![], + }; + + let ssv_msg = test_utils::create_consensus_ssv_message(qbft_msg, msg_id); + let signed_msg = create_signed_ssv_message( + vec![ + vec![0xAA; RSA_SIGNATURE_SIZE], + vec![0xBB; RSA_SIGNATURE_SIZE], + ], + signers.clone(), + ssv_msg, + vec![], + ); + + let result = validator.validate_ssv_message(&signed_msg, signed_msg.ssv_message()); + assert!( + result.is_err(), + "Expected Commit message with insufficient signers to fail validation" + ); + match result.err().unwrap() { + ValidationFailure::DecidedNotEnoughSigners { got, want } => { + assert_eq!(got, signers.len(), "Mismatch in signer count reported"); + assert!(got < want, "Got should be less than required quorum"); + } + other => panic!("Expected DecidedNotEnoughSigners error, got: {:?}", other), + } + } + + #[tokio::test] + async fn test_consensus_message_full_data_mismatched_root_hash() { + let (validator, _outcome_tx) = + build_validator_and_outcome_channel(GLOBAL_EXECUTOR.clone(), 1); + + // For a Commit message with full_data (single-signer) the full data hash must match. + let signers = vec![OperatorId(42)]; + let full_data = vec![0xDE, 0xAD, 0xBE, 0xEF]; + let msg_id = create_message_id_for_test(Role::Committee); + let qbft_msg = QbftMessage { + qbft_message_type: QbftMessageType::Commit, + height: 1, + round: 1, + identifier: msg_id.clone(), + // Set root to the hash of an empty slice (mismatched) + root: Hash256::from([0u8; 32]), + data_round: 1, + round_change_justification: vec![], + prepare_justification: vec![], + }; + let ssv_msg = test_utils::create_consensus_ssv_message(qbft_msg, msg_id); + let signed_msg = create_signed_ssv_message( + vec![vec![0xAA; RSA_SIGNATURE_SIZE]], + signers.clone(), + ssv_msg, + full_data, + ); + + let result = validator.validate_ssv_message(&signed_msg, signed_msg.ssv_message()); + assert!( + result.is_err(), + "Expected validation failure due to full data hash mismatch" + ); + match result.err().unwrap() { + ValidationFailure::PrepareOrCommitWithFullData => { /* Expected */ } + other => panic!( + "Expected PrepareOrCommitWithFullData error, got: {:?}", + other + ), + } + } + + #[tokio::test] + async fn test_consensus_message_zero_round_fails() { + let (validator, _outcome_tx) = + build_validator_and_outcome_channel(GLOBAL_EXECUTOR.clone(), 1); + + let signers = vec![OperatorId(42)]; + let msg_id = create_message_id_for_test(Role::Committee); + let qbft_msg = QbftMessage { + qbft_message_type: QbftMessageType::Proposal, + height: 1, + round: 0, // Invalid round. + identifier: msg_id.clone(), + root: Hash256::from([0u8; 32]), + data_round: 1, + round_change_justification: vec![], + prepare_justification: vec![], + }; + let ssv_msg = test_utils::create_consensus_ssv_message(qbft_msg, msg_id); + let signed_msg = create_signed_ssv_message( + vec![vec![0xAA; RSA_SIGNATURE_SIZE]], + signers, + ssv_msg, + vec![], + ); + + let result = validator.validate_ssv_message(&signed_msg, signed_msg.ssv_message()); + assert!(result.is_err(), "Expected round=0 to fail validation"); + match result.err().unwrap() { + ValidationFailure::ZeroRound => (), + other => panic!("Expected ZeroRound error, got: {:?}", other), + } + } + + #[tokio::test] + async fn test_consensus_message_round_too_high() { + let (validator, _outcome_tx) = + build_validator_and_outcome_channel(GLOBAL_EXECUTOR.clone(), 1); + + // For a proposer, max_round is Some(6). Set round = 7 to trigger an error. + let signers = vec![OperatorId(42)]; + let msg_id = create_message_id_for_test(Role::Committee); + let qbft_msg = QbftMessage { + qbft_message_type: QbftMessageType::Proposal, + height: 1, + round: 13, // Invalid round. + identifier: msg_id.clone(), + root: Hash256::from([0u8; 32]), + data_round: 1, + round_change_justification: vec![], + prepare_justification: vec![], + }; + let ssv_msg = test_utils::create_consensus_ssv_message(qbft_msg, msg_id); + let signed_msg = create_signed_ssv_message( + vec![vec![0xAA; RSA_SIGNATURE_SIZE]], + signers, + ssv_msg, + vec![], + ); + + let result = validator.validate_ssv_message(&signed_msg, signed_msg.ssv_message()); + assert!( + result.is_err(), + "Expected round > max_round to fail validation" + ); + match result.err().unwrap() { + ValidationFailure::RoundTooHigh => (), + other => panic!("Expected RoundTooHigh error, got: {:?}", other), + } + } + + #[tokio::test] + async fn test_consensus_message_mismatched_identifier() { + let (validator, _outcome_tx) = + build_validator_and_outcome_channel(GLOBAL_EXECUTOR.clone(), 1); + + let signers = vec![OperatorId(42)]; + // Create two different MessageIds. + let msg_id_a = create_message_id_for_test(Role::Committee); + let msg_id_b = create_message_id_for_test(Role::Proposer); + let qbft_msg = QbftMessage { + qbft_message_type: QbftMessageType::Proposal, + height: 1, + round: 1, + identifier: msg_id_b.clone(), + root: Hash256::from([0u8; 32]), + data_round: 1, + round_change_justification: vec![], + prepare_justification: vec![], + }; + let ssv_msg = test_utils::create_consensus_ssv_message(qbft_msg, msg_id_a); + let signed_msg = create_signed_ssv_message( + vec![vec![0xAA; RSA_SIGNATURE_SIZE]], + signers, + ssv_msg, + vec![], + ); + + let result = validator.validate_ssv_message(&signed_msg, signed_msg.ssv_message()); + assert!( + result.is_err(), + "Expected mismatched identifier to fail validation" + ); + match result.err().unwrap() { + ValidationFailure::MismatchedIdentifier { got, want } => { + // Expect hexadecimal strings representing the differing ids. + // Adjust these expectations as appropriate. + assert_ne!(got, want, "Expected identifiers to differ"); + } + other => panic!("Expected MismatchedIdentifier error, got: {:?}", other), + } + } + + #[tokio::test] + async fn test_consensus_message_decode_failure() { + let (validator, _outcome_tx) = + build_validator_and_outcome_channel(GLOBAL_EXECUTOR.clone(), 1); + + let signers = vec![OperatorId(42)]; + // Provide invalid consensus data. + let msg_id = create_message_id_for_test(Role::Proposer); + let invalid_data = vec![0xDE, 0xAD, 0xBE, 0xEF]; + let ssv_msg = SSVMessage::new(MsgType::SSVConsensusMsgType, msg_id, invalid_data) + .expect("SSVMessage should be created"); + let signed_msg = create_signed_ssv_message( + vec![vec![0xAA; RSA_SIGNATURE_SIZE]], + signers, + ssv_msg, + vec![], + ); + + let result = validator.validate_ssv_message(&signed_msg, signed_msg.ssv_message()); + assert!( + result.is_err(), + "Expected decode failure for consensus message data" + ); + match result.err().unwrap() { + ValidationFailure::UndecodableMessageData => (), + other => panic!("Expected UndecodableMessageData error, got: {:?}", other), + } + } + + #[tokio::test] + async fn test_prepare_justifications_with_non_proposal_message() { + let (validator, _outcome_tx) = + build_validator_and_outcome_channel(GLOBAL_EXECUTOR.clone(), 1); + + let signers = vec![OperatorId(42)]; + let msg_id = create_message_id_for_test(Role::Committee); + // Create a Prepare message (non-Proposal) with a non-empty prepare justification. + let qbft_msg = QbftMessage { + qbft_message_type: QbftMessageType::Prepare, + height: 1, + round: 1, + identifier: msg_id.clone(), + root: Hash256::from([0u8; 32]), + data_round: 1, + round_change_justification: vec![], + prepare_justification: vec![dummy_signed_ssv_message_for_justification()], + }; + let ssv_msg = test_utils::create_consensus_ssv_message(qbft_msg, msg_id); + let signed_msg = create_signed_ssv_message( + vec![vec![0xAA; RSA_SIGNATURE_SIZE]], + signers, + ssv_msg, + vec![], + ); + + let result = validator.validate_ssv_message(&signed_msg, signed_msg.ssv_message()); + assert!( + result.is_err(), + "Expected non-empty prepare_justifications in a non-Proposal to fail" + ); + match result.err().unwrap() { + ValidationFailure::UnexpectedPrepareJustifications => (), + other => panic!( + "Expected UnexpectedPrepareJustifications error, got: {:?}", + other + ), + } + } + + #[tokio::test] + async fn test_round_change_justifications_with_non_proposal_or_roundchange() { + let (validator, _outcome_tx) = + build_validator_and_outcome_channel(GLOBAL_EXECUTOR.clone(), 1); + + let signers = vec![OperatorId(42)]; + let msg_id = create_message_id_for_test(Role::Committee); + // Create a Commit message with non-empty round_change_justification. + let qbft_msg = QbftMessage { + qbft_message_type: QbftMessageType::Commit, + height: 1, + round: 1, + identifier: msg_id.clone(), + root: Hash256::from([0u8; 32]), + data_round: 1, + round_change_justification: vec![dummy_signed_ssv_message_for_justification()], + prepare_justification: vec![], + }; + let ssv_msg = test_utils::create_consensus_ssv_message(qbft_msg, msg_id); + let signed_msg = create_signed_ssv_message( + vec![vec![0xAA; RSA_SIGNATURE_SIZE]], + signers, + ssv_msg, + vec![], + ); + + let result = validator.validate_ssv_message(&signed_msg, signed_msg.ssv_message()); + assert!( + result.is_err(), + "Expected non-empty round_change_justifications in a Commit to fail" + ); + match result.err().unwrap() { + ValidationFailure::UnexpectedRoundChangeJustifications => (), + other => panic!( + "Expected UnexpectedRoundChangeJustifications error, got: {:?}", + other + ), + } + } + + #[tokio::test] + async fn test_compute_quorum_size() { + // For committee_size=4 -> f=1 -> quorum=3. + assert_eq!( + compute_quorum_size(4), + 3, + "Expected quorum=3 for committee of 4" + ); + // For committee_size=7 -> f=2 -> quorum=5. + assert_eq!( + compute_quorum_size(7), + 5, + "Expected quorum=5 for committee of 7" + ); + // For committee_size=1 -> f=0 -> quorum=1. + assert_eq!( + compute_quorum_size(1), + 1, + "Expected quorum=1 for committee of 1" + ); + } + // + // #[tokio::test] + // async fn test_hash_data_root() { + // let data = b"hello world"; + // let hash_of_data = hash_data_root(data); + // let expected_hash = quick_hash(data); + // assert_eq!( + // hash_of_data, expected_hash, + // "hash_data_root should match the SHA256 hash for the given input" + // ); + // } +} From 878c0dead46c4e00766d4f645da881b2581820c6 Mon Sep 17 00:00:00 2001 From: diego Date: Mon, 10 Mar 2025 13:40:47 +0100 Subject: [PATCH 08/19] improve tests --- anchor/client/src/lib.rs | 2 +- anchor/common/ssv_types/src/lib.rs | 1 + anchor/message_validator/src/lib.rs | 641 ++++++++++++---------------- 3 files changed, 265 insertions(+), 379 deletions(-) diff --git a/anchor/client/src/lib.rs b/anchor/client/src/lib.rs index 94662966c..ef524e132 100644 --- a/anchor/client/src/lib.rs +++ b/anchor/client/src/lib.rs @@ -359,7 +359,7 @@ impl Client { let message_validator = Validator::new( processor_senders.clone(), results_tx, - Box::new(WatchableNetworkState::new(database.watch())), + Arc::new(WatchableNetworkState::new(database.watch())), ); // Start the p2p network diff --git a/anchor/common/ssv_types/src/lib.rs b/anchor/common/ssv_types/src/lib.rs index c7c17668f..2147a7014 100644 --- a/anchor/common/ssv_types/src/lib.rs +++ b/anchor/common/ssv_types/src/lib.rs @@ -16,3 +16,4 @@ mod util; pub use indexmap::IndexSet; pub use share::ENCRYPTED_KEY_LENGTH; +pub use types::Slot; diff --git a/anchor/message_validator/src/lib.rs b/anchor/message_validator/src/lib.rs index 13266d312..360bcf854 100644 --- a/anchor/message_validator/src/lib.rs +++ b/anchor/message_validator/src/lib.rs @@ -10,12 +10,12 @@ use ssv_types::msgid::DutyExecutor; use ssv_types::partial_sig::{ PartialSignatureKind, PartialSignatureMessage, PartialSignatureMessages, }; +use ssv_types::Slot; use ssz::Decode; use std::sync::Arc; use tokio::sync::mpsc::error::TrySendError::{Closed, Full}; use tokio::sync::mpsc::Sender; use tracing::{error, trace, warn}; -use types::Slot; // TODO taken from go-SSV as rough guidance. feel free to adjust as needed. https://github.com/ssvlabs/ssv/blob/e12abf7dfbbd068b99612fa2ebbe7e3372e57280/message/validation/errors.go#L55 #[derive(Debug)] @@ -435,15 +435,20 @@ mod tests { use super::*; use bls::{Hash256, PublicKeyBytes}; use once_cell::sync::Lazy; - use ssv_types::{CommitteeId, IndexSet, OperatorId}; - use std::sync::Arc; - use task_executor::TaskExecutor; - use tokio::sync::mpsc; - // Import real types from your modules. use ssv_types::consensus::{QbftMessage, QbftMessageType}; use ssv_types::domain_type::DomainType; use ssv_types::message::{MsgType, SSVMessage, SignedSSVMessage, RSA_SIGNATURE_SIZE}; use ssv_types::msgid::{DutyExecutor, MessageId, Role}; + use ssv_types::{CommitteeId, IndexSet, OperatorId}; + use ssz::Encode; + use std::sync::Arc; + use task_executor::TaskExecutor; + use tokio::sync::mpsc; + + // Constants for committee sizes in tests to improve readability + const SINGLE_NODE_COMMITTEE: usize = 1; + const FOUR_NODE_COMMITTEE: usize = 4; + const SEVEN_NODE_COMMITTEE: usize = 7; // Create a global task executor once for all tests. static GLOBAL_EXECUTOR: Lazy = Lazy::new(|| { @@ -471,87 +476,152 @@ mod tests { } } - /// Real processor setup using the provided executor. - fn build_validator_and_outcome_channel( - _executor: TaskExecutor, - num_operators: usize, - ) -> (Arc, mpsc::Sender) { - let (outcome_tx, _outcome_rx) = mpsc::channel(10); - let validator = Arc::new(Validator::new( - GLOBAL_PROCESSOR.clone(), - outcome_tx.clone(), - Arc::new(MockNetworkStateService(num_operators)), - )); - (validator, outcome_tx) + // Test fixture for setup + struct TestFixture { + validator: Arc, + _outcome_tx: Sender, } - /// Helper: Create a valid MessageId for testing. - fn create_message_id_for_test(role: Role) -> MessageId { - let domain = DomainType([0, 0, 0, 1]); - let duty_executor = match role { - Role::Committee => DutyExecutor::Committee(CommitteeId([0u8; 32])), - _ => DutyExecutor::Validator(PublicKeyBytes::empty()), - }; - MessageId::new(&domain, role, &duty_executor) - } + impl TestFixture { + fn new(committee_size: usize) -> Self { + let (outcome_tx, _outcome_rx) = mpsc::channel(10); + let validator = Arc::new(Validator::new( + GLOBAL_PROCESSOR.clone(), + outcome_tx.clone(), + Arc::new(MockNetworkStateService(committee_size)), + )); + Self { + validator, + _outcome_tx: outcome_tx, + } + } - /// Helper functions for creating SSV messages. - mod test_utils { - use super::*; - use ssz::Encode; - /// Create a consensus SSVMessage from a given QbftMessage and message identifier. - pub fn create_consensus_ssv_message( - qbft_msg: QbftMessage, - msg_id: MessageId, - ) -> SSVMessage { - let qbft_bytes = qbft_msg.as_ssz_bytes(); - // The constructor now expects a MessageId (not a Vec). - SSVMessage::new(MsgType::SSVConsensusMsgType, msg_id, qbft_bytes) - .expect("SSVMessage should be created") + // Helper for common validation pattern + fn validate_message( + &self, + signed_msg: &SignedSSVMessage, + ) -> Result { + self.validator + .validate_ssv_message(signed_msg, signed_msg.ssv_message()) } } - /// Convenience function to build a SignedSSVMessage. - fn create_signed_ssv_message( + // Helper functions for message creation + struct MessageBuilder { + msg_id: MessageId, + msg_type: QbftMessageType, + round: u64, + signers: Vec, signatures: Vec>, - operator_ids: Vec, - ssv_message: SSVMessage, full_data: Vec, - ) -> SignedSSVMessage { - SignedSSVMessage::new(signatures, operator_ids, ssv_message, full_data) - .expect("SignedSSVMessage should be created") + prepare_justification: Vec, + round_change_justification: Vec, } - /// Helper: Create a dummy SignedSSVMessage for justifications. - fn dummy_signed_ssv_message_for_justification() -> SignedSSVMessage { - let msg_id = create_message_id_for_test(Role::Proposer); - // Create a dummy consensus message; its content isn’t used. + impl MessageBuilder { + fn new(role: Role, msg_type: QbftMessageType) -> Self { + Self { + msg_id: create_message_id_for_test(role), + msg_type, + round: 1, + signers: vec![OperatorId(42)], + signatures: vec![vec![0xAA; RSA_SIGNATURE_SIZE]], + full_data: vec![], + prepare_justification: vec![], + round_change_justification: vec![], + } + } - let dummy_qbft = QbftMessage { - qbft_message_type: QbftMessageType::Proposal, - height: 1, - round: 1, - identifier: msg_id.clone(), - root: Hash256::from([0u8; 32]), - data_round: 1, - round_change_justification: vec![], - prepare_justification: vec![], + fn with_round(mut self, round: u64) -> Self { + self.round = round; + self + } + + fn with_signers(mut self, signers: Vec) -> Self { + // Create matching number of signatures + self.signatures = signers + .iter() + .enumerate() + .map(|(i, _)| { + // Create unique signatures for each signer + vec![0xAA + i as u8; RSA_SIGNATURE_SIZE] + }) + .collect(); + self.signers = signers; + self + } + + fn with_full_data(mut self, data: Vec) -> Self { + self.full_data = data; + self + } + + fn with_prepare_justification(mut self, justifications: Vec) -> Self { + self.prepare_justification = justifications; + self + } + + fn with_round_change_justification( + mut self, + justifications: Vec, + ) -> Self { + self.round_change_justification = justifications; + self + } + + fn build(self) -> SignedSSVMessage { + let qbft_msg = QbftMessage { + qbft_message_type: self.msg_type, + height: 1, + round: self.round, + identifier: self.msg_id.clone(), + root: Hash256::from([0u8; 32]), + data_round: 1, + round_change_justification: self.round_change_justification, + prepare_justification: self.prepare_justification, + }; + + let qbft_bytes = qbft_msg.as_ssz_bytes(); + let ssv_msg = SSVMessage::new(MsgType::SSVConsensusMsgType, self.msg_id, qbft_bytes) + .expect("SSVMessage should be created"); + + SignedSSVMessage::new(self.signatures, self.signers, ssv_msg, self.full_data) + .expect("SignedSSVMessage should be created") + } + } + + fn create_message_id_for_test(role: Role) -> MessageId { + let domain = DomainType([0, 0, 0, 1]); + let duty_executor = match role { + Role::Committee => DutyExecutor::Committee(CommitteeId([0u8; 32])), + _ => DutyExecutor::Validator(PublicKeyBytes::empty()), }; - let dummy_ssv = test_utils::create_consensus_ssv_message(dummy_qbft, msg_id); - create_signed_ssv_message( - vec![vec![0xAA; RSA_SIGNATURE_SIZE]], - vec![OperatorId(42)], - dummy_ssv, - vec![], - ) + MessageId::new(&domain, role, &duty_executor) } - /// Convenience: Quick SHA256 hash. - fn quick_hash(data: &[u8]) -> [u8; 32] { - use sha2::{Digest, Sha256}; - let mut hasher = Sha256::new(); - hasher.update(data); - hasher.finalize().into() + fn dummy_signed_ssv_message_for_justification() -> SignedSSVMessage { + MessageBuilder::new(Role::Proposer, QbftMessageType::Proposal).build() + } + + // Assert helpers for common validation patterns + fn assert_validation_error( + result: Result, + expected_error: F, + error_name: &str, + ) where + F: Fn(&ValidationFailure) -> bool, + { + match result { + Ok(_) => panic!("Expected validation to fail with {}", error_name), + Err(failure) => { + assert!( + expected_error(&failure), + "Expected {} error, got: {:?}", + error_name, + failure + ); + } + } } // --------------------------------------------------------------------- @@ -560,38 +630,19 @@ mod tests { #[tokio::test] async fn test_successful_validation_of_consensus_message_with_single_signer() { - let (validator, _outcome_tx) = - build_validator_and_outcome_channel(GLOBAL_EXECUTOR.clone(), 1); + let fixture = TestFixture::new(SINGLE_NODE_COMMITTEE); - let msg_id = create_message_id_for_test(Role::Committee); - let round = 1; - let qbft_msg = QbftMessage { - qbft_message_type: QbftMessageType::Prepare, - height: 1, - round, - identifier: msg_id.clone(), - root: Hash256::from([0u8; 32]), - data_round: 1, - round_change_justification: vec![], - prepare_justification: vec![], - }; + let signed_msg = MessageBuilder::new(Role::Committee, QbftMessageType::Prepare).build(); - let ssv_msg = test_utils::create_consensus_ssv_message(qbft_msg, msg_id.clone()); - let signed_msg = create_signed_ssv_message( - vec![vec![0xAA; RSA_SIGNATURE_SIZE]], - vec![OperatorId(42)], - ssv_msg, - vec![], - ); - - let result = validator.validate_ssv_message(&signed_msg, signed_msg.ssv_message()); + let result = fixture.validate_message(&signed_msg); assert!( result.is_ok(), "Expected a single-signer Prepare consensus message to validate successfully" ); + if let Ok(ValidatedSSVMessage::QbftMessage(validated_qbft)) = result { assert_eq!( - validated_qbft.round, round, + validated_qbft.round, 1, "Unexpected round in validated QbftMessage" ); assert_eq!( @@ -600,7 +651,8 @@ mod tests { "Unexpected QbftMessageType in validated QbftMessage" ); assert_eq!( - validated_qbft.identifier, msg_id, + validated_qbft.identifier, + create_message_id_for_test(Role::Committee), "Identifier mismatch after validation" ); } else { @@ -610,390 +662,223 @@ mod tests { #[tokio::test] async fn test_consensus_message_with_multiple_signers_but_not_commit() { - let (validator, _outcome_tx) = - build_validator_and_outcome_channel(GLOBAL_EXECUTOR.clone(), 1); + let fixture = TestFixture::new(SINGLE_NODE_COMMITTEE); // Multiple signers are only allowed for Commit messages. let signers = vec![OperatorId(1), OperatorId(2), OperatorId(3)]; - let msg_id = create_message_id_for_test(Role::Committee); - let qbft_msg = QbftMessage { - qbft_message_type: QbftMessageType::Prepare, // Non-Commit type. - height: 1, - round: 1, - identifier: msg_id.clone(), - root: Hash256::from([0u8; 32]), - data_round: 1, - round_change_justification: vec![], - prepare_justification: vec![], - }; + let signed_msg = MessageBuilder::new(Role::Committee, QbftMessageType::Prepare) + .with_signers(signers.clone()) + .build(); - let ssv_msg = test_utils::create_consensus_ssv_message(qbft_msg, msg_id); - let signed_msg = create_signed_ssv_message( - vec![ - vec![0xAA; RSA_SIGNATURE_SIZE], - vec![0xBB; RSA_SIGNATURE_SIZE], - vec![0xCC; RSA_SIGNATURE_SIZE], - ], - signers.clone(), - ssv_msg, - vec![], - ); + let result = fixture.validate_message(&signed_msg); - let result = validator.validate_ssv_message(&signed_msg, signed_msg.ssv_message()); - assert!( - result.is_err(), - "Expected multiple signers with non-Commit type to fail validation" + assert_validation_error( + result, + |failure| matches!(failure, ValidationFailure::NonDecidedWithMultipleSigners { got, want } if *got == signers.len() && *want == SINGLE_NODE_COMMITTEE), + "NonDecidedWithMultipleSigners", ); - match result.err().unwrap() { - ValidationFailure::NonDecidedWithMultipleSigners { got, want } => { - assert_eq!(got, signers.len(), "Unexpected number of signers in error"); - assert_eq!(want, 1, "Expected only one signer for non-Commit messages"); - } - other => panic!( - "Expected NonDecidedWithMultipleSigners error, got: {:?}", - other - ), - } } #[tokio::test] async fn test_consensus_message_with_multiple_signers_commit_but_not_enough_signers_for_quorum() { - let (validator, _outcome_tx) = - build_validator_and_outcome_channel(GLOBAL_EXECUTOR.clone(), 4); + let fixture = TestFixture::new(FOUR_NODE_COMMITTEE); // For Commit messages with multiple signers, the count must be >= quorum size. - let signers = vec![OperatorId(1), OperatorId(2)]; // Assume quorum requires at least 3. - let msg_id = create_message_id_for_test(Role::Committee); - let qbft_msg = QbftMessage { - qbft_message_type: QbftMessageType::Commit, - height: 1, - round: 1, - identifier: msg_id.clone(), - root: Hash256::from([0u8; 32]), - data_round: 1, - round_change_justification: vec![], - prepare_justification: vec![], - }; + let signers = vec![OperatorId(1), OperatorId(2)]; // Quorum requires at least 3 for a committee of 4. + let signed_msg = MessageBuilder::new(Role::Committee, QbftMessageType::Commit) + .with_signers(signers.clone()) + .build(); - let ssv_msg = test_utils::create_consensus_ssv_message(qbft_msg, msg_id); - let signed_msg = create_signed_ssv_message( - vec![ - vec![0xAA; RSA_SIGNATURE_SIZE], - vec![0xBB; RSA_SIGNATURE_SIZE], - ], - signers.clone(), - ssv_msg, - vec![], - ); + let result = fixture.validate_message(&signed_msg); - let result = validator.validate_ssv_message(&signed_msg, signed_msg.ssv_message()); - assert!( - result.is_err(), - "Expected Commit message with insufficient signers to fail validation" + assert_validation_error( + result, + |failure| matches!(failure, ValidationFailure::DecidedNotEnoughSigners { got, want } if *got == signers.len() && *want == FOUR_NODE_COMMITTEE - 1), + "DecidedNotEnoughSigners", ); - match result.err().unwrap() { - ValidationFailure::DecidedNotEnoughSigners { got, want } => { - assert_eq!(got, signers.len(), "Mismatch in signer count reported"); - assert!(got < want, "Got should be less than required quorum"); - } - other => panic!("Expected DecidedNotEnoughSigners error, got: {:?}", other), - } } #[tokio::test] async fn test_consensus_message_full_data_mismatched_root_hash() { - let (validator, _outcome_tx) = - build_validator_and_outcome_channel(GLOBAL_EXECUTOR.clone(), 1); + let fixture = TestFixture::new(SINGLE_NODE_COMMITTEE); - // For a Commit message with full_data (single-signer) the full data hash must match. - let signers = vec![OperatorId(42)]; let full_data = vec![0xDE, 0xAD, 0xBE, 0xEF]; - let msg_id = create_message_id_for_test(Role::Committee); - let qbft_msg = QbftMessage { - qbft_message_type: QbftMessageType::Commit, - height: 1, - round: 1, - identifier: msg_id.clone(), - // Set root to the hash of an empty slice (mismatched) - root: Hash256::from([0u8; 32]), - data_round: 1, - round_change_justification: vec![], - prepare_justification: vec![], - }; - let ssv_msg = test_utils::create_consensus_ssv_message(qbft_msg, msg_id); - let signed_msg = create_signed_ssv_message( - vec![vec![0xAA; RSA_SIGNATURE_SIZE]], - signers.clone(), - ssv_msg, - full_data, - ); + let signed_msg = MessageBuilder::new(Role::Committee, QbftMessageType::Commit) + .with_full_data(full_data) + .build(); - let result = validator.validate_ssv_message(&signed_msg, signed_msg.ssv_message()); - assert!( - result.is_err(), - "Expected validation failure due to full data hash mismatch" + let result = fixture.validate_message(&signed_msg); + + assert_validation_error( + result, + |failure| matches!(failure, ValidationFailure::PrepareOrCommitWithFullData), + "PrepareOrCommitWithFullData", ); - match result.err().unwrap() { - ValidationFailure::PrepareOrCommitWithFullData => { /* Expected */ } - other => panic!( - "Expected PrepareOrCommitWithFullData error, got: {:?}", - other - ), - } } #[tokio::test] async fn test_consensus_message_zero_round_fails() { - let (validator, _outcome_tx) = - build_validator_and_outcome_channel(GLOBAL_EXECUTOR.clone(), 1); + let fixture = TestFixture::new(SINGLE_NODE_COMMITTEE); - let signers = vec![OperatorId(42)]; - let msg_id = create_message_id_for_test(Role::Committee); - let qbft_msg = QbftMessage { - qbft_message_type: QbftMessageType::Proposal, - height: 1, - round: 0, // Invalid round. - identifier: msg_id.clone(), - root: Hash256::from([0u8; 32]), - data_round: 1, - round_change_justification: vec![], - prepare_justification: vec![], - }; - let ssv_msg = test_utils::create_consensus_ssv_message(qbft_msg, msg_id); - let signed_msg = create_signed_ssv_message( - vec![vec![0xAA; RSA_SIGNATURE_SIZE]], - signers, - ssv_msg, - vec![], - ); + let signed_msg = MessageBuilder::new(Role::Committee, QbftMessageType::Proposal) + .with_round(0) + .build(); - let result = validator.validate_ssv_message(&signed_msg, signed_msg.ssv_message()); - assert!(result.is_err(), "Expected round=0 to fail validation"); - match result.err().unwrap() { - ValidationFailure::ZeroRound => (), - other => panic!("Expected ZeroRound error, got: {:?}", other), - } + let result = fixture.validate_message(&signed_msg); + + assert_validation_error( + result, + |failure| matches!(failure, ValidationFailure::ZeroRound), + "ZeroRound", + ); } #[tokio::test] async fn test_consensus_message_round_too_high() { - let (validator, _outcome_tx) = - build_validator_and_outcome_channel(GLOBAL_EXECUTOR.clone(), 1); + let fixture = TestFixture::new(SINGLE_NODE_COMMITTEE); - // For a proposer, max_round is Some(6). Set round = 7 to trigger an error. - let signers = vec![OperatorId(42)]; - let msg_id = create_message_id_for_test(Role::Committee); - let qbft_msg = QbftMessage { - qbft_message_type: QbftMessageType::Proposal, - height: 1, - round: 13, // Invalid round. - identifier: msg_id.clone(), - root: Hash256::from([0u8; 32]), - data_round: 1, - round_change_justification: vec![], - prepare_justification: vec![], - }; - let ssv_msg = test_utils::create_consensus_ssv_message(qbft_msg, msg_id); - let signed_msg = create_signed_ssv_message( - vec![vec![0xAA; RSA_SIGNATURE_SIZE]], - signers, - ssv_msg, - vec![], - ); + let signed_msg = MessageBuilder::new(Role::Committee, QbftMessageType::Proposal) + .with_round(13) // Too high (max is 12) + .build(); - let result = validator.validate_ssv_message(&signed_msg, signed_msg.ssv_message()); - assert!( - result.is_err(), - "Expected round > max_round to fail validation" + let result = fixture.validate_message(&signed_msg); + + assert_validation_error( + result, + |failure| matches!(failure, ValidationFailure::RoundTooHigh), + "RoundTooHigh", ); - match result.err().unwrap() { - ValidationFailure::RoundTooHigh => (), - other => panic!("Expected RoundTooHigh error, got: {:?}", other), - } } #[tokio::test] async fn test_consensus_message_mismatched_identifier() { - let (validator, _outcome_tx) = - build_validator_and_outcome_channel(GLOBAL_EXECUTOR.clone(), 1); + let fixture = TestFixture::new(SINGLE_NODE_COMMITTEE); - let signers = vec![OperatorId(42)]; - // Create two different MessageIds. + // Create message with mismatched identifier let msg_id_a = create_message_id_for_test(Role::Committee); let msg_id_b = create_message_id_for_test(Role::Proposer); + let qbft_msg = QbftMessage { qbft_message_type: QbftMessageType::Proposal, height: 1, round: 1, - identifier: msg_id_b.clone(), + identifier: msg_id_b, // Mismatched ID root: Hash256::from([0u8; 32]), data_round: 1, round_change_justification: vec![], prepare_justification: vec![], }; - let ssv_msg = test_utils::create_consensus_ssv_message(qbft_msg, msg_id_a); - let signed_msg = create_signed_ssv_message( + + let qbft_bytes = qbft_msg.as_ssz_bytes(); + let ssv_msg = SSVMessage::new(MsgType::SSVConsensusMsgType, msg_id_a, qbft_bytes) + .expect("SSVMessage should be created"); + let signed_msg = SignedSSVMessage::new( vec![vec![0xAA; RSA_SIGNATURE_SIZE]], - signers, + vec![OperatorId(42)], ssv_msg, vec![], - ); + ) + .expect("SignedSSVMessage should be created"); - let result = validator.validate_ssv_message(&signed_msg, signed_msg.ssv_message()); - assert!( - result.is_err(), - "Expected mismatched identifier to fail validation" + let result = fixture.validate_message(&signed_msg); + + assert_validation_error( + result, + |failure| { + matches!( + failure, + ValidationFailure::MismatchedIdentifier { got: _, want: _ } + ) + }, + "MismatchedIdentifier", ); - match result.err().unwrap() { - ValidationFailure::MismatchedIdentifier { got, want } => { - // Expect hexadecimal strings representing the differing ids. - // Adjust these expectations as appropriate. - assert_ne!(got, want, "Expected identifiers to differ"); - } - other => panic!("Expected MismatchedIdentifier error, got: {:?}", other), - } } #[tokio::test] async fn test_consensus_message_decode_failure() { - let (validator, _outcome_tx) = - build_validator_and_outcome_channel(GLOBAL_EXECUTOR.clone(), 1); + let fixture = TestFixture::new(SINGLE_NODE_COMMITTEE); - let signers = vec![OperatorId(42)]; - // Provide invalid consensus data. + // Provide invalid consensus data let msg_id = create_message_id_for_test(Role::Proposer); let invalid_data = vec![0xDE, 0xAD, 0xBE, 0xEF]; let ssv_msg = SSVMessage::new(MsgType::SSVConsensusMsgType, msg_id, invalid_data) .expect("SSVMessage should be created"); - let signed_msg = create_signed_ssv_message( + let signed_msg = SignedSSVMessage::new( vec![vec![0xAA; RSA_SIGNATURE_SIZE]], - signers, + vec![OperatorId(42)], ssv_msg, vec![], - ); + ) + .expect("SignedSSVMessage should be created"); - let result = validator.validate_ssv_message(&signed_msg, signed_msg.ssv_message()); - assert!( - result.is_err(), - "Expected decode failure for consensus message data" + let result = fixture.validate_message(&signed_msg); + + assert_validation_error( + result, + |failure| matches!(failure, ValidationFailure::UndecodableMessageData), + "UndecodableMessageData", ); - match result.err().unwrap() { - ValidationFailure::UndecodableMessageData => (), - other => panic!("Expected UndecodableMessageData error, got: {:?}", other), - } } #[tokio::test] async fn test_prepare_justifications_with_non_proposal_message() { - let (validator, _outcome_tx) = - build_validator_and_outcome_channel(GLOBAL_EXECUTOR.clone(), 1); + let fixture = TestFixture::new(SINGLE_NODE_COMMITTEE); - let signers = vec![OperatorId(42)]; - let msg_id = create_message_id_for_test(Role::Committee); - // Create a Prepare message (non-Proposal) with a non-empty prepare justification. - let qbft_msg = QbftMessage { - qbft_message_type: QbftMessageType::Prepare, - height: 1, - round: 1, - identifier: msg_id.clone(), - root: Hash256::from([0u8; 32]), - data_round: 1, - round_change_justification: vec![], - prepare_justification: vec![dummy_signed_ssv_message_for_justification()], - }; - let ssv_msg = test_utils::create_consensus_ssv_message(qbft_msg, msg_id); - let signed_msg = create_signed_ssv_message( - vec![vec![0xAA; RSA_SIGNATURE_SIZE]], - signers, - ssv_msg, - vec![], - ); + let signed_msg = MessageBuilder::new(Role::Committee, QbftMessageType::Prepare) + .with_prepare_justification(vec![dummy_signed_ssv_message_for_justification()]) + .build(); - let result = validator.validate_ssv_message(&signed_msg, signed_msg.ssv_message()); - assert!( - result.is_err(), - "Expected non-empty prepare_justifications in a non-Proposal to fail" + let result = fixture.validate_message(&signed_msg); + + assert_validation_error( + result, + |failure| matches!(failure, ValidationFailure::UnexpectedPrepareJustifications), + "UnexpectedPrepareJustifications", ); - match result.err().unwrap() { - ValidationFailure::UnexpectedPrepareJustifications => (), - other => panic!( - "Expected UnexpectedPrepareJustifications error, got: {:?}", - other - ), - } } #[tokio::test] async fn test_round_change_justifications_with_non_proposal_or_roundchange() { - let (validator, _outcome_tx) = - build_validator_and_outcome_channel(GLOBAL_EXECUTOR.clone(), 1); + let fixture = TestFixture::new(SINGLE_NODE_COMMITTEE); - let signers = vec![OperatorId(42)]; - let msg_id = create_message_id_for_test(Role::Committee); - // Create a Commit message with non-empty round_change_justification. - let qbft_msg = QbftMessage { - qbft_message_type: QbftMessageType::Commit, - height: 1, - round: 1, - identifier: msg_id.clone(), - root: Hash256::from([0u8; 32]), - data_round: 1, - round_change_justification: vec![dummy_signed_ssv_message_for_justification()], - prepare_justification: vec![], - }; - let ssv_msg = test_utils::create_consensus_ssv_message(qbft_msg, msg_id); - let signed_msg = create_signed_ssv_message( - vec![vec![0xAA; RSA_SIGNATURE_SIZE]], - signers, - ssv_msg, - vec![], - ); + let signed_msg = MessageBuilder::new(Role::Committee, QbftMessageType::Commit) + .with_round_change_justification(vec![dummy_signed_ssv_message_for_justification()]) + .build(); - let result = validator.validate_ssv_message(&signed_msg, signed_msg.ssv_message()); - assert!( - result.is_err(), - "Expected non-empty round_change_justifications in a Commit to fail" + let result = fixture.validate_message(&signed_msg); + + assert_validation_error( + result, + |failure| { + matches!( + failure, + ValidationFailure::UnexpectedRoundChangeJustifications + ) + }, + "UnexpectedRoundChangeJustifications", ); - match result.err().unwrap() { - ValidationFailure::UnexpectedRoundChangeJustifications => (), - other => panic!( - "Expected UnexpectedRoundChangeJustifications error, got: {:?}", - other - ), - } } #[tokio::test] async fn test_compute_quorum_size() { // For committee_size=4 -> f=1 -> quorum=3. assert_eq!( - compute_quorum_size(4), + compute_quorum_size(FOUR_NODE_COMMITTEE), 3, "Expected quorum=3 for committee of 4" ); // For committee_size=7 -> f=2 -> quorum=5. assert_eq!( - compute_quorum_size(7), + compute_quorum_size(SEVEN_NODE_COMMITTEE), 5, "Expected quorum=5 for committee of 7" ); // For committee_size=1 -> f=0 -> quorum=1. assert_eq!( - compute_quorum_size(1), + compute_quorum_size(SINGLE_NODE_COMMITTEE), 1, "Expected quorum=1 for committee of 1" ); } - // - // #[tokio::test] - // async fn test_hash_data_root() { - // let data = b"hello world"; - // let hash_of_data = hash_data_root(data); - // let expected_hash = quick_hash(data); - // assert_eq!( - // hash_of_data, expected_hash, - // "hash_data_root should match the SHA256 hash for the given input" - // ); - // } } From e5158dbe0993d99e366e8b8498b654e9a610f817 Mon Sep 17 00:00:00 2001 From: diego Date: Mon, 10 Mar 2025 15:05:18 +0100 Subject: [PATCH 09/19] fix comments --- anchor/database/src/multi_index.rs | 6 ++++++ anchor/database/src/state.rs | 2 +- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/anchor/database/src/multi_index.rs b/anchor/database/src/multi_index.rs index e0f2cc84e..c2823303d 100644 --- a/anchor/database/src/multi_index.rs +++ b/anchor/database/src/multi_index.rs @@ -193,8 +193,10 @@ where // Remove from secondary index if std::any::TypeId::of::() == std::any::TypeId::of::() { + // For unique indexes, just remove the entry that points to this k1 self.maps.secondary_unique.retain(|_, v| v != k1); } else { + // For non-unique indexes, remove k1 from any vectors it appears in self.maps.secondary_multi.retain(|_, vec| { vec.retain(|x| x != k1); !vec.is_empty() @@ -203,8 +205,10 @@ where // Remove from tertiary index if std::any::TypeId::of::() == std::any::TypeId::of::() { + // For unique indexes, just remove the entry that points to this k1 self.maps.tertiary_unique.retain(|_, v| v != k1); } else { + // For non-unique indexes, remove k1 from any vectors it appears in self.maps.tertiary_multi.retain(|_, vec| { vec.retain(|x| x != k1); !vec.is_empty() @@ -230,6 +234,8 @@ where if !self.maps.primary.contains_key(k1) { return None; } + + // Only update the value in primary storage self.maps.primary.insert(k1.clone(), new_value) } } diff --git a/anchor/database/src/state.rs b/anchor/database/src/state.rs index ba632be22..8759a3613 100644 --- a/anchor/database/src/state.rs +++ b/anchor/database/src/state.rs @@ -51,7 +51,7 @@ impl NetworkState { // 5) Owner -> Nonce (u16) let nonces = Self::fetch_nonces(&conn)?; - //ClusterId -> CommitteeId. It's populated in the loop that populates the multi-index maps + //CommitteeId -> ClusterId. It's populated in the loop that populates the multi-index maps let mut clusters_by_committee_id = HashMap::new(); // Second phase: Populate all in memory stores with data; From 87da566ee949c3b86c07725472be9a5bc7a92a1c Mon Sep 17 00:00:00 2001 From: diego Date: Mon, 10 Mar 2025 19:05:30 +0100 Subject: [PATCH 10/19] add new test --- anchor/message_validator/src/lib.rs | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/anchor/message_validator/src/lib.rs b/anchor/message_validator/src/lib.rs index 360bcf854..016c4612b 100644 --- a/anchor/message_validator/src/lib.rs +++ b/anchor/message_validator/src/lib.rs @@ -821,6 +821,23 @@ mod tests { ); } + #[tokio::test] + async fn test_consensus_message_multiple_signers_commit_with_full_data_and_invalid_hash() { + let fixture = TestFixture::new(FOUR_NODE_COMMITTEE); + let signers = vec![OperatorId(1), OperatorId(2), OperatorId(3)]; + let full_data = vec![0xFF; 16]; + let signed_msg = MessageBuilder::new(Role::Committee, QbftMessageType::Commit) + .with_signers(signers.clone()) + .with_full_data(full_data) + .build(); + let result = fixture.validate_message(&signed_msg); + assert_validation_error( + result, + |failure| matches!(failure, ValidationFailure::InvalidHash), + "InvalidHash", + ); + } + #[tokio::test] async fn test_prepare_justifications_with_non_proposal_message() { let fixture = TestFixture::new(SINGLE_NODE_COMMITTEE); @@ -839,7 +856,7 @@ mod tests { } #[tokio::test] - async fn test_round_change_justifications_with_non_proposal_or_roundchange() { + async fn test_round_change_justifications_with_non_proposal_or_round_change() { let fixture = TestFixture::new(SINGLE_NODE_COMMITTEE); let signed_msg = MessageBuilder::new(Role::Committee, QbftMessageType::Commit) From 5a966e51bbd31f0ccdc36c063544f0e80baece84 Mon Sep 17 00:00:00 2001 From: diego Date: Mon, 10 Mar 2025 19:09:11 +0100 Subject: [PATCH 11/19] remove do_validate --- anchor/message_validator/src/lib.rs | 39 ++++++++--------------------- 1 file changed, 11 insertions(+), 28 deletions(-) diff --git a/anchor/message_validator/src/lib.rs b/anchor/message_validator/src/lib.rs index 016c4612b..09f5be824 100644 --- a/anchor/message_validator/src/lib.rs +++ b/anchor/message_validator/src/lib.rs @@ -189,10 +189,6 @@ impl Validator { } } - fn do_validate(&self, _message: &SignedSSVMessage) -> Result<(), ValidationFailure> { - Ok(()) - } - fn validate_ssv_message( &self, signed_ssv_message: &SignedSSVMessage, @@ -349,30 +345,17 @@ impl ValidatorService for Validator { match SignedSSVMessage::from_ssz_bytes(&message_data) { Ok(deserialized_message) => { trace!(msg = ?deserialized_message, "SignedSSVMessage deserialized"); - match validator.do_validate(&deserialized_message) { - Ok(()) => { - match validator.validate_ssv_message( - &deserialized_message, - deserialized_message.ssv_message(), - ) { - Ok(validated_ssv_message) => ( - Accept, - Some(ValidatedMessage::new( - deserialized_message.clone(), - validated_ssv_message, - )), - ), - Err(failure) => { - trace!( - ?failure, - ?message_id, - ?propagation_source, - "Validation failure" - ); - ((&failure).into(), None) - } - } - } + match validator.validate_ssv_message( + &deserialized_message, + deserialized_message.ssv_message(), + ) { + Ok(validated_ssv_message) => ( + Accept, + Some(ValidatedMessage::new( + deserialized_message.clone(), + validated_ssv_message, + )), + ), Err(failure) => { trace!( ?failure, From a4a58be7071f1b2987842b15d70a24ea3426ac24 Mon Sep 17 00:00:00 2001 From: diego Date: Mon, 10 Mar 2025 21:55:44 +0100 Subject: [PATCH 12/19] decode msgs to PartialSignatureMessages --- anchor/message_validator/src/lib.rs | 29 ++++++++++++++--------------- 1 file changed, 14 insertions(+), 15 deletions(-) diff --git a/anchor/message_validator/src/lib.rs b/anchor/message_validator/src/lib.rs index 09f5be824..570adbe65 100644 --- a/anchor/message_validator/src/lib.rs +++ b/anchor/message_validator/src/lib.rs @@ -7,10 +7,7 @@ use sha2::{Digest, Sha256}; use ssv_types::consensus::{QbftMessage, QbftMessageType}; use ssv_types::message::{MsgType, SSVMessage, SignedSSVMessage}; use ssv_types::msgid::DutyExecutor; -use ssv_types::partial_sig::{ - PartialSignatureKind, PartialSignatureMessage, PartialSignatureMessages, -}; -use ssv_types::Slot; +use ssv_types::partial_sig::PartialSignatureMessages; use ssz::Decode; use std::sync::Arc; use tokio::sync::mpsc::error::TrySendError::{Closed, Full}; @@ -203,21 +200,23 @@ impl Validator { Ok(ValidatedSSVMessage::QbftMessage(consensus_message)) } MsgType::SSVPartialSignatureMsgType => { - PartialSignatureMessage::from_ssz_bytes(ssv_message.data()) - .ok() - .map(|m| { - let p = PartialSignatureMessages { - kind: PartialSignatureKind::RandaoPartialSig, - slot: Slot::new(1), - messages: vec![m], - }; - ValidatedSSVMessage::PartialSignatureMessages(p) - }) - .ok_or(ValidationFailure::UndecodableMessageData) + self.validate_partial_signature_message(ssv_message) } } } + fn validate_partial_signature_message( + &self, + ssv_message: &SSVMessage, + ) -> Result { + let messages = match PartialSignatureMessages::from_ssz_bytes(ssv_message.data()) { + Ok(msgs) => msgs, + Err(_) => return Err(ValidationFailure::UndecodableMessageData), + }; + + Ok(ValidatedSSVMessage::PartialSignatureMessages(messages)) + } + fn validate_consensus_message_semantics( &self, signed_ssv_message: &SignedSSVMessage, From ec791c74903db24ae9a038e764c3a3e21d62f1ce Mon Sep 17 00:00:00 2001 From: diego Date: Tue, 11 Mar 2025 13:03:34 +0100 Subject: [PATCH 13/19] add Rule: Duty role has consensus (true except for ValidatorRegistration and VoluntaryExit) fix max_round --- anchor/common/ssv_types/src/consensus.rs | 12 +++++------- anchor/message_validator/src/lib.rs | 10 +++++++++- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/anchor/common/ssv_types/src/consensus.rs b/anchor/common/ssv_types/src/consensus.rs index c6f78de24..d1f2c82fa 100644 --- a/anchor/common/ssv_types/src/consensus.rs +++ b/anchor/common/ssv_types/src/consensus.rs @@ -61,13 +61,11 @@ pub struct QbftMessage { impl QbftMessage { pub fn max_round(&self) -> Option { - match self.identifier.role() { - Some(role) => match role { - Role::Committee | Role::Aggregator => Some(12), - Role::Proposer | Role::SyncCommittee => Some(6), - }, - None => None, - } + self.identifier.role().and_then(|role| match role { + Role::Committee | Role::Aggregator => Some(12), + Role::Proposer | Role::SyncCommittee => Some(6), + _ => None, + }) } } diff --git a/anchor/message_validator/src/lib.rs b/anchor/message_validator/src/lib.rs index 570adbe65..305b514ed 100644 --- a/anchor/message_validator/src/lib.rs +++ b/anchor/message_validator/src/lib.rs @@ -6,7 +6,7 @@ use processor::Senders; use sha2::{Digest, Sha256}; use ssv_types::consensus::{QbftMessage, QbftMessageType}; use ssv_types::message::{MsgType, SSVMessage, SignedSSVMessage}; -use ssv_types::msgid::DutyExecutor; +use ssv_types::msgid::{DutyExecutor, Role}; use ssv_types::partial_sig::PartialSignatureMessages; use ssz::Decode; use std::sync::Arc; @@ -283,6 +283,14 @@ impl Validator { return Err(ValidationFailure::ZeroRound); } + // Rule: Duty role has consensus (true except for ValidatorRegistration and VoluntaryExit) + if matches!( + signed_ssv_message.ssv_message().msg_id().role(), + Some(Role::ValidatorRegistration) | Some(Role::VoluntaryExit) + ) { + return Err(ValidationFailure::UnexpectedConsensusMessage); + } + let màx_round = match consensus_message.max_round() { Some(max_round) => max_round, None => return Err(ValidationFailure::FailedToGetMaxRound), From 66b4e3fb83aea50f321d82cf60575e651d0f6ff5 Mon Sep 17 00:00:00 2001 From: diego Date: Tue, 11 Mar 2025 15:47:17 +0100 Subject: [PATCH 14/19] remove unused code --- anchor/database/src/state.rs | 5 ----- 1 file changed, 5 deletions(-) diff --git a/anchor/database/src/state.rs b/anchor/database/src/state.rs index 8759a3613..1cf216998 100644 --- a/anchor/database/src/state.rs +++ b/anchor/database/src/state.rs @@ -51,9 +51,6 @@ impl NetworkState { // 5) Owner -> Nonce (u16) let nonces = Self::fetch_nonces(&conn)?; - //CommitteeId -> ClusterId. It's populated in the loop that populates the multi-index maps - let mut clusters_by_committee_id = HashMap::new(); - // Second phase: Populate all in memory stores with data; let mut shares_multi: ShareMultiIndexMap = MultiIndexMap::new(); let mut metadata_multi: MetadataMultiIndexMap = MultiIndexMap::new(); @@ -94,8 +91,6 @@ impl NetworkState { validator.clone(), ); - clusters_by_committee_id.insert(cluster.committee_id(), *cluster_id); - // Process this validators shares if let Some(share_map) = &share_map { if let Some(shares) = share_map.get(cluster_id) { From 0e73245dbeb60f36c56af1e12c4f89985e745a41 Mon Sep 17 00:00:00 2001 From: diego Date: Tue, 11 Mar 2025 15:58:51 +0100 Subject: [PATCH 15/19] add comment --- anchor/common/qbft/src/lib.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/anchor/common/qbft/src/lib.rs b/anchor/common/qbft/src/lib.rs index 85d6ea005..317b783f3 100644 --- a/anchor/common/qbft/src/lib.rs +++ b/anchor/common/qbft/src/lib.rs @@ -245,6 +245,7 @@ where // The rest of the verification only pertains to messages with one signature if wrapped_msg.signed_message.operator_ids().len() != 1 { + // The message validator already checked this is a decided message. // Do not care about data here, just that we had a success let valid_data = Some(ValidData::new(None, wrapped_msg.qbft_message.root)); return Some((valid_data, OperatorId::from(0))); From a52f6f252a1bc25ef1e3a5dafae0e34a375241c8 Mon Sep 17 00:00:00 2001 From: diego Date: Tue, 11 Mar 2025 16:19:41 +0100 Subject: [PATCH 16/19] improve comment for decided msg --- anchor/common/qbft/src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/anchor/common/qbft/src/lib.rs b/anchor/common/qbft/src/lib.rs index 317b783f3..279f3cfc2 100644 --- a/anchor/common/qbft/src/lib.rs +++ b/anchor/common/qbft/src/lib.rs @@ -244,8 +244,8 @@ where } // The rest of the verification only pertains to messages with one signature - if wrapped_msg.signed_message.operator_ids().len() != 1 { - // The message validator already checked this is a decided message. + if wrapped_msg.signed_message.operator_ids().len() > 1 { + // The message validator already checked this is a decided message (a commit message with > 1 signers). // Do not care about data here, just that we had a success let valid_data = Some(ValidData::new(None, wrapped_msg.qbft_message.root)); return Some((valid_data, OperatorId::from(0))); From fab2f2585cde918196dc70f08bf72dfa521e839e Mon Sep 17 00:00:00 2001 From: diego Date: Tue, 11 Mar 2025 16:29:58 +0100 Subject: [PATCH 17/19] remove redundant root hash check --- anchor/common/qbft/src/lib.rs | 6 ------ 1 file changed, 6 deletions(-) diff --git a/anchor/common/qbft/src/lib.rs b/anchor/common/qbft/src/lib.rs index 279f3cfc2..5e8f6027a 100644 --- a/anchor/common/qbft/src/lib.rs +++ b/anchor/common/qbft/src/lib.rs @@ -414,12 +414,6 @@ where return; } - // Verify that the data root matches what was in the message - if valid_data.hash != wrapped_msg.qbft_message.root { - warn!(from = ?operator_id, self=?self.config.operator_id(), "Data roots do not match"); - return; - } - // Fulldata is included in propose messages let data = match valid_data.data { Some(data) => data, From 75fe7441fff3dc3a4ba15287032daa6ef6003112 Mon Sep 17 00:00:00 2001 From: Josh King Date: Wed, 12 Mar 2025 21:13:05 +1100 Subject: [PATCH 18/19] small typos --- anchor/message_validator/src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/anchor/message_validator/src/lib.rs b/anchor/message_validator/src/lib.rs index 305b514ed..734030200 100644 --- a/anchor/message_validator/src/lib.rs +++ b/anchor/message_validator/src/lib.rs @@ -291,12 +291,12 @@ impl Validator { return Err(ValidationFailure::UnexpectedConsensusMessage); } - let màx_round = match consensus_message.max_round() { + let max_round = match consensus_message.max_round() { Some(max_round) => max_round, None => return Err(ValidationFailure::FailedToGetMaxRound), }; - if consensus_message.round > màx_round { + if consensus_message.round > max_round { return Err(ValidationFailure::RoundTooHigh); } From f4806eff10172c777f285db3317fcb5d742d4404 Mon Sep 17 00:00:00 2001 From: jking-aus <72330194+jking-aus@users.noreply.github.com> Date: Thu, 13 Mar 2025 09:53:35 +1100 Subject: [PATCH 19/19] Update consensus.rs --- anchor/common/ssv_types/src/consensus.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/anchor/common/ssv_types/src/consensus.rs b/anchor/common/ssv_types/src/consensus.rs index d1f2c82fa..2c5f09fe8 100644 --- a/anchor/common/ssv_types/src/consensus.rs +++ b/anchor/common/ssv_types/src/consensus.rs @@ -62,8 +62,8 @@ pub struct QbftMessage { impl QbftMessage { pub fn max_round(&self) -> Option { self.identifier.role().and_then(|role| match role { - Role::Committee | Role::Aggregator => Some(12), - Role::Proposer | Role::SyncCommittee => Some(6), + Role::Committee | Role::Aggregator => Some(12), // TODO: confirm max_round with ssvlabs + Role::Proposer | Role::SyncCommittee => Some(6), // as per https://github.com/ssvlabs/ssv/blob/main/message/validation/consensus_validation.go#L370 _ => None, }) }