// Copyright 2016 Amanieu d'Antras
//
// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
// http://opensource.org/licenses/MIT>, at your option. This file may not be
// copied, modified, or distributed except according to those terms.
78use crate::mutex::MutexGuard;
9use crate::raw_mutex::{RawMutex, TOKEN_HANDOFF, TOKEN_NORMAL};
10use crate::{deadlock, util};
11use core::{
12fmt, ptr,
13 sync::atomic::{AtomicPtr, Ordering},
14};
15use lock_api::RawMutexas RawMutex_;
16use parking_lot_core::{self, ParkResult, RequeueOp, UnparkResult, DEFAULT_PARK_TOKEN};
17use std::ops::DerefMut;
18use std::time::{Duration, Instant};
/// A type indicating whether a timed wait on a condition variable returned
/// due to a time out or not.
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
pub struct WaitTimeoutResult(bool);

impl WaitTimeoutResult {
    /// Returns whether the wait was known to have timed out.
    #[inline]
    pub fn timed_out(self) -> bool {
        self.0
    }
}
3233/// A Condition Variable
34///
35/// Condition variables represent the ability to block a thread such that it
36/// consumes no CPU time while waiting for an event to occur. Condition
37/// variables are typically associated with a boolean predicate (a condition)
38/// and a mutex. The predicate is always verified inside of the mutex before
39/// determining that thread must block.
40///
41/// Note that this module places one additional restriction over the system
42/// condition variables: each condvar can be used with only one mutex at a
43/// time. Any attempt to use multiple mutexes on the same condition variable
44/// simultaneously will result in a runtime panic. However it is possible to
45/// switch to a different mutex if there are no threads currently waiting on
46/// the condition variable.
47///
48/// # Differences from the standard library `Condvar`
49///
50/// - No spurious wakeups: A wait will only return a non-timeout result if it
51/// was woken up by `notify_one` or `notify_all`.
52/// - `Condvar::notify_all` will only wake up a single thread, the rest are
53/// requeued to wait for the `Mutex` to be unlocked by the thread that was
54/// woken up.
55/// - Only requires 1 word of space, whereas the standard library boxes the
56/// `Condvar` due to platform limitations.
57/// - Can be statically constructed.
58/// - Does not require any drop glue when dropped.
59/// - Inline fast path for the uncontended case.
60///
61/// # Examples
62///
63/// ```
64/// use parking_lot::{Mutex, Condvar};
65/// use std::sync::Arc;
66/// use std::thread;
67///
68/// let pair = Arc::new((Mutex::new(false), Condvar::new()));
69/// let pair2 = pair.clone();
70///
71/// // Inside of our lock, spawn a new thread, and then wait for it to start
72/// thread::spawn(move|| {
73/// let &(ref lock, ref cvar) = &*pair2;
74/// let mut started = lock.lock();
75/// *started = true;
76/// cvar.notify_one();
77/// });
78///
79/// // wait for the thread to start up
80/// let &(ref lock, ref cvar) = &*pair;
81/// let mut started = lock.lock();
82/// if !*started {
83/// cvar.wait(&mut started);
84/// }
85/// // Note that we used an if instead of a while loop above. This is only
86/// // possible because parking_lot's Condvar will never spuriously wake up.
87/// // This means that wait() will only return after notify_one or notify_all is
88/// // called.
89/// ```
90pub struct Condvar {
91 state: AtomicPtr<RawMutex>,
92}
9394impl Condvar {
95/// Creates a new condition variable which is ready to be waited on and
96 /// notified.
97#[inline]
98pub const fn new() -> Condvar {
99Condvar {
100 state: AtomicPtr::new(ptr::null_mut()),
101 }
102 }
103104/// Wakes up one blocked thread on this condvar.
105 ///
106 /// Returns whether a thread was woken up.
107 ///
108 /// If there is a blocked thread on this condition variable, then it will
109 /// be woken up from its call to `wait` or `wait_timeout`. Calls to
110 /// `notify_one` are not buffered in any way.
111 ///
112 /// To wake up all threads, see `notify_all()`.
113 ///
114 /// # Examples
115 ///
116 /// ```
117 /// use parking_lot::Condvar;
118 ///
119 /// let condvar = Condvar::new();
120 ///
121 /// // do something with condvar, share it with other threads
122 ///
123 /// if !condvar.notify_one() {
124 /// println!("Nobody was listening for this.");
125 /// }
126 /// ```
127#[inline]
128pub fn notify_one(&self) -> bool {
129// Nothing to do if there are no waiting threads
130let state = self.state.load(Ordering::Relaxed);
131if state.is_null() {
132return false;
133 }
134135self.notify_one_slow(state)
136 }
137138#[cold]
139fn notify_one_slow(&self, mutex: *mut RawMutex) -> bool {
140// Unpark one thread and requeue the rest onto the mutex
141let from = selfas *const _ as usize;
142let to = mutexas usize;
143let validate = || {
144// Make sure that our atomic state still points to the same
145 // mutex. If not then it means that all threads on the current
146 // mutex were woken up and a new waiting thread switched to a
147 // different mutex. In that case we can get away with doing
148 // nothing.
149if self.state.load(Ordering::Relaxed) != mutex {
150return RequeueOp::Abort;
151 }
152153// Unpark one thread if the mutex is unlocked, otherwise just
154 // requeue everything to the mutex. This is safe to do here
155 // since unlocking the mutex when the parked bit is set requires
156 // locking the queue. There is the possibility of a race if the
157 // mutex gets locked after we check, but that doesn't matter in
158 // this case.
159if unsafe { (*mutex).mark_parked_if_locked() } {
160 RequeueOp::RequeueOne161 } else {
162 RequeueOp::UnparkOne163 }
164 };
165let callback = |_op, result: UnparkResult| {
166// Clear our state if there are no more waiting threads
167if !result.have_more_threads {
168self.state.store(ptr::null_mut(), Ordering::Relaxed);
169 }
170TOKEN_NORMAL171 };
172let res = unsafe { parking_lot_core::unpark_requeue(from, to, validate, callback) };
173174res.unparked_threads + res.requeued_threads != 0
175}
176177/// Wakes up all blocked threads on this condvar.
178 ///
179 /// Returns the number of threads woken up.
180 ///
181 /// This method will ensure that any current waiters on the condition
182 /// variable are awoken. Calls to `notify_all()` are not buffered in any
183 /// way.
184 ///
185 /// To wake up only one thread, see `notify_one()`.
186#[inline]
187pub fn notify_all(&self) -> usize {
188// Nothing to do if there are no waiting threads
189let state = self.state.load(Ordering::Relaxed);
190if state.is_null() {
191return 0;
192 }
193194self.notify_all_slow(state)
195 }
196197#[cold]
198fn notify_all_slow(&self, mutex: *mut RawMutex) -> usize {
199// Unpark one thread and requeue the rest onto the mutex
200let from = selfas *const _ as usize;
201let to = mutexas usize;
202let validate = || {
203// Make sure that our atomic state still points to the same
204 // mutex. If not then it means that all threads on the current
205 // mutex were woken up and a new waiting thread switched to a
206 // different mutex. In that case we can get away with doing
207 // nothing.
208if self.state.load(Ordering::Relaxed) != mutex {
209return RequeueOp::Abort;
210 }
211212// Clear our state since we are going to unpark or requeue all
213 // threads.
214self.state.store(ptr::null_mut(), Ordering::Relaxed);
215216// Unpark one thread if the mutex is unlocked, otherwise just
217 // requeue everything to the mutex. This is safe to do here
218 // since unlocking the mutex when the parked bit is set requires
219 // locking the queue. There is the possibility of a race if the
220 // mutex gets locked after we check, but that doesn't matter in
221 // this case.
222if unsafe { (*mutex).mark_parked_if_locked() } {
223 RequeueOp::RequeueAll224 } else {
225 RequeueOp::UnparkOneRequeueRest226 }
227 };
228let callback = |op, result: UnparkResult| {
229// If we requeued threads to the mutex, mark it as having
230 // parked threads. The RequeueAll case is already handled above.
231if op == RequeueOp::UnparkOneRequeueRest && result.requeued_threads != 0 {
232unsafe { (*mutex).mark_parked() };
233 }
234TOKEN_NORMAL235 };
236let res = unsafe { parking_lot_core::unpark_requeue(from, to, validate, callback) };
237238res.unparked_threads + res.requeued_threads
239 }
240241/// Blocks the current thread until this condition variable receives a
242 /// notification.
243 ///
244 /// This function will atomically unlock the mutex specified (represented by
245 /// `mutex_guard`) and block the current thread. This means that any calls
246 /// to `notify_*()` which happen logically after the mutex is unlocked are
247 /// candidates to wake this thread up. When this function call returns, the
248 /// lock specified will have been re-acquired.
249 ///
250 /// # Panics
251 ///
252 /// This function will panic if another thread is waiting on the `Condvar`
253 /// with a different `Mutex` object.
254#[inline]
255pub fn wait<T: ?Sized>(&self, mutex_guard: &mut MutexGuard<'_, T>) {
256self.wait_until_internal(unsafe { MutexGuard::mutex(mutex_guard).raw() }, None);
257 }
258259/// Waits on this condition variable for a notification, timing out after
260 /// the specified time instant.
261 ///
262 /// The semantics of this function are equivalent to `wait()` except that
263 /// the thread will be blocked roughly until `timeout` is reached. This
264 /// method should not be used for precise timing due to anomalies such as
265 /// preemption or platform differences that may not cause the maximum
266 /// amount of time waited to be precisely `timeout`.
267 ///
268 /// Note that the best effort is made to ensure that the time waited is
269 /// measured with a monotonic clock, and not affected by the changes made to
270 /// the system time.
271 ///
272 /// The returned `WaitTimeoutResult` value indicates if the timeout is
273 /// known to have elapsed.
274 ///
275 /// Like `wait`, the lock specified will be re-acquired when this function
276 /// returns, regardless of whether the timeout elapsed or not.
277 ///
278 /// # Panics
279 ///
280 /// This function will panic if another thread is waiting on the `Condvar`
281 /// with a different `Mutex` object.
282#[inline]
283pub fn wait_until<T: ?Sized>(
284&self,
285 mutex_guard: &mut MutexGuard<'_, T>,
286 timeout: Instant,
287 ) -> WaitTimeoutResult {
288self.wait_until_internal(
289unsafe { MutexGuard::mutex(mutex_guard).raw() },
290Some(timeout),
291 )
292 }
293294// This is a non-generic function to reduce the monomorphization cost of
295 // using `wait_until`.
296fn wait_until_internal(&self, mutex: &RawMutex, timeout: Option<Instant>) -> WaitTimeoutResult {
297let result;
298let mut bad_mutex = false;
299let mut requeued = false;
300 {
301let addr = selfas *const _ as usize;
302let lock_addr = mutexas *const _ as *mut _;
303let validate = || {
304// Ensure we don't use two different mutexes with the same
305 // Condvar at the same time. This is done while locked to
306 // avoid races with notify_one
307let state = self.state.load(Ordering::Relaxed);
308if state.is_null() {
309self.state.store(lock_addr, Ordering::Relaxed);
310 } else if state != lock_addr {
311bad_mutex = true;
312return false;
313 }
314true
315};
316let before_sleep = || {
317// Unlock the mutex before sleeping...
318unsafe { mutex.unlock() };
319 };
320let timed_out = |k, was_last_thread| {
321// If we were requeued to a mutex, then we did not time out.
322 // We'll just park ourselves on the mutex again when we try
323 // to lock it later.
324requeued = k != addr;
325326// If we were the last thread on the queue then we need to
327 // clear our state. This is normally done by the
328 // notify_{one,all} functions when not timing out.
329if !requeued && was_last_thread {
330self.state.store(ptr::null_mut(), Ordering::Relaxed);
331 }
332 };
333result = unsafe {
334 parking_lot_core::park(
335addr,
336validate,
337before_sleep,
338timed_out,
339DEFAULT_PARK_TOKEN,
340timeout,
341 )
342 };
343 }
344345// Panic if we tried to use multiple mutexes with a Condvar. Note
346 // that at this point the MutexGuard is still locked. It will be
347 // unlocked by the unwinding logic.
348if bad_mutex {
349{
::core::panicking::panic_fmt(format_args!("attempted to use a condition variable with more than one mutex"));
};panic!("attempted to use a condition variable with more than one mutex");
350 }
351352// ... and re-lock it once we are done sleeping
353if result == ParkResult::Unparked(TOKEN_HANDOFF) {
354unsafe { deadlock::acquire_resource(mutexas *const _ as usize) };
355 } else {
356mutex.lock();
357 }
358359WaitTimeoutResult(!(result.is_unparked() || requeued))
360 }
361362/// Waits on this condition variable for a notification, timing out after a
363 /// specified duration.
364 ///
365 /// The semantics of this function are equivalent to `wait()` except that
366 /// the thread will be blocked for roughly no longer than `timeout`. This
367 /// method should not be used for precise timing due to anomalies such as
368 /// preemption or platform differences that may not cause the maximum
369 /// amount of time waited to be precisely `timeout`.
370 ///
371 /// Note that the best effort is made to ensure that the time waited is
372 /// measured with a monotonic clock, and not affected by the changes made to
373 /// the system time.
374 ///
375 /// The returned `WaitTimeoutResult` value indicates if the timeout is
376 /// known to have elapsed.
377 ///
378 /// Like `wait`, the lock specified will be re-acquired when this function
379 /// returns, regardless of whether the timeout elapsed or not.
380#[inline]
381pub fn wait_for<T: ?Sized>(
382&self,
383 mutex_guard: &mut MutexGuard<'_, T>,
384 timeout: Duration,
385 ) -> WaitTimeoutResult {
386let deadline = util::to_deadline(timeout);
387self.wait_until_internal(unsafe { MutexGuard::mutex(mutex_guard).raw() }, deadline)
388 }
389390#[inline]
391fn wait_while_until_internal<T, F>(
392&self,
393 mutex_guard: &mut MutexGuard<'_, T>,
394mut condition: F,
395 timeout: Option<Instant>,
396 ) -> WaitTimeoutResult397where
398T: ?Sized,
399 F: FnMut(&mut T) -> bool,
400 {
401let mut result = WaitTimeoutResult(false);
402403while !result.timed_out() && condition(mutex_guard.deref_mut()) {
404 result =
405self.wait_until_internal(unsafe { MutexGuard::mutex(mutex_guard).raw() }, timeout);
406 }
407408result409 }
410/// Blocks the current thread until this condition variable receives a
411 /// notification. If the provided condition evaluates to `false`, then the
412 /// thread is no longer blocked and the operation is completed. If the
413 /// condition evaluates to `true`, then the thread is blocked again and
414 /// waits for another notification before repeating this process.
415 ///
416 /// This function will atomically unlock the mutex specified (represented by
417 /// `mutex_guard`) and block the current thread. This means that any calls
418 /// to `notify_*()` which happen logically after the mutex is unlocked are
419 /// candidates to wake this thread up. When this function call returns, the
420 /// lock specified will have been re-acquired.
421 ///
422 /// # Panics
423 ///
424 /// This function will panic if another thread is waiting on the `Condvar`
425 /// with a different `Mutex` object.
426#[inline]
427pub fn wait_while<T, F>(&self, mutex_guard: &mut MutexGuard<'_, T>, condition: F)
428where
429T: ?Sized,
430 F: FnMut(&mut T) -> bool,
431 {
432self.wait_while_until_internal(mutex_guard, condition, None);
433 }
434435/// Waits on this condition variable for a notification, timing out after
436 /// the specified time instant. If the provided condition evaluates to
437 /// `false`, then the thread is no longer blocked and the operation is
438 /// completed. If the condition evaluates to `true`, then the thread is
439 /// blocked again and waits for another notification before repeating
440 /// this process.
441 ///
442 /// The semantics of this function are equivalent to `wait()` except that
443 /// the thread will be blocked roughly until `timeout` is reached. This
444 /// method should not be used for precise timing due to anomalies such as
445 /// preemption or platform differences that may not cause the maximum
446 /// amount of time waited to be precisely `timeout`.
447 ///
448 /// Note that the best effort is made to ensure that the time waited is
449 /// measured with a monotonic clock, and not affected by the changes made to
450 /// the system time.
451 ///
452 /// The returned `WaitTimeoutResult` value indicates if the timeout is
453 /// known to have elapsed.
454 ///
455 /// Like `wait`, the lock specified will be re-acquired when this function
456 /// returns, regardless of whether the timeout elapsed or not.
457 ///
458 /// # Panics
459 ///
460 /// This function will panic if another thread is waiting on the `Condvar`
461 /// with a different `Mutex` object.
462#[inline]
463pub fn wait_while_until<T, F>(
464&self,
465 mutex_guard: &mut MutexGuard<'_, T>,
466 condition: F,
467 timeout: Instant,
468 ) -> WaitTimeoutResult469where
470T: ?Sized,
471 F: FnMut(&mut T) -> bool,
472 {
473self.wait_while_until_internal(mutex_guard, condition, Some(timeout))
474 }
475476/// Waits on this condition variable for a notification, timing out after a
477 /// specified duration. If the provided condition evaluates to `false`,
478 /// then the thread is no longer blocked and the operation is completed.
479 /// If the condition evaluates to `true`, then the thread is blocked again
480 /// and waits for another notification before repeating this process.
481 ///
482 /// The semantics of this function are equivalent to `wait()` except that
483 /// the thread will be blocked for roughly no longer than `timeout`. This
484 /// method should not be used for precise timing due to anomalies such as
485 /// preemption or platform differences that may not cause the maximum
486 /// amount of time waited to be precisely `timeout`.
487 ///
488 /// Note that the best effort is made to ensure that the time waited is
489 /// measured with a monotonic clock, and not affected by the changes made to
490 /// the system time.
491 ///
492 /// The returned `WaitTimeoutResult` value indicates if the timeout is
493 /// known to have elapsed.
494 ///
495 /// Like `wait`, the lock specified will be re-acquired when this function
496 /// returns, regardless of whether the timeout elapsed or not.
497#[inline]
498pub fn wait_while_for<T: ?Sized, F>(
499&self,
500 mutex_guard: &mut MutexGuard<'_, T>,
501 condition: F,
502 timeout: Duration,
503 ) -> WaitTimeoutResult504where
505F: FnMut(&mut T) -> bool,
506 {
507let deadline = util::to_deadline(timeout);
508self.wait_while_until_internal(mutex_guard, condition, deadline)
509 }
510}
511512impl Defaultfor Condvar {
513#[inline]
514fn default() -> Condvar {
515Condvar::new()
516 }
517}
518519impl fmt::Debugfor Condvar {
520fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
521f.pad("Condvar { .. }")
522 }
523}
#[cfg(test)]
mod tests {
    use crate::{Condvar, Mutex, MutexGuard};
    use std::sync::mpsc::channel;
    use std::sync::Arc;
    use std::thread;
    use std::thread::sleep;
    use std::thread::JoinHandle;
    use std::time::Duration;
    use std::time::Instant;

