// diesel/mysql/connection/mod.rs
1mod bind;
2mod raw;
3mod stmt;
4mod url;
5
6use self::raw::RawConnection;
7use self::stmt::iterator::StatementIterator;
8use self::stmt::Statement;
9use self::url::ConnectionOptions;
10use super::backend::Mysql;
11use crate::connection::instrumentation::{DebugQuery, DynInstrumentation, StrQueryHelper};
12use crate::connection::statement_cache::{MaybeCached, StatementCache};
13use crate::connection::*;
14use crate::expression::QueryMetadata;
15use crate::query_builder::bind_collector::RawBytesBindCollector;
16use crate::query_builder::*;
17use crate::result::*;
18use crate::RunQueryDsl;
19
/// A connection to a MySQL database.
///
/// Bundles the raw client connection with the per-connection state used by
/// the trait implementations in this module: the transaction manager, the
/// prepared statement cache and the instrumentation hooks.
#[cfg(feature = "mysql")]
// This type provides neither a `Debug` nor a `Copy` implementation, so the
// corresponding lints are silenced for it.
#[allow(missing_debug_implementations, missing_copy_implementations)]
pub struct MysqlConnection {
    /// Handle used to connect to the server and to execute/prepare queries.
    raw_connection: RawConnection,
    /// Transaction bookkeeping, handed out by `Connection::transaction_state`.
    transaction_state: AnsiTransactionManager,
    /// Prepared statements reused by `prepared_query`.
    statement_cache: StatementCache<Mysql, Statement>,
    /// Receiver of the connection events emitted around connecting and querying.
    instrumentation: DynInstrumentation,
}
115
116#[allow(unsafe_code)]
118unsafe impl Send for MysqlConnection {}
119
120impl SimpleConnection for MysqlConnection {
121 fn batch_execute(&mut self, query: &str) -> QueryResult<()> {
122 self.instrumentation
123 .on_connection_event(InstrumentationEvent::StartQuery {
124 query: &StrQueryHelper::new(query),
125 });
126 let r = self
127 .raw_connection
128 .enable_multi_statements(|| self.raw_connection.execute(query));
129 self.instrumentation
130 .on_connection_event(InstrumentationEvent::FinishQuery {
131 query: &StrQueryHelper::new(query),
132 error: r.as_ref().err(),
133 });
134 r
135 }
136}
137
138impl ConnectionSealed for MysqlConnection {}
139
140impl Connection for MysqlConnection {
141 type Backend = Mysql;
142 type TransactionManager = AnsiTransactionManager;
143
144 fn establish(database_url: &str) -> ConnectionResult<Self> {
158 let mut instrumentation = DynInstrumentation::default_instrumentation();
159 instrumentation.on_connection_event(InstrumentationEvent::StartEstablishConnection {
160 url: database_url,
161 });
162
163 let establish_result = Self::establish_inner(database_url);
164 instrumentation.on_connection_event(InstrumentationEvent::FinishEstablishConnection {
165 url: database_url,
166 error: establish_result.as_ref().err(),
167 });
168 let mut conn = establish_result?;
169 conn.instrumentation = instrumentation;
170 Ok(conn)
171 }
172
173 fn execute_returning_count<T>(&mut self, source: &T) -> QueryResult<usize>
174 where
175 T: QueryFragment<Self::Backend> + QueryId,
176 {
177 #[allow(unsafe_code)] update_transaction_manager_status(
179 prepared_query(
180 &source,
181 &mut self.statement_cache,
182 &mut self.raw_connection,
183 &mut *self.instrumentation,
184 )
185 .and_then(|stmt| {
186 let stmt_use = unsafe { stmt.execute() }?;
189 stmt_use.affected_rows()
190 }),
191 &mut self.transaction_state,
192 &mut self.instrumentation,
193 &crate::debug_query(source),
194 )
195 }
196
197 fn transaction_state(&mut self) -> &mut AnsiTransactionManager {
198 &mut self.transaction_state
199 }
200
201 fn instrumentation(&mut self) -> &mut dyn Instrumentation {
202 &mut *self.instrumentation
203 }
204
205 fn set_instrumentation(&mut self, instrumentation: impl Instrumentation) {
206 self.instrumentation = instrumentation.into();
207 }
208
209 fn set_prepared_statement_cache_size(&mut self, size: CacheSize) {
210 self.statement_cache.set_cache_size(size);
211 }
212}
213
214#[inline(always)]
215fn update_transaction_manager_status<T>(
216 query_result: QueryResult<T>,
217 transaction_manager: &mut AnsiTransactionManager,
218 instrumentation: &mut DynInstrumentation,
219 query: &dyn DebugQuery,
220) -> QueryResult<T> {
221 if let Err(Error::DatabaseError(DatabaseErrorKind::SerializationFailure, _)) = query_result {
222 transaction_manager
223 .status
224 .set_requires_rollback_maybe_up_to_top_level(true)
225 }
226 instrumentation.on_connection_event(InstrumentationEvent::FinishQuery {
227 query,
228 error: query_result.as_ref().err(),
229 });
230 query_result
231}
232
233impl LoadConnection<DefaultLoadingMode> for MysqlConnection {
234 type Cursor<'conn, 'query> = self::stmt::iterator::StatementIterator<'conn>;
235 type Row<'conn, 'query> = self::stmt::iterator::MysqlRow;
236
237 fn load<'conn, 'query, T>(
238 &'conn mut self,
239 source: T,
240 ) -> QueryResult<Self::Cursor<'conn, 'query>>
241 where
242 T: Query + QueryFragment<Self::Backend> + QueryId + 'query,
243 Self::Backend: QueryMetadata<T::SqlType>,
244 {
245 update_transaction_manager_status(
246 prepared_query(
247 &source,
248 &mut self.statement_cache,
249 &mut self.raw_connection,
250 &mut *self.instrumentation,
251 )
252 .and_then(|stmt| {
253 let mut metadata = Vec::new();
254 Mysql::row_metadata(&mut (), &mut metadata);
255 StatementIterator::from_stmt(stmt, &metadata)
256 }),
257 &mut self.transaction_state,
258 &mut self.instrumentation,
259 &crate::debug_query(&source),
260 )
261 }
262}
263
#[cfg(feature = "r2d2")]
impl crate::r2d2::R2D2Connection for MysqlConnection {
    /// Runs the pool's check query on this connection, discarding the
    /// returned row count and keeping only success or failure.
    fn ping(&mut self) -> QueryResult<()> {
        crate::r2d2::CheckConnectionQuery.execute(self).map(|_| ())
    }

