Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions anchor/client/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,14 @@ pub struct Node {
display_order = 0
)]
pub disable_slashing_protection: bool,

// debugging stuff
#[clap(
long,
hide = true,
help = "Act as if we were a certain operator, except for sending messages."
)]
pub impostor: Option<u64>,
}

pub fn get_color_style() -> Styles {
Expand Down
7 changes: 7 additions & 0 deletions anchor/client/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use multiaddr::{Multiaddr, Protocol};
use network::{ListenAddr, ListenAddress};
use sensitive_url::SensitiveUrl;
use ssv_network_config::SsvNetworkConfig;
use ssv_types::OperatorId;
use tracing::{error, warn};

use crate::cli::Node;
Expand Down Expand Up @@ -60,6 +61,8 @@ pub struct Config {
pub password: Option<String>,
/// If slashing protection is disabled
pub disable_slashing_protection: bool,
/// Act as impostor
pub impostor: Option<OperatorId>,
}

impl Config {
Expand Down Expand Up @@ -103,6 +106,7 @@ impl Config {
processor: <_>::default(),
password: None,
disable_slashing_protection: false,
impostor: None,
}
}
}
Expand Down Expand Up @@ -232,6 +236,9 @@ pub fn from_cli(cli_args: &Node) -> Result<Config, String> {
config.http_metrics.listen_port = port;
}

// debugging stuff
config.impostor = cli_args.impostor.map(OperatorId);

Ok(config)
}

