diesel/pg/connection/
mod.rs

1pub(super) mod copy;
2pub(crate) mod cursor;
3mod raw;
4mod result;
5mod row;
6mod stmt;
7
8use self::copy::CopyFromSink;
9use self::copy::CopyToBuffer;
10use self::cursor::*;
11use self::private::ConnectionAndTransactionManager;
12use self::raw::{PgTransactionStatus, RawConnection};
13use self::stmt::Statement;
14use crate::connection::instrumentation::DebugQuery;
15use crate::connection::instrumentation::Instrumentation;
16use crate::connection::instrumentation::StrQueryHelper;
17use crate::connection::statement_cache::{MaybeCached, StatementCache};
18use crate::connection::*;
19use crate::expression::QueryMetadata;
20use crate::pg::metadata_lookup::{GetPgMetadataCache, PgMetadataCache};
21use crate::pg::query_builder::copy::InternalCopyFromQuery;
22use crate::pg::{Pg, TransactionBuilder};
23use crate::query_builder::bind_collector::RawBytesBindCollector;
24use crate::query_builder::*;
25use crate::result::ConnectionError::CouldntSetupConfiguration;
26use crate::result::*;
27use crate::RunQueryDsl;
28use std::ffi::CString;
29use std::fmt::Debug;
30use std::os::raw as libc;
31
32use super::query_builder::copy::CopyFromExpression;
33use super::query_builder::copy::CopyTarget;
34use super::query_builder::copy::CopyToCommand;
35
36pub(super) use self::result::PgResult;
37
38/// The connection string expected by `PgConnection::establish`
39/// should be a PostgreSQL connection string, as documented at
40/// <https://www.postgresql.org/docs/9.4/static/libpq-connect.html#LIBPQ-CONNSTRING>
41///
42/// # Supported loading model implementations
43///
44/// * [`DefaultLoadingMode`]
45/// * [`PgRowByRowLoadingMode`]
46///
47/// If you are unsure which loading mode is the correct one for your application,
48/// you likely want to use the `DefaultLoadingMode` as that one offers
49/// generally better performance.
50///
51/// Due to the fact that `PgConnection` supports multiple loading modes
52/// it is **required** to always specify the used loading mode
53/// when calling [`RunQueryDsl::load_iter`]
54///
55/// ## `DefaultLoadingMode`
56///
57/// By using this mode `PgConnection` defaults to loading all response values at **once**
58/// and only performs deserialization afterward for the `DefaultLoadingMode`.
59/// Generally this mode will be more performant as it.
60///
61/// This loading mode allows users to perform hold more than one iterator at once using
62/// the same connection:
63/// ```rust
64/// # include!("../../doctest_setup.rs");
65/// #
66/// # fn main() {
67/// #     run_test().unwrap();
68/// # }
69/// #
70/// # fn run_test() -> QueryResult<()> {
71/// #     use schema::users;
72/// #     let connection = &mut establish_connection();
73/// use diesel::connection::DefaultLoadingMode;
74///
75/// let iter1 = users::table.load_iter::<(i32, String), DefaultLoadingMode>(connection)?;
76/// let iter2 = users::table.load_iter::<(i32, String), DefaultLoadingMode>(connection)?;
77///
78/// for r in iter1 {
79///     let (id, name) = r?;
80///     println!("Id: {} Name: {}", id, name);
81/// }
82///
83/// for r in iter2 {
84///     let (id, name) = r?;
85///     println!("Id: {} Name: {}", id, name);
86/// }
87/// #   Ok(())
88/// # }
89/// ```
90///
91/// ## `PgRowByRowLoadingMode`
92///
93/// By using this mode `PgConnection` defaults to loading each row of the result set
94/// separately. This might be desired for huge result sets.
95///
96/// This loading mode **prevents** creating more than one iterator at once using
97/// the same connection. The following code is **not** allowed:
98///
99/// ```compile_fail
100/// # include!("../../doctest_setup.rs");
101/// #
102/// # fn main() {
103/// #     run_test().unwrap();
104/// # }
105/// #
106/// # fn run_test() -> QueryResult<()> {
107/// #     use schema::users;
108/// #     let connection = &mut establish_connection();
109/// use diesel::pg::PgRowByRowLoadingMode;
110///
111/// let iter1 = users::table.load_iter::<(i32, String), PgRowByRowLoadingMode>(connection)?;
112/// // creating a second iterator generates an compiler error
113/// let iter2 = users::table.load_iter::<(i32, String), PgRowByRowLoadingMode>(connection)?;
114///
115/// for r in iter1 {
116///     let (id, name) = r?;
117///     println!("Id: {} Name: {}", id, name);
118/// }
119///
120/// for r in iter2 {
121///     let (id, name) = r?;
122///     println!("Id: {} Name: {}", id, name);
123/// }
124/// #   Ok(())
125/// # }
126/// ```
127#[allow(missing_debug_implementations)]
128#[cfg(feature = "postgres")]
129pub struct PgConnection {
130    statement_cache: StatementCache<Pg, Statement>,
131    metadata_cache: PgMetadataCache,
132    connection_and_transaction_manager: ConnectionAndTransactionManager,
133}
134
135// according to libpq documentation a connection can be transferred to other threads
136#[allow(unsafe_code)]
137unsafe impl Send for PgConnection {}
138
139impl SimpleConnection for PgConnection {
140    #[allow(unsafe_code)] // use of unsafe function
141    fn batch_execute(&mut self, query: &str) -> QueryResult<()> {
142        self.connection_and_transaction_manager
143            .instrumentation
144            .on_connection_event(InstrumentationEvent::StartQuery {
145                query: &StrQueryHelper::new(query),
146            });
147        let c_query = CString::new(query)?;
148        let inner_result = unsafe {
149            self.connection_and_transaction_manager
150                .raw_connection
151                .exec(c_query.as_ptr())
152        };
153        update_transaction_manager_status(
154            inner_result.and_then(|raw_result| {
155                PgResult::new(
156                    raw_result,
157                    &self.connection_and_transaction_manager.raw_connection,
158                )
159            }),
160            &mut self.connection_and_transaction_manager,
161            &StrQueryHelper::new(query),
162            true,
163        )?;
164        Ok(())
165    }
166}
167
168/// A [`PgConnection`] specific loading mode to load rows one by one
169///
170/// See the documentation of [`PgConnection`] for details
171#[derive(Debug, Copy, Clone)]
172pub struct PgRowByRowLoadingMode;
173
174impl ConnectionSealed for PgConnection {}
175
176impl Connection for PgConnection {
177    type Backend = Pg;
178    type TransactionManager = AnsiTransactionManager;
179
180    fn establish(database_url: &str) -> ConnectionResult<PgConnection> {
181        let mut instrumentation = crate::connection::instrumentation::get_default_instrumentation();
182        instrumentation.on_connection_event(InstrumentationEvent::StartEstablishConnection {
183            url: database_url,
184        });
185        let r = RawConnection::establish(database_url).and_then(|raw_conn| {
186            let mut conn = PgConnection {
187                connection_and_transaction_manager: ConnectionAndTransactionManager {
188                    raw_connection: raw_conn,
189                    transaction_state: AnsiTransactionManager::default(),
190                    instrumentation: None,
191                },
192                statement_cache: StatementCache::new(),
193                metadata_cache: PgMetadataCache::new(),
194            };
195            conn.set_config_options()
196                .map_err(CouldntSetupConfiguration)?;
197            Ok(conn)
198        });
199        instrumentation.on_connection_event(InstrumentationEvent::FinishEstablishConnection {
200            url: database_url,
201            error: r.as_ref().err(),
202        });
203        let mut conn = r?;
204        conn.connection_and_transaction_manager.instrumentation = instrumentation;
205        Ok(conn)
206    }
207
208    fn execute_returning_count<T>(&mut self, source: &T) -> QueryResult<usize>
209    where
210        T: QueryFragment<Pg> + QueryId,
211    {
212        update_transaction_manager_status(
213            self.with_prepared_query(source, true, |query, params, conn, _source| {
214                let res = query
215                    .execute(&mut conn.raw_connection, &params, false)
216                    .map(|r| r.rows_affected());
217                // according to https://www.postgresql.org/docs/current/libpq-async.html
218                // `PQgetResult` needs to be called till a null pointer is returned
219                while conn.raw_connection.get_next_result()?.is_some() {}
220                res
221            }),
222            &mut self.connection_and_transaction_manager,
223            &crate::debug_query(source),
224            true,
225        )
226    }
227
228    fn transaction_state(&mut self) -> &mut AnsiTransactionManager
229    where
230        Self: Sized,
231    {
232        &mut self.connection_and_transaction_manager.transaction_state
233    }
234
235    fn instrumentation(&mut self) -> &mut dyn Instrumentation {
236        &mut self.connection_and_transaction_manager.instrumentation
237    }
238
239    fn set_instrumentation(&mut self, instrumentation: impl Instrumentation) {
240        self.connection_and_transaction_manager.instrumentation = Some(Box::new(instrumentation));
241    }
242}
243
244impl<B> LoadConnection<B> for PgConnection
245where
246    Self: self::private::PgLoadingMode<B>,
247{
248    type Cursor<'conn, 'query> = <Self as self::private::PgLoadingMode<B>>::Cursor<'conn, 'query>;
249    type Row<'conn, 'query> = <Self as self::private::PgLoadingMode<B>>::Row<'conn, 'query>;
250
251    fn load<'conn, 'query, T>(
252        &'conn mut self,
253        source: T,
254    ) -> QueryResult<Self::Cursor<'conn, 'query>>
255    where
256        T: Query + QueryFragment<Self::Backend> + QueryId + 'query,
257        Self::Backend: QueryMetadata<T::SqlType>,
258    {
259        self.with_prepared_query(source, false, |stmt, params, conn, source| {
260            use self::private::PgLoadingMode;
261            let result = stmt.execute(&mut conn.raw_connection, &params, Self::USE_ROW_BY_ROW_MODE);
262            let result = update_transaction_manager_status(
263                result,
264                conn,
265                &crate::debug_query(&source),
266                false,
267            )?;
268            Self::get_cursor(conn, result, source)
269        })
270    }
271}
272
273impl GetPgMetadataCache for PgConnection {
274    fn get_metadata_cache(&mut self) -> &mut PgMetadataCache {
275        &mut self.metadata_cache
276    }
277}
278
279#[inline(always)]
280fn update_transaction_manager_status<T>(
281    query_result: QueryResult<T>,
282    conn: &mut ConnectionAndTransactionManager,
283    source: &dyn DebugQuery,
284    final_call: bool,
285) -> QueryResult<T> {
286    /// avoid monomorphizing for every result type - this part will not be inlined
287    fn non_generic_inner(conn: &mut ConnectionAndTransactionManager, is_err: bool) {
288        let raw_conn: &mut RawConnection = &mut conn.raw_connection;
289        let tm: &mut AnsiTransactionManager = &mut conn.transaction_state;
290        // libpq keeps track of the transaction status internally, and that is accessible
291        // via `transaction_status`. We can use that to update the AnsiTransactionManager
292        // status
293        match raw_conn.transaction_status() {
294            PgTransactionStatus::InError => {
295                tm.status.set_requires_rollback_maybe_up_to_top_level(true)
296            }
297            PgTransactionStatus::Unknown => tm.status.set_in_error(),
298            PgTransactionStatus::Idle => {
299                // This is useful in particular for commit attempts (even
300                // if `COMMIT` errors it still exits transaction)
301
302                // This may repair the transaction manager
303                tm.status = TransactionManagerStatus::Valid(Default::default())
304            }
305            PgTransactionStatus::InTransaction => {
306                let transaction_status = &mut tm.status;
307                // If we weren't an error, it is possible that we were a transaction start
308                // -> we should tolerate any state
309                if is_err {
310                    // An error may not have us enter a transaction, so if we weren't in one
311                    // we may not be in one now
312                    if !matches!(transaction_status, TransactionManagerStatus::Valid(valid_tm) if valid_tm.transaction_depth().is_some())
313                    {
314                        // -> transaction manager is broken
315                        transaction_status.set_in_error()
316                    }
317                } else {
318                    // If transaction was InError before, but now it's not (because we attempted
319                    // a rollback), we may pretend it's fixed because
320                    // if it isn't Postgres *will* tell us again.
321
322                    // Fun fact: if we have not received an `InTransaction` status however,
323                    // postgres will *not* warn us that transaction is broken when attempting to
324                    // commit, so we may think that commit has succeeded but in fact it hasn't.
325                    tm.status.set_requires_rollback_maybe_up_to_top_level(false)
326                }
327            }
328            PgTransactionStatus::Active => {
329                // This is a transient state for libpq - nothing we can deduce here.
330            }
331        }
332    }
333    non_generic_inner(conn, query_result.is_err());
334    if let Err(ref e) = query_result {
335        conn.instrumentation
336            .on_connection_event(InstrumentationEvent::FinishQuery {
337                query: source,
338                error: Some(e),
339            });
340    } else if final_call {
341        conn.instrumentation
342            .on_connection_event(InstrumentationEvent::FinishQuery {
343                query: source,
344                error: None,
345            });
346    }
347    query_result
348}
349
350#[cfg(feature = "r2d2")]
351impl crate::r2d2::R2D2Connection for PgConnection {
352    fn ping(&mut self) -> QueryResult<()> {
353        crate::r2d2::CheckConnectionQuery.execute(self).map(|_| ())
354    }
355
356    fn is_broken(&mut self) -> bool {
357        AnsiTransactionManager::is_broken_transaction_manager(self)
358    }
359}
360
361impl MultiConnectionHelper for PgConnection {
362    fn to_any<'a>(
363        lookup: &mut <Self::Backend as crate::sql_types::TypeMetadata>::MetadataLookup,
364    ) -> &mut (dyn std::any::Any + 'a) {
365        lookup.as_any()
366    }
367
368    fn from_any(
369        lookup: &mut dyn std::any::Any,
370    ) -> Option<&mut <Self::Backend as crate::sql_types::TypeMetadata>::MetadataLookup> {
371        lookup
372            .downcast_mut::<Self>()
373            .map(|conn| conn as &mut dyn super::PgMetadataLookup)
374    }
375}
376
377impl PgConnection {
378    /// Build a transaction, specifying additional details such as isolation level
379    ///
380    /// See [`TransactionBuilder`] for more examples.
381    ///
382    /// [`TransactionBuilder`]: crate::pg::TransactionBuilder
383    ///
384    /// ```rust
385    /// # include!("../../doctest_setup.rs");
386    /// #
387    /// # fn main() {
388    /// #     run_test().unwrap();
389    /// # }
390    /// #
391    /// # fn run_test() -> QueryResult<()> {
392    /// #     use schema::users::dsl::*;
393    /// #     let conn = &mut connection_no_transaction();
394    /// conn.build_transaction()
395    ///     .read_only()
396    ///     .serializable()
397    ///     .deferrable()
398    ///     .run(|conn| Ok(()))
399    /// # }
400    /// ```
401    pub fn build_transaction(&mut self) -> TransactionBuilder<'_, Self> {
402        TransactionBuilder::new(self)
403    }
404
405    pub(crate) fn copy_from<S, T>(&mut self, target: S) -> Result<usize, S::Error>
406    where
407        S: CopyFromExpression<T>,
408    {
409        let query = InternalCopyFromQuery::new(target);
410        let res = self.with_prepared_query(query, false, |stmt, binds, conn, mut source| {
411            fn inner_copy_in<S, T>(
412                stmt: MaybeCached<'_, Statement>,
413                conn: &mut ConnectionAndTransactionManager,
414                binds: Vec<Option<Vec<u8>>>,
415                source: &mut InternalCopyFromQuery<S, T>,
416            ) -> Result<usize, S::Error>
417            where
418                S: CopyFromExpression<T>,
419            {
420                let _res = stmt.execute(&mut conn.raw_connection, &binds, false)?;
421                let mut copy_in = CopyFromSink::new(&mut conn.raw_connection);
422                let r = source.target.callback(&mut copy_in);
423                copy_in.finish(r.as_ref().err().map(|e| e.to_string()))?;
424                let next_res = conn.raw_connection.get_next_result()?.ok_or_else(|| {
425                    crate::result::Error::DeserializationError(
426                        "Failed to receive result from the database".into(),
427                    )
428                })?;
429                let rows = next_res.rows_affected();
430                while let Some(_r) = conn.raw_connection.get_next_result()? {}
431                r?;
432                Ok(rows)
433            }
434
435            let rows = inner_copy_in(stmt, conn, binds, &mut source);
436            if let Err(ref e) = rows {
437                let database_error = crate::result::Error::DatabaseError(
438                    crate::result::DatabaseErrorKind::Unknown,
439                    Box::new(e.to_string()),
440                );
441                conn.instrumentation
442                    .on_connection_event(InstrumentationEvent::FinishQuery {
443                        query: &crate::debug_query(&source),
444                        error: Some(&database_error),
445                    });
446            } else {
447                conn.instrumentation
448                    .on_connection_event(InstrumentationEvent::FinishQuery {
449                        query: &crate::debug_query(&source),
450                        error: None,
451                    });
452            }
453
454            rows
455        })?;
456
457        Ok(res)
458    }
459
460    pub(crate) fn copy_to<T>(&mut self, command: CopyToCommand<T>) -> QueryResult<CopyToBuffer<'_>>
461    where
462        T: CopyTarget,
463    {
464        let res = self.with_prepared_query::<_, _, Error>(
465            command,
466            false,
467            |stmt, binds, conn, source| {
468                let res = stmt.execute(&mut conn.raw_connection, &binds, false);
469                conn.instrumentation
470                    .on_connection_event(InstrumentationEvent::FinishQuery {
471                        query: &crate::debug_query(&source),
472                        error: res.as_ref().err(),
473                    });
474                Ok(CopyToBuffer::new(&mut conn.raw_connection, res?))
475            },
476        )?;
477        Ok(res)
478    }
479
480    fn with_prepared_query<'conn, T, R, E>(
481        &'conn mut self,
482        source: T,
483        execute_returning_count: bool,
484        f: impl FnOnce(
485            MaybeCached<'_, Statement>,
486            Vec<Option<Vec<u8>>>,
487            &'conn mut ConnectionAndTransactionManager,
488            T,
489        ) -> Result<R, E>,
490    ) -> Result<R, E>
491    where
492        T: QueryFragment<Pg> + QueryId,
493        E: From<crate::result::Error>,
494    {
495        self.connection_and_transaction_manager
496            .instrumentation
497            .on_connection_event(InstrumentationEvent::StartQuery {
498                query: &crate::debug_query(&source),
499            });
500        let mut bind_collector = RawBytesBindCollector::<Pg>::new();
501        source.collect_binds(&mut bind_collector, self, &Pg)?;
502        let binds = bind_collector.binds;
503        let metadata = bind_collector.metadata;
504
505        let cache_len = self.statement_cache.len();
506        let cache = &mut self.statement_cache;
507        let conn = &mut self.connection_and_transaction_manager.raw_connection;
508        let query = cache.cached_statement(
509            &source,
510            &Pg,
511            &metadata,
512            |sql, _| {
513                let query_name = if source.is_safe_to_cache_prepared(&Pg)? {
514                    Some(format!("__diesel_stmt_{cache_len}"))
515                } else {
516                    None
517                };
518                Statement::prepare(conn, sql, query_name.as_deref(), &metadata)
519            },
520            &mut self.connection_and_transaction_manager.instrumentation,
521        );
522        if !execute_returning_count {
523            if let Err(ref e) = query {
524                self.connection_and_transaction_manager
525                    .instrumentation
526                    .on_connection_event(InstrumentationEvent::FinishQuery {
527                        query: &crate::debug_query(&source),
528                        error: Some(e),
529                    });
530            }
531        }
532
533        f(
534            query?,
535            binds,
536            &mut self.connection_and_transaction_manager,
537            source,
538        )
539    }
540
541    fn set_config_options(&mut self) -> QueryResult<()> {
542        crate::sql_query("SET TIME ZONE 'UTC'").execute(self)?;
543        crate::sql_query("SET CLIENT_ENCODING TO 'UTF8'").execute(self)?;
544        self.connection_and_transaction_manager
545            .raw_connection
546            .set_notice_processor(noop_notice_processor);
547        Ok(())
548    }
549}
550
551extern "C" fn noop_notice_processor(_: *mut libc::c_void, _message: *const libc::c_char) {}
552
553mod private {
554    use super::*;
555
556    #[allow(missing_debug_implementations)]
557    pub struct ConnectionAndTransactionManager {
558        pub(super) raw_connection: RawConnection,
559        pub(super) transaction_state: AnsiTransactionManager,
560        pub(super) instrumentation: Option<Box<dyn Instrumentation>>,
561    }
562
563    pub trait PgLoadingMode<B> {
564        const USE_ROW_BY_ROW_MODE: bool;
565        type Cursor<'conn, 'query>: Iterator<Item = QueryResult<Self::Row<'conn, 'query>>>;
566        type Row<'conn, 'query>: crate::row::Row<'conn, Pg>;
567
568        fn get_cursor<'conn, 'query>(
569            raw_connection: &'conn mut ConnectionAndTransactionManager,
570            result: PgResult,
571            source: impl QueryFragment<Pg> + 'query,
572        ) -> QueryResult<Self::Cursor<'conn, 'query>>;
573    }
574
575    impl PgLoadingMode<DefaultLoadingMode> for PgConnection {
576        const USE_ROW_BY_ROW_MODE: bool = false;
577        type Cursor<'conn, 'query> = Cursor;
578        type Row<'conn, 'query> = self::row::PgRow;
579
580        fn get_cursor<'conn, 'query>(
581            conn: &'conn mut ConnectionAndTransactionManager,
582            result: PgResult,
583            source: impl QueryFragment<Pg> + 'query,
584        ) -> QueryResult<Self::Cursor<'conn, 'query>> {
585            update_transaction_manager_status(
586                Cursor::new(result, &mut conn.raw_connection),
587                conn,
588                &crate::debug_query(&source),
589                true,
590            )
591        }
592    }
593
594    impl PgLoadingMode<PgRowByRowLoadingMode> for PgConnection {
595        const USE_ROW_BY_ROW_MODE: bool = true;
596        type Cursor<'conn, 'query> = RowByRowCursor<'conn, 'query>;
597        type Row<'conn, 'query> = self::row::PgRow;
598
599        fn get_cursor<'conn, 'query>(
600            raw_connection: &'conn mut ConnectionAndTransactionManager,
601            result: PgResult,
602            source: impl QueryFragment<Pg> + 'query,
603        ) -> QueryResult<Self::Cursor<'conn, 'query>> {
604            Ok(RowByRowCursor::new(
605                result,
606                raw_connection,
607                Box::new(source),
608            ))
609        }
610    }
611}
612
613#[cfg(test)]
614// that's a false positive for `panic!`/`assert!` on rust 2018
615#[allow(clippy::uninlined_format_args)]
616mod tests {
617    extern crate dotenvy;
618
619    use super::*;
620    use crate::dsl::sql;
621    use crate::prelude::*;
622    use crate::result::Error::DatabaseError;
623    use crate::sql_types::{Integer, VarChar};
624    use std::num::NonZeroU32;
625
626    #[test]
627    fn malformed_sql_query() {
628        let connection = &mut connection();
629        let query =
630            crate::sql_query("SELECT not_existent FROM also_not_there;").execute(connection);
631
632        if let Err(DatabaseError(_, string)) = query {
633            assert_eq!(Some(26), string.statement_position());
634        } else {
635            unreachable!();
636        }
637    }
638
639    #[test]
640    fn prepared_statements_are_cached() {
641        let connection = &mut connection();
642
643        let query = crate::select(1.into_sql::<Integer>());
644
645        assert_eq!(Ok(1), query.get_result(connection));
646        assert_eq!(Ok(1), query.get_result(connection));
647        assert_eq!(1, connection.statement_cache.len());
648    }
649
650    #[test]
651    fn queries_with_identical_sql_but_different_types_are_cached_separately() {
652        let connection = &mut connection();
653
654        let query = crate::select(1.into_sql::<Integer>());
655        let query2 = crate::select("hi".into_sql::<VarChar>());
656
657        assert_eq!(Ok(1), query.get_result(connection));
658        assert_eq!(Ok("hi".to_string()), query2.get_result(connection));
659        assert_eq!(2, connection.statement_cache.len());
660    }
661
662    #[test]
663    fn queries_with_identical_types_and_sql_but_different_bind_types_are_cached_separately() {
664        let connection = &mut connection();
665
666        let query = crate::select(1.into_sql::<Integer>()).into_boxed::<Pg>();
667        let query2 = crate::select("hi".into_sql::<VarChar>()).into_boxed::<Pg>();
668
669        assert_eq!(0, connection.statement_cache.len());
670        assert_eq!(Ok(1), query.get_result(connection));
671        assert_eq!(Ok("hi".to_string()), query2.get_result(connection));
672        assert_eq!(2, connection.statement_cache.len());
673    }
674
675    define_sql_function!(fn lower(x: VarChar) -> VarChar);
676
677    #[test]
678    fn queries_with_identical_types_and_binds_but_different_sql_are_cached_separately() {
679        let connection = &mut connection();
680
681        let hi = "HI".into_sql::<VarChar>();
682        let query = crate::select(hi).into_boxed::<Pg>();
683        let query2 = crate::select(lower(hi)).into_boxed::<Pg>();
684
685        assert_eq!(0, connection.statement_cache.len());
686        assert_eq!(Ok("HI".to_string()), query.get_result(connection));
687        assert_eq!(Ok("hi".to_string()), query2.get_result(connection));
688        assert_eq!(2, connection.statement_cache.len());
689    }
690
691    #[test]
692    fn queries_with_sql_literal_nodes_are_not_cached() {
693        let connection = &mut connection();
694        let query = crate::select(sql::<Integer>("1"));
695
696        assert_eq!(Ok(1), query.get_result(connection));
697        assert_eq!(0, connection.statement_cache.len());
698    }
699
700    table! {
701        users {
702            id -> Integer,
703            name -> Text,
704        }
705    }
706
707    #[test]
708    fn inserts_from_select_are_cached() {
709        let connection = &mut connection();
710        connection.begin_test_transaction().unwrap();
711
712        crate::sql_query(
713            "CREATE TABLE IF NOT EXISTS users(id INTEGER PRIMARY KEY, name TEXT NOT NULL);",
714        )
715        .execute(connection)
716        .unwrap();
717
718        let query = users::table.filter(users::id.eq(42));
719        let insert = query
720            .insert_into(users::table)
721            .into_columns((users::id, users::name));
722        assert!(insert.execute(connection).is_ok());
723        assert_eq!(1, connection.statement_cache.len());
724
725        let query = users::table.filter(users::id.eq(42)).into_boxed();
726        let insert = query
727            .insert_into(users::table)
728            .into_columns((users::id, users::name));
729        assert!(insert.execute(connection).is_ok());
730        assert_eq!(2, connection.statement_cache.len());
731    }
732
733    #[test]
734    fn single_inserts_are_cached() {
735        let connection = &mut connection();
736        connection.begin_test_transaction().unwrap();
737
738        crate::sql_query(
739            "CREATE TABLE IF NOT EXISTS users(id INTEGER PRIMARY KEY, name TEXT NOT NULL);",
740        )
741        .execute(connection)
742        .unwrap();
743
744        let insert =
745            crate::insert_into(users::table).values((users::id.eq(42), users::name.eq("Foo")));
746
747        assert!(insert.execute(connection).is_ok());
748        assert_eq!(1, connection.statement_cache.len());
749    }
750
751    #[test]
752    fn dynamic_batch_inserts_are_not_cached() {
753        let connection = &mut connection();
754        connection.begin_test_transaction().unwrap();
755
756        crate::sql_query(
757            "CREATE TABLE IF NOT EXISTS users(id INTEGER PRIMARY KEY, name TEXT NOT NULL);",
758        )
759        .execute(connection)
760        .unwrap();
761
762        let insert = crate::insert_into(users::table)
763            .values(vec![(users::id.eq(42), users::name.eq("Foo"))]);
764
765        assert!(insert.execute(connection).is_ok());
766        assert_eq!(0, connection.statement_cache.len());
767    }
768
769    #[test]
770    fn static_batch_inserts_are_cached() {
771        let connection = &mut connection();
772        connection.begin_test_transaction().unwrap();
773
774        crate::sql_query(
775            "CREATE TABLE IF NOT EXISTS users(id INTEGER PRIMARY KEY, name TEXT NOT NULL);",
776        )
777        .execute(connection)
778        .unwrap();
779
780        let insert =
781            crate::insert_into(users::table).values([(users::id.eq(42), users::name.eq("Foo"))]);
782
783        assert!(insert.execute(connection).is_ok());
784        assert_eq!(1, connection.statement_cache.len());
785    }
786
787    #[test]
788    fn queries_containing_in_with_vec_are_cached() {
789        let connection = &mut connection();
790        let one_as_expr = 1.into_sql::<Integer>();
791        let query = crate::select(one_as_expr.eq_any(vec![1, 2, 3]));
792
793        assert_eq!(Ok(true), query.get_result(connection));
794        assert_eq!(1, connection.statement_cache.len());
795    }
796
797    fn connection() -> PgConnection {
798        crate::test_helpers::pg_connection_no_transaction()
799    }
800
801    #[test]
802    fn transaction_manager_returns_an_error_when_attempting_to_commit_outside_of_a_transaction() {
803        use crate::connection::AnsiTransactionManager;
804        use crate::connection::TransactionManager;
805        use crate::result::Error;
806        use crate::PgConnection;
807
808        let conn = &mut crate::test_helpers::pg_connection_no_transaction();
809        assert_eq!(
810            None,
811            <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
812                conn
813            ).transaction_depth().expect("Transaction depth")
814        );
815        let result = AnsiTransactionManager::commit_transaction(conn);
816        assert!(matches!(result, Err(Error::NotInTransaction)))
817    }
818
819    #[test]
820    fn transaction_manager_returns_an_error_when_attempting_to_rollback_outside_of_a_transaction() {
821        use crate::connection::AnsiTransactionManager;
822        use crate::connection::TransactionManager;
823        use crate::result::Error;
824        use crate::PgConnection;
825
826        let conn = &mut crate::test_helpers::pg_connection_no_transaction();
827        assert_eq!(
828            None,
829            <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
830                conn
831            ).transaction_depth().expect("Transaction depth")
832        );
833        let result = AnsiTransactionManager::rollback_transaction(conn);
834        assert!(matches!(result, Err(Error::NotInTransaction)))
835    }
836
837    #[test]
838    fn postgres_transaction_is_rolled_back_upon_syntax_error() {
839        use std::num::NonZeroU32;
840
841        use crate::connection::AnsiTransactionManager;
842        use crate::connection::TransactionManager;
843        use crate::pg::connection::raw::PgTransactionStatus;
844        use crate::*;
845        let conn = &mut crate::test_helpers::pg_connection_no_transaction();
846        assert_eq!(
847            None,
848            <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
849                conn
850            ).transaction_depth().expect("Transaction depth")
851        );
852        let _result = conn.build_transaction().run(|conn| {
853            assert_eq!(
854                NonZeroU32::new(1),
855                <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
856                    conn
857                ).transaction_depth().expect("Transaction depth")
858            );
859            // In Postgres, a syntax error breaks the transaction block
860            let query_result = sql_query("SELECT_SYNTAX_ERROR 1").execute(conn);
861            assert!(query_result.is_err());
862            assert_eq!(
863                PgTransactionStatus::InError,
864                conn.connection_and_transaction_manager.raw_connection.transaction_status()
865            );
866            query_result
867        });
868        assert_eq!(
869            None,
870            <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
871                conn
872            ).transaction_depth().expect("Transaction depth")
873        );
874        assert_eq!(
875            PgTransactionStatus::Idle,
876            conn.connection_and_transaction_manager
877                .raw_connection
878                .transaction_status()
879        );
880    }
881
882    #[test]
883    fn nested_postgres_transaction_is_rolled_back_upon_syntax_error() {
884        use std::num::NonZeroU32;
885
886        use crate::connection::AnsiTransactionManager;
887        use crate::connection::TransactionManager;
888        use crate::pg::connection::raw::PgTransactionStatus;
889        use crate::*;
890        let conn = &mut crate::test_helpers::pg_connection_no_transaction();
891        assert_eq!(
892            None,
893            <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
894                conn
895            ).transaction_depth().expect("Transaction depth")
896        );
897        let result = conn.build_transaction().run(|conn| {
898            assert_eq!(
899                NonZeroU32::new(1),
900                <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
901                    conn
902            ).transaction_depth().expect("Transaction depth")
903            );
904            let result = conn.build_transaction().run(|conn| {
905                assert_eq!(
906                    NonZeroU32::new(2),
907                    <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
908                        conn
909            ).transaction_depth().expect("Transaction depth")
910                );
911                sql_query("SELECT_SYNTAX_ERROR 1").execute(conn)
912            });
913            assert!(result.is_err());
914            assert_eq!(
915                NonZeroU32::new(1),
916                <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
917                    conn
918            ).transaction_depth().expect("Transaction depth")
919            );
920            let query_result = sql_query("SELECT 1").execute(conn);
921            assert!(query_result.is_ok());
922            assert_eq!(
923                PgTransactionStatus::InTransaction,
924                conn.connection_and_transaction_manager.raw_connection.transaction_status()
925            );
926            query_result
927        });
928        assert!(result.is_ok());
929        assert_eq!(
930            PgTransactionStatus::Idle,
931            conn.connection_and_transaction_manager
932                .raw_connection
933                .transaction_status()
934        );
935        assert_eq!(
936            None,
937            <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
938                conn
939            ).transaction_depth().expect("Transaction depth")
940        );
941    }
942
943    #[test]
944    // This function uses collect with an side effect (spawning threads)
945    // so this is a false positive from clippy
946    #[allow(clippy::needless_collect)]
947    fn postgres_transaction_depth_is_tracked_properly_on_serialization_failure() {
948        use crate::pg::connection::raw::PgTransactionStatus;
949        use crate::result::DatabaseErrorKind::SerializationFailure;
950        use crate::result::Error::DatabaseError;
951        use crate::*;
952        use std::sync::{Arc, Barrier};
953        use std::thread;
954
955        table! {
956            #[sql_name = "pg_transaction_depth_is_tracked_properly_on_commit_failure"]
957            serialization_example {
958                id -> Serial,
959                class -> Integer,
960            }
961        }
962
963        let conn = &mut crate::test_helpers::pg_connection_no_transaction();
964
965        sql_query(
966            "DROP TABLE IF EXISTS pg_transaction_depth_is_tracked_properly_on_commit_failure;",
967        )
968        .execute(conn)
969        .unwrap();
970        sql_query(
971            r#"
972            CREATE TABLE pg_transaction_depth_is_tracked_properly_on_commit_failure (
973                id SERIAL PRIMARY KEY,
974                class INTEGER NOT NULL
975            )
976        "#,
977        )
978        .execute(conn)
979        .unwrap();
980
981        insert_into(serialization_example::table)
982            .values(&vec![
983                serialization_example::class.eq(1),
984                serialization_example::class.eq(2),
985            ])
986            .execute(conn)
987            .unwrap();
988
989        let before_barrier = Arc::new(Barrier::new(2));
990        let after_barrier = Arc::new(Barrier::new(2));
991        let threads = (1..3)
992            .map(|i| {
993                let before_barrier = before_barrier.clone();
994                let after_barrier = after_barrier.clone();
995                thread::spawn(move || {
996                    use crate::connection::AnsiTransactionManager;
997                    use crate::connection::TransactionManager;
998                    let conn = &mut crate::test_helpers::pg_connection_no_transaction();
999                    assert_eq!(None, <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));
1000
1001                    let result = conn.build_transaction().serializable().run(|conn| {
1002                        assert_eq!(NonZeroU32::new(1), <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));
1003
1004                        let _ = serialization_example::table
1005                            .filter(serialization_example::class.eq(i))
1006                            .count()
1007                            .execute(conn)?;
1008
1009                        let other_i = if i == 1 { 2 } else { 1 };
1010                        let q = insert_into(serialization_example::table)
1011                            .values(serialization_example::class.eq(other_i));
1012                        before_barrier.wait();
1013
1014                        let r = q.execute(conn);
1015                        after_barrier.wait();
1016                        r
1017                    });
1018                    assert_eq!(
1019                        PgTransactionStatus::Idle,
1020                        conn.connection_and_transaction_manager.raw_connection.transaction_status()
1021                    );
1022
1023                    assert_eq!(None, <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));
1024                    result
1025                })
1026            })
1027            .collect::<Vec<_>>();
1028
1029        let mut results = threads
1030            .into_iter()
1031            .map(|t| t.join().unwrap())
1032            .collect::<Vec<_>>();
1033
1034        results.sort_by_key(|r| r.is_err());
1035
1036        assert!(results[0].is_ok(), "Got {:?} instead", results);
1037        assert!(
1038            matches!(&results[1], Err(DatabaseError(SerializationFailure, _))),
1039            "Got {:?} instead",
1040            results
1041        );
1042        assert_eq!(
1043            PgTransactionStatus::Idle,
1044            conn.connection_and_transaction_manager
1045                .raw_connection
1046                .transaction_status()
1047        );
1048    }
1049
1050    #[test]
1051    // This function uses collect with an side effect (spawning threads)
1052    // so this is a false positive from clippy
1053    #[allow(clippy::needless_collect)]
1054    fn postgres_transaction_depth_is_tracked_properly_on_nested_serialization_failure() {
1055        use crate::pg::connection::raw::PgTransactionStatus;
1056        use crate::result::DatabaseErrorKind::SerializationFailure;
1057        use crate::result::Error::DatabaseError;
1058        use crate::*;
1059        use std::sync::{Arc, Barrier};
1060        use std::thread;
1061
1062        table! {
1063            #[sql_name = "pg_nested_transaction_depth_is_tracked_properly_on_commit_failure"]
1064            serialization_example {
1065                id -> Serial,
1066                class -> Integer,
1067            }
1068        }
1069
1070        let conn = &mut crate::test_helpers::pg_connection_no_transaction();
1071
1072        sql_query(
1073            "DROP TABLE IF EXISTS pg_nested_transaction_depth_is_tracked_properly_on_commit_failure;",
1074        )
1075        .execute(conn)
1076        .unwrap();
1077        sql_query(
1078            r#"
1079            CREATE TABLE pg_nested_transaction_depth_is_tracked_properly_on_commit_failure (
1080                id SERIAL PRIMARY KEY,
1081                class INTEGER NOT NULL
1082            )
1083        "#,
1084        )
1085        .execute(conn)
1086        .unwrap();
1087
1088        insert_into(serialization_example::table)
1089            .values(&vec![
1090                serialization_example::class.eq(1),
1091                serialization_example::class.eq(2),
1092            ])
1093            .execute(conn)
1094            .unwrap();
1095
1096        let before_barrier = Arc::new(Barrier::new(2));
1097        let after_barrier = Arc::new(Barrier::new(2));
1098        let threads = (1..3)
1099            .map(|i| {
1100                let before_barrier = before_barrier.clone();
1101                let after_barrier = after_barrier.clone();
1102                thread::spawn(move || {
1103                    use crate::connection::AnsiTransactionManager;
1104                    use crate::connection::TransactionManager;
1105                    let conn = &mut crate::test_helpers::pg_connection_no_transaction();
1106                    assert_eq!(None, <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));
1107
1108                    let result = conn.build_transaction().serializable().run(|conn| {
1109                        assert_eq!(NonZeroU32::new(1), <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));
1110                        let r = conn.transaction(|conn| {
1111                            assert_eq!(NonZeroU32::new(2), <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));
1112
1113                            let _ = serialization_example::table
1114                                .filter(serialization_example::class.eq(i))
1115                                .count()
1116                                .execute(conn)?;
1117
1118                            let other_i = if i == 1 { 2 } else { 1 };
1119                            let q = insert_into(serialization_example::table)
1120                                .values(serialization_example::class.eq(other_i));
1121                            before_barrier.wait();
1122
1123                            let r = q.execute(conn);
1124                            after_barrier.wait();
1125                            r
1126                        });
1127                        assert_eq!(NonZeroU32::new(1), <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));
1128                        assert_eq!(
1129                            PgTransactionStatus::InTransaction,
1130                            conn.connection_and_transaction_manager.raw_connection.transaction_status()
1131                        );
1132                        r
1133                    });
1134                    assert_eq!(
1135                        PgTransactionStatus::Idle,
1136                        conn.connection_and_transaction_manager.raw_connection.transaction_status()
1137                    );
1138
1139                    assert_eq!(None, <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));
1140                    result
1141                })
1142            })
1143            .collect::<Vec<_>>();
1144
1145        let mut results = threads
1146            .into_iter()
1147            .map(|t| t.join().unwrap())
1148            .collect::<Vec<_>>();
1149
1150        results.sort_by_key(|r| r.is_err());
1151
1152        assert!(results[0].is_ok(), "Got {:?} instead", results);
1153        assert!(
1154            matches!(&results[1], Err(DatabaseError(SerializationFailure, _))),
1155            "Got {:?} instead",
1156            results
1157        );
1158        assert_eq!(
1159            PgTransactionStatus::Idle,
1160            conn.connection_and_transaction_manager
1161                .raw_connection
1162                .transaction_status()
1163        );
1164    }
1165
1166    #[test]
1167    fn postgres_transaction_is_rolled_back_upon_deferred_constraint_failure() {
1168        use crate::connection::AnsiTransactionManager;
1169        use crate::connection::TransactionManager;
1170        use crate::pg::connection::raw::PgTransactionStatus;
1171        use crate::result::Error;
1172        use crate::*;
1173
1174        let conn = &mut crate::test_helpers::pg_connection_no_transaction();
1175        assert_eq!(
1176            None,
1177            <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1178                conn
1179            ).transaction_depth().expect("Transaction depth")
1180        );
1181        let result: Result<_, Error> = conn.build_transaction().run(|conn| {
1182            assert_eq!(
1183                NonZeroU32::new(1),
1184                <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1185                    conn
1186            ).transaction_depth().expect("Transaction depth")
1187            );
1188            sql_query("DROP TABLE IF EXISTS deferred_constraint_commit").execute(conn)?;
1189            sql_query("CREATE TABLE deferred_constraint_commit(id INT UNIQUE INITIALLY DEFERRED)")
1190                .execute(conn)?;
1191            sql_query("INSERT INTO deferred_constraint_commit VALUES(1)").execute(conn)?;
1192            let result =
1193                sql_query("INSERT INTO deferred_constraint_commit VALUES(1)").execute(conn);
1194            assert!(result.is_ok());
1195            assert_eq!(
1196                PgTransactionStatus::InTransaction,
1197                conn.connection_and_transaction_manager.raw_connection.transaction_status()
1198            );
1199            Ok(())
1200        });
1201        assert_eq!(
1202            None,
1203            <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1204                conn
1205            ).transaction_depth().expect("Transaction depth")
1206        );
1207        assert_eq!(
1208            PgTransactionStatus::Idle,
1209            conn.connection_and_transaction_manager
1210                .raw_connection
1211                .transaction_status()
1212        );
1213        assert!(result.is_err());
1214    }
1215
1216    #[test]
1217    fn postgres_transaction_is_rolled_back_upon_deferred_trigger_failure() {
1218        use crate::connection::AnsiTransactionManager;
1219        use crate::connection::TransactionManager;
1220        use crate::pg::connection::raw::PgTransactionStatus;
1221        use crate::result::Error;
1222        use crate::*;
1223
1224        let conn = &mut crate::test_helpers::pg_connection_no_transaction();
1225        assert_eq!(
1226            None,
1227            <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1228                conn
1229            ).transaction_depth().expect("Transaction depth")
1230        );
1231        let result: Result<_, Error> = conn.build_transaction().run(|conn| {
1232            assert_eq!(
1233                NonZeroU32::new(1),
1234                <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1235                    conn
1236            ).transaction_depth().expect("Transaction depth")
1237            );
1238            sql_query("DROP TABLE IF EXISTS deferred_trigger_commit").execute(conn)?;
1239            sql_query("CREATE TABLE deferred_trigger_commit(id INT UNIQUE INITIALLY DEFERRED)")
1240                .execute(conn)?;
1241            sql_query(
1242                r#"
1243                    CREATE OR REPLACE FUNCTION transaction_depth_blow_up()
1244                        RETURNS trigger
1245                        LANGUAGE plpgsql
1246                        AS $$
1247                    DECLARE
1248                    BEGIN
1249                        IF NEW.value = 42 THEN
1250                            RAISE EXCEPTION 'Transaction kaboom';
1251                        END IF;
1252                    RETURN NEW;
1253
1254                    END;$$;
1255                "#,
1256            )
1257            .execute(conn)?;
1258
1259            sql_query(
1260                r#"
1261                    CREATE CONSTRAINT TRIGGER transaction_depth_trigger
1262                        AFTER INSERT ON "deferred_trigger_commit"
1263                        DEFERRABLE INITIALLY DEFERRED
1264                        FOR EACH ROW
1265                        EXECUTE PROCEDURE transaction_depth_blow_up()
1266            "#,
1267            )
1268            .execute(conn)?;
1269            let result = sql_query("INSERT INTO deferred_trigger_commit VALUES(42)").execute(conn);
1270            assert!(result.is_ok());
1271            assert_eq!(
1272                PgTransactionStatus::InTransaction,
1273                conn.connection_and_transaction_manager.raw_connection.transaction_status()
1274            );
1275            Ok(())
1276        });
1277        assert_eq!(
1278            None,
1279            <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1280                conn
1281            ).transaction_depth().expect("Transaction depth")
1282        );
1283        assert_eq!(
1284            PgTransactionStatus::Idle,
1285            conn.connection_and_transaction_manager
1286                .raw_connection
1287                .transaction_status()
1288        );
1289        assert!(result.is_err());
1290    }
1291
1292    #[test]
1293    fn nested_postgres_transaction_is_rolled_back_upon_deferred_trigger_failure() {
1294        use crate::connection::AnsiTransactionManager;
1295        use crate::connection::TransactionManager;
1296        use crate::pg::connection::raw::PgTransactionStatus;
1297        use crate::result::Error;
1298        use crate::*;
1299
1300        let conn = &mut crate::test_helpers::pg_connection_no_transaction();
1301        assert_eq!(
1302            None,
1303            <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1304                conn
1305            ).transaction_depth().expect("Transaction depth")
1306        );
1307        let result: Result<_, Error> = conn.build_transaction().run(|conn| {
1308            assert_eq!(
1309                NonZeroU32::new(1),
1310                <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1311                    conn
1312            ).transaction_depth().expect("Transaction depth")
1313            );
1314            sql_query("DROP TABLE IF EXISTS deferred_trigger_nested_commit").execute(conn)?;
1315            sql_query(
1316                "CREATE TABLE deferred_trigger_nested_commit(id INT UNIQUE INITIALLY DEFERRED)",
1317            )
1318            .execute(conn)?;
1319            sql_query(
1320                r#"
1321                    CREATE OR REPLACE FUNCTION transaction_depth_blow_up()
1322                        RETURNS trigger
1323                        LANGUAGE plpgsql
1324                        AS $$
1325                    DECLARE
1326                    BEGIN
1327                        IF NEW.value = 42 THEN
1328                            RAISE EXCEPTION 'Transaction kaboom';
1329                        END IF;
1330                    RETURN NEW;
1331
1332                    END;$$;
1333                "#,
1334            )
1335            .execute(conn)?;
1336
1337            sql_query(
1338                r#"
1339                    CREATE CONSTRAINT TRIGGER transaction_depth_trigger
1340                        AFTER INSERT ON "deferred_trigger_nested_commit"
1341                        DEFERRABLE INITIALLY DEFERRED
1342                        FOR EACH ROW
1343                        EXECUTE PROCEDURE transaction_depth_blow_up()
1344            "#,
1345            )
1346            .execute(conn)?;
1347            let inner_result: Result<_, Error> = conn.build_transaction().run(|conn| {
1348                let result = sql_query("INSERT INTO deferred_trigger_nested_commit VALUES(42)")
1349                    .execute(conn);
1350                assert!(result.is_ok());
1351                Ok(())
1352            });
1353            assert!(inner_result.is_err());
1354            assert_eq!(
1355                PgTransactionStatus::InTransaction,
1356                conn.connection_and_transaction_manager.raw_connection.transaction_status()
1357            );
1358            Ok(())
1359        });
1360        assert_eq!(
1361            None,
1362            <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1363                conn
1364            ).transaction_depth().expect("Transaction depth")
1365        );
1366        assert_eq!(
1367            PgTransactionStatus::Idle,
1368            conn.connection_and_transaction_manager
1369                .raw_connection
1370                .transaction_status()
1371        );
1372        assert!(result.is_ok(), "Expected success, got {:?}", result);
1373    }
1374
1375    #[test]
1376    fn nested_postgres_transaction_is_rolled_back_upon_deferred_constraint_failure() {
1377        use crate::connection::AnsiTransactionManager;
1378        use crate::connection::TransactionManager;
1379        use crate::pg::connection::raw::PgTransactionStatus;
1380        use crate::result::Error;
1381        use crate::*;
1382
1383        let conn = &mut crate::test_helpers::pg_connection_no_transaction();
1384        assert_eq!(
1385            None,
1386            <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1387                conn
1388            ).transaction_depth().expect("Transaction depth")
1389        );
1390        let result: Result<_, Error> = conn.build_transaction().run(|conn| {
1391            assert_eq!(
1392                NonZeroU32::new(1),
1393                <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1394                    conn
1395            ).transaction_depth().expect("Transaction depth")
1396            );
1397            sql_query("DROP TABLE IF EXISTS deferred_constraint_nested_commit").execute(conn)?;
1398            sql_query("CREATE TABLE deferred_constraint_nested_commit(id INT UNIQUE INITIALLY DEFERRED)").execute(conn)?;
1399            let inner_result: Result<_, Error> = conn.build_transaction().run(|conn| {
1400                assert_eq!(
1401                    NonZeroU32::new(2),
1402                    <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1403                        conn
1404                    ).transaction_depth().expect("Transaction depth")
1405                );
1406                sql_query("INSERT INTO deferred_constraint_nested_commit VALUES(1)").execute(conn)?;
1407                let result = sql_query("INSERT INTO deferred_constraint_nested_commit VALUES(1)").execute(conn);
1408                assert!(result.is_ok());
1409                Ok(())
1410            });
1411            assert!(inner_result.is_err());
1412            assert_eq!(
1413                PgTransactionStatus::InTransaction,
1414                conn.connection_and_transaction_manager.raw_connection.transaction_status()
1415            );
1416            assert_eq!(
1417                NonZeroU32::new(1),
1418                <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1419                    conn
1420            ).transaction_depth().expect("Transaction depth")
1421            );
1422            sql_query("INSERT INTO deferred_constraint_nested_commit VALUES(1)").execute(conn)
1423        });
1424        assert_eq!(
1425            None,
1426            <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
1427                conn
1428            ).transaction_depth().expect("Transaction depth")
1429        );
1430        assert_eq!(
1431            PgTransactionStatus::Idle,
1432            conn.connection_and_transaction_manager
1433                .raw_connection
1434                .transaction_status()
1435        );
1436        assert!(result.is_ok());
1437    }
1438}