1pub(super) mod copy;
2pub(crate) mod cursor;
3mod raw;
4mod result;
5mod row;
6mod stmt;
7
8use self::copy::{CopyFromSink, CopyToBuffer};
9use self::cursor::*;
10use self::private::{ConnectionAndTransactionManager, CopyFromWrapper, QueryFragmentHelper};
11use self::raw::{PgTransactionStatus, RawConnection};
12use self::stmt::Statement;
13use crate::connection::instrumentation::{DynInstrumentation, Instrumentation, StrQueryHelper};
14use crate::connection::statement_cache::{MaybeCached, StatementCache};
15use crate::connection::*;
16use crate::expression::QueryMetadata;
17use crate::pg::backend::PgNotification;
18use crate::pg::metadata_lookup::{GetPgMetadataCache, PgMetadataCache};
19use crate::pg::query_builder::copy::InternalCopyFromQuery;
20use crate::pg::{Pg, TransactionBuilder};
21use crate::query_builder::bind_collector::RawBytesBindCollector;
22use crate::query_builder::*;
23use crate::result::ConnectionError::CouldntSetupConfiguration;
24use crate::result::*;
25use crate::RunQueryDsl;
26use std::ffi::CString;
27use std::fmt::Debug;
28use std::os::raw as libc;
29
30use super::query_builder::copy::{CopyFromExpression, CopyTarget, CopyToCommand};
31
32pub(super) use self::result::PgResult;
33
/// A connection to a PostgreSQL database.
///
/// Bundles the prepared-statement cache, the type-metadata cache, and the raw
/// connection together with its transaction state and instrumentation.
#[allow(missing_debug_implementations)]
#[cfg(feature = "postgres")]
pub struct PgConnection {
    /// Cache of prepared statements used when running queries on this connection.
    statement_cache: StatementCache<Pg, Statement>,
    /// Metadata cache exposed through the `GetPgMetadataCache` implementation.
    metadata_cache: PgMetadataCache,
    /// Raw connection, transaction state and instrumentation, grouped so they
    /// can be borrowed together independently of the two caches above.
    connection_and_transaction_manager: ConnectionAndTransactionManager,
}
130
// SAFETY: NOTE(review) — this asserts that a `PgConnection` may be moved to
// another thread. Nothing visible in this file proves that; presumably it
// relies on the threading guarantees of the underlying client library as long
// as only one thread uses the connection at a time — confirm against the
// `RawConnection` implementation.
#[allow(unsafe_code)]
unsafe impl Send for PgConnection {}
134
135impl SimpleConnection for PgConnection {
136 #[allow(unsafe_code)] fn batch_execute(&mut self, query: &str) -> QueryResult<()> {
138 self.connection_and_transaction_manager
139 .instrumentation
140 .on_connection_event(InstrumentationEvent::StartQuery {
141 query: &StrQueryHelper::new(query),
142 });
143 let c_query = CString::new(query)?;
144 let inner_result = unsafe {
145 self.connection_and_transaction_manager
146 .raw_connection
147 .exec(c_query.as_ptr())
148 };
149 update_transaction_manager_status(
150 inner_result.and_then(|raw_result| {
151 PgResult::new(
152 raw_result,
153 &self.connection_and_transaction_manager.raw_connection,
154 )
155 }),
156 &mut self.connection_and_transaction_manager,
157 &|callback| callback(&StrQueryHelper::new(query)),
158 true,
159 )?;
160 Ok(())
161 }
162}
163
/// Marker type selecting the row-by-row loading mode of `PgConnection`.
///
/// `PgConnection` implements its loading-mode trait for this marker with
/// `USE_ROW_BY_ROW_MODE = true` and `RowByRowCursor` as the cursor type.
#[derive(Debug, Copy, Clone)]
pub struct PgRowByRowLoadingMode;
169
// Opts `PgConnection` into `ConnectionSealed`; the impl has no items.
impl ConnectionSealed for PgConnection {}
171
172impl Connection for PgConnection {
173 type Backend = Pg;
174 type TransactionManager = AnsiTransactionManager;
175
176 fn establish(database_url: &str) -> ConnectionResult<PgConnection> {
177 let mut instrumentation = DynInstrumentation::default_instrumentation();
178 instrumentation.on_connection_event(InstrumentationEvent::StartEstablishConnection {
179 url: database_url,
180 });
181 let r = RawConnection::establish(database_url).and_then(|raw_conn| {
182 let mut conn = PgConnection {
183 connection_and_transaction_manager: ConnectionAndTransactionManager {
184 raw_connection: raw_conn,
185 transaction_state: AnsiTransactionManager::default(),
186 instrumentation: DynInstrumentation::none(),
187 },
188 statement_cache: StatementCache::new(),
189 metadata_cache: PgMetadataCache::new(),
190 };
191 conn.set_config_options()
192 .map_err(CouldntSetupConfiguration)?;
193 Ok(conn)
194 });
195 instrumentation.on_connection_event(InstrumentationEvent::FinishEstablishConnection {
196 url: database_url,
197 error: r.as_ref().err(),
198 });
199 let mut conn = r?;
200 conn.connection_and_transaction_manager.instrumentation = instrumentation;
201 Ok(conn)
202 }
203
204 fn execute_returning_count<T>(&mut self, source: &T) -> QueryResult<usize>
205 where
206 T: QueryFragment<Pg> + QueryId,
207 {
208 update_transaction_manager_status(
209 self.with_prepared_query(
210 Box::new(source),
211 true,
212 &mut |query, params, conn, _source| {
213 let res = query
214 .execute(&mut conn.raw_connection, ¶ms, false)
215 .map(|r| r.rows_affected());
216 while conn.raw_connection.get_next_result()?.is_some() {}
219 res
220 },
221 ),
222 &mut self.connection_and_transaction_manager,
223 &|callback| source.instrumentation(callback),
224 true,
225 )
226 }
227
228 fn transaction_state(&mut self) -> &mut AnsiTransactionManager
229 where
230 Self: Sized,
231 {
232 &mut self.connection_and_transaction_manager.transaction_state
233 }
234
235 fn instrumentation(&mut self) -> &mut dyn Instrumentation {
236 &mut *self.connection_and_transaction_manager.instrumentation
237 }
238
239 fn set_instrumentation(&mut self, instrumentation: impl Instrumentation) {
240 self.connection_and_transaction_manager.instrumentation = instrumentation.into();
241 }
242
243 fn set_prepared_statement_cache_size(&mut self, size: CacheSize) {
244 self.statement_cache.set_cache_size(size);
245 }
246}
247
248impl<B> LoadConnection<B> for PgConnection
249where
250 Self: self::private::PgLoadingMode<B>,
251{
252 type Cursor<'conn, 'query> = <Self as self::private::PgLoadingMode<B>>::Cursor<'conn, 'query>;
253 type Row<'conn, 'query> = <Self as self::private::PgLoadingMode<B>>::Row<'conn, 'query>;
254
255 fn load<'conn, 'query, T>(
256 &'conn mut self,
257 source: T,
258 ) -> QueryResult<Self::Cursor<'conn, 'query>>
259 where
260 T: Query + QueryFragment<Self::Backend> + QueryId + 'query,
261 Self::Backend: QueryMetadata<T::SqlType>,
262 {
263 self.with_prepared_query(
264 Box::new(source),
265 false,
266 &mut |stmt, params, conn, source| {
267 use self::private::PgLoadingMode;
268 let result = inner_load(stmt, params, conn, &*source, Self::USE_ROW_BY_ROW_MODE)?;
269 Self::get_cursor(conn, result, source)
270 },
271 )
272 }
273}
274
275fn inner_load(
276 stmt: MaybeCached<'_, Statement>,
277 params: Vec<Option<Vec<u8>>>,
278 conn: &mut ConnectionAndTransactionManager,
279 source: &dyn QueryFragmentHelper<crate::result::Error>,
280 row_by_row: bool,
281) -> Result<PgResult, Error> {
282 let result = stmt.execute(&mut conn.raw_connection, ¶ms, row_by_row);
283 update_transaction_manager_status(
284 result,
285 conn,
286 &|callback| source.instrumentation(callback),
287 false,
288 )
289}
290
impl GetPgMetadataCache for PgConnection {
    /// Gives mutable access to this connection's metadata cache.
    fn get_metadata_cache(&mut self) -> &mut PgMetadataCache {
        &mut self.metadata_cache
    }
}
296
/// Synchronizes the tracked transaction state with the status reported by the
/// raw connection, emits the `FinishQuery` instrumentation event when
/// appropriate, and returns `query_result` unchanged.
///
/// * `query_result` - outcome of the operation that just ran.
/// * `conn` - raw connection, transaction state and instrumentation.
/// * `instrumentation_callback` - called with a closure that receives the
///   query's `DebugQuery` representation.
/// * `final_call` - when `true`, a successful result also emits
///   `FinishQuery`; an error emits it regardless of this flag.
#[inline(always)]
fn update_transaction_manager_status<T>(
    query_result: QueryResult<T>,
    conn: &mut ConnectionAndTransactionManager,
    instrumentation_callback: &dyn Fn(
        &mut dyn FnMut(&dyn crate::connection::instrumentation::DebugQuery),
    ),
    final_call: bool,
) -> QueryResult<T> {
    /// Updates `conn.transaction_state` from the raw connection's current
    /// transaction status. Takes no type parameter, so it does not depend on
    /// the outer function's `T`.
    fn non_generic_inner(conn: &mut ConnectionAndTransactionManager, is_err: bool) {
        let raw_conn: &mut RawConnection = &mut conn.raw_connection;
        let tm: &mut AnsiTransactionManager = &mut conn.transaction_state;
        match raw_conn.transaction_status() {
            // The server reports a failed transaction: record that a rollback
            // is required.
            PgTransactionStatus::InError => {
                tm.status.set_requires_rollback_maybe_up_to_top_level(true)
            }
            // The status cannot be determined: mark the manager as in error.
            PgTransactionStatus::Unknown => tm.status.set_in_error(),
            // No transaction is open: reset to a fresh valid state.
            PgTransactionStatus::Idle => {
                tm.status = TransactionManagerStatus::Valid(Default::default())
            }
            PgTransactionStatus::InTransaction => {
                let transaction_status = &mut tm.status;
                if is_err {
                    // The operation failed while the server is inside a
                    // transaction. Unless the manager is valid and already
                    // tracks a transaction depth, the two views disagree, so
                    // the manager is marked as in error.
                    if !matches!(transaction_status, TransactionManagerStatus::Valid(valid_tm) if valid_tm.transaction_depth().is_some())
                    {
                        transaction_status.set_in_error()
                    }
                } else {
                    // The operation succeeded inside a transaction: clear any
                    // previously recorded rollback requirement.
                    tm.status.set_requires_rollback_maybe_up_to_top_level(false)
                }
            }
            // Deliberately no state change while the status is `Active`.
            PgTransactionStatus::Active => {
            }
        }
    }

    /// Emits `FinishQuery` with the error if there is one, or without an
    /// error when the result is `Ok` and this is the final call.
    fn non_generic_instrumentation(
        query_result: Result<(), &Error>,
        conn: &mut ConnectionAndTransactionManager,
        instrumentation_callback: &dyn Fn(
            &mut dyn FnMut(&dyn crate::connection::instrumentation::DebugQuery),
        ),
        final_call: bool,
    ) {
        if let Err(e) = query_result {
            instrumentation_callback(&mut |query| {
                conn.instrumentation
                    .on_connection_event(InstrumentationEvent::FinishQuery {
                        query,
                        error: Some(e),
                    })
            });
        } else if final_call {
            instrumentation_callback(&mut |query| {
                conn.instrumentation
                    .on_connection_event(InstrumentationEvent::FinishQuery { query, error: None });
            });
        }
    }

    // The transaction state is updated first, then the event is reported.
    non_generic_inner(conn, query_result.is_err());
    non_generic_instrumentation(
        // Drop the success value so the helper does not depend on `T`.
        query_result.as_ref().map(|_| ()),
        conn,
        instrumentation_callback,
        final_call,
    );
    query_result
}
387
#[cfg(feature = "r2d2")]
impl crate::r2d2::R2D2Connection for PgConnection {
    /// Runs the pool's check query on this connection and discards the count.
    fn ping(&mut self) -> QueryResult<()> {
        let check = crate::r2d2::CheckConnectionQuery;
        check.execute(self)?;
        Ok(())
    }

