1mod bind;
2mod raw;
3mod stmt;
4mod url;
5
6use self::raw::RawConnection;
7use self::stmt::iterator::StatementIterator;
8use self::stmt::Statement;
9use self::url::ConnectionOptions;
10use super::backend::Mysql;
11use crate::connection::instrumentation::{DebugQuery, DynInstrumentation, StrQueryHelper};
12use crate::connection::statement_cache::{MaybeCached, StatementCache};
13use crate::connection::*;
14use crate::expression::QueryMetadata;
15use crate::query_builder::bind_collector::RawBytesBindCollector;
16use crate::query_builder::*;
17use crate::result::*;
18use crate::RunQueryDsl;
19
#[cfg(feature = "mysql")]
#[allow(missing_debug_implementations, missing_copy_implementations)]
/// A connection to a MySQL database.
///
/// Bundles the raw client handle with the per-connection bookkeeping that the
/// trait implementations in this module rely on: transaction tracking, a
/// prepared-statement cache and the instrumentation hook.
pub struct MysqlConnection {
    /// Low-level handle used to connect, prepare statements and run raw SQL.
    raw_connection: RawConnection,
    /// Transaction bookkeeping handed out by `Connection::transaction_state`.
    transaction_state: AnsiTransactionManager,
    /// Cache of prepared `Statement`s; its size is adjusted through
    /// `Connection::set_prepared_statement_cache_size`.
    statement_cache: StatementCache<Mysql, Statement>,
    /// Receiver of connection events (query start/finish, connection setup).
    instrumentation: DynInstrumentation,
}
116
// NOTE(review): `Send` is asserted by hand here instead of being derived by the
// compiler. The soundness argument is not visible in this file; presumably it
// rests on the threading guarantees of the underlying MySQL client library for
// a connection handle that is moved between threads — confirm before changing
// any field of `MysqlConnection`.
#[allow(unsafe_code)]
unsafe impl Send for MysqlConnection {}
120
121impl SimpleConnection for MysqlConnection {
122 fn batch_execute(&mut self, query: &str) -> QueryResult<()> {
123 self.instrumentation
124 .on_connection_event(InstrumentationEvent::StartQuery {
125 query: &StrQueryHelper::new(query),
126 });
127 let r = self
128 .raw_connection
129 .enable_multi_statements(|| self.raw_connection.execute(query));
130 self.instrumentation
131 .on_connection_event(InstrumentationEvent::FinishQuery {
132 query: &StrQueryHelper::new(query),
133 error: r.as_ref().err(),
134 });
135 r
136 }
137}
138
// Marker implementation with no items.
// NOTE(review): presumably `ConnectionSealed` is the sealing trait that gates
// implementing `Connection` for this type — confirm against its definition.
impl ConnectionSealed for MysqlConnection {}
140
141impl Connection for MysqlConnection {
142 type Backend = Mysql;
143 type TransactionManager = AnsiTransactionManager;
144
145 fn establish(database_url: &str) -> ConnectionResult<Self> {
159 let mut instrumentation = DynInstrumentation::default_instrumentation();
160 instrumentation.on_connection_event(InstrumentationEvent::StartEstablishConnection {
161 url: database_url,
162 });
163
164 let establish_result = Self::establish_inner(database_url);
165 instrumentation.on_connection_event(InstrumentationEvent::FinishEstablishConnection {
166 url: database_url,
167 error: establish_result.as_ref().err(),
168 });
169 let mut conn = establish_result?;
170 conn.instrumentation = instrumentation;
171 Ok(conn)
172 }
173
174 fn execute_returning_count<T>(&mut self, source: &T) -> QueryResult<usize>
175 where
176 T: QueryFragment<Self::Backend> + QueryId,
177 {
178 #[allow(unsafe_code)] update_transaction_manager_status(
180 prepared_query(
181 &source,
182 &mut self.statement_cache,
183 &mut self.raw_connection,
184 &mut *self.instrumentation,
185 )
186 .and_then(|stmt| {
187 let stmt_use = unsafe { stmt.execute() }?;
190 stmt_use.affected_rows()
191 }),
192 &mut self.transaction_state,
193 &mut self.instrumentation,
194 &crate::debug_query(source),
195 )
196 }
197
198 fn transaction_state(&mut self) -> &mut AnsiTransactionManager {
199 &mut self.transaction_state
200 }
201
202 fn instrumentation(&mut self) -> &mut dyn Instrumentation {
203 &mut *self.instrumentation
204 }
205
206 fn set_instrumentation(&mut self, instrumentation: impl Instrumentation) {
207 self.instrumentation = instrumentation.into();
208 }
209
210 fn set_prepared_statement_cache_size(&mut self, size: CacheSize) {
211 self.statement_cache.set_cache_size(size);
212 }
213}
214
215#[inline(always)]
216fn update_transaction_manager_status<T>(
217 query_result: QueryResult<T>,
218 transaction_manager: &mut AnsiTransactionManager,
219 instrumentation: &mut DynInstrumentation,
220 query: &dyn DebugQuery,
221) -> QueryResult<T> {
222 fn non_generic_inner(
223 query_result: Result<(), &Error>,
224 transaction_manager: &mut AnsiTransactionManager,
225 instrumentation: &mut DynInstrumentation,
226 query: &dyn DebugQuery,
227 ) {
228 if let Err(Error::DatabaseError(DatabaseErrorKind::SerializationFailure, _)) = query_result
229 {
230 transaction_manager
231 .status
232 .set_requires_rollback_maybe_up_to_top_level(true)
233 }
234 instrumentation.on_connection_event(InstrumentationEvent::FinishQuery {
235 query,
236 error: query_result.err(),
237 });
238 }
239
240 non_generic_inner(
241 query_result.as_ref().map(|_| ()),
242 transaction_manager,
243 instrumentation,
244 query,
245 );
246 query_result
247}
248
249impl LoadConnection<DefaultLoadingMode> for MysqlConnection {
250 type Cursor<'conn, 'query> = self::stmt::iterator::StatementIterator<'conn>;
251 type Row<'conn, 'query> = self::stmt::iterator::MysqlRow;
252
253 fn load<'conn, 'query, T>(
254 &'conn mut self,
255 source: T,
256 ) -> QueryResult<Self::Cursor<'conn, 'query>>
257 where
258 T: Query + QueryFragment<Self::Backend> + QueryId + 'query,
259 Self::Backend: QueryMetadata<T::SqlType>,
260 {
261 update_transaction_manager_status(
262 prepared_query(
263 &source,
264 &mut self.statement_cache,
265 &mut self.raw_connection,
266 &mut *self.instrumentation,
267 )
268 .and_then(|stmt| {
269 let mut metadata = Vec::new();
270 Mysql::row_metadata(&mut (), &mut metadata);
271 StatementIterator::from_stmt(stmt, &metadata)
272 }),
273 &mut self.transaction_state,
274 &mut self.instrumentation,
275 &crate::debug_query(&source),
276 )
277 }
278}
279
#[cfg(feature = "r2d2")]
impl crate::r2d2::R2D2Connection for MysqlConnection {
    /// Runs the pool's check query on this connection, discarding the row
    /// count and surfacing only success or failure.
    fn ping(&mut self) -> QueryResult<()> {
        crate::r2d2::CheckConnectionQuery.execute(self)?;
        Ok(())
    }

