1pub(super) mod copy;
2pub(crate) mod cursor;
3mod raw;
4mod result;
5mod row;
6mod stmt;
7
8use self::copy::CopyFromSink;
9use self::copy::CopyToBuffer;
10use self::cursor::*;
11use self::private::ConnectionAndTransactionManager;
12use self::raw::{PgTransactionStatus, RawConnection};
13use self::stmt::Statement;
14use crate::connection::instrumentation::DebugQuery;
15use crate::connection::instrumentation::Instrumentation;
16use crate::connection::instrumentation::StrQueryHelper;
17use crate::connection::statement_cache::{MaybeCached, StatementCache};
18use crate::connection::*;
19use crate::expression::QueryMetadata;
20use crate::pg::metadata_lookup::{GetPgMetadataCache, PgMetadataCache};
21use crate::pg::query_builder::copy::InternalCopyFromQuery;
22use crate::pg::{Pg, TransactionBuilder};
23use crate::query_builder::bind_collector::RawBytesBindCollector;
24use crate::query_builder::*;
25use crate::result::ConnectionError::CouldntSetupConfiguration;
26use crate::result::*;
27use crate::RunQueryDsl;
28use std::ffi::CString;
29use std::fmt::Debug;
30use std::os::raw as libc;
31
32use super::query_builder::copy::CopyFromExpression;
33use super::query_builder::copy::CopyTarget;
34use super::query_builder::copy::CopyToCommand;
35
36pub(super) use self::result::PgResult;
37
/// A connection to a PostgreSQL database.
///
/// Holds the per-connection caches next to the raw connection handle, the
/// transaction bookkeeping and the instrumentation hook.
// No `Debug` implementation is provided for this type, hence the lint is
// silenced.
#[allow(missing_debug_implementations)]
#[cfg(feature = "postgres")]
pub struct PgConnection {
    /// Cache of prepared `Statement`s for the `Pg` backend.
    statement_cache: StatementCache<Pg, Statement>,
    /// Metadata cache exposed through `GetPgMetadataCache`.
    metadata_cache: PgMetadataCache,
    /// Raw connection, transaction state and instrumentation, grouped in one
    /// value so they can be borrowed separately from the caches above.
    connection_and_transaction_manager: ConnectionAndTransactionManager,
}
134
// An `unsafe impl` requires the `unsafe_code` lint to be allowed.
#[allow(unsafe_code)]
// NOTE(review): this asserts that a `PgConnection` may be moved to another
// thread. Presumably sound because the raw connection is only ever used by
// one thread at a time — confirm against libpq's threading documentation.
unsafe impl Send for PgConnection {}
138
139impl SimpleConnection for PgConnection {
140 #[allow(unsafe_code)] fn batch_execute(&mut self, query: &str) -> QueryResult<()> {
142 self.connection_and_transaction_manager
143 .instrumentation
144 .on_connection_event(InstrumentationEvent::StartQuery {
145 query: &StrQueryHelper::new(query),
146 });
147 let c_query = CString::new(query)?;
148 let inner_result = unsafe {
149 self.connection_and_transaction_manager
150 .raw_connection
151 .exec(c_query.as_ptr())
152 };
153 update_transaction_manager_status(
154 inner_result.and_then(|raw_result| {
155 PgResult::new(
156 raw_result,
157 &self.connection_and_transaction_manager.raw_connection,
158 )
159 }),
160 &mut self.connection_and_transaction_manager,
161 &StrQueryHelper::new(query),
162 true,
163 )?;
164 Ok(())
165 }
166}
167
/// Marker type selecting the row-by-row loading mode of `PgConnection`.
///
/// It carries no data; it is only used as the type parameter of
/// `LoadConnection`.
#[derive(Clone, Copy, Debug)]
pub struct PgRowByRowLoadingMode;
173
// Implements the `ConnectionSealed` marker trait (it has no items to provide)
// for `PgConnection`.
impl ConnectionSealed for PgConnection {}
175
176impl Connection for PgConnection {
177 type Backend = Pg;
178 type TransactionManager = AnsiTransactionManager;
179
180 fn establish(database_url: &str) -> ConnectionResult<PgConnection> {
181 let mut instrumentation = crate::connection::instrumentation::get_default_instrumentation();
182 instrumentation.on_connection_event(InstrumentationEvent::StartEstablishConnection {
183 url: database_url,
184 });
185 let r = RawConnection::establish(database_url).and_then(|raw_conn| {
186 let mut conn = PgConnection {
187 connection_and_transaction_manager: ConnectionAndTransactionManager {
188 raw_connection: raw_conn,
189 transaction_state: AnsiTransactionManager::default(),
190 instrumentation: None,
191 },
192 statement_cache: StatementCache::new(),
193 metadata_cache: PgMetadataCache::new(),
194 };
195 conn.set_config_options()
196 .map_err(CouldntSetupConfiguration)?;
197 Ok(conn)
198 });
199 instrumentation.on_connection_event(InstrumentationEvent::FinishEstablishConnection {
200 url: database_url,
201 error: r.as_ref().err(),
202 });
203 let mut conn = r?;
204 conn.connection_and_transaction_manager.instrumentation = instrumentation;
205 Ok(conn)
206 }
207
208 fn execute_returning_count<T>(&mut self, source: &T) -> QueryResult<usize>
209 where
210 T: QueryFragment<Pg> + QueryId,
211 {
212 update_transaction_manager_status(
213 self.with_prepared_query(source, true, |query, params, conn, _source| {
214 let res = query
215 .execute(&mut conn.raw_connection, ¶ms, false)
216 .map(|r| r.rows_affected());
217 while conn.raw_connection.get_next_result()?.is_some() {}
220 res
221 }),
222 &mut self.connection_and_transaction_manager,
223 &crate::debug_query(source),
224 true,
225 )
226 }
227
228 fn transaction_state(&mut self) -> &mut AnsiTransactionManager
229 where
230 Self: Sized,
231 {
232 &mut self.connection_and_transaction_manager.transaction_state
233 }
234
235 fn instrumentation(&mut self) -> &mut dyn Instrumentation {
236 &mut self.connection_and_transaction_manager.instrumentation
237 }
238
239 fn set_instrumentation(&mut self, instrumentation: impl Instrumentation) {
240 self.connection_and_transaction_manager.instrumentation = Some(Box::new(instrumentation));
241 }
242}
243
244impl<B> LoadConnection<B> for PgConnection
245where
246 Self: self::private::PgLoadingMode<B>,
247{
248 type Cursor<'conn, 'query> = <Self as self::private::PgLoadingMode<B>>::Cursor<'conn, 'query>;
249 type Row<'conn, 'query> = <Self as self::private::PgLoadingMode<B>>::Row<'conn, 'query>;
250
251 fn load<'conn, 'query, T>(
252 &'conn mut self,
253 source: T,
254 ) -> QueryResult<Self::Cursor<'conn, 'query>>
255 where
256 T: Query + QueryFragment<Self::Backend> + QueryId + 'query,
257 Self::Backend: QueryMetadata<T::SqlType>,
258 {
259 self.with_prepared_query(source, false, |stmt, params, conn, source| {
260 use self::private::PgLoadingMode;
261 let result = stmt.execute(&mut conn.raw_connection, ¶ms, Self::USE_ROW_BY_ROW_MODE);
262 let result = update_transaction_manager_status(
263 result,
264 conn,
265 &crate::debug_query(&source),
266 false,
267 )?;
268 Self::get_cursor(conn, result, source)
269 })
270 }
271}
272
273impl GetPgMetadataCache for PgConnection {
274 fn get_metadata_cache(&mut self) -> &mut PgMetadataCache {
275 &mut self.metadata_cache
276 }
277}
278
279#[inline(always)]
280fn update_transaction_manager_status<T>(
281 query_result: QueryResult<T>,
282 conn: &mut ConnectionAndTransactionManager,
283 source: &dyn DebugQuery,
284 final_call: bool,
285) -> QueryResult<T> {
286 fn non_generic_inner(conn: &mut ConnectionAndTransactionManager, is_err: bool) {
288 let raw_conn: &mut RawConnection = &mut conn.raw_connection;
289 let tm: &mut AnsiTransactionManager = &mut conn.transaction_state;
290 match raw_conn.transaction_status() {
294 PgTransactionStatus::InError => {
295 tm.status.set_requires_rollback_maybe_up_to_top_level(true)
296 }
297 PgTransactionStatus::Unknown => tm.status.set_in_error(),
298 PgTransactionStatus::Idle => {
299 tm.status = TransactionManagerStatus::Valid(Default::default())
304 }
305 PgTransactionStatus::InTransaction => {
306 let transaction_status = &mut tm.status;
307 if is_err {
310 if !matches!(transaction_status, TransactionManagerStatus::Valid(valid_tm) if valid_tm.transaction_depth().is_some())
313 {
314 transaction_status.set_in_error()
316 }
317 } else {
318 tm.status.set_requires_rollback_maybe_up_to_top_level(false)
326 }
327 }
328 PgTransactionStatus::Active => {
329 }
331 }
332 }
333 non_generic_inner(conn, query_result.is_err());
334 if let Err(ref e) = query_result {
335 conn.instrumentation
336 .on_connection_event(InstrumentationEvent::FinishQuery {
337 query: source,
338 error: Some(e),
339 });
340 } else if final_call {
341 conn.instrumentation
342 .on_connection_event(InstrumentationEvent::FinishQuery {
343 query: source,
344 error: None,
345 });
346 }
347 query_result
348}
349
#[cfg(feature = "r2d2")]
impl crate::r2d2::R2D2Connection for PgConnection {
    /// Runs the pool's check query and discards its result.
    fn ping(&mut self) -> QueryResult<()> {
        crate::r2d2::CheckConnectionQuery.execute(self)?;
        Ok(())
    }

