diesel/pg/
transaction.rs

1#![allow(dead_code)]
2use crate::backend::Backend;
3use crate::connection::{AnsiTransactionManager, TransactionManager};
4use crate::pg::Pg;
5use crate::prelude::*;
6use crate::query_builder::{AstPass, QueryBuilder, QueryFragment};
7use crate::result::Error;
8
9/// Used to build a transaction, specifying additional details.
10///
11/// This struct is returned by [`.build_transaction`].
12/// See the documentation for methods on this struct for usage examples.
13/// See [the PostgreSQL documentation for `SET TRANSACTION`][pg-docs]
14/// for details on the behavior of each option.
15///
16/// [`.build_transaction`]: PgConnection::build_transaction()
17/// [pg-docs]: https://www.postgresql.org/docs/current/static/sql-set-transaction.html
18#[allow(missing_debug_implementations)] // False positive. Connection isn't Debug.
19#[must_use = "Transaction builder does nothing unless you call `run` on it"]
20#[cfg(feature = "postgres_backend")]
21pub struct TransactionBuilder<'a, C> {
22    connection: &'a mut C,
23    isolation_level: Option<IsolationLevel>,
24    read_mode: Option<ReadMode>,
25    deferrable: Option<Deferrable>,
26}
27
28impl<'a, C> TransactionBuilder<'a, C>
29where
30    C: Connection<Backend = Pg, TransactionManager = AnsiTransactionManager>,
31{
32    /// Creates a new TransactionBuilder.
33    #[diesel_derives::__diesel_public_if(
34        feature = "i-implement-a-third-party-backend-and-opt-into-breaking-changes"
35    )]
36    pub(crate) fn new(connection: &'a mut C) -> Self {
37        Self {
38            connection,
39            isolation_level: None,
40            read_mode: None,
41            deferrable: None,
42        }
43    }
44
45    /// Makes the transaction `READ ONLY`
46    ///
47    /// # Example
48    ///
49    /// ```rust
50    /// # include!("../doctest_setup.rs");
51    /// # use diesel::sql_query;
52    /// #
53    /// # fn main() {
54    /// #     run_test().unwrap();
55    /// # }
56    /// #
57    /// # table! {
58    /// #     users_for_read_only {
59    /// #         id -> Integer,
60    /// #         name -> Text,
61    /// #     }
62    /// # }
63    /// #
64    /// # fn run_test() -> QueryResult<()> {
65    /// #     use users_for_read_only::table as users;
66    /// #     use users_for_read_only::columns::*;
67    /// #     let conn = &mut connection_no_transaction();
68    /// #     sql_query("CREATE TABLE IF NOT EXISTS users_for_read_only (
69    /// #       id SERIAL PRIMARY KEY,
70    /// #       name TEXT NOT NULL
71    /// #     )").execute(conn)?;
72    /// conn.build_transaction()
73    ///     .read_only()
74    ///     .run::<_, diesel::result::Error, _>(|conn| {
75    ///         let read_attempt = users.select(name).load::<String>(conn);
76    ///         assert!(read_attempt.is_ok());
77    ///
78    ///         let write_attempt = diesel::insert_into(users)
79    ///             .values(name.eq("Ruby"))
80    ///             .execute(conn);
81    ///         assert!(write_attempt.is_err());
82    ///
83    ///         Ok(())
84    ///     })?;
85    /// #     sql_query("DROP TABLE users_for_read_only").execute(conn)?;
86    /// #     Ok(())
87    /// # }
88    /// ```
89    pub fn read_only(mut self) -> Self {
90        self.read_mode = Some(ReadMode::ReadOnly);
91        self
92    }
93
94    /// Makes the transaction `READ WRITE`
95    ///
96    /// This is the default, unless you've changed the
97    /// `default_transaction_read_only` configuration parameter.
98    ///
99    /// # Example
100    ///
101    /// ```rust
102    /// # include!("../doctest_setup.rs");
103    /// # use diesel::result::Error::RollbackTransaction;
104    /// # use diesel::sql_query;
105    /// #
106    /// # fn main() {
107    /// #     assert_eq!(run_test(), Err(RollbackTransaction));
108    /// # }
109    /// #
110    /// # fn run_test() -> QueryResult<()> {
111    /// #     use schema::users::dsl::*;
112    /// #     let conn = &mut connection_no_transaction();
113    /// conn.build_transaction().read_write().run(|conn| {
114    /// #         sql_query("CREATE TABLE IF NOT EXISTS users (
115    /// #             id SERIAL PRIMARY KEY,
116    /// #             name TEXT NOT NULL
117    /// #         )").execute(conn)?;
118    ///     let read_attempt = users.select(name).load::<String>(conn);
119    ///     assert!(read_attempt.is_ok());
120    ///
121    ///     let write_attempt = diesel::insert_into(users)
122    ///         .values(name.eq("Ruby"))
123    ///         .execute(conn);
124    ///     assert!(write_attempt.is_ok());
125    ///
126    /// #       Err(RollbackTransaction)
127    /// #       /*
128    ///     Ok(())
129    /// #       */
130    /// })
131    /// # }
132    /// ```
133    pub fn read_write(mut self) -> Self {
134        self.read_mode = Some(ReadMode::ReadWrite);
135        self
136    }
137
138    /// Makes the transaction `DEFERRABLE`
139    ///
140    /// # Example
141    ///
142    /// ```rust
143    /// # include!("../doctest_setup.rs");
144    /// #
145    /// # fn main() {
146    /// #     run_test().unwrap();
147    /// # }
148    /// #
149    /// # fn run_test() -> QueryResult<()> {
150    /// #     use schema::users::dsl::*;
151    /// #     let conn = &mut connection_no_transaction();
152    /// conn.build_transaction().deferrable().run(|conn| Ok(()))
153    /// # }
154    /// ```
155    pub fn deferrable(mut self) -> Self {
156        self.deferrable = Some(Deferrable::Deferrable);
157        self
158    }
159
160    /// Makes the transaction `NOT DEFERRABLE`
161    ///
162    /// This is the default, unless you've changed the
163    /// `default_transaction_deferrable` configuration parameter.
164    ///
165    /// # Example
166    ///
167    /// ```rust
168    /// # include!("../doctest_setup.rs");
169    /// #
170    /// # fn main() {
171    /// #     run_test().unwrap();
172    /// # }
173    /// #
174    /// # fn run_test() -> QueryResult<()> {
175    /// #     use schema::users::dsl::*;
176    /// #     let conn = &mut connection_no_transaction();
177    /// conn.build_transaction().not_deferrable().run(|conn| Ok(()))
178    /// # }
179    /// ```
180    pub fn not_deferrable(mut self) -> Self {
181        self.deferrable = Some(Deferrable::NotDeferrable);
182        self
183    }
184
185    /// Makes the transaction `ISOLATION LEVEL READ COMMITTED`
186    ///
187    /// This is the default, unless you've changed the
188    /// `default_transaction_isolation_level` configuration parameter.
189    ///
190    /// # Example
191    ///
192    /// ```rust
193    /// # include!("../doctest_setup.rs");
194    /// #
195    /// # fn main() {
196    /// #     run_test().unwrap();
197    /// # }
198    /// #
199    /// # fn run_test() -> QueryResult<()> {
200    /// #     use schema::users::dsl::*;
201    /// #     let conn = &mut connection_no_transaction();
202    /// conn.build_transaction().read_committed().run(|conn| Ok(()))
203    /// # }
204    /// ```
205    pub fn read_committed(mut self) -> Self {
206        self.isolation_level = Some(IsolationLevel::ReadCommitted);
207        self
208    }
209
210    /// Makes the transaction `ISOLATION LEVEL REPEATABLE READ`
211    ///
212    /// # Example
213    ///
214    /// ```rust
215    /// # include!("../doctest_setup.rs");
216    /// #
217    /// # fn main() {
218    /// #     run_test().unwrap();
219    /// # }
220    /// #
221    /// # fn run_test() -> QueryResult<()> {
222    /// #     use schema::users::dsl::*;
223    /// #     let conn = &mut connection_no_transaction();
224    /// conn.build_transaction()
225    ///     .repeatable_read()
226    ///     .run(|conn| Ok(()))
227    /// # }
228    /// ```
229    pub fn repeatable_read(mut self) -> Self {
230        self.isolation_level = Some(IsolationLevel::RepeatableRead);
231        self
232    }
233
234    /// Makes the transaction `ISOLATION LEVEL SERIALIZABLE`
235    ///
236    /// # Example
237    ///
238    /// ```rust
239    /// # include!("../doctest_setup.rs");
240    /// #
241    /// # fn main() {
242    /// #     run_test().unwrap();
243    /// # }
244    /// #
245    /// # fn run_test() -> QueryResult<()> {
246    /// #     use schema::users::dsl::*;
247    /// #     let conn = &mut connection_no_transaction();
248    /// conn.build_transaction().serializable().run(|conn| Ok(()))
249    /// # }
250    /// ```
251    pub fn serializable(mut self) -> Self {
252        self.isolation_level = Some(IsolationLevel::Serializable);
253        self
254    }
255
256    /// Runs the given function inside of the transaction
257    /// with the parameters given to this builder.
258    ///
259    /// This function executes the provided closure `f` inside a database
260    /// transaction. If there is already an open transaction for the current
261    /// connection it will return an error. The connection is committed if
262    /// the closure returns `Ok(_)`, it will be rolled back if it returns `Err(_)`.
263    /// For both cases the original result value will be returned from this function.
264    ///
265    /// If the transaction fails to commit and requires a rollback according to Postgres,
266    /// (e.g. serialization failure) a rollback will be attempted.
267    /// If the rollback fails, the error will be returned in a
268    /// [`Error::RollbackErrorOnCommit`](crate::result::Error::RollbackErrorOnCommit),
269    /// from which you will be able to extract both the original commit error and
270    /// the rollback error.
271    /// In addition, the connection will be considered broken
272    /// as it contains a uncommitted unabortable open transaction. Any further
273    /// interaction with the transaction system will result in an returned error
274    /// in this case.
275    pub fn run<T, E, F>(&mut self, f: F) -> Result<T, E>
276    where
277        F: FnOnce(&mut C) -> Result<T, E>,
278        E: From<Error>,
279    {
280        let mut query_builder = <Pg as Backend>::QueryBuilder::default();
281        self.to_sql(&mut query_builder, &Pg)?;
282        let sql = query_builder.finish();
283
284        AnsiTransactionManager::begin_transaction_sql(&mut *self.connection, &sql)?;
285        match f(&mut *self.connection) {
286            Ok(value) => {
287                AnsiTransactionManager::commit_transaction(&mut *self.connection)?;
288                Ok(value)
289            }
290            Err(user_error) => {
291                match AnsiTransactionManager::rollback_transaction(&mut *self.connection) {
292                    Ok(()) => Err(user_error),
293                    Err(Error::BrokenTransactionManager) => {
294                        // In this case we are probably more interested by the
295                        // original error, which likely caused this
296                        Err(user_error)
297                    }
298                    Err(rollback_error) => Err(rollback_error.into()),
299                }
300            }
301        }
302    }
303}
304
305impl<C> QueryFragment<Pg> for TransactionBuilder<'_, C> {
306    fn walk_ast<'b>(&'b self, mut out: AstPass<'_, 'b, Pg>) -> QueryResult<()> {
307        out.push_sql("BEGIN TRANSACTION");
308        if let Some(ref isolation_level) = self.isolation_level {
309            isolation_level.walk_ast(out.reborrow())?;
310        }
311        if let Some(ref read_mode) = self.read_mode {
312            read_mode.walk_ast(out.reborrow())?;
313        }
314        if let Some(ref deferrable) = self.deferrable {
315            deferrable.walk_ast(out.reborrow())?;
316        }
317        Ok(())
318    }
319}
320
321#[derive(Debug, Clone, Copy)]
322enum IsolationLevel {
323    ReadCommitted,
324    RepeatableRead,
325    Serializable,
326}
327
328impl QueryFragment<Pg> for IsolationLevel {
329    fn walk_ast<'b>(&'b self, mut out: AstPass<'_, 'b, Pg>) -> QueryResult<()> {
330        out.push_sql(" ISOLATION LEVEL ");
331        match *self {
332            IsolationLevel::ReadCommitted => out.push_sql("READ COMMITTED"),
333            IsolationLevel::RepeatableRead => out.push_sql("REPEATABLE READ"),
334            IsolationLevel::Serializable => out.push_sql("SERIALIZABLE"),
335        }
336        Ok(())
337    }
338}
339
340#[derive(Debug, Clone, Copy)]
341enum ReadMode {
342    ReadOnly,
343    ReadWrite,
344}
345
346impl QueryFragment<Pg> for ReadMode {
347    fn walk_ast<'b>(&'b self, mut out: AstPass<'_, 'b, Pg>) -> QueryResult<()> {
348        match *self {
349            ReadMode::ReadOnly => out.push_sql(" READ ONLY"),
350            ReadMode::ReadWrite => out.push_sql(" READ WRITE"),
351        }
352        Ok(())
353    }
354}
355
356#[derive(Debug, Clone, Copy)]
357enum Deferrable {
358    Deferrable,
359    NotDeferrable,
360}
361
362impl QueryFragment<Pg> for Deferrable {
363    fn walk_ast<'b>(&'b self, mut out: AstPass<'_, 'b, Pg>) -> QueryResult<()> {
364        match *self {
365            Deferrable::Deferrable => out.push_sql(" DEFERRABLE"),
366            Deferrable::NotDeferrable => out.push_sql(" NOT DEFERRABLE"),
367        }
368        Ok(())
369    }
370}
371
372#[cfg(test)]
373#[diesel_test_helper::test]
374fn test_transaction_builder_generates_correct_sql() {
375    extern crate dotenvy;
376
377    macro_rules! assert_sql {
378        ($query:expr, $sql:expr) => {
379            let mut query_builder = <Pg as Backend>::QueryBuilder::default();
380            $query.to_sql(&mut query_builder, &Pg).unwrap();
381            let sql = query_builder.finish();
382            assert_eq!(sql, $sql);
383        };
384    }
385
386    let database_url = dotenvy::var("PG_DATABASE_URL")
387        .or_else(|_| dotenvy::var("DATABASE_URL"))
388        .expect("DATABASE_URL must be set in order to run tests");
389    let mut conn = PgConnection::establish(&database_url).unwrap();
390
391    assert_sql!(conn.build_transaction(), "BEGIN TRANSACTION");
392    assert_sql!(
393        conn.build_transaction().read_only(),
394        "BEGIN TRANSACTION READ ONLY"
395    );
396    assert_sql!(
397        conn.build_transaction().read_write(),
398        "BEGIN TRANSACTION READ WRITE"
399    );
400    assert_sql!(
401        conn.build_transaction().deferrable(),
402        "BEGIN TRANSACTION DEFERRABLE"
403    );
404    assert_sql!(
405        conn.build_transaction().not_deferrable(),
406        "BEGIN TRANSACTION NOT DEFERRABLE"
407    );
408    assert_sql!(
409        conn.build_transaction().read_committed(),
410        "BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED"
411    );
412    assert_sql!(
413        conn.build_transaction().repeatable_read(),
414        "BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ"
415    );
416    assert_sql!(
417        conn.build_transaction().serializable(),
418        "BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE"
419    );
420    assert_sql!(
421        conn.build_transaction()
422            .serializable()
423            .deferrable()
424            .read_only(),
425        "BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE READ ONLY DEFERRABLE"
426    );
427}