1pub(super) mod copy;
2pub(crate) mod cursor;
3mod raw;
4mod result;
5mod row;
6mod stmt;
7
8use self::copy::{CopyFromSink, CopyToBuffer};
9use self::cursor::*;
10use self::private::ConnectionAndTransactionManager;
11use self::raw::{PgTransactionStatus, RawConnection};
12use self::stmt::Statement;
13use crate::connection::instrumentation::{
14 DebugQuery, DynInstrumentation, Instrumentation, StrQueryHelper,
15};
16use crate::connection::statement_cache::{MaybeCached, StatementCache};
17use crate::connection::*;
18use crate::expression::QueryMetadata;
19use crate::pg::backend::PgNotification;
20use crate::pg::metadata_lookup::{GetPgMetadataCache, PgMetadataCache};
21use crate::pg::query_builder::copy::InternalCopyFromQuery;
22use crate::pg::{Pg, TransactionBuilder};
23use crate::query_builder::bind_collector::RawBytesBindCollector;
24use crate::query_builder::*;
25use crate::result::ConnectionError::CouldntSetupConfiguration;
26use crate::result::*;
27use crate::RunQueryDsl;
28use std::ffi::CString;
29use std::fmt::Debug;
30use std::os::raw as libc;
31
32use super::query_builder::copy::{CopyFromExpression, CopyTarget, CopyToCommand};
33
34pub(super) use self::result::PgResult;
35
/// Connection handle for the PostgreSQL backend (`Pg`).
///
/// Groups the prepared-statement cache, the type-metadata cache, and the raw
/// connection together with its transaction state and instrumentation.
#[allow(missing_debug_implementations)] // no `Debug` impl is provided for this type
#[cfg(feature = "postgres")]
pub struct PgConnection {
    // Prepared statements, looked up in `with_prepared_query`.
    statement_cache: StatementCache<Pg, Statement>,
    // Handed out through the `GetPgMetadataCache` impl.
    metadata_cache: PgMetadataCache,
    // Raw connection, transaction manager state and instrumentation; kept in one
    // struct so they can be borrowed together, independently of the caches.
    connection_and_transaction_manager: ConnectionAndTransactionManager,
}
132
133#[allow(unsafe_code)]
135unsafe impl Send for PgConnection {}
136
137impl SimpleConnection for PgConnection {
138 #[allow(unsafe_code)] fn batch_execute(&mut self, query: &str) -> QueryResult<()> {
140 self.connection_and_transaction_manager
141 .instrumentation
142 .on_connection_event(InstrumentationEvent::StartQuery {
143 query: &StrQueryHelper::new(query),
144 });
145 let c_query = CString::new(query)?;
146 let inner_result = unsafe {
147 self.connection_and_transaction_manager
148 .raw_connection
149 .exec(c_query.as_ptr())
150 };
151 update_transaction_manager_status(
152 inner_result.and_then(|raw_result| {
153 PgResult::new(
154 raw_result,
155 &self.connection_and_transaction_manager.raw_connection,
156 )
157 }),
158 &mut self.connection_and_transaction_manager,
159 &StrQueryHelper::new(query),
160 true,
161 )?;
162 Ok(())
163 }
164}
165
/// Marker type that selects row-by-row loading for `PgConnection`.
///
/// The `PgLoadingMode<PgRowByRowLoadingMode>` impl in this file sets
/// `USE_ROW_BY_ROW_MODE` to `true` and produces a `RowByRowCursor`.
#[derive(Clone, Copy, Debug)]
pub struct PgRowByRowLoadingMode;
171
172impl ConnectionSealed for PgConnection {}
173
174impl Connection for PgConnection {
175 type Backend = Pg;
176 type TransactionManager = AnsiTransactionManager;
177
178 fn establish(database_url: &str) -> ConnectionResult<PgConnection> {
179 let mut instrumentation = DynInstrumentation::default_instrumentation();
180 instrumentation.on_connection_event(InstrumentationEvent::StartEstablishConnection {
181 url: database_url,
182 });
183 let r = RawConnection::establish(database_url).and_then(|raw_conn| {
184 let mut conn = PgConnection {
185 connection_and_transaction_manager: ConnectionAndTransactionManager {
186 raw_connection: raw_conn,
187 transaction_state: AnsiTransactionManager::default(),
188 instrumentation: DynInstrumentation::none(),
189 },
190 statement_cache: StatementCache::new(),
191 metadata_cache: PgMetadataCache::new(),
192 };
193 conn.set_config_options()
194 .map_err(CouldntSetupConfiguration)?;
195 Ok(conn)
196 });
197 instrumentation.on_connection_event(InstrumentationEvent::FinishEstablishConnection {
198 url: database_url,
199 error: r.as_ref().err(),
200 });
201 let mut conn = r?;
202 conn.connection_and_transaction_manager.instrumentation = instrumentation;
203 Ok(conn)
204 }
205
206 fn execute_returning_count<T>(&mut self, source: &T) -> QueryResult<usize>
207 where
208 T: QueryFragment<Pg> + QueryId,
209 {
210 update_transaction_manager_status(
211 self.with_prepared_query(source, true, |query, params, conn, _source| {
212 let res = query
213 .execute(&mut conn.raw_connection, ¶ms, false)
214 .map(|r| r.rows_affected());
215 while conn.raw_connection.get_next_result()?.is_some() {}
218 res
219 }),
220 &mut self.connection_and_transaction_manager,
221 &crate::debug_query(source),
222 true,
223 )
224 }
225
226 fn transaction_state(&mut self) -> &mut AnsiTransactionManager
227 where
228 Self: Sized,
229 {
230 &mut self.connection_and_transaction_manager.transaction_state
231 }
232
233 fn instrumentation(&mut self) -> &mut dyn Instrumentation {
234 &mut *self.connection_and_transaction_manager.instrumentation
235 }
236
237 fn set_instrumentation(&mut self, instrumentation: impl Instrumentation) {
238 self.connection_and_transaction_manager.instrumentation = instrumentation.into();
239 }
240
241 fn set_prepared_statement_cache_size(&mut self, size: CacheSize) {
242 self.statement_cache.set_cache_size(size);
243 }
244}
245
246impl<B> LoadConnection<B> for PgConnection
247where
248 Self: self::private::PgLoadingMode<B>,
249{
250 type Cursor<'conn, 'query> = <Self as self::private::PgLoadingMode<B>>::Cursor<'conn, 'query>;
251 type Row<'conn, 'query> = <Self as self::private::PgLoadingMode<B>>::Row<'conn, 'query>;
252
253 fn load<'conn, 'query, T>(
254 &'conn mut self,
255 source: T,
256 ) -> QueryResult<Self::Cursor<'conn, 'query>>
257 where
258 T: Query + QueryFragment<Self::Backend> + QueryId + 'query,
259 Self::Backend: QueryMetadata<T::SqlType>,
260 {
261 self.with_prepared_query(source, false, |stmt, params, conn, source| {
262 use self::private::PgLoadingMode;
263 let result = stmt.execute(&mut conn.raw_connection, ¶ms, Self::USE_ROW_BY_ROW_MODE);
264 let result = update_transaction_manager_status(
265 result,
266 conn,
267 &crate::debug_query(&source),
268 false,
269 )?;
270 Self::get_cursor(conn, result, source)
271 })
272 }
273}
274
275impl GetPgMetadataCache for PgConnection {
276 fn get_metadata_cache(&mut self) -> &mut PgMetadataCache {
277 &mut self.metadata_cache
278 }
279}
280
281#[inline(always)]
282fn update_transaction_manager_status<T>(
283 query_result: QueryResult<T>,
284 conn: &mut ConnectionAndTransactionManager,
285 source: &dyn DebugQuery,
286 final_call: bool,
287) -> QueryResult<T> {
288 fn non_generic_inner(conn: &mut ConnectionAndTransactionManager, is_err: bool) {
290 let raw_conn: &mut RawConnection = &mut conn.raw_connection;
291 let tm: &mut AnsiTransactionManager = &mut conn.transaction_state;
292 match raw_conn.transaction_status() {
296 PgTransactionStatus::InError => {
297 tm.status.set_requires_rollback_maybe_up_to_top_level(true)
298 }
299 PgTransactionStatus::Unknown => tm.status.set_in_error(),
300 PgTransactionStatus::Idle => {
301 tm.status = TransactionManagerStatus::Valid(Default::default())
306 }
307 PgTransactionStatus::InTransaction => {
308 let transaction_status = &mut tm.status;
309 if is_err {
312 if !matches!(transaction_status, TransactionManagerStatus::Valid(valid_tm) if valid_tm.transaction_depth().is_some())
315 {
316 transaction_status.set_in_error()
318 }
319 } else {
320 tm.status.set_requires_rollback_maybe_up_to_top_level(false)
328 }
329 }
330 PgTransactionStatus::Active => {
331 }
333 }
334 }
335
336 fn non_generic_instrumentation(
337 query_result: Result<(), &Error>,
338 conn: &mut ConnectionAndTransactionManager,
339 source: &dyn DebugQuery,
340 final_call: bool,
341 ) {
342 if let Err(e) = query_result {
343 conn.instrumentation
344 .on_connection_event(InstrumentationEvent::FinishQuery {
345 query: source,
346 error: Some(e),
347 });
348 } else if final_call {
349 conn.instrumentation
350 .on_connection_event(InstrumentationEvent::FinishQuery {
351 query: source,
352 error: None,
353 });
354 }
355 }
356
357 non_generic_inner(conn, query_result.is_err());
358 non_generic_instrumentation(query_result.as_ref().map(|_| ()), conn, source, final_call);
359 query_result
360}
361
#[cfg(feature = "r2d2")]
impl crate::r2d2::R2D2Connection for PgConnection {
    /// Executes `CheckConnectionQuery` and discards the returned count.
    fn ping(&mut self) -> QueryResult<()> {
        crate::r2d2::CheckConnectionQuery.execute(self)?;
        Ok(())
    }

