diesel/pg/connection/
mod.rs

1pub(super) mod copy;
2pub(crate) mod cursor;
3mod raw;
4mod result;
5mod row;
6mod stmt;
7
8use self::copy::{CopyFromSink, CopyToBuffer};
9use self::cursor::*;
10use self::private::{ConnectionAndTransactionManager, CopyFromWrapper, QueryFragmentHelper};
11use self::raw::{PgTransactionStatus, RawConnection};
12use self::stmt::Statement;
13use crate::connection::instrumentation::{DynInstrumentation, Instrumentation, StrQueryHelper};
14use crate::connection::statement_cache::{MaybeCached, StatementCache};
15use crate::connection::*;
16use crate::expression::QueryMetadata;
17use crate::pg::backend::PgNotification;
18use crate::pg::metadata_lookup::{GetPgMetadataCache, PgMetadataCache};
19use crate::pg::query_builder::copy::InternalCopyFromQuery;
20use crate::pg::{Pg, TransactionBuilder};
21use crate::query_builder::bind_collector::RawBytesBindCollector;
22use crate::query_builder::*;
23use crate::result::ConnectionError::CouldntSetupConfiguration;
24use crate::result::*;
25use crate::RunQueryDsl;
26use std::ffi::CString;
27use std::fmt::Debug;
28use std::os::raw as libc;
29
30use super::query_builder::copy::{CopyFromExpression, CopyTarget, CopyToCommand};
31
32pub(super) use self::result::PgResult;
33
/// The connection string expected by `PgConnection::establish`
/// should be a PostgreSQL connection string, as documented at
/// <https://www.postgresql.org/docs/9.4/static/libpq-connect.html#LIBPQ-CONNSTRING>
///
/// # Supported loading model implementations
///
/// * [`DefaultLoadingMode`]
/// * [`PgRowByRowLoadingMode`]
///
/// If you are unsure which loading mode is the correct one for your application,
/// you likely want to use the `DefaultLoadingMode` as that one offers
/// generally better performance.
///
/// Due to the fact that `PgConnection` supports multiple loading modes
/// it is **required** to always specify the used loading mode
/// when calling [`RunQueryDsl::load_iter`]
///
/// ## `DefaultLoadingMode`
///
/// By using this mode `PgConnection` defaults to loading all response values at **once**
/// and only performs deserialization afterward for the `DefaultLoadingMode`.
/// Generally this mode will be more performant.
///
/// This loading mode allows users to hold more than one iterator at once using
/// the same connection:
/// ```rust
/// # include!("../../doctest_setup.rs");
/// #
/// # fn main() {
/// #     run_test().unwrap();
/// # }
/// #
/// # fn run_test() -> QueryResult<()> {
/// #     use schema::users;
/// #     let connection = &mut establish_connection();
/// use diesel::connection::DefaultLoadingMode;
///
/// let iter1 = users::table.load_iter::<(i32, String), DefaultLoadingMode>(connection)?;
/// let iter2 = users::table.load_iter::<(i32, String), DefaultLoadingMode>(connection)?;
///
/// for r in iter1 {
///     let (id, name) = r?;
///     println!("Id: {} Name: {}", id, name);
/// }
///
/// for r in iter2 {
///     let (id, name) = r?;
///     println!("Id: {} Name: {}", id, name);
/// }
/// #   Ok(())
/// # }
/// ```
///
/// ## `PgRowByRowLoadingMode`
///
/// By using this mode `PgConnection` defaults to loading each row of the result set
/// separately. This might be desired for huge result sets.
///
/// This loading mode **prevents** creating more than one iterator at once using
/// the same connection. The following code is **not** allowed:
///
/// ```compile_fail
/// # include!("../../doctest_setup.rs");
/// #
/// # fn main() {
/// #     run_test().unwrap();
/// # }
/// #
/// # fn run_test() -> QueryResult<()> {
/// #     use schema::users;
/// #     let connection = &mut establish_connection();
/// use diesel::pg::PgRowByRowLoadingMode;
///
/// let iter1 = users::table.load_iter::<(i32, String), PgRowByRowLoadingMode>(connection)?;
/// // creating a second iterator generates a compiler error
/// let iter2 = users::table.load_iter::<(i32, String), PgRowByRowLoadingMode>(connection)?;
///
/// for r in iter1 {
///     let (id, name) = r?;
///     println!("Id: {} Name: {}", id, name);
/// }
///
/// for r in iter2 {
///     let (id, name) = r?;
///     println!("Id: {} Name: {}", id, name);
/// }
/// #   Ok(())
/// # }
/// ```
#[allow(missing_debug_implementations)]
#[cfg(feature = "postgres")]
pub struct PgConnection {
    // Prepared statements looked up (or created) in `with_prepared_query`.
    statement_cache: StatementCache<Pg, Statement>,
    // Type metadata cache handed out through `GetPgMetadataCache`.
    metadata_cache: PgMetadataCache,
    // Raw connection, transaction state and instrumentation, grouped so that
    // they can be mutably borrowed independently of the statement cache.
    connection_and_transaction_manager: ConnectionAndTransactionManager,
}
130
// SAFETY: according to libpq documentation a connection can be transferred to other threads
#[allow(unsafe_code)]
unsafe impl Send for PgConnection {}
134
135impl SimpleConnection for PgConnection {
136    #[allow(unsafe_code)] // use of unsafe function
137    fn batch_execute(&mut self, query: &str) -> QueryResult<()> {
138        self.connection_and_transaction_manager
139            .instrumentation
140            .on_connection_event(InstrumentationEvent::StartQuery {
141                query: &StrQueryHelper::new(query),
142            });
143        let c_query = CString::new(query)?;
144        let inner_result = unsafe {
145            self.connection_and_transaction_manager
146                .raw_connection
147                .exec(c_query.as_ptr())
148        };
149        update_transaction_manager_status(
150            inner_result.and_then(|raw_result| {
151                PgResult::new(
152                    raw_result,
153                    &self.connection_and_transaction_manager.raw_connection,
154                )
155            }),
156            &mut self.connection_and_transaction_manager,
157            &|callback| callback(&StrQueryHelper::new(query)),
158            true,
159        )?;
160        Ok(())
161    }
162}
163
/// A [`PgConnection`] specific loading mode to load rows one by one
///
/// This is a marker type without any data; it is only used as the loading
/// mode type parameter.
///
/// See the documentation of [`PgConnection`] for details
#[derive(Debug, Copy, Clone)]
pub struct PgRowByRowLoadingMode;
169
// Empty marker impl. NOTE(review): presumably `ConnectionSealed` restricts who
// may implement `Connection` - confirm against the trait definition.
impl ConnectionSealed for PgConnection {}
171
172impl Connection for PgConnection {
173    type Backend = Pg;
174    type TransactionManager = AnsiTransactionManager;
175
176    fn establish(database_url: &str) -> ConnectionResult<PgConnection> {
177        let mut instrumentation = DynInstrumentation::default_instrumentation();
178        instrumentation.on_connection_event(InstrumentationEvent::StartEstablishConnection {
179            url: database_url,
180        });
181        let r = RawConnection::establish(database_url).and_then(|raw_conn| {
182            let mut conn = PgConnection {
183                connection_and_transaction_manager: ConnectionAndTransactionManager {
184                    raw_connection: raw_conn,
185                    transaction_state: AnsiTransactionManager::default(),
186                    instrumentation: DynInstrumentation::none(),
187                },
188                statement_cache: StatementCache::new(),
189                metadata_cache: PgMetadataCache::new(),
190            };
191            conn.set_config_options()
192                .map_err(CouldntSetupConfiguration)?;
193            Ok(conn)
194        });
195        instrumentation.on_connection_event(InstrumentationEvent::FinishEstablishConnection {
196            url: database_url,
197            error: r.as_ref().err(),
198        });
199        let mut conn = r?;
200        conn.connection_and_transaction_manager.instrumentation = instrumentation;
201        Ok(conn)
202    }
203
    // Executes `source` and returns the number of affected rows.
    //
    // The statement is prepared via `with_prepared_query` (with
    // `execute_returning_count == true`), executed, and all remaining results
    // are drained from the connection. The overall outcome is then handed to
    // `update_transaction_manager_status` as the final call for this query,
    // which refreshes the transaction state and emits the `FinishQuery` event.
    fn execute_returning_count<T>(&mut self, source: &T) -> QueryResult<usize>
    where
        T: QueryFragment<Pg> + QueryId,
    {
        update_transaction_manager_status(
            self.with_prepared_query(
                Box::new(source),
                true,
                &mut |query, params, conn, _source| {
                    let res = query
                        .execute(&mut conn.raw_connection, &params, false)
                        .map(|r| r.rows_affected());
                    // according to https://www.postgresql.org/docs/current/libpq-async.html
                    // `PQgetResult` needs to be called till a null pointer is returned
                    while conn.raw_connection.get_next_result()?.is_some() {}
                    res
                },
            ),
            &mut self.connection_and_transaction_manager,
            &|callback| source.instrumentation(callback),
            true,
        )
    }
227
    // Gives mutable access to the transaction manager state stored next to
    // the raw connection.
    fn transaction_state(&mut self) -> &mut AnsiTransactionManager
    where
        Self: Sized,
    {
        &mut self.connection_and_transaction_manager.transaction_state
    }
234
    // Returns the currently installed instrumentation as a trait object.
    fn instrumentation(&mut self) -> &mut dyn Instrumentation {
        &mut *self.connection_and_transaction_manager.instrumentation
    }
238
    // Replaces the installed instrumentation with `instrumentation`.
    fn set_instrumentation(&mut self, instrumentation: impl Instrumentation) {
        self.connection_and_transaction_manager.instrumentation = instrumentation.into();
    }
242
    // Forwards the requested cache size to the prepared statement cache.
    fn set_prepared_statement_cache_size(&mut self, size: CacheSize) {
        self.statement_cache.set_cache_size(size);
    }
246}
247
// `LoadConnection` is implemented for every loading mode `B` for which
// `PgConnection` implements the private `PgLoadingMode<B>` trait; cursor and
// row types are taken from that implementation.
impl<B> LoadConnection<B> for PgConnection
where
    Self: self::private::PgLoadingMode<B>,
{
    type Cursor<'conn, 'query> = <Self as self::private::PgLoadingMode<B>>::Cursor<'conn, 'query>;
    type Row<'conn, 'query> = <Self as self::private::PgLoadingMode<B>>::Row<'conn, 'query>;

    // Prepares and executes `source`, then lets the loading mode build the
    // cursor from the result.
    fn load<'conn, 'query, T>(
        &'conn mut self,
        source: T,
    ) -> QueryResult<Self::Cursor<'conn, 'query>>
    where
        T: Query + QueryFragment<Self::Backend> + QueryId + 'query,
        Self::Backend: QueryMetadata<T::SqlType>,
    {
        self.with_prepared_query(
            Box::new(source),
            false,
            &mut |stmt, params, conn, source| {
                use self::private::PgLoadingMode;
                // `inner_load` only reports failures; the successful end of the
                // query is left to `get_cursor`.
                let result = inner_load(stmt, params, conn, &*source, Self::USE_ROW_BY_ROW_MODE)?;
                Self::get_cursor(conn, result, source)
            },
        )
    }
}
274
275fn inner_load(
276    stmt: MaybeCached<'_, Statement>,
277    params: Vec<Option<Vec<u8>>>,
278    conn: &mut ConnectionAndTransactionManager,
279    source: &dyn QueryFragmentHelper<crate::result::Error>,
280    row_by_row: bool,
281) -> Result<PgResult, Error> {
282    let result = stmt.execute(&mut conn.raw_connection, &params, row_by_row);
283    update_transaction_manager_status(
284        result,
285        conn,
286        &|callback| source.instrumentation(callback),
287        false,
288    )
289}
290
impl GetPgMetadataCache for PgConnection {
    // Exposes the metadata cache owned by this connection.
    fn get_metadata_cache(&mut self) -> &mut PgMetadataCache {
        &mut self.metadata_cache
    }
}
296
/// Synchronizes the transaction manager with the transaction status reported
/// by the raw connection and forwards the query outcome to the instrumentation.
///
/// * `query_result` - outcome of the query that just ran; returned unchanged.
/// * `conn` - connection state whose transaction status and instrumentation
///   are updated.
/// * `instrumentation_callback` - invoked with a closure that receives the
///   query to report.
/// * `final_call` - if `true`, a successful result emits a `FinishQuery` event
///   without error. A failed result always emits `FinishQuery` with the error.
#[inline(always)]
fn update_transaction_manager_status<T>(
    query_result: QueryResult<T>,
    conn: &mut ConnectionAndTransactionManager,
    instrumentation_callback: &dyn Fn(
        &mut dyn FnMut(&dyn crate::connection::instrumentation::DebugQuery),
    ),
    final_call: bool,
) -> QueryResult<T> {
    /// avoid monomorphizing for every result type - this part will not be inlined
    fn non_generic_inner(conn: &mut ConnectionAndTransactionManager, is_err: bool) {
        let raw_conn: &mut RawConnection = &mut conn.raw_connection;
        let tm: &mut AnsiTransactionManager = &mut conn.transaction_state;
        // libpq keeps track of the transaction status internally, and that is accessible
        // via `transaction_status`. We can use that to update the AnsiTransactionManager
        // status
        match raw_conn.transaction_status() {
            PgTransactionStatus::InError => {
                tm.status.set_requires_rollback_maybe_up_to_top_level(true)
            }
            PgTransactionStatus::Unknown => tm.status.set_in_error(),
            PgTransactionStatus::Idle => {
                // This is useful in particular for commit attempts (even
                // if `COMMIT` errors it still exits transaction)

                // This may repair the transaction manager
                tm.status = TransactionManagerStatus::Valid(Default::default())
            }
            PgTransactionStatus::InTransaction => {
                let transaction_status = &mut tm.status;
                // If we weren't an error, it is possible that we were a transaction start
                // -> we should tolerate any state
                if is_err {
                    // An error may not have us enter a transaction, so if we weren't in one
                    // we may not be in one now
                    if !matches!(transaction_status, TransactionManagerStatus::Valid(valid_tm) if valid_tm.transaction_depth().is_some())
                    {
                        // -> transaction manager is broken
                        transaction_status.set_in_error()
                    }
                } else {
                    // If transaction was InError before, but now it's not (because we attempted
                    // a rollback), we may pretend it's fixed because
                    // if it isn't Postgres *will* tell us again.

                    // Fun fact: if we have not received an `InTransaction` status however,
                    // postgres will *not* warn us that transaction is broken when attempting to
                    // commit, so we may think that commit has succeeded but in fact it hasn't.
                    tm.status.set_requires_rollback_maybe_up_to_top_level(false)
                }
            }
            PgTransactionStatus::Active => {
                // This is a transient state for libpq - nothing we can deduce here.
            }
        }
    }

    /// Reports the outcome as a `FinishQuery` event: always for errors, and for
    /// successes only when `final_call` is set. Takes a type-erased result so
    /// that it does not depend on the result type of the caller.
    fn non_generic_instrumentation(
        query_result: Result<(), &Error>,
        conn: &mut ConnectionAndTransactionManager,
        instrumentation_callback: &dyn Fn(
            &mut dyn FnMut(&dyn crate::connection::instrumentation::DebugQuery),
        ),
        final_call: bool,
    ) {
        if let Err(e) = query_result {
            instrumentation_callback(&mut |query| {
                conn.instrumentation
                    .on_connection_event(InstrumentationEvent::FinishQuery {
                        query,
                        error: Some(e),
                    })
            });
        } else if final_call {
            instrumentation_callback(&mut |query| {
                conn.instrumentation
                    .on_connection_event(InstrumentationEvent::FinishQuery { query, error: None });
            });
        }
    }

    // The transaction state is refreshed first, then the outcome is reported.
    non_generic_inner(conn, query_result.is_err());
    non_generic_instrumentation(
        query_result.as_ref().map(|_| ()),
        conn,
        instrumentation_callback,
        final_call,
    );
    query_result
}
387
#[cfg(feature = "r2d2")]
impl crate::r2d2::R2D2Connection for PgConnection {
    // Runs the pool's check query; only success or failure matters, the
    // returned row count is discarded.
    fn ping(&mut self) -> QueryResult<()> {
        let _affected_rows = crate::r2d2::CheckConnectionQuery.execute(self)?;
        Ok(())
    }

