Skip to content

Commit 397c7a8

Browse files
committed
Add SQLite transaction modes
1 parent 9a59e1e commit 397c7a8

File tree

14 files changed

+240
-26
lines changed

14 files changed

+240
-26
lines changed

sea-orm-sync/src/database/executor.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,16 @@ impl TransactionTrait for DatabaseExecutor<'_> {
8989
}
9090
}
9191

92+
fn begin_with_options(
93+
&self,
94+
options: TransactionOptions,
95+
) -> Result<DatabaseTransaction, DbErr> {
96+
match self {
97+
DatabaseExecutor::Connection(conn) => conn.begin_with_options(options),
98+
DatabaseExecutor::Transaction(trans) => trans.begin_with_options(options),
99+
}
100+
}
101+
92102
fn transaction<F, T, E>(&self, callback: F) -> Result<T, TransactionError<E>>
93103
where
94104
F: for<'c> FnOnce(&'c DatabaseTransaction) -> Result<T, E>,

sea-orm-sync/src/database/restricted_connection.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,18 @@ impl TransactionTrait for RestrictedConnection {
230230
})
231231
}
232232

233+
#[instrument(level = "trace")]
234+
fn begin_with_options(
235+
&self,
236+
options: TransactionOptions,
237+
) -> Result<RestrictedTransaction, DbErr> {
238+
Ok(RestrictedTransaction {
239+
user_id: self.user_id,
240+
conn: self.conn.begin_with_options(options)?,
241+
rbac: self.conn.rbac.clone(),
242+
})
243+
}
244+
233245
/// Execute the function inside a transaction.
234246
/// If the function returns an error, the transaction will be rolled back. If it does not return an error, the transaction will be committed.
235247
#[instrument(level = "trace", skip(callback))]
@@ -287,6 +299,18 @@ impl TransactionTrait for RestrictedTransaction {
287299
})
288300
}
289301

302+
#[instrument(level = "trace")]
303+
fn begin_with_options(
304+
&self,
305+
options: TransactionOptions,
306+
) -> Result<RestrictedTransaction, DbErr> {
307+
Ok(RestrictedTransaction {
308+
user_id: self.user_id,
309+
conn: self.conn.begin_with_options(options)?,
310+
rbac: self.conn.rbac.clone(),
311+
})
312+
}
313+
290314
/// Execute the function inside a transaction.
291315
/// If the function returns an error, the transaction will be rolled back. If it does not return an error, the transaction will be committed.
292316
#[instrument(level = "trace", skip(callback))]

sea-orm-sync/src/database/transaction.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -593,6 +593,25 @@ impl TransactionTrait for DatabaseTransaction {
593593
)
594594
}
595595

596+
#[instrument(level = "trace")]
597+
fn begin_with_options(
598+
&self,
599+
TransactionOptions {
600+
isolation_level,
601+
access_mode,
602+
sqlite_transaction_mode,
603+
}: TransactionOptions,
604+
) -> Result<DatabaseTransaction, DbErr> {
605+
DatabaseTransaction::begin(
606+
Arc::clone(&self.conn),
607+
self.backend,
608+
self.metric_callback.clone(),
609+
isolation_level,
610+
access_mode,
611+
sqlite_transaction_mode,
612+
)
613+
}
614+
596615
/// Execute the function inside a transaction.
597616
/// If the function returns an error, the transaction will be rolled back.
598617
/// Otherwise, the transaction will be committed.

src/database/connection.rs

Lines changed: 48 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
1+
use std::{future::Future, pin::Pin};
2+
3+
use futures_util::Stream;
4+
15
use crate::{
26
DbBackend, DbErr, ExecResult, QueryResult, Statement, StatementBuilder, TransactionError,
37
};
4-
use futures_util::Stream;
5-
use std::{future::Future, pin::Pin};
68

79
/// The generic API for a database connection that can perform query or execute statements.
810
/// It abstracts database connection and transaction
@@ -125,6 +127,43 @@ impl std::fmt::Display for AccessMode {
125127
}
126128
}
127129

