Skip to main content

diesel/connection/
transaction_manager.rs

1use crate::connection::Connection;
2use crate::result::{Error, QueryResult};
3use alloc::borrow::Cow;
4use alloc::boxed::Box;
5use core::num::NonZeroU32;
6
7/// Manages the internal transaction state for a connection.
8///
9/// You will not need to interact with this trait, unless you are writing an
10/// implementation of [`Connection`].
11pub trait TransactionManager<Conn: Connection> {
12    /// Data stored as part of the connection implementation
13    /// to track the current transaction state of a connection
14    type TransactionStateData;
15
16    /// Begin a new transaction or savepoint
17    ///
18    /// If the transaction depth is greater than 0,
19    /// this should create a savepoint instead.
20    /// This function is expected to increment the transaction depth by 1.
21    fn begin_transaction(conn: &mut Conn) -> QueryResult<()>;
22
23    /// Rollback the inner-most transaction or savepoint
24    ///
25    /// If the transaction depth is greater than 1,
26    /// this should rollback to the most recent savepoint.
27    /// This function is expected to decrement the transaction depth by 1.
28    fn rollback_transaction(conn: &mut Conn) -> QueryResult<()>;
29
30    /// Commit the inner-most transaction or savepoint
31    ///
32    /// If the transaction depth is greater than 1,
33    /// this should release the most recent savepoint.
34    /// This function is expected to decrement the transaction depth by 1.
35    fn commit_transaction(conn: &mut Conn) -> QueryResult<()>;
36
37    /// Fetch the current transaction status as mutable
38    ///
39    /// Used to ensure that `begin_test_transaction` is not called when already
40    /// inside of a transaction, and that operations are not run in a `InError`
41    /// transaction manager.
42    #[diesel_derives::__diesel_public_if(
43        feature = "i-implement-a-third-party-backend-and-opt-into-breaking-changes"
44    )]
45    fn transaction_manager_status_mut(conn: &mut Conn) -> &mut TransactionManagerStatus;
46
47    /// Executes the given function inside of a database transaction
48    ///
49    /// Each implementation of this function needs to fulfill the documented
50    /// behaviour of [`Connection::transaction`]
51    fn transaction<F, R, E>(conn: &mut Conn, callback: F) -> Result<R, E>
52    where
53        F: FnOnce(&mut Conn) -> Result<R, E>,
54        E: From<Error>,
55    {
56        Self::begin_transaction(conn)?;
57        match callback(&mut *conn) {
58            Ok(value) => {
59                Self::commit_transaction(conn)?;
60                Ok(value)
61            }
62            Err(user_error) => match Self::rollback_transaction(conn) {
63                Ok(()) => Err(user_error),
64                Err(Error::BrokenTransactionManager) => {
65                    // In this case we are probably more interested by the
66                    // original error, which likely caused this
67                    Err(user_error)
68                }
69                Err(rollback_error) => Err(rollback_error.into()),
70            },
71        }
72    }
73
74    /// This methods checks if the connection manager is considered to be broken
75    /// by connection pool implementations
76    ///
77    /// A connection manager is considered to be broken by default if it either
78    /// contains an open transaction (because you don't want to have connections
79    /// with open transactions in your pool) or when the transaction manager is
80    /// in an error state.
81    #[diesel_derives::__diesel_public_if(
82        feature = "i-implement-a-third-party-backend-and-opt-into-breaking-changes"
83    )]
84    fn is_broken_transaction_manager(conn: &mut Conn) -> bool {
85        match Self::transaction_manager_status_mut(conn).transaction_state() {
86            // all transactions are closed
87            // so we don't consider this connection broken
88            Ok(ValidTransactionManagerStatus {
89                in_transaction: None,
90            }) => false,
91            // The transaction manager is in an error state
92            // Therefore we consider this connection broken
93            Err(_) => true,
94            // The transaction manager contains a open transaction
95            // we do consider this connection broken
96            // if that transaction was not opened by `begin_test_transaction`
97            Ok(ValidTransactionManagerStatus {
98                in_transaction: Some(s),
99            }) => !s.test_transaction,
100        }
101    }
102}
103
104/// An implementation of `TransactionManager` which can be used for backends
105/// which use ANSI standard syntax for savepoints such as SQLite and PostgreSQL.
106#[derive(#[automatically_derived]
impl ::core::default::Default for AnsiTransactionManager {
    #[inline]
    fn default() -> AnsiTransactionManager {
        AnsiTransactionManager { status: ::core::default::Default::default() }
    }
}Default, #[automatically_derived]
impl ::core::fmt::Debug for AnsiTransactionManager {
    #[inline]
    fn fmt(&self, f: &mut ::core::fmt::Formatter) -> ::core::fmt::Result {
        ::core::fmt::Formatter::debug_struct_field1_finish(f,
            "AnsiTransactionManager", "status", &&self.status)
    }
}Debug)]
107pub struct AnsiTransactionManager {
108    pub(crate) status: TransactionManagerStatus,
109}
110
111/// Status of the transaction manager
112#[doc = " Status of the transaction manager"]
pub enum TransactionManagerStatus {

    /// Valid status, the manager can run operations
    Valid(ValidTransactionManagerStatus),

