diesel/pg/connection/
mod.rs

1pub(super) mod copy;
2pub(crate) mod cursor;
3mod raw;
4mod result;
5mod row;
6mod stmt;
7
8use self::copy::{CopyFromSink, CopyToBuffer};
9use self::cursor::*;
10use self::private::ConnectionAndTransactionManager;
11use self::raw::{PgTransactionStatus, RawConnection};
12use self::stmt::Statement;
13use crate::connection::instrumentation::{
14    DebugQuery, DynInstrumentation, Instrumentation, StrQueryHelper,
15};
16use crate::connection::statement_cache::{MaybeCached, StatementCache};
17use crate::connection::*;
18use crate::expression::QueryMetadata;
19use crate::pg::backend::PgNotification;
20use crate::pg::metadata_lookup::{GetPgMetadataCache, PgMetadataCache};
21use crate::pg::query_builder::copy::InternalCopyFromQuery;
22use crate::pg::{Pg, TransactionBuilder};
23use crate::query_builder::bind_collector::RawBytesBindCollector;
24use crate::query_builder::*;
25use crate::result::ConnectionError::CouldntSetupConfiguration;
26use crate::result::*;
27use crate::RunQueryDsl;
28use std::ffi::CString;
29use std::fmt::Debug;
30use std::os::raw as libc;
31
32use super::query_builder::copy::{CopyFromExpression, CopyTarget, CopyToCommand};
33
34pub(super) use self::result::PgResult;
35
36/// The connection string expected by `PgConnection::establish`
37/// should be a PostgreSQL connection string, as documented at
38/// <https://www.postgresql.org/docs/9.4/static/libpq-connect.html#LIBPQ-CONNSTRING>
39///
40/// # Supported loading model implementations
41///
42/// * [`DefaultLoadingMode`]
43/// * [`PgRowByRowLoadingMode`]
44///
45/// If you are unsure which loading mode is the correct one for your application,
46/// you likely want to use the `DefaultLoadingMode` as that one offers
47/// generally better performance.
48///
49/// Due to the fact that `PgConnection` supports multiple loading modes
50/// it is **required** to always specify the used loading mode
51/// when calling [`RunQueryDsl::load_iter`]
52///
53/// ## `DefaultLoadingMode`
54///
55/// By using this mode `PgConnection` defaults to loading all response values at **once**
56/// and only performs deserialization afterward for the `DefaultLoadingMode`.
57/// Generally this mode will be more performant as it.
58///
59/// This loading mode allows users to perform hold more than one iterator at once using
60/// the same connection:
61/// ```rust
62/// # include!("../../doctest_setup.rs");
63/// #
64/// # fn main() {
65/// #     run_test().unwrap();
66/// # }
67/// #
68/// # fn run_test() -> QueryResult<()> {
69/// #     use schema::users;
70/// #     let connection = &mut establish_connection();
71/// use diesel::connection::DefaultLoadingMode;
72///
73/// let iter1 = users::table.load_iter::<(i32, String), DefaultLoadingMode>(connection)?;
74/// let iter2 = users::table.load_iter::<(i32, String), DefaultLoadingMode>(connection)?;
75///
76/// for r in iter1 {
77///     let (id, name) = r?;
78///     println!("Id: {} Name: {}", id, name);
79/// }
80///
81/// for r in iter2 {
82///     let (id, name) = r?;
83///     println!("Id: {} Name: {}", id, name);
84/// }
85/// #   Ok(())
86/// # }
87/// ```
88///
89/// ## `PgRowByRowLoadingMode`
90///
91/// By using this mode `PgConnection` defaults to loading each row of the result set
92/// separately. This might be desired for huge result sets.
93///
94/// This loading mode **prevents** creating more than one iterator at once using
95/// the same connection. The following code is **not** allowed:
96///
97/// ```compile_fail
98/// # include!("../../doctest_setup.rs");
99/// #
100/// # fn main() {
101/// #     run_test().unwrap();
102/// # }
103/// #
104/// # fn run_test() -> QueryResult<()> {
105/// #     use schema::users;
106/// #     let connection = &mut establish_connection();
107/// use diesel::pg::PgRowByRowLoadingMode;
108///
109/// let iter1 = users::table.load_iter::<(i32, String), PgRowByRowLoadingMode>(connection)?;
110/// // creating a second iterator generates an compiler error
111/// let iter2 = users::table.load_iter::<(i32, String), PgRowByRowLoadingMode>(connection)?;
112///
113/// for r in iter1 {
114///     let (id, name) = r?;
115///     println!("Id: {} Name: {}", id, name);
116/// }
117///
118/// for r in iter2 {
119///     let (id, name) = r?;
120///     println!("Id: {} Name: {}", id, name);
121/// }
122/// #   Ok(())
123/// # }
124/// ```
125#[allow(missing_debug_implementations)]
126#[cfg(feature = "postgres")]
127pub struct PgConnection {
128    statement_cache: StatementCache<Pg, Statement>,
129    metadata_cache: PgMetadataCache,
130    connection_and_transaction_manager: ConnectionAndTransactionManager,
131}
132
133// according to libpq documentation a connection can be transferred to other threads
134#[allow(unsafe_code)]
135unsafe impl Send for PgConnection {}
136
137impl SimpleConnection for PgConnection {
138    #[allow(unsafe_code)] // use of unsafe function
139    fn batch_execute(&mut self, query: &str) -> QueryResult<()> {
140        self.connection_and_transaction_manager
141            .instrumentation
142            .on_connection_event(InstrumentationEvent::StartQuery {
143                query: &StrQueryHelper::new(query),
144            });
145        let c_query = CString::new(query)?;
146        let inner_result = unsafe {
147            self.connection_and_transaction_manager
148                .raw_connection
149                .exec(c_query.as_ptr())
150        };
151        update_transaction_manager_status(
152            inner_result.and_then(|raw_result| {
153                PgResult::new(
154                    raw_result,
155                    &self.connection_and_transaction_manager.raw_connection,
156                )
157            }),
158            &mut self.connection_and_transaction_manager,
159            &StrQueryHelper::new(query),
160            true,
161        )?;
162        Ok(())
163    }
164}
165
166/// A [`PgConnection`] specific loading mode to load rows one by one
167///
168/// See the documentation of [`PgConnection`] for details
169#[derive(Debug, Copy, Clone)]
170pub struct PgRowByRowLoadingMode;
171
172impl ConnectionSealed for PgConnection {}
173
174impl Connection for PgConnection {
175    type Backend = Pg;
176    type TransactionManager = AnsiTransactionManager;
177
178    fn establish(database_url: &str) -> ConnectionResult<PgConnection> {
179        let mut instrumentation = DynInstrumentation::default_instrumentation();
180        instrumentation.on_connection_event(InstrumentationEvent::StartEstablishConnection {
181            url: database_url,
182        });
183        let r = RawConnection::establish(database_url).and_then(|raw_conn| {
184            let mut conn = PgConnection {
185                connection_and_transaction_manager: ConnectionAndTransactionManager {
186                    raw_connection: raw_conn,
187                    transaction_state: AnsiTransactionManager::default(),
188                    instrumentation: DynInstrumentation::none(),
189                },
190                statement_cache: StatementCache::new(),
191                metadata_cache: PgMetadataCache::new(),
192            };
193            conn.set_config_options()
194                .map_err(CouldntSetupConfiguration)?;
195            Ok(conn)
196        });
197        instrumentation.on_connection_event(InstrumentationEvent::FinishEstablishConnection {
198            url: database_url,
199            error: r.as_ref().err(),
200        });
201        let mut conn = r?;
202        conn.connection_and_transaction_manager.instrumentation = instrumentation;
203        Ok(conn)
204    }
205
206    fn execute_returning_count<T>(&mut self, source: &T) -> QueryResult<usize>
207    where
208        T: QueryFragment<Pg> + QueryId,
209    {
210        update_transaction_manager_status(
211            self.with_prepared_query(source, true, |query, params, conn, _source| {
212                let res = query
213                    .execute(&mut conn.raw_connection, &params, false)
214                    .map(|r| r.rows_affected());
215                // according to https://www.postgresql.org/docs/current/libpq-async.html
216                // `PQgetResult` needs to be called till a null pointer is returned
217                while conn.raw_connection.get_next_result()?.is_some() {}
218                res
219            }),
220            &mut self.connection_and_transaction_manager,
221            &crate::debug_query(source),
222            true,
223        )
224    }
225
226    fn transaction_state(&mut self) -> &mut AnsiTransactionManager
227    where
228        Self: Sized,
229    {
230        &mut self.connection_and_transaction_manager.transaction_state
231    }
232
233    fn instrumentation(&mut self) -> &mut dyn Instrumentation {
234        &mut *self.connection_and_transaction_manager.instrumentation
235    }
236
237    fn set_instrumentation(&mut self, instrumentation: impl Instrumentation) {
238        self.connection_and_transaction_manager.instrumentation = instrumentation.into();
239    }
240
241    fn set_prepared_statement_cache_size(&mut self, size: CacheSize) {
242        self.statement_cache.set_cache_size(size);
243    }
244}
245
246impl<B> LoadConnection<B> for PgConnection
247where
248    Self: self::private::PgLoadingMode<B>,
249{
250    type Cursor<'conn, 'query> = <Self as self::private::PgLoadingMode<B>>::Cursor<'conn, 'query>;
251    type Row<'conn, 'query> = <Self as self::private::PgLoadingMode<B>>::Row<'conn, 'query>;
252
253    fn load<'conn, 'query, T>(
254        &'conn mut self,
255        source: T,
256    ) -> QueryResult<Self::Cursor<'conn, 'query>>
257    where
258        T: Query + QueryFragment<Self::Backend> + QueryId + 'query,
259        Self::Backend: QueryMetadata<T::SqlType>,
260    {
261        self.with_prepared_query(source, false, |stmt, params, conn, source| {
262            use self::private::PgLoadingMode;
263            let result = stmt.execute(&mut conn.raw_connection, &params, Self::USE_ROW_BY_ROW_MODE);
264            let result = update_transaction_manager_status(
265                result,
266                conn,
267                &crate::debug_query(&source),
268                false,
269            )?;
270            Self::get_cursor(conn, result, source)
271        })
272    }
273}
274
275impl GetPgMetadataCache for PgConnection {
276    fn get_metadata_cache(&mut self) -> &mut PgMetadataCache {
277        &mut self.metadata_cache
278    }
279}
280
281#[inline(always)]
282fn update_transaction_manager_status<T>(
283    query_result: QueryResult<T>,
284    conn: &mut ConnectionAndTransactionManager,
285    source: &dyn DebugQuery,
286    final_call: bool,
287) -> QueryResult<T> {
288    /// avoid monomorphizing for every result type - this part will not be inlined
289    fn non_generic_inner(conn: &mut ConnectionAndTransactionManager, is_err: bool) {
290        let raw_conn: &mut RawConnection = &mut conn.raw_connection;
291        let tm: &mut AnsiTransactionManager = &mut conn.transaction_state;
292        // libpq keeps track of the transaction status internally, and that is accessible
293        // via `transaction_status`. We can use that to update the AnsiTransactionManager
294        // status
295        match raw_conn.transaction_status() {
296            PgTransactionStatus::InError => {
297                tm.status.set_requires_rollback_maybe_up_to_top_level(true)
298            }
299            PgTransactionStatus::Unknown => tm.status.set_in_error(),
300            PgTransactionStatus::Idle => {
301                // This is useful in particular for commit attempts (even
302                // if `COMMIT` errors it still exits transaction)
303
304                // This may repair the transaction manager
305                tm.status = TransactionManagerStatus::Valid(Default::default())
306            }
307            PgTransactionStatus::InTransaction => {
308                let transaction_status = &mut tm.status;
309                // If we weren't an error, it is possible that we were a transaction start
310                // -> we should tolerate any state
311                if is_err {
312                    // An error may not have us enter a transaction, so if we weren't in one
313                    // we may not be in one now
314                    if !matches!(transaction_status, TransactionManagerStatus::Valid(valid_tm) if valid_tm.transaction_depth().is_some())
315                    {
316                        // -> transaction manager is broken
317                        transaction_status.set_in_error()
318                    }
319                } else {
320                    // If transaction was InError before, but now it's not (because we attempted
321                    // a rollback), we may pretend it's fixed because
322                    // if it isn't Postgres *will* tell us again.
323
324                    // Fun fact: if we have not received an `InTransaction` status however,
325                    // postgres will *not* warn us that transaction is broken when attempting to
326                    // commit, so we may think that commit has succeeded but in fact it hasn't.
327                    tm.status.set_requires_rollback_maybe_up_to_top_level(false)
328                }
329            }
330            PgTransactionStatus::Active => {
331                // This is a transient state for libpq - nothing we can deduce here.
332            }
333        }
334    }
335    non_generic_inner(conn, query_result.is_err());
336    if let Err(ref e) = query_result {
337        conn.instrumentation
338            .on_connection_event(InstrumentationEvent::FinishQuery {
339                query: source,
340                error: Some(e),
341            });
342    } else if final_call {
343        conn.instrumentation
344            .on_connection_event(InstrumentationEvent::FinishQuery {
345                query: source,
346                error: None,
347            });
348    }
349    query_result
350}
351
352#[cfg(feature = "r2d2")]
353impl crate::r2d2::R2D2Connection for PgConnection {
354    fn ping(&mut self) -> QueryResult<()> {
355        crate::r2d2::CheckConnectionQuery.execute(self).map(|_| ())
356    }
357
358    fn is_broken(&mut self) -> bool {
359        AnsiTransactionManager::is_broken_transaction_manager(self)
360    }
361}
362
363impl MultiConnectionHelper for PgConnection {
364    fn to_any<'a>(
365        lookup: &mut <Self::Backend as crate::sql_types::TypeMetadata>::MetadataLookup,
366    ) -> &mut (dyn std::any::Any + 'a) {
367        lookup.as_any()
368    }
369
370    fn from_any(
371        lookup: &mut dyn std::any::Any,
372    ) -> Option<&mut <Self::Backend as crate::sql_types::TypeMetadata>::MetadataLookup> {
373        lookup
374            .downcast_mut::<Self>()
375            .map(|conn| conn as &mut dyn super::PgMetadataLookup)
376    }
377}
378
379impl PgConnection {
380    /// Build a transaction, specifying additional details such as isolation level
381    ///
382    /// See [`TransactionBuilder`] for more examples.
383    ///
384    /// [`TransactionBuilder`]: crate::pg::TransactionBuilder
385    ///
386    /// ```rust
387    /// # include!("../../doctest_setup.rs");
388    /// #
389    /// # fn main() {
390    /// #     run_test().unwrap();
391    /// # }
392    /// #
393    /// # fn run_test() -> QueryResult<()> {
394    /// #     use schema::users::dsl::*;
395    /// #     let conn = &mut connection_no_transaction();
396    /// conn.build_transaction()
397    ///     .read_only()
398    ///     .serializable()
399    ///     .deferrable()
400    ///     .run(|conn| Ok(()))
401    /// # }
402    /// ```
403    pub fn build_transaction(&mut self) -> TransactionBuilder<'_, Self> {
404        TransactionBuilder::new(self)
405    }
406
407    pub(crate) fn copy_from<S, T>(&mut self, target: S) -> Result<usize, S::Error>
408    where
409        S: CopyFromExpression<T>,
410    {
411        let query = InternalCopyFromQuery::new(target);
412        let res = self.with_prepared_query(query, false, |stmt, binds, conn, mut source| {
413            fn inner_copy_in<S, T>(
414                stmt: MaybeCached<'_, Statement>,
415                conn: &mut ConnectionAndTransactionManager,
416                binds: Vec<Option<Vec<u8>>>,
417                source: &mut InternalCopyFromQuery<S, T>,
418            ) -> Result<usize, S::Error>
419            where
420                S: CopyFromExpression<T>,
421            {
422                let _res = stmt.execute(&mut conn.raw_connection, &binds, false)?;
423                let mut copy_in = CopyFromSink::new(&mut conn.raw_connection);
424                let r = source.target.callback(&mut copy_in);
425                copy_in.finish(r.as_ref().err().map(|e| e.to_string()))?;
426                let next_res = conn.raw_connection.get_next_result()?.ok_or_else(|| {
427                    crate::result::Error::DeserializationError(
428                        "Failed to receive result from the database".into(),
429                    )
430                })?;
431                let rows = next_res.rows_affected();
432                while let Some(_r) = conn.raw_connection.get_next_result()? {}
433                r?;
434                Ok(rows)
435            }
436
437            let rows = inner_copy_in(stmt, conn, binds, &mut source);
438            if let Err(ref e) = rows {
439                let database_error = crate::result::Error::DatabaseError(
440                    crate::result::DatabaseErrorKind::Unknown,
441                    Box::new(e.to_string()),
442                );
443                conn.instrumentation
444                    .on_connection_event(InstrumentationEvent::FinishQuery {
445                        query: &crate::debug_query(&source),
446                        error: Some(&database_error),
447                    });
448            } else {
449                conn.instrumentation
450                    .on_connection_event(InstrumentationEvent::FinishQuery {
451                        query: &crate::debug_query(&source),
452                        error: None,
453                    });
454            }
455
456            rows
457        })?;
458
459        Ok(res)
460    }
461
462    pub(crate) fn copy_to<T>(&mut self, command: CopyToCommand<T>) -> QueryResult<CopyToBuffer<'_>>
463    where
464        T: CopyTarget,
465    {
466        let res = self.with_prepared_query::<_, _, Error>(
467            command,
468            false,
469            |stmt, binds, conn, source| {
470                let res = stmt.execute(&mut conn.raw_connection, &binds, false);
471                conn.instrumentation
472                    .on_connection_event(InstrumentationEvent::FinishQuery {
473                        query: &crate::debug_query(&source),
474                        error: res.as_ref().err(),
475                    });
476                Ok(CopyToBuffer::new(&mut conn.raw_connection, res?))
477            },
478        )?;
479        Ok(res)
480    }
481
482    fn with_prepared_query<'conn, T, R, E>(
483        &'conn mut self,
484        source: T,
485        execute_returning_count: bool,
486        f: impl FnOnce(
487            MaybeCached<'_, Statement>,
488            Vec<Option<Vec<u8>>>,
489            &'conn mut ConnectionAndTransactionManager,
490            T,
491        ) -> Result<R, E>,
492    ) -> Result<R, E>
493    where
494        T: QueryFragment<Pg> + QueryId,
495        E: From<crate::result::Error>,
496    {
497        self.connection_and_transaction_manager
498            .instrumentation
499            .on_connection_event(InstrumentationEvent::StartQuery {
500                query: &crate::debug_query(&source),
501            });
502        let mut bind_collector = RawBytesBindCollector::<Pg>::new();
503        source.collect_binds(&mut bind_collector, self, &Pg)?;
504        let binds = bind_collector.binds;
505        let metadata = bind_collector.metadata;
506
507        let cache = &mut self.statement_cache;
508        let conn = &mut self.connection_and_transaction_manager.raw_connection;
509        let query = cache.cached_statement(
510            &source,
511            &Pg,
512            &metadata,
513            conn,
514            Statement::prepare,
515            &mut *self.connection_and_transaction_manager.instrumentation,
516        );
517        if !execute_returning_count {
518            if let Err(ref e) = query {
519                self.connection_and_transaction_manager
520                    .instrumentation
521                    .on_connection_event(InstrumentationEvent::FinishQuery {
522                        query: &crate::debug_query(&source),
523                        error: Some(e),
524                    });
525            }
526        }
527
528        f(
529            query?,
530            binds,
531            &mut self.connection_and_transaction_manager,
532            source,
533        )
534    }
535
536    fn set_config_options(&mut self) -> QueryResult<()> {
537        crate::sql_query("SET TIME ZONE 'UTC'").execute(self)?;
538        crate::sql_query("SET CLIENT_ENCODING TO 'UTF8'").execute(self)?;
539        self.connection_and_transaction_manager
540            .raw_connection
541            .set_notice_processor(noop_notice_processor);
542        Ok(())
543    }
544
545    /// See Postgres documentation for SQL commands [NOTIFY][] and [LISTEN][]
546    ///
547    /// The returned iterator can yield items even after a None value when new notifications have been received.
548    /// The iterator can be polled again after a `None` value was received as new notifications might have
549    /// been send in the mean time.
550    ///
551    /// [NOTIFY]: https://www.postgresql.org/docs/current/sql-notify.html
552    /// [LISTEN]: https://www.postgresql.org/docs/current/sql-listen.html
553    ///
554    /// ## Example
555    ///
556    /// ```
557    /// # include!("../../doctest_setup.rs");
558    /// #
559    /// # fn main() {
560    /// #     run_test().unwrap();
561    /// # }
562    /// #
563    /// # fn run_test() -> QueryResult<()> {
564    /// #     let connection = &mut establish_connection();
565    ///
566    /// // register the notifications channel we want to receive notifications for
567    /// diesel::sql_query("LISTEN example_channel").execute(connection)?;
568    /// // send some notification
569    /// // this is usually done from a different connection/thread/application
570    /// diesel::sql_query("NOTIFY example_channel, 'additional data'").execute(connection)?;
571    ///
572    /// for result in connection.notifications_iter() {
573    ///     let notification = result.unwrap();
574    ///     assert_eq!(notification.channel, "example_channel");
575    ///     assert_eq!(notification.payload, "additional data");
576    ///
577    ///     println!(
578    ///         "Notification received from server process with id {}.",
579    ///         notification.process_id
580    ///    );
581    /// }
582    /// # Ok(())
583    /// # }
584    /// ```
585    pub fn notifications_iter(&mut self) -> impl Iterator<Item = QueryResult<PgNotification>> + '_ {
586        let conn = &self.connection_and_transaction_manager.raw_connection;
587        std::iter::from_fn(move || conn.pq_notifies().transpose())
588    }
589}
590
591extern "C" fn noop_notice_processor(_: *mut libc::c_void, _message: *const libc::c_char) {}
592
593mod private {
594    use super::*;
595
596    #[allow(missing_debug_implementations)]
597    pub struct ConnectionAndTransactionManager {
598        pub(super) raw_connection: RawConnection,
599        pub(super) transaction_state: AnsiTransactionManager,
600        pub(super) instrumentation: DynInstrumentation,
601    }
602
603    pub trait PgLoadingMode<B> {
604        const USE_ROW_BY_ROW_MODE: bool;
605        type Cursor<'conn, 'query>: Iterator<Item = QueryResult<Self::Row<'conn, 'query>>>;
606        type Row<'conn, 'query>: crate::row::Row<'conn, Pg>;
607
608        fn get_cursor<'conn, 'query>(
609            raw_connection: &'conn mut ConnectionAndTransactionManager,
610            result: PgResult,
611            source: impl QueryFragment<Pg> + 'query,
612        ) -> QueryResult<Self::Cursor<'conn, 'query>>;
613    }
614
615    impl PgLoadingMode<DefaultLoadingMode> for PgConnection {
616        const USE_ROW_BY_ROW_MODE: bool = false;
617        type Cursor<'conn, 'query> = Cursor;
618        type Row<'conn, 'query> = self::row::PgRow;
619
620        fn get_cursor<'conn, 'query>(
621            conn: &'conn mut ConnectionAndTransactionManager,
622            result: PgResult,
623            source: impl QueryFragment<Pg> + 'query,
624        ) -> QueryResult<Self::Cursor<'conn, 'query>> {
625            update_transaction_manager_status(
626                Cursor::new(result, &mut conn.raw_connection),
627                conn,
628                &crate::debug_query(&source),
629                true,
630            )
631        }
632    }
633
634    impl PgLoadingMode<PgRowByRowLoadingMode> for PgConnection {
635        const USE_ROW_BY_ROW_MODE: bool = true;
636        type Cursor<'conn, 'query> = RowByRowCursor<'conn, 'query>;
637        type Row<'conn, 'query> = self::row::PgRow;
638
639        fn get_cursor<'conn, 'query>(
640            raw_connection: &'conn mut ConnectionAndTransactionManager,
641            result: PgResult,
642            source: impl QueryFragment<Pg> + 'query,
643        ) -> QueryResult<Self::Cursor<'conn, 'query>> {
644            Ok(RowByRowCursor::new(
645                result,
646                raw_connection,
647                Box::new(source),
648            ))
649        }
650    }
651}
652
653#[cfg(test)]
654// that's a false positive for `panic!`/`assert!` on rust 2018
655#[allow(clippy::uninlined_format_args)]
656mod tests {
657    extern crate dotenvy;
658
659    use super::*;
660    use crate::prelude::*;
661    use crate::result::Error::DatabaseError;
662    use std::num::NonZeroU32;
663
664    fn connection() -> PgConnection {
665        crate::test_helpers::pg_connection_no_transaction()
666    }
667
668    #[diesel_test_helper::test]
669    fn notifications_arrive() {
670        use crate::sql_query;
671
672        let conn = &mut connection();
673        sql_query("LISTEN test_notifications")
674            .execute(conn)
675            .unwrap();
676        sql_query("NOTIFY test_notifications, 'first'")
677            .execute(conn)
678            .unwrap();
679        sql_query("NOTIFY test_notifications, 'second'")
680            .execute(conn)
681            .unwrap();
682
683        let notifications = conn
684            .notifications_iter()
685            .map(Result::unwrap)
686            .collect::<Vec<_>>();
687
688        assert_eq!(2, notifications.len());
689        assert_eq!(notifications[0].channel, "test_notifications");
690        assert_eq!(notifications[1].channel, "test_notifications");
691        assert_eq!(notifications[0].payload, "first");
692        assert_eq!(notifications[1].payload, "second");
693
694        let next_notification = conn.notifications_iter().next();
695        assert!(
696            next_notification.is_none(),
697            "Got a next notification, while not expecting one: {next_notification:?}"
698        );
699
700        sql_query("NOTIFY test_notifications")
701            .execute(conn)
702            .unwrap();
703        assert_eq!(
704            conn.notifications_iter().next().unwrap().unwrap().payload,
705            ""
706        );
707    }
708
709    #[diesel_test_helper::test]
710    fn malformed_sql_query() {
711        let connection = &mut connection();
712        let query =
713            crate::sql_query("SELECT not_existent FROM also_not_there;").execute(connection);
714
715        if let Err(DatabaseError(_, string)) = query {
716            assert_eq!(Some(26), string.statement_position());
717        } else {
718            unreachable!();
719        }
720    }
721
722    table! {
723        users {
724            id -> Integer,
725            name -> Text,
726        }
727    }
728
729    #[diesel_test_helper::test]
730    fn transaction_manager_returns_an_error_when_attempting_to_commit_outside_of_a_transaction() {
731        use crate::connection::{AnsiTransactionManager, TransactionManager};
732        use crate::result::Error;
733        use crate::PgConnection;
734
735        let conn = &mut crate::test_helpers::pg_connection_no_transaction();
736        assert_eq!(
737            None,
738            <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
739                conn
740            ).transaction_depth().expect("Transaction depth")
741        );
742        let result = AnsiTransactionManager::commit_transaction(conn);
743        assert!(matches!(result, Err(Error::NotInTransaction)))
744    }
745
746    #[diesel_test_helper::test]
747    fn transaction_manager_returns_an_error_when_attempting_to_rollback_outside_of_a_transaction() {
748        use crate::connection::{AnsiTransactionManager, TransactionManager};
749        use crate::result::Error;
750        use crate::PgConnection;
751
752        let conn = &mut crate::test_helpers::pg_connection_no_transaction();
753        assert_eq!(
754            None,
755            <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
756                conn
757            ).transaction_depth().expect("Transaction depth")
758        );
759        let result = AnsiTransactionManager::rollback_transaction(conn);
760        assert!(matches!(result, Err(Error::NotInTransaction)))
761    }
762
763    #[diesel_test_helper::test]
764    fn postgres_transaction_is_rolled_back_upon_syntax_error() {
765        use std::num::NonZeroU32;
766
767        use crate::connection::{AnsiTransactionManager, TransactionManager};
768        use crate::pg::connection::raw::PgTransactionStatus;
769        use crate::*;
770        let conn = &mut crate::test_helpers::pg_connection_no_transaction();
771        assert_eq!(
772            None,
773            <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
774                conn
775            ).transaction_depth().expect("Transaction depth")
776        );
777        let _result = conn.build_transaction().run(|conn| {
778            assert_eq!(
779                NonZeroU32::new(1),
780                <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
781                    conn
782                ).transaction_depth().expect("Transaction depth")
783            );
784            // In Postgres, a syntax error breaks the transaction block
785            let query_result = sql_query("SELECT_SYNTAX_ERROR 1").execute(conn);
786            assert!(query_result.is_err());
787            assert_eq!(
788                PgTransactionStatus::InError,
789                conn.connection_and_transaction_manager.raw_connection.transaction_status()
790            );
791            query_result
792        });
793        assert_eq!(
794            None,
795            <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
796                conn
797            ).transaction_depth().expect("Transaction depth")
798        );
799        assert_eq!(
800            PgTransactionStatus::Idle,
801            conn.connection_and_transaction_manager
802                .raw_connection
803                .transaction_status()
804        );
805    }
806
807    #[diesel_test_helper::test]
808    fn nested_postgres_transaction_is_rolled_back_upon_syntax_error() {
809        use std::num::NonZeroU32;
810
811        use crate::connection::{AnsiTransactionManager, TransactionManager};
812        use crate::pg::connection::raw::PgTransactionStatus;
813        use crate::*;
814        let conn = &mut crate::test_helpers::pg_connection_no_transaction();
815        assert_eq!(
816            None,
817            <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
818                conn
819            ).transaction_depth().expect("Transaction depth")
820        );
821        let result = conn.build_transaction().run(|conn| {
822            assert_eq!(
823                NonZeroU32::new(1),
824                <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
825                    conn
826            ).transaction_depth().expect("Transaction depth")
827            );
828            let result = conn.build_transaction().run(|conn| {
829                assert_eq!(
830                    NonZeroU32::new(2),
831                    <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
832                        conn
833            ).transaction_depth().expect("Transaction depth")
834                );
835                sql_query("SELECT_SYNTAX_ERROR 1").execute(conn)
836            });
837            assert!(result.is_err());
838            assert_eq!(
839                NonZeroU32::new(1),
840                <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
841                    conn
842            ).transaction_depth().expect("Transaction depth")
843            );
844            let query_result = sql_query("SELECT 1").execute(conn);
845            assert!(query_result.is_ok());
846            assert_eq!(
847                PgTransactionStatus::InTransaction,
848                conn.connection_and_transaction_manager.raw_connection.transaction_status()
849            );
850            query_result
851        });
852        assert!(result.is_ok());
853        assert_eq!(
854            PgTransactionStatus::Idle,
855            conn.connection_and_transaction_manager
856                .raw_connection
857                .transaction_status()
858        );
859        assert_eq!(
860            None,
861            <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
862                conn
863            ).transaction_depth().expect("Transaction depth")
864        );
865    }
866
867    #[diesel_test_helper::test]
868    // This function uses collect with an side effect (spawning threads)
869    // so this is a false positive from clippy
870    #[allow(clippy::needless_collect)]
871    fn postgres_transaction_depth_is_tracked_properly_on_serialization_failure() {
872        use crate::pg::connection::raw::PgTransactionStatus;
873        use crate::result::DatabaseErrorKind::SerializationFailure;
874        use crate::result::Error::DatabaseError;
875        use crate::*;
876        use std::sync::{Arc, Barrier};
877        use std::thread;
878
879        table! {
880            #[sql_name = "pg_transaction_depth_is_tracked_properly_on_commit_failure"]
881            serialization_example {
882                id -> Serial,
883                class -> Integer,
884            }
885        }
886
887        let conn = &mut crate::test_helpers::pg_connection_no_transaction();
888
889        sql_query(
890            "DROP TABLE IF EXISTS pg_transaction_depth_is_tracked_properly_on_commit_failure;",
891        )
892        .execute(conn)
893        .unwrap();
894        sql_query(
895            r#"
896            CREATE TABLE pg_transaction_depth_is_tracked_properly_on_commit_failure (
897                id SERIAL PRIMARY KEY,
898                class INTEGER NOT NULL
899            )
900        "#,
901        )
902        .execute(conn)
903        .unwrap();
904
905        insert_into(serialization_example::table)
906            .values(&vec![
907                serialization_example::class.eq(1),
908                serialization_example::class.eq(2),
909            ])
910            .execute(conn)
911            .unwrap();
912
913        let before_barrier = Arc::new(Barrier::new(2));
914        let after_barrier = Arc::new(Barrier::new(2));
915        let threads = (1..3)
916            .map(|i| {
917                let before_barrier = before_barrier.clone();
918                let after_barrier = after_barrier.clone();
919                thread::spawn(move || {
920                    use crate::connection::AnsiTransactionManager;
921                    use crate::connection::TransactionManager;
922                    let conn = &mut crate::test_helpers::pg_connection_no_transaction();
923                    assert_eq!(None, <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));
924
925                    let result = conn.build_transaction().serializable().run(|conn| {
926                        assert_eq!(NonZeroU32::new(1), <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));
927
928                        let _ = serialization_example::table
929                            .filter(serialization_example::class.eq(i))
930                            .count()
931                            .execute(conn)?;
932
933                        let other_i = if i == 1 { 2 } else { 1 };
934                        let q = insert_into(serialization_example::table)
935                            .values(serialization_example::class.eq(other_i));
936                        before_barrier.wait();
937
938                        let r = q.execute(conn);
939                        after_barrier.wait();
940                        r
941                    });
942                    assert_eq!(
943                        PgTransactionStatus::Idle,
944                        conn.connection_and_transaction_manager.raw_connection.transaction_status()
945                    );
946
947                    assert_eq!(None, <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));
948                    result
949                })
950            })
951            .collect::<Vec<_>>();
952
953        let mut results = threads
954            .into_iter()
955            .map(|t| t.join().unwrap())
956            .collect::<Vec<_>>();
957
958        results.sort_by_key(|r| r.is_err());
959
960        assert!(results[0].is_ok(), "Got {:?} instead", results);
961        assert!(
962            matches!(&results[1], Err(DatabaseError(SerializationFailure, _))),
963            "Got {:?} instead",
964            results
965        );
966        assert_eq!(
967            PgTransactionStatus::Idle,
968            conn.connection_and_transaction_manager
969                .raw_connection
970                .transaction_status()
971        );
972    }
973
974    #[diesel_test_helper::test]
975    // This function uses collect with an side effect (spawning threads)
976    // so this is a false positive from clippy
977    #[allow(clippy::needless_collect)]
978    fn postgres_transaction_depth_is_tracked_properly_on_nested_serialization_failure() {
979        use crate::pg::connection::raw::PgTransactionStatus;
980        use crate::result::DatabaseErrorKind::SerializationFailure;
981        use crate::result::Error::DatabaseError;
982        use crate::*;
983        use std::sync::{Arc, Barrier};
984        use std::thread;
985
986        table! {
987            #[sql_name = "pg_nested_transaction_depth_is_tracked_properly_on_commit_failure"]
988            serialization_example {
989                id -> Serial,
990                class -> Integer,
991            }
992        }
993
994        let conn = &mut crate::test_helpers::pg_connection_no_transaction();
995
996        sql_query(
997            "DROP TABLE IF EXISTS pg_nested_transaction_depth_is_tracked_properly_on_commit_failure;",
998        )
999        .execute(conn)
1000        .unwrap();
1001        sql_query(
1002            r#"
1003            CREATE TABLE pg_nested_transaction_depth_is_tracked_properly_on_commit_failure (
1004                id SERIAL PRIMARY KEY,
1005                class INTEGER NOT NULL
1006            )
1007        "#,
1008        )
1009        .execute(conn)
1010        .unwrap();
1011
1012        insert_into(serialization_example::table)
1013            .values(&vec![
1014                serialization_example::class.eq(1),
1015                serialization_example::class.eq(2),
1016            ])
1017            .execute(conn)
1018            .unwrap();
1019
1020        let before_barrier = Arc::new(Barrier::new(2));
1021        let after_barrier = Arc::new(Barrier::new(2));
1022        let threads = (1..3)
1023            .map(|i| {
1024                let before_barrier = before_barrier.clone();
1025                let after_barrier = after_barrier.clone();
1026                thread::spawn(move || {
1027                    use crate::connection::AnsiTransactionManager;
1028                    use crate::connection::TransactionManager;
1029                    let conn = &mut crate::test_helpers::pg_connection_no_transaction();
1030                    assert_eq!(None, <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));
1031
1032                    let result = conn.build_transaction().serializable().run(|conn| {
1033                        assert_eq!(NonZeroU32::new(1), <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));
1034                        let r = conn.transaction(|conn| {
1035                            assert_eq!(NonZeroU32::new(2), <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));
1036
1037                            let _ = serialization_example::table
1038                                .filter(serialization_example::class.eq(i))
1039                                .count()
1040                                .execute(conn)?;
1041
1042                            let other_i = if i == 1 { 2 } else { 1 };
1043                            let q = insert_into(serialization_example::table)
1044                                .values(serialization_example::class.eq(other_i));
1045                            before_barrier.wait();
1046
1047                            let r = q.execute(conn);
1048                            after_barrier.wait();
1049                            r
1050                        });
1051                        assert_eq!(NonZeroU32::new(1), <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));
1052                        assert_eq!(
1053                            PgTransactionStatus::InTransaction,
1054                            conn.connection_and_transaction_manager.raw_connection.transaction_status()
1055                        );
1056                        r
1057                    });
1058                    assert_eq!(
1059                        PgTransactionStatus::Idle,
1060                        conn.connection_and_transaction_manager.raw_connection.transaction_status()
1061                    );
1062
1063                    assert_eq!(None, <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));
1064                    result
1065                })
1066            })
1067            .collect::<Vec<_>>();
1068
1069        let mut results = threads
1070            .into_iter()
1071            .map(|t| t.join().unwrap())
1072            .collect::<Vec<_>>();
1073
1074        results.sort_by_key(|r| r.is_err());
1075
1076        assert!(results[0].is_ok(), "Got {:?} instead", results);
1077        assert!(
1078            matches!(&results[1], Err(DatabaseError(SerializationFailure, _))),
1079            "Got {:?} instead",
1080            results
1081        );
1082        assert_eq!(
1083            PgTransactionStatus::Idle,
1084            conn.connection_and_transaction_manager
1085                .raw_connection
1086                .transaction_status()
1087        );
1088    }
1089
1090    #[diesel_test_helper::test]
1091    fn postgres_transaction_is_rolled_back_upon_deferred_constraint_failure() {
1092        use crate::connection::{AnsiTransactionManager, TransactionManager};
1093        use crate::pg::connection::raw::PgTransactionStatus;
1094        use crate::result::Error;
1095        use crate::*;
1096
1097        let conn = &mut crate::test_helpers::pg_connection_no_transaction();
1098        assert_eq!(
1099            None,
1100            <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1101                conn
1102            ).transaction_depth().expect("Transaction depth")
1103        );
1104        let result: Result<_, Error> = conn.build_transaction().run(|conn| {
1105            assert_eq!(
1106                NonZeroU32::new(1),
1107                <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1108                    conn
1109            ).transaction_depth().expect("Transaction depth")
1110            );
1111            sql_query("DROP TABLE IF EXISTS deferred_constraint_commit").execute(conn)?;
1112            sql_query("CREATE TABLE deferred_constraint_commit(id INT UNIQUE INITIALLY DEFERRED)")
1113                .execute(conn)?;
1114            sql_query("INSERT INTO deferred_constraint_commit VALUES(1)").execute(conn)?;
1115            let result =
1116                sql_query("INSERT INTO deferred_constraint_commit VALUES(1)").execute(conn);
1117            assert!(result.is_ok());
1118            assert_eq!(
1119                PgTransactionStatus::InTransaction,
1120                conn.connection_and_transaction_manager.raw_connection.transaction_status()
1121            );
1122            Ok(())
1123        });
1124        assert_eq!(
1125            None,
1126            <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1127                conn
1128            ).transaction_depth().expect("Transaction depth")
1129        );
1130        assert_eq!(
1131            PgTransactionStatus::Idle,
1132            conn.connection_and_transaction_manager
1133                .raw_connection
1134                .transaction_status()
1135        );
1136        assert!(result.is_err());
1137    }
1138
1139    #[diesel_test_helper::test]
1140    fn postgres_transaction_is_rolled_back_upon_deferred_trigger_failure() {
1141        use crate::connection::{AnsiTransactionManager, TransactionManager};
1142        use crate::pg::connection::raw::PgTransactionStatus;
1143        use crate::result::Error;
1144        use crate::*;
1145
1146        let conn = &mut crate::test_helpers::pg_connection_no_transaction();
1147        assert_eq!(
1148            None,
1149            <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1150                conn
1151            ).transaction_depth().expect("Transaction depth")
1152        );
1153        let result: Result<_, Error> = conn.build_transaction().run(|conn| {
1154            assert_eq!(
1155                NonZeroU32::new(1),
1156                <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1157                    conn
1158            ).transaction_depth().expect("Transaction depth")
1159            );
1160            sql_query("DROP TABLE IF EXISTS deferred_trigger_commit").execute(conn)?;
1161            sql_query("CREATE TABLE deferred_trigger_commit(id INT UNIQUE INITIALLY DEFERRED)")
1162                .execute(conn)?;
1163            sql_query(
1164                r#"
1165                    CREATE OR REPLACE FUNCTION transaction_depth_blow_up()
1166                        RETURNS trigger
1167                        LANGUAGE plpgsql
1168                        AS $$
1169                    DECLARE
1170                    BEGIN
1171                        IF NEW.value = 42 THEN
1172                            RAISE EXCEPTION 'Transaction kaboom';
1173                        END IF;
1174                    RETURN NEW;
1175
1176                    END;$$;
1177                "#,
1178            )
1179            .execute(conn)?;
1180
1181            sql_query(
1182                r#"
1183                    CREATE CONSTRAINT TRIGGER transaction_depth_trigger
1184                        AFTER INSERT ON "deferred_trigger_commit"
1185                        DEFERRABLE INITIALLY DEFERRED
1186                        FOR EACH ROW
1187                        EXECUTE PROCEDURE transaction_depth_blow_up()
1188            "#,
1189            )
1190            .execute(conn)?;
1191            let result = sql_query("INSERT INTO deferred_trigger_commit VALUES(42)").execute(conn);
1192            assert!(result.is_ok());
1193            assert_eq!(
1194                PgTransactionStatus::InTransaction,
1195                conn.connection_and_transaction_manager.raw_connection.transaction_status()
1196            );
1197            Ok(())
1198        });
1199        assert_eq!(
1200            None,
1201            <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1202                conn
1203            ).transaction_depth().expect("Transaction depth")
1204        );
1205        assert_eq!(
1206            PgTransactionStatus::Idle,
1207            conn.connection_and_transaction_manager
1208                .raw_connection
1209                .transaction_status()
1210        );
1211        assert!(result.is_err());
1212    }
1213
1214    #[diesel_test_helper::test]
1215    fn nested_postgres_transaction_is_rolled_back_upon_deferred_trigger_failure() {
1216        use crate::connection::{AnsiTransactionManager, TransactionManager};
1217        use crate::pg::connection::raw::PgTransactionStatus;
1218        use crate::result::Error;
1219        use crate::*;
1220
1221        let conn = &mut crate::test_helpers::pg_connection_no_transaction();
1222        assert_eq!(
1223            None,
1224            <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1225                conn
1226            ).transaction_depth().expect("Transaction depth")
1227        );
1228        let result: Result<_, Error> = conn.build_transaction().run(|conn| {
1229            assert_eq!(
1230                NonZeroU32::new(1),
1231                <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1232                    conn
1233            ).transaction_depth().expect("Transaction depth")
1234            );
1235            sql_query("DROP TABLE IF EXISTS deferred_trigger_nested_commit").execute(conn)?;
1236            sql_query(
1237                "CREATE TABLE deferred_trigger_nested_commit(id INT UNIQUE INITIALLY DEFERRED)",
1238            )
1239            .execute(conn)?;
1240            sql_query(
1241                r#"
1242                    CREATE OR REPLACE FUNCTION transaction_depth_blow_up()
1243                        RETURNS trigger
1244                        LANGUAGE plpgsql
1245                        AS $$
1246                    DECLARE
1247                    BEGIN
1248                        IF NEW.value = 42 THEN
1249                            RAISE EXCEPTION 'Transaction kaboom';
1250                        END IF;
1251                    RETURN NEW;
1252
1253                    END;$$;
1254                "#,
1255            )
1256            .execute(conn)?;
1257
1258            sql_query(
1259                r#"
1260                    CREATE CONSTRAINT TRIGGER transaction_depth_trigger
1261                        AFTER INSERT ON "deferred_trigger_nested_commit"
1262                        DEFERRABLE INITIALLY DEFERRED
1263                        FOR EACH ROW
1264                        EXECUTE PROCEDURE transaction_depth_blow_up()
1265            "#,
1266            )
1267            .execute(conn)?;
1268            let inner_result: Result<_, Error> = conn.build_transaction().run(|conn| {
1269                let result = sql_query("INSERT INTO deferred_trigger_nested_commit VALUES(42)")
1270                    .execute(conn);
1271                assert!(result.is_ok());
1272                Ok(())
1273            });
1274            assert!(inner_result.is_err());
1275            assert_eq!(
1276                PgTransactionStatus::InTransaction,
1277                conn.connection_and_transaction_manager.raw_connection.transaction_status()
1278            );
1279            Ok(())
1280        });
1281        assert_eq!(
1282            None,
1283            <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1284                conn
1285            ).transaction_depth().expect("Transaction depth")
1286        );
1287        assert_eq!(
1288            PgTransactionStatus::Idle,
1289            conn.connection_and_transaction_manager
1290                .raw_connection
1291                .transaction_status()
1292        );
1293        assert!(result.is_ok(), "Expected success, got {:?}", result);
1294    }
1295
1296    #[diesel_test_helper::test]
1297    fn nested_postgres_transaction_is_rolled_back_upon_deferred_constraint_failure() {
1298        use crate::connection::{AnsiTransactionManager, TransactionManager};
1299        use crate::pg::connection::raw::PgTransactionStatus;
1300        use crate::result::Error;
1301        use crate::*;
1302
1303        let conn = &mut crate::test_helpers::pg_connection_no_transaction();
1304        assert_eq!(
1305            None,
1306            <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1307                conn
1308            ).transaction_depth().expect("Transaction depth")
1309        );
1310        let result: Result<_, Error> = conn.build_transaction().run(|conn| {
1311            assert_eq!(
1312                NonZeroU32::new(1),
1313                <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1314                    conn
1315            ).transaction_depth().expect("Transaction depth")
1316            );
1317            sql_query("DROP TABLE IF EXISTS deferred_constraint_nested_commit").execute(conn)?;
1318            sql_query("CREATE TABLE deferred_constraint_nested_commit(id INT UNIQUE INITIALLY DEFERRED)").execute(conn)?;
1319            let inner_result: Result<_, Error> = conn.build_transaction().run(|conn| {
1320                assert_eq!(
1321                    NonZeroU32::new(2),
1322                    <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1323                        conn
1324                    ).transaction_depth().expect("Transaction depth")
1325                );
1326                sql_query("INSERT INTO deferred_constraint_nested_commit VALUES(1)").execute(conn)?;
1327                let result = sql_query("INSERT INTO deferred_constraint_nested_commit VALUES(1)").execute(conn);
1328                assert!(result.is_ok());
1329                Ok(())
1330            });
1331            assert!(inner_result.is_err());
1332            assert_eq!(
1333                PgTransactionStatus::InTransaction,
1334                conn.connection_and_transaction_manager.raw_connection.transaction_status()
1335            );
1336            assert_eq!(
1337                NonZeroU32::new(1),
1338                <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1339                    conn
1340            ).transaction_depth().expect("Transaction depth")
1341            );
1342            sql_query("INSERT INTO deferred_constraint_nested_commit VALUES(1)").execute(conn)
1343        });
1344        assert_eq!(
1345            None,
1346            <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1347                conn
1348            ).transaction_depth().expect("Transaction depth")
1349        );
1350        assert_eq!(
1351            PgTransactionStatus::Idle,
1352            conn.connection_and_transaction_manager
1353                .raw_connection
1354                .transaction_status()
1355        );
1356        assert!(result.is_ok());
1357    }
1358}