    /// Reports whether the transaction manager considers this connection
    /// broken.
    fn is_broken(&mut self) -> bool {
        let broken = AnsiTransactionManager::is_broken_transaction_manager(self);
        broken
    }
}
398
399impl MultiConnectionHelper for PgConnection {
400 fn to_any<'a>(
401 lookup: &mut <Self::Backend as crate::sql_types::TypeMetadata>::MetadataLookup,
402 ) -> &mut (dyn std::any::Any + 'a) {
403 lookup.as_any()
404 }
405
406 fn from_any(
407 lookup: &mut dyn std::any::Any,
408 ) -> Option<&mut <Self::Backend as crate::sql_types::TypeMetadata>::MetadataLookup> {
409 lookup
410 .downcast_mut::<Self>()
411 .map(|conn| conn as &mut dyn super::PgMetadataLookup)
412 }
413}
414
impl PgConnection {
    /// Returns a `TransactionBuilder` that borrows this connection.
    pub fn build_transaction(&mut self) -> TransactionBuilder<'_, Self> {
        TransactionBuilder::new(self)
    }

    /// Executes the copy-from query built from `target` and returns the
    /// number of affected rows reported by the server.
    ///
    /// The data itself is produced by the query's `write_copy_from`
    /// implementation, which writes into a `CopyFromSink` on the raw
    /// connection. A `FinishQuery` event is emitted for both outcomes.
    pub(crate) fn copy_from<S, T>(&mut self, target: S) -> Result<usize, S::Error>
    where
        S: CopyFromExpression<T>,
    {
        let query = CopyFromWrapper(std::cell::RefCell::new(InternalCopyFromQuery::new(target)));
        let res =
            self.with_prepared_query(Box::new(query), false, &mut |stmt, binds, conn, source| {
                /// Runs the statement, streams the data, and collects the
                /// affected-row count from the result that follows.
                fn inner_copy_in<S, T>(
                    stmt: MaybeCached<'_, Statement>,
                    conn: &mut ConnectionAndTransactionManager,
                    binds: Vec<Option<Vec<u8>>>,
                    source: &dyn QueryFragmentHelper<S::Error>,
                ) -> Result<usize, S::Error>
                where
                    S: CopyFromExpression<T>,
                {
                    let _res = stmt.execute(&mut conn.raw_connection, &binds, false)?;
                    let mut copy_in = CopyFromSink::new(&mut conn.raw_connection);
                    let r = source.write_copy_from(&mut copy_in);
                    // The sink is always finished; a write error is passed on
                    // to it as a message and only returned further below.
                    copy_in.finish(r.as_ref().err().map(|e| e.to_string()))?;
                    let next_res = conn.raw_connection.get_next_result()?.ok_or_else(|| {
                        crate::result::Error::DeserializationError(
                            "Failed to receive result from the database".into(),
                        )
                    })?;
                    let rows = next_res.rows_affected();
                    // Drain any remaining results before reporting.
                    while let Some(_r) = conn.raw_connection.get_next_result()? {}
                    // Surface the write error, if any, after the connection
                    // has been drained.
                    r?;
                    Ok(rows)
                }

                let rows = inner_copy_in::<S, T>(stmt, conn, binds, &*source);
                if let Err(ref e) = rows {
                    // The instrumentation event carries a diesel error, so the
                    // `S::Error` value is wrapped by its string form.
                    let database_error = crate::result::Error::DatabaseError(
                        crate::result::DatabaseErrorKind::Unknown,
                        Box::new(e.to_string()),
                    );
                    source.instrumentation(&mut |query| {
                        conn.instrumentation.on_connection_event(
                            InstrumentationEvent::FinishQuery {
                                query,
                                error: Some(&database_error),
                            },
                        );
                    });
                } else {
                    source.instrumentation(&mut |query| {
                        conn.instrumentation.on_connection_event(
                            InstrumentationEvent::FinishQuery { query, error: None },
                        );
                    });
                }

                rows
            })?;

        Ok(res)
    }

    /// Executes `command` and wraps the result in a `CopyToBuffer` that
    /// borrows the raw connection.
    ///
    /// A `FinishQuery` event is emitted right after execution, before the
    /// buffer is handed out.
    pub(crate) fn copy_to<T>(&mut self, command: CopyToCommand<T>) -> QueryResult<CopyToBuffer<'_>>
    where
        T: CopyTarget,
    {
        let res = self.with_prepared_query::<_, Error>(
            Box::new(command),
            false,
            &mut |stmt, binds, conn, source| {
                let res = stmt.execute(&mut conn.raw_connection, &binds, false);
                source.instrumentation(&mut |query| {
                    conn.instrumentation
                        .on_connection_event(InstrumentationEvent::FinishQuery {
                            query,
                            error: res.as_ref().err(),
                        });
                });
                Ok(CopyToBuffer::new(&mut conn.raw_connection, res?))
            },
        )?;
        Ok(res)
    }

    /// Collects the bind values of `source`, obtains a prepared statement for
    /// it from the statement cache, and calls `f` with the statement, the
    /// binds, the connection state and `source` itself.
    ///
    /// * `execute_returning_count` - when `false`, a failure to prepare the
    ///   statement emits a `FinishQuery` event here. When `true`, no event is
    ///   emitted here (`execute_returning_count` reports the returned error
    ///   itself through `update_transaction_manager_status`).
    ///
    /// # Errors
    ///
    /// Returns early, without calling `f`, if collecting binds or preparing
    /// the statement fails.
    fn with_prepared_query<'conn, 'query, R, E>(
        &'conn mut self,
        source: Box<dyn QueryFragmentHelper<E> + 'query>,
        execute_returning_count: bool,
        f: &mut dyn FnMut(
            MaybeCached<'_, Statement>,
            Vec<Option<Vec<u8>>>,
            &'conn mut ConnectionAndTransactionManager,
            Box<dyn QueryFragmentHelper<E> + 'query>,
        ) -> Result<R, E>,
    ) -> Result<R, E>
    where
        E: From<crate::result::Error>,
    {
        /// Looks up or prepares the statement. It is generic only over the
        /// error type `E` of `source`, not over the closure's result type.
        fn prepare_query_non_generic_inner<'a, E>(
            connection_and_transaction_manager: &mut ConnectionAndTransactionManager,
            cache: &'a mut StatementCache<Pg, Statement>,
            source: &dyn QueryFragmentHelper<E>,
            execute_returning_count: bool,
            bind_collector: RawBytesBindCollector<Pg>,
        ) -> QueryResult<(
            Vec<Option<Vec<u8>>>,
            Result<MaybeCached<'a, Statement>, Error>,
        )> {
            let binds = bind_collector.binds;
            let metadata = bind_collector.metadata;
            let query = cache.cached_statement_non_generic(
                source.query_id(),
                source,
                &Pg,
                &metadata,
                &mut connection_and_transaction_manager.raw_connection,
                Statement::prepare,
                &mut *connection_and_transaction_manager.instrumentation,
            );
            if !execute_returning_count {
                if let Err(ref e) = query {
                    source.instrumentation(&mut |query| {
                        connection_and_transaction_manager
                            .instrumentation
                            .on_connection_event(InstrumentationEvent::FinishQuery {
                                query,
                                error: Some(e),
                            });
                    });
                }
            }
            // The preparation outcome is returned inside `Ok` so the caller
            // decides when to propagate it.
            Ok((binds, query))
        }

        let bind_collector = self.collect_binds(&*source)?;
        let (binds, query) = prepare_query_non_generic_inner(
            &mut self.connection_and_transaction_manager,
            &mut self.statement_cache,
            &*source,
            execute_returning_count,
            bind_collector,
        )?;

        f(
            query?,
            binds,
            &mut self.connection_and_transaction_manager,
            source,
        )
    }

    /// Emits the `StartQuery` event for `source` and collects its bind
    /// values into a fresh `RawBytesBindCollector`.
    fn collect_binds<E>(
        &mut self,
        source: &dyn QueryFragmentHelper<E>,
    ) -> Result<RawBytesBindCollector<Pg>, crate::result::Error> {
        source.instrumentation(&mut |query| {
            self.connection_and_transaction_manager
                .instrumentation
                .on_connection_event(InstrumentationEvent::StartQuery { query });
        });
        let mut bind_collector = RawBytesBindCollector::<Pg>::new();
        source.collect_binds(&mut bind_collector, self)?;
        Ok(bind_collector)
    }

    /// Applies the session settings used by every connection: time zone
    /// `UTC`, client encoding `UTF8`, and a notice processor that does
    /// nothing.
    fn set_config_options(&mut self) -> QueryResult<()> {
        crate::sql_query("SET TIME ZONE 'UTC'").execute(self)?;
        crate::sql_query("SET CLIENT_ENCODING TO 'UTF8'").execute(self)?;
        self.connection_and_transaction_manager
            .raw_connection
            .set_notice_processor(noop_notice_processor);
        Ok(())
    }

    /// Returns an iterator over notifications obtained from the raw
    /// connection's `pq_notifies`.
    ///
    /// Each `next()` call invokes `pq_notifies` once. The iterator ends when
    /// that call reports no notification; an error is yielded as an item and
    /// does not by itself end the iterator.
    pub fn notifications_iter(&mut self) -> impl Iterator<Item = QueryResult<PgNotification>> + '_ {
        let conn = &self.connection_and_transaction_manager.raw_connection;
        std::iter::from_fn(move || conn.pq_notifies().transpose())
    }
}
659
/// Notice-processor callback that ignores both arguments and does nothing.
///
/// Installed on the raw connection by `set_config_options`.
extern "C" fn noop_notice_processor(_: *mut libc::c_void, _message: *const libc::c_char) {}
661
/// Implementation details that appear in signatures of the parent module but
/// are declared in this private module.
mod private {
    use super::*;

