Skip to main content

diesel/pg/connection/
mod.rs

1pub(super) mod copy;
2pub(crate) mod cursor;
3mod raw;
4mod result;
5mod row;
6mod stmt;
7
8use self::copy::{CopyFromSink, CopyToBuffer};
9use self::cursor::*;
10use self::private::{ConnectionAndTransactionManager, CopyFromWrapper, QueryFragmentHelper};
11use self::raw::{PgTransactionStatus, RawConnection};
12use self::stmt::Statement;
13use crate::RunQueryDsl;
14use crate::connection::instrumentation::{DynInstrumentation, Instrumentation, StrQueryHelper};
15use crate::connection::statement_cache::{MaybeCached, StatementCache};
16use crate::connection::*;
17use crate::expression::QueryMetadata;
18use crate::pg::backend::PgNotification;
19use crate::pg::metadata_lookup::{GetPgMetadataCache, PgMetadataCache};
20use crate::pg::query_builder::copy::InternalCopyFromQuery;
21use crate::pg::{Pg, TransactionBuilder};
22use crate::query_builder::bind_collector::RawBytesBindCollector;
23use crate::query_builder::*;
24use crate::result::ConnectionError::CouldntSetupConfiguration;
25use crate::result::*;
26use alloc::ffi::CString;
27use core::ffi as libc;
28use core::fmt::Debug;
29
30use super::query_builder::copy::{CopyFromExpression, CopyTarget, CopyToCommand};
31
32pub(super) use self::result::PgResult;
33
34/// The connection string expected by `PgConnection::establish`
35/// should be a PostgreSQL connection string, as documented at
36/// <https://www.postgresql.org/docs/9.4/static/libpq-connect.html#LIBPQ-CONNSTRING>
37///
38/// # Supported loading model implementations
39///
40/// * [`DefaultLoadingMode`]
41/// * [`PgRowByRowLoadingMode`]
42///
43/// If you are unsure which loading mode is the correct one for your application,
44/// you likely want to use the `DefaultLoadingMode` as it's simpler and more intuitive.
45/// However, if you plan to process each row on its own, you should use the `PgRowByRowLoadingMode` and
46/// you can also expect a performance boost.
47///
48/// Due to the fact that `PgConnection` supports multiple loading modes
49/// it is **required** to always specify the used loading mode
50/// when calling [`RunQueryDsl::load_iter`]
51///
52/// ## `DefaultLoadingMode`
53///
54/// By using this mode `PgConnection` defaults to loading all response values at **once**
55/// and only performs deserialization afterward for the `DefaultLoadingMode`.
56///
/// This loading mode allows users to hold more than one iterator at once using
58/// the same connection:
59/// ```rust
60/// # include!("../../doctest_setup.rs");
61/// #
62/// # fn main() {
63/// #     run_test().unwrap();
64/// # }
65/// #
66/// # fn run_test() -> QueryResult<()> {
67/// #     use schema::users;
68/// #     let connection = &mut establish_connection();
69/// use diesel::connection::DefaultLoadingMode;
70///
71/// let iter1 = users::table.load_iter::<(i32, String), DefaultLoadingMode>(connection)?;
72/// let iter2 = users::table.load_iter::<(i32, String), DefaultLoadingMode>(connection)?;
73///
74/// for r in iter1 {
75///     let (id, name) = r?;
76///     println!("Id: {} Name: {}", id, name);
77/// }
78///
79/// for r in iter2 {
80///     let (id, name) = r?;
81///     println!("Id: {} Name: {}", id, name);
82/// }
83/// #   Ok(())
84/// # }
85/// ```
86///
87/// ## `PgRowByRowLoadingMode`
88///
89/// By using this mode `PgConnection` defaults to loading each row of the result set
90/// separately. This might be desired for huge result sets.
91///
92/// This loading mode **prevents** creating more than one iterator at once using
93/// the same connection. The following code is **not** allowed:
94///
95/// ```compile_fail
96/// # include!("../../doctest_setup.rs");
97/// #
98/// # fn main() {
99/// #     run_test().unwrap();
100/// # }
101/// #
102/// # fn run_test() -> QueryResult<()> {
103/// #     use schema::users;
104/// #     let connection = &mut establish_connection();
105/// use diesel::pg::PgRowByRowLoadingMode;
106///
107/// let iter1 = users::table.load_iter::<(i32, String), PgRowByRowLoadingMode>(connection)?;
/// // creating a second iterator generates a compiler error
109/// let iter2 = users::table.load_iter::<(i32, String), PgRowByRowLoadingMode>(connection)?;
110///
111/// for r in iter1 {
112///     let (id, name) = r?;
113///     println!("Id: {} Name: {}", id, name);
114/// }
115///
116/// for r in iter2 {
117///     let (id, name) = r?;
118///     println!("Id: {} Name: {}", id, name);
119/// }
120/// #   Ok(())
121/// # }
122/// ```
#[allow(missing_debug_implementations)]
#[cfg(feature = "postgres")]
pub struct PgConnection {
    // Cache of prepared statements, consulted by `with_prepared_query`.
    statement_cache: StatementCache<Pg, Statement>,
    // Type metadata cache handed out through `GetPgMetadataCache`.
    metadata_cache: PgMetadataCache,
    // Raw connection, transaction state and instrumentation, grouped in one
    // struct so they can be borrowed together while the caches are borrowed
    // separately.
    connection_and_transaction_manager: ConnectionAndTransactionManager,
}
130
// SAFETY: according to the libpq documentation a connection can be transferred
// to other threads
#[allow(unsafe_code)]
unsafe impl Send for PgConnection {}
134
135impl SimpleConnection for PgConnection {
136    #[allow(unsafe_code)] // use of unsafe function
137    fn batch_execute(&mut self, query: &str) -> QueryResult<()> {
138        self.connection_and_transaction_manager
139            .instrumentation
140            .on_connection_event(InstrumentationEvent::StartQuery {
141                query: &StrQueryHelper::new(query),
142            });
143        let c_query = CString::new(query)?;
144        let inner_result = unsafe {
145            self.connection_and_transaction_manager
146                .raw_connection
147                .exec(c_query.as_ptr())
148        };
149        update_transaction_manager_status(
150            inner_result.and_then(|raw_result| {
151                PgResult::new(
152                    raw_result,
153                    &self.connection_and_transaction_manager.raw_connection,
154                )
155            }),
156            &mut self.connection_and_transaction_manager,
157            &|callback| callback(&StrQueryHelper::new(query)),
158            true,
159        )?;
160        Ok(())
161    }
162}
163
/// A [`PgConnection`] specific loading mode to load rows one by one
///
/// This is a zero-sized marker type: it is only used as a type parameter to
/// select the row-by-row cursor implementation.
///
/// See the documentation of [`PgConnection`] for details
#[derive(Debug, Copy, Clone)]
pub struct PgRowByRowLoadingMode;
169
// Opts `PgConnection` into `ConnectionSealed`; the empty body shows the trait
// has no required items.
impl ConnectionSealed for PgConnection {}
171
172impl Connection for PgConnection {
173    type Backend = Pg;
174    type TransactionManager = AnsiTransactionManager;
175
176    fn establish(database_url: &str) -> ConnectionResult<PgConnection> {
177        let mut instrumentation = DynInstrumentation::default_instrumentation();
178        instrumentation.on_connection_event(InstrumentationEvent::StartEstablishConnection {
179            url: database_url,
180        });
181        let r = RawConnection::establish(database_url).and_then(|raw_conn| {
182            let mut conn = PgConnection {
183                connection_and_transaction_manager: ConnectionAndTransactionManager {
184                    raw_connection: raw_conn,
185                    transaction_state: AnsiTransactionManager::default(),
186                    instrumentation: DynInstrumentation::none(),
187                },
188                statement_cache: StatementCache::new(),
189                metadata_cache: PgMetadataCache::new(),
190            };
191            conn.set_config_options()
192                .map_err(CouldntSetupConfiguration)?;
193            Ok(conn)
194        });
195        instrumentation.on_connection_event(InstrumentationEvent::FinishEstablishConnection {
196            url: database_url,
197            error: r.as_ref().err(),
198        });
199        let mut conn = r?;
200        conn.connection_and_transaction_manager.instrumentation = instrumentation;
201        Ok(conn)
202    }
203
204    fn execute_returning_count<T>(&mut self, source: &T) -> QueryResult<usize>
205    where
206        T: QueryFragment<Pg> + QueryId,
207    {
208        update_transaction_manager_status(
209            self.with_prepared_query(
210                Box::new(source),
211                true,
212                &mut |query, params, conn, _source| {
213                    let res = query
214                        .execute(&mut conn.raw_connection, &params, false)
215                        .map(|r| r.rows_affected());
216                    // according to https://www.postgresql.org/docs/current/libpq-async.html
217                    // `PQgetResult` needs to be called till a null pointer is returned
218                    while conn.raw_connection.get_next_result()?.is_some() {}
219                    res
220                },
221            ),
222            &mut self.connection_and_transaction_manager,
223            &|callback| source.instrumentation(callback),
224            true,
225        )
226    }
227
228    fn transaction_state(&mut self) -> &mut AnsiTransactionManager
229    where
230        Self: Sized,
231    {
232        &mut self.connection_and_transaction_manager.transaction_state
233    }
234
235    fn instrumentation(&mut self) -> &mut dyn Instrumentation {
236        &mut *self.connection_and_transaction_manager.instrumentation
237    }
238
239    fn set_instrumentation(&mut self, instrumentation: impl Instrumentation) {
240        self.connection_and_transaction_manager.instrumentation = instrumentation.into();
241    }
242
243    fn set_prepared_statement_cache_size(&mut self, size: CacheSize) {
244        self.statement_cache.set_cache_size(size);
245    }
246}
247
// `B` is the loading mode marker; the actual cursor and row types as well as
// the cursor construction are delegated to the matching `PgLoadingMode<B>`
// implementation.
impl<B> LoadConnection<B> for PgConnection
where
    Self: self::private::PgLoadingMode<B>,
{
    type Cursor<'conn, 'query> = <Self as self::private::PgLoadingMode<B>>::Cursor<'conn, 'query>;
    type Row<'conn, 'query> = <Self as self::private::PgLoadingMode<B>>::Row<'conn, 'query>;

    /// Prepares and executes `source`, returning the cursor type selected by
    /// the loading mode `B`.
    fn load<'conn, 'query, T>(
        &'conn mut self,
        source: T,
    ) -> QueryResult<Self::Cursor<'conn, 'query>>
    where
        T: Query + QueryFragment<Self::Backend> + QueryId + 'query,
        Self::Backend: QueryMetadata<T::SqlType>,
    {
        self.with_prepared_query(
            Box::new(source),
            false,
            &mut |stmt, params, conn, source| {
                use self::private::PgLoadingMode;
                // Execute first (not yet the final instrumentation call), then let
                // the loading mode wrap the result into its cursor type.
                let result = inner_load(stmt, params, conn, &*source, Self::USE_ROW_BY_ROW_MODE)?;
                Self::get_cursor(conn, result, source)
            },
        )
    }
}
274
275fn inner_load(
276    stmt: MaybeCached<'_, Statement>,
277    params: Vec<Option<Vec<u8>>>,
278    conn: &mut ConnectionAndTransactionManager,
279    source: &dyn QueryFragmentHelper<crate::result::Error>,
280    row_by_row: bool,
281) -> Result<PgResult, Error> {
282    let result = stmt.execute(&mut conn.raw_connection, &params, row_by_row);
283    update_transaction_manager_status(
284        result,
285        conn,
286        &|callback| source.instrumentation(callback),
287        false,
288    )
289}
290
impl GetPgMetadataCache for PgConnection {
    /// Returns the metadata cache owned by this connection.
    fn get_metadata_cache(&mut self) -> &mut PgMetadataCache {
        &mut self.metadata_cache
    }
}
296
297#[inline(always)]
298fn update_transaction_manager_status<T>(
299    query_result: QueryResult<T>,
300    conn: &mut ConnectionAndTransactionManager,
301    instrumentation_callback: &dyn Fn(
302        &mut dyn FnMut(&dyn crate::connection::instrumentation::DebugQuery),
303    ),
304    final_call: bool,
305) -> QueryResult<T> {
306    /// avoid monomorphizing for every result type - this part will not be inlined
307    fn non_generic_inner(conn: &mut ConnectionAndTransactionManager, is_err: bool) {
308        let raw_conn: &mut RawConnection = &mut conn.raw_connection;
309        let tm: &mut AnsiTransactionManager = &mut conn.transaction_state;
310        // libpq keeps track of the transaction status internally, and that is accessible
311        // via `transaction_status`. We can use that to update the AnsiTransactionManager
312        // status
313        match raw_conn.transaction_status() {
314            PgTransactionStatus::InError => {
315                tm.status.set_requires_rollback_maybe_up_to_top_level(true)
316            }
317            PgTransactionStatus::Unknown => tm.status.set_in_error(),
318            PgTransactionStatus::Idle => {
319                // This is useful in particular for commit attempts (even
320                // if `COMMIT` errors it still exits transaction)
321
322                // This may repair the transaction manager
323                tm.status = TransactionManagerStatus::Valid(Default::default())
324            }
325            PgTransactionStatus::InTransaction => {
326                let transaction_status = &mut tm.status;
327                // If we weren't an error, it is possible that we were a transaction start
328                // -> we should tolerate any state
329                if is_err {
330                    // An error may not have us enter a transaction, so if we weren't in one
331                    // we may not be in one now
332                    if !#[allow(non_exhaustive_omitted_patterns)] match transaction_status {
    TransactionManagerStatus::Valid(valid_tm) if
        valid_tm.transaction_depth().is_some() => true,
    _ => false,
}matches!(transaction_status, TransactionManagerStatus::Valid(valid_tm) if valid_tm.transaction_depth().is_some())
333                    {
334                        // -> transaction manager is broken
335                        transaction_status.set_in_error()
336                    }
337                } else {
338                    // If transaction was InError before, but now it's not (because we attempted
339                    // a rollback), we may pretend it's fixed because
340                    // if it isn't Postgres *will* tell us again.
341
342                    // Fun fact: if we have not received an `InTransaction` status however,
343                    // postgres will *not* warn us that transaction is broken when attempting to
344                    // commit, so we may think that commit has succeeded but in fact it hasn't.
345                    tm.status.set_requires_rollback_maybe_up_to_top_level(false)
346                }
347            }
348            PgTransactionStatus::Active => {
349                // This is a transient state for libpq - nothing we can deduce here.
350            }
351        }
352    }
353
354    fn non_generic_instrumentation(
355        query_result: Result<(), &Error>,
356        conn: &mut ConnectionAndTransactionManager,
357        instrumentation_callback: &dyn Fn(
358            &mut dyn FnMut(&dyn crate::connection::instrumentation::DebugQuery),
359        ),
360        final_call: bool,
361    ) {
362        if let Err(e) = query_result {
363            instrumentation_callback(&mut |query| {
364                conn.instrumentation
365                    .on_connection_event(InstrumentationEvent::FinishQuery {
366                        query,
367                        error: Some(e),
368                    })
369            });
370        } else if final_call {
371            instrumentation_callback(&mut |query| {
372                conn.instrumentation
373                    .on_connection_event(InstrumentationEvent::FinishQuery { query, error: None });
374            });
375        }
376    }
377
378    non_generic_inner(conn, query_result.is_err());
379    non_generic_instrumentation(
380        query_result.as_ref().map(|_| ()),
381        conn,
382        instrumentation_callback,
383        final_call,
384    );
385    query_result
386}
387
#[cfg(feature = "r2d2")]
impl crate::r2d2::R2D2Connection for PgConnection {
    /// Runs the pool's check query; the returned row count is discarded.
    fn ping(&mut self) -> QueryResult<()> {
        crate::r2d2::CheckConnectionQuery.execute(self)?;
        Ok(())
    }

