1use crate::mutex::MutexGuard;
9use crate::raw_mutex::{RawMutex, TOKEN_HANDOFF, TOKEN_NORMAL};
10use crate::{deadlock, util};
11use core::{
12 fmt, ptr,
13 sync::atomic::{AtomicPtr, Ordering},
14};
15use lock_api::RawMutex as RawMutex_;
16use parking_lot_core::{self, ParkResult, RequeueOp, UnparkResult, DEFAULT_PARK_TOKEN};
17use std::ops::DerefMut;
18use std::time::{Duration, Instant};
19
20#[derive(Debug, PartialEq, Eq, Copy, Clone)]
23pub struct WaitTimeoutResult(bool);
24
25impl WaitTimeoutResult {
26 #[inline]
28 pub fn timed_out(self) -> bool {
29 self.0
30 }
31}
32
33pub struct Condvar {
91 state: AtomicPtr<RawMutex>,
92}
93
94impl Condvar {
95 #[inline]
98 pub const fn new() -> Condvar {
99 Condvar {
100 state: AtomicPtr::new(ptr::null_mut()),
101 }
102 }
103
104 #[inline]
128 pub fn notify_one(&self) -> bool {
129 let state = self.state.load(Ordering::Relaxed);
131 if state.is_null() {
132 return false;
133 }
134
135 self.notify_one_slow(state)
136 }
137
138 #[cold]
139 fn notify_one_slow(&self, mutex: *mut RawMutex) -> bool {
140 let from = self as *const _ as usize;
142 let to = mutex as usize;
143 let validate = || {
144 if self.state.load(Ordering::Relaxed) != mutex {
150 return RequeueOp::Abort;
151 }
152
153 if unsafe { (*mutex).mark_parked_if_locked() } {
160 RequeueOp::RequeueOne
161 } else {
162 RequeueOp::UnparkOne
163 }
164 };
165 let callback = |_op, result: UnparkResult| {
166 if !result.have_more_threads {
168 self.state.store(ptr::null_mut(), Ordering::Relaxed);
169 }
170 TOKEN_NORMAL
171 };
172 let res = unsafe { parking_lot_core::unpark_requeue(from, to, validate, callback) };
173
174 res.unparked_threads + res.requeued_threads != 0
175 }
176
177 #[inline]
187 pub fn notify_all(&self) -> usize {
188 let state = self.state.load(Ordering::Relaxed);
190 if state.is_null() {
191 return 0;
192 }
193
194 self.notify_all_slow(state)
195 }
196
197 #[cold]
198 fn notify_all_slow(&self, mutex: *mut RawMutex) -> usize {
199 let from = self as *const _ as usize;
201 let to = mutex as usize;
202 let validate = || {
203 if self.state.load(Ordering::Relaxed) != mutex {
209 return RequeueOp::Abort;
210 }
211
212 self.state.store(ptr::null_mut(), Ordering::Relaxed);
215
216 if unsafe { (*mutex).mark_parked_if_locked() } {
223 RequeueOp::RequeueAll
224 } else {
225 RequeueOp::UnparkOneRequeueRest
226 }
227 };
228 let callback = |op, result: UnparkResult| {
229 if op == RequeueOp::UnparkOneRequeueRest && result.requeued_threads != 0 {
232 unsafe { (*mutex).mark_parked() };
233 }
234 TOKEN_NORMAL
235 };
236 let res = unsafe { parking_lot_core::unpark_requeue(from, to, validate, callback) };
237
238 res.unparked_threads + res.requeued_threads
239 }
240
241 #[inline]
255 pub fn wait<T: ?Sized>(&self, mutex_guard: &mut MutexGuard<'_, T>) {
256 self.wait_until_internal(unsafe { MutexGuard::mutex(mutex_guard).raw() }, None);
257 }
258
259 #[inline]
283 pub fn wait_until<T: ?Sized>(
284 &self,
285 mutex_guard: &mut MutexGuard<'_, T>,
286 timeout: Instant,
287 ) -> WaitTimeoutResult {
288 self.wait_until_internal(
289 unsafe { MutexGuard::mutex(mutex_guard).raw() },
290 Some(timeout),
291 )
292 }
293
294 fn wait_until_internal(&self, mutex: &RawMutex, timeout: Option<Instant>) -> WaitTimeoutResult {
297 let result;
298 let mut bad_mutex = false;
299 let mut requeued = false;
300 {
301 let addr = self as *const _ as usize;
302 let lock_addr = mutex as *const _ as *mut _;
303 let validate = || {
304 let state = self.state.load(Ordering::Relaxed);
308 if state.is_null() {
309 self.state.store(lock_addr, Ordering::Relaxed);
310 } else if state != lock_addr {
311 bad_mutex = true;
312 return false;
313 }
314 true
315 };
316 let before_sleep = || {
317 unsafe { mutex.unlock() };
319 };
320 let timed_out = |k, was_last_thread| {
321 requeued = k != addr;
325
326 if !requeued && was_last_thread {
330 self.state.store(ptr::null_mut(), Ordering::Relaxed);
331 }
332 };
333 result = unsafe {
334 parking_lot_core::park(
335 addr,
336 validate,
337 before_sleep,
338 timed_out,
339 DEFAULT_PARK_TOKEN,
340 timeout,
341 )
342 };
343 }
344
345 if bad_mutex {
349 panic!("attempted to use a condition variable with more than one mutex");
350 }
351
352 if result == ParkResult::Unparked(TOKEN_HANDOFF) {
354 unsafe { deadlock::acquire_resource(mutex as *const _ as usize) };
355 } else {
356 mutex.lock();
357 }
358
359 WaitTimeoutResult(!(result.is_unparked() || requeued))
360 }
361
362 #[inline]
381 pub fn wait_for<T: ?Sized>(
382 &self,
383 mutex_guard: &mut MutexGuard<'_, T>,
384 timeout: Duration,
385 ) -> WaitTimeoutResult {
386 let deadline = util::to_deadline(timeout);
387 self.wait_until_internal(unsafe { MutexGuard::mutex(mutex_guard).raw() }, deadline)
388 }
389
390 #[inline]
391 fn wait_while_until_internal<T, F>(
392 &self,
393 mutex_guard: &mut MutexGuard<'_, T>,
394 mut condition: F,
395 timeout: Option<Instant>,
396 ) -> WaitTimeoutResult
397 where
398 T: ?Sized,
399 F: FnMut(&mut T) -> bool,
400 {
401 let mut result = WaitTimeoutResult(false);
402
403 while !result.timed_out() && condition(mutex_guard.deref_mut()) {
404 result =
405 self.wait_until_internal(unsafe { MutexGuard::mutex(mutex_guard).raw() }, timeout);
406 }
407
408 result
409 }
410 #[inline]
427 pub fn wait_while<T, F>(&self, mutex_guard: &mut MutexGuard<'_, T>, condition: F)
428 where
429 T: ?Sized,
430 F: FnMut(&mut T) -> bool,
431 {
432 self.wait_while_until_internal(mutex_guard, condition, None);
433 }
434
435 #[inline]
463 pub fn wait_while_until<T, F>(
464 &self,
465 mutex_guard: &mut MutexGuard<'_, T>,
466 condition: F,
467 timeout: Instant,
468 ) -> WaitTimeoutResult
469 where
470 T: ?Sized,
471 F: FnMut(&mut T) -> bool,
472 {
473 self.wait_while_until_internal(mutex_guard, condition, Some(timeout))
474 }
475
476 #[inline]
498 pub fn wait_while_for<T: ?Sized, F>(
499 &self,
500 mutex_guard: &mut MutexGuard<'_, T>,
501 condition: F,
502 timeout: Duration,
503 ) -> WaitTimeoutResult
504 where
505 F: FnMut(&mut T) -> bool,
506 {
507 let deadline = util::to_deadline(timeout);
508 self.wait_while_until_internal(mutex_guard, condition, deadline)
509 }
510}
511
512impl Default for Condvar {
513 #[inline]
514 fn default() -> Condvar {
515 Condvar::new()
516 }
517}
518
519impl fmt::Debug for Condvar {
520 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
521 f.pad("Condvar { .. }")
522 }
523}
524
525#[cfg(test)]
526mod tests {
527 use crate::{Condvar, Mutex, MutexGuard};
528 use std::sync::mpsc::channel;
529 use std::sync::Arc;
530 use std::thread;
531 use std::thread::sleep;
532 use std::thread::JoinHandle;
533 use std::time::Duration;
534 use std::time::Instant;
535
536 #[test]
537 fn smoke() {
538 let c = Condvar::new();
539 c.notify_one();
540 c.notify_all();
541 }
542
543 #[test]
544 fn notify_one() {
545 let m = Arc::new(Mutex::new(()));
546 let m2 = m.clone();
547 let c = Arc::new(Condvar::new());
548 let c2 = c.clone();
549
550 let mut g = m.lock();
551 let _t = thread::spawn(move || {
552 let _g = m2.lock();
553 c2.notify_one();
554 });
555 c.wait(&mut g);
556 }
557
558 #[test]
559 fn notify_all() {
560 const N: usize = 10;
561
562 let data = Arc::new((Mutex::new(0), Condvar::new()));
563 let (tx, rx) = channel();
564 for _ in 0..N {
565 let data = data.clone();
566 let tx = tx.clone();
567 thread::spawn(move || {
568 let (lock, cond) = &*data;
569 let mut cnt = lock.lock();
570 *cnt += 1;
571 if *cnt == N {
572 tx.send(()).unwrap();
573 }
574 while *cnt != 0 {
575 cond.wait(&mut cnt);
576 }
577 tx.send(()).unwrap();
578 });
579 }
580 drop(tx);
581
582 let (lock, cond) = &*data;
583 rx.recv().unwrap();
584 let mut cnt = lock.lock();
585 *cnt = 0;
586 cond.notify_all();
587 drop(cnt);
588
589 for _ in 0..N {
590 rx.recv().unwrap();
591 }
592 }
593
594 #[test]
595 fn notify_one_return_true() {
596 let m = Arc::new(Mutex::new(()));
597 let m2 = m.clone();
598 let c = Arc::new(Condvar::new());
599 let c2 = c.clone();
600
601 let mut g = m.lock();
602 let _t = thread::spawn(move || {
603 let _g = m2.lock();
604 assert!(c2.notify_one());
605 });
606 c.wait(&mut g);
607 }
608
609 #[test]
610 fn notify_one_return_false() {
611 let m = Arc::new(Mutex::new(()));
612 let c = Arc::new(Condvar::new());
613
614 let _t = thread::spawn(move || {
615 let _g = m.lock();
616 assert!(!c.notify_one());
617 });
618 }
619
620 #[test]
621 fn notify_all_return() {
622 const N: usize = 10;
623
624 let data = Arc::new((Mutex::new(0), Condvar::new()));
625 let (tx, rx) = channel();
626 for _ in 0..N {
627 let data = data.clone();
628 let tx = tx.clone();
629 thread::spawn(move || {
630 let (lock, cond) = &*data;
631 let mut cnt = lock.lock();
632 *cnt += 1;
633 if *cnt == N {
634 tx.send(()).unwrap();
635 }
636 while *cnt != 0 {
637 cond.wait(&mut cnt);
638 }
639 tx.send(()).unwrap();
640 });
641 }
642 drop(tx);
643
644 let (lock, cond) = &*data;
645 rx.recv().unwrap();
646 let mut cnt = lock.lock();
647 *cnt = 0;
648 assert_eq!(cond.notify_all(), N);
649 drop(cnt);
650
651 for _ in 0..N {
652 rx.recv().unwrap();
653 }
654
655 assert_eq!(cond.notify_all(), 0);
656 }
657
658 #[test]
659 fn wait_for() {
660 let m = Arc::new(Mutex::new(()));
661 let m2 = m.clone();
662 let c = Arc::new(Condvar::new());
663 let c2 = c.clone();
664
665 let mut g = m.lock();
666 let no_timeout = c.wait_for(&mut g, Duration::from_millis(1));
667 assert!(no_timeout.timed_out());
668
669 let _t = thread::spawn(move || {
670 let _g = m2.lock();
671 c2.notify_one();
672 });
673 let timeout_res = c.wait_for(&mut g, Duration::from_secs(u64::max_value()));
674 assert!(!timeout_res.timed_out());
675
676 drop(g);
677 }
678
679 #[test]
680 fn wait_until() {
681 let m = Arc::new(Mutex::new(()));
682 let m2 = m.clone();
683 let c = Arc::new(Condvar::new());
684 let c2 = c.clone();
685
686 let mut g = m.lock();
687 let no_timeout = c.wait_until(&mut g, Instant::now() + Duration::from_millis(1));
688 assert!(no_timeout.timed_out());
689 let _t = thread::spawn(move || {
690 let _g = m2.lock();
691 c2.notify_one();
692 });
693 let timeout_res = c.wait_until(
694 &mut g,
695 Instant::now() + Duration::from_millis(u32::max_value() as u64),
696 );
697 assert!(!timeout_res.timed_out());
698 drop(g);
699 }
700
701 fn spawn_wait_while_notifier(
702 mutex: Arc<Mutex<u32>>,
703 cv: Arc<Condvar>,
704 num_iters: u32,
705 timeout: Option<Instant>,
706 ) -> JoinHandle<()> {
707 thread::spawn(move || {
708 for epoch in 1..=num_iters {
709 let mut sleep_backoff = Duration::from_millis(1);
713 let _mutex_guard = loop {
714 let mutex_guard = mutex.lock();
715
716 if let Some(timeout) = timeout {
717 if Instant::now() >= timeout {
718 return;
719 }
720 }
721
722 if *mutex_guard == epoch {
723 break mutex_guard;
724 }
725
726 drop(mutex_guard);
727
728 sleep(sleep_backoff);
731 sleep_backoff *= 2;
732 };
733
734 cv.notify_one();
735 }
736 })
737 }
738
739 #[test]
740 fn wait_while_until_internal_does_not_wait_if_initially_false() {
741 let mutex = Arc::new(Mutex::new(0));
742 let cv = Arc::new(Condvar::new());
743
744 let condition = |counter: &mut u32| {
745 *counter += 1;
746 false
747 };
748
749 let mut mutex_guard = mutex.lock();
750 let timeout_result = cv.wait_while_until_internal(&mut mutex_guard, condition, None);
751
752 assert!(!timeout_result.timed_out());
753 assert!(*mutex_guard == 1);
754 }
755
756 #[test]
757 fn wait_while_until_internal_times_out_before_false() {
758 let mutex = Arc::new(Mutex::new(0));
759 let cv = Arc::new(Condvar::new());
760
761 let num_iters = 3;
762 let condition = |counter: &mut u32| {
763 *counter += 1;
764 true
765 };
766
767 let mut mutex_guard = mutex.lock();
768 let timeout = Some(Instant::now() + Duration::from_millis(500));
769 let handle = spawn_wait_while_notifier(mutex.clone(), cv.clone(), num_iters, timeout);
770
771 let timeout_result = cv.wait_while_until_internal(&mut mutex_guard, condition, timeout);
772
773 assert!(timeout_result.timed_out());
774 assert!(*mutex_guard == num_iters + 1);
775
776 drop(mutex_guard);
778 handle.join().unwrap();
779 }
780
781 #[test]
782 fn wait_while_until_internal() {
783 let mutex = Arc::new(Mutex::new(0));
784 let cv = Arc::new(Condvar::new());
785
786 let num_iters = 4;
787
788 let condition = |counter: &mut u32| {
789 *counter += 1;
790 *counter <= num_iters
791 };
792
793 let mut mutex_guard = mutex.lock();
794 let handle = spawn_wait_while_notifier(mutex.clone(), cv.clone(), num_iters, None);
795
796 let timeout_result = cv.wait_while_until_internal(&mut mutex_guard, condition, None);
797
798 assert!(!timeout_result.timed_out());
799 assert!(*mutex_guard == num_iters + 1);
800
801 let timeout_result = cv.wait_while_until_internal(&mut mutex_guard, condition, None);
802 handle.join().unwrap();
803
804 assert!(!timeout_result.timed_out());
805 assert!(*mutex_guard == num_iters + 2);
806 }
807
808 #[test]
809 #[should_panic]
810 fn two_mutexes() {
811 let m = Arc::new(Mutex::new(()));
812 let m2 = m.clone();
813 let m3 = Arc::new(Mutex::new(()));
814 let c = Arc::new(Condvar::new());
815 let c2 = c.clone();
816
817 struct PanicGuard<'a>(&'a Condvar);
819 impl<'a> Drop for PanicGuard<'a> {
820 fn drop(&mut self) {
821 self.0.notify_one();
822 }
823 }
824
825 let (tx, rx) = channel();
826 let g = m.lock();
827 let _t = thread::spawn(move || {
828 let mut g = m2.lock();
829 tx.send(()).unwrap();
830 c2.wait(&mut g);
831 });
832 drop(g);
833 rx.recv().unwrap();
834 let _g = m.lock();
835 let _guard = PanicGuard(&c);
836 c.wait(&mut m3.lock());
837 }
838
839 #[test]
840 fn two_mutexes_disjoint() {
841 let m = Arc::new(Mutex::new(()));
842 let m2 = m.clone();
843 let m3 = Arc::new(Mutex::new(()));
844 let c = Arc::new(Condvar::new());
845 let c2 = c.clone();
846
847 let mut g = m.lock();
848 let _t = thread::spawn(move || {
849 let _g = m2.lock();
850 c2.notify_one();
851 });
852 c.wait(&mut g);
853 drop(g);
854
855 let _ = c.wait_for(&mut m3.lock(), Duration::from_millis(1));
856 }
857
858 #[test]
859 fn test_debug_condvar() {
860 let c = Condvar::new();
861 assert_eq!(format!("{:?}", c), "Condvar { .. }");
862 }
863
864 #[test]
865 fn test_condvar_requeue() {
866 let m = Arc::new(Mutex::new(()));
867 let m2 = m.clone();
868 let c = Arc::new(Condvar::new());
869 let c2 = c.clone();
870 let t = thread::spawn(move || {
871 let mut g = m2.lock();
872 c2.wait(&mut g);
873 });
874
875 let mut g = m.lock();
876 while !c.notify_one() {
877 MutexGuard::bump(&mut g);
879 thread::yield_now();
882 }
883 drop(g);
885 t.join().unwrap();
886 }
887
888 #[test]
889 fn test_issue_129() {
890 let locks = Arc::new((Mutex::new(()), Condvar::new()));
891
892 let (tx, rx) = channel();
893 for _ in 0..4 {
894 let locks = locks.clone();
895 let tx = tx.clone();
896 thread::spawn(move || {
897 let mut guard = locks.0.lock();
898 locks.1.wait(&mut guard);
899 locks.1.wait_for(&mut guard, Duration::from_millis(1));
900 locks.1.notify_one();
901 tx.send(()).unwrap();
902 });
903 }
904
905 thread::sleep(Duration::from_millis(100));
906 locks.1.notify_one();
907
908 for _ in 0..4 {
909 assert_eq!(rx.recv_timeout(Duration::from_millis(500)), Ok(()));
910 }
911 }
912}
913
914#[cfg(test)]
917mod webkit_queue_test {
918 use crate::{Condvar, Mutex, MutexGuard};
919 use std::{collections::VecDeque, sync::Arc, thread, time::Duration};
920
921 #[derive(Clone, Copy)]
922 enum Timeout {
923 Bounded(Duration),
924 Forever,
925 }
926
927 #[derive(Clone, Copy)]
928 enum NotifyStyle {
929 One,
930 All,
931 }
932
933 struct Queue {
934 items: VecDeque<usize>,
935 should_continue: bool,
936 }
937
938 impl Queue {
939 fn new() -> Self {
940 Self {
941 items: VecDeque::new(),
942 should_continue: true,
943 }
944 }
945 }
946
947 fn wait<T: ?Sized>(
948 condition: &Condvar,
949 lock: &mut MutexGuard<'_, T>,
950 predicate: impl Fn(&mut MutexGuard<'_, T>) -> bool,
951 timeout: &Timeout,
952 ) {
953 while !predicate(lock) {
954 match timeout {
955 Timeout::Forever => condition.wait(lock),
956 Timeout::Bounded(bound) => {
957 condition.wait_for(lock, *bound);
958 }
959 }
960 }
961 }
962
963 fn notify(style: NotifyStyle, condition: &Condvar, should_notify: bool) {
964 match style {
965 NotifyStyle::One => {
966 condition.notify_one();
967 }
968 NotifyStyle::All => {
969 if should_notify {
970 condition.notify_all();
971 }
972 }
973 }
974 }
975
976 fn run_queue_test(
977 num_producers: usize,
978 num_consumers: usize,
979 max_queue_size: usize,
980 messages_per_producer: usize,
981 notify_style: NotifyStyle,
982 timeout: Timeout,
983 delay: Duration,
984 ) {
985 let input_queue = Arc::new(Mutex::new(Queue::new()));
986 let empty_condition = Arc::new(Condvar::new());
987 let full_condition = Arc::new(Condvar::new());
988
989 let output_vec = Arc::new(Mutex::new(vec![]));
990
991 let consumers = (0..num_consumers)
992 .map(|_| {
993 consumer_thread(
994 input_queue.clone(),
995 empty_condition.clone(),
996 full_condition.clone(),
997 timeout,
998 notify_style,
999 output_vec.clone(),
1000 max_queue_size,
1001 )
1002 })
1003 .collect::<Vec<_>>();
1004 let producers = (0..num_producers)
1005 .map(|_| {
1006 producer_thread(
1007 messages_per_producer,
1008 input_queue.clone(),
1009 empty_condition.clone(),
1010 full_condition.clone(),
1011 timeout,
1012 notify_style,
1013 max_queue_size,
1014 )
1015 })
1016 .collect::<Vec<_>>();
1017
1018 thread::sleep(delay);
1019
1020 for producer in producers.into_iter() {
1021 producer.join().expect("Producer thread panicked");
1022 }
1023
1024 {
1025 let mut input_queue = input_queue.lock();
1026 input_queue.should_continue = false;
1027 }
1028 empty_condition.notify_all();
1029
1030 for consumer in consumers.into_iter() {
1031 consumer.join().expect("Consumer thread panicked");
1032 }
1033
1034 let mut output_vec = output_vec.lock();
1035 assert_eq!(output_vec.len(), num_producers * messages_per_producer);
1036 output_vec.sort();
1037 for msg_idx in 0..messages_per_producer {
1038 for producer_idx in 0..num_producers {
1039 assert_eq!(msg_idx, output_vec[msg_idx * num_producers + producer_idx]);
1040 }
1041 }
1042 }
1043
1044 fn consumer_thread(
1045 input_queue: Arc<Mutex<Queue>>,
1046 empty_condition: Arc<Condvar>,
1047 full_condition: Arc<Condvar>,
1048 timeout: Timeout,
1049 notify_style: NotifyStyle,
1050 output_queue: Arc<Mutex<Vec<usize>>>,
1051 max_queue_size: usize,
1052 ) -> thread::JoinHandle<()> {
1053 thread::spawn(move || loop {
1054 let (should_notify, result) = {
1055 let mut queue = input_queue.lock();
1056 wait(
1057 &empty_condition,
1058 &mut queue,
1059 |state| -> bool { !state.items.is_empty() || !state.should_continue },
1060 &timeout,
1061 );
1062 if queue.items.is_empty() && !queue.should_continue {
1063 return;
1064 }
1065 let should_notify = queue.items.len() == max_queue_size;
1066 let result = queue.items.pop_front();
1067 std::mem::drop(queue);
1068 (should_notify, result)
1069 };
1070 notify(notify_style, &full_condition, should_notify);
1071
1072 if let Some(result) = result {
1073 output_queue.lock().push(result);
1074 }
1075 })
1076 }
1077
1078 fn producer_thread(
1079 num_messages: usize,
1080 queue: Arc<Mutex<Queue>>,
1081 empty_condition: Arc<Condvar>,
1082 full_condition: Arc<Condvar>,
1083 timeout: Timeout,
1084 notify_style: NotifyStyle,
1085 max_queue_size: usize,
1086 ) -> thread::JoinHandle<()> {
1087 thread::spawn(move || {
1088 for message in 0..num_messages {
1089 let should_notify = {
1090 let mut queue = queue.lock();
1091 wait(
1092 &full_condition,
1093 &mut queue,
1094 |state| state.items.len() < max_queue_size,
1095 &timeout,
1096 );
1097 let should_notify = queue.items.is_empty();
1098 queue.items.push_back(message);
1099 std::mem::drop(queue);
1100 should_notify
1101 };
1102 notify(notify_style, &empty_condition, should_notify);
1103 }
1104 })
1105 }
1106
1107 macro_rules! run_queue_tests {
1108 ( $( $name:ident(
1109 num_producers: $num_producers:expr,
1110 num_consumers: $num_consumers:expr,
1111 max_queue_size: $max_queue_size:expr,
1112 messages_per_producer: $messages_per_producer:expr,
1113 notification_style: $notification_style:expr,
1114 timeout: $timeout:expr,
1115 delay_seconds: $delay_seconds:expr);
1116 )* ) => {
1117 $(#[test]
1118 fn $name() {
1119 let delay = Duration::from_secs($delay_seconds);
1120 run_queue_test(
1121 $num_producers,
1122 $num_consumers,
1123 $max_queue_size,
1124 $messages_per_producer,
1125 $notification_style,
1126 $timeout,
1127 delay,
1128 );
1129 })*
1130 };
1131 }
1132
1133 run_queue_tests! {
1134 sanity_check_queue(
1135 num_producers: 1,
1136 num_consumers: 1,
1137 max_queue_size: 1,
1138 messages_per_producer: 100_000,
1139 notification_style: NotifyStyle::All,
1140 timeout: Timeout::Bounded(Duration::from_secs(1)),
1141 delay_seconds: 0
1142 );
1143 sanity_check_queue_timeout(
1144 num_producers: 1,
1145 num_consumers: 1,
1146 max_queue_size: 1,
1147 messages_per_producer: 100_000,
1148 notification_style: NotifyStyle::All,
1149 timeout: Timeout::Forever,
1150 delay_seconds: 0
1151 );
1152 new_test_without_timeout_5(
1153 num_producers: 1,
1154 num_consumers: 5,
1155 max_queue_size: 1,
1156 messages_per_producer: 100_000,
1157 notification_style: NotifyStyle::All,
1158 timeout: Timeout::Forever,
1159 delay_seconds: 0
1160 );
1161 one_producer_one_consumer_one_slot(
1162 num_producers: 1,
1163 num_consumers: 1,
1164 max_queue_size: 1,
1165 messages_per_producer: 100_000,
1166 notification_style: NotifyStyle::All,
1167 timeout: Timeout::Forever,
1168 delay_seconds: 0
1169 );
1170 one_producer_one_consumer_one_slot_timeout(
1171 num_producers: 1,
1172 num_consumers: 1,
1173 max_queue_size: 1,
1174 messages_per_producer: 100_000,
1175 notification_style: NotifyStyle::All,
1176 timeout: Timeout::Forever,
1177 delay_seconds: 1
1178 );
1179 one_producer_one_consumer_hundred_slots(
1180 num_producers: 1,
1181 num_consumers: 1,
1182 max_queue_size: 100,
1183 messages_per_producer: 1_000_000,
1184 notification_style: NotifyStyle::All,
1185 timeout: Timeout::Forever,
1186 delay_seconds: 0
1187 );
1188 ten_producers_one_consumer_one_slot(
1189 num_producers: 10,
1190 num_consumers: 1,
1191 max_queue_size: 1,
1192 messages_per_producer: 10000,
1193 notification_style: NotifyStyle::All,
1194 timeout: Timeout::Forever,
1195 delay_seconds: 0
1196 );
1197 ten_producers_one_consumer_hundred_slots_notify_all(
1198 num_producers: 10,
1199 num_consumers: 1,
1200 max_queue_size: 100,
1201 messages_per_producer: 10000,
1202 notification_style: NotifyStyle::All,
1203 timeout: Timeout::Forever,
1204 delay_seconds: 0
1205 );
1206 ten_producers_one_consumer_hundred_slots_notify_one(
1207 num_producers: 10,
1208 num_consumers: 1,
1209 max_queue_size: 100,
1210 messages_per_producer: 10000,
1211 notification_style: NotifyStyle::One,
1212 timeout: Timeout::Forever,
1213 delay_seconds: 0
1214 );
1215 one_producer_ten_consumers_one_slot(
1216 num_producers: 1,
1217 num_consumers: 10,
1218 max_queue_size: 1,
1219 messages_per_producer: 10000,
1220 notification_style: NotifyStyle::All,
1221 timeout: Timeout::Forever,
1222 delay_seconds: 0
1223 );
1224 one_producer_ten_consumers_hundred_slots_notify_all(
1225 num_producers: 1,
1226 num_consumers: 10,
1227 max_queue_size: 100,
1228 messages_per_producer: 100_000,
1229 notification_style: NotifyStyle::All,
1230 timeout: Timeout::Forever,
1231 delay_seconds: 0
1232 );
1233 one_producer_ten_consumers_hundred_slots_notify_one(
1234 num_producers: 1,
1235 num_consumers: 10,
1236 max_queue_size: 100,
1237 messages_per_producer: 100_000,
1238 notification_style: NotifyStyle::One,
1239 timeout: Timeout::Forever,
1240 delay_seconds: 0
1241 );
1242 ten_producers_ten_consumers_one_slot(
1243 num_producers: 10,
1244 num_consumers: 10,
1245 max_queue_size: 1,
1246 messages_per_producer: 50000,
1247 notification_style: NotifyStyle::All,
1248 timeout: Timeout::Forever,
1249 delay_seconds: 0
1250 );
1251 ten_producers_ten_consumers_hundred_slots_notify_all(
1252 num_producers: 10,
1253 num_consumers: 10,
1254 max_queue_size: 100,
1255 messages_per_producer: 50000,
1256 notification_style: NotifyStyle::All,
1257 timeout: Timeout::Forever,
1258 delay_seconds: 0
1259 );
1260 ten_producers_ten_consumers_hundred_slots_notify_one(
1261 num_producers: 10,
1262 num_consumers: 10,
1263 max_queue_size: 100,
1264 messages_per_producer: 50000,
1265 notification_style: NotifyStyle::One,
1266 timeout: Timeout::Forever,
1267 delay_seconds: 0
1268 );
1269 }
1270}