    // Delegates to the transaction manager's broken-state check.
    fn is_broken(&mut self) -> bool {
        AnsiTransactionManager::is_broken_transaction_manager(self)
    }
}
398
impl MultiConnectionHelper for PgConnection {
    // Type-erases the backend's metadata lookup by calling its `as_any`.
    fn to_any<'a>(
        lookup: &mut <Self::Backend as crate::sql_types::TypeMetadata>::MetadataLookup,
    ) -> &mut (dyn std::any::Any + 'a) {
        lookup.as_any()
    }

    // Recovers the metadata lookup from a type-erased value: succeeds only if
    // the value is a `PgConnection`, otherwise returns `None`.
    fn from_any(
        lookup: &mut dyn std::any::Any,
    ) -> Option<&mut <Self::Backend as crate::sql_types::TypeMetadata>::MetadataLookup> {
        lookup
            .downcast_mut::<Self>()
            .map(|conn| conn as &mut dyn super::PgMetadataLookup)
    }
}
414
415impl PgConnection {
    /// Build a transaction, specifying additional details such as isolation level
    ///
    /// The returned builder mutably borrows this connection.
    ///
    /// See [`TransactionBuilder`] for more examples.
    ///
    /// [`TransactionBuilder`]: crate::pg::TransactionBuilder
    ///
    /// ```rust
    /// # include!("../../doctest_setup.rs");
    /// #
    /// # fn main() {
    /// #     run_test().unwrap();
    /// # }
    /// #
    /// # fn run_test() -> QueryResult<()> {
    /// #     use schema::users::dsl::*;
    /// #     let conn = &mut connection_no_transaction();
    /// conn.build_transaction()
    ///     .read_only()
    ///     .serializable()
    ///     .deferrable()
    ///     .run(|conn| Ok(()))
    /// # }
    /// ```
    pub fn build_transaction(&mut self) -> TransactionBuilder<'_, Self> {
        TransactionBuilder::new(self)
    }
