Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions anchor/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use database::NetworkDatabase;
use eth2::reqwest::{Certificate, ClientBuilder};
use eth2::{BeaconNodeHttpClient, Timeouts};
use keygen::{encryption::decrypt, run_keygen, Keygen};
use message_receiver::MessageReceiver;
use message_receiver::NetworkMessageReceiver;
use message_sender::NetworkMessageSender;
use message_validator::Validator;
use network::Network;
Expand Down Expand Up @@ -366,7 +366,7 @@ impl Client {
// Network sender/receiver
let (network_tx, network_rx) = mpsc::channel::<(SubnetId, Vec<u8>)>(9001);

let message_validator = Validator::new(database.watch());
let message_validator = Validator::new(database.watch(), slot_clock.clone());

let network_message_sender = NetworkMessageSender::new(
processor_senders.clone(),
Expand Down Expand Up @@ -399,7 +399,7 @@ impl Client {

let (outcome_tx, outcome_rx) = mpsc::channel::<message_receiver::Outcome>(9000);

let message_receiver = MessageReceiver::new(
let message_receiver = NetworkMessageReceiver::new(
processor_senders.clone(),
qbft_manager.clone(),
signature_collector.clone(),
Expand All @@ -413,7 +413,7 @@ impl Client {
&config.network,
subnet_tracker,
network_rx,
message_receiver,
Arc::new(message_receiver),
outcome_rx,
executor.clone(),
)
Expand Down
4 changes: 2 additions & 2 deletions anchor/common/qbft/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::error::ConfigBuilderError;
use crate::qbft_types::{DefaultLeaderFunction, InstanceHeight, LeaderFunction, Round};
use crate::qbft_types::{DefaultLeaderFunction, InstanceHeight, LeaderFunction};
use indexmap::IndexSet;
use ssv_types::OperatorId;
use ssv_types::{OperatorId, Round};
use std::fmt::Debug;
use std::time::Duration;

Expand Down
4 changes: 2 additions & 2 deletions anchor/common/qbft/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::msg_container::MessageContainer;
use ssv_types::consensus::{QbftData, QbftMessage, QbftMessageType, UnsignedSSVMessage};
use ssv_types::message::{MsgType, SSVMessage, SignedSSVMessage};
use ssv_types::msgid::MessageId;
use ssv_types::OperatorId;
use ssv_types::{OperatorId, Round};
use ssz::{Decode, Encode};
use std::collections::HashMap;
use std::sync::Arc;
Expand All @@ -15,7 +15,7 @@ pub use error::ConfigBuilderError;
pub use qbft_types::WrappedQbftMessage;
pub use qbft_types::{
Completed, ConsensusData, DefaultLeaderFunction, InstanceHeight, InstanceState, LeaderFunction,
Round, UnsignedWrappedQbftMessage,
UnsignedWrappedQbftMessage,
};

mod config;
Expand Down
32 changes: 1 addition & 31 deletions anchor/common/qbft/src/qbft_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,10 @@ use derive_more::{Deref, From};
use indexmap::IndexSet;
use ssv_types::consensus::{QbftMessage, UnsignedSSVMessage};
use ssv_types::message::SignedSSVMessage;
use ssv_types::OperatorId;
use ssv_types::{OperatorId, Round};
use std::cmp::Eq;
use std::fmt::Debug;
use std::hash::Hash;
use std::num::NonZeroUsize;
use types::Hash256;

/// Generic LeaderFunction trait to allow for future implementations of the QBFT module
Expand Down Expand Up @@ -58,35 +57,6 @@ pub struct UnsignedWrappedQbftMessage {
pub qbft_message: QbftMessage,
}

/// This represents an individual round, these change on regular time intervals
#[derive(Clone, Copy, Debug, Deref, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct Round(NonZeroUsize);

impl From<u64> for Round {
fn from(round: u64) -> Round {
Round(NonZeroUsize::new(round as usize).expect("round == 0"))
}
}

impl Default for Round {
fn default() -> Self {
// rounds are indexed starting at 1
Round(NonZeroUsize::new(1).expect("1 != 0"))
}
}

impl Round {
/// Returns the next round
pub fn next(&self) -> Option<Round> {
self.0.checked_add(1).map(Round)
}

/// Sets the current round
pub fn set(&mut self, round: Round) {
*self = round;
}
}

/// The instance height behaves like an "ID" for the QBFT instance. It is used to uniquely identify
/// different instances, that have the same operator id.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Hash, From, Deref)]
Expand Down
4 changes: 3 additions & 1 deletion anchor/common/ssv_types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@ pub mod message;
pub mod msgid;
mod operator;
pub mod partial_sig;
mod round;
mod share;
mod sql_conversions;
mod util;

pub use indexmap::IndexSet;
pub use round::Round;
pub use share::ENCRYPTED_KEY_LENGTH;
pub use types::{Slot, VariableList};
pub use types::{Epoch, Slot, VariableList};
54 changes: 54 additions & 0 deletions anchor/common/ssv_types/src/round.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
use derive_more::Deref;
use std::fmt;
use std::fmt::{Display, Formatter};
use std::num::NonZeroUsize;
use std::ops::Add;

/// This represents an individual round, these change on regular time intervals
#[derive(Clone, Copy, Debug, Deref, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct Round(NonZeroUsize);

impl From<u64> for Round {
fn from(round: u64) -> Round {
Round(NonZeroUsize::new(round as usize).expect("round == 0"))
}
}

impl From<Round> for u64 {
fn from(round: Round) -> u64 {
round.0.get() as u64
}
}

impl Add<u64> for Round {
type Output = Round;

fn add(self, rhs: u64) -> Round {
Round(NonZeroUsize::new(self.0.get() + rhs as usize).expect("round == 0"))
}
}

impl Default for Round {
fn default() -> Self {
// rounds are indexed starting at 1
Round(NonZeroUsize::new(1).expect("1 != 0"))
}
}

impl Display for Round {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
write!(f, "{}", self.0)
}
}

impl Round {
/// Returns the next round
pub fn next(&self) -> Option<Round> {
self.0.checked_add(1).map(Round)
}

/// Sets the current round
pub fn set(&mut self, round: Round) {
*self = round;
}
}
1 change: 1 addition & 0 deletions anchor/message_receiver/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ message_validator = { workspace = true }
processor = { workspace = true }
qbft_manager = { workspace = true }
signature_collector = { workspace = true }
slot_clock = { workspace = true }
ssv_types = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
Expand Down
13 changes: 12 additions & 1 deletion anchor/message_receiver/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,20 @@
mod manager;

use gossipsub::{Message, MessageId};
use libp2p::PeerId;
use thiserror::Error;

pub use crate::manager::*;
pub use crate::MessageReceiver;
pub use crate::NetworkMessageReceiver;

pub trait MessageReceiver {
fn receive(
&self,
propagation_source: PeerId,
message_id: MessageId,
message: Message,
) -> Result<(), Error>;
}

#[derive(Error, Debug)]
pub enum Error {
Expand Down
52 changes: 27 additions & 25 deletions anchor/message_receiver/src/manager.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use crate::MessageReceiver;
use database::{NetworkState, UniqueIndex};
use gossipsub::{Message, MessageAcceptance, MessageId};
use libp2p::PeerId;
use message_validator::Validator;
use message_validator::{ValidatedMessage, ValidatedSSVMessage};
use qbft_manager::QbftManager;
use signature_collector::SignatureCollectorManager;
use slot_clock::SlotClock;
use ssv_types::msgid::DutyExecutor;
use std::sync::Arc;
use tokio::sync::mpsc::error::TrySendError;
Expand All @@ -20,18 +22,38 @@ pub struct Outcome {
}

/// A message receiver that passes messages to responsible managers.
pub struct MessageReceiver {
pub struct NetworkMessageReceiver<S: SlotClock> {
processor: processor::Senders,
qbft_manager: Arc<QbftManager>,
signature_collector: Arc<SignatureCollectorManager>,
network_state_rx: watch::Receiver<NetworkState>,
outcome_tx: mpsc::Sender<Outcome>,
validator: Validator,
validator: Validator<S>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now that the Validator actually has state, this needs to be an Arc, right? Or else the state of the receiver validator and sender validator diverges.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question. Do we want to share the consensus state? I think so.

}

impl MessageReceiver {
pub fn receive(
self: Arc<Self>,
impl<S: SlotClock + 'static> NetworkMessageReceiver<S> {
pub fn new(
processor: processor::Senders,
qbft_manager: Arc<QbftManager>,
signature_collector: Arc<SignatureCollectorManager>,
network_state_rx: watch::Receiver<NetworkState>,
outcome_tx: mpsc::Sender<Outcome>,
validator: Validator<S>,
) -> Arc<Self> {
Arc::new(Self {
processor,
qbft_manager,
signature_collector,
network_state_rx,
outcome_tx,
validator,
})
}
}

impl<S: SlotClock + 'static> MessageReceiver for Arc<NetworkMessageReceiver<S>> {
fn receive(
&self,
propagation_source: PeerId,
message_id: MessageId,
message: Message,
Expand Down Expand Up @@ -119,23 +141,3 @@ impl MessageReceiver {
Ok(())
}
}

impl MessageReceiver {
pub fn new(
processor: processor::Senders,
qbft_manager: Arc<QbftManager>,
signature_collector: Arc<SignatureCollectorManager>,
network_state_rx: watch::Receiver<NetworkState>,
outcome_tx: mpsc::Sender<Outcome>,
validator: Validator,
) -> Arc<Self> {
Arc::new(Self {
processor,
qbft_manager,
signature_collector,
network_state_rx,
outcome_tx,
validator,
})
}
}
Empty file.
2 changes: 1 addition & 1 deletion anchor/message_sender/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ edition = { workspace = true }
authors = ["Sigma Prime <contact@sigmaprime.io>"]

[dependencies]
database = { workspace = true }
ethereum_ssz = { workspace = true }
message_validator = { workspace = true }
openssl = { workspace = true }
processor = { workspace = true }
slot_clock = { workspace = true }
ssv_types = { workspace = true }
subnet_tracker = { workspace = true }
tokio = { workspace = true }
Expand Down
11 changes: 6 additions & 5 deletions anchor/message_sender/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use openssl::hash::MessageDigest;
use openssl::pkey::{PKey, Private};
use openssl::rsa::Rsa;
use openssl::sign::Signer;
use slot_clock::SlotClock;
use ssv_types::consensus::UnsignedSSVMessage;
use ssv_types::message::SignedSSVMessage;
use ssv_types::{CommitteeId, OperatorId};
Expand All @@ -18,16 +19,16 @@ use tracing::{debug, error, warn};
const SIGNER_NAME: &str = "message_sign_and_send";
const SENDER_NAME: &str = "message_send";

pub struct NetworkMessageSender {
pub struct NetworkMessageSender<S: SlotClock> {
processor: processor::Senders,
network_tx: mpsc::Sender<(SubnetId, Vec<u8>)>,
private_key: PKey<Private>,
operator_id: OperatorId,
validator: Option<Validator>,
validator: Option<Validator<S>>,
subnet_count: usize,
}

impl MessageSender for Arc<NetworkMessageSender> {
impl<S: SlotClock + 'static> MessageSender for Arc<NetworkMessageSender<S>> {
fn sign_and_send(
&self,
message: UnsignedSSVMessage,
Expand Down Expand Up @@ -90,13 +91,13 @@ impl MessageSender for Arc<NetworkMessageSender> {
}
}

impl NetworkMessageSender {
impl<S: SlotClock> NetworkMessageSender<S> {
pub fn new(
processor: processor::Senders,
network_tx: mpsc::Sender<(SubnetId, Vec<u8>)>,
private_key: Rsa<Private>,
operator_id: OperatorId,
validator: Option<Validator>,
validator: Option<Validator<S>>,
subnet_count: usize,
) -> Result<Arc<Self>, String> {
let private_key = PKey::from_rsa(private_key)
Expand Down
Loading