diesel/mysql/connection/
mod.rs

1mod bind;
2mod raw;
3mod stmt;
4mod url;
5
6use self::raw::RawConnection;
7use self::stmt::iterator::StatementIterator;
8use self::stmt::Statement;
9use self::url::ConnectionOptions;
10use super::backend::Mysql;
11use crate::connection::instrumentation::{DebugQuery, DynInstrumentation, StrQueryHelper};
12use crate::connection::statement_cache::{MaybeCached, StatementCache};
13use crate::connection::*;
14use crate::expression::QueryMetadata;
15use crate::query_builder::bind_collector::RawBytesBindCollector;
16use crate::query_builder::*;
17use crate::result::*;
18use crate::RunQueryDsl;
19
20#[cfg(feature = "mysql")]
21#[allow(missing_debug_implementations, missing_copy_implementations)]
22/// A connection to a MySQL database. Connection URLs should be in the form
23/// `mysql://[user[:password]@]host/database_name[?unix_socket=socket-path&ssl_mode=SSL_MODE*&ssl_ca=/etc/ssl/certs/ca-certificates.crt&ssl_cert=/etc/ssl/certs/client-cert.crt&ssl_key=/etc/ssl/certs/client-key.crt]`
24///
25///* `host` can be an IP address or a hostname. If it is set to `localhost`, a connection
26///  will be attempted through the socket at `/tmp/mysql.sock`. If you want to connect to
27///  a local server via TCP (e.g. docker containers), use `0.0.0.0` or `127.0.0.1` instead.
28/// * `unix_socket` expects the path to the unix socket
29/// * `ssl_ca` accepts a path to the system's certificate roots
30/// * `ssl_cert` accepts a path to the client's certificate file
31/// * `ssl_key` accepts a path to the client's private key file
32/// * `ssl_mode` expects a value defined for MySQL client command option `--ssl-mode`
33///   See <https://dev.mysql.com/doc/refman/5.7/en/connection-options.html#option_general_ssl-mode>
34///
35/// # Supported loading model implementations
36///
37/// * [`DefaultLoadingMode`]
38///
39/// As `MysqlConnection` only supports a single loading mode implementation
40/// it is **not required** to explicitly specify a loading mode
41/// when calling [`RunQueryDsl::load_iter()`] or [`LoadConnection::load`]
42///
43/// ## DefaultLoadingMode
44///
45/// `MysqlConnection` only supports a single loading mode, which loads
46/// values row by row from the result set.
47///
48/// ```rust
49/// # include!("../../doctest_setup.rs");
50/// #
51/// # fn main() {
52/// #     run_test().unwrap();
53/// # }
54/// #
55/// # fn run_test() -> QueryResult<()> {
56/// #     use schema::users;
57/// #     let connection = &mut establish_connection();
58/// use diesel::connection::DefaultLoadingMode;
59/// { // scope to restrict the lifetime of the iterator
60///     let iter1 = users::table.load_iter::<(i32, String), DefaultLoadingMode>(connection)?;
61///
62///     for r in iter1 {
63///         let (id, name) = r?;
64///         println!("Id: {} Name: {}", id, name);
65///     }
66/// }
67///
68/// // works without specifying the loading mode
69/// let iter2 = users::table.load_iter::<(i32, String), _>(connection)?;
70///
71/// for r in iter2 {
72///     let (id, name) = r?;
73///     println!("Id: {} Name: {}", id, name);
74/// }
75/// #   Ok(())
76/// # }
77/// ```
78///
79/// This mode does **not support** creating
80/// multiple iterators using the same connection.
81///
82/// ```compile_fail
83/// # include!("../../doctest_setup.rs");
84/// #
85/// # fn main() {
86/// #     run_test().unwrap();
87/// # }
88/// #
89/// # fn run_test() -> QueryResult<()> {
90/// #     use schema::users;
91/// #     let connection = &mut establish_connection();
92/// use diesel::connection::DefaultLoadingMode;
93///
94/// let iter1 = users::table.load_iter::<(i32, String), DefaultLoadingMode>(connection)?;
95/// let iter2 = users::table.load_iter::<(i32, String), DefaultLoadingMode>(connection)?;
96///
97/// for r in iter1 {
98///     let (id, name) = r?;
99///     println!("Id: {} Name: {}", id, name);
100/// }
101///
102/// for r in iter2 {
103///     let (id, name) = r?;
104///     println!("Id: {} Name: {}", id, name);
105/// }
106/// #   Ok(())
107/// # }
108/// ```
109pub struct MysqlConnection {
110    raw_connection: RawConnection,
111    transaction_state: AnsiTransactionManager,
112    statement_cache: StatementCache<Mysql, Statement>,
113    instrumentation: DynInstrumentation,
114}
115
116// mysql connection can be shared between threads according to libmysqlclients documentation
117#[allow(unsafe_code)]
118unsafe impl Send for MysqlConnection {}
119
120impl SimpleConnection for MysqlConnection {
121    fn batch_execute(&mut self, query: &str) -> QueryResult<()> {
122        self.instrumentation
123            .on_connection_event(InstrumentationEvent::StartQuery {
124                query: &StrQueryHelper::new(query),
125            });
126        let r = self
127            .raw_connection
128            .enable_multi_statements(|| self.raw_connection.execute(query));
129        self.instrumentation
130            .on_connection_event(InstrumentationEvent::FinishQuery {
131                query: &StrQueryHelper::new(query),
132                error: r.as_ref().err(),
133            });
134        r
135    }
136}
137
138impl ConnectionSealed for MysqlConnection {}
139
140impl Connection for MysqlConnection {
141    type Backend = Mysql;
142    type TransactionManager = AnsiTransactionManager;
143
144    /// Establishes a new connection to the MySQL database
145    /// `database_url` may be enhanced by GET parameters
146    /// `mysql://[user[:password]@]host[:port]/database_name[?unix_socket=socket-path&ssl_mode=SSL_MODE*&ssl_ca=/etc/ssl/certs/ca-certificates.crt&ssl_cert=/etc/ssl/certs/client-cert.crt&ssl_key=/etc/ssl/certs/client-key.crt]`
147    ///
148    /// * `host` can be an IP address or a hostname. If it is set to `localhost`, a connection
149    ///   will be attempted through the socket at `/tmp/mysql.sock`. If you want to connect to
150    ///   a local server via TCP (e.g. docker containers), use `0.0.0.0` or `127.0.0.1` instead.
151    /// * `unix_socket` expects the path to the unix socket
152    /// * `ssl_ca` accepts a path to the system's certificate roots
153    /// * `ssl_cert` accepts a path to the client's certificate file
154    /// * `ssl_key` accepts a path to the client's private key file
155    /// * `ssl_mode` expects a value defined for MySQL client command option `--ssl-mode`
156    ///   See <https://dev.mysql.com/doc/refman/5.7/en/connection-options.html#option_general_ssl-mode>
157    fn establish(database_url: &str) -> ConnectionResult<Self> {
158        let mut instrumentation = DynInstrumentation::default_instrumentation();
159        instrumentation.on_connection_event(InstrumentationEvent::StartEstablishConnection {
160            url: database_url,
161        });
162
163        let establish_result = Self::establish_inner(database_url);
164        instrumentation.on_connection_event(InstrumentationEvent::FinishEstablishConnection {
165            url: database_url,
166            error: establish_result.as_ref().err(),
167        });
168        let mut conn = establish_result?;
169        conn.instrumentation = instrumentation;
170        Ok(conn)
171    }
172
173    fn execute_returning_count<T>(&mut self, source: &T) -> QueryResult<usize>
174    where
175        T: QueryFragment<Self::Backend> + QueryId,
176    {
177        #[allow(unsafe_code)] // call to unsafe function
178        update_transaction_manager_status(
179            prepared_query(
180                &source,
181                &mut self.statement_cache,
182                &mut self.raw_connection,
183                &mut *self.instrumentation,
184            )
185            .and_then(|stmt| {
186                // we have not called result yet, so calling `execute` is
187                // fine
188                let stmt_use = unsafe { stmt.execute() }?;
189                stmt_use.affected_rows()
190            }),
191            &mut self.transaction_state,
192            &mut self.instrumentation,
193            &crate::debug_query(source),
194        )
195    }
196
197    fn transaction_state(&mut self) -> &mut AnsiTransactionManager {
198        &mut self.transaction_state
199    }
200
201    fn instrumentation(&mut self) -> &mut dyn Instrumentation {
202        &mut *self.instrumentation
203    }
204
205    fn set_instrumentation(&mut self, instrumentation: impl Instrumentation) {
206        self.instrumentation = instrumentation.into();
207    }
208
209    fn set_prepared_statement_cache_size(&mut self, size: CacheSize) {
210        self.statement_cache.set_cache_size(size);
211    }
212}
213
214#[inline(always)]
215fn update_transaction_manager_status<T>(
216    query_result: QueryResult<T>,
217    transaction_manager: &mut AnsiTransactionManager,
218    instrumentation: &mut DynInstrumentation,
219    query: &dyn DebugQuery,
220) -> QueryResult<T> {
221    if let Err(Error::DatabaseError(DatabaseErrorKind::SerializationFailure, _)) = query_result {
222        transaction_manager
223            .status
224            .set_requires_rollback_maybe_up_to_top_level(true)
225    }
226    instrumentation.on_connection_event(InstrumentationEvent::FinishQuery {
227        query,
228        error: query_result.as_ref().err(),
229    });
230    query_result
231}
232
233impl LoadConnection<DefaultLoadingMode> for MysqlConnection {
234    type Cursor<'conn, 'query> = self::stmt::iterator::StatementIterator<'conn>;
235    type Row<'conn, 'query> = self::stmt::iterator::MysqlRow;
236
237    fn load<'conn, 'query, T>(
238        &'conn mut self,
239        source: T,
240    ) -> QueryResult<Self::Cursor<'conn, 'query>>
241    where
242        T: Query + QueryFragment<Self::Backend> + QueryId + 'query,
243        Self::Backend: QueryMetadata<T::SqlType>,
244    {
245        update_transaction_manager_status(
246            prepared_query(
247                &source,
248                &mut self.statement_cache,
249                &mut self.raw_connection,
250                &mut *self.instrumentation,
251            )
252            .and_then(|stmt| {
253                let mut metadata = Vec::new();
254                Mysql::row_metadata(&mut (), &mut metadata);
255                StatementIterator::from_stmt(stmt, &metadata)
256            }),
257            &mut self.transaction_state,
258            &mut self.instrumentation,
259            &crate::debug_query(&source),
260        )
261    }
262}
263
264#[cfg(feature = "r2d2")]
265impl crate::r2d2::R2D2Connection for MysqlConnection {
266    fn ping(&mut self) -> QueryResult<()> {
267        crate::r2d2::CheckConnectionQuery.execute(self).map(|_| ())
268    }
269
270    fn is_broken(&mut self) -> bool {
271        AnsiTransactionManager::is_broken_transaction_manager(self)
272    }
273}
274
275impl MultiConnectionHelper for MysqlConnection {
276    fn to_any<'a>(
277        lookup: &mut <Self::Backend as crate::sql_types::TypeMetadata>::MetadataLookup,
278    ) -> &mut (dyn std::any::Any + 'a) {
279        lookup
280    }
281
282    fn from_any(
283        lookup: &mut dyn std::any::Any,
284    ) -> Option<&mut <Self::Backend as crate::sql_types::TypeMetadata>::MetadataLookup> {
285        lookup.downcast_mut()
286    }
287}
288
289fn prepared_query<'a, T: QueryFragment<Mysql> + QueryId>(
290    source: &'_ T,
291    statement_cache: &'a mut StatementCache<Mysql, Statement>,
292    raw_connection: &'a mut RawConnection,
293    instrumentation: &mut dyn Instrumentation,
294) -> QueryResult<MaybeCached<'a, Statement>> {
295    instrumentation.on_connection_event(InstrumentationEvent::StartQuery {
296        query: &crate::debug_query(source),
297    });
298    let mut stmt = statement_cache.cached_statement(
299        source,
300        &Mysql,
301        &[],
302        &*raw_connection,
303        RawConnection::prepare,
304        instrumentation,
305    )?;
306
307    let mut bind_collector = RawBytesBindCollector::new();
308    source.collect_binds(&mut bind_collector, &mut (), &Mysql)?;
309    let binds = bind_collector
310        .metadata
311        .into_iter()
312        .zip(bind_collector.binds);
313    stmt.bind(binds)?;
314    Ok(stmt)
315}
316
317impl MysqlConnection {
318    fn set_config_options(&mut self) -> QueryResult<()> {
319        crate::sql_query("SET time_zone = '+00:00';").execute(self)?;
320        crate::sql_query("SET character_set_client = 'utf8mb4'").execute(self)?;
321        crate::sql_query("SET character_set_connection = 'utf8mb4'").execute(self)?;
322        crate::sql_query("SET character_set_results = 'utf8mb4'").execute(self)?;
323        Ok(())
324    }
325
326    fn establish_inner(database_url: &str) -> Result<MysqlConnection, ConnectionError> {
327        use crate::ConnectionError::CouldntSetupConfiguration;
328
329        let raw_connection = RawConnection::new();
330        let connection_options = ConnectionOptions::parse(database_url)?;
331        raw_connection.connect(&connection_options)?;
332        let mut conn = MysqlConnection {
333            raw_connection,
334            transaction_state: AnsiTransactionManager::default(),
335            statement_cache: StatementCache::new(),
336            instrumentation: DynInstrumentation::none(),
337        };
338        conn.set_config_options()
339            .map_err(CouldntSetupConfiguration)?;
340        Ok(conn)
341    }
342}
343
344#[cfg(test)]
345mod tests {
346    extern crate dotenvy;
347
348    use super::*;
349    use std::env;
350
351    fn connection() -> MysqlConnection {
352        dotenvy::dotenv().ok();
353        let database_url = env::var("MYSQL_UNIT_TEST_DATABASE_URL")
354            .or_else(|_| env::var("MYSQL_DATABASE_URL"))
355            .or_else(|_| env::var("DATABASE_URL"))
356            .expect("DATABASE_URL must be set in order to run unit tests");
357        MysqlConnection::establish(&database_url).unwrap()
358    }
359
360    #[diesel_test_helper::test]
361    fn batch_execute_handles_single_queries_with_results() {
362        let connection = &mut connection();
363        assert!(connection.batch_execute("SELECT 1").is_ok());
364        assert!(connection.batch_execute("SELECT 1").is_ok());
365    }
366
367    #[diesel_test_helper::test]
368    fn batch_execute_handles_multi_queries_with_results() {
369        let connection = &mut connection();
370        let query = "SELECT 1; SELECT 2; SELECT 3;";
371        assert!(connection.batch_execute(query).is_ok());
372        assert!(connection.batch_execute(query).is_ok());
373    }
374
375    #[diesel_test_helper::test]
376    fn execute_handles_queries_which_return_results() {
377        let connection = &mut connection();
378        assert!(crate::sql_query("SELECT 1").execute(connection).is_ok());
379        assert!(crate::sql_query("SELECT 1").execute(connection).is_ok());
380    }
381
382    #[diesel_test_helper::test]
383    fn check_client_found_rows_flag() {
384        let conn = &mut crate::test_helpers::connection();
385        crate::sql_query("DROP TABLE IF EXISTS update_test CASCADE")
386            .execute(conn)
387            .unwrap();
388
389        crate::sql_query("CREATE TABLE update_test(id INTEGER PRIMARY KEY, num INTEGER NOT NULL)")
390            .execute(conn)
391            .unwrap();
392
393        crate::sql_query("INSERT INTO update_test(id, num) VALUES (1, 5)")
394            .execute(conn)
395            .unwrap();
396
397        let output = crate::sql_query("UPDATE update_test SET num = 5 WHERE id = 1")
398            .execute(conn)
399            .unwrap();
400
401        assert_eq!(output, 1);
402    }
403}