442
443    pub(crate) fn copy_from<S, T>(&mut self, target: S) -> Result<usize, S::Error>
444    where
445        S: CopyFromExpression<T>,
446    {
447        let query = CopyFromWrapper(std::cell::RefCell::new(InternalCopyFromQuery::new(target)));
448        let res =
449            self.with_prepared_query(Box::new(query), false, &mut |stmt, binds, conn, source| {
450                fn inner_copy_in<S, T>(
451                    stmt: MaybeCached<'_, Statement>,
452                    conn: &mut ConnectionAndTransactionManager,
453                    binds: Vec<Option<Vec<u8>>>,
454                    source: &dyn QueryFragmentHelper<S::Error>,
455                ) -> Result<usize, S::Error>
456                where
457                    S: CopyFromExpression<T>,
458                {
459                    let _res = stmt.execute(&mut conn.raw_connection, &binds, false)?;
460                    let mut copy_in = CopyFromSink::new(&mut conn.raw_connection);
461                    let r = source.write_copy_from(&mut copy_in);
462                    copy_in.finish(r.as_ref().err().map(|e| e.to_string()))?;
463                    let next_res = conn.raw_connection.get_next_result()?.ok_or_else(|| {
464                        crate::result::Error::DeserializationError(
465                            "Failed to receive result from the database".into(),
466                        )
467                    })?;
468                    let rows = next_res.rows_affected();
469                    while let Some(_r) = conn.raw_connection.get_next_result()? {}
470                    r?;
471                    Ok(rows)
472                }
473
474                let rows = inner_copy_in::<S, T>(stmt, conn, binds, &*source);
475                if let Err(ref e) = rows {
476                    let database_error = crate::result::Error::DatabaseError(
477                        crate::result::DatabaseErrorKind::Unknown,
478                        Box::new(e.to_string()),
479                    );
480                    source.instrumentation(&mut |query| {
481                        conn.instrumentation.on_connection_event(
482                            InstrumentationEvent::FinishQuery {
483                                query,
484                                error: Some(&database_error),
485                            },
486                        );
487                    });
488                } else {
489                    source.instrumentation(&mut |query| {
490                        conn.instrumentation.on_connection_event(
491                            InstrumentationEvent::FinishQuery { query, error: None },
492                        );
493                    });
494                }
495
496                rows
497            })?;
498
499        Ok(res)
500    }
501
502    pub(crate) fn copy_to<T>(&mut self, command: CopyToCommand<T>) -> QueryResult<CopyToBuffer<'_>>
503    where
504        T: CopyTarget,
505    {
506        let res = self.with_prepared_query::<_, Error>(
507            Box::new(command),
508            false,
509            &mut |stmt, binds, conn, source| {
510                let res = stmt.execute(&mut conn.raw_connection, &binds, false);
511                source.instrumentation(&mut |query| {
512                    conn.instrumentation
513                        .on_connection_event(InstrumentationEvent::FinishQuery {
514                            query,
515                            error: res.as_ref().err(),
516                        });
517                });
518                Ok(CopyToBuffer::new(&mut conn.raw_connection, res?))
519            },
520        )?;
521        Ok(res)
522    }
523
    /// Collects the binds of `source`, obtains a (possibly cached) prepared
    /// statement for it and hands both to `f`.
    ///
    /// * `source` - the type-erased query; it is passed on to `f` by value.
    /// * `execute_returning_count` - when `false`, a failure to prepare the
    ///   statement is reported to the instrumentation here as a `FinishQuery`
    ///   event; when `true`, nothing is reported here for that failure.
    /// * `f` - receives the statement, the serialized binds, the connection
    ///   state and `source`; its result is returned.
    ///
    /// Errors from collecting binds or preparing the statement are converted
    /// into `E` and returned without calling `f`.
    fn with_prepared_query<'conn, 'query, R, E>(
        &'conn mut self,
        source: Box<dyn QueryFragmentHelper<E> + 'query>,
        execute_returning_count: bool,
        f: &mut dyn FnMut(
            MaybeCached<'_, Statement>,
            Vec<Option<Vec<u8>>>,
            &'conn mut ConnectionAndTransactionManager,
            Box<dyn QueryFragmentHelper<E> + 'query>,
        ) -> Result<R, E>,
    ) -> Result<R, E>
    where
        E: From<crate::result::Error>,
    {
        /// Looks up or prepares the statement. This helper is not generic over
        /// the result type `R` of the caller.
        fn prepare_query_non_generic_inner<'a, E>(
            connection_and_transaction_manager: &mut ConnectionAndTransactionManager,
            cache: &'a mut StatementCache<Pg, Statement>,
            source: &dyn QueryFragmentHelper<E>,
            execute_returning_count: bool,
            bind_collector: RawBytesBindCollector<Pg>,
        ) -> QueryResult<(
            Vec<Option<Vec<u8>>>,
            Result<MaybeCached<'a, Statement>, Error>,
        )> {
            let binds = bind_collector.binds;
            let metadata = bind_collector.metadata;
            let query = cache.cached_statement_non_generic(
                source.query_id(),
                source,
                &Pg,
                &metadata,
                &mut connection_and_transaction_manager.raw_connection,
                Statement::prepare,
                &mut *connection_and_transaction_manager.instrumentation,
            );
            if !execute_returning_count {
                if let Err(ref e) = query {
                    source.instrumentation(&mut |query| {
                        connection_and_transaction_manager
                            .instrumentation
                            .on_connection_event(InstrumentationEvent::FinishQuery {
                                query,
                                error: Some(e),
                            });
                    });
                }
            }
            // The prepare result is returned as-is; the caller unwraps it.
            Ok((binds, query))
        }

        // `collect_binds` also emits the `StartQuery` event.
        let bind_collector = self.collect_binds(&*source)?;
        let (binds, query) = prepare_query_non_generic_inner(
            &mut self.connection_and_transaction_manager,
            &mut self.statement_cache,
            &*source,
            execute_returning_count,
            bind_collector,
        )?;

        f(
            query?,
            binds,
            &mut self.connection_and_transaction_manager,
            source,
        )
    }