    /// Reports whether the transaction manager considers this connection broken.
    fn is_broken(&mut self) -> bool {
        AnsiTransactionManager::is_broken_transaction_manager(self)
    }
}
398
399impl MultiConnectionHelper for PgConnection {
400    fn to_any<'a>(
401        lookup: &mut <Self::Backend as crate::sql_types::TypeMetadata>::MetadataLookup,
402    ) -> &mut (dyn core::any::Any + 'a) {
403        lookup.as_any()
404    }
405
406    fn from_any(
407        lookup: &mut dyn core::any::Any,
408    ) -> Option<&mut <Self::Backend as crate::sql_types::TypeMetadata>::MetadataLookup> {
409        lookup
410            .downcast_mut::<Self>()
411            .map(|conn| conn as &mut dyn super::PgMetadataLookup)
412    }
413}
414
415impl PgConnection {
    /// Build a transaction, specifying additional details such as isolation level
    ///
    /// The returned [`TransactionBuilder`] mutably borrows this connection
    /// until it is consumed.
    ///
    /// See [`TransactionBuilder`] for more examples.
    ///
    /// [`TransactionBuilder`]: crate::pg::TransactionBuilder
    ///
    /// # Example
    ///
    /// ```rust
    /// # include!("../../doctest_setup.rs");
    /// #
    /// # fn main() {
    /// #     run_test().unwrap();
    /// # }
    /// #
    /// # fn run_test() -> QueryResult<()> {
    /// #     use schema::users::dsl::*;
    /// #     let conn = &mut connection_no_transaction();
    /// conn.build_transaction()
    ///     .read_only()
    ///     .serializable()
    ///     .deferrable()
    ///     .run(|conn| Ok(()))
    /// # }
    /// ```
    pub fn build_transaction(&mut self) -> TransactionBuilder<'_, Self> {
        TransactionBuilder::new(self)
    }
