1use crate::connection::Connection;
2use crate::result::{Error, QueryResult};
3use alloc::borrow::Cow;
4use alloc::boxed::Box;
5use core::num::NonZeroU32;
6
7pub trait TransactionManager<Conn: Connection> {
12 type TransactionStateData;
15
16 fn begin_transaction(conn: &mut Conn) -> QueryResult<()>;
22
23 fn rollback_transaction(conn: &mut Conn) -> QueryResult<()>;
29
30 fn commit_transaction(conn: &mut Conn) -> QueryResult<()>;
36
37 #[diesel_derives::__diesel_public_if(
43 feature = "i-implement-a-third-party-backend-and-opt-into-breaking-changes"
44 )]
45 fn transaction_manager_status_mut(conn: &mut Conn) -> &mut TransactionManagerStatus;
46
47 fn transaction<F, R, E>(conn: &mut Conn, callback: F) -> Result<R, E>
52 where
53 F: FnOnce(&mut Conn) -> Result<R, E>,
54 E: From<Error>,
55 {
56 Self::begin_transaction(conn)?;
57 match callback(&mut *conn) {
58 Ok(value) => {
59 Self::commit_transaction(conn)?;
60 Ok(value)
61 }
62 Err(user_error) => match Self::rollback_transaction(conn) {
63 Ok(()) => Err(user_error),
64 Err(Error::BrokenTransactionManager) => {
65 Err(user_error)
68 }
69 Err(rollback_error) => Err(rollback_error.into()),
70 },
71 }
72 }
73
74 #[diesel_derives::__diesel_public_if(
82 feature = "i-implement-a-third-party-backend-and-opt-into-breaking-changes"
83 )]
84 fn is_broken_transaction_manager(conn: &mut Conn) -> bool {
85 match Self::transaction_manager_status_mut(conn).transaction_state() {
86 Ok(ValidTransactionManagerStatus {
89 in_transaction: None,
90 }) => false,
91 Err(_) => true,
94 Ok(ValidTransactionManagerStatus {
98 in_transaction: Some(s),
99 }) => !s.test_transaction,
100 }
101 }
102}
103
104#[derive(#[automatically_derived]
impl ::core::default::Default for AnsiTransactionManager {
#[inline]
fn default() -> AnsiTransactionManager {
AnsiTransactionManager { status: ::core::default::Default::default() }
}
}Default, #[automatically_derived]
impl ::core::fmt::Debug for AnsiTransactionManager {
#[inline]
fn fmt(&self, f: &mut ::core::fmt::Formatter) -> ::core::fmt::Result {
::core::fmt::Formatter::debug_struct_field1_finish(f,
"AnsiTransactionManager", "status", &&self.status)
}
}Debug)]
107pub struct AnsiTransactionManager {
108 pub(crate) status: TransactionManagerStatus,
109}
110
111#[doc = " Status of the transaction manager"]
pub enum TransactionManagerStatus {
Valid(ValidTransactionManagerStatus),
InError,
}#[diesel_derives::__diesel_public_if(
113 feature = "i-implement-a-third-party-backend-and-opt-into-breaking-changes"
114)]
115#[derive(#[automatically_derived]
impl ::core::fmt::Debug for TransactionManagerStatus {
#[inline]
fn fmt(&self, f: &mut ::core::fmt::Formatter) -> ::core::fmt::Result {
match self {
TransactionManagerStatus::Valid(__self_0) =>
::core::fmt::Formatter::debug_tuple_field1_finish(f, "Valid",
&__self_0),
TransactionManagerStatus::InError =>
::core::fmt::Formatter::write_str(f, "InError"),
}
}
}Debug)]
116pub enum TransactionManagerStatus {
117 Valid(ValidTransactionManagerStatus),
119 InError,
121}
122
123impl Default for TransactionManagerStatus {
124 fn default() -> Self {
125 TransactionManagerStatus::Valid(ValidTransactionManagerStatus::default())
126 }
127}
128
129impl TransactionManagerStatus {
130 pub fn transaction_depth(&self) -> QueryResult<Option<NonZeroU32>> {
133 match self {
134 TransactionManagerStatus::Valid(valid_status) => Ok(valid_status.transaction_depth()),
135 TransactionManagerStatus::InError => Err(Error::BrokenTransactionManager),
136 }
137 }
138
139 #[cfg(any(
140 feature = "i-implement-a-third-party-backend-and-opt-into-breaking-changes",
141 feature = "postgres",
142 feature = "mysql",
143 test
144 ))]
145 #[diesel_derives::__diesel_public_if(
146 feature = "i-implement-a-third-party-backend-and-opt-into-breaking-changes"
147 )]
148 pub(crate) fn set_requires_rollback_maybe_up_to_top_level(&mut self, to: bool) {
154 if let TransactionManagerStatus::Valid(ValidTransactionManagerStatus {
155 in_transaction:
156 Some(InTransactionStatus {
157 requires_rollback_maybe_up_to_top_level,
158 ..
159 }),
160 }) = self
161 {
162 *requires_rollback_maybe_up_to_top_level = to;
163 }
164 }
165
166 pub fn set_in_error(&mut self) {
171 *self = TransactionManagerStatus::InError
172 }
173
174 #[diesel_derives::__diesel_public_if(
179 feature = "i-implement-a-third-party-backend-and-opt-into-breaking-changes"
180 )]
181 pub(self) fn transaction_state(&mut self) -> QueryResult<&mut ValidTransactionManagerStatus> {
182 match self {
183 TransactionManagerStatus::Valid(valid_status) => Ok(valid_status),
184 TransactionManagerStatus::InError => Err(Error::BrokenTransactionManager),
185 }
186 }
187
188 #[diesel_derives::__diesel_public_if(
195 feature = "i-implement-a-third-party-backend-and-opt-into-breaking-changes"
196 )]
197 pub(crate) fn set_test_transaction_flag(&mut self) {
198 if let TransactionManagerStatus::Valid(ValidTransactionManagerStatus {
199 in_transaction: Some(s),
200 }) = self
201 {
202 s.test_transaction = true;
203 }
204 }
205}
206
207#[allow(missing_copy_implementations)]
209#[derive(#[automatically_derived]
#[allow(missing_copy_implementations)]
impl ::core::fmt::Debug for ValidTransactionManagerStatus {
#[inline]
fn fmt(&self, f: &mut ::core::fmt::Formatter) -> ::core::fmt::Result {
::core::fmt::Formatter::debug_struct_field1_finish(f,
"ValidTransactionManagerStatus", "in_transaction",
&&self.in_transaction)
}
}Debug, #[automatically_derived]
#[allow(missing_copy_implementations)]
impl ::core::default::Default for ValidTransactionManagerStatus {
#[inline]
fn default() -> ValidTransactionManagerStatus {
ValidTransactionManagerStatus {
in_transaction: ::core::default::Default::default(),
}
}
}Default)]
210#[doc =
" Valid transaction status for the manager. Can return the current transaction depth"]
#[allow(missing_copy_implementations)]
#[non_exhaustive]
pub struct ValidTransactionManagerStatus {
#[doc = " Inner status, or `None` if no transaction is running"]
pub in_transaction: Option<InTransactionStatus>,
}#[diesel_derives::__diesel_public_if(
211 feature = "i-implement-a-third-party-backend-and-opt-into-breaking-changes",
212 public_fields(in_transaction)
213)]
214pub struct ValidTransactionManagerStatus {
215 in_transaction: Option<InTransactionStatus>,
217}
218
219#[allow(missing_copy_implementations)]
222#[derive(#[automatically_derived]
#[allow(missing_copy_implementations)]
impl ::core::fmt::Debug for InTransactionStatus {
#[inline]
fn fmt(&self, f: &mut ::core::fmt::Formatter) -> ::core::fmt::Result {
::core::fmt::Formatter::debug_struct_field3_finish(f,
"InTransactionStatus", "transaction_depth",
&self.transaction_depth,
"requires_rollback_maybe_up_to_top_level",
&self.requires_rollback_maybe_up_to_top_level, "test_transaction",
&&self.test_transaction)
}
}Debug)]
223#[doc = " Various status fields to track the status of"]
#[doc = " a transaction manager with a started transaction"]
#[allow(missing_copy_implementations)]
#[non_exhaustive]
pub struct InTransactionStatus {
#[doc = " The current depth of nested transactions"]
pub transaction_depth: NonZeroU32,
#[doc =
" If that is registered, savepoints rollbacks will still be attempted, but failure to do so"]
#[doc = " will not result in an error. (Some may succeed, some may not.)"]
pub requires_rollback_maybe_up_to_top_level: bool,
#[doc = " Is this transaction manager status marked as test-transaction?"]
pub test_transaction: bool,
}#[diesel_derives::__diesel_public_if(
224 feature = "i-implement-a-third-party-backend-and-opt-into-breaking-changes",
225 public_fields(
226 test_transaction,
227 transaction_depth,
228 requires_rollback_maybe_up_to_top_level
229 )
230)]
231pub struct InTransactionStatus {
232 transaction_depth: NonZeroU32,
234 requires_rollback_maybe_up_to_top_level: bool,
237 test_transaction: bool,
239}
240
241impl ValidTransactionManagerStatus {
242 pub fn transaction_depth(&self) -> Option<NonZeroU32> {
247 self.in_transaction.as_ref().map(|it| it.transaction_depth)
248 }
249
250 pub fn change_transaction_depth(
253 &mut self,
254 transaction_depth_change: TransactionDepthChange,
255 ) -> QueryResult<()> {
256 match (&mut self.in_transaction, transaction_depth_change) {
257 (Some(in_transaction), TransactionDepthChange::IncreaseDepth) => {
258 in_transaction.transaction_depth =
261 NonZeroU32::new(in_transaction.transaction_depth.get().saturating_add(1))
262 .expect("nz + nz is always non-zero");
263 Ok(())
264 }
265 (Some(in_transaction), TransactionDepthChange::DecreaseDepth) => {
266 match NonZeroU32::new(in_transaction.transaction_depth.get() - 1) {
268 Some(depth) => in_transaction.transaction_depth = depth,
269 None => self.in_transaction = None,
270 }
271 Ok(())
272 }
273 (None, TransactionDepthChange::IncreaseDepth) => {
274 self.in_transaction = Some(InTransactionStatus {
275 transaction_depth: NonZeroU32::new(1).expect("1 is non-zero"),
276 requires_rollback_maybe_up_to_top_level: false,
277 test_transaction: false,
278 });
279 Ok(())
280 }
281 (None, TransactionDepthChange::DecreaseDepth) => {
282 Err(Error::NotInTransaction)
286 }
287 }
288 }
289}
290
291#[derive(#[automatically_derived]
impl ::core::fmt::Debug for TransactionDepthChange {
#[inline]
fn fmt(&self, f: &mut ::core::fmt::Formatter) -> ::core::fmt::Result {
::core::fmt::Formatter::write_str(f,
match self {
TransactionDepthChange::IncreaseDepth => "IncreaseDepth",
TransactionDepthChange::DecreaseDepth => "DecreaseDepth",
})
}
}Debug, #[automatically_derived]
impl ::core::clone::Clone for TransactionDepthChange {
#[inline]
fn clone(&self) -> TransactionDepthChange { *self }
}Clone, #[automatically_derived]
impl ::core::marker::Copy for TransactionDepthChange { }Copy)]
293pub enum TransactionDepthChange {
294 IncreaseDepth,
296 DecreaseDepth,
298}
299
300impl AnsiTransactionManager {
301 fn get_transaction_state<Conn>(
302 conn: &mut Conn,
303 ) -> QueryResult<&mut ValidTransactionManagerStatus>
304 where
305 Conn: Connection<TransactionManager = Self>,
306 {
307 conn.transaction_state().status.transaction_state()
308 }
309
310 pub fn begin_transaction_sql<Conn>(conn: &mut Conn, sql: &str) -> QueryResult<()>
316 where
317 Conn: Connection<TransactionManager = Self>,
318 {
319 let state = Self::get_transaction_state(conn)?;
320 if let Some(_depth) = state.transaction_depth() {
321 return Err(Error::AlreadyInTransaction);
322 }
323 let instrumentation_depth = NonZeroU32::new(1);
324 conn.instrumentation().on_connection_event(
327 super::instrumentation::InstrumentationEvent::BeginTransaction {
328 depth: instrumentation_depth.expect("We know that 1 is not zero"),
329 },
330 );
331 conn.batch_execute(sql)?;
332 Self::get_transaction_state(conn)?
333 .change_transaction_depth(TransactionDepthChange::IncreaseDepth)?;
334
335 Ok(())
336 }
337}
338
339impl<Conn> TransactionManager<Conn> for AnsiTransactionManager
340where
341 Conn: Connection<TransactionManager = Self>,
342{
343 type TransactionStateData = Self;
344
345 fn begin_transaction(conn: &mut Conn) -> QueryResult<()> {
346 let transaction_state = Self::get_transaction_state(conn)?;
347 let transaction_depth = transaction_state.transaction_depth();
348 let start_transaction_sql = match transaction_depth {
349 None => Cow::from("BEGIN"),
350 Some(transaction_depth) => Cow::from(::alloc::__export::must_use({
::alloc::fmt::format(format_args!("SAVEPOINT diesel_savepoint_{0}",
transaction_depth))
})alloc::format!(
351 "SAVEPOINT diesel_savepoint_{transaction_depth}"
352 )),
353 };
354 let instrumentation_depth =
355 NonZeroU32::new(transaction_depth.map_or(0, NonZeroU32::get).wrapping_add(1));
356 let sql = &start_transaction_sql;
357 conn.instrumentation().on_connection_event(
360 super::instrumentation::InstrumentationEvent::BeginTransaction {
361 depth: instrumentation_depth.expect("Transaction depth is too large"),
362 },
363 );
364 conn.batch_execute(sql)?;
365 Self::get_transaction_state(conn)?
366 .change_transaction_depth(TransactionDepthChange::IncreaseDepth)?;
367
368 Ok(())
369 }
370
371 fn rollback_transaction(conn: &mut Conn) -> QueryResult<()> {
372 let transaction_state = Self::get_transaction_state(conn)?;
373
374 let (
375 (rollback_sql, rolling_back_top_level),
376 requires_rollback_maybe_up_to_top_level_before_execute,
377 ) = match transaction_state.in_transaction {
378 Some(ref in_transaction) => (
379 match in_transaction.transaction_depth.get() {
380 1 => (Cow::Borrowed("ROLLBACK"), true),
381 depth_gt1 => (
382 Cow::Owned(::alloc::__export::must_use({
::alloc::fmt::format(format_args!("ROLLBACK TO SAVEPOINT diesel_savepoint_{0}",
depth_gt1 - 1))
})alloc::format!(
383 "ROLLBACK TO SAVEPOINT diesel_savepoint_{}",
384 depth_gt1 - 1
385 )),
386 false,
387 ),
388 },
389 in_transaction.requires_rollback_maybe_up_to_top_level,
390 ),
391 None => return Err(Error::NotInTransaction),
392 };
393 let depth = transaction_state
394 .transaction_depth()
395 .expect("We know that we are in a transaction here");
396 conn.instrumentation().on_connection_event(
397 super::instrumentation::InstrumentationEvent::RollbackTransaction { depth },
398 );
399
400 match conn.batch_execute(&rollback_sql) {
401 Ok(()) => {
402 match Self::get_transaction_state(conn)?
403 .change_transaction_depth(TransactionDepthChange::DecreaseDepth)
404 {
405 Ok(()) => {}
406 Err(Error::NotInTransaction) if rolling_back_top_level => {
407 }
410 Err(e) => return Err(e),
411 }
412 Ok(())
413 }
414 Err(rollback_error) => {
415 let tm_status = Self::transaction_manager_status_mut(conn);
416 match tm_status {
417 TransactionManagerStatus::Valid(ValidTransactionManagerStatus {
418 in_transaction:
419 Some(InTransactionStatus {
420 transaction_depth,
421 requires_rollback_maybe_up_to_top_level,
422 ..
423 }),
424 }) if transaction_depth.get() > 1 => {
425 *transaction_depth = NonZeroU32::new(transaction_depth.get() - 1)
433 .expect("Depth was checked to be > 1");
434 *requires_rollback_maybe_up_to_top_level = true;
435 if requires_rollback_maybe_up_to_top_level_before_execute {
436 return Ok(());
439 }
440 }
441 TransactionManagerStatus::Valid(ValidTransactionManagerStatus {
442 in_transaction: None,
443 }) => {
444 }
449 _ => tm_status.set_in_error(),
450 }
451 Err(rollback_error)
452 }
453 }
454 }
455
456 fn commit_transaction(conn: &mut Conn) -> QueryResult<()> {
462 let transaction_state = Self::get_transaction_state(conn)?;
463 let transaction_depth = transaction_state.transaction_depth();
464 let (commit_sql, committing_top_level) = match transaction_depth {
465 None => return Err(Error::NotInTransaction),
466 Some(transaction_depth) if transaction_depth.get() == 1 => {
467 (Cow::Borrowed("COMMIT"), true)
468 }
469 Some(transaction_depth) => (
470 Cow::Owned(::alloc::__export::must_use({
::alloc::fmt::format(format_args!("RELEASE SAVEPOINT diesel_savepoint_{0}",
transaction_depth.get() - 1))
})alloc::format!(
471 "RELEASE SAVEPOINT diesel_savepoint_{}",
472 transaction_depth.get() - 1
473 )),
474 false,
475 ),
476 };
477 let depth = transaction_state
478 .transaction_depth()
479 .expect("We know that we are in a transaction here");
480 conn.instrumentation().on_connection_event(
481 super::instrumentation::InstrumentationEvent::CommitTransaction { depth },
482 );
483 match conn.batch_execute(&commit_sql) {
484 Ok(()) => {
485 match Self::get_transaction_state(conn)?
486 .change_transaction_depth(TransactionDepthChange::DecreaseDepth)
487 {
488 Ok(()) => {}
489 Err(Error::NotInTransaction) if committing_top_level => {
490 }
493 Err(e) => return Err(e),
494 }
495 Ok(())
496 }
497 Err(commit_error) => {
498 if let TransactionManagerStatus::Valid(ValidTransactionManagerStatus {
499 in_transaction:
500 Some(InTransactionStatus {
501 requires_rollback_maybe_up_to_top_level: true,
502 ..
503 }),
504 }) = conn.transaction_state().status
505 {
506 match Self::rollback_transaction(conn) {
507 Ok(()) => {}
508 Err(rollback_error) => {
509 conn.transaction_state().status.set_in_error();
510 return Err(Error::RollbackErrorOnCommit {
511 rollback_error: Box::new(rollback_error),
512 commit_error: Box::new(commit_error),
513 });
514 }
515 }
516 }
517 Err(commit_error)
518 }
519 }
520 }
521
522 fn transaction_manager_status_mut(conn: &mut Conn) -> &mut TransactionManagerStatus {
523 &mut conn.transaction_state().status
524 }
525}
526
527#[cfg(test)]
528#[allow(clippy::uninlined_format_args)]
530mod test {
531 mod mock {
533 use crate::connection::Instrumentation;
534 use crate::connection::transaction_manager::AnsiTransactionManager;
535 use crate::connection::{
536 Connection, ConnectionSealed, SimpleConnection, TransactionManager,
537 };
538 use crate::result::QueryResult;
539 use crate::test_helpers::TestConnection;
540 use std::collections::VecDeque;
541
542 pub(crate) struct MockConnection {
543 pub(crate) next_results: VecDeque<QueryResult<usize>>,
544 pub(crate) next_batch_execute_results: VecDeque<QueryResult<()>>,
545 pub(crate) top_level_requires_rollback_after_next_batch_execute: bool,
546 transaction_state: AnsiTransactionManager,
547 instrumentation: Option<Box<dyn Instrumentation>>,
548 }
549
550 impl SimpleConnection for MockConnection {
551 fn batch_execute(&mut self, _query: &str) -> QueryResult<()> {
552 let res = self
553 .next_batch_execute_results
554 .pop_front()
555 .expect("No next result");
556 if self.top_level_requires_rollback_after_next_batch_execute {
557 self.transaction_state
558 .status
559 .set_requires_rollback_maybe_up_to_top_level(true);
560 }
561 res
562 }
563 }
564
565 impl ConnectionSealed for MockConnection {}
566
567 impl Connection for MockConnection {
568 type Backend = <TestConnection as Connection>::Backend;
569
570 type TransactionManager = AnsiTransactionManager;
571
572 fn establish(_database_url: &str) -> crate::ConnectionResult<Self> {
573 Ok(Self {
574 next_results: VecDeque::new(),
575 next_batch_execute_results: VecDeque::new(),
576 top_level_requires_rollback_after_next_batch_execute: false,
577 transaction_state: AnsiTransactionManager::default(),
578 instrumentation: None,
579 })
580 }
581
582 fn execute_returning_count<T>(&mut self, _source: &T) -> QueryResult<usize>
583 where
584 T: crate::query_builder::QueryFragment<Self::Backend>
585 + crate::query_builder::QueryId,
586 {
587 self.next_results.pop_front().expect("No next result")
588 }
589
590 fn transaction_state(
591 &mut self,
592 ) -> &mut <Self::TransactionManager as TransactionManager<Self>>::TransactionStateData
593 {
594 &mut self.transaction_state
595 }
596
597 fn instrumentation(&mut self) -> &mut dyn crate::connection::Instrumentation {
598 &mut self.instrumentation
599 }
600
601 fn set_instrumentation(
602 &mut self,
603 instrumentation: impl crate::connection::Instrumentation,
604 ) {
605 self.instrumentation = Some(Box::new(instrumentation));
606 }
607
608 fn set_prepared_statement_cache_size(&mut self, _size: crate::connection::CacheSize) {
609 panic!("implement, if you want to use it")
610 }
611 }
612 }
613
614 #[diesel_test_helper::test]
615 #[cfg(feature = "postgres")]
616 fn transaction_manager_returns_an_error_when_attempting_to_commit_outside_of_a_transaction() {
617 use crate::PgConnection;
618 use crate::connection::transaction_manager::AnsiTransactionManager;
619 use crate::connection::transaction_manager::TransactionManager;
620 use crate::result::Error;
621
622 let conn = &mut crate::test_helpers::pg_connection_no_transaction();
623 assert_eq!(
624 None,
625 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
626 conn
627 ).transaction_depth().expect("Transaction depth")
628 );
629 let result = AnsiTransactionManager::commit_transaction(conn);
630 assert!(matches!(result, Err(Error::NotInTransaction)))
631 }
632
633 #[diesel_test_helper::test]
634 #[cfg(feature = "postgres")]
635 fn transaction_manager_returns_an_error_when_attempting_to_rollback_outside_of_a_transaction() {
636 use crate::PgConnection;
637 use crate::connection::transaction_manager::AnsiTransactionManager;
638 use crate::connection::transaction_manager::TransactionManager;
639 use crate::result::Error;
640
641 let conn = &mut crate::test_helpers::pg_connection_no_transaction();
642 assert_eq!(
643 None,
644 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
645 conn
646 ).transaction_depth().expect("Transaction depth")
647 );
648 let result = AnsiTransactionManager::rollback_transaction(conn);
649 assert!(matches!(result, Err(Error::NotInTransaction)))
650 }
651
652 #[diesel_test_helper::test]
653 fn transaction_manager_enters_broken_state_when_connection_is_broken() {
654 use crate::connection::TransactionManagerStatus;
655 use crate::connection::transaction_manager::AnsiTransactionManager;
656 use crate::connection::transaction_manager::TransactionManager;
657 use crate::result::{DatabaseErrorKind, Error};
658 use crate::*;
659
660 let mut conn = mock::MockConnection::establish("mock").expect("Mock connection");
661
662 conn.next_batch_execute_results.push_back(Ok(()));
664 let result = conn.transaction(|conn| {
665 conn.next_results.push_back(Ok(1));
666 let query_result = sql_query("SELECT 1").execute(conn);
667 assert!(query_result.is_ok());
668 conn.next_batch_execute_results
670 .push_back(Err(Error::DatabaseError(
671 DatabaseErrorKind::Unknown,
672 Box::new("commit fails".to_string()),
673 )));
674 conn.top_level_requires_rollback_after_next_batch_execute = true;
675 conn.next_batch_execute_results
676 .push_back(Err(Error::DatabaseError(
677 DatabaseErrorKind::Unknown,
678 Box::new("rollback also fails".to_string()),
679 )));
680 Ok(())
681 });
682 assert!(
683 matches!(
684 &result,
685 Err(Error::RollbackErrorOnCommit {
686 rollback_error,
687 commit_error
688 }) if matches!(**commit_error, Error::DatabaseError(DatabaseErrorKind::Unknown, _))
689 && matches!(&**rollback_error,
690 Error::DatabaseError(DatabaseErrorKind::Unknown, msg)
691 if msg.message() == "rollback also fails"
692 )
693 ),
694 "Got {:?}",
695 result
696 );
697 assert!(matches!(
698 *AnsiTransactionManager::transaction_manager_status_mut(&mut conn),
699 TransactionManagerStatus::InError
700 ));
701 let result = conn.transaction(|_conn| Ok(()));
703 assert!(matches!(result, Err(Error::BrokenTransactionManager)))
704 }
705
706 #[diesel_test_helper::test]
707 #[cfg(feature = "mysql")]
708 fn mysql_transaction_is_rolled_back_upon_syntax_error() {
709 use crate::connection::transaction_manager::AnsiTransactionManager;
710 use crate::connection::transaction_manager::TransactionManager;
711 use crate::*;
712 use std::num::NonZeroU32;
713
714 let conn = &mut crate::test_helpers::connection_no_transaction();
715 assert_eq!(
716 None,
717 <AnsiTransactionManager as TransactionManager<MysqlConnection>>::transaction_manager_status_mut(
718 conn
719 ).transaction_depth().expect("Transaction depth")
720 );
721 let _result = conn.transaction(|conn| {
722 assert_eq!(
723 NonZeroU32::new(1),
724 <AnsiTransactionManager as TransactionManager<MysqlConnection>>::transaction_manager_status_mut(
725 conn
726 ).transaction_depth().expect("Transaction depth")
727 );
728 let query_result = sql_query("SELECT_SYNTAX_ERROR 1").execute(conn);
730 assert!(query_result.is_err());
731 query_result
732 });
733 assert_eq!(
734 None,
735 <AnsiTransactionManager as TransactionManager<MysqlConnection>>::transaction_manager_status_mut(
736 conn
737 ).transaction_depth().expect("Transaction depth")
738 );
739 }
740
741 #[diesel_test_helper::test]
742 #[cfg(feature = "__sqlite-shared")]
743 fn sqlite_transaction_is_rolled_back_upon_syntax_error() {
744 use crate::connection::transaction_manager::AnsiTransactionManager;
745 use crate::connection::transaction_manager::TransactionManager;
746 use crate::*;
747 use std::num::NonZeroU32;
748
749 let conn = &mut crate::test_helpers::connection();
750 assert_eq!(
751 None,
752 <AnsiTransactionManager as TransactionManager<SqliteConnection>>::transaction_manager_status_mut(
753 conn
754 ).transaction_depth().expect("Transaction depth")
755 );
756 let _result = conn.transaction(|conn| {
757 assert_eq!(
758 NonZeroU32::new(1),
759 <AnsiTransactionManager as TransactionManager<SqliteConnection>>::transaction_manager_status_mut(
760 conn
761 ).transaction_depth().expect("Transaction depth")
762 );
763 let query_result = sql_query("SELECT_SYNTAX_ERROR 1").execute(conn);
765 assert!(query_result.is_err());
766 query_result
767 });
768 assert_eq!(
769 None,
770 <AnsiTransactionManager as TransactionManager<SqliteConnection>>::transaction_manager_status_mut(
771 conn
772 ).transaction_depth().expect("Transaction depth")
773 );
774 }
775
776 #[diesel_test_helper::test]
777 #[cfg(feature = "mysql")]
778 fn nested_mysql_transaction_is_rolled_back_upon_syntax_error() {
779 use crate::connection::transaction_manager::AnsiTransactionManager;
780 use crate::connection::transaction_manager::TransactionManager;
781 use crate::*;
782 use std::num::NonZeroU32;
783
784 let conn = &mut crate::test_helpers::connection_no_transaction();
785 assert_eq!(
786 None,
787 <AnsiTransactionManager as TransactionManager<MysqlConnection>>::transaction_manager_status_mut(
788 conn
789 ).transaction_depth().expect("Transaction depth")
790 );
791 let result = conn.transaction(|conn| {
792 assert_eq!(
793 NonZeroU32::new(1),
794 <AnsiTransactionManager as TransactionManager<MysqlConnection>>::transaction_manager_status_mut(
795 conn
796 ).transaction_depth().expect("Transaction depth")
797 );
798 let result = conn.transaction(|conn| {
799 assert_eq!(
800 NonZeroU32::new(2),
801 <AnsiTransactionManager as TransactionManager<MysqlConnection>>::transaction_manager_status_mut(
802 conn
803 ).transaction_depth().expect("Transaction depth")
804 );
805 sql_query("SELECT_SYNTAX_ERROR 1").execute(conn)
807 });
808 assert!(result.is_err());
809 assert_eq!(
810 NonZeroU32::new(1),
811 <AnsiTransactionManager as TransactionManager<MysqlConnection>>::transaction_manager_status_mut(
812 conn
813 ).transaction_depth().expect("Transaction depth")
814 );
815 let query_result = sql_query("SELECT 1").execute(conn);
816 assert!(query_result.is_ok());
817 query_result
818 });
819 assert!(result.is_ok());
820 assert_eq!(
821 None,
822 <AnsiTransactionManager as TransactionManager<MysqlConnection>>::transaction_manager_status_mut(
823 conn
824 ).transaction_depth().expect("Transaction depth")
825 );
826 }
827
828 #[diesel_test_helper::test]
829 #[cfg(feature = "mysql")]
830 #[allow(clippy::needless_collect)]
833 fn mysql_transaction_depth_commits_tracked_properly_on_serialization_failure() {
834 use crate::result::DatabaseErrorKind::SerializationFailure;
835 use crate::result::Error::DatabaseError;
836 use crate::*;
837 use std::num::NonZeroU32;
838 use std::sync::{Arc, Barrier};
839 use std::thread;
840
841 table! {
842 #[sql_name = "mysql_transaction_depth_is_tracked_properly_on_commit_failure"]
843 serialization_example {
844 id -> Integer,
845 class -> Integer,
846 }
847 }
848
849 let conn = &mut crate::test_helpers::connection_no_transaction();
850
851 sql_query(
852 "DROP TABLE IF EXISTS mysql_transaction_depth_is_tracked_properly_on_commit_failure;",
853 )
854 .execute(conn)
855 .unwrap();
856 sql_query(
857 r#"
858 CREATE TABLE mysql_transaction_depth_is_tracked_properly_on_commit_failure (
859 id INT AUTO_INCREMENT PRIMARY KEY,
860 class INTEGER NOT NULL
861 )
862 "#,
863 )
864 .execute(conn)
865 .unwrap();
866
867 insert_into(serialization_example::table)
868 .values(&vec![
869 serialization_example::class.eq(1),
870 serialization_example::class.eq(2),
871 ])
872 .execute(conn)
873 .unwrap();
874
875 let before_barrier = Arc::new(Barrier::new(2));
876 let after_barrier = Arc::new(Barrier::new(2));
877
878 let threads = (1..3)
879 .map(|i| {
880 let before_barrier = before_barrier.clone();
881 let after_barrier = after_barrier.clone();
882 thread::spawn(move || {
883 use crate::connection::transaction_manager::AnsiTransactionManager;
884 use crate::connection::transaction_manager::TransactionManager;
885 let conn = &mut crate::test_helpers::connection_no_transaction();
886 assert_eq!(None, <AnsiTransactionManager as TransactionManager<MysqlConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));
887 crate::sql_query("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE").execute(conn)?;
888
889 let result =
890 conn.transaction(|conn| {
891 assert_eq!(NonZeroU32::new(1), <AnsiTransactionManager as TransactionManager<MysqlConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));
892 let _ = serialization_example::table
893 .filter(serialization_example::class.eq(i))
894 .count()
895 .execute(conn)?;
896
897 let other_i = if i == 1 { 2 } else { 1 };
898 let q = insert_into(serialization_example::table)
899 .values(serialization_example::class.eq(other_i));
900 before_barrier.wait();
901
902 let r = q.execute(conn);
903 after_barrier.wait();
904 r
905 });
906
907 assert_eq!(None, <AnsiTransactionManager as TransactionManager<MysqlConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));
908
909 let second_trans_result = conn.transaction(|conn| crate::sql_query("SELECT 1").execute(conn));
910 assert!(second_trans_result.is_ok(), "Expected the thread connections to have been rolled back or committed, but second transaction exited with {:?}", second_trans_result);
911 result
912 })
913 })
914 .collect::<Vec<_>>();
915 let second_trans_result =
916 conn.transaction(|conn| crate::sql_query("SELECT 1").execute(conn));
917 assert!(
918 second_trans_result.is_ok(),
919 "Expected the main connection to have been rolled back or committed, but second transaction exited with {:?}",
920 second_trans_result
921 );
922
923 let mut results = threads
924 .into_iter()
925 .map(|t| t.join().unwrap())
926 .collect::<Vec<_>>();
927
928 results.sort_by_key(|r| r.is_err());
929 assert!(results[0].is_ok(), "Got {:?} instead", results);
930 assert!(
932 matches!(&results[1], Err(DatabaseError(SerializationFailure, _))),
933 "Got {:?} instead",
934 results
935 );
936 }
937
938 #[diesel_test_helper::test]
939 #[cfg(feature = "mysql")]
940 #[allow(clippy::needless_collect)]
943 fn mysql_nested_transaction_depth_commits_tracked_properly_on_serialization_failure() {
944 use crate::result::DatabaseErrorKind::SerializationFailure;
945 use crate::result::Error::DatabaseError;
946 use crate::*;
947 use std::num::NonZeroU32;
948 use std::sync::{Arc, Barrier};
949 use std::thread;
950
951 table! {
952 #[sql_name = "mysql_nested_trans_depth_is_tracked_properly_on_commit_failure"]
953 serialization_example {
954 id -> Integer,
955 class -> Integer,
956 }
957 }
958
959 let conn = &mut crate::test_helpers::connection_no_transaction();
960
961 sql_query(
962 "DROP TABLE IF EXISTS mysql_nested_trans_depth_is_tracked_properly_on_commit_failure;",
963 )
964 .execute(conn)
965 .unwrap();
966 sql_query(
967 r#"
968 CREATE TABLE mysql_nested_trans_depth_is_tracked_properly_on_commit_failure (
969 id INT AUTO_INCREMENT PRIMARY KEY,
970 class INTEGER NOT NULL
971 )
972 "#,
973 )
974 .execute(conn)
975 .unwrap();
976
977 insert_into(serialization_example::table)
978 .values(&vec![
979 serialization_example::class.eq(1),
980 serialization_example::class.eq(2),
981 ])
982 .execute(conn)
983 .unwrap();
984
985 let before_barrier = Arc::new(Barrier::new(2));
986 let after_barrier = Arc::new(Barrier::new(2));
987
988 let threads = (1..3)
989 .map(|i| {
990 let before_barrier = before_barrier.clone();
991 let after_barrier = after_barrier.clone();
992 thread::spawn(move || {
993 use crate::connection::transaction_manager::AnsiTransactionManager;
994 use crate::connection::transaction_manager::TransactionManager;
995 let conn = &mut crate::test_helpers::connection_no_transaction();
996 assert_eq!(None, <AnsiTransactionManager as TransactionManager<MysqlConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));
997 crate::sql_query("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE").execute(conn)?;
998
999 let result =
1000 conn.transaction(|conn| {
1001 assert_eq!(NonZeroU32::new(1), <AnsiTransactionManager as TransactionManager<MysqlConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));
1002 conn.transaction(|conn| {
1003 assert_eq!(NonZeroU32::new(2), <AnsiTransactionManager as TransactionManager<MysqlConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));
1004 let _ = serialization_example::table
1005 .filter(serialization_example::class.eq(i))
1006 .count()
1007 .execute(conn)?;
1008
1009 let other_i = if i == 1 { 2 } else { 1 };
1010 let q = insert_into(serialization_example::table)
1011 .values(serialization_example::class.eq(other_i));
1012 before_barrier.wait();
1013
1014 let r = q.execute(conn);
1015 after_barrier.wait();
1016 r
1017 })
1018 });
1019
1020 assert_eq!(None, <AnsiTransactionManager as TransactionManager<MysqlConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));
1021
1022 let second_trans_result = conn.transaction(|conn| crate::sql_query("SELECT 1").execute(conn));
1023 assert!(second_trans_result.is_ok(), "Expected the thread connections to have been rolled back or committed, but second transaction exited with {:?}", second_trans_result);
1024 result
1025 })
1026 })
1027 .collect::<Vec<_>>();
1028 let second_trans_result =
1029 conn.transaction(|conn| crate::sql_query("SELECT 1").execute(conn));
1030 assert!(
1031 second_trans_result.is_ok(),
1032 "Expected the main connection to have been rolled back or committed, but second transaction exited with {:?}",
1033 second_trans_result
1034 );
1035
1036 let mut results = threads
1037 .into_iter()
1038 .map(|t| t.join().unwrap())
1039 .collect::<Vec<_>>();
1040
1041 results.sort_by_key(|r| r.is_err());
1042 assert!(results[0].is_ok(), "Got {:?} instead", results);
1043 assert!(
1044 matches!(&results[1], Err(DatabaseError(SerializationFailure, _))),
1045 "Got {:?} instead",
1046 results
1047 );
1048 }
1049
1050 #[diesel_test_helper::test]
1051 #[cfg(feature = "__sqlite-shared")]
1052 fn sqlite_transaction_is_rolled_back_upon_deferred_constraint_failure() {
1053 use crate::connection::transaction_manager::AnsiTransactionManager;
1054 use crate::connection::transaction_manager::TransactionManager;
1055 use crate::result::Error;
1056 use crate::*;
1057 use std::num::NonZeroU32;
1058
1059 let conn = &mut crate::test_helpers::connection();
1060 assert_eq!(
1061 None,
1062 <AnsiTransactionManager as TransactionManager<SqliteConnection>>::transaction_manager_status_mut(
1063 conn
1064 ).transaction_depth().expect("Transaction depth")
1065 );
1066 let result: Result<_, Error> = conn.transaction(|conn| {
1067 assert_eq!(
1068 NonZeroU32::new(1),
1069 <AnsiTransactionManager as TransactionManager<SqliteConnection>>::transaction_manager_status_mut(
1070 conn
1071 ).transaction_depth().expect("Transaction depth")
1072 );
1073 sql_query("DROP TABLE IF EXISTS deferred_commit").execute(conn)?;
1074 sql_query("CREATE TABLE deferred_commit(id INT UNIQUE INITIALLY DEFERRED)").execute(conn)?;
1075 sql_query("INSERT INTO deferred_commit VALUES(1)").execute(conn)?;
1076 let result = sql_query("INSERT INTO deferred_commit VALUES(1)").execute(conn);
1077 assert!(result.is_ok());
1078 Ok(())
1079 });
1080 assert!(result.is_err());
1081 assert_eq!(
1082 None,
1083 <AnsiTransactionManager as TransactionManager<SqliteConnection>>::transaction_manager_status_mut(
1084 conn
1085 ).transaction_depth().expect("Transaction depth")
1086 );
1087 }
1088
1089 #[diesel_test_helper::test]
1092 #[cfg(feature = "postgres")]
1093 fn some_libpq_failures_are_recoverable_by_rolling_back_the_savepoint_only() {
1094 use crate::connection::{AnsiTransactionManager, TransactionManager};
1095 use crate::prelude::*;
1096 use crate::sql_query;
1097
1098 crate::table! {
1099 rollback_test (id) {
1100 id -> Int4,
1101 value -> Int4,
1102 }
1103 }
1104
1105 let conn = &mut crate::test_helpers::pg_connection_no_transaction();
1106 assert_eq!(
1107 None,
1108 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1109 conn
1110 ).transaction_depth().expect("Transaction depth")
1111 );
1112
1113 let res = conn.transaction(|conn| {
1114 sql_query(
1115 "CREATE TABLE IF NOT EXISTS rollback_test (id INT PRIMARY KEY, value INT NOT NULL)",
1116 )
1117 .execute(conn)?;
1118 conn.transaction(|conn| {
1119 sql_query("SET TRANSACTION READ ONLY").execute(conn)?;
1120 crate::update(rollback_test::table)
1121 .set(rollback_test::value.eq(0))
1122 .execute(conn)
1123 })
1124 .map(|_| {
1125 panic!("Should use the `or_else` branch");
1126 })
1127 .or_else(|_| sql_query("SELECT 1").execute(conn))
1128 .map(|_| ())
1129 });
1130 assert!(res.is_ok());
1131
1132 assert_eq!(
1133 None,
1134 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1135 conn
1136 ).transaction_depth().expect("Transaction depth")
1137 );
1138 }
1139
1140 #[diesel_test_helper::test]
1141 #[cfg(feature = "postgres")]
1142 fn other_libpq_failures_are_not_recoverable_by_rolling_back_the_savepoint_only() {
1143 use crate::connection::{AnsiTransactionManager, TransactionManager};
1144 use crate::prelude::*;
1145 use crate::sql_query;
1146 use std::num::NonZeroU32;
1147 use std::sync::{Arc, Barrier};
1148
1149 crate::table! {
1150 rollback_test2 (id) {
1151 id -> Int4,
1152 value -> Int4,
1153 }
1154 }
1155 let conn = &mut crate::test_helpers::pg_connection_no_transaction();
1156
1157 sql_query(
1158 "CREATE TABLE IF NOT EXISTS rollback_test2 (id INT PRIMARY KEY, value INT NOT NULL)",
1159 )
1160 .execute(conn)
1161 .unwrap();
1162
1163 let start_barrier = Arc::new(Barrier::new(2));
1164 let commit_barrier = Arc::new(Barrier::new(2));
1165
1166 let other_start_barrier = start_barrier.clone();
1167 let other_commit_barrier = commit_barrier.clone();
1168
1169 let t1 = std::thread::spawn(move || {
1170 let conn = &mut crate::test_helpers::pg_connection_no_transaction();
1171 assert_eq!(
1172 None,
1173 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1174 conn
1175 ).transaction_depth().expect("Transaction depth")
1176 );
1177 let r = conn.build_transaction().serializable().run::<_, crate::result::Error, _>(|conn| {
1178 assert_eq!(
1179 NonZeroU32::new(1),
1180 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1181 conn
1182 ).transaction_depth().expect("Transaction depth")
1183 );
1184 rollback_test2::table.load::<(i32, i32)>(conn)?;
1185 crate::insert_into(rollback_test2::table)
1186 .values((rollback_test2::id.eq(1), rollback_test2::value.eq(42)))
1187 .execute(conn)?;
1188 let r = conn.transaction(|conn| {
1189 assert_eq!(
1190 NonZeroU32::new(2),
1191 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1192 conn
1193 ).transaction_depth().expect("Transaction depth")
1194 );
1195 start_barrier.wait();
1196 commit_barrier.wait();
1197 let r = rollback_test2::table.load::<(i32, i32)>(conn);
1198 assert!(r.is_err());
1199 Err::<(), _>(crate::result::Error::RollbackTransaction)
1200 });
1201 assert_eq!(
1202 NonZeroU32::new(1),
1203 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1204 conn
1205 ).transaction_depth().expect("Transaction depth")
1206 );
1207 assert!(
1208 matches!(r, Err(crate::result::Error::RollbackTransaction)),
1209 "rollback failed (such errors should be ignored by transaction manager): {}",
1210 r.unwrap_err()
1211 );
1212 let r = rollback_test2::table.load::<(i32, i32)>(conn);
1213 assert!(r.is_err());
1214 r
1218 });
1219 assert!(r.is_err());
1220 assert_eq!(
1221 None,
1222 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1223 conn
1224 ).transaction_depth().expect("Transaction depth")
1225 );
1226 });
1227
1228 let t2 = std::thread::spawn(move || {
1229 other_start_barrier.wait();
1230 let conn = &mut crate::test_helpers::pg_connection_no_transaction();
1231 assert_eq!(
1232 None,
1233 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1234 conn
1235 ).transaction_depth().expect("Transaction depth")
1236 );
1237 let r = conn.build_transaction().serializable().run::<_, crate::result::Error, _>(|conn| {
1238 assert_eq!(
1239 NonZeroU32::new(1),
1240 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1241 conn
1242 ).transaction_depth().expect("Transaction depth")
1243 );
1244 let _ = rollback_test2::table.load::<(i32, i32)>(conn)?;
1245 crate::insert_into(rollback_test2::table)
1246 .values((rollback_test2::id.eq(23), rollback_test2::value.eq(42)))
1247 .execute(conn)?;
1248 Ok(())
1249 });
1250 other_commit_barrier.wait();
1251 assert!(r.is_ok(), "{:?}", r.unwrap_err());
1252 assert_eq!(
1253 None,
1254 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1255 conn
1256 ).transaction_depth().expect("Transaction depth")
1257 );
1258 });
1259 crate::sql_query("DELETE FROM rollback_test2")
1260 .execute(conn)
1261 .unwrap();
1262 t1.join().unwrap();
1263 t2.join().unwrap();
1264 }
1265}