Skip to content

Commit 2289a42

Browse files
authored
Adjust DB transactions to be one per log batch (sigp#331)
1 parent d7b7b0f commit 2289a42

File tree

13 files changed

+281
-160
lines changed

13 files changed

+281
-160
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

anchor/database/src/cluster_operations.rs

Lines changed: 22 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use rusqlite::params;
1+
use rusqlite::{Transaction, params};
22
use ssv_types::{Cluster, ClusterId, OperatorId, Share, ValidatorMetadata};
33
use types::{Address, PublicKeyBytes};
44

@@ -13,10 +13,8 @@ impl NetworkDatabase {
1313
cluster: Cluster,
1414
validator: ValidatorMetadata,
1515
shares: Vec<Share>,
16+
tx: &Transaction<'_>,
1617
) -> Result<(), DatabaseError> {
17-
let mut conn = self.connection()?;
18-
let tx = conn.transaction()?;
19-
2018
// Insert the top level cluster data if it does not exist, and the associated validator
2119
// metadata
2220
tx.prepare_cached(SQL[&SqlStatement::InsertCluster])?
@@ -51,12 +49,9 @@ impl NetworkDatabase {
5149
// Insert the cluster member and the share
5250
tx.prepare_cached(SQL[&SqlStatement::InsertClusterMember])?
5351
.execute(params![*share.cluster_id, *share.operator_id])?;
54-
self.insert_share(&tx, share, &validator.public_key)
52+
self.insert_share(tx, share, &validator.public_key)
5553
})?;
5654

57-
// Commit all operations to the db
58-
tx.commit()?;
59-
6055
self.modify_state(|state| {
6156
// If we are a member in this cluster, store membership and our share
6257
if let Some(share) = our_share {
@@ -96,9 +91,13 @@ impl NetworkDatabase {
9691
}
9792

9893
/// Mark the cluster as liquidated or active
99-
pub fn update_status(&self, cluster_id: ClusterId, status: bool) -> Result<(), DatabaseError> {
100-
let conn = self.connection()?;
101-
conn.prepare_cached(SQL[&SqlStatement::UpdateClusterStatus])?
94+
pub fn update_status(
95+
&self,
96+
cluster_id: ClusterId,
97+
status: bool,
98+
tx: &Transaction<'_>,
99+
) -> Result<(), DatabaseError> {
100+
tx.prepare_cached(SQL[&SqlStatement::UpdateClusterStatus])?
102101
.execute(params![
103102
status, // status of the cluster (liquidated = false, active = true)
104103
*cluster_id // Id of the cluster
@@ -118,10 +117,13 @@ impl NetworkDatabase {
118117
/// Delete a validator from a cluster. This will cascade and remove all corresponding share
119118
/// data for this validator. If this validator is the last one in the cluster, the cluster
120119
/// and all corresponding cluster members will also be removed
121-
pub fn delete_validator(&self, validator_pubkey: &PublicKeyBytes) -> Result<(), DatabaseError> {
120+
pub fn delete_validator(
121+
&self,
122+
validator_pubkey: &PublicKeyBytes,
123+
tx: &Transaction<'_>,
124+
) -> Result<(), DatabaseError> {
122125
// Remove from database
123-
let conn = self.connection()?;
124-
conn.prepare_cached(SQL[&SqlStatement::DeleteValidator])?
126+
tx.prepare_cached(SQL[&SqlStatement::DeleteValidator])?
125127
.execute(params![validator_pubkey.to_string()])?;
126128

127129
self.modify_state(|state| {
@@ -150,10 +152,13 @@ impl NetworkDatabase {
150152
}
151153

152154
/// Bump the nonce of the owner
153-
pub fn bump_and_get_nonce(&self, owner: &Address) -> Result<u16, DatabaseError> {
155+
pub fn bump_and_get_nonce(
156+
&self,
157+
owner: &Address,
158+
tx: &Transaction<'_>,
159+
) -> Result<u16, DatabaseError> {
154160
// bump the nonce in the db
155-
let conn = self.connection()?;
156-
conn.prepare_cached(SQL[&SqlStatement::BumpNonce])?
161+
tx.prepare_cached(SQL[&SqlStatement::BumpNonce])?
157162
.execute(params![owner.to_string()])?;
158163

159164
let mut nonce = 0;

anchor/database/src/lib.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use std::{
77

88
use openssl::{pkey::Public, rsa::Rsa};
99
use r2d2_sqlite::SqliteConnectionManager;
10-
use rusqlite::params;
10+
use rusqlite::{Transaction, params};
1111
use ssv_types::{Cluster, ClusterId, CommitteeId, Operator, OperatorId, Share, ValidatorMetadata};
1212
use tokio::sync::{
1313
watch,
@@ -166,9 +166,12 @@ impl NetworkDatabase {
166166

167167
/// Update the last processed block number in the database
168168
/// Also, trigger a notification for other code to act on the new state
169-
pub fn processed_block(&self, block_number: u64) -> Result<(), DatabaseError> {
170-
let conn = self.connection()?;
171-
conn.prepare_cached(SQL[&SqlStatement::UpdateBlockNumber])?
169+
pub fn processed_block(
170+
&self,
171+
block_number: u64,
172+
tx: &Transaction<'_>,
173+
) -> Result<(), DatabaseError> {
174+
tx.prepare_cached(SQL[&SqlStatement::UpdateBlockNumber])?
172175
.execute(params![block_number])?;
173176
self.state
174177
.send_modify(|state| state.single_state.last_processed_block = block_number);
@@ -213,7 +216,7 @@ impl NetworkDatabase {
213216
}
214217

215218
// Open a new connection
216-
fn connection(&self) -> Result<PoolConn, DatabaseError> {
219+
pub fn connection(&self) -> Result<PoolConn, DatabaseError> {
217220
Ok(self.conn_pool.get()?)
218221
}
219222

anchor/database/src/operator_operations.rs

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,17 @@
11
use base64::prelude::*;
2-
use rusqlite::params;
2+
use rusqlite::{Transaction, params};
33
use ssv_types::{Operator, OperatorId};
44

55
use super::{DatabaseError, NetworkDatabase, PubkeyOrId, SQL, SqlStatement};
66

77
/// Implements all operator related functionality on the database
88
impl NetworkDatabase {
99
/// Insert a new Operator into the database
10-
pub fn insert_operator(&self, operator: &Operator) -> Result<(), DatabaseError> {
10+
pub fn insert_operator(
11+
&self,
12+
operator: &Operator,
13+
tx: &Transaction<'_>,
14+
) -> Result<(), DatabaseError> {
1115
// 1ake sure that this operator does not already exist
1216
if self.state().operator_exists(&operator.id) {
1317
return Err(DatabaseError::NotFound(format!(
@@ -24,8 +28,7 @@ impl NetworkDatabase {
2428
let encoded = BASE64_STANDARD.encode(pem_key.clone());
2529

2630
// Insert into the database
27-
let conn = self.connection()?;
28-
conn.prepare_cached(SQL[&SqlStatement::InsertOperator])?
31+
tx.prepare_cached(SQL[&SqlStatement::InsertOperator])?
2932
.execute(params![
3033
*operator.id, // The id of the registered operator
3134
encoded, // RSA public key
@@ -56,7 +59,11 @@ impl NetworkDatabase {
5659
}
5760

5861
/// Delete an operator
59-
pub fn delete_operator(&self, id: OperatorId) -> Result<(), DatabaseError> {
62+
pub fn delete_operator(
63+
&self,
64+
id: OperatorId,
65+
tx: &Transaction<'_>,
66+
) -> Result<(), DatabaseError> {
6067
// Make sure that this operator exists
6168
if !self.state().operator_exists(&id) {
6269
return Err(DatabaseError::NotFound(format!(
@@ -67,8 +74,7 @@ impl NetworkDatabase {
6774

6875
// Remove from db and in memory. This should cascade to delete this operator from all of the
6976
// clusters that it is in and all of the shares that it owns
70-
let conn = self.connection()?;
71-
conn.prepare_cached(SQL[&SqlStatement::DeleteOperator])?
77+
tx.prepare_cached(SQL[&SqlStatement::DeleteOperator])?
7278
.execute(params![*id])?;
7379

7480
self.state.send_modify(|state| {

anchor/database/src/tests/cluster_tests.rs

Lines changed: 44 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,15 @@ mod cluster_database_tests {
88
// Test inserting a cluster into the database
99
fn test_insert_retrieve_cluster() {
1010
let fixture = TestFixture::new();
11-
assertions::cluster::exists_in_db(&fixture.db, &fixture.cluster);
11+
12+
let mut conn = fixture.db.connection().unwrap();
13+
let tx = conn.transaction().unwrap();
14+
15+
assertions::cluster::exists_in_db(&fixture.cluster, &tx);
1216
assertions::cluster::exists_in_memory(&fixture.db, &fixture.cluster);
1317
assertions::validator::exists_in_memory(&fixture.db, &fixture.validator);
14-
assertions::validator::exists_in_db(&fixture.db, &fixture.validator);
15-
assertions::share::exists_in_db(
16-
&fixture.db,
17-
&fixture.validator.public_key,
18-
&fixture.shares,
19-
);
18+
assertions::validator::exists_in_db(&fixture.validator, &tx);
19+
assertions::share::exists_in_db(&fixture.validator.public_key, &fixture.shares, &tx);
2020
}
2121

2222
#[test]
@@ -25,14 +25,18 @@ mod cluster_database_tests {
2525
fn test_delete_last_validator() {
2626
let fixture = TestFixture::new();
2727
let pubkey = fixture.validator.public_key;
28-
assert!(fixture.db.delete_validator(&pubkey).is_ok());
28+
29+
let mut conn = fixture.db.connection().unwrap();
30+
let tx = conn.transaction().unwrap();
31+
32+
assert!(fixture.db.delete_validator(&pubkey, &tx).is_ok());
2933

3034
// Since there was only one validator in the cluster, everything should be removed
31-
assertions::cluster::exists_not_in_db(&fixture.db, fixture.cluster.cluster_id);
35+
assertions::cluster::exists_not_in_db(fixture.cluster.cluster_id, &tx);
3236
assertions::cluster::exists_not_in_memory(&fixture.db, fixture.cluster.cluster_id);
33-
assertions::validator::exists_not_in_db(&fixture.db, &fixture.validator);
37+
assertions::validator::exists_not_in_db(&fixture.validator, &tx);
3438
assertions::validator::exists_not_in_memory(&fixture.db, &fixture.validator);
35-
assertions::share::exists_not_in_db(&fixture.db, &pubkey);
39+
assertions::share::exists_not_in_db(&pubkey, &tx);
3640
assertions::share::exists_not_in_memory(&fixture.db, &pubkey);
3741
}
3842

@@ -43,17 +47,20 @@ mod cluster_database_tests {
4347
let mut cluster = fixture.cluster;
4448
let new_fee_recipient = Address::random();
4549

50+
let mut conn = fixture.db.connection().unwrap();
51+
let tx = conn.transaction().unwrap();
52+
4653
// Update fee recipient
4754
assert!(
4855
fixture
4956
.db
50-
.update_fee_recipient(cluster.owner, new_fee_recipient)
57+
.update_fee_recipient(cluster.owner, new_fee_recipient, &tx)
5158
.is_ok()
5259
);
5360

5461
// assertions will compare the data
5562
cluster.fee_recipient = new_fee_recipient;
56-
assertions::cluster::exists_in_db(&fixture.db, &cluster);
63+
assertions::cluster::exists_in_db(&cluster, &tx);
5764
assertions::cluster::exists_in_memory(&fixture.db, &cluster);
5865
}
5966

@@ -68,9 +75,11 @@ mod cluster_database_tests {
6875
OperatorId(1),
6976
&fixture.validator.public_key,
7077
)];
78+
let mut conn = fixture.db.connection().unwrap();
79+
let tx = conn.transaction().unwrap();
7180
fixture
7281
.db
73-
.insert_validator(cluster, metadata, shares)
82+
.insert_validator(cluster, metadata, shares, &tx)
7483
.expect_err("Insertion should fail");
7584
}
7685

@@ -80,25 +89,30 @@ mod cluster_database_tests {
8089
let fixture = TestFixture::new();
8190
let mut cluster = fixture.cluster;
8291

92+
let mut conn = fixture.db.connection().unwrap();
93+
let tx = conn.transaction().unwrap();
94+
8395
// Test updating to liquidated
8496
fixture
8597
.db
86-
.update_status(cluster.cluster_id, true)
98+
.update_status(cluster.cluster_id, true, &tx)
8799
.expect("Failed to update cluster status");
88100

89101
// verify in memory and db
90102
cluster.liquidated = true;
91-
assertions::cluster::exists_in_db(&fixture.db, &cluster);
103+
assertions::cluster::exists_in_db(&cluster, &tx);
92104
assertions::cluster::exists_in_memory(&fixture.db, &cluster);
93105
}
94106

95107
#[test]
96108
// Test inserting a cluster that already exists
97109
fn test_duplicate_cluster_insert() {
98110
let fixture = TestFixture::new();
111+
let mut conn = fixture.db.connection().unwrap();
112+
let tx = conn.transaction().unwrap();
99113
fixture
100114
.db
101-
.insert_validator(fixture.cluster, fixture.validator, fixture.shares)
115+
.insert_validator(fixture.cluster, fixture.validator, fixture.shares, &tx)
102116
.expect_err("Expected failure when inserting cluster that already exists");
103117
}
104118

@@ -108,26 +122,35 @@ mod cluster_database_tests {
108122
let fixture = TestFixture::new();
109123
let mut cluster = fixture.cluster;
110124

125+
let mut conn = fixture.db.connection().unwrap();
126+
let tx = conn.transaction().unwrap();
127+
111128
// Confirm that the fee recipient was inserted when the cluster was made
112-
let fee_recipient = fixture.db.fee_recipient_for_owner(&cluster.owner).unwrap();
129+
let fee_recipient = fixture
130+
.db
131+
.fee_recipient_for_owner(&cluster.owner, &tx)
132+
.unwrap();
113133
assert_eq!(fee_recipient, Some(cluster.fee_recipient));
114134

115135
// Update fee recipient
116136
let new_fee_recipient = Address::random();
117137
assert!(
118138
fixture
119139
.db
120-
.update_fee_recipient(cluster.owner, new_fee_recipient)
140+
.update_fee_recipient(cluster.owner, new_fee_recipient, &tx)
121141
.is_ok()
122142
);
123143

124144
// Confirm that fee recipient was updated
125145
cluster.fee_recipient = new_fee_recipient;
126-
assertions::cluster::exists_in_db(&fixture.db, &cluster);
146+
assertions::cluster::exists_in_db(&cluster, &tx);
127147
assertions::cluster::exists_in_memory(&fixture.db, &cluster);
128148

129149
// Confirm that we have set the correct fee recipient for the owner
130-
let stored_fee_recipient = fixture.db.fee_recipient_for_owner(&cluster.owner).unwrap();
150+
let stored_fee_recipient = fixture
151+
.db
152+
.fee_recipient_for_owner(&cluster.owner, &tx)
153+
.unwrap();
131154
assert_eq!(stored_fee_recipient, Some(new_fee_recipient));
132155
}
133156
}

0 commit comments

Comments
 (0)