442
443    pub(crate) fn copy_from<S, T>(&mut self, target: S) -> Result<usize, S::Error>
444    where
445        S: CopyFromExpression<T>,
446    {
447        let query = CopyFromWrapper(core::cell::RefCell::new(InternalCopyFromQuery::new(target)));
448        let res =
449            self.with_prepared_query(Box::new(query), false, &mut |stmt, binds, conn, source| {
450                fn inner_copy_in<S, T>(
451                    stmt: MaybeCached<'_, Statement>,
452                    conn: &mut ConnectionAndTransactionManager,
453                    binds: Vec<Option<Vec<u8>>>,
454                    source: &dyn QueryFragmentHelper<S::Error>,
455                ) -> Result<usize, S::Error>
456                where
457                    S: CopyFromExpression<T>,
458                {
459                    let _res = stmt.execute(&mut conn.raw_connection, &binds, false)?;
460                    let mut copy_in = CopyFromSink::new(&mut conn.raw_connection);
461                    let r = source.write_copy_from(&mut copy_in);
462                    copy_in.finish(r.as_ref().err().map(|e| e.to_string()))?;
463                    let next_res = conn.raw_connection.get_next_result()?.ok_or_else(|| {
464                        crate::result::Error::DeserializationError(
465                            "Failed to receive result from the database".into(),
466                        )
467                    })?;
468                    let rows = next_res.rows_affected();
469                    while let Some(_r) = conn.raw_connection.get_next_result()? {}
470                    r?;
471                    Ok(rows)
472                }
473
474                let rows = inner_copy_in::<S, T>(stmt, conn, binds, &*source);
475                if let Err(ref e) = rows {
476                    let database_error = crate::result::Error::DatabaseError(
477                        crate::result::DatabaseErrorKind::Unknown,
478                        Box::new(e.to_string()),
479                    );
480                    source.instrumentation(&mut |query| {
481                        conn.instrumentation.on_connection_event(
482                            InstrumentationEvent::FinishQuery {
483                                query,
484                                error: Some(&database_error),
485                            },
486                        );
487                    });
488                } else {
489                    source.instrumentation(&mut |query| {
490                        conn.instrumentation.on_connection_event(
491                            InstrumentationEvent::FinishQuery { query, error: None },
492                        );
493                    });
494                }
495
496                rows
497            })?;
498
499        Ok(res)
500    }
501
502    pub(crate) fn copy_to<T>(&mut self, command: CopyToCommand<T>) -> QueryResult<CopyToBuffer<'_>>
503    where
504        T: CopyTarget,
505    {
506        let res = self.with_prepared_query::<_, Error>(
507            Box::new(command),
508            false,
509            &mut |stmt, binds, conn, source| {
510                let res = stmt.execute(&mut conn.raw_connection, &binds, false);
511                source.instrumentation(&mut |query| {
512                    conn.instrumentation
513                        .on_connection_event(InstrumentationEvent::FinishQuery {
514                            query,
515                            error: res.as_ref().err(),
516                        });
517                });
518                Ok(CopyToBuffer::new(&mut conn.raw_connection, res?))
519            },
520        )?;
521        Ok(res)
522    }
523
    /// Collects the binds of `source`, obtains a (possibly cached) prepared
    /// statement for it and hands both to `f`.
    ///
    /// * `source` - the query, type-erased to keep this function from being
    ///   monomorphized per query type.
    /// * `execute_returning_count` - when `false`, a failure to prepare the
    ///   statement emits a `FinishQuery` event here. When `true` no event is
    ///   emitted at this point (`execute_returning_count` passes `true` and
    ///   feeds the result into `update_transaction_manager_status`, which
    ///   reports errors itself).
    /// * `f` - receives the statement, the collected binds, the connection
    ///   state and the query; it is not called if bind collection or statement
    ///   preparation fails.
    fn with_prepared_query<'conn, 'query, R, E>(
        &'conn mut self,
        source: Box<dyn QueryFragmentHelper<E> + 'query>,
        execute_returning_count: bool,
        f: &mut dyn FnMut(
            MaybeCached<'_, Statement>,
            Vec<Option<Vec<u8>>>,
            &'conn mut ConnectionAndTransactionManager,
            Box<dyn QueryFragmentHelper<E> + 'query>,
        ) -> Result<R, E>,
    ) -> Result<R, E>
    where
        E: From<crate::result::Error>,
    {
        // Statement lookup/preparation, independent of the result type `R`.
        // Returns the binds together with the outcome of the cache lookup.
        fn prepare_query_non_generic_inner<'a, E>(
            connection_and_transaction_manager: &mut ConnectionAndTransactionManager,
            cache: &'a mut StatementCache<Pg, Statement>,
            source: &dyn QueryFragmentHelper<E>,
            execute_returning_count: bool,
            bind_collector: RawBytesBindCollector<Pg>,
        ) -> QueryResult<(
            Vec<Option<Vec<u8>>>,
            Result<MaybeCached<'a, Statement>, Error>,
        )> {
            let binds = bind_collector.binds;
            let metadata = bind_collector.metadata;
            let query = cache.cached_statement_non_generic(
                source.query_id(),
                source,
                &Pg,
                &metadata,
                &mut connection_and_transaction_manager.raw_connection,
                Statement::prepare,
                &mut *connection_and_transaction_manager.instrumentation,
            );
            // Report a preparation failure unless the caller reports it itself.
            if !execute_returning_count && let Err(ref e) = query {
                source.instrumentation(&mut |query| {
                    connection_and_transaction_manager
                        .instrumentation
                        .on_connection_event(InstrumentationEvent::FinishQuery {
                            query,
                            error: Some(e),
                        });
                });
            }
            Ok((binds, query))
        }

        // `collect_binds` also emits the `StartQuery` event.
        let bind_collector = self.collect_binds(&*source)?;
        let (binds, query) = prepare_query_non_generic_inner(
            &mut self.connection_and_transaction_manager,
            &mut self.statement_cache,
            &*source,
            execute_returning_count,
            bind_collector,
        )?;

        f(
            // A preparation error is propagated before `f` gets called.
            query?,
            binds,
            &mut self.connection_and_transaction_manager,
            source,
        )
    }
