1pub(super) mod copy;
2pub(crate) mod cursor;
3mod raw;
4mod result;
5mod row;
6mod stmt;
7
8use self::copy::{CopyFromSink, CopyToBuffer};
9use self::cursor::*;
10use self::private::{ConnectionAndTransactionManager, CopyFromWrapper, QueryFragmentHelper};
11use self::raw::{PgTransactionStatus, RawConnection};
12use self::stmt::Statement;
13use crate::RunQueryDsl;
14use crate::connection::instrumentation::{DynInstrumentation, Instrumentation, StrQueryHelper};
15use crate::connection::statement_cache::{MaybeCached, StatementCache};
16use crate::connection::*;
17use crate::expression::QueryMetadata;
18use crate::pg::backend::PgNotification;
19use crate::pg::metadata_lookup::{GetPgMetadataCache, PgMetadataCache};
20use crate::pg::query_builder::copy::InternalCopyFromQuery;
21use crate::pg::{Pg, TransactionBuilder};
22use crate::query_builder::bind_collector::RawBytesBindCollector;
23use crate::query_builder::*;
24use crate::result::ConnectionError::CouldntSetupConfiguration;
25use crate::result::*;
26use alloc::ffi::CString;
27use core::ffi as libc;
28use core::fmt::Debug;
29
30use super::query_builder::copy::{CopyFromExpression, CopyTarget, CopyToCommand};
31
32pub(super) use self::result::PgResult;
33
/// A connection to a PostgreSQL database.
///
/// Bundles the prepared statement cache, the type metadata cache and the raw
/// connection (together with its transaction state and instrumentation).
#[allow(missing_debug_implementations)]
#[cfg(feature = "postgres")]
pub struct PgConnection {
    /// Statements prepared on this connection, keyed by query.
    statement_cache: StatementCache<Pg, Statement>,
    /// Cache handed out through `GetPgMetadataCache`.
    metadata_cache: PgMetadataCache,
    /// Raw connection, transaction manager state and instrumentation; kept in
    /// one struct so they can be borrowed together while a statement borrowed
    /// from `statement_cache` is still alive.
    connection_and_transaction_manager: ConnectionAndTransactionManager,
}
130
131#[allow(unsafe_code)]
133unsafe impl Send for PgConnection {}
134
135impl SimpleConnection for PgConnection {
136 #[allow(unsafe_code)] fn batch_execute(&mut self, query: &str) -> QueryResult<()> {
138 self.connection_and_transaction_manager
139 .instrumentation
140 .on_connection_event(InstrumentationEvent::StartQuery {
141 query: &StrQueryHelper::new(query),
142 });
143 let c_query = CString::new(query)?;
144 let inner_result = unsafe {
145 self.connection_and_transaction_manager
146 .raw_connection
147 .exec(c_query.as_ptr())
148 };
149 update_transaction_manager_status(
150 inner_result.and_then(|raw_result| {
151 PgResult::new(
152 raw_result,
153 &self.connection_and_transaction_manager.raw_connection,
154 )
155 }),
156 &mut self.connection_and_transaction_manager,
157 &|callback| callback(&StrQueryHelper::new(query)),
158 true,
159 )?;
160 Ok(())
161 }
162}
163
/// Marker type selecting the row-by-row loading mode of [`PgConnection`].
///
/// Used as the `B` parameter of `LoadConnection<B>`; the corresponding
/// `PgLoadingMode` implementation sets `USE_ROW_BY_ROW_MODE` to `true` and
/// yields a `RowByRowCursor`.
#[derive(Debug, Copy, Clone)]
pub struct PgRowByRowLoadingMode;
169
170impl ConnectionSealed for PgConnection {}
171
172impl Connection for PgConnection {
173 type Backend = Pg;
174 type TransactionManager = AnsiTransactionManager;
175
176 fn establish(database_url: &str) -> ConnectionResult<PgConnection> {
177 let mut instrumentation = DynInstrumentation::default_instrumentation();
178 instrumentation.on_connection_event(InstrumentationEvent::StartEstablishConnection {
179 url: database_url,
180 });
181 let r = RawConnection::establish(database_url).and_then(|raw_conn| {
182 let mut conn = PgConnection {
183 connection_and_transaction_manager: ConnectionAndTransactionManager {
184 raw_connection: raw_conn,
185 transaction_state: AnsiTransactionManager::default(),
186 instrumentation: DynInstrumentation::none(),
187 },
188 statement_cache: StatementCache::new(),
189 metadata_cache: PgMetadataCache::new(),
190 };
191 conn.set_config_options()
192 .map_err(CouldntSetupConfiguration)?;
193 Ok(conn)
194 });
195 instrumentation.on_connection_event(InstrumentationEvent::FinishEstablishConnection {
196 url: database_url,
197 error: r.as_ref().err(),
198 });
199 let mut conn = r?;
200 conn.connection_and_transaction_manager.instrumentation = instrumentation;
201 Ok(conn)
202 }
203
204 fn execute_returning_count<T>(&mut self, source: &T) -> QueryResult<usize>
205 where
206 T: QueryFragment<Pg> + QueryId,
207 {
208 update_transaction_manager_status(
209 self.with_prepared_query(
210 Box::new(source),
211 true,
212 &mut |query, params, conn, _source| {
213 let res = query
214 .execute(&mut conn.raw_connection, ¶ms, false)
215 .map(|r| r.rows_affected());
216 while conn.raw_connection.get_next_result()?.is_some() {}
219 res
220 },
221 ),
222 &mut self.connection_and_transaction_manager,
223 &|callback| source.instrumentation(callback),
224 true,
225 )
226 }
227
228 fn transaction_state(&mut self) -> &mut AnsiTransactionManager
229 where
230 Self: Sized,
231 {
232 &mut self.connection_and_transaction_manager.transaction_state
233 }
234
235 fn instrumentation(&mut self) -> &mut dyn Instrumentation {
236 &mut *self.connection_and_transaction_manager.instrumentation
237 }
238
239 fn set_instrumentation(&mut self, instrumentation: impl Instrumentation) {
240 self.connection_and_transaction_manager.instrumentation = instrumentation.into();
241 }
242
243 fn set_prepared_statement_cache_size(&mut self, size: CacheSize) {
244 self.statement_cache.set_cache_size(size);
245 }
246}
247
248impl<B> LoadConnection<B> for PgConnection
249where
250 Self: self::private::PgLoadingMode<B>,
251{
252 type Cursor<'conn, 'query> = <Self as self::private::PgLoadingMode<B>>::Cursor<'conn, 'query>;
253 type Row<'conn, 'query> = <Self as self::private::PgLoadingMode<B>>::Row<'conn, 'query>;
254
255 fn load<'conn, 'query, T>(
256 &'conn mut self,
257 source: T,
258 ) -> QueryResult<Self::Cursor<'conn, 'query>>
259 where
260 T: Query + QueryFragment<Self::Backend> + QueryId + 'query,
261 Self::Backend: QueryMetadata<T::SqlType>,
262 {
263 self.with_prepared_query(
264 Box::new(source),
265 false,
266 &mut |stmt, params, conn, source| {
267 use self::private::PgLoadingMode;
268 let result = inner_load(stmt, params, conn, &*source, Self::USE_ROW_BY_ROW_MODE)?;
269 Self::get_cursor(conn, result, source)
270 },
271 )
272 }
273}
274
275fn inner_load(
276 stmt: MaybeCached<'_, Statement>,
277 params: Vec<Option<Vec<u8>>>,
278 conn: &mut ConnectionAndTransactionManager,
279 source: &dyn QueryFragmentHelper<crate::result::Error>,
280 row_by_row: bool,
281) -> Result<PgResult, Error> {
282 let result = stmt.execute(&mut conn.raw_connection, ¶ms, row_by_row);
283 update_transaction_manager_status(
284 result,
285 conn,
286 &|callback| source.instrumentation(callback),
287 false,
288 )
289}
290
291impl GetPgMetadataCache for PgConnection {
292 fn get_metadata_cache(&mut self) -> &mut PgMetadataCache {
293 &mut self.metadata_cache
294 }
295}
296
297#[inline(always)]
298fn update_transaction_manager_status<T>(
299 query_result: QueryResult<T>,
300 conn: &mut ConnectionAndTransactionManager,
301 instrumentation_callback: &dyn Fn(
302 &mut dyn FnMut(&dyn crate::connection::instrumentation::DebugQuery),
303 ),
304 final_call: bool,
305) -> QueryResult<T> {
306 fn non_generic_inner(conn: &mut ConnectionAndTransactionManager, is_err: bool) {
308 let raw_conn: &mut RawConnection = &mut conn.raw_connection;
309 let tm: &mut AnsiTransactionManager = &mut conn.transaction_state;
310 match raw_conn.transaction_status() {
314 PgTransactionStatus::InError => {
315 tm.status.set_requires_rollback_maybe_up_to_top_level(true)
316 }
317 PgTransactionStatus::Unknown => tm.status.set_in_error(),
318 PgTransactionStatus::Idle => {
319 tm.status = TransactionManagerStatus::Valid(Default::default())
324 }
325 PgTransactionStatus::InTransaction => {
326 let transaction_status = &mut tm.status;
327 if is_err {
330 if !#[allow(non_exhaustive_omitted_patterns)] match transaction_status {
TransactionManagerStatus::Valid(valid_tm) if
valid_tm.transaction_depth().is_some() => true,
_ => false,
}matches!(transaction_status, TransactionManagerStatus::Valid(valid_tm) if valid_tm.transaction_depth().is_some())
333 {
334 transaction_status.set_in_error()
336 }
337 } else {
338 tm.status.set_requires_rollback_maybe_up_to_top_level(false)
346 }
347 }
348 PgTransactionStatus::Active => {
349 }
351 }
352 }
353
354 fn non_generic_instrumentation(
355 query_result: Result<(), &Error>,
356 conn: &mut ConnectionAndTransactionManager,
357 instrumentation_callback: &dyn Fn(
358 &mut dyn FnMut(&dyn crate::connection::instrumentation::DebugQuery),
359 ),
360 final_call: bool,
361 ) {
362 if let Err(e) = query_result {
363 instrumentation_callback(&mut |query| {
364 conn.instrumentation
365 .on_connection_event(InstrumentationEvent::FinishQuery {
366 query,
367 error: Some(e),
368 })
369 });
370 } else if final_call {
371 instrumentation_callback(&mut |query| {
372 conn.instrumentation
373 .on_connection_event(InstrumentationEvent::FinishQuery { query, error: None });
374 });
375 }
376 }
377
378 non_generic_inner(conn, query_result.is_err());
379 non_generic_instrumentation(
380 query_result.as_ref().map(|_| ()),
381 conn,
382 instrumentation_callback,
383 final_call,
384 );
385 query_result
386}
387
#[cfg(feature = "r2d2")]
impl crate::r2d2::R2D2Connection for PgConnection {
    /// Runs the pool's check query and discards its row count.
    fn ping(&mut self) -> QueryResult<()> {
        crate::r2d2::CheckConnectionQuery.execute(self)?;
        Ok(())
    }

