Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion anchor/common/ssv_types/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,19 @@ use crate::OperatorId;
use derive_more::{Deref, From};
use indexmap::IndexSet;
use ssz_derive::{Decode, Encode};
use std::fmt::Debug;
use types::{Address, Graffiti, PublicKeyBytes};

/// Unique identifier for a cluster
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Hash, From, Deref)]
#[derive(Clone, Copy, Default, Eq, PartialEq, Hash, From, Deref)]
pub struct ClusterId(pub [u8; 32]);

impl Debug for ClusterId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", hex::encode(self.0))
}
}

/// A Cluster is a group of Operators that are acting on behalf of one or more Validators
///
/// Each cluster is owned by a unqiue EOA and only that Address may perform operators on the
Expand Down
9 changes: 8 additions & 1 deletion anchor/common/ssv_types/src/committee.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::{OperatorId, ValidatorIndex};
use derive_more::{Deref, From};
use indexmap::IndexSet;
use sha2::{Digest, Sha256};
use std::fmt::{Debug, Formatter};

const COMMITTEE_ID_LEN: usize = 32;

Expand All @@ -13,9 +14,15 @@ pub struct CommitteeInfo {
}

/// Unique identifier for a committee
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Hash, From, Deref)]
#[derive(Clone, Copy, Default, Eq, PartialEq, Hash, From, Deref)]
pub struct CommitteeId(pub [u8; COMMITTEE_ID_LEN]);

impl Debug for CommitteeId {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", hex::encode(self.0))
}
}