588
589    fn collect_binds<E>(
590        &mut self,
591        source: &dyn QueryFragmentHelper<E>,
592    ) -> Result<RawBytesBindCollector<Pg>, crate::result::Error> {
593        source.instrumentation(&mut |query| {
594            self.connection_and_transaction_manager
595                .instrumentation
596                .on_connection_event(InstrumentationEvent::StartQuery { query });
597        });
598        let mut bind_collector = RawBytesBindCollector::<Pg>::new();
599        source.collect_binds(&mut bind_collector, self)?;
600        Ok(bind_collector)
601    }
602
603    fn set_config_options(&mut self) -> QueryResult<()> {
604        crate::sql_query("SET TIME ZONE 'UTC'").execute(self)?;
605        crate::sql_query("SET CLIENT_ENCODING TO 'UTF8'").execute(self)?;
606        self.connection_and_transaction_manager
607            .raw_connection
608            .set_notice_processor(noop_notice_processor);
609        Ok(())
610    }
611
    /// See Postgres documentation for SQL commands [NOTIFY][] and [LISTEN][]
    ///
    /// The returned iterator can yield items even after a `None` value when new notifications
    /// have been received. It can be polled again after a `None` value was received, as new
    /// notifications might have been sent in the meantime.
    ///
    /// [NOTIFY]: https://www.postgresql.org/docs/current/sql-notify.html
    /// [LISTEN]: https://www.postgresql.org/docs/current/sql-listen.html
    ///
    /// ## Example
    ///
    /// ```
    /// # include!("../../doctest_setup.rs");
    /// #
    /// # fn main() {
    /// #     run_test().unwrap();
    /// # }
    /// #
    /// # fn run_test() -> QueryResult<()> {
    /// #     let connection = &mut establish_connection();
    ///
    /// // register the notifications channel we want to receive notifications for
    /// diesel::sql_query("LISTEN example_channel").execute(connection)?;
    /// // send some notification
    /// // this is usually done from a different connection/thread/application
    /// diesel::sql_query("NOTIFY example_channel, 'additional data'").execute(connection)?;
    ///
    /// for result in connection.notifications_iter() {
    ///     let notification = result.unwrap();
    ///     assert_eq!(notification.channel, "example_channel");
    ///     assert_eq!(notification.payload, "additional data");
    ///
    ///     println!(
    ///         "Notification received from server process with id {}.",
    ///         notification.process_id
    ///     );
    /// }
    /// # Ok(())
    /// # }
    /// ```
    pub fn notifications_iter(&mut self) -> impl Iterator<Item = QueryResult<PgNotification>> + '_ {
        let conn = &self.connection_and_transaction_manager.raw_connection;
        // Each call to `next` asks the raw connection again, which is why the
        // iterator is not fused after the first `None`.
        core::iter::from_fn(move || conn.pq_notifies().transpose())
    }
