1mod bind;
2mod raw;
3mod stmt;
4mod url;
5
6use self::raw::RawConnection;
7use self::stmt::Statement;
8use self::stmt::iterator::StatementIterator;
9use self::url::ConnectionOptions;
10use super::backend::Mysql;
11use crate::RunQueryDsl;
12use crate::connection::instrumentation::{DebugQuery, DynInstrumentation, StrQueryHelper};
13use crate::connection::statement_cache::{MaybeCached, StatementCache};
14use crate::connection::*;
15use crate::expression::QueryMetadata;
16use crate::query_builder::bind_collector::RawBytesBindCollector;
17use crate::query_builder::*;
18use crate::result::*;
19
/// A connection to a MySQL database, using [`Mysql`] as its backend.
///
/// Instances are created through `Connection::establish`, which parses the
/// database URL, connects the raw client handle and applies the per-session
/// settings issued by `set_config_options`.
#[cfg(feature = "mysql")]
#[allow(missing_debug_implementations, missing_copy_implementations)]
pub struct MysqlConnection {
    /// Handle used to connect, prepare statements and run batch queries.
    raw_connection: RawConnection,
    /// Transaction bookkeeping handed out by `Connection::transaction_state`.
    transaction_state: AnsiTransactionManager,
    /// Cache of prepared statements consulted by `prepared_query`.
    statement_cache: StatementCache<Mysql, Statement>,
    /// Receiver of the connection/query events emitted by this type.
    instrumentation: DynInstrumentation,
}
116
// SAFETY(review): nothing in this file shows why moving the connection to
// another thread is sound. Presumably the native client handle wrapped by
// `RawConnection` may be used from a different thread as long as it is never
// used concurrently — TODO confirm against the MySQL C API threading notes.
#[allow(unsafe_code)]
unsafe impl Send for MysqlConnection {}
120
121impl SimpleConnection for MysqlConnection {
122 fn batch_execute(&mut self, query: &str) -> QueryResult<()> {
123 self.instrumentation
124 .on_connection_event(InstrumentationEvent::StartQuery {
125 query: &StrQueryHelper::new(query),
126 });
127 let r = self
128 .raw_connection
129 .enable_multi_statements(|| self.raw_connection.execute(query));
130 self.instrumentation
131 .on_connection_event(InstrumentationEvent::FinishQuery {
132 query: &StrQueryHelper::new(query),
133 error: r.as_ref().err(),
134 });
135 r
136 }
137}
138
// Marker impl with no items. Presumably `ConnectionSealed` is a sealing
// supertrait required to implement `Connection` — confirm in `crate::connection`.
impl ConnectionSealed for MysqlConnection {}
140
141impl Connection for MysqlConnection {
142 type Backend = Mysql;
143 type TransactionManager = AnsiTransactionManager;
144
145 fn establish(database_url: &str) -> ConnectionResult<Self> {
160 let mut instrumentation = DynInstrumentation::default_instrumentation();
161 instrumentation.on_connection_event(InstrumentationEvent::StartEstablishConnection {
162 url: database_url,
163 });
164
165 let establish_result = Self::establish_inner(database_url);
166 instrumentation.on_connection_event(InstrumentationEvent::FinishEstablishConnection {
167 url: database_url,
168 error: establish_result.as_ref().err(),
169 });
170 let mut conn = establish_result?;
171 conn.instrumentation = instrumentation;
172 Ok(conn)
173 }
174
175 fn execute_returning_count<T>(&mut self, source: &T) -> QueryResult<usize>
176 where
177 T: QueryFragment<Self::Backend> + QueryId,
178 {
179 #[allow(unsafe_code)] update_transaction_manager_status(
181 prepared_query(
182 &source,
183 &mut self.statement_cache,
184 &mut self.raw_connection,
185 &mut *self.instrumentation,
186 )
187 .and_then(|stmt| {
188 let stmt_use = unsafe { stmt.execute() }?;
191 stmt_use.affected_rows()
192 }),
193 &mut self.transaction_state,
194 &mut self.instrumentation,
195 &crate::debug_query(source),
196 )
197 }
198
199 fn transaction_state(&mut self) -> &mut AnsiTransactionManager {
200 &mut self.transaction_state
201 }
202
203 fn instrumentation(&mut self) -> &mut dyn Instrumentation {
204 &mut *self.instrumentation
205 }
206
207 fn set_instrumentation(&mut self, instrumentation: impl Instrumentation) {
208 self.instrumentation = instrumentation.into();
209 }
210
211 fn set_prepared_statement_cache_size(&mut self, size: CacheSize) {
212 self.statement_cache.set_cache_size(size);
213 }
214}
215
216#[inline(always)]
217fn update_transaction_manager_status<T>(
218 query_result: QueryResult<T>,
219 transaction_manager: &mut AnsiTransactionManager,
220 instrumentation: &mut DynInstrumentation,
221 query: &dyn DebugQuery,
222) -> QueryResult<T> {
223 fn non_generic_inner(
224 query_result: Result<(), &Error>,
225 transaction_manager: &mut AnsiTransactionManager,
226 instrumentation: &mut DynInstrumentation,
227 query: &dyn DebugQuery,
228 ) {
229 if let Err(Error::DatabaseError(DatabaseErrorKind::SerializationFailure, _)) = query_result
230 {
231 transaction_manager
232 .status
233 .set_requires_rollback_maybe_up_to_top_level(true)
234 }
235 instrumentation.on_connection_event(InstrumentationEvent::FinishQuery {
236 query,
237 error: query_result.err(),
238 });
239 }
240
241 non_generic_inner(
242 query_result.as_ref().map(|_| ()),
243 transaction_manager,
244 instrumentation,
245 query,
246 );
247 query_result
248}
249
250impl LoadConnection<DefaultLoadingMode> for MysqlConnection {
251 type Cursor<'conn, 'query> = self::stmt::iterator::StatementIterator<'conn>;
252 type Row<'conn, 'query> = self::stmt::iterator::MysqlRow;
253
254 fn load<'conn, 'query, T>(
255 &'conn mut self,
256 source: T,
257 ) -> QueryResult<Self::Cursor<'conn, 'query>>
258 where
259 T: Query + QueryFragment<Self::Backend> + QueryId + 'query,
260 Self::Backend: QueryMetadata<T::SqlType>,
261 {
262 update_transaction_manager_status(
263 prepared_query(
264 &source,
265 &mut self.statement_cache,
266 &mut self.raw_connection,
267 &mut *self.instrumentation,
268 )
269 .and_then(|stmt| {
270 let mut metadata = Vec::new();
271 Mysql::row_metadata(&mut (), &mut metadata);
272 StatementIterator::from_stmt(stmt, &metadata)
273 }),
274 &mut self.transaction_state,
275 &mut self.instrumentation,
276 &crate::debug_query(&source),
277 )
278 }
279}
280
#[cfg(feature = "r2d2")]
impl crate::r2d2::R2D2Connection for MysqlConnection {
    /// Executes the pool's `CheckConnectionQuery` and discards the row count.
    fn ping(&mut self) -> QueryResult<()> {
        let _affected = crate::r2d2::CheckConnectionQuery.execute(self)?;
        Ok(())
    }