130+
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
131+
/// Which kind of transaction to start. Only supported by SQLite.
132+
/// <https://www.sqlite.org/lang_transaction.html>
133+
pub enum SqliteTransactionMode {
134+
/// The default. Transaction starts when the next statement is executed, and
135+
/// will be a read or write transaction depending on that statement.
136+
Deferred,
137+
/// Start a write transaction as soon as the BEGIN statement is received.
138+
Immediate,
139+
/// Start a write transaction as soon as the BEGIN statement is received.
140+
/// When in non-WAL mode, also block all other transactions from reading the
141+
/// database.
142+
Exclusive,
143+
}
144+
145+
impl SqliteTransactionMode {
146+
/// The keyword used to start a transaction in this mode (the word coming after "BEGIN").
147+
pub fn sqlite_keyword(&self) -> &'static str {
148+
match self {
149+
SqliteTransactionMode::Deferred => "DEFERRED",
150+
SqliteTransactionMode::Immediate => "IMMEDIATE",
151+
SqliteTransactionMode::Exclusive => "EXCLUSIVE",
152+
}
153+
}
154+
}
155+
156+
/// Configuration for starting a transaction
157+
#[derive(Copy, Clone, Debug, PartialEq, Eq, Default)]
158+
pub struct TransactionOptions {
159+
/// Isolation level for the new transaction
160+
pub isolation_level: Option<IsolationLevel>,
161+
/// Access mode for the new transaction
162+
pub access_mode: Option<AccessMode>,
163+
/// Transaction mode (deferred, immediate, exclusive) for the new transaction. Supported only by SQLite.
164+
pub sqlite_transaction_mode: Option<SqliteTransactionMode>,
165+
}
166+
128167
/// Spawn database transaction
129168
#[async_trait::async_trait]
130169
pub trait TransactionTrait {
@@ -143,6 +182,13 @@ pub trait TransactionTrait {
143182
access_mode: Option<AccessMode>,
144183
) -> Result<Self::Transaction, DbErr>;
145184

185+
/// Execute SQL `BEGIN` transaction with isolation level and/or access mode.
186+
/// Returns a Transaction that can be committed or rolled back
187+
async fn begin_with_options(
188+
&self,
189+
options: TransactionOptions,
190+
) -> Result<Self::Transaction, DbErr>;
191+
146192
/// Execute the function inside a transaction.
147193
/// If the function returns an error, the transaction will be rolled back. If it does not return an error, the transaction will be committed.
148194
async fn transaction<F, T, E>(&self, callback: F) -> Result<T, TransactionError<E>>

src/database/db_connection.rs

Lines changed: 45 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use crate::{
22
AccessMode, ConnectionTrait, DatabaseTransaction, ExecResult, IsolationLevel, QueryResult,
33
Schema, SchemaBuilder, Statement, StatementBuilder, StreamTrait, TransactionError,
4-
TransactionTrait, error::*,
4+
TransactionOptions, TransactionTrait, error::*,
55
};
66
use std::{fmt::Debug, future::Future, pin::Pin};
77
use tracing::instrument;
@@ -357,9 +357,11 @@ impl TransactionTrait for DatabaseConnection {
357357
conn.begin(None, None).await
358358
}
359359
#[cfg(feature = "sqlx-sqlite")]
360-
DatabaseConnectionType::SqlxSqlitePoolConnection(conn) => conn.begin(None, None).await,
360+
DatabaseConnectionType::SqlxSqlitePoolConnection(conn) => {
361+
conn.begin(None, None, None).await
362+
}
361363
#[cfg(feature = "rusqlite")]
362-
DatabaseConnectionType::RusqliteSharedConnection(conn) => conn.begin(None, None),
364+
DatabaseConnectionType::RusqliteSharedConnection(conn) => conn.begin(None, None, None),
363365
#[cfg(feature = "mock")]
364366
DatabaseConnectionType::MockDatabaseConnection(conn) => {
365367
DatabaseTransaction::new_mock(Arc::clone(conn), None).await
@@ -389,11 +391,50 @@ impl TransactionTrait for DatabaseConnection {
389391
}
390392
#[cfg(feature = "sqlx-sqlite")]
391393
DatabaseConnectionType::SqlxSqlitePoolConnection(conn) => {
394+
conn.begin(_isolation_level, _access_mode, None).await
395+
}
396+
#[cfg(feature = "rusqlite")]
397+
DatabaseConnectionType::RusqliteSharedConnection(conn) => {
398+
conn.begin(_isolation_level, _access_mode, None)
399+
}
400+
#[cfg(feature = "mock")]
401+
DatabaseConnectionType::MockDatabaseConnection(conn) => {
402+
DatabaseTransaction::new_mock(Arc::clone(conn), None).await
403+
}
404+
#[cfg(feature = "proxy")]
405+
DatabaseConnectionType::ProxyDatabaseConnection(conn) => {
406+
DatabaseTransaction::new_proxy(conn.clone(), None).await
407+
}
408+
DatabaseConnectionType::Disconnected => Err(conn_err("Disconnected")),
409+
}
410+
}
411+
412+
#[instrument(level = "trace")]
413+
async fn begin_with_options(
414+
&self,
415+
TransactionOptions {
416+
isolation_level: _isolation_level,
417+
access_mode: _access_mode,
418+
sqlite_transaction_mode: _sqlite_transaction_mode,
419+
}: TransactionOptions,
420+
) -> Result<DatabaseTransaction, DbErr> {
421+
match &self.inner {
422+
#[cfg(feature = "sqlx-mysql")]
423+
DatabaseConnectionType::SqlxMySqlPoolConnection(conn) => {
392424
conn.begin(_isolation_level, _access_mode).await
393425
}
426+
#[cfg(feature = "sqlx-postgres")]
427+
DatabaseConnectionType::SqlxPostgresPoolConnection(conn) => {
428+
conn.begin(_isolation_level, _access_mode).await
429+
}
430+
#[cfg(feature = "sqlx-sqlite")]
431+
DatabaseConnectionType::SqlxSqlitePoolConnection(conn) => {
432+
conn.begin(_isolation_level, _access_mode, _sqlite_transaction_mode)
433+
.await
434+
}
394435
#[cfg(feature = "rusqlite")]
395436
DatabaseConnectionType::RusqliteSharedConnection(conn) => {
396-
conn.begin(_isolation_level, _access_mode)
437+
conn.begin(_isolation_level, _access_mode, _sqlite_transaction_mode)
397438
}
398439
#[cfg(feature = "mock")]
399440
DatabaseConnectionType::MockDatabaseConnection(conn) => {

src/database/executor.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use crate::{
22
AccessMode, ConnectionTrait, DatabaseConnection, DatabaseTransaction, DbBackend, DbErr,
3-
ExecResult, IsolationLevel, QueryResult, Statement, TransactionError, TransactionTrait,
3+
ExecResult, IsolationLevel, QueryResult, Statement, TransactionError, TransactionOptions,
4+
TransactionTrait,
45
};
56
use crate::{Schema, SchemaBuilder};
67
use std::future::Future;
@@ -91,6 +92,16 @@ impl TransactionTrait for DatabaseExecutor<'_> {
9192
}
9293
}
9394

95+
async fn begin_with_options(
96+
&self,
97+
options: TransactionOptions,
98+
) -> Result<DatabaseTransaction, DbErr> {
99+
match self {
100+
DatabaseExecutor::Connection(conn) => conn.begin_with_options(options).await,
101+
DatabaseExecutor::Transaction(trans) => trans.begin_with_options(options).await,
102+
}
103+
}
104+
94105
async fn transaction<F, T, E>(&self, callback: F) -> Result<T, TransactionError<E>>
95106
where
96107
F: for<'c> FnOnce(

src/database/restricted_connection.rs

Lines changed: 33 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,17 @@
1-
use crate::rbac::{
2-
PermissionRequest, RbacEngine, RbacError, RbacPermissionsByResources,
3-
RbacResourcesAndPermissions, RbacRoleHierarchyList, RbacRolesAndRanks, RbacUserRolePermissions,
4-
ResourceRequest,
5-
entity::{role::RoleId, user::UserId},
6-
};
71
use crate::{
82
AccessMode, ConnectionTrait, DatabaseConnection, DatabaseTransaction, DbBackend, DbErr,
93
ExecResult, IsolationLevel, QueryResult, Statement, StatementBuilder, TransactionError,
104
TransactionSession, TransactionTrait,
115
};
6+
use crate::{
7+
TransactionOptions,
8+
rbac::{
9+
PermissionRequest, RbacEngine, RbacError, RbacPermissionsByResources,
10+
RbacResourcesAndPermissions, RbacRoleHierarchyList, RbacRolesAndRanks,
11+
RbacUserRolePermissions, ResourceRequest,
12+
entity::{role::RoleId, user::UserId},
13+
},
14+
};
1215
use std::{
1316
pin::Pin,
1417
sync::{Arc, RwLock},
@@ -236,6 +239,18 @@ impl TransactionTrait for RestrictedConnection {
236239
})
237240
}
238241