656}
657
// Notice processor installed by `set_config_options`; the empty body discards
// every notice message it receives.
extern "C" fn noop_notice_processor(_: *mut libc::c_void, _message: *const libc::c_char) {}
659
660mod private {
661    use super::*;
662
    // Groups the raw connection with the transaction state and the
    // instrumentation so that helpers can borrow all three through a single
    // `&mut` reference.
    #[allow(missing_debug_implementations)]
    pub struct ConnectionAndTransactionManager {
        // The underlying raw connection.
        pub(super) raw_connection: RawConnection,
        // Transaction bookkeeping, kept in sync by `update_transaction_manager_status`.
        pub(super) transaction_state: AnsiTransactionManager,
        // Receiver of connection events such as `StartQuery` / `FinishQuery`.
        pub(super) instrumentation: DynInstrumentation,
    }
669
    // Per-loading-mode behavior of `PgConnection`; `B` is the loading mode
    // marker type.
    pub trait PgLoadingMode<B> {
        // Forwarded by `load` (via `inner_load`) to `Statement::execute` as
        // the row-by-row flag.
        const USE_ROW_BY_ROW_MODE: bool;
        // Iterator over the rows of a result.
        type Cursor<'conn, 'query>: Iterator<Item = QueryResult<Self::Row<'conn, 'query>>>;
        // Row type yielded by the cursor.
        type Row<'conn, 'query>: crate::row::Row<'conn, Pg>;

        // Wraps the result of an executed statement into the cursor type of
        // this loading mode.
        fn get_cursor<'conn, 'query>(
            raw_connection: &'conn mut ConnectionAndTransactionManager,
            result: PgResult,
            source: Box<dyn QueryFragmentHelper<crate::result::Error> + 'query>,
        ) -> QueryResult<Self::Cursor<'conn, 'query>>;
    }
681
682    impl PgLoadingMode<DefaultLoadingMode> for PgConnection {
683        const USE_ROW_BY_ROW_MODE: bool = false;
684        type Cursor<'conn, 'query> = Cursor;
685        type Row<'conn, 'query> = self::row::PgRow;
686
687        fn get_cursor<'conn, 'query>(
688            conn: &'conn mut ConnectionAndTransactionManager,
689            result: PgResult,
690            source: Box<dyn QueryFragmentHelper<crate::result::Error> + 'query>,
691        ) -> QueryResult<Self::Cursor<'conn, 'query>> {
692            update_transaction_manager_status(
693                Cursor::new(result, &mut conn.raw_connection),
694                conn,
695                &|callback| source.instrumentation(callback),
696                true,
697            )
698        }
699    }
700
701    impl PgLoadingMode<PgRowByRowLoadingMode> for PgConnection {
702        const USE_ROW_BY_ROW_MODE: bool = true;
703        type Cursor<'conn, 'query> = RowByRowCursor<'conn, 'query>;
704        type Row<'conn, 'query> = self::row::PgRow;
705
706        fn get_cursor<'conn, 'query>(
707            raw_connection: &'conn mut ConnectionAndTransactionManager,
708            result: PgResult,
709            source: Box<dyn QueryFragmentHelper<crate::result::Error> + 'query>,
710        ) -> QueryResult<Self::Cursor<'conn, 'query>> {
711            Ok(RowByRowCursor::new(result, raw_connection, source))
712        }
713    }
714
    // this trait exists to turn generic query types
    // into trait objects to deduplicate the connection code
    //
    // `E` is the error type produced by `write_copy_from`.
    pub trait QueryFragmentHelper<E>:
        crate::connection::statement_cache::QueryFragmentForCachedStatement<crate::pg::Pg>
    {
        // Type id identifying the query, if it has one.
        fn query_id(&self) -> Option<core::any::TypeId>;

        // Calls `callback` with a debug representation of the query.
        fn instrumentation(
            &self,
            callback: &mut dyn FnMut(&dyn crate::connection::instrumentation::DebugQuery),
        );

        // Serializes the bind parameters of the query into `bind_collector`.
        fn collect_binds(
            &self,
            bind_collector: &mut RawBytesBindCollector<crate::pg::Pg>,
            conn: &mut PgConnection,
        ) -> QueryResult<()>;

        // Writes copy-from data into the sink. The default implementation
        // writes nothing; `CopyFromWrapper` overrides it.
        fn write_copy_from(&self, _sink: &mut CopyFromSink<'_>) -> Result<(), E> {
            Ok(())
        }
    }
737
738    // any type that implement `QueryFragment` and `QueryId` works in this location
739    impl<T> QueryFragmentHelper<diesel::result::Error> for T
740    where
741        T: QueryFragment<crate::pg::Pg> + QueryId,
742    {
743        fn query_id(&self) -> Option<core::any::TypeId> {
744            <T as QueryId>::query_id()
745        }
746
747        fn instrumentation(
748            &self,
749            callback: &mut dyn FnMut(&dyn crate::connection::instrumentation::DebugQuery),
750        ) {
751            callback(&crate::debug_query(self))
752        }
753
754        fn collect_binds(
755            &self,
756            bind_collector: &mut RawBytesBindCollector<crate::pg::Pg>,
757            conn: &mut PgConnection,
758        ) -> QueryResult<()> {
759            <Self as QueryFragment<diesel::pg::Pg>>::collect_binds(
760                self,
761                bind_collector,
762                conn,
763                &crate::pg::Pg,
764            )
765        }
766    }
767
    // This wrapper exists as we need to have custom behavior for copy from
    // statements (fn write_copy_from is relevant)
    //
    // The query sits in a `RefCell` because `write_copy_from` only receives
    // `&self` but borrows the query mutably to run the target's callback.
    pub(super) struct CopyFromWrapper<S, T>(
        pub(super) core::cell::RefCell<InternalCopyFromQuery<S, T>>,
    );
773
774    impl<S, T> crate::connection::statement_cache::QueryFragmentForCachedStatement<Pg>
775        for CopyFromWrapper<S, T>
776    where
777        InternalCopyFromQuery<S, T>:
778            crate::connection::statement_cache::QueryFragmentForCachedStatement<Pg>,
779    {
780        fn construct_sql(&self, backend: &Pg) -> QueryResult<String> {
781            self.0.borrow().construct_sql(backend)
782        }
783
784        fn is_safe_to_cache_prepared(&self, backend: &Pg) -> QueryResult<bool> {
785            self.0.borrow().is_safe_to_cache_prepared(backend)
786        }
787    }
788
789    impl<S, T> QueryFragmentHelper<S::Error> for CopyFromWrapper<S, T>
790    where
791        S: CopyFromExpression<T>,
792        InternalCopyFromQuery<S, T>: QueryFragmentHelper<crate::result::Error>,
793        Self: crate::connection::statement_cache::QueryFragmentForCachedStatement<Pg>,
794    {
795        fn query_id(&self) -> Option<core::any::TypeId> {
796            self.0.borrow().query_id()
797        }
798
799        fn instrumentation(
800            &self,
801            callback: &mut dyn FnMut(&dyn crate::connection::instrumentation::DebugQuery),
802        ) {
803            callback(&crate::debug_query(&*self.0.borrow()))
804        }
805
806        fn collect_binds(
807            &self,
808            bind_collector: &mut RawBytesBindCollector<crate::pg::Pg>,
809            conn: &mut PgConnection,
810        ) -> QueryResult<()> {
811            <InternalCopyFromQuery<S, T> as QueryFragmentHelper<crate::result::Error>>::collect_binds(
812            &*self.0.borrow(),
813            bind_collector,
814            conn,
815        )
816        }
817
818        fn write_copy_from(&self, sink: &mut CopyFromSink<'_>) -> Result<(), S::Error> {
819            self.0.borrow_mut().target.callback(sink)
820        }
821    }
822}
823
824#[cfg(test)]
825// that's a false positive for `panic!`/`assert!` on rust 2018
826#[allow(clippy::uninlined_format_args)]
827mod tests {
828    extern crate dotenvy;
829
830    use super::*;
831    use crate::prelude::*;
832    use crate::result::Error::DatabaseError;
833    use std::num::NonZeroU32;
834
835    fn connection() -> PgConnection {
836        crate::test_helpers::pg_connection_no_transaction()
837    }
838
839    #[diesel_test_helper::test]
840    fn notifications_arrive() {
841        use crate::sql_query;
842
843        let conn = &mut connection();
844        sql_query("LISTEN test_notifications")
845            .execute(conn)
846            .unwrap();
847        sql_query("NOTIFY test_notifications, 'first'")
848            .execute(conn)
849            .unwrap();
850        sql_query("NOTIFY test_notifications, 'second'")
851            .execute(conn)
852            .unwrap();
853
854        let notifications = conn
855            .notifications_iter()
856            .map(Result::unwrap)
857            .collect::<Vec<_>>();
858
859        assert_eq!(2, notifications.len());
860        assert_eq!(notifications[0].channel, "test_notifications");
861        assert_eq!(notifications[1].channel, "test_notifications");
862        assert_eq!(notifications[0].payload, "first");
863        assert_eq!(notifications[1].payload, "second");
864
865        let next_notification = conn.notifications_iter().next();
866        assert!(
867            next_notification.is_none(),
868            "Got a next notification, while not expecting one: {next_notification:?}"
869        );
870
871        sql_query("NOTIFY test_notifications")
872            .execute(conn)
873            .unwrap();
874        assert_eq!(
875            conn.notifications_iter().next().unwrap().unwrap().payload,
876            ""
877        );
878    }
879
880    #[diesel_test_helper::test]
881    fn malformed_sql_query() {
882        let connection = &mut connection();
883        let query =
884            crate::sql_query("SELECT not_existent FROM also_not_there;").execute(connection);
885
886        if let Err(DatabaseError(_, string)) = query {
887            assert_eq!(Some(26), string.statement_position());
888        } else {
889            unreachable!();
890        }
891    }
892
    // Schema for a `users` table with an integer `id` and a text `name` column.
    // NOTE(review): none of the tests in this module appear to reference it —
    // confirm whether it is still needed.
    table! {
        users {
            id -> Integer,
            name -> Text,
        }
    }
899
900    #[diesel_test_helper::test]
901    fn transaction_manager_returns_an_error_when_attempting_to_commit_outside_of_a_transaction() {
902        use crate::PgConnection;
903        use crate::connection::{AnsiTransactionManager, TransactionManager};
904        use crate::result::Error;
905
906        let conn = &mut crate::test_helpers::pg_connection_no_transaction();
907        assert_eq!(
908            None,
909            <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
910                conn
911            ).transaction_depth().expect("Transaction depth")
912        );
913        let result = AnsiTransactionManager::commit_transaction(conn);
914        assert!(matches!(result, Err(Error::NotInTransaction)))
915    }
916
917    #[diesel_test_helper::test]
918    fn transaction_manager_returns_an_error_when_attempting_to_rollback_outside_of_a_transaction() {
919        use crate::PgConnection;
920        use crate::connection::{AnsiTransactionManager, TransactionManager};
921        use crate::result::Error;
922
923        let conn = &mut crate::test_helpers::pg_connection_no_transaction();
924        assert_eq!(
925            None,
926            <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
927                conn
928            ).transaction_depth().expect("Transaction depth")
929        );
930        let result = AnsiTransactionManager::rollback_transaction(conn);
931        assert!(matches!(result, Err(Error::NotInTransaction)))
932    }
933
934    #[diesel_test_helper::test]
935    fn postgres_transaction_is_rolled_back_upon_syntax_error() {
936        use std::num::NonZeroU32;
937
938        use crate::connection::{AnsiTransactionManager, TransactionManager};
939        use crate::pg::connection::raw::PgTransactionStatus;
940        use crate::*;
941        let conn = &mut crate::test_helpers::pg_connection_no_transaction();
942        assert_eq!(
943            None,
944            <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
945                conn
946            ).transaction_depth().expect("Transaction depth")
947        );
948        let _result = conn.build_transaction().run(|conn| {
949            assert_eq!(
950                NonZeroU32::new(1),
951                <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
952                    conn
953                ).transaction_depth().expect("Transaction depth")
954            );
955            // In Postgres, a syntax error breaks the transaction block
956            let query_result = sql_query("SELECT_SYNTAX_ERROR 1").execute(conn);
957            assert!(query_result.is_err());
958            assert_eq!(
959                PgTransactionStatus::InError,
960                conn.connection_and_transaction_manager.raw_connection.transaction_status()
961            );
962            query_result
963        });
964        assert_eq!(
965            None,
966            <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
967                conn
968            ).transaction_depth().expect("Transaction depth")
969        );
970        assert_eq!(
971            PgTransactionStatus::Idle,
972            conn.connection_and_transaction_manager
973                .raw_connection
974                .transaction_status()
975        );
976    }
977
978    #[diesel_test_helper::test]
979    fn nested_postgres_transaction_is_rolled_back_upon_syntax_error() {
980        use std::num::NonZeroU32;
981
982        use crate::connection::{AnsiTransactionManager, TransactionManager};
983        use crate::pg::connection::raw::PgTransactionStatus;
984        use crate::*;
985        let conn = &mut crate::test_helpers::pg_connection_no_transaction();
986        assert_eq!(
987            None,
988            <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
989                conn
990            ).transaction_depth().expect("Transaction depth")
991        );
992        let result = conn.build_transaction().run(|conn| {
993            assert_eq!(
994                NonZeroU32::new(1),
995                <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
996                    conn
997            ).transaction_depth().expect("Transaction depth")
998            );
999            let result = conn.build_transaction().run(|conn| {
1000                assert_eq!(
1001                    NonZeroU32::new(2),
1002                    <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1003                        conn
1004            ).transaction_depth().expect("Transaction depth")
1005                );
1006                sql_query("SELECT_SYNTAX_ERROR 1").execute(conn)
1007            });
1008            assert!(result.is_err());
1009            assert_eq!(
1010                NonZeroU32::new(1),
1011                <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1012                    conn
1013            ).transaction_depth().expect("Transaction depth")
1014            );
1015            let query_result = sql_query("SELECT 1").execute(conn);
1016            assert!(query_result.is_ok());
1017            assert_eq!(
1018                PgTransactionStatus::InTransaction,
1019                conn.connection_and_transaction_manager.raw_connection.transaction_status()
1020            );
1021            query_result
1022        });
1023        assert!(result.is_ok());
1024        assert_eq!(
1025            PgTransactionStatus::Idle,
1026            conn.connection_and_transaction_manager
1027                .raw_connection
1028                .transaction_status()
1029        );
1030        assert_eq!(
1031            None,
1032            <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1033                conn
1034            ).transaction_depth().expect("Transaction depth")
1035        );
1036    }
1037
    #[diesel_test_helper::test]
    // This function uses collect with a side effect (spawning threads),
    // so this is a false positive from clippy.
    //
    // Two threads run conflicting `serializable` transactions on their own
    // connections. The test expects exactly one of them to succeed and the other
    // to fail with `SerializationFailure`, and checks that each connection ends
    // up idle with no tracked transaction depth either way.
    #[allow(clippy::needless_collect)]
    fn postgres_transaction_depth_is_tracked_properly_on_serialization_failure() {
        use crate::pg::connection::raw::PgTransactionStatus;
        use crate::result::DatabaseErrorKind::SerializationFailure;
        use crate::result::Error::DatabaseError;
        use crate::*;
        use std::sync::{Arc, Barrier};
        use std::thread;

        table! {
            #[sql_name = "pg_transaction_depth_is_tracked_properly_on_commit_failure"]
            serialization_example {
                id -> Serial,
                class -> Integer,
            }
        }

        let conn = &mut crate::test_helpers::pg_connection_no_transaction();

        // Recreate the table so that every run starts from the same two rows.
        sql_query(
            "DROP TABLE IF EXISTS pg_transaction_depth_is_tracked_properly_on_commit_failure;",
        )
        .execute(conn)
        .unwrap();
        sql_query(
            r#"
            CREATE TABLE pg_transaction_depth_is_tracked_properly_on_commit_failure (
                id SERIAL PRIMARY KEY,
                class INTEGER NOT NULL
            )
        "#,
        )
        .execute(conn)
        .unwrap();

        insert_into(serialization_example::table)
            .values(&vec![
                serialization_example::class.eq(1),
                serialization_example::class.eq(2),
            ])
            .execute(conn)
            .unwrap();

        // Each barrier is shared by the two worker threads: `before_barrier`
        // makes sure both have run their read query before either inserts,
        // `after_barrier` makes sure both have inserted before either closure
        // returns.
        let before_barrier = Arc::new(Barrier::new(2));
        let after_barrier = Arc::new(Barrier::new(2));
        let threads = (1..3)
            .map(|i| {
                let before_barrier = before_barrier.clone();
                let after_barrier = after_barrier.clone();
                thread::spawn(move || {
                    use crate::connection::AnsiTransactionManager;
                    use crate::connection::TransactionManager;
                    // Every thread uses its own connection.
                    let conn = &mut crate::test_helpers::pg_connection_no_transaction();
                    assert_eq!(None, <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));

                    let result = conn.build_transaction().serializable().run(|conn| {
                        assert_eq!(NonZeroU32::new(1), <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));

                        // Read the rows of "our" class ...
                        // NOTE(review): if this query fails, `?` returns before
                        // `before_barrier.wait()`, which would leave the other
                        // thread blocked on the barrier.
                        let _ = serialization_example::table
                            .filter(serialization_example::class.eq(i))
                            .count()
                            .execute(conn)?;

                        // ... and insert a row of the class the other thread reads.
                        let other_i = if i == 1 { 2 } else { 1 };
                        let q = insert_into(serialization_example::table)
                            .values(serialization_example::class.eq(other_i));
                        before_barrier.wait();

                        let r = q.execute(conn);
                        after_barrier.wait();
                        r
                    });
                    // Whether the transaction succeeded or failed, the
                    // connection must be idle and track no transaction depth.
                    assert_eq!(
                        PgTransactionStatus::Idle,
                        conn.connection_and_transaction_manager.raw_connection.transaction_status()
                    );

                    assert_eq!(None, <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));
                    result
                })
            })
            .collect::<Vec<_>>();

        let mut results = threads
            .into_iter()
            .map(|t| t.join().unwrap())
            .collect::<Vec<_>>();

        // `false < true`, so the successful result is sorted to the front.
        results.sort_by_key(|r| r.is_err());

        assert!(results[0].is_ok(), "Got {:?} instead", results);
        assert!(
            matches!(&results[1], Err(DatabaseError(SerializationFailure, _))),
            "Got {:?} instead",
            results
        );
        assert_eq!(
            PgTransactionStatus::Idle,
            conn.connection_and_transaction_manager
                .raw_connection
                .transaction_status()
        );
    }