    /// Delegates to the transaction manager to decide whether this connection
    /// must be considered broken.
    fn is_broken(&mut self) -> bool {
        AnsiTransactionManager::is_broken_transaction_manager(self)
    }
}
290
291impl MultiConnectionHelper for MysqlConnection {
292 fn to_any<'a>(
293 lookup: &mut <Self::Backend as crate::sql_types::TypeMetadata>::MetadataLookup,
294 ) -> &mut (dyn std::any::Any + 'a) {
295 lookup
296 }
297
298 fn from_any(
299 lookup: &mut dyn std::any::Any,
300 ) -> Option<&mut <Self::Backend as crate::sql_types::TypeMetadata>::MetadataLookup> {
301 lookup.downcast_mut()
302 }
303}
304
305fn prepared_query<'a, T: QueryFragment<Mysql> + QueryId>(
306 source: &'_ T,
307 statement_cache: &'a mut StatementCache<Mysql, Statement>,
308 raw_connection: &'a mut RawConnection,
309 instrumentation: &mut dyn Instrumentation,
310) -> QueryResult<MaybeCached<'a, Statement>> {
311 instrumentation.on_connection_event(InstrumentationEvent::StartQuery {
312 query: &crate::debug_query(source),
313 });
314 let mut stmt = statement_cache.cached_statement(
315 source,
316 &Mysql,
317 &[],
318 &*raw_connection,
319 RawConnection::prepare,
320 instrumentation,
321 )?;
322
323 let mut bind_collector = RawBytesBindCollector::new();
324 source.collect_binds(&mut bind_collector, &mut (), &Mysql)?;
325 let binds = bind_collector
326 .metadata
327 .into_iter()
328 .zip(bind_collector.binds);
329 stmt.bind(binds)?;
330 Ok(stmt)
331}
332
333impl MysqlConnection {
334 fn set_config_options(&mut self) -> QueryResult<()> {
335 crate::sql_query("SET time_zone = '+00:00';").execute(self)?;
336 crate::sql_query("SET character_set_client = 'utf8mb4'").execute(self)?;
337 crate::sql_query("SET character_set_connection = 'utf8mb4'").execute(self)?;
338 crate::sql_query("SET character_set_results = 'utf8mb4'").execute(self)?;
339 Ok(())
340 }
341
342 fn establish_inner(database_url: &str) -> Result<MysqlConnection, ConnectionError> {
343 use crate::ConnectionError::CouldntSetupConfiguration;
344
345 let raw_connection = RawConnection::new();
346 let connection_options = ConnectionOptions::parse(database_url)?;
347 raw_connection.connect(&connection_options)?;
348 let mut conn = MysqlConnection {
349 raw_connection,
350 transaction_state: AnsiTransactionManager::default(),
351 statement_cache: StatementCache::new(),
352 instrumentation: DynInstrumentation::none(),
353 };
354 conn.set_config_options()
355 .map_err(CouldntSetupConfiguration)?;
356 Ok(conn)
357 }
358}
359
#[cfg(test)]
mod tests {
    extern crate dotenvy;

