diesel/mysql/connection/
mod.rs
1mod bind;
2mod raw;
3mod stmt;
4mod url;
5
6use self::raw::RawConnection;
7use self::stmt::iterator::StatementIterator;
8use self::stmt::Statement;
9use self::url::ConnectionOptions;
10use super::backend::Mysql;
11use crate::connection::instrumentation::DebugQuery;
12use crate::connection::instrumentation::StrQueryHelper;
13use crate::connection::statement_cache::{MaybeCached, StatementCache};
14use crate::connection::*;
15use crate::expression::QueryMetadata;
16use crate::query_builder::bind_collector::RawBytesBindCollector;
17use crate::query_builder::*;
18use crate::result::*;
19use crate::RunQueryDsl;
20
21#[cfg(feature = "mysql")]
22#[allow(missing_debug_implementations, missing_copy_implementations)]
23pub struct MysqlConnection {
111 raw_connection: RawConnection,
112 transaction_state: AnsiTransactionManager,
113 statement_cache: StatementCache<Mysql, Statement>,
114 instrumentation: Option<Box<dyn Instrumentation>>,
115}
116
117#[allow(unsafe_code)]
119unsafe impl Send for MysqlConnection {}
120
121impl SimpleConnection for MysqlConnection {
122 fn batch_execute(&mut self, query: &str) -> QueryResult<()> {
123 self.instrumentation
124 .on_connection_event(InstrumentationEvent::StartQuery {
125 query: &StrQueryHelper::new(query),
126 });
127 let r = self
128 .raw_connection
129 .enable_multi_statements(|| self.raw_connection.execute(query));
130 self.instrumentation
131 .on_connection_event(InstrumentationEvent::FinishQuery {
132 query: &StrQueryHelper::new(query),
133 error: r.as_ref().err(),
134 });
135 r
136 }
137}
138
139impl ConnectionSealed for MysqlConnection {}
140
141impl Connection for MysqlConnection {
142 type Backend = Mysql;
143 type TransactionManager = AnsiTransactionManager;
144
145 fn establish(database_url: &str) -> ConnectionResult<Self> {
159 let mut instrumentation = crate::connection::instrumentation::get_default_instrumentation();
160 instrumentation.on_connection_event(InstrumentationEvent::StartEstablishConnection {
161 url: database_url,
162 });
163
164 let establish_result = Self::establish_inner(database_url);
165 instrumentation.on_connection_event(InstrumentationEvent::FinishEstablishConnection {
166 url: database_url,
167 error: establish_result.as_ref().err(),
168 });
169 let mut conn = establish_result?;
170 conn.instrumentation = instrumentation;
171 Ok(conn)
172 }
173
174 fn execute_returning_count<T>(&mut self, source: &T) -> QueryResult<usize>
175 where
176 T: QueryFragment<Self::Backend> + QueryId,
177 {
178 #[allow(unsafe_code)] update_transaction_manager_status(
180 prepared_query(
181 &source,
182 &mut self.statement_cache,
183 &mut self.raw_connection,
184 &mut self.instrumentation,
185 )
186 .and_then(|stmt| {
187 let stmt_use = unsafe { stmt.execute() }?;
190 stmt_use.affected_rows()
191 }),
192 &mut self.transaction_state,
193 &mut self.instrumentation,
194 &crate::debug_query(source),
195 )
196 }
197
198 fn transaction_state(&mut self) -> &mut AnsiTransactionManager {
199 &mut self.transaction_state
200 }
201
202 fn instrumentation(&mut self) -> &mut dyn Instrumentation {
203 &mut self.instrumentation
204 }
205
206 fn set_instrumentation(&mut self, instrumentation: impl Instrumentation) {
207 self.instrumentation = Some(Box::new(instrumentation));
208 }
209}
210
211#[inline(always)]
212fn update_transaction_manager_status<T>(
213 query_result: QueryResult<T>,
214 transaction_manager: &mut AnsiTransactionManager,
215 instrumentation: &mut Option<Box<dyn Instrumentation>>,
216 query: &dyn DebugQuery,
217) -> QueryResult<T> {
218 if let Err(Error::DatabaseError(DatabaseErrorKind::SerializationFailure, _)) = query_result {
219 transaction_manager
220 .status
221 .set_requires_rollback_maybe_up_to_top_level(true)
222 }
223 instrumentation.on_connection_event(InstrumentationEvent::FinishQuery {
224 query,
225 error: query_result.as_ref().err(),
226 });
227 query_result
228}
229
230impl LoadConnection<DefaultLoadingMode> for MysqlConnection {
231 type Cursor<'conn, 'query> = self::stmt::iterator::StatementIterator<'conn>;
232 type Row<'conn, 'query> = self::stmt::iterator::MysqlRow;
233
234 fn load<'conn, 'query, T>(
235 &'conn mut self,
236 source: T,
237 ) -> QueryResult<Self::Cursor<'conn, 'query>>
238 where
239 T: Query + QueryFragment<Self::Backend> + QueryId + 'query,
240 Self::Backend: QueryMetadata<T::SqlType>,
241 {
242 update_transaction_manager_status(
243 prepared_query(
244 &source,
245 &mut self.statement_cache,
246 &mut self.raw_connection,
247 &mut self.instrumentation,
248 )
249 .and_then(|stmt| {
250 let mut metadata = Vec::new();
251 Mysql::row_metadata(&mut (), &mut metadata);
252 StatementIterator::from_stmt(stmt, &metadata)
253 }),
254 &mut self.transaction_state,
255 &mut self.instrumentation,
256 &crate::debug_query(&source),
257 )
258 }
259}
260
261#[cfg(feature = "r2d2")]
262impl crate::r2d2::R2D2Connection for MysqlConnection {
263 fn ping(&mut self) -> QueryResult<()> {
264 crate::r2d2::CheckConnectionQuery.execute(self).map(|_| ())
265 }
266
267 fn is_broken(&mut self) -> bool {
268 AnsiTransactionManager::is_broken_transaction_manager(self)
269 }
270}
271
272impl MultiConnectionHelper for MysqlConnection {
273 fn to_any<'a>(
274 lookup: &mut <Self::Backend as crate::sql_types::TypeMetadata>::MetadataLookup,
275 ) -> &mut (dyn std::any::Any + 'a) {
276 lookup
277 }
278
279 fn from_any(
280 lookup: &mut dyn std::any::Any,
281 ) -> Option<&mut <Self::Backend as crate::sql_types::TypeMetadata>::MetadataLookup> {
282 lookup.downcast_mut()
283 }
284}
285
286fn prepared_query<'a, T: QueryFragment<Mysql> + QueryId>(
287 source: &'_ T,
288 statement_cache: &'a mut StatementCache<Mysql, Statement>,
289 raw_connection: &'a mut RawConnection,
290 instrumentation: &mut dyn Instrumentation,
291) -> QueryResult<MaybeCached<'a, Statement>> {
292 instrumentation.on_connection_event(InstrumentationEvent::StartQuery {
293 query: &crate::debug_query(source),
294 });
295 let mut stmt = statement_cache.cached_statement(
296 source,
297 &Mysql,
298 &[],
299 |sql, _| raw_connection.prepare(sql),
300 instrumentation,
301 )?;
302
303 let mut bind_collector = RawBytesBindCollector::new();
304 source.collect_binds(&mut bind_collector, &mut (), &Mysql)?;
305 let binds = bind_collector
306 .metadata
307 .into_iter()
308 .zip(bind_collector.binds);
309 stmt.bind(binds)?;
310 Ok(stmt)
311}
312
313impl MysqlConnection {
314 fn set_config_options(&mut self) -> QueryResult<()> {
315 crate::sql_query("SET time_zone = '+00:00';").execute(self)?;
316 crate::sql_query("SET character_set_client = 'utf8mb4'").execute(self)?;
317 crate::sql_query("SET character_set_connection = 'utf8mb4'").execute(self)?;
318 crate::sql_query("SET character_set_results = 'utf8mb4'").execute(self)?;
319 Ok(())
320 }
321
322 fn establish_inner(database_url: &str) -> Result<MysqlConnection, ConnectionError> {
323 use crate::ConnectionError::CouldntSetupConfiguration;
324
325 let raw_connection = RawConnection::new();
326 let connection_options = ConnectionOptions::parse(database_url)?;
327 raw_connection.connect(&connection_options)?;
328 let mut conn = MysqlConnection {
329 raw_connection,
330 transaction_state: AnsiTransactionManager::default(),
331 statement_cache: StatementCache::new(),
332 instrumentation: None,
333 };
334 conn.set_config_options()
335 .map_err(CouldntSetupConfiguration)?;
336 Ok(conn)
337 }
338}
339
340#[cfg(test)]
341mod tests {
342 extern crate dotenvy;
343
344 use super::*;
345 use std::env;
346
347 fn connection() -> MysqlConnection {
348 dotenvy::dotenv().ok();
349 let database_url = env::var("MYSQL_UNIT_TEST_DATABASE_URL")
350 .or_else(|_| env::var("MYSQL_DATABASE_URL"))
351 .or_else(|_| env::var("DATABASE_URL"))
352 .expect("DATABASE_URL must be set in order to run unit tests");
353 MysqlConnection::establish(&database_url).unwrap()
354 }
355
356 #[test]
357 fn batch_execute_handles_single_queries_with_results() {
358 let connection = &mut connection();
359 assert!(connection.batch_execute("SELECT 1").is_ok());
360 assert!(connection.batch_execute("SELECT 1").is_ok());
361 }
362
363 #[test]
364 fn batch_execute_handles_multi_queries_with_results() {
365 let connection = &mut connection();
366 let query = "SELECT 1; SELECT 2; SELECT 3;";
367 assert!(connection.batch_execute(query).is_ok());
368 assert!(connection.batch_execute(query).is_ok());
369 }
370
371 #[test]
372 fn execute_handles_queries_which_return_results() {
373 let connection = &mut connection();
374 assert!(crate::sql_query("SELECT 1").execute(connection).is_ok());
375 assert!(crate::sql_query("SELECT 1").execute(connection).is_ok());
376 }
377
378 #[test]
379 fn check_client_found_rows_flag() {
380 let conn = &mut crate::test_helpers::connection();
381 crate::sql_query("DROP TABLE IF EXISTS update_test CASCADE")
382 .execute(conn)
383 .unwrap();
384
385 crate::sql_query("CREATE TABLE update_test(id INTEGER PRIMARY KEY, num INTEGER NOT NULL)")
386 .execute(conn)
387 .unwrap();
388
389 crate::sql_query("INSERT INTO update_test(id, num) VALUES (1, 5)")
390 .execute(conn)
391 .unwrap();
392
393 let output = crate::sql_query("UPDATE update_test SET num = 5 WHERE id = 1")
394 .execute(conn)
395 .unwrap();
396
397 assert_eq!(output, 1);
398 }
399}