1144
    #[diesel_test_helper::test]
    // This function uses collect with a side effect (spawning threads),
    // so this is a false positive from clippy.
    //
    // Same scenario as the non-nested serialization failure test, except that
    // the conflicting queries run inside a nested `conn.transaction(..)` call
    // within the `serializable` transaction. After the nested call the depth
    // must be back at 1, and after the outer transaction each connection must be
    // idle with no tracked depth.
    #[allow(clippy::needless_collect)]
    fn postgres_transaction_depth_is_tracked_properly_on_nested_serialization_failure() {
        use crate::pg::connection::raw::PgTransactionStatus;
        use crate::result::DatabaseErrorKind::SerializationFailure;
        use crate::result::Error::DatabaseError;
        use crate::*;
        use std::sync::{Arc, Barrier};
        use std::thread;

        table! {
            #[sql_name = "pg_nested_transaction_depth_is_tracked_properly_on_commit_failure"]
            serialization_example {
                id -> Serial,
                class -> Integer,
            }
        }

        let conn = &mut crate::test_helpers::pg_connection_no_transaction();

        // Recreate the table so that every run starts from the same two rows.
        sql_query(
            "DROP TABLE IF EXISTS pg_nested_transaction_depth_is_tracked_properly_on_commit_failure;",
        )
        .execute(conn)
        .unwrap();
        sql_query(
            r#"
            CREATE TABLE pg_nested_transaction_depth_is_tracked_properly_on_commit_failure (
                id SERIAL PRIMARY KEY,
                class INTEGER NOT NULL
            )
        "#,
        )
        .execute(conn)
        .unwrap();

        insert_into(serialization_example::table)
            .values(&vec![
                serialization_example::class.eq(1),
                serialization_example::class.eq(2),
            ])
            .execute(conn)
            .unwrap();

        // Each barrier is shared by the two worker threads: `before_barrier`
        // makes sure both have run their read query before either inserts,
        // `after_barrier` makes sure both have inserted before either closure
        // returns.
        let before_barrier = Arc::new(Barrier::new(2));
        let after_barrier = Arc::new(Barrier::new(2));
        let threads = (1..3)
            .map(|i| {
                let before_barrier = before_barrier.clone();
                let after_barrier = after_barrier.clone();
                thread::spawn(move || {
                    use crate::connection::AnsiTransactionManager;
                    use crate::connection::TransactionManager;
                    // Every thread uses its own connection.
                    let conn = &mut crate::test_helpers::pg_connection_no_transaction();
                    assert_eq!(None, <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));

                    let result = conn.build_transaction().serializable().run(|conn| {
                        assert_eq!(NonZeroU32::new(1), <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));
                        // Nested transaction: the depth is expected to be 2 inside.
                        let r = conn.transaction(|conn| {
                            assert_eq!(NonZeroU32::new(2), <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));

                            // Read the rows of "our" class ...
                            // NOTE(review): if this query fails, `?` returns
                            // before `before_barrier.wait()`, which would leave
                            // the other thread blocked on the barrier.
                            let _ = serialization_example::table
                                .filter(serialization_example::class.eq(i))
                                .count()
                                .execute(conn)?;

                            // ... and insert a row of the class the other thread reads.
                            let other_i = if i == 1 { 2 } else { 1 };
                            let q = insert_into(serialization_example::table)
                                .values(serialization_example::class.eq(other_i));
                            before_barrier.wait();

                            let r = q.execute(conn);
                            after_barrier.wait();
                            r
                        });
                        // Whatever the nested call returned, the outer
                        // transaction must still be open at depth 1.
                        assert_eq!(NonZeroU32::new(1), <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));
                        assert_eq!(
                            PgTransactionStatus::InTransaction,
                            conn.connection_and_transaction_manager.raw_connection.transaction_status()
                        );
                        r
                    });
                    // Whether the transaction succeeded or failed, the
                    // connection must be idle and track no transaction depth.
                    assert_eq!(
                        PgTransactionStatus::Idle,
                        conn.connection_and_transaction_manager.raw_connection.transaction_status()
                    );

                    assert_eq!(None, <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));
                    result
                })
            })
            .collect::<Vec<_>>();

        let mut results = threads
            .into_iter()
            .map(|t| t.join().unwrap())
            .collect::<Vec<_>>();

        // `false < true`, so the successful result is sorted to the front.
        results.sort_by_key(|r| r.is_err());

        assert!(results[0].is_ok(), "Got {:?} instead", results);
        assert!(
            matches!(&results[1], Err(DatabaseError(SerializationFailure, _))),
            "Got {:?} instead",
            results
        );
        assert_eq!(
            PgTransactionStatus::Idle,
            conn.connection_and_transaction_manager
                .raw_connection
                .transaction_status()
        );
    }