Expand Down
42 changes: 28 additions & 14 deletions anchor/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use eth2::{
};
use keygen::{encryption::decrypt, run_keygen, Keygen};
use message_receiver::NetworkMessageReceiver;
use message_sender::NetworkMessageSender;
use message_sender::{impostor::ImpostorMessageSender, MessageSender, NetworkMessageSender};
use message_validator::Validator;
use network::Network;
use openssl::{pkey::Private, rsa::Rsa};
Expand Down Expand Up @@ -159,8 +159,15 @@ impl Client {

// Open database
let database = Arc::new(
NetworkDatabase::new(config.data_dir.join("anchor_db.sqlite").as_path(), &pubkey)
.map_err(|e| format!("Unable to open Anchor database: {e}"))?,
if let Some(impostor) = &config.impostor {
NetworkDatabase::new_as_impostor(
config.data_dir.join("anchor_db.sqlite").as_path(),
impostor,
)
} else {
NetworkDatabase::new(config.data_dir.join("anchor_db.sqlite").as_path(), &pubkey)
}
.map_err(|e| format!("Unable to open Anchor database: {e}"))?,
);

let subnet_tracker = start_subnet_tracker(
Expand Down Expand Up @@ -378,21 +385,28 @@ impl Client {
slot_clock.clone(),
));

let network_message_sender = NetworkMessageSender::new(
processor_senders.clone(),
network_tx.clone(),
key.clone(),
operator_id,
Some(message_validator.clone()),
network::SUBNET_COUNT,
)?;
let message_sender: Arc<dyn MessageSender> = if config.impostor.is_none() {
Arc::new(NetworkMessageSender::new(
processor_senders.clone(),
network_tx.clone(),
key.clone(),
operator_id,
Some(message_validator.clone()),
network::SUBNET_COUNT,
)?)
} else {
Arc::new(ImpostorMessageSender::new(
network_tx.clone(),
network::SUBNET_COUNT,
))
};

// Create the signature collector
let signature_collector = SignatureCollectorManager::new(
processor_senders.clone(),
operator_id,
config.ssv_network.ssv_domain_type.clone(),
network_message_sender.clone(),
message_sender.clone(),
slot_clock.clone(),
)
.map_err(|e| format!("Unable to initialize signature collector manager: {e:?}"))?;
Expand All @@ -402,7 +416,7 @@ impl Client {
processor_senders.clone(),
operator_id,
slot_clock.clone(),
network_message_sender,
message_sender,
config.ssv_network.ssv_domain_type.clone(),
)
.map_err(|e| format!("Unable to initialize qbft manager: {e:?}"))?;
Expand Down Expand Up @@ -441,7 +455,7 @@ impl Client {
slot_clock.clone(),
spec.clone(),
genesis_validators_root,
key,
config.impostor.is_none().then_some(key),
executor.clone(),
);

Expand Down
27 changes: 23 additions & 4 deletions anchor/database/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,18 @@ struct SingleState {
nonces: HashMap<Address, u16>,
}

#[derive(Debug)]
enum PubkeyOrId {
Pubkey(Rsa<Public>),
Id(OperatorId),
}

/// Top level NetworkDatabase that contains in memory storage for quick access
/// to relevant information and a connection to the database
#[derive(Debug)]
pub struct NetworkDatabase {
/// The public key of our operator
pubkey: Rsa<Public>,
/// The public key or ID of our operator
operator: PubkeyOrId,
/// Custom state stores for easy data access
state: watch::Sender<NetworkState>,
/// Connection to the database
Expand All @@ -127,9 +133,22 @@ impl NetworkDatabase {
/// Construct a new NetworkDatabase at the given path and the Public Key of the current operator
pub fn new(path: &Path, pubkey: &Rsa<Public>) -> Result<Self, DatabaseError> {
let conn_pool = Self::open_or_create(path)?;
let state = watch::Sender::new(NetworkState::new_with_state(&conn_pool, pubkey)?);
let operator = PubkeyOrId::Pubkey(pubkey.clone());
let state = watch::Sender::new(NetworkState::new_with_state(&conn_pool, &operator)?);
Ok(Self {
operator,
state,
conn_pool,
})
}

/// Act as if we had the pubkey of a certain operator
pub fn new_as_impostor(path: &Path, operator: &OperatorId) -> Result<Self, DatabaseError> {
let conn_pool = Self::open_or_create(path)?;
let operator = PubkeyOrId::Id(*operator);
let state = watch::Sender::new(NetworkState::new_with_state(&conn_pool, &operator)?);
Ok(Self {
pubkey: pubkey.clone(),
operator,
state,
conn_pool,
})
Expand Down
9 changes: 7 additions & 2 deletions anchor/database/src/operator_operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use base64::prelude::*;
use rusqlite::params;
use ssv_types::{Operator, OperatorId};

use super::{DatabaseError, NetworkDatabase, SqlStatement, SQL};
use super::{DatabaseError, NetworkDatabase, PubkeyOrId, SqlStatement, SQL};

/// Implements all operator related functionality on the database
impl NetworkDatabase {
Expand Down Expand Up @@ -36,7 +36,12 @@ impl NetworkDatabase {
// Check to see if this operator is the current operator
if state.single_state.id.is_none() {
// If the keys match, this is the current operator so we want to save the id
let keys_match = pem_key == self.pubkey.public_key_to_pem().unwrap_or_default();
let keys_match = match &self.operator {
PubkeyOrId::Pubkey(pubkey) => {
pem_key == pubkey.public_key_to_pem().unwrap_or_default()
}
PubkeyOrId::Id(id) => *id == operator.id,
};
if keys_match {
state.single_state.id = Some(operator.id);
}
Expand Down
11 changes: 7 additions & 4 deletions anchor/database/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ use types::{Address, PublicKeyBytes};

use crate::{
ClusterMultiIndexMap, DatabaseError, MetadataMultiIndexMap, MultiIndexMap, MultiState,
NonUniqueIndex, Pool, PoolConn, ShareMultiIndexMap, SingleState, SqlStatement, UniqueIndex,
SQL,
NonUniqueIndex, Pool, PoolConn, PubkeyOrId, ShareMultiIndexMap, SingleState, SqlStatement,
UniqueIndex, SQL,
};

// Container to hold all network state
Expand All @@ -29,7 +29,7 @@ impl NetworkState {
/// Build the network state from the database data
pub(crate) fn new_with_state(
conn_pool: &Pool,
pubkey: &Rsa<Public>,
operator: &PubkeyOrId,
) -> Result<Self, DatabaseError> {
// Get database connection from the pool
let conn = conn_pool.get()?;
Expand All @@ -41,7 +41,10 @@ impl NetworkState {
// key is stored the database. If it does not exist, that means the operator still
// has to be registered with the network contract or that we have not seen the
// corresponding event yet
let id = Self::does_self_exist(&conn, pubkey)?;
let id = match operator {
PubkeyOrId::Pubkey(pubkey) => Self::does_self_exist(&conn, pubkey)?,
PubkeyOrId::Id(id) => Some(*id),
};

// First Phase: Fetch data from the database
// 1) OperatorId -> Operator
Expand Down
41 changes: 41 additions & 0 deletions anchor/message_sender/src/impostor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
use ssv_types::{consensus::UnsignedSSVMessage, message::SignedSSVMessage, CommitteeId};
use subnet_tracker::SubnetId;
use tokio::sync::mpsc;
use tracing::debug;

use crate::{Error, MessageCallback, MessageSender};

#[derive(Clone)]
pub struct ImpostorMessageSender {
// we only hold this so network does not get sad over the closed channel lol
_network_tx: mpsc::Sender<(SubnetId, Vec<u8>)>,
subnet_count: usize,
}

impl MessageSender for ImpostorMessageSender {
fn sign_and_send(
&self,
msg: UnsignedSSVMessage,
committee_id: CommitteeId,
_additional_message_callback: Option<Box<MessageCallback>>,
) -> Result<(), Error> {
let subnet = SubnetId::from_committee(committee_id, self.subnet_count);
debug!(?msg, ?subnet, "Would send message");
Ok(())
}

fn send(&self, msg: SignedSSVMessage, committee_id: CommitteeId) -> Result<(), Error> {
let subnet = SubnetId::from_committee(committee_id, self.subnet_count);
debug!(?msg, ?subnet, "Would send message");
Ok(())
}
}

impl ImpostorMessageSender {
pub fn new(network_tx: mpsc::Sender<(SubnetId, Vec<u8>)>, subnet_count: usize) -> Self {
Self {
_network_tx: network_tx,
subnet_count,
}
}
}
1 change: 1 addition & 0 deletions anchor/message_sender/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
mod network;

pub mod impostor;
#[cfg(feature = "testing")]
pub mod testing;

Expand Down
4 changes: 2 additions & 2 deletions anchor/qbft_manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,15 +113,15 @@ impl QbftManager {
processor: Senders,
operator_id: OperatorId,
slot_clock: impl SlotClock + 'static,
message_sender: impl MessageSender + 'static,
message_sender: Arc<dyn MessageSender>,
domain: DomainType,
) -> Result<Arc<Self>, QbftError> {
let manager = Arc::new(QbftManager {
processor,
operator_id,
validator_consensus_data_instances: DashMap::new(),
beacon_vote_instances: DashMap::new(),
message_sender: Arc::new(message_sender),
message_sender,
domain,
});

Expand Down
2 changes: 1 addition & 1 deletion anchor/qbft_manager/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ where
sender_queues.clone(),
operator_id,
slot_clock.clone(),
MockMessageSender::new(network_tx.clone(), operator_id),
Arc::new(MockMessageSender::new(network_tx.clone(), operator_id)),
DomainType([0; 4]),
)
.expect("Creation should not fail");
Expand Down
23 changes: 15 additions & 8 deletions anchor/signature_collector/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,14 +73,14 @@ impl SignatureCollectorManager {
processor: Senders,
operator_id: OperatorId,
domain: DomainType,
message_sender: impl MessageSender + 'static,
message_sender: Arc<dyn MessageSender>,
slot_clock: impl SlotClock + 'static,
) -> Result<Arc<Self>, CollectionError> {
let manager = Arc::new(Self {
processor,
operator_id,
domain,
message_sender: Arc::new(message_sender),
message_sender,
signature_collectors: DashMap::new(),
committee_signatures: DashMap::new(),
});
Expand Down Expand Up @@ -140,9 +140,13 @@ impl SignatureCollectorManager {
self.processor.urgent_consensus.send_blocking(
move || {
trace!(root = ?validator_signing_data.root, "Signing...");
let partial_signature = validator_signing_data
.share
.sign(validator_signing_data.root);
// If we have no share, we can not actually sign the message, because we are running
// in impostor mode.
let partial_signature = if let Some(share) = &validator_signing_data.share {
share.sign(validator_signing_data.root)
} else {
Signature::empty()
};
trace!(root = ?validator_signing_data.root, "Signed");

let message = PartialSignatureMessage {
Expand Down Expand Up @@ -222,8 +226,11 @@ impl SignatureCollectorManager {
}
}

// Finally, make the local instance aware of the partial signature.
let _ = manager.receive_partial_signature(message, metadata.slot);
// Finally, make the local instance aware of the partial signature, if it is a real
// signature.
if validator_signing_data.share.is_some() {
let _ = manager.receive_partial_signature(message, metadata.slot);
}
},
SIGNER_NAME,
)?;
Expand Down Expand Up @@ -399,7 +406,7 @@ pub enum SignatureRequester {
pub struct ValidatorSigningData {
pub root: Hash256,
pub index: ValidatorIndex,
pub share: SecretKey,
pub share: Option<SecretKey>,
}

struct CollectorMessage {
Expand Down
Loading
Loading