    /// Delegates to the transaction manager's broken-state check.
    fn is_broken(&mut self) -> bool {
        let broken = AnsiTransactionManager::is_broken_transaction_manager(self);
        broken
    }
}
398
399impl MultiConnectionHelper for PgConnection {
400 fn to_any<'a>(
401 lookup: &mut <Self::Backend as crate::sql_types::TypeMetadata>::MetadataLookup,
402 ) -> &mut (dyn core::any::Any + 'a) {
403 lookup.as_any()
404 }
405
406 fn from_any(
407 lookup: &mut dyn core::any::Any,
408 ) -> Option<&mut <Self::Backend as crate::sql_types::TypeMetadata>::MetadataLookup> {
409 lookup
410 .downcast_mut::<Self>()
411 .map(|conn| conn as &mut dyn super::PgMetadataLookup)
412 }
413}
414
415impl PgConnection {
416 pub fn build_transaction(&mut self) -> TransactionBuilder<'_, Self> {
440 TransactionBuilder::new(self)
441 }
442
443 pub(crate) fn copy_from<S, T>(&mut self, target: S) -> Result<usize, S::Error>
444 where
445 S: CopyFromExpression<T>,
446 {
447 let query = CopyFromWrapper(core::cell::RefCell::new(InternalCopyFromQuery::new(target)));
448 let res =
449 self.with_prepared_query(Box::new(query), false, &mut |stmt, binds, conn, source| {
450 fn inner_copy_in<S, T>(
451 stmt: MaybeCached<'_, Statement>,
452 conn: &mut ConnectionAndTransactionManager,
453 binds: Vec<Option<Vec<u8>>>,
454 source: &dyn QueryFragmentHelper<S::Error>,
455 ) -> Result<usize, S::Error>
456 where
457 S: CopyFromExpression<T>,
458 {
459 let _res = stmt.execute(&mut conn.raw_connection, &binds, false)?;
460 let mut copy_in = CopyFromSink::new(&mut conn.raw_connection);
461 let r = source.write_copy_from(&mut copy_in);
462 copy_in.finish(r.as_ref().err().map(|e| e.to_string()))?;
463 let next_res = conn.raw_connection.get_next_result()?.ok_or_else(|| {
464 crate::result::Error::DeserializationError(
465 "Failed to receive result from the database".into(),
466 )
467 })?;
468 let rows = next_res.rows_affected();
469 while let Some(_r) = conn.raw_connection.get_next_result()? {}
470 r?;
471 Ok(rows)
472 }
473
474 let rows = inner_copy_in::<S, T>(stmt, conn, binds, &*source);
475 if let Err(ref e) = rows {
476 let database_error = crate::result::Error::DatabaseError(
477 crate::result::DatabaseErrorKind::Unknown,
478 Box::new(e.to_string()),
479 );
480 source.instrumentation(&mut |query| {
481 conn.instrumentation.on_connection_event(
482 InstrumentationEvent::FinishQuery {
483 query,
484 error: Some(&database_error),
485 },
486 );
487 });
488 } else {
489 source.instrumentation(&mut |query| {
490 conn.instrumentation.on_connection_event(
491 InstrumentationEvent::FinishQuery { query, error: None },
492 );
493 });
494 }
495
496 rows
497 })?;
498
499 Ok(res)
500 }
501
502 pub(crate) fn copy_to<T>(&mut self, command: CopyToCommand<T>) -> QueryResult<CopyToBuffer<'_>>
503 where
504 T: CopyTarget,
505 {
506 let res = self.with_prepared_query::<_, Error>(
507 Box::new(command),
508 false,
509 &mut |stmt, binds, conn, source| {
510 let res = stmt.execute(&mut conn.raw_connection, &binds, false);
511 source.instrumentation(&mut |query| {
512 conn.instrumentation
513 .on_connection_event(InstrumentationEvent::FinishQuery {
514 query,
515 error: res.as_ref().err(),
516 });
517 });
518 Ok(CopyToBuffer::new(&mut conn.raw_connection, res?))
519 },
520 )?;
521 Ok(res)
522 }
523
524 fn with_prepared_query<'conn, 'query, R, E>(
525 &'conn mut self,
526 source: Box<dyn QueryFragmentHelper<E> + 'query>,
527 execute_returning_count: bool,
528 f: &mut dyn FnMut(
529 MaybeCached<'_, Statement>,
530 Vec<Option<Vec<u8>>>,
531 &'conn mut ConnectionAndTransactionManager,
532 Box<dyn QueryFragmentHelper<E> + 'query>,
533 ) -> Result<R, E>,
534 ) -> Result<R, E>
535 where
536 E: From<crate::result::Error>,
537 {
538 fn prepare_query_non_generic_inner<'a, E>(
539 connection_and_transaction_manager: &mut ConnectionAndTransactionManager,
540 cache: &'a mut StatementCache<Pg, Statement>,
541 source: &dyn QueryFragmentHelper<E>,
542 execute_returning_count: bool,
543 bind_collector: RawBytesBindCollector<Pg>,
544 ) -> QueryResult<(
545 Vec<Option<Vec<u8>>>,
546 Result<MaybeCached<'a, Statement>, Error>,
547 )> {
548 let binds = bind_collector.binds;
549 let metadata = bind_collector.metadata;
550 let query = cache.cached_statement_non_generic(
551 source.query_id(),
552 source,
553 &Pg,
554 &metadata,
555 &mut connection_and_transaction_manager.raw_connection,
556 Statement::prepare,
557 &mut *connection_and_transaction_manager.instrumentation,
558 );
559 if !execute_returning_count && let Err(ref e) = query {
560 source.instrumentation(&mut |query| {
561 connection_and_transaction_manager
562 .instrumentation
563 .on_connection_event(InstrumentationEvent::FinishQuery {
564 query,
565 error: Some(e),
566 });
567 });
568 }
569 Ok((binds, query))
570 }
571
572 let bind_collector = self.collect_binds(&*source)?;
573 let (binds, query) = prepare_query_non_generic_inner(
574 &mut self.connection_and_transaction_manager,
575 &mut self.statement_cache,
576 &*source,
577 execute_returning_count,
578 bind_collector,
579 )?;
580
581 f(
582 query?,
583 binds,
584 &mut self.connection_and_transaction_manager,
585 source,
586 )
587 }
588
589 fn collect_binds<E>(
590 &mut self,
591 source: &dyn QueryFragmentHelper<E>,
592 ) -> Result<RawBytesBindCollector<Pg>, crate::result::Error> {
593 source.instrumentation(&mut |query| {
594 self.connection_and_transaction_manager
595 .instrumentation
596 .on_connection_event(InstrumentationEvent::StartQuery { query });
597 });
598 let mut bind_collector = RawBytesBindCollector::<Pg>::new();
599 source.collect_binds(&mut bind_collector, self)?;
600 Ok(bind_collector)
601 }
602
603 fn set_config_options(&mut self) -> QueryResult<()> {
604 crate::sql_query("SET TIME ZONE 'UTC'").execute(self)?;
605 crate::sql_query("SET CLIENT_ENCODING TO 'UTF8'").execute(self)?;
606 self.connection_and_transaction_manager
607 .raw_connection
608 .set_notice_processor(noop_notice_processor);
609 Ok(())
610 }
611
612 pub fn notifications_iter(&mut self) -> impl Iterator<Item = QueryResult<PgNotification>> + '_ {
653 let conn = &self.connection_and_transaction_manager.raw_connection;
654 core::iter::from_fn(move || conn.pq_notifies().transpose())
655 }
656}
657
/// Notice processor that ignores both its arguments; installed by
/// `set_config_options`.
extern "C" fn noop_notice_processor(_: *mut libc::c_void, _message: *const libc::c_char) {}
659
660mod private {
661 use super::*;
662
663 #[allow(missing_debug_implementations)]
664 pub struct ConnectionAndTransactionManager {
665 pub(super) raw_connection: RawConnection,
666 pub(super) transaction_state: AnsiTransactionManager,
667 pub(super) instrumentation: DynInstrumentation,
668 }
669
670 pub trait PgLoadingMode<B> {
671 const USE_ROW_BY_ROW_MODE: bool;
672 type Cursor<'conn, 'query>: Iterator<Item = QueryResult<Self::Row<'conn, 'query>>>;
673 type Row<'conn, 'query>: crate::row::Row<'conn, Pg>;
674
675 fn get_cursor<'conn, 'query>(
676 raw_connection: &'conn mut ConnectionAndTransactionManager,
677 result: PgResult,
678 source: Box<dyn QueryFragmentHelper<crate::result::Error> + 'query>,
679 ) -> QueryResult<Self::Cursor<'conn, 'query>>;
680 }
681
682 impl PgLoadingMode<DefaultLoadingMode> for PgConnection {
683 const USE_ROW_BY_ROW_MODE: bool = false;
684 type Cursor<'conn, 'query> = Cursor;
685 type Row<'conn, 'query> = self::row::PgRow;
686
687 fn get_cursor<'conn, 'query>(
688 conn: &'conn mut ConnectionAndTransactionManager,
689 result: PgResult,
690 source: Box<dyn QueryFragmentHelper<crate::result::Error> + 'query>,
691 ) -> QueryResult<Self::Cursor<'conn, 'query>> {
692 update_transaction_manager_status(
693 Cursor::new(result, &mut conn.raw_connection),
694 conn,
695 &|callback| source.instrumentation(callback),
696 true,
697 )
698 }
699 }
700
701 impl PgLoadingMode<PgRowByRowLoadingMode> for PgConnection {
702 const USE_ROW_BY_ROW_MODE: bool = true;
703 type Cursor<'conn, 'query> = RowByRowCursor<'conn, 'query>;
704 type Row<'conn, 'query> = self::row::PgRow;
705
706 fn get_cursor<'conn, 'query>(
707 raw_connection: &'conn mut ConnectionAndTransactionManager,
708 result: PgResult,
709 source: Box<dyn QueryFragmentHelper<crate::result::Error> + 'query>,
710 ) -> QueryResult<Self::Cursor<'conn, 'query>> {
711 Ok(RowByRowCursor::new(result, raw_connection, source))
712 }
713 }
714
715 pub trait QueryFragmentHelper<E>:
718 crate::connection::statement_cache::QueryFragmentForCachedStatement<crate::pg::Pg>
719 {
720 fn query_id(&self) -> Option<core::any::TypeId>;
721
722 fn instrumentation(
723 &self,
724 callback: &mut dyn FnMut(&dyn crate::connection::instrumentation::DebugQuery),
725 );
726
727 fn collect_binds(
728 &self,
729 bind_collector: &mut RawBytesBindCollector<crate::pg::Pg>,
730 conn: &mut PgConnection,
731 ) -> QueryResult<()>;
732
733 fn write_copy_from(&self, _sink: &mut CopyFromSink<'_>) -> Result<(), E> {
734 Ok(())
735 }
736 }
737
738 impl<T> QueryFragmentHelper<diesel::result::Error> for T
740 where
741 T: QueryFragment<crate::pg::Pg> + QueryId,
742 {
743 fn query_id(&self) -> Option<core::any::TypeId> {
744 <T as QueryId>::query_id()
745 }
746
747 fn instrumentation(
748 &self,
749 callback: &mut dyn FnMut(&dyn crate::connection::instrumentation::DebugQuery),
750 ) {
751 callback(&crate::debug_query(self))
752 }
753
754 fn collect_binds(
755 &self,
756 bind_collector: &mut RawBytesBindCollector<crate::pg::Pg>,
757 conn: &mut PgConnection,
758 ) -> QueryResult<()> {
759 <Self as QueryFragment<diesel::pg::Pg>>::collect_binds(
760 self,
761 bind_collector,
762 conn,
763 &crate::pg::Pg,
764 )
765 }
766 }
767
768 pub(super) struct CopyFromWrapper<S, T>(
771 pub(super) core::cell::RefCell<InternalCopyFromQuery<S, T>>,
772 );
773
774 impl<S, T> crate::connection::statement_cache::QueryFragmentForCachedStatement<Pg>
775 for CopyFromWrapper<S, T>
776 where
777 InternalCopyFromQuery<S, T>:
778 crate::connection::statement_cache::QueryFragmentForCachedStatement<Pg>,
779 {
780 fn construct_sql(&self, backend: &Pg) -> QueryResult<String> {
781 self.0.borrow().construct_sql(backend)
782 }
783
784 fn is_safe_to_cache_prepared(&self, backend: &Pg) -> QueryResult<bool> {
785 self.0.borrow().is_safe_to_cache_prepared(backend)
786 }
787 }
788
789 impl<S, T> QueryFragmentHelper<S::Error> for CopyFromWrapper<S, T>
790 where
791 S: CopyFromExpression<T>,
792 InternalCopyFromQuery<S, T>: QueryFragmentHelper<crate::result::Error>,
793 Self: crate::connection::statement_cache::QueryFragmentForCachedStatement<Pg>,
794 {
795 fn query_id(&self) -> Option<core::any::TypeId> {
796 self.0.borrow().query_id()
797 }
798
799 fn instrumentation(
800 &self,
801 callback: &mut dyn FnMut(&dyn crate::connection::instrumentation::DebugQuery),
802 ) {
803 callback(&crate::debug_query(&*self.0.borrow()))
804 }
805
806 fn collect_binds(
807 &self,
808 bind_collector: &mut RawBytesBindCollector<crate::pg::Pg>,
809 conn: &mut PgConnection,
810 ) -> QueryResult<()> {
811 <InternalCopyFromQuery<S, T> as QueryFragmentHelper<crate::result::Error>>::collect_binds(
812 &*self.0.borrow(),
813 bind_collector,
814 conn,
815 )
816 }
817
818 fn write_copy_from(&self, sink: &mut CopyFromSink<'_>) -> Result<(), S::Error> {
819 self.0.borrow_mut().target.callback(sink)
820 }
821 }
822}
823
824#[cfg(test)]
825#[allow(clippy::uninlined_format_args)]
827mod tests {
828 extern crate dotenvy;
829
830 use super::*;
831 use crate::prelude::*;
832 use crate::result::Error::DatabaseError;
833 use std::num::NonZeroU32;
834
835 fn connection() -> PgConnection {
836 crate::test_helpers::pg_connection_no_transaction()
837 }
838
839 #[diesel_test_helper::test]
840 fn notifications_arrive() {
841 use crate::sql_query;
842
843 let conn = &mut connection();
844 sql_query("LISTEN test_notifications")
845 .execute(conn)
846 .unwrap();
847 sql_query("NOTIFY test_notifications, 'first'")
848 .execute(conn)
849 .unwrap();
850 sql_query("NOTIFY test_notifications, 'second'")
851 .execute(conn)
852 .unwrap();
853
854 let notifications = conn
855 .notifications_iter()
856 .map(Result::unwrap)
857 .collect::<Vec<_>>();
858
859 assert_eq!(2, notifications.len());
860 assert_eq!(notifications[0].channel, "test_notifications");
861 assert_eq!(notifications[1].channel, "test_notifications");
862 assert_eq!(notifications[0].payload, "first");
863 assert_eq!(notifications[1].payload, "second");
864
865 let next_notification = conn.notifications_iter().next();
866 assert!(
867 next_notification.is_none(),
868 "Got a next notification, while not expecting one: {next_notification:?}"
869 );
870
871 sql_query("NOTIFY test_notifications")
872 .execute(conn)
873 .unwrap();
874 assert_eq!(
875 conn.notifications_iter().next().unwrap().unwrap().payload,
876 ""
877 );
878 }
879
880 #[diesel_test_helper::test]
881 fn malformed_sql_query() {
882 let connection = &mut connection();
883 let query =
884 crate::sql_query("SELECT not_existent FROM also_not_there;").execute(connection);
885
886 if let Err(DatabaseError(_, string)) = query {
887 assert_eq!(Some(26), string.statement_position());
888 } else {
889 unreachable!();
890 }
891 }
892
893 table! {
894 users {
895 id -> Integer,
896 name -> Text,
897 }
898 }
899
900 #[diesel_test_helper::test]
901 fn transaction_manager_returns_an_error_when_attempting_to_commit_outside_of_a_transaction() {
902 use crate::PgConnection;
903 use crate::connection::{AnsiTransactionManager, TransactionManager};
904 use crate::result::Error;
905
906 let conn = &mut crate::test_helpers::pg_connection_no_transaction();
907 assert_eq!(
908 None,
909 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
910 conn
911 ).transaction_depth().expect("Transaction depth")
912 );
913 let result = AnsiTransactionManager::commit_transaction(conn);
914 assert!(matches!(result, Err(Error::NotInTransaction)))
915 }
916
917 #[diesel_test_helper::test]
918 fn transaction_manager_returns_an_error_when_attempting_to_rollback_outside_of_a_transaction() {
919 use crate::PgConnection;
920 use crate::connection::{AnsiTransactionManager, TransactionManager};
921 use crate::result::Error;
922
923 let conn = &mut crate::test_helpers::pg_connection_no_transaction();
924 assert_eq!(
925 None,
926 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
927 conn
928 ).transaction_depth().expect("Transaction depth")
929 );
930 let result = AnsiTransactionManager::rollback_transaction(conn);
931 assert!(matches!(result, Err(Error::NotInTransaction)))
932 }
933
934 #[diesel_test_helper::test]
935 fn postgres_transaction_is_rolled_back_upon_syntax_error() {
936 use std::num::NonZeroU32;
937
938 use crate::connection::{AnsiTransactionManager, TransactionManager};
939 use crate::pg::connection::raw::PgTransactionStatus;
940 use crate::*;
941 let conn = &mut crate::test_helpers::pg_connection_no_transaction();
942 assert_eq!(
943 None,
944 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
945 conn
946 ).transaction_depth().expect("Transaction depth")
947 );
948 let _result = conn.build_transaction().run(|conn| {
949 assert_eq!(
950 NonZeroU32::new(1),
951 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
952 conn
953 ).transaction_depth().expect("Transaction depth")
954 );
955 let query_result = sql_query("SELECT_SYNTAX_ERROR 1").execute(conn);
957 assert!(query_result.is_err());
958 assert_eq!(
959 PgTransactionStatus::InError,
960 conn.connection_and_transaction_manager.raw_connection.transaction_status()
961 );
962 query_result
963 });
964 assert_eq!(
965 None,
966 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
967 conn
968 ).transaction_depth().expect("Transaction depth")
969 );
970 assert_eq!(
971 PgTransactionStatus::Idle,
972 conn.connection_and_transaction_manager
973 .raw_connection
974 .transaction_status()
975 );
976 }
977
978 #[diesel_test_helper::test]
979 fn nested_postgres_transaction_is_rolled_back_upon_syntax_error() {
980 use std::num::NonZeroU32;
981
982 use crate::connection::{AnsiTransactionManager, TransactionManager};
983 use crate::pg::connection::raw::PgTransactionStatus;
984 use crate::*;
985 let conn = &mut crate::test_helpers::pg_connection_no_transaction();
986 assert_eq!(
987 None,
988 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
989 conn
990 ).transaction_depth().expect("Transaction depth")
991 );
992 let result = conn.build_transaction().run(|conn| {
993 assert_eq!(
994 NonZeroU32::new(1),
995 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
996 conn
997 ).transaction_depth().expect("Transaction depth")
998 );
999 let result = conn.build_transaction().run(|conn| {
1000 assert_eq!(
1001 NonZeroU32::new(2),
1002 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1003 conn
1004 ).transaction_depth().expect("Transaction depth")
1005 );
1006 sql_query("SELECT_SYNTAX_ERROR 1").execute(conn)
1007 });
1008 assert!(result.is_err());
1009 assert_eq!(
1010 NonZeroU32::new(1),
1011 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1012 conn
1013 ).transaction_depth().expect("Transaction depth")
1014 );
1015 let query_result = sql_query("SELECT 1").execute(conn);
1016 assert!(query_result.is_ok());
1017 assert_eq!(
1018 PgTransactionStatus::InTransaction,
1019 conn.connection_and_transaction_manager.raw_connection.transaction_status()
1020 );
1021 query_result
1022 });
1023 assert!(result.is_ok());
1024 assert_eq!(
1025 PgTransactionStatus::Idle,
1026 conn.connection_and_transaction_manager
1027 .raw_connection
1028 .transaction_status()
1029 );
1030 assert_eq!(
1031 None,
1032 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1033 conn
1034 ).transaction_depth().expect("Transaction depth")
1035 );
1036 }
1037
1038 #[diesel_test_helper::test]
1039 #[allow(clippy::needless_collect)]
1042 fn postgres_transaction_depth_is_tracked_properly_on_serialization_failure() {
1043 use crate::pg::connection::raw::PgTransactionStatus;
1044 use crate::result::DatabaseErrorKind::SerializationFailure;
1045 use crate::result::Error::DatabaseError;
1046 use crate::*;
1047 use std::sync::{Arc, Barrier};
1048 use std::thread;
1049
1050 table! {
1051 #[sql_name = "pg_transaction_depth_is_tracked_properly_on_commit_failure"]
1052 serialization_example {
1053 id -> Serial,
1054 class -> Integer,
1055 }
1056 }
1057
1058 let conn = &mut crate::test_helpers::pg_connection_no_transaction();
1059
1060 sql_query(
1061 "DROP TABLE IF EXISTS pg_transaction_depth_is_tracked_properly_on_commit_failure;",
1062 )
1063 .execute(conn)
1064 .unwrap();
1065 sql_query(
1066 r#"
1067 CREATE TABLE pg_transaction_depth_is_tracked_properly_on_commit_failure (
1068 id SERIAL PRIMARY KEY,
1069 class INTEGER NOT NULL
1070 )
1071 "#,
1072 )
1073 .execute(conn)
1074 .unwrap();
1075
1076 insert_into(serialization_example::table)
1077 .values(&vec![
1078 serialization_example::class.eq(1),
1079 serialization_example::class.eq(2),
1080 ])
1081 .execute(conn)
1082 .unwrap();
1083
1084 let before_barrier = Arc::new(Barrier::new(2));
1085 let after_barrier = Arc::new(Barrier::new(2));
1086 let threads = (1..3)
1087 .map(|i| {
1088 let before_barrier = before_barrier.clone();
1089 let after_barrier = after_barrier.clone();
1090 thread::spawn(move || {
1091 use crate::connection::AnsiTransactionManager;
1092 use crate::connection::TransactionManager;
1093 let conn = &mut crate::test_helpers::pg_connection_no_transaction();
1094 assert_eq!(None, <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));
1095
1096 let result = conn.build_transaction().serializable().run(|conn| {
1097 assert_eq!(NonZeroU32::new(1), <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));
1098
1099 let _ = serialization_example::table
1100 .filter(serialization_example::class.eq(i))
1101 .count()
1102 .execute(conn)?;
1103
1104 let other_i = if i == 1 { 2 } else { 1 };
1105 let q = insert_into(serialization_example::table)
1106 .values(serialization_example::class.eq(other_i));
1107 before_barrier.wait();
1108
1109 let r = q.execute(conn);
1110 after_barrier.wait();
1111 r
1112 });
1113 assert_eq!(
1114 PgTransactionStatus::Idle,
1115 conn.connection_and_transaction_manager.raw_connection.transaction_status()
1116 );
1117
1118 assert_eq!(None, <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));
1119 result
1120 })
1121 })
1122 .collect::<Vec<_>>();
1123
1124 let mut results = threads
1125 .into_iter()
1126 .map(|t| t.join().unwrap())
1127 .collect::<Vec<_>>();
1128
1129 results.sort_by_key(|r| r.is_err());
1130
1131 assert!(results[0].is_ok(), "Got {:?} instead", results);
1132 assert!(
1133 matches!(&results[1], Err(DatabaseError(SerializationFailure, _))),
1134 "Got {:?} instead",
1135 results
1136 );
1137 assert_eq!(
1138 PgTransactionStatus::Idle,
1139 conn.connection_and_transaction_manager
1140 .raw_connection
1141 .transaction_status()
1142 );
1143 }
1144
1145 #[diesel_test_helper::test]
1146 #[allow(clippy::needless_collect)]
1149 fn postgres_transaction_depth_is_tracked_properly_on_nested_serialization_failure() {
1150 use crate::pg::connection::raw::PgTransactionStatus;
1151 use crate::result::DatabaseErrorKind::SerializationFailure;
1152 use crate::result::Error::DatabaseError;
1153 use crate::*;
1154 use std::sync::{Arc, Barrier};
1155 use std::thread;
1156
1157 table! {
1158 #[sql_name = "pg_nested_transaction_depth_is_tracked_properly_on_commit_failure"]
1159 serialization_example {
1160 id -> Serial,
1161 class -> Integer,
1162 }
1163 }
1164
1165 let conn = &mut crate::test_helpers::pg_connection_no_transaction();
1166
1167 sql_query(
1168 "DROP TABLE IF EXISTS pg_nested_transaction_depth_is_tracked_properly_on_commit_failure;",
1169 )
1170 .execute(conn)
1171 .unwrap();
1172 sql_query(
1173 r#"
1174 CREATE TABLE pg_nested_transaction_depth_is_tracked_properly_on_commit_failure (
1175 id SERIAL PRIMARY KEY,
1176 class INTEGER NOT NULL
1177 )
1178 "#,
1179 )
1180 .execute(conn)
1181 .unwrap();
1182
1183 insert_into(serialization_example::table)
1184 .values(&vec![
1185 serialization_example::class.eq(1),
1186 serialization_example::class.eq(2),
1187 ])
1188 .execute(conn)
1189 .unwrap();
1190
1191 let before_barrier = Arc::new(Barrier::new(2));
1192 let after_barrier = Arc::new(Barrier::new(2));
1193 let threads = (1..3)
1194 .map(|i| {
1195 let before_barrier = before_barrier.clone();
1196 let after_barrier = after_barrier.clone();
1197 thread::spawn(move || {
1198 use crate::connection::AnsiTransactionManager;
1199 use crate::connection::TransactionManager;
1200 let conn = &mut crate::test_helpers::pg_connection_no_transaction();
1201 assert_eq!(None, <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));
1202
1203 let result = conn.build_transaction().serializable().run(|conn| {
1204 assert_eq!(NonZeroU32::new(1), <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));
1205 let r = conn.transaction(|conn| {
1206 assert_eq!(NonZeroU32::new(2), <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));
1207
1208 let _ = serialization_example::table
1209 .filter(serialization_example::class.eq(i))
1210 .count()
1211 .execute(conn)?;
1212
1213 let other_i = if i == 1 { 2 } else { 1 };
1214 let q = insert_into(serialization_example::table)
1215 .values(serialization_example::class.eq(other_i));
1216 before_barrier.wait();
1217
1218 let r = q.execute(conn);
1219 after_barrier.wait();
1220 r
1221 });
1222 assert_eq!(NonZeroU32::new(1), <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));
1223 assert_eq!(
1224 PgTransactionStatus::InTransaction,
1225 conn.connection_and_transaction_manager.raw_connection.transaction_status()
1226 );
1227 r
1228 });
1229 assert_eq!(
1230 PgTransactionStatus::Idle,
1231 conn.connection_and_transaction_manager.raw_connection.transaction_status()
1232 );
1233
1234 assert_eq!(None, <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));
1235 result
1236 })
1237 })
1238 .collect::<Vec<_>>();
1239
1240 let mut results = threads
1241 .into_iter()
1242 .map(|t| t.join().unwrap())
1243 .collect::<Vec<_>>();
1244
1245 results.sort_by_key(|r| r.is_err());
1246
1247 assert!(results[0].is_ok(), "Got {:?} instead", results);
1248 assert!(
1249 matches!(&results[1], Err(DatabaseError(SerializationFailure, _))),
1250 "Got {:?} instead",
1251 results
1252 );
1253 assert_eq!(
1254 PgTransactionStatus::Idle,
1255 conn.connection_and_transaction_manager
1256 .raw_connection
1257 .transaction_status()
1258 );
1259 }
1260
1261 #[diesel_test_helper::test]
1262 fn postgres_transaction_is_rolled_back_upon_deferred_constraint_failure() {
1263 use crate::connection::{AnsiTransactionManager, TransactionManager};
1264 use crate::pg::connection::raw::PgTransactionStatus;
1265 use crate::result::Error;
1266 use crate::*;
1267
1268 let conn = &mut crate::test_helpers::pg_connection_no_transaction();
1269 assert_eq!(
1270 None,
1271 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1272 conn
1273 ).transaction_depth().expect("Transaction depth")
1274 );
1275 let result: Result<_, Error> = conn.build_transaction().run(|conn| {
1276 assert_eq!(
1277 NonZeroU32::new(1),
1278 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1279 conn
1280 ).transaction_depth().expect("Transaction depth")
1281 );
1282 sql_query("DROP TABLE IF EXISTS deferred_constraint_commit").execute(conn)?;
1283 sql_query("CREATE TABLE deferred_constraint_commit(id INT UNIQUE INITIALLY DEFERRED)")
1284 .execute(conn)?;
1285 sql_query("INSERT INTO deferred_constraint_commit VALUES(1)").execute(conn)?;
1286 let result =
1287 sql_query("INSERT INTO deferred_constraint_commit VALUES(1)").execute(conn);
1288 assert!(result.is_ok());
1289 assert_eq!(
1290 PgTransactionStatus::InTransaction,
1291 conn.connection_and_transaction_manager.raw_connection.transaction_status()
1292 );
1293 Ok(())
1294 });
1295 assert_eq!(
1296 None,
1297 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1298 conn
1299 ).transaction_depth().expect("Transaction depth")
1300 );
1301 assert_eq!(
1302 PgTransactionStatus::Idle,
1303 conn.connection_and_transaction_manager
1304 .raw_connection
1305 .transaction_status()
1306 );
1307 assert!(result.is_err());
1308 }
1309
1310 #[diesel_test_helper::test]
1311 fn postgres_transaction_is_rolled_back_upon_deferred_trigger_failure() {
1312 use crate::connection::{AnsiTransactionManager, TransactionManager};
1313 use crate::pg::connection::raw::PgTransactionStatus;
1314 use crate::result::Error;
1315 use crate::*;
1316
1317 let conn = &mut crate::test_helpers::pg_connection_no_transaction();
1318 assert_eq!(
1319 None,
1320 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1321 conn
1322 ).transaction_depth().expect("Transaction depth")
1323 );
1324 let result: Result<_, Error> = conn.build_transaction().run(|conn| {
1325 assert_eq!(
1326 NonZeroU32::new(1),
1327 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1328 conn
1329 ).transaction_depth().expect("Transaction depth")
1330 );
1331 sql_query("DROP TABLE IF EXISTS deferred_trigger_commit").execute(conn)?;
1332 sql_query("CREATE TABLE deferred_trigger_commit(id INT UNIQUE INITIALLY DEFERRED)")
1333 .execute(conn)?;
1334 sql_query(
1335 r#"
1336 CREATE OR REPLACE FUNCTION transaction_depth_blow_up()
1337 RETURNS trigger
1338 LANGUAGE plpgsql
1339 AS $$
1340 DECLARE
1341 BEGIN
1342 IF NEW.value = 42 THEN
1343 RAISE EXCEPTION 'Transaction kaboom';
1344 END IF;
1345 RETURN NEW;
1346
1347 END;$$;
1348 "#,
1349 )
1350 .execute(conn)?;
1351
1352 sql_query(
1353 r#"
1354 CREATE CONSTRAINT TRIGGER transaction_depth_trigger
1355 AFTER INSERT ON "deferred_trigger_commit"
1356 DEFERRABLE INITIALLY DEFERRED
1357 FOR EACH ROW
1358 EXECUTE PROCEDURE transaction_depth_blow_up()
1359 "#,
1360 )
1361 .execute(conn)?;
1362 let result = sql_query("INSERT INTO deferred_trigger_commit VALUES(42)").execute(conn);
1363 assert!(result.is_ok());
1364 assert_eq!(
1365 PgTransactionStatus::InTransaction,
1366 conn.connection_and_transaction_manager.raw_connection.transaction_status()
1367 );
1368 Ok(())
1369 });
1370 assert_eq!(
1371 None,
1372 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1373 conn
1374 ).transaction_depth().expect("Transaction depth")
1375 );
1376 assert_eq!(
1377 PgTransactionStatus::Idle,
1378 conn.connection_and_transaction_manager
1379 .raw_connection
1380 .transaction_status()
1381 );
1382 assert!(result.is_err());
1383 }
1384
1385 #[diesel_test_helper::test]
1386 fn nested_postgres_transaction_is_rolled_back_upon_deferred_trigger_failure() {
1387 use crate::connection::{AnsiTransactionManager, TransactionManager};
1388 use crate::pg::connection::raw::PgTransactionStatus;
1389 use crate::result::Error;
1390 use crate::*;
1391
1392 let conn = &mut crate::test_helpers::pg_connection_no_transaction();
1393 assert_eq!(
1394 None,
1395 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1396 conn
1397 ).transaction_depth().expect("Transaction depth")
1398 );
1399 let result: Result<_, Error> = conn.build_transaction().run(|conn| {
1400 assert_eq!(
1401 NonZeroU32::new(1),
1402 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1403 conn
1404 ).transaction_depth().expect("Transaction depth")
1405 );
1406 sql_query("DROP TABLE IF EXISTS deferred_trigger_nested_commit").execute(conn)?;
1407 sql_query(
1408 "CREATE TABLE deferred_trigger_nested_commit(id INT UNIQUE INITIALLY DEFERRED)",
1409 )
1410 .execute(conn)?;
1411 sql_query(
1412 r#"
1413 CREATE OR REPLACE FUNCTION transaction_depth_blow_up()
1414 RETURNS trigger
1415 LANGUAGE plpgsql
1416 AS $$
1417 DECLARE
1418 BEGIN
1419 IF NEW.value = 42 THEN
1420 RAISE EXCEPTION 'Transaction kaboom';
1421 END IF;
1422 RETURN NEW;
1423
1424 END;$$;
1425 "#,
1426 )
1427 .execute(conn)?;
1428
1429 sql_query(
1430 r#"
1431 CREATE CONSTRAINT TRIGGER transaction_depth_trigger
1432 AFTER INSERT ON "deferred_trigger_nested_commit"
1433 DEFERRABLE INITIALLY DEFERRED
1434 FOR EACH ROW
1435 EXECUTE PROCEDURE transaction_depth_blow_up()
1436 "#,
1437 )
1438 .execute(conn)?;
1439 let inner_result: Result<_, Error> = conn.build_transaction().run(|conn| {
1440 let result = sql_query("INSERT INTO deferred_trigger_nested_commit VALUES(42)")
1441 .execute(conn);
1442 assert!(result.is_ok());
1443 Ok(())
1444 });
1445 assert!(inner_result.is_err());
1446 assert_eq!(
1447 PgTransactionStatus::InTransaction,
1448 conn.connection_and_transaction_manager.raw_connection.transaction_status()
1449 );
1450 Ok(())
1451 });
1452 assert_eq!(
1453 None,
1454 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1455 conn
1456 ).transaction_depth().expect("Transaction depth")
1457 );
1458 assert_eq!(
1459 PgTransactionStatus::Idle,
1460 conn.connection_and_transaction_manager
1461 .raw_connection
1462 .transaction_status()
1463 );
1464 assert!(result.is_ok(), "Expected success, got {:?}", result);
1465 }
1466
1467 #[diesel_test_helper::test]
1468 fn nested_postgres_transaction_is_rolled_back_upon_deferred_constraint_failure() {
1469 use crate::connection::{AnsiTransactionManager, TransactionManager};
1470 use crate::pg::connection::raw::PgTransactionStatus;
1471 use crate::result::Error;
1472 use crate::*;
1473
1474 let conn = &mut crate::test_helpers::pg_connection_no_transaction();
1475 assert_eq!(
1476 None,
1477 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1478 conn
1479 ).transaction_depth().expect("Transaction depth")
1480 );
1481 let result: Result<_, Error> = conn.build_transaction().run(|conn| {
1482 assert_eq!(
1483 NonZeroU32::new(1),
1484 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1485 conn
1486 ).transaction_depth().expect("Transaction depth")
1487 );
1488 sql_query("DROP TABLE IF EXISTS deferred_constraint_nested_commit").execute(conn)?;
1489 sql_query("CREATE TABLE deferred_constraint_nested_commit(id INT UNIQUE INITIALLY DEFERRED)").execute(conn)?;
1490 let inner_result: Result<_, Error> = conn.build_transaction().run(|conn| {
1491 assert_eq!(
1492 NonZeroU32::new(2),
1493 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1494 conn
1495 ).transaction_depth().expect("Transaction depth")
1496 );
1497 sql_query("INSERT INTO deferred_constraint_nested_commit VALUES(1)").execute(conn)?;
1498 let result = sql_query("INSERT INTO deferred_constraint_nested_commit VALUES(1)").execute(conn);
1499 assert!(result.is_ok());
1500 Ok(())
1501 });
1502 assert!(inner_result.is_err());
1503 assert_eq!(
1504 PgTransactionStatus::InTransaction,
1505 conn.connection_and_transaction_manager.raw_connection.transaction_status()
1506 );
1507 assert_eq!(
1508 NonZeroU32::new(1),
1509 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1510 conn
1511 ).transaction_depth().expect("Transaction depth")
1512 );
1513 sql_query("INSERT INTO deferred_constraint_nested_commit VALUES(1)").execute(conn)
1514 });
1515 assert_eq!(
1516 None,
1517 <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1518 conn
1519 ).transaction_depth().expect("Transaction depth")
1520 );
1521 assert_eq!(
1522 PgTransactionStatus::Idle,
1523 conn.connection_and_transaction_manager
1524 .raw_connection
1525 .transaction_status()
1526 );
1527 assert!(result.is_ok());
1528 }
1529}