Skip to content

Commit 1d7ec16

Browse files
Introduce validator index syncer (sigp#212)
Co-authored-by: diegomrsantos <diegomrsantos@gmail.com>
1 parent 4b9e06b commit 1d7ec16

File tree

23 files changed

+370
-157
lines changed

23 files changed

+370
-157
lines changed

Cargo.lock

Lines changed: 6 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

anchor/client/src/cli.rs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,11 +109,20 @@ pub struct Node {
109109
#[clap(
110110
long,
111111
value_name = "NETWORK_ADDRESSES",
112-
help = "Comma-separated addresses to one or more beacon node HTTP APIs. \
112+
help = "Comma-separated addresses to one or more execution node JSON-RPC APIs. \
113113
Default is http://localhost:8545.",
114114
display_order = 0
115115
)]
116-
pub execution_nodes: Option<Vec<String>>,
116+
pub execution_rpc: Option<Vec<String>>,
117+
118+
#[clap(
119+
long,
120+
value_name = "NETWORK_ADDRESSES",
121+
help = "Comma-separated addresses to one or more execution node WS APIs. \
122+
Default is ws://localhost:8546.",
123+
display_order = 0
124+
)]
125+
pub execution_ws: Option<Vec<String>>,
117126

118127
#[clap(
119128
long,

anchor/client/src/config.rs