242+
#[instrument(level = "trace")]
243+
async fn begin_with_options(
244+
&self,
245+
options: TransactionOptions,
246+
) -> Result<RestrictedTransaction, DbErr> {
247+
Ok(RestrictedTransaction {
248+
user_id: self.user_id,
249+
conn: self.conn.begin_with_options(options).await?,
250+
rbac: self.conn.rbac.clone(),
251+
})
252+
}
253+
239254
/// Execute the function inside a transaction.
240255
/// If the function returns an error, the transaction will be rolled back. If it does not return an error, the transaction will be committed.
241256
#[instrument(level = "trace", skip(callback))]
@@ -306,6 +321,18 @@ impl TransactionTrait for RestrictedTransaction {
306321
})
307322
}
308323

324+
#[instrument(level = "trace")]
325+
async fn begin_with_options(
326+
&self,
327+
options: TransactionOptions,
328+
) -> Result<RestrictedTransaction, DbErr> {
329+
Ok(RestrictedTransaction {
330+
user_id: self.user_id,
331+
conn: self.conn.begin_with_options(options).await?,
332+
rbac: self.rbac.clone(),
333+
})
334+
}
335+
309336
/// Execute the function inside a transaction.
310337
/// If the function returns an error, the transaction will be rolled back. If it does not return an error, the transaction will be committed.
311338
#[instrument(level = "trace", skip(callback))]

