Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions sea-orm-sync/src/database/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,43 @@ impl std::fmt::Display for AccessMode {
}
}

#[derive(Copy, Clone, Debug, PartialEq, Eq)]
/// Which kind of transaction to start. Only supported by SQLite.
/// <https://www.sqlite.org/lang_transaction.html>
pub enum SqliteTransactionMode {
/// The default. Transaction starts when the next statement is executed, and
/// will be a read or write transaction depending on that statement.
Deferred,
/// Start a write transaction as soon as the BEGIN statement is received.
Immediate,
/// Start a write transaction as soon as the BEGIN statement is received.
/// When in non-WAL mode, also block all other transactions from reading the
/// database.
Exclusive,
}

impl SqliteTransactionMode {
/// The keyword used to start a transaction in this mode (the word coming after "BEGIN").
pub fn sqlite_keyword(&self) -> &'static str {
match self {
SqliteTransactionMode::Deferred => "DEFERRED",
SqliteTransactionMode::Immediate => "IMMEDIATE",
SqliteTransactionMode::Exclusive => "EXCLUSIVE",
}
}
}

/// Configuration for starting a transaction
#[derive(Copy, Clone, Debug, PartialEq, Eq, Default)]
pub struct TransactionOptions {
/// Isolation level for the new transaction
pub isolation_level: Option<IsolationLevel>,
/// Access mode for the new transaction
pub access_mode: Option<AccessMode>,
/// Transaction mode (deferred, immediate, exclusive) for the new transaction. Supported only by SQLite.
pub sqlite_transaction_mode: Option<SqliteTransactionMode>,
}

