diesel/connection/
transaction_manager.rs

1use crate::connection::Connection;
2use crate::result::{Error, QueryResult};
3use std::borrow::Cow;
4use std::num::NonZeroU32;
5
6/// Manages the internal transaction state for a connection.
7///
8/// You will not need to interact with this trait, unless you are writing an
9/// implementation of [`Connection`].
10pub trait TransactionManager<Conn: Connection> {
11    /// Data stored as part of the connection implementation
12    /// to track the current transaction state of a connection
13    type TransactionStateData;
14
15    /// Begin a new transaction or savepoint
16    ///
17    /// If the transaction depth is greater than 0,
18    /// this should create a savepoint instead.
19    /// This function is expected to increment the transaction depth by 1.
20    fn begin_transaction(conn: &mut Conn) -> QueryResult<()>;
21
22    /// Rollback the inner-most transaction or savepoint
23    ///
24    /// If the transaction depth is greater than 1,
25    /// this should rollback to the most recent savepoint.
26    /// This function is expected to decrement the transaction depth by 1.
27    fn rollback_transaction(conn: &mut Conn) -> QueryResult<()>;
28
29    /// Commit the inner-most transaction or savepoint
30    ///
31    /// If the transaction depth is greater than 1,
32    /// this should release the most recent savepoint.
33    /// This function is expected to decrement the transaction depth by 1.
34    fn commit_transaction(conn: &mut Conn) -> QueryResult<()>;
35
36    /// Fetch the current transaction status as mutable
37    ///
38    /// Used to ensure that `begin_test_transaction` is not called when already
39    /// inside of a transaction, and that operations are not run in a `InError`
40    /// transaction manager.
41    #[diesel_derives::__diesel_public_if(
42        feature = "i-implement-a-third-party-backend-and-opt-into-breaking-changes"
43    )]
44    fn transaction_manager_status_mut(conn: &mut Conn) -> &mut TransactionManagerStatus;
45
46    /// Executes the given function inside of a database transaction
47    ///
48    /// Each implementation of this function needs to fulfill the documented
49    /// behaviour of [`Connection::transaction`]
50    fn transaction<F, R, E>(conn: &mut Conn, callback: F) -> Result<R, E>
51    where
52        F: FnOnce(&mut Conn) -> Result<R, E>,
53        E: From<Error>,
54    {
55        Self::begin_transaction(conn)?;
56        match callback(&mut *conn) {
57            Ok(value) => {
58                Self::commit_transaction(conn)?;
59                Ok(value)
60            }
61            Err(user_error) => match Self::rollback_transaction(conn) {
62                Ok(()) => Err(user_error),
63                Err(Error::BrokenTransactionManager) => {
64                    // In this case we are probably more interested by the
65                    // original error, which likely caused this
66                    Err(user_error)
67                }
68                Err(rollback_error) => Err(rollback_error.into()),
69            },
70        }
71    }
72
73    /// This methods checks if the connection manager is considered to be broken
74    /// by connection pool implementations
75    ///
76    /// A connection manager is considered to be broken by default if it either
77    /// contains an open transaction (because you don't want to have connections
78    /// with open transactions in your pool) or when the transaction manager is
79    /// in an error state.
80    #[diesel_derives::__diesel_public_if(
81        feature = "i-implement-a-third-party-backend-and-opt-into-breaking-changes"
82    )]
83    fn is_broken_transaction_manager(conn: &mut Conn) -> bool {
84        match Self::transaction_manager_status_mut(conn).transaction_state() {
85            // all transactions are closed
86            // so we don't consider this connection broken
87            Ok(ValidTransactionManagerStatus {
88                in_transaction: None,
89            }) => false,
90            // The transaction manager is in an error state
91            // Therefore we consider this connection broken
92            Err(_) => true,
93            // The transaction manager contains a open transaction
94            // we do consider this connection broken
95            // if that transaction was not opened by `begin_test_transaction`
96            Ok(ValidTransactionManagerStatus {
97                in_transaction: Some(s),
98            }) => !s.test_transaction,
99        }
100    }
101}
102
103/// An implementation of `TransactionManager` which can be used for backends
104/// which use ANSI standard syntax for savepoints such as SQLite and PostgreSQL.
105#[derive(#[automatically_derived]
impl ::core::default::Default for AnsiTransactionManager {
    #[inline]
    fn default() -> AnsiTransactionManager {
        AnsiTransactionManager { status: ::core::default::Default::default() }
    }
}Default, #[automatically_derived]
impl ::core::fmt::Debug for AnsiTransactionManager {
    #[inline]
    fn fmt(&self, f: &mut ::core::fmt::Formatter) -> ::core::fmt::Result {
        ::core::fmt::Formatter::debug_struct_field1_finish(f,
            "AnsiTransactionManager", "status", &&self.status)
    }
}Debug)]
106pub struct AnsiTransactionManager {
107    pub(crate) status: TransactionManagerStatus,
108}
109
110/// Status of the transaction manager
111#[doc = " Status of the transaction manager"]
pub enum TransactionManagerStatus {

    /// Valid status, the manager can run operations
    Valid(ValidTransactionManagerStatus),

