Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions anchor/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,7 @@ impl Client {
operator_id,
slot_clock.clone(),
network_message_sender,
config.ssv_network.ssv_domain_type.clone(),
)
.map_err(|e| format!("Unable to initialize qbft manager: {e:?}"))?;

Expand Down
4 changes: 2 additions & 2 deletions anchor/common/qbft/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ where
S: FnMut(UnsignedWrappedQbftMessage),
{
// Construct a new QBFT Instance and start the first round
pub fn new(config: Config<F>, start_data: D, send_message: S) -> Self {
pub fn new(config: Config<F>, start_data: D, identifier: MessageId, send_message: S) -> Self {
let instance_height = *config.instance_height();
let current_round = config.round();
let quorum_size = config.quorum_size();
Expand All @@ -134,7 +134,7 @@ where

let mut qbft = Qbft {
config,
identifier: MessageId::from([0; 56]),
identifier,
instance_height,

start_data_hash,
Expand Down
1 change: 1 addition & 0 deletions anchor/common/qbft/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ fn construct_and_run_committee<D: QbftData<Hash = Hash256>>(
let instance = Qbft::new(
config.clone().build().expect("test config is valid"),
validated_data.clone(),
MessageId::from([0; 56]),
move |message| msg_queue.borrow_mut().push_back((id, message)),
);
instances.insert(id, instance);
Expand Down
75 changes: 53 additions & 22 deletions anchor/qbft_manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ use slot_clock::SlotClock;

use processor::Error::Queue;
use ssv_types::consensus::{BeaconVote, QbftData, ValidatorConsensusData};
use ssv_types::domain_type::DomainType;
use ssv_types::msgid::{DutyExecutor, MessageId, Role};
use ssv_types::OperatorId as QbftOperatorId;
use ssv_types::{Cluster, CommitteeId, OperatorId};
use std::fmt::Debug;
Expand All @@ -19,7 +21,7 @@ use tokio::sync::mpsc::error::TrySendError;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio::sync::oneshot::error::RecvError;
use tokio::sync::{mpsc, oneshot};
use tokio::time::{sleep, Duration, Interval};
use tokio::time::{sleep, Interval};
use tracing::{error, warn};
use types::{Hash256, PublicKeyBytes};

Expand Down Expand Up @@ -70,6 +72,8 @@ pub enum QbftMessageKind<D: QbftData<Hash = Hash256>> {
// the configuration for the instance, and a channel to send the final data on
Initialize {
initial: D,
// The message id to be embedded into outgoing messages
message_id: MessageId,
config: qbft::Config<DefaultLeaderFunction>,
on_completed: oneshot::Sender<Completed<D>>,
},
Expand All @@ -96,6 +100,8 @@ pub struct QbftManager {
beacon_vote_instances: Map<CommitteeInstanceId, BeaconVote>,
// Utility to sign and serialize network messages
message_sender: Arc<dyn MessageSender>,
// Network domain to embed into messages
domain: DomainType,
}

impl QbftManager {
Expand All @@ -105,13 +111,15 @@ impl QbftManager {
operator_id: OperatorId,
slot_clock: impl SlotClock + 'static,
message_sender: impl MessageSender + 'static,
domain: DomainType,
) -> Result<Arc<Self>, QbftError> {
let manager = Arc::new(QbftManager {
processor,
operator_id,
validator_consensus_data_instances: DashMap::new(),
beacon_vote_instances: DashMap::new(),
message_sender: Arc::new(message_sender),
domain,
});

// Start a long running task that will clean up old instances
Expand Down Expand Up @@ -145,13 +153,15 @@ impl QbftManager {

// Get or spawn a new qbft instance. This will return the sender that we can use to send
// new messages to the specific instance
let message_id = D::message_id(&self.domain, &id);
let sender = D::get_or_spawn_instance(self, id);
self.processor.urgent_consensus.send_immediate(
move |drop_on_finish: DropOnFinish| {
// A message to initialize this instance
let _ = sender.send(QbftMessage {
kind: QbftMessageKind::Initialize {
initial,
message_id,
config,
on_completed: result_sender,
},
Expand Down Expand Up @@ -232,6 +242,8 @@ pub trait QbftDecidable: QbftData<Hash = Hash256> + Send + Sync + 'static {
}

fn instance_height(&self, id: &Self::Id) -> InstanceHeight;

fn message_id(domain: &DomainType, id: &Self::Id) -> MessageId;
}

impl QbftDecidable for ValidatorConsensusData {
Expand All @@ -243,6 +255,15 @@ impl QbftDecidable for ValidatorConsensusData {
fn instance_height(&self, id: &Self::Id) -> InstanceHeight {
id.instance_height
}

fn message_id(domain: &DomainType, id: &Self::Id) -> MessageId {
let role = match id.duty {
ValidatorDutyKind::Proposal => Role::Proposer,
ValidatorDutyKind::Aggregator => Role::Aggregator,
ValidatorDutyKind::SyncCommitteeAggregator => Role::SyncCommittee,
};
MessageId::new(domain, role, &DutyExecutor::Validator(id.validator))
}
}

impl QbftDecidable for BeaconVote {
Expand All @@ -254,6 +275,14 @@ impl QbftDecidable for BeaconVote {
fn instance_height(&self, id: &Self::Id) -> InstanceHeight {
id.instance_height
}

fn message_id(domain: &DomainType, id: &Self::Id) -> MessageId {
MessageId::new(
domain,
Role::Committee,
&DutyExecutor::Committee(id.committee),
)
}
}

// States that Qbft instance may be in
Expand Down Expand Up @@ -324,13 +353,18 @@ async fn qbft_instance<D: QbftData<Hash = Hash256>>(
match message.kind {
QbftMessageKind::Initialize {
initial,
message_id,
config,
on_completed,
} => {
instance = match instance {
// The instance is uninitialized and we have received a manager message to
// initialize it
QbftInstance::Uninitialized { message_buffer } => {
// create the interval and tick it right away
let mut interval = tokio::time::interval(config.round_time());
interval.tick().await;

let (sent_by_us_tx, sent_by_us_rx) = mpsc::unbounded_channel();

let message_sender = message_sender.clone();
Expand All @@ -341,31 +375,28 @@ async fn qbft_instance<D: QbftData<Hash = Hash256>>(
.collect::<Vec<_>>()
.into();
// Create a new instance and receive any buffered messages
let mut instance = Box::new(Qbft::new(config, initial, move |message| {
let sent_by_us_tx = sent_by_us_tx.clone();
if let Err(err) = message_sender.clone().sign_and_send(
message.unsigned_message,
committee_id,
Some(Box::new(move |signed| {
// this might fail, but that's ok: it simply means that the
// instance has shut down (e.g. because it's done)
let _ = sent_by_us_tx.send(WrappedQbftMessage {
signed_message: signed.clone(),
qbft_message: message.qbft_message,
});
})),
) {
error!(?err, "Unable to send qbft message!");
}
}));
let mut instance =
Box::new(Qbft::new(config, initial, message_id, move |message| {
let sent_by_us_tx = sent_by_us_tx.clone();
if let Err(err) = message_sender.clone().sign_and_send(
message.unsigned_message,
committee_id,
Some(Box::new(move |signed| {
// this might fail, but that's ok: it simply means that the
// instance has shut down (e.g. because it's done)
let _ = sent_by_us_tx.send(WrappedQbftMessage {
signed_message: signed.clone(),
qbft_message: message.qbft_message,
});
})),
) {
error!(?err, "Unable to send qbft message!");
}
}));
for message in message_buffer {
instance.receive(message);
}

// create the interval and tick it right away
let mut interval = tokio::time::interval(Duration::from_secs(2));
interval.tick().await;

QbftInstance::Initialized {
round_end: interval,
qbft: instance,
Expand Down
2 changes: 2 additions & 0 deletions anchor/qbft_manager/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use message_sender::testing::MockMessageSender;
use processor::Senders;
use slot_clock::{ManualSlotClock, SlotClock};
use ssv_types::consensus::{BeaconVote, QbftMessage, QbftMessageType};
use ssv_types::domain_type::DomainType;
use ssv_types::message::SignedSSVMessage;
use ssv_types::{Cluster, ClusterId, CommitteeId, OperatorId};
use ssz::Decode;
Expand Down Expand Up @@ -253,6 +254,7 @@ where
operator_id,
slot_clock.clone(),
MockMessageSender::new(network_tx.clone(), operator_id),
DomainType([0; 4]),
)
.expect("Creation should not fail");

Expand Down
Loading