    /// Delegates to `AnsiTransactionManager::is_broken_transaction_manager`.
    fn is_broken(&mut self) -> bool {
        AnsiTransactionManager::is_broken_transaction_manager(self)
    }
}
372
373impl MultiConnectionHelper for PgConnection {
374 fn to_any<'a>(
375 lookup: &mut <Self::Backend as crate::sql_types::TypeMetadata>::MetadataLookup,
376 ) -> &mut (dyn std::any::Any + 'a) {
377 lookup.as_any()
378 }
379
380 fn from_any(
381 lookup: &mut dyn std::any::Any,
382 ) -> Option<&mut <Self::Backend as crate::sql_types::TypeMetadata>::MetadataLookup> {
383 lookup
384 .downcast_mut::<Self>()
385 .map(|conn| conn as &mut dyn super::PgMetadataLookup)
386 }
387}
388
389impl PgConnection {
390 pub fn build_transaction(&mut self) -> TransactionBuilder<'_, Self> {
414 TransactionBuilder::new(self)
415 }
416
417 pub(crate) fn copy_from<S, T>(&mut self, target: S) -> Result<usize, S::Error>
418 where
419 S: CopyFromExpression<T>,
420 {
421 let query = InternalCopyFromQuery::new(target);
422 let res = self.with_prepared_query(query, false, |stmt, binds, conn, mut source| {
423 fn inner_copy_in<S, T>(
424 stmt: MaybeCached<'_, Statement>,
425 conn: &mut ConnectionAndTransactionManager,
426 binds: Vec<Option<Vec<u8>>>,
427 source: &mut InternalCopyFromQuery<S, T>,
428 ) -> Result<usize, S::Error>
429 where
430 S: CopyFromExpression<T>,
431 {
432 let _res = stmt.execute(&mut conn.raw_connection, &binds, false)?;
433 let mut copy_in = CopyFromSink::new(&mut conn.raw_connection);
434 let r = source.target.callback(&mut copy_in);
435 copy_in.finish(r.as_ref().err().map(|e| e.to_string()))?;
436 let next_res = conn.raw_connection.get_next_result()?.ok_or_else(|| {
437 crate::result::Error::DeserializationError(
438 "Failed to receive result from the database".into(),
439 )
440 })?;
441 let rows = next_res.rows_affected();
442 while let Some(_r) = conn.raw_connection.get_next_result()? {}
443 r?;
444 Ok(rows)
445 }
446
447 let rows = inner_copy_in(stmt, conn, binds, &mut source);
448 if let Err(ref e) = rows {
449 let database_error = crate::result::Error::DatabaseError(
450 crate::result::DatabaseErrorKind::Unknown,
451 Box::new(e.to_string()),
452 );
453 conn.instrumentation
454 .on_connection_event(InstrumentationEvent::FinishQuery {
455 query: &crate::debug_query(&source),
456 error: Some(&database_error),
457 });
458 } else {
459 conn.instrumentation
460 .on_connection_event(InstrumentationEvent::FinishQuery {
461 query: &crate::debug_query(&source),
462 error: None,
463 });
464 }
465
466 rows
467 })?;
468
469 Ok(res)
470 }
471
472 pub(crate) fn copy_to<T>(&mut self, command: CopyToCommand<T>) -> QueryResult<CopyToBuffer<'_>>
473 where
474 T: CopyTarget,
475 {
476 let res = self.with_prepared_query::<_, _, Error>(
477 command,
478 false,
479 |stmt, binds, conn, source| {
480 let res = stmt.execute(&mut conn.raw_connection, &binds, false);
481 conn.instrumentation
482 .on_connection_event(InstrumentationEvent::FinishQuery {
483 query: &crate::debug_query(&source),
484 error: res.as_ref().err(),
485 });
486 Ok(CopyToBuffer::new(&mut conn.raw_connection, res?))
487 },
488 )?;
489 Ok(res)
490 }
491
492 fn with_prepared_query<'conn, T, R, E>(
493 &'conn mut self,
494 source: T,
495 execute_returning_count: bool,
496 f: impl FnOnce(
497 MaybeCached<'_, Statement>,
498 Vec<Option<Vec<u8>>>,
499 &'conn mut ConnectionAndTransactionManager,
500 T,
501 ) -> Result<R, E>,
502 ) -> Result<R, E>
503 where
504 T: QueryFragment<Pg> + QueryId,
505 E: From<crate::result::Error>,
506 {
507 self.connection_and_transaction_manager
508 .instrumentation
509 .on_connection_event(InstrumentationEvent::StartQuery {
510 query: &crate::debug_query(&source),
511 });
512 let mut bind_collector = RawBytesBindCollector::<Pg>::new();
513 source.collect_binds(&mut bind_collector, self, &Pg)?;
514 let binds = bind_collector.binds;
515 let metadata = bind_collector.metadata;
516
517 let cache = &mut self.statement_cache;
518 let conn = &mut self.connection_and_transaction_manager.raw_connection;
519 let query = cache.cached_statement(
520 &source,
521 &Pg,
522 &metadata,
523 conn,
524 Statement::prepare,
525 &mut *self.connection_and_transaction_manager.instrumentation,
526 );
527 if !execute_returning_count {
528 if let Err(ref e) = query {
529 self.connection_and_transaction_manager
530 .instrumentation
531 .on_connection_event(InstrumentationEvent::FinishQuery {
532 query: &crate::debug_query(&source),
533 error: Some(e),
534 });
535 }
536 }
537
538 f(
539 query?,
540 binds,
541 &mut self.connection_and_transaction_manager,
542 source,
543 )
544 }
545
546 fn set_config_options(&mut self) -> QueryResult<()> {
547 crate::sql_query("SET TIME ZONE 'UTC'").execute(self)?;
548 crate::sql_query("SET CLIENT_ENCODING TO 'UTF8'").execute(self)?;
549 self.connection_and_transaction_manager
550 .raw_connection
551 .set_notice_processor(noop_notice_processor);
552 Ok(())
553 }
554
555 pub fn notifications_iter(&mut self) -> impl Iterator<Item = QueryResult<PgNotification>> + '_ {
596 let conn = &self.connection_and_transaction_manager.raw_connection;
597 std::iter::from_fn(move || conn.pq_notifies().transpose())
598 }
599}
600
/// Notice processor that ignores both of its arguments.
///
/// Installed on the raw connection by `set_config_options`.
extern "C" fn noop_notice_processor(_arg: *mut libc::c_void, _message: *const libc::c_char) {}
602
603mod private {
604 use super::*;
605
606 #[allow(missing_debug_implementations)]
607 pub struct ConnectionAndTransactionManager {
608 pub(super) raw_connection: RawConnection,
609 pub(super) transaction_state: AnsiTransactionManager,
610 pub(super) instrumentation: DynInstrumentation,
611 }
612
613 pub trait PgLoadingMode<B> {
614 const USE_ROW_BY_ROW_MODE: bool;
615 type Cursor<'conn, 'query>: Iterator<Item = QueryResult<Self::Row<'conn, 'query>>>;
616 type Row<'conn, 'query>: crate::row::Row<'conn, Pg>;
617
618 fn get_cursor<'conn, 'query>(
619 raw_connection: &'conn mut ConnectionAndTransactionManager,
620 result: PgResult,
621 source: impl QueryFragment<Pg> + 'query,
622 ) -> QueryResult<Self::Cursor<'conn, 'query>>;
623 }
624
625 impl PgLoadingMode<DefaultLoadingMode> for PgConnection {
626 const USE_ROW_BY_ROW_MODE: bool = false;
627 type Cursor<'conn, 'query> = Cursor;
628 type Row<'conn, 'query> = self::row::PgRow;
629
630 fn get_cursor<'conn, 'query>(
631 conn: &'conn mut ConnectionAndTransactionManager,
632 result: PgResult,
633 source: impl QueryFragment<Pg> + 'query,
634 ) -> QueryResult<Self::Cursor<'conn, 'query>> {
635 update_transaction_manager_status(
636 Cursor::new(result, &mut conn.raw_connection),
637 conn,
638 &crate::debug_query(&source),
639 true,
640 )
641 }
642 }
643
644 impl PgLoadingMode<PgRowByRowLoadingMode> for PgConnection {
645 const USE_ROW_BY_ROW_MODE: bool = true;
646 type Cursor<'conn, 'query> = RowByRowCursor<'conn, 'query>;
647 type Row<'conn, 'query> = self::row::PgRow;
648
649 fn get_cursor<'conn, 'query>(
650 raw_connection: &'conn mut ConnectionAndTransactionManager,
651 result: PgResult,
652 source: impl QueryFragment<Pg> + 'query,
653 ) -> QueryResult<Self::Cursor<'conn, 'query>> {
654 Ok(RowByRowCursor::new(
655 result,
656 raw_connection,
657 Box::new(source),
658 ))
659 }
660 }
661}
662
663#[cfg(test)]
664#[allow(clippy::uninlined_format_args)]
666mod tests {
667 extern crate dotenvy;
668
669 use super::*;
670 use crate::prelude::*;
671 use crate::result::Error::DatabaseError;
672 use std::num::NonZeroU32;
673
674 fn connection() -> PgConnection {
675 crate::test_helpers::pg_connection_no_transaction()
676 }
677
678 #[diesel_test_helper::test]
679 fn notifications_arrive() {
680 use crate::sql_query;
681
682 let conn = &mut connection();
683 sql_query("LISTEN test_notifications")
684 .execute(conn)
685 .unwrap();
686 sql_query("NOTIFY test_notifications, 'first'")
687 .execute(conn)
688 .unwrap();
689 sql_query("NOTIFY test_notifications, 'second'")
690 .execute(conn)
691 .unwrap();
692
693 let notifications = conn
694 .notifications_iter()
695 .map(Result::unwrap)
696 .collect::<Vec<_>>();
697
698 assert_eq!(2, notifications.len());
699 assert_eq!(notifications[0].channel, "test_notifications");
700 assert_eq!(notifications[1].channel, "test_notifications");
701 assert_eq!(notifications[0].payload, "first");
702 assert_eq!(notifications[1].payload, "second");
703
704 let next_notification = conn.notifications_iter().next();
705 assert!(
706 next_notification.is_none(),
707 "Got a next notification, while not expecting one: {next_notification:?}"
708 );
709
710 sql_query("NOTIFY test_notifications")
711 .execute(conn)
712 .unwrap();
713 assert_eq!(
714 conn.notifications_iter().next().unwrap().unwrap().payload,
715 ""
716 );
717 }
718
719 #[diesel_test_helper::test]
720 fn malformed_sql_query() {
721 let connection = &mut connection();
722 let query =
723 crate::sql_query("SELECT not_existent FROM also_not_there;").execute(connection);
724
725 if let Err(DatabaseError(_, string)) = query {
726 assert_eq!(Some(26), string.statement_position());
727 } else {
728 unreachable!();
729 }
730 }
731
732 table! {
733 users {
734 id -> Integer,
735 name -> Text,
736 }
737 }
738
739 #[diesel_test_helper::test]
740 fn transaction_manager_returns_an_error_when_attempting_to_commit_outside_of_a_transaction() {
741 use crate::connection::{AnsiTransactionManager, TransactionManager};
742 use crate::result::Error;
743 use crate::PgConnection;
744
745 let conn = &mut crate::test_helpers::pg_connection_no_transaction();
746 assert_eq!(
747 None,
748 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
749 conn
750 ).transaction_depth().expect("Transaction depth")
751 );
752 let result = AnsiTransactionManager::commit_transaction(conn);
753 assert!(matches!(result, Err(Error::NotInTransaction)))
754 }
755
756 #[diesel_test_helper::test]
757 fn transaction_manager_returns_an_error_when_attempting_to_rollback_outside_of_a_transaction() {
758 use crate::connection::{AnsiTransactionManager, TransactionManager};
759 use crate::result::Error;
760 use crate::PgConnection;
761
762 let conn = &mut crate::test_helpers::pg_connection_no_transaction();
763 assert_eq!(
764 None,
765 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
766 conn
767 ).transaction_depth().expect("Transaction depth")
768 );
769 let result = AnsiTransactionManager::rollback_transaction(conn);
770 assert!(matches!(result, Err(Error::NotInTransaction)))
771 }
772
773 #[diesel_test_helper::test]
774 fn postgres_transaction_is_rolled_back_upon_syntax_error() {
775 use std::num::NonZeroU32;
776
777 use crate::connection::{AnsiTransactionManager, TransactionManager};
778 use crate::pg::connection::raw::PgTransactionStatus;
779 use crate::*;
780 let conn = &mut crate::test_helpers::pg_connection_no_transaction();
781 assert_eq!(
782 None,
783 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
784 conn
785 ).transaction_depth().expect("Transaction depth")
786 );
787 let _result = conn.build_transaction().run(|conn| {
788 assert_eq!(
789 NonZeroU32::new(1),
790 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
791 conn
792 ).transaction_depth().expect("Transaction depth")
793 );
794 let query_result = sql_query("SELECT_SYNTAX_ERROR 1").execute(conn);
796 assert!(query_result.is_err());
797 assert_eq!(
798 PgTransactionStatus::InError,
799 conn.connection_and_transaction_manager.raw_connection.transaction_status()
800 );
801 query_result
802 });
803 assert_eq!(
804 None,
805 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
806 conn
807 ).transaction_depth().expect("Transaction depth")
808 );
809 assert_eq!(
810 PgTransactionStatus::Idle,
811 conn.connection_and_transaction_manager
812 .raw_connection
813 .transaction_status()
814 );
815 }
816
817 #[diesel_test_helper::test]
818 fn nested_postgres_transaction_is_rolled_back_upon_syntax_error() {
819 use std::num::NonZeroU32;
820
821 use crate::connection::{AnsiTransactionManager, TransactionManager};
822 use crate::pg::connection::raw::PgTransactionStatus;
823 use crate::*;
824 let conn = &mut crate::test_helpers::pg_connection_no_transaction();
825 assert_eq!(
826 None,
827 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
828 conn
829 ).transaction_depth().expect("Transaction depth")
830 );
831 let result = conn.build_transaction().run(|conn| {
832 assert_eq!(
833 NonZeroU32::new(1),
834 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
835 conn
836 ).transaction_depth().expect("Transaction depth")
837 );
838 let result = conn.build_transaction().run(|conn| {
839 assert_eq!(
840 NonZeroU32::new(2),
841 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
842 conn
843 ).transaction_depth().expect("Transaction depth")
844 );
845 sql_query("SELECT_SYNTAX_ERROR 1").execute(conn)
846 });
847 assert!(result.is_err());
848 assert_eq!(
849 NonZeroU32::new(1),
850 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
851 conn
852 ).transaction_depth().expect("Transaction depth")
853 );
854 let query_result = sql_query("SELECT 1").execute(conn);
855 assert!(query_result.is_ok());
856 assert_eq!(
857 PgTransactionStatus::InTransaction,
858 conn.connection_and_transaction_manager.raw_connection.transaction_status()
859 );
860 query_result
861 });
862 assert!(result.is_ok());
863 assert_eq!(
864 PgTransactionStatus::Idle,
865 conn.connection_and_transaction_manager
866 .raw_connection
867 .transaction_status()
868 );
869 assert_eq!(
870 None,
871 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
872 conn
873 ).transaction_depth().expect("Transaction depth")
874 );
875 }
876
877 #[diesel_test_helper::test]
878 #[allow(clippy::needless_collect)]
881 fn postgres_transaction_depth_is_tracked_properly_on_serialization_failure() {
882 use crate::pg::connection::raw::PgTransactionStatus;
883 use crate::result::DatabaseErrorKind::SerializationFailure;
884 use crate::result::Error::DatabaseError;
885 use crate::*;
886 use std::sync::{Arc, Barrier};
887 use std::thread;
888
889 table! {
890 #[sql_name = "pg_transaction_depth_is_tracked_properly_on_commit_failure"]
891 serialization_example {
892 id -> Serial,
893 class -> Integer,
894 }
895 }
896
897 let conn = &mut crate::test_helpers::pg_connection_no_transaction();
898
899 sql_query(
900 "DROP TABLE IF EXISTS pg_transaction_depth_is_tracked_properly_on_commit_failure;",
901 )
902 .execute(conn)
903 .unwrap();
904 sql_query(
905 r#"
906 CREATE TABLE pg_transaction_depth_is_tracked_properly_on_commit_failure (
907 id SERIAL PRIMARY KEY,
908 class INTEGER NOT NULL
909 )
910 "#,
911 )
912 .execute(conn)
913 .unwrap();
914
915 insert_into(serialization_example::table)
916 .values(&vec![
917 serialization_example::class.eq(1),
918 serialization_example::class.eq(2),
919 ])
920 .execute(conn)
921 .unwrap();
922
923 let before_barrier = Arc::new(Barrier::new(2));
924 let after_barrier = Arc::new(Barrier::new(2));
925 let threads = (1..3)
926 .map(|i| {
927 let before_barrier = before_barrier.clone();
928 let after_barrier = after_barrier.clone();
929 thread::spawn(move || {
930 use crate::connection::AnsiTransactionManager;
931 use crate::connection::TransactionManager;
932 let conn = &mut crate::test_helpers::pg_connection_no_transaction();
933 assert_eq!(None, <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));
934
935 let result = conn.build_transaction().serializable().run(|conn| {
936 assert_eq!(NonZeroU32::new(1), <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));
937
938 let _ = serialization_example::table
939 .filter(serialization_example::class.eq(i))
940 .count()
941 .execute(conn)?;
942
943 let other_i = if i == 1 { 2 } else { 1 };
944 let q = insert_into(serialization_example::table)
945 .values(serialization_example::class.eq(other_i));
946 before_barrier.wait();
947
948 let r = q.execute(conn);
949 after_barrier.wait();
950 r
951 });
952 assert_eq!(
953 PgTransactionStatus::Idle,
954 conn.connection_and_transaction_manager.raw_connection.transaction_status()
955 );
956
957 assert_eq!(None, <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));
958 result
959 })
960 })
961 .collect::<Vec<_>>();
962
963 let mut results = threads
964 .into_iter()
965 .map(|t| t.join().unwrap())
966 .collect::<Vec<_>>();
967
968 results.sort_by_key(|r| r.is_err());
969
970 assert!(results[0].is_ok(), "Got {:?} instead", results);
971 assert!(
972 matches!(&results[1], Err(DatabaseError(SerializationFailure, _))),
973 "Got {:?} instead",
974 results
975 );
976 assert_eq!(
977 PgTransactionStatus::Idle,
978 conn.connection_and_transaction_manager
979 .raw_connection
980 .transaction_status()
981 );
982 }
983
984 #[diesel_test_helper::test]
985 #[allow(clippy::needless_collect)]
988 fn postgres_transaction_depth_is_tracked_properly_on_nested_serialization_failure() {
989 use crate::pg::connection::raw::PgTransactionStatus;
990 use crate::result::DatabaseErrorKind::SerializationFailure;
991 use crate::result::Error::DatabaseError;
992 use crate::*;
993 use std::sync::{Arc, Barrier};
994 use std::thread;
995
996 table! {
997 #[sql_name = "pg_nested_transaction_depth_is_tracked_properly_on_commit_failure"]
998 serialization_example {
999 id -> Serial,
1000 class -> Integer,
1001 }
1002 }
1003
1004 let conn = &mut crate::test_helpers::pg_connection_no_transaction();
1005
1006 sql_query(
1007 "DROP TABLE IF EXISTS pg_nested_transaction_depth_is_tracked_properly_on_commit_failure;",
1008 )
1009 .execute(conn)
1010 .unwrap();
1011 sql_query(
1012 r#"
1013 CREATE TABLE pg_nested_transaction_depth_is_tracked_properly_on_commit_failure (
1014 id SERIAL PRIMARY KEY,
1015 class INTEGER NOT NULL
1016 )
1017 "#,
1018 )
1019 .execute(conn)
1020 .unwrap();
1021
1022 insert_into(serialization_example::table)
1023 .values(&vec![
1024 serialization_example::class.eq(1),
1025 serialization_example::class.eq(2),
1026 ])
1027 .execute(conn)
1028 .unwrap();
1029
1030 let before_barrier = Arc::new(Barrier::new(2));
1031 let after_barrier = Arc::new(Barrier::new(2));
1032 let threads = (1..3)
1033 .map(|i| {
1034 let before_barrier = before_barrier.clone();
1035 let after_barrier = after_barrier.clone();
1036 thread::spawn(move || {
1037 use crate::connection::AnsiTransactionManager;
1038 use crate::connection::TransactionManager;
1039 let conn = &mut crate::test_helpers::pg_connection_no_transaction();
1040 assert_eq!(None, <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));
1041
1042 let result = conn.build_transaction().serializable().run(|conn| {
1043 assert_eq!(NonZeroU32::new(1), <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));
1044 let r = conn.transaction(|conn| {
1045 assert_eq!(NonZeroU32::new(2), <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));
1046
1047 let _ = serialization_example::table
1048 .filter(serialization_example::class.eq(i))
1049 .count()
1050 .execute(conn)?;
1051
1052 let other_i = if i == 1 { 2 } else { 1 };
1053 let q = insert_into(serialization_example::table)
1054 .values(serialization_example::class.eq(other_i));
1055 before_barrier.wait();
1056
1057 let r = q.execute(conn);
1058 after_barrier.wait();
1059 r
1060 });
1061 assert_eq!(NonZeroU32::new(1), <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));
1062 assert_eq!(
1063 PgTransactionStatus::InTransaction,
1064 conn.connection_and_transaction_manager.raw_connection.transaction_status()
1065 );
1066 r
1067 });
1068 assert_eq!(
1069 PgTransactionStatus::Idle,
1070 conn.connection_and_transaction_manager.raw_connection.transaction_status()
1071 );
1072
1073 assert_eq!(None, <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));
1074 result
1075 })
1076 })
1077 .collect::<Vec<_>>();
1078
1079 let mut results = threads
1080 .into_iter()
1081 .map(|t| t.join().unwrap())
1082 .collect::<Vec<_>>();
1083
1084 results.sort_by_key(|r| r.is_err());
1085
1086 assert!(results[0].is_ok(), "Got {:?} instead", results);
1087 assert!(
1088 matches!(&results[1], Err(DatabaseError(SerializationFailure, _))),
1089 "Got {:?} instead",
1090 results
1091 );
1092 assert_eq!(
1093 PgTransactionStatus::Idle,
1094 conn.connection_and_transaction_manager
1095 .raw_connection
1096 .transaction_status()
1097 );
1098 }
1099
1100 #[diesel_test_helper::test]
1101 fn postgres_transaction_is_rolled_back_upon_deferred_constraint_failure() {
1102 use crate::connection::{AnsiTransactionManager, TransactionManager};
1103 use crate::pg::connection::raw::PgTransactionStatus;
1104 use crate::result::Error;
1105 use crate::*;
1106
1107 let conn = &mut crate::test_helpers::pg_connection_no_transaction();
1108 assert_eq!(
1109 None,
1110 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1111 conn
1112 ).transaction_depth().expect("Transaction depth")
1113 );
1114 let result: Result<_, Error> = conn.build_transaction().run(|conn| {
1115 assert_eq!(
1116 NonZeroU32::new(1),
1117 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1118 conn
1119 ).transaction_depth().expect("Transaction depth")
1120 );
1121 sql_query("DROP TABLE IF EXISTS deferred_constraint_commit").execute(conn)?;
1122 sql_query("CREATE TABLE deferred_constraint_commit(id INT UNIQUE INITIALLY DEFERRED)")
1123 .execute(conn)?;
1124 sql_query("INSERT INTO deferred_constraint_commit VALUES(1)").execute(conn)?;
1125 let result =
1126 sql_query("INSERT INTO deferred_constraint_commit VALUES(1)").execute(conn);
1127 assert!(result.is_ok());
1128 assert_eq!(
1129 PgTransactionStatus::InTransaction,
1130 conn.connection_and_transaction_manager.raw_connection.transaction_status()
1131 );
1132 Ok(())
1133 });
1134 assert_eq!(
1135 None,
1136 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1137 conn
1138 ).transaction_depth().expect("Transaction depth")
1139 );
1140 assert_eq!(
1141 PgTransactionStatus::Idle,
1142 conn.connection_and_transaction_manager
1143 .raw_connection
1144 .transaction_status()
1145 );
1146 assert!(result.is_err());
1147 }
1148
1149 #[diesel_test_helper::test]
1150 fn postgres_transaction_is_rolled_back_upon_deferred_trigger_failure() {
1151 use crate::connection::{AnsiTransactionManager, TransactionManager};
1152 use crate::pg::connection::raw::PgTransactionStatus;
1153 use crate::result::Error;
1154 use crate::*;
1155
1156 let conn = &mut crate::test_helpers::pg_connection_no_transaction();
1157 assert_eq!(
1158 None,
1159 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1160 conn
1161 ).transaction_depth().expect("Transaction depth")
1162 );
1163 let result: Result<_, Error> = conn.build_transaction().run(|conn| {
1164 assert_eq!(
1165 NonZeroU32::new(1),
1166 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1167 conn
1168 ).transaction_depth().expect("Transaction depth")
1169 );
1170 sql_query("DROP TABLE IF EXISTS deferred_trigger_commit").execute(conn)?;
1171 sql_query("CREATE TABLE deferred_trigger_commit(id INT UNIQUE INITIALLY DEFERRED)")
1172 .execute(conn)?;
1173 sql_query(
1174 r#"
1175 CREATE OR REPLACE FUNCTION transaction_depth_blow_up()
1176 RETURNS trigger
1177 LANGUAGE plpgsql
1178 AS $$
1179 DECLARE
1180 BEGIN
1181 IF NEW.value = 42 THEN
1182 RAISE EXCEPTION 'Transaction kaboom';
1183 END IF;
1184 RETURN NEW;
1185
1186 END;$$;
1187 "#,
1188 )
1189 .execute(conn)?;
1190
1191 sql_query(
1192 r#"
1193 CREATE CONSTRAINT TRIGGER transaction_depth_trigger
1194 AFTER INSERT ON "deferred_trigger_commit"
1195 DEFERRABLE INITIALLY DEFERRED
1196 FOR EACH ROW
1197 EXECUTE PROCEDURE transaction_depth_blow_up()
1198 "#,
1199 )
1200 .execute(conn)?;
1201 let result = sql_query("INSERT INTO deferred_trigger_commit VALUES(42)").execute(conn);
1202 assert!(result.is_ok());
1203 assert_eq!(
1204 PgTransactionStatus::InTransaction,
1205 conn.connection_and_transaction_manager.raw_connection.transaction_status()
1206 );
1207 Ok(())
1208 });
1209 assert_eq!(
1210 None,
1211 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1212 conn
1213 ).transaction_depth().expect("Transaction depth")
1214 );
1215 assert_eq!(
1216 PgTransactionStatus::Idle,
1217 conn.connection_and_transaction_manager
1218 .raw_connection
1219 .transaction_status()
1220 );
1221 assert!(result.is_err());
1222 }
1223
1224 #[diesel_test_helper::test]
1225 fn nested_postgres_transaction_is_rolled_back_upon_deferred_trigger_failure() {
1226 use crate::connection::{AnsiTransactionManager, TransactionManager};
1227 use crate::pg::connection::raw::PgTransactionStatus;
1228 use crate::result::Error;
1229 use crate::*;
1230
1231 let conn = &mut crate::test_helpers::pg_connection_no_transaction();
1232 assert_eq!(
1233 None,
1234 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1235 conn
1236 ).transaction_depth().expect("Transaction depth")
1237 );
1238 let result: Result<_, Error> = conn.build_transaction().run(|conn| {
1239 assert_eq!(
1240 NonZeroU32::new(1),
1241 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1242 conn
1243 ).transaction_depth().expect("Transaction depth")
1244 );
1245 sql_query("DROP TABLE IF EXISTS deferred_trigger_nested_commit").execute(conn)?;
1246 sql_query(
1247 "CREATE TABLE deferred_trigger_nested_commit(id INT UNIQUE INITIALLY DEFERRED)",
1248 )
1249 .execute(conn)?;
1250 sql_query(
1251 r#"
1252 CREATE OR REPLACE FUNCTION transaction_depth_blow_up()
1253 RETURNS trigger
1254 LANGUAGE plpgsql
1255 AS $$
1256 DECLARE
1257 BEGIN
1258 IF NEW.value = 42 THEN
1259 RAISE EXCEPTION 'Transaction kaboom';
1260 END IF;
1261 RETURN NEW;
1262
1263 END;$$;
1264 "#,
1265 )
1266 .execute(conn)?;
1267
1268 sql_query(
1269 r#"
1270 CREATE CONSTRAINT TRIGGER transaction_depth_trigger
1271 AFTER INSERT ON "deferred_trigger_nested_commit"
1272 DEFERRABLE INITIALLY DEFERRED
1273 FOR EACH ROW
1274 EXECUTE PROCEDURE transaction_depth_blow_up()
1275 "#,
1276 )
1277 .execute(conn)?;
1278 let inner_result: Result<_, Error> = conn.build_transaction().run(|conn| {
1279 let result = sql_query("INSERT INTO deferred_trigger_nested_commit VALUES(42)")
1280 .execute(conn);
1281 assert!(result.is_ok());
1282 Ok(())
1283 });
1284 assert!(inner_result.is_err());
1285 assert_eq!(
1286 PgTransactionStatus::InTransaction,
1287 conn.connection_and_transaction_manager.raw_connection.transaction_status()
1288 );
1289 Ok(())
1290 });
1291 assert_eq!(
1292 None,
1293 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1294 conn
1295 ).transaction_depth().expect("Transaction depth")
1296 );
1297 assert_eq!(
1298 PgTransactionStatus::Idle,
1299 conn.connection_and_transaction_manager
1300 .raw_connection
1301 .transaction_status()
1302 );
1303 assert!(result.is_ok(), "Expected success, got {:?}", result);
1304 }
1305
1306 #[diesel_test_helper::test]
1307 fn nested_postgres_transaction_is_rolled_back_upon_deferred_constraint_failure() {
1308 use crate::connection::{AnsiTransactionManager, TransactionManager};
1309 use crate::pg::connection::raw::PgTransactionStatus;
1310 use crate::result::Error;
1311 use crate::*;
1312
1313 let conn = &mut crate::test_helpers::pg_connection_no_transaction();
1314 assert_eq!(
1315 None,
1316 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1317 conn
1318 ).transaction_depth().expect("Transaction depth")
1319 );
1320 let result: Result<_, Error> = conn.build_transaction().run(|conn| {
1321 assert_eq!(
1322 NonZeroU32::new(1),
1323 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1324 conn
1325 ).transaction_depth().expect("Transaction depth")
1326 );
1327 sql_query("DROP TABLE IF EXISTS deferred_constraint_nested_commit").execute(conn)?;
1328 sql_query("CREATE TABLE deferred_constraint_nested_commit(id INT UNIQUE INITIALLY DEFERRED)").execute(conn)?;
1329 let inner_result: Result<_, Error> = conn.build_transaction().run(|conn| {
1330 assert_eq!(
1331 NonZeroU32::new(2),
1332 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1333 conn
1334 ).transaction_depth().expect("Transaction depth")
1335 );
1336 sql_query("INSERT INTO deferred_constraint_nested_commit VALUES(1)").execute(conn)?;
1337 let result = sql_query("INSERT INTO deferred_constraint_nested_commit VALUES(1)").execute(conn);
1338 assert!(result.is_ok());
1339 Ok(())
1340 });
1341 assert!(inner_result.is_err());
1342 assert_eq!(
1343 PgTransactionStatus::InTransaction,
1344 conn.connection_and_transaction_manager.raw_connection.transaction_status()
1345 );
1346 assert_eq!(
1347 NonZeroU32::new(1),
1348 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1349 conn
1350 ).transaction_depth().expect("Transaction depth")
1351 );
1352 sql_query("INSERT INTO deferred_constraint_nested_commit VALUES(1)").execute(conn)
1353 });
1354 assert_eq!(
1355 None,
1356 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1357 conn
1358 ).transaction_depth().expect("Transaction depth")
1359 );
1360 assert_eq!(
1361 PgTransactionStatus::Idle,
1362 conn.connection_and_transaction_manager
1363 .raw_connection
1364 .transaction_status()
1365 );
1366 assert!(result.is_ok());
1367 }
1368}