impl From<Vec<OperatorId>> for CommitteeId {
fn from(mut operator_ids: Vec<OperatorId>) -> Self {
// Sort the operator IDs
Expand Down
23 changes: 21 additions & 2 deletions anchor/common/ssv_types/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ use crate::ValidatorIndex;
use sha2::{Digest, Sha256};
use ssz::{Decode, DecodeError, Encode};
use ssz_derive::{Decode, Encode};
use std::fmt::Debug;
use std::fmt::{Debug, Formatter};
use std::hash::Hash;
use std::ops::Deref;
use tree_hash::{PackedEncoding, TreeHash, TreeHashType};
use tree_hash_derive::TreeHash;
use types::typenum::{U13, U56};
Expand Down Expand Up @@ -45,7 +46,7 @@ pub struct UnsignedSSVMessage {
}

/// A QBFT specific message
#[derive(Clone, Debug, Encode, Decode)]
#[derive(Clone, Encode, Decode)]
pub struct QbftMessage {
pub qbft_message_type: QbftMessageType,
pub height: u64,
Expand All @@ -57,6 +58,24 @@ pub struct QbftMessage {
pub prepare_justification: Vec<SignedSSVMessage>, // always without full_data
}

impl Debug for QbftMessage {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("QbftMessage")
.field("qbft_message_type", &self.qbft_message_type)
.field("height", &self.height)
.field("round", &self.round)
.field("identifier", &hex::encode(self.identifier.deref()))
.field("root", &self.root)
.field("data_round", &self.data_round)
.field(
"round_change_justification",
&self.round_change_justification,
)
.field("prepare_justification", &self.prepare_justification)
.finish()
}
}

/// Different states the QBFT Message may represent
#[derive(Clone, Debug, PartialEq, PartialOrd, Copy)]
pub enum QbftMessageType {
Expand Down
29 changes: 26 additions & 3 deletions anchor/common/ssv_types/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::OperatorId;
use ssz::{Decode, DecodeError, Encode};
use ssz_derive::{Decode, Encode};
use std::collections::HashSet;
use std::fmt::Debug;
use std::fmt::{Debug, Formatter};
use thiserror::Error;

const QBFT_MSG_TYPE_SIZE: usize = 8;
Expand Down Expand Up @@ -143,13 +143,23 @@ pub enum SSVMessageError {
}

/// Represents a bare SSVMessage with a type, ID, and data.
#[derive(Encode, Decode, Debug, Clone, PartialEq, Eq)]
#[derive(Encode, Decode, Clone, PartialEq, Eq)]
pub struct SSVMessage {
msg_type: MsgType,
msg_id: MessageId, // Fixed-size [u8; 56]
data: Vec<u8>, // Variable-length byte array
}

impl Debug for SSVMessage {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("SSVMessage")
.field("msg_type", &self.msg_type)
.field("msg_id", &self.msg_id)
.field("data", &hex::encode(&self.data))
.finish()
}
}

impl SSVMessage {
/// Creates a new `SSVMessage`.
///
Expand Down Expand Up @@ -263,14 +273,27 @@ pub enum SignedSSVMessageError {
}

/// Represents a signed SSV Message with signatures, operator IDs, the message itself, and full data.
#[derive(Encode, Decode, Debug, Clone, PartialEq, Eq)]
#[derive(Encode, Decode, Clone, PartialEq, Eq)]
pub struct SignedSSVMessage {
signatures: Vec<Vec<u8>>, // Vec of Vec<u8>, max 13 elements, each with 256 bytes
operator_ids: Vec<OperatorId>, // Vec of OperatorID (u64), max 13 elements
ssv_message: SSVMessage, // SSVMessage: Required field
full_data: Vec<u8>, // Variable-length byte array, max 4,194,532 bytes
}

impl Debug for SignedSSVMessage {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let signatures = self.signatures.iter().map(hex::encode).collect::<Vec<_>>();

f.debug_struct("SignedSSVMessage")
.field("signatures", &signatures)
.field("operator_ids", &self.operator_ids)
.field("ssv_message", &self.ssv_message)
.field("full_data", &hex::encode(&self.full_data))
.finish()
}
}

impl SignedSSVMessage {
/// Creates a new `SignedSSVMessage` after validating constraints.
///
Expand Down
9 changes: 8 additions & 1 deletion anchor/common/ssv_types/src/msgid.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::committee::CommitteeId;
use crate::domain_type::DomainType;
use derive_more::{From, Into};
use ssz::{Decode, DecodeError, Encode};
use std::fmt::{Debug, Formatter};
use types::typenum::U56;
use types::{PublicKeyBytes, VariableList};

Expand Down Expand Up @@ -62,9 +63,15 @@ pub enum DutyExecutor {
Validator(PublicKeyBytes),
}

#[derive(Debug, Clone, Hash, Eq, PartialEq, From, Into)]
#[derive(Clone, Hash, Eq, PartialEq, From, Into)]
pub struct MessageId([u8; 56]);

impl Debug for MessageId {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", hex::encode(self.0))
}
}

impl MessageId {
pub fn new(domain: &DomainType, role: Role, duty_executor: &DutyExecutor) -> Self {
let mut id = [0; 56];
Expand Down
8 changes: 7 additions & 1 deletion anchor/eth/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,12 +102,14 @@ impl SsvEventSyncer {
#[instrument(skip(db, config))]
/// Create a new SsvEventSyncer to sync all of the events from the chain
pub async fn new(db: Arc<NetworkDatabase>, config: Config) -> Result<Self, ExecutionError> {
info!(?config, "Creating new SSV Event Syncer");
info!("Creating new SSV Event Syncer");

// Construct HTTP Provider
let http_url = config.http_url.parse().expect("Failed to parse HTTP URL");
let rpc_client = Arc::new(ProviderBuilder::default().on_http(http_url));

debug!("Created rpc client");

// Construct Websocket Provider
let ws = WsConnect::new(&config.ws_url);
let ws_client = ProviderBuilder::default()
Expand All @@ -120,9 +122,13 @@ impl SsvEventSyncer {
))
})?;

debug!("Created ws client");

// Construct an EventProcessor with access to the DB
let event_processor = EventProcessor::new(db, false);

debug!("Created event processor - done");

Ok(Self {
rpc_client,
ws_client,
Expand Down
9 changes: 5 additions & 4 deletions anchor/qbft_manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio::sync::oneshot::error::RecvError;
use tokio::sync::{mpsc, oneshot};
use tokio::time::{sleep, Interval};
use tracing::{debug, error, warn};
use tracing::{debug, error, info_span, warn, Instrument};
use types::{Hash256, PublicKeyBytes};

#[cfg(test)]
Expand Down Expand Up @@ -271,7 +271,7 @@ impl QbftManager {

// Trait that describes any data that is able to be decided upon during a qbft instance
pub trait QbftDecidable: QbftData<Hash = Hash256> + Send + Sync + 'static {
type Id: Hash + Eq + Send;
type Id: Hash + Eq + Send + Debug;

fn get_map(manager: &QbftManager) -> &Map<Self::Id, Self>;

Expand All @@ -286,9 +286,10 @@ pub trait QbftDecidable: QbftData<Hash = Hash256> + Send + Sync + 'static {
// There is not an instance running yet, store the sender and spawn a new instance
// with the reeiver
let (tx, rx) = mpsc::unbounded_channel();
let span = info_span!("qbft_instance", instance_id = ?entry.key());
let tx = entry.insert(tx);
let _ = manager.processor.permitless.send_async(
Box::pin(qbft_instance(rx, manager.message_sender.clone())),
Box::pin(qbft_instance(rx, manager.message_sender.clone()).instrument(span)),
QBFT_INSTANCE_NAME,
);
tx.clone()
Expand Down Expand Up @@ -406,7 +407,7 @@ async fn qbft_instance<D: QbftData<Hash = Hash256>>(
break;
};

debug!(?message, "Handling message in qbft_instance");
debug!(msg = ?message.kind, "Handling message in qbft_instance");

match message.kind {
QbftMessageKind::Initialize {
Expand Down
24 changes: 19 additions & 5 deletions anchor/signature_collector/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::oneshot::error::RecvError;
use tokio::sync::{mpsc, oneshot};
use tokio::time::sleep;
use tracing::{debug, error, trace, warn};
use tracing::{debug, error, info_span, trace, warn, Instrument};
use types::{Hash256, PublicKeyBytes, SecretKey, Signature, Slot};

const COLLECTOR_NAME: &str = "signature_collector";
Expand Down Expand Up @@ -100,6 +100,14 @@ impl SignatureCollectorManager {
) -> Result<Arc<Signature>, CollectionError> {
let (result_tx, result_rx) = oneshot::channel();

debug!(
?metadata,
?requester,
root=?validator_signing_data.root,
index=?validator_signing_data.index,
"sign_and_collect called",
);

// first, register notifier with preexisting or newly spawned instance
let cloned_metadata = metadata.clone();
let manager = self.clone();
Expand Down Expand Up @@ -294,14 +302,20 @@ impl SignatureCollectorManager {
Entry::Vacant(entry) => {
// this channel is effectively limited by the processor permit amount
let (tx, rx) = mpsc::unbounded_channel();
let span = info_span!(
"signature_collector",
?slot,
?validator_index,
?signing_root
);
entry.insert(SignatureCollector {
sender: tx.clone(),
for_slot: slot,
});
let _ = self
.processor
.permitless
.send_async(Box::pin(signature_collector(rx)), COLLECTOR_NAME);
let _ = self.processor.permitless.send_async(
Box::pin(signature_collector(rx).instrument(span)),
COLLECTOR_NAME,
);
debug!(
?signing_root,
?validator_index,
Expand Down
5 changes: 2 additions & 3 deletions anchor/validator_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use signature_collector::{
use slashing_protection::{NotSafe, Safe, SlashingDatabase};
use slot_clock::SlotClock;
use ssv_types::consensus::{
BeaconVote, Contribution, DataSsz, ValidatorConsensusData, ValidatorDuty,
BeaconVote, Contribution, DataSsz, QbftData, ValidatorConsensusData, ValidatorDuty,
BEACON_ROLE_AGGREGATOR, BEACON_ROLE_PROPOSER, BEACON_ROLE_SYNC_COMMITTEE_CONTRIBUTION,
DATA_VERSION_ALTAIR, DATA_VERSION_BELLATRIX, DATA_VERSION_CAPELLA, DATA_VERSION_DENEB,
DATA_VERSION_PHASE0, DATA_VERSION_UNKNOWN,
Expand All @@ -37,7 +37,6 @@ use tokio::select;
use tokio::sync::{watch, Barrier, RwLock};
use tokio::time::sleep;
use tracing::{error, info, warn};
use tree_hash::TreeHash;
use types::attestation::Attestation;
use types::beacon_block::BeaconBlock;
use types::graffiti::Graffiti;
Expand Down Expand Up @@ -739,7 +738,7 @@ impl<T: SlotClock, E: EthSpec> ValidatorStore for AnchorValidatorStore<T, E> {
Completed::TimedOut => return Err(Error::SpecificError(SpecificError::Timeout)),
Completed::Success(data) => data,
};
let data_hash = data.tree_hash_root();
let data_hash = data.hash();
attestation.data_mut().beacon_block_root = data.block_root;
attestation.data_mut().source = data.source;
attestation.data_mut().target = data.target;
Expand Down
Loading