diesel/pg/transaction.rs
1#![allow(dead_code)]
2use crate::backend::Backend;
3use crate::connection::{AnsiTransactionManager, TransactionManager};
4use crate::pg::Pg;
5use crate::prelude::*;
6use crate::query_builder::{AstPass, QueryBuilder, QueryFragment};
7use crate::result::Error;
8
/// Builder for a PostgreSQL transaction that carries additional settings.
///
/// Values of this type are handed out by [`.build_transaction`].
/// Every setter on this struct comes with its own usage example.
/// The exact effect of each option is described in
/// [the PostgreSQL documentation for `SET TRANSACTION`][pg-docs].
///
/// [`.build_transaction`]: PgConnection::build_transaction()
/// [pg-docs]: https://www.postgresql.org/docs/current/static/sql-set-transaction.html
#[cfg(feature = "postgres_backend")]
#[must_use = "Transaction builder does nothing unless you call `run` on it"]
#[allow(missing_debug_implementations)] // False positive. Connection isn't Debug.
pub struct TransactionBuilder<'a, C> {
    /// Connection the transaction is executed on.
    connection: &'a mut C,
    /// Requested isolation level; `None` leaves the clause out of the
    /// generated `BEGIN TRANSACTION` statement.
    isolation_level: Option<IsolationLevel>,
    /// Requested `READ ONLY` / `READ WRITE` mode; `None` leaves the clause out.
    read_mode: Option<ReadMode>,
    /// Requested `DEFERRABLE` / `NOT DEFERRABLE` mode; `None` leaves the clause out.
    deferrable: Option<Deferrable>,
}
27
28impl<'a, C> TransactionBuilder<'a, C>
29where
30 C: Connection<Backend = Pg, TransactionManager = AnsiTransactionManager>,
31{
32 /// Creates a new TransactionBuilder.
33 #[diesel_derives::__diesel_public_if(
34 feature = "i-implement-a-third-party-backend-and-opt-into-breaking-changes"
35 )]
36 pub(crate) fn new(connection: &'a mut C) -> Self {
37 Self {
38 connection,
39 isolation_level: None,
40 read_mode: None,
41 deferrable: None,
42 }
43 }
44
45 /// Makes the transaction `READ ONLY`
46 ///
47 /// # Example
48 ///
49 /// ```rust
50 /// # include!("../doctest_setup.rs");
51 /// # use diesel::sql_query;
52 /// #
53 /// # fn main() {
54 /// # run_test().unwrap();
55 /// # }
56 /// #
57 /// # table! {
58 /// # users_for_read_only {
59 /// # id -> Integer,
60 /// # name -> Text,
61 /// # }
62 /// # }
63 /// #
64 /// # fn run_test() -> QueryResult<()> {
65 /// # use users_for_read_only::table as users;
66 /// # use users_for_read_only::columns::*;
67 /// # let conn = &mut connection_no_transaction();
68 /// # sql_query("CREATE TABLE IF NOT EXISTS users_for_read_only (
69 /// # id SERIAL PRIMARY KEY,
70 /// # name TEXT NOT NULL
71 /// # )").execute(conn)?;
72 /// conn.build_transaction()
73 /// .read_only()
74 /// .run::<_, diesel::result::Error, _>(|conn| {
75 /// let read_attempt = users.select(name).load::<String>(conn);
76 /// assert!(read_attempt.is_ok());
77 ///
78 /// let write_attempt = diesel::insert_into(users)
79 /// .values(name.eq("Ruby"))
80 /// .execute(conn);
81 /// assert!(write_attempt.is_err());
82 ///
83 /// Ok(())
84 /// })?;
85 /// # sql_query("DROP TABLE users_for_read_only").execute(conn)?;
86 /// # Ok(())
87 /// # }
88 /// ```
89 pub fn read_only(mut self) -> Self {
90 self.read_mode = Some(ReadMode::ReadOnly);
91 self
92 }
93
94 /// Makes the transaction `READ WRITE`
95 ///
96 /// This is the default, unless you've changed the
97 /// `default_transaction_read_only` configuration parameter.
98 ///
99 /// # Example
100 ///
101 /// ```rust
102 /// # include!("../doctest_setup.rs");
103 /// # use diesel::result::Error::RollbackTransaction;
104 /// # use diesel::sql_query;
105 /// #
106 /// # fn main() {
107 /// # assert_eq!(run_test(), Err(RollbackTransaction));
108 /// # }
109 /// #
110 /// # fn run_test() -> QueryResult<()> {
111 /// # use schema::users::dsl::*;
112 /// # let conn = &mut connection_no_transaction();
113 /// conn.build_transaction().read_write().run(|conn| {
114 /// # sql_query("CREATE TABLE IF NOT EXISTS users (
115 /// # id SERIAL PRIMARY KEY,
116 /// # name TEXT NOT NULL
117 /// # )").execute(conn)?;
118 /// let read_attempt = users.select(name).load::<String>(conn);
119 /// assert!(read_attempt.is_ok());
120 ///
121 /// let write_attempt = diesel::insert_into(users)
122 /// .values(name.eq("Ruby"))
123 /// .execute(conn);
124 /// assert!(write_attempt.is_ok());
125 ///
126 /// # Err(RollbackTransaction)
127 /// # /*
128 /// Ok(())
129 /// # */
130 /// })
131 /// # }
132 /// ```
133 pub fn read_write(mut self) -> Self {
134 self.read_mode = Some(ReadMode::ReadWrite);
135 self
136 }
137
138 /// Makes the transaction `DEFERRABLE`
139 ///
140 /// # Example
141 ///
142 /// ```rust
143 /// # include!("../doctest_setup.rs");
144 /// #
145 /// # fn main() {
146 /// # run_test().unwrap();
147 /// # }
148 /// #
149 /// # fn run_test() -> QueryResult<()> {
150 /// # use schema::users::dsl::*;
151 /// # let conn = &mut connection_no_transaction();
152 /// conn.build_transaction().deferrable().run(|conn| Ok(()))
153 /// # }
154 /// ```
155 pub fn deferrable(mut self) -> Self {
156 self.deferrable = Some(Deferrable::Deferrable);
157 self
158 }
159
160 /// Makes the transaction `NOT DEFERRABLE`
161 ///
162 /// This is the default, unless you've changed the
163 /// `default_transaction_deferrable` configuration parameter.
164 ///
165 /// # Example
166 ///
167 /// ```rust
168 /// # include!("../doctest_setup.rs");
169 /// #
170 /// # fn main() {
171 /// # run_test().unwrap();
172 /// # }
173 /// #
174 /// # fn run_test() -> QueryResult<()> {
175 /// # use schema::users::dsl::*;
176 /// # let conn = &mut connection_no_transaction();
177 /// conn.build_transaction().not_deferrable().run(|conn| Ok(()))
178 /// # }
179 /// ```
180 pub fn not_deferrable(mut self) -> Self {
181 self.deferrable = Some(Deferrable::NotDeferrable);
182 self
183 }
184
185 /// Makes the transaction `ISOLATION LEVEL READ COMMITTED`
186 ///
187 /// This is the default, unless you've changed the
188 /// `default_transaction_isolation_level` configuration parameter.
189 ///
190 /// # Example
191 ///
192 /// ```rust
193 /// # include!("../doctest_setup.rs");
194 /// #
195 /// # fn main() {
196 /// # run_test().unwrap();
197 /// # }
198 /// #
199 /// # fn run_test() -> QueryResult<()> {
200 /// # use schema::users::dsl::*;
201 /// # let conn = &mut connection_no_transaction();
202 /// conn.build_transaction().read_committed().run(|conn| Ok(()))
203 /// # }
204 /// ```
205 pub fn read_committed(mut self) -> Self {
206 self.isolation_level = Some(IsolationLevel::ReadCommitted);
207 self
208 }
209
210 /// Makes the transaction `ISOLATION LEVEL REPEATABLE READ`
211 ///
212 /// # Example
213 ///
214 /// ```rust
215 /// # include!("../doctest_setup.rs");
216 /// #
217 /// # fn main() {
218 /// # run_test().unwrap();
219 /// # }
220 /// #
221 /// # fn run_test() -> QueryResult<()> {
222 /// # use schema::users::dsl::*;
223 /// # let conn = &mut connection_no_transaction();
224 /// conn.build_transaction()
225 /// .repeatable_read()
226 /// .run(|conn| Ok(()))
227 /// # }
228 /// ```
229 pub fn repeatable_read(mut self) -> Self {
230 self.isolation_level = Some(IsolationLevel::RepeatableRead);
231 self
232 }
233
234 /// Makes the transaction `ISOLATION LEVEL SERIALIZABLE`
235 ///
236 /// # Example
237 ///
238 /// ```rust
239 /// # include!("../doctest_setup.rs");
240 /// #
241 /// # fn main() {
242 /// # run_test().unwrap();
243 /// # }
244 /// #
245 /// # fn run_test() -> QueryResult<()> {
246 /// # use schema::users::dsl::*;
247 /// # let conn = &mut connection_no_transaction();
248 /// conn.build_transaction().serializable().run(|conn| Ok(()))
249 /// # }
250 /// ```
251 pub fn serializable(mut self) -> Self {
252 self.isolation_level = Some(IsolationLevel::Serializable);
253 self
254 }
255
256 /// Runs the given function inside of the transaction
257 /// with the parameters given to this builder.
258 ///
259 /// This function executes the provided closure `f` inside a database
260 /// transaction. If there is already an open transaction for the current
261 /// connection it will return an error. The connection is committed if
262 /// the closure returns `Ok(_)`, it will be rolled back if it returns `Err(_)`.
263 /// For both cases the original result value will be returned from this function.
264 ///
265 /// If the transaction fails to commit and requires a rollback according to Postgres,
266 /// (e.g. serialization failure) a rollback will be attempted.
267 /// If the rollback fails, the error will be returned in a
268 /// [`Error::RollbackErrorOnCommit`](crate::result::Error::RollbackErrorOnCommit),
269 /// from which you will be able to extract both the original commit error and
270 /// the rollback error.
271 /// In addition, the connection will be considered broken
272 /// as it contains a uncommitted unabortable open transaction. Any further
273 /// interaction with the transaction system will result in an returned error
274 /// in this case.
275 pub fn run<T, E, F>(&mut self, f: F) -> Result<T, E>
276 where
277 F: FnOnce(&mut C) -> Result<T, E>,
278 E: From<Error>,
279 {
280 let mut query_builder = <Pg as Backend>::QueryBuilder::default();
281 self.to_sql(&mut query_builder, &Pg)?;
282 let sql = query_builder.finish();
283
284 AnsiTransactionManager::begin_transaction_sql(&mut *self.connection, &sql)?;
285 match f(&mut *self.connection) {
286 Ok(value) => {
287 AnsiTransactionManager::commit_transaction(&mut *self.connection)?;
288 Ok(value)
289 }
290 Err(user_error) => {
291 match AnsiTransactionManager::rollback_transaction(&mut *self.connection) {
292 Ok(()) => Err(user_error),
293 Err(Error::BrokenTransactionManager) => {
294 // In this case we are probably more interested by the
295 // original error, which likely caused this
296 Err(user_error)
297 }
298 Err(rollback_error) => Err(rollback_error.into()),
299 }
300 }
301 }
302 }
303}
304
305impl<C> QueryFragment<Pg> for TransactionBuilder<'_, C> {
306 fn walk_ast<'b>(&'b self, mut out: AstPass<'_, 'b, Pg>) -> QueryResult<()> {
307 out.push_sql("BEGIN TRANSACTION");
308 if let Some(ref isolation_level) = self.isolation_level {
309 isolation_level.walk_ast(out.reborrow())?;
310 }
311 if let Some(ref read_mode) = self.read_mode {
312 read_mode.walk_ast(out.reborrow())?;
313 }
314 if let Some(ref deferrable) = self.deferrable {
315 deferrable.walk_ast(out.reborrow())?;
316 }
317 Ok(())
318 }
319}
320
321#[derive(Debug, Clone, Copy)]
322enum IsolationLevel {
323 ReadCommitted,
324 RepeatableRead,
325 Serializable,
326}
327
328impl QueryFragment<Pg> for IsolationLevel {
329 fn walk_ast<'b>(&'b self, mut out: AstPass<'_, 'b, Pg>) -> QueryResult<()> {
330 out.push_sql(" ISOLATION LEVEL ");
331 match *self {
332 IsolationLevel::ReadCommitted => out.push_sql("READ COMMITTED"),
333 IsolationLevel::RepeatableRead => out.push_sql("REPEATABLE READ"),
334 IsolationLevel::Serializable => out.push_sql("SERIALIZABLE"),
335 }
336 Ok(())
337 }
338}
339
340#[derive(Debug, Clone, Copy)]
341enum ReadMode {
342 ReadOnly,
343 ReadWrite,
344}
345
346impl QueryFragment<Pg> for ReadMode {
347 fn walk_ast<'b>(&'b self, mut out: AstPass<'_, 'b, Pg>) -> QueryResult<()> {
348 match *self {
349 ReadMode::ReadOnly => out.push_sql(" READ ONLY"),
350 ReadMode::ReadWrite => out.push_sql(" READ WRITE"),
351 }
352 Ok(())
353 }
354}
355
356#[derive(Debug, Clone, Copy)]
357enum Deferrable {
358 Deferrable,
359 NotDeferrable,
360}
361
362impl QueryFragment<Pg> for Deferrable {
363 fn walk_ast<'b>(&'b self, mut out: AstPass<'_, 'b, Pg>) -> QueryResult<()> {
364 match *self {
365 Deferrable::Deferrable => out.push_sql(" DEFERRABLE"),
366 Deferrable::NotDeferrable => out.push_sql(" NOT DEFERRABLE"),
367 }
368 Ok(())
369 }
370}
371
#[cfg(test)]
#[diesel_test_helper::test]
fn test_transaction_builder_generates_correct_sql() {
    extern crate dotenvy;

    /// Renders `query` with the PostgreSQL query builder and compares the
    /// generated SQL with `expected`. Failures are reported at the call site.
    #[track_caller]
    fn assert_sql<Q>(query: Q, expected: &str)
    where
        Q: QueryFragment<Pg>,
    {
        let mut builder = <Pg as Backend>::QueryBuilder::default();
        query.to_sql(&mut builder, &Pg).unwrap();
        let rendered = builder.finish();
        assert_eq!(rendered, expected);
    }

    // Prefer the PostgreSQL specific variable, fall back to the generic one.
    let url = dotenvy::var("PG_DATABASE_URL")
        .or_else(|_| dotenvy::var("DATABASE_URL"))
        .expect("DATABASE_URL must be set in order to run tests");
    let mut conn = PgConnection::establish(&url).unwrap();

    // No option set.
    assert_sql(conn.build_transaction(), "BEGIN TRANSACTION");

    // Read mode.
    assert_sql(
        conn.build_transaction().read_only(),
        "BEGIN TRANSACTION READ ONLY",
    );
    assert_sql(
        conn.build_transaction().read_write(),
        "BEGIN TRANSACTION READ WRITE",
    );

    // Deferrable mode.
    assert_sql(
        conn.build_transaction().deferrable(),
        "BEGIN TRANSACTION DEFERRABLE",
    );
    assert_sql(
        conn.build_transaction().not_deferrable(),
        "BEGIN TRANSACTION NOT DEFERRABLE",
    );

    // Isolation level.
    assert_sql(
        conn.build_transaction().read_committed(),
        "BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED",
    );
    assert_sql(
        conn.build_transaction().repeatable_read(),
        "BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ",
    );
    assert_sql(
        conn.build_transaction().serializable(),
        "BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE",
    );

    // All options combined: the clause order is fixed regardless of call order.
    assert_sql(
        conn.build_transaction()
            .serializable()
            .deferrable()
            .read_only(),
        "BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE READ ONLY DEFERRABLE",
    );
}