    // Notifying a condvar without waiters must be a harmless no-op.
    #[test]
    fn smoke() {
        let c = Condvar::new();
        c.notify_one();
        c.notify_all();
    }

    #[test]
    fn notify_one() {
        let m = Arc::new(Mutex::new(()));
        let m2 = m.clone();
        let c = Arc::new(Condvar::new());
        let c2 = c.clone();

        let mut g = m.lock();
        let _t = thread::spawn(move || {
            let _g = m2.lock();
            c2.notify_one();
        });
        c.wait(&mut g);
    }

    #[test]
    fn notify_all() {
        const N: usize = 10;

        let data = Arc::new((Mutex::new(0), Condvar::new()));
        let (tx, rx) = channel();
        for _ in 0..N {
            let data = data.clone();
            let tx = tx.clone();
            thread::spawn(move || {
                let (lock, cond) = &*data;
                let mut cnt = lock.lock();
                *cnt += 1;
                if *cnt == N {
                    tx.send(()).unwrap();
                }
                while *cnt != 0 {
                    cond.wait(&mut cnt);
                }
                tx.send(()).unwrap();
            });
        }
        drop(tx);

        let (lock, cond) = &*data;
        rx.recv().unwrap();
        let mut cnt = lock.lock();
        *cnt = 0;
        cond.notify_all();
        drop(cnt);

        for _ in 0..N {
            rx.recv().unwrap();
        }
    }