Lines changed: 28 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ pub struct Config {
3636
pub proposer_nodes: Vec<SensitiveUrl>,
3737
/// The http endpoints of the execution node APIs.
3838
pub execution_nodes: Vec<SensitiveUrl>,
39+
/// The websocket endpoints of the execution node APIs.
40+
pub execution_nodes_websocket: Vec<SensitiveUrl>,
3941
/// beacon node is not synced at startup.
4042
pub allow_unsynced_beacon_node: bool,
4143
/// If true, use longer timeouts for requests made to the beacon node.
@@ -79,19 +81,18 @@ impl Config {
7981

8082
let beacon_nodes = vec![SensitiveUrl::parse(DEFAULT_BEACON_NODE)
8183
.expect("beacon_nodes must always be a valid url.")];
82-
let execution_nodes = vec![
83-
SensitiveUrl::parse(DEFAULT_EXECUTION_NODE)
84-
.expect("execution_nodes must always be a valid url."),
85-
SensitiveUrl::parse(DEFAULT_EXECUTION_NODE_WS)
86-
.expect("execution_nodes must always be a valid url."),
87-
];
84+
let execution_nodes = vec![SensitiveUrl::parse(DEFAULT_EXECUTION_NODE)
85+
.expect("execution_nodes must always be a valid url.")];
86+
let execution_nodes_websocket = vec![SensitiveUrl::parse(DEFAULT_EXECUTION_NODE_WS)
87+
.expect("execution_nodes_websocket must always be a valid url.")];
8888

8989
Self {
9090
data_dir,
9191
ssv_network,
9292
beacon_nodes,
9393
proposer_nodes: vec![],
9494
execution_nodes,
95+
execution_nodes_websocket,
9596
allow_unsynced_beacon_node: false,
9697
use_long_timeouts: false,
9798
http_api: <_>::default(),
@@ -127,20 +128,18 @@ pub fn from_cli(cli_args: &Node) -> Result<Config, String> {
127128
.map_err(|e| format!("Failed to create {:?}: {:?}", config.data_dir, e))?;
128129
}
129130

130-
if let Some(beacon_nodes) = &cli_args.beacon_nodes {
131-
config.beacon_nodes = beacon_nodes
132-
.iter()
133-
.map(|s| SensitiveUrl::parse(s))
134-
.collect::<Result<_, _>>()
135-
.map_err(|e| format!("Unable to parse beacon node URL: {:?}", e))?;
131+
if let Some(ref beacon_nodes) = cli_args.beacon_nodes {
132+
parse_urls(&mut config.beacon_nodes, beacon_nodes, "beacon node")?;
136133
}
137-
138-
if let Some(execution_nodes) = &cli_args.execution_nodes {
139-
config.execution_nodes = execution_nodes
140-
.iter()
141-
.map(|s| SensitiveUrl::parse(s))
142-
.collect::<Result<_, _>>()
143-
.map_err(|e| format!("Unable to parse execution node URL: {:?}", e))?;
134+
if let Some(ref execution_rpc) = cli_args.execution_rpc {
135+
parse_urls(&mut config.execution_nodes, execution_rpc, "execution RPC")?;
136+
}
137+
if let Some(ref execution_ws) = cli_args.execution_ws {
138+
parse_urls(
139+
&mut config.execution_nodes_websocket,
140+
execution_ws,
141+
"execution WebSocket",
142+
)?;
144143
}
145144

146145
// Password to decrypt rsa key file
@@ -236,6 +235,16 @@ pub fn from_cli(cli_args: &Node) -> Result<Config, String> {
236235
Ok(config)
237236
}
238237

238+
/// Read SensitiveUrls from given CLI Strings
239+
fn parse_urls(dest: &mut Vec<SensitiveUrl>, src: &[String], kind: &str) -> Result<(), String> {
240+
*dest = src
241+
.iter()
242+
.map(|s| SensitiveUrl::parse(s))
243+
.collect::<Result<_, _>>()
244+
.map_err(|e| format!("Unable to parse {kind} URL: {:?}", e))?;
245+
Ok(())
246+
}
247+
239248
/// Gets the listening_addresses for lighthouse based on the cli options.
240249
pub fn parse_listening_addresses(cli_args: &Node) -> Result<ListenAddress, String> {
241250
// parse the possible ips

anchor/client/src/lib.rs

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use beacon_node_fallback::{
2020
pub use cli::Node;
2121
use config::Config;
2222
use database::NetworkDatabase;
23+
use eth::index_sync::start_validator_index_syncer;
2324
use eth2::{
2425
reqwest::{Certificate, ClientBuilder},
2526
BeaconNodeHttpClient, Timeouts,
@@ -105,6 +106,7 @@ impl Client {
105106
info!(
106107
beacon_nodes = format!("{:?}", &config.beacon_nodes),
107108
execution_nodes = format!("{:?}", &config.execution_nodes),
109+
execution_nodes_websocket = format!("{:?}", &config.execution_nodes_websocket),
108110
data_dir = format!("{:?}", config.data_dir),
109111
"Starting the Anchor client"
110112
);
@@ -322,25 +324,26 @@ impl Client {
322324
// Wait until genesis has occurred.
323325
wait_for_genesis(&beacon_nodes, genesis_time).await?;
324326

327+
// Start validator index syncer
328+
let index_sync_tx =
329+
start_validator_index_syncer(beacon_nodes.clone(), database.clone(), executor.clone());
330+
325331
// Start syncer
326332
let (historic_finished_tx, historic_finished_rx) = oneshot::channel();
327333
let mut syncer = eth::SsvEventSyncer::new(
328334
database.clone(),
329-
// TODO this is very hacky, but `eth::Config` has a TODO anyways
335+
index_sync_tx,
330336
eth::Config {
331337
http_url: config
332338
.execution_nodes
333339
.first()
334340
.ok_or("No execution node http url specified")?
335-
.full
336-
.to_string(),
341+
.clone(),
337342
ws_url: config
338-
.execution_nodes
339-
.get(1)
340-
.ok_or("No execution node wss url specified")?
341-
.full
342-
.to_string(),
343-
beacon_url: "".to_string(), // this one is not actually needed :)
343+
.execution_nodes_websocket
344+
.first()
345+
.ok_or("No execution node ws url specified")?
346+
.clone(),
344347
network: config.ssv_network.clone(),
345348
historic_finished_notify: Some(historic_finished_tx),
346349
},

anchor/common/ssv_types/src/cluster.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ pub struct ValidatorMetadata {
7878
/// The cluster that is responsible for this validator
7979
pub cluster_id: ClusterId,
8080
/// Index of the validator
81-
pub index: ValidatorIndex,
81+
pub index: Option<ValidatorIndex>,
8282
/// Graffiti
8383
pub graffiti: Graffiti,
8484
}

anchor/common/ssv_types/src/sql_conversions.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ impl TryFrom<&Row<'_>> for ValidatorMetadata {
111111
let cluster_id: ClusterId = ClusterId(row.get(1)?);
112112

113113
// Get ValidatorIndex from column 2
114-
let index: ValidatorIndex = ValidatorIndex(row.get(2)?);
114+
let index = row.get::<_, Option<usize>>(2)?.map(ValidatorIndex);
115115

116116
// Get Graffiti from column 3
117117
let graffiti = Graffiti(row.get::<_, [u8; GRAFFITI_BYTES_LEN]>(3)?);

anchor/database/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ rand = { workspace = true }
1313
rusqlite = { workspace = true }
1414
ssv_types = { workspace = true }
1515
tokio = { workspace = true }
16+
tracing = { workspace = true }
1617
types = { workspace = true }
1718

1819
[dev-dependencies]

anchor/database/src/cluster_operations.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ impl NetworkDatabase {
2929
.execute(params![
3030
validator.public_key.to_string(), // validator public key
3131
*cluster.cluster_id, // cluster id
32-
*validator.index, // validator index
32+
validator.index.as_deref(), // validator index
3333
validator.graffiti.0.as_slice(), // graffiti
3434
])?;
3535

anchor/database/src/lib.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ type PoolConn = r2d2::PooledConnection<SqliteConnectionManager>;
4545
/// Primary: public key of validator. uniquely identifies share
4646
/// Secondary: cluster id. corresponds to a list of shares
4747
/// Tertiary: owner of the cluster. corresponds to a list of shares
48-
pub(crate) type ShareMultiIndexMap = MultiIndexMap<
48+
pub type ShareMultiIndexMap = MultiIndexMap<
4949
PublicKeyBytes,
5050
ClusterId,
5151
Address,
@@ -59,7 +59,7 @@ pub(crate) type ShareMultiIndexMap = MultiIndexMap<
5959
/// Primary: public key of the validator. uniquely identifies the metadata
6060
/// Secondary: cluster id. corresponds to list of metadata for all validators
6161
/// Tertiary: owner of the cluster: corresponds to list of metadata for all validators
62-
pub(crate) type MetadataMultiIndexMap = MultiIndexMap<
62+
pub type MetadataMultiIndexMap = MultiIndexMap<
6363
PublicKeyBytes,
6464
ClusterId,
6565
Address,
@@ -73,7 +73,7 @@ pub(crate) type MetadataMultiIndexMap = MultiIndexMap<
7373
/// Primary: cluster id. uniquely identifies a cluster
7474
/// Secondary: public key of the validator. uniquely identifies a cluster
7575
/// Tertiary: owner of the cluster. uniquely identifies a cluster
76-
pub(crate) type ClusterMultiIndexMap = MultiIndexMap<
76+
pub type ClusterMultiIndexMap = MultiIndexMap<
7777
ClusterId,
7878
PublicKeyBytes,
7979
Address,

anchor/database/src/multi_index.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,10 @@ where
236236
// Only update the value in primary storage
237237
self.maps.primary.insert(k1.clone(), new_value)
238238
}
239+
240+
pub fn values(&self) -> impl Iterator<Item = &V> {
241+
self.maps.primary.values()
242+
}
239243
}
240244

241245
// Implement unique access for primary key.

0 commit comments

Comments
 (0)