r2d2/
lib.rs

1//! A generic connection pool.
2//!
3//! Opening a new database connection every time one is needed is both
4//! inefficient and can lead to resource exhaustion under high traffic
5//! conditions. A connection pool maintains a set of open connections to a
6//! database, handing them out for repeated use.
7//!
8//! r2d2 is agnostic to the connection type it is managing. Implementors of the
9//! `ManageConnection` trait provide the database-specific logic to create and
10//! check the health of connections.
11//!
12//! # Example
13//!
14//! Using an imaginary "foodb" database.
15//!
16//! ```rust,ignore
17//! use std::thread;
18//!
19//! extern crate r2d2;
20//! extern crate r2d2_foodb;
21//!
22//! fn main() {
23//!     let manager = r2d2_foodb::FooConnectionManager::new("localhost:1234");
24//!     let pool = r2d2::Pool::builder()
25//!         .max_size(15)
26//!         .build(manager)
27//!         .unwrap();
28//!
29//!     for _ in 0..20 {
30//!         let pool = pool.clone();
31//!         thread::spawn(move || {
32//!             let conn = pool.get().unwrap();
33//!             // use the connection
34//!             // it will be returned to the pool when it falls out of scope.
35//!         })
36//!     }
37//! }
38//! ```
39#![warn(missing_docs)]
40#![doc(html_root_url = "https://docs.rs/r2d2/0.8")]
41
42use log::error;
43
44use parking_lot::{Condvar, Mutex, MutexGuard};
45use std::cmp;
46use std::error;
47use std::fmt;
48use std::mem;
49use std::ops::{Deref, DerefMut};
50use std::sync::atomic::{AtomicUsize, Ordering};
51use std::sync::{Arc, Weak};
52use std::time::{Duration, Instant};
53
54pub use crate::config::Builder;
55use crate::config::Config;
56use crate::event::{AcquireEvent, CheckinEvent, CheckoutEvent, ReleaseEvent, TimeoutEvent};
57pub use crate::event::{HandleEvent, NopEventHandler};
58pub use crate::extensions::Extensions;
59
60mod config;
61pub mod event;
62mod extensions;
63
64#[cfg(test)]
65mod test;
66
67static CONNECTION_ID: AtomicUsize = AtomicUsize::new(0);
68
69/// A trait which provides connection-specific functionality.
70pub trait ManageConnection: Send + Sync + 'static {
71    /// The connection type this manager deals with.
72    type Connection: Send + 'static;
73
74    /// The error type returned by `Connection`s.
75    type Error: error::Error + 'static;
76
77    /// Attempts to create a new connection.
78    fn connect(&self) -> Result<Self::Connection, Self::Error>;
79
80    /// Determines if the connection is still connected to the database.
81    ///
82    /// A standard implementation would check if a simple query like `SELECT 1`
83    /// succeeds.
84    fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error>;
85
86    /// *Quickly* determines if the connection is no longer usable.
87    ///
88    /// This will be called synchronously every time a connection is returned
89    /// to the pool, so it should *not* block. If it returns `true`, the
90    /// connection will be discarded.
91    ///
92    /// For example, an implementation might check if the underlying TCP socket
93    /// has disconnected. Implementations that do not support this kind of
94    /// fast health check may simply return `false`.
95    fn has_broken(&self, conn: &mut Self::Connection) -> bool;
96}
97
98/// A trait which handles errors reported by the `ManageConnection`.
99pub trait HandleError<E>: fmt::Debug + Send + Sync + 'static {
100    /// Handles an error.
101    fn handle_error(&self, error: E);
102}
103
104/// A `HandleError` implementation which does nothing.
105#[derive(Copy, Clone, Debug)]
106pub struct NopErrorHandler;
107
108impl<E> HandleError<E> for NopErrorHandler {
109    fn handle_error(&self, _: E) {}
110}
111
112/// A `HandleError` implementation which logs at the error level.
113#[derive(Copy, Clone, Debug)]
114pub struct LoggingErrorHandler;
115
116impl<E> HandleError<E> for LoggingErrorHandler
117where
118    E: error::Error,
119{
120    fn handle_error(&self, error: E) {
121        error!("{}", error);
122    }
123}
124
125/// A trait which allows for customization of connections.
126pub trait CustomizeConnection<C, E>: fmt::Debug + Send + Sync + 'static {
127    /// Called with connections immediately after they are returned from
128    /// `ManageConnection::connect`.
129    ///
130    /// The default implementation simply returns `Ok(())`.
131    ///
132    /// # Errors
133    ///
134    /// If this method returns an error, the connection will be discarded.
135    #[allow(unused_variables)]
136    fn on_acquire(&self, conn: &mut C) -> Result<(), E> {
137        Ok(())
138    }
139
140    /// Called with connections when they are removed from the pool.
141    ///
142    /// The connections may be broken (as reported by `is_valid` or
143    /// `has_broken`), or have simply timed out.
144    ///
145    /// The default implementation does nothing.
146    #[allow(unused_variables)]
147    fn on_release(&self, conn: C) {}
148}
149
150/// A `CustomizeConnection` which does nothing.
151#[derive(Copy, Clone, Debug)]
152pub struct NopConnectionCustomizer;
153
154impl<C, E> CustomizeConnection<C, E> for NopConnectionCustomizer {}
155
156struct Conn<C> {
157    conn: C,
158    extensions: Extensions,
159    birth: Instant,
160    id: u64,
161}
162
163struct IdleConn<C> {
164    conn: Conn<C>,
165    idle_start: Instant,
166}
167
168struct PoolInternals<C> {
169    conns: Vec<IdleConn<C>>,
170    num_conns: u32,
171    pending_conns: u32,
172    last_error: Option<String>,
173}
174
175struct SharedPool<M>
176where
177    M: ManageConnection,
178{
179    config: Config<M::Connection, M::Error>,
180    manager: M,
181    internals: Mutex<PoolInternals<M::Connection>>,
182    cond: Condvar,
183}
184
185fn drop_conns<M>(
186    shared: &Arc<SharedPool<M>>,
187    mut internals: MutexGuard<PoolInternals<M::Connection>>,
188    conns: Vec<Conn<M::Connection>>,
189) where
190    M: ManageConnection,
191{
192    internals.num_conns -= conns.len() as u32;
193    establish_idle_connections(shared, &mut internals);
194    drop(internals); // make sure we run connection destructors without this locked
195
196    for conn in conns {
197        let event = ReleaseEvent {
198            id: conn.id,
199            age: conn.birth.elapsed(),
200        };
201        shared.config.event_handler.handle_release(event);
202        shared.config.connection_customizer.on_release(conn.conn);
203    }
204}
205
206fn establish_idle_connections<M>(
207    shared: &Arc<SharedPool<M>>,
208    internals: &mut PoolInternals<M::Connection>,
209) where
210    M: ManageConnection,
211{
212    let min = shared.config.min_idle.unwrap_or(shared.config.max_size);
213    let idle = internals.conns.len() as u32;
214    for _ in idle..min {
215        add_connection(shared, internals);
216    }
217}
218
219fn add_connection<M>(shared: &Arc<SharedPool<M>>, internals: &mut PoolInternals<M::Connection>)
220where
221    M: ManageConnection,
222{
223    if internals.num_conns + internals.pending_conns >= shared.config.max_size {
224        return;
225    }
226
227    internals.pending_conns += 1;
228    inner(Duration::from_secs(0), shared);
229
230    fn inner<M>(delay: Duration, shared: &Arc<SharedPool<M>>)
231    where
232        M: ManageConnection,
233    {
234        let new_shared = Arc::downgrade(shared);
235        shared.config.thread_pool.execute_after(delay, move || {
236            let shared = match new_shared.upgrade() {
237                Some(shared) => shared,
238                None => return,
239            };
240
241            let conn = shared.manager.connect().and_then(|mut conn| {
242                shared
243                    .config
244                    .connection_customizer
245                    .on_acquire(&mut conn)
246                    .map(|_| conn)
247            });
248            match conn {
249                Ok(conn) => {
250                    let id = CONNECTION_ID.fetch_add(1, Ordering::Relaxed) as u64;
251
252                    let event = AcquireEvent { id };
253                    shared.config.event_handler.handle_acquire(event);
254
255                    let mut internals = shared.internals.lock();
256                    internals.last_error = None;
257                    let now = Instant::now();
258                    let conn = IdleConn {
259                        conn: Conn {
260                            conn,
261                            extensions: Extensions::new(),
262                            birth: now,
263                            id,
264                        },
265                        idle_start: now,
266                    };
267                    internals.conns.push(conn);
268                    internals.pending_conns -= 1;
269                    internals.num_conns += 1;
270                    shared.cond.notify_one();
271                }
272                Err(err) => {
273                    shared.internals.lock().last_error = Some(err.to_string());
274                    shared.config.error_handler.handle_error(err);
275                    let delay = cmp::max(Duration::from_millis(200), delay);
276                    let delay = cmp::min(shared.config.connection_timeout / 2, delay * 2);
277                    inner(delay, &shared);
278                }
279            }
280        });
281    }
282}
283
284fn reap_connections<M>(shared: &Weak<SharedPool<M>>)
285where
286    M: ManageConnection,
287{
288    let shared = match shared.upgrade() {
289        Some(shared) => shared,
290        None => return,
291    };
292
293    let mut old = Vec::with_capacity(shared.config.max_size as usize);
294    let mut to_drop = vec![];
295
296    let mut internals = shared.internals.lock();
297    mem::swap(&mut old, &mut internals.conns);
298    let now = Instant::now();
299    for conn in old {
300        let mut reap = false;
301        if let Some(timeout) = shared.config.idle_timeout {
302            reap |= now - conn.idle_start >= timeout;
303        }
304        if let Some(lifetime) = shared.config.max_lifetime {
305            reap |= now - conn.conn.birth >= lifetime;
306        }
307        if reap {
308            to_drop.push(conn.conn);
309        } else {
310            internals.conns.push(conn);
311        }
312    }
313    drop_conns(&shared, internals, to_drop);
314}
315
316/// A generic connection pool.
317pub struct Pool<M>(Arc<SharedPool<M>>)
318where
319    M: ManageConnection;
320
321/// Returns a new `Pool` referencing the same state as `self`.
322impl<M> Clone for Pool<M>
323where
324    M: ManageConnection,
325{
326    fn clone(&self) -> Pool<M> {
327        Pool(self.0.clone())
328    }
329}
330
331impl<M> fmt::Debug for Pool<M>
332where
333    M: ManageConnection + fmt::Debug,
334{
335    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
336        fmt.debug_struct("Pool")
337            .field("state", &self.state())
338            .field("config", &self.0.config)
339            .field("manager", &self.0.manager)
340            .finish()
341    }
342}
343
344impl<M> Pool<M>
345where
346    M: ManageConnection,
347{
348    /// Creates a new connection pool with a default configuration.
349    pub fn new(manager: M) -> Result<Pool<M>, Error> {
350        Pool::builder().build(manager)
351    }
352
353    /// Returns a builder type to configure a new pool.
354    pub fn builder() -> Builder<M> {
355        Builder::new()
356    }
357
358    // for testing
359    fn new_inner(
360        config: Config<M::Connection, M::Error>,
361        manager: M,
362        reaper_rate: Duration,
363    ) -> Pool<M> {
364        let internals = PoolInternals {
365            conns: Vec::with_capacity(config.max_size as usize),
366            num_conns: 0,
367            pending_conns: 0,
368            last_error: None,
369        };
370
371        let shared = Arc::new(SharedPool {
372            config,
373            manager,
374            internals: Mutex::new(internals),
375            cond: Condvar::new(),
376        });
377
378        establish_idle_connections(&shared, &mut shared.internals.lock());
379
380        if shared.config.max_lifetime.is_some() || shared.config.idle_timeout.is_some() {
381            let s = Arc::downgrade(&shared);
382            shared
383                .config
384                .thread_pool
385                .execute_at_fixed_rate(reaper_rate, reaper_rate, move || reap_connections(&s));
386        }
387
388        Pool(shared)
389    }
390
391    fn wait_for_initialization(&self) -> Result<(), Error> {
392        let end = Instant::now() + self.0.config.connection_timeout;
393        let mut internals = self.0.internals.lock();
394
395        let initial_size = self.0.config.min_idle.unwrap_or(self.0.config.max_size);
396
397        while internals.num_conns != initial_size {
398            if self.0.cond.wait_until(&mut internals, end).timed_out() {
399                return Err(Error(internals.last_error.take()));
400            }
401        }
402
403        Ok(())
404    }
405
406    /// Retrieves a connection from the pool.
407    ///
408    /// Waits for at most the configured connection timeout before returning an
409    /// error.
410    pub fn get(&self) -> Result<PooledConnection<M>, Error> {
411        self.get_timeout(self.0.config.connection_timeout)
412    }
413
414    /// Retrieves a connection from the pool, waiting for at most `timeout`
415    ///
416    /// The given timeout will be used instead of the configured connection
417    /// timeout.
418    pub fn get_timeout(&self, timeout: Duration) -> Result<PooledConnection<M>, Error> {
419        let start = Instant::now();
420        let end = start + timeout;
421        let mut internals = self.0.internals.lock();
422
423        loop {
424            match self.try_get_inner(internals) {
425                Ok(conn) => {
426                    let event = CheckoutEvent {
427                        id: conn.conn.as_ref().unwrap().id,
428                        duration: start.elapsed(),
429                    };
430                    self.0.config.event_handler.handle_checkout(event);
431                    return Ok(conn);
432                }
433                Err(i) => internals = i,
434            }
435
436            add_connection(&self.0, &mut internals);
437
438            if self.0.cond.wait_until(&mut internals, end).timed_out() {
439                let event = TimeoutEvent { timeout };
440                self.0.config.event_handler.handle_timeout(event);
441
442                return Err(Error(internals.last_error.take()));
443            }
444        }
445    }
446
447    /// Attempts to retrieve a connection from the pool if there is one
448    /// available.
449    ///
450    /// Returns `None` if there are no idle connections available in the pool.
451    /// This method will not block waiting to establish a new connection.
452    pub fn try_get(&self) -> Option<PooledConnection<M>> {
453        self.try_get_inner(self.0.internals.lock()).ok()
454    }
455
456    fn try_get_inner<'a>(
457        &'a self,
458        mut internals: MutexGuard<'a, PoolInternals<M::Connection>>,
459    ) -> Result<PooledConnection<M>, MutexGuard<'a, PoolInternals<M::Connection>>> {
460        loop {
461            if let Some(mut conn) = internals.conns.pop() {
462                establish_idle_connections(&self.0, &mut internals);
463                drop(internals);
464
465                if self.0.config.test_on_check_out {
466                    if let Err(e) = self.0.manager.is_valid(&mut conn.conn.conn) {
467                        let msg = e.to_string();
468                        self.0.config.error_handler.handle_error(e);
469                        // FIXME we shouldn't have to lock, unlock, and relock here
470                        internals = self.0.internals.lock();
471                        internals.last_error = Some(msg);
472                        drop_conns(&self.0, internals, vec![conn.conn]);
473                        internals = self.0.internals.lock();
474                        continue;
475                    }
476                }
477
478                return Ok(PooledConnection {
479                    pool: self.clone(),
480                    checkout: Instant::now(),
481                    conn: Some(conn.conn),
482                });
483            } else {
484                return Err(internals);
485            }
486        }
487    }
488
489    fn put_back(&self, checkout: Instant, mut conn: Conn<M::Connection>) {
490        let event = CheckinEvent {
491            id: conn.id,
492            duration: checkout.elapsed(),
493        };
494        self.0.config.event_handler.handle_checkin(event);
495
496        // This is specified to be fast, but call it before locking anyways
497        let broken = self.0.manager.has_broken(&mut conn.conn);
498
499        let mut internals = self.0.internals.lock();
500        if broken {
501            drop_conns(&self.0, internals, vec![conn]);
502        } else {
503            let conn = IdleConn {
504                conn,
505                idle_start: Instant::now(),
506            };
507            internals.conns.push(conn);
508            self.0.cond.notify_one();
509        }
510    }
511
512    /// Returns information about the current state of the pool.
513    pub fn state(&self) -> State {
514        let internals = self.0.internals.lock();
515        State {
516            connections: internals.num_conns,
517            idle_connections: internals.conns.len() as u32,
518            _p: (),
519        }
520    }
521
522    /// Returns the configured maximum pool size.
523    pub fn max_size(&self) -> u32 {
524        self.0.config.max_size
525    }
526
527    /// Returns the configured mimimum idle connection count.
528    pub fn min_idle(&self) -> Option<u32> {
529        self.0.config.min_idle
530    }
531
532    /// Returns if the pool is configured to test connections on check out.
533    pub fn test_on_check_out(&self) -> bool {
534        self.0.config.test_on_check_out
535    }
536
537    /// Returns the configured maximum connection lifetime.
538    pub fn max_lifetime(&self) -> Option<Duration> {
539        self.0.config.max_lifetime
540    }
541
542    /// Returns the configured idle connection timeout.
543    pub fn idle_timeout(&self) -> Option<Duration> {
544        self.0.config.idle_timeout
545    }
546
547    /// Returns the configured connection timeout.
548    pub fn connection_timeout(&self) -> Duration {
549        self.0.config.connection_timeout
550    }
551}
552
553/// The error type returned by methods in this crate.
554#[derive(Debug)]
555pub struct Error(Option<String>);
556
557impl fmt::Display for Error {
558    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
559        fmt.write_str(error::Error::description(self))?;
560        if let Some(ref err) = self.0 {
561            write!(fmt, ": {}", err)?;
562        }
563        Ok(())
564    }
565}
566
567impl error::Error for Error {
568    fn description(&self) -> &str {
569        "timed out waiting for connection"
570    }
571}
572
573/// Information about the state of a `Pool`.
574pub struct State {
575    /// The number of connections currently being managed by the pool.
576    pub connections: u32,
577    /// The number of idle connections.
578    pub idle_connections: u32,
579    _p: (),
580}
581
582impl fmt::Debug for State {
583    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
584        fmt.debug_struct("State")
585            .field("connections", &self.connections)
586            .field("idle_connections", &self.idle_connections)
587            .finish()
588    }
589}
590
591/// A smart pointer wrapping a connection.
592pub struct PooledConnection<M>
593where
594    M: ManageConnection,
595{
596    pool: Pool<M>,
597    checkout: Instant,
598    conn: Option<Conn<M::Connection>>,
599}
600
601impl<M> fmt::Debug for PooledConnection<M>
602where
603    M: ManageConnection,
604    M::Connection: fmt::Debug,
605{
606    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
607        fmt::Debug::fmt(&self.conn.as_ref().unwrap().conn, fmt)
608    }
609}
610
611impl<M> Drop for PooledConnection<M>
612where
613    M: ManageConnection,
614{
615    fn drop(&mut self) {
616        self.pool.put_back(self.checkout, self.conn.take().unwrap());
617    }
618}
619
620impl<M> Deref for PooledConnection<M>
621where
622    M: ManageConnection,
623{
624    type Target = M::Connection;
625
626    fn deref(&self) -> &M::Connection {
627        &self.conn.as_ref().unwrap().conn
628    }
629}
630
631impl<M> DerefMut for PooledConnection<M>
632where
633    M: ManageConnection,
634{
635    fn deref_mut(&mut self) -> &mut M::Connection {
636        &mut self.conn.as_mut().unwrap().conn
637    }
638}
639
640impl<M> PooledConnection<M>
641where
642    M: ManageConnection,
643{
644    /// Returns a shared reference to the extensions associated with this connection.
645    pub fn extensions(this: &Self) -> &Extensions {
646        &this.conn.as_ref().unwrap().extensions
647    }
648
649    /// Returns a mutable reference to the extensions associated with this connection.
650    pub fn extensions_mut(this: &mut Self) -> &mut Extensions {
651        &mut this.conn.as_mut().unwrap().extensions
652    }
653}