590
    /// Emits the `StartQuery` instrumentation event for `source` and then
    /// collects its bind values into a fresh `RawBytesBindCollector`.
    ///
    /// The connection itself is handed to `source.collect_binds`.
    fn collect_binds<E>(
        &mut self,
        source: &dyn QueryFragmentHelper<E>,
    ) -> Result<RawBytesBindCollector<Pg>, crate::result::Error> {
        source.instrumentation(&mut |query| {
            self.connection_and_transaction_manager
                .instrumentation
                .on_connection_event(InstrumentationEvent::StartQuery { query });
        });
        let mut bind_collector = RawBytesBindCollector::<Pg>::new();
        source.collect_binds(&mut bind_collector, self)?;
        Ok(bind_collector)
    }
604
605    fn set_config_options(&mut self) -> QueryResult<()> {
606        crate::sql_query("SET TIME ZONE 'UTC'").execute(self)?;
607        crate::sql_query("SET CLIENT_ENCODING TO 'UTF8'").execute(self)?;
608        self.connection_and_transaction_manager
609            .raw_connection
610            .set_notice_processor(noop_notice_processor);
611        Ok(())
612    }
613
    /// See Postgres documentation for SQL commands [NOTIFY][] and [LISTEN][]
    ///
    /// The returned iterator can yield items even after a `None` value when new notifications have been received.
    /// The iterator can be polled again after a `None` value was received as new notifications might have
    /// been sent in the meantime.
    ///
    /// [NOTIFY]: https://www.postgresql.org/docs/current/sql-notify.html
    /// [LISTEN]: https://www.postgresql.org/docs/current/sql-listen.html
    ///
    /// ## Example
    ///
    /// ```
    /// # include!("../../doctest_setup.rs");
    /// #
    /// # fn main() {
    /// #     run_test().unwrap();
    /// # }
    /// #
    /// # fn run_test() -> QueryResult<()> {
    /// #     let connection = &mut establish_connection();
    ///
    /// // register the notifications channel we want to receive notifications for
    /// diesel::sql_query("LISTEN example_channel").execute(connection)?;
    /// // send some notification
    /// // this is usually done from a different connection/thread/application
    /// diesel::sql_query("NOTIFY example_channel, 'additional data'").execute(connection)?;
    ///
    /// for result in connection.notifications_iter() {
    ///     let notification = result.unwrap();
    ///     assert_eq!(notification.channel, "example_channel");
    ///     assert_eq!(notification.payload, "additional data");
    ///
    ///     println!(
    ///         "Notification received from server process with id {}.",
    ///         notification.process_id
    ///     );
    /// }
    /// # Ok(())
    /// # }
    /// ```
    pub fn notifications_iter(&mut self) -> impl Iterator<Item = QueryResult<PgNotification>> + '_ {
        let conn = &self.connection_and_transaction_manager.raw_connection;
        // Each call to `next` asks the raw connection for a pending notification.
        std::iter::from_fn(move || conn.pq_notifies().transpose())
    }
