diesel/mysql/connection/
mod.rs

1mod bind;
2mod raw;
3mod stmt;
4mod url;
5
6use self::raw::RawConnection;
7use self::stmt::iterator::StatementIterator;
8use self::stmt::Statement;
9use self::url::ConnectionOptions;
10use super::backend::Mysql;
11use crate::connection::instrumentation::{DebugQuery, DynInstrumentation, StrQueryHelper};
12use crate::connection::statement_cache::{MaybeCached, StatementCache};
13use crate::connection::*;
14use crate::expression::QueryMetadata;
15use crate::query_builder::bind_collector::RawBytesBindCollector;
16use crate::query_builder::*;
17use crate::result::*;
18use crate::RunQueryDsl;
19
20#[cfg(feature = "mysql")]
21#[allow(missing_debug_implementations, missing_copy_implementations)]
22/// A connection to a MySQL database. Connection URLs should be in the form
23/// `mysql://[user[:password]@]host/database_name[?unix_socket=socket-path&ssl_mode=SSL_MODE*&ssl_ca=/etc/ssl/certs/ca-certificates.crt&ssl_cert=/etc/ssl/certs/client-cert.crt&ssl_key=/etc/ssl/certs/client-key.crt]`
24///
25///* `host` can be an IP address or a hostname. If it is set to `localhost`, a connection
26///  will be attempted through the socket at `/tmp/mysql.sock`. If you want to connect to
27///  a local server via TCP (e.g. docker containers), use `0.0.0.0` or `127.0.0.1` instead.
28/// * `unix_socket` expects the path to the unix socket
29/// * `ssl_ca` accepts a path to the system's certificate roots
30/// * `ssl_cert` accepts a path to the client's certificate file
31/// * `ssl_key` accepts a path to the client's private key file
32/// * `ssl_mode` expects a value defined for MySQL client command option `--ssl-mode`
33///   See <https://dev.mysql.com/doc/refman/5.7/en/connection-options.html#option_general_ssl-mode>
34///
35/// # Supported loading model implementations
36///
37/// * [`DefaultLoadingMode`]
38///
39/// As `MysqlConnection` only supports a single loading mode implementation
40/// it is **not required** to explicitly specify a loading mode
41/// when calling [`RunQueryDsl::load_iter()`] or [`LoadConnection::load`]
42///
43/// ## DefaultLoadingMode
44///
45/// `MysqlConnection` only supports a single loading mode, which loads
46/// values row by row from the result set.
47///
48/// ```rust
49/// # include!("../../doctest_setup.rs");
50/// #
51/// # fn main() {
52/// #     run_test().unwrap();
53/// # }
54/// #
55/// # fn run_test() -> QueryResult<()> {
56/// #     use schema::users;
57/// #     let connection = &mut establish_connection();
58/// use diesel::connection::DefaultLoadingMode;
59/// {
60///     // scope to restrict the lifetime of the iterator
61///     let iter1 = users::table.load_iter::<(i32, String), DefaultLoadingMode>(connection)?;
62///
63///     for r in iter1 {
64///         let (id, name) = r?;
65///         println!("Id: {} Name: {}", id, name);
66///     }
67/// }
68///
69/// // works without specifying the loading mode
70/// let iter2 = users::table.load_iter::<(i32, String), _>(connection)?;
71///
72/// for r in iter2 {
73///     let (id, name) = r?;
74///     println!("Id: {} Name: {}", id, name);
75/// }
76/// #   Ok(())
77/// # }
78/// ```
79///
80/// This mode does **not support** creating
81/// multiple iterators using the same connection.
82///
83/// ```compile_fail
84/// # include!("../../doctest_setup.rs");
85/// #
86/// # fn main() {
87/// #     run_test().unwrap();
88/// # }
89/// #
90/// # fn run_test() -> QueryResult<()> {
91/// #     use schema::users;
92/// #     let connection = &mut establish_connection();
93/// use diesel::connection::DefaultLoadingMode;
94///
95/// let iter1 = users::table.load_iter::<(i32, String), DefaultLoadingMode>(connection)?;
96/// let iter2 = users::table.load_iter::<(i32, String), DefaultLoadingMode>(connection)?;
97///
98/// for r in iter1 {
99///     let (id, name) = r?;
100///     println!("Id: {} Name: {}", id, name);
101/// }
102///
103/// for r in iter2 {
104///     let (id, name) = r?;
105///     println!("Id: {} Name: {}", id, name);
106/// }
107/// #   Ok(())
108/// # }
109/// ```
110pub struct MysqlConnection {
111    raw_connection: RawConnection,
112    transaction_state: AnsiTransactionManager,
113    statement_cache: StatementCache<Mysql, Statement>,
114    instrumentation: DynInstrumentation,
115}
116
117// mysql connection can be shared between threads according to libmysqlclients documentation
118#[allow(unsafe_code)]
119unsafe impl Send for MysqlConnection {}
120
121impl SimpleConnection for MysqlConnection {
122    fn batch_execute(&mut self, query: &str) -> QueryResult<()> {
123        self.instrumentation
124            .on_connection_event(InstrumentationEvent::StartQuery {
125                query: &StrQueryHelper::new(query),
126            });
127        let r = self
128            .raw_connection
129            .enable_multi_statements(|| self.raw_connection.execute(query));
130        self.instrumentation
131            .on_connection_event(InstrumentationEvent::FinishQuery {
132                query: &StrQueryHelper::new(query),
133                error: r.as_ref().err(),
134            });
135        r
136    }
137}
138
139impl ConnectionSealed for MysqlConnection {}
140
141impl Connection for MysqlConnection {
142    type Backend = Mysql;
143    type TransactionManager = AnsiTransactionManager;
144
145    /// Establishes a new connection to the MySQL database
146    /// `database_url` may be enhanced by GET parameters
147    /// `mysql://[user[:password]@]host[:port]/database_name[?unix_socket=socket-path&ssl_mode=SSL_MODE*&ssl_ca=/etc/ssl/certs/ca-certificates.crt&ssl_cert=/etc/ssl/certs/client-cert.crt&ssl_key=/etc/ssl/certs/client-key.crt]`
148    ///
149    /// * `host` can be an IP address or a hostname. If it is set to `localhost`, a connection
150    ///   will be attempted through the socket at `/tmp/mysql.sock`. If you want to connect to
151    ///   a local server via TCP (e.g. docker containers), use `0.0.0.0` or `127.0.0.1` instead.
152    /// * `unix_socket` expects the path to the unix socket
153    /// * `ssl_ca` accepts a path to the system's certificate roots
154    /// * `ssl_cert` accepts a path to the client's certificate file
155    /// * `ssl_key` accepts a path to the client's private key file
156    /// * `ssl_mode` expects a value defined for MySQL client command option `--ssl-mode`
157    ///   See <https://dev.mysql.com/doc/refman/5.7/en/connection-options.html#option_general_ssl-mode>
158    fn establish(database_url: &str) -> ConnectionResult<Self> {
159        let mut instrumentation = DynInstrumentation::default_instrumentation();
160        instrumentation.on_connection_event(InstrumentationEvent::StartEstablishConnection {
161            url: database_url,
162        });
163
164        let establish_result = Self::establish_inner(database_url);
165        instrumentation.on_connection_event(InstrumentationEvent::FinishEstablishConnection {
166            url: database_url,
167            error: establish_result.as_ref().err(),
168        });
169        let mut conn = establish_result?;
170        conn.instrumentation = instrumentation;
171        Ok(conn)
172    }
173
174    fn execute_returning_count<T>(&mut self, source: &T) -> QueryResult<usize>
175    where
176        T: QueryFragment<Self::Backend> + QueryId,
177    {
178        #[allow(unsafe_code)] // call to unsafe function
179        update_transaction_manager_status(
180            prepared_query(
181                &source,
182                &mut self.statement_cache,
183                &mut self.raw_connection,
184                &mut *self.instrumentation,
185            )
186            .and_then(|stmt| {
187                // we have not called result yet, so calling `execute` is
188                // fine
189                let stmt_use = unsafe { stmt.execute() }?;
190                stmt_use.affected_rows()
191            }),
192            &mut self.transaction_state,
193            &mut self.instrumentation,
194            &crate::debug_query(source),
195        )
196    }
197
198    fn transaction_state(&mut self) -> &mut AnsiTransactionManager {
199        &mut self.transaction_state
200    }
201
202    fn instrumentation(&mut self) -> &mut dyn Instrumentation {
203        &mut *self.instrumentation
204    }
205
206    fn set_instrumentation(&mut self, instrumentation: impl Instrumentation) {
207        self.instrumentation = instrumentation.into();
208    }
209
210    fn set_prepared_statement_cache_size(&mut self, size: CacheSize) {
211        self.statement_cache.set_cache_size(size);
212    }
213}
214
215#[inline(always)]
216fn update_transaction_manager_status<T>(
217    query_result: QueryResult<T>,
218    transaction_manager: &mut AnsiTransactionManager,
219    instrumentation: &mut DynInstrumentation,
220    query: &dyn DebugQuery,
221) -> QueryResult<T> {
222    fn non_generic_inner(
223        query_result: Result<(), &Error>,
224        transaction_manager: &mut AnsiTransactionManager,
225        instrumentation: &mut DynInstrumentation,
226        query: &dyn DebugQuery,
227    ) {
228        if let Err(Error::DatabaseError(DatabaseErrorKind::SerializationFailure, _)) = query_result
229        {
230            transaction_manager
231                .status
232                .set_requires_rollback_maybe_up_to_top_level(true)
233        }
234        instrumentation.on_connection_event(InstrumentationEvent::FinishQuery {
235            query,
236            error: query_result.err(),
237        });
238    }
239
240    non_generic_inner(
241        query_result.as_ref().map(|_| ()),
242        transaction_manager,
243        instrumentation,
244        query,
245    );
246    query_result
247}
248
249impl LoadConnection<DefaultLoadingMode> for MysqlConnection {
250    type Cursor<'conn, 'query> = self::stmt::iterator::StatementIterator<'conn>;
251    type Row<'conn, 'query> = self::stmt::iterator::MysqlRow;
252
253    fn load<'conn, 'query, T>(
254        &'conn mut self,
255        source: T,
256    ) -> QueryResult<Self::Cursor<'conn, 'query>>
257    where
258        T: Query + QueryFragment<Self::Backend> + QueryId + 'query,
259        Self::Backend: QueryMetadata<T::SqlType>,
260    {
261        update_transaction_manager_status(
262            prepared_query(
263                &source,
264                &mut self.statement_cache,
265                &mut self.raw_connection,
266                &mut *self.instrumentation,
267            )
268            .and_then(|stmt| {
269                let mut metadata = Vec::new();
270                Mysql::row_metadata(&mut (), &mut metadata);
271                StatementIterator::from_stmt(stmt, &metadata)
272            }),
273            &mut self.transaction_state,
274            &mut self.instrumentation,
275            &crate::debug_query(&source),
276        )
277    }
278}
279
280#[cfg(feature = "r2d2")]
281impl crate::r2d2::R2D2Connection for MysqlConnection {
282    fn ping(&mut self) -> QueryResult<()> {
283        crate::r2d2::CheckConnectionQuery.execute(self).map(|_| ())
284    }
285
286    fn is_broken(&mut self) -> bool {
287        AnsiTransactionManager::is_broken_transaction_manager(self)
288    }
289}
290
291impl MultiConnectionHelper for MysqlConnection {
292    fn to_any<'a>(
293        lookup: &mut <Self::Backend as crate::sql_types::TypeMetadata>::MetadataLookup,
294    ) -> &mut (dyn std::any::Any + 'a) {
295        lookup
296    }
297
298    fn from_any(
299        lookup: &mut dyn std::any::Any,
300    ) -> Option<&mut <Self::Backend as crate::sql_types::TypeMetadata>::MetadataLookup> {
301        lookup.downcast_mut()
302    }
303}
304
305fn prepared_query<'a, T: QueryFragment<Mysql> + QueryId>(
306    source: &'_ T,
307    statement_cache: &'a mut StatementCache<Mysql, Statement>,
308    raw_connection: &'a mut RawConnection,
309    instrumentation: &mut dyn Instrumentation,
310) -> QueryResult<MaybeCached<'a, Statement>> {
311    instrumentation.on_connection_event(InstrumentationEvent::StartQuery {
312        query: &crate::debug_query(source),
313    });
314    let mut stmt = statement_cache.cached_statement(
315        source,
316        &Mysql,
317        &[],
318        &*raw_connection,
319        RawConnection::prepare,
320        instrumentation,
321    )?;
322
323    let mut bind_collector = RawBytesBindCollector::new();
324    source.collect_binds(&mut bind_collector, &mut (), &Mysql)?;
325    let binds = bind_collector
326        .metadata
327        .into_iter()
328        .zip(bind_collector.binds);
329    stmt.bind(binds)?;
330    Ok(stmt)
331}
332
333impl MysqlConnection {
334    fn set_config_options(&mut self) -> QueryResult<()> {
335        crate::sql_query("SET time_zone = '+00:00';").execute(self)?;
336        crate::sql_query("SET character_set_client = 'utf8mb4'").execute(self)?;
337        crate::sql_query("SET character_set_connection = 'utf8mb4'").execute(self)?;
338        crate::sql_query("SET character_set_results = 'utf8mb4'").execute(self)?;
339        Ok(())
340    }
341
342    fn establish_inner(database_url: &str) -> Result<MysqlConnection, ConnectionError> {
343        use crate::ConnectionError::CouldntSetupConfiguration;
344
345        let raw_connection = RawConnection::new();
346        let connection_options = ConnectionOptions::parse(database_url)?;
347        raw_connection.connect(&connection_options)?;
348        let mut conn = MysqlConnection {
349            raw_connection,
350            transaction_state: AnsiTransactionManager::default(),
351            statement_cache: StatementCache::new(),
352            instrumentation: DynInstrumentation::none(),
353        };
354        conn.set_config_options()
355            .map_err(CouldntSetupConfiguration)?;
356        Ok(conn)
357    }
358}
359
360#[cfg(test)]
361mod tests {
362    extern crate dotenvy;
363
364    use super::*;
365    use std::env;
366
367    fn connection() -> MysqlConnection {
368        dotenvy::dotenv().ok();
369        let database_url = env::var("MYSQL_UNIT_TEST_DATABASE_URL")
370            .or_else(|_| env::var("MYSQL_DATABASE_URL"))
371            .or_else(|_| env::var("DATABASE_URL"))
372            .expect("DATABASE_URL must be set in order to run unit tests");
373        MysqlConnection::establish(&database_url).unwrap()
374    }
375
376    #[diesel_test_helper::test]
377    fn batch_execute_handles_single_queries_with_results() {
378        let connection = &mut connection();
379        assert!(connection.batch_execute("SELECT 1").is_ok());
380        assert!(connection.batch_execute("SELECT 1").is_ok());
381    }
382
383    #[diesel_test_helper::test]
384    fn batch_execute_handles_multi_queries_with_results() {
385        let connection = &mut connection();
386        let query = "SELECT 1; SELECT 2; SELECT 3;";
387        assert!(connection.batch_execute(query).is_ok());
388        assert!(connection.batch_execute(query).is_ok());
389    }
390
391    #[diesel_test_helper::test]
392    fn execute_handles_queries_which_return_results() {
393        let connection = &mut connection();
394        assert!(crate::sql_query("SELECT 1").execute(connection).is_ok());
395        assert!(crate::sql_query("SELECT 1").execute(connection).is_ok());
396    }
397
398    #[diesel_test_helper::test]
399    fn check_client_found_rows_flag() {
400        let conn = &mut crate::test_helpers::connection();
401        crate::sql_query("DROP TABLE IF EXISTS update_test CASCADE")
402            .execute(conn)
403            .unwrap();
404
405        crate::sql_query("CREATE TABLE update_test(id INTEGER PRIMARY KEY, num INTEGER NOT NULL)")
406            .execute(conn)
407            .unwrap();
408
409        crate::sql_query("INSERT INTO update_test(id, num) VALUES (1, 5)")
410            .execute(conn)
411            .unwrap();
412
413        let output = crate::sql_query("UPDATE update_test SET num = 5 WHERE id = 1")
414            .execute(conn)
415            .unwrap();
416
417        assert_eq!(output, 1);
418    }
419}