    #[test]
    fn notify_one_return_true() {
        let m = Arc::new(Mutex::new(()));
        let m2 = m.clone();
        let c = Arc::new(Condvar::new());
        let c2 = c.clone();

        let mut g = m.lock();
        let t = thread::spawn(move || {
            let _g = m2.lock();
            assert!(c2.notify_one());
        });
        c.wait(&mut g);
        drop(g);

        // Join so that a panic in the notifier thread fails this test.
        t.join().unwrap();
    }

    #[test]
    fn notify_one_return_false() {
        let m = Arc::new(Mutex::new(()));
        let c = Arc::new(Condvar::new());

        let t = thread::spawn(move || {
            let _g = m.lock();
            assert!(!c.notify_one());
        });

        // Without the join the assertion above could be skipped entirely, or
        // fail without anyone noticing.
        t.join().unwrap();
    }

    #[test]
    fn notify_all_return() {
        const N: usize = 10;

        let data = Arc::new((Mutex::new(0), Condvar::new()));
        let (tx, rx) = channel();
        for _ in 0..N {
            let data = data.clone();
            let tx = tx.clone();
            thread::spawn(move || {
                let (lock, cond) = &*data;
                let mut cnt = lock.lock();
                *cnt += 1;
                if *cnt == N {
                    tx.send(()).unwrap();
                }
                while *cnt != 0 {
                    cond.wait(&mut cnt);
                }
                tx.send(()).unwrap();
            });
        }
        drop(tx);

        let (lock, cond) = &*data;
        rx.recv().unwrap();
        let mut cnt = lock.lock();
        *cnt = 0;
        assert_eq!(cond.notify_all(), N);
        drop(cnt);

        for _ in 0..N {
            rx.recv().unwrap();
        }

        assert_eq!(cond.notify_all(), 0);
    }

