diesel/pg/connection/
mod.rs

1pub(super) mod copy;
2pub(crate) mod cursor;
3mod raw;
4mod result;
5mod row;
6mod stmt;
7
8use self::copy::{CopyFromSink, CopyToBuffer};
9use self::cursor::*;
10use self::private::ConnectionAndTransactionManager;
11use self::raw::{PgTransactionStatus, RawConnection};
12use self::stmt::Statement;
13use crate::connection::instrumentation::{
14    DebugQuery, DynInstrumentation, Instrumentation, StrQueryHelper,
15};
16use crate::connection::statement_cache::{MaybeCached, StatementCache};
17use crate::connection::*;
18use crate::expression::QueryMetadata;
19use crate::pg::backend::PgNotification;
20use crate::pg::metadata_lookup::{GetPgMetadataCache, PgMetadataCache};
21use crate::pg::query_builder::copy::InternalCopyFromQuery;
22use crate::pg::{Pg, TransactionBuilder};
23use crate::query_builder::bind_collector::RawBytesBindCollector;
24use crate::query_builder::*;
25use crate::result::ConnectionError::CouldntSetupConfiguration;
26use crate::result::*;
27use crate::RunQueryDsl;
28use std::ffi::CString;
29use std::fmt::Debug;
30use std::os::raw as libc;
31
32use super::query_builder::copy::{CopyFromExpression, CopyTarget, CopyToCommand};
33
34pub(super) use self::result::PgResult;
35
36/// The connection string expected by `PgConnection::establish`
37/// should be a PostgreSQL connection string, as documented at
38/// <https://www.postgresql.org/docs/9.4/static/libpq-connect.html#LIBPQ-CONNSTRING>
39///
40/// # Supported loading model implementations
41///
42/// * [`DefaultLoadingMode`]
43/// * [`PgRowByRowLoadingMode`]
44///
45/// If you are unsure which loading mode is the correct one for your application,
46/// you likely want to use the `DefaultLoadingMode` as that one offers
47/// generally better performance.
48///
49/// Due to the fact that `PgConnection` supports multiple loading modes
50/// it is **required** to always specify the used loading mode
51/// when calling [`RunQueryDsl::load_iter`]
52///
53/// ## `DefaultLoadingMode`
54///
55/// By using this mode `PgConnection` defaults to loading all response values at **once**
56/// and only performs deserialization afterward for the `DefaultLoadingMode`.
57/// Generally this mode will be more performant as it.
58///
59/// This loading mode allows users to perform hold more than one iterator at once using
60/// the same connection:
61/// ```rust
62/// # include!("../../doctest_setup.rs");
63/// #
64/// # fn main() {
65/// #     run_test().unwrap();
66/// # }
67/// #
68/// # fn run_test() -> QueryResult<()> {
69/// #     use schema::users;
70/// #     let connection = &mut establish_connection();
71/// use diesel::connection::DefaultLoadingMode;
72///
73/// let iter1 = users::table.load_iter::<(i32, String), DefaultLoadingMode>(connection)?;
74/// let iter2 = users::table.load_iter::<(i32, String), DefaultLoadingMode>(connection)?;
75///
76/// for r in iter1 {
77///     let (id, name) = r?;
78///     println!("Id: {} Name: {}", id, name);
79/// }
80///
81/// for r in iter2 {
82///     let (id, name) = r?;
83///     println!("Id: {} Name: {}", id, name);
84/// }
85/// #   Ok(())
86/// # }
87/// ```
88///
89/// ## `PgRowByRowLoadingMode`
90///
91/// By using this mode `PgConnection` defaults to loading each row of the result set
92/// separately. This might be desired for huge result sets.
93///
94/// This loading mode **prevents** creating more than one iterator at once using
95/// the same connection. The following code is **not** allowed:
96///
97/// ```compile_fail
98/// # include!("../../doctest_setup.rs");
99/// #
100/// # fn main() {
101/// #     run_test().unwrap();
102/// # }
103/// #
104/// # fn run_test() -> QueryResult<()> {
105/// #     use schema::users;
106/// #     let connection = &mut establish_connection();
107/// use diesel::pg::PgRowByRowLoadingMode;
108///
109/// let iter1 = users::table.load_iter::<(i32, String), PgRowByRowLoadingMode>(connection)?;
110/// // creating a second iterator generates an compiler error
111/// let iter2 = users::table.load_iter::<(i32, String), PgRowByRowLoadingMode>(connection)?;
112///
113/// for r in iter1 {
114///     let (id, name) = r?;
115///     println!("Id: {} Name: {}", id, name);
116/// }
117///
118/// for r in iter2 {
119///     let (id, name) = r?;
120///     println!("Id: {} Name: {}", id, name);
121/// }
122/// #   Ok(())
123/// # }
124/// ```
125#[allow(missing_debug_implementations)]
126#[cfg(feature = "postgres")]
127pub struct PgConnection {
128    statement_cache: StatementCache<Pg, Statement>,
129    metadata_cache: PgMetadataCache,
130    connection_and_transaction_manager: ConnectionAndTransactionManager,
131}
132
133// according to libpq documentation a connection can be transferred to other threads
134#[allow(unsafe_code)]
135unsafe impl Send for PgConnection {}
136
137impl SimpleConnection for PgConnection {
138    #[allow(unsafe_code)] // use of unsafe function
139    fn batch_execute(&mut self, query: &str) -> QueryResult<()> {
140        self.connection_and_transaction_manager
141            .instrumentation
142            .on_connection_event(InstrumentationEvent::StartQuery {
143                query: &StrQueryHelper::new(query),
144            });
145        let c_query = CString::new(query)?;
146        let inner_result = unsafe {
147            self.connection_and_transaction_manager
148                .raw_connection
149                .exec(c_query.as_ptr())
150        };
151        update_transaction_manager_status(
152            inner_result.and_then(|raw_result| {
153                PgResult::new(
154                    raw_result,
155                    &self.connection_and_transaction_manager.raw_connection,
156                )
157            }),
158            &mut self.connection_and_transaction_manager,
159            &StrQueryHelper::new(query),
160            true,
161        )?;
162        Ok(())
163    }
164}
165
166/// A [`PgConnection`] specific loading mode to load rows one by one
167///
168/// See the documentation of [`PgConnection`] for details
169#[derive(Debug, Copy, Clone)]
170pub struct PgRowByRowLoadingMode;
171
172impl ConnectionSealed for PgConnection {}
173
174impl Connection for PgConnection {
175    type Backend = Pg;
176    type TransactionManager = AnsiTransactionManager;
177
178    fn establish(database_url: &str) -> ConnectionResult<PgConnection> {
179        let mut instrumentation = DynInstrumentation::default_instrumentation();
180        instrumentation.on_connection_event(InstrumentationEvent::StartEstablishConnection {
181            url: database_url,
182        });
183        let r = RawConnection::establish(database_url).and_then(|raw_conn| {
184            let mut conn = PgConnection {
185                connection_and_transaction_manager: ConnectionAndTransactionManager {
186                    raw_connection: raw_conn,
187                    transaction_state: AnsiTransactionManager::default(),
188                    instrumentation: DynInstrumentation::none(),
189                },
190                statement_cache: StatementCache::new(),
191                metadata_cache: PgMetadataCache::new(),
192            };
193            conn.set_config_options()
194                .map_err(CouldntSetupConfiguration)?;
195            Ok(conn)
196        });
197        instrumentation.on_connection_event(InstrumentationEvent::FinishEstablishConnection {
198            url: database_url,
199            error: r.as_ref().err(),
200        });
201        let mut conn = r?;
202        conn.connection_and_transaction_manager.instrumentation = instrumentation;
203        Ok(conn)
204    }
205
206    fn execute_returning_count<T>(&mut self, source: &T) -> QueryResult<usize>
207    where
208        T: QueryFragment<Pg> + QueryId,
209    {
210        update_transaction_manager_status(
211            self.with_prepared_query(source, true, |query, params, conn, _source| {
212                let res = query
213                    .execute(&mut conn.raw_connection, &params, false)
214                    .map(|r| r.rows_affected());
215                // according to https://www.postgresql.org/docs/current/libpq-async.html
216                // `PQgetResult` needs to be called till a null pointer is returned
217                while conn.raw_connection.get_next_result()?.is_some() {}
218                res
219            }),
220            &mut self.connection_and_transaction_manager,
221            &crate::debug_query(source),
222            true,
223        )
224    }
225
226    fn transaction_state(&mut self) -> &mut AnsiTransactionManager
227    where
228        Self: Sized,
229    {
230        &mut self.connection_and_transaction_manager.transaction_state
231    }
232
233    fn instrumentation(&mut self) -> &mut dyn Instrumentation {
234        &mut *self.connection_and_transaction_manager.instrumentation
235    }
236
237    fn set_instrumentation(&mut self, instrumentation: impl Instrumentation) {
238        self.connection_and_transaction_manager.instrumentation = instrumentation.into();
239    }
240
241    fn set_prepared_statement_cache_size(&mut self, size: CacheSize) {
242        self.statement_cache.set_cache_size(size);
243    }
244}
245
246impl<B> LoadConnection<B> for PgConnection
247where
248    Self: self::private::PgLoadingMode<B>,
249{
250    type Cursor<'conn, 'query> = <Self as self::private::PgLoadingMode<B>>::Cursor<'conn, 'query>;
251    type Row<'conn, 'query> = <Self as self::private::PgLoadingMode<B>>::Row<'conn, 'query>;
252
253    fn load<'conn, 'query, T>(
254        &'conn mut self,
255        source: T,
256    ) -> QueryResult<Self::Cursor<'conn, 'query>>
257    where
258        T: Query + QueryFragment<Self::Backend> + QueryId + 'query,
259        Self::Backend: QueryMetadata<T::SqlType>,
260    {
261        self.with_prepared_query(source, false, |stmt, params, conn, source| {
262            use self::private::PgLoadingMode;
263            let result = stmt.execute(&mut conn.raw_connection, &params, Self::USE_ROW_BY_ROW_MODE);
264            let result = update_transaction_manager_status(
265                result,
266                conn,
267                &crate::debug_query(&source),
268                false,
269            )?;
270            Self::get_cursor(conn, result, source)
271        })
272    }
273}
274
275impl GetPgMetadataCache for PgConnection {
276    fn get_metadata_cache(&mut self) -> &mut PgMetadataCache {
277        &mut self.metadata_cache
278    }
279}
280
281#[inline(always)]
282fn update_transaction_manager_status<T>(
283    query_result: QueryResult<T>,
284    conn: &mut ConnectionAndTransactionManager,
285    source: &dyn DebugQuery,
286    final_call: bool,
287) -> QueryResult<T> {
288    /// avoid monomorphizing for every result type - this part will not be inlined
289    fn non_generic_inner(conn: &mut ConnectionAndTransactionManager, is_err: bool) {
290        let raw_conn: &mut RawConnection = &mut conn.raw_connection;
291        let tm: &mut AnsiTransactionManager = &mut conn.transaction_state;
292        // libpq keeps track of the transaction status internally, and that is accessible
293        // via `transaction_status`. We can use that to update the AnsiTransactionManager
294        // status
295        match raw_conn.transaction_status() {
296            PgTransactionStatus::InError => {
297                tm.status.set_requires_rollback_maybe_up_to_top_level(true)
298            }
299            PgTransactionStatus::Unknown => tm.status.set_in_error(),
300            PgTransactionStatus::Idle => {
301                // This is useful in particular for commit attempts (even
302                // if `COMMIT` errors it still exits transaction)
303
304                // This may repair the transaction manager
305                tm.status = TransactionManagerStatus::Valid(Default::default())
306            }
307            PgTransactionStatus::InTransaction => {
308                let transaction_status = &mut tm.status;
309                // If we weren't an error, it is possible that we were a transaction start
310                // -> we should tolerate any state
311                if is_err {
312                    // An error may not have us enter a transaction, so if we weren't in one
313                    // we may not be in one now
314                    if !matches!(transaction_status, TransactionManagerStatus::Valid(valid_tm) if valid_tm.transaction_depth().is_some())
315                    {
316                        // -> transaction manager is broken
317                        transaction_status.set_in_error()
318                    }
319                } else {
320                    // If transaction was InError before, but now it's not (because we attempted
321                    // a rollback), we may pretend it's fixed because
322                    // if it isn't Postgres *will* tell us again.
323
324                    // Fun fact: if we have not received an `InTransaction` status however,
325                    // postgres will *not* warn us that transaction is broken when attempting to
326                    // commit, so we may think that commit has succeeded but in fact it hasn't.
327                    tm.status.set_requires_rollback_maybe_up_to_top_level(false)
328                }
329            }
330            PgTransactionStatus::Active => {
331                // This is a transient state for libpq - nothing we can deduce here.
332            }
333        }
334    }
335
336    fn non_generic_instrumentation(
337        query_result: Result<(), &Error>,
338        conn: &mut ConnectionAndTransactionManager,
339        source: &dyn DebugQuery,
340        final_call: bool,
341    ) {
342        if let Err(e) = query_result {
343            conn.instrumentation
344                .on_connection_event(InstrumentationEvent::FinishQuery {
345                    query: source,
346                    error: Some(e),
347                });
348        } else if final_call {
349            conn.instrumentation
350                .on_connection_event(InstrumentationEvent::FinishQuery {
351                    query: source,
352                    error: None,
353                });
354        }
355    }
356
357    non_generic_inner(conn, query_result.is_err());
358    non_generic_instrumentation(query_result.as_ref().map(|_| ()), conn, source, final_call);
359    query_result
360}
361
362#[cfg(feature = "r2d2")]
363impl crate::r2d2::R2D2Connection for PgConnection {
364    fn ping(&mut self) -> QueryResult<()> {
365        crate::r2d2::CheckConnectionQuery.execute(self).map(|_| ())
366    }
367
368    fn is_broken(&mut self) -> bool {
369        AnsiTransactionManager::is_broken_transaction_manager(self)
370    }
371}
372
373impl MultiConnectionHelper for PgConnection {
374    fn to_any<'a>(
375        lookup: &mut <Self::Backend as crate::sql_types::TypeMetadata>::MetadataLookup,
376    ) -> &mut (dyn std::any::Any + 'a) {
377        lookup.as_any()
378    }
379
380    fn from_any(
381        lookup: &mut dyn std::any::Any,
382    ) -> Option<&mut <Self::Backend as crate::sql_types::TypeMetadata>::MetadataLookup> {
383        lookup
384            .downcast_mut::<Self>()
385            .map(|conn| conn as &mut dyn super::PgMetadataLookup)
386    }
387}
388
389impl PgConnection {
390    /// Build a transaction, specifying additional details such as isolation level
391    ///
392    /// See [`TransactionBuilder`] for more examples.
393    ///
394    /// [`TransactionBuilder`]: crate::pg::TransactionBuilder
395    ///
396    /// ```rust
397    /// # include!("../../doctest_setup.rs");
398    /// #
399    /// # fn main() {
400    /// #     run_test().unwrap();
401    /// # }
402    /// #
403    /// # fn run_test() -> QueryResult<()> {
404    /// #     use schema::users::dsl::*;
405    /// #     let conn = &mut connection_no_transaction();
406    /// conn.build_transaction()
407    ///     .read_only()
408    ///     .serializable()
409    ///     .deferrable()
410    ///     .run(|conn| Ok(()))
411    /// # }
412    /// ```
413    pub fn build_transaction(&mut self) -> TransactionBuilder<'_, Self> {
414        TransactionBuilder::new(self)
415    }
416
417    pub(crate) fn copy_from<S, T>(&mut self, target: S) -> Result<usize, S::Error>
418    where
419        S: CopyFromExpression<T>,
420    {
421        let query = InternalCopyFromQuery::new(target);
422        let res = self.with_prepared_query(query, false, |stmt, binds, conn, mut source| {
423            fn inner_copy_in<S, T>(
424                stmt: MaybeCached<'_, Statement>,
425                conn: &mut ConnectionAndTransactionManager,
426                binds: Vec<Option<Vec<u8>>>,
427                source: &mut InternalCopyFromQuery<S, T>,
428            ) -> Result<usize, S::Error>
429            where
430                S: CopyFromExpression<T>,
431            {
432                let _res = stmt.execute(&mut conn.raw_connection, &binds, false)?;
433                let mut copy_in = CopyFromSink::new(&mut conn.raw_connection);
434                let r = source.target.callback(&mut copy_in);
435                copy_in.finish(r.as_ref().err().map(|e| e.to_string()))?;
436                let next_res = conn.raw_connection.get_next_result()?.ok_or_else(|| {
437                    crate::result::Error::DeserializationError(
438                        "Failed to receive result from the database".into(),
439                    )
440                })?;
441                let rows = next_res.rows_affected();
442                while let Some(_r) = conn.raw_connection.get_next_result()? {}
443                r?;
444                Ok(rows)
445            }
446
447            let rows = inner_copy_in(stmt, conn, binds, &mut source);
448            if let Err(ref e) = rows {
449                let database_error = crate::result::Error::DatabaseError(
450                    crate::result::DatabaseErrorKind::Unknown,
451                    Box::new(e.to_string()),
452                );
453                conn.instrumentation
454                    .on_connection_event(InstrumentationEvent::FinishQuery {
455                        query: &crate::debug_query(&source),
456                        error: Some(&database_error),
457                    });
458            } else {
459                conn.instrumentation
460                    .on_connection_event(InstrumentationEvent::FinishQuery {
461                        query: &crate::debug_query(&source),
462                        error: None,
463                    });
464            }
465
466            rows
467        })?;
468
469        Ok(res)
470    }
471
472    pub(crate) fn copy_to<T>(&mut self, command: CopyToCommand<T>) -> QueryResult<CopyToBuffer<'_>>
473    where
474        T: CopyTarget,
475    {
476        let res = self.with_prepared_query::<_, _, Error>(
477            command,
478            false,
479            |stmt, binds, conn, source| {
480                let res = stmt.execute(&mut conn.raw_connection, &binds, false);
481                conn.instrumentation
482                    .on_connection_event(InstrumentationEvent::FinishQuery {
483                        query: &crate::debug_query(&source),
484                        error: res.as_ref().err(),
485                    });
486                Ok(CopyToBuffer::new(&mut conn.raw_connection, res?))
487            },
488        )?;
489        Ok(res)
490    }
491
492    fn with_prepared_query<'conn, T, R, E>(
493        &'conn mut self,
494        source: T,
495        execute_returning_count: bool,
496        f: impl FnOnce(
497            MaybeCached<'_, Statement>,
498            Vec<Option<Vec<u8>>>,
499            &'conn mut ConnectionAndTransactionManager,
500            T,
501        ) -> Result<R, E>,
502    ) -> Result<R, E>
503    where
504        T: QueryFragment<Pg> + QueryId,
505        E: From<crate::result::Error>,
506    {
507        self.connection_and_transaction_manager
508            .instrumentation
509            .on_connection_event(InstrumentationEvent::StartQuery {
510                query: &crate::debug_query(&source),
511            });
512        let mut bind_collector = RawBytesBindCollector::<Pg>::new();
513        source.collect_binds(&mut bind_collector, self, &Pg)?;
514        let binds = bind_collector.binds;
515        let metadata = bind_collector.metadata;
516
517        let cache = &mut self.statement_cache;
518        let conn = &mut self.connection_and_transaction_manager.raw_connection;
519        let query = cache.cached_statement(
520            &source,
521            &Pg,
522            &metadata,
523            conn,
524            Statement::prepare,
525            &mut *self.connection_and_transaction_manager.instrumentation,
526        );
527        if !execute_returning_count {
528            if let Err(ref e) = query {
529                self.connection_and_transaction_manager
530                    .instrumentation
531                    .on_connection_event(InstrumentationEvent::FinishQuery {
532                        query: &crate::debug_query(&source),
533                        error: Some(e),
534                    });
535            }
536        }
537
538        f(
539            query?,
540            binds,
541            &mut self.connection_and_transaction_manager,
542            source,
543        )
544    }
545
546    fn set_config_options(&mut self) -> QueryResult<()> {
547        crate::sql_query("SET TIME ZONE 'UTC'").execute(self)?;
548        crate::sql_query("SET CLIENT_ENCODING TO 'UTF8'").execute(self)?;
549        self.connection_and_transaction_manager
550            .raw_connection
551            .set_notice_processor(noop_notice_processor);
552        Ok(())
553    }
554
555    /// See Postgres documentation for SQL commands [NOTIFY][] and [LISTEN][]
556    ///
557    /// The returned iterator can yield items even after a None value when new notifications have been received.
558    /// The iterator can be polled again after a `None` value was received as new notifications might have
559    /// been send in the mean time.
560    ///
561    /// [NOTIFY]: https://www.postgresql.org/docs/current/sql-notify.html
562    /// [LISTEN]: https://www.postgresql.org/docs/current/sql-listen.html
563    ///
564    /// ## Example
565    ///
566    /// ```
567    /// # include!("../../doctest_setup.rs");
568    /// #
569    /// # fn main() {
570    /// #     run_test().unwrap();
571    /// # }
572    /// #
573    /// # fn run_test() -> QueryResult<()> {
574    /// #     let connection = &mut establish_connection();
575    ///
576    /// // register the notifications channel we want to receive notifications for
577    /// diesel::sql_query("LISTEN example_channel").execute(connection)?;
578    /// // send some notification
579    /// // this is usually done from a different connection/thread/application
580    /// diesel::sql_query("NOTIFY example_channel, 'additional data'").execute(connection)?;
581    ///
582    /// for result in connection.notifications_iter() {
583    ///     let notification = result.unwrap();
584    ///     assert_eq!(notification.channel, "example_channel");
585    ///     assert_eq!(notification.payload, "additional data");
586    ///
587    ///     println!(
588    ///         "Notification received from server process with id {}.",
589    ///         notification.process_id
590    ///     );
591    /// }
592    /// # Ok(())
593    /// # }
594    /// ```
595    pub fn notifications_iter(&mut self) -> impl Iterator<Item = QueryResult<PgNotification>> + '_ {
596        let conn = &self.connection_and_transaction_manager.raw_connection;
597        std::iter::from_fn(move || conn.pq_notifies().transpose())
598    }
599}
600
601extern "C" fn noop_notice_processor(_: *mut libc::c_void, _message: *const libc::c_char) {}
602
603mod private {
604    use super::*;
605
606    #[allow(missing_debug_implementations)]
607    pub struct ConnectionAndTransactionManager {
608        pub(super) raw_connection: RawConnection,
609        pub(super) transaction_state: AnsiTransactionManager,
610        pub(super) instrumentation: DynInstrumentation,
611    }
612
613    pub trait PgLoadingMode<B> {
614        const USE_ROW_BY_ROW_MODE: bool;
615        type Cursor<'conn, 'query>: Iterator<Item = QueryResult<Self::Row<'conn, 'query>>>;
616        type Row<'conn, 'query>: crate::row::Row<'conn, Pg>;
617
618        fn get_cursor<'conn, 'query>(
619            raw_connection: &'conn mut ConnectionAndTransactionManager,
620            result: PgResult,
621            source: impl QueryFragment<Pg> + 'query,
622        ) -> QueryResult<Self::Cursor<'conn, 'query>>;
623    }
624
625    impl PgLoadingMode<DefaultLoadingMode> for PgConnection {
626        const USE_ROW_BY_ROW_MODE: bool = false;
627        type Cursor<'conn, 'query> = Cursor;
628        type Row<'conn, 'query> = self::row::PgRow;
629
630        fn get_cursor<'conn, 'query>(
631            conn: &'conn mut ConnectionAndTransactionManager,
632            result: PgResult,
633            source: impl QueryFragment<Pg> + 'query,
634        ) -> QueryResult<Self::Cursor<'conn, 'query>> {
635            update_transaction_manager_status(
636                Cursor::new(result, &mut conn.raw_connection),
637                conn,
638                &crate::debug_query(&source),
639                true,
640            )
641        }
642    }
643
644    impl PgLoadingMode<PgRowByRowLoadingMode> for PgConnection {
645        const USE_ROW_BY_ROW_MODE: bool = true;
646        type Cursor<'conn, 'query> = RowByRowCursor<'conn, 'query>;
647        type Row<'conn, 'query> = self::row::PgRow;
648
649        fn get_cursor<'conn, 'query>(
650            raw_connection: &'conn mut ConnectionAndTransactionManager,
651            result: PgResult,
652            source: impl QueryFragment<Pg> + 'query,
653        ) -> QueryResult<Self::Cursor<'conn, 'query>> {
654            Ok(RowByRowCursor::new(
655                result,
656                raw_connection,
657                Box::new(source),
658            ))
659        }
660    }
661}
662
663#[cfg(test)]
664// that's a false positive for `panic!`/`assert!` on rust 2018
665#[allow(clippy::uninlined_format_args)]
666mod tests {
667    extern crate dotenvy;
668
669    use super::*;
670    use crate::prelude::*;
671    use crate::result::Error::DatabaseError;
672    use std::num::NonZeroU32;
673
674    fn connection() -> PgConnection {
675        crate::test_helpers::pg_connection_no_transaction()
676    }
677
678    #[diesel_test_helper::test]
679    fn notifications_arrive() {
680        use crate::sql_query;
681
682        let conn = &mut connection();
683        sql_query("LISTEN test_notifications")
684            .execute(conn)
685            .unwrap();
686        sql_query("NOTIFY test_notifications, 'first'")
687            .execute(conn)
688            .unwrap();
689        sql_query("NOTIFY test_notifications, 'second'")
690            .execute(conn)
691            .unwrap();
692
693        let notifications = conn
694            .notifications_iter()
695            .map(Result::unwrap)
696            .collect::<Vec<_>>();
697
698        assert_eq!(2, notifications.len());
699        assert_eq!(notifications[0].channel, "test_notifications");
700        assert_eq!(notifications[1].channel, "test_notifications");
701        assert_eq!(notifications[0].payload, "first");
702        assert_eq!(notifications[1].payload, "second");
703
704        let next_notification = conn.notifications_iter().next();
705        assert!(
706            next_notification.is_none(),
707            "Got a next notification, while not expecting one: {next_notification:?}"
708        );
709
710        sql_query("NOTIFY test_notifications")
711            .execute(conn)
712            .unwrap();
713        assert_eq!(
714            conn.notifications_iter().next().unwrap().unwrap().payload,
715            ""
716        );
717    }
718
719    #[diesel_test_helper::test]
720    fn malformed_sql_query() {
721        let connection = &mut connection();
722        let query =
723            crate::sql_query("SELECT not_existent FROM also_not_there;").execute(connection);
724
725        if let Err(DatabaseError(_, string)) = query {
726            assert_eq!(Some(26), string.statement_position());
727        } else {
728            unreachable!();
729        }
730    }
731
732    table! {
733        users {
734            id -> Integer,
735            name -> Text,
736        }
737    }
738
739    #[diesel_test_helper::test]
740    fn transaction_manager_returns_an_error_when_attempting_to_commit_outside_of_a_transaction() {
741        use crate::connection::{AnsiTransactionManager, TransactionManager};
742        use crate::result::Error;
743        use crate::PgConnection;
744
745        let conn = &mut crate::test_helpers::pg_connection_no_transaction();
746        assert_eq!(
747            None,
748            <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
749                conn
750            ).transaction_depth().expect("Transaction depth")
751        );
752        let result = AnsiTransactionManager::commit_transaction(conn);
753        assert!(matches!(result, Err(Error::NotInTransaction)))
754    }
755
756    #[diesel_test_helper::test]
757    fn transaction_manager_returns_an_error_when_attempting_to_rollback_outside_of_a_transaction() {
758        use crate::connection::{AnsiTransactionManager, TransactionManager};
759        use crate::result::Error;
760        use crate::PgConnection;
761
762        let conn = &mut crate::test_helpers::pg_connection_no_transaction();
763        assert_eq!(
764            None,
765            <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
766                conn
767            ).transaction_depth().expect("Transaction depth")
768        );
769        let result = AnsiTransactionManager::rollback_transaction(conn);
770        assert!(matches!(result, Err(Error::NotInTransaction)))
771    }
772
773    #[diesel_test_helper::test]
774    fn postgres_transaction_is_rolled_back_upon_syntax_error() {
775        use std::num::NonZeroU32;
776
777        use crate::connection::{AnsiTransactionManager, TransactionManager};
778        use crate::pg::connection::raw::PgTransactionStatus;
779        use crate::*;
780        let conn = &mut crate::test_helpers::pg_connection_no_transaction();
781        assert_eq!(
782            None,
783            <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
784                conn
785            ).transaction_depth().expect("Transaction depth")
786        );
787        let _result = conn.build_transaction().run(|conn| {
788            assert_eq!(
789                NonZeroU32::new(1),
790                <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
791                    conn
792                ).transaction_depth().expect("Transaction depth")
793            );
794            // In Postgres, a syntax error breaks the transaction block
795            let query_result = sql_query("SELECT_SYNTAX_ERROR 1").execute(conn);
796            assert!(query_result.is_err());
797            assert_eq!(
798                PgTransactionStatus::InError,
799                conn.connection_and_transaction_manager.raw_connection.transaction_status()
800            );
801            query_result
802        });
803        assert_eq!(
804            None,
805            <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
806                conn
807            ).transaction_depth().expect("Transaction depth")
808        );
809        assert_eq!(
810            PgTransactionStatus::Idle,
811            conn.connection_and_transaction_manager
812                .raw_connection
813                .transaction_status()
814        );
815    }
816
817    #[diesel_test_helper::test]
818    fn nested_postgres_transaction_is_rolled_back_upon_syntax_error() {
819        use std::num::NonZeroU32;
820
821        use crate::connection::{AnsiTransactionManager, TransactionManager};
822        use crate::pg::connection::raw::PgTransactionStatus;
823        use crate::*;
824        let conn = &mut crate::test_helpers::pg_connection_no_transaction();
825        assert_eq!(
826            None,
827            <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
828                conn
829            ).transaction_depth().expect("Transaction depth")
830        );
831        let result = conn.build_transaction().run(|conn| {
832            assert_eq!(
833                NonZeroU32::new(1),
834                <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
835                    conn
836            ).transaction_depth().expect("Transaction depth")
837            );
838            let result = conn.build_transaction().run(|conn| {
839                assert_eq!(
840                    NonZeroU32::new(2),
841                    <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
842                        conn
843            ).transaction_depth().expect("Transaction depth")
844                );
845                sql_query("SELECT_SYNTAX_ERROR 1").execute(conn)
846            });
847            assert!(result.is_err());
848            assert_eq!(
849                NonZeroU32::new(1),
850                <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
851                    conn
852            ).transaction_depth().expect("Transaction depth")
853            );
854            let query_result = sql_query("SELECT 1").execute(conn);
855            assert!(query_result.is_ok());
856            assert_eq!(
857                PgTransactionStatus::InTransaction,
858                conn.connection_and_transaction_manager.raw_connection.transaction_status()
859            );
860            query_result
861        });
862        assert!(result.is_ok());
863        assert_eq!(
864            PgTransactionStatus::Idle,
865            conn.connection_and_transaction_manager
866                .raw_connection
867                .transaction_status()
868        );
869        assert_eq!(
870            None,
871            <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
872                conn
873            ).transaction_depth().expect("Transaction depth")
874        );
875    }
876
877    #[diesel_test_helper::test]
878    // This function uses collect with an side effect (spawning threads)
879    // so this is a false positive from clippy
880    #[allow(clippy::needless_collect)]
881    fn postgres_transaction_depth_is_tracked_properly_on_serialization_failure() {
882        use crate::pg::connection::raw::PgTransactionStatus;
883        use crate::result::DatabaseErrorKind::SerializationFailure;
884        use crate::result::Error::DatabaseError;
885        use crate::*;
886        use std::sync::{Arc, Barrier};
887        use std::thread;
888
889        table! {
890            #[sql_name = "pg_transaction_depth_is_tracked_properly_on_commit_failure"]
891            serialization_example {
892                id -> Serial,
893                class -> Integer,
894            }
895        }
896
897        let conn = &mut crate::test_helpers::pg_connection_no_transaction();
898
899        sql_query(
900            "DROP TABLE IF EXISTS pg_transaction_depth_is_tracked_properly_on_commit_failure;",
901        )
902        .execute(conn)
903        .unwrap();
904        sql_query(
905            r#"
906            CREATE TABLE pg_transaction_depth_is_tracked_properly_on_commit_failure (
907                id SERIAL PRIMARY KEY,
908                class INTEGER NOT NULL
909            )
910        "#,
911        )
912        .execute(conn)
913        .unwrap();
914
915        insert_into(serialization_example::table)
916            .values(&vec![
917                serialization_example::class.eq(1),
918                serialization_example::class.eq(2),
919            ])
920            .execute(conn)
921            .unwrap();
922
923        let before_barrier = Arc::new(Barrier::new(2));
924        let after_barrier = Arc::new(Barrier::new(2));
925        let threads = (1..3)
926            .map(|i| {
927                let before_barrier = before_barrier.clone();
928                let after_barrier = after_barrier.clone();
929                thread::spawn(move || {
930                    use crate::connection::AnsiTransactionManager;
931                    use crate::connection::TransactionManager;
932                    let conn = &mut crate::test_helpers::pg_connection_no_transaction();
933                    assert_eq!(None, <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));
934
935                    let result = conn.build_transaction().serializable().run(|conn| {
936                        assert_eq!(NonZeroU32::new(1), <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));
937
938                        let _ = serialization_example::table
939                            .filter(serialization_example::class.eq(i))
940                            .count()
941                            .execute(conn)?;
942
943                        let other_i = if i == 1 { 2 } else { 1 };
944                        let q = insert_into(serialization_example::table)
945                            .values(serialization_example::class.eq(other_i));
946                        before_barrier.wait();
947
948                        let r = q.execute(conn);
949                        after_barrier.wait();
950                        r
951                    });
952                    assert_eq!(
953                        PgTransactionStatus::Idle,
954                        conn.connection_and_transaction_manager.raw_connection.transaction_status()
955                    );
956
957                    assert_eq!(None, <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));
958                    result
959                })
960            })
961            .collect::<Vec<_>>();
962
963        let mut results = threads
964            .into_iter()
965            .map(|t| t.join().unwrap())
966            .collect::<Vec<_>>();
967
968        results.sort_by_key(|r| r.is_err());
969
970        assert!(results[0].is_ok(), "Got {:?} instead", results);
971        assert!(
972            matches!(&results[1], Err(DatabaseError(SerializationFailure, _))),
973            "Got {:?} instead",
974            results
975        );
976        assert_eq!(
977            PgTransactionStatus::Idle,
978            conn.connection_and_transaction_manager
979                .raw_connection
980                .transaction_status()
981        );
982    }
983
984    #[diesel_test_helper::test]
985    // This function uses collect with an side effect (spawning threads)
986    // so this is a false positive from clippy
987    #[allow(clippy::needless_collect)]
988    fn postgres_transaction_depth_is_tracked_properly_on_nested_serialization_failure() {
989        use crate::pg::connection::raw::PgTransactionStatus;
990        use crate::result::DatabaseErrorKind::SerializationFailure;
991        use crate::result::Error::DatabaseError;
992        use crate::*;
993        use std::sync::{Arc, Barrier};
994        use std::thread;
995
996        table! {
997            #[sql_name = "pg_nested_transaction_depth_is_tracked_properly_on_commit_failure"]
998            serialization_example {
999                id -> Serial,
1000                class -> Integer,
1001            }
1002        }
1003
1004        let conn = &mut crate::test_helpers::pg_connection_no_transaction();
1005
1006        sql_query(
1007            "DROP TABLE IF EXISTS pg_nested_transaction_depth_is_tracked_properly_on_commit_failure;",
1008        )
1009        .execute(conn)
1010        .unwrap();
1011        sql_query(
1012            r#"
1013            CREATE TABLE pg_nested_transaction_depth_is_tracked_properly_on_commit_failure (
1014                id SERIAL PRIMARY KEY,
1015                class INTEGER NOT NULL
1016            )
1017        "#,
1018        )
1019        .execute(conn)
1020        .unwrap();
1021
1022        insert_into(serialization_example::table)
1023            .values(&vec![
1024                serialization_example::class.eq(1),
1025                serialization_example::class.eq(2),
1026            ])
1027            .execute(conn)
1028            .unwrap();
1029
1030        let before_barrier = Arc::new(Barrier::new(2));
1031        let after_barrier = Arc::new(Barrier::new(2));
1032        let threads = (1..3)
1033            .map(|i| {
1034                let before_barrier = before_barrier.clone();
1035                let after_barrier = after_barrier.clone();
1036                thread::spawn(move || {
1037                    use crate::connection::AnsiTransactionManager;
1038                    use crate::connection::TransactionManager;
1039                    let conn = &mut crate::test_helpers::pg_connection_no_transaction();
1040                    assert_eq!(None, <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));
1041
1042                    let result = conn.build_transaction().serializable().run(|conn| {
1043                        assert_eq!(NonZeroU32::new(1), <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));
1044                        let r = conn.transaction(|conn| {
1045                            assert_eq!(NonZeroU32::new(2), <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));
1046
1047                            let _ = serialization_example::table
1048                                .filter(serialization_example::class.eq(i))
1049                                .count()
1050                                .execute(conn)?;
1051
1052                            let other_i = if i == 1 { 2 } else { 1 };
1053                            let q = insert_into(serialization_example::table)
1054                                .values(serialization_example::class.eq(other_i));
1055                            before_barrier.wait();
1056
1057                            let r = q.execute(conn);
1058                            after_barrier.wait();
1059                            r
1060                        });
1061                        assert_eq!(NonZeroU32::new(1), <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));
1062                        assert_eq!(
1063                            PgTransactionStatus::InTransaction,
1064                            conn.connection_and_transaction_manager.raw_connection.transaction_status()
1065                        );
1066                        r
1067                    });
1068                    assert_eq!(
1069                        PgTransactionStatus::Idle,
1070                        conn.connection_and_transaction_manager.raw_connection.transaction_status()
1071                    );
1072
1073                    assert_eq!(None, <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));
1074                    result
1075                })
1076            })
1077            .collect::<Vec<_>>();
1078
1079        let mut results = threads
1080            .into_iter()
1081            .map(|t| t.join().unwrap())
1082            .collect::<Vec<_>>();
1083
1084        results.sort_by_key(|r| r.is_err());
1085
1086        assert!(results[0].is_ok(), "Got {:?} instead", results);
1087        assert!(
1088            matches!(&results[1], Err(DatabaseError(SerializationFailure, _))),
1089            "Got {:?} instead",
1090            results
1091        );
1092        assert_eq!(
1093            PgTransactionStatus::Idle,
1094            conn.connection_and_transaction_manager
1095                .raw_connection
1096                .transaction_status()
1097        );
1098    }
1099
1100    #[diesel_test_helper::test]
1101    fn postgres_transaction_is_rolled_back_upon_deferred_constraint_failure() {
1102        use crate::connection::{AnsiTransactionManager, TransactionManager};
1103        use crate::pg::connection::raw::PgTransactionStatus;
1104        use crate::result::Error;
1105        use crate::*;
1106
1107        let conn = &mut crate::test_helpers::pg_connection_no_transaction();
1108        assert_eq!(
1109            None,
1110            <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1111                conn
1112            ).transaction_depth().expect("Transaction depth")
1113        );
1114        let result: Result<_, Error> = conn.build_transaction().run(|conn| {
1115            assert_eq!(
1116                NonZeroU32::new(1),
1117                <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1118                    conn
1119            ).transaction_depth().expect("Transaction depth")
1120            );
1121            sql_query("DROP TABLE IF EXISTS deferred_constraint_commit").execute(conn)?;
1122            sql_query("CREATE TABLE deferred_constraint_commit(id INT UNIQUE INITIALLY DEFERRED)")
1123                .execute(conn)?;
1124            sql_query("INSERT INTO deferred_constraint_commit VALUES(1)").execute(conn)?;
1125            let result =
1126                sql_query("INSERT INTO deferred_constraint_commit VALUES(1)").execute(conn);
1127            assert!(result.is_ok());
1128            assert_eq!(
1129                PgTransactionStatus::InTransaction,
1130                conn.connection_and_transaction_manager.raw_connection.transaction_status()
1131            );
1132            Ok(())
1133        });
1134        assert_eq!(
1135            None,
1136            <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1137                conn
1138            ).transaction_depth().expect("Transaction depth")
1139        );
1140        assert_eq!(
1141            PgTransactionStatus::Idle,
1142            conn.connection_and_transaction_manager
1143                .raw_connection
1144                .transaction_status()
1145        );
1146        assert!(result.is_err());
1147    }
1148
1149    #[diesel_test_helper::test]
1150    fn postgres_transaction_is_rolled_back_upon_deferred_trigger_failure() {
1151        use crate::connection::{AnsiTransactionManager, TransactionManager};
1152        use crate::pg::connection::raw::PgTransactionStatus;
1153        use crate::result::Error;
1154        use crate::*;
1155
1156        let conn = &mut crate::test_helpers::pg_connection_no_transaction();
1157        assert_eq!(
1158            None,
1159            <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1160                conn
1161            ).transaction_depth().expect("Transaction depth")
1162        );
1163        let result: Result<_, Error> = conn.build_transaction().run(|conn| {
1164            assert_eq!(
1165                NonZeroU32::new(1),
1166                <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1167                    conn
1168            ).transaction_depth().expect("Transaction depth")
1169            );
1170            sql_query("DROP TABLE IF EXISTS deferred_trigger_commit").execute(conn)?;
1171            sql_query("CREATE TABLE deferred_trigger_commit(id INT UNIQUE INITIALLY DEFERRED)")
1172                .execute(conn)?;
1173            sql_query(
1174                r#"
1175                    CREATE OR REPLACE FUNCTION transaction_depth_blow_up()
1176                        RETURNS trigger
1177                        LANGUAGE plpgsql
1178                        AS $$
1179                    DECLARE
1180                    BEGIN
1181                        IF NEW.value = 42 THEN
1182                            RAISE EXCEPTION 'Transaction kaboom';
1183                        END IF;
1184                    RETURN NEW;
1185
1186                    END;$$;
1187                "#,
1188            )
1189            .execute(conn)?;
1190
1191            sql_query(
1192                r#"
1193                    CREATE CONSTRAINT TRIGGER transaction_depth_trigger
1194                        AFTER INSERT ON "deferred_trigger_commit"
1195                        DEFERRABLE INITIALLY DEFERRED
1196                        FOR EACH ROW
1197                        EXECUTE PROCEDURE transaction_depth_blow_up()
1198            "#,
1199            )
1200            .execute(conn)?;
1201            let result = sql_query("INSERT INTO deferred_trigger_commit VALUES(42)").execute(conn);
1202            assert!(result.is_ok());
1203            assert_eq!(
1204                PgTransactionStatus::InTransaction,
1205                conn.connection_and_transaction_manager.raw_connection.transaction_status()
1206            );
1207            Ok(())
1208        });
1209        assert_eq!(
1210            None,
1211            <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1212                conn
1213            ).transaction_depth().expect("Transaction depth")
1214        );
1215        assert_eq!(
1216            PgTransactionStatus::Idle,
1217            conn.connection_and_transaction_manager
1218                .raw_connection
1219                .transaction_status()
1220        );
1221        assert!(result.is_err());
1222    }
1223
1224    #[diesel_test_helper::test]
1225    fn nested_postgres_transaction_is_rolled_back_upon_deferred_trigger_failure() {
1226        use crate::connection::{AnsiTransactionManager, TransactionManager};
1227        use crate::pg::connection::raw::PgTransactionStatus;
1228        use crate::result::Error;
1229        use crate::*;
1230
1231        let conn = &mut crate::test_helpers::pg_connection_no_transaction();
1232        assert_eq!(
1233            None,
1234            <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1235                conn
1236            ).transaction_depth().expect("Transaction depth")
1237        );
1238        let result: Result<_, Error> = conn.build_transaction().run(|conn| {
1239            assert_eq!(
1240                NonZeroU32::new(1),
1241                <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1242                    conn
1243            ).transaction_depth().expect("Transaction depth")
1244            );
1245            sql_query("DROP TABLE IF EXISTS deferred_trigger_nested_commit").execute(conn)?;
1246            sql_query(
1247                "CREATE TABLE deferred_trigger_nested_commit(id INT UNIQUE INITIALLY DEFERRED)",
1248            )
1249            .execute(conn)?;
1250            sql_query(
1251                r#"
1252                    CREATE OR REPLACE FUNCTION transaction_depth_blow_up()
1253                        RETURNS trigger
1254                        LANGUAGE plpgsql
1255                        AS $$
1256                    DECLARE
1257                    BEGIN
1258                        IF NEW.value = 42 THEN
1259                            RAISE EXCEPTION 'Transaction kaboom';
1260                        END IF;
1261                    RETURN NEW;
1262
1263                    END;$$;
1264                "#,
1265            )
1266            .execute(conn)?;
1267
1268            sql_query(
1269                r#"
1270                    CREATE CONSTRAINT TRIGGER transaction_depth_trigger
1271                        AFTER INSERT ON "deferred_trigger_nested_commit"
1272                        DEFERRABLE INITIALLY DEFERRED
1273                        FOR EACH ROW
1274                        EXECUTE PROCEDURE transaction_depth_blow_up()
1275            "#,
1276            )
1277            .execute(conn)?;
1278            let inner_result: Result<_, Error> = conn.build_transaction().run(|conn| {
1279                let result = sql_query("INSERT INTO deferred_trigger_nested_commit VALUES(42)")
1280                    .execute(conn);
1281                assert!(result.is_ok());
1282                Ok(())
1283            });
1284            assert!(inner_result.is_err());
1285            assert_eq!(
1286                PgTransactionStatus::InTransaction,
1287                conn.connection_and_transaction_manager.raw_connection.transaction_status()
1288            );
1289            Ok(())
1290        });
1291        assert_eq!(
1292            None,
1293            <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1294                conn
1295            ).transaction_depth().expect("Transaction depth")
1296        );
1297        assert_eq!(
1298            PgTransactionStatus::Idle,
1299            conn.connection_and_transaction_manager
1300                .raw_connection
1301                .transaction_status()
1302        );
1303        assert!(result.is_ok(), "Expected success, got {:?}", result);
1304    }
1305
1306    #[diesel_test_helper::test]
1307    fn nested_postgres_transaction_is_rolled_back_upon_deferred_constraint_failure() {
1308        use crate::connection::{AnsiTransactionManager, TransactionManager};
1309        use crate::pg::connection::raw::PgTransactionStatus;
1310        use crate::result::Error;
1311        use crate::*;
1312
1313        let conn = &mut crate::test_helpers::pg_connection_no_transaction();
1314        assert_eq!(
1315            None,
1316            <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1317                conn
1318            ).transaction_depth().expect("Transaction depth")
1319        );
1320        let result: Result<_, Error> = conn.build_transaction().run(|conn| {
1321            assert_eq!(
1322                NonZeroU32::new(1),
1323                <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1324                    conn
1325            ).transaction_depth().expect("Transaction depth")
1326            );
1327            sql_query("DROP TABLE IF EXISTS deferred_constraint_nested_commit").execute(conn)?;
1328            sql_query("CREATE TABLE deferred_constraint_nested_commit(id INT UNIQUE INITIALLY DEFERRED)").execute(conn)?;
1329            let inner_result: Result<_, Error> = conn.build_transaction().run(|conn| {
1330                assert_eq!(
1331                    NonZeroU32::new(2),
1332                    <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1333                        conn
1334                    ).transaction_depth().expect("Transaction depth")
1335                );
1336                sql_query("INSERT INTO deferred_constraint_nested_commit VALUES(1)").execute(conn)?;
1337                let result = sql_query("INSERT INTO deferred_constraint_nested_commit VALUES(1)").execute(conn);
1338                assert!(result.is_ok());
1339                Ok(())
1340            });
1341            assert!(inner_result.is_err());
1342            assert_eq!(
1343                PgTransactionStatus::InTransaction,
1344                conn.connection_and_transaction_manager.raw_connection.transaction_status()
1345            );
1346            assert_eq!(
1347                NonZeroU32::new(1),
1348                <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1349                    conn
1350            ).transaction_depth().expect("Transaction depth")
1351            );
1352            sql_query("INSERT INTO deferred_constraint_nested_commit VALUES(1)").execute(conn)
1353        });
1354        assert_eq!(
1355            None,
1356            <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1357                conn
1358            ).transaction_depth().expect("Transaction depth")
1359        );
1360        assert_eq!(
1361            PgTransactionStatus::Idle,
1362            conn.connection_and_transaction_manager
1363                .raw_connection
1364                .transaction_status()
1365        );
1366        assert!(result.is_ok());
1367    }
1368}