/// Spawn database transaction
pub trait TransactionTrait {
/// The concrete type for the transaction
Expand All @@ -133,6 +170,10 @@ pub trait TransactionTrait {
access_mode: Option<AccessMode>,
) -> Result<Self::Transaction, DbErr>;

/// Execute SQL `BEGIN` transaction with isolation level and/or access mode.
/// Returns a Transaction that can be committed or rolled back
fn begin_with_options(&self, options: TransactionOptions) -> Result<Self::Transaction, DbErr>;

/// Execute the function inside a transaction.
/// If the function returns an error, the transaction will be rolled back. If it does not return an error, the transaction will be committed.
fn transaction<F, T, E>(&self, callback: F) -> Result<T, TransactionError<E>>
Expand Down
42 changes: 40 additions & 2 deletions sea-orm-sync/src/database/db_connection.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::{
AccessMode, ConnectionTrait, DatabaseTransaction, ExecResult, IsolationLevel, QueryResult,
Schema, SchemaBuilder, Statement, StatementBuilder, StreamTrait, TransactionError,
TransactionTrait, error::*,
TransactionOptions, TransactionTrait, error::*,
};
use std::fmt::Debug;
use tracing::instrument;
Expand Down Expand Up @@ -331,7 +331,7 @@ impl TransactionTrait for DatabaseConnection {
#[cfg(feature = "sqlx-sqlite")]
DatabaseConnectionType::SqlxSqlitePoolConnection(conn) => conn.begin(None, None),
#[cfg(feature = "rusqlite")]
DatabaseConnectionType::RusqliteSharedConnection(conn) => conn.begin(None, None),
DatabaseConnectionType::RusqliteSharedConnection(conn) => conn.begin(None, None, None),
#[cfg(feature = "mock")]
DatabaseConnectionType::MockDatabaseConnection(conn) => {
DatabaseTransaction::new_mock(Arc::clone(conn), None)
Expand Down Expand Up @@ -365,8 +365,46 @@ impl TransactionTrait for DatabaseConnection {
}
#[cfg(feature = "rusqlite")]
DatabaseConnectionType::RusqliteSharedConnection(conn) => {
conn.begin(_isolation_level, _access_mode, None)
}
#[cfg(feature = "mock")]
DatabaseConnectionType::MockDatabaseConnection(conn) => {
DatabaseTransaction::new_mock(Arc::clone(conn), None)
}
#[cfg(feature = "proxy")]
DatabaseConnectionType::ProxyDatabaseConnection(conn) => {
DatabaseTransaction::new_proxy(conn.clone(), None)
}
DatabaseConnectionType::Disconnected => Err(conn_err("Disconnected")),
}
}

#[instrument(level = "trace")]
fn begin_with_options(
&self,
TransactionOptions {
isolation_level: _isolation_level,
access_mode: _access_mode,
sqlite_transaction_mode: _sqlite_transaction_mode,
}: TransactionOptions,
) -> Result<DatabaseTransaction, DbErr> {
match &self.inner {
#[cfg(feature = "sqlx-mysql")]
DatabaseConnectionType::SqlxMySqlPoolConnection(conn) => {
conn.begin(_isolation_level, _access_mode)
}
#[cfg(feature = "sqlx-postgres")]
DatabaseConnectionType::SqlxPostgresPoolConnection(conn) => {
conn.begin(_isolation_level, _access_mode)
}
#[cfg(feature = "sqlx-sqlite")]
DatabaseConnectionType::SqlxSqlitePoolConnection(conn) => {
conn.begin(_isolation_level, _access_mode, _sqlite_transaction_mode)
}
#[cfg(feature = "rusqlite")]
DatabaseConnectionType::RusqliteSharedConnection(conn) => {
conn.begin(_isolation_level, _access_mode, _sqlite_transaction_mode)
}
#[cfg(feature = "mock")]
DatabaseConnectionType::MockDatabaseConnection(conn) => {
DatabaseTransaction::new_mock(Arc::clone(conn), None)
Expand Down
13 changes: 12 additions & 1 deletion sea-orm-sync/src/database/executor.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::{
AccessMode, ConnectionTrait, DatabaseConnection, DatabaseTransaction, DbBackend, DbErr,
ExecResult, IsolationLevel, QueryResult, Statement, TransactionError, TransactionTrait,
ExecResult, IsolationLevel, QueryResult, Statement, TransactionError, TransactionOptions,
TransactionTrait,
};
use crate::{Schema, SchemaBuilder};
use std::future::Future;
Expand Down Expand Up @@ -89,6 +90,16 @@ impl TransactionTrait for DatabaseExecutor<'_> {
}
}

fn begin_with_options(
&self,
options: TransactionOptions,
) -> Result<DatabaseTransaction, DbErr> {
match self {
DatabaseExecutor::Connection(conn) => conn.begin_with_options(options),
DatabaseExecutor::Transaction(trans) => trans.begin_with_options(options),
}
}

fn transaction<F, T, E>(&self, callback: F) -> Result<T, TransactionError<E>>
where
F: for<'c> FnOnce(&'c DatabaseTransaction) -> Result<T, E>,
Expand Down
26 changes: 25 additions & 1 deletion sea-orm-sync/src/database/restricted_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::rbac::{
use crate::{
AccessMode, ConnectionTrait, DatabaseConnection, DatabaseTransaction, DbBackend, DbErr,
ExecResult, IsolationLevel, QueryResult, Statement, StatementBuilder, TransactionError,
TransactionSession, TransactionTrait,
TransactionOptions, TransactionSession, TransactionTrait,
};
use std::{
pin::Pin,
Expand Down Expand Up @@ -230,6 +230,18 @@ impl TransactionTrait for RestrictedConnection {
})
}

#[instrument(level = "trace")]
fn begin_with_options(
&self,
options: TransactionOptions,
) -> Result<RestrictedTransaction, DbErr> {
Ok(RestrictedTransaction {
user_id: self.user_id,
conn: self.conn.begin_with_options(options)?,
rbac: self.conn.rbac.clone(),
})
}

/// Execute the function inside a transaction.
/// If the function returns an error, the transaction will be rolled back. If it does not return an error, the transaction will be committed.
#[instrument(level = "trace", skip(callback))]
Expand Down Expand Up @@ -287,6 +299,18 @@ impl TransactionTrait for RestrictedTransaction {
})
}