    /// Error status, probably following a broken connection. The manager will no longer run operations
    InError,
}#[diesel_derives::__diesel_public_if(
112    feature = "i-implement-a-third-party-backend-and-opt-into-breaking-changes"
113)]
114#[derive(#[automatically_derived]
impl ::core::fmt::Debug for TransactionManagerStatus {
    #[inline]
    fn fmt(&self, f: &mut ::core::fmt::Formatter) -> ::core::fmt::Result {
        match self {
            TransactionManagerStatus::Valid(__self_0) =>
                ::core::fmt::Formatter::debug_tuple_field1_finish(f, "Valid",
                    &__self_0),
            TransactionManagerStatus::InError =>
                ::core::fmt::Formatter::write_str(f, "InError"),
        }
    }
}Debug)]
115pub enum TransactionManagerStatus {
116    /// Valid status, the manager can run operations
117    Valid(ValidTransactionManagerStatus),
118    /// Error status, probably following a broken connection. The manager will no longer run operations
119    InError,
120}
121
122impl Default for TransactionManagerStatus {
123    fn default() -> Self {
124        TransactionManagerStatus::Valid(ValidTransactionManagerStatus::default())
125    }
126}
127
128impl TransactionManagerStatus {
129    /// Returns the transaction depth if the transaction manager's status is valid, or returns
130    /// [`Error::BrokenTransactionManager`] if the transaction manager is in error.
131    pub fn transaction_depth(&self) -> QueryResult<Option<NonZeroU32>> {
132        match self {
133            TransactionManagerStatus::Valid(valid_status) => Ok(valid_status.transaction_depth()),
134            TransactionManagerStatus::InError => Err(Error::BrokenTransactionManager),
135        }
136    }
137
138    #[cfg(any(
139        feature = "i-implement-a-third-party-backend-and-opt-into-breaking-changes",
140        feature = "postgres",
141        feature = "mysql",
142        test
143    ))]
144    #[diesel_derives::__diesel_public_if(
145        feature = "i-implement-a-third-party-backend-and-opt-into-breaking-changes"
146    )]
147    /// If in transaction and transaction manager is not broken, registers that it's possible that
148    /// the connection can not be used anymore until top-level transaction is rolled back.
149    ///
150    /// If that is registered, savepoints rollbacks will still be attempted, but failure to do so
151    /// will not result in an error. (Some may succeed, some may not.)
152    pub(crate) fn set_requires_rollback_maybe_up_to_top_level(&mut self, to: bool) {
153        if let TransactionManagerStatus::Valid(ValidTransactionManagerStatus {
154            in_transaction:
155                Some(InTransactionStatus {
156                    requires_rollback_maybe_up_to_top_level,
157                    ..
158                }),
159        }) = self
160        {
161            *requires_rollback_maybe_up_to_top_level = to;
162        }
163    }
164
165    /// Sets the transaction manager status to InError
166    ///
167    /// Subsequent attempts to use transaction-related features will result in a
168    /// [`Error::BrokenTransactionManager`] error
169    pub fn set_in_error(&mut self) {
170        *self = TransactionManagerStatus::InError
171    }
172
173    /// Expose access to the inner transaction state
174    ///
175    /// This function returns an error if the Transaction manager is in a broken
176    /// state
177    #[diesel_derives::__diesel_public_if(
178        feature = "i-implement-a-third-party-backend-and-opt-into-breaking-changes"
179    )]
180    pub(self) fn transaction_state(&mut self) -> QueryResult<&mut ValidTransactionManagerStatus> {
181        match self {
182            TransactionManagerStatus::Valid(valid_status) => Ok(valid_status),
183            TransactionManagerStatus::InError => Err(Error::BrokenTransactionManager),
184        }
185    }
186
187    /// This function allows to flag a transaction manager
188    /// in such a way that it contains a test transaction.
189    ///
190    /// This will disable some checks in regards to open transactions
191    /// to allow `Connection::begin_test_transaction` to work with
192    /// pooled connections as well
193    #[diesel_derives::__diesel_public_if(
194        feature = "i-implement-a-third-party-backend-and-opt-into-breaking-changes"
195    )]
196    pub(crate) fn set_test_transaction_flag(&mut self) {
197        if let TransactionManagerStatus::Valid(ValidTransactionManagerStatus {
198            in_transaction: Some(s),
199        }) = self
200        {
201            s.test_transaction = true;
202        }
203    }
204}
205
206/// Valid transaction status for the manager. Can return the current transaction depth
207#[allow(missing_copy_implementations)]
208#[derive(#[automatically_derived]
#[allow(missing_copy_implementations)]
impl ::core::fmt::Debug for ValidTransactionManagerStatus {
    #[inline]
    fn fmt(&self, f: &mut ::core::fmt::Formatter) -> ::core::fmt::Result {
        ::core::fmt::Formatter::debug_struct_field1_finish(f,
            "ValidTransactionManagerStatus", "in_transaction",
            &&self.in_transaction)
    }
}Debug, #[automatically_derived]
#[allow(missing_copy_implementations)]
impl ::core::default::Default for ValidTransactionManagerStatus {
    #[inline]
    fn default() -> ValidTransactionManagerStatus {
        ValidTransactionManagerStatus {
            in_transaction: ::core::default::Default::default(),
        }
    }
}Default)]
209#[doc =
" Valid transaction status for the manager. Can return the current transaction depth"]
#[allow(missing_copy_implementations)]
#[non_exhaustive]
pub struct ValidTransactionManagerStatus {
    #[doc = " Inner status, or `None` if no transaction is running"]
    pub in_transaction: Option<InTransactionStatus>,
}#[diesel_derives::__diesel_public_if(
210    feature = "i-implement-a-third-party-backend-and-opt-into-breaking-changes",
211    public_fields(in_transaction)
212)]
213pub struct ValidTransactionManagerStatus {
214    /// Inner status, or `None` if no transaction is running
215    in_transaction: Option<InTransactionStatus>,
216}
217
218/// Various status fields to track the status of
219/// a transaction manager with a started transaction
220#[allow(missing_copy_implementations)]
221#[derive(#[automatically_derived]
#[allow(missing_copy_implementations)]
impl ::core::fmt::Debug for InTransactionStatus {
    #[inline]
    fn fmt(&self, f: &mut ::core::fmt::Formatter) -> ::core::fmt::Result {
        ::core::fmt::Formatter::debug_struct_field3_finish(f,
            "InTransactionStatus", "transaction_depth",
            &self.transaction_depth,
            "requires_rollback_maybe_up_to_top_level",
            &self.requires_rollback_maybe_up_to_top_level, "test_transaction",
            &&self.test_transaction)
    }
}Debug)]
222#[doc = " Various status fields to track the status of"]
#[doc = " a transaction manager with a started transaction"]
#[allow(missing_copy_implementations)]
#[non_exhaustive]
pub struct InTransactionStatus {
    #[doc = " The current depth of nested transactions"]
    pub transaction_depth: NonZeroU32,
    #[doc =
    " If that is registered, savepoints rollbacks will still be attempted, but failure to do so"]
    #[doc = " will not result in an error. (Some may succeed, some may not.)"]
    pub requires_rollback_maybe_up_to_top_level: bool,
    #[doc = " Is this transaction manager status marked as test-transaction?"]
    pub test_transaction: bool,
}#[diesel_derives::__diesel_public_if(
223    feature = "i-implement-a-third-party-backend-and-opt-into-breaking-changes",
224    public_fields(
225        test_transaction,
226        transaction_depth,
227        requires_rollback_maybe_up_to_top_level
228    )
229)]
230pub struct InTransactionStatus {
231    /// The current depth of nested transactions
232    transaction_depth: NonZeroU32,
233    /// If that is registered, savepoints rollbacks will still be attempted, but failure to do so
234    /// will not result in an error. (Some may succeed, some may not.)
235    requires_rollback_maybe_up_to_top_level: bool,
236    /// Is this transaction manager status marked as test-transaction?
237    test_transaction: bool,
238}
239
240impl ValidTransactionManagerStatus {
241    /// Return the current transaction depth
242    ///
243    /// This value is `None` if no current transaction is running
244    /// otherwise the number of nested transactions is returned.
245    pub fn transaction_depth(&self) -> Option<NonZeroU32> {
246        self.in_transaction.as_ref().map(|it| it.transaction_depth)
247    }
248
249    /// Update the transaction depth by adding the value of the `transaction_depth_change` parameter if the `query` is
250    /// `Ok(())`
251    pub fn change_transaction_depth(
252        &mut self,
253        transaction_depth_change: TransactionDepthChange,
254    ) -> QueryResult<()> {
255        match (&mut self.in_transaction, transaction_depth_change) {
256            (Some(in_transaction), TransactionDepthChange::IncreaseDepth) => {
257                // Can be replaced with saturating_add directly on NonZeroU32 once
258                // <https://github.com/rust-lang/rust/issues/84186> is stable
259                in_transaction.transaction_depth =
260                    NonZeroU32::new(in_transaction.transaction_depth.get().saturating_add(1))
261                        .expect("nz + nz is always non-zero");
262                Ok(())
263            }
264            (Some(in_transaction), TransactionDepthChange::DecreaseDepth) => {
265                // This sets `transaction_depth` to `None` as soon as we reach zero
266                match NonZeroU32::new(in_transaction.transaction_depth.get() - 1) {
267                    Some(depth) => in_transaction.transaction_depth = depth,
268                    None => self.in_transaction = None,
269                }
270                Ok(())
271            }
272            (None, TransactionDepthChange::IncreaseDepth) => {
273                self.in_transaction = Some(InTransactionStatus {
274                    transaction_depth: NonZeroU32::new(1).expect("1 is non-zero"),
275                    requires_rollback_maybe_up_to_top_level: false,
276                    test_transaction: false,
277                });
278                Ok(())
279            }
280            (None, TransactionDepthChange::DecreaseDepth) => {
281                // We screwed up something somewhere
282                // we cannot decrease the transaction count if
283                // we are not inside a transaction
284                Err(Error::NotInTransaction)
285            }
286        }
287    }
288}
289
/// Represents a change to apply to the depth of a transaction
#[derive(Debug, Clone, Copy)]
pub enum TransactionDepthChange {
    /// Increase the depth of the transaction (corresponds to `BEGIN` or `SAVEPOINT`)
    IncreaseDepth,
    /// Decreases the depth of the transaction (corresponds to `COMMIT`/`RELEASE SAVEPOINT` or `ROLLBACK`)
    DecreaseDepth,
}
298
299impl AnsiTransactionManager {
300    fn get_transaction_state<Conn>(
301        conn: &mut Conn,
302    ) -> QueryResult<&mut ValidTransactionManagerStatus>
303    where
304        Conn: Connection<TransactionManager = Self>,
305    {
306        conn.transaction_state().status.transaction_state()
307    }
308
309    /// Begin a transaction with custom SQL
310    ///
311    /// This is used by connections to implement more complex transaction APIs
312    /// to set things such as isolation levels.
313    /// Returns an error if already inside of a transaction.
314    pub fn begin_transaction_sql<Conn>(conn: &mut Conn, sql: &str) -> QueryResult<()>
315    where
316        Conn: Connection<TransactionManager = Self>,
317    {
318        let state = Self::get_transaction_state(conn)?;
319        if let Some(_depth) = state.transaction_depth() {
320            return Err(Error::AlreadyInTransaction);
321        }
322        let instrumentation_depth = NonZeroU32::new(1);
323        // Keep remainder of this method in sync with `begin_transaction()`.
324
325        conn.instrumentation().on_connection_event(
326            super::instrumentation::InstrumentationEvent::BeginTransaction {
327                depth: instrumentation_depth.expect("We know that 1 is not zero"),
328            },
329        );
330        conn.batch_execute(sql)?;
331        Self::get_transaction_state(conn)?
332            .change_transaction_depth(TransactionDepthChange::IncreaseDepth)?;
333
334        Ok(())
335    }
336}
337
338impl<Conn> TransactionManager<Conn> for AnsiTransactionManager
339where
340    Conn: Connection<TransactionManager = Self>,
341{
342    type TransactionStateData = Self;
343
344    fn begin_transaction(conn: &mut Conn) -> QueryResult<()> {
345        let transaction_state = Self::get_transaction_state(conn)?;
346        let transaction_depth = transaction_state.transaction_depth();
347        let start_transaction_sql = match transaction_depth {
348            None => Cow::from("BEGIN"),
349            Some(transaction_depth) => {
350                Cow::from(::alloc::__export::must_use({
        ::alloc::fmt::format(format_args!("SAVEPOINT diesel_savepoint_{0}",
                transaction_depth))
    })format!("SAVEPOINT diesel_savepoint_{transaction_depth}"))
351            }
352        };
353        let instrumentation_depth =
354            NonZeroU32::new(transaction_depth.map_or(0, NonZeroU32::get).wrapping_add(1));
355        let sql = &start_transaction_sql;
356        // Keep remainder of this method in sync with `begin_transaction_sql()`.
357
358        conn.instrumentation().on_connection_event(
359            super::instrumentation::InstrumentationEvent::BeginTransaction {
360                depth: instrumentation_depth.expect("Transaction depth is too large"),
361            },
362        );
363        conn.batch_execute(sql)?;
364        Self::get_transaction_state(conn)?
365            .change_transaction_depth(TransactionDepthChange::IncreaseDepth)?;
366
367        Ok(())
368    }
369
370    fn rollback_transaction(conn: &mut Conn) -> QueryResult<()> {
371        let transaction_state = Self::get_transaction_state(conn)?;
372
373        let (
374            (rollback_sql, rolling_back_top_level),
375            requires_rollback_maybe_up_to_top_level_before_execute,
376        ) = match transaction_state.in_transaction {
377            Some(ref in_transaction) => (
378                match in_transaction.transaction_depth.get() {
379                    1 => (Cow::Borrowed("ROLLBACK"), true),
380                    depth_gt1 => (
381                        Cow::Owned(::alloc::__export::must_use({
        ::alloc::fmt::format(format_args!("ROLLBACK TO SAVEPOINT diesel_savepoint_{0}",
                depth_gt1 - 1))
    })format!(
382                            "ROLLBACK TO SAVEPOINT diesel_savepoint_{}",
383                            depth_gt1 - 1
384                        )),
385                        false,
386                    ),
387                },
388                in_transaction.requires_rollback_maybe_up_to_top_level,
389            ),
390            None => return Err(Error::NotInTransaction),
391        };
392        let depth = transaction_state
393            .transaction_depth()
394            .expect("We know that we are in a transaction here");
395        conn.instrumentation().on_connection_event(
396            super::instrumentation::InstrumentationEvent::RollbackTransaction { depth },
397        );
398
399        match conn.batch_execute(&rollback_sql) {
400            Ok(()) => {
401                match Self::get_transaction_state(conn)?
402                    .change_transaction_depth(TransactionDepthChange::DecreaseDepth)
403                {
404                    Ok(()) => {}
405                    Err(Error::NotInTransaction) if rolling_back_top_level => {
406                        // Transaction exit may have already been detected by connection
407                        // implementation. It's fine.
408                    }
409                    Err(e) => return Err(e),
410                }
411                Ok(())
412            }
413            Err(rollback_error) => {
414                let tm_status = Self::transaction_manager_status_mut(conn);
415                match tm_status {
416                    TransactionManagerStatus::Valid(ValidTransactionManagerStatus {
417                        in_transaction:
418                            Some(InTransactionStatus {
419                                transaction_depth,
420                                requires_rollback_maybe_up_to_top_level,
421                                ..
422                            }),
423                    }) if transaction_depth.get() > 1 => {
424                        // A savepoint failed to rollback - we may still attempt to repair
425                        // the connection by rolling back higher levels.
426
427                        // To make it easier on the user (that they don't have to really
428                        // look at actual transaction depth and can just rely on the number
429                        // of times they have called begin/commit/rollback) we still
430                        // decrement here:
431                        *transaction_depth = NonZeroU32::new(transaction_depth.get() - 1)
432                            .expect("Depth was checked to be > 1");
433                        *requires_rollback_maybe_up_to_top_level = true;
434                        if requires_rollback_maybe_up_to_top_level_before_execute {
435                            // In that case, we tolerate that savepoint releases fail
436                            // -> we should ignore errors
437                            return Ok(());
438                        }
439                    }
440                    TransactionManagerStatus::Valid(ValidTransactionManagerStatus {
441                        in_transaction: None,
442                    }) => {
443                        // we would have returned `NotInTransaction` if that was already the state
444                        // before we made our call
445                        // => Transaction manager status has been fixed by the underlying connection
446                        // so we don't need to set_in_error
447                    }
448                    _ => tm_status.set_in_error(),
449                }
450                Err(rollback_error)
451            }
452        }
453    }
454
455    /// If the transaction fails to commit due to a `SerializationFailure` or a
456    /// `ReadOnlyTransaction` a rollback will be attempted. If the rollback succeeds,
457    /// the original error will be returned, otherwise the error generated by the rollback
458    /// will be returned. In the second case the connection will be considered broken
459    /// as it contains a uncommitted unabortable open transaction.
460    fn commit_transaction(conn: &mut Conn) -> QueryResult<()> {
461        let transaction_state = Self::get_transaction_state(conn)?;
462        let transaction_depth = transaction_state.transaction_depth();
463        let (commit_sql, committing_top_level) = match transaction_depth {
464            None => return Err(Error::NotInTransaction),
465            Some(transaction_depth) if transaction_depth.get() == 1 => {
466                (Cow::Borrowed("COMMIT"), true)
467            }
468            Some(transaction_depth) => (
469                Cow::Owned(::alloc::__export::must_use({
        ::alloc::fmt::format(format_args!("RELEASE SAVEPOINT diesel_savepoint_{0}",
                transaction_depth.get() - 1))
    })format!(
470                    "RELEASE SAVEPOINT diesel_savepoint_{}",
471                    transaction_depth.get() - 1
472                )),
473                false,
474            ),
475        };
476        let depth = transaction_state
477            .transaction_depth()
478            .expect("We know that we are in a transaction here");
479        conn.instrumentation().on_connection_event(
480            super::instrumentation::InstrumentationEvent::CommitTransaction { depth },
481        );
482        match conn.batch_execute(&commit_sql) {
483            Ok(()) => {
484                match Self::get_transaction_state(conn)?
485                    .change_transaction_depth(TransactionDepthChange::DecreaseDepth)
486                {
487                    Ok(()) => {}
488                    Err(Error::NotInTransaction) if committing_top_level => {
489                        // Transaction exit may have already been detected by connection.
490                        // It's fine
491                    }
492                    Err(e) => return Err(e),
493                }
494                Ok(())
495            }
496            Err(commit_error) => {
497                if let TransactionManagerStatus::Valid(ValidTransactionManagerStatus {
498                    in_transaction:
499                        Some(InTransactionStatus {
500                            requires_rollback_maybe_up_to_top_level: true,
501                            ..
502                        }),
503                }) = conn.transaction_state().status
504                {
505                    match Self::rollback_transaction(conn) {
506                        Ok(()) => {}
507                        Err(rollback_error) => {
508                            conn.transaction_state().status.set_in_error();
509                            return Err(Error::RollbackErrorOnCommit {
510                                rollback_error: Box::new(rollback_error),
511                                commit_error: Box::new(commit_error),
512                            });
513                        }
514                    }
515                }
516                Err(commit_error)
517            }
518        }
519    }
520
521    fn transaction_manager_status_mut(conn: &mut Conn) -> &mut TransactionManagerStatus {
522        &mut conn.transaction_state().status
523    }
524}
525
526#[cfg(test)]
527// that's a false positive for `panic!`/`assert!` on rust 2018
528#[allow(clippy::uninlined_format_args)]
529mod test {
530    // Mock connection.
531    mod mock {
532        use crate::connection::transaction_manager::AnsiTransactionManager;
533        use crate::connection::Instrumentation;
534        use crate::connection::{
535            Connection, ConnectionSealed, SimpleConnection, TransactionManager,
536        };
537        use crate::result::QueryResult;
538        use crate::test_helpers::TestConnection;
539        use std::collections::VecDeque;
540
541        pub(crate) struct MockConnection {
542            pub(crate) next_results: VecDeque<QueryResult<usize>>,
543            pub(crate) next_batch_execute_results: VecDeque<QueryResult<()>>,
544            pub(crate) top_level_requires_rollback_after_next_batch_execute: bool,
545            transaction_state: AnsiTransactionManager,
546            instrumentation: Option<Box<dyn Instrumentation>>,
547        }
548
549        impl SimpleConnection for MockConnection {
550            fn batch_execute(&mut self, _query: &str) -> QueryResult<()> {
551                let res = self
552                    .next_batch_execute_results
553                    .pop_front()
554                    .expect("No next result");
555                if self.top_level_requires_rollback_after_next_batch_execute {
556                    self.transaction_state
557                        .status
558                        .set_requires_rollback_maybe_up_to_top_level(true);
559                }
560                res
561            }
562        }
563
564        impl ConnectionSealed for MockConnection {}
565
566        impl Connection for MockConnection {
567            type Backend = <TestConnection as Connection>::Backend;
568
569            type TransactionManager = AnsiTransactionManager;
570
571            fn establish(_database_url: &str) -> crate::ConnectionResult<Self> {
572                Ok(Self {
573                    next_results: VecDeque::new(),
574                    next_batch_execute_results: VecDeque::new(),
575                    top_level_requires_rollback_after_next_batch_execute: false,
576                    transaction_state: AnsiTransactionManager::default(),
577                    instrumentation: None,
578                })
579            }
580
581            fn execute_returning_count<T>(&mut self, _source: &T) -> QueryResult<usize>
582            where
583                T: crate::query_builder::QueryFragment<Self::Backend>
584                    + crate::query_builder::QueryId,
585            {
586                self.next_results.pop_front().expect("No next result")
587            }
588
589            fn transaction_state(
590                &mut self,
591            ) -> &mut <Self::TransactionManager as TransactionManager<Self>>::TransactionStateData
592            {
593                &mut self.transaction_state
594            }
595
596            fn instrumentation(&mut self) -> &mut dyn crate::connection::Instrumentation {
597                &mut self.instrumentation
598            }
599
600            fn set_instrumentation(
601                &mut self,
602                instrumentation: impl crate::connection::Instrumentation,
603            ) {
604                self.instrumentation = Some(Box::new(instrumentation));
605            }
606
607            fn set_prepared_statement_cache_size(&mut self, _size: crate::connection::CacheSize) {
608                panic!("implement, if you want to use it")
609            }
610        }
611    }
612
613    #[diesel_test_helper::test]
614    #[cfg(feature = "postgres")]
615    fn transaction_manager_returns_an_error_when_attempting_to_commit_outside_of_a_transaction() {
616        use crate::connection::transaction_manager::AnsiTransactionManager;
617        use crate::connection::transaction_manager::TransactionManager;
618        use crate::result::Error;
619        use crate::PgConnection;
620
621        let conn = &mut crate::test_helpers::pg_connection_no_transaction();
622        assert_eq!(
623            None,
624            <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
625                conn
626            ).transaction_depth().expect("Transaction depth")
627        );
628        let result = AnsiTransactionManager::commit_transaction(conn);
629        assert!(matches!(result, Err(Error::NotInTransaction)))
630    }
631
632    #[diesel_test_helper::test]
633    #[cfg(feature = "postgres")]
634    fn transaction_manager_returns_an_error_when_attempting_to_rollback_outside_of_a_transaction() {
635        use crate::connection::transaction_manager::AnsiTransactionManager;
636        use crate::connection::transaction_manager::TransactionManager;
637        use crate::result::Error;
638        use crate::PgConnection;
639
640        let conn = &mut crate::test_helpers::pg_connection_no_transaction();
641        assert_eq!(
642            None,
643            <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
644                conn
645            ).transaction_depth().expect("Transaction depth")
646        );
647        let result = AnsiTransactionManager::rollback_transaction(conn);
648        assert!(matches!(result, Err(Error::NotInTransaction)))
649    }
650
651    #[diesel_test_helper::test]
652    fn transaction_manager_enters_broken_state_when_connection_is_broken() {
653        use crate::connection::transaction_manager::AnsiTransactionManager;
654        use crate::connection::transaction_manager::TransactionManager;
655        use crate::connection::TransactionManagerStatus;
656        use crate::result::{DatabaseErrorKind, Error};
657        use crate::*;
658
659        let mut conn = mock::MockConnection::establish("mock").expect("Mock connection");
660
661        // Set result for BEGIN
662        conn.next_batch_execute_results.push_back(Ok(()));
663        let result = conn.transaction(|conn| {
664            conn.next_results.push_back(Ok(1));
665            let query_result = sql_query("SELECT 1").execute(conn);
666            assert!(query_result.is_ok());
667            // Set result for COMMIT attempt
668            conn.next_batch_execute_results
669                .push_back(Err(Error::DatabaseError(
670                    DatabaseErrorKind::Unknown,
671                    Box::new("commit fails".to_string()),
672                )));
673            conn.top_level_requires_rollback_after_next_batch_execute = true;
674            conn.next_batch_execute_results
675                .push_back(Err(Error::DatabaseError(
676                    DatabaseErrorKind::Unknown,
677                    Box::new("rollback also fails".to_string()),
678                )));
679            Ok(())
680        });
681        assert!(
682            matches!(
683                &result,
684                Err(Error::RollbackErrorOnCommit {
685                    rollback_error,
686                    commit_error
687                }) if matches!(**commit_error, Error::DatabaseError(DatabaseErrorKind::Unknown, _))
688                    && matches!(&**rollback_error,
689                        Error::DatabaseError(DatabaseErrorKind::Unknown, msg)
690                            if msg.message() == "rollback also fails"
691                    )
692            ),
693            "Got {:?}",
694            result
695        );
696        assert!(matches!(
697            *AnsiTransactionManager::transaction_manager_status_mut(&mut conn),
698            TransactionManagerStatus::InError
699        ));
700        // Ensure the transaction manager is unusable
701        let result = conn.transaction(|_conn| Ok(()));
702        assert!(matches!(result, Err(Error::BrokenTransactionManager)))
703    }
704
705    #[diesel_test_helper::test]
706    #[cfg(feature = "mysql")]
707    fn mysql_transaction_is_rolled_back_upon_syntax_error() {
708        use crate::connection::transaction_manager::AnsiTransactionManager;
709        use crate::connection::transaction_manager::TransactionManager;
710        use crate::*;
711        use std::num::NonZeroU32;
712
713        let conn = &mut crate::test_helpers::connection_no_transaction();
714        assert_eq!(
715            None,
716            <AnsiTransactionManager as TransactionManager<MysqlConnection>>::transaction_manager_status_mut(
717                conn
718            ).transaction_depth().expect("Transaction depth")
719        );
720        let _result = conn.transaction(|conn| {
721            assert_eq!(
722                NonZeroU32::new(1),
723                <AnsiTransactionManager as TransactionManager<MysqlConnection>>::transaction_manager_status_mut(
724                    conn
725            ).transaction_depth().expect("Transaction depth")
726            );
727            // In MySQL, a syntax error does not break the transaction block
728            let query_result = sql_query("SELECT_SYNTAX_ERROR 1").execute(conn);
729            assert!(query_result.is_err());
730            query_result
731        });
732        assert_eq!(
733            None,
734            <AnsiTransactionManager as TransactionManager<MysqlConnection>>::transaction_manager_status_mut(
735                conn
736            ).transaction_depth().expect("Transaction depth")
737        );
738    }
739
740    #[diesel_test_helper::test]
741    #[cfg(feature = "sqlite")]
742    fn sqlite_transaction_is_rolled_back_upon_syntax_error() {
743        use crate::connection::transaction_manager::AnsiTransactionManager;
744        use crate::connection::transaction_manager::TransactionManager;
745        use crate::*;
746        use std::num::NonZeroU32;
747
748        let conn = &mut crate::test_helpers::connection();
749        assert_eq!(
750            None,
751            <AnsiTransactionManager as TransactionManager<SqliteConnection>>::transaction_manager_status_mut(
752                conn
753            ).transaction_depth().expect("Transaction depth")
754        );
755        let _result = conn.transaction(|conn| {
756            assert_eq!(
757                NonZeroU32::new(1),
758                <AnsiTransactionManager as TransactionManager<SqliteConnection>>::transaction_manager_status_mut(
759                    conn
760            ).transaction_depth().expect("Transaction depth")
761            );
762            // In Sqlite, a syntax error does not break the transaction block
763            let query_result = sql_query("SELECT_SYNTAX_ERROR 1").execute(conn);
764            assert!(query_result.is_err());
765            query_result
766        });
767        assert_eq!(
768            None,
769            <AnsiTransactionManager as TransactionManager<SqliteConnection>>::transaction_manager_status_mut(
770                conn
771            ).transaction_depth().expect("Transaction depth")
772        );
773    }
774
775    #[diesel_test_helper::test]
776    #[cfg(feature = "mysql")]
777    fn nested_mysql_transaction_is_rolled_back_upon_syntax_error() {
778        use crate::connection::transaction_manager::AnsiTransactionManager;
779        use crate::connection::transaction_manager::TransactionManager;
780        use crate::*;
781        use std::num::NonZeroU32;
782
783        let conn = &mut crate::test_helpers::connection_no_transaction();
784        assert_eq!(
785            None,
786            <AnsiTransactionManager as TransactionManager<MysqlConnection>>::transaction_manager_status_mut(
787                conn
788            ).transaction_depth().expect("Transaction depth")
789        );
790        let result = conn.transaction(|conn| {
791            assert_eq!(
792                NonZeroU32::new(1),
793                <AnsiTransactionManager as TransactionManager<MysqlConnection>>::transaction_manager_status_mut(
794                    conn
795            ).transaction_depth().expect("Transaction depth")
796            );
797            let result = conn.transaction(|conn| {
798                assert_eq!(
799                    NonZeroU32::new(2),
800                    <AnsiTransactionManager as TransactionManager<MysqlConnection>>::transaction_manager_status_mut(
801                        conn
802            ).transaction_depth().expect("Transaction depth")
803                );
804                // In MySQL, a syntax error does not break the transaction block
805                sql_query("SELECT_SYNTAX_ERROR 1").execute(conn)
806            });
807            assert!(result.is_err());
808            assert_eq!(
809                NonZeroU32::new(1),
810                <AnsiTransactionManager as TransactionManager<MysqlConnection>>::transaction_manager_status_mut(
811                    conn
812            ).transaction_depth().expect("Transaction depth")
813            );
814            let query_result = sql_query("SELECT 1").execute(conn);
815            assert!(query_result.is_ok());
816            query_result
817        });
818        assert!(result.is_ok());
819        assert_eq!(
820            None,
821            <AnsiTransactionManager as TransactionManager<MysqlConnection>>::transaction_manager_status_mut(
822                conn
823            ).transaction_depth().expect("Transaction depth")
824        );
825    }
826
    // Two threads each open their own connection, switch it to SERIALIZABLE and run a
    // transaction that first queries the rows of one class and then inserts a row of
    // the other class. Barriers line both threads up right before and right after the
    // insert. The test expects one thread to succeed and the other one to fail with
    // `SerializationFailure`, and checks that every connection is back at
    // "no open transaction" and still usable afterwards.
    #[diesel_test_helper::test]
    #[cfg(feature = "mysql")]
    // This function uses a collect with side effects (spawning threads)
    // so clippy is wrong here
    #[allow(clippy::needless_collect)]
    fn mysql_transaction_depth_commits_tracked_properly_on_serialization_failure() {
        use crate::result::DatabaseErrorKind::SerializationFailure;
        use crate::result::Error::DatabaseError;
        use crate::*;
        use std::num::NonZeroU32;
        use std::sync::{Arc, Barrier};
        use std::thread;

        // The Rust-side table is called `serialization_example`; `sql_name` maps it to
        // the table created below.
        table! {
            #[sql_name = "mysql_transaction_depth_is_tracked_properly_on_commit_failure"]
            serialization_example {
                id -> Integer,
                class -> Integer,
            }
        }

        let conn = &mut crate::test_helpers::connection_no_transaction();

        // Start from a fresh table so rows of earlier runs cannot interfere.
        sql_query(
            "DROP TABLE IF EXISTS mysql_transaction_depth_is_tracked_properly_on_commit_failure;",
        )
        .execute(conn)
        .unwrap();
        sql_query(
            r#"
            CREATE TABLE mysql_transaction_depth_is_tracked_properly_on_commit_failure (
                id INT AUTO_INCREMENT PRIMARY KEY,
                class INTEGER NOT NULL
            )
        "#,
        )
        .execute(conn)
        .unwrap();

        // Seed one row per class.
        insert_into(serialization_example::table)
            .values(&vec![
                serialization_example::class.eq(1),
                serialization_example::class.eq(2),
            ])
            .execute(conn)
            .unwrap();

        // Both barriers are sized for the two worker threads spawned below.
        let before_barrier = Arc::new(Barrier::new(2));
        let after_barrier = Arc::new(Barrier::new(2));

        // `i` takes the values 1 and 2: one thread per class.
        let threads = (1..3)
            .map(|i| {
                let before_barrier = before_barrier.clone();
                let after_barrier = after_barrier.clone();
                thread::spawn(move || {
                    use crate::connection::transaction_manager::AnsiTransactionManager;
                    use crate::connection::transaction_manager::TransactionManager;
                    let conn = &mut crate::test_helpers::connection_no_transaction();
                    assert_eq!(None, <AnsiTransactionManager as TransactionManager<MysqlConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));
                    crate::sql_query("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE").execute(conn)?;

                    let result =
                    conn.transaction(|conn| {
                        assert_eq!(NonZeroU32::new(1), <AnsiTransactionManager as TransactionManager<MysqlConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));
                        // Query the rows of this thread's own class ...
                        let _ = serialization_example::table
                            .filter(serialization_example::class.eq(i))
                            .count()
                            .execute(conn)?;

                        // ... then prepare an insert into the class the other thread queried.
                        let other_i = if i == 1 { 2 } else { 1 };
                        let q = insert_into(serialization_example::table)
                            .values(serialization_example::class.eq(other_i));
                        // Wait until both threads are about to insert.
                        before_barrier.wait();

                        let r = q.execute(conn);
                        // Wait until both threads got the result of their insert.
                        after_barrier.wait();
                        r
                    });

                    // Whatever the outcome was, no transaction may be left open.
                    assert_eq!(None, <AnsiTransactionManager as TransactionManager<MysqlConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));

                    let second_trans_result = conn.transaction(|conn| crate::sql_query("SELECT 1").execute(conn));
                    assert!(second_trans_result.is_ok(), "Expected the thread connections to have been rolled back or committed, but second transaction exited with {:?}", second_trans_result);
                    result
                })
            })
            .collect::<Vec<_>>();
        // The main connection must stay usable while the worker threads are running.
        let second_trans_result =
            conn.transaction(|conn| crate::sql_query("SELECT 1").execute(conn));
        assert!(second_trans_result.is_ok(), "Expected the main connection to have been rolled back or committed, but second transaction exited with {:?}", second_trans_result);

        let mut results = threads
            .into_iter()
            .map(|t| t.join().unwrap())
            .collect::<Vec<_>>();

        // `false` sorts before `true`, so the successful result ends up at index 0.
        results.sort_by_key(|r| r.is_err());
        assert!(results[0].is_ok(), "Got {:?} instead", results);
        // Note that contrary to Postgres, this is not a commit failure
        assert!(
            matches!(&results[1], Err(DatabaseError(SerializationFailure, _))),
            "Got {:?} instead",
            results
        );
    }
