Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 4 additions & 5 deletions anchor/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ use beacon_node_fallback::{
};
pub use cli::Node;
use config::Config;
use database::{NetworkDatabase, WatchableNetworkState};
use database::NetworkDatabase;
use eth2::reqwest::{Certificate, ClientBuilder};
use eth2::{BeaconNodeHttpClient, Timeouts};
use message_receiver::ManagerMessageReceiver;
use message_receiver::MessageReceiver;
use message_sender::NetworkMessageSender;
use message_validator::Validator;
use network::Network;
Expand Down Expand Up @@ -367,8 +367,7 @@ impl Client {
network::SUBNET_COUNT,
)?;

let message_validator =
Validator::new(Arc::new(WatchableNetworkState::new(database.watch())));
let message_validator = Validator::new(database.watch());

// Create the signature collector
let signature_collector = SignatureCollectorManager::new(
Expand All @@ -392,7 +391,7 @@ impl Client {

let (outcome_tx, outcome_rx) = mpsc::channel::<message_receiver::Outcome>(9000);

let message_receiver = ManagerMessageReceiver::new(
let message_receiver = MessageReceiver::new(
processor_senders.clone(),
qbft_manager.clone(),
signature_collector.clone(),
Expand Down
10 changes: 9 additions & 1 deletion anchor/common/ssv_types/src/committee.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,17 @@
use crate::OperatorId;
use crate::{OperatorId, ValidatorIndex};
use derive_more::{Deref, From};
use indexmap::IndexSet;
use sha2::{Digest, Sha256};

const COMMITTEE_ID_LEN: usize = 32;

/// Structure to hold committee members and validator indices
#[derive(Debug, Clone)]
pub struct CommitteeInfo {
pub committee_members: IndexSet<OperatorId>,
pub validator_indices: Vec<ValidatorIndex>,
}

/// Unique identifier for a committee
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Hash, From, Deref)]
pub struct CommitteeId(pub [u8; COMMITTEE_ID_LEN]);
Expand Down
2 changes: 1 addition & 1 deletion anchor/common/ssv_types/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
pub use cluster::{Cluster, ClusterId, ClusterMember, ValidatorIndex, ValidatorMetadata};
pub use committee::CommitteeId;
pub use committee::{CommitteeId, CommitteeInfo};
pub use operator::{Operator, OperatorId};
pub use share::Share;
pub use util::parse_rsa;
Expand Down
2 changes: 1 addition & 1 deletion anchor/common/ssv_types/src/partial_sig.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use ssz::{Decode, DecodeError, Encode};
use ssz_derive::{Decode, Encode};
use types::{Hash256, Signature, Slot};

#[derive(Clone, Copy, Debug)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum PartialSignatureKind {
// PostConsensusPartialSig is a partial signature over a decided duty (attestation data, block, etc)
PostConsensus = 0,
Expand Down
9 changes: 1 addition & 8 deletions anchor/database/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use types::{Address, PublicKeyBytes};
pub use crate::error::DatabaseError;
pub use crate::multi_index::{MultiIndexMap, *};
use crate::sql_operations::{SqlStatement, SQL};
pub use crate::state::{NetworkStateService, WatchableNetworkState};
pub use crate::state::NetworkState;

mod cluster_operations;
mod error;
Expand Down Expand Up @@ -103,13 +103,6 @@ struct SingleState {
nonces: HashMap<Address, u16>,
}

// Container to hold all network state
#[derive(Debug)]
pub struct NetworkState {
multi_state: MultiState,
single_state: SingleState,
}

/// Top level NetworkDatabase that contains in memory storage for quick access
/// to relevant information and a connection to the database
#[derive(Debug)]
Expand Down
96 changes: 73 additions & 23 deletions anchor/database/src/state.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use crate::{
ClusterMultiIndexMap, MetadataMultiIndexMap, MultiIndexMap, NonUniqueIndex, ShareMultiIndexMap,
UniqueIndex,
};
use crate::{DatabaseError, NetworkState, Pool, PoolConn};
use crate::{DatabaseError, Pool, PoolConn};
use crate::{MultiState, SingleState};
use crate::{SqlStatement, SQL};
use base64::prelude::*;
Expand All @@ -10,16 +11,18 @@ use openssl::rsa::Rsa;
use rusqlite::{params, OptionalExtension};
use rusqlite::{types::Type, Error as SqlError};
use ssv_types::{
Cluster, ClusterId, ClusterMember, CommitteeId, IndexSet, Operator, OperatorId, Share,
ValidatorMetadata,
Cluster, ClusterId, ClusterMember, CommitteeId, CommitteeInfo, IndexSet, Operator, OperatorId,
Share, ValidatorIndex, ValidatorMetadata,
};
use std::collections::{HashMap, HashSet};
use std::str::FromStr;
use tokio::sync::watch;
use types::Address;
use types::{Address, PublicKeyBytes};

pub trait NetworkStateService: Send + Sync {
fn get_cluster_members(&self, committee_id: &CommitteeId) -> Option<IndexSet<OperatorId>>;
// Container to hold all network state
#[derive(Debug)]
pub struct NetworkState {
pub(crate) multi_state: MultiState,
pub(crate) single_state: SingleState,
}

impl NetworkState {
Expand Down Expand Up @@ -248,6 +251,41 @@ impl NetworkState {
nonces.collect()
}

fn get_cluster_members(&self, committee_id: &CommitteeId) -> Option<IndexSet<OperatorId>> {
self.multi_state
.clusters
.get_all_by(committee_id)
.and_then(|clusters| clusters.first().cloned())
.map(|cluster| cluster.cluster_members)
}

fn get_cluster_members_for_validator(
&self,
validator_pk: &PublicKeyBytes,
) -> Option<IndexSet<OperatorId>> {
let cluster_id = self
.multi_state
.validator_metadata
.get_by(validator_pk)
.map(|v| v.cluster_id)?;
self.multi_state
.clusters
.get_by(&cluster_id)
.map(|c| c.cluster_members)
}

fn get_validator_indices(&self, committee_id: &CommitteeId) -> Option<Vec<ValidatorIndex>> {
self.multi_state
.validator_metadata
.get_all_by(committee_id)
.map(|metadata| {
metadata
.iter()
.map(|metadata| metadata.index)
.collect::<Vec<_>>()
})
}

/// Get a reference to the shares map
pub fn shares(&self) -> &ShareMultiIndexMap {
&self.multi_state.shares
Expand Down Expand Up @@ -292,26 +330,38 @@ impl NetworkState {
pub fn get_last_processed_block(&self) -> u64 {
self.single_state.last_processed_block
}
}

pub struct WatchableNetworkState {
state_rx: watch::Receiver<NetworkState>,
}
pub fn get_committee_info_by_committee_id(
&self,
committee_id: &CommitteeId,
) -> Option<CommitteeInfo> {
// Get committee members
let committee_members = self.get_cluster_members(committee_id)?;

// Get validator indices for this committee
let validator_indices = self.get_validator_indices(committee_id)?;

impl WatchableNetworkState {
pub fn new(state_rx: watch::Receiver<NetworkState>) -> Self {
Self { state_rx }
Some(CommitteeInfo {
committee_members,
validator_indices,
})
}
}

impl NetworkStateService for WatchableNetworkState {
fn get_cluster_members(&self, committee_id: &CommitteeId) -> Option<IndexSet<OperatorId>> {
let db_state = self.state_rx.borrow();
db_state
pub fn get_committee_info_by_validator_pk(
&self,
validator_pk: &PublicKeyBytes,
) -> Option<CommitteeInfo> {
let validator_index = self
.multi_state
.clusters
.get_all_by(committee_id)
.and_then(|clusters| clusters.first().cloned())
.map(|cluster| cluster.cluster_members)
.validator_metadata
.get_by(validator_pk)
.map(|v| v.index)?;

let committee_members = self.get_cluster_members_for_validator(validator_pk)?;

Some(CommitteeInfo {
committee_members,
validator_indices: vec![validator_index],
})
}
}
4 changes: 1 addition & 3 deletions anchor/message_receiver/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ processor = { workspace = true }
qbft_manager = { workspace = true }
signature_collector = { workspace = true }
ssv_types = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }

[features]
testing = []
17 changes: 6 additions & 11 deletions anchor/message_receiver/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,12 @@
mod manager;

#[cfg(feature = "testing")]
pub mod testing;
use thiserror::Error;

pub use crate::manager::*;
use libp2p::gossipsub::{Message, MessageId};
use libp2p::PeerId;
pub use crate::MessageReceiver;

pub trait MessageReceiver: Send + Sync {
fn receive(
&self,
propagation_source: PeerId,
message_id: MessageId,
message: Message,
) -> Result<(), processor::Error>;
#[derive(Error, Debug)]
pub enum Error {
#[error("Processor error: {0}")]
Processor(#[from] processor::Error),
}
24 changes: 12 additions & 12 deletions anchor/message_receiver/src/manager.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use crate::MessageReceiver;
use database::{NetworkState, UniqueIndex};
use libp2p::gossipsub::{Message, MessageAcceptance, MessageId};
use libp2p::PeerId;
use message_validator::{ValidatedMessage, ValidatedSSVMessage, ValidatorService};
use processor::Error;
use message_validator::Validator;
use message_validator::{ValidatedMessage, ValidatedSSVMessage};
use qbft_manager::QbftManager;
use signature_collector::SignatureCollectorManager;
use ssv_types::msgid::DutyExecutor;
Expand All @@ -21,22 +20,22 @@ pub struct Outcome {
}

/// A message receiver that passes messages to responsible managers.
pub struct ManagerMessageReceiver<V: ValidatorService + 'static> {
pub struct MessageReceiver {
processor: processor::Senders,
qbft_manager: Arc<QbftManager>,
signature_collector: Arc<SignatureCollectorManager>,
network_state_rx: watch::Receiver<NetworkState>,
outcome_tx: mpsc::Sender<Outcome>,
validator: V,
validator: Validator,
}

impl<V: ValidatorService + 'static> MessageReceiver for Arc<ManagerMessageReceiver<V>> {
fn receive(
&self,
impl MessageReceiver {
pub fn receive(
self: Arc<Self>,
propagation_source: PeerId,
message_id: MessageId,
message: Message,
) -> Result<(), Error> {
) -> Result<(), crate::Error> {
let receiver = self.clone();
self.processor.urgent_consensus.send_blocking(move || {
let result = receiver.validator.validate(message.data);
Expand Down Expand Up @@ -116,18 +115,19 @@ impl<V: ValidatorService + 'static> MessageReceiver for Arc<ManagerMessageReceiv
}
}
}
}, RECEIVER_NAME)
}, RECEIVER_NAME)?;
Ok(())
}
}

impl<V: ValidatorService + 'static> ManagerMessageReceiver<V> {
impl MessageReceiver {
pub fn new(
processor: processor::Senders,
qbft_manager: Arc<QbftManager>,
signature_collector: Arc<SignatureCollectorManager>,
network_state_rx: watch::Receiver<NetworkState>,
outcome_tx: mpsc::Sender<Outcome>,
validator: V,
validator: Validator,
) -> Arc<Self> {
Arc::new(Self {
processor,
Expand Down
54 changes: 0 additions & 54 deletions anchor/message_receiver/src/testing.rs

This file was deleted.

Loading
Loading