1#![warn(missing_docs)]
40#![doc(html_root_url = "https://docs.rs/r2d2/0.8")]
41
42use log::error;
43
44use parking_lot::{Condvar, Mutex, MutexGuard};
45use std::cmp;
46use std::error;
47use std::fmt;
48use std::mem;
49use std::ops::{Deref, DerefMut};
50use std::sync::atomic::{AtomicUsize, Ordering};
51use std::sync::{Arc, Weak};
52use std::time::{Duration, Instant};
53
54pub use crate::config::Builder;
55use crate::config::Config;
56use crate::event::{AcquireEvent, CheckinEvent, CheckoutEvent, ReleaseEvent, TimeoutEvent};
57pub use crate::event::{HandleEvent, NopEventHandler};
58pub use crate::extensions::Extensions;
59
60mod config;
61pub mod event;
62mod extensions;
63
64#[cfg(test)]
65mod test;
66
67static CONNECTION_ID: AtomicUsize = AtomicUsize::new(0);
68
69pub trait ManageConnection: Send + Sync + 'static {
71 type Connection: Send + 'static;
73
74 type Error: error::Error + 'static;
76
77 fn connect(&self) -> Result<Self::Connection, Self::Error>;
79
80 fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error>;
85
86 fn has_broken(&self, conn: &mut Self::Connection) -> bool;
96}
97
98pub trait HandleError<E>: fmt::Debug + Send + Sync + 'static {
100 fn handle_error(&self, error: E);
102}
103
104#[derive(Copy, Clone, Debug)]
106pub struct NopErrorHandler;
107
108impl<E> HandleError<E> for NopErrorHandler {
109 fn handle_error(&self, _: E) {}
110}
111
112#[derive(Copy, Clone, Debug)]
114pub struct LoggingErrorHandler;
115
116impl<E> HandleError<E> for LoggingErrorHandler
117where
118 E: error::Error,
119{
120 fn handle_error(&self, error: E) {
121 error!("{}", error);
122 }
123}
124
125pub trait CustomizeConnection<C, E>: fmt::Debug + Send + Sync + 'static {
127 #[allow(unused_variables)]
136 fn on_acquire(&self, conn: &mut C) -> Result<(), E> {
137 Ok(())
138 }
139
140 #[allow(unused_variables)]
147 fn on_release(&self, conn: C) {}
148}
149
150#[derive(Copy, Clone, Debug)]
152pub struct NopConnectionCustomizer;
153
154impl<C, E> CustomizeConnection<C, E> for NopConnectionCustomizer {}
155
156struct Conn<C> {
157 conn: C,
158 extensions: Extensions,
159 birth: Instant,
160 id: u64,
161}
162
163struct IdleConn<C> {
164 conn: Conn<C>,
165 idle_start: Instant,
166}
167
168struct PoolInternals<C> {
169 conns: Vec<IdleConn<C>>,
170 num_conns: u32,
171 pending_conns: u32,
172 last_error: Option<String>,
173}
174
175struct SharedPool<M>
176where
177 M: ManageConnection,
178{
179 config: Config<M::Connection, M::Error>,
180 manager: M,
181 internals: Mutex<PoolInternals<M::Connection>>,
182 cond: Condvar,
183}
184
185fn drop_conns<M>(
186 shared: &Arc<SharedPool<M>>,
187 mut internals: MutexGuard<PoolInternals<M::Connection>>,
188 conns: Vec<Conn<M::Connection>>,
189) where
190 M: ManageConnection,
191{
192 internals.num_conns -= conns.len() as u32;
193 establish_idle_connections(shared, &mut internals);
194 drop(internals); for conn in conns {
197 let event = ReleaseEvent {
198 id: conn.id,
199 age: conn.birth.elapsed(),
200 };
201 shared.config.event_handler.handle_release(event);
202 shared.config.connection_customizer.on_release(conn.conn);
203 }
204}
205
206fn establish_idle_connections<M>(
207 shared: &Arc<SharedPool<M>>,
208 internals: &mut PoolInternals<M::Connection>,
209) where
210 M: ManageConnection,
211{
212 let min = shared.config.min_idle.unwrap_or(shared.config.max_size);
213 let idle = internals.conns.len() as u32;
214 for _ in idle..min {
215 add_connection(shared, internals);
216 }
217}
218
219fn add_connection<M>(shared: &Arc<SharedPool<M>>, internals: &mut PoolInternals<M::Connection>)
220where
221 M: ManageConnection,
222{
223 if internals.num_conns + internals.pending_conns >= shared.config.max_size {
224 return;
225 }
226
227 internals.pending_conns += 1;
228 inner(Duration::from_secs(0), shared);
229
230 fn inner<M>(delay: Duration, shared: &Arc<SharedPool<M>>)
231 where
232 M: ManageConnection,
233 {
234 let new_shared = Arc::downgrade(shared);
235 shared.config.thread_pool.execute_after(delay, move || {
236 let shared = match new_shared.upgrade() {
237 Some(shared) => shared,
238 None => return,
239 };
240
241 let conn = shared.manager.connect().and_then(|mut conn| {
242 shared
243 .config
244 .connection_customizer
245 .on_acquire(&mut conn)
246 .map(|_| conn)
247 });
248 match conn {
249 Ok(conn) => {
250 let id = CONNECTION_ID.fetch_add(1, Ordering::Relaxed) as u64;
251
252 let event = AcquireEvent { id };
253 shared.config.event_handler.handle_acquire(event);
254
255 let mut internals = shared.internals.lock();
256 internals.last_error = None;
257 let now = Instant::now();
258 let conn = IdleConn {
259 conn: Conn {
260 conn,
261 extensions: Extensions::new(),
262 birth: now,
263 id,
264 },
265 idle_start: now,
266 };
267 internals.conns.push(conn);
268 internals.pending_conns -= 1;
269 internals.num_conns += 1;
270 shared.cond.notify_one();
271 }
272 Err(err) => {
273 shared.internals.lock().last_error = Some(err.to_string());
274 shared.config.error_handler.handle_error(err);
275 let delay = cmp::max(Duration::from_millis(200), delay);
276 let delay = cmp::min(shared.config.connection_timeout / 2, delay * 2);
277 inner(delay, &shared);
278 }
279 }
280 });
281 }
282}
283
284fn reap_connections<M>(shared: &Weak<SharedPool<M>>)
285where
286 M: ManageConnection,
287{
288 let shared = match shared.upgrade() {
289 Some(shared) => shared,
290 None => return,
291 };
292
293 let mut old = Vec::with_capacity(shared.config.max_size as usize);
294 let mut to_drop = vec![];
295
296 let mut internals = shared.internals.lock();
297 mem::swap(&mut old, &mut internals.conns);
298 let now = Instant::now();
299 for conn in old {
300 let mut reap = false;
301 if let Some(timeout) = shared.config.idle_timeout {
302 reap |= now - conn.idle_start >= timeout;
303 }
304 if let Some(lifetime) = shared.config.max_lifetime {
305 reap |= now - conn.conn.birth >= lifetime;
306 }
307 if reap {
308 to_drop.push(conn.conn);
309 } else {
310 internals.conns.push(conn);
311 }
312 }
313 drop_conns(&shared, internals, to_drop);
314}
315
316pub struct Pool<M>(Arc<SharedPool<M>>)
318where
319 M: ManageConnection;
320
321impl<M> Clone for Pool<M>
323where
324 M: ManageConnection,
325{
326 fn clone(&self) -> Pool<M> {
327 Pool(self.0.clone())
328 }
329}
330
331impl<M> fmt::Debug for Pool<M>
332where
333 M: ManageConnection + fmt::Debug,
334{
335 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
336 fmt.debug_struct("Pool")
337 .field("state", &self.state())
338 .field("config", &self.0.config)
339 .field("manager", &self.0.manager)
340 .finish()
341 }
342}
343
344impl<M> Pool<M>
345where
346 M: ManageConnection,
347{
348 pub fn new(manager: M) -> Result<Pool<M>, Error> {
350 Pool::builder().build(manager)
351 }
352
353 pub fn builder() -> Builder<M> {
355 Builder::new()
356 }
357
358 fn new_inner(
360 config: Config<M::Connection, M::Error>,
361 manager: M,
362 reaper_rate: Duration,
363 ) -> Pool<M> {
364 let internals = PoolInternals {
365 conns: Vec::with_capacity(config.max_size as usize),
366 num_conns: 0,
367 pending_conns: 0,
368 last_error: None,
369 };
370
371 let shared = Arc::new(SharedPool {
372 config,
373 manager,
374 internals: Mutex::new(internals),
375 cond: Condvar::new(),
376 });
377
378 establish_idle_connections(&shared, &mut shared.internals.lock());
379
380 if shared.config.max_lifetime.is_some() || shared.config.idle_timeout.is_some() {
381 let s = Arc::downgrade(&shared);
382 shared
383 .config
384 .thread_pool
385 .execute_at_fixed_rate(reaper_rate, reaper_rate, move || reap_connections(&s));
386 }
387
388 Pool(shared)
389 }
390
391 fn wait_for_initialization(&self) -> Result<(), Error> {
392 let end = Instant::now() + self.0.config.connection_timeout;
393 let mut internals = self.0.internals.lock();
394
395 let initial_size = self.0.config.min_idle.unwrap_or(self.0.config.max_size);
396
397 while internals.num_conns != initial_size {
398 if self.0.cond.wait_until(&mut internals, end).timed_out() {
399 return Err(Error(internals.last_error.take()));
400 }
401 }
402
403 Ok(())
404 }
405
406 pub fn get(&self) -> Result<PooledConnection<M>, Error> {
411 self.get_timeout(self.0.config.connection_timeout)
412 }
413
414 pub fn get_timeout(&self, timeout: Duration) -> Result<PooledConnection<M>, Error> {
419 let start = Instant::now();
420 let end = start + timeout;
421 let mut internals = self.0.internals.lock();
422
423 loop {
424 match self.try_get_inner(internals) {
425 Ok(conn) => {
426 let event = CheckoutEvent {
427 id: conn.conn.as_ref().unwrap().id,
428 duration: start.elapsed(),
429 };
430 self.0.config.event_handler.handle_checkout(event);
431 return Ok(conn);
432 }
433 Err(i) => internals = i,
434 }
435
436 add_connection(&self.0, &mut internals);
437
438 if self.0.cond.wait_until(&mut internals, end).timed_out() {
439 let event = TimeoutEvent { timeout };
440 self.0.config.event_handler.handle_timeout(event);
441
442 return Err(Error(internals.last_error.take()));
443 }
444 }
445 }
446
447 pub fn try_get(&self) -> Option<PooledConnection<M>> {
453 self.try_get_inner(self.0.internals.lock()).ok()
454 }
455
456 fn try_get_inner<'a>(
457 &'a self,
458 mut internals: MutexGuard<'a, PoolInternals<M::Connection>>,
459 ) -> Result<PooledConnection<M>, MutexGuard<'a, PoolInternals<M::Connection>>> {
460 loop {
461 if let Some(mut conn) = internals.conns.pop() {
462 establish_idle_connections(&self.0, &mut internals);
463 drop(internals);
464
465 if self.0.config.test_on_check_out {
466 if let Err(e) = self.0.manager.is_valid(&mut conn.conn.conn) {
467 let msg = e.to_string();
468 self.0.config.error_handler.handle_error(e);
469 internals = self.0.internals.lock();
471 internals.last_error = Some(msg);
472 drop_conns(&self.0, internals, vec![conn.conn]);
473 internals = self.0.internals.lock();
474 continue;
475 }
476 }
477
478 return Ok(PooledConnection {
479 pool: self.clone(),
480 checkout: Instant::now(),
481 conn: Some(conn.conn),
482 });
483 } else {
484 return Err(internals);
485 }
486 }
487 }
488
489 fn put_back(&self, checkout: Instant, mut conn: Conn<M::Connection>) {
490 let event = CheckinEvent {
491 id: conn.id,
492 duration: checkout.elapsed(),
493 };
494 self.0.config.event_handler.handle_checkin(event);
495
496 let broken = self.0.manager.has_broken(&mut conn.conn);
498
499 let mut internals = self.0.internals.lock();
500 if broken {
501 drop_conns(&self.0, internals, vec![conn]);
502 } else {
503 let conn = IdleConn {
504 conn,
505 idle_start: Instant::now(),
506 };
507 internals.conns.push(conn);
508 self.0.cond.notify_one();
509 }
510 }
511
512 pub fn state(&self) -> State {
514 let internals = self.0.internals.lock();
515 State {
516 connections: internals.num_conns,
517 idle_connections: internals.conns.len() as u32,
518 _p: (),
519 }
520 }
521
522 pub fn max_size(&self) -> u32 {
524 self.0.config.max_size
525 }
526
527 pub fn min_idle(&self) -> Option<u32> {
529 self.0.config.min_idle
530 }
531
532 pub fn test_on_check_out(&self) -> bool {
534 self.0.config.test_on_check_out
535 }
536
537 pub fn max_lifetime(&self) -> Option<Duration> {
539 self.0.config.max_lifetime
540 }
541
542 pub fn idle_timeout(&self) -> Option<Duration> {
544 self.0.config.idle_timeout
545 }
546
547 pub fn connection_timeout(&self) -> Duration {
549 self.0.config.connection_timeout
550 }
551}
552
553#[derive(Debug)]
555pub struct Error(Option<String>);
556
557impl fmt::Display for Error {
558 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
559 fmt.write_str(error::Error::description(self))?;
560 if let Some(ref err) = self.0 {
561 write!(fmt, ": {}", err)?;
562 }
563 Ok(())
564 }
565}
566
567impl error::Error for Error {
568 fn description(&self) -> &str {
569 "timed out waiting for connection"
570 }
571}
572
573pub struct State {
575 pub connections: u32,
577 pub idle_connections: u32,
579 _p: (),
580}
581
582impl fmt::Debug for State {
583 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
584 fmt.debug_struct("State")
585 .field("connections", &self.connections)
586 .field("idle_connections", &self.idle_connections)
587 .finish()
588 }
589}
590
591pub struct PooledConnection<M>
593where
594 M: ManageConnection,
595{
596 pool: Pool<M>,
597 checkout: Instant,
598 conn: Option<Conn<M::Connection>>,
599}
600
601impl<M> fmt::Debug for PooledConnection<M>
602where
603 M: ManageConnection,
604 M::Connection: fmt::Debug,
605{
606 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
607 fmt::Debug::fmt(&self.conn.as_ref().unwrap().conn, fmt)
608 }
609}
610
611impl<M> Drop for PooledConnection<M>
612where
613 M: ManageConnection,
614{
615 fn drop(&mut self) {
616 self.pool.put_back(self.checkout, self.conn.take().unwrap());
617 }
618}
619
620impl<M> Deref for PooledConnection<M>
621where
622 M: ManageConnection,
623{
624 type Target = M::Connection;
625
626 fn deref(&self) -> &M::Connection {
627 &self.conn.as_ref().unwrap().conn
628 }
629}
630
631impl<M> DerefMut for PooledConnection<M>
632where
633 M: ManageConnection,
634{
635 fn deref_mut(&mut self) -> &mut M::Connection {
636 &mut self.conn.as_mut().unwrap().conn
637 }
638}
639
640impl<M> PooledConnection<M>
641where
642 M: ManageConnection,
643{
644 pub fn extensions(this: &Self) -> &Extensions {
646 &this.conn.as_ref().unwrap().extensions
647 }
648
649 pub fn extensions_mut(this: &mut Self) -> &mut Extensions {
651 &mut this.conn.as_mut().unwrap().extensions
652 }
653}