1260
1261    #[diesel_test_helper::test]
1262    fn postgres_transaction_is_rolled_back_upon_deferred_constraint_failure() {
1263        use crate::connection::{AnsiTransactionManager, TransactionManager};
1264        use crate::pg::connection::raw::PgTransactionStatus;
1265        use crate::result::Error;
1266        use crate::*;
1267
1268        let conn = &mut crate::test_helpers::pg_connection_no_transaction();
1269        assert_eq!(
1270            None,
1271            <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1272                conn
1273            ).transaction_depth().expect("Transaction depth")
1274        );
1275        let result: Result<_, Error> = conn.build_transaction().run(|conn| {
1276            assert_eq!(
1277                NonZeroU32::new(1),
1278                <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1279                    conn
1280            ).transaction_depth().expect("Transaction depth")
1281            );
1282            sql_query("DROP TABLE IF EXISTS deferred_constraint_commit").execute(conn)?;
1283            sql_query("CREATE TABLE deferred_constraint_commit(id INT UNIQUE INITIALLY DEFERRED)")
1284                .execute(conn)?;
1285            sql_query("INSERT INTO deferred_constraint_commit VALUES(1)").execute(conn)?;
1286            let result =
1287                sql_query("INSERT INTO deferred_constraint_commit VALUES(1)").execute(conn);
1288            assert!(result.is_ok());
1289            assert_eq!(
1290                PgTransactionStatus::InTransaction,
1291                conn.connection_and_transaction_manager.raw_connection.transaction_status()
1292            );
1293            Ok(())
1294        });
1295        assert_eq!(
1296            None,
1297            <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1298                conn
1299            ).transaction_depth().expect("Transaction depth")
1300        );
1301        assert_eq!(
1302            PgTransactionStatus::Idle,
1303            conn.connection_and_transaction_manager
1304                .raw_connection
1305                .transaction_status()
1306        );
1307        assert!(result.is_err());
1308    }
1309
1310    #[diesel_test_helper::test]
1311    fn postgres_transaction_is_rolled_back_upon_deferred_trigger_failure() {
1312        use crate::connection::{AnsiTransactionManager, TransactionManager};
1313        use crate::pg::connection::raw::PgTransactionStatus;
1314        use crate::result::Error;
1315        use crate::*;
1316
1317        let conn = &mut crate::test_helpers::pg_connection_no_transaction();
1318        assert_eq!(
1319            None,
1320            <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1321                conn
1322            ).transaction_depth().expect("Transaction depth")
1323        );
1324        let result: Result<_, Error> = conn.build_transaction().run(|conn| {
1325            assert_eq!(
1326                NonZeroU32::new(1),
1327                <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1328                    conn
1329            ).transaction_depth().expect("Transaction depth")
1330            );
1331            sql_query("DROP TABLE IF EXISTS deferred_trigger_commit").execute(conn)?;
1332            sql_query("CREATE TABLE deferred_trigger_commit(id INT UNIQUE INITIALLY DEFERRED)")
1333                .execute(conn)?;
1334            sql_query(
1335                r#"
1336                    CREATE OR REPLACE FUNCTION transaction_depth_blow_up()
1337                        RETURNS trigger
1338                        LANGUAGE plpgsql
1339                        AS $$
1340                    DECLARE
1341                    BEGIN
1342                        IF NEW.value = 42 THEN
1343                            RAISE EXCEPTION 'Transaction kaboom';
1344                        END IF;
1345                    RETURN NEW;
1346
1347                    END;$$;
1348                "#,
1349            )
1350            .execute(conn)?;
1351
1352            sql_query(
1353                r#"
1354                    CREATE CONSTRAINT TRIGGER transaction_depth_trigger
1355                        AFTER INSERT ON "deferred_trigger_commit"
1356                        DEFERRABLE INITIALLY DEFERRED
1357                        FOR EACH ROW
1358                        EXECUTE PROCEDURE transaction_depth_blow_up()
1359            "#,
1360            )
1361            .execute(conn)?;
1362            let result = sql_query("INSERT INTO deferred_trigger_commit VALUES(42)").execute(conn);
1363            assert!(result.is_ok());
1364            assert_eq!(
1365                PgTransactionStatus::InTransaction,
1366                conn.connection_and_transaction_manager.raw_connection.transaction_status()
1367            );
1368            Ok(())
1369        });
1370        assert_eq!(
1371            None,
1372            <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1373                conn
1374            ).transaction_depth().expect("Transaction depth")
1375        );
1376        assert_eq!(
1377            PgTransactionStatus::Idle,
1378            conn.connection_and_transaction_manager
1379                .raw_connection
1380                .transaction_status()
1381        );
1382        assert!(result.is_err());
1383    }
1384
1385    #[diesel_test_helper::test]
1386    fn nested_postgres_transaction_is_rolled_back_upon_deferred_trigger_failure() {
1387        use crate::connection::{AnsiTransactionManager, TransactionManager};
1388        use crate::pg::connection::raw::PgTransactionStatus;
1389        use crate::result::Error;
1390        use crate::*;
1391
1392        let conn = &mut crate::test_helpers::pg_connection_no_transaction();
1393        assert_eq!(
1394            None,
1395            <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1396                conn
1397            ).transaction_depth().expect("Transaction depth")
1398        );
1399        let result: Result<_, Error> = conn.build_transaction().run(|conn| {
1400            assert_eq!(
1401                NonZeroU32::new(1),
1402                <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1403                    conn
1404            ).transaction_depth().expect("Transaction depth")
1405            );
1406            sql_query("DROP TABLE IF EXISTS deferred_trigger_nested_commit").execute(conn)?;
1407            sql_query(
1408                "CREATE TABLE deferred_trigger_nested_commit(id INT UNIQUE INITIALLY DEFERRED)",
1409            )
1410            .execute(conn)?;
1411            sql_query(
1412                r#"
1413                    CREATE OR REPLACE FUNCTION transaction_depth_blow_up()
1414                        RETURNS trigger
1415                        LANGUAGE plpgsql
1416                        AS $$
1417                    DECLARE
1418                    BEGIN
1419                        IF NEW.value = 42 THEN
1420                            RAISE EXCEPTION 'Transaction kaboom';
1421                        END IF;
1422                    RETURN NEW;
1423
1424                    END;$$;
1425                "#,
1426            )
1427            .execute(conn)?;
1428
1429            sql_query(
1430                r#"
1431                    CREATE CONSTRAINT TRIGGER transaction_depth_trigger
1432                        AFTER INSERT ON "deferred_trigger_nested_commit"
1433                        DEFERRABLE INITIALLY DEFERRED
1434                        FOR EACH ROW
1435                        EXECUTE PROCEDURE transaction_depth_blow_up()
1436            "#,
1437            )
1438            .execute(conn)?;
1439            let inner_result: Result<_, Error> = conn.build_transaction().run(|conn| {
1440                let result = sql_query("INSERT INTO deferred_trigger_nested_commit VALUES(42)")
1441                    .execute(conn);
1442                assert!(result.is_ok());
1443                Ok(())
1444            });
1445            assert!(inner_result.is_err());
1446            assert_eq!(
1447                PgTransactionStatus::InTransaction,
1448                conn.connection_and_transaction_manager.raw_connection.transaction_status()
1449            );
1450            Ok(())
1451        });
1452        assert_eq!(
1453            None,
1454            <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1455                conn
1456            ).transaction_depth().expect("Transaction depth")
1457        );
1458        assert_eq!(
1459            PgTransactionStatus::Idle,
1460            conn.connection_and_transaction_manager
1461                .raw_connection
1462                .transaction_status()
1463        );
1464        assert!(result.is_ok(), "Expected success, got {:?}", result);
1465    }
1466
1467    #[diesel_test_helper::test]
1468    fn nested_postgres_transaction_is_rolled_back_upon_deferred_constraint_failure() {
1469        use crate::connection::{AnsiTransactionManager, TransactionManager};
1470        use crate::pg::connection::raw::PgTransactionStatus;
1471        use crate::result::Error;
1472        use crate::*;
1473
1474        let conn = &mut crate::test_helpers::pg_connection_no_transaction();
1475        assert_eq!(
1476            None,
1477            <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1478                conn
1479            ).transaction_depth().expect("Transaction depth")
1480        );
1481        let result: Result<_, Error> = conn.build_transaction().run(|conn| {
1482            assert_eq!(
1483                NonZeroU32::new(1),
1484                <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1485                    conn
1486            ).transaction_depth().expect("Transaction depth")
1487            );
1488            sql_query("DROP TABLE IF EXISTS deferred_constraint_nested_commit").execute(conn)?;
1489            sql_query("CREATE TABLE deferred_constraint_nested_commit(id INT UNIQUE INITIALLY DEFERRED)").execute(conn)?;
1490            let inner_result: Result<_, Error> = conn.build_transaction().run(|conn| {
1491                assert_eq!(
1492                    NonZeroU32::new(2),
1493                    <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1494                        conn
1495                    ).transaction_depth().expect("Transaction depth")
1496                );
1497                sql_query("INSERT INTO deferred_constraint_nested_commit VALUES(1)").execute(conn)?;
1498                let result = sql_query("INSERT INTO deferred_constraint_nested_commit VALUES(1)").execute(conn);
1499                assert!(result.is_ok());
1500                Ok(())
1501            });
1502            assert!(inner_result.is_err());
1503            assert_eq!(
1504                PgTransactionStatus::InTransaction,
1505                conn.connection_and_transaction_manager.raw_connection.transaction_status()
1506            );
1507            assert_eq!(
1508                NonZeroU32::new(1),
1509                <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1510                    conn
1511            ).transaction_depth().expect("Transaction depth")
1512            );
1513            sql_query("INSERT INTO deferred_constraint_nested_commit VALUES(1)").execute(conn)
1514        });
1515        assert_eq!(
1516            None,
1517            <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1518                conn
1519            ).transaction_depth().expect("Transaction depth")
1520        );
1521        assert_eq!(
1522            PgTransactionStatus::Idle,
1523            conn.connection_and_transaction_manager
1524                .raw_connection
1525                .transaction_status()
1526        );
1527        assert!(result.is_ok());
1528    }
1529}