    /// Error status, probably following a broken connection. The manager will no longer run operations
    InError,
}#[diesel_derives::__diesel_public_if(
113    feature = "i-implement-a-third-party-backend-and-opt-into-breaking-changes"
114)]
115#[derive(#[automatically_derived]
impl ::core::fmt::Debug for TransactionManagerStatus {
    #[inline]
    fn fmt(&self, f: &mut ::core::fmt::Formatter) -> ::core::fmt::Result {
        match self {
            TransactionManagerStatus::Valid(__self_0) =>
                ::core::fmt::Formatter::debug_tuple_field1_finish(f, "Valid",
                    &__self_0),
            TransactionManagerStatus::InError =>
                ::core::fmt::Formatter::write_str(f, "InError"),
        }
    }
}Debug)]
116pub enum TransactionManagerStatus {
117    /// Valid status, the manager can run operations
118    Valid(ValidTransactionManagerStatus),
119    /// Error status, probably following a broken connection. The manager will no longer run operations
120    InError,
121}
122
123impl Default for TransactionManagerStatus {
124    fn default() -> Self {
125        TransactionManagerStatus::Valid(ValidTransactionManagerStatus::default())
126    }
127}
128
129impl TransactionManagerStatus {
130    /// Returns the transaction depth if the transaction manager's status is valid, or returns
131    /// [`Error::BrokenTransactionManager`] if the transaction manager is in error.
132    pub fn transaction_depth(&self) -> QueryResult<Option<NonZeroU32>> {
133        match self {
134            TransactionManagerStatus::Valid(valid_status) => Ok(valid_status.transaction_depth()),
135            TransactionManagerStatus::InError => Err(Error::BrokenTransactionManager),
136        }
137    }
138
139    #[cfg(any(
140        feature = "i-implement-a-third-party-backend-and-opt-into-breaking-changes",
141        feature = "postgres",
142        feature = "mysql",
143        test
144    ))]
145    #[diesel_derives::__diesel_public_if(
146        feature = "i-implement-a-third-party-backend-and-opt-into-breaking-changes"
147    )]
148    /// If in transaction and transaction manager is not broken, registers that it's possible that
149    /// the connection can not be used anymore until top-level transaction is rolled back.
150    ///
151    /// If that is registered, savepoints rollbacks will still be attempted, but failure to do so
152    /// will not result in an error. (Some may succeed, some may not.)
153    pub(crate) fn set_requires_rollback_maybe_up_to_top_level(&mut self, to: bool) {
154        if let TransactionManagerStatus::Valid(ValidTransactionManagerStatus {
155            in_transaction:
156                Some(InTransactionStatus {
157                    requires_rollback_maybe_up_to_top_level,
158                    ..
159                }),
160        }) = self
161        {
162            *requires_rollback_maybe_up_to_top_level = to;
163        }
164    }
165
166    /// Sets the transaction manager status to InError
167    ///
168    /// Subsequent attempts to use transaction-related features will result in a
169    /// [`Error::BrokenTransactionManager`] error
170    pub fn set_in_error(&mut self) {
171        *self = TransactionManagerStatus::InError
172    }
173
174    /// Expose access to the inner transaction state
175    ///
176    /// This function returns an error if the Transaction manager is in a broken
177    /// state
178    #[diesel_derives::__diesel_public_if(
179        feature = "i-implement-a-third-party-backend-and-opt-into-breaking-changes"
180    )]
181    pub(self) fn transaction_state(&mut self) -> QueryResult<&mut ValidTransactionManagerStatus> {
182        match self {
183            TransactionManagerStatus::Valid(valid_status) => Ok(valid_status),
184            TransactionManagerStatus::InError => Err(Error::BrokenTransactionManager),
185        }
186    }
187
188    /// This function allows to flag a transaction manager
189    /// in such a way that it contains a test transaction.
190    ///
191    /// This will disable some checks in regards to open transactions
192    /// to allow `Connection::begin_test_transaction` to work with
193    /// pooled connections as well
194    #[diesel_derives::__diesel_public_if(
195        feature = "i-implement-a-third-party-backend-and-opt-into-breaking-changes"
196    )]
197    pub(crate) fn set_test_transaction_flag(&mut self) {
198        if let TransactionManagerStatus::Valid(ValidTransactionManagerStatus {
199            in_transaction: Some(s),
200        }) = self
201        {
202            s.test_transaction = true;
203        }
204    }
205}
206
207/// Valid transaction status for the manager. Can return the current transaction depth
208#[allow(missing_copy_implementations)]
209#[derive(#[automatically_derived]
#[allow(missing_copy_implementations)]
impl ::core::fmt::Debug for ValidTransactionManagerStatus {
    #[inline]
    fn fmt(&self, f: &mut ::core::fmt::Formatter) -> ::core::fmt::Result {
        ::core::fmt::Formatter::debug_struct_field1_finish(f,
            "ValidTransactionManagerStatus", "in_transaction",
            &&self.in_transaction)
    }
}Debug, #[automatically_derived]
#[allow(missing_copy_implementations)]
impl ::core::default::Default for ValidTransactionManagerStatus {
    #[inline]
    fn default() -> ValidTransactionManagerStatus {
        ValidTransactionManagerStatus {
            in_transaction: ::core::default::Default::default(),
        }
    }
}Default)]
210#[doc =
" Valid transaction status for the manager. Can return the current transaction depth"]
#[allow(missing_copy_implementations)]
#[non_exhaustive]
pub struct ValidTransactionManagerStatus {
    #[doc = " Inner status, or `None` if no transaction is running"]
    pub in_transaction: Option<InTransactionStatus>,
}#[diesel_derives::__diesel_public_if(
211    feature = "i-implement-a-third-party-backend-and-opt-into-breaking-changes",
212    public_fields(in_transaction)
213)]
214pub struct ValidTransactionManagerStatus {
215    /// Inner status, or `None` if no transaction is running
216    in_transaction: Option<InTransactionStatus>,
217}
218
219/// Various status fields to track the status of
220/// a transaction manager with a started transaction
221#[allow(missing_copy_implementations)]
222#[derive(#[automatically_derived]
#[allow(missing_copy_implementations)]
impl ::core::fmt::Debug for InTransactionStatus {
    #[inline]
    fn fmt(&self, f: &mut ::core::fmt::Formatter) -> ::core::fmt::Result {
        ::core::fmt::Formatter::debug_struct_field3_finish(f,
            "InTransactionStatus", "transaction_depth",
            &self.transaction_depth,
            "requires_rollback_maybe_up_to_top_level",
            &self.requires_rollback_maybe_up_to_top_level, "test_transaction",
            &&self.test_transaction)
    }
}Debug)]
223#[doc = " Various status fields to track the status of"]
#[doc = " a transaction manager with a started transaction"]
#[allow(missing_copy_implementations)]
#[non_exhaustive]
pub struct InTransactionStatus {
    #[doc = " The current depth of nested transactions"]
    pub transaction_depth: NonZeroU32,
    #[doc =
    " If that is registered, savepoints rollbacks will still be attempted, but failure to do so"]
    #[doc = " will not result in an error. (Some may succeed, some may not.)"]
    pub requires_rollback_maybe_up_to_top_level: bool,
    #[doc = " Is this transaction manager status marked as test-transaction?"]
    pub test_transaction: bool,
}#[diesel_derives::__diesel_public_if(
224    feature = "i-implement-a-third-party-backend-and-opt-into-breaking-changes",
225    public_fields(
226        test_transaction,
227        transaction_depth,
228        requires_rollback_maybe_up_to_top_level
229    )
230)]
231pub struct InTransactionStatus {
232    /// The current depth of nested transactions
233    transaction_depth: NonZeroU32,
234    /// If that is registered, savepoints rollbacks will still be attempted, but failure to do so
235    /// will not result in an error. (Some may succeed, some may not.)
236    requires_rollback_maybe_up_to_top_level: bool,
237    /// Is this transaction manager status marked as test-transaction?
238    test_transaction: bool,
239}
240
241impl ValidTransactionManagerStatus {
242    /// Return the current transaction depth
243    ///
244    /// This value is `None` if no current transaction is running
245    /// otherwise the number of nested transactions is returned.
246    pub fn transaction_depth(&self) -> Option<NonZeroU32> {
247        self.in_transaction.as_ref().map(|it| it.transaction_depth)
248    }
249
250    /// Update the transaction depth by adding the value of the `transaction_depth_change` parameter if the `query` is
251    /// `Ok(())`
252    pub fn change_transaction_depth(
253        &mut self,
254        transaction_depth_change: TransactionDepthChange,
255    ) -> QueryResult<()> {
256        match (&mut self.in_transaction, transaction_depth_change) {
257            (Some(in_transaction), TransactionDepthChange::IncreaseDepth) => {
258                // Can be replaced with saturating_add directly on NonZeroU32 once
259                // <https://github.com/rust-lang/rust/issues/84186> is stable
260                in_transaction.transaction_depth =
261                    NonZeroU32::new(in_transaction.transaction_depth.get().saturating_add(1))
262                        .expect("nz + nz is always non-zero");
263                Ok(())
264            }
265            (Some(in_transaction), TransactionDepthChange::DecreaseDepth) => {
266                // This sets `transaction_depth` to `None` as soon as we reach zero
267                match NonZeroU32::new(in_transaction.transaction_depth.get() - 1) {
268                    Some(depth) => in_transaction.transaction_depth = depth,
269                    None => self.in_transaction = None,
270                }
271                Ok(())
272            }
273            (None, TransactionDepthChange::IncreaseDepth) => {
274                self.in_transaction = Some(InTransactionStatus {
275                    transaction_depth: NonZeroU32::new(1).expect("1 is non-zero"),
276                    requires_rollback_maybe_up_to_top_level: false,
277                    test_transaction: false,
278                });
279                Ok(())
280            }
281            (None, TransactionDepthChange::DecreaseDepth) => {
282                // We screwed up something somewhere
283                // we cannot decrease the transaction count if
284                // we are not inside a transaction
285                Err(Error::NotInTransaction)
286            }
287        }
288    }
289}
290
/// Represents a change to apply to the depth of a transaction
#[derive(Debug, Clone, Copy)]
pub enum TransactionDepthChange {
    /// Increase the depth of the transaction (corresponds to `BEGIN` or `SAVEPOINT`)
    IncreaseDepth,
    /// Decreases the depth of the transaction (corresponds to `COMMIT`/`RELEASE SAVEPOINT` or `ROLLBACK`)
    DecreaseDepth,
}
299
300impl AnsiTransactionManager {
301    fn get_transaction_state<Conn>(
302        conn: &mut Conn,
303    ) -> QueryResult<&mut ValidTransactionManagerStatus>
304    where
305        Conn: Connection<TransactionManager = Self>,
306    {
307        conn.transaction_state().status.transaction_state()
308    }
309
310    /// Begin a transaction with custom SQL
311    ///
312    /// This is used by connections to implement more complex transaction APIs
313    /// to set things such as isolation levels.
314    /// Returns an error if already inside of a transaction.
315    pub fn begin_transaction_sql<Conn>(conn: &mut Conn, sql: &str) -> QueryResult<()>
316    where
317        Conn: Connection<TransactionManager = Self>,
318    {
319        let state = Self::get_transaction_state(conn)?;
320        if let Some(_depth) = state.transaction_depth() {
321            return Err(Error::AlreadyInTransaction);
322        }
323        let instrumentation_depth = NonZeroU32::new(1);
324        // Keep remainder of this method in sync with `begin_transaction()`.
325
326        conn.instrumentation().on_connection_event(
327            super::instrumentation::InstrumentationEvent::BeginTransaction {
328                depth: instrumentation_depth.expect("We know that 1 is not zero"),
329            },
330        );
331        conn.batch_execute(sql)?;
332        Self::get_transaction_state(conn)?
333            .change_transaction_depth(TransactionDepthChange::IncreaseDepth)?;
334
335        Ok(())
336    }
337}
338
339impl<Conn> TransactionManager<Conn> for AnsiTransactionManager
340where
341    Conn: Connection<TransactionManager = Self>,
342{
343    type TransactionStateData = Self;
344
345    fn begin_transaction(conn: &mut Conn) -> QueryResult<()> {
346        let transaction_state = Self::get_transaction_state(conn)?;
347        let transaction_depth = transaction_state.transaction_depth();
348        let start_transaction_sql = match transaction_depth {
349            None => Cow::from("BEGIN"),
350            Some(transaction_depth) => Cow::from(::alloc::__export::must_use({
        ::alloc::fmt::format(format_args!("SAVEPOINT diesel_savepoint_{0}",
                transaction_depth))
    })alloc::format!(
351                "SAVEPOINT diesel_savepoint_{transaction_depth}"
352            )),
353        };
354        let instrumentation_depth =
355            NonZeroU32::new(transaction_depth.map_or(0, NonZeroU32::get).wrapping_add(1));
356        let sql = &start_transaction_sql;
357        // Keep remainder of this method in sync with `begin_transaction_sql()`.
358
359        conn.instrumentation().on_connection_event(
360            super::instrumentation::InstrumentationEvent::BeginTransaction {
361                depth: instrumentation_depth.expect("Transaction depth is too large"),
362            },
363        );
364        conn.batch_execute(sql)?;
365        Self::get_transaction_state(conn)?
366            .change_transaction_depth(TransactionDepthChange::IncreaseDepth)?;
367
368        Ok(())
369    }
370
371    fn rollback_transaction(conn: &mut Conn) -> QueryResult<()> {
372        let transaction_state = Self::get_transaction_state(conn)?;
373
374        let (
375            (rollback_sql, rolling_back_top_level),
376            requires_rollback_maybe_up_to_top_level_before_execute,
377        ) = match transaction_state.in_transaction {
378            Some(ref in_transaction) => (
379                match in_transaction.transaction_depth.get() {
380                    1 => (Cow::Borrowed("ROLLBACK"), true),
381                    depth_gt1 => (
382                        Cow::Owned(::alloc::__export::must_use({
        ::alloc::fmt::format(format_args!("ROLLBACK TO SAVEPOINT diesel_savepoint_{0}",
                depth_gt1 - 1))
    })alloc::format!(
383                            "ROLLBACK TO SAVEPOINT diesel_savepoint_{}",
384                            depth_gt1 - 1
385                        )),
386                        false,
387                    ),
388                },
389                in_transaction.requires_rollback_maybe_up_to_top_level,
390            ),
391            None => return Err(Error::NotInTransaction),
392        };
393        let depth = transaction_state
394            .transaction_depth()
395            .expect("We know that we are in a transaction here");
396        conn.instrumentation().on_connection_event(
397            super::instrumentation::InstrumentationEvent::RollbackTransaction { depth },
398        );
399
400        match conn.batch_execute(&rollback_sql) {
401            Ok(()) => {
402                match Self::get_transaction_state(conn)?
403                    .change_transaction_depth(TransactionDepthChange::DecreaseDepth)
404                {
405                    Ok(()) => {}
406                    Err(Error::NotInTransaction) if rolling_back_top_level => {
407                        // Transaction exit may have already been detected by connection
408                        // implementation. It's fine.
409                    }
410                    Err(e) => return Err(e),
411                }
412                Ok(())
413            }
414            Err(rollback_error) => {
415                let tm_status = Self::transaction_manager_status_mut(conn);
416                match tm_status {
417                    TransactionManagerStatus::Valid(ValidTransactionManagerStatus {
418                        in_transaction:
419                            Some(InTransactionStatus {
420                                transaction_depth,
421                                requires_rollback_maybe_up_to_top_level,
422                                ..
423                            }),
424                    }) if transaction_depth.get() > 1 => {
425                        // A savepoint failed to rollback - we may still attempt to repair
426                        // the connection by rolling back higher levels.
427
428                        // To make it easier on the user (that they don't have to really
429                        // look at actual transaction depth and can just rely on the number
430                        // of times they have called begin/commit/rollback) we still
431                        // decrement here:
432                        *transaction_depth = NonZeroU32::new(transaction_depth.get() - 1)
433                            .expect("Depth was checked to be > 1");
434                        *requires_rollback_maybe_up_to_top_level = true;
435                        if requires_rollback_maybe_up_to_top_level_before_execute {
436                            // In that case, we tolerate that savepoint releases fail
437                            // -> we should ignore errors
438                            return Ok(());
439                        }
440                    }
441                    TransactionManagerStatus::Valid(ValidTransactionManagerStatus {
442                        in_transaction: None,
443                    }) => {
444                        // we would have returned `NotInTransaction` if that was already the state
445                        // before we made our call
446                        // => Transaction manager status has been fixed by the underlying connection
447                        // so we don't need to set_in_error
448                    }
449                    _ => tm_status.set_in_error(),
450                }
451                Err(rollback_error)
452            }
453        }
454    }
455
456    /// If the transaction fails to commit due to a `SerializationFailure` or a
457    /// `ReadOnlyTransaction` a rollback will be attempted. If the rollback succeeds,
458    /// the original error will be returned, otherwise the error generated by the rollback
459    /// will be returned. In the second case the connection will be considered broken
460    /// as it contains a uncommitted unabortable open transaction.
461    fn commit_transaction(conn: &mut Conn) -> QueryResult<()> {
462        let transaction_state = Self::get_transaction_state(conn)?;
463        let transaction_depth = transaction_state.transaction_depth();
464        let (commit_sql, committing_top_level) = match transaction_depth {
465            None => return Err(Error::NotInTransaction),
466            Some(transaction_depth) if transaction_depth.get() == 1 => {
467                (Cow::Borrowed("COMMIT"), true)
468            }
469            Some(transaction_depth) => (
470                Cow::Owned(::alloc::__export::must_use({
        ::alloc::fmt::format(format_args!("RELEASE SAVEPOINT diesel_savepoint_{0}",
                transaction_depth.get() - 1))
    })alloc::format!(
471                    "RELEASE SAVEPOINT diesel_savepoint_{}",
472                    transaction_depth.get() - 1
473                )),
474                false,
475            ),
476        };
477        let depth = transaction_state
478            .transaction_depth()
479            .expect("We know that we are in a transaction here");
480        conn.instrumentation().on_connection_event(
481            super::instrumentation::InstrumentationEvent::CommitTransaction { depth },
482        );
483        match conn.batch_execute(&commit_sql) {
484            Ok(()) => {
485                match Self::get_transaction_state(conn)?
486                    .change_transaction_depth(TransactionDepthChange::DecreaseDepth)
487                {
488                    Ok(()) => {}
489                    Err(Error::NotInTransaction) if committing_top_level => {
490                        // Transaction exit may have already been detected by connection.
491                        // It's fine
492                    }
493                    Err(e) => return Err(e),
494                }
495                Ok(())
496            }
497            Err(commit_error) => {
498                if let TransactionManagerStatus::Valid(ValidTransactionManagerStatus {
499                    in_transaction:
500                        Some(InTransactionStatus {
501                            requires_rollback_maybe_up_to_top_level: true,
502                            ..
503                        }),
504                }) = conn.transaction_state().status
505                {
506                    match Self::rollback_transaction(conn) {
507                        Ok(()) => {}
508                        Err(rollback_error) => {
509                            conn.transaction_state().status.set_in_error();
510                            return Err(Error::RollbackErrorOnCommit {
511                                rollback_error: Box::new(rollback_error),
512                                commit_error: Box::new(commit_error),
513                            });
514                        }
515                    }
516                }
517                Err(commit_error)
518            }
519        }
520    }
521
522    fn transaction_manager_status_mut(conn: &mut Conn) -> &mut TransactionManagerStatus {
523        &mut conn.transaction_state().status
524    }
525}
526
527#[cfg(test)]
528// that's a false positive for `panic!`/`assert!` on rust 2018
529#[allow(clippy::uninlined_format_args)]
530mod test {
531    // Mock connection.
532    mod mock {
533        use crate::connection::Instrumentation;
534        use crate::connection::transaction_manager::AnsiTransactionManager;
535        use crate::connection::{
536            Connection, ConnectionSealed, SimpleConnection, TransactionManager,
537        };
538        use crate::result::QueryResult;
539        use crate::test_helpers::TestConnection;
540        use std::collections::VecDeque;
541
542        pub(crate) struct MockConnection {
543            pub(crate) next_results: VecDeque<QueryResult<usize>>,
544            pub(crate) next_batch_execute_results: VecDeque<QueryResult<()>>,
545            pub(crate) top_level_requires_rollback_after_next_batch_execute: bool,
546            transaction_state: AnsiTransactionManager,
547            instrumentation: Option<Box<dyn Instrumentation>>,
548        }
549
550        impl SimpleConnection for MockConnection {
551            fn batch_execute(&mut self, _query: &str) -> QueryResult<()> {
552                let res = self
553                    .next_batch_execute_results
554                    .pop_front()
555                    .expect("No next result");
556                if self.top_level_requires_rollback_after_next_batch_execute {
557                    self.transaction_state
558                        .status
559                        .set_requires_rollback_maybe_up_to_top_level(true);
560                }
561                res
562            }
563        }
564
565        impl ConnectionSealed for MockConnection {}
566
567        impl Connection for MockConnection {
568            type Backend = <TestConnection as Connection>::Backend;
569
570            type TransactionManager = AnsiTransactionManager;
571
572            fn establish(_database_url: &str) -> crate::ConnectionResult<Self> {
573                Ok(Self {
574                    next_results: VecDeque::new(),
575                    next_batch_execute_results: VecDeque::new(),
576                    top_level_requires_rollback_after_next_batch_execute: false,
577                    transaction_state: AnsiTransactionManager::default(),
578                    instrumentation: None,
579                })
580            }
581
582            fn execute_returning_count<T>(&mut self, _source: &T) -> QueryResult<usize>
583            where
584                T: crate::query_builder::QueryFragment<Self::Backend>
585                    + crate::query_builder::QueryId,
586            {
587                self.next_results.pop_front().expect("No next result")
588            }
589
590            fn transaction_state(
591                &mut self,
592            ) -> &mut <Self::TransactionManager as TransactionManager<Self>>::TransactionStateData
593            {
594                &mut self.transaction_state
595            }
596
597            fn instrumentation(&mut self) -> &mut dyn crate::connection::Instrumentation {
598                &mut self.instrumentation
599            }
600
601            fn set_instrumentation(
602                &mut self,
603                instrumentation: impl crate::connection::Instrumentation,
604            ) {
605                self.instrumentation = Some(Box::new(instrumentation));
606            }
607
608            fn set_prepared_statement_cache_size(&mut self, _size: crate::connection::CacheSize) {
609                panic!("implement, if you want to use it")
610            }
611        }
612    }
613
// Committing on a connection without an open transaction must be rejected
// with `Error::NotInTransaction`.
#[diesel_test_helper::test]
#[cfg(feature = "postgres")]
fn transaction_manager_returns_an_error_when_attempting_to_commit_outside_of_a_transaction() {
    use crate::PgConnection;
    use crate::connection::transaction_manager::AnsiTransactionManager;
    use crate::connection::transaction_manager::TransactionManager;
    use crate::result::Error;

    let mut connection = crate::test_helpers::pg_connection_no_transaction();

    // Precondition: the fresh connection tracks no open transaction.
    let status =
        <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
            &mut connection,
        );
    let depth_before = status.transaction_depth().expect("Transaction depth");
    assert_eq!(None, depth_before);

    let commit_outcome = AnsiTransactionManager::commit_transaction(&mut connection);
    assert!(matches!(commit_outcome, Err(Error::NotInTransaction)))
}
632
// Rolling back on a connection without an open transaction must be rejected
// with `Error::NotInTransaction`.
#[diesel_test_helper::test]
#[cfg(feature = "postgres")]
fn transaction_manager_returns_an_error_when_attempting_to_rollback_outside_of_a_transaction() {
    use crate::PgConnection;
    use crate::connection::transaction_manager::AnsiTransactionManager;
    use crate::connection::transaction_manager::TransactionManager;
    use crate::result::Error;

    let mut connection = crate::test_helpers::pg_connection_no_transaction();

    // Precondition: the fresh connection tracks no open transaction.
    let status =
        <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
            &mut connection,
        );
    let depth_before = status.transaction_depth().expect("Transaction depth");
    assert_eq!(None, depth_before);

    let rollback_outcome = AnsiTransactionManager::rollback_transaction(&mut connection);
    assert!(matches!(rollback_outcome, Err(Error::NotInTransaction)))
}
651
652    #[diesel_test_helper::test]
653    fn transaction_manager_enters_broken_state_when_connection_is_broken() {
654        use crate::connection::TransactionManagerStatus;
655        use crate::connection::transaction_manager::AnsiTransactionManager;
656        use crate::connection::transaction_manager::TransactionManager;
657        use crate::result::{DatabaseErrorKind, Error};
658        use crate::*;
659
660        let mut conn = mock::MockConnection::establish("mock").expect("Mock connection");
661
662        // Set result for BEGIN
663        conn.next_batch_execute_results.push_back(Ok(()));
664        let result = conn.transaction(|conn| {
665            conn.next_results.push_back(Ok(1));
666            let query_result = sql_query("SELECT 1").execute(conn);
667            assert!(query_result.is_ok());
668            // Set result for COMMIT attempt
669            conn.next_batch_execute_results
670                .push_back(Err(Error::DatabaseError(
671                    DatabaseErrorKind::Unknown,
672                    Box::new("commit fails".to_string()),
673                )));
674            conn.top_level_requires_rollback_after_next_batch_execute = true;
675            conn.next_batch_execute_results
676                .push_back(Err(Error::DatabaseError(
677                    DatabaseErrorKind::Unknown,
678                    Box::new("rollback also fails".to_string()),
679                )));
680            Ok(())
681        });
682        assert!(
683            matches!(
684                &result,
685                Err(Error::RollbackErrorOnCommit {
686                    rollback_error,
687                    commit_error
688                }) if matches!(**commit_error, Error::DatabaseError(DatabaseErrorKind::Unknown, _))
689                    && matches!(&**rollback_error,
690                        Error::DatabaseError(DatabaseErrorKind::Unknown, msg)
691                            if msg.message() == "rollback also fails"
692                    )
693            ),
694            "Got {:?}",
695            result
696        );
697        assert!(matches!(
698            *AnsiTransactionManager::transaction_manager_status_mut(&mut conn),
699            TransactionManagerStatus::InError
700        ));
701        // Ensure the transaction manager is unusable
702        let result = conn.transaction(|_conn| Ok(()));
703        assert!(matches!(result, Err(Error::BrokenTransactionManager)))
704    }
705
// A statement with a syntax error inside a transaction must leave the
// connection with no open transaction once the transaction call returns.
#[diesel_test_helper::test]
#[cfg(feature = "mysql")]
fn mysql_transaction_is_rolled_back_upon_syntax_error() {
    use crate::connection::transaction_manager::AnsiTransactionManager;
    use crate::connection::transaction_manager::TransactionManager;
    use crate::*;
    use std::num::NonZeroU32;

    // Reads the transaction depth currently tracked for `conn`.
    #[track_caller]
    fn tracked_depth(conn: &mut MysqlConnection) -> Option<NonZeroU32> {
        let status =
            <AnsiTransactionManager as TransactionManager<MysqlConnection>>::transaction_manager_status_mut(conn);
        status.transaction_depth().expect("Transaction depth")
    }

    let mut connection = crate::test_helpers::connection_no_transaction();
    assert_eq!(None, tracked_depth(&mut connection));

    let _outcome = connection.transaction(|conn| {
        assert_eq!(NonZeroU32::new(1), tracked_depth(conn));
        // In MySQL, a syntax error does not break the transaction block
        let query_result = sql_query("SELECT_SYNTAX_ERROR 1").execute(conn);
        assert!(query_result.is_err());
        query_result
    });

    assert_eq!(None, tracked_depth(&mut connection));
}
740
// A statement with a syntax error inside a transaction must leave the
// connection with no open transaction once the transaction call returns.
#[diesel_test_helper::test]
#[cfg(feature = "__sqlite-shared")]
fn sqlite_transaction_is_rolled_back_upon_syntax_error() {
    use crate::connection::transaction_manager::AnsiTransactionManager;
    use crate::connection::transaction_manager::TransactionManager;
    use crate::*;
    use std::num::NonZeroU32;

    // Reads the transaction depth currently tracked for `conn`.
    #[track_caller]
    fn tracked_depth(conn: &mut SqliteConnection) -> Option<NonZeroU32> {
        let status =
            <AnsiTransactionManager as TransactionManager<SqliteConnection>>::transaction_manager_status_mut(conn);
        status.transaction_depth().expect("Transaction depth")
    }

    let mut connection = crate::test_helpers::connection();
    assert_eq!(None, tracked_depth(&mut connection));

    let _outcome = connection.transaction(|conn| {
        assert_eq!(NonZeroU32::new(1), tracked_depth(conn));
        // In Sqlite, a syntax error does not break the transaction block
        let query_result = sql_query("SELECT_SYNTAX_ERROR 1").execute(conn);
        assert!(query_result.is_err());
        query_result
    });

    assert_eq!(None, tracked_depth(&mut connection));
}
775
// A syntax error inside a nested transaction must only unwind the inner
// level: the outer transaction stays at depth 1, can still run queries and
// finishes successfully.
#[diesel_test_helper::test]
#[cfg(feature = "mysql")]
fn nested_mysql_transaction_is_rolled_back_upon_syntax_error() {
    use crate::connection::transaction_manager::AnsiTransactionManager;
    use crate::connection::transaction_manager::TransactionManager;
    use crate::*;
    use std::num::NonZeroU32;

    // Reads the transaction depth currently tracked for `conn`.
    #[track_caller]
    fn tracked_depth(conn: &mut MysqlConnection) -> Option<NonZeroU32> {
        let status =
            <AnsiTransactionManager as TransactionManager<MysqlConnection>>::transaction_manager_status_mut(conn);
        status.transaction_depth().expect("Transaction depth")
    }

    let mut connection = crate::test_helpers::connection_no_transaction();
    assert_eq!(None, tracked_depth(&mut connection));

    let outer_outcome = connection.transaction(|conn| {
        assert_eq!(NonZeroU32::new(1), tracked_depth(conn));

        let inner_outcome = conn.transaction(|conn| {
            assert_eq!(NonZeroU32::new(2), tracked_depth(conn));
            // In MySQL, a syntax error does not break the transaction block
            sql_query("SELECT_SYNTAX_ERROR 1").execute(conn)
        });
        assert!(inner_outcome.is_err());

        // Back on the outer level, which must still be usable.
        assert_eq!(NonZeroU32::new(1), tracked_depth(conn));
        let query_result = sql_query("SELECT 1").execute(conn);
        assert!(query_result.is_ok());
        query_result
    });
    assert!(outer_outcome.is_ok());

    assert_eq!(None, tracked_depth(&mut connection));
}
827
#[diesel_test_helper::test]
#[cfg(feature = "mysql")]
// This function uses a collect with side effects (spawning threads)
// so clippy is wrong here
#[allow(clippy::needless_collect)]
fn mysql_transaction_depth_commits_tracked_properly_on_serialization_failure() {
    use crate::connection::transaction_manager::AnsiTransactionManager;
    use crate::connection::transaction_manager::TransactionManager;
    use crate::result::DatabaseErrorKind::SerializationFailure;
    use crate::result::Error::DatabaseError;
    use crate::*;
    use std::num::NonZeroU32;
    use std::sync::{Arc, Barrier};
    use std::thread;

    table! {
        #[sql_name = "mysql_transaction_depth_is_tracked_properly_on_commit_failure"]
        serialization_example {
            id -> Integer,
            class -> Integer,
        }
    }

    // Reads the transaction depth currently tracked for `conn`.
    #[track_caller]
    fn tracked_depth(conn: &mut MysqlConnection) -> Option<NonZeroU32> {
        let status =
            <AnsiTransactionManager as TransactionManager<MysqlConnection>>::transaction_manager_status_mut(conn);
        status.transaction_depth().expect("Transaction depth")
    }

    let conn = &mut crate::test_helpers::connection_no_transaction();

    // Recreate the table and seed one row per class.
    sql_query(
        "DROP TABLE IF EXISTS mysql_transaction_depth_is_tracked_properly_on_commit_failure;",
    )
    .execute(conn)
    .unwrap();
    sql_query(
        r#"
            CREATE TABLE mysql_transaction_depth_is_tracked_properly_on_commit_failure (
                id INT AUTO_INCREMENT PRIMARY KEY,
                class INTEGER NOT NULL
            )
        "#,
    )
    .execute(conn)
    .unwrap();

    insert_into(serialization_example::table)
        .values(&vec![
            serialization_example::class.eq(1),
            serialization_example::class.eq(2),
        ])
        .execute(conn)
        .unwrap();

    // Both workers meet right before and right after their insert.
    let before_barrier = Arc::new(Barrier::new(2));
    let after_barrier = Arc::new(Barrier::new(2));

    let workers = (1..3)
        .map(|class_id| {
            let before_barrier = Arc::clone(&before_barrier);
            let after_barrier = Arc::clone(&after_barrier);
            thread::spawn(move || {
                let conn = &mut crate::test_helpers::connection_no_transaction();
                assert_eq!(None, tracked_depth(conn));
                crate::sql_query("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE").execute(conn)?;

                let outcome = conn.transaction(|conn| {
                    assert_eq!(NonZeroU32::new(1), tracked_depth(conn));
                    // Read the rows of this worker's own class ...
                    let _ = serialization_example::table
                        .filter(serialization_example::class.eq(class_id))
                        .count()
                        .execute(conn)?;

                    // ... then write a row belonging to the other worker's class.
                    let other_class = match class_id {
                        1 => 2,
                        _ => 1,
                    };
                    let pending_insert = insert_into(serialization_example::table)
                        .values(serialization_example::class.eq(other_class));
                    before_barrier.wait();

                    let inserted = pending_insert.execute(conn);
                    after_barrier.wait();
                    inserted
                });

                // Whatever the outcome, no transaction may be left open.
                assert_eq!(None, tracked_depth(conn));

                let second_trans_result =
                    conn.transaction(|conn| crate::sql_query("SELECT 1").execute(conn));
                assert!(second_trans_result.is_ok(), "Expected the thread connections to have been rolled back or committed, but second transaction exited with {:?}", second_trans_result);
                outcome
            })
        })
        .collect::<Vec<_>>();

    let second_trans_result = conn.transaction(|conn| crate::sql_query("SELECT 1").execute(conn));
    assert!(
        second_trans_result.is_ok(),
        "Expected the main connection to have been rolled back or committed, but second transaction exited with {:?}",
        second_trans_result
    );

    let mut results = workers
        .into_iter()
        .map(|worker| worker.join().unwrap())
        .collect::<Vec<_>>();

    // Successful outcomes sort first, so index 0 is the winner.
    results.sort_by_key(|outcome| outcome.is_err());
    assert!(results[0].is_ok(), "Got {:?} instead", results);
    // Note that contrary to Postgres, this is not a commit failure
    assert!(
        matches!(&results[1], Err(DatabaseError(SerializationFailure, _))),
        "Got {:?} instead",
        results
    );
}
937
#[diesel_test_helper::test]
#[cfg(feature = "mysql")]
// This function uses a collect with side effects (spawning threads)
// so clippy is wrong here
#[allow(clippy::needless_collect)]
fn mysql_nested_transaction_depth_commits_tracked_properly_on_serialization_failure() {
    use crate::connection::transaction_manager::AnsiTransactionManager;
    use crate::connection::transaction_manager::TransactionManager;
    use crate::result::DatabaseErrorKind::SerializationFailure;
    use crate::result::Error::DatabaseError;
    use crate::*;
    use std::num::NonZeroU32;
    use std::sync::{Arc, Barrier};
    use std::thread;

    table! {
        #[sql_name = "mysql_nested_trans_depth_is_tracked_properly_on_commit_failure"]
        serialization_example {
            id -> Integer,
            class -> Integer,
        }
    }

    // Reads the transaction depth currently tracked for `conn`.
    #[track_caller]
    fn tracked_depth(conn: &mut MysqlConnection) -> Option<NonZeroU32> {
        let status =
            <AnsiTransactionManager as TransactionManager<MysqlConnection>>::transaction_manager_status_mut(conn);
        status.transaction_depth().expect("Transaction depth")
    }

    let conn = &mut crate::test_helpers::connection_no_transaction();

    // Recreate the table and seed one row per class.
    sql_query(
        "DROP TABLE IF EXISTS mysql_nested_trans_depth_is_tracked_properly_on_commit_failure;",
    )
    .execute(conn)
    .unwrap();
    sql_query(
        r#"
            CREATE TABLE mysql_nested_trans_depth_is_tracked_properly_on_commit_failure (
                id INT AUTO_INCREMENT PRIMARY KEY,
                class INTEGER NOT NULL
            )
        "#,
    )
    .execute(conn)
    .unwrap();

    insert_into(serialization_example::table)
        .values(&vec![
            serialization_example::class.eq(1),
            serialization_example::class.eq(2),
        ])
        .execute(conn)
        .unwrap();

    // Both workers meet right before and right after their insert.
    let before_barrier = Arc::new(Barrier::new(2));
    let after_barrier = Arc::new(Barrier::new(2));

    let workers = (1..3)
        .map(|class_id| {
            let before_barrier = Arc::clone(&before_barrier);
            let after_barrier = Arc::clone(&after_barrier);
            thread::spawn(move || {
                let conn = &mut crate::test_helpers::connection_no_transaction();
                assert_eq!(None, tracked_depth(conn));
                crate::sql_query("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE").execute(conn)?;

                let outcome = conn.transaction(|conn| {
                    assert_eq!(NonZeroU32::new(1), tracked_depth(conn));
                    // The conflicting statements run one nesting level deeper.
                    conn.transaction(|conn| {
                        assert_eq!(NonZeroU32::new(2), tracked_depth(conn));
                        // Read the rows of this worker's own class ...
                        let _ = serialization_example::table
                            .filter(serialization_example::class.eq(class_id))
                            .count()
                            .execute(conn)?;

                        // ... then write a row belonging to the other worker's class.
                        let other_class = match class_id {
                            1 => 2,
                            _ => 1,
                        };
                        let pending_insert = insert_into(serialization_example::table)
                            .values(serialization_example::class.eq(other_class));
                        before_barrier.wait();

                        let inserted = pending_insert.execute(conn);
                        after_barrier.wait();
                        inserted
                    })
                });

                // Whatever the outcome, no transaction may be left open.
                assert_eq!(None, tracked_depth(conn));

                let second_trans_result =
                    conn.transaction(|conn| crate::sql_query("SELECT 1").execute(conn));
                assert!(second_trans_result.is_ok(), "Expected the thread connections to have been rolled back or committed, but second transaction exited with {:?}", second_trans_result);
                outcome
            })
        })
        .collect::<Vec<_>>();

    let second_trans_result = conn.transaction(|conn| crate::sql_query("SELECT 1").execute(conn));
    assert!(
        second_trans_result.is_ok(),
        "Expected the main connection to have been rolled back or committed, but second transaction exited with {:?}",
        second_trans_result
    );

    let mut results = workers
        .into_iter()
        .map(|worker| worker.join().unwrap())
        .collect::<Vec<_>>();

    // Successful outcomes sort first, so index 0 is the winner.
    results.sort_by_key(|outcome| outcome.is_err());
    assert!(results[0].is_ok(), "Got {:?} instead", results);
    assert!(
        matches!(&results[1], Err(DatabaseError(SerializationFailure, _))),
        "Got {:?} instead",
        results
    );
}
1049
// Inserts a duplicate into a column with a deferred UNIQUE constraint: the
// insert itself succeeds, the transaction as a whole must fail, and no
// transaction may be left open afterwards.
#[diesel_test_helper::test]
#[cfg(feature = "__sqlite-shared")]
fn sqlite_transaction_is_rolled_back_upon_deferred_constraint_failure() {
    use crate::connection::transaction_manager::AnsiTransactionManager;
    use crate::connection::transaction_manager::TransactionManager;
    use crate::result::Error;
    use crate::*;
    use std::num::NonZeroU32;

    // Reads the transaction depth currently tracked for `conn`.
    #[track_caller]
    fn tracked_depth(conn: &mut SqliteConnection) -> Option<NonZeroU32> {
        let status =
            <AnsiTransactionManager as TransactionManager<SqliteConnection>>::transaction_manager_status_mut(conn);
        status.transaction_depth().expect("Transaction depth")
    }

    let mut connection = crate::test_helpers::connection();
    assert_eq!(None, tracked_depth(&mut connection));

    let result: Result<_, Error> = connection.transaction(|conn| {
        assert_eq!(NonZeroU32::new(1), tracked_depth(conn));

        // Fresh table plus the first row; any failure here aborts the test
        // transaction through `?`.
        for statement in [
            "DROP TABLE IF EXISTS deferred_commit",
            "CREATE TABLE deferred_commit(id INT UNIQUE INITIALLY DEFERRED)",
            "INSERT INTO deferred_commit VALUES(1)",
        ] {
            sql_query(statement).execute(conn)?;
        }

        // The duplicate row is accepted at statement time.
        let duplicate_insert = sql_query("INSERT INTO deferred_commit VALUES(1)").execute(conn);
        assert!(duplicate_insert.is_ok());
        Ok(())
    });
    assert!(result.is_err());

    assert_eq!(None, tracked_depth(&mut connection));
}
1088
// regression test for #3470
// crates.io depends on this behaviour
//
// A failing nested transaction must leave the outer transaction usable:
// the outer closure runs another query afterwards and still completes.
#[diesel_test_helper::test]
#[cfg(feature = "postgres")]
fn some_libpq_failures_are_recoverable_by_rolling_back_the_savepoint_only() {
    use crate::connection::{AnsiTransactionManager, TransactionManager};
    use crate::prelude::*;
    use crate::sql_query;

    crate::table! {
        rollback_test (id) {
            id -> Int4,
            value -> Int4,
        }
    }

    // Asserts that `conn` currently tracks no open transaction.
    #[track_caller]
    fn assert_outside_transaction(conn: &mut PgConnection) {
        let status =
            <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(conn);
        assert_eq!(
            None,
            status.transaction_depth().expect("Transaction depth")
        );
    }

    let mut connection = crate::test_helpers::pg_connection_no_transaction();
    assert_outside_transaction(&mut connection);

    let res = connection.transaction(|conn| {
        sql_query(
            "CREATE TABLE IF NOT EXISTS rollback_test (id INT PRIMARY KEY, value INT NOT NULL)",
        )
        .execute(conn)?;

        // The update is attempted after switching the (sub)transaction to
        // read only; the test expects this nested transaction to fail.
        let nested_outcome = conn.transaction(|conn| {
            sql_query("SET TRANSACTION READ ONLY").execute(conn)?;
            crate::update(rollback_test::table)
                .set(rollback_test::value.eq(0))
                .execute(conn)
        });

        match nested_outcome {
            Ok(_) => panic!("Should use the `or_else` branch"),
            // The outer transaction must still accept statements.
            Err(_) => sql_query("SELECT 1").execute(conn).map(|_| ()),
        }
    });
    assert!(res.is_ok());

    assert_outside_transaction(&mut connection);
}
1139
// Runs two serializable transactions on separate connections and uses two
// barriers to interleave them:
//
// * `t1` reads the table, inserts a row, opens a nested transaction and then
//   waits until `t2` has finished its own serializable transaction.
// * Afterwards every load in `t1` is expected to fail, both inside the nested
//   transaction and after it was rolled back, so rolling back the savepoint
//   alone does not make the outer transaction usable again.
//
// Throughout, the tracked transaction depth has to follow the nesting level
// and return to `None` once the outermost transaction is over.
#[diesel_test_helper::test]
#[cfg(feature = "postgres")]
fn other_libpq_failures_are_not_recoverable_by_rolling_back_the_savepoint_only() {
    use crate::connection::{AnsiTransactionManager, TransactionManager};
    use crate::prelude::*;
    use crate::sql_query;
    use std::num::NonZeroU32;
    use std::sync::{Arc, Barrier};

    crate::table! {
        rollback_test2 (id) {
            id -> Int4,
            value -> Int4,
        }
    }

    // Reads the transaction depth currently tracked for `conn`.
    #[track_caller]
    fn tracked_depth(conn: &mut PgConnection) -> Option<NonZeroU32> {
        let status =
            <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(conn);
        status.transaction_depth().expect("Transaction depth")
    }

    let conn = &mut crate::test_helpers::pg_connection_no_transaction();

    sql_query(
        "CREATE TABLE IF NOT EXISTS rollback_test2 (id INT PRIMARY KEY, value INT NOT NULL)",
    )
    .execute(conn)
    .unwrap();
    // Empty the table *before* the workers start. Running this statement
    // while the workers are active would race with their inserts: a row left
    // behind by an earlier run could still be present when `t2` inserts the
    // same primary key.
    crate::sql_query("DELETE FROM rollback_test2")
        .execute(conn)
        .unwrap();

    // `start_barrier` releases t2 once t1 sits inside its nested transaction;
    // `commit_barrier` releases t1 once t2's transaction has finished.
    let start_barrier = Arc::new(Barrier::new(2));
    let commit_barrier = Arc::new(Barrier::new(2));

    let other_start_barrier = start_barrier.clone();
    let other_commit_barrier = commit_barrier.clone();

    let t1 = std::thread::spawn(move || {
        let conn = &mut crate::test_helpers::pg_connection_no_transaction();
        assert_eq!(None, tracked_depth(conn));
        let r = conn
            .build_transaction()
            .serializable()
            .run::<_, crate::result::Error, _>(|conn| {
                assert_eq!(NonZeroU32::new(1), tracked_depth(conn));
                rollback_test2::table.load::<(i32, i32)>(conn)?;
                crate::insert_into(rollback_test2::table)
                    .values((rollback_test2::id.eq(1), rollback_test2::value.eq(42)))
                    .execute(conn)?;
                let r = conn.transaction(|conn| {
                    assert_eq!(NonZeroU32::new(2), tracked_depth(conn));
                    start_barrier.wait();
                    commit_barrier.wait();
                    let r = rollback_test2::table.load::<(i32, i32)>(conn);
                    assert!(r.is_err());
                    Err::<(), _>(crate::result::Error::RollbackTransaction)
                });
                assert_eq!(NonZeroU32::new(1), tracked_depth(conn));
                // Print the whole result: calling `unwrap_err()` here would
                // itself panic (with an unrelated message) if `r` were `Ok`.
                assert!(
                    matches!(r, Err(crate::result::Error::RollbackTransaction)),
                    "rollback failed (such errors should be ignored by transaction manager): {:?}",
                    r
                );
                let r = rollback_test2::table.load::<(i32, i32)>(conn);
                assert!(r.is_err());
                // fun fact: if hitting "commit" after receiving a serialization failure, PG
                // returns that the commit has succeeded, but in fact it was actually rolled back.
                // soo.. one should avoid doing that
                r
            });
        assert!(r.is_err());
        assert_eq!(None, tracked_depth(conn));
    });

    let t2 = std::thread::spawn(move || {
        other_start_barrier.wait();
        let conn = &mut crate::test_helpers::pg_connection_no_transaction();
        assert_eq!(None, tracked_depth(conn));
        let r = conn
            .build_transaction()
            .serializable()
            .run::<_, crate::result::Error, _>(|conn| {
                assert_eq!(NonZeroU32::new(1), tracked_depth(conn));
                let _ = rollback_test2::table.load::<(i32, i32)>(conn)?;
                crate::insert_into(rollback_test2::table)
                    .values((rollback_test2::id.eq(23), rollback_test2::value.eq(42)))
                    .execute(conn)?;
                Ok(())
            });
        // Release t1 before asserting, so a failure here cannot leave t1
        // blocked on the barrier.
        other_commit_barrier.wait();
        assert!(r.is_ok(), "{:?}", r.unwrap_err());
        assert_eq!(None, tracked_depth(conn));
    });

    t1.join().unwrap();
    t2.join().unwrap();
}
1265}