932
    // Same scenario as the non-nested serialization failure test, but the conflicting
    // statements run inside a nested transaction (depth 2). Two threads each query the
    // rows of one class and insert a row of the other class, lined up by barriers. One
    // thread is expected to succeed, the other one to fail with `SerializationFailure`,
    // and every connection must be back at "no open transaction" afterwards.
    #[diesel_test_helper::test]
    #[cfg(feature = "mysql")]
    // This function uses a collect with side effects (spawning threads)
    // so clippy is wrong here
    #[allow(clippy::needless_collect)]
    fn mysql_nested_transaction_depth_commits_tracked_properly_on_serialization_failure() {
        use crate::result::DatabaseErrorKind::SerializationFailure;
        use crate::result::Error::DatabaseError;
        use crate::*;
        use std::num::NonZeroU32;
        use std::sync::{Arc, Barrier};
        use std::thread;

        // The Rust-side table is called `serialization_example`; `sql_name` maps it to
        // the table created below.
        table! {
            #[sql_name = "mysql_nested_trans_depth_is_tracked_properly_on_commit_failure"]
            serialization_example {
                id -> Integer,
                class -> Integer,
            }
        }

        let conn = &mut crate::test_helpers::connection_no_transaction();

        // Start from a fresh table so rows of earlier runs cannot interfere.
        sql_query(
            "DROP TABLE IF EXISTS mysql_nested_trans_depth_is_tracked_properly_on_commit_failure;",
        )
        .execute(conn)
        .unwrap();
        sql_query(
            r#"
            CREATE TABLE mysql_nested_trans_depth_is_tracked_properly_on_commit_failure (
                id INT AUTO_INCREMENT PRIMARY KEY,
                class INTEGER NOT NULL
            )
        "#,
        )
        .execute(conn)
        .unwrap();

        // Seed one row per class.
        insert_into(serialization_example::table)
            .values(&vec![
                serialization_example::class.eq(1),
                serialization_example::class.eq(2),
            ])
            .execute(conn)
            .unwrap();

        // Both barriers are sized for the two worker threads spawned below.
        let before_barrier = Arc::new(Barrier::new(2));
        let after_barrier = Arc::new(Barrier::new(2));

        // `i` takes the values 1 and 2: one thread per class.
        let threads = (1..3)
            .map(|i| {
                let before_barrier = before_barrier.clone();
                let after_barrier = after_barrier.clone();
                thread::spawn(move || {
                    use crate::connection::transaction_manager::AnsiTransactionManager;
                    use crate::connection::transaction_manager::TransactionManager;
                    let conn = &mut crate::test_helpers::connection_no_transaction();
                    assert_eq!(None, <AnsiTransactionManager as TransactionManager<MysqlConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));
                    crate::sql_query("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE").execute(conn)?;

                    let result =
                    conn.transaction(|conn| {
                        assert_eq!(NonZeroU32::new(1), <AnsiTransactionManager as TransactionManager<MysqlConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));
                       // The conflicting statements run one level deeper, at depth 2.
                       conn.transaction(|conn| {
                            assert_eq!(NonZeroU32::new(2), <AnsiTransactionManager as TransactionManager<MysqlConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));
                            // Query the rows of this thread's own class ...
                            let _ = serialization_example::table
                                .filter(serialization_example::class.eq(i))
                                .count()
                                .execute(conn)?;

                            // ... then prepare an insert into the class the other thread queried.
                            let other_i = if i == 1 { 2 } else { 1 };
                            let q = insert_into(serialization_example::table)
                                .values(serialization_example::class.eq(other_i));
                            // Wait until both threads are about to insert.
                            before_barrier.wait();

                            let r = q.execute(conn);
                            // Wait until both threads got the result of their insert.
                            after_barrier.wait();
                            r
                        })
                    });

                    // Whatever the outcome was, no transaction may be left open.
                    assert_eq!(None, <AnsiTransactionManager as TransactionManager<MysqlConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));

                    let second_trans_result = conn.transaction(|conn| crate::sql_query("SELECT 1").execute(conn));
                    assert!(second_trans_result.is_ok(), "Expected the thread connections to have been rolled back or committed, but second transaction exited with {:?}", second_trans_result);
                    result
                })
            })
            .collect::<Vec<_>>();
        // The main connection must stay usable while the worker threads are running.
        let second_trans_result =
            conn.transaction(|conn| crate::sql_query("SELECT 1").execute(conn));
        assert!(second_trans_result.is_ok(), "Expected the main connection to have been rolled back or committed, but second transaction exited with {:?}", second_trans_result);

        let mut results = threads
            .into_iter()
            .map(|t| t.join().unwrap())
            .collect::<Vec<_>>();

        // `false` sorts before `true`, so the successful result ends up at index 0.
        results.sort_by_key(|r| r.is_err());
        assert!(results[0].is_ok(), "Got {:?} instead", results);
        assert!(
            matches!(&results[1], Err(DatabaseError(SerializationFailure, _))),
            "Got {:?} instead",
            results
        );
    }