    #[test]
    fn wait_for() {
        let m = Arc::new(Mutex::new(()));
        let m2 = m.clone();
        let c = Arc::new(Condvar::new());
        let c2 = c.clone();

        let mut g = m.lock();
        // Nobody notifies yet, so the short wait has to time out.
        let short_wait = c.wait_for(&mut g, Duration::from_millis(1));
        assert!(short_wait.timed_out());

        let _t = thread::spawn(move || {
            let _g = m2.lock();
            c2.notify_one();
        });
        let long_wait = c.wait_for(&mut g, Duration::from_secs(u64::max_value()));
        assert!(!long_wait.timed_out());

        drop(g);
    }

    #[test]
    fn wait_until() {
        let m = Arc::new(Mutex::new(()));
        let m2 = m.clone();
        let c = Arc::new(Condvar::new());
        let c2 = c.clone();

        let mut g = m.lock();
        // Nobody notifies yet, so the short wait has to time out.
        let short_wait = c.wait_until(&mut g, Instant::now() + Duration::from_millis(1));
        assert!(short_wait.timed_out());
        let _t = thread::spawn(move || {
            let _g = m2.lock();
            c2.notify_one();
        });
        let long_wait = c.wait_until(
            &mut g,
            Instant::now() + Duration::from_millis(u32::max_value() as u64),
        );
        assert!(!long_wait.timed_out());
        drop(g);
    }