    /// Raw connection grouped with its transaction state and instrumentation
    /// so the three can be borrowed as one unit.
    #[allow(missing_debug_implementations)]
    pub struct ConnectionAndTransactionManager {
        pub(super) raw_connection: RawConnection,
        pub(super) transaction_state: AnsiTransactionManager,
        pub(super) instrumentation: DynInstrumentation,
    }

    /// Describes how `PgConnection` loads rows for the loading-mode marker `B`.
    pub trait PgLoadingMode<B> {
        /// Whether statements are executed in row-by-row mode.
        const USE_ROW_BY_ROW_MODE: bool;
        /// Cursor type handed back by `load`.
        type Cursor<'conn, 'query>: Iterator<Item = QueryResult<Self::Row<'conn, 'query>>>;
        /// Row type yielded by the cursor.
        type Row<'conn, 'query>: crate::row::Row<'conn, Pg>;

        /// Builds the cursor from the first result of an executed statement.
        fn get_cursor<'conn, 'query>(
            raw_connection: &'conn mut ConnectionAndTransactionManager,
            result: PgResult,
            source: Box<dyn QueryFragmentHelper<crate::result::Error> + 'query>,
        ) -> QueryResult<Self::Cursor<'conn, 'query>>;
    }

    impl PgLoadingMode<DefaultLoadingMode> for PgConnection {
        const USE_ROW_BY_ROW_MODE: bool = false;
        type Cursor<'conn, 'query> = Cursor;
        type Row<'conn, 'query> = self::row::PgRow;