    /// Defers to the transaction manager to decide whether the connection is
    /// broken.
    fn is_broken(&mut self) -> bool {
        AnsiTransactionManager::is_broken_transaction_manager(self)
    }
}
360
361impl MultiConnectionHelper for PgConnection {
362 fn to_any<'a>(
363 lookup: &mut <Self::Backend as crate::sql_types::TypeMetadata>::MetadataLookup,
364 ) -> &mut (dyn std::any::Any + 'a) {
365 lookup.as_any()
366 }
367
368 fn from_any(
369 lookup: &mut dyn std::any::Any,
370 ) -> Option<&mut <Self::Backend as crate::sql_types::TypeMetadata>::MetadataLookup> {
371 lookup
372 .downcast_mut::<Self>()
373 .map(|conn| conn as &mut dyn super::PgMetadataLookup)
374 }
375}
376
377impl PgConnection {
378 pub fn build_transaction(&mut self) -> TransactionBuilder<'_, Self> {
402 TransactionBuilder::new(self)
403 }
404
405 pub(crate) fn copy_from<S, T>(&mut self, target: S) -> Result<usize, S::Error>
406 where
407 S: CopyFromExpression<T>,
408 {
409 let query = InternalCopyFromQuery::new(target);
410 let res = self.with_prepared_query(query, false, |stmt, binds, conn, mut source| {
411 fn inner_copy_in<S, T>(
412 stmt: MaybeCached<'_, Statement>,
413 conn: &mut ConnectionAndTransactionManager,
414 binds: Vec<Option<Vec<u8>>>,
415 source: &mut InternalCopyFromQuery<S, T>,
416 ) -> Result<usize, S::Error>
417 where
418 S: CopyFromExpression<T>,
419 {
420 let _res = stmt.execute(&mut conn.raw_connection, &binds, false)?;
421 let mut copy_in = CopyFromSink::new(&mut conn.raw_connection);
422 let r = source.target.callback(&mut copy_in);
423 copy_in.finish(r.as_ref().err().map(|e| e.to_string()))?;
424 let next_res = conn.raw_connection.get_next_result()?.ok_or_else(|| {
425 crate::result::Error::DeserializationError(
426 "Failed to receive result from the database".into(),
427 )
428 })?;
429 let rows = next_res.rows_affected();
430 while let Some(_r) = conn.raw_connection.get_next_result()? {}
431 r?;
432 Ok(rows)
433 }
434
435 let rows = inner_copy_in(stmt, conn, binds, &mut source);
436 if let Err(ref e) = rows {
437 let database_error = crate::result::Error::DatabaseError(
438 crate::result::DatabaseErrorKind::Unknown,
439 Box::new(e.to_string()),
440 );
441 conn.instrumentation
442 .on_connection_event(InstrumentationEvent::FinishQuery {
443 query: &crate::debug_query(&source),
444 error: Some(&database_error),
445 });
446 } else {
447 conn.instrumentation
448 .on_connection_event(InstrumentationEvent::FinishQuery {
449 query: &crate::debug_query(&source),
450 error: None,
451 });
452 }
453
454 rows
455 })?;
456
457 Ok(res)
458 }
459
460 pub(crate) fn copy_to<T>(&mut self, command: CopyToCommand<T>) -> QueryResult<CopyToBuffer<'_>>
461 where
462 T: CopyTarget,
463 {
464 let res = self.with_prepared_query::<_, _, Error>(
465 command,
466 false,
467 |stmt, binds, conn, source| {
468 let res = stmt.execute(&mut conn.raw_connection, &binds, false);
469 conn.instrumentation
470 .on_connection_event(InstrumentationEvent::FinishQuery {
471 query: &crate::debug_query(&source),
472 error: res.as_ref().err(),
473 });
474 Ok(CopyToBuffer::new(&mut conn.raw_connection, res?))
475 },
476 )?;
477 Ok(res)
478 }
479
/// Prepares `source` (through the statement cache) and hands the prepared
/// statement to `f`.
///
/// * `source` - the query to prepare; it is passed on to `f` afterwards.
/// * `execute_returning_count` - when `false`, a failure to prepare the
///   statement is reported here as a `FinishQuery` event. When `true`,
///   nothing is reported here; the caller is expected to report it.
/// * `f` - receives the prepared statement, the collected bind values, the
///   connection/transaction manager and `source`.
///
/// A `StartQuery` instrumentation event is emitted before anything else.
///
/// # Errors
///
/// Returns the error from collecting the binds or from preparing the
/// statement (converted into `E`), or whatever `f` returns.
fn with_prepared_query<'conn, T, R, E>(
    &'conn mut self,
    source: T,
    execute_returning_count: bool,
    f: impl FnOnce(
        MaybeCached<'_, Statement>,
        Vec<Option<Vec<u8>>>,
        &'conn mut ConnectionAndTransactionManager,
        T,
    ) -> Result<R, E>,
) -> Result<R, E>
where
    T: QueryFragment<Pg> + QueryId,
    E: From<crate::result::Error>,
{
    self.connection_and_transaction_manager
        .instrumentation
        .on_connection_event(InstrumentationEvent::StartQuery {
            query: &crate::debug_query(&source),
        });
    // NOTE(review): an early return on the next `?` emits no `FinishQuery`
    // event from this function — confirm that this is intended.
    let mut bind_collector = RawBytesBindCollector::<Pg>::new();
    source.collect_binds(&mut bind_collector, self, &Pg)?;
    let binds = bind_collector.binds;
    let metadata = bind_collector.metadata;

    // Read the length before `statement_cache` is mutably borrowed; it is
    // used to build the name of a newly prepared statement.
    let cache_len = self.statement_cache.len();
    let cache = &mut self.statement_cache;
    let conn = &mut self.connection_and_transaction_manager.raw_connection;
    let query = cache.cached_statement(
        &source,
        &Pg,
        &metadata,
        |sql, _| {
            // Only statements that are safe to cache get a name.
            let query_name = if source.is_safe_to_cache_prepared(&Pg)? {
                Some(format!("__diesel_stmt_{cache_len}"))
            } else {
                None
            };
            Statement::prepare(conn, sql, query_name.as_deref(), &metadata)
        },
        &mut self.connection_and_transaction_manager.instrumentation,
    );
    if !execute_returning_count {
        if let Err(ref e) = query {
            self.connection_and_transaction_manager
                .instrumentation
                .on_connection_event(InstrumentationEvent::FinishQuery {
                    query: &crate::debug_query(&source),
                    error: Some(e),
                });
        }
    }

    f(
        query?,
        binds,
        &mut self.connection_and_transaction_manager,
        source,
    )
}
540
541 fn set_config_options(&mut self) -> QueryResult<()> {
542 crate::sql_query("SET TIME ZONE 'UTC'").execute(self)?;
543 crate::sql_query("SET CLIENT_ENCODING TO 'UTF8'").execute(self)?;
544 self.connection_and_transaction_manager
545 .raw_connection
546 .set_notice_processor(noop_notice_processor);
547 Ok(())
548 }
549}
550
/// Notice processor callback that ignores both of its arguments and does
/// nothing.
extern "C" fn noop_notice_processor(_arg: *mut libc::c_void, _notice: *const libc::c_char) {}
552
553mod private {
554 use super::*;
555
556 #[allow(missing_debug_implementations)]
557 pub struct ConnectionAndTransactionManager {
558 pub(super) raw_connection: RawConnection,
559 pub(super) transaction_state: AnsiTransactionManager,
560 pub(super) instrumentation: Option<Box<dyn Instrumentation>>,
561 }
562
563 pub trait PgLoadingMode<B> {
564 const USE_ROW_BY_ROW_MODE: bool;
565 type Cursor<'conn, 'query>: Iterator<Item = QueryResult<Self::Row<'conn, 'query>>>;
566 type Row<'conn, 'query>: crate::row::Row<'conn, Pg>;
567
568 fn get_cursor<'conn, 'query>(
569 raw_connection: &'conn mut ConnectionAndTransactionManager,
570 result: PgResult,
571 source: impl QueryFragment<Pg> + 'query,
572 ) -> QueryResult<Self::Cursor<'conn, 'query>>;
573 }
574
575 impl PgLoadingMode<DefaultLoadingMode> for PgConnection {
576 const USE_ROW_BY_ROW_MODE: bool = false;
577 type Cursor<'conn, 'query> = Cursor;
578 type Row<'conn, 'query> = self::row::PgRow;
579
580 fn get_cursor<'conn, 'query>(
581 conn: &'conn mut ConnectionAndTransactionManager,
582 result: PgResult,
583 source: impl QueryFragment<Pg> + 'query,
584 ) -> QueryResult<Self::Cursor<'conn, 'query>> {
585 update_transaction_manager_status(
586 Cursor::new(result, &mut conn.raw_connection),
587 conn,
588 &crate::debug_query(&source),
589 true,
590 )
591 }
592 }
593
594 impl PgLoadingMode<PgRowByRowLoadingMode> for PgConnection {
595 const USE_ROW_BY_ROW_MODE: bool = true;
596 type Cursor<'conn, 'query> = RowByRowCursor<'conn, 'query>;
597 type Row<'conn, 'query> = self::row::PgRow;
598
599 fn get_cursor<'conn, 'query>(
600 raw_connection: &'conn mut ConnectionAndTransactionManager,
601 result: PgResult,
602 source: impl QueryFragment<Pg> + 'query,
603 ) -> QueryResult<Self::Cursor<'conn, 'query>> {
604 Ok(RowByRowCursor::new(
605 result,
606 raw_connection,
607 Box::new(source),
608 ))
609 }
610 }
611}
612
613#[cfg(test)]
614#[allow(clippy::uninlined_format_args)]
616mod tests {
617 extern crate dotenvy;
618
619 use super::*;
620 use crate::dsl::sql;
621 use crate::prelude::*;
622 use crate::result::Error::DatabaseError;
623 use crate::sql_types::{Integer, VarChar};
624 use std::num::NonZeroU32;
625
#[test]
fn malformed_sql_query() {
    let conn = &mut connection();
    let outcome = crate::sql_query("SELECT not_existent FROM also_not_there;").execute(conn);

    // The database error must carry the statement position of the failure.
    match outcome {
        Err(DatabaseError(_, info)) => assert_eq!(Some(26), info.statement_position()),
        _ => unreachable!(),
    }
}
638
#[test]
fn prepared_statements_are_cached() {
    let conn = &mut connection();
    let select_one = crate::select(1.into_sql::<Integer>());

    // Running the same query twice must leave exactly one cache entry.
    assert_eq!(Ok(1), select_one.get_result(conn));
    assert_eq!(Ok(1), select_one.get_result(conn));
    assert_eq!(1, conn.statement_cache.len());
}
649
#[test]
fn queries_with_identical_sql_but_different_types_are_cached_separately() {
    let conn = &mut connection();

    let integer_query = crate::select(1.into_sql::<Integer>());
    let text_query = crate::select("hi".into_sql::<VarChar>());

    // Each of the two queries must get its own cache entry.
    assert_eq!(Ok(1), integer_query.get_result(conn));
    assert_eq!(Ok("hi".to_string()), text_query.get_result(conn));
    assert_eq!(2, conn.statement_cache.len());
}
661
#[test]
fn queries_with_identical_types_and_sql_but_different_bind_types_are_cached_separately() {
    let conn = &mut connection();

    let integer_query = crate::select(1.into_sql::<Integer>()).into_boxed::<Pg>();
    let text_query = crate::select("hi".into_sql::<VarChar>()).into_boxed::<Pg>();

    // The cache starts out empty and gains one entry per boxed query.
    assert_eq!(0, conn.statement_cache.len());
    assert_eq!(Ok(1), integer_query.get_result(conn));
    assert_eq!(Ok("hi".to_string()), text_query.get_result(conn));
    assert_eq!(2, conn.statement_cache.len());
}
674
// Declares the SQL function `lower` (VarChar -> VarChar) used by the caching
// tests in this module.
define_sql_function!(fn lower(x: VarChar) -> VarChar);
676
#[test]
fn queries_with_identical_types_and_binds_but_different_sql_are_cached_separately() {
    let conn = &mut connection();

    let upper_hi = "HI".into_sql::<VarChar>();
    let plain_query = crate::select(upper_hi).into_boxed::<Pg>();
    let lowered_query = crate::select(lower(upper_hi)).into_boxed::<Pg>();

    // Same bind, different SQL: two separate cache entries are expected.
    assert_eq!(0, conn.statement_cache.len());
    assert_eq!(Ok("HI".to_string()), plain_query.get_result(conn));
    assert_eq!(Ok("hi".to_string()), lowered_query.get_result(conn));
    assert_eq!(2, conn.statement_cache.len());
}
690
#[test]
fn queries_with_sql_literal_nodes_are_not_cached() {
    let conn = &mut connection();
    let literal_query = crate::select(sql::<Integer>("1"));

    // A query containing a raw SQL literal must not end up in the cache.
    assert_eq!(Ok(1), literal_query.get_result(conn));
    assert_eq!(0, conn.statement_cache.len());
}
699
// Table definition for the `users` table that the insert tests in this module
// create with `CREATE TABLE IF NOT EXISTS users(...)`.
table! {
    users {
        id -> Integer,
        name -> Text,
    }
}
706
#[test]
fn inserts_from_select_are_cached() {
    let conn = &mut connection();
    conn.begin_test_transaction().unwrap();

    let create_table = crate::sql_query(
        "CREATE TABLE IF NOT EXISTS users(id INTEGER PRIMARY KEY, name TEXT NOT NULL);",
    );
    create_table.execute(conn).unwrap();

    // Insert built from an unboxed select: first cache entry.
    let unboxed_select = users::table.filter(users::id.eq(42));
    let unboxed_insert = unboxed_select
        .insert_into(users::table)
        .into_columns((users::id, users::name));
    assert!(unboxed_insert.execute(conn).is_ok());
    assert_eq!(1, conn.statement_cache.len());

    // The same insert built from a boxed select: second cache entry.
    let boxed_select = users::table.filter(users::id.eq(42)).into_boxed();
    let boxed_insert = boxed_select
        .insert_into(users::table)
        .into_columns((users::id, users::name));
    assert!(boxed_insert.execute(conn).is_ok());
    assert_eq!(2, conn.statement_cache.len());
}
732
#[test]
fn single_inserts_are_cached() {
    let conn = &mut connection();
    conn.begin_test_transaction().unwrap();

    let create_table = crate::sql_query(
        "CREATE TABLE IF NOT EXISTS users(id INTEGER PRIMARY KEY, name TEXT NOT NULL);",
    );
    create_table.execute(conn).unwrap();

    let single_row = (users::id.eq(42), users::name.eq("Foo"));
    let insert = crate::insert_into(users::table).values(single_row);

    // A single-row insert must be stored in the statement cache.
    assert!(insert.execute(conn).is_ok());
    assert_eq!(1, conn.statement_cache.len());
}
750
#[test]
fn dynamic_batch_inserts_are_not_cached() {
    let conn = &mut connection();
    conn.begin_test_transaction().unwrap();

    let create_table = crate::sql_query(
        "CREATE TABLE IF NOT EXISTS users(id INTEGER PRIMARY KEY, name TEXT NOT NULL);",
    );
    create_table.execute(conn).unwrap();

    let rows = vec![(users::id.eq(42), users::name.eq("Foo"))];
    let insert = crate::insert_into(users::table).values(rows);

    // A batch insert built from a `Vec` must not be cached.
    assert!(insert.execute(conn).is_ok());
    assert_eq!(0, conn.statement_cache.len());
}
768
#[test]
fn static_batch_inserts_are_cached() {
    let conn = &mut connection();
    conn.begin_test_transaction().unwrap();

    let create_table = crate::sql_query(
        "CREATE TABLE IF NOT EXISTS users(id INTEGER PRIMARY KEY, name TEXT NOT NULL);",
    );
    create_table.execute(conn).unwrap();

    let rows = [(users::id.eq(42), users::name.eq("Foo"))];
    let insert = crate::insert_into(users::table).values(rows);

    // A batch insert built from a fixed-size array must be cached.
    assert!(insert.execute(conn).is_ok());
    assert_eq!(1, conn.statement_cache.len());
}
786
#[test]
fn queries_containing_in_with_vec_are_cached() {
    let conn = &mut connection();
    let one = 1.into_sql::<Integer>();
    let membership_query = crate::select(one.eq_any(vec![1, 2, 3]));

    // `eq_any` with a `Vec` must still produce a cached statement.
    assert_eq!(Ok(true), membership_query.get_result(conn));
    assert_eq!(1, conn.statement_cache.len());
}
796
797 fn connection() -> PgConnection {
798 crate::test_helpers::pg_connection_no_transaction()
799 }
800
#[test]
fn transaction_manager_returns_an_error_when_attempting_to_commit_outside_of_a_transaction() {
    use crate::connection::AnsiTransactionManager;
    use crate::connection::TransactionManager;
    use crate::result::Error;
    use crate::PgConnection;

    let conn = &mut crate::test_helpers::pg_connection_no_transaction();

    // No transaction is open on a fresh connection.
    let initial_depth =
        <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
            conn,
        )
        .transaction_depth()
        .expect("Transaction depth");
    assert_eq!(None, initial_depth);

    // Committing without an open transaction must be rejected.
    let commit_outcome = AnsiTransactionManager::commit_transaction(conn);
    assert!(matches!(commit_outcome, Err(Error::NotInTransaction)))
}
818
#[test]
fn transaction_manager_returns_an_error_when_attempting_to_rollback_outside_of_a_transaction() {
    use crate::connection::AnsiTransactionManager;
    use crate::connection::TransactionManager;
    use crate::result::Error;
    use crate::PgConnection;

    let conn = &mut crate::test_helpers::pg_connection_no_transaction();

    // No transaction is open on a fresh connection.
    let initial_depth =
        <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
            conn,
        )
        .transaction_depth()
        .expect("Transaction depth");
    assert_eq!(None, initial_depth);

    // Rolling back without an open transaction must be rejected.
    let rollback_outcome = AnsiTransactionManager::rollback_transaction(conn);
    assert!(matches!(rollback_outcome, Err(Error::NotInTransaction)))
}
836
// Checks that a syntax error inside a transaction puts the server-side
// transaction into the error state and that, once `run` returns, both the
// transaction manager and the connection are back outside any transaction.
#[test]
fn postgres_transaction_is_rolled_back_upon_syntax_error() {
    use std::num::NonZeroU32;

    use crate::connection::AnsiTransactionManager;
    use crate::connection::TransactionManager;
    use crate::pg::connection::raw::PgTransactionStatus;
    use crate::*;
    let conn = &mut crate::test_helpers::pg_connection_no_transaction();
    // Before the transaction: no depth is tracked.
    assert_eq!(
        None,
        <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
            conn
        ).transaction_depth().expect("Transaction depth")
    );
    let _result = conn.build_transaction().run(|conn| {
        // Inside the transaction: depth 1.
        assert_eq!(
            NonZeroU32::new(1),
            <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
                conn
            ).transaction_depth().expect("Transaction depth")
        );
        let query_result = sql_query("SELECT_SYNTAX_ERROR 1").execute(conn);
        // The invalid statement fails and the connection reports `InError`.
        assert!(query_result.is_err());
        assert_eq!(
            PgTransactionStatus::InError,
            conn.connection_and_transaction_manager.raw_connection.transaction_status()
        );
        // Returning the error makes `run` finish with a failed closure.
        query_result
    });
    // After `run`: no depth is tracked and the connection is idle again.
    assert_eq!(
        None,
        <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
            conn
        ).transaction_depth().expect("Transaction depth")
    );
    assert_eq!(
        PgTransactionStatus::Idle,
        conn.connection_and_transaction_manager
            .raw_connection
            .transaction_status()
    );
}
881
// Checks that a syntax error inside a nested transaction only fails the inner
// level: the outer transaction stays usable, still succeeds, and the
// connection ends up idle with no tracked depth.
#[test]
fn nested_postgres_transaction_is_rolled_back_upon_syntax_error() {
    use std::num::NonZeroU32;

    use crate::connection::AnsiTransactionManager;
    use crate::connection::TransactionManager;
    use crate::pg::connection::raw::PgTransactionStatus;
    use crate::*;
    let conn = &mut crate::test_helpers::pg_connection_no_transaction();
    // Before the transaction: no depth is tracked.
    assert_eq!(
        None,
        <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
            conn
        ).transaction_depth().expect("Transaction depth")
    );
    let result = conn.build_transaction().run(|conn| {
        // Outer transaction: depth 1.
        assert_eq!(
            NonZeroU32::new(1),
            <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
                conn
            ).transaction_depth().expect("Transaction depth")
        );
        let result = conn.build_transaction().run(|conn| {
            // Inner transaction: depth 2.
            assert_eq!(
                NonZeroU32::new(2),
                <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
                    conn
                ).transaction_depth().expect("Transaction depth")
            );
            sql_query("SELECT_SYNTAX_ERROR 1").execute(conn)
        });
        // The inner level failed and the depth is back at 1.
        assert!(result.is_err());
        assert_eq!(
            NonZeroU32::new(1),
            <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
                conn
            ).transaction_depth().expect("Transaction depth")
        );
        // The outer transaction can still execute statements.
        let query_result = sql_query("SELECT 1").execute(conn);
        assert!(query_result.is_ok());
        assert_eq!(
            PgTransactionStatus::InTransaction,
            conn.connection_and_transaction_manager.raw_connection.transaction_status()
        );
        query_result
    });
    // The outer transaction succeeded; connection idle, no depth tracked.
    assert!(result.is_ok());
    assert_eq!(
        PgTransactionStatus::Idle,
        conn.connection_and_transaction_manager
            .raw_connection
            .transaction_status()
    );
    assert_eq!(
        None,
        <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
            conn
        ).transaction_depth().expect("Transaction depth")
    );
}
942
// Runs two serializable transactions concurrently that each read the rows of
// one class and insert a row of the other class. Exactly one of them is
// expected to fail with `SerializationFailure`; in both threads the tracked
// depth and the connection status must be back to "no transaction" afterwards.
#[test]
// The thread handles are collected into a `Vec` first so that both threads
// are spawned before any of them is joined; both must reach the barriers.
#[allow(clippy::needless_collect)]
fn postgres_transaction_depth_is_tracked_properly_on_serialization_failure() {
    use crate::pg::connection::raw::PgTransactionStatus;
    use crate::result::DatabaseErrorKind::SerializationFailure;
    use crate::result::Error::DatabaseError;
    use crate::*;
    use std::sync::{Arc, Barrier};
    use std::thread;

    table! {
        #[sql_name = "pg_transaction_depth_is_tracked_properly_on_commit_failure"]
        serialization_example {
            id -> Serial,
            class -> Integer,
        }
    }

    let conn = &mut crate::test_helpers::pg_connection_no_transaction();

    // Recreate the table from scratch and seed one row per class.
    sql_query(
        "DROP TABLE IF EXISTS pg_transaction_depth_is_tracked_properly_on_commit_failure;",
    )
    .execute(conn)
    .unwrap();
    sql_query(
        r#"
            CREATE TABLE pg_transaction_depth_is_tracked_properly_on_commit_failure (
                id SERIAL PRIMARY KEY,
                class INTEGER NOT NULL
            )
        "#,
    )
    .execute(conn)
    .unwrap();

    insert_into(serialization_example::table)
        .values(&vec![
            serialization_example::class.eq(1),
            serialization_example::class.eq(2),
        ])
        .execute(conn)
        .unwrap();

    // Both barriers wait for two parties: the two worker threads.
    let before_barrier = Arc::new(Barrier::new(2));
    let after_barrier = Arc::new(Barrier::new(2));
    let threads = (1..3)
        .map(|i| {
            let before_barrier = before_barrier.clone();
            let after_barrier = after_barrier.clone();
            thread::spawn(move || {
                use crate::connection::AnsiTransactionManager;
                use crate::connection::TransactionManager;
                // Every thread uses its own connection.
                let conn = &mut crate::test_helpers::pg_connection_no_transaction();
                assert_eq!(None, <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));

                let result = conn.build_transaction().serializable().run(|conn| {
                    assert_eq!(NonZeroU32::new(1), <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));

                    // Read the rows of this thread's own class ...
                    let _ = serialization_example::table
                        .filter(serialization_example::class.eq(i))
                        .count()
                        .execute(conn)?;

                    // ... and prepare an insert into the other thread's class.
                    let other_i = if i == 1 { 2 } else { 1 };
                    let q = insert_into(serialization_example::table)
                        .values(serialization_example::class.eq(other_i));
                    // Both threads have finished reading before either inserts.
                    before_barrier.wait();

                    let r = q.execute(conn);
                    // Both threads have inserted before either closure returns.
                    after_barrier.wait();
                    r
                });
                // Whatever the outcome, the connection must be idle again.
                assert_eq!(
                    PgTransactionStatus::Idle,
                    conn.connection_and_transaction_manager.raw_connection.transaction_status()
                );

                assert_eq!(None, <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));
                result
            })
        })
        .collect::<Vec<_>>();

    let mut results = threads
        .into_iter()
        .map(|t| t.join().unwrap())
        .collect::<Vec<_>>();

    // Sort so that the successful result comes first.
    results.sort_by_key(|r| r.is_err());

    assert!(results[0].is_ok(), "Got {:?} instead", results);
    assert!(
        matches!(&results[1], Err(DatabaseError(SerializationFailure, _))),
        "Got {:?} instead",
        results
    );
    assert_eq!(
        PgTransactionStatus::Idle,
        conn.connection_and_transaction_manager
            .raw_connection
            .transaction_status()
    );
}
1049
// Same scenario as the non-nested serialization test, but the conflicting
// statements run inside a nested transaction (depth 2) within the
// serializable outer transaction. Exactly one thread is expected to fail with
// `SerializationFailure`, and the tracked depth must be correct at every level.
#[test]
// The thread handles are collected into a `Vec` first so that both threads
// are spawned before any of them is joined; both must reach the barriers.
#[allow(clippy::needless_collect)]
fn postgres_transaction_depth_is_tracked_properly_on_nested_serialization_failure() {
    use crate::pg::connection::raw::PgTransactionStatus;
    use crate::result::DatabaseErrorKind::SerializationFailure;
    use crate::result::Error::DatabaseError;
    use crate::*;
    use std::sync::{Arc, Barrier};
    use std::thread;

    table! {
        #[sql_name = "pg_nested_transaction_depth_is_tracked_properly_on_commit_failure"]
        serialization_example {
            id -> Serial,
            class -> Integer,
        }
    }

    let conn = &mut crate::test_helpers::pg_connection_no_transaction();

    // Recreate the table from scratch and seed one row per class.
    sql_query(
        "DROP TABLE IF EXISTS pg_nested_transaction_depth_is_tracked_properly_on_commit_failure;",
    )
    .execute(conn)
    .unwrap();
    sql_query(
        r#"
            CREATE TABLE pg_nested_transaction_depth_is_tracked_properly_on_commit_failure (
                id SERIAL PRIMARY KEY,
                class INTEGER NOT NULL
            )
        "#,
    )
    .execute(conn)
    .unwrap();

    insert_into(serialization_example::table)
        .values(&vec![
            serialization_example::class.eq(1),
            serialization_example::class.eq(2),
        ])
        .execute(conn)
        .unwrap();

    // Both barriers wait for two parties: the two worker threads.
    let before_barrier = Arc::new(Barrier::new(2));
    let after_barrier = Arc::new(Barrier::new(2));
    let threads = (1..3)
        .map(|i| {
            let before_barrier = before_barrier.clone();
            let after_barrier = after_barrier.clone();
            thread::spawn(move || {
                use crate::connection::AnsiTransactionManager;
                use crate::connection::TransactionManager;
                // Every thread uses its own connection.
                let conn = &mut crate::test_helpers::pg_connection_no_transaction();
                assert_eq!(None, <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));

                let result = conn.build_transaction().serializable().run(|conn| {
                    assert_eq!(NonZeroU32::new(1), <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));
                    // Nested transaction: depth 2.
                    let r = conn.transaction(|conn| {
                        assert_eq!(NonZeroU32::new(2), <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));

                        // Read the rows of this thread's own class ...
                        let _ = serialization_example::table
                            .filter(serialization_example::class.eq(i))
                            .count()
                            .execute(conn)?;

                        // ... and prepare an insert into the other thread's class.
                        let other_i = if i == 1 { 2 } else { 1 };
                        let q = insert_into(serialization_example::table)
                            .values(serialization_example::class.eq(other_i));
                        // Both threads have finished reading before either inserts.
                        before_barrier.wait();

                        let r = q.execute(conn);
                        // Both threads have inserted before either closure returns.
                        after_barrier.wait();
                        r
                    });
                    // Back in the outer transaction: depth 1, still in a transaction.
                    assert_eq!(NonZeroU32::new(1), <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));
                    assert_eq!(
                        PgTransactionStatus::InTransaction,
                        conn.connection_and_transaction_manager.raw_connection.transaction_status()
                    );
                    r
                });
                // Whatever the outcome, the connection must be idle again.
                assert_eq!(
                    PgTransactionStatus::Idle,
                    conn.connection_and_transaction_manager.raw_connection.transaction_status()
                );

                assert_eq!(None, <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(conn).transaction_depth().expect("Transaction depth"));
                result
            })
        })
        .collect::<Vec<_>>();

    let mut results = threads
        .into_iter()
        .map(|t| t.join().unwrap())
        .collect::<Vec<_>>();

    // Sort so that the successful result comes first.
    results.sort_by_key(|r| r.is_err());

    assert!(results[0].is_ok(), "Got {:?} instead", results);
    assert!(
        matches!(&results[1], Err(DatabaseError(SerializationFailure, _))),
        "Got {:?} instead",
        results
    );
    assert_eq!(
        PgTransactionStatus::Idle,
        conn.connection_and_transaction_manager
            .raw_connection
            .transaction_status()
    );
}
1165
/// Exercises a transaction that violates an `INITIALLY DEFERRED` unique
/// constraint.
///
/// The duplicate `INSERT` itself is expected to succeed while the transaction
/// is still open; the transaction as a whole must then return an error, and
/// afterwards the connection must report no tracked transaction depth and an
/// `Idle` raw transaction status.
#[test]
fn postgres_transaction_is_rolled_back_upon_deferred_constraint_failure() {
    use crate::connection::AnsiTransactionManager;
    use crate::connection::TransactionManager;
    use crate::pg::connection::raw::PgTransactionStatus;
    use crate::result::Error;
    use crate::*;

    // Depth currently tracked by the transaction manager for `conn`.
    fn tracked_depth(conn: &mut PgConnection) -> Option<NonZeroU32> {
        let status =
            <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
                conn,
            );
        status.transaction_depth().expect("Transaction depth")
    }

    let conn = &mut crate::test_helpers::pg_connection_no_transaction();
    assert_eq!(None, tracked_depth(conn));

    let outcome: Result<_, Error> = conn.build_transaction().run(|conn| {
        assert_eq!(NonZeroU32::new(1), tracked_depth(conn));

        sql_query("DROP TABLE IF EXISTS deferred_constraint_commit").execute(conn)?;
        sql_query("CREATE TABLE deferred_constraint_commit(id INT UNIQUE INITIALLY DEFERRED)")
            .execute(conn)?;
        sql_query("INSERT INTO deferred_constraint_commit VALUES(1)").execute(conn)?;

        // The constraint is declared INITIALLY DEFERRED, so the duplicate row
        // is expected to be accepted at this point.
        let duplicate_insert =
            sql_query("INSERT INTO deferred_constraint_commit VALUES(1)").execute(conn);
        assert!(duplicate_insert.is_ok());

        let raw_status = conn
            .connection_and_transaction_manager
            .raw_connection
            .transaction_status();
        assert_eq!(PgTransactionStatus::InTransaction, raw_status);
        Ok(())
    });

    // Whatever happened when the transaction finished, no transaction may be
    // left open on either the manager or the raw connection.
    assert_eq!(None, tracked_depth(conn));
    let raw_status = conn
        .connection_and_transaction_manager
        .raw_connection
        .transaction_status();
    assert_eq!(PgTransactionStatus::Idle, raw_status);
    assert!(outcome.is_err());
}
1215
/// Exercises a transaction whose deferred constraint trigger raises an
/// exception.
///
/// The `INSERT` of the offending row is expected to succeed while the
/// transaction is open; the transaction as a whole must return an error, and
/// afterwards the connection must report no tracked transaction depth and an
/// `Idle` raw transaction status.
#[test]
fn postgres_transaction_is_rolled_back_upon_deferred_trigger_failure() {
    use crate::connection::AnsiTransactionManager;
    use crate::connection::TransactionManager;
    use crate::pg::connection::raw::PgTransactionStatus;
    use crate::result::Error;
    use crate::*;

    // Depth currently tracked by the transaction manager for `conn`.
    fn tracked_depth(conn: &mut PgConnection) -> Option<NonZeroU32> {
        <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
            conn,
        )
        .transaction_depth()
        .expect("Transaction depth")
    }

    let conn = &mut crate::test_helpers::pg_connection_no_transaction();
    assert_eq!(None, tracked_depth(conn));

    let result: Result<_, Error> = conn.build_transaction().run(|conn| {
        assert_eq!(NonZeroU32::new(1), tracked_depth(conn));

        sql_query("DROP TABLE IF EXISTS deferred_trigger_commit").execute(conn)?;
        sql_query("CREATE TABLE deferred_trigger_commit(id INT UNIQUE INITIALLY DEFERRED)")
            .execute(conn)?;

        // The trigger function compares `NEW.id`, the only column of the
        // table, so that the `RAISE EXCEPTION` branch is what makes the
        // trigger fail rather than a lookup of a non-existent record field.
        sql_query(
            r#"
                CREATE OR REPLACE FUNCTION transaction_depth_blow_up()
                    RETURNS trigger
                    LANGUAGE plpgsql
                    AS $$
                DECLARE
                BEGIN
                    IF NEW.id = 42 THEN
                        RAISE EXCEPTION 'Transaction kaboom';
                    END IF;
                RETURN NEW;

                END;$$;
            "#,
        )
        .execute(conn)?;

        sql_query(
            r#"
                CREATE CONSTRAINT TRIGGER transaction_depth_trigger
                AFTER INSERT ON "deferred_trigger_commit"
                DEFERRABLE INITIALLY DEFERRED
                FOR EACH ROW
                EXECUTE PROCEDURE transaction_depth_blow_up()
            "#,
        )
        .execute(conn)?;

        // The trigger is INITIALLY DEFERRED, so the insert itself is expected
        // to go through while the transaction is still open.
        let result = sql_query("INSERT INTO deferred_trigger_commit VALUES(42)").execute(conn);
        assert!(result.is_ok());
        assert_eq!(
            PgTransactionStatus::InTransaction,
            conn.connection_and_transaction_manager
                .raw_connection
                .transaction_status()
        );
        Ok(())
    });

    // No transaction may be left open on either the manager or the raw
    // connection once the failing transaction has finished.
    assert_eq!(None, tracked_depth(conn));
    assert_eq!(
        PgTransactionStatus::Idle,
        conn.connection_and_transaction_manager
            .raw_connection
            .transaction_status()
    );
    assert!(result.is_err());
}
1291
/// Exercises `build_transaction().run` invoked while another transaction
/// built the same way is already open, with a deferred constraint trigger
/// installed on the target table.
///
/// The inner `run` call must return an error while the outer transaction stays
/// open (`InTransaction`), and the outer transaction must then finish
/// successfully, leaving no tracked transaction depth and an `Idle` raw
/// transaction status.
///
/// NOTE(review): the inner `run` presumably fails before its closure is
/// executed because a transaction is already active; confirm against
/// `TransactionBuilder::run`.
#[test]
fn nested_postgres_transaction_is_rolled_back_upon_deferred_trigger_failure() {
    use crate::connection::AnsiTransactionManager;
    use crate::connection::TransactionManager;
    use crate::pg::connection::raw::PgTransactionStatus;
    use crate::result::Error;
    use crate::*;

    // Depth currently tracked by the transaction manager for `conn`.
    fn tracked_depth(conn: &mut PgConnection) -> Option<NonZeroU32> {
        <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
            conn,
        )
        .transaction_depth()
        .expect("Transaction depth")
    }

    let conn = &mut crate::test_helpers::pg_connection_no_transaction();
    assert_eq!(None, tracked_depth(conn));

    let result: Result<_, Error> = conn.build_transaction().run(|conn| {
        assert_eq!(NonZeroU32::new(1), tracked_depth(conn));

        sql_query("DROP TABLE IF EXISTS deferred_trigger_nested_commit").execute(conn)?;
        sql_query(
            "CREATE TABLE deferred_trigger_nested_commit(id INT UNIQUE INITIALLY DEFERRED)",
        )
        .execute(conn)?;

        // The trigger function compares `NEW.id`, the only column of the
        // table, so that the `RAISE EXCEPTION` branch is what makes the
        // trigger fail rather than a lookup of a non-existent record field.
        sql_query(
            r#"
                CREATE OR REPLACE FUNCTION transaction_depth_blow_up()
                    RETURNS trigger
                    LANGUAGE plpgsql
                    AS $$
                DECLARE
                BEGIN
                    IF NEW.id = 42 THEN
                        RAISE EXCEPTION 'Transaction kaboom';
                    END IF;
                RETURN NEW;

                END;$$;
            "#,
        )
        .execute(conn)?;

        sql_query(
            r#"
                CREATE CONSTRAINT TRIGGER transaction_depth_trigger
                AFTER INSERT ON "deferred_trigger_nested_commit"
                DEFERRABLE INITIALLY DEFERRED
                FOR EACH ROW
                EXECUTE PROCEDURE transaction_depth_blow_up()
            "#,
        )
        .execute(conn)?;

        let inner_result: Result<_, Error> = conn.build_transaction().run(|conn| {
            let result = sql_query("INSERT INTO deferred_trigger_nested_commit VALUES(42)")
                .execute(conn);
            assert!(result.is_ok());
            Ok(())
        });
        assert!(inner_result.is_err());

        // The failed inner call must not have closed the outer transaction.
        assert_eq!(
            PgTransactionStatus::InTransaction,
            conn.connection_and_transaction_manager
                .raw_connection
                .transaction_status()
        );
        Ok(())
    });

    assert_eq!(None, tracked_depth(conn));
    assert_eq!(
        PgTransactionStatus::Idle,
        conn.connection_and_transaction_manager
            .raw_connection
            .transaction_status()
    );
    assert!(result.is_ok(), "Expected success, got {:?}", result);
}
1374
/// Exercises `build_transaction().run` invoked while another transaction
/// built the same way is already open, against a table with an
/// `INITIALLY DEFERRED` unique constraint.
///
/// The inner `run` call must return an error while the outer transaction stays
/// open at depth 1 (`InTransaction`); a single `INSERT` issued afterwards in
/// the outer transaction must succeed, and the connection must end with no
/// tracked transaction depth and an `Idle` raw transaction status.
///
/// NOTE(review): the inner `run` presumably fails before its closure is
/// executed because a transaction is already active; confirm against
/// `TransactionBuilder::run`.
#[test]
fn nested_postgres_transaction_is_rolled_back_upon_deferred_constraint_failure() {
    use crate::connection::AnsiTransactionManager;
    use crate::connection::TransactionManager;
    use crate::pg::connection::raw::PgTransactionStatus;
    use crate::result::Error;
    use crate::*;

    // The same row is inserted from several places below.
    const INSERT_ONE: &str = "INSERT INTO deferred_constraint_nested_commit VALUES(1)";

    // Depth currently tracked by the transaction manager for `conn`.
    fn tracked_depth(conn: &mut PgConnection) -> Option<NonZeroU32> {
        let status =
            <AnsiTransactionManager as TransactionManager<PgConnection>>::transaction_manager_status_mut(
                conn,
            );
        status.transaction_depth().expect("Transaction depth")
    }

    let conn = &mut crate::test_helpers::pg_connection_no_transaction();
    assert_eq!(None, tracked_depth(conn));

    let outer_outcome: Result<_, Error> = conn.build_transaction().run(|conn| {
        assert_eq!(NonZeroU32::new(1), tracked_depth(conn));

        sql_query("DROP TABLE IF EXISTS deferred_constraint_nested_commit").execute(conn)?;
        sql_query(
            "CREATE TABLE deferred_constraint_nested_commit(id INT UNIQUE INITIALLY DEFERRED)",
        )
        .execute(conn)?;

        let inner_outcome: Result<_, Error> = conn.build_transaction().run(|conn| {
            assert_eq!(NonZeroU32::new(2), tracked_depth(conn));
            sql_query(INSERT_ONE).execute(conn)?;
            let duplicate_insert = sql_query(INSERT_ONE).execute(conn);
            assert!(duplicate_insert.is_ok());
            Ok(())
        });
        assert!(inner_outcome.is_err());

        // The failed inner call must leave the outer transaction untouched.
        let raw_status = conn
            .connection_and_transaction_manager
            .raw_connection
            .transaction_status();
        assert_eq!(PgTransactionStatus::InTransaction, raw_status);
        assert_eq!(NonZeroU32::new(1), tracked_depth(conn));

        sql_query(INSERT_ONE).execute(conn)
    });

    assert_eq!(None, tracked_depth(conn));
    let raw_status = conn
        .connection_and_transaction_manager
        .raw_connection
        .transaction_status();
    assert_eq!(PgTransactionStatus::Idle, raw_status);
    assert!(outer_outcome.is_ok());
}
1438}