658}
659
// Notice processor that ignores every message; installed on the raw connection
// in `set_config_options`.
extern "C" fn noop_notice_processor(_: *mut libc::c_void, _message: *const libc::c_char) {}
661
662mod private {
663    use super::*;
664
    /// Bundles the parts of a `PgConnection` that are needed while a query
    /// runs: the raw connection, the transaction state and the instrumentation.
    #[allow(missing_debug_implementations)]
    pub struct ConnectionAndTransactionManager {
        // Handle used to execute statements and to query the transaction status.
        pub(super) raw_connection: RawConnection,
        // State kept in sync by `update_transaction_manager_status`.
        pub(super) transaction_state: AnsiTransactionManager,
        // Receiver of connection events such as `StartQuery` / `FinishQuery`.
        pub(super) instrumentation: DynInstrumentation,
    }
671
    /// Describes how `PgConnection` loads results for the loading mode `B`.
    pub trait PgLoadingMode<B> {
        /// Forwarded by `load` to `inner_load` as its `row_by_row` flag.
        const USE_ROW_BY_ROW_MODE: bool;
        /// Iterator over the rows of an executed query.
        type Cursor<'conn, 'query>: Iterator<Item = QueryResult<Self::Row<'conn, 'query>>>;
        /// Row type yielded by the cursor.
        type Row<'conn, 'query>: crate::row::Row<'conn, Pg>;

        /// Builds the cursor for `result`, the outcome of executing `source`.
        fn get_cursor<'conn, 'query>(
            raw_connection: &'conn mut ConnectionAndTransactionManager,
            result: PgResult,
            source: Box<dyn QueryFragmentHelper<crate::result::Error> + 'query>,
        ) -> QueryResult<Self::Cursor<'conn, 'query>>;
    }
683
    // Default mode: the statement is executed without the row-by-row flag and
    // the cursor is built directly from the returned result.
    impl PgLoadingMode<DefaultLoadingMode> for PgConnection {
        const USE_ROW_BY_ROW_MODE: bool = false;
        type Cursor<'conn, 'query> = Cursor;
        type Row<'conn, 'query> = self::row::PgRow;

        // Creating the cursor is the final step of the query, so the outcome is
        // reported here (`final_call == true`).
        fn get_cursor<'conn, 'query>(
            conn: &'conn mut ConnectionAndTransactionManager,
            result: PgResult,
            source: Box<dyn QueryFragmentHelper<crate::result::Error> + 'query>,
        ) -> QueryResult<Self::Cursor<'conn, 'query>> {
            update_transaction_manager_status(
                Cursor::new(result, &mut conn.raw_connection),
                conn,
                &|callback| source.instrumentation(callback),
                true,
            )
        }
    }
702
    // Row-by-row mode: the statement is executed with the row-by-row flag and
    // the cursor keeps the connection state borrowed for `'conn`.
    impl PgLoadingMode<PgRowByRowLoadingMode> for PgConnection {
        const USE_ROW_BY_ROW_MODE: bool = true;
        type Cursor<'conn, 'query> = RowByRowCursor<'conn, 'query>;
        type Row<'conn, 'query> = self::row::PgRow;

        // No status update or `FinishQuery` event is issued here; the cursor
        // receives the connection state and `source`.
        // NOTE(review): presumably the cursor reports the end of the query
        // itself - confirm in the cursor module.
        fn get_cursor<'conn, 'query>(
            raw_connection: &'conn mut ConnectionAndTransactionManager,
            result: PgResult,
            source: Box<dyn QueryFragmentHelper<crate::result::Error> + 'query>,
        ) -> QueryResult<Self::Cursor<'conn, 'query>> {
            Ok(RowByRowCursor::new(result, raw_connection, source))
        }
    }
716
    // this trait exists to turn generic query types
    // into trait objects to deduplicate the connection code
    //
    // `E` is the error type returned by `write_copy_from`.
    pub trait QueryFragmentHelper<E>:
        crate::connection::statement_cache::QueryFragmentForCachedStatement<crate::pg::Pg>
    {
        /// Type id identifying the query, if there is one; passed to the
        /// statement cache when looking up a prepared statement.
        fn query_id(&self) -> Option<std::any::TypeId>;

        /// Calls `callback` with a debug representation of the query.
        fn instrumentation(
            &self,
            callback: &mut dyn FnMut(&dyn crate::connection::instrumentation::DebugQuery),
        );

        /// Writes the bind values of the query into `bind_collector`.
        fn collect_binds(
            &self,
            bind_collector: &mut RawBytesBindCollector<crate::pg::Pg>,
            conn: &mut PgConnection,
        ) -> QueryResult<()>;

        /// Writes the data of a copy-from statement into `_sink`.
        ///
        /// The default implementation writes nothing and succeeds.
        fn write_copy_from(&self, _sink: &mut CopyFromSink<'_>) -> Result<(), E> {
            Ok(())
        }
    }
739
740    // any type that implement `QueryFragment` and `QueryId` works in this location
741    impl<T> QueryFragmentHelper<diesel::result::Error> for T
742    where
743        T: QueryFragment<crate::pg::Pg> + QueryId,
744    {
745        fn query_id(&self) -> Option<std::any::TypeId> {
746            <T as QueryId>::query_id()
747        }
748
749        fn instrumentation(
750            &self,
751            callback: &mut dyn FnMut(&dyn crate::connection::instrumentation::DebugQuery),
752        ) {
753            callback(&crate::debug_query(self))
754        }
755
756        fn collect_binds(
757            &self,
758            bind_collector: &mut RawBytesBindCollector<crate::pg::Pg>,
759            conn: &mut PgConnection,
760        ) -> QueryResult<()> {
761            <Self as QueryFragment<diesel::pg::Pg>>::collect_binds(
762                self,
763                bind_collector,
764                conn,
765                &crate::pg::Pg,
766            )
767        }
768    }
769
    // This wrapper exists as we need to have custom behavior for copy from
    // statements (fn write_copy_from is relevant)
    //
    // The query lives in a `RefCell` because `write_copy_from` only receives
    // `&self` but borrows the query mutably.
    pub(super) struct CopyFromWrapper<S, T>(
        pub(super) std::cell::RefCell<InternalCopyFromQuery<S, T>>,
    );
775
    // Both methods delegate to the wrapped query through a shared borrow.
    impl<S, T> crate::connection::statement_cache::QueryFragmentForCachedStatement<Pg>
        for CopyFromWrapper<S, T>
    where
        InternalCopyFromQuery<S, T>:
            crate::connection::statement_cache::QueryFragmentForCachedStatement<Pg>,
    {
        // SQL of the wrapped query.
        fn construct_sql(&self, backend: &Pg) -> QueryResult<String> {
            self.0.borrow().construct_sql(backend)
        }

        // Cacheability of the wrapped query.
        fn is_safe_to_cache_prepared(&self, backend: &Pg) -> QueryResult<bool> {
            self.0.borrow().is_safe_to_cache_prepared(backend)
        }
    }
790
791    impl<S, T> QueryFragmentHelper<S::Error> for CopyFromWrapper<S, T>
792    where
793        S: CopyFromExpression<T>,
794        InternalCopyFromQuery<S, T>: QueryFragmentHelper<crate::result::Error>,
795        Self: crate::connection::statement_cache::QueryFragmentForCachedStatement<Pg>,
796    {
797        fn query_id(&self) -> Option<std::any::TypeId> {
798            self.0.borrow().query_id()
799        }
800
801        fn instrumentation(
802            &self,
803            callback: &mut dyn FnMut(&dyn crate::connection::instrumentation::DebugQuery),
804        ) {
805            callback(&crate::debug_query(&*self.0.borrow()))
806        }
807
808        fn collect_binds(
809            &self,
810            bind_collector: &mut RawBytesBindCollector<crate::pg::Pg>,
811            conn: &mut PgConnection,
812        ) -> QueryResult<()> {
813            <InternalCopyFromQuery<S, T> as QueryFragmentHelper<crate::result::Error>>::collect_binds(
814            &*self.0.borrow(),
815            bind_collector,
816            conn,
817        )
818        }
819
820        fn write_copy_from(&self, sink: &mut CopyFromSink<'_>) -> Result<(), S::Error> {
821            self.0.borrow_mut().target.callback(sink)
822        }
823    }
824}
825
826#[cfg(test)]
827// that's a false positive for `panic!`/`assert!` on rust 2018
828#[allow(clippy::uninlined_format_args)]
829mod tests {
830    extern crate dotenvy;
831
832    use super::*;
833    use crate::prelude::*;
834    use crate::result::Error::DatabaseError;
835    use std::num::NonZeroU32;
836
837    fn connection() -> PgConnection {
838        crate::test_helpers::pg_connection_no_transaction()
839    }
840
841    #[diesel_test_helper::test]
842    fn notifications_arrive() {
843        use crate::sql_query;
844
845        let conn = &mut connection();
846        sql_query("LISTEN test_notifications")
847            .execute(conn)
848            .unwrap();
849        sql_query("NOTIFY test_notifications, 'first'")
850            .execute(conn)
851            .unwrap();
852        sql_query("NOTIFY test_notifications, 'second'")
853            .execute(conn)
854            .unwrap();
855
856        let notifications = conn
857            .notifications_iter()
858            .map(Result::unwrap)
859            .collect::<Vec<_>>();
860
861        assert_eq!(2, notifications.len());
862        assert_eq!(notifications[0].channel, "test_notifications");
863        assert_eq!(notifications[1].channel, "test_notifications");
864        assert_eq!(notifications[0].payload, "first");
865        assert_eq!(notifications[1].payload, "second");
866
867        let next_notification = conn.notifications_iter().next();
868        assert!(
869            next_notification.is_none(),
870            "Got a next notification, while not expecting one: {next_notification:?}"
871        );
872
873        sql_query("NOTIFY test_notifications")
874            .execute(conn)
875            .unwrap();
876        assert_eq!(
877            conn.notifications_iter().next().unwrap().unwrap().payload,
878            ""
879        );
880    }
881
882    #[diesel_test_helper::test]
883    fn malformed_sql_query() {
884        let connection = &mut connection();
885        let query =
886            crate::sql_query("SELECT not_existent FROM also_not_there;").execute(connection);
887
888        if let Err(DatabaseError(_, string)) = query {
889            assert_eq!(Some(26), string.statement_position());
890        } else {
891            unreachable!();
892        }
893    }
894
    // Two-column `users` table (`id`, `name`) declared at the level of this
    // test module.
    table! {
        users {
            id -> Integer,
            name -> Text,
        }
    }
901
902    #[diesel_test_helper::test]
903    fn transaction_manager_returns_an_error_when_attempting_to_commit_outside_of_a_transaction() {
904        use crate::connection::{AnsiTransactionManager, TransactionManager};
905        use crate::result::Error;
906        use crate::PgConnection;
907
908        let conn = &mut crate::test_helpers::pg_connection_no_transaction();
909        assert_eq!(
910            None,
911            <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
912                conn
913            ).transaction_depth().expect("Transaction depth")
914        );
915        let result = AnsiTransactionManager::commit_transaction(conn);
916        assert!(matches!(result, Err(Error::NotInTransaction)))
917    }
918
919    #[diesel_test_helper::test]
920    fn transaction_manager_returns_an_error_when_attempting_to_rollback_outside_of_a_transaction() {
921        use crate::connection::{AnsiTransactionManager, TransactionManager};
922        use crate::result::Error;
923        use crate::PgConnection;
924
925        let conn = &mut crate::test_helpers::pg_connection_no_transaction();
926        assert_eq!(
927            None,
928            <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
929                conn
930            ).transaction_depth().expect("Transaction depth")
931        );
932        let result = AnsiTransactionManager::rollback_transaction(conn);
933        assert!(matches!(result, Err(Error::NotInTransaction)))
934    }
935
936    #[diesel_test_helper::test]
937    fn postgres_transaction_is_rolled_back_upon_syntax_error() {
938        use std::num::NonZeroU32;
939
940        use crate::connection::{AnsiTransactionManager, TransactionManager};
941        use crate::pg::connection::raw::PgTransactionStatus;
942        use crate::*;
943        let conn = &mut crate::test_helpers::pg_connection_no_transaction();
944        assert_eq!(
945            None,
946            <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
947                conn
948            ).transaction_depth().expect("Transaction depth")
949        );
950        let _result = conn.build_transaction().run(|conn| {
951            assert_eq!(
952                NonZeroU32::new(1),
953                <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
954                    conn
955                ).transaction_depth().expect("Transaction depth")
956            );
957            // In Postgres, a syntax error breaks the transaction block
958            let query_result = sql_query("SELECT_SYNTAX_ERROR 1").execute(conn);
959            assert!(query_result.is_err());
960            assert_eq!(
961                PgTransactionStatus::InError,
962                conn.connection_and_transaction_manager.raw_connection.transaction_status()
963            );
964            query_result
965        });
966        assert_eq!(
967            None,
968            <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
969                conn
970            ).transaction_depth().expect("Transaction depth")
971        );
972        assert_eq!(
973            PgTransactionStatus::Idle,
974            conn.connection_and_transaction_manager
975                .raw_connection
976                .transaction_status()
977        );
978    }
979
980    #[diesel_test_helper::test]
981    fn nested_postgres_transaction_is_rolled_back_upon_syntax_error() {
982        use std::num::NonZeroU32;
983
984        use crate::connection::{AnsiTransactionManager, TransactionManager};
985        use crate::pg::connection::raw::PgTransactionStatus;
986        use crate::*;
987        let conn = &mut crate::test_helpers::pg_connection_no_transaction();
988        assert_eq!(
989            None,
990            <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
991                conn
992            ).transaction_depth().expect("Transaction depth")
993        );
994        let result = conn.build_transaction().run(|conn| {
995            assert_eq!(
996                NonZeroU32::new(1),
997                <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
998                    conn
999            ).transaction_depth().expect("Transaction depth")
1000            );
1001            let result = conn.build_transaction().run(|conn| {
1002                assert_eq!(
1003                    NonZeroU32::new(2),
1004                    <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1005                        conn
1006            ).transaction_depth().expect("Transaction depth")
1007                );
1008                sql_query("SELECT_SYNTAX_ERROR 1").execute(conn)
1009            });
1010            assert!(result.is_err());
1011            assert_eq!(
1012                NonZeroU32::new(1),
1013                <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1014                    conn
1015            ).transaction_depth().expect("Transaction depth")
1016            );
1017            let query_result = sql_query("SELECT 1").execute(conn);
1018            assert!(query_result.is_ok());
1019            assert_eq!(
1020                PgTransactionStatus::InTransaction,
1021                conn.connection_and_transaction_manager.raw_connection.transaction_status()
1022            );
1023            query_result
1024        });
1025        assert!(result.is_ok());
1026        assert_eq!(
1027            PgTransactionStatus::Idle,
1028            conn.connection_and_transaction_manager
1029                .raw_connection
1030                .transaction_status()
1031        );
1032        assert_eq!(
1033            None,
1034            <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1035                conn
1036            ).transaction_depth().expect("Transaction depth")
1037        );
1038    }
1039
1040    #[diesel_test_helper::test]
1041    // This function uses collect with an side effect (spawning threads)
1042    // so this is a false positive from clippy
1043    #[allow(clippy::needless_collect)]
1044    fn postgres_transaction_depth_is_tracked_properly_on_serialization_failure() {
1045        use crate::pg::connection::raw::PgTransactionStatus;
1046        use crate::result::DatabaseErrorKind::SerializationFailure;
1047        use crate::result::Error::DatabaseError;
1048        use crate::*;
1049        use std::sync::{Arc, Barrier};
1050        use std::thread;
1051
1052        table! {
1053            #[sql_name = "pg_transaction_depth_is_tracked_properly_on_commit_failure"]
1054            serialization_example {
1055                id -> Serial,
1056                class -> Integer,
1057            }
1058        }
1059
1060        let conn = &mut crate::test_helpers::pg_connection_no_transaction();
1061
1062        sql_query(
1063            "DROP TABLE IF EXISTS pg_transaction_depth_is_tracked_properly_on_commit_failure;",
1064        )
1065        .execute(conn)
1066        .unwrap();
1067        sql_query(
1068            r#"
1069            CREATE TABLE pg_transaction_depth_is_tracked_properly_on_commit_failure (
1070                id SERIAL PRIMARY KEY,
1071                class INTEGER NOT NULL
1072            )
1073        "#,
1074        )
1075        .execute(conn)
1076        .unwrap();
1077
1078        insert_into(serialization_example::table)
1079            .values(&vec![
1080                serialization_example::class.eq(1),
1081                serialization_example::class.eq(2),
1082            ])
1083            .execute(conn)
1084            .unwrap();
1085
1086        let before_barrier = Arc::new(Barrier::new(2));
1087        let after_barrier = Arc::new(Barrier::new(2));
1088        let threads = (1..3)
1089            .map(|i| {
1090                let before_barrier = before_barrier.clone();
1091                let after_barrier = after_barrier.clone();
1092                thread::spawn(move || {
1093                    use crate::connection::AnsiTransactionManager;
1094                    use crate::connection::TransactionManager;
1095                    let conn = &mut crate::test_helpers::pg_connection_no_transaction();
1096                    assert_eq!(None, <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));
1097
1098                    let result = conn.build_transaction().serializable().run(|conn| {
1099                        assert_eq!(NonZeroU32::new(1), <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));
1100
1101                        let _ = serialization_example::table
1102                            .filter(serialization_example::class.eq(i))
1103                            .count()
1104                            .execute(conn)?;
1105
1106                        let other_i = if i == 1 { 2 } else { 1 };
1107                        let q = insert_into(serialization_example::table)
1108                            .values(serialization_example::class.eq(other_i));
1109                        before_barrier.wait();
1110
1111                        let r = q.execute(conn);
1112                        after_barrier.wait();
1113                        r
1114                    });
1115                    assert_eq!(
1116                        PgTransactionStatus::Idle,
1117                        conn.connection_and_transaction_manager.raw_connection.transaction_status()
1118                    );
1119
1120                    assert_eq!(None, <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));
1121                    result
1122                })
1123            })
1124            .collect::<Vec<_>>();
1125
1126        let mut results = threads
1127            .into_iter()
1128            .map(|t| t.join().unwrap())
1129            .collect::<Vec<_>>();
1130
1131        results.sort_by_key(|r| r.is_err());
1132
1133        assert!(results[0].is_ok(), "Got {:?} instead", results);
1134        assert!(
1135            matches!(&results[1], Err(DatabaseError(SerializationFailure, _))),
1136            "Got {:?} instead",
1137            results
1138        );
1139        assert_eq!(
1140            PgTransactionStatus::Idle,
1141            conn.connection_and_transaction_manager
1142                .raw_connection
1143                .transaction_status()
1144        );
1145    }
1146
1147    #[diesel_test_helper::test]
1148    // This function uses collect with an side effect (spawning threads)
1149    // so this is a false positive from clippy
1150    #[allow(clippy::needless_collect)]
1151    fn postgres_transaction_depth_is_tracked_properly_on_nested_serialization_failure() {
1152        use crate::pg::connection::raw::PgTransactionStatus;
1153        use crate::result::DatabaseErrorKind::SerializationFailure;
1154        use crate::result::Error::DatabaseError;
1155        use crate::*;
1156        use std::sync::{Arc, Barrier};
1157        use std::thread;
1158
1159        table! {
1160            #[sql_name = "pg_nested_transaction_depth_is_tracked_properly_on_commit_failure"]
1161            serialization_example {
1162                id -> Serial,
1163                class -> Integer,
1164            }
1165        }
1166
1167        let conn = &mut crate::test_helpers::pg_connection_no_transaction();
1168
1169        sql_query(
1170            "DROP TABLE IF EXISTS pg_nested_transaction_depth_is_tracked_properly_on_commit_failure;",
1171        )
1172        .execute(conn)
1173        .unwrap();
1174        sql_query(
1175            r#"
1176            CREATE TABLE pg_nested_transaction_depth_is_tracked_properly_on_commit_failure (
1177                id SERIAL PRIMARY KEY,
1178                class INTEGER NOT NULL
1179            )
1180        "#,
1181        )
1182        .execute(conn)
1183        .unwrap();
1184
1185        insert_into(serialization_example::table)
1186            .values(&vec![
1187                serialization_example::class.eq(1),
1188                serialization_example::class.eq(2),
1189            ])
1190            .execute(conn)
1191            .unwrap();
1192
1193        let before_barrier = Arc::new(Barrier::new(2));
1194        let after_barrier = Arc::new(Barrier::new(2));
1195        let threads = (1..3)
1196            .map(|i| {
1197                let before_barrier = before_barrier.clone();
1198                let after_barrier = after_barrier.clone();
1199                thread::spawn(move || {
1200                    use crate::connection::AnsiTransactionManager;
1201                    use crate::connection::TransactionManager;
1202                    let conn = &mut crate::test_helpers::pg_connection_no_transaction();
1203                    assert_eq!(None, <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));
1204
1205                    let result = conn.build_transaction().serializable().run(|conn| {
1206                        assert_eq!(NonZeroU32::new(1), <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));
1207                        let r = conn.transaction(|conn| {
1208                            assert_eq!(NonZeroU32::new(2), <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));
1209
1210                            let _ = serialization_example::table
1211                                .filter(serialization_example::class.eq(i))
1212                                .count()
1213                                .execute(conn)?;
1214
1215                            let other_i = if i == 1 { 2 } else { 1 };
1216                            let q = insert_into(serialization_example::table)
1217                                .values(serialization_example::class.eq(other_i));
1218                            before_barrier.wait();
1219
1220                            let r = q.execute(conn);
1221                            after_barrier.wait();
1222                            r
1223                        });
1224                        assert_eq!(NonZeroU32::new(1), <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));
1225                        assert_eq!(
1226                            PgTransactionStatus::InTransaction,
1227                            conn.connection_and_transaction_manager.raw_connection.transaction_status()
1228                        );
1229                        r
1230                    });
1231                    assert_eq!(
1232                        PgTransactionStatus::Idle,
1233                        conn.connection_and_transaction_manager.raw_connection.transaction_status()
1234                    );
1235
1236                    assert_eq!(None, <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));
1237                    result
1238                })
1239            })
1240            .collect::<Vec<_>>();
1241
1242        let mut results = threads
1243            .into_iter()
1244            .map(|t| t.join().unwrap())
1245            .collect::<Vec<_>>();
1246
1247        results.sort_by_key(|r| r.is_err());
1248
1249        assert!(results[0].is_ok(), "Got {:?} instead", results);
1250        assert!(
1251            matches!(&results[1], Err(DatabaseError(SerializationFailure, _))),
1252            "Got {:?} instead",
1253            results
1254        );
1255        assert_eq!(
1256            PgTransactionStatus::Idle,
1257            conn.connection_and_transaction_manager
1258                .raw_connection
1259                .transaction_status()
1260        );
1261    }
1262
1263    #[diesel_test_helper::test]
1264    fn postgres_transaction_is_rolled_back_upon_deferred_constraint_failure() {
1265        use crate::connection::{AnsiTransactionManager, TransactionManager};
1266        use crate::pg::connection::raw::PgTransactionStatus;
1267        use crate::result::Error;
1268        use crate::*;
1269
1270        let conn = &mut crate::test_helpers::pg_connection_no_transaction();
1271        assert_eq!(
1272            None,
1273            <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1274                conn
1275            ).transaction_depth().expect("Transaction depth")
1276        );
1277        let result: Result<_, Error> = conn.build_transaction().run(|conn| {
1278            assert_eq!(
1279                NonZeroU32::new(1),
1280                <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1281                    conn
1282            ).transaction_depth().expect("Transaction depth")
1283            );
1284            sql_query("DROP TABLE IF EXISTS deferred_constraint_commit").execute(conn)?;
1285            sql_query("CREATE TABLE deferred_constraint_commit(id INT UNIQUE INITIALLY DEFERRED)")
1286                .execute(conn)?;
1287            sql_query("INSERT INTO deferred_constraint_commit VALUES(1)").execute(conn)?;
1288            let result =
1289                sql_query("INSERT INTO deferred_constraint_commit VALUES(1)").execute(conn);
1290            assert!(result.is_ok());
1291            assert_eq!(
1292                PgTransactionStatus::InTransaction,
1293                conn.connection_and_transaction_manager.raw_connection.transaction_status()
1294            );
1295            Ok(())
1296        });
1297        assert_eq!(
1298            None,
1299            <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1300                conn
1301            ).transaction_depth().expect("Transaction depth")
1302        );
1303        assert_eq!(
1304            PgTransactionStatus::Idle,
1305            conn.connection_and_transaction_manager
1306                .raw_connection
1307                .transaction_status()
1308        );
1309        assert!(result.is_err());
1310    }
1311
1312    #[diesel_test_helper::test]
1313    fn postgres_transaction_is_rolled_back_upon_deferred_trigger_failure() {
1314        use crate::connection::{AnsiTransactionManager, TransactionManager};
1315        use crate::pg::connection::raw::PgTransactionStatus;
1316        use crate::result::Error;
1317        use crate::*;
1318
1319        let conn = &mut crate::test_helpers::pg_connection_no_transaction();
1320        assert_eq!(
1321            None,
1322            <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1323                conn
1324            ).transaction_depth().expect("Transaction depth")
1325        );
1326        let result: Result<_, Error> = conn.build_transaction().run(|conn| {
1327            assert_eq!(
1328                NonZeroU32::new(1),
1329                <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1330                    conn
1331            ).transaction_depth().expect("Transaction depth")
1332            );
1333            sql_query("DROP TABLE IF EXISTS deferred_trigger_commit").execute(conn)?;
1334            sql_query("CREATE TABLE deferred_trigger_commit(id INT UNIQUE INITIALLY DEFERRED)")
1335                .execute(conn)?;
1336            sql_query(
1337                r#"
1338                    CREATE OR REPLACE FUNCTION transaction_depth_blow_up()
1339                        RETURNS trigger
1340                        LANGUAGE plpgsql
1341                        AS $$
1342                    DECLARE
1343                    BEGIN
1344                        IF NEW.value = 42 THEN
1345                            RAISE EXCEPTION 'Transaction kaboom';
1346                        END IF;
1347                    RETURN NEW;
1348
1349                    END;$$;
1350                "#,
1351            )
1352            .execute(conn)?;
1353
1354            sql_query(
1355                r#"
1356                    CREATE CONSTRAINT TRIGGER transaction_depth_trigger
1357                        AFTER INSERT ON "deferred_trigger_commit"
1358                        DEFERRABLE INITIALLY DEFERRED
1359                        FOR EACH ROW
1360                        EXECUTE PROCEDURE transaction_depth_blow_up()
1361            "#,
1362            )
1363            .execute(conn)?;
1364            let result = sql_query("INSERT INTO deferred_trigger_commit VALUES(42)").execute(conn);
1365            assert!(result.is_ok());
1366            assert_eq!(
1367                PgTransactionStatus::InTransaction,
1368                conn.connection_and_transaction_manager.raw_connection.transaction_status()
1369            );
1370            Ok(())
1371        });
1372        assert_eq!(
1373            None,
1374            <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1375                conn
1376            ).transaction_depth().expect("Transaction depth")
1377        );
1378        assert_eq!(
1379            PgTransactionStatus::Idle,
1380            conn.connection_and_transaction_manager
1381                .raw_connection
1382                .transaction_status()
1383        );
1384        assert!(result.is_err());
1385    }
1386
1387    #[diesel_test_helper::test]
1388    fn nested_postgres_transaction_is_rolled_back_upon_deferred_trigger_failure() {
1389        use crate::connection::{AnsiTransactionManager, TransactionManager};
1390        use crate::pg::connection::raw::PgTransactionStatus;
1391        use crate::result::Error;
1392        use crate::*;
1393
1394        let conn = &mut crate::test_helpers::pg_connection_no_transaction();
1395        assert_eq!(
1396            None,
1397            <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1398                conn
1399            ).transaction_depth().expect("Transaction depth")
1400        );
1401        let result: Result<_, Error> = conn.build_transaction().run(|conn| {
1402            assert_eq!(
1403                NonZeroU32::new(1),
1404                <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1405                    conn
1406            ).transaction_depth().expect("Transaction depth")
1407            );
1408            sql_query("DROP TABLE IF EXISTS deferred_trigger_nested_commit").execute(conn)?;
1409            sql_query(
1410                "CREATE TABLE deferred_trigger_nested_commit(id INT UNIQUE INITIALLY DEFERRED)",
1411            )
1412            .execute(conn)?;
1413            sql_query(
1414                r#"
1415                    CREATE OR REPLACE FUNCTION transaction_depth_blow_up()
1416                        RETURNS trigger
1417                        LANGUAGE plpgsql
1418                        AS $$
1419                    DECLARE
1420                    BEGIN
1421                        IF NEW.value = 42 THEN
1422                            RAISE EXCEPTION 'Transaction kaboom';
1423                        END IF;
1424                    RETURN NEW;
1425
1426                    END;$$;
1427                "#,
1428            )
1429            .execute(conn)?;
1430
1431            sql_query(
1432                r#"
1433                    CREATE CONSTRAINT TRIGGER transaction_depth_trigger
1434                        AFTER INSERT ON "deferred_trigger_nested_commit"
1435                        DEFERRABLE INITIALLY DEFERRED
1436                        FOR EACH ROW
1437                        EXECUTE PROCEDURE transaction_depth_blow_up()
1438            "#,
1439            )
1440            .execute(conn)?;
1441            let inner_result: Result<_, Error> = conn.build_transaction().run(|conn| {
1442                let result = sql_query("INSERT INTO deferred_trigger_nested_commit VALUES(42)")
1443                    .execute(conn);
1444                assert!(result.is_ok());
1445                Ok(())
1446            });
1447            assert!(inner_result.is_err());
1448            assert_eq!(
1449                PgTransactionStatus::InTransaction,
1450                conn.connection_and_transaction_manager.raw_connection.transaction_status()
1451            );
1452            Ok(())
1453        });
1454        assert_eq!(
1455            None,
1456            <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1457                conn
1458            ).transaction_depth().expect("Transaction depth")
1459        );
1460        assert_eq!(
1461            PgTransactionStatus::Idle,
1462            conn.connection_and_transaction_manager
1463                .raw_connection
1464                .transaction_status()
1465        );
1466        assert!(result.is_ok(), "Expected success, got {:?}", result);
1467    }
1468
1469    #[diesel_test_helper::test]
1470    fn nested_postgres_transaction_is_rolled_back_upon_deferred_constraint_failure() {
1471        use crate::connection::{AnsiTransactionManager, TransactionManager};
1472        use crate::pg::connection::raw::PgTransactionStatus;
1473        use crate::result::Error;
1474        use crate::*;
1475
1476        let conn = &mut crate::test_helpers::pg_connection_no_transaction();
1477        assert_eq!(
1478            None,
1479            <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1480                conn
1481            ).transaction_depth().expect("Transaction depth")
1482        );
1483        let result: Result<_, Error> = conn.build_transaction().run(|conn| {
1484            assert_eq!(
1485                NonZeroU32::new(1),
1486                <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1487                    conn
1488            ).transaction_depth().expect("Transaction depth")
1489            );
1490            sql_query("DROP TABLE IF EXISTS deferred_constraint_nested_commit").execute(conn)?;
1491            sql_query("CREATE TABLE deferred_constraint_nested_commit(id INT UNIQUE INITIALLY DEFERRED)").execute(conn)?;
1492            let inner_result: Result<_, Error> = conn.build_transaction().run(|conn| {
1493                assert_eq!(
1494                    NonZeroU32::new(2),
1495                    <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1496                        conn
1497                    ).transaction_depth().expect("Transaction depth")
1498                );
1499                sql_query("INSERT INTO deferred_constraint_nested_commit VALUES(1)").execute(conn)?;
1500                let result = sql_query("INSERT INTO deferred_constraint_nested_commit VALUES(1)").execute(conn);
1501                assert!(result.is_ok());
1502                Ok(())
1503            });
1504            assert!(inner_result.is_err());
1505            assert_eq!(
1506                PgTransactionStatus::InTransaction,
1507                conn.connection_and_transaction_manager.raw_connection.transaction_status()
1508            );
1509            assert_eq!(
1510                NonZeroU32::new(1),
1511                <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1512                    conn
1513            ).transaction_depth().expect("Transaction depth")
1514            );
1515            sql_query("INSERT INTO deferred_constraint_nested_commit VALUES(1)").execute(conn)
1516        });
1517        assert_eq!(
1518            None,
1519            <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1520                conn
1521            ).transaction_depth().expect("Transaction depth")
1522        );
1523        assert_eq!(
1524            PgTransactionStatus::Idle,
1525            conn.connection_and_transaction_manager
1526                .raw_connection
1527                .transaction_status()
1528        );
1529        assert!(result.is_ok());
1530    }
1531}