1use crate::connection::Connection;
2use crate::result::{Error, QueryResult};
3use std::borrow::Cow;
4use std::num::NonZeroU32;
5
6pub trait TransactionManager<Conn: Connection> {
11 type TransactionStateData;
14
15 fn begin_transaction(conn: &mut Conn) -> QueryResult<()>;
21
22 fn rollback_transaction(conn: &mut Conn) -> QueryResult<()>;
28
29 fn commit_transaction(conn: &mut Conn) -> QueryResult<()>;
35
36 #[diesel_derives::__diesel_public_if(
42 feature = "i-implement-a-third-party-backend-and-opt-into-breaking-changes"
43 )]
44 fn transaction_manager_status_mut(conn: &mut Conn) -> &mut TransactionManagerStatus;
45
46 fn transaction<F, R, E>(conn: &mut Conn, callback: F) -> Result<R, E>
51 where
52 F: FnOnce(&mut Conn) -> Result<R, E>,
53 E: From<Error>,
54 {
55 Self::begin_transaction(conn)?;
56 match callback(&mut *conn) {
57 Ok(value) => {
58 Self::commit_transaction(conn)?;
59 Ok(value)
60 }
61 Err(user_error) => match Self::rollback_transaction(conn) {
62 Ok(()) => Err(user_error),
63 Err(Error::BrokenTransactionManager) => {
64 Err(user_error)
67 }
68 Err(rollback_error) => Err(rollback_error.into()),
69 },
70 }
71 }
72
73 #[diesel_derives::__diesel_public_if(
81 feature = "i-implement-a-third-party-backend-and-opt-into-breaking-changes"
82 )]
83 fn is_broken_transaction_manager(conn: &mut Conn) -> bool {
84 match Self::transaction_manager_status_mut(conn).transaction_state() {
85 Ok(ValidTransactionManagerStatus {
88 in_transaction: None,
89 }) => false,
90 Err(_) => true,
93 Ok(ValidTransactionManagerStatus {
97 in_transaction: Some(s),
98 }) => !s.test_transaction,
99 }
100 }
101}
102
103#[derive(Default, Debug)]
106pub struct AnsiTransactionManager {
107 pub(crate) status: TransactionManagerStatus,
108}
109
110#[diesel_derives::__diesel_public_if(
112 feature = "i-implement-a-third-party-backend-and-opt-into-breaking-changes"
113)]
114#[derive(Debug)]
115pub enum TransactionManagerStatus {
116 Valid(ValidTransactionManagerStatus),
118 InError,
120}
121
122impl Default for TransactionManagerStatus {
123 fn default() -> Self {
124 TransactionManagerStatus::Valid(ValidTransactionManagerStatus::default())
125 }
126}
127
128impl TransactionManagerStatus {
129 pub fn transaction_depth(&self) -> QueryResult<Option<NonZeroU32>> {
132 match self {
133 TransactionManagerStatus::Valid(valid_status) => Ok(valid_status.transaction_depth()),
134 TransactionManagerStatus::InError => Err(Error::BrokenTransactionManager),
135 }
136 }
137
138 #[cfg(any(
139 feature = "i-implement-a-third-party-backend-and-opt-into-breaking-changes",
140 feature = "postgres",
141 feature = "mysql",
142 test
143 ))]
144 #[diesel_derives::__diesel_public_if(
145 feature = "i-implement-a-third-party-backend-and-opt-into-breaking-changes"
146 )]
147 pub(crate) fn set_requires_rollback_maybe_up_to_top_level(&mut self, to: bool) {
153 if let TransactionManagerStatus::Valid(ValidTransactionManagerStatus {
154 in_transaction:
155 Some(InTransactionStatus {
156 requires_rollback_maybe_up_to_top_level,
157 ..
158 }),
159 }) = self
160 {
161 *requires_rollback_maybe_up_to_top_level = to;
162 }
163 }
164
165 pub fn set_in_error(&mut self) {
170 *self = TransactionManagerStatus::InError
171 }
172
173 #[diesel_derives::__diesel_public_if(
178 feature = "i-implement-a-third-party-backend-and-opt-into-breaking-changes"
179 )]
180 pub(self) fn transaction_state(&mut self) -> QueryResult<&mut ValidTransactionManagerStatus> {
181 match self {
182 TransactionManagerStatus::Valid(valid_status) => Ok(valid_status),
183 TransactionManagerStatus::InError => Err(Error::BrokenTransactionManager),
184 }
185 }
186
187 #[diesel_derives::__diesel_public_if(
194 feature = "i-implement-a-third-party-backend-and-opt-into-breaking-changes"
195 )]
196 pub(crate) fn set_test_transaction_flag(&mut self) {
197 if let TransactionManagerStatus::Valid(ValidTransactionManagerStatus {
198 in_transaction: Some(s),
199 }) = self
200 {
201 s.test_transaction = true;
202 }
203 }
204}
205
206#[allow(missing_copy_implementations)]
208#[derive(Debug, Default)]
209#[diesel_derives::__diesel_public_if(
210 feature = "i-implement-a-third-party-backend-and-opt-into-breaking-changes",
211 public_fields(in_transaction)
212)]
213pub struct ValidTransactionManagerStatus {
214 in_transaction: Option<InTransactionStatus>,
216}
217
218#[allow(missing_copy_implementations)]
221#[derive(Debug)]
222#[diesel_derives::__diesel_public_if(
223 feature = "i-implement-a-third-party-backend-and-opt-into-breaking-changes",
224 public_fields(
225 test_transaction,
226 transaction_depth,
227 requires_rollback_maybe_up_to_top_level
228 )
229)]
230pub struct InTransactionStatus {
231 transaction_depth: NonZeroU32,
233 requires_rollback_maybe_up_to_top_level: bool,
236 test_transaction: bool,
238}
239
240impl ValidTransactionManagerStatus {
241 pub fn transaction_depth(&self) -> Option<NonZeroU32> {
246 self.in_transaction.as_ref().map(|it| it.transaction_depth)
247 }
248
249 pub fn change_transaction_depth(
252 &mut self,
253 transaction_depth_change: TransactionDepthChange,
254 ) -> QueryResult<()> {
255 match (&mut self.in_transaction, transaction_depth_change) {
256 (Some(in_transaction), TransactionDepthChange::IncreaseDepth) => {
257 in_transaction.transaction_depth =
260 NonZeroU32::new(in_transaction.transaction_depth.get().saturating_add(1))
261 .expect("nz + nz is always non-zero");
262 Ok(())
263 }
264 (Some(in_transaction), TransactionDepthChange::DecreaseDepth) => {
265 match NonZeroU32::new(in_transaction.transaction_depth.get() - 1) {
267 Some(depth) => in_transaction.transaction_depth = depth,
268 None => self.in_transaction = None,
269 }
270 Ok(())
271 }
272 (None, TransactionDepthChange::IncreaseDepth) => {
273 self.in_transaction = Some(InTransactionStatus {
274 transaction_depth: NonZeroU32::new(1).expect("1 is non-zero"),
275 requires_rollback_maybe_up_to_top_level: false,
276 test_transaction: false,
277 });
278 Ok(())
279 }
280 (None, TransactionDepthChange::DecreaseDepth) => {
281 Err(Error::NotInTransaction)
285 }
286 }
287 }
288}
289
290#[derive(Debug, Clone, Copy)]
292pub enum TransactionDepthChange {
293 IncreaseDepth,
295 DecreaseDepth,
297}
298
299impl AnsiTransactionManager {
300 fn get_transaction_state<Conn>(
301 conn: &mut Conn,
302 ) -> QueryResult<&mut ValidTransactionManagerStatus>
303 where
304 Conn: Connection<TransactionManager = Self>,
305 {
306 conn.transaction_state().status.transaction_state()
307 }
308
309 pub fn begin_transaction_sql<Conn>(conn: &mut Conn, sql: &str) -> QueryResult<()>
315 where
316 Conn: Connection<TransactionManager = Self>,
317 {
318 let state = Self::get_transaction_state(conn)?;
319 if let Some(_depth) = state.transaction_depth() {
320 return Err(Error::AlreadyInTransaction);
321 }
322 let instrumentation_depth = NonZeroU32::new(1);
323 conn.instrumentation().on_connection_event(
326 super::instrumentation::InstrumentationEvent::BeginTransaction {
327 depth: instrumentation_depth.expect("We know that 1 is not zero"),
328 },
329 );
330 conn.batch_execute(sql)?;
331 Self::get_transaction_state(conn)?
332 .change_transaction_depth(TransactionDepthChange::IncreaseDepth)?;
333
334 Ok(())
335 }
336}
337
338impl<Conn> TransactionManager<Conn> for AnsiTransactionManager
339where
340 Conn: Connection<TransactionManager = Self>,
341{
342 type TransactionStateData = Self;
343
344 fn begin_transaction(conn: &mut Conn) -> QueryResult<()> {
345 let transaction_state = Self::get_transaction_state(conn)?;
346 let transaction_depth = transaction_state.transaction_depth();
347 let start_transaction_sql = match transaction_depth {
348 None => Cow::from("BEGIN"),
349 Some(transaction_depth) => {
350 Cow::from(format!("SAVEPOINT diesel_savepoint_{transaction_depth}"))
351 }
352 };
353 let instrumentation_depth =
354 NonZeroU32::new(transaction_depth.map_or(0, NonZeroU32::get).wrapping_add(1));
355 let sql = &start_transaction_sql;
356 conn.instrumentation().on_connection_event(
359 super::instrumentation::InstrumentationEvent::BeginTransaction {
360 depth: instrumentation_depth.expect("Transaction depth is too large"),
361 },
362 );
363 conn.batch_execute(sql)?;
364 Self::get_transaction_state(conn)?
365 .change_transaction_depth(TransactionDepthChange::IncreaseDepth)?;
366
367 Ok(())
368 }
369
370 fn rollback_transaction(conn: &mut Conn) -> QueryResult<()> {
371 let transaction_state = Self::get_transaction_state(conn)?;
372
373 let (
374 (rollback_sql, rolling_back_top_level),
375 requires_rollback_maybe_up_to_top_level_before_execute,
376 ) = match transaction_state.in_transaction {
377 Some(ref in_transaction) => (
378 match in_transaction.transaction_depth.get() {
379 1 => (Cow::Borrowed("ROLLBACK"), true),
380 depth_gt1 => (
381 Cow::Owned(format!(
382 "ROLLBACK TO SAVEPOINT diesel_savepoint_{}",
383 depth_gt1 - 1
384 )),
385 false,
386 ),
387 },
388 in_transaction.requires_rollback_maybe_up_to_top_level,
389 ),
390 None => return Err(Error::NotInTransaction),
391 };
392 let depth = transaction_state
393 .transaction_depth()
394 .expect("We know that we are in a transaction here");
395 conn.instrumentation().on_connection_event(
396 super::instrumentation::InstrumentationEvent::RollbackTransaction { depth },
397 );
398
399 match conn.batch_execute(&rollback_sql) {
400 Ok(()) => {
401 match Self::get_transaction_state(conn)?
402 .change_transaction_depth(TransactionDepthChange::DecreaseDepth)
403 {
404 Ok(()) => {}
405 Err(Error::NotInTransaction) if rolling_back_top_level => {
406 }
409 Err(e) => return Err(e),
410 }
411 Ok(())
412 }
413 Err(rollback_error) => {
414 let tm_status = Self::transaction_manager_status_mut(conn);
415 match tm_status {
416 TransactionManagerStatus::Valid(ValidTransactionManagerStatus {
417 in_transaction:
418 Some(InTransactionStatus {
419 transaction_depth,
420 requires_rollback_maybe_up_to_top_level,
421 ..
422 }),
423 }) if transaction_depth.get() > 1 => {
424 *transaction_depth = NonZeroU32::new(transaction_depth.get() - 1)
432 .expect("Depth was checked to be > 1");
433 *requires_rollback_maybe_up_to_top_level = true;
434 if requires_rollback_maybe_up_to_top_level_before_execute {
435 return Ok(());
438 }
439 }
440 TransactionManagerStatus::Valid(ValidTransactionManagerStatus {
441 in_transaction: None,
442 }) => {
443 }
448 _ => tm_status.set_in_error(),
449 }
450 Err(rollback_error)
451 }
452 }
453 }
454
455 fn commit_transaction(conn: &mut Conn) -> QueryResult<()> {
461 let transaction_state = Self::get_transaction_state(conn)?;
462 let transaction_depth = transaction_state.transaction_depth();
463 let (commit_sql, committing_top_level) = match transaction_depth {
464 None => return Err(Error::NotInTransaction),
465 Some(transaction_depth) if transaction_depth.get() == 1 => {
466 (Cow::Borrowed("COMMIT"), true)
467 }
468 Some(transaction_depth) => (
469 Cow::Owned(format!(
470 "RELEASE SAVEPOINT diesel_savepoint_{}",
471 transaction_depth.get() - 1
472 )),
473 false,
474 ),
475 };
476 let depth = transaction_state
477 .transaction_depth()
478 .expect("We know that we are in a transaction here");
479 conn.instrumentation().on_connection_event(
480 super::instrumentation::InstrumentationEvent::CommitTransaction { depth },
481 );
482 match conn.batch_execute(&commit_sql) {
483 Ok(()) => {
484 match Self::get_transaction_state(conn)?
485 .change_transaction_depth(TransactionDepthChange::DecreaseDepth)
486 {
487 Ok(()) => {}
488 Err(Error::NotInTransaction) if committing_top_level => {
489 }
492 Err(e) => return Err(e),
493 }
494 Ok(())
495 }
496 Err(commit_error) => {
497 if let TransactionManagerStatus::Valid(ValidTransactionManagerStatus {
498 in_transaction:
499 Some(InTransactionStatus {
500 requires_rollback_maybe_up_to_top_level: true,
501 ..
502 }),
503 }) = conn.transaction_state().status
504 {
505 match Self::rollback_transaction(conn) {
506 Ok(()) => {}
507 Err(rollback_error) => {
508 conn.transaction_state().status.set_in_error();
509 return Err(Error::RollbackErrorOnCommit {
510 rollback_error: Box::new(rollback_error),
511 commit_error: Box::new(commit_error),
512 });
513 }
514 }
515 }
516 Err(commit_error)
517 }
518 }
519 }
520
521 fn transaction_manager_status_mut(conn: &mut Conn) -> &mut TransactionManagerStatus {
522 &mut conn.transaction_state().status
523 }
524}
525
526#[cfg(test)]
527#[allow(clippy::uninlined_format_args)]
529mod test {
530 mod mock {
532 use crate::connection::transaction_manager::AnsiTransactionManager;
533 use crate::connection::Instrumentation;
534 use crate::connection::{
535 Connection, ConnectionSealed, SimpleConnection, TransactionManager,
536 };
537 use crate::result::QueryResult;
538 use crate::test_helpers::TestConnection;
539 use std::collections::VecDeque;
540
541 pub(crate) struct MockConnection {
542 pub(crate) next_results: VecDeque<QueryResult<usize>>,
543 pub(crate) next_batch_execute_results: VecDeque<QueryResult<()>>,
544 pub(crate) top_level_requires_rollback_after_next_batch_execute: bool,
545 transaction_state: AnsiTransactionManager,
546 instrumentation: Option<Box<dyn Instrumentation>>,
547 }
548
549 impl SimpleConnection for MockConnection {
550 fn batch_execute(&mut self, _query: &str) -> QueryResult<()> {
551 let res = self
552 .next_batch_execute_results
553 .pop_front()
554 .expect("No next result");
555 if self.top_level_requires_rollback_after_next_batch_execute {
556 self.transaction_state
557 .status
558 .set_requires_rollback_maybe_up_to_top_level(true);
559 }
560 res
561 }
562 }
563
564 impl ConnectionSealed for MockConnection {}
565
566 impl Connection for MockConnection {
567 type Backend = <TestConnection as Connection>::Backend;
568
569 type TransactionManager = AnsiTransactionManager;
570
571 fn establish(_database_url: &str) -> crate::ConnectionResult<Self> {
572 Ok(Self {
573 next_results: VecDeque::new(),
574 next_batch_execute_results: VecDeque::new(),
575 top_level_requires_rollback_after_next_batch_execute: false,
576 transaction_state: AnsiTransactionManager::default(),
577 instrumentation: None,
578 })
579 }
580
581 fn execute_returning_count<T>(&mut self, _source: &T) -> QueryResult<usize>
582 where
583 T: crate::query_builder::QueryFragment<Self::Backend>
584 + crate::query_builder::QueryId,
585 {
586 self.next_results.pop_front().expect("No next result")
587 }
588
589 fn transaction_state(
590 &mut self,
591 ) -> &mut <Self::TransactionManager as TransactionManager<Self>>::TransactionStateData
592 {
593 &mut self.transaction_state
594 }
595
596 fn instrumentation(&mut self) -> &mut dyn crate::connection::Instrumentation {
597 &mut self.instrumentation
598 }
599
600 fn set_instrumentation(
601 &mut self,
602 instrumentation: impl crate::connection::Instrumentation,
603 ) {
604 self.instrumentation = Some(Box::new(instrumentation));
605 }
606
607 fn set_prepared_statement_cache_size(&mut self, _size: crate::connection::CacheSize) {
608 panic!("implement, if you want to use it")
609 }
610 }
611 }
612
613 #[diesel_test_helper::test]
614 #[cfg(feature = "postgres")]
615 fn transaction_manager_returns_an_error_when_attempting_to_commit_outside_of_a_transaction() {
616 use crate::connection::transaction_manager::AnsiTransactionManager;
617 use crate::connection::transaction_manager::TransactionManager;
618 use crate::result::Error;
619 use crate::PgConnection;
620
621 let conn = &mut crate::test_helpers::pg_connection_no_transaction();
622 assert_eq!(
623 None,
624 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
625 conn
626 ).transaction_depth().expect("Transaction depth")
627 );
628 let result = AnsiTransactionManager::commit_transaction(conn);
629 assert!(matches!(result, Err(Error::NotInTransaction)))
630 }
631
632 #[diesel_test_helper::test]
633 #[cfg(feature = "postgres")]
634 fn transaction_manager_returns_an_error_when_attempting_to_rollback_outside_of_a_transaction() {
635 use crate::connection::transaction_manager::AnsiTransactionManager;
636 use crate::connection::transaction_manager::TransactionManager;
637 use crate::result::Error;
638 use crate::PgConnection;
639
640 let conn = &mut crate::test_helpers::pg_connection_no_transaction();
641 assert_eq!(
642 None,
643 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
644 conn
645 ).transaction_depth().expect("Transaction depth")
646 );
647 let result = AnsiTransactionManager::rollback_transaction(conn);
648 assert!(matches!(result, Err(Error::NotInTransaction)))
649 }
650
651 #[diesel_test_helper::test]
652 fn transaction_manager_enters_broken_state_when_connection_is_broken() {
653 use crate::connection::transaction_manager::AnsiTransactionManager;
654 use crate::connection::transaction_manager::TransactionManager;
655 use crate::connection::TransactionManagerStatus;
656 use crate::result::{DatabaseErrorKind, Error};
657 use crate::*;
658
659 let mut conn = mock::MockConnection::establish("mock").expect("Mock connection");
660
661 conn.next_batch_execute_results.push_back(Ok(()));
663 let result = conn.transaction(|conn| {
664 conn.next_results.push_back(Ok(1));
665 let query_result = sql_query("SELECT 1").execute(conn);
666 assert!(query_result.is_ok());
667 conn.next_batch_execute_results
669 .push_back(Err(Error::DatabaseError(
670 DatabaseErrorKind::Unknown,
671 Box::new("commit fails".to_string()),
672 )));
673 conn.top_level_requires_rollback_after_next_batch_execute = true;
674 conn.next_batch_execute_results
675 .push_back(Err(Error::DatabaseError(
676 DatabaseErrorKind::Unknown,
677 Box::new("rollback also fails".to_string()),
678 )));
679 Ok(())
680 });
681 assert!(
682 matches!(
683 &result,
684 Err(Error::RollbackErrorOnCommit {
685 rollback_error,
686 commit_error
687 }) if matches!(**commit_error, Error::DatabaseError(DatabaseErrorKind::Unknown, _))
688 && matches!(&**rollback_error,
689 Error::DatabaseError(DatabaseErrorKind::Unknown, msg)
690 if msg.message() == "rollback also fails"
691 )
692 ),
693 "Got {:?}",
694 result
695 );
696 assert!(matches!(
697 *AnsiTransactionManager::transaction_manager_status_mut(&mut conn),
698 TransactionManagerStatus::InError
699 ));
700 let result = conn.transaction(|_conn| Ok(()));
702 assert!(matches!(result, Err(Error::BrokenTransactionManager)))
703 }
704
705 #[diesel_test_helper::test]
706 #[cfg(feature = "mysql")]
707 fn mysql_transaction_is_rolled_back_upon_syntax_error() {
708 use crate::connection::transaction_manager::AnsiTransactionManager;
709 use crate::connection::transaction_manager::TransactionManager;
710 use crate::*;
711 use std::num::NonZeroU32;
712
713 let conn = &mut crate::test_helpers::connection_no_transaction();
714 assert_eq!(
715 None,
716 <AnsiTransactionManager as TransactionManager<MysqlConnection>>::transaction_manager_status_mut(
717 conn
718 ).transaction_depth().expect("Transaction depth")
719 );
720 let _result = conn.transaction(|conn| {
721 assert_eq!(
722 NonZeroU32::new(1),
723 <AnsiTransactionManager as TransactionManager<MysqlConnection>>::transaction_manager_status_mut(
724 conn
725 ).transaction_depth().expect("Transaction depth")
726 );
727 let query_result = sql_query("SELECT_SYNTAX_ERROR 1").execute(conn);
729 assert!(query_result.is_err());
730 query_result
731 });
732 assert_eq!(
733 None,
734 <AnsiTransactionManager as TransactionManager<MysqlConnection>>::transaction_manager_status_mut(
735 conn
736 ).transaction_depth().expect("Transaction depth")
737 );
738 }
739
740 #[diesel_test_helper::test]
741 #[cfg(feature = "sqlite")]
742 fn sqlite_transaction_is_rolled_back_upon_syntax_error() {
743 use crate::connection::transaction_manager::AnsiTransactionManager;
744 use crate::connection::transaction_manager::TransactionManager;
745 use crate::*;
746 use std::num::NonZeroU32;
747
748 let conn = &mut crate::test_helpers::connection();
749 assert_eq!(
750 None,
751 <AnsiTransactionManager as TransactionManager<SqliteConnection>>::transaction_manager_status_mut(
752 conn
753 ).transaction_depth().expect("Transaction depth")
754 );
755 let _result = conn.transaction(|conn| {
756 assert_eq!(
757 NonZeroU32::new(1),
758 <AnsiTransactionManager as TransactionManager<SqliteConnection>>::transaction_manager_status_mut(
759 conn
760 ).transaction_depth().expect("Transaction depth")
761 );
762 let query_result = sql_query("SELECT_SYNTAX_ERROR 1").execute(conn);
764 assert!(query_result.is_err());
765 query_result
766 });
767 assert_eq!(
768 None,
769 <AnsiTransactionManager as TransactionManager<SqliteConnection>>::transaction_manager_status_mut(
770 conn
771 ).transaction_depth().expect("Transaction depth")
772 );
773 }
774
775 #[diesel_test_helper::test]
776 #[cfg(feature = "mysql")]
777 fn nested_mysql_transaction_is_rolled_back_upon_syntax_error() {
778 use crate::connection::transaction_manager::AnsiTransactionManager;
779 use crate::connection::transaction_manager::TransactionManager;
780 use crate::*;
781 use std::num::NonZeroU32;
782
783 let conn = &mut crate::test_helpers::connection_no_transaction();
784 assert_eq!(
785 None,
786 <AnsiTransactionManager as TransactionManager<MysqlConnection>>::transaction_manager_status_mut(
787 conn
788 ).transaction_depth().expect("Transaction depth")
789 );
790 let result = conn.transaction(|conn| {
791 assert_eq!(
792 NonZeroU32::new(1),
793 <AnsiTransactionManager as TransactionManager<MysqlConnection>>::transaction_manager_status_mut(
794 conn
795 ).transaction_depth().expect("Transaction depth")
796 );
797 let result = conn.transaction(|conn| {
798 assert_eq!(
799 NonZeroU32::new(2),
800 <AnsiTransactionManager as TransactionManager<MysqlConnection>>::transaction_manager_status_mut(
801 conn
802 ).transaction_depth().expect("Transaction depth")
803 );
804 sql_query("SELECT_SYNTAX_ERROR 1").execute(conn)
806 });
807 assert!(result.is_err());
808 assert_eq!(
809 NonZeroU32::new(1),
810 <AnsiTransactionManager as TransactionManager<MysqlConnection>>::transaction_manager_status_mut(
811 conn
812 ).transaction_depth().expect("Transaction depth")
813 );
814 let query_result = sql_query("SELECT 1").execute(conn);
815 assert!(query_result.is_ok());
816 query_result
817 });
818 assert!(result.is_ok());
819 assert_eq!(
820 None,
821 <AnsiTransactionManager as TransactionManager<MysqlConnection>>::transaction_manager_status_mut(
822 conn
823 ).transaction_depth().expect("Transaction depth")
824 );
825 }
826
827 #[diesel_test_helper::test]
828 #[cfg(feature = "mysql")]
829 #[allow(clippy::needless_collect)]
832 fn mysql_transaction_depth_commits_tracked_properly_on_serialization_failure() {
833 use crate::result::DatabaseErrorKind::SerializationFailure;
834 use crate::result::Error::DatabaseError;
835 use crate::*;
836 use std::num::NonZeroU32;
837 use std::sync::{Arc, Barrier};
838 use std::thread;
839
840 table! {
841 #[sql_name = "mysql_transaction_depth_is_tracked_properly_on_commit_failure"]
842 serialization_example {
843 id -> Integer,
844 class -> Integer,
845 }
846 }
847
848 let conn = &mut crate::test_helpers::connection_no_transaction();
849
850 sql_query(
851 "DROP TABLE IF EXISTS mysql_transaction_depth_is_tracked_properly_on_commit_failure;",
852 )
853 .execute(conn)
854 .unwrap();
855 sql_query(
856 r#"
857 CREATE TABLE mysql_transaction_depth_is_tracked_properly_on_commit_failure (
858 id INT AUTO_INCREMENT PRIMARY KEY,
859 class INTEGER NOT NULL
860 )
861 "#,
862 )
863 .execute(conn)
864 .unwrap();
865
866 insert_into(serialization_example::table)
867 .values(&vec![
868 serialization_example::class.eq(1),
869 serialization_example::class.eq(2),
870 ])
871 .execute(conn)
872 .unwrap();
873
874 let before_barrier = Arc::new(Barrier::new(2));
875 let after_barrier = Arc::new(Barrier::new(2));
876
877 let threads = (1..3)
878 .map(|i| {
879 let before_barrier = before_barrier.clone();
880 let after_barrier = after_barrier.clone();
881 thread::spawn(move || {
882 use crate::connection::transaction_manager::AnsiTransactionManager;
883 use crate::connection::transaction_manager::TransactionManager;
884 let conn = &mut crate::test_helpers::connection_no_transaction();
885 assert_eq!(None, <AnsiTransactionManager as TransactionManager<MysqlConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));
886 crate::sql_query("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE").execute(conn)?;
887
888 let result =
889 conn.transaction(|conn| {
890 assert_eq!(NonZeroU32::new(1), <AnsiTransactionManager as TransactionManager<MysqlConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));
891 let _ = serialization_example::table
892 .filter(serialization_example::class.eq(i))
893 .count()
894 .execute(conn)?;
895
896 let other_i = if i == 1 { 2 } else { 1 };
897 let q = insert_into(serialization_example::table)
898 .values(serialization_example::class.eq(other_i));
899 before_barrier.wait();
900
901 let r = q.execute(conn);
902 after_barrier.wait();
903 r
904 });
905
906 assert_eq!(None, <AnsiTransactionManager as TransactionManager<MysqlConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));
907
908 let second_trans_result = conn.transaction(|conn| crate::sql_query("SELECT 1").execute(conn));
909 assert!(second_trans_result.is_ok(), "Expected the thread connections to have been rolled back or committed, but second transaction exited with {:?}", second_trans_result);
910 result
911 })
912 })
913 .collect::<Vec<_>>();
914 let second_trans_result =
915 conn.transaction(|conn| crate::sql_query("SELECT 1").execute(conn));
916 assert!(second_trans_result.is_ok(), "Expected the main connection to have been rolled back or committed, but second transaction exited with {:?}", second_trans_result);
917
918 let mut results = threads
919 .into_iter()
920 .map(|t| t.join().unwrap())
921 .collect::<Vec<_>>();
922
923 results.sort_by_key(|r| r.is_err());
924 assert!(results[0].is_ok(), "Got {:?} instead", results);
925 assert!(
927 matches!(&results[1], Err(DatabaseError(SerializationFailure, _))),
928 "Got {:?} instead",
929 results
930 );
931 }
932
933 #[diesel_test_helper::test]
934 #[cfg(feature = "mysql")]
935 #[allow(clippy::needless_collect)]
938 fn mysql_nested_transaction_depth_commits_tracked_properly_on_serialization_failure() {
939 use crate::result::DatabaseErrorKind::SerializationFailure;
940 use crate::result::Error::DatabaseError;
941 use crate::*;
942 use std::num::NonZeroU32;
943 use std::sync::{Arc, Barrier};
944 use std::thread;
945
946 table! {
947 #[sql_name = "mysql_nested_trans_depth_is_tracked_properly_on_commit_failure"]
948 serialization_example {
949 id -> Integer,
950 class -> Integer,
951 }
952 }
953
954 let conn = &mut crate::test_helpers::connection_no_transaction();
955
956 sql_query(
957 "DROP TABLE IF EXISTS mysql_nested_trans_depth_is_tracked_properly_on_commit_failure;",
958 )
959 .execute(conn)
960 .unwrap();
961 sql_query(
962 r#"
963 CREATE TABLE mysql_nested_trans_depth_is_tracked_properly_on_commit_failure (
964 id INT AUTO_INCREMENT PRIMARY KEY,
965 class INTEGER NOT NULL
966 )
967 "#,
968 )
969 .execute(conn)
970 .unwrap();
971
972 insert_into(serialization_example::table)
973 .values(&vec![
974 serialization_example::class.eq(1),
975 serialization_example::class.eq(2),
976 ])
977 .execute(conn)
978 .unwrap();
979
980 let before_barrier = Arc::new(Barrier::new(2));
981 let after_barrier = Arc::new(Barrier::new(2));
982
983 let threads = (1..3)
984 .map(|i| {
985 let before_barrier = before_barrier.clone();
986 let after_barrier = after_barrier.clone();
987 thread::spawn(move || {
988 use crate::connection::transaction_manager::AnsiTransactionManager;
989 use crate::connection::transaction_manager::TransactionManager;
990 let conn = &mut crate::test_helpers::connection_no_transaction();
991 assert_eq!(None, <AnsiTransactionManager as TransactionManager<MysqlConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));
992 crate::sql_query("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE").execute(conn)?;
993
994 let result =
995 conn.transaction(|conn| {
996 assert_eq!(NonZeroU32::new(1), <AnsiTransactionManager as TransactionManager<MysqlConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));
997 conn.transaction(|conn| {
998 assert_eq!(NonZeroU32::new(2), <AnsiTransactionManager as TransactionManager<MysqlConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));
999 let _ = serialization_example::table
1000 .filter(serialization_example::class.eq(i))
1001 .count()
1002 .execute(conn)?;
1003
1004 let other_i = if i == 1 { 2 } else { 1 };
1005 let q = insert_into(serialization_example::table)
1006 .values(serialization_example::class.eq(other_i));
1007 before_barrier.wait();
1008
1009 let r = q.execute(conn);
1010 after_barrier.wait();
1011 r
1012 })
1013 });
1014
1015 assert_eq!(None, <AnsiTransactionManager as TransactionManager<MysqlConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));
1016
1017 let second_trans_result = conn.transaction(|conn| crate::sql_query("SELECT 1").execute(conn));
1018 assert!(second_trans_result.is_ok(), "Expected the thread connections to have been rolled back or committed, but second transaction exited with {:?}", second_trans_result);
1019 result
1020 })
1021 })
1022 .collect::<Vec<_>>();
1023 let second_trans_result =
1024 conn.transaction(|conn| crate::sql_query("SELECT 1").execute(conn));
1025 assert!(second_trans_result.is_ok(), "Expected the main connection to have been rolled back or committed, but second transaction exited with {:?}", second_trans_result);
1026
1027 let mut results = threads
1028 .into_iter()
1029 .map(|t| t.join().unwrap())
1030 .collect::<Vec<_>>();
1031
1032 results.sort_by_key(|r| r.is_err());
1033 assert!(results[0].is_ok(), "Got {:?} instead", results);
1034 assert!(
1035 matches!(&results[1], Err(DatabaseError(SerializationFailure, _))),
1036 "Got {:?} instead",
1037 results
1038 );
1039 }
1040
1041 #[diesel_test_helper::test]
1042 #[cfg(feature = "sqlite")]
1043 fn sqlite_transaction_is_rolled_back_upon_deferred_constraint_failure() {
1044 use crate::connection::transaction_manager::AnsiTransactionManager;
1045 use crate::connection::transaction_manager::TransactionManager;
1046 use crate::result::Error;
1047 use crate::*;
1048 use std::num::NonZeroU32;
1049
1050 let conn = &mut crate::test_helpers::connection();
1051 assert_eq!(
1052 None,
1053 <AnsiTransactionManager as TransactionManager<SqliteConnection>>::transaction_manager_status_mut(
1054 conn
1055 ).transaction_depth().expect("Transaction depth")
1056 );
1057 let result: Result<_, Error> = conn.transaction(|conn| {
1058 assert_eq!(
1059 NonZeroU32::new(1),
1060 <AnsiTransactionManager as TransactionManager<SqliteConnection>>::transaction_manager_status_mut(
1061 conn
1062 ).transaction_depth().expect("Transaction depth")
1063 );
1064 sql_query("DROP TABLE IF EXISTS deferred_commit").execute(conn)?;
1065 sql_query("CREATE TABLE deferred_commit(id INT UNIQUE INITIALLY DEFERRED)").execute(conn)?;
1066 sql_query("INSERT INTO deferred_commit VALUES(1)").execute(conn)?;
1067 let result = sql_query("INSERT INTO deferred_commit VALUES(1)").execute(conn);
1068 assert!(result.is_ok());
1069 Ok(())
1070 });
1071 assert!(result.is_err());
1072 assert_eq!(
1073 None,
1074 <AnsiTransactionManager as TransactionManager<SqliteConnection>>::transaction_manager_status_mut(
1075 conn
1076 ).transaction_depth().expect("Transaction depth")
1077 );
1078 }
1079
1080 #[diesel_test_helper::test]
1083 #[cfg(feature = "postgres")]
1084 fn some_libpq_failures_are_recoverable_by_rolling_back_the_savepoint_only() {
1085 use crate::connection::{AnsiTransactionManager, TransactionManager};
1086 use crate::prelude::*;
1087 use crate::sql_query;
1088
1089 crate::table! {
1090 rollback_test (id) {
1091 id -> Int4,
1092 value -> Int4,
1093 }
1094 }
1095
1096 let conn = &mut crate::test_helpers::pg_connection_no_transaction();
1097 assert_eq!(
1098 None,
1099 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1100 conn
1101 ).transaction_depth().expect("Transaction depth")
1102 );
1103
1104 let res = conn.transaction(|conn| {
1105 sql_query(
1106 "CREATE TABLE IF NOT EXISTS rollback_test (id INT PRIMARY KEY, value INT NOT NULL)",
1107 )
1108 .execute(conn)?;
1109 conn.transaction(|conn| {
1110 sql_query("SET TRANSACTION READ ONLY").execute(conn)?;
1111 crate::update(rollback_test::table)
1112 .set(rollback_test::value.eq(0))
1113 .execute(conn)
1114 })
1115 .map(|_| {
1116 panic!("Should use the `or_else` branch");
1117 })
1118 .or_else(|_| sql_query("SELECT 1").execute(conn))
1119 .map(|_| ())
1120 });
1121 assert!(res.is_ok());
1122
1123 assert_eq!(
1124 None,
1125 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1126 conn
1127 ).transaction_depth().expect("Transaction depth")
1128 );
1129 }
1130
1131 #[diesel_test_helper::test]
1132 #[cfg(feature = "postgres")]
1133 fn other_libpq_failures_are_not_recoverable_by_rolling_back_the_savepoint_only() {
1134 use crate::connection::{AnsiTransactionManager, TransactionManager};
1135 use crate::prelude::*;
1136 use crate::sql_query;
1137 use std::num::NonZeroU32;
1138 use std::sync::{Arc, Barrier};
1139
1140 crate::table! {
1141 rollback_test2 (id) {
1142 id -> Int4,
1143 value -> Int4,
1144 }
1145 }
1146 let conn = &mut crate::test_helpers::pg_connection_no_transaction();
1147
1148 sql_query(
1149 "CREATE TABLE IF NOT EXISTS rollback_test2 (id INT PRIMARY KEY, value INT NOT NULL)",
1150 )
1151 .execute(conn)
1152 .unwrap();
1153
1154 let start_barrier = Arc::new(Barrier::new(2));
1155 let commit_barrier = Arc::new(Barrier::new(2));
1156
1157 let other_start_barrier = start_barrier.clone();
1158 let other_commit_barrier = commit_barrier.clone();
1159
1160 let t1 = std::thread::spawn(move || {
1161 let conn = &mut crate::test_helpers::pg_connection_no_transaction();
1162 assert_eq!(
1163 None,
1164 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1165 conn
1166 ).transaction_depth().expect("Transaction depth")
1167 );
1168 let r = conn.build_transaction().serializable().run::<_, crate::result::Error, _>(|conn| {
1169 assert_eq!(
1170 NonZeroU32::new(1),
1171 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1172 conn
1173 ).transaction_depth().expect("Transaction depth")
1174 );
1175 rollback_test2::table.load::<(i32, i32)>(conn)?;
1176 crate::insert_into(rollback_test2::table)
1177 .values((rollback_test2::id.eq(1), rollback_test2::value.eq(42)))
1178 .execute(conn)?;
1179 let r = conn.transaction(|conn| {
1180 assert_eq!(
1181 NonZeroU32::new(2),
1182 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1183 conn
1184 ).transaction_depth().expect("Transaction depth")
1185 );
1186 start_barrier.wait();
1187 commit_barrier.wait();
1188 let r = rollback_test2::table.load::<(i32, i32)>(conn);
1189 assert!(r.is_err());
1190 Err::<(), _>(crate::result::Error::RollbackTransaction)
1191 });
1192 assert_eq!(
1193 NonZeroU32::new(1),
1194 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1195 conn
1196 ).transaction_depth().expect("Transaction depth")
1197 );
1198 assert!(
1199 matches!(r, Err(crate::result::Error::RollbackTransaction)),
1200 "rollback failed (such errors should be ignored by transaction manager): {}",
1201 r.unwrap_err()
1202 );
1203 let r = rollback_test2::table.load::<(i32, i32)>(conn);
1204 assert!(r.is_err());
1205 r
1209 });
1210 assert!(r.is_err());
1211 assert_eq!(
1212 None,
1213 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1214 conn
1215 ).transaction_depth().expect("Transaction depth")
1216 );
1217 });
1218
1219 let t2 = std::thread::spawn(move || {
1220 other_start_barrier.wait();
1221 let conn = &mut crate::test_helpers::pg_connection_no_transaction();
1222 assert_eq!(
1223 None,
1224 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1225 conn
1226 ).transaction_depth().expect("Transaction depth")
1227 );
1228 let r = conn.build_transaction().serializable().run::<_, crate::result::Error, _>(|conn| {
1229 assert_eq!(
1230 NonZeroU32::new(1),
1231 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1232 conn
1233 ).transaction_depth().expect("Transaction depth")
1234 );
1235 let _ = rollback_test2::table.load::<(i32, i32)>(conn)?;
1236 crate::insert_into(rollback_test2::table)
1237 .values((rollback_test2::id.eq(23), rollback_test2::value.eq(42)))
1238 .execute(conn)?;
1239 Ok(())
1240 });
1241 other_commit_barrier.wait();
1242 assert!(r.is_ok(), "{:?}", r.unwrap_err());
1243 assert_eq!(
1244 None,
1245 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1246 conn
1247 ).transaction_depth().expect("Transaction depth")
1248 );
1249 });
1250 crate::sql_query("DELETE FROM rollback_test2")
1251 .execute(conn)
1252 .unwrap();
1253 t1.join().unwrap();
1254 t2.join().unwrap();
1255 }
1256}