Skip to main content

diesel/mysql/connection/
mod.rs

1mod bind;
2mod raw;
3mod stmt;
4mod url;
5
6use self::raw::RawConnection;
7use self::stmt::Statement;
8use self::stmt::iterator::StatementIterator;
9use self::url::ConnectionOptions;
10use super::backend::Mysql;
11use crate::RunQueryDsl;
12use crate::connection::instrumentation::{DebugQuery, DynInstrumentation, StrQueryHelper};
13use crate::connection::statement_cache::{MaybeCached, StatementCache};
14use crate::connection::*;
15use crate::expression::QueryMetadata;
16use crate::query_builder::bind_collector::RawBytesBindCollector;
17use crate::query_builder::*;
18use crate::result::*;
19
20#[cfg(feature = "mysql")]
21#[allow(missing_debug_implementations, missing_copy_implementations)]
22/// A connection to a MySQL database. Connection URLs should be in the form
23/// `mysql://[user[:password]@]host/database_name[?unix_socket=socket-path&ssl_mode=SSL_MODE*&ssl_ca=/etc/ssl/certs/ca-certificates.crt&ssl_cert=/etc/ssl/certs/client-cert.crt&ssl_key=/etc/ssl/certs/client-key.crt]`
24///
25///* `host` can be an IP address or a hostname. If it is set to `localhost`, a connection
26///  will be attempted through the socket at `/tmp/mysql.sock`. If you want to connect to
27///  a local server via TCP (e.g. docker containers), use `0.0.0.0` or `127.0.0.1` instead.
28/// * `unix_socket` expects the path to the unix socket
29/// * `ssl_ca` accepts a path to the system's certificate roots
30/// * `ssl_cert` accepts a path to the client's certificate file
31/// * `ssl_key` accepts a path to the client's private key file
32/// * `ssl_mode` expects a value defined for MySQL client command option `--ssl-mode`
33///   See <https://dev.mysql.com/doc/refman/5.7/en/connection-options.html#option_general_ssl-mode>
34///
35/// # Supported loading model implementations
36///
37/// * [`DefaultLoadingMode`]
38///
39/// As `MysqlConnection` only supports a single loading mode implementation
40/// it is **not required** to explicitly specify a loading mode
41/// when calling [`RunQueryDsl::load_iter()`] or [`LoadConnection::load`]
42///
43/// ## DefaultLoadingMode
44///
45/// `MysqlConnection` only supports a single loading mode, which loads
46/// values row by row from the result set.
47///
48/// ```rust
49/// # include!("../../doctest_setup.rs");
50/// #
51/// # fn main() {
52/// #     run_test().unwrap();
53/// # }
54/// #
55/// # fn run_test() -> QueryResult<()> {
56/// #     use schema::users;
57/// #     let connection = &mut establish_connection();
58/// use diesel::connection::DefaultLoadingMode;
59/// {
60///     // scope to restrict the lifetime of the iterator
61///     let iter1 = users::table.load_iter::<(i32, String), DefaultLoadingMode>(connection)?;
62///
63///     for r in iter1 {
64///         let (id, name) = r?;
65///         println!("Id: {} Name: {}", id, name);
66///     }
67/// }
68///
69/// // works without specifying the loading mode
70/// let iter2 = users::table.load_iter::<(i32, String), _>(connection)?;
71///
72/// for r in iter2 {
73///     let (id, name) = r?;
74///     println!("Id: {} Name: {}", id, name);
75/// }
76/// #   Ok(())
77/// # }
78/// ```
79///
80/// This mode does **not support** creating
81/// multiple iterators using the same connection.
82///
83/// ```compile_fail
84/// # include!("../../doctest_setup.rs");
85/// #
86/// # fn main() {
87/// #     run_test().unwrap();
88/// # }
89/// #
90/// # fn run_test() -> QueryResult<()> {
91/// #     use schema::users;
92/// #     let connection = &mut establish_connection();
93/// use diesel::connection::DefaultLoadingMode;
94///
95/// let iter1 = users::table.load_iter::<(i32, String), DefaultLoadingMode>(connection)?;
96/// let iter2 = users::table.load_iter::<(i32, String), DefaultLoadingMode>(connection)?;
97///
98/// for r in iter1 {
99///     let (id, name) = r?;
100///     println!("Id: {} Name: {}", id, name);
101/// }
102///
103/// for r in iter2 {
104///     let (id, name) = r?;
105///     println!("Id: {} Name: {}", id, name);
106/// }
107/// #   Ok(())
108/// # }
109/// ```
110pub struct MysqlConnection {
111    raw_connection: RawConnection,
112    transaction_state: AnsiTransactionManager,
113    statement_cache: StatementCache<Mysql, Statement>,
114    instrumentation: DynInstrumentation,
115}
116
117// mysql connection can be shared between threads according to libmysqlclients documentation
118#[allow(unsafe_code)]
119unsafe impl Send for MysqlConnection {}
120
121impl SimpleConnection for MysqlConnection {
122    fn batch_execute(&mut self, query: &str) -> QueryResult<()> {
123        self.instrumentation
124            .on_connection_event(InstrumentationEvent::StartQuery {
125                query: &StrQueryHelper::new(query),
126            });
127        let r = self
128            .raw_connection
129            .enable_multi_statements(|| self.raw_connection.execute(query));
130        self.instrumentation
131            .on_connection_event(InstrumentationEvent::FinishQuery {
132                query: &StrQueryHelper::new(query),
133                error: r.as_ref().err(),
134            });
135        r
136    }
137}
138
139impl ConnectionSealed for MysqlConnection {}
140
141impl Connection for MysqlConnection {
142    type Backend = Mysql;
143    type TransactionManager = AnsiTransactionManager;
144
145    /// Establishes a new connection to the MySQL database
146    /// `database_url` may be enhanced by GET parameters
147    /// `mysql://[user[:password]@]host[:port]/database_name[?unix_socket=socket-path&ssl_mode=SSL_MODE*&ssl_ca=/etc/ssl/certs/ca-certificates.crt&ssl_cert=/etc/ssl/certs/client-cert.crt&ssl_key=/etc/ssl/certs/client-key.crt&local_infile=true]`
148    ///
149    /// * `host` can be an IP address or a hostname. If it is set to `localhost`, a connection
150    ///   will be attempted through the socket at `/tmp/mysql.sock`. If you want to connect to
151    ///   a local server via TCP (e.g. docker containers), use `0.0.0.0` or `127.0.0.1` instead.
152    /// * `unix_socket` expects the path to the unix socket
153    /// * `ssl_ca` accepts a path to the system's certificate roots
154    /// * `ssl_cert` accepts a path to the client's certificate file
155    /// * `ssl_key` accepts a path to the client's private key file
156    /// * `ssl_mode` expects a value defined for MySQL client command option `--ssl-mode`
157    ///   See <https://dev.mysql.com/doc/refman/5.7/en/connection-options.html#option_general_ssl-mode>
158    /// * `local_infile` expects a boolean to enable or disable LOAD DATA LOCAL
159    fn establish(database_url: &str) -> ConnectionResult<Self> {
160        let mut instrumentation = DynInstrumentation::default_instrumentation();
161        instrumentation.on_connection_event(InstrumentationEvent::StartEstablishConnection {
162            url: database_url,
163        });
164
165        let establish_result = Self::establish_inner(database_url);
166        instrumentation.on_connection_event(InstrumentationEvent::FinishEstablishConnection {
167            url: database_url,
168            error: establish_result.as_ref().err(),
169        });
170        let mut conn = establish_result?;
171        conn.instrumentation = instrumentation;
172        Ok(conn)
173    }
174
175    fn execute_returning_count<T>(&mut self, source: &T) -> QueryResult<usize>
176    where
177        T: QueryFragment<Self::Backend> + QueryId,
178    {
179        #[allow(unsafe_code)] // call to unsafe function
180        update_transaction_manager_status(
181            prepared_query(
182                &source,
183                &mut self.statement_cache,
184                &mut self.raw_connection,
185                &mut *self.instrumentation,
186            )
187            .and_then(|stmt| {
188                // we have not called result yet, so calling `execute` is
189                // fine
190                let stmt_use = unsafe { stmt.execute() }?;
191                stmt_use.affected_rows()
192            }),
193            &mut self.transaction_state,
194            &mut self.instrumentation,
195            &crate::debug_query(source),
196        )
197    }
198
199    fn transaction_state(&mut self) -> &mut AnsiTransactionManager {
200        &mut self.transaction_state
201    }
202
203    fn instrumentation(&mut self) -> &mut dyn Instrumentation {
204        &mut *self.instrumentation
205    }
206
207    fn set_instrumentation(&mut self, instrumentation: impl Instrumentation) {
208        self.instrumentation = instrumentation.into();
209    }
210
211    fn set_prepared_statement_cache_size(&mut self, size: CacheSize) {
212        self.statement_cache.set_cache_size(size);
213    }
214}
215
216#[inline(always)]
217fn update_transaction_manager_status<T>(
218    query_result: QueryResult<T>,
219    transaction_manager: &mut AnsiTransactionManager,
220    instrumentation: &mut DynInstrumentation,
221    query: &dyn DebugQuery,
222) -> QueryResult<T> {
223    fn non_generic_inner(
224        query_result: Result<(), &Error>,
225        transaction_manager: &mut AnsiTransactionManager,
226        instrumentation: &mut DynInstrumentation,
227        query: &dyn DebugQuery,
228    ) {
229        if let Err(Error::DatabaseError(DatabaseErrorKind::SerializationFailure, _)) = query_result
230        {
231            transaction_manager
232                .status
233                .set_requires_rollback_maybe_up_to_top_level(true)
234        }
235        instrumentation.on_connection_event(InstrumentationEvent::FinishQuery {
236            query,
237            error: query_result.err(),
238        });
239    }
240
241    non_generic_inner(
242        query_result.as_ref().map(|_| ()),
243        transaction_manager,
244        instrumentation,
245        query,
246    );
247    query_result
248}
249
250impl LoadConnection<DefaultLoadingMode> for MysqlConnection {
251    type Cursor<'conn, 'query> = self::stmt::iterator::StatementIterator<'conn>;
252    type Row<'conn, 'query> = self::stmt::iterator::MysqlRow;
253
254    fn load<'conn, 'query, T>(
255        &'conn mut self,
256        source: T,
257    ) -> QueryResult<Self::Cursor<'conn, 'query>>
258    where
259        T: Query + QueryFragment<Self::Backend> + QueryId + 'query,
260        Self::Backend: QueryMetadata<T::SqlType>,
261    {
262        update_transaction_manager_status(
263            prepared_query(
264                &source,
265                &mut self.statement_cache,
266                &mut self.raw_connection,
267                &mut *self.instrumentation,
268            )
269            .and_then(|stmt| {
270                let mut metadata = Vec::new();
271                Mysql::row_metadata(&mut (), &mut metadata);
272                StatementIterator::from_stmt(stmt, &metadata)
273            }),
274            &mut self.transaction_state,
275            &mut self.instrumentation,
276            &crate::debug_query(&source),
277        )
278    }
279}
280
281#[cfg(feature = "r2d2")]
282impl crate::r2d2::R2D2Connection for MysqlConnection {
283    fn ping(&mut self) -> QueryResult<()> {
284        crate::r2d2::CheckConnectionQuery.execute(self).map(|_| ())
285    }
286
287    fn is_broken(&mut self) -> bool {
288        AnsiTransactionManager::is_broken_transaction_manager(self)
289    }
290}
291
292impl MultiConnectionHelper for MysqlConnection {
293    fn to_any<'a>(
294        lookup: &mut <Self::Backend as crate::sql_types::TypeMetadata>::MetadataLookup,
295    ) -> &mut (dyn core::any::Any + 'a) {
296        lookup
297    }
298
299    fn from_any(
300        lookup: &mut dyn core::any::Any,
301    ) -> Option<&mut <Self::Backend as crate::sql_types::TypeMetadata>::MetadataLookup> {
302        lookup.downcast_mut()
303    }
304}
305
306fn prepared_query<'a, T: QueryFragment<Mysql> + QueryId>(
307    source: &'_ T,
308    statement_cache: &'a mut StatementCache<Mysql, Statement>,
309    raw_connection: &'a mut RawConnection,
310    instrumentation: &mut dyn Instrumentation,
311) -> QueryResult<MaybeCached<'a, Statement>> {
312    instrumentation.on_connection_event(InstrumentationEvent::StartQuery {
313        query: &crate::debug_query(source),
314    });
315    let mut stmt = statement_cache.cached_statement(
316        source,
317        &Mysql,
318        &[],
319        &*raw_connection,
320        RawConnection::prepare,
321        instrumentation,
322    )?;
323
324    let mut bind_collector = RawBytesBindCollector::new();
325    source.collect_binds(&mut bind_collector, &mut (), &Mysql)?;
326    let binds = bind_collector
327        .metadata
328        .into_iter()
329        .zip(bind_collector.binds);
330    stmt.bind(binds)?;
331    Ok(stmt)
332}
333
334impl MysqlConnection {
335    fn set_config_options(&mut self) -> QueryResult<()> {
336        crate::sql_query("SET time_zone = '+00:00';").execute(self)?;
337        crate::sql_query("SET character_set_client = 'utf8mb4'").execute(self)?;
338        crate::sql_query("SET character_set_connection = 'utf8mb4'").execute(self)?;
339        crate::sql_query("SET character_set_results = 'utf8mb4'").execute(self)?;
340        Ok(())
341    }
342
343    fn establish_inner(database_url: &str) -> Result<MysqlConnection, ConnectionError> {
344        use crate::ConnectionError::CouldntSetupConfiguration;
345
346        let raw_connection = RawConnection::new();
347        let connection_options = ConnectionOptions::parse(database_url)?;
348        raw_connection.connect(&connection_options)?;
349        let mut conn = MysqlConnection {
350            raw_connection,
351            transaction_state: AnsiTransactionManager::default(),
352            statement_cache: StatementCache::new(),
353            instrumentation: DynInstrumentation::none(),
354        };
355        conn.set_config_options()
356            .map_err(CouldntSetupConfiguration)?;
357        Ok(conn)
358    }
359}
360
361#[cfg(test)]
362mod tests {
363    extern crate dotenvy;
364
365    use super::*;
366    use std::env;
367
368    fn connection() -> MysqlConnection {
369        dotenvy::dotenv().ok();
370        let database_url = env::var("MYSQL_UNIT_TEST_DATABASE_URL")
371            .or_else(|_| env::var("MYSQL_DATABASE_URL"))
372            .or_else(|_| env::var("DATABASE_URL"))
373            .expect("DATABASE_URL must be set in order to run unit tests");
374        MysqlConnection::establish(&database_url).unwrap()
375    }
376
377    #[diesel_test_helper::test]
378    fn batch_execute_handles_single_queries_with_results() {
379        let connection = &mut connection();
380        assert!(connection.batch_execute("SELECT 1").is_ok());
381        assert!(connection.batch_execute("SELECT 1").is_ok());
382    }
383
384    #[diesel_test_helper::test]
385    fn batch_execute_handles_multi_queries_with_results() {
386        let connection = &mut connection();
387        let query = "SELECT 1; SELECT 2; SELECT 3;";
388        assert!(connection.batch_execute(query).is_ok());
389        assert!(connection.batch_execute(query).is_ok());
390    }
391
392    #[diesel_test_helper::test]
393    fn execute_handles_queries_which_return_results() {
394        let connection = &mut connection();
395        assert!(crate::sql_query("SELECT 1").execute(connection).is_ok());
396        assert!(crate::sql_query("SELECT 1").execute(connection).is_ok());
397    }
398
399    #[diesel_test_helper::test]
400    fn check_client_found_rows_flag() {
401        let conn = &mut crate::test_helpers::connection();
402        crate::sql_query("DROP TABLE IF EXISTS update_test CASCADE")
403            .execute(conn)
404            .unwrap();
405
406        crate::sql_query("CREATE TABLE update_test(id INTEGER PRIMARY KEY, num INTEGER NOT NULL)")
407            .execute(conn)
408            .unwrap();
409
410        crate::sql_query("INSERT INTO update_test(id, num) VALUES (1, 5)")
411            .execute(conn)
412            .unwrap();
413
414        let output = crate::sql_query("UPDATE update_test SET num = 5 WHERE id = 1")
415            .execute(conn)
416            .unwrap();
417
418        assert_eq!(output, 1);
419    }
420}