    /// Delegates to the transaction manager's own broken-state check.
    fn is_broken(&mut self) -> bool {
        AnsiTransactionManager::is_broken_transaction_manager(self)
    }
}
291
impl MultiConnectionHelper for MysqlConnection {
    /// Returns the backend's metadata lookup as a type-erased `dyn Any`.
    fn to_any<'a>(
        lookup: &mut <Self::Backend as crate::sql_types::TypeMetadata>::MetadataLookup,
    ) -> &mut (dyn core::any::Any + 'a) {
        lookup
    }

    /// Attempts to downcast a type-erased value to the backend's metadata
    /// lookup; yields `None` when the value has a different type.
    fn from_any(
        lookup: &mut dyn core::any::Any,
    ) -> Option<&mut <Self::Backend as crate::sql_types::TypeMetadata>::MetadataLookup> {
        lookup.downcast_mut()
    }
}
305
306fn prepared_query<'a, T: QueryFragment<Mysql> + QueryId>(
307 source: &'_ T,
308 statement_cache: &'a mut StatementCache<Mysql, Statement>,
309 raw_connection: &'a mut RawConnection,
310 instrumentation: &mut dyn Instrumentation,
311) -> QueryResult<MaybeCached<'a, Statement>> {
312 instrumentation.on_connection_event(InstrumentationEvent::StartQuery {
313 query: &crate::debug_query(source),
314 });
315 let mut stmt = statement_cache.cached_statement(
316 source,
317 &Mysql,
318 &[],
319 &*raw_connection,
320 RawConnection::prepare,
321 instrumentation,
322 )?;
323
324 let mut bind_collector = RawBytesBindCollector::new();
325 source.collect_binds(&mut bind_collector, &mut (), &Mysql)?;
326 let binds = bind_collector
327 .metadata
328 .into_iter()
329 .zip(bind_collector.binds);
330 stmt.bind(binds)?;
331 Ok(stmt)
332}
333
334impl MysqlConnection {
335 fn set_config_options(&mut self) -> QueryResult<()> {
336 crate::sql_query("SET time_zone = '+00:00';").execute(self)?;
337 crate::sql_query("SET character_set_client = 'utf8mb4'").execute(self)?;
338 crate::sql_query("SET character_set_connection = 'utf8mb4'").execute(self)?;
339 crate::sql_query("SET character_set_results = 'utf8mb4'").execute(self)?;
340 Ok(())
341 }
342
343 fn establish_inner(database_url: &str) -> Result<MysqlConnection, ConnectionError> {
344 use crate::ConnectionError::CouldntSetupConfiguration;
345
346 let raw_connection = RawConnection::new();
347 let connection_options = ConnectionOptions::parse(database_url)?;
348 raw_connection.connect(&connection_options)?;
349 let mut conn = MysqlConnection {
350 raw_connection,
351 transaction_state: AnsiTransactionManager::default(),
352 statement_cache: StatementCache::new(),
353 instrumentation: DynInstrumentation::none(),
354 };
355 conn.set_config_options()
356 .map_err(CouldntSetupConfiguration)?;
357 Ok(conn)
358 }
359}
360
#[cfg(test)]
mod tests {
    extern crate dotenvy;

