parking_lot/
condvar.rs

1// Copyright 2016 Amanieu d'Antras
2//
3// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
4// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
5// http://opensource.org/licenses/MIT>, at your option. This file may not be
6// copied, modified, or distributed except according to those terms.
7
8use crate::mutex::MutexGuard;
9use crate::raw_mutex::{RawMutex, TOKEN_HANDOFF, TOKEN_NORMAL};
10use crate::{deadlock, util};
11use core::{
12    fmt, ptr,
13    sync::atomic::{AtomicPtr, Ordering},
14};
15use lock_api::RawMutex as RawMutex_;
16use parking_lot_core::{self, ParkResult, RequeueOp, UnparkResult, DEFAULT_PARK_TOKEN};
17use std::ops::DerefMut;
18use std::time::{Duration, Instant};
19
/// Outcome of a timed wait on a condition variable, recording whether the
/// wait ended because the timeout elapsed.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct WaitTimeoutResult(bool);

impl WaitTimeoutResult {
    /// Returns `true` if the wait is known to have ended because of a
    /// timeout.
    #[inline]
    pub fn timed_out(self) -> bool {
        let WaitTimeoutResult(flag) = self;
        flag
    }
}
32
/// A Condition Variable
///
/// Condition variables represent the ability to block a thread such that it
/// consumes no CPU time while waiting for an event to occur. Condition
/// variables are typically associated with a boolean predicate (a condition)
/// and a mutex. The predicate is always verified inside of the mutex before
/// determining that a thread must block.
///
/// Note that this module places one additional restriction over the system
/// condition variables: each condvar can be used with only one mutex at a
/// time. Any attempt to use multiple mutexes on the same condition variable
/// simultaneously will result in a runtime panic. However it is possible to
/// switch to a different mutex if there are no threads currently waiting on
/// the condition variable.
///
/// # Differences from the standard library `Condvar`
///
/// - No spurious wakeups: A wait will only return a non-timeout result if it
///   was woken up by `notify_one` or `notify_all`.
/// - `Condvar::notify_all` will only wake up a single thread, the rest are
///   requeued to wait for the `Mutex` to be unlocked by the thread that was
///   woken up.
/// - Only requires 1 word of space, whereas the standard library boxes the
///   `Condvar` due to platform limitations.
/// - Can be statically constructed.
/// - Does not require any drop glue when dropped.
/// - Inline fast path for the uncontended case.
///
/// # Examples
///
/// ```
/// use parking_lot::{Mutex, Condvar};
/// use std::sync::Arc;
/// use std::thread;
///
/// let pair = Arc::new((Mutex::new(false), Condvar::new()));
/// let pair2 = pair.clone();
///
/// // Spawn a new thread that sets the flag under the lock and then
/// // notifies the condition variable.
/// thread::spawn(move|| {
///     let &(ref lock, ref cvar) = &*pair2;
///     let mut started = lock.lock();
///     *started = true;
///     cvar.notify_one();
/// });
///
/// // wait for the thread to start up
/// let &(ref lock, ref cvar) = &*pair;
/// let mut started = lock.lock();
/// if !*started {
///     cvar.wait(&mut started);
/// }
/// // Note that we used an if instead of a while loop above. This is only
/// // possible because parking_lot's Condvar will never spuriously wake up.
/// // This means that wait() will only return after notify_one or notify_all is
/// // called.
/// ```
pub struct Condvar {
    // Null while no thread is waiting on this condvar; otherwise the address
    // of the `RawMutex` that the current waiters passed to `wait*`.
    state: AtomicPtr<RawMutex>,
}
93
94impl Condvar {
95    /// Creates a new condition variable which is ready to be waited on and
96    /// notified.
97    #[inline]
98    pub const fn new() -> Condvar {
99        Condvar {
100            state: AtomicPtr::new(ptr::null_mut()),
101        }
102    }
103
104    /// Wakes up one blocked thread on this condvar.
105    ///
106    /// Returns whether a thread was woken up.
107    ///
108    /// If there is a blocked thread on this condition variable, then it will
109    /// be woken up from its call to `wait` or `wait_timeout`. Calls to
110    /// `notify_one` are not buffered in any way.
111    ///
112    /// To wake up all threads, see `notify_all()`.
113    ///
114    /// # Examples
115    ///
116    /// ```
117    /// use parking_lot::Condvar;
118    ///
119    /// let condvar = Condvar::new();
120    ///
121    /// // do something with condvar, share it with other threads
122    ///
123    /// if !condvar.notify_one() {
124    ///     println!("Nobody was listening for this.");
125    /// }
126    /// ```
127    #[inline]
128    pub fn notify_one(&self) -> bool {
129        // Nothing to do if there are no waiting threads
130        let state = self.state.load(Ordering::Relaxed);
131        if state.is_null() {
132            return false;
133        }
134
135        self.notify_one_slow(state)
136    }
137
    /// Slow path of `notify_one`, taken when the fast-path load of the
    /// condvar state returned the non-null pointer `mutex`.
    ///
    /// Returns `true` if `unpark_requeue` reports at least one unparked or
    /// requeued thread.
    #[cold]
    fn notify_one_slow(&self, mutex: *mut RawMutex) -> bool {
        // Either unpark one waiting thread or requeue one onto the mutex;
        // which of the two happens is decided in `validate` below.
        let from = self as *const _ as usize;
        let to = mutex as usize;
        let validate = || {
            // Make sure that our atomic state still points to the same
            // mutex. If not then it means that all threads on the current
            // mutex were woken up and a new waiting thread switched to a
            // different mutex. In that case we can get away with doing
            // nothing.
            if self.state.load(Ordering::Relaxed) != mutex {
                return RequeueOp::Abort;
            }

            // Unpark one thread if the mutex is unlocked, otherwise just
            // requeue one thread to the mutex. This is safe to do here
            // since unlocking the mutex when the parked bit is set requires
            // locking the queue. There is the possibility of a race if the
            // mutex gets locked after we check, but that doesn't matter in
            // this case.
            // NOTE(review): dereferencing `mutex` presumably relies on the
            // mutex outliving its waiters — confirm.
            if unsafe { (*mutex).mark_parked_if_locked() } {
                RequeueOp::RequeueOne
            } else {
                RequeueOp::UnparkOne
            }
        };
        let callback = |_op, result: UnparkResult| {
            // Clear our state if there are no more waiting threads
            if !result.have_more_threads {
                self.state.store(ptr::null_mut(), Ordering::Relaxed);
            }
            TOKEN_NORMAL
        };
        let res = unsafe { parking_lot_core::unpark_requeue(from, to, validate, callback) };

        // A thread counts as notified whether it was unparked directly or
        // requeued onto the mutex.
        res.unparked_threads + res.requeued_threads != 0
    }
176
177    /// Wakes up all blocked threads on this condvar.
178    ///
179    /// Returns the number of threads woken up.
180    ///
181    /// This method will ensure that any current waiters on the condition
182    /// variable are awoken. Calls to `notify_all()` are not buffered in any
183    /// way.
184    ///
185    /// To wake up only one thread, see `notify_one()`.
186    #[inline]
187    pub fn notify_all(&self) -> usize {
188        // Nothing to do if there are no waiting threads
189        let state = self.state.load(Ordering::Relaxed);
190        if state.is_null() {
191            return 0;
192        }
193
194        self.notify_all_slow(state)
195    }
196
    /// Slow path of `notify_all`, taken when the fast-path load of the
    /// condvar state returned the non-null pointer `mutex`.
    ///
    /// Returns the sum of the unparked and requeued thread counts reported
    /// by `unpark_requeue`.
    #[cold]
    fn notify_all_slow(&self, mutex: *mut RawMutex) -> usize {
        // Unpark one thread and requeue the rest onto the mutex
        let from = self as *const _ as usize;
        let to = mutex as usize;
        let validate = || {
            // Make sure that our atomic state still points to the same
            // mutex. If not then it means that all threads on the current
            // mutex were woken up and a new waiting thread switched to a
            // different mutex. In that case we can get away with doing
            // nothing.
            if self.state.load(Ordering::Relaxed) != mutex {
                return RequeueOp::Abort;
            }

            // Clear our state since we are going to unpark or requeue all
            // threads.
            self.state.store(ptr::null_mut(), Ordering::Relaxed);

            // Unpark one thread if the mutex is unlocked, otherwise just
            // requeue everything to the mutex. This is safe to do here
            // since unlocking the mutex when the parked bit is set requires
            // locking the queue. There is the possibility of a race if the
            // mutex gets locked after we check, but that doesn't matter in
            // this case.
            // NOTE(review): dereferencing `mutex` presumably relies on the
            // mutex outliving its waiters — confirm.
            if unsafe { (*mutex).mark_parked_if_locked() } {
                RequeueOp::RequeueAll
            } else {
                RequeueOp::UnparkOneRequeueRest
            }
        };
        let callback = |op, result: UnparkResult| {
            // If we requeued threads to the mutex, mark it as having
            // parked threads. The RequeueAll case is already handled above.
            if op == RequeueOp::UnparkOneRequeueRest && result.requeued_threads != 0 {
                unsafe { (*mutex).mark_parked() };
            }
            TOKEN_NORMAL
        };
        let res = unsafe { parking_lot_core::unpark_requeue(from, to, validate, callback) };

        // Both directly unparked and requeued threads count as notified.
        res.unparked_threads + res.requeued_threads
    }
240
241    /// Blocks the current thread until this condition variable receives a
242    /// notification.
243    ///
244    /// This function will atomically unlock the mutex specified (represented by
245    /// `mutex_guard`) and block the current thread. This means that any calls
246    /// to `notify_*()` which happen logically after the mutex is unlocked are
247    /// candidates to wake this thread up. When this function call returns, the
248    /// lock specified will have been re-acquired.
249    ///
250    /// # Panics
251    ///
252    /// This function will panic if another thread is waiting on the `Condvar`
253    /// with a different `Mutex` object.
254    #[inline]
255    pub fn wait<T: ?Sized>(&self, mutex_guard: &mut MutexGuard<'_, T>) {
256        self.wait_until_internal(unsafe { MutexGuard::mutex(mutex_guard).raw() }, None);
257    }
258
259    /// Waits on this condition variable for a notification, timing out after
260    /// the specified time instant.
261    ///
262    /// The semantics of this function are equivalent to `wait()` except that
263    /// the thread will be blocked roughly until `timeout` is reached. This
264    /// method should not be used for precise timing due to anomalies such as
265    /// preemption or platform differences that may not cause the maximum
266    /// amount of time waited to be precisely `timeout`.
267    ///
268    /// Note that the best effort is made to ensure that the time waited is
269    /// measured with a monotonic clock, and not affected by the changes made to
270    /// the system time.
271    ///
272    /// The returned `WaitTimeoutResult` value indicates if the timeout is
273    /// known to have elapsed.
274    ///
275    /// Like `wait`, the lock specified will be re-acquired when this function
276    /// returns, regardless of whether the timeout elapsed or not.
277    ///
278    /// # Panics
279    ///
280    /// This function will panic if another thread is waiting on the `Condvar`
281    /// with a different `Mutex` object.
282    #[inline]
283    pub fn wait_until<T: ?Sized>(
284        &self,
285        mutex_guard: &mut MutexGuard<'_, T>,
286        timeout: Instant,
287    ) -> WaitTimeoutResult {
288        self.wait_until_internal(
289            unsafe { MutexGuard::mutex(mutex_guard).raw() },
290            Some(timeout),
291        )
292    }
293
    // This is a non-generic function to reduce the monomorphization cost of
    // using `wait_until`.
    //
    // Parks the current thread on this condvar's address, unlocking `mutex`
    // just before sleeping, and makes sure `mutex` is held again before
    // returning. `timeout` is forwarded unchanged to
    // `parking_lot_core::park`; `wait` passes `None`.
    //
    // The returned `WaitTimeoutResult` reports a timeout only if the park
    // result is not "unparked" and the thread was not requeued.
    //
    // Panics if `validate` finds the condvar already associated with a
    // different mutex.
    fn wait_until_internal(&self, mutex: &RawMutex, timeout: Option<Instant>) -> WaitTimeoutResult {
        let result;
        let mut bad_mutex = false;
        let mut requeued = false;
        {
            // Key used for parking: the address of this condvar.
            let addr = self as *const _ as usize;
            // Address of the mutex in the pointer type stored in `self.state`.
            let lock_addr = mutex as *const _ as *mut _;
            let validate = || {
                // Ensure we don't use two different mutexes with the same
                // Condvar at the same time. This is done while locked to
                // avoid races with notify_one
                let state = self.state.load(Ordering::Relaxed);
                if state.is_null() {
                    self.state.store(lock_addr, Ordering::Relaxed);
                } else if state != lock_addr {
                    // Returning false aborts the park; the panic is raised
                    // after `park` has returned (see below).
                    bad_mutex = true;
                    return false;
                }
                true
            };
            let before_sleep = || {
                // Unlock the mutex before sleeping...
                unsafe { mutex.unlock() };
            };
            let timed_out = |k, was_last_thread| {
                // If we were requeued to a mutex, then we did not time out.
                // We'll just park ourselves on the mutex again when we try
                // to lock it later.
                requeued = k != addr;

                // If we were the last thread on the queue then we need to
                // clear our state. This is normally done by the
                // notify_{one,all} functions when not timing out.
                if !requeued && was_last_thread {
                    self.state.store(ptr::null_mut(), Ordering::Relaxed);
                }
            };
            result = unsafe {
                parking_lot_core::park(
                    addr,
                    validate,
                    before_sleep,
                    timed_out,
                    DEFAULT_PARK_TOKEN,
                    timeout,
                )
            };
        }

        // Panic if we tried to use multiple mutexes with a Condvar. Note
        // that at this point the MutexGuard is still locked. It will be
        // unlocked by the unwinding logic.
        if bad_mutex {
            panic!("attempted to use a condition variable with more than one mutex");
        }

        // ... and re-lock it once we are done sleeping
        // NOTE(review): on `TOKEN_HANDOFF` the mutex is not locked here and
        // only the deadlock detector is informed; presumably the unlocking
        // thread handed the lock directly to us — confirm against `RawMutex`.
        if result == ParkResult::Unparked(TOKEN_HANDOFF) {
            unsafe { deadlock::acquire_resource(mutex as *const _ as usize) };
        } else {
            mutex.lock();
        }

        // Being unparked or requeued both count as "not timed out".
        WaitTimeoutResult(!(result.is_unparked() || requeued))
    }
361
362    /// Waits on this condition variable for a notification, timing out after a
363    /// specified duration.
364    ///
365    /// The semantics of this function are equivalent to `wait()` except that
366    /// the thread will be blocked for roughly no longer than `timeout`. This
367    /// method should not be used for precise timing due to anomalies such as
368    /// preemption or platform differences that may not cause the maximum
369    /// amount of time waited to be precisely `timeout`.
370    ///
371    /// Note that the best effort is made to ensure that the time waited is
372    /// measured with a monotonic clock, and not affected by the changes made to
373    /// the system time.
374    ///
375    /// The returned `WaitTimeoutResult` value indicates if the timeout is
376    /// known to have elapsed.
377    ///
378    /// Like `wait`, the lock specified will be re-acquired when this function
379    /// returns, regardless of whether the timeout elapsed or not.
380    #[inline]
381    pub fn wait_for<T: ?Sized>(
382        &self,
383        mutex_guard: &mut MutexGuard<'_, T>,
384        timeout: Duration,
385    ) -> WaitTimeoutResult {
386        let deadline = util::to_deadline(timeout);
387        self.wait_until_internal(unsafe { MutexGuard::mutex(mutex_guard).raw() }, deadline)
388    }
389
390    #[inline]
391    fn wait_while_until_internal<T, F>(
392        &self,
393        mutex_guard: &mut MutexGuard<'_, T>,
394        mut condition: F,
395        timeout: Option<Instant>,
396    ) -> WaitTimeoutResult
397    where
398        T: ?Sized,
399        F: FnMut(&mut T) -> bool,
400    {
401        let mut result = WaitTimeoutResult(false);
402
403        while !result.timed_out() && condition(mutex_guard.deref_mut()) {
404            result =
405                self.wait_until_internal(unsafe { MutexGuard::mutex(mutex_guard).raw() }, timeout);
406        }
407
408        result
409    }
410    /// Blocks the current thread until this condition variable receives a
411    /// notification. If the provided condition evaluates to `false`, then the
412    /// thread is no longer blocked and the operation is completed. If the
413    /// condition evaluates to `true`, then the thread is blocked again and
414    /// waits for another notification before repeating this process.
415    ///
416    /// This function will atomically unlock the mutex specified (represented by
417    /// `mutex_guard`) and block the current thread. This means that any calls
418    /// to `notify_*()` which happen logically after the mutex is unlocked are
419    /// candidates to wake this thread up. When this function call returns, the
420    /// lock specified will have been re-acquired.
421    ///
422    /// # Panics
423    ///
424    /// This function will panic if another thread is waiting on the `Condvar`
425    /// with a different `Mutex` object.
426    #[inline]
427    pub fn wait_while<T, F>(&self, mutex_guard: &mut MutexGuard<'_, T>, condition: F)
428    where
429        T: ?Sized,
430        F: FnMut(&mut T) -> bool,
431    {
432        self.wait_while_until_internal(mutex_guard, condition, None);
433    }
434
435    /// Waits on this condition variable for a notification, timing out after
436    /// the specified time instant. If the provided condition evaluates to
437    /// `false`, then the thread is no longer blocked and the operation is
438    /// completed. If the condition evaluates to `true`, then the thread is
439    /// blocked again and waits for another notification before repeating
440    /// this process.
441    ///
442    /// The semantics of this function are equivalent to `wait()` except that
443    /// the thread will be blocked roughly until `timeout` is reached. This
444    /// method should not be used for precise timing due to anomalies such as
445    /// preemption or platform differences that may not cause the maximum
446    /// amount of time waited to be precisely `timeout`.
447    ///
448    /// Note that the best effort is made to ensure that the time waited is
449    /// measured with a monotonic clock, and not affected by the changes made to
450    /// the system time.
451    ///
452    /// The returned `WaitTimeoutResult` value indicates if the timeout is
453    /// known to have elapsed.
454    ///
455    /// Like `wait`, the lock specified will be re-acquired when this function
456    /// returns, regardless of whether the timeout elapsed or not.
457    ///
458    /// # Panics
459    ///
460    /// This function will panic if another thread is waiting on the `Condvar`
461    /// with a different `Mutex` object.
462    #[inline]
463    pub fn wait_while_until<T, F>(
464        &self,
465        mutex_guard: &mut MutexGuard<'_, T>,
466        condition: F,
467        timeout: Instant,
468    ) -> WaitTimeoutResult
469    where
470        T: ?Sized,
471        F: FnMut(&mut T) -> bool,
472    {
473        self.wait_while_until_internal(mutex_guard, condition, Some(timeout))
474    }
475
476    /// Waits on this condition variable for a notification, timing out after a
477    /// specified duration. If the provided condition evaluates to `false`,
478    /// then the thread is no longer blocked and the operation is completed.
479    /// If the condition evaluates to `true`, then the thread is blocked again
480    /// and waits for another notification before repeating this process.
481    ///
482    /// The semantics of this function are equivalent to `wait()` except that
483    /// the thread will be blocked for roughly no longer than `timeout`. This
484    /// method should not be used for precise timing due to anomalies such as
485    /// preemption or platform differences that may not cause the maximum
486    /// amount of time waited to be precisely `timeout`.
487    ///
488    /// Note that the best effort is made to ensure that the time waited is
489    /// measured with a monotonic clock, and not affected by the changes made to
490    /// the system time.
491    ///
492    /// The returned `WaitTimeoutResult` value indicates if the timeout is
493    /// known to have elapsed.
494    ///
495    /// Like `wait`, the lock specified will be re-acquired when this function
496    /// returns, regardless of whether the timeout elapsed or not.
497    #[inline]
498    pub fn wait_while_for<T: ?Sized, F>(
499        &self,
500        mutex_guard: &mut MutexGuard<'_, T>,
501        condition: F,
502        timeout: Duration,
503    ) -> WaitTimeoutResult
504    where
505        F: FnMut(&mut T) -> bool,
506    {
507        let deadline = util::to_deadline(timeout);
508        self.wait_while_until_internal(mutex_guard, condition, deadline)
509    }
510}
511
512impl Default for Condvar {
513    #[inline]
514    fn default() -> Condvar {
515        Condvar::new()
516    }
517}
518
519impl fmt::Debug for Condvar {
520    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
521        f.pad("Condvar { .. }")
522    }
523}
524
525#[cfg(test)]
526mod tests {
527    use crate::{Condvar, Mutex, MutexGuard};
528    use std::sync::mpsc::channel;
529    use std::sync::Arc;
530    use std::thread;
531    use std::thread::sleep;
532    use std::thread::JoinHandle;
533    use std::time::Duration;
534    use std::time::Instant;
535
536    #[test]
537    fn smoke() {
538        let c = Condvar::new();
539        c.notify_one();
540        c.notify_all();
541    }
542
543    #[test]
544    fn notify_one() {
545        let m = Arc::new(Mutex::new(()));
546        let m2 = m.clone();
547        let c = Arc::new(Condvar::new());
548        let c2 = c.clone();
549
550        let mut g = m.lock();
551        let _t = thread::spawn(move || {
552            let _g = m2.lock();
553            c2.notify_one();
554        });
555        c.wait(&mut g);
556    }
557
    /// `notify_all` must release every one of the `N` waiting threads.
    #[test]
    fn notify_all() {
        const N: usize = 10;

        let data = Arc::new((Mutex::new(0), Condvar::new()));
        let (tx, rx) = channel();
        for _ in 0..N {
            let data = data.clone();
            let tx = tx.clone();
            thread::spawn(move || {
                let (lock, cond) = &*data;
                let mut cnt = lock.lock();
                *cnt += 1;
                // The last worker to check in tells the main thread that
                // every worker has incremented the counter.
                if *cnt == N {
                    tx.send(()).unwrap();
                }
                // Wait until the main thread resets the counter to zero.
                while *cnt != 0 {
                    cond.wait(&mut cnt);
                }
                tx.send(()).unwrap();
            });
        }
        drop(tx);

        let (lock, cond) = &*data;
        // Wait for the "all workers checked in" message.
        rx.recv().unwrap();
        let mut cnt = lock.lock();
        *cnt = 0;
        cond.notify_all();
        drop(cnt);

        // Every worker sends once more after it has been woken up.
        for _ in 0..N {
            rx.recv().unwrap();
        }
    }
593
594    #[test]
595    fn notify_one_return_true() {
596        let m = Arc::new(Mutex::new(()));
597        let m2 = m.clone();
598        let c = Arc::new(Condvar::new());
599        let c2 = c.clone();
600
601        let mut g = m.lock();
602        let _t = thread::spawn(move || {
603            let _g = m2.lock();
604            assert!(c2.notify_one());
605        });
606        c.wait(&mut g);
607    }
608
609    #[test]
610    fn notify_one_return_false() {
611        let m = Arc::new(Mutex::new(()));
612        let c = Arc::new(Condvar::new());
613
614        let _t = thread::spawn(move || {
615            let _g = m.lock();
616            assert!(!c.notify_one());
617        });
618    }
619
    /// `notify_all` must return the number of notified threads: `N` while
    /// `N` threads are waiting and `0` once nobody waits any more.
    #[test]
    fn notify_all_return() {
        const N: usize = 10;

        let data = Arc::new((Mutex::new(0), Condvar::new()));
        let (tx, rx) = channel();
        for _ in 0..N {
            let data = data.clone();
            let tx = tx.clone();
            thread::spawn(move || {
                let (lock, cond) = &*data;
                let mut cnt = lock.lock();
                *cnt += 1;
                // The last worker to check in tells the main thread that
                // every worker has incremented the counter.
                if *cnt == N {
                    tx.send(()).unwrap();
                }
                // Wait until the main thread resets the counter to zero.
                while *cnt != 0 {
                    cond.wait(&mut cnt);
                }
                tx.send(()).unwrap();
            });
        }
        drop(tx);

        let (lock, cond) = &*data;
        // Wait for the "all workers checked in" message.
        rx.recv().unwrap();
        // The last worker sent its message while holding the lock, so once
        // we own the lock all `N` workers are waiting on the condvar.
        let mut cnt = lock.lock();
        *cnt = 0;
        assert_eq!(cond.notify_all(), N);
        drop(cnt);

        // Every worker sends once more after it has been woken up.
        for _ in 0..N {
            rx.recv().unwrap();
        }

        // No worker waits again, so there is nobody left to notify.
        assert_eq!(cond.notify_all(), 0);
    }
657
658    #[test]
659    fn wait_for() {
660        let m = Arc::new(Mutex::new(()));
661        let m2 = m.clone();
662        let c = Arc::new(Condvar::new());
663        let c2 = c.clone();
664
665        let mut g = m.lock();
666        let no_timeout = c.wait_for(&mut g, Duration::from_millis(1));
667        assert!(no_timeout.timed_out());
668
669        let _t = thread::spawn(move || {
670            let _g = m2.lock();
671            c2.notify_one();
672        });
673        let timeout_res = c.wait_for(&mut g, Duration::from_secs(u64::max_value()));
674        assert!(!timeout_res.timed_out());
675
676        drop(g);
677    }
678
679    #[test]
680    fn wait_until() {
681        let m = Arc::new(Mutex::new(()));
682        let m2 = m.clone();
683        let c = Arc::new(Condvar::new());
684        let c2 = c.clone();
685
686        let mut g = m.lock();
687        let no_timeout = c.wait_until(&mut g, Instant::now() + Duration::from_millis(1));
688        assert!(no_timeout.timed_out());
689        let _t = thread::spawn(move || {
690            let _g = m2.lock();
691            c2.notify_one();
692        });
693        let timeout_res = c.wait_until(
694            &mut g,
695            Instant::now() + Duration::from_millis(u32::max_value() as u64),
696        );
697        assert!(!timeout_res.timed_out());
698        drop(g);
699    }
700
    /// Spawns a helper thread for the `wait_while*` tests.
    ///
    /// For each `epoch` in `1..=num_iters` the thread polls `mutex` until
    /// the guarded counter equals `epoch`, then calls `cv.notify_one()`
    /// while still holding the lock. If `timeout` is given and has passed,
    /// the thread returns early. The returned handle lets the caller join
    /// the thread.
    fn spawn_wait_while_notifier(
        mutex: Arc<Mutex<u32>>,
        cv: Arc<Condvar>,
        num_iters: u32,
        timeout: Option<Instant>,
    ) -> JoinHandle<()> {
        thread::spawn(move || {
            for epoch in 1..=num_iters {
                // spin to wait for main test thread to block
                // before notifying it to wake back up and check
                // its condition.
                let mut sleep_backoff = Duration::from_millis(1);
                let _mutex_guard = loop {
                    let mutex_guard = mutex.lock();

                    // Stop entirely once the test's deadline has passed.
                    if let Some(timeout) = timeout {
                        if Instant::now() >= timeout {
                            return;
                        }
                    }

                    // The counter has reached this epoch: keep the guard and
                    // go on to notify.
                    if *mutex_guard == epoch {
                        break mutex_guard;
                    }

                    drop(mutex_guard);

                    // give main test thread a good chance to
                    // acquire the lock before this thread does.
                    sleep(sleep_backoff);
                    sleep_backoff *= 2;
                };

                cv.notify_one();
            }
        })
    }
738
739    #[test]
740    fn wait_while_until_internal_does_not_wait_if_initially_false() {
741        let mutex = Arc::new(Mutex::new(0));
742        let cv = Arc::new(Condvar::new());
743
744        let condition = |counter: &mut u32| {
745            *counter += 1;
746            false
747        };
748
749        let mut mutex_guard = mutex.lock();
750        let timeout_result = cv.wait_while_until_internal(&mut mutex_guard, condition, None);
751
752        assert!(!timeout_result.timed_out());
753        assert!(*mutex_guard == 1);
754    }
755
    /// With a condition that is always `true`, the wait must end by timing
    /// out once the notifier has run out of notifications.
    #[test]
    fn wait_while_until_internal_times_out_before_false() {
        let mutex = Arc::new(Mutex::new(0));
        let cv = Arc::new(Condvar::new());

        let num_iters = 3;
        // Counts its own invocations and always asks to keep waiting.
        let condition = |counter: &mut u32| {
            *counter += 1;
            true
        };

        let mut mutex_guard = mutex.lock();
        let timeout = Some(Instant::now() + Duration::from_millis(500));
        let handle = spawn_wait_while_notifier(mutex.clone(), cv.clone(), num_iters, timeout);

        let timeout_result = cv.wait_while_until_internal(&mut mutex_guard, condition, timeout);

        assert!(timeout_result.timed_out());
        // One initial evaluation plus one per notification; the condition is
        // not evaluated again after the timed-out wait.
        assert!(*mutex_guard == num_iters + 1);

        // prevent deadlock with notifier
        drop(mutex_guard);
        handle.join().unwrap();
    }
780
    /// The wait must keep going across notifications until the condition
    /// finally returns `false`, and must return immediately on a later call
    /// whose condition is `false` from the start.
    #[test]
    fn wait_while_until_internal() {
        let mutex = Arc::new(Mutex::new(0));
        let cv = Arc::new(Condvar::new());

        let num_iters = 4;

        // Counts its own invocations and asks to keep waiting for the first
        // `num_iters` of them.
        let condition = |counter: &mut u32| {
            *counter += 1;
            *counter <= num_iters
        };

        let mut mutex_guard = mutex.lock();
        let handle = spawn_wait_while_notifier(mutex.clone(), cv.clone(), num_iters, None);

        let timeout_result = cv.wait_while_until_internal(&mut mutex_guard, condition, None);

        assert!(!timeout_result.timed_out());
        assert!(*mutex_guard == num_iters + 1);

        // The counter is already past `num_iters`, so this call evaluates
        // the condition once and returns without waiting.
        let timeout_result = cv.wait_while_until_internal(&mut mutex_guard, condition, None);
        handle.join().unwrap();

        assert!(!timeout_result.timed_out());
        assert!(*mutex_guard == num_iters + 2);
    }
807
    /// Waiting on a condvar with a second mutex while another thread is
    /// still waiting on it with the first mutex must panic.
    #[test]
    #[should_panic]
    fn two_mutexes() {
        let m = Arc::new(Mutex::new(()));
        let m2 = m.clone();
        let m3 = Arc::new(Mutex::new(()));
        let c = Arc::new(Condvar::new());
        let c2 = c.clone();

        // Make sure we don't leave the child thread dangling
        struct PanicGuard<'a>(&'a Condvar);
        impl<'a> Drop for PanicGuard<'a> {
            fn drop(&mut self) {
                self.0.notify_one();
            }
        }

        let (tx, rx) = channel();
        let g = m.lock();
        let _t = thread::spawn(move || {
            let mut g = m2.lock();
            // Signal while holding the lock; the lock is only released once
            // this thread is waiting on the condvar.
            tx.send(()).unwrap();
            c2.wait(&mut g);
        });
        drop(g);
        rx.recv().unwrap();
        // Acquiring `m` after the message means the child is now waiting on
        // the condvar with `m`.
        let _g = m.lock();
        let _guard = PanicGuard(&c);
        // Waiting with a different mutex is expected to panic here.
        c.wait(&mut m3.lock());
    }