    // Spawns a thread that, for each epoch in `1..=num_iters`, waits until
    // the shared counter equals the epoch and then calls `notify_one` while
    // holding the mutex. It gives up once `timeout` (if any) has passed.
    fn spawn_wait_while_notifier(
        mutex: Arc<Mutex<u32>>,
        cv: Arc<Condvar>,
        num_iters: u32,
        timeout: Option<Instant>,
    ) -> JoinHandle<()> {
        thread::spawn(move || {
            for epoch in 1..=num_iters {
                // spin to wait for main test thread to block
                // before notifying it to wake back up and check
                // its condition.
                let mut sleep_backoff = Duration::from_millis(1);
                let _mutex_guard = loop {
                    let mutex_guard = mutex.lock();

                    if let Some(timeout) = timeout {
                        if Instant::now() >= timeout {
                            return;
                        }
                    }

                    if *mutex_guard == epoch {
                        break mutex_guard;
                    }

                    drop(mutex_guard);

                    // give main test thread a good chance to
                    // acquire the lock before this thread does.
                    sleep(sleep_backoff);
                    sleep_backoff *= 2;
                };

                cv.notify_one();
            }
        })
    }

    #[test]
    fn wait_while_until_internal_does_not_wait_if_initially_false() {
        let mutex = Arc::new(Mutex::new(0));
        let cv = Arc::new(Condvar::new());

        let condition = |counter: &mut u32| {
            *counter += 1;
            false
        };

        let mut mutex_guard = mutex.lock();
        let timeout_result = cv.wait_while_until_internal(&mut mutex_guard, condition, None);

        assert!(!timeout_result.timed_out());
        assert!(*mutex_guard == 1);
    }

    #[test]
    fn wait_while_until_internal_times_out_before_false() {
        let mutex = Arc::new(Mutex::new(0));
        let cv = Arc::new(Condvar::new());

        let num_iters = 3;
        let condition = |counter: &mut u32| {
            *counter += 1;
            true
        };

        let mut mutex_guard = mutex.lock();
        let timeout = Some(Instant::now() + Duration::from_millis(500));
        let handle = spawn_wait_while_notifier(mutex.clone(), cv.clone(), num_iters, timeout);

        let timeout_result = cv.wait_while_until_internal(&mut mutex_guard, condition, timeout);

        assert!(timeout_result.timed_out());
        assert!(*mutex_guard == num_iters + 1);

        // prevent deadlock with notifier
        drop(mutex_guard);
        handle.join().unwrap();
    }