1040
1041    #[diesel_test_helper::test]
1042    #[cfg(feature = "sqlite")]
1043    fn sqlite_transaction_is_rolled_back_upon_deferred_constraint_failure() {
1044        use crate::connection::transaction_manager::AnsiTransactionManager;
1045        use crate::connection::transaction_manager::TransactionManager;
1046        use crate::result::Error;
1047        use crate::*;
1048        use std::num::NonZeroU32;
1049
1050        let conn = &mut crate::test_helpers::connection();
1051        assert_eq!(
1052            None,
1053            <AnsiTransactionManager as TransactionManager<SqliteConnection>>::transaction_manager_status_mut(
1054                conn
1055            ).transaction_depth().expect("Transaction depth")
1056        );
1057        let result: Result<_, Error> = conn.transaction(|conn| {
1058            assert_eq!(
1059                NonZeroU32::new(1),
1060                <AnsiTransactionManager as TransactionManager<SqliteConnection>>::transaction_manager_status_mut(
1061                    conn
1062            ).transaction_depth().expect("Transaction depth")
1063            );
1064            sql_query("DROP TABLE IF EXISTS deferred_commit").execute(conn)?;
1065            sql_query("CREATE TABLE deferred_commit(id INT UNIQUE INITIALLY DEFERRED)").execute(conn)?;
1066            sql_query("INSERT INTO deferred_commit VALUES(1)").execute(conn)?;
1067            let result = sql_query("INSERT INTO deferred_commit VALUES(1)").execute(conn);
1068            assert!(result.is_ok());
1069            Ok(())
1070        });
1071        assert!(result.is_err());
1072        assert_eq!(
1073            None,
1074            <AnsiTransactionManager as TransactionManager<SqliteConnection>>::transaction_manager_status_mut(
1075                conn
1076            ).transaction_depth().expect("Transaction depth")
1077        );
1078    }
1079
1080    // regression test for #3470
1081    // crates.io depends on this behaviour
1082    #[diesel_test_helper::test]
1083    #[cfg(feature = "postgres")]
1084    fn some_libpq_failures_are_recoverable_by_rolling_back_the_savepoint_only() {
1085        use crate::connection::{AnsiTransactionManager, TransactionManager};
1086        use crate::prelude::*;
1087        use crate::sql_query;
1088
1089        crate::table! {
1090            rollback_test (id) {
1091                id -> Int4,
1092                value -> Int4,
1093            }
1094        }
1095
1096        let conn = &mut crate::test_helpers::pg_connection_no_transaction();
1097        assert_eq!(
1098            None,
1099            <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1100                conn
1101            ).transaction_depth().expect("Transaction depth")
1102        );
1103
1104        let res = conn.transaction(|conn| {
1105            sql_query(
1106                "CREATE TABLE IF NOT EXISTS rollback_test (id INT PRIMARY KEY, value INT NOT NULL)",
1107            )
1108            .execute(conn)?;
1109            conn.transaction(|conn| {
1110                sql_query("SET TRANSACTION READ ONLY").execute(conn)?;
1111                crate::update(rollback_test::table)
1112                    .set(rollback_test::value.eq(0))
1113                    .execute(conn)
1114            })
1115            .map(|_| {
1116                panic!("Should use the `or_else` branch");
1117            })
1118            .or_else(|_| sql_query("SELECT 1").execute(conn))
1119            .map(|_| ())
1120        });
1121        assert!(res.is_ok());
1122
1123        assert_eq!(
1124            None,
1125            <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1126                conn
1127            ).transaction_depth().expect("Transaction depth")
1128        );
1129    }
1130
    // Two threads run SERIALIZABLE transactions against the same table. Thread 1 reads
    // and inserts, then parks inside a nested transaction until thread 2 has run its
    // own read-and-insert transaction. After that, every further read in thread 1 is
    // expected to fail, both inside the nested transaction and in the outer one after
    // the nested one was rolled back. The transaction depth is checked at every level.
    #[diesel_test_helper::test]
    #[cfg(feature = "postgres")]
    fn other_libpq_failures_are_not_recoverable_by_rolling_back_the_savepoint_only() {
        use crate::connection::{AnsiTransactionManager, TransactionManager};
        use crate::prelude::*;
        use crate::sql_query;
        use std::num::NonZeroU32;
        use std::sync::{Arc, Barrier};

        crate::table! {
            rollback_test2 (id) {
                id -> Int4,
                value -> Int4,
            }
        }
        let conn = &mut crate::test_helpers::pg_connection_no_transaction();

        sql_query(
            "CREATE TABLE IF NOT EXISTS rollback_test2 (id INT PRIMARY KEY, value INT NOT NULL)",
        )
        .execute(conn)
        .unwrap();

        // `start_barrier`: thread 1 reached its nested transaction, thread 2 may begin.
        // `commit_barrier`: thread 2 finished its transaction, thread 1 may continue.
        let start_barrier = Arc::new(Barrier::new(2));
        let commit_barrier = Arc::new(Barrier::new(2));

        // Handles for thread 2; the originals are moved into thread 1.
        let other_start_barrier = start_barrier.clone();
        let other_commit_barrier = commit_barrier.clone();

        let t1 = std::thread::spawn(move || {
            let conn = &mut crate::test_helpers::pg_connection_no_transaction();
            assert_eq!(
                None,
                <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
                    conn
                ).transaction_depth().expect("Transaction depth")
            );
            let r = conn.build_transaction().serializable().run::<_, crate::result::Error, _>(|conn| {
                assert_eq!(
                    NonZeroU32::new(1),
                    <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
                        conn
                    ).transaction_depth().expect("Transaction depth")
                );
                // Read the table and insert a row before thread 2 gets to run.
                rollback_test2::table.load::<(i32, i32)>(conn)?;
                crate::insert_into(rollback_test2::table)
                    .values((rollback_test2::id.eq(1), rollback_test2::value.eq(42)))
                    .execute(conn)?;
                let r = conn.transaction(|conn| {
                    assert_eq!(
                        NonZeroU32::new(2),
                        <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
                            conn
                        ).transaction_depth().expect("Transaction depth")
                    );
                    // Let thread 2 start, then wait until its transaction has finished.
                    start_barrier.wait();
                    commit_barrier.wait();
                    let r = rollback_test2::table.load::<(i32, i32)>(conn);
                    assert!(r.is_err());
                    // Explicitly ask for a rollback of the nested transaction.
                    Err::<(), _>(crate::result::Error::RollbackTransaction)
                });
                // Leaving the nested transaction brings the depth back to 1.
                assert_eq!(
                    NonZeroU32::new(1),
                    <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
                        conn
                    ).transaction_depth().expect("Transaction depth")
                );
                // The error returned by the closure must come back unchanged.
                assert!(
                    matches!(r, Err(crate::result::Error::RollbackTransaction)),
                    "rollback failed (such errors should be ignored by transaction manager): {}",
                    r.unwrap_err()
                );
                // Rolling back the nested transaction did not help: reads still fail.
                let r = rollback_test2::table.load::<(i32, i32)>(conn);
                assert!(r.is_err());
                // Fun fact: when issuing "commit" after receiving a serialization failure,
                // PG reports that the commit has succeeded although the transaction was
                // actually rolled back. So one should avoid doing that.
                r
            });
            assert!(r.is_err());
            assert_eq!(
                None,
                <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
                    conn
                ).transaction_depth().expect("Transaction depth")
            );
        });

        let t2 = std::thread::spawn(move || {
            // Do nothing until thread 1 sits inside its nested transaction.
            other_start_barrier.wait();
            let conn = &mut crate::test_helpers::pg_connection_no_transaction();
            assert_eq!(
                None,
                <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
                    conn
                ).transaction_depth().expect("Transaction depth")
            );
            let r = conn.build_transaction().serializable().run::<_, crate::result::Error, _>(|conn| {
                assert_eq!(
                    NonZeroU32::new(1),
                    <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
                        conn
                    ).transaction_depth().expect("Transaction depth")
                );
                let _ = rollback_test2::table.load::<(i32, i32)>(conn)?;
                crate::insert_into(rollback_test2::table)
                    .values((rollback_test2::id.eq(23), rollback_test2::value.eq(42)))
                    .execute(conn)?;
                Ok(())
            });
            // Release thread 1 before asserting, so a failure here cannot leave it
            // blocked on the barrier.
            other_commit_barrier.wait();
            assert!(r.is_ok(), "{:?}", r.unwrap_err());
            assert_eq!(
                None,
                <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
                    conn
                ).transaction_depth().expect("Transaction depth")
            );
        });
        // NOTE(review): this DELETE runs on the main connection while both threads may
        // still be working on the table; it is not ordered against them by any barrier.
        // Presumably it clears rows left over from earlier runs — confirm whether it is
        // meant to run before the threads are spawned or after they are joined.
        crate::sql_query("DELETE FROM rollback_test2")
            .execute(conn)
            .unwrap();
        t1.join().unwrap();
        t2.join().unwrap();
    }
1256}