diff --git a/Cargo.lock b/Cargo.lock index a854c5665..f1ad6f2ad 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5162,13 +5162,21 @@ dependencies = [ name = "message_validator" version = "0.1.0" dependencies = [ + "async-channel 1.9.0", + "bls", + "database", "ethereum_ssz", + "hex", "libp2p", + "once_cell", "processor", + "sha2 0.10.8", "ssv_types", + "task_executor", "thiserror 2.0.11", "tokio", "tracing", + "types", ] [[package]] diff --git a/anchor/client/src/lib.rs b/anchor/client/src/lib.rs index d8bdd48a5..fba14ce75 100644 --- a/anchor/client/src/lib.rs +++ b/anchor/client/src/lib.rs @@ -10,7 +10,7 @@ use beacon_node_fallback::{ }; pub use cli::Node; use config::Config; -use database::NetworkDatabase; +use database::{NetworkDatabase, WatchableNetworkState}; use eth2::reqwest::{Certificate, ClientBuilder}; use eth2::{BeaconNodeHttpClient, Timeouts}; use message_receiver::ManagerMessageReceiver; @@ -367,7 +367,8 @@ impl Client { network::SUBNET_COUNT, )?; - let message_validator = Validator::new(); + let message_validator = + Validator::new(Arc::new(WatchableNetworkState::new(database.watch()))); // Create the signature collector let signature_collector = SignatureCollectorManager::new( diff --git a/anchor/common/qbft/src/lib.rs b/anchor/common/qbft/src/lib.rs index 133f57552..5e8f6027a 100644 --- a/anchor/common/qbft/src/lib.rs +++ b/anchor/common/qbft/src/lib.rs @@ -213,12 +213,6 @@ where &self, wrapped_msg: &WrappedQbftMessage, ) -> Option<(Option>, OperatorId)> { - // Validate the qbft message - if !wrapped_msg.qbft_message.validate() { - warn!("Invalid qbft_message"); - return None; - } - // Ensure that this message is for the correct round let current_round = self.current_round.get(); if (wrapped_msg.qbft_message.round < current_round as u64) @@ -250,22 +244,11 @@ where } // The rest of the verification only pertains to messages with one signature - if wrapped_msg.signed_message.operator_ids().len() != 1 { - // If there is more than one signer, we also have 
to check if this is a decided message. - if matches!( - wrapped_msg.qbft_message.qbft_message_type, - QbftMessageType::Commit - ) { - // Do not care about data here, just that we had a success - let valid_data = Some(ValidData::new(None, wrapped_msg.qbft_message.root)); - return Some((valid_data, OperatorId::from(0))); - } - // Otherwise, this is invalid data - warn!( - num_signers = wrapped_msg.signed_message.operator_ids().len(), - "Message only allows one signer" - ); - return None; + if wrapped_msg.signed_message.operator_ids().len() > 1 { + // The message validator already checked this is a decided message (a commit message with > 1 signers). + // Do not care about data here, just that we had a success + let valid_data = Some(ValidData::new(None, wrapped_msg.qbft_message.root)); + return Some((valid_data, OperatorId::from(0))); } // Message is not a decide message, we know there is only one signer @@ -431,12 +414,6 @@ where return; } - // Verify that the data root matches what was in the message - if valid_data.hash != wrapped_msg.qbft_message.root { - warn!(from = ?operator_id, self=?self.config.operator_id(), "Data roots do not match"); - return; - } - // Fulldata is included in propose messages let data = match valid_data.data { Some(data) => data, diff --git a/anchor/common/ssv_types/src/consensus.rs b/anchor/common/ssv_types/src/consensus.rs index ac238e0f0..2c5f09fe8 100644 --- a/anchor/common/ssv_types/src/consensus.rs +++ b/anchor/common/ssv_types/src/consensus.rs @@ -1,5 +1,5 @@ use crate::message::*; -use crate::msgid::MessageId; +use crate::msgid::{MessageId, Role}; use crate::ValidatorIndex; use sha2::{Digest, Sha256}; use ssz::{Decode, DecodeError, Encode}; @@ -60,12 +60,12 @@ pub struct QbftMessage { } impl QbftMessage { - /// Do QBFTMessage specific validation - pub fn validate(&self) -> bool { - if self.qbft_message_type > QbftMessageType::RoundChange { - return false; - } - true + pub fn max_round(&self) -> Option { + 
self.identifier.role().and_then(|role| match role { + Role::Committee | Role::Aggregator => Some(12), // TODO: confirm max_round with ssvlabs + Role::Proposer | Role::SyncCommittee => Some(6), // as per https://github.com/ssvlabs/ssv/blob/main/message/validation/consensus_validation.go#L370 + _ => None, + }) } } diff --git a/anchor/common/ssv_types/src/lib.rs b/anchor/common/ssv_types/src/lib.rs index 7e84c4b87..17b6ee749 100644 --- a/anchor/common/ssv_types/src/lib.rs +++ b/anchor/common/ssv_types/src/lib.rs @@ -15,4 +15,6 @@ mod share; mod sql_conversions; mod util; +pub use indexmap::IndexSet; pub use share::ENCRYPTED_KEY_LENGTH; +pub use types::Slot; diff --git a/anchor/database/src/cluster_operations.rs b/anchor/database/src/cluster_operations.rs index fcfafcab9..04fe5b798 100644 --- a/anchor/database/src/cluster_operations.rs +++ b/anchor/database/src/cluster_operations.rs @@ -58,27 +58,30 @@ impl NetworkDatabase { // Save the keyshare state.multi_state.shares.insert( - &validator.public_key, // The validator this keyshare belongs to - &cluster.cluster_id, // The id of the cluster - &cluster.owner, // The owner of the cluster - share.to_owned(), // The keyshare itself + &validator.public_key, // The validator this keyshare belongs to + &cluster.cluster_id, // The id of the cluster + &cluster.owner, // The owner of the cluster + &cluster.committee_id(), // The committee id of the cluster + share.to_owned(), // The keyshare itself ); } // Save all cluster related information state.multi_state.clusters.insert( - &cluster.cluster_id, // The id of the cluster - &validator.public_key, // The public key of validator added to the cluster - &cluster.owner, // Owner of the cluster - cluster.to_owned(), // The Cluster and all containing information + &cluster.cluster_id, // The id of the cluster + &validator.public_key, // The public key of validator added to the cluster + &cluster.owner, // Owner of the cluster + &cluster.committee_id(), // The committee id of the 
cluster + cluster.to_owned(), // The Cluster and all containing information ); // Save the metadata for the validators state.multi_state.validator_metadata.insert( - &validator.public_key, // The public key of the validator - &cluster.cluster_id, // The id of the cluster the validator belongs to - &cluster.owner, // The owner of the cluster - validator.to_owned(), // The metadata of the validator + &validator.public_key, // The public key of the validator + &cluster.cluster_id, // The id of the cluster the validator belongs to + &cluster.owner, // The owner of the cluster + &cluster.committee_id(), // The committee id of the cluster + validator.to_owned(), // The metadata of the validator ); }); diff --git a/anchor/database/src/lib.rs b/anchor/database/src/lib.rs index abcaa8c46..920bcc4ea 100644 --- a/anchor/database/src/lib.rs +++ b/anchor/database/src/lib.rs @@ -1,7 +1,7 @@ use openssl::{pkey::Public, rsa::Rsa}; use r2d2_sqlite::SqliteConnectionManager; use rusqlite::params; -use ssv_types::{Cluster, ClusterId, Operator, OperatorId, Share, ValidatorMetadata}; +use ssv_types::{Cluster, ClusterId, CommitteeId, Operator, OperatorId, Share, ValidatorMetadata}; use std::collections::{HashMap, HashSet}; use std::fs::File; use std::path::Path; @@ -13,6 +13,7 @@ use types::{Address, PublicKeyBytes}; pub use crate::error::DatabaseError; pub use crate::multi_index::{MultiIndexMap, *}; use crate::sql_operations::{SqlStatement, SQL}; +pub use crate::state::{NetworkStateService, WatchableNetworkState}; mod cluster_operations; mod error; @@ -37,8 +38,16 @@ type PoolConn = r2d2::PooledConnection; /// Primary: public key of validator. uniquely identifies share /// Secondary: cluster id. corresponds to a list of shares /// Tertiary: owner of the cluster. 
corresponds to a list of shares -pub(crate) type ShareMultiIndexMap = - MultiIndexMap; +pub(crate) type ShareMultiIndexMap = MultiIndexMap< + PublicKeyBytes, + ClusterId, + Address, + CommitteeId, + Share, + NonUniqueTag, + NonUniqueTag, + NonUniqueTag, +>; /// Metadata for all validators in the network /// Primary: public key of the validator. uniquely identifies the metadata /// Secondary: cluster id. corresponds to list of metadata for all validators @@ -47,16 +56,26 @@ pub(crate) type MetadataMultiIndexMap = MultiIndexMap< PublicKeyBytes, ClusterId, Address, + CommitteeId, ValidatorMetadata, NonUniqueTag, NonUniqueTag, + NonUniqueTag, >; /// All of the clusters in the network /// Primary: cluster id. uniquely identifies a cluster /// Secondary: public key of the validator. uniquely identifies a cluster /// Tertiary: owner of the cluster. uniquely identifies a cluster -pub(crate) type ClusterMultiIndexMap = - MultiIndexMap; +pub(crate) type ClusterMultiIndexMap = MultiIndexMap< + ClusterId, + PublicKeyBytes, + Address, + CommitteeId, + Cluster, + UniqueTag, + UniqueTag, + NonUniqueTag, +>; // Information that needs to be accessed via multiple different indicies #[derive(Debug)] @@ -64,6 +83,7 @@ struct MultiState { shares: ShareMultiIndexMap, validator_metadata: MetadataMultiIndexMap, clusters: ClusterMultiIndexMap, + // Be careful when adding new maps here. 
If you really must to, it must be updated in the operations files } // General information that can be single index access diff --git a/anchor/database/src/multi_index.rs b/anchor/database/src/multi_index.rs index b119167fb..c2823303d 100644 --- a/anchor/database/src/multi_index.rs +++ b/anchor/database/src/multi_index.rs @@ -2,18 +2,19 @@ use std::collections::HashMap; use std::hash::Hash; use std::marker::PhantomData; -/// Marker trait for uniquely identifying indicies +/// Marker trait for uniquely identifying indices pub trait Unique {} -/// Marker trait for non-uniquely identifiying indicies +/// Marker trait for non-uniquely identifying indices pub trait NotUnique {} /// Index type markers pub enum Primary {} pub enum Secondary {} pub enum Tertiary {} +pub enum Quaternary {} -// Type tags markers +/// Type tags markers #[derive(Debug)] pub enum UniqueTag {} impl Unique for UniqueTag {} @@ -32,54 +33,69 @@ pub trait NonUniqueIndex { fn get_all_by(&self, key: &K) -> Option>; } +/// Inner storage maps for the multi-index map, now supporting a quaternary index. +/// - K1: Primary key type (always unique) +/// - K2: Secondary key type +/// - K3: Tertiary key type +/// - K4: Quaternary key type +/// - V: Value type #[derive(Debug)] -struct InnerMaps +struct InnerMaps where K1: Eq + Hash, K2: Eq + Hash, K3: Eq + Hash, + K4: Eq + Hash, { primary: HashMap, secondary_unique: HashMap, secondary_multi: HashMap>, tertiary_unique: HashMap, tertiary_multi: HashMap>, + quaternary_unique: HashMap, + quaternary_multi: HashMap>, } -/// A concurrent multi-index map that supports up to three different access patterns. -/// The core differentiates between unique identification and non unique identification. The primary -/// index is forced to always uniquely identify the value. The secondary and tertiary indicies have -/// more flexibility. 
The key may non uniquely identify many different values, or uniquely identify -/// a single value +/// A concurrent multi-index map that supports up to four different access patterns. +/// The core differentiates between unique identification and non-unique identification. +/// The primary index is forced to always uniquely identify the value. The secondary, tertiary, +/// and quaternary indices have more flexibility. A key may non-uniquely identify many values, +/// or uniquely identify a single value. /// -/// Example: A share is uniquely identified by the Validators public key that it belongs too. A -/// ClusterId does not uniquely identify a share as a cluster contains multiple shares +/// Example: A share might be uniquely identified by a primary key (like a Validators public key) +/// while a secondary or tertiary index (like a ClusterId) does not uniquely identify a share. The +/// new quaternary index provides an additional access pattern. /// /// - K1: Primary key type (always unique) /// - K2: Secondary key type /// - K3: Tertiary key type +/// - K4: Quaternary key type /// - V: Value type /// - U1: Secondary index uniqueness (Unique or NotUnique) /// - U2: Tertiary index uniqueness (Unique or NotUnique) +/// - U3: Quaternary index uniqueness (Unique or NotUnique) #[derive(Debug)] -pub struct MultiIndexMap +pub struct MultiIndexMap where K1: Eq + Hash, K2: Eq + Hash, K3: Eq + Hash, + K4: Eq + Hash, { - maps: InnerMaps, - _marker: PhantomData<(U1, U2)>, + maps: InnerMaps, + _marker: PhantomData<(U1, U2, U3)>, } -impl Default for MultiIndexMap +impl Default for MultiIndexMap where K1: Eq + Hash + Clone, K2: Eq + Hash + Clone, K3: Eq + Hash + Clone, + K4: Eq + Hash + Clone, V: Clone, U1: 'static, U2: 'static, + U3: 'static, { fn default() -> Self { Self { @@ -89,22 +105,26 @@ where secondary_multi: HashMap::new(), tertiary_unique: HashMap::new(), tertiary_multi: HashMap::new(), + quaternary_unique: HashMap::new(), + quaternary_multi: HashMap::new(), }, 
_marker: PhantomData, } } } -impl MultiIndexMap +impl MultiIndexMap where K1: Eq + Hash + Clone, K2: Eq + Hash + Clone, K3: Eq + Hash + Clone, + K4: Eq + Hash + Clone, V: Clone, U1: 'static, U2: 'static, + U3: 'static, { - /// Creates a new empty MultiIndexMap + /// Creates a new empty MultiIndexMap. pub fn new() -> Self { Self { maps: InnerMaps { @@ -113,18 +133,22 @@ where secondary_multi: HashMap::new(), tertiary_unique: HashMap::new(), tertiary_multi: HashMap::new(), + quaternary_unique: HashMap::new(), + quaternary_multi: HashMap::new(), }, _marker: PhantomData, } } - /// Number of entires in the primary map + /// Returns the number of entries in the primary map. pub fn length(&self) -> usize { self.maps.primary.len() } - /// Insert a new value and associated keys into the map - pub fn insert(&mut self, k1: &K1, k2: &K2, k3: &K3, v: V) { + /// Inserts a new value and associated keys into the map. + /// Inserts the primary key and value first, then updates the secondary, tertiary, + /// and quaternary indices based on their uniqueness. 
+ pub fn insert(&mut self, k1: &K1, k2: &K2, k3: &K3, k4: &K4, v: V) { // Insert into primary map first self.maps.primary.insert(k1.clone(), v); @@ -135,7 +159,7 @@ where self.maps .secondary_multi .entry(k2.clone()) - .and_modify(|v| v.push(k1.clone())) + .and_modify(|vec| vec.push(k1.clone())) .or_insert_with(|| vec![k1.clone()]); } @@ -146,12 +170,23 @@ where self.maps .tertiary_multi .entry(k3.clone()) - .and_modify(|v| v.push(k1.clone())) + .and_modify(|vec| vec.push(k1.clone())) + .or_insert_with(|| vec![k1.clone()]); + } + + // Handle quaternary index based on uniqueness + if std::any::TypeId::of::() == std::any::TypeId::of::() { + self.maps.quaternary_unique.insert(k4.clone(), k1.clone()); + } else { + self.maps + .quaternary_multi + .entry(k4.clone()) + .and_modify(|vec| vec.push(k1.clone())) .or_insert_with(|| vec![k1.clone()]); } } - /// Remove a value and all its indexes using the primary key + /// Removes a value and all its indexes using the primary key. pub fn remove(&mut self, k1: &K1) -> Option { // Remove from primary storage let removed = self.maps.primary.remove(k1)?; @@ -162,9 +197,9 @@ where self.maps.secondary_unique.retain(|_, v| v != k1); } else { // For non-unique indexes, remove k1 from any vectors it appears in - self.maps.secondary_multi.retain(|_, v| { - v.retain(|x| x != k1); - !v.is_empty() + self.maps.secondary_multi.retain(|_, vec| { + vec.retain(|x| x != k1); + !vec.is_empty() }); } @@ -174,17 +209,27 @@ where self.maps.tertiary_unique.retain(|_, v| v != k1); } else { // For non-unique indexes, remove k1 from any vectors it appears in - self.maps.tertiary_multi.retain(|_, v| { - v.retain(|x| x != k1); - !v.is_empty() + self.maps.tertiary_multi.retain(|_, vec| { + vec.retain(|x| x != k1); + !vec.is_empty() + }); + } + + // Remove from quaternary index + if std::any::TypeId::of::() == std::any::TypeId::of::() { + self.maps.quaternary_unique.retain(|_, v| v != k1); + } else { + self.maps.quaternary_multi.retain(|_, vec| { + 
vec.retain(|x| x != k1); + !vec.is_empty() }); } Some(removed) } - /// Update an existing value using the primary key - /// Only updates if the primary key exists, indexes remain unchanged + /// Updates an existing value using the primary key. + /// Only updates if the primary key exists; indexes remain unchanged. pub fn update(&mut self, k1: &K1, new_value: V) -> Option { if !self.maps.primary.contains_key(k1) { return None; @@ -195,12 +240,14 @@ where } } -// Implement unique access for primary key -impl UniqueIndex for MultiIndexMap +// Implement unique access for primary key. +impl UniqueIndex + for MultiIndexMap where K1: Eq + Hash + Clone, K2: Eq + Hash + Clone, K3: Eq + Hash + Clone, + K4: Eq + Hash + Clone, V: Clone, { fn get_by(&self, key: &K1) -> Option { @@ -208,12 +255,14 @@ where } } -// Implement unique access for secondary key -impl UniqueIndex for MultiIndexMap +// Implement unique access for secondary key. +impl UniqueIndex + for MultiIndexMap where K1: Eq + Hash + Clone, K2: Eq + Hash + Clone, K3: Eq + Hash + Clone, + K4: Eq + Hash + Clone, V: Clone, U1: Unique, { @@ -223,13 +272,14 @@ where } } -// Implement non-unique access for secondary key -impl NonUniqueIndex - for MultiIndexMap +// Implement non-unique access for secondary key. +impl NonUniqueIndex + for MultiIndexMap where K1: Eq + Hash + Clone, K2: Eq + Hash + Clone, K3: Eq + Hash + Clone, + K4: Eq + Hash + Clone, V: Clone, U1: NotUnique, { @@ -242,12 +292,14 @@ where } } -// Implement unique access for tertiary key -impl UniqueIndex for MultiIndexMap +// Implement unique access for tertiary key. +impl UniqueIndex + for MultiIndexMap where K1: Eq + Hash + Clone, K2: Eq + Hash + Clone, K3: Eq + Hash + Clone, + K4: Eq + Hash + Clone, V: Clone, U2: Unique, { @@ -257,12 +309,14 @@ where } } -// Implement non-unique access for tertiary key -impl NonUniqueIndex for MultiIndexMap +// Implement non-unique access for tertiary key. 
+impl NonUniqueIndex + for MultiIndexMap where K1: Eq + Hash + Clone, K2: Eq + Hash + Clone, K3: Eq + Hash + Clone, + K4: Eq + Hash + Clone, V: Clone, U2: NotUnique, { @@ -275,6 +329,43 @@ where } } +// Implement unique access for quaternary key. +impl UniqueIndex + for MultiIndexMap +where + K1: Eq + Hash + Clone, + K2: Eq + Hash + Clone, + K3: Eq + Hash + Clone, + K4: Eq + Hash + Clone, + V: Clone, + U3: Unique, +{ + fn get_by(&self, key: &K4) -> Option { + let primary_key = self.maps.quaternary_unique.get(key)?; + self.maps.primary.get(primary_key).cloned() + } +} + +// Implement non-unique access for quaternary key. +impl NonUniqueIndex + for MultiIndexMap +where + K1: Eq + Hash + Clone, + K2: Eq + Hash + Clone, + K3: Eq + Hash + Clone, + K4: Eq + Hash + Clone, + V: Clone, + U3: NotUnique, +{ + fn get_all_by(&self, key: &K4) -> Option> { + self.maps.quaternary_multi.get(key).map(|keys| { + keys.iter() + .filter_map(|k1| self.maps.primary.get(k1).cloned()) + .collect() + }) + } +} + #[cfg(test)] mod multi_index_tests { use super::*; @@ -287,16 +378,25 @@ mod multi_index_tests { #[test] fn test_basic_operations() { - let mut map: MultiIndexMap = - MultiIndexMap::new(); + // Using unique indices for all secondary, tertiary, and quaternary keys. 
+ let mut map: MultiIndexMap< + i32, + String, + bool, + char, + TestValue, + UniqueTag, + UniqueTag, + UniqueTag, + > = MultiIndexMap::new(); let value = TestValue { id: 1, data: "test".to_string(), }; - // Test insertion - map.insert(&1, &"key1".to_string(), &true, value.clone()); + // Test insertion with quaternary key 'a' + map.insert(&1, &"key1".to_string(), &true, &'a', value.clone()); // Test primary key access assert_eq!(map.get_by(&1), Some(value.clone())); @@ -307,6 +407,9 @@ mod multi_index_tests { // Test tertiary key access assert_eq!(map.get_by(&true), Some(value.clone())); + // Test quaternary key access + assert_eq!(map.get_by(&'a'), Some(value.clone())); + // Test update let new_value = TestValue { id: 1, @@ -315,17 +418,27 @@ mod multi_index_tests { map.update(&1, new_value.clone()); assert_eq!(map.get_by(&1), Some(new_value.clone())); - // Test removal + // Test removal: all indices should be cleaned up assert_eq!(map.remove(&1), Some(new_value.clone())); assert_eq!(map.get_by(&1), None); assert_eq!(map.get_by(&"key1".to_string()), None); assert_eq!(map.get_by(&true), None); + assert_eq!(map.get_by(&'a'), None); } #[test] fn test_non_unique_indices() { - let mut map: MultiIndexMap = - MultiIndexMap::new(); + // Using non-unique indices for all secondary, tertiary, and quaternary keys. + let mut map: MultiIndexMap< + i32, + String, + bool, + char, + TestValue, + NonUniqueTag, + NonUniqueTag, + NonUniqueTag, + > = MultiIndexMap::new(); let value1 = TestValue { id: 1, @@ -336,9 +449,9 @@ mod multi_index_tests { data: "test2".to_string(), }; - // Insert multiple values with same secondary and tertiary keys - map.insert(&1, &"shared_key".to_string(), &true, value1.clone()); - map.insert(&2, &"shared_key".to_string(), &true, value2.clone()); + // Insert multiple values with same secondary, tertiary, and quaternary keys. 
+ map.insert(&1, &"shared_key".to_string(), &true, &'z', value1.clone()); + map.insert(&2, &"shared_key".to_string(), &true, &'z', value2.clone()); // Test primary key access (still unique) assert_eq!(map.get_by(&1), Some(value1.clone())); @@ -356,6 +469,12 @@ mod multi_index_tests { assert!(tertiary_values.contains(&value1)); assert!(tertiary_values.contains(&value2)); + // Test quaternary key access (non-unique) + let quaternary_values = map.get_all_by(&'z').unwrap(); + assert_eq!(quaternary_values.len(), 2); + assert!(quaternary_values.contains(&value1)); + assert!(quaternary_values.contains(&value2)); + // Test removal maintains other entries map.remove(&1); assert_eq!(map.get_by(&1), None); @@ -368,8 +487,17 @@ mod multi_index_tests { #[test] fn test_mixed_uniqueness() { - let mut map: MultiIndexMap = - MultiIndexMap::new(); + // Mixed: unique secondary, non-unique tertiary, unique quaternary. + let mut map: MultiIndexMap< + i32, + String, + bool, + char, + TestValue, + UniqueTag, + NonUniqueTag, + UniqueTag, + > = MultiIndexMap::new(); let value1 = TestValue { id: 1, @@ -380,9 +508,9 @@ mod multi_index_tests { data: "test2".to_string(), }; - // Insert values with unique secondary key but shared tertiary key - map.insert(&1, &"key1".to_string(), &true, value1.clone()); - map.insert(&2, &"key2".to_string(), &true, value2.clone()); + // Insert values with unique secondary keys but shared tertiary and different quaternary keys. 
+ map.insert(&1, &"key1".to_string(), &true, &'q', value1.clone()); + map.insert(&2, &"key2".to_string(), &true, &'r', value2.clone()); // Test unique secondary key access assert_eq!(map.get_by(&"key1".to_string()), Some(value1.clone())); @@ -393,17 +521,30 @@ mod multi_index_tests { assert_eq!(tertiary_values.len(), 2); assert!(tertiary_values.contains(&value1)); assert!(tertiary_values.contains(&value2)); + + // Test unique quaternary key access + assert_eq!(map.get_by(&'q'), Some(value1.clone())); + assert_eq!(map.get_by(&'r'), Some(value2.clone())); } #[test] fn test_empty_cases() { - let mut map: MultiIndexMap = - MultiIndexMap::new(); + let mut map: MultiIndexMap< + i32, + String, + bool, + char, + TestValue, + UniqueTag, + UniqueTag, + UniqueTag, + > = MultiIndexMap::new(); // Test access on empty map assert_eq!(map.get_by(&1), None); assert_eq!(map.get_by(&"key".to_string()), None); assert_eq!(map.get_by(&true), None); + assert_eq!(map.get_by(&'x'), None); // Test remove on empty map assert_eq!(map.remove(&1), None); diff --git a/anchor/database/src/state.rs b/anchor/database/src/state.rs index 9f18393e4..1cf216998 100644 --- a/anchor/database/src/state.rs +++ b/anchor/database/src/state.rs @@ -1,4 +1,6 @@ -use crate::{ClusterMultiIndexMap, MetadataMultiIndexMap, MultiIndexMap, ShareMultiIndexMap}; +use crate::{ + ClusterMultiIndexMap, MetadataMultiIndexMap, MultiIndexMap, NonUniqueIndex, ShareMultiIndexMap, +}; use crate::{DatabaseError, NetworkState, Pool, PoolConn}; use crate::{MultiState, SingleState}; use crate::{SqlStatement, SQL}; @@ -8,12 +10,18 @@ use openssl::rsa::Rsa; use rusqlite::{params, OptionalExtension}; use rusqlite::{types::Type, Error as SqlError}; use ssv_types::{ - Cluster, ClusterId, ClusterMember, Operator, OperatorId, Share, ValidatorMetadata, + Cluster, ClusterId, ClusterMember, CommitteeId, IndexSet, Operator, OperatorId, Share, + ValidatorMetadata, }; use std::collections::{HashMap, HashSet}; use std::str::FromStr; +use 
tokio::sync::watch; use types::Address; +pub trait NetworkStateService: Send + Sync { + fn get_cluster_members(&self, committee_id: &CommitteeId) -> Option>; +} + impl NetworkState { /// Build the network state from the database data pub(crate) fn new_with_state( @@ -72,12 +80,14 @@ impl NetworkState { cluster_id, &validator.public_key, &cluster.owner, + &cluster.committee_id(), cluster.clone(), ); metadata_multi.insert( &validator.public_key, cluster_id, &cluster.owner, + &cluster.committee_id(), validator.clone(), ); @@ -90,6 +100,7 @@ impl NetworkState { &validator.public_key, cluster_id, &cluster.owner, + &cluster.committee_id(), share.clone(), ); } @@ -251,6 +262,7 @@ impl NetworkState { pub fn clusters(&self) -> &ClusterMultiIndexMap { &self.multi_state.clusters } + /// Get the ID of our Operator if it exists pub fn get_own_id(&self) -> Option { self.single_state.id @@ -281,3 +293,25 @@ impl NetworkState { self.single_state.last_processed_block } } + +pub struct WatchableNetworkState { + state_rx: watch::Receiver, +} + +impl WatchableNetworkState { + pub fn new(state_rx: watch::Receiver) -> Self { + Self { state_rx } + } +} + +impl NetworkStateService for WatchableNetworkState { + fn get_cluster_members(&self, committee_id: &CommitteeId) -> Option> { + let db_state = self.state_rx.borrow(); + db_state + .multi_state + .clusters + .get_all_by(committee_id) + .and_then(|clusters| clusters.first().cloned()) + .map(|cluster| cluster.cluster_members) + } +} diff --git a/anchor/message_validator/Cargo.toml b/anchor/message_validator/Cargo.toml index 2aa84e03d..1b4f0a4a8 100644 --- a/anchor/message_validator/Cargo.toml +++ b/anchor/message_validator/Cargo.toml @@ -5,12 +5,22 @@ edition = { workspace = true } authors = ["Sigma Prime "] [dependencies] +database = { workspace = true } ethereum_ssz = { workspace = true } +hex = { workspace = true } libp2p = { git = "https://github.com/libp2p/rust-libp2p.git", rev = "082eb16", default-features = false, features = [ 
"gossipsub", ] } processor = { workspace = true } +sha2 = { workspace = true } ssv_types = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } + +[dev-dependencies] +async-channel = { workspace = true } +bls = { workspace = true } +once_cell = "1.20.3" +task_executor = { workspace = true } +types = { workspace = true } diff --git a/anchor/message_validator/src/lib.rs b/anchor/message_validator/src/lib.rs index 4e2bdbce1..a2a8310a1 100644 --- a/anchor/message_validator/src/lib.rs +++ b/anchor/message_validator/src/lib.rs @@ -1,9 +1,13 @@ +use database::NetworkStateService; use libp2p::gossipsub::MessageAcceptance; -use ssv_types::consensus::QbftMessage; +use sha2::{Digest, Sha256}; +use ssv_types::consensus::{QbftMessage, QbftMessageType}; use ssv_types::message::{MsgType, SSVMessage, SignedSSVMessage}; +use ssv_types::msgid::{DutyExecutor, Role}; use ssv_types::partial_sig::PartialSignatureMessages; use ssz::Decode; -use tracing::{error, trace}; +use std::sync::Arc; +use tracing::{error, trace, warn}; // TODO taken from go-SSV as rough guidance. feel free to adjust as needed. 
https://github.com/ssvlabs/ssv/blob/e12abf7dfbbd068b99612fa2ebbe7e3372e57280/message/validation/errors.go#L55 #[derive(Debug)] @@ -27,7 +31,7 @@ pub enum ValidationFailure { NoDuty, EstimatedRoundNotInAllowedSpread, EmptyData, - MismatchedIdentifier, + MismatchedIdentifier { got: String, want: String }, SignatureVerification, PubSubMessageHasNoData, MalformedPubSubMessage, @@ -52,8 +56,8 @@ pub enum ValidationFailure { UnknownQBFTMessageType, InvalidPartialSignatureType, PartialSignatureTypeRoleMismatch, - NonDecidedWithMultipleSigners, - DecidedNotEnoughSigners, + NonDecidedWithMultipleSigners { got: usize, want: usize }, + DecidedNotEnoughSigners { got: usize, want: usize }, DifferentProposalData, MalformedPrepareJustifications, UnexpectedPrepareJustifications, @@ -72,6 +76,7 @@ pub enum ValidationFailure { InvalidPartialSignatureTypeCount, TooManyPartialSignatureMessages, EncodeOperators, + FailedToGetMaxRound, } impl From<&ValidationFailure> for MessageAcceptance { @@ -127,39 +132,170 @@ pub enum Error { Processor(#[from] ::processor::Error), } -pub struct Validator; +pub struct Validator { + network_state_service: Arc, +} pub trait ValidatorService: Send + Sync { fn validate(&self, message_data: Vec) -> Result; } impl Validator { - // we will need more parameters in a PR that is merged soon - #[allow(clippy::new_without_default)] - pub fn new() -> Self { - Self - } - - fn do_validate(&self, _message: &SignedSSVMessage) -> Result<(), ValidationFailure> { - Ok(()) + pub fn new(network_state_service: Arc) -> Self { + Self { + network_state_service, + } } fn validate_ssv_message( &self, - ssv_message: &SSVMessage, + signed_ssv_message: &SignedSSVMessage, ) -> Result { + let ssv_message = signed_ssv_message.ssv_message(); match ssv_message.msg_type() { - MsgType::SSVConsensusMsgType => QbftMessage::from_ssz_bytes(ssv_message.data()) - .ok() - .map(ValidatedSSVMessage::QbftMessage) - .ok_or(ValidationFailure::UndecodableMessageData), - 
MsgType::SSVPartialSignatureMsgType => { - PartialSignatureMessages::from_ssz_bytes(ssv_message.data()) + MsgType::SSVConsensusMsgType => { + let consensus_message = QbftMessage::from_ssz_bytes(ssv_message.data()) .ok() - .map(ValidatedSSVMessage::PartialSignatureMessages) - .ok_or(ValidationFailure::UndecodableMessageData) + .ok_or(ValidationFailure::UndecodableMessageData)?; + self.validate_consensus_message_semantics(signed_ssv_message, &consensus_message)?; + Ok(ValidatedSSVMessage::QbftMessage(consensus_message)) + } + MsgType::SSVPartialSignatureMsgType => { + self.validate_partial_signature_message(ssv_message) + } + } + } + + fn validate_partial_signature_message( + &self, + ssv_message: &SSVMessage, + ) -> Result { + let messages = match PartialSignatureMessages::from_ssz_bytes(ssv_message.data()) { + Ok(msgs) => msgs, + Err(_) => return Err(ValidationFailure::UndecodableMessageData), + }; + + Ok(ValidatedSSVMessage::PartialSignatureMessages(messages)) + } + + fn validate_consensus_message_semantics( + &self, + signed_ssv_message: &SignedSSVMessage, + consensus_message: &QbftMessage, + ) -> Result<(), ValidationFailure> { + let signers = signed_ssv_message.operator_ids().len(); + + let committee_id = match signed_ssv_message.ssv_message().msg_id().duty_executor() { + Some(DutyExecutor::Committee(id)) => id, + _ => return Err(ValidationFailure::NonExistentCommitteeID), + }; + + let committee_members = match self + .network_state_service + .get_cluster_members(&committee_id) + { + Some(committee_members) => { + if committee_members.is_empty() { + warn!(?committee_id, "Unexpected empty committee members"); + return Err(ValidationFailure::NonExistentCommitteeID); + } + committee_members + } + None => return Err(ValidationFailure::NonExistentCommitteeID), + }; + + let quorum_size = compute_quorum_size(committee_members.len()); + let msg_type = consensus_message.qbft_message_type; + + if signers > 1 { + // Rule: Decided msg with different type than Commit + if 
msg_type != QbftMessageType::Commit { + return Err(ValidationFailure::NonDecidedWithMultipleSigners { + got: signers, + want: 1, + }); + } + + // Rule: Number of signers must be >= quorum size + if signers < quorum_size { + return Err(ValidationFailure::DecidedNotEnoughSigners { + got: signers, + want: quorum_size, + }); + } + } + + if !signed_ssv_message.full_data().is_empty() { + // Rule: Prepare or commit messages must not have full data + if msg_type == QbftMessageType::Prepare + || (msg_type == QbftMessageType::Commit && signers == 1) + { + return Err(ValidationFailure::PrepareOrCommitWithFullData); + } + + let hashed_full_data = hash_data_root(signed_ssv_message.full_data()); + // Rule: Full data hash must match root + if hashed_full_data != consensus_message.root { + return Err(ValidationFailure::InvalidHash); } } + + if consensus_message.round == 0 { + return Err(ValidationFailure::ZeroRound); + } + + // Rule: Duty role has consensus (true except for ValidatorRegistration and VoluntaryExit) + if matches!( + signed_ssv_message.ssv_message().msg_id().role(), + Some(Role::ValidatorRegistration) | Some(Role::VoluntaryExit) + ) { + return Err(ValidationFailure::UnexpectedConsensusMessage); + } + + let max_round = match consensus_message.max_round() { + Some(max_round) => max_round, + None => return Err(ValidationFailure::FailedToGetMaxRound), + }; + + if consensus_message.round > max_round { + return Err(ValidationFailure::RoundTooHigh); + } + + // Rule: consensus message must have the same identifier as the ssv message's identifier + if consensus_message.identifier != *signed_ssv_message.ssv_message().msg_id() { + return Err(ValidationFailure::MismatchedIdentifier { + got: hex::encode(&consensus_message.identifier), + want: hex::encode(signed_ssv_message.ssv_message().msg_id()), + }); + } + + self.validate_justifications(consensus_message)?; + + Ok(()) + } + + fn validate_justifications( + &self, + consensus_message: &QbftMessage, + ) -> Result<(), 
ValidationFailure> { + // Rule: Can only exist for Proposal messages + let prepare_justifications = &consensus_message.prepare_justification; + if !prepare_justifications.is_empty() + && consensus_message.qbft_message_type != QbftMessageType::Proposal + { + return Err(ValidationFailure::UnexpectedPrepareJustifications); + } + + // Rule: Can only exist for Proposal or Round-Change messages + let round_change_justifications = &consensus_message.round_change_justification; + if !round_change_justifications.is_empty() + && consensus_message.qbft_message_type != QbftMessageType::Proposal + && consensus_message.qbft_message_type != QbftMessageType::RoundChange + { + return Err(ValidationFailure::UnexpectedRoundChangeJustifications); + } + + Ok(()) } } @@ -168,8 +304,7 @@ impl ValidatorService for Validator { match SignedSSVMessage::from_ssz_bytes(&message_data) { Ok(deserialized_message) => { trace!(msg = ?deserialized_message, "SignedSSVMessage deserialized"); - self.do_validate(&deserialized_message)?; - self.validate_ssv_message(deserialized_message.ssv_message()) + self.validate_ssv_message(&deserialized_message) .map(|validated| ValidatedMessage::new(deserialized_message.clone(), validated)) } Err(error) => { @@ -179,3 +314,465 @@ impl ValidatorService for Validator { } } } + +fn compute_quorum_size(committee_size: usize) -> usize { + let f = get_f(committee_size); + f * 2 + 1 +} + +// # TODO centralize this and the one in the qbft crate +fn get_f(committee_size: usize) -> usize { + (committee_size - 1) / 3 +} + +fn hash_data_root(full_data: &[u8]) -> [u8; 32] { + let mut hasher = Sha256::new(); + hasher.update(full_data); + let hash: [u8; 32] = hasher.finalize().into(); + hash +} + +#[cfg(test)] +mod tests { + use super::*; + use bls::{Hash256, PublicKeyBytes}; + use ssv_types::consensus::{QbftMessage, QbftMessageType}; + use ssv_types::domain_type::DomainType; + use ssv_types::message::{MsgType, SSVMessage, SignedSSVMessage, RSA_SIGNATURE_SIZE}; + use 
ssv_types::msgid::{DutyExecutor, MessageId, Role};
    use ssv_types::{CommitteeId, IndexSet, OperatorId};
    use ssz::Encode;
    use std::sync::Arc;

    // Constants for committee sizes in tests to improve readability
    const SINGLE_NODE_COMMITTEE: usize = 1;
    const FOUR_NODE_COMMITTEE: usize = 4;
    const SEVEN_NODE_COMMITTEE: usize = 7;

    // Mock network state: reports a committee of `self.0` operators for any
    // committee id.
    struct MockNetworkStateService(usize);

    impl NetworkStateService for MockNetworkStateService {
        // NOTE(review): generic parameters in this module were stripped by
        // extraction and restored from context — confirm against the
        // original file.
        fn get_cluster_members(&self, _cluster_id: &CommitteeId) -> Option<IndexSet<OperatorId>> {
            let mut members = IndexSet::new();
            for i in 0..self.0 {
                members.insert(OperatorId(i as u64));
            }
            Some(members)
        }
    }

    // Test fixture for setup
    struct TestFixture {
        validator: Arc<Validator>,
    }

    impl TestFixture {
        fn new(committee_size: usize) -> Self {
            let validator = Arc::new(Validator::new(Arc::new(MockNetworkStateService(
                committee_size,
            ))));
            Self { validator }
        }

        // Helper for common validation pattern
        fn validate_message(
            &self,
            signed_msg: &SignedSSVMessage,
        ) -> Result<ValidatedSSVMessage, ValidationFailure> {
            self.validator.validate_ssv_message(signed_msg)
        }
    }

    // Helper functions for message creation
    struct MessageBuilder {
        msg_id: MessageId,
        msg_type: QbftMessageType,
        round: u64,
        signers: Vec<OperatorId>,
        signatures: Vec<Vec<u8>>,
        full_data: Vec<u8>,
        prepare_justification: Vec<SignedSSVMessage>,
        round_change_justification: Vec<SignedSSVMessage>,
    }

    impl MessageBuilder {
        fn new(role: Role, msg_type: QbftMessageType) -> Self {
            Self {
                msg_id: create_message_id_for_test(role),
                msg_type,
                round: 1,
                signers: vec![OperatorId(42)],
                signatures: vec![vec![0xAA; RSA_SIGNATURE_SIZE]],
                full_data: vec![],
                prepare_justification: vec![],
                round_change_justification: vec![],
            }
        }

        fn with_round(mut self, round: u64) -> Self {
            self.round = round;
            self
        }

        fn with_signers(mut self, signers: Vec<OperatorId>) -> Self {
            // Create matching number of signatures
            self.signatures = signers
                .iter()
                .enumerate()
                .map(|(i, _)| {
                    // Create unique signatures for each signer.
                    // NOTE(review): `0xAA + i as u8` overflows in debug builds
                    // for i >= 86; fine for the small committees used here.
                    vec![0xAA + i as u8; RSA_SIGNATURE_SIZE]
                })
                .collect();
            self.signers = signers;
            self
        }

        fn with_full_data(mut self, data: Vec<u8>) -> Self {
            self.full_data = data;
            self
        }

        fn with_prepare_justification(mut self, justifications: Vec<SignedSSVMessage>) -> Self {
            self.prepare_justification = justifications;
            self
        }

        fn with_round_change_justification(
            mut self,
            justifications: Vec<SignedSSVMessage>,
        ) -> Self {
            self.round_change_justification = justifications;
            self
        }

        fn build(self) -> SignedSSVMessage {
            let qbft_msg = QbftMessage {
                qbft_message_type: self.msg_type,
                height: 1,
                round: self.round,
                identifier: self.msg_id.clone(),
                root: Hash256::from([0u8; 32]),
                data_round: 1,
                round_change_justification: self.round_change_justification,
                prepare_justification: self.prepare_justification,
            };

            let qbft_bytes = qbft_msg.as_ssz_bytes();
            let ssv_msg = SSVMessage::new(MsgType::SSVConsensusMsgType, self.msg_id, qbft_bytes)
                .expect("SSVMessage should be created");

            SignedSSVMessage::new(self.signatures, self.signers, ssv_msg, self.full_data)
                .expect("SignedSSVMessage should be created")
        }
    }

    fn create_message_id_for_test(role: Role) -> MessageId {
        let domain = DomainType([0, 0, 0, 1]);
        let duty_executor = match role {
            Role::Committee => DutyExecutor::Committee(CommitteeId([0u8; 32])),
            _ => DutyExecutor::Validator(PublicKeyBytes::empty()),
        };
        MessageId::new(&domain, role, &duty_executor)
    }

    fn dummy_signed_ssv_message_for_justification() -> SignedSSVMessage {
        MessageBuilder::new(Role::Proposer, QbftMessageType::Proposal).build()
    }

    // Assert helpers for common validation patterns
    fn assert_validation_error<F>(
        result: Result<ValidatedSSVMessage, ValidationFailure>,
        expected_error: F,
        error_name: &str,
    ) where
        F: Fn(&ValidationFailure) -> bool,
    {
        match result {
            Ok(_) => panic!("Expected validation to fail with {}", error_name),
            Err(failure) => {
                assert!(
                    expected_error(&failure),
                    "Expected {} error, got: {:?}",
                    error_name,
                    failure
                );
            }
        }
    }

    // ---------------------------------------------------------------------
    // Consensus message tests
    // ---------------------------------------------------------------------

    #[tokio::test]
    async fn test_successful_validation_of_consensus_message_with_single_signer() {
        let fixture = TestFixture::new(SINGLE_NODE_COMMITTEE);

        let signed_msg = MessageBuilder::new(Role::Committee, QbftMessageType::Prepare).build();

        let result = fixture.validate_message(&signed_msg);
        assert!(
            result.is_ok(),
            "Expected a single-signer Prepare consensus message to validate successfully"
        );

        if let Ok(ValidatedSSVMessage::QbftMessage(validated_qbft)) = result {
            assert_eq!(
                validated_qbft.round, 1,
                "Unexpected round in validated QbftMessage"
            );
            assert_eq!(
                validated_qbft.qbft_message_type,
                QbftMessageType::Prepare,
                "Unexpected QbftMessageType in validated QbftMessage"
            );
            assert_eq!(
                validated_qbft.identifier,
                create_message_id_for_test(Role::Committee),
                "Identifier mismatch after validation"
            );
        } else {
            panic!("Expected a QbftMessage variant after validation");
        }
    }

    #[tokio::test]
    async fn test_consensus_message_with_multiple_signers_but_not_commit() {
        let fixture = TestFixture::new(SINGLE_NODE_COMMITTEE);

        // Multiple signers are only allowed for Commit messages.
        let signers = vec![OperatorId(1), OperatorId(2), OperatorId(3)];
        let signed_msg = MessageBuilder::new(Role::Committee, QbftMessageType::Prepare)
            .with_signers(signers.clone())
            .build();

        let result = fixture.validate_message(&signed_msg);

        assert_validation_error(
            result,
            |failure| matches!(failure, ValidationFailure::NonDecidedWithMultipleSigners { got, want } if *got == signers.len() && *want == SINGLE_NODE_COMMITTEE),
            "NonDecidedWithMultipleSigners",
        );
    }

    #[tokio::test]
    async fn test_consensus_message_with_multiple_signers_commit_but_not_enough_signers_for_quorum()
    {
        let fixture = TestFixture::new(FOUR_NODE_COMMITTEE);

        // For Commit messages with multiple signers, the count must be >= quorum size.
        let signers = vec![OperatorId(1), OperatorId(2)]; // Quorum requires at least 3 for a committee of 4.
        let signed_msg = MessageBuilder::new(Role::Committee, QbftMessageType::Commit)
            .with_signers(signers.clone())
            .build();

        let result = fixture.validate_message(&signed_msg);

        // NOTE(review): `FOUR_NODE_COMMITTEE - 1` happens to equal the quorum
        // (3) for a committee of 4; asserting against compute_quorum_size
        // would be less brittle.
        assert_validation_error(
            result,
            |failure| matches!(failure, ValidationFailure::DecidedNotEnoughSigners { got, want } if *got == signers.len() && *want == FOUR_NODE_COMMITTEE - 1),
            "DecidedNotEnoughSigners",
        );
    }

    #[tokio::test]
    async fn test_consensus_message_full_data_mismatched_root_hash() {
        let fixture = TestFixture::new(SINGLE_NODE_COMMITTEE);

        // NOTE(review): despite the test name, a single-signer Commit with
        // full data is rejected as PrepareOrCommitWithFullData before the
        // root hash is ever checked; see the invalid-hash test below for the
        // multi-signer path.
        let full_data = vec![0xDE, 0xAD, 0xBE, 0xEF];
        let signed_msg = MessageBuilder::new(Role::Committee, QbftMessageType::Commit)
            .with_full_data(full_data)
            .build();

        let result = fixture.validate_message(&signed_msg);

        assert_validation_error(
            result,
            |failure| matches!(failure, ValidationFailure::PrepareOrCommitWithFullData),
            "PrepareOrCommitWithFullData",
        );
    }

    #[tokio::test]
    async fn test_consensus_message_zero_round_fails() {
        let fixture = TestFixture::new(SINGLE_NODE_COMMITTEE);

        let signed_msg = MessageBuilder::new(Role::Committee, QbftMessageType::Proposal)
            .with_round(0)
            .build();

        let result = fixture.validate_message(&signed_msg);

        assert_validation_error(
            result,
            |failure| matches!(failure, ValidationFailure::ZeroRound),
            "ZeroRound",
        );
    }

    #[tokio::test]
    async fn test_consensus_message_round_too_high() {
        let fixture = TestFixture::new(SINGLE_NODE_COMMITTEE);

        let signed_msg = MessageBuilder::new(Role::Committee, QbftMessageType::Proposal)
            .with_round(13) // Too high (max is 12)
            .build();

        let result = fixture.validate_message(&signed_msg);

        assert_validation_error(
            result,
            |failure| matches!(failure, ValidationFailure::RoundTooHigh),
            "RoundTooHigh",
        );
    }

    #[tokio::test]
    async fn test_consensus_message_mismatched_identifier() {
        let fixture = TestFixture::new(SINGLE_NODE_COMMITTEE);

        // Create message with mismatched identifier
        let msg_id_a = create_message_id_for_test(Role::Committee);
        let msg_id_b = create_message_id_for_test(Role::Proposer);

        let qbft_msg = QbftMessage {
            qbft_message_type: QbftMessageType::Proposal,
            height: 1,
            round: 1,
            identifier: msg_id_b, // Mismatched ID
            root: Hash256::from([0u8; 32]),
            data_round: 1,
            round_change_justification: vec![],
            prepare_justification: vec![],
        };

        let qbft_bytes = qbft_msg.as_ssz_bytes();
        let ssv_msg = SSVMessage::new(MsgType::SSVConsensusMsgType, msg_id_a, qbft_bytes)
            .expect("SSVMessage should be created");
        let signed_msg = SignedSSVMessage::new(
            vec![vec![0xAA; RSA_SIGNATURE_SIZE]],
            vec![OperatorId(42)],
            ssv_msg,
            vec![],
        )
        .expect("SignedSSVMessage should be created");

        let result = fixture.validate_message(&signed_msg);

        assert_validation_error(
            result,
            |failure| {
                matches!(
                    failure,
                    ValidationFailure::MismatchedIdentifier { got: _, want: _ }
                )
            },
            "MismatchedIdentifier",
        );
    }

    #[tokio::test]
    async fn test_consensus_message_decode_failure() {
        let fixture = TestFixture::new(SINGLE_NODE_COMMITTEE);

        // Provide invalid consensus data
        let msg_id = create_message_id_for_test(Role::Proposer);
        let invalid_data = vec![0xDE, 0xAD, 0xBE, 0xEF];
        let ssv_msg = SSVMessage::new(MsgType::SSVConsensusMsgType, msg_id, invalid_data)
            .expect("SSVMessage should be created");
        let signed_msg = SignedSSVMessage::new(
            vec![vec![0xAA; RSA_SIGNATURE_SIZE]],
            vec![OperatorId(42)],
            ssv_msg,
            vec![],
        )
        .expect("SignedSSVMessage should be created");

        let result = fixture.validate_message(&signed_msg);

        assert_validation_error(
            result,
            |failure| matches!(failure, ValidationFailure::UndecodableMessageData),
            "UndecodableMessageData",
        );
    }

    #[tokio::test]
    async fn test_consensus_message_multiple_signers_commit_with_full_data_and_invalid_hash() {
        let fixture = TestFixture::new(FOUR_NODE_COMMITTEE);
        let signers = vec![OperatorId(1), OperatorId(2), OperatorId(3)];
        let full_data = vec![0xFF; 16];
        let signed_msg = MessageBuilder::new(Role::Committee, QbftMessageType::Commit)
            .with_signers(signers.clone())
            .with_full_data(full_data)
            .build();
        let result = fixture.validate_message(&signed_msg);
        assert_validation_error(
            result,
            |failure| matches!(failure, ValidationFailure::InvalidHash),
            "InvalidHash",
        );
    }

    #[tokio::test]
    async fn test_prepare_justifications_with_non_proposal_message() {
        let fixture = TestFixture::new(SINGLE_NODE_COMMITTEE);

        let signed_msg = MessageBuilder::new(Role::Committee, QbftMessageType::Prepare)
            .with_prepare_justification(vec![dummy_signed_ssv_message_for_justification()])
            .build();

        let result = fixture.validate_message(&signed_msg);

        assert_validation_error(
            result,
            |failure| matches!(failure, ValidationFailure::UnexpectedPrepareJustifications),
            "UnexpectedPrepareJustifications",
        );
    }

    #[tokio::test]
    async fn test_round_change_justifications_with_non_proposal_or_round_change() {
        let fixture = TestFixture::new(SINGLE_NODE_COMMITTEE);

        let signed_msg = MessageBuilder::new(Role::Committee, QbftMessageType::Commit)
            .with_round_change_justification(vec![dummy_signed_ssv_message_for_justification()])
            .build();

        let result = fixture.validate_message(&signed_msg);

        assert_validation_error(
            result,
            |failure| {
                matches!(
                    failure,
                    ValidationFailure::UnexpectedRoundChangeJustifications
                )
            },
            "UnexpectedRoundChangeJustifications",
        );
    }

    #[tokio::test]
    async fn test_compute_quorum_size() {
        // For committee_size=4 -> f=1 -> quorum=3.
        assert_eq!(
            compute_quorum_size(FOUR_NODE_COMMITTEE),
            3,
            "Expected quorum=3 for committee of 4"
        );
        // For committee_size=7 -> f=2 -> quorum=5.
        assert_eq!(
            compute_quorum_size(SEVEN_NODE_COMMITTEE),
            5,
            "Expected quorum=5 for committee of 7"
        );
        // For committee_size=1 -> f=0 -> quorum=1.
        assert_eq!(
            compute_quorum_size(SINGLE_NODE_COMMITTEE),
            1,
            "Expected quorum=1 for committee of 1"
        );
    }
}