    use super::*;
    use std::env;

    /// Connects to the database named by the first of
    /// `MYSQL_UNIT_TEST_DATABASE_URL`, `MYSQL_DATABASE_URL` or `DATABASE_URL`
    /// that is set, after loading a `.env` file if one is present.
    fn connection() -> MysqlConnection {
        dotenvy::dotenv().ok();
        let database_url = env::var("MYSQL_UNIT_TEST_DATABASE_URL")
            .or_else(|_| env::var("MYSQL_DATABASE_URL"))
            .or_else(|_| env::var("DATABASE_URL"))
            .expect("DATABASE_URL must be set in order to run unit tests");
        MysqlConnection::establish(&database_url).unwrap()
    }

    // Each batch is run twice to check the connection is still usable after a
    // query that produced a result set.
    #[diesel_test_helper::test]
    fn batch_execute_handles_single_queries_with_results() {
        let mut conn = connection();
        for _ in 0..2 {
            assert!(conn.batch_execute("SELECT 1").is_ok());
        }
    }

    #[diesel_test_helper::test]
    fn batch_execute_handles_multi_queries_with_results() {
        let mut conn = connection();
        let query = "SELECT 1; SELECT 2; SELECT 3;";
        for _ in 0..2 {
            assert!(conn.batch_execute(query).is_ok());
        }
    }

    #[diesel_test_helper::test]
    fn execute_handles_queries_which_return_results() {
        let mut conn = connection();
        for _ in 0..2 {
            assert!(crate::sql_query("SELECT 1").execute(&mut conn).is_ok());
        }
    }

    // Updating a row to the value it already holds must still count as one
    // affected row.
    #[diesel_test_helper::test]
    fn check_client_found_rows_flag() {
        let conn = &mut crate::test_helpers::connection();

        let setup = [
            "DROP TABLE IF EXISTS update_test CASCADE",
            "CREATE TABLE update_test(id INTEGER PRIMARY KEY, num INTEGER NOT NULL)",
            "INSERT INTO update_test(id, num) VALUES (1, 5)",
        ];
        for statement in setup {
            crate::sql_query(statement).execute(conn).unwrap();
        }

        let affected = crate::sql_query("UPDATE update_test SET num = 5 WHERE id = 1")
            .execute(conn)
            .unwrap();

        assert_eq!(affected, 1);
    }
}