Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions anchor/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,7 @@ impl Client {
operator_id,
slot_clock.clone(),
network_message_sender,
config.ssv_network.ssv_domain_type.clone(),
)
.map_err(|e| format!("Unable to initialize qbft manager: {e:?}"))?;

Expand Down
4 changes: 2 additions & 2 deletions anchor/common/qbft/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ where
S: FnMut(Message),
{
// Construct a new QBFT Instance and start the first round
pub fn new(config: Config<F>, start_data: D, send_message: S) -> Self {
pub fn new(config: Config<F>, start_data: D, identifier: MessageId, send_message: S) -> Self {
let instance_height = *config.instance_height();
let current_round = config.round();
let quorum_size = config.quorum_size();
Expand All @@ -135,7 +135,7 @@ where

let mut qbft = Qbft {
config,
identifier: MessageId::from([0; 56]),
identifier,
instance_height,

start_data_hash,
Expand Down
1 change: 1 addition & 0 deletions anchor/common/qbft/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ fn construct_and_run_committee<D: QbftData<Hash = Hash256>>(
let instance = Qbft::new(
config.clone().build().expect("test config is valid"),
validated_data.clone(),
MessageId::from([0; 56]),
move |message| msg_queue.borrow_mut().push_back((id, message)),
);
instances.insert(id, instance);
Expand Down
57 changes: 44 additions & 13 deletions anchor/qbft_manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use qbft::{
};
use slot_clock::SlotClock;
use ssv_types::consensus::{BeaconVote, QbftData, ValidatorConsensusData};
use ssv_types::domain_type::DomainType;
use ssv_types::msgid::{DutyExecutor, MessageId, Role};
use ssv_types::OperatorId as QbftOperatorId;
use ssv_types::{Cluster, CommitteeId, OperatorId};
use std::fmt::Debug;
Expand All @@ -17,7 +19,7 @@ use tokio::sync::mpsc::error::TrySendError;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio::sync::oneshot::error::RecvError;
use tokio::sync::{mpsc, oneshot};
use tokio::time::{sleep, Duration, Interval};
use tokio::time::{sleep, Interval};
use tracing::{error, warn};
use types::{Hash256, PublicKeyBytes};

Expand Down Expand Up @@ -68,6 +70,8 @@ pub enum QbftMessageKind<D: QbftData<Hash = Hash256>> {
// the configuration for the instance, and a channel to send the final data on
Initialize {
initial: D,
// The message id to be embedded into outgoing messages
message_id: MessageId,
config: qbft::Config<DefaultLeaderFunction>,
on_completed: oneshot::Sender<Completed<D>>,
},
Expand All @@ -94,6 +98,8 @@ pub struct QbftManager {
beacon_vote_instances: Map<CommitteeInstanceId, BeaconVote>,
// Utility to sign and serialize network messages
message_sender: Arc<dyn MessageSender>,
// Network domain to embed into messages
domain: DomainType,
}

impl QbftManager {
Expand All @@ -103,13 +109,15 @@ impl QbftManager {
operator_id: OperatorId,
slot_clock: impl SlotClock + 'static,
message_sender: impl MessageSender + 'static,
domain: DomainType,
) -> Result<Arc<Self>, QbftError> {
let manager = Arc::new(QbftManager {
processor,
operator_id,
validator_consensus_data_instances: DashMap::new(),
beacon_vote_instances: DashMap::new(),
message_sender: Arc::new(message_sender),
domain,
});

// Start a long running task that will clean up old instances
Expand Down Expand Up @@ -143,13 +151,15 @@ impl QbftManager {

// Get or spawn a new qbft instance. This will return the sender that we can use to send
// new messages to the specific instance
let message_id = D::message_id(&self.domain, &id);
let sender = D::get_or_spawn_instance(self, id);
self.processor.urgent_consensus.send_immediate(
move |drop_on_finish: DropOnFinish| {
// A message to initialize this instance
let _ = sender.send(QbftMessage {
kind: QbftMessageKind::Initialize {
initial,
message_id,
config,
on_completed: result_sender,
},
Expand Down Expand Up @@ -230,6 +240,8 @@ pub trait QbftDecidable: QbftData<Hash = Hash256> + Send + Sync + 'static {
}

fn instance_height(&self, id: &Self::Id) -> InstanceHeight;

fn message_id(domain: &DomainType, id: &Self::Id) -> MessageId;
}

impl QbftDecidable for ValidatorConsensusData {
Expand All @@ -241,6 +253,15 @@ impl QbftDecidable for ValidatorConsensusData {
fn instance_height(&self, id: &Self::Id) -> InstanceHeight {
id.instance_height
}

fn message_id(domain: &DomainType, id: &Self::Id) -> MessageId {
let role = match id.duty {
ValidatorDutyKind::Proposal => Role::Proposer,
ValidatorDutyKind::Aggregator => Role::Aggregator,
ValidatorDutyKind::SyncCommitteeAggregator => Role::SyncCommittee,
};
MessageId::new(domain, role, &DutyExecutor::Validator(id.validator))
}
}

impl QbftDecidable for BeaconVote {
Expand All @@ -252,6 +273,14 @@ impl QbftDecidable for BeaconVote {
fn instance_height(&self, id: &Self::Id) -> InstanceHeight {
id.instance_height
}

fn message_id(domain: &DomainType, id: &Self::Id) -> MessageId {
MessageId::new(
domain,
Role::Committee,
&DutyExecutor::Committee(id.committee),
)
}
}

// States that Qbft instance may be in
Expand Down Expand Up @@ -311,13 +340,18 @@ async fn qbft_instance<D: QbftData<Hash = Hash256>>(
match message.kind {
QbftMessageKind::Initialize {
initial,
message_id,
config,
on_completed,
} => {
instance = match instance {
// The instance is uninitialized and we have received a manager message to
// initialize it
QbftInstance::Uninitialized { message_buffer } => {
// create the interval and tick it right away
let mut interval = tokio::time::interval(config.round_time());
interval.tick().await;

let message_sender = message_sender.clone();
let committee_id = config
.committee_members()
Expand All @@ -326,22 +360,19 @@ async fn qbft_instance<D: QbftData<Hash = Hash256>>(
.collect::<Vec<_>>()
.into();
// Create a new instance and receive any buffered messages
let mut instance = Box::new(Qbft::new(config, initial, move |message| {
let (_, unsigned) = message.desugar();
if let Err(err) =
message_sender.clone().sign_and_send(unsigned, committee_id)
{
error!(?err, "Unable to send qbft message!");
}
}));
let mut instance =
Box::new(Qbft::new(config, initial, message_id, move |message| {
let (_, unsigned) = message.desugar();
if let Err(err) =
message_sender.clone().sign_and_send(unsigned, committee_id)
{
error!(?err, "Unable to send qbft message!");
}
}));
for message in message_buffer {
instance.receive(message);
}

// create the interval and tick it right away
let mut interval = tokio::time::interval(Duration::from_secs(2));
interval.tick().await;

QbftInstance::Initialized {
round_end: interval,
qbft: instance,
Expand Down
2 changes: 2 additions & 0 deletions anchor/qbft_manager/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use message_sender::testing::MockMessageSender;
use processor::Senders;
use slot_clock::{ManualSlotClock, SlotClock};
use ssv_types::consensus::{BeaconVote, QbftMessage, QbftMessageType};
use ssv_types::domain_type::DomainType;
use ssv_types::message::SignedSSVMessage;
use ssv_types::{Cluster, ClusterId, CommitteeId, OperatorId};
use ssz::Decode;
Expand Down Expand Up @@ -241,6 +242,7 @@ where
operator_id,
slot_clock.clone(),
MockMessageSender::new(network_tx.clone(), operator_id),
DomainType([0; 4]),
)
.expect("Creation should not fail");

Expand Down