src/database/transaction.rs

Lines changed: 33 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,18 @@
11
#![allow(unused_assignments)]
2+
use std::{future::Future, pin::Pin, sync::Arc};
3+
4+
use futures_util::lock::Mutex;
5+
#[cfg(feature = "sqlx-dep")]
6+
use sqlx::TransactionManager;
7+
use tracing::instrument;
8+
29
use crate::{
310
AccessMode, ConnectionTrait, DbBackend, DbErr, ExecResult, InnerConnection, IsolationLevel,
4-
QueryResult, Statement, StreamTrait, TransactionSession, TransactionStream, TransactionTrait,
5-
debug_print, error::*,
11+
QueryResult, SqliteTransactionMode, Statement, StreamTrait, TransactionOptions,
12+
TransactionSession, TransactionStream, TransactionTrait, debug_print, error::*,
613
};
714
#[cfg(feature = "sqlx-dep")]
815
use crate::{sqlx_error_to_exec_err, sqlx_error_to_query_err};
9-
use futures_util::lock::Mutex;
10-
#[cfg(feature = "sqlx-dep")]
11-
use sqlx::TransactionManager;
12-
use std::{future::Future, pin::Pin, sync::Arc};
13-
use tracing::instrument;
1416

1517
/// Defines a database transaction, whether it is an open transaction and the type of
1618
/// backend to use.
@@ -37,6 +39,7 @@ impl DatabaseTransaction {
3739
metric_callback: Option<crate::metric::Callback>,
3840
isolation_level: Option<IsolationLevel>,
3941
access_mode: Option<AccessMode>,
42+
sqlite_transaction_mode: Option<SqliteTransactionMode>,
4043
) -> Result<DatabaseTransaction, DbErr> {
4144
let res = DatabaseTransaction {
4245
conn,
@@ -92,7 +95,11 @@ impl DatabaseTransaction {
9295
access_mode,
9396
)
9497
.await?;
95-
<sqlx::Sqlite as sqlx::Database>::TransactionManager::begin(c, None)
98+
// TODO using this for beginning a nested transaction currently causes an error. Should we make it a warning instead?
99+
let statement = sqlite_transaction_mode.map(|mode| {
100+
std::borrow::Cow::from(format!("BEGIN {}", mode.sqlite_keyword()))
101+
});
102+
<sqlx::Sqlite as sqlx::Database>::TransactionManager::begin(c, statement)
96103
.await
97104
.map_err(sqlx_error_to_query_err)
98105
}
@@ -605,6 +612,7 @@ impl TransactionTrait for DatabaseTransaction {
605612
self.metric_callback.clone(),
606613
None,
607614
None,
615+
None,
608616
)
609617
.await
610618
}
@@ -621,6 +629,23 @@ impl TransactionTrait for DatabaseTransaction {
621629
self.metric_callback.clone(),
622630
isolation_level,
623631
access_mode,
632+
None,
633+
)
634+
.await
635+
}
636+
637+
#[instrument(level = "trace")]
638+
async fn begin_with_options(
639+
&self,
640+
options: TransactionOptions,
641+
) -> Result<DatabaseTransaction, DbErr> {
642+
DatabaseTransaction::begin(
643+
Arc::clone(&self.conn),
644+
self.backend,
645+
self.metric_callback.clone(),
646+
options.isolation_level,
647+
options.access_mode,
648+
options.sqlite_transaction_mode,
624649
)
625650
.await
626651
}

0 commit comments

Comments
 (0)