        /// Builds a `Cursor` from `result` and reports the outcome as the
        /// final status update for the query.
        fn get_cursor<'conn, 'query>(
            conn: &'conn mut ConnectionAndTransactionManager,
            result: PgResult,
            source: Box<dyn QueryFragmentHelper<crate::result::Error> + 'query>,
        ) -> QueryResult<Self::Cursor<'conn, 'query>> {
            update_transaction_manager_status(
                Cursor::new(result, &mut conn.raw_connection),
                conn,
                &|callback| source.instrumentation(callback),
                true,
            )
        }
    }

    impl PgLoadingMode<PgRowByRowLoadingMode> for PgConnection {
        const USE_ROW_BY_ROW_MODE: bool = true;
        type Cursor<'conn, 'query> = RowByRowCursor<'conn, 'query>;
        type Row<'conn, 'query> = self::row::PgRow;

        /// Builds a `RowByRowCursor` that takes over the connection state
        /// and the query. No status update is issued here.
        fn get_cursor<'conn, 'query>(
            raw_connection: &'conn mut ConnectionAndTransactionManager,
            result: PgResult,
            source: Box<dyn QueryFragmentHelper<crate::result::Error> + 'query>,
        ) -> QueryResult<Self::Cursor<'conn, 'query>> {
            Ok(RowByRowCursor::new(result, raw_connection, source))
        }
    }

    /// Object-safe view of a query used by `with_prepared_query`; `E` is the
    /// error type produced by `write_copy_from`.
    pub trait QueryFragmentHelper<E>:
        crate::connection::statement_cache::QueryFragmentForCachedStatement<crate::pg::Pg>
    {
        /// Type id identifying the query, if it has one.
        fn query_id(&self) -> Option<std::any::TypeId>;

        /// Calls `callback` with a debug representation of the query.
        fn instrumentation(
            &self,
            callback: &mut dyn FnMut(&dyn crate::connection::instrumentation::DebugQuery),
        );

        /// Writes the query's bind values into `bind_collector`.
        fn collect_binds(
            &self,
            bind_collector: &mut RawBytesBindCollector<crate::pg::Pg>,
            conn: &mut PgConnection,
        ) -> QueryResult<()>;

        /// Writes copy-from data into `_sink`; the default writes nothing.
        fn write_copy_from(&self, _sink: &mut CopyFromSink<'_>) -> Result<(), E> {
            Ok(())
        }
    }

    // Every ordinary query fragment gets the helper by delegating to its
    // `QueryId` and `QueryFragment` implementations.
    impl<T> QueryFragmentHelper<diesel::result::Error> for T
    where
        T: QueryFragment<crate::pg::Pg> + QueryId,
    {
        fn query_id(&self) -> Option<std::any::TypeId> {
            <T as QueryId>::query_id()
        }

        fn instrumentation(
            &self,
            callback: &mut dyn FnMut(&dyn crate::connection::instrumentation::DebugQuery),
        ) {
            callback(&crate::debug_query(self))
        }

        fn collect_binds(
            &self,
            bind_collector: &mut RawBytesBindCollector<crate::pg::Pg>,
            conn: &mut PgConnection,
        ) -> QueryResult<()> {
            <Self as QueryFragment<diesel::pg::Pg>>::collect_binds(
                self,
                bind_collector,
                conn,
                &crate::pg::Pg,
            )
        }
    }