    #[test]
    fn wait_while_until_internal() {
        let mutex = Arc::new(Mutex::new(0));
        let cv = Arc::new(Condvar::new());

        let num_iters = 4;

        let condition = |counter: &mut u32| {
            *counter += 1;
            *counter <= num_iters
        };

        let mut mutex_guard = mutex.lock();
        let handle = spawn_wait_while_notifier(mutex.clone(), cv.clone(), num_iters, None);

        let timeout_result = cv.wait_while_until_internal(&mut mutex_guard, condition, None);

        assert!(!timeout_result.timed_out());
        assert!(*mutex_guard == num_iters + 1);

        let timeout_result = cv.wait_while_until_internal(&mut mutex_guard, condition, None);
        handle.join().unwrap();

        assert!(!timeout_result.timed_out());
        assert!(*mutex_guard == num_iters + 2);
    }

    #[test]
    #[should_panic]
    fn two_mutexes() {
        let m = Arc::new(Mutex::new(()));
        let m2 = m.clone();
        let m3 = Arc::new(Mutex::new(()));
        let c = Arc::new(Condvar::new());
        let c2 = c.clone();

        // Make sure we don't leave the child thread dangling
        struct PanicGuard<'a>(&'a Condvar);
        impl<'a> Drop for PanicGuard<'a> {
            fn drop(&mut self) {
                self.0.notify_one();
            }
        }

