1pub(super) mod copy;
2pub(crate) mod cursor;
3mod raw;
4mod result;
5mod row;
6mod stmt;
7
8use self::copy::{CopyFromSink, CopyToBuffer};
9use self::cursor::*;
10use self::private::ConnectionAndTransactionManager;
11use self::raw::{PgTransactionStatus, RawConnection};
12use self::stmt::Statement;
13use crate::connection::instrumentation::{
14 DebugQuery, DynInstrumentation, Instrumentation, StrQueryHelper,
15};
16use crate::connection::statement_cache::{MaybeCached, StatementCache};
17use crate::connection::*;
18use crate::expression::QueryMetadata;
19use crate::pg::backend::PgNotification;
20use crate::pg::metadata_lookup::{GetPgMetadataCache, PgMetadataCache};
21use crate::pg::query_builder::copy::InternalCopyFromQuery;
22use crate::pg::{Pg, TransactionBuilder};
23use crate::query_builder::bind_collector::RawBytesBindCollector;
24use crate::query_builder::*;
25use crate::result::ConnectionError::CouldntSetupConfiguration;
26use crate::result::*;
27use crate::RunQueryDsl;
28use std::ffi::CString;
29use std::fmt::Debug;
30use std::os::raw as libc;
31
32use super::query_builder::copy::{CopyFromExpression, CopyTarget, CopyToCommand};
33
34pub(super) use self::result::PgResult;
35
36#[allow(missing_debug_implementations)]
126#[cfg(feature = "postgres")]
127pub struct PgConnection {
128 statement_cache: StatementCache<Pg, Statement>,
129 metadata_cache: PgMetadataCache,
130 connection_and_transaction_manager: ConnectionAndTransactionManager,
131}
132
133#[allow(unsafe_code)]
135unsafe impl Send for PgConnection {}
136
137impl SimpleConnection for PgConnection {
138 #[allow(unsafe_code)] fn batch_execute(&mut self, query: &str) -> QueryResult<()> {
140 self.connection_and_transaction_manager
141 .instrumentation
142 .on_connection_event(InstrumentationEvent::StartQuery {
143 query: &StrQueryHelper::new(query),
144 });
145 let c_query = CString::new(query)?;
146 let inner_result = unsafe {
147 self.connection_and_transaction_manager
148 .raw_connection
149 .exec(c_query.as_ptr())
150 };
151 update_transaction_manager_status(
152 inner_result.and_then(|raw_result| {
153 PgResult::new(
154 raw_result,
155 &self.connection_and_transaction_manager.raw_connection,
156 )
157 }),
158 &mut self.connection_and_transaction_manager,
159 &StrQueryHelper::new(query),
160 true,
161 )?;
162 Ok(())
163 }
164}
165
166#[derive(Debug, Copy, Clone)]
170pub struct PgRowByRowLoadingMode;
171
172impl ConnectionSealed for PgConnection {}
173
174impl Connection for PgConnection {
175 type Backend = Pg;
176 type TransactionManager = AnsiTransactionManager;
177
178 fn establish(database_url: &str) -> ConnectionResult<PgConnection> {
179 let mut instrumentation = DynInstrumentation::default_instrumentation();
180 instrumentation.on_connection_event(InstrumentationEvent::StartEstablishConnection {
181 url: database_url,
182 });
183 let r = RawConnection::establish(database_url).and_then(|raw_conn| {
184 let mut conn = PgConnection {
185 connection_and_transaction_manager: ConnectionAndTransactionManager {
186 raw_connection: raw_conn,
187 transaction_state: AnsiTransactionManager::default(),
188 instrumentation: DynInstrumentation::none(),
189 },
190 statement_cache: StatementCache::new(),
191 metadata_cache: PgMetadataCache::new(),
192 };
193 conn.set_config_options()
194 .map_err(CouldntSetupConfiguration)?;
195 Ok(conn)
196 });
197 instrumentation.on_connection_event(InstrumentationEvent::FinishEstablishConnection {
198 url: database_url,
199 error: r.as_ref().err(),
200 });
201 let mut conn = r?;
202 conn.connection_and_transaction_manager.instrumentation = instrumentation;
203 Ok(conn)
204 }
205
206 fn execute_returning_count<T>(&mut self, source: &T) -> QueryResult<usize>
207 where
208 T: QueryFragment<Pg> + QueryId,
209 {
210 update_transaction_manager_status(
211 self.with_prepared_query(source, true, |query, params, conn, _source| {
212 let res = query
213 .execute(&mut conn.raw_connection, ¶ms, false)
214 .map(|r| r.rows_affected());
215 while conn.raw_connection.get_next_result()?.is_some() {}
218 res
219 }),
220 &mut self.connection_and_transaction_manager,
221 &crate::debug_query(source),
222 true,
223 )
224 }
225
226 fn transaction_state(&mut self) -> &mut AnsiTransactionManager
227 where
228 Self: Sized,
229 {
230 &mut self.connection_and_transaction_manager.transaction_state
231 }
232
233 fn instrumentation(&mut self) -> &mut dyn Instrumentation {
234 &mut *self.connection_and_transaction_manager.instrumentation
235 }
236
237 fn set_instrumentation(&mut self, instrumentation: impl Instrumentation) {
238 self.connection_and_transaction_manager.instrumentation = instrumentation.into();
239 }
240
241 fn set_prepared_statement_cache_size(&mut self, size: CacheSize) {
242 self.statement_cache.set_cache_size(size);
243 }
244}
245
246impl<B> LoadConnection<B> for PgConnection
247where
248 Self: self::private::PgLoadingMode<B>,
249{
250 type Cursor<'conn, 'query> = <Self as self::private::PgLoadingMode<B>>::Cursor<'conn, 'query>;
251 type Row<'conn, 'query> = <Self as self::private::PgLoadingMode<B>>::Row<'conn, 'query>;
252
253 fn load<'conn, 'query, T>(
254 &'conn mut self,
255 source: T,
256 ) -> QueryResult<Self::Cursor<'conn, 'query>>
257 where
258 T: Query + QueryFragment<Self::Backend> + QueryId + 'query,
259 Self::Backend: QueryMetadata<T::SqlType>,
260 {
261 self.with_prepared_query(source, false, |stmt, params, conn, source| {
262 use self::private::PgLoadingMode;
263 let result = stmt.execute(&mut conn.raw_connection, ¶ms, Self::USE_ROW_BY_ROW_MODE);
264 let result = update_transaction_manager_status(
265 result,
266 conn,
267 &crate::debug_query(&source),
268 false,
269 )?;
270 Self::get_cursor(conn, result, source)
271 })
272 }
273}
274
275impl GetPgMetadataCache for PgConnection {
276 fn get_metadata_cache(&mut self) -> &mut PgMetadataCache {
277 &mut self.metadata_cache
278 }
279}
280
281#[inline(always)]
282fn update_transaction_manager_status<T>(
283 query_result: QueryResult<T>,
284 conn: &mut ConnectionAndTransactionManager,
285 source: &dyn DebugQuery,
286 final_call: bool,
287) -> QueryResult<T> {
288 fn non_generic_inner(conn: &mut ConnectionAndTransactionManager, is_err: bool) {
290 let raw_conn: &mut RawConnection = &mut conn.raw_connection;
291 let tm: &mut AnsiTransactionManager = &mut conn.transaction_state;
292 match raw_conn.transaction_status() {
296 PgTransactionStatus::InError => {
297 tm.status.set_requires_rollback_maybe_up_to_top_level(true)
298 }
299 PgTransactionStatus::Unknown => tm.status.set_in_error(),
300 PgTransactionStatus::Idle => {
301 tm.status = TransactionManagerStatus::Valid(Default::default())
306 }
307 PgTransactionStatus::InTransaction => {
308 let transaction_status = &mut tm.status;
309 if is_err {
312 if !matches!(transaction_status, TransactionManagerStatus::Valid(valid_tm) if valid_tm.transaction_depth().is_some())
315 {
316 transaction_status.set_in_error()
318 }
319 } else {
320 tm.status.set_requires_rollback_maybe_up_to_top_level(false)
328 }
329 }
330 PgTransactionStatus::Active => {
331 }
333 }
334 }
335 non_generic_inner(conn, query_result.is_err());
336 if let Err(ref e) = query_result {
337 conn.instrumentation
338 .on_connection_event(InstrumentationEvent::FinishQuery {
339 query: source,
340 error: Some(e),
341 });
342 } else if final_call {
343 conn.instrumentation
344 .on_connection_event(InstrumentationEvent::FinishQuery {
345 query: source,
346 error: None,
347 });
348 }
349 query_result
350}
351
352#[cfg(feature = "r2d2")]
353impl crate::r2d2::R2D2Connection for PgConnection {
354 fn ping(&mut self) -> QueryResult<()> {
355 crate::r2d2::CheckConnectionQuery.execute(self).map(|_| ())
356 }
357
358 fn is_broken(&mut self) -> bool {
359 AnsiTransactionManager::is_broken_transaction_manager(self)
360 }
361}
362
363impl MultiConnectionHelper for PgConnection {
364 fn to_any<'a>(
365 lookup: &mut <Self::Backend as crate::sql_types::TypeMetadata>::MetadataLookup,
366 ) -> &mut (dyn std::any::Any + 'a) {
367 lookup.as_any()
368 }
369
370 fn from_any(
371 lookup: &mut dyn std::any::Any,
372 ) -> Option<&mut <Self::Backend as crate::sql_types::TypeMetadata>::MetadataLookup> {
373 lookup
374 .downcast_mut::<Self>()
375 .map(|conn| conn as &mut dyn super::PgMetadataLookup)
376 }
377}
378
379impl PgConnection {
380 pub fn build_transaction(&mut self) -> TransactionBuilder<'_, Self> {
404 TransactionBuilder::new(self)
405 }
406
407 pub(crate) fn copy_from<S, T>(&mut self, target: S) -> Result<usize, S::Error>
408 where
409 S: CopyFromExpression<T>,
410 {
411 let query = InternalCopyFromQuery::new(target);
412 let res = self.with_prepared_query(query, false, |stmt, binds, conn, mut source| {
413 fn inner_copy_in<S, T>(
414 stmt: MaybeCached<'_, Statement>,
415 conn: &mut ConnectionAndTransactionManager,
416 binds: Vec<Option<Vec<u8>>>,
417 source: &mut InternalCopyFromQuery<S, T>,
418 ) -> Result<usize, S::Error>
419 where
420 S: CopyFromExpression<T>,
421 {
422 let _res = stmt.execute(&mut conn.raw_connection, &binds, false)?;
423 let mut copy_in = CopyFromSink::new(&mut conn.raw_connection);
424 let r = source.target.callback(&mut copy_in);
425 copy_in.finish(r.as_ref().err().map(|e| e.to_string()))?;
426 let next_res = conn.raw_connection.get_next_result()?.ok_or_else(|| {
427 crate::result::Error::DeserializationError(
428 "Failed to receive result from the database".into(),
429 )
430 })?;
431 let rows = next_res.rows_affected();
432 while let Some(_r) = conn.raw_connection.get_next_result()? {}
433 r?;
434 Ok(rows)
435 }
436
437 let rows = inner_copy_in(stmt, conn, binds, &mut source);
438 if let Err(ref e) = rows {
439 let database_error = crate::result::Error::DatabaseError(
440 crate::result::DatabaseErrorKind::Unknown,
441 Box::new(e.to_string()),
442 );
443 conn.instrumentation
444 .on_connection_event(InstrumentationEvent::FinishQuery {
445 query: &crate::debug_query(&source),
446 error: Some(&database_error),
447 });
448 } else {
449 conn.instrumentation
450 .on_connection_event(InstrumentationEvent::FinishQuery {
451 query: &crate::debug_query(&source),
452 error: None,
453 });
454 }
455
456 rows
457 })?;
458
459 Ok(res)
460 }
461
462 pub(crate) fn copy_to<T>(&mut self, command: CopyToCommand<T>) -> QueryResult<CopyToBuffer<'_>>
463 where
464 T: CopyTarget,
465 {
466 let res = self.with_prepared_query::<_, _, Error>(
467 command,
468 false,
469 |stmt, binds, conn, source| {
470 let res = stmt.execute(&mut conn.raw_connection, &binds, false);
471 conn.instrumentation
472 .on_connection_event(InstrumentationEvent::FinishQuery {
473 query: &crate::debug_query(&source),
474 error: res.as_ref().err(),
475 });
476 Ok(CopyToBuffer::new(&mut conn.raw_connection, res?))
477 },
478 )?;
479 Ok(res)
480 }
481
482 fn with_prepared_query<'conn, T, R, E>(
483 &'conn mut self,
484 source: T,
485 execute_returning_count: bool,
486 f: impl FnOnce(
487 MaybeCached<'_, Statement>,
488 Vec<Option<Vec<u8>>>,
489 &'conn mut ConnectionAndTransactionManager,
490 T,
491 ) -> Result<R, E>,
492 ) -> Result<R, E>
493 where
494 T: QueryFragment<Pg> + QueryId,
495 E: From<crate::result::Error>,
496 {
497 self.connection_and_transaction_manager
498 .instrumentation
499 .on_connection_event(InstrumentationEvent::StartQuery {
500 query: &crate::debug_query(&source),
501 });
502 let mut bind_collector = RawBytesBindCollector::<Pg>::new();
503 source.collect_binds(&mut bind_collector, self, &Pg)?;
504 let binds = bind_collector.binds;
505 let metadata = bind_collector.metadata;
506
507 let cache = &mut self.statement_cache;
508 let conn = &mut self.connection_and_transaction_manager.raw_connection;
509 let query = cache.cached_statement(
510 &source,
511 &Pg,
512 &metadata,
513 conn,
514 Statement::prepare,
515 &mut *self.connection_and_transaction_manager.instrumentation,
516 );
517 if !execute_returning_count {
518 if let Err(ref e) = query {
519 self.connection_and_transaction_manager
520 .instrumentation
521 .on_connection_event(InstrumentationEvent::FinishQuery {
522 query: &crate::debug_query(&source),
523 error: Some(e),
524 });
525 }
526 }
527
528 f(
529 query?,
530 binds,
531 &mut self.connection_and_transaction_manager,
532 source,
533 )
534 }
535
536 fn set_config_options(&mut self) -> QueryResult<()> {
537 crate::sql_query("SET TIME ZONE 'UTC'").execute(self)?;
538 crate::sql_query("SET CLIENT_ENCODING TO 'UTF8'").execute(self)?;
539 self.connection_and_transaction_manager
540 .raw_connection
541 .set_notice_processor(noop_notice_processor);
542 Ok(())
543 }
544
545 pub fn notifications_iter(&mut self) -> impl Iterator<Item = QueryResult<PgNotification>> + '_ {
586 let conn = &self.connection_and_transaction_manager.raw_connection;
587 std::iter::from_fn(move || conn.pq_notifies().transpose())
588 }
589}
590
591extern "C" fn noop_notice_processor(_: *mut libc::c_void, _message: *const libc::c_char) {}
592
593mod private {
594 use super::*;
595
596 #[allow(missing_debug_implementations)]
597 pub struct ConnectionAndTransactionManager {
598 pub(super) raw_connection: RawConnection,
599 pub(super) transaction_state: AnsiTransactionManager,
600 pub(super) instrumentation: DynInstrumentation,
601 }
602
603 pub trait PgLoadingMode<B> {
604 const USE_ROW_BY_ROW_MODE: bool;
605 type Cursor<'conn, 'query>: Iterator<Item = QueryResult<Self::Row<'conn, 'query>>>;
606 type Row<'conn, 'query>: crate::row::Row<'conn, Pg>;
607
608 fn get_cursor<'conn, 'query>(
609 raw_connection: &'conn mut ConnectionAndTransactionManager,
610 result: PgResult,
611 source: impl QueryFragment<Pg> + 'query,
612 ) -> QueryResult<Self::Cursor<'conn, 'query>>;
613 }
614
615 impl PgLoadingMode<DefaultLoadingMode> for PgConnection {
616 const USE_ROW_BY_ROW_MODE: bool = false;
617 type Cursor<'conn, 'query> = Cursor;
618 type Row<'conn, 'query> = self::row::PgRow;
619
620 fn get_cursor<'conn, 'query>(
621 conn: &'conn mut ConnectionAndTransactionManager,
622 result: PgResult,
623 source: impl QueryFragment<Pg> + 'query,
624 ) -> QueryResult<Self::Cursor<'conn, 'query>> {
625 update_transaction_manager_status(
626 Cursor::new(result, &mut conn.raw_connection),
627 conn,
628 &crate::debug_query(&source),
629 true,
630 )
631 }
632 }
633
634 impl PgLoadingMode<PgRowByRowLoadingMode> for PgConnection {
635 const USE_ROW_BY_ROW_MODE: bool = true;
636 type Cursor<'conn, 'query> = RowByRowCursor<'conn, 'query>;
637 type Row<'conn, 'query> = self::row::PgRow;
638
639 fn get_cursor<'conn, 'query>(
640 raw_connection: &'conn mut ConnectionAndTransactionManager,
641 result: PgResult,
642 source: impl QueryFragment<Pg> + 'query,
643 ) -> QueryResult<Self::Cursor<'conn, 'query>> {
644 Ok(RowByRowCursor::new(
645 result,
646 raw_connection,
647 Box::new(source),
648 ))
649 }
650 }
651}
652
653#[cfg(test)]
654#[allow(clippy::uninlined_format_args)]
656mod tests {
657 extern crate dotenvy;
658
659 use super::*;
660 use crate::prelude::*;
661 use crate::result::Error::DatabaseError;
662 use std::num::NonZeroU32;
663
664 fn connection() -> PgConnection {
665 crate::test_helpers::pg_connection_no_transaction()
666 }
667
668 #[diesel_test_helper::test]
669 fn notifications_arrive() {
670 use crate::sql_query;
671
672 let conn = &mut connection();
673 sql_query("LISTEN test_notifications")
674 .execute(conn)
675 .unwrap();
676 sql_query("NOTIFY test_notifications, 'first'")
677 .execute(conn)
678 .unwrap();
679 sql_query("NOTIFY test_notifications, 'second'")
680 .execute(conn)
681 .unwrap();
682
683 let notifications = conn
684 .notifications_iter()
685 .map(Result::unwrap)
686 .collect::<Vec<_>>();
687
688 assert_eq!(2, notifications.len());
689 assert_eq!(notifications[0].channel, "test_notifications");
690 assert_eq!(notifications[1].channel, "test_notifications");
691 assert_eq!(notifications[0].payload, "first");
692 assert_eq!(notifications[1].payload, "second");
693
694 let next_notification = conn.notifications_iter().next();
695 assert!(
696 next_notification.is_none(),
697 "Got a next notification, while not expecting one: {next_notification:?}"
698 );
699
700 sql_query("NOTIFY test_notifications")
701 .execute(conn)
702 .unwrap();
703 assert_eq!(
704 conn.notifications_iter().next().unwrap().unwrap().payload,
705 ""
706 );
707 }
708
709 #[diesel_test_helper::test]
710 fn malformed_sql_query() {
711 let connection = &mut connection();
712 let query =
713 crate::sql_query("SELECT not_existent FROM also_not_there;").execute(connection);
714
715 if let Err(DatabaseError(_, string)) = query {
716 assert_eq!(Some(26), string.statement_position());
717 } else {
718 unreachable!();
719 }
720 }
721
722 table! {
723 users {
724 id -> Integer,
725 name -> Text,
726 }
727 }
728
729 #[diesel_test_helper::test]
730 fn transaction_manager_returns_an_error_when_attempting_to_commit_outside_of_a_transaction() {
731 use crate::connection::{AnsiTransactionManager, TransactionManager};
732 use crate::result::Error;
733 use crate::PgConnection;
734
735 let conn = &mut crate::test_helpers::pg_connection_no_transaction();
736 assert_eq!(
737 None,
738 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
739 conn
740 ).transaction_depth().expect("Transaction depth")
741 );
742 let result = AnsiTransactionManager::commit_transaction(conn);
743 assert!(matches!(result, Err(Error::NotInTransaction)))
744 }
745
746 #[diesel_test_helper::test]
747 fn transaction_manager_returns_an_error_when_attempting_to_rollback_outside_of_a_transaction() {
748 use crate::connection::{AnsiTransactionManager, TransactionManager};
749 use crate::result::Error;
750 use crate::PgConnection;
751
752 let conn = &mut crate::test_helpers::pg_connection_no_transaction();
753 assert_eq!(
754 None,
755 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
756 conn
757 ).transaction_depth().expect("Transaction depth")
758 );
759 let result = AnsiTransactionManager::rollback_transaction(conn);
760 assert!(matches!(result, Err(Error::NotInTransaction)))
761 }
762
763 #[diesel_test_helper::test]
764 fn postgres_transaction_is_rolled_back_upon_syntax_error() {
765 use std::num::NonZeroU32;
766
767 use crate::connection::{AnsiTransactionManager, TransactionManager};
768 use crate::pg::connection::raw::PgTransactionStatus;
769 use crate::*;
770 let conn = &mut crate::test_helpers::pg_connection_no_transaction();
771 assert_eq!(
772 None,
773 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
774 conn
775 ).transaction_depth().expect("Transaction depth")
776 );
777 let _result = conn.build_transaction().run(|conn| {
778 assert_eq!(
779 NonZeroU32::new(1),
780 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
781 conn
782 ).transaction_depth().expect("Transaction depth")
783 );
784 let query_result = sql_query("SELECT_SYNTAX_ERROR 1").execute(conn);
786 assert!(query_result.is_err());
787 assert_eq!(
788 PgTransactionStatus::InError,
789 conn.connection_and_transaction_manager.raw_connection.transaction_status()
790 );
791 query_result
792 });
793 assert_eq!(
794 None,
795 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
796 conn
797 ).transaction_depth().expect("Transaction depth")
798 );
799 assert_eq!(
800 PgTransactionStatus::Idle,
801 conn.connection_and_transaction_manager
802 .raw_connection
803 .transaction_status()
804 );
805 }
806
807 #[diesel_test_helper::test]
808 fn nested_postgres_transaction_is_rolled_back_upon_syntax_error() {
809 use std::num::NonZeroU32;
810
811 use crate::connection::{AnsiTransactionManager, TransactionManager};
812 use crate::pg::connection::raw::PgTransactionStatus;
813 use crate::*;
814 let conn = &mut crate::test_helpers::pg_connection_no_transaction();
815 assert_eq!(
816 None,
817 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
818 conn
819 ).transaction_depth().expect("Transaction depth")
820 );
821 let result = conn.build_transaction().run(|conn| {
822 assert_eq!(
823 NonZeroU32::new(1),
824 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
825 conn
826 ).transaction_depth().expect("Transaction depth")
827 );
828 let result = conn.build_transaction().run(|conn| {
829 assert_eq!(
830 NonZeroU32::new(2),
831 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
832 conn
833 ).transaction_depth().expect("Transaction depth")
834 );
835 sql_query("SELECT_SYNTAX_ERROR 1").execute(conn)
836 });
837 assert!(result.is_err());
838 assert_eq!(
839 NonZeroU32::new(1),
840 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
841 conn
842 ).transaction_depth().expect("Transaction depth")
843 );
844 let query_result = sql_query("SELECT 1").execute(conn);
845 assert!(query_result.is_ok());
846 assert_eq!(
847 PgTransactionStatus::InTransaction,
848 conn.connection_and_transaction_manager.raw_connection.transaction_status()
849 );
850 query_result
851 });
852 assert!(result.is_ok());
853 assert_eq!(
854 PgTransactionStatus::Idle,
855 conn.connection_and_transaction_manager
856 .raw_connection
857 .transaction_status()
858 );
859 assert_eq!(
860 None,
861 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
862 conn
863 ).transaction_depth().expect("Transaction depth")
864 );
865 }
866
867 #[diesel_test_helper::test]
868 #[allow(clippy::needless_collect)]
871 fn postgres_transaction_depth_is_tracked_properly_on_serialization_failure() {
872 use crate::pg::connection::raw::PgTransactionStatus;
873 use crate::result::DatabaseErrorKind::SerializationFailure;
874 use crate::result::Error::DatabaseError;
875 use crate::*;
876 use std::sync::{Arc, Barrier};
877 use std::thread;
878
879 table! {
880 #[sql_name = "pg_transaction_depth_is_tracked_properly_on_commit_failure"]
881 serialization_example {
882 id -> Serial,
883 class -> Integer,
884 }
885 }
886
887 let conn = &mut crate::test_helpers::pg_connection_no_transaction();
888
889 sql_query(
890 "DROP TABLE IF EXISTS pg_transaction_depth_is_tracked_properly_on_commit_failure;",
891 )
892 .execute(conn)
893 .unwrap();
894 sql_query(
895 r#"
896 CREATE TABLE pg_transaction_depth_is_tracked_properly_on_commit_failure (
897 id SERIAL PRIMARY KEY,
898 class INTEGER NOT NULL
899 )
900 "#,
901 )
902 .execute(conn)
903 .unwrap();
904
905 insert_into(serialization_example::table)
906 .values(&vec![
907 serialization_example::class.eq(1),
908 serialization_example::class.eq(2),
909 ])
910 .execute(conn)
911 .unwrap();
912
913 let before_barrier = Arc::new(Barrier::new(2));
914 let after_barrier = Arc::new(Barrier::new(2));
915 let threads = (1..3)
916 .map(|i| {
917 let before_barrier = before_barrier.clone();
918 let after_barrier = after_barrier.clone();
919 thread::spawn(move || {
920 use crate::connection::AnsiTransactionManager;
921 use crate::connection::TransactionManager;
922 let conn = &mut crate::test_helpers::pg_connection_no_transaction();
923 assert_eq!(None, <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));
924
925 let result = conn.build_transaction().serializable().run(|conn| {
926 assert_eq!(NonZeroU32::new(1), <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));
927
928 let _ = serialization_example::table
929 .filter(serialization_example::class.eq(i))
930 .count()
931 .execute(conn)?;
932
933 let other_i = if i == 1 { 2 } else { 1 };
934 let q = insert_into(serialization_example::table)
935 .values(serialization_example::class.eq(other_i));
936 before_barrier.wait();
937
938 let r = q.execute(conn);
939 after_barrier.wait();
940 r
941 });
942 assert_eq!(
943 PgTransactionStatus::Idle,
944 conn.connection_and_transaction_manager.raw_connection.transaction_status()
945 );
946
947 assert_eq!(None, <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));
948 result
949 })
950 })
951 .collect::<Vec<_>>();
952
953 let mut results = threads
954 .into_iter()
955 .map(|t| t.join().unwrap())
956 .collect::<Vec<_>>();
957
958 results.sort_by_key(|r| r.is_err());
959
960 assert!(results[0].is_ok(), "Got {:?} instead", results);
961 assert!(
962 matches!(&results[1], Err(DatabaseError(SerializationFailure, _))),
963 "Got {:?} instead",
964 results
965 );
966 assert_eq!(
967 PgTransactionStatus::Idle,
968 conn.connection_and_transaction_manager
969 .raw_connection
970 .transaction_status()
971 );
972 }
973
974 #[diesel_test_helper::test]
975 #[allow(clippy::needless_collect)]
978 fn postgres_transaction_depth_is_tracked_properly_on_nested_serialization_failure() {
979 use crate::pg::connection::raw::PgTransactionStatus;
980 use crate::result::DatabaseErrorKind::SerializationFailure;
981 use crate::result::Error::DatabaseError;
982 use crate::*;
983 use std::sync::{Arc, Barrier};
984 use std::thread;
985
986 table! {
987 #[sql_name = "pg_nested_transaction_depth_is_tracked_properly_on_commit_failure"]
988 serialization_example {
989 id -> Serial,
990 class -> Integer,
991 }
992 }
993
994 let conn = &mut crate::test_helpers::pg_connection_no_transaction();
995
996 sql_query(
997 "DROP TABLE IF EXISTS pg_nested_transaction_depth_is_tracked_properly_on_commit_failure;",
998 )
999 .execute(conn)
1000 .unwrap();
1001 sql_query(
1002 r#"
1003 CREATE TABLE pg_nested_transaction_depth_is_tracked_properly_on_commit_failure (
1004 id SERIAL PRIMARY KEY,
1005 class INTEGER NOT NULL
1006 )
1007 "#,
1008 )
1009 .execute(conn)
1010 .unwrap();
1011
1012 insert_into(serialization_example::table)
1013 .values(&vec![
1014 serialization_example::class.eq(1),
1015 serialization_example::class.eq(2),
1016 ])
1017 .execute(conn)
1018 .unwrap();
1019
1020 let before_barrier = Arc::new(Barrier::new(2));
1021 let after_barrier = Arc::new(Barrier::new(2));
1022 let threads = (1..3)
1023 .map(|i| {
1024 let before_barrier = before_barrier.clone();
1025 let after_barrier = after_barrier.clone();
1026 thread::spawn(move || {
1027 use crate::connection::AnsiTransactionManager;
1028 use crate::connection::TransactionManager;
1029 let conn = &mut crate::test_helpers::pg_connection_no_transaction();
1030 assert_eq!(None, <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));
1031
1032 let result = conn.build_transaction().serializable().run(|conn| {
1033 assert_eq!(NonZeroU32::new(1), <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));
1034 let r = conn.transaction(|conn| {
1035 assert_eq!(NonZeroU32::new(2), <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));
1036
1037 let _ = serialization_example::table
1038 .filter(serialization_example::class.eq(i))
1039 .count()
1040 .execute(conn)?;
1041
1042 let other_i = if i == 1 { 2 } else { 1 };
1043 let q = insert_into(serialization_example::table)
1044 .values(serialization_example::class.eq(other_i));
1045 before_barrier.wait();
1046
1047 let r = q.execute(conn);
1048 after_barrier.wait();
1049 r
1050 });
1051 assert_eq!(NonZeroU32::new(1), <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));
1052 assert_eq!(
1053 PgTransactionStatus::InTransaction,
1054 conn.connection_and_transaction_manager.raw_connection.transaction_status()
1055 );
1056 r
1057 });
1058 assert_eq!(
1059 PgTransactionStatus::Idle,
1060 conn.connection_and_transaction_manager.raw_connection.transaction_status()
1061 );
1062
1063 assert_eq!(None, <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));
1064 result
1065 })
1066 })
1067 .collect::<Vec<_>>();
1068
1069 let mut results = threads
1070 .into_iter()
1071 .map(|t| t.join().unwrap())
1072 .collect::<Vec<_>>();
1073
1074 results.sort_by_key(|r| r.is_err());
1075
1076 assert!(results[0].is_ok(), "Got {:?} instead", results);
1077 assert!(
1078 matches!(&results[1], Err(DatabaseError(SerializationFailure, _))),
1079 "Got {:?} instead",
1080 results
1081 );
1082 assert_eq!(
1083 PgTransactionStatus::Idle,
1084 conn.connection_and_transaction_manager
1085 .raw_connection
1086 .transaction_status()
1087 );
1088 }
1089
1090 #[diesel_test_helper::test]
1091 fn postgres_transaction_is_rolled_back_upon_deferred_constraint_failure() {
1092 use crate::connection::{AnsiTransactionManager, TransactionManager};
1093 use crate::pg::connection::raw::PgTransactionStatus;
1094 use crate::result::Error;
1095 use crate::*;
1096
1097 let conn = &mut crate::test_helpers::pg_connection_no_transaction();
1098 assert_eq!(
1099 None,
1100 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1101 conn
1102 ).transaction_depth().expect("Transaction depth")
1103 );
1104 let result: Result<_, Error> = conn.build_transaction().run(|conn| {
1105 assert_eq!(
1106 NonZeroU32::new(1),
1107 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1108 conn
1109 ).transaction_depth().expect("Transaction depth")
1110 );
1111 sql_query("DROP TABLE IF EXISTS deferred_constraint_commit").execute(conn)?;
1112 sql_query("CREATE TABLE deferred_constraint_commit(id INT UNIQUE INITIALLY DEFERRED)")
1113 .execute(conn)?;
1114 sql_query("INSERT INTO deferred_constraint_commit VALUES(1)").execute(conn)?;
1115 let result =
1116 sql_query("INSERT INTO deferred_constraint_commit VALUES(1)").execute(conn);
1117 assert!(result.is_ok());
1118 assert_eq!(
1119 PgTransactionStatus::InTransaction,
1120 conn.connection_and_transaction_manager.raw_connection.transaction_status()
1121 );
1122 Ok(())
1123 });
1124 assert_eq!(
1125 None,
1126 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1127 conn
1128 ).transaction_depth().expect("Transaction depth")
1129 );
1130 assert_eq!(
1131 PgTransactionStatus::Idle,
1132 conn.connection_and_transaction_manager
1133 .raw_connection
1134 .transaction_status()
1135 );
1136 assert!(result.is_err());
1137 }
1138
1139 #[diesel_test_helper::test]
1140 fn postgres_transaction_is_rolled_back_upon_deferred_trigger_failure() {
1141 use crate::connection::{AnsiTransactionManager, TransactionManager};
1142 use crate::pg::connection::raw::PgTransactionStatus;
1143 use crate::result::Error;
1144 use crate::*;
1145
1146 let conn = &mut crate::test_helpers::pg_connection_no_transaction();
1147 assert_eq!(
1148 None,
1149 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1150 conn
1151 ).transaction_depth().expect("Transaction depth")
1152 );
1153 let result: Result<_, Error> = conn.build_transaction().run(|conn| {
1154 assert_eq!(
1155 NonZeroU32::new(1),
1156 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1157 conn
1158 ).transaction_depth().expect("Transaction depth")
1159 );
1160 sql_query("DROP TABLE IF EXISTS deferred_trigger_commit").execute(conn)?;
1161 sql_query("CREATE TABLE deferred_trigger_commit(id INT UNIQUE INITIALLY DEFERRED)")
1162 .execute(conn)?;
1163 sql_query(
1164 r#"
1165 CREATE OR REPLACE FUNCTION transaction_depth_blow_up()
1166 RETURNS trigger
1167 LANGUAGE plpgsql
1168 AS $$
1169 DECLARE
1170 BEGIN
1171 IF NEW.value = 42 THEN
1172 RAISE EXCEPTION 'Transaction kaboom';
1173 END IF;
1174 RETURN NEW;
1175
1176 END;$$;
1177 "#,
1178 )
1179 .execute(conn)?;
1180
1181 sql_query(
1182 r#"
1183 CREATE CONSTRAINT TRIGGER transaction_depth_trigger
1184 AFTER INSERT ON "deferred_trigger_commit"
1185 DEFERRABLE INITIALLY DEFERRED
1186 FOR EACH ROW
1187 EXECUTE PROCEDURE transaction_depth_blow_up()
1188 "#,
1189 )
1190 .execute(conn)?;
1191 let result = sql_query("INSERT INTO deferred_trigger_commit VALUES(42)").execute(conn);
1192 assert!(result.is_ok());
1193 assert_eq!(
1194 PgTransactionStatus::InTransaction,
1195 conn.connection_and_transaction_manager.raw_connection.transaction_status()
1196 );
1197 Ok(())
1198 });
1199 assert_eq!(
1200 None,
1201 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1202 conn
1203 ).transaction_depth().expect("Transaction depth")
1204 );
1205 assert_eq!(
1206 PgTransactionStatus::Idle,
1207 conn.connection_and_transaction_manager
1208 .raw_connection
1209 .transaction_status()
1210 );
1211 assert!(result.is_err());
1212 }
1213
1214 #[diesel_test_helper::test]
1215 fn nested_postgres_transaction_is_rolled_back_upon_deferred_trigger_failure() {
1216 use crate::connection::{AnsiTransactionManager, TransactionManager};
1217 use crate::pg::connection::raw::PgTransactionStatus;
1218 use crate::result::Error;
1219 use crate::*;
1220
1221 let conn = &mut crate::test_helpers::pg_connection_no_transaction();
1222 assert_eq!(
1223 None,
1224 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1225 conn
1226 ).transaction_depth().expect("Transaction depth")
1227 );
1228 let result: Result<_, Error> = conn.build_transaction().run(|conn| {
1229 assert_eq!(
1230 NonZeroU32::new(1),
1231 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1232 conn
1233 ).transaction_depth().expect("Transaction depth")
1234 );
1235 sql_query("DROP TABLE IF EXISTS deferred_trigger_nested_commit").execute(conn)?;
1236 sql_query(
1237 "CREATE TABLE deferred_trigger_nested_commit(id INT UNIQUE INITIALLY DEFERRED)",
1238 )
1239 .execute(conn)?;
1240 sql_query(
1241 r#"
1242 CREATE OR REPLACE FUNCTION transaction_depth_blow_up()
1243 RETURNS trigger
1244 LANGUAGE plpgsql
1245 AS $$
1246 DECLARE
1247 BEGIN
1248 IF NEW.value = 42 THEN
1249 RAISE EXCEPTION 'Transaction kaboom';
1250 END IF;
1251 RETURN NEW;
1252
1253 END;$$;
1254 "#,
1255 )
1256 .execute(conn)?;
1257
1258 sql_query(
1259 r#"
1260 CREATE CONSTRAINT TRIGGER transaction_depth_trigger
1261 AFTER INSERT ON "deferred_trigger_nested_commit"
1262 DEFERRABLE INITIALLY DEFERRED
1263 FOR EACH ROW
1264 EXECUTE PROCEDURE transaction_depth_blow_up()
1265 "#,
1266 )
1267 .execute(conn)?;
1268 let inner_result: Result<_, Error> = conn.build_transaction().run(|conn| {
1269 let result = sql_query("INSERT INTO deferred_trigger_nested_commit VALUES(42)")
1270 .execute(conn);
1271 assert!(result.is_ok());
1272 Ok(())
1273 });
1274 assert!(inner_result.is_err());
1275 assert_eq!(
1276 PgTransactionStatus::InTransaction,
1277 conn.connection_and_transaction_manager.raw_connection.transaction_status()
1278 );
1279 Ok(())
1280 });
1281 assert_eq!(
1282 None,
1283 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1284 conn
1285 ).transaction_depth().expect("Transaction depth")
1286 );
1287 assert_eq!(
1288 PgTransactionStatus::Idle,
1289 conn.connection_and_transaction_manager
1290 .raw_connection
1291 .transaction_status()
1292 );
1293 assert!(result.is_ok(), "Expected success, got {:?}", result);
1294 }
1295
1296 #[diesel_test_helper::test]
1297 fn nested_postgres_transaction_is_rolled_back_upon_deferred_constraint_failure() {
1298 use crate::connection::{AnsiTransactionManager, TransactionManager};
1299 use crate::pg::connection::raw::PgTransactionStatus;
1300 use crate::result::Error;
1301 use crate::*;
1302
1303 let conn = &mut crate::test_helpers::pg_connection_no_transaction();
1304 assert_eq!(
1305 None,
1306 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1307 conn
1308 ).transaction_depth().expect("Transaction depth")
1309 );
1310 let result: Result<_, Error> = conn.build_transaction().run(|conn| {
1311 assert_eq!(
1312 NonZeroU32::new(1),
1313 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1314 conn
1315 ).transaction_depth().expect("Transaction depth")
1316 );
1317 sql_query("DROP TABLE IF EXISTS deferred_constraint_nested_commit").execute(conn)?;
1318 sql_query("CREATE TABLE deferred_constraint_nested_commit(id INT UNIQUE INITIALLY DEFERRED)").execute(conn)?;
1319 let inner_result: Result<_, Error> = conn.build_transaction().run(|conn| {
1320 assert_eq!(
1321 NonZeroU32::new(2),
1322 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1323 conn
1324 ).transaction_depth().expect("Transaction depth")
1325 );
1326 sql_query("INSERT INTO deferred_constraint_nested_commit VALUES(1)").execute(conn)?;
1327 let result = sql_query("INSERT INTO deferred_constraint_nested_commit VALUES(1)").execute(conn);
1328 assert!(result.is_ok());
1329 Ok(())
1330 });
1331 assert!(inner_result.is_err());
1332 assert_eq!(
1333 PgTransactionStatus::InTransaction,
1334 conn.connection_and_transaction_manager.raw_connection.transaction_status()
1335 );
1336 assert_eq!(
1337 NonZeroU32::new(1),
1338 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1339 conn
1340 ).transaction_depth().expect("Transaction depth")
1341 );
1342 sql_query("INSERT INTO deferred_constraint_nested_commit VALUES(1)").execute(conn)
1343 });
1344 assert_eq!(
1345 None,
1346 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1347 conn
1348 ).transaction_depth().expect("Transaction depth")
1349 );
1350 assert_eq!(
1351 PgTransactionStatus::Idle,
1352 conn.connection_and_transaction_manager
1353 .raw_connection
1354 .transaction_status()
1355 );
1356 assert!(result.is_ok());
1357 }
1358}