    use super::*;
    use std::env;

    /// Connects to the database named by the first of
    /// `MYSQL_UNIT_TEST_DATABASE_URL`, `MYSQL_DATABASE_URL` or `DATABASE_URL`
    /// that can be read, after loading a `.env` file if one is present.
    fn connection() -> MysqlConnection {
        dotenvy::dotenv().ok();
        let database_url = env::var("MYSQL_UNIT_TEST_DATABASE_URL")
            .or_else(|_| env::var("MYSQL_DATABASE_URL"))
            .or_else(|_| env::var("DATABASE_URL"))
            .expect("DATABASE_URL must be set in order to run unit tests");
        MysqlConnection::establish(&database_url).unwrap()
    }

    // Each scenario below is run twice on the same connection, so the second
    // round also checks that the connection is still usable after the first.

    #[diesel_test_helper::test]
    fn batch_execute_handles_single_queries_with_results() {
        let mut conn = connection();
        for _ in 0..2 {
            assert!(conn.batch_execute("SELECT 1").is_ok());
        }
    }

    #[diesel_test_helper::test]
    fn batch_execute_handles_multi_queries_with_results() {
        let mut conn = connection();
        let query = "SELECT 1; SELECT 2; SELECT 3;";
        for _ in 0..2 {
            assert!(conn.batch_execute(query).is_ok());
        }
    }

    #[diesel_test_helper::test]
    fn execute_handles_queries_which_return_results() {
        let mut conn = connection();
        for _ in 0..2 {
            assert!(crate::sql_query("SELECT 1").execute(&mut conn).is_ok());
        }
    }

    #[diesel_test_helper::test]
    fn check_client_found_rows_flag() {
        let conn = &mut crate::test_helpers::connection();

        let setup = [
            "DROP TABLE IF EXISTS update_test CASCADE",
            "CREATE TABLE update_test(id INTEGER PRIMARY KEY, num INTEGER NOT NULL)",
            "INSERT INTO update_test(id, num) VALUES (1, 5)",
        ];
        for statement in setup {
            crate::sql_query(statement).execute(conn).unwrap();
        }

        // The update writes the value the row already holds; the test expects
        // the row to be counted regardless.
        let output = crate::sql_query("UPDATE update_test SET num = 5 WHERE id = 1")
            .execute(conn)
            .unwrap();

        assert_eq!(output, 1);
    }
}