838
839    #[test]
840    fn two_mutexes_disjoint() {
841        let m = Arc::new(Mutex::new(()));
842        let m2 = m.clone();
843        let m3 = Arc::new(Mutex::new(()));
844        let c = Arc::new(Condvar::new());
845        let c2 = c.clone();
846
847        let mut g = m.lock();
848        let _t = thread::spawn(move || {
849            let _g = m2.lock();
850            c2.notify_one();
851        });
852        c.wait(&mut g);
853        drop(g);
854
855        let _ = c.wait_for(&mut m3.lock(), Duration::from_millis(1));
856    }
857
858    #[test]
859    fn test_debug_condvar() {
860        let c = Condvar::new();
861        assert_eq!(format!("{:?}", c), "Condvar { .. }");
862    }
863
864    #[test]
865    fn test_condvar_requeue() {
866        let m = Arc::new(Mutex::new(()));
867        let m2 = m.clone();
868        let c = Arc::new(Condvar::new());
869        let c2 = c.clone();
870        let t = thread::spawn(move || {
871            let mut g = m2.lock();
872            c2.wait(&mut g);
873        });
874
875        let mut g = m.lock();
876        while !c.notify_one() {
877            // Wait for the thread to get into wait()
878            MutexGuard::bump(&mut g);
879            // Yield, so the other thread gets a chance to do something.
880            // (At least Miri needs this, because it doesn't preempt threads.)
881            thread::yield_now();
882        }
883        // The thread should have been requeued to the mutex, which we wake up now.
884        drop(g);
885        t.join().unwrap();
886    }
887
888    #[test]
889    fn test_issue_129() {
890        let locks = Arc::new((Mutex::new(()), Condvar::new()));
891
892        let (tx, rx) = channel();
893        for _ in 0..4 {
894            let locks = locks.clone();
895            let tx = tx.clone();
896            thread::spawn(move || {
897                let mut guard = locks.0.lock();
898                locks.1.wait(&mut guard);
899                locks.1.wait_for(&mut guard, Duration::from_millis(1));
900                locks.1.notify_one();
901                tx.send(()).unwrap();
902            });
903        }
904
905        thread::sleep(Duration::from_millis(100));
906        locks.1.notify_one();
907
908        for _ in 0..4 {
909            assert_eq!(rx.recv_timeout(Duration::from_millis(500)), Ok(()));
910        }
911    }
912}
913
/// This module contains an integration test that is heavily inspired by WebKit's own integration
/// tests for its own Condvar.
916#[cfg(test)]
917mod webkit_queue_test {
918    use crate::{Condvar, Mutex, MutexGuard};
919    use std::{collections::VecDeque, sync::Arc, thread, time::Duration};
920
    /// How the `wait` helper in this module blocks on a condition variable.
    #[derive(Clone, Copy)]
    enum Timeout {
        /// Each individual wait uses `Condvar::wait_for` with this duration; the
        /// helper re-checks its predicate afterwards and waits again if needed.
        Bounded(Duration),
        /// Each individual wait uses the untimed `Condvar::wait`.
        Forever,
    }
926
    /// Notification strategy used by the `notify` helper in this module.
    #[derive(Clone, Copy)]
    enum NotifyStyle {
        /// Always call `notify_one`, regardless of the `should_notify` hint.
        One,
        /// Call `notify_all`, but only when the `should_notify` hint is true.
        All,
    }
932
933    struct Queue {
934        items: VecDeque<usize>,
935        should_continue: bool,
936    }
937
938    impl Queue {
939        fn new() -> Self {
940            Self {
941                items: VecDeque::new(),
942                should_continue: true,
943            }
944        }
945    }
946
947    fn wait<T: ?Sized>(
948        condition: &Condvar,
949        lock: &mut MutexGuard<'_, T>,
950        predicate: impl Fn(&mut MutexGuard<'_, T>) -> bool,
951        timeout: &Timeout,
952    ) {
953        while !predicate(lock) {
954            match timeout {
955                Timeout::Forever => condition.wait(lock),
956                Timeout::Bounded(bound) => {
957                    condition.wait_for(lock, *bound);
958                }
959            }
960        }
961    }
962
963    fn notify(style: NotifyStyle, condition: &Condvar, should_notify: bool) {
964        match style {
965            NotifyStyle::One => {
966                condition.notify_one();
967            }
968            NotifyStyle::All => {
969                if should_notify {
970                    condition.notify_all();
971                }
972            }
973        }
974    }
975
976    fn run_queue_test(
977        num_producers: usize,
978        num_consumers: usize,
979        max_queue_size: usize,
980        messages_per_producer: usize,
981        notify_style: NotifyStyle,
982        timeout: Timeout,
983        delay: Duration,
984    ) {
985        let input_queue = Arc::new(Mutex::new(Queue::new()));
986        let empty_condition = Arc::new(Condvar::new());
987        let full_condition = Arc::new(Condvar::new());
988
989        let output_vec = Arc::new(Mutex::new(vec![]));
990
991        let consumers = (0..num_consumers)
992            .map(|_| {
993                consumer_thread(
994                    input_queue.clone(),
995                    empty_condition.clone(),
996                    full_condition.clone(),
997                    timeout,
998                    notify_style,
999                    output_vec.clone(),
1000                    max_queue_size,
1001                )
1002            })
1003            .collect::<Vec<_>>();
1004        let producers = (0..num_producers)
1005            .map(|_| {
1006                producer_thread(
1007                    messages_per_producer,
1008                    input_queue.clone(),
1009                    empty_condition.clone(),
1010                    full_condition.clone(),
1011                    timeout,
1012                    notify_style,
1013                    max_queue_size,
1014                )
1015            })
1016            .collect::<Vec<_>>();
1017
1018        thread::sleep(delay);
1019
1020        for producer in producers.into_iter() {
1021            producer.join().expect("Producer thread panicked");
1022        }
1023
1024        {
1025            let mut input_queue = input_queue.lock();
1026            input_queue.should_continue = false;
1027        }
1028        empty_condition.notify_all();
1029
1030        for consumer in consumers.into_iter() {
1031            consumer.join().expect("Consumer thread panicked");
1032        }
1033
1034        let mut output_vec = output_vec.lock();
1035        assert_eq!(output_vec.len(), num_producers * messages_per_producer);
1036        output_vec.sort();
1037        for msg_idx in 0..messages_per_producer {
1038            for producer_idx in 0..num_producers {
1039                assert_eq!(msg_idx, output_vec[msg_idx * num_producers + producer_idx]);
1040            }
1041        }
1042    }
1043
1044    fn consumer_thread(
1045        input_queue: Arc<Mutex<Queue>>,
1046        empty_condition: Arc<Condvar>,
1047        full_condition: Arc<Condvar>,
1048        timeout: Timeout,
1049        notify_style: NotifyStyle,
1050        output_queue: Arc<Mutex<Vec<usize>>>,
1051        max_queue_size: usize,
1052    ) -> thread::JoinHandle<()> {
1053        thread::spawn(move || loop {
1054            let (should_notify, result) = {
1055                let mut queue = input_queue.lock();
1056                wait(
1057                    &empty_condition,
1058                    &mut queue,
1059                    |state| -> bool { !state.items.is_empty() || !state.should_continue },
1060                    &timeout,
1061                );
1062                if queue.items.is_empty() && !queue.should_continue {
1063                    return;
1064                }
1065                let should_notify = queue.items.len() == max_queue_size;
1066                let result = queue.items.pop_front();
1067                std::mem::drop(queue);
1068                (should_notify, result)
1069            };
1070            notify(notify_style, &full_condition, should_notify);
1071
1072            if let Some(result) = result {
1073                output_queue.lock().push(result);
1074            }
1075        })
1076    }
1077
1078    fn producer_thread(
1079        num_messages: usize,
1080        queue: Arc<Mutex<Queue>>,
1081        empty_condition: Arc<Condvar>,
1082        full_condition: Arc<Condvar>,
1083        timeout: Timeout,
1084        notify_style: NotifyStyle,
1085        max_queue_size: usize,
1086    ) -> thread::JoinHandle<()> {
1087        thread::spawn(move || {
1088            for message in 0..num_messages {
1089                let should_notify = {
1090                    let mut queue = queue.lock();
1091                    wait(
1092                        &full_condition,
1093                        &mut queue,
1094                        |state| state.items.len() < max_queue_size,
1095                        &timeout,
1096                    );
1097                    let should_notify = queue.items.is_empty();
1098                    queue.items.push_back(message);
1099                    std::mem::drop(queue);
1100                    should_notify
1101                };
1102                notify(notify_style, &empty_condition, should_notify);
1103            }
1104        })
1105    }
1106
    // Generates one `#[test]` function per entry. Each entry is written as
    // `name(key: value, ...);` with the seven keys below in this exact order,
    // and expands to a call of `run_queue_test` with those values.
    // `delay_seconds` is converted to a `Duration` via `Duration::from_secs`.
    macro_rules! run_queue_tests {
        ( $( $name:ident(
            num_producers: $num_producers:expr,
            num_consumers: $num_consumers:expr,
            max_queue_size: $max_queue_size:expr,
            messages_per_producer: $messages_per_producer:expr,
            notification_style: $notification_style:expr,
            timeout: $timeout:expr,
            delay_seconds: $delay_seconds:expr);
        )* ) => {
            $(#[test]
            fn $name() {
                let delay = Duration::from_secs($delay_seconds);
                run_queue_test(
                    $num_producers,
                    $num_consumers,
                    $max_queue_size,
                    $messages_per_producer,
                    $notification_style,
                    $timeout,
                    delay,
                    );
            })*
        };
    }
1132
    // Test matrix: producer/consumer counts, queue capacity, message volume,
    // notification style, wait timeout and start-up delay for each scenario.
    // Only plain `//` comments are used inside the invocation so the macro
    // matcher never sees them.
    run_queue_tests! {
        // The only entry that uses a bounded (1 s) wait.
        // NOTE(review): the names of this entry and the next look swapped relative
        // to their `timeout` values — confirm whether that is intentional.
        sanity_check_queue(
            num_producers: 1,
            num_consumers: 1,
            max_queue_size: 1,
            messages_per_producer: 100_000,
            notification_style: NotifyStyle::All,
            timeout: Timeout::Bounded(Duration::from_secs(1)),
            delay_seconds: 0
        );
        // Despite the `_timeout` suffix this entry waits with `Timeout::Forever`.
        sanity_check_queue_timeout(
            num_producers: 1,
            num_consumers: 1,
            max_queue_size: 1,
            messages_per_producer: 100_000,
            notification_style: NotifyStyle::All,
            timeout: Timeout::Forever,
            delay_seconds: 0
        );
        new_test_without_timeout_5(
            num_producers: 1,
            num_consumers: 5,
            max_queue_size: 1,
            messages_per_producer: 100_000,
            notification_style: NotifyStyle::All,
            timeout: Timeout::Forever,
            delay_seconds: 0
        );
        // Same parameters as `sanity_check_queue_timeout` above.
        one_producer_one_consumer_one_slot(
            num_producers: 1,
            num_consumers: 1,
            max_queue_size: 1,
            messages_per_producer: 100_000,
            notification_style: NotifyStyle::All,
            timeout: Timeout::Forever,
            delay_seconds: 0
        );
        // The only entry with a non-zero start-up delay; it still waits with
        // `Timeout::Forever` despite the `_timeout` suffix.
        one_producer_one_consumer_one_slot_timeout(
            num_producers: 1,
            num_consumers: 1,
            max_queue_size: 1,
            messages_per_producer: 100_000,
            notification_style: NotifyStyle::All,
            timeout: Timeout::Forever,
            delay_seconds: 1
        );
        one_producer_one_consumer_hundred_slots(
            num_producers: 1,
            num_consumers: 1,
            max_queue_size: 100,
            messages_per_producer: 1_000_000,
            notification_style: NotifyStyle::All,
            timeout: Timeout::Forever,
            delay_seconds: 0
        );
        ten_producers_one_consumer_one_slot(
            num_producers: 10,
            num_consumers: 1,
            max_queue_size: 1,
            messages_per_producer: 10000,
            notification_style: NotifyStyle::All,
            timeout: Timeout::Forever,
            delay_seconds: 0
        );
        ten_producers_one_consumer_hundred_slots_notify_all(
            num_producers: 10,
            num_consumers: 1,
            max_queue_size: 100,
            messages_per_producer: 10000,
            notification_style: NotifyStyle::All,
            timeout: Timeout::Forever,
            delay_seconds: 0
        );
        ten_producers_one_consumer_hundred_slots_notify_one(
            num_producers: 10,
            num_consumers: 1,
            max_queue_size: 100,
            messages_per_producer: 10000,
            notification_style: NotifyStyle::One,
            timeout: Timeout::Forever,
            delay_seconds: 0
        );
        one_producer_ten_consumers_one_slot(
            num_producers: 1,
            num_consumers: 10,
            max_queue_size: 1,
            messages_per_producer: 10000,
            notification_style: NotifyStyle::All,
            timeout: Timeout::Forever,
            delay_seconds: 0
        );
        one_producer_ten_consumers_hundred_slots_notify_all(
            num_producers: 1,
            num_consumers: 10,
            max_queue_size: 100,
            messages_per_producer: 100_000,
            notification_style: NotifyStyle::All,
            timeout: Timeout::Forever,
            delay_seconds: 0
        );
        one_producer_ten_consumers_hundred_slots_notify_one(
            num_producers: 1,
            num_consumers: 10,
            max_queue_size: 100,
            messages_per_producer: 100_000,
            notification_style: NotifyStyle::One,
            timeout: Timeout::Forever,
            delay_seconds: 0
        );
        ten_producers_ten_consumers_one_slot(
            num_producers: 10,
            num_consumers: 10,
            max_queue_size: 1,
            messages_per_producer: 50000,
            notification_style: NotifyStyle::All,
            timeout: Timeout::Forever,
            delay_seconds: 0
        );
        ten_producers_ten_consumers_hundred_slots_notify_all(
            num_producers: 10,
            num_consumers: 10,
            max_queue_size: 100,
            messages_per_producer: 50000,
            notification_style: NotifyStyle::All,
            timeout: Timeout::Forever,
            delay_seconds: 0
        );
        ten_producers_ten_consumers_hundred_slots_notify_one(
            num_producers: 10,
            num_consumers: 10,
            max_queue_size: 100,
            messages_per_producer: 50000,
            notification_style: NotifyStyle::One,
            timeout: Timeout::Forever,
            delay_seconds: 0
        );
    }
1270}