#[instrument(level = "trace")]
fn begin_with_options(
&self,
options: TransactionOptions,
) -> Result<RestrictedTransaction, DbErr> {
Ok(RestrictedTransaction {
user_id: self.user_id,
conn: self.conn.begin_with_options(options)?,
rbac: self.rbac.clone(),
})
}

/// Execute the function inside a transaction.
/// If the function returns an error, the transaction will be rolled back. If it does not return an error, the transaction will be committed.
#[instrument(level = "trace", skip(callback))]
Expand Down
34 changes: 31 additions & 3 deletions sea-orm-sync/src/database/transaction.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
#![allow(unused_assignments)]
use crate::SqliteTransactionMode;
use crate::{
AccessMode, ConnectionTrait, DbBackend, DbErr, ExecResult, InnerConnection, IsolationLevel,
QueryResult, Statement, StreamTrait, TransactionSession, TransactionStream, TransactionTrait,
debug_print, error::*,
QueryResult, Statement, StreamTrait, TransactionOptions, TransactionSession, TransactionStream,
TransactionTrait, debug_print, error::*,
};
#[cfg(feature = "sqlx-dep")]
use crate::{sqlx_error_to_exec_err, sqlx_error_to_query_err};
Expand Down Expand Up @@ -37,6 +38,7 @@ impl DatabaseTransaction {
metric_callback: Option<crate::metric::Callback>,
isolation_level: Option<IsolationLevel>,
access_mode: Option<AccessMode>,
sqlite_transaction_mode: Option<SqliteTransactionMode>,
) -> Result<DatabaseTransaction, DbErr> {
let res = DatabaseTransaction {
conn,
Expand Down Expand Up @@ -87,7 +89,12 @@ impl DatabaseTransaction {
isolation_level,
access_mode,
)?;
<sqlx::Sqlite as sqlx::Database>::TransactionManager::begin(c, None)
// TODO using this for beginning a nested transaction currently causes an error. Should we make it a warning instead?
let statement = sqlite_transaction_mode.map(|mode| {
std::borrow::Cow::from(format!("BEGIN {}", mode.sqlite_keyword()))
});
<sqlx::Sqlite as sqlx::Database>::TransactionManager::begin(c, statement)
.await
.map_err(sqlx_error_to_query_err)
}
#[cfg(feature = "rusqlite")]
Expand Down Expand Up @@ -575,6 +582,7 @@ impl TransactionTrait for DatabaseTransaction {
self.metric_callback.clone(),
None,
None,
None,
)
}

Expand All @@ -590,6 +598,26 @@ impl TransactionTrait for DatabaseTransaction {
self.metric_callback.clone(),
isolation_level,
access_mode,
None,
)
}

#[instrument(level = "trace")]
fn begin_with_options(
&self,
TransactionOptions {
isolation_level,
access_mode,
sqlite_transaction_mode,
}: TransactionOptions,
) -> Result<DatabaseTransaction, DbErr> {
DatabaseTransaction::begin(
Arc::clone(&self.conn),
self.backend,
self.metric_callback.clone(),
isolation_level,
access_mode,
sqlite_transaction_mode,
)
}

Expand Down
1 change: 1 addition & 0 deletions sea-orm-sync/src/driver/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ impl crate::DatabaseTransaction {
metric_callback,
None,
None,
None,
)
}
}
8 changes: 5 additions & 3 deletions sea-orm-sync/src/driver/rusqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ use sea_query_rusqlite::{RusqliteValue, RusqliteValues, rusqlite};

use crate::{
AccessMode, ColIdx, ConnectOptions, DatabaseConnection, DatabaseConnectionType,
DatabaseTransaction, InnerConnection, IsolationLevel, QueryStream, Statement, TransactionError,
error::*, executor::*,
DatabaseTransaction, InnerConnection, IsolationLevel, QueryStream, SqliteTransactionMode,
Statement, TransactionError, error::*, executor::*,
};