    /// Wraps an `InternalCopyFromQuery` in a `RefCell` so that
    /// `write_copy_from`, which only receives `&self`, can reach the target
    /// through `borrow_mut()`.
    pub(super) struct CopyFromWrapper<S, T>(
        pub(super) std::cell::RefCell<InternalCopyFromQuery<S, T>>,
    );

    // Both methods forward to the wrapped query.
    impl<S, T> crate::connection::statement_cache::QueryFragmentForCachedStatement<Pg>
        for CopyFromWrapper<S, T>
    where
        InternalCopyFromQuery<S, T>:
            crate::connection::statement_cache::QueryFragmentForCachedStatement<Pg>,
    {
        fn construct_sql(&self, backend: &Pg) -> QueryResult<String> {
            self.0.borrow().construct_sql(backend)
        }

        fn is_safe_to_cache_prepared(&self, backend: &Pg) -> QueryResult<bool> {
            self.0.borrow().is_safe_to_cache_prepared(backend)
        }
    }

    // Forwards to the wrapped query, but with `S::Error` as the error type
    // and a real `write_copy_from` implementation.
    impl<S, T> QueryFragmentHelper<S::Error> for CopyFromWrapper<S, T>
    where
        S: CopyFromExpression<T>,
        InternalCopyFromQuery<S, T>: QueryFragmentHelper<crate::result::Error>,
        Self: crate::connection::statement_cache::QueryFragmentForCachedStatement<Pg>,
    {
        fn query_id(&self) -> Option<std::any::TypeId> {
            self.0.borrow().query_id()
        }

        fn instrumentation(
            &self,
            callback: &mut dyn FnMut(&dyn crate::connection::instrumentation::DebugQuery),
        ) {
            callback(&crate::debug_query(&*self.0.borrow()))
        }

        fn collect_binds(
            &self,
            bind_collector: &mut RawBytesBindCollector<crate::pg::Pg>,
            conn: &mut PgConnection,
        ) -> QueryResult<()> {
            <InternalCopyFromQuery<S, T> as QueryFragmentHelper<crate::result::Error>>::collect_binds(
                &*self.0.borrow(),
                bind_collector,
                conn,
            )
        }

        /// Hands the sink to the copy target's `callback`, which produces
        /// the data.
        fn write_copy_from(&self, sink: &mut CopyFromSink<'_>) -> Result<(), S::Error> {
            self.0.borrow_mut().target.callback(sink)
        }
    }
}
825
826#[cfg(test)]
827#[allow(clippy::uninlined_format_args)]
829mod tests {
830 extern crate dotenvy;
831
832 use super::*;
833 use crate::prelude::*;
834 use crate::result::Error::DatabaseError;
835 use std::num::NonZeroU32;
836
837 fn connection() -> PgConnection {
838 crate::test_helpers::pg_connection_no_transaction()
839 }
840
841 #[diesel_test_helper::test]
842 fn notifications_arrive() {
843 use crate::sql_query;
844
845 let conn = &mut connection();
846 sql_query("LISTEN test_notifications")
847 .execute(conn)
848 .unwrap();
849 sql_query("NOTIFY test_notifications, 'first'")
850 .execute(conn)
851 .unwrap();
852 sql_query("NOTIFY test_notifications, 'second'")
853 .execute(conn)
854 .unwrap();
855
856 let notifications = conn
857 .notifications_iter()
858 .map(Result::unwrap)
859 .collect::<Vec<_>>();
860
861 assert_eq!(2, notifications.len());
862 assert_eq!(notifications[0].channel, "test_notifications");
863 assert_eq!(notifications[1].channel, "test_notifications");
864 assert_eq!(notifications[0].payload, "first");
865 assert_eq!(notifications[1].payload, "second");
866
867 let next_notification = conn.notifications_iter().next();
868 assert!(
869 next_notification.is_none(),
870 "Got a next notification, while not expecting one: {next_notification:?}"
871 );
872
873 sql_query("NOTIFY test_notifications")
874 .execute(conn)
875 .unwrap();
876 assert_eq!(
877 conn.notifications_iter().next().unwrap().unwrap().payload,
878 ""
879 );
880 }
881
882 #[diesel_test_helper::test]
883 fn malformed_sql_query() {
884 let connection = &mut connection();
885 let query =
886 crate::sql_query("SELECT not_existent FROM also_not_there;").execute(connection);
887
888 if let Err(DatabaseError(_, string)) = query {
889 assert_eq!(Some(26), string.statement_position());
890 } else {
891 unreachable!();
892 }
893 }
894
895 table! {
896 users {
897 id -> Integer,
898 name -> Text,
899 }
900 }
901
902 #[diesel_test_helper::test]
903 fn transaction_manager_returns_an_error_when_attempting_to_commit_outside_of_a_transaction() {
904 use crate::connection::{AnsiTransactionManager, TransactionManager};
905 use crate::result::Error;
906 use crate::PgConnection;
907
908 let conn = &mut crate::test_helpers::pg_connection_no_transaction();
909 assert_eq!(
910 None,
911 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
912 conn
913 ).transaction_depth().expect("Transaction depth")
914 );
915 let result = AnsiTransactionManager::commit_transaction(conn);
916 assert!(matches!(result, Err(Error::NotInTransaction)))
917 }
918
919 #[diesel_test_helper::test]
920 fn transaction_manager_returns_an_error_when_attempting_to_rollback_outside_of_a_transaction() {
921 use crate::connection::{AnsiTransactionManager, TransactionManager};
922 use crate::result::Error;
923 use crate::PgConnection;
924
925 let conn = &mut crate::test_helpers::pg_connection_no_transaction();
926 assert_eq!(
927 None,
928 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
929 conn
930 ).transaction_depth().expect("Transaction depth")
931 );
932 let result = AnsiTransactionManager::rollback_transaction(conn);
933 assert!(matches!(result, Err(Error::NotInTransaction)))
934 }
935
936 #[diesel_test_helper::test]
937 fn postgres_transaction_is_rolled_back_upon_syntax_error() {
938 use std::num::NonZeroU32;
939
940 use crate::connection::{AnsiTransactionManager, TransactionManager};
941 use crate::pg::connection::raw::PgTransactionStatus;
942 use crate::*;
943 let conn = &mut crate::test_helpers::pg_connection_no_transaction();
944 assert_eq!(
945 None,
946 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
947 conn
948 ).transaction_depth().expect("Transaction depth")
949 );
950 let _result = conn.build_transaction().run(|conn| {
951 assert_eq!(
952 NonZeroU32::new(1),
953 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
954 conn
955 ).transaction_depth().expect("Transaction depth")
956 );
957 let query_result = sql_query("SELECT_SYNTAX_ERROR 1").execute(conn);
959 assert!(query_result.is_err());
960 assert_eq!(
961 PgTransactionStatus::InError,
962 conn.connection_and_transaction_manager.raw_connection.transaction_status()
963 );
964 query_result
965 });
966 assert_eq!(
967 None,
968 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
969 conn
970 ).transaction_depth().expect("Transaction depth")
971 );
972 assert_eq!(
973 PgTransactionStatus::Idle,
974 conn.connection_and_transaction_manager
975 .raw_connection
976 .transaction_status()
977 );
978 }
979
980 #[diesel_test_helper::test]
981 fn nested_postgres_transaction_is_rolled_back_upon_syntax_error() {
982 use std::num::NonZeroU32;
983
984 use crate::connection::{AnsiTransactionManager, TransactionManager};
985 use crate::pg::connection::raw::PgTransactionStatus;
986 use crate::*;
987 let conn = &mut crate::test_helpers::pg_connection_no_transaction();
988 assert_eq!(
989 None,
990 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
991 conn
992 ).transaction_depth().expect("Transaction depth")
993 );
994 let result = conn.build_transaction().run(|conn| {
995 assert_eq!(
996 NonZeroU32::new(1),
997 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
998 conn
999 ).transaction_depth().expect("Transaction depth")
1000 );
1001 let result = conn.build_transaction().run(|conn| {
1002 assert_eq!(
1003 NonZeroU32::new(2),
1004 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1005 conn
1006 ).transaction_depth().expect("Transaction depth")
1007 );
1008 sql_query("SELECT_SYNTAX_ERROR 1").execute(conn)
1009 });
1010 assert!(result.is_err());
1011 assert_eq!(
1012 NonZeroU32::new(1),
1013 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1014 conn
1015 ).transaction_depth().expect("Transaction depth")
1016 );
1017 let query_result = sql_query("SELECT 1").execute(conn);
1018 assert!(query_result.is_ok());
1019 assert_eq!(
1020 PgTransactionStatus::InTransaction,
1021 conn.connection_and_transaction_manager.raw_connection.transaction_status()
1022 );
1023 query_result
1024 });
1025 assert!(result.is_ok());
1026 assert_eq!(
1027 PgTransactionStatus::Idle,
1028 conn.connection_and_transaction_manager
1029 .raw_connection
1030 .transaction_status()
1031 );
1032 assert_eq!(
1033 None,
1034 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1035 conn
1036 ).transaction_depth().expect("Transaction depth")
1037 );
1038 }
1039
1040 #[diesel_test_helper::test]
1041 #[allow(clippy::needless_collect)]
1044 fn postgres_transaction_depth_is_tracked_properly_on_serialization_failure() {
1045 use crate::pg::connection::raw::PgTransactionStatus;
1046 use crate::result::DatabaseErrorKind::SerializationFailure;
1047 use crate::result::Error::DatabaseError;
1048 use crate::*;
1049 use std::sync::{Arc, Barrier};
1050 use std::thread;
1051
1052 table! {
1053 #[sql_name = "pg_transaction_depth_is_tracked_properly_on_commit_failure"]
1054 serialization_example {
1055 id -> Serial,
1056 class -> Integer,
1057 }
1058 }
1059
1060 let conn = &mut crate::test_helpers::pg_connection_no_transaction();
1061
1062 sql_query(
1063 "DROP TABLE IF EXISTS pg_transaction_depth_is_tracked_properly_on_commit_failure;",
1064 )
1065 .execute(conn)
1066 .unwrap();
1067 sql_query(
1068 r#"
1069 CREATE TABLE pg_transaction_depth_is_tracked_properly_on_commit_failure (
1070 id SERIAL PRIMARY KEY,
1071 class INTEGER NOT NULL
1072 )
1073 "#,
1074 )
1075 .execute(conn)
1076 .unwrap();
1077
1078 insert_into(serialization_example::table)
1079 .values(&vec![
1080 serialization_example::class.eq(1),
1081 serialization_example::class.eq(2),
1082 ])
1083 .execute(conn)
1084 .unwrap();
1085
1086 let before_barrier = Arc::new(Barrier::new(2));
1087 let after_barrier = Arc::new(Barrier::new(2));
1088 let threads = (1..3)
1089 .map(|i| {
1090 let before_barrier = before_barrier.clone();
1091 let after_barrier = after_barrier.clone();
1092 thread::spawn(move || {
1093 use crate::connection::AnsiTransactionManager;
1094 use crate::connection::TransactionManager;
1095 let conn = &mut crate::test_helpers::pg_connection_no_transaction();
1096 assert_eq!(None, <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));
1097
1098 let result = conn.build_transaction().serializable().run(|conn| {
1099 assert_eq!(NonZeroU32::new(1), <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));
1100
1101 let _ = serialization_example::table
1102 .filter(serialization_example::class.eq(i))
1103 .count()
1104 .execute(conn)?;
1105
1106 let other_i = if i == 1 { 2 } else { 1 };
1107 let q = insert_into(serialization_example::table)
1108 .values(serialization_example::class.eq(other_i));
1109 before_barrier.wait();
1110
1111 let r = q.execute(conn);
1112 after_barrier.wait();
1113 r
1114 });
1115 assert_eq!(
1116 PgTransactionStatus::Idle,
1117 conn.connection_and_transaction_manager.raw_connection.transaction_status()
1118 );
1119
1120 assert_eq!(None, <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));
1121 result
1122 })
1123 })
1124 .collect::<Vec<_>>();
1125
1126 let mut results = threads
1127 .into_iter()
1128 .map(|t| t.join().unwrap())
1129 .collect::<Vec<_>>();
1130
1131 results.sort_by_key(|r| r.is_err());
1132
1133 assert!(results[0].is_ok(), "Got {:?} instead", results);
1134 assert!(
1135 matches!(&results[1], Err(DatabaseError(SerializationFailure, _))),
1136 "Got {:?} instead",
1137 results
1138 );
1139 assert_eq!(
1140 PgTransactionStatus::Idle,
1141 conn.connection_and_transaction_manager
1142 .raw_connection
1143 .transaction_status()
1144 );
1145 }
1146
1147 #[diesel_test_helper::test]
1148 #[allow(clippy::needless_collect)]
1151 fn postgres_transaction_depth_is_tracked_properly_on_nested_serialization_failure() {
1152 use crate::pg::connection::raw::PgTransactionStatus;
1153 use crate::result::DatabaseErrorKind::SerializationFailure;
1154 use crate::result::Error::DatabaseError;
1155 use crate::*;
1156 use std::sync::{Arc, Barrier};
1157 use std::thread;
1158
1159 table! {
1160 #[sql_name = "pg_nested_transaction_depth_is_tracked_properly_on_commit_failure"]
1161 serialization_example {
1162 id -> Serial,
1163 class -> Integer,
1164 }
1165 }
1166
1167 let conn = &mut crate::test_helpers::pg_connection_no_transaction();
1168
1169 sql_query(
1170 "DROP TABLE IF EXISTS pg_nested_transaction_depth_is_tracked_properly_on_commit_failure;",
1171 )
1172 .execute(conn)
1173 .unwrap();
1174 sql_query(
1175 r#"
1176 CREATE TABLE pg_nested_transaction_depth_is_tracked_properly_on_commit_failure (
1177 id SERIAL PRIMARY KEY,
1178 class INTEGER NOT NULL
1179 )
1180 "#,
1181 )
1182 .execute(conn)
1183 .unwrap();
1184
1185 insert_into(serialization_example::table)
1186 .values(&vec![
1187 serialization_example::class.eq(1),
1188 serialization_example::class.eq(2),
1189 ])
1190 .execute(conn)
1191 .unwrap();
1192
1193 let before_barrier = Arc::new(Barrier::new(2));
1194 let after_barrier = Arc::new(Barrier::new(2));
1195 let threads = (1..3)
1196 .map(|i| {
1197 let before_barrier = before_barrier.clone();
1198 let after_barrier = after_barrier.clone();
1199 thread::spawn(move || {
1200 use crate::connection::AnsiTransactionManager;
1201 use crate::connection::TransactionManager;
1202 let conn = &mut crate::test_helpers::pg_connection_no_transaction();
1203 assert_eq!(None, <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));
1204
1205 let result = conn.build_transaction().serializable().run(|conn| {
1206 assert_eq!(NonZeroU32::new(1), <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));
1207 let r = conn.transaction(|conn| {
1208 assert_eq!(NonZeroU32::new(2), <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));
1209
1210 let _ = serialization_example::table
1211 .filter(serialization_example::class.eq(i))
1212 .count()
1213 .execute(conn)?;
1214
1215 let other_i = if i == 1 { 2 } else { 1 };
1216 let q = insert_into(serialization_example::table)
1217 .values(serialization_example::class.eq(other_i));
1218 before_barrier.wait();
1219
1220 let r = q.execute(conn);
1221 after_barrier.wait();
1222 r
1223 });
1224 assert_eq!(NonZeroU32::new(1), <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));
1225 assert_eq!(
1226 PgTransactionStatus::InTransaction,
1227 conn.connection_and_transaction_manager.raw_connection.transaction_status()
1228 );
1229 r
1230 });
1231 assert_eq!(
1232 PgTransactionStatus::Idle,
1233 conn.connection_and_transaction_manager.raw_connection.transaction_status()
1234 );
1235
1236 assert_eq!(None, <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));
1237 result
1238 })
1239 })
1240 .collect::<Vec<_>>();
1241
1242 let mut results = threads
1243 .into_iter()
1244 .map(|t| t.join().unwrap())
1245 .collect::<Vec<_>>();
1246
1247 results.sort_by_key(|r| r.is_err());
1248
1249 assert!(results[0].is_ok(), "Got {:?} instead", results);
1250 assert!(
1251 matches!(&results[1], Err(DatabaseError(SerializationFailure, _))),
1252 "Got {:?} instead",
1253 results
1254 );
1255 assert_eq!(
1256 PgTransactionStatus::Idle,
1257 conn.connection_and_transaction_manager
1258 .raw_connection
1259 .transaction_status()
1260 );
1261 }
1262
1263 #[diesel_test_helper::test]
1264 fn postgres_transaction_is_rolled_back_upon_deferred_constraint_failure() {
1265 use crate::connection::{AnsiTransactionManager, TransactionManager};
1266 use crate::pg::connection::raw::PgTransactionStatus;
1267 use crate::result::Error;
1268 use crate::*;
1269
1270 let conn = &mut crate::test_helpers::pg_connection_no_transaction();
1271 assert_eq!(
1272 None,
1273 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1274 conn
1275 ).transaction_depth().expect("Transaction depth")
1276 );
1277 let result: Result<_, Error> = conn.build_transaction().run(|conn| {
1278 assert_eq!(
1279 NonZeroU32::new(1),
1280 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1281 conn
1282 ).transaction_depth().expect("Transaction depth")
1283 );
1284 sql_query("DROP TABLE IF EXISTS deferred_constraint_commit").execute(conn)?;
1285 sql_query("CREATE TABLE deferred_constraint_commit(id INT UNIQUE INITIALLY DEFERRED)")
1286 .execute(conn)?;
1287 sql_query("INSERT INTO deferred_constraint_commit VALUES(1)").execute(conn)?;
1288 let result =
1289 sql_query("INSERT INTO deferred_constraint_commit VALUES(1)").execute(conn);
1290 assert!(result.is_ok());
1291 assert_eq!(
1292 PgTransactionStatus::InTransaction,
1293 conn.connection_and_transaction_manager.raw_connection.transaction_status()
1294 );
1295 Ok(())
1296 });
1297 assert_eq!(
1298 None,
1299 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1300 conn
1301 ).transaction_depth().expect("Transaction depth")
1302 );
1303 assert_eq!(
1304 PgTransactionStatus::Idle,
1305 conn.connection_and_transaction_manager
1306 .raw_connection
1307 .transaction_status()
1308 );
1309 assert!(result.is_err());
1310 }
1311
1312 #[diesel_test_helper::test]
1313 fn postgres_transaction_is_rolled_back_upon_deferred_trigger_failure() {
1314 use crate::connection::{AnsiTransactionManager, TransactionManager};
1315 use crate::pg::connection::raw::PgTransactionStatus;
1316 use crate::result::Error;
1317 use crate::*;
1318
1319 let conn = &mut crate::test_helpers::pg_connection_no_transaction();
1320 assert_eq!(
1321 None,
1322 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1323 conn
1324 ).transaction_depth().expect("Transaction depth")
1325 );
1326 let result: Result<_, Error> = conn.build_transaction().run(|conn| {
1327 assert_eq!(
1328 NonZeroU32::new(1),
1329 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1330 conn
1331 ).transaction_depth().expect("Transaction depth")
1332 );
1333 sql_query("DROP TABLE IF EXISTS deferred_trigger_commit").execute(conn)?;
1334 sql_query("CREATE TABLE deferred_trigger_commit(id INT UNIQUE INITIALLY DEFERRED)")
1335 .execute(conn)?;
1336 sql_query(
1337 r#"
1338 CREATE OR REPLACE FUNCTION transaction_depth_blow_up()
1339 RETURNS trigger
1340 LANGUAGE plpgsql
1341 AS $$
1342 DECLARE
1343 BEGIN
1344 IF NEW.value = 42 THEN
1345 RAISE EXCEPTION 'Transaction kaboom';
1346 END IF;
1347 RETURN NEW;
1348
1349 END;$$;
1350 "#,
1351 )
1352 .execute(conn)?;
1353
1354 sql_query(
1355 r#"
1356 CREATE CONSTRAINT TRIGGER transaction_depth_trigger
1357 AFTER INSERT ON "deferred_trigger_commit"
1358 DEFERRABLE INITIALLY DEFERRED
1359 FOR EACH ROW
1360 EXECUTE PROCEDURE transaction_depth_blow_up()
1361 "#,
1362 )
1363 .execute(conn)?;
1364 let result = sql_query("INSERT INTO deferred_trigger_commit VALUES(42)").execute(conn);
1365 assert!(result.is_ok());
1366 assert_eq!(
1367 PgTransactionStatus::InTransaction,
1368 conn.connection_and_transaction_manager.raw_connection.transaction_status()
1369 );
1370 Ok(())
1371 });
1372 assert_eq!(
1373 None,
1374 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1375 conn
1376 ).transaction_depth().expect("Transaction depth")
1377 );
1378 assert_eq!(
1379 PgTransactionStatus::Idle,
1380 conn.connection_and_transaction_manager
1381 .raw_connection
1382 .transaction_status()
1383 );
1384 assert!(result.is_err());
1385 }
1386
1387 #[diesel_test_helper::test]
1388 fn nested_postgres_transaction_is_rolled_back_upon_deferred_trigger_failure() {
1389 use crate::connection::{AnsiTransactionManager, TransactionManager};
1390 use crate::pg::connection::raw::PgTransactionStatus;
1391 use crate::result::Error;
1392 use crate::*;
1393
1394 let conn = &mut crate::test_helpers::pg_connection_no_transaction();
1395 assert_eq!(
1396 None,
1397 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1398 conn
1399 ).transaction_depth().expect("Transaction depth")
1400 );
1401 let result: Result<_, Error> = conn.build_transaction().run(|conn| {
1402 assert_eq!(
1403 NonZeroU32::new(1),
1404 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1405 conn
1406 ).transaction_depth().expect("Transaction depth")
1407 );
1408 sql_query("DROP TABLE IF EXISTS deferred_trigger_nested_commit").execute(conn)?;
1409 sql_query(
1410 "CREATE TABLE deferred_trigger_nested_commit(id INT UNIQUE INITIALLY DEFERRED)",
1411 )
1412 .execute(conn)?;
1413 sql_query(
1414 r#"
1415 CREATE OR REPLACE FUNCTION transaction_depth_blow_up()
1416 RETURNS trigger
1417 LANGUAGE plpgsql
1418 AS $$
1419 DECLARE
1420 BEGIN
1421 IF NEW.value = 42 THEN
1422 RAISE EXCEPTION 'Transaction kaboom';
1423 END IF;
1424 RETURN NEW;
1425
1426 END;$$;
1427 "#,
1428 )
1429 .execute(conn)?;
1430
1431 sql_query(
1432 r#"
1433 CREATE CONSTRAINT TRIGGER transaction_depth_trigger
1434 AFTER INSERT ON "deferred_trigger_nested_commit"
1435 DEFERRABLE INITIALLY DEFERRED
1436 FOR EACH ROW
1437 EXECUTE PROCEDURE transaction_depth_blow_up()
1438 "#,
1439 )
1440 .execute(conn)?;
1441 let inner_result: Result<_, Error> = conn.build_transaction().run(|conn| {
1442 let result = sql_query("INSERT INTO deferred_trigger_nested_commit VALUES(42)")
1443 .execute(conn);
1444 assert!(result.is_ok());
1445 Ok(())
1446 });
1447 assert!(inner_result.is_err());
1448 assert_eq!(
1449 PgTransactionStatus::InTransaction,
1450 conn.connection_and_transaction_manager.raw_connection.transaction_status()
1451 );
1452 Ok(())
1453 });
1454 assert_eq!(
1455 None,
1456 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1457 conn
1458 ).transaction_depth().expect("Transaction depth")
1459 );
1460 assert_eq!(
1461 PgTransactionStatus::Idle,
1462 conn.connection_and_transaction_manager
1463 .raw_connection
1464 .transaction_status()
1465 );
1466 assert!(result.is_ok(), "Expected success, got {:?}", result);
1467 }
1468
1469 #[diesel_test_helper::test]
1470 fn nested_postgres_transaction_is_rolled_back_upon_deferred_constraint_failure() {
1471 use crate::connection::{AnsiTransactionManager, TransactionManager};
1472 use crate::pg::connection::raw::PgTransactionStatus;
1473 use crate::result::Error;
1474 use crate::*;
1475
1476 let conn = &mut crate::test_helpers::pg_connection_no_transaction();
1477 assert_eq!(
1478 None,
1479 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1480 conn
1481 ).transaction_depth().expect("Transaction depth")
1482 );
1483 let result: Result<_, Error> = conn.build_transaction().run(|conn| {
1484 assert_eq!(
1485 NonZeroU32::new(1),
1486 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1487 conn
1488 ).transaction_depth().expect("Transaction depth")
1489 );
1490 sql_query("DROP TABLE IF EXISTS deferred_constraint_nested_commit").execute(conn)?;
1491 sql_query("CREATE TABLE deferred_constraint_nested_commit(id INT UNIQUE INITIALLY DEFERRED)").execute(conn)?;
1492 let inner_result: Result<_, Error> = conn.build_transaction().run(|conn| {
1493 assert_eq!(
1494 NonZeroU32::new(2),
1495 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1496 conn
1497 ).transaction_depth().expect("Transaction depth")
1498 );
1499 sql_query("INSERT INTO deferred_constraint_nested_commit VALUES(1)").execute(conn)?;
1500 let result = sql_query("INSERT INTO deferred_constraint_nested_commit VALUES(1)").execute(conn);
1501 assert!(result.is_ok());
1502 Ok(())
1503 });
1504 assert!(inner_result.is_err());
1505 assert_eq!(
1506 PgTransactionStatus::InTransaction,
1507 conn.connection_and_transaction_manager.raw_connection.transaction_status()
1508 );
1509 assert_eq!(
1510 NonZeroU32::new(1),
1511 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1512 conn
1513 ).transaction_depth().expect("Transaction depth")
1514 );
1515 sql_query("INSERT INTO deferred_constraint_nested_commit VALUES(1)").execute(conn)
1516 });
1517 assert_eq!(
1518 None,
1519 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1520 conn
1521 ).transaction_depth().expect("Transaction depth")
1522 );
1523 assert_eq!(
1524 PgTransactionStatus::Idle,
1525 conn.connection_and_transaction_manager
1526 .raw_connection
1527 .transaction_status()
1528 );
1529 assert!(result.is_ok());
1530 }
1531}