diesel/mysql/connection/
mod.rs

1mod bind;
2mod raw;
3mod stmt;
4mod url;
5
6use self::raw::RawConnection;
7use self::stmt::iterator::StatementIterator;
8use self::stmt::Statement;
9use self::url::ConnectionOptions;
10use super::backend::Mysql;
11use crate::connection::instrumentation::DebugQuery;
12use crate::connection::instrumentation::StrQueryHelper;
13use crate::connection::statement_cache::{MaybeCached, StatementCache};
14use crate::connection::*;
15use crate::expression::QueryMetadata;
16use crate::query_builder::bind_collector::RawBytesBindCollector;
17use crate::query_builder::*;
18use crate::result::*;
19use crate::RunQueryDsl;
20
21#[cfg(feature = "mysql")]
22#[allow(missing_debug_implementations, missing_copy_implementations)]
23/// A connection to a MySQL database. Connection URLs should be in the form
24/// `mysql://[user[:password]@]host/database_name[?unix_socket=socket-path&ssl_mode=SSL_MODE*&ssl_ca=/etc/ssl/certs/ca-certificates.crt&ssl_cert=/etc/ssl/certs/client-cert.crt&ssl_key=/etc/ssl/certs/client-key.crt]`
25///
26///* `host` can be an IP address or a hostname. If it is set to `localhost`, a connection
27///   will be attempted through the socket at `/tmp/mysql.sock`. If you want to connect to
28///   a local server via TCP (e.g. docker containers), use `0.0.0.0` or `127.0.0.1` instead.
29/// * `unix_socket` expects the path to the unix socket
30/// * `ssl_ca` accepts a path to the system's certificate roots
31/// * `ssl_cert` accepts a path to the client's certificate file
32/// * `ssl_key` accepts a path to the client's private key file
33/// * `ssl_mode` expects a value defined for MySQL client command option `--ssl-mode`
34///   See <https://dev.mysql.com/doc/refman/5.7/en/connection-options.html#option_general_ssl-mode>
35///
36/// # Supported loading model implementations
37///
38/// * [`DefaultLoadingMode`]
39///
40/// As `MysqlConnection` only supports a single loading mode implementation
41/// it is **not required** to explicitly specify a loading mode
42/// when calling [`RunQueryDsl::load_iter()`] or [`LoadConnection::load`]
43///
44/// ## DefaultLoadingMode
45///
46/// `MysqlConnection` only supports a single loading mode, which loads
47/// values row by row from the result set.
48///
49/// ```rust
50/// # include!("../../doctest_setup.rs");
51/// #
52/// # fn main() {
53/// #     run_test().unwrap();
54/// # }
55/// #
56/// # fn run_test() -> QueryResult<()> {
57/// #     use schema::users;
58/// #     let connection = &mut establish_connection();
59/// use diesel::connection::DefaultLoadingMode;
60/// { // scope to restrict the lifetime of the iterator
61///     let iter1 = users::table.load_iter::<(i32, String), DefaultLoadingMode>(connection)?;
62///
63///     for r in iter1 {
64///         let (id, name) = r?;
65///         println!("Id: {} Name: {}", id, name);
66///     }
67/// }
68///
69/// // works without specifying the loading mode
70/// let iter2 = users::table.load_iter::<(i32, String), _>(connection)?;
71///
72/// for r in iter2 {
73///     let (id, name) = r?;
74///     println!("Id: {} Name: {}", id, name);
75/// }
76/// #   Ok(())
77/// # }
78/// ```
79///
80/// This mode does **not support** creating
81/// multiple iterators using the same connection.
82///
83/// ```compile_fail
84/// # include!("../../doctest_setup.rs");
85/// #
86/// # fn main() {
87/// #     run_test().unwrap();
88/// # }
89/// #
90/// # fn run_test() -> QueryResult<()> {
91/// #     use schema::users;
92/// #     let connection = &mut establish_connection();
93/// use diesel::connection::DefaultLoadingMode;
94///
95/// let iter1 = users::table.load_iter::<(i32, String), DefaultLoadingMode>(connection)?;
96/// let iter2 = users::table.load_iter::<(i32, String), DefaultLoadingMode>(connection)?;
97///
98/// for r in iter1 {
99///     let (id, name) = r?;
100///     println!("Id: {} Name: {}", id, name);
101/// }
102///
103/// for r in iter2 {
104///     let (id, name) = r?;
105///     println!("Id: {} Name: {}", id, name);
106/// }
107/// #   Ok(())
108/// # }
109/// ```
110pub struct MysqlConnection {
111    raw_connection: RawConnection,
112    transaction_state: AnsiTransactionManager,
113    statement_cache: StatementCache<Mysql, Statement>,
114    instrumentation: Option<Box<dyn Instrumentation>>,
115}
116
117// mysql connection can be shared between threads according to libmysqlclients documentation
118#[allow(unsafe_code)]
119unsafe impl Send for MysqlConnection {}
120
121impl SimpleConnection for MysqlConnection {
122    fn batch_execute(&mut self, query: &str) -> QueryResult<()> {
123        self.instrumentation
124            .on_connection_event(InstrumentationEvent::StartQuery {
125                query: &StrQueryHelper::new(query),
126            });
127        let r = self
128            .raw_connection
129            .enable_multi_statements(|| self.raw_connection.execute(query));
130        self.instrumentation
131            .on_connection_event(InstrumentationEvent::FinishQuery {
132                query: &StrQueryHelper::new(query),
133                error: r.as_ref().err(),
134            });
135        r
136    }
137}
138
139impl ConnectionSealed for MysqlConnection {}
140
141impl Connection for MysqlConnection {
142    type Backend = Mysql;
143    type TransactionManager = AnsiTransactionManager;
144
145    /// Establishes a new connection to the MySQL database
146    /// `database_url` may be enhanced by GET parameters
147    /// `mysql://[user[:password]@]host[:port]/database_name[?unix_socket=socket-path&ssl_mode=SSL_MODE*&ssl_ca=/etc/ssl/certs/ca-certificates.crt&ssl_cert=/etc/ssl/certs/client-cert.crt&ssl_key=/etc/ssl/certs/client-key.crt]`
148    ///
149    /// * `host` can be an IP address or a hostname. If it is set to `localhost`, a connection
150    ///   will be attempted through the socket at `/tmp/mysql.sock`. If you want to connect to
151    ///   a local server via TCP (e.g. docker containers), use `0.0.0.0` or `127.0.0.1` instead.
152    /// * `unix_socket` expects the path to the unix socket
153    /// * `ssl_ca` accepts a path to the system's certificate roots
154    /// * `ssl_cert` accepts a path to the client's certificate file
155    /// * `ssl_key` accepts a path to the client's private key file
156    /// * `ssl_mode` expects a value defined for MySQL client command option `--ssl-mode`
157    ///   See <https://dev.mysql.com/doc/refman/5.7/en/connection-options.html#option_general_ssl-mode>
158    fn establish(database_url: &str) -> ConnectionResult<Self> {
159        let mut instrumentation = crate::connection::instrumentation::get_default_instrumentation();
160        instrumentation.on_connection_event(InstrumentationEvent::StartEstablishConnection {
161            url: database_url,
162        });
163
164        let establish_result = Self::establish_inner(database_url);
165        instrumentation.on_connection_event(InstrumentationEvent::FinishEstablishConnection {
166            url: database_url,
167            error: establish_result.as_ref().err(),
168        });
169        let mut conn = establish_result?;
170        conn.instrumentation = instrumentation;
171        Ok(conn)
172    }
173
174    fn execute_returning_count<T>(&mut self, source: &T) -> QueryResult<usize>
175    where
176        T: QueryFragment<Self::Backend> + QueryId,
177    {
178        #[allow(unsafe_code)] // call to unsafe function
179        update_transaction_manager_status(
180            prepared_query(
181                &source,
182                &mut self.statement_cache,
183                &mut self.raw_connection,
184                &mut self.instrumentation,
185            )
186            .and_then(|stmt| {
187                // we have not called result yet, so calling `execute` is
188                // fine
189                let stmt_use = unsafe { stmt.execute() }?;
190                stmt_use.affected_rows()
191            }),
192            &mut self.transaction_state,
193            &mut self.instrumentation,
194            &crate::debug_query(source),
195        )
196    }
197
198    fn transaction_state(&mut self) -> &mut AnsiTransactionManager {
199        &mut self.transaction_state
200    }
201
202    fn instrumentation(&mut self) -> &mut dyn Instrumentation {
203        &mut self.instrumentation
204    }
205
206    fn set_instrumentation(&mut self, instrumentation: impl Instrumentation) {
207        self.instrumentation = Some(Box::new(instrumentation));
208    }
209}
210
211#[inline(always)]
212fn update_transaction_manager_status<T>(
213    query_result: QueryResult<T>,
214    transaction_manager: &mut AnsiTransactionManager,
215    instrumentation: &mut Option<Box<dyn Instrumentation>>,
216    query: &dyn DebugQuery,
217) -> QueryResult<T> {
218    if let Err(Error::DatabaseError(DatabaseErrorKind::SerializationFailure, _)) = query_result {
219        transaction_manager
220            .status
221            .set_requires_rollback_maybe_up_to_top_level(true)
222    }
223    instrumentation.on_connection_event(InstrumentationEvent::FinishQuery {
224        query,
225        error: query_result.as_ref().err(),
226    });
227    query_result
228}
229
230impl LoadConnection<DefaultLoadingMode> for MysqlConnection {
231    type Cursor<'conn, 'query> = self::stmt::iterator::StatementIterator<'conn>;
232    type Row<'conn, 'query> = self::stmt::iterator::MysqlRow;
233
234    fn load<'conn, 'query, T>(
235        &'conn mut self,
236        source: T,
237    ) -> QueryResult<Self::Cursor<'conn, 'query>>
238    where
239        T: Query + QueryFragment<Self::Backend> + QueryId + 'query,
240        Self::Backend: QueryMetadata<T::SqlType>,
241    {
242        update_transaction_manager_status(
243            prepared_query(
244                &source,
245                &mut self.statement_cache,
246                &mut self.raw_connection,
247                &mut self.instrumentation,
248            )
249            .and_then(|stmt| {
250                let mut metadata = Vec::new();
251                Mysql::row_metadata(&mut (), &mut metadata);
252                StatementIterator::from_stmt(stmt, &metadata)
253            }),
254            &mut self.transaction_state,
255            &mut self.instrumentation,
256            &crate::debug_query(&source),
257        )
258    }
259}
260
261#[cfg(feature = "r2d2")]
262impl crate::r2d2::R2D2Connection for MysqlConnection {
263    fn ping(&mut self) -> QueryResult<()> {
264        crate::r2d2::CheckConnectionQuery.execute(self).map(|_| ())
265    }
266
267    fn is_broken(&mut self) -> bool {
268        AnsiTransactionManager::is_broken_transaction_manager(self)
269    }
270}
271
272impl MultiConnectionHelper for MysqlConnection {
273    fn to_any<'a>(
274        lookup: &mut <Self::Backend as crate::sql_types::TypeMetadata>::MetadataLookup,
275    ) -> &mut (dyn std::any::Any + 'a) {
276        lookup
277    }
278
279    fn from_any(
280        lookup: &mut dyn std::any::Any,
281    ) -> Option<&mut <Self::Backend as crate::sql_types::TypeMetadata>::MetadataLookup> {
282        lookup.downcast_mut()
283    }
284}
285
286fn prepared_query<'a, T: QueryFragment<Mysql> + QueryId>(
287    source: &'_ T,
288    statement_cache: &'a mut StatementCache<Mysql, Statement>,
289    raw_connection: &'a mut RawConnection,
290    instrumentation: &mut dyn Instrumentation,
291) -> QueryResult<MaybeCached<'a, Statement>> {
292    instrumentation.on_connection_event(InstrumentationEvent::StartQuery {
293        query: &crate::debug_query(source),
294    });
295    let mut stmt = statement_cache.cached_statement(
296        source,
297        &Mysql,
298        &[],
299        |sql, _| raw_connection.prepare(sql),
300        instrumentation,
301    )?;
302
303    let mut bind_collector = RawBytesBindCollector::new();
304    source.collect_binds(&mut bind_collector, &mut (), &Mysql)?;
305    let binds = bind_collector
306        .metadata
307        .into_iter()
308        .zip(bind_collector.binds);
309    stmt.bind(binds)?;
310    Ok(stmt)
311}
312
313impl MysqlConnection {
314    fn set_config_options(&mut self) -> QueryResult<()> {
315        crate::sql_query("SET time_zone = '+00:00';").execute(self)?;
316        crate::sql_query("SET character_set_client = 'utf8mb4'").execute(self)?;
317        crate::sql_query("SET character_set_connection = 'utf8mb4'").execute(self)?;
318        crate::sql_query("SET character_set_results = 'utf8mb4'").execute(self)?;
319        Ok(())
320    }
321
322    fn establish_inner(database_url: &str) -> Result<MysqlConnection, ConnectionError> {
323        use crate::ConnectionError::CouldntSetupConfiguration;
324
325        let raw_connection = RawConnection::new();
326        let connection_options = ConnectionOptions::parse(database_url)?;
327        raw_connection.connect(&connection_options)?;
328        let mut conn = MysqlConnection {
329            raw_connection,
330            transaction_state: AnsiTransactionManager::default(),
331            statement_cache: StatementCache::new(),
332            instrumentation: None,
333        };
334        conn.set_config_options()
335            .map_err(CouldntSetupConfiguration)?;
336        Ok(conn)
337    }
338}
339
340#[cfg(test)]
341mod tests {
342    extern crate dotenvy;
343
344    use super::*;
345    use std::env;
346
347    fn connection() -> MysqlConnection {
348        dotenvy::dotenv().ok();
349        let database_url = env::var("MYSQL_UNIT_TEST_DATABASE_URL")
350            .or_else(|_| env::var("MYSQL_DATABASE_URL"))
351            .or_else(|_| env::var("DATABASE_URL"))
352            .expect("DATABASE_URL must be set in order to run unit tests");
353        MysqlConnection::establish(&database_url).unwrap()
354    }
355
356    #[test]
357    fn batch_execute_handles_single_queries_with_results() {
358        let connection = &mut connection();
359        assert!(connection.batch_execute("SELECT 1").is_ok());
360        assert!(connection.batch_execute("SELECT 1").is_ok());
361    }
362
363    #[test]
364    fn batch_execute_handles_multi_queries_with_results() {
365        let connection = &mut connection();
366        let query = "SELECT 1; SELECT 2; SELECT 3;";
367        assert!(connection.batch_execute(query).is_ok());
368        assert!(connection.batch_execute(query).is_ok());
369    }
370
371    #[test]
372    fn execute_handles_queries_which_return_results() {
373        let connection = &mut connection();
374        assert!(crate::sql_query("SELECT 1").execute(connection).is_ok());
375        assert!(crate::sql_query("SELECT 1").execute(connection).is_ok());
376    }
377
378    #[test]
379    fn check_client_found_rows_flag() {
380        let conn = &mut crate::test_helpers::connection();
381        crate::sql_query("DROP TABLE IF EXISTS update_test CASCADE")
382            .execute(conn)
383            .unwrap();
384
385        crate::sql_query("CREATE TABLE update_test(id INTEGER PRIMARY KEY, num INTEGER NOT NULL)")
386            .execute(conn)
387            .unwrap();
388
389        crate::sql_query("INSERT INTO update_test(id, num) VALUES (1, 5)")
390            .execute(conn)
391            .unwrap();
392
393        let output = crate::sql_query("UPDATE update_test SET num = 5 WHERE id = 1")
394            .execute(conn)
395            .unwrap();
396
397        assert_eq!(output, 1);
398    }
399}