/// A helper class to connect to Rusqlite
Expand Down Expand Up @@ -333,6 +333,7 @@ impl RusqliteSharedConnection {
&self,
isolation_level: Option<IsolationLevel>,
access_mode: Option<AccessMode>,
sqlite_transaction_mode: Option<SqliteTransactionMode>,
) -> Result<DatabaseTransaction, DbErr> {
let conn = self.loan()?;
DatabaseTransaction::begin(
Expand All @@ -341,6 +342,7 @@ impl RusqliteSharedConnection {
self.metric_callback.clone(),
isolation_level,
access_mode,
sqlite_transaction_mode,
)
}

Expand All @@ -356,7 +358,7 @@ impl RusqliteSharedConnection {
F: for<'b> FnOnce(&'b DatabaseTransaction) -> Result<T, E>,
E: std::fmt::Display + std::fmt::Debug,
{
self.begin(isolation_level, access_mode)
self.begin(isolation_level, access_mode, None)
.map_err(|e| TransactionError::Connection(e))?
.run(callback)
}
Expand Down
50 changes: 48 additions & 2 deletions src/database/connection.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use std::{future::Future, pin::Pin};

use futures_util::Stream;

use crate::{
DbBackend, DbErr, ExecResult, QueryResult, Statement, StatementBuilder, TransactionError,
};
use futures_util::Stream;
use std::{future::Future, pin::Pin};

/// The generic API for a database connection that can perform query or execute statements.
/// It abstracts database connection and transaction
Expand Down Expand Up @@ -125,6 +127,43 @@ impl std::fmt::Display for AccessMode {
}
}

#[derive(Copy, Clone, Debug, PartialEq, Eq)]
/// Which kind of transaction to start. Only supported by SQLite.
/// <https://www.sqlite.org/lang_transaction.html>
pub enum SqliteTransactionMode {
/// The default. Transaction starts when the next statement is executed, and
/// will be a read or write transaction depending on that statement.
Deferred,
/// Start a write transaction as soon as the BEGIN statement is received.
Immediate,
/// Start a write transaction as soon as the BEGIN statement is received.
/// When in non-WAL mode, also block all other transactions from reading the
/// database.
Exclusive,
}

impl SqliteTransactionMode {
/// The keyword used to start a transaction in this mode (the word coming after "BEGIN").
pub fn sqlite_keyword(&self) -> &'static str {
match self {
SqliteTransactionMode::Deferred => "DEFERRED",
SqliteTransactionMode::Immediate => "IMMEDIATE",
SqliteTransactionMode::Exclusive => "EXCLUSIVE",
}
}
}

/// Configuration for starting a transaction
#[derive(Copy, Clone, Debug, PartialEq, Eq, Default)]
pub struct TransactionOptions {
/// Isolation level for the new transaction
pub isolation_level: Option<IsolationLevel>,
/// Access mode for the new transaction
pub access_mode: Option<AccessMode>,
/// Transaction mode (deferred, immediate, exclusive) for the new transaction. Supported only by SQLite.
pub sqlite_transaction_mode: Option<SqliteTransactionMode>,
}

/// Spawn database transaction
#[async_trait::async_trait]
pub trait TransactionTrait {
Expand All @@ -143,6 +182,13 @@ pub trait TransactionTrait {
access_mode: Option<AccessMode>,
) -> Result<Self::Transaction, DbErr>;

/// Execute SQL `BEGIN` transaction with isolation level and/or access mode.
/// Returns a Transaction that can be committed or rolled back
async fn begin_with_options(
&self,
options: TransactionOptions,
) -> Result<Self::Transaction, DbErr>;

/// Execute the function inside a transaction.
/// If the function returns an error, the transaction will be rolled back. If it does not return an error, the transaction will be committed.
async fn transaction<F, T, E>(&self, callback: F) -> Result<T, TransactionError<E>>
Expand Down
Loading
Loading