        let (tx, rx) = channel();
        let g = m.lock();
        let _t = thread::spawn(move || {
            let mut g = m2.lock();
            tx.send(()).unwrap();
            c2.wait(&mut g);
        });
        drop(g);
        rx.recv().unwrap();
        let _g = m.lock();
        let _guard = PanicGuard(&c);
        c.wait(&mut m3.lock());
    }

    #[test]
    fn two_mutexes_disjoint() {
        let m = Arc::new(Mutex::new(()));
        let m2 = m.clone();
        let m3 = Arc::new(Mutex::new(()));
        let c = Arc::new(Condvar::new());
        let c2 = c.clone();

        let mut g = m.lock();
        let _t = thread::spawn(move || {
            let _g = m2.lock();
            c2.notify_one();
        });
        c.wait(&mut g);
        drop(g);

        let _ = c.wait_for(&mut m3.lock(), Duration::from_millis(1));
    }

    #[test]
    fn test_debug_condvar() {
        let c = Condvar::new();
        assert_eq!(format!("{:?}", c), "Condvar { .. }");
    }

    #[test]
    fn test_condvar_requeue() {
        let m = Arc::new(Mutex::new(()));
        let m2 = m.clone();
        let c = Arc::new(Condvar::new());
        let c2 = c.clone();
        let t = thread::spawn(move || {
            let mut g = m2.lock();
            c2.wait(&mut g);
        });

        let mut g = m.lock();
        while !c.notify_one() {
            // Wait for the thread to get into wait()
            MutexGuard::bump(&mut g);
            // Yield, so the other thread gets a chance to do something.
            // (At least Miri needs this, because it doesn't preempt threads.)
            thread::yield_now();
        }
        // The thread should have been requeued to the mutex, which we wake up now.
        drop(g);
        t.join().unwrap();
    }

    #[test]
    fn test_issue_129() {
        let locks = Arc::new((Mutex::new(()), Condvar::new()));

        let (tx, rx) = channel();
        for _ in 0..4 {
            let locks = locks.clone();
            let tx = tx.clone();
            thread::spawn(move || {
                let mut guard = locks.0.lock();
                locks.1.wait(&mut guard);
                locks.1.wait_for(&mut guard, Duration::from_millis(1));
                locks.1.notify_one();
                tx.send(()).unwrap();
            });
        }

        thread::sleep(Duration::from_millis(100));
        locks.1.notify_one();

        for _ in 0..4 {
            assert_eq!(rx.recv_timeout(Duration::from_millis(500)), Ok(()));
        }
    }
}
/// This module contains an integration test that is heavily inspired by WebKit's own integration
/// tests for its own Condvar.
916#[cfg(test)]
917mod webkit_queue_test {
918use crate::{Condvar, Mutex, MutexGuard};
919use std::{collections::VecDeque, sync::Arc, thread, time::Duration};
/// How long a single wait on a condition variable may block in the queue test.
#[derive(Clone, Copy)]
enum Timeout {
    /// Each wait gives up after the given duration.
    Bounded(Duration),
    /// Each wait blocks until it is notified.
    Forever,
}
/// Which notification call the queue test uses to wake waiters.
#[derive(Clone, Copy)]
enum NotifyStyle {
    /// Wake waiters with `notify_one`.
    One,
    /// Wake waiters with `notify_all`.
    All,
}
/// Shared state of the producer/consumer test.
struct Queue {
    /// Messages that have been produced but not yet consumed.
    items: VecDeque<usize>,
    /// Cleared by the test driver once all producers have finished, which
    /// tells consumers to exit when the queue is empty.
    should_continue: bool,
}

impl Queue {
    /// Creates an empty queue with `should_continue` set.
    fn new() -> Self {
        Self {
            items: VecDeque::new(),
            should_continue: true,
        }
    }
}
946947fn wait<T: ?Sized>(
948 condition: &Condvar,
949 lock: &mut MutexGuard<'_, T>,
950 predicate: impl Fn(&mut MutexGuard<'_, T>) -> bool,
951 timeout: &Timeout,
952 ) {
953while !predicate(lock) {
954match timeout {
955 Timeout::Forever => condition.wait(lock),
956 Timeout::Bounded(bound) => {
957 condition.wait_for(lock, *bound);
958 }
959 }
960 }
961 }
962963fn notify(style: NotifyStyle, condition: &Condvar, should_notify: bool) {
964match style {
965 NotifyStyle::One => {
966 condition.notify_one();
967 }
968 NotifyStyle::All => {
969if should_notify {
970 condition.notify_all();
971 }
972 }
973 }
974 }
975976fn run_queue_test(
977 num_producers: usize,
978 num_consumers: usize,
979 max_queue_size: usize,
980 messages_per_producer: usize,
981 notify_style: NotifyStyle,
982 timeout: Timeout,
983 delay: Duration,
984 ) {
985let input_queue = Arc::new(Mutex::new(Queue::new()));
986let empty_condition = Arc::new(Condvar::new());
987let full_condition = Arc::new(Condvar::new());
988989let output_vec = Arc::new(Mutex::new(vec![]));
990991let consumers = (0..num_consumers)
992 .map(|_| {
993 consumer_thread(
994 input_queue.clone(),
995 empty_condition.clone(),
996 full_condition.clone(),
997 timeout,
998 notify_style,
999 output_vec.clone(),
1000 max_queue_size,
1001 )
1002 })
1003 .collect::<Vec<_>>();
1004let producers = (0..num_producers)
1005 .map(|_| {
1006 producer_thread(
1007 messages_per_producer,
1008 input_queue.clone(),
1009 empty_condition.clone(),
1010 full_condition.clone(),
1011 timeout,
1012 notify_style,
1013 max_queue_size,
1014 )
1015 })
1016 .collect::<Vec<_>>();
10171018 thread::sleep(delay);
10191020for producer in producers.into_iter() {
1021 producer.join().expect("Producer thread panicked");
1022 }
10231024 {
1025let mut input_queue = input_queue.lock();
1026 input_queue.should_continue = false;
1027 }
1028 empty_condition.notify_all();
10291030for consumer in consumers.into_iter() {
1031 consumer.join().expect("Consumer thread panicked");
1032 }
10331034let mut output_vec = output_vec.lock();
1035assert_eq!(output_vec.len(), num_producers * messages_per_producer);
1036 output_vec.sort();
1037for msg_idx in 0..messages_per_producer {
1038for producer_idx in 0..num_producers {
1039assert_eq!(msg_idx, output_vec[msg_idx * num_producers + producer_idx]);
1040 }
1041 }
1042 }
10431044fn consumer_thread(
1045 input_queue: Arc<Mutex<Queue>>,
1046 empty_condition: Arc<Condvar>,
1047 full_condition: Arc<Condvar>,
1048 timeout: Timeout,
1049 notify_style: NotifyStyle,
1050 output_queue: Arc<Mutex<Vec<usize>>>,
1051 max_queue_size: usize,
1052 ) -> thread::JoinHandle<()> {
1053 thread::spawn(move || loop {
1054let (should_notify, result) = {
1055let mut queue = input_queue.lock();
1056 wait(
1057&empty_condition,
1058&mut queue,
1059 |state| -> bool { !state.items.is_empty() || !state.should_continue },
1060&timeout,
1061 );
1062if queue.items.is_empty() && !queue.should_continue {
1063return;
1064 }
1065let should_notify = queue.items.len() == max_queue_size;
1066let result = queue.items.pop_front();
1067 std::mem::drop(queue);
1068 (should_notify, result)
1069 };
1070 notify(notify_style, &full_condition, should_notify);
10711072if let Some(result) = result {
1073 output_queue.lock().push(result);
1074 }
1075 })
1076 }
10771078fn producer_thread(
1079 num_messages: usize,
1080 queue: Arc<Mutex<Queue>>,
1081 empty_condition: Arc<Condvar>,
1082 full_condition: Arc<Condvar>,
1083 timeout: Timeout,
1084 notify_style: NotifyStyle,
1085 max_queue_size: usize,
1086 ) -> thread::JoinHandle<()> {
1087 thread::spawn(move || {
1088for message in 0..num_messages {
1089let should_notify = {
1090let mut queue = queue.lock();
1091 wait(
1092&full_condition,
1093&mut queue,
1094 |state| state.items.len() < max_queue_size,
1095&timeout,
1096 );
1097let should_notify = queue.items.is_empty();
1098 queue.items.push_back(message);
1099 std::mem::drop(queue);
1100 should_notify
1101 };
1102 notify(notify_style, &empty_condition, should_notify);
1103 }
1104 })
1105 }
/// Declares one `#[test]` function per listed scenario.
///
/// Every entry has the form
/// `name(num_producers: .., num_consumers: .., max_queue_size: ..,
/// messages_per_producer: .., notification_style: .., timeout: ..,
/// delay_seconds: ..);` with the keys in exactly that order. The generated
/// function converts `delay_seconds` into a `Duration` and forwards all
/// settings to `run_queue_test`.
macro_rules! run_queue_tests {
    (
        $(
            $test_name:ident(
                num_producers: $producers:expr,
                num_consumers: $consumers:expr,
                max_queue_size: $capacity:expr,
                messages_per_producer: $messages:expr,
                notification_style: $style:expr,
                timeout: $wait_limit:expr,
                delay_seconds: $delay_secs:expr
            );
        )*
    ) => {
        $(
            #[test]
            fn $test_name() {
                let startup_delay = Duration::from_secs($delay_secs);
                run_queue_test(
                    $producers,
                    $consumers,
                    $capacity,
                    $messages,
                    $style,
                    $wait_limit,
                    startup_delay,
                );
            }
        )*
    };
}
11321133run_queue_tests! {
1134 sanity_check_queue(
1135 num_producers: 1,
1136 num_consumers: 1,
1137 max_queue_size: 1,
1138 messages_per_producer: 100_000,
1139 notification_style: NotifyStyle::All,
1140 timeout: Timeout::Bounded(Duration::from_secs(1)),
1141 delay_seconds: 0
1142);
1143 sanity_check_queue_timeout(
1144 num_producers: 1,
1145 num_consumers: 1,
1146 max_queue_size: 1,
1147 messages_per_producer: 100_000,
1148 notification_style: NotifyStyle::All,
1149 timeout: Timeout::Forever,
1150 delay_seconds: 0
1151);
1152 new_test_without_timeout_5(
1153 num_producers: 1,
1154 num_consumers: 5,
1155 max_queue_size: 1,
1156 messages_per_producer: 100_000,
1157 notification_style: NotifyStyle::All,
1158 timeout: Timeout::Forever,
1159 delay_seconds: 0
1160);
1161 one_producer_one_consumer_one_slot(
1162 num_producers: 1,
1163 num_consumers: 1,
1164 max_queue_size: 1,
1165 messages_per_producer: 100_000,
1166 notification_style: NotifyStyle::All,
1167 timeout: Timeout::Forever,
1168 delay_seconds: 0
1169);
1170 one_producer_one_consumer_one_slot_timeout(
1171 num_producers: 1,
1172 num_consumers: 1,
1173 max_queue_size: 1,
1174 messages_per_producer: 100_000,
1175 notification_style: NotifyStyle::All,
1176 timeout: Timeout::Forever,
1177 delay_seconds: 1
1178);
1179 one_producer_one_consumer_hundred_slots(
1180 num_producers: 1,
1181 num_consumers: 1,
1182 max_queue_size: 100,
1183 messages_per_producer: 1_000_000,
1184 notification_style: NotifyStyle::All,
1185 timeout: Timeout::Forever,
1186 delay_seconds: 0
1187);
1188 ten_producers_one_consumer_one_slot(
1189 num_producers: 10,
1190 num_consumers: 1,
1191 max_queue_size: 1,
1192 messages_per_producer: 10000,
1193 notification_style: NotifyStyle::All,
1194 timeout: Timeout::Forever,
1195 delay_seconds: 0
1196);
1197 ten_producers_one_consumer_hundred_slots_notify_all(
1198 num_producers: 10,
1199 num_consumers: 1,
1200 max_queue_size: 100,
1201 messages_per_producer: 10000,
1202 notification_style: NotifyStyle::All,
1203 timeout: Timeout::Forever,
1204 delay_seconds: 0
1205);
1206 ten_producers_one_consumer_hundred_slots_notify_one(
1207 num_producers: 10,
1208 num_consumers: 1,
1209 max_queue_size: 100,
1210 messages_per_producer: 10000,
1211 notification_style: NotifyStyle::One,
1212 timeout: Timeout::Forever,
1213 delay_seconds: 0
1214);
1215 one_producer_ten_consumers_one_slot(
1216 num_producers: 1,
1217 num_consumers: 10,
1218 max_queue_size: 1,
1219 messages_per_producer: 10000,
1220 notification_style: NotifyStyle::All,
1221 timeout: Timeout::Forever,
1222 delay_seconds: 0
1223);
1224 one_producer_ten_consumers_hundred_slots_notify_all(
1225 num_producers: 1,
1226 num_consumers: 10,
1227 max_queue_size: 100,
1228 messages_per_producer: 100_000,
1229 notification_style: NotifyStyle::All,
1230 timeout: Timeout::Forever,
1231 delay_seconds: 0
1232);
1233 one_producer_ten_consumers_hundred_slots_notify_one(
1234 num_producers: 1,
1235 num_consumers: 10,
1236 max_queue_size: 100,
1237 messages_per_producer: 100_000,
1238 notification_style: NotifyStyle::One,
1239 timeout: Timeout::Forever,
1240 delay_seconds: 0
1241);
1242 ten_producers_ten_consumers_one_slot(
1243 num_producers: 10,
1244 num_consumers: 10,
1245 max_queue_size: 1,
1246 messages_per_producer: 50000,
1247 notification_style: NotifyStyle::All,
1248 timeout: Timeout::Forever,
1249 delay_seconds: 0
1250);
1251 ten_producers_ten_consumers_hundred_slots_notify_all(
1252 num_producers: 10,
1253 num_consumers: 10,
1254 max_queue_size: 100,
1255 messages_per_producer: 50000,
1256 notification_style: NotifyStyle::All,
1257 timeout: Timeout::Forever,
1258 delay_seconds: 0
1259);
1260 ten_producers_ten_consumers_hundred_slots_notify_one(
1261 num_producers: 10,
1262 num_consumers: 10,
1263 max_queue_size: 100,
1264 messages_per_producer: 50000,
1265 notification_style: NotifyStyle::One,
1266 timeout: Timeout::Forever,
1267 delay_seconds: 0
1268);
1269 }
1270}