    /// Reports whether the transaction manager considers this connection broken.
    fn is_broken(&mut self) -> bool {
        AnsiTransactionManager::is_broken_transaction_manager(self)
    }
}
274
275impl MultiConnectionHelper for MysqlConnection {
276 fn to_any<'a>(
277 lookup: &mut <Self::Backend as crate::sql_types::TypeMetadata>::MetadataLookup,
278 ) -> &mut (dyn std::any::Any + 'a) {
279 lookup
280 }
281
282 fn from_any(
283 lookup: &mut dyn std::any::Any,
284 ) -> Option<&mut <Self::Backend as crate::sql_types::TypeMetadata>::MetadataLookup> {
285 lookup.downcast_mut()
286 }
287}
288
289fn prepared_query<'a, T: QueryFragment<Mysql> + QueryId>(
290 source: &'_ T,
291 statement_cache: &'a mut StatementCache<Mysql, Statement>,
292 raw_connection: &'a mut RawConnection,
293 instrumentation: &mut dyn Instrumentation,
294) -> QueryResult<MaybeCached<'a, Statement>> {
295 instrumentation.on_connection_event(InstrumentationEvent::StartQuery {
296 query: &crate::debug_query(source),
297 });
298 let mut stmt = statement_cache.cached_statement(
299 source,
300 &Mysql,
301 &[],
302 &*raw_connection,
303 RawConnection::prepare,
304 instrumentation,
305 )?;
306
307 let mut bind_collector = RawBytesBindCollector::new();
308 source.collect_binds(&mut bind_collector, &mut (), &Mysql)?;
309 let binds = bind_collector
310 .metadata
311 .into_iter()
312 .zip(bind_collector.binds);
313 stmt.bind(binds)?;
314 Ok(stmt)
315}
316
317impl MysqlConnection {
318 fn set_config_options(&mut self) -> QueryResult<()> {
319 crate::sql_query("SET time_zone = '+00:00';").execute(self)?;
320 crate::sql_query("SET character_set_client = 'utf8mb4'").execute(self)?;
321 crate::sql_query("SET character_set_connection = 'utf8mb4'").execute(self)?;
322 crate::sql_query("SET character_set_results = 'utf8mb4'").execute(self)?;
323 Ok(())
324 }
325
326 fn establish_inner(database_url: &str) -> Result<MysqlConnection, ConnectionError> {
327 use crate::ConnectionError::CouldntSetupConfiguration;
328
329 let raw_connection = RawConnection::new();
330 let connection_options = ConnectionOptions::parse(database_url)?;
331 raw_connection.connect(&connection_options)?;
332 let mut conn = MysqlConnection {
333 raw_connection,
334 transaction_state: AnsiTransactionManager::default(),
335 statement_cache: StatementCache::new(),
336 instrumentation: DynInstrumentation::none(),
337 };
338 conn.set_config_options()
339 .map_err(CouldntSetupConfiguration)?;
340 Ok(conn)
341 }
342}
343
#[cfg(test)]
mod tests {
    extern crate dotenvy;

    use super::*;
    use std::env;

    /// Opens a connection for the tests in this module.
    ///
    /// The URL is taken from the first of `MYSQL_UNIT_TEST_DATABASE_URL`,
    /// `MYSQL_DATABASE_URL` and `DATABASE_URL` that is set (a `.env` file is
    /// loaded first, if present). Panics when none is set or connecting fails.
    fn connection() -> MysqlConnection {
        dotenvy::dotenv().ok();
        let database_url = env::var("MYSQL_UNIT_TEST_DATABASE_URL")
            .or_else(|_| env::var("MYSQL_DATABASE_URL"))
            .or_else(|_| env::var("DATABASE_URL"))
            .expect("DATABASE_URL must be set in order to run unit tests");
        MysqlConnection::establish(&database_url).unwrap()
    }

    // Each query is run twice so that a second use of the same connection
    // after a result-producing query is covered as well.
    #[diesel_test_helper::test]
    fn batch_execute_handles_single_queries_with_results() {
        let connection = &mut connection();
        assert!(connection.batch_execute("SELECT 1").is_ok());
        assert!(connection.batch_execute("SELECT 1").is_ok());
    }

    #[diesel_test_helper::test]
    fn batch_execute_handles_multi_queries_with_results() {
        let connection = &mut connection();
        let query = "SELECT 1; SELECT 2; SELECT 3;";
        assert!(connection.batch_execute(query).is_ok());
        assert!(connection.batch_execute(query).is_ok());
    }

    #[diesel_test_helper::test]
    fn execute_handles_queries_which_return_results() {
        let connection = &mut connection();
        assert!(crate::sql_query("SELECT 1").execute(connection).is_ok());
        assert!(crate::sql_query("SELECT 1").execute(connection).is_ok());
    }

    #[diesel_test_helper::test]
    fn check_client_found_rows_flag() {
        let conn = &mut crate::test_helpers::connection();
        crate::sql_query("DROP TABLE IF EXISTS update_test CASCADE")
            .execute(conn)
            .unwrap();

        crate::sql_query("CREATE TABLE update_test(id INTEGER PRIMARY KEY, num INTEGER NOT NULL)")
            .execute(conn)
            .unwrap();

        crate::sql_query("INSERT INTO update_test(id, num) VALUES (1, 5)")
            .execute(conn)
            .unwrap();

        // The UPDATE writes the value the row already holds; the row must
        // still be counted, i.e. the count reflects matched rows.
        let output = crate::sql_query("UPDATE update_test SET num = 5 WHERE id = 1")
            .execute(conn)
            .unwrap();

        assert_eq!(output, 1);
    }
}