diesel/pg/
transaction.rs

1#![allow(dead_code)]
2use crate::backend::Backend;
3use crate::connection::{AnsiTransactionManager, TransactionManager};
4use crate::pg::Pg;
5use crate::prelude::*;
6use crate::query_builder::{AstPass, QueryBuilder, QueryFragment};
7use crate::result::Error;
8
/// Used to build a transaction, specifying additional details.
///
/// This struct is returned by [`.build_transaction`].
/// See the documentation for methods on this struct for usage examples.
/// See [the PostgreSQL documentation for `SET TRANSACTION`][pg-docs]
/// for details on the behavior of each option.
///
/// Every option starts out unset. An unset option adds nothing to the
/// generated `BEGIN TRANSACTION` statement.
///
/// [`.build_transaction`]: PgConnection::build_transaction()
/// [pg-docs]: https://www.postgresql.org/docs/current/static/sql-set-transaction.html
#[allow(missing_debug_implementations)] // False positive. Connection isn't Debug.
#[must_use = "Transaction builder does nothing unless you call `run` on it"]
#[cfg(feature = "postgres_backend")]
pub struct TransactionBuilder<'a, C> {
    // Connection the transaction is opened on once `run` is called.
    connection: &'a mut C,
    // Requested `ISOLATION LEVEL ...` clause; `None` leaves the clause out.
    isolation_level: Option<IsolationLevel>,
    // Requested `READ ONLY` / `READ WRITE` clause; `None` leaves the clause out.
    read_mode: Option<ReadMode>,
    // Requested `DEFERRABLE` / `NOT DEFERRABLE` clause; `None` leaves the clause out.
    deferrable: Option<Deferrable>,
}
27
28impl<'a, C> TransactionBuilder<'a, C>
29where
30    C: Connection<Backend = Pg, TransactionManager = AnsiTransactionManager>,
31{
32    /// Creates a new TransactionBuilder.
33    #[diesel_derives::__diesel_public_if(
34        feature = "i-implement-a-third-party-backend-and-opt-into-breaking-changes"
35    )]
36    pub(crate) fn new(connection: &'a mut C) -> Self {
37        Self {
38            connection,
39            isolation_level: None,
40            read_mode: None,
41            deferrable: None,
42        }
43    }
44
45    /// Makes the transaction `READ ONLY`
46    ///
47    /// # Example
48    ///
49    /// ```rust
50    /// # include!("../doctest_setup.rs");
51    /// # use diesel::sql_query;
52    /// #
53    /// # fn main() {
54    /// #     run_test().unwrap();
55    /// # }
56    /// #
57    /// # table! {
58    /// #     users_for_read_only {
59    /// #         id -> Integer,
60    /// #         name -> Text,
61    /// #     }
62    /// # }
63    /// #
64    /// # fn run_test() -> QueryResult<()> {
65    /// #     use users_for_read_only::table as users;
66    /// #     use users_for_read_only::columns::*;
67    /// #     let conn = &mut connection_no_transaction();
68    /// #     sql_query("CREATE TABLE IF NOT EXISTS users_for_read_only (
69    /// #       id SERIAL PRIMARY KEY,
70    /// #       name TEXT NOT NULL
71    /// #     )").execute(conn)?;
72    /// conn.build_transaction()
73    ///     .read_only()
74    ///     .run::<_, diesel::result::Error, _>(|conn| {
75    ///         let read_attempt = users.select(name).load::<String>(conn);
76    ///         assert!(read_attempt.is_ok());
77    ///
78    ///         let write_attempt = diesel::insert_into(users)
79    ///             .values(name.eq("Ruby"))
80    ///             .execute(conn);
81    ///         assert!(write_attempt.is_err());
82    ///
83    ///         Ok(())
84    ///     })?;
85    /// #     sql_query("DROP TABLE users_for_read_only").execute(conn)?;
86    /// #     Ok(())
87    /// # }
88    /// ```
89    pub fn read_only(mut self) -> Self {
90        self.read_mode = Some(ReadMode::ReadOnly);
91        self
92    }
93
94    /// Makes the transaction `READ WRITE`
95    ///
96    /// This is the default, unless you've changed the
97    /// `default_transaction_read_only` configuration parameter.
98    ///
99    /// # Example
100    ///
101    /// ```rust
102    /// # include!("../doctest_setup.rs");
103    /// # use diesel::result::Error::RollbackTransaction;
104    /// # use diesel::sql_query;
105    /// #
106    /// # fn main() {
107    /// #     assert_eq!(run_test(), Err(RollbackTransaction));
108    /// # }
109    /// #
110    /// # fn run_test() -> QueryResult<()> {
111    /// #     use schema::users::dsl::*;
112    /// #     let conn = &mut connection_no_transaction();
113    /// conn.build_transaction()
114    ///     .read_write()
115    ///     .run(|conn| {
116    /// #         sql_query("CREATE TABLE IF NOT EXISTS users (
117    /// #             id SERIAL PRIMARY KEY,
118    /// #             name TEXT NOT NULL
119    /// #         )").execute(conn)?;
120    ///         let read_attempt = users.select(name).load::<String>(conn);
121    ///         assert!(read_attempt.is_ok());
122    ///
123    ///         let write_attempt = diesel::insert_into(users)
124    ///             .values(name.eq("Ruby"))
125    ///             .execute(conn);
126    ///         assert!(write_attempt.is_ok());
127    ///
128    /// #       Err(RollbackTransaction)
129    /// #       /*
130    ///         Ok(())
131    /// #       */
132    ///     })
133    /// # }
134    /// ```
135    pub fn read_write(mut self) -> Self {
136        self.read_mode = Some(ReadMode::ReadWrite);
137        self
138    }
139
140    /// Makes the transaction `DEFERRABLE`
141    ///
142    /// # Example
143    ///
144    /// ```rust
145    /// # include!("../doctest_setup.rs");
146    /// #
147    /// # fn main() {
148    /// #     run_test().unwrap();
149    /// # }
150    /// #
151    /// # fn run_test() -> QueryResult<()> {
152    /// #     use schema::users::dsl::*;
153    /// #     let conn = &mut connection_no_transaction();
154    /// conn.build_transaction()
155    ///     .deferrable()
156    ///     .run(|conn| Ok(()))
157    /// # }
158    /// ```
159    pub fn deferrable(mut self) -> Self {
160        self.deferrable = Some(Deferrable::Deferrable);
161        self
162    }
163
164    /// Makes the transaction `NOT DEFERRABLE`
165    ///
166    /// This is the default, unless you've changed the
167    /// `default_transaction_deferrable` configuration parameter.
168    ///
169    /// # Example
170    ///
171    /// ```rust
172    /// # include!("../doctest_setup.rs");
173    /// #
174    /// # fn main() {
175    /// #     run_test().unwrap();
176    /// # }
177    /// #
178    /// # fn run_test() -> QueryResult<()> {
179    /// #     use schema::users::dsl::*;
180    /// #     let conn = &mut connection_no_transaction();
181    /// conn.build_transaction()
182    ///     .not_deferrable()
183    ///     .run(|conn| Ok(()))
184    /// # }
185    /// ```
186    pub fn not_deferrable(mut self) -> Self {
187        self.deferrable = Some(Deferrable::NotDeferrable);
188        self
189    }
190
191    /// Makes the transaction `ISOLATION LEVEL READ COMMITTED`
192    ///
193    /// This is the default, unless you've changed the
194    /// `default_transaction_isolation_level` configuration parameter.
195    ///
196    /// # Example
197    ///
198    /// ```rust
199    /// # include!("../doctest_setup.rs");
200    /// #
201    /// # fn main() {
202    /// #     run_test().unwrap();
203    /// # }
204    /// #
205    /// # fn run_test() -> QueryResult<()> {
206    /// #     use schema::users::dsl::*;
207    /// #     let conn = &mut connection_no_transaction();
208    /// conn.build_transaction()
209    ///     .read_committed()
210    ///     .run(|conn| Ok(()))
211    /// # }
212    /// ```
213    pub fn read_committed(mut self) -> Self {
214        self.isolation_level = Some(IsolationLevel::ReadCommitted);
215        self
216    }
217
218    /// Makes the transaction `ISOLATION LEVEL REPEATABLE READ`
219    ///
220    /// # Example
221    ///
222    /// ```rust
223    /// # include!("../doctest_setup.rs");
224    /// #
225    /// # fn main() {
226    /// #     run_test().unwrap();
227    /// # }
228    /// #
229    /// # fn run_test() -> QueryResult<()> {
230    /// #     use schema::users::dsl::*;
231    /// #     let conn = &mut connection_no_transaction();
232    /// conn.build_transaction()
233    ///     .repeatable_read()
234    ///     .run(|conn| Ok(()))
235    /// # }
236    /// ```
237    pub fn repeatable_read(mut self) -> Self {
238        self.isolation_level = Some(IsolationLevel::RepeatableRead);
239        self
240    }
241
242    /// Makes the transaction `ISOLATION LEVEL SERIALIZABLE`
243    ///
244    /// # Example
245    ///
246    /// ```rust
247    /// # include!("../doctest_setup.rs");
248    /// #
249    /// # fn main() {
250    /// #     run_test().unwrap();
251    /// # }
252    /// #
253    /// # fn run_test() -> QueryResult<()> {
254    /// #     use schema::users::dsl::*;
255    /// #     let conn = &mut connection_no_transaction();
256    /// conn.build_transaction()
257    ///     .serializable()
258    ///     .run(|conn| Ok(()))
259    /// # }
260    /// ```
261    pub fn serializable(mut self) -> Self {
262        self.isolation_level = Some(IsolationLevel::Serializable);
263        self
264    }
265
266    /// Runs the given function inside of the transaction
267    /// with the parameters given to this builder.
268    ///
269    /// This function executes the provided closure `f` inside a database
270    /// transaction. If there is already an open transaction for the current
271    /// connection it will return an error. The connection is committed if
272    /// the closure returns `Ok(_)`, it will be rolled back if it returns `Err(_)`.
273    /// For both cases the original result value will be returned from this function.
274    ///
275    /// If the transaction fails to commit and requires a rollback according to Postgres,
276    /// (e.g. serialization failure) a rollback will be attempted.
277    /// If the rollback fails, the error will be returned in a
278    /// [`Error::RollbackErrorOnCommit`](crate::result::Error::RollbackErrorOnCommit),
279    /// from which you will be able to extract both the original commit error and
280    /// the rollback error.
281    /// In addition, the connection will be considered broken
282    /// as it contains a uncommitted unabortable open transaction. Any further
283    /// interaction with the transaction system will result in an returned error
284    /// in this case.
285    pub fn run<T, E, F>(&mut self, f: F) -> Result<T, E>
286    where
287        F: FnOnce(&mut C) -> Result<T, E>,
288        E: From<Error>,
289    {
290        let mut query_builder = <Pg as Backend>::QueryBuilder::default();
291        self.to_sql(&mut query_builder, &Pg)?;
292        let sql = query_builder.finish();
293
294        AnsiTransactionManager::begin_transaction_sql(&mut *self.connection, &sql)?;
295        match f(&mut *self.connection) {
296            Ok(value) => {
297                AnsiTransactionManager::commit_transaction(&mut *self.connection)?;
298                Ok(value)
299            }
300            Err(user_error) => {
301                match AnsiTransactionManager::rollback_transaction(&mut *self.connection) {
302                    Ok(()) => Err(user_error),
303                    Err(Error::BrokenTransactionManager) => {
304                        // In this case we are probably more interested by the
305                        // original error, which likely caused this
306                        Err(user_error)
307                    }
308                    Err(rollback_error) => Err(rollback_error.into()),
309                }
310            }
311        }
312    }
313}
314
315impl<C> QueryFragment<Pg> for TransactionBuilder<'_, C> {
316    fn walk_ast<'b>(&'b self, mut out: AstPass<'_, 'b, Pg>) -> QueryResult<()> {
317        out.push_sql("BEGIN TRANSACTION");
318        if let Some(ref isolation_level) = self.isolation_level {
319            isolation_level.walk_ast(out.reborrow())?;
320        }
321        if let Some(ref read_mode) = self.read_mode {
322            read_mode.walk_ast(out.reborrow())?;
323        }
324        if let Some(ref deferrable) = self.deferrable {
325            deferrable.walk_ast(out.reborrow())?;
326        }
327        Ok(())
328    }
329}
330
/// Isolation level that can be requested for a transaction.
///
/// Each variant is rendered as an ` ISOLATION LEVEL ...` clause by its
/// `QueryFragment<Pg>` implementation.
#[derive(Debug, Clone, Copy)]
enum IsolationLevel {
    /// Rendered as `READ COMMITTED`.
    ReadCommitted,
    /// Rendered as `REPEATABLE READ`.
    RepeatableRead,
    /// Rendered as `SERIALIZABLE`.
    Serializable,
}
337
338impl QueryFragment<Pg> for IsolationLevel {
339    fn walk_ast<'b>(&'b self, mut out: AstPass<'_, 'b, Pg>) -> QueryResult<()> {
340        out.push_sql(" ISOLATION LEVEL ");
341        match *self {
342            IsolationLevel::ReadCommitted => out.push_sql("READ COMMITTED"),
343            IsolationLevel::RepeatableRead => out.push_sql("REPEATABLE READ"),
344            IsolationLevel::Serializable => out.push_sql("SERIALIZABLE"),
345        }
346        Ok(())
347    }
348}
349
/// Read mode that can be requested for a transaction.
#[derive(Debug, Clone, Copy)]
enum ReadMode {
    /// Rendered as ` READ ONLY`.
    ReadOnly,
    /// Rendered as ` READ WRITE`.
    ReadWrite,
}
355
356impl QueryFragment<Pg> for ReadMode {
357    fn walk_ast<'b>(&'b self, mut out: AstPass<'_, 'b, Pg>) -> QueryResult<()> {
358        match *self {
359            ReadMode::ReadOnly => out.push_sql(" READ ONLY"),
360            ReadMode::ReadWrite => out.push_sql(" READ WRITE"),
361        }
362        Ok(())
363    }
364}
365
/// Deferrable setting that can be requested for a transaction.
#[derive(Debug, Clone, Copy)]
enum Deferrable {
    /// Rendered as ` DEFERRABLE`.
    Deferrable,
    /// Rendered as ` NOT DEFERRABLE`.
    NotDeferrable,
}
371
372impl QueryFragment<Pg> for Deferrable {
373    fn walk_ast<'b>(&'b self, mut out: AstPass<'_, 'b, Pg>) -> QueryResult<()> {
374        match *self {
375            Deferrable::Deferrable => out.push_sql(" DEFERRABLE"),
376            Deferrable::NotDeferrable => out.push_sql(" NOT DEFERRABLE"),
377        }
378        Ok(())
379    }
380}
381
// Checks the SQL rendered by the builder for every single option and for a
// combination of all three option kinds.
#[cfg(test)]
#[diesel_test_helper::test]
fn test_transaction_builder_generates_correct_sql() {
    extern crate dotenvy;

    // Renders `query` with the Postgres query builder and compares the result
    // against `expected`.
    fn assert_sql<Q>(query: Q, expected: &str)
    where
        Q: QueryFragment<Pg>,
    {
        let mut query_builder = <Pg as Backend>::QueryBuilder::default();
        query.to_sql(&mut query_builder, &Pg).unwrap();
        assert_eq!(query_builder.finish(), expected);
    }

    let database_url = dotenvy::var("PG_DATABASE_URL")
        .or_else(|_| dotenvy::var("DATABASE_URL"))
        .expect("DATABASE_URL must be set in order to run tests");
    let mut conn = PgConnection::establish(&database_url).unwrap();

    // No options selected.
    assert_sql(conn.build_transaction(), "BEGIN TRANSACTION");

    // Read mode.
    assert_sql(
        conn.build_transaction().read_only(),
        "BEGIN TRANSACTION READ ONLY",
    );
    assert_sql(
        conn.build_transaction().read_write(),
        "BEGIN TRANSACTION READ WRITE",
    );

    // Deferrable setting.
    assert_sql(
        conn.build_transaction().deferrable(),
        "BEGIN TRANSACTION DEFERRABLE",
    );
    assert_sql(
        conn.build_transaction().not_deferrable(),
        "BEGIN TRANSACTION NOT DEFERRABLE",
    );

    // Isolation level.
    assert_sql(
        conn.build_transaction().read_committed(),
        "BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED",
    );
    assert_sql(
        conn.build_transaction().repeatable_read(),
        "BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ",
    );
    assert_sql(
        conn.build_transaction().serializable(),
        "BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE",
    );

    // All three kinds together: output order is fixed regardless of call order.
    assert_sql(
        conn.build_transaction()
            .serializable()
            .deferrable()
            .read_only(),
        "BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE READ ONLY DEFERRABLE",
    );
}