diesel/pg/transaction.rs
1#![allow(dead_code)]
2use crate::backend::Backend;
3use crate::connection::{AnsiTransactionManager, TransactionManager};
4use crate::pg::Pg;
5use crate::prelude::*;
6use crate::query_builder::{AstPass, QueryBuilder, QueryFragment};
7use crate::result::Error;
8
9/// Used to build a transaction, specifying additional details.
10///
11/// This struct is returned by [`.build_transaction`].
12/// See the documentation for methods on this struct for usage examples.
13/// See [the PostgreSQL documentation for `SET TRANSACTION`][pg-docs]
14/// for details on the behavior of each option.
15///
16/// [`.build_transaction`]: PgConnection::build_transaction()
17/// [pg-docs]: https://www.postgresql.org/docs/current/static/sql-set-transaction.html
18#[allow(missing_debug_implementations)] // False positive. Connection isn't Debug.
19#[must_use = "Transaction builder does nothing unless you call `run` on it"]
20#[cfg(feature = "postgres_backend")]
21pub struct TransactionBuilder<'a, C> {
22 connection: &'a mut C,
23 isolation_level: Option<IsolationLevel>,
24 read_mode: Option<ReadMode>,
25 deferrable: Option<Deferrable>,
26}
27
28impl<'a, C> TransactionBuilder<'a, C>
29where
30 C: Connection<Backend = Pg, TransactionManager = AnsiTransactionManager>,
31{
32 /// Creates a new TransactionBuilder.
33 #[diesel_derives::__diesel_public_if(
34 feature = "i-implement-a-third-party-backend-and-opt-into-breaking-changes"
35 )]
36 pub(crate) fn new(connection: &'a mut C) -> Self {
37 Self {
38 connection,
39 isolation_level: None,
40 read_mode: None,
41 deferrable: None,
42 }
43 }
44
45 /// Makes the transaction `READ ONLY`
46 ///
47 /// # Example
48 ///
49 /// ```rust
50 /// # include!("../doctest_setup.rs");
51 /// # use diesel::sql_query;
52 /// #
53 /// # fn main() {
54 /// # run_test().unwrap();
55 /// # }
56 /// #
57 /// # table! {
58 /// # users_for_read_only {
59 /// # id -> Integer,
60 /// # name -> Text,
61 /// # }
62 /// # }
63 /// #
64 /// # fn run_test() -> QueryResult<()> {
65 /// # use users_for_read_only::table as users;
66 /// # use users_for_read_only::columns::*;
67 /// # let conn = &mut connection_no_transaction();
68 /// # sql_query("CREATE TABLE IF NOT EXISTS users_for_read_only (
69 /// # id SERIAL PRIMARY KEY,
70 /// # name TEXT NOT NULL
71 /// # )").execute(conn)?;
72 /// conn.build_transaction()
73 /// .read_only()
74 /// .run::<_, diesel::result::Error, _>(|conn| {
75 /// let read_attempt = users.select(name).load::<String>(conn);
76 /// assert!(read_attempt.is_ok());
77 ///
78 /// let write_attempt = diesel::insert_into(users)
79 /// .values(name.eq("Ruby"))
80 /// .execute(conn);
81 /// assert!(write_attempt.is_err());
82 ///
83 /// Ok(())
84 /// })?;
85 /// # sql_query("DROP TABLE users_for_read_only").execute(conn)?;
86 /// # Ok(())
87 /// # }
88 /// ```
89 pub fn read_only(mut self) -> Self {
90 self.read_mode = Some(ReadMode::ReadOnly);
91 self
92 }
93
94 /// Makes the transaction `READ WRITE`
95 ///
96 /// This is the default, unless you've changed the
97 /// `default_transaction_read_only` configuration parameter.
98 ///
99 /// # Example
100 ///
101 /// ```rust
102 /// # include!("../doctest_setup.rs");
103 /// # use diesel::result::Error::RollbackTransaction;
104 /// # use diesel::sql_query;
105 /// #
106 /// # fn main() {
107 /// # assert_eq!(run_test(), Err(RollbackTransaction));
108 /// # }
109 /// #
110 /// # fn run_test() -> QueryResult<()> {
111 /// # use schema::users::dsl::*;
112 /// # let conn = &mut connection_no_transaction();
113 /// conn.build_transaction()
114 /// .read_write()
115 /// .run(|conn| {
116 /// # sql_query("CREATE TABLE IF NOT EXISTS users (
117 /// # id SERIAL PRIMARY KEY,
118 /// # name TEXT NOT NULL
119 /// # )").execute(conn)?;
120 /// let read_attempt = users.select(name).load::<String>(conn);
121 /// assert!(read_attempt.is_ok());
122 ///
123 /// let write_attempt = diesel::insert_into(users)
124 /// .values(name.eq("Ruby"))
125 /// .execute(conn);
126 /// assert!(write_attempt.is_ok());
127 ///
128 /// # Err(RollbackTransaction)
129 /// # /*
130 /// Ok(())
131 /// # */
132 /// })
133 /// # }
134 /// ```
135 pub fn read_write(mut self) -> Self {
136 self.read_mode = Some(ReadMode::ReadWrite);
137 self
138 }
139
140 /// Makes the transaction `DEFERRABLE`
141 ///
142 /// # Example
143 ///
144 /// ```rust
145 /// # include!("../doctest_setup.rs");
146 /// #
147 /// # fn main() {
148 /// # run_test().unwrap();
149 /// # }
150 /// #
151 /// # fn run_test() -> QueryResult<()> {
152 /// # use schema::users::dsl::*;
153 /// # let conn = &mut connection_no_transaction();
154 /// conn.build_transaction()
155 /// .deferrable()
156 /// .run(|conn| Ok(()))
157 /// # }
158 /// ```
159 pub fn deferrable(mut self) -> Self {
160 self.deferrable = Some(Deferrable::Deferrable);
161 self
162 }
163
164 /// Makes the transaction `NOT DEFERRABLE`
165 ///
166 /// This is the default, unless you've changed the
167 /// `default_transaction_deferrable` configuration parameter.
168 ///
169 /// # Example
170 ///
171 /// ```rust
172 /// # include!("../doctest_setup.rs");
173 /// #
174 /// # fn main() {
175 /// # run_test().unwrap();
176 /// # }
177 /// #
178 /// # fn run_test() -> QueryResult<()> {
179 /// # use schema::users::dsl::*;
180 /// # let conn = &mut connection_no_transaction();
181 /// conn.build_transaction()
182 /// .not_deferrable()
183 /// .run(|conn| Ok(()))
184 /// # }
185 /// ```
186 pub fn not_deferrable(mut self) -> Self {
187 self.deferrable = Some(Deferrable::NotDeferrable);
188 self
189 }
190
191 /// Makes the transaction `ISOLATION LEVEL READ COMMITTED`
192 ///
193 /// This is the default, unless you've changed the
194 /// `default_transaction_isolation_level` configuration parameter.
195 ///
196 /// # Example
197 ///
198 /// ```rust
199 /// # include!("../doctest_setup.rs");
200 /// #
201 /// # fn main() {
202 /// # run_test().unwrap();
203 /// # }
204 /// #
205 /// # fn run_test() -> QueryResult<()> {
206 /// # use schema::users::dsl::*;
207 /// # let conn = &mut connection_no_transaction();
208 /// conn.build_transaction()
209 /// .read_committed()
210 /// .run(|conn| Ok(()))
211 /// # }
212 /// ```
213 pub fn read_committed(mut self) -> Self {
214 self.isolation_level = Some(IsolationLevel::ReadCommitted);
215 self
216 }
217
218 /// Makes the transaction `ISOLATION LEVEL REPEATABLE READ`
219 ///
220 /// # Example
221 ///
222 /// ```rust
223 /// # include!("../doctest_setup.rs");
224 /// #
225 /// # fn main() {
226 /// # run_test().unwrap();
227 /// # }
228 /// #
229 /// # fn run_test() -> QueryResult<()> {
230 /// # use schema::users::dsl::*;
231 /// # let conn = &mut connection_no_transaction();
232 /// conn.build_transaction()
233 /// .repeatable_read()
234 /// .run(|conn| Ok(()))
235 /// # }
236 /// ```
237 pub fn repeatable_read(mut self) -> Self {
238 self.isolation_level = Some(IsolationLevel::RepeatableRead);
239 self
240 }
241
242 /// Makes the transaction `ISOLATION LEVEL SERIALIZABLE`
243 ///
244 /// # Example
245 ///
246 /// ```rust
247 /// # include!("../doctest_setup.rs");
248 /// #
249 /// # fn main() {
250 /// # run_test().unwrap();
251 /// # }
252 /// #
253 /// # fn run_test() -> QueryResult<()> {
254 /// # use schema::users::dsl::*;
255 /// # let conn = &mut connection_no_transaction();
256 /// conn.build_transaction()
257 /// .serializable()
258 /// .run(|conn| Ok(()))
259 /// # }
260 /// ```
261 pub fn serializable(mut self) -> Self {
262 self.isolation_level = Some(IsolationLevel::Serializable);
263 self
264 }
265
266 /// Runs the given function inside of the transaction
267 /// with the parameters given to this builder.
268 ///
269 /// This function executes the provided closure `f` inside a database
270 /// transaction. If there is already an open transaction for the current
271 /// connection it will return an error. The connection is committed if
272 /// the closure returns `Ok(_)`, it will be rolled back if it returns `Err(_)`.
273 /// For both cases the original result value will be returned from this function.
274 ///
275 /// If the transaction fails to commit and requires a rollback according to Postgres,
276 /// (e.g. serialization failure) a rollback will be attempted.
277 /// If the rollback fails, the error will be returned in a
278 /// [`Error::RollbackErrorOnCommit`](crate::result::Error::RollbackErrorOnCommit),
279 /// from which you will be able to extract both the original commit error and
280 /// the rollback error.
281 /// In addition, the connection will be considered broken
282 /// as it contains a uncommitted unabortable open transaction. Any further
283 /// interaction with the transaction system will result in an returned error
284 /// in this case.
285 pub fn run<T, E, F>(&mut self, f: F) -> Result<T, E>
286 where
287 F: FnOnce(&mut C) -> Result<T, E>,
288 E: From<Error>,
289 {
290 let mut query_builder = <Pg as Backend>::QueryBuilder::default();
291 self.to_sql(&mut query_builder, &Pg)?;
292 let sql = query_builder.finish();
293
294 AnsiTransactionManager::begin_transaction_sql(&mut *self.connection, &sql)?;
295 match f(&mut *self.connection) {
296 Ok(value) => {
297 AnsiTransactionManager::commit_transaction(&mut *self.connection)?;
298 Ok(value)
299 }
300 Err(user_error) => {
301 match AnsiTransactionManager::rollback_transaction(&mut *self.connection) {
302 Ok(()) => Err(user_error),
303 Err(Error::BrokenTransactionManager) => {
304 // In this case we are probably more interested by the
305 // original error, which likely caused this
306 Err(user_error)
307 }
308 Err(rollback_error) => Err(rollback_error.into()),
309 }
310 }
311 }
312 }
313}
314
315impl<C> QueryFragment<Pg> for TransactionBuilder<'_, C> {
316 fn walk_ast<'b>(&'b self, mut out: AstPass<'_, 'b, Pg>) -> QueryResult<()> {
317 out.push_sql("BEGIN TRANSACTION");
318 if let Some(ref isolation_level) = self.isolation_level {
319 isolation_level.walk_ast(out.reborrow())?;
320 }
321 if let Some(ref read_mode) = self.read_mode {
322 read_mode.walk_ast(out.reborrow())?;
323 }
324 if let Some(ref deferrable) = self.deferrable {
325 deferrable.walk_ast(out.reborrow())?;
326 }
327 Ok(())
328 }
329}
330
331#[derive(Debug, Clone, Copy)]
332enum IsolationLevel {
333 ReadCommitted,
334 RepeatableRead,
335 Serializable,
336}
337
338impl QueryFragment<Pg> for IsolationLevel {
339 fn walk_ast<'b>(&'b self, mut out: AstPass<'_, 'b, Pg>) -> QueryResult<()> {
340 out.push_sql(" ISOLATION LEVEL ");
341 match *self {
342 IsolationLevel::ReadCommitted => out.push_sql("READ COMMITTED"),
343 IsolationLevel::RepeatableRead => out.push_sql("REPEATABLE READ"),
344 IsolationLevel::Serializable => out.push_sql("SERIALIZABLE"),
345 }
346 Ok(())
347 }
348}
349
350#[derive(Debug, Clone, Copy)]
351enum ReadMode {
352 ReadOnly,
353 ReadWrite,
354}
355
356impl QueryFragment<Pg> for ReadMode {
357 fn walk_ast<'b>(&'b self, mut out: AstPass<'_, 'b, Pg>) -> QueryResult<()> {
358 match *self {
359 ReadMode::ReadOnly => out.push_sql(" READ ONLY"),
360 ReadMode::ReadWrite => out.push_sql(" READ WRITE"),
361 }
362 Ok(())
363 }
364}
365
366#[derive(Debug, Clone, Copy)]
367enum Deferrable {
368 Deferrable,
369 NotDeferrable,
370}
371
372impl QueryFragment<Pg> for Deferrable {
373 fn walk_ast<'b>(&'b self, mut out: AstPass<'_, 'b, Pg>) -> QueryResult<()> {
374 match *self {
375 Deferrable::Deferrable => out.push_sql(" DEFERRABLE"),
376 Deferrable::NotDeferrable => out.push_sql(" NOT DEFERRABLE"),
377 }
378 Ok(())
379 }
380}
381
382#[cfg(test)]
383#[diesel_test_helper::test]
384fn test_transaction_builder_generates_correct_sql() {
385 extern crate dotenvy;
386
387 macro_rules! assert_sql {
388 ($query:expr, $sql:expr) => {
389 let mut query_builder = <Pg as Backend>::QueryBuilder::default();
390 $query.to_sql(&mut query_builder, &Pg).unwrap();
391 let sql = query_builder.finish();
392 assert_eq!(sql, $sql);
393 };
394 }
395
396 let database_url = dotenvy::var("PG_DATABASE_URL")
397 .or_else(|_| dotenvy::var("DATABASE_URL"))
398 .expect("DATABASE_URL must be set in order to run tests");
399 let mut conn = PgConnection::establish(&database_url).unwrap();
400
401 assert_sql!(conn.build_transaction(), "BEGIN TRANSACTION");
402 assert_sql!(
403 conn.build_transaction().read_only(),
404 "BEGIN TRANSACTION READ ONLY"
405 );
406 assert_sql!(
407 conn.build_transaction().read_write(),
408 "BEGIN TRANSACTION READ WRITE"
409 );
410 assert_sql!(
411 conn.build_transaction().deferrable(),
412 "BEGIN TRANSACTION DEFERRABLE"
413 );
414 assert_sql!(
415 conn.build_transaction().not_deferrable(),
416 "BEGIN TRANSACTION NOT DEFERRABLE"
417 );
418 assert_sql!(
419 conn.build_transaction().read_committed(),
420 "BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED"
421 );
422 assert_sql!(
423 conn.build_transaction().repeatable_read(),
424 "BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ"
425 );
426 assert_sql!(
427 conn.build_transaction().serializable(),
428 "BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE"
429 );
430 assert_sql!(
431 conn.build_transaction()
432 .serializable()
433 .deferrable()
434 .read_only(),
435 "BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE READ ONLY DEFERRABLE"
436 );
437}