1// Copyright 2016 Amanieu d'Antras
2//
3// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
4// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
5// http://opensource.org/licenses/MIT>, at your option. This file may not be
6// copied, modified, or distributed except according to those terms.
7use crate::thread_parker::{ThreadParker, ThreadParkerT, UnparkHandleT};
8use crate::util::UncheckedOptionExt;
9use crate::word_lock::WordLock;
10use core::{
11 cell::{Cell, UnsafeCell},
12ptr,
13 sync::atomic::{AtomicPtr, AtomicUsize, Ordering},
14};
15use smallvec::SmallVec;
16use std::time::{Duration, Instant};
1718// Don't use Instant on wasm32-unknown-unknown, it just panics.
19cfg_if::cfg_if! {
20if #[cfg(all(
21 target_family = "wasm",
22 target_os = "unknown",
23 target_vendor = "unknown"
24))] {
25#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
26struct TimeoutInstant;
27impl TimeoutInstant {
28fn now() -> TimeoutInstant {
29 TimeoutInstant
30 }
31 }
32impl core::ops::Add<Duration> for TimeoutInstant {
33type Output = Self;
34fn add(self, _rhs: Duration) -> Self::Output {
35 TimeoutInstant
36 }
37 }
38 } else {
        use std::time::Instant as TimeoutInstant;
40 }
41}
4243static NUM_THREADS: AtomicUsize = AtomicUsize::new(0);
4445/// Holds the pointer to the currently active `HashTable`.
46///
47/// # Safety
48///
49/// Except for the initial value of null, it must always point to a valid `HashTable` instance.
50/// Any `HashTable` this global static has ever pointed to must never be freed.
51static HASHTABLE: AtomicPtr<HashTable> = AtomicPtr::new(ptr::null_mut());
// Even with 3x more buckets than threads, the memory overhead is still only a
// few hundred bytes per thread.
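//
// For example, with 64 live threads the table is rounded up from 64 * 3 = 192 to 256
// buckets; at 64 bytes per cache-line-aligned bucket that is 16 KiB in total, or about
// 256 bytes per thread.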
55const LOAD_FACTOR: usize = 3;
5657struct HashTable {
58// Hash buckets for the table
59entries: Box<[Bucket]>,
6061// Number of bits used for the hash function
62hash_bits: u32,
6364// Previous table. This is only kept to keep leak detectors happy.
65_prev: *const HashTable,
66}
6768impl HashTable {
69#[inline]
70fn new(num_threads: usize, prev: *const HashTable) -> Box<HashTable> {
71let new_size = (num_threads * LOAD_FACTOR).next_power_of_two();
72let hash_bits = 0usize.leading_zeros() - new_size.leading_zeros() - 1;
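        // hash_bits is the base-2 logarithm of new_size, e.g. num_threads = 4 gives
        // new_size = 16 and hash_bits = 4.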
7374let now = TimeoutInstant::now();
75let mut entries = Vec::with_capacity(new_size);
76for i in 0..new_size {
77// We must ensure the seed is not zero
78entries.push(Bucket::new(now, i as u32 + 1));
79 }
8081Box::new(HashTable {
82 entries: entries.into_boxed_slice(),
83hash_bits,
84 _prev: prev,
85 })
86 }
87}
8889#[repr(align(64))]
90struct Bucket {
91// Lock protecting the queue
92mutex: WordLock,
9394// Linked list of threads waiting on this bucket
95queue_head: Cell<*const ThreadData>,
96 queue_tail: Cell<*const ThreadData>,
9798// Next time at which point be_fair should be set
99fair_timeout: UnsafeCell<FairTimeout>,
100}
101102impl Bucket {
103#[inline]
104pub fn new(timeout: TimeoutInstant, seed: u32) -> Self {
105Self {
106 mutex: WordLock::new(),
107 queue_head: Cell::new(ptr::null()),
108 queue_tail: Cell::new(ptr::null()),
109 fair_timeout: UnsafeCell::new(FairTimeout::new(timeout, seed)),
110 }
111 }
112}
113114struct FairTimeout {
115// Next time at which point be_fair should be set
116timeout: TimeoutInstant,
117118// the PRNG state for calculating the next timeout
119seed: u32,
120}
121122impl FairTimeout {
123#[inline]
124fn new(timeout: TimeoutInstant, seed: u32) -> FairTimeout {
125FairTimeout { timeout, seed }
126 }
127128// Determine whether we should force a fair unlock, and update the timeout
129#[inline]
130fn should_timeout(&mut self) -> bool {
131let now = TimeoutInstant::now();
132if now > self.timeout {
133// Time between 0 and 1ms.
134let nanos = self.gen_u32() % 1_000_000;
135self.timeout = now + Duration::new(0, nanos);
136true
137} else {
138false
139}
140 }
141142// Pseudorandom number generator from the "Xorshift RNGs" paper by George Marsaglia.
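    // The (13, 17, 5) shift triple visits every non-zero 32-bit value before repeating
    // (period 2^32 - 1), which is why bucket seeds are initialized to a non-zero value.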
143fn gen_u32(&mut self) -> u32 {
144self.seed ^= self.seed << 13;
145self.seed ^= self.seed >> 17;
146self.seed ^= self.seed << 5;
147self.seed
148 }
149}
150151struct ThreadData {
152 parker: ThreadParker,
153154// Key that this thread is sleeping on. This may change if the thread is
155 // requeued to a different key.
156key: AtomicUsize,
157158// Linked list of parked threads in a bucket
159next_in_queue: Cell<*const ThreadData>,
160161// UnparkToken passed to this thread when it is unparked
162unpark_token: Cell<UnparkToken>,
163164// ParkToken value set by the thread when it was parked
165park_token: Cell<ParkToken>,
166167// Is the thread parked with a timeout?
168parked_with_timeout: Cell<bool>,
169170// Extra data for deadlock detection
171#[cfg(feature = "deadlock_detection")]
172deadlock_data: deadlock::DeadlockData,
173}
174175impl ThreadData {
176fn new() -> ThreadData {
177// Keep track of the total number of live ThreadData objects and resize
178 // the hash table accordingly.
179let num_threads = NUM_THREADS.fetch_add(1, Ordering::Relaxed) + 1;
180grow_hashtable(num_threads);
181182ThreadData {
183 parker: ThreadParker::new(),
184 key: AtomicUsize::new(0),
185 next_in_queue: Cell::new(ptr::null()),
186 unpark_token: Cell::new(DEFAULT_UNPARK_TOKEN),
187 park_token: Cell::new(DEFAULT_PARK_TOKEN),
188 parked_with_timeout: Cell::new(false),
189#[cfg(feature = "deadlock_detection")]
190deadlock_data: deadlock::DeadlockData::new(),
191 }
192 }
193}
// Invokes the given closure with a reference to the current thread's `ThreadData`.
196#[inline(always)]
197fn with_thread_data<T>(f: impl FnOnce(&ThreadData) -> T) -> T {
198// Unlike word_lock::ThreadData, parking_lot::ThreadData is always expensive
199 // to construct. Try to use a thread-local version if possible. Otherwise just
200 // create a ThreadData on the stack
201let mut thread_data_storage = None;
    thread_local!(static THREAD_DATA: ThreadData = ThreadData::new());
    let thread_data_ptr = THREAD_DATA
        .try_with(|x| x as *const ThreadData)
        .unwrap_or_else(|_| thread_data_storage.get_or_insert_with(ThreadData::new));

    f(unsafe { &*thread_data_ptr })
208}
impl Drop for ThreadData {
211fn drop(&mut self) {
212NUM_THREADS.fetch_sub(1, Ordering::Relaxed);
213 }
214}
215216/// Returns a reference to the latest hash table, creating one if it doesn't exist yet.
217/// The reference is valid forever. However, the `HashTable` it references might become stale
/// at any point, meaning it still exists but is no longer the instance in active use.
219#[inline]
220fn get_hashtable() -> &'static HashTable {
221let table = HASHTABLE.load(Ordering::Acquire);
222223// If there is no table, create one
224if table.is_null() {
225create_hashtable()
226 } else {
227// SAFETY: when not null, `HASHTABLE` always points to a `HashTable` that is never freed.
228unsafe { &*table }
229 }
230}
231232/// Returns a reference to the latest hash table, creating one if it doesn't exist yet.
233/// The reference is valid forever. However, the `HashTable` it references might become stale
/// at any point, meaning it still exists but is no longer the instance in active use.
235#[cold]
236fn create_hashtable() -> &'static HashTable {
237let new_table = Box::into_raw(HashTable::new(LOAD_FACTOR, ptr::null()));
238239// If this fails then it means some other thread created the hash table first.
240let table = match HASHTABLE.compare_exchange(
241 ptr::null_mut(),
242new_table,
243 Ordering::AcqRel,
244 Ordering::Acquire,
245 ) {
246Ok(_) => new_table,
247Err(old_table) => {
248// Free the table we created
249 // SAFETY: `new_table` is created from `Box::into_raw` above and only freed here.
250unsafe {
251let _ = Box::from_raw(new_table);
252 }
            old_table
        }
255 };
256// SAFETY: The `HashTable` behind `table` is never freed. It is either the table pointer we
257 // created here, or it is one loaded from `HASHTABLE`.
258unsafe { &*table }
259}
260261// Grow the hash table so that it is big enough for the given number of threads.
262// This isn't performance-critical since it is only done when a ThreadData is
263// created, which only happens once per thread.
264fn grow_hashtable(num_threads: usize) {
265// Lock all buckets in the existing table and get a reference to it
266let old_table = loop {
267let table = get_hashtable();
268269// Check if we need to resize the existing table
270if table.entries.len() >= LOAD_FACTOR * num_threads {
271return;
272 }
273274// Lock all buckets in the old table
275for bucket in &table.entries[..] {
276 bucket.mutex.lock();
277 }
278279// Now check if our table is still the latest one. Another thread could
280 // have grown the hash table between us reading HASHTABLE and locking
281 // the buckets.
        if HASHTABLE.load(Ordering::Relaxed) == table as *const _ as *mut _ {
283break table;
284 }
285286// Unlock buckets and try again
287for bucket in &table.entries[..] {
288// SAFETY: We hold the lock here, as required
289unsafe { bucket.mutex.unlock() };
290 }
291 };
292293// Create the new table
294let mut new_table = HashTable::new(num_threads, old_table);
295296// Move the entries from the old table to the new one
297for bucket in &old_table.entries[..] {
298// SAFETY: The park, unpark* and check_wait_graph_fast functions create only correct linked
299 // lists. All `ThreadData` instances in these lists will remain valid as long as they are
300 // present in the lists, meaning as long as their threads are parked.
301unsafe { rehash_bucket_into(bucket, &mut new_table) };
302 }
303304// Publish the new table. No races are possible at this point because
305 // any other thread trying to grow the hash table is blocked on the bucket
306 // locks in the old table.
307HASHTABLE.store(Box::into_raw(new_table), Ordering::Release);
308309// Unlock all buckets in the old table
310for bucket in &old_table.entries[..] {
311// SAFETY: We hold the lock here, as required
312unsafe { bucket.mutex.unlock() };
313 }
314}
315316/// Iterate through all `ThreadData` objects in the bucket and insert them into the given table
/// in the bucket their key corresponds to for this table.
318///
319/// # Safety
320///
321/// The given `bucket` must have a correctly constructed linked list under `queue_head`, containing
322/// `ThreadData` instances that must stay valid at least as long as the given `table` is in use.
323///
324/// The given `table` must only contain buckets with correctly constructed linked lists.
325unsafe fn rehash_bucket_into(bucket: &'static Bucket, table: &mut HashTable) {
326let mut current: *const ThreadData = bucket.queue_head.get();
327while !current.is_null() {
328let next = (*current).next_in_queue.get();
329let hash = hash((*current).key.load(Ordering::Relaxed), table.hash_bits);
330if table.entries[hash].queue_tail.get().is_null() {
331 table.entries[hash].queue_head.set(current);
332 } else {
333 (*table.entries[hash].queue_tail.get())
334 .next_in_queue
335 .set(current);
336 }
337 table.entries[hash].queue_tail.set(current);
338 (*current).next_in_queue.set(ptr::null());
339 current = next;
340 }
341}
342343// Hash function for addresses
344#[cfg(target_pointer_width = "32")]
345#[inline]
346fn hash(key: usize, bits: u32) -> usize {
347 key.wrapping_mul(0x9E3779B9) >> (32 - bits)
348}
349#[cfg(target_pointer_width = "64")]
350#[inline]
351fn hash(key: usize, bits: u32) -> usize {
352key.wrapping_mul(0x9E3779B97F4A7C15) >> (64 - bits)
353}
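
// Worked example of the multiplicative (Fibonacci) hashing above, with illustrative
// values: the 64-bit constant is 2^64 divided by the golden ratio, and the top `bits`
// bits of the product select the bucket.
//
//     let hash_bits = 4; // a 16-bucket table
//     let key = 0x7f8a_1c30_usize; // e.g. the address a mutex parks on
//     let bucket = key.wrapping_mul(0x9E3779B97F4A7C15) >> (64 - hash_bits);
//     assert!(bucket < 16);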
354355/// Locks the bucket for the given key and returns a reference to it.
356/// The returned bucket must be unlocked again in order to not cause deadlocks.
357#[inline]
358fn lock_bucket(key: usize) -> &'static Bucket {
359loop {
360let hashtable = get_hashtable();
361362let hash = hash(key, hashtable.hash_bits);
363let bucket = &hashtable.entries[hash];
364365// Lock the bucket
366bucket.mutex.lock();
367368// If no other thread has rehashed the table before we grabbed the lock
369 // then we are good to go! The lock we grabbed prevents any rehashes.
        if HASHTABLE.load(Ordering::Relaxed) == hashtable as *const _ as *mut _ {
371return bucket;
372 }
373374// Unlock the bucket and try again
375 // SAFETY: We hold the lock here, as required
376unsafe { bucket.mutex.unlock() };
377 }
378}
379380/// Locks the bucket for the given key and returns a reference to it. But checks that the key
381/// hasn't been changed in the meantime due to a requeue.
382/// The returned bucket must be unlocked again in order to not cause deadlocks.
383#[inline]
384fn lock_bucket_checked(key: &AtomicUsize) -> (usize, &'static Bucket) {
385loop {
386let hashtable = get_hashtable();
387let current_key = key.load(Ordering::Relaxed);
388389let hash = hash(current_key, hashtable.hash_bits);
390let bucket = &hashtable.entries[hash];
391392// Lock the bucket
393bucket.mutex.lock();
394395// Check that both the hash table and key are correct while the bucket
396 // is locked. Note that the key can't change once we locked the proper
397 // bucket for it, so we just keep trying until we have the correct key.
        if HASHTABLE.load(Ordering::Relaxed) == hashtable as *const _ as *mut _
            && key.load(Ordering::Relaxed) == current_key
        {
401return (current_key, bucket);
402 }
403404// Unlock the bucket and try again
405 // SAFETY: We hold the lock here, as required
406unsafe { bucket.mutex.unlock() };
407 }
408}
409410/// Locks the two buckets for the given pair of keys and returns references to them.
411/// The returned buckets must be unlocked again in order to not cause deadlocks.
412///
413/// If both keys hash to the same value, both returned references will be to the same bucket. Be
414/// careful to only unlock it once in this case, always use `unlock_bucket_pair`.
415#[inline]
416fn lock_bucket_pair(key1: usize, key2: usize) -> (&'static Bucket, &'static Bucket) {
417loop {
418let hashtable = get_hashtable();
419420let hash1 = hash(key1, hashtable.hash_bits);
421let hash2 = hash(key2, hashtable.hash_bits);
422423// Get the bucket at the lowest hash/index first
424let bucket1 = if hash1 <= hash2 {
425&hashtable.entries[hash1]
426 } else {
427&hashtable.entries[hash2]
428 };
429430// Lock the first bucket
431bucket1.mutex.lock();
432433// If no other thread has rehashed the table before we grabbed the lock
434 // then we are good to go! The lock we grabbed prevents any rehashes.
        if HASHTABLE.load(Ordering::Relaxed) == hashtable as *const _ as *mut _ {
436// Now lock the second bucket and return the two buckets
437if hash1 == hash2 {
438return (bucket1, bucket1);
439 } else if hash1 < hash2 {
440let bucket2 = &hashtable.entries[hash2];
441bucket2.mutex.lock();
442return (bucket1, bucket2);
443 } else {
444let bucket2 = &hashtable.entries[hash1];
445bucket2.mutex.lock();
446return (bucket2, bucket1);
447 }
448 }
449450// Unlock the bucket and try again
451 // SAFETY: We hold the lock here, as required
452unsafe { bucket1.mutex.unlock() };
453 }
454}
455456/// Unlock a pair of buckets
457///
458/// # Safety
459///
460/// Both buckets must be locked
461#[inline]
462unsafe fn unlock_bucket_pair(bucket1: &Bucket, bucket2: &Bucket) {
463bucket1.mutex.unlock();
464if !ptr::eq(bucket1, bucket2) {
465bucket2.mutex.unlock();
466 }
467}
468469/// Result of a park operation.
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
471pub enum ParkResult {
472/// We were unparked by another thread with the given token.
473Unparked(UnparkToken),
474475/// The validation callback returned false.
476Invalid,
477478/// The timeout expired.
479TimedOut,
480}
481482impl ParkResult {
483/// Returns true if we were unparked by another thread.
484#[inline]
485pub fn is_unparked(self) -> bool {
        if let ParkResult::Unparked(_) = self {
            true
        } else {
            false
        }
491 }
492}
493494/// Result of an unpark operation.
#[derive(Copy, Clone, Default, Eq, PartialEq, Debug)]
496pub struct UnparkResult {
497/// The number of threads that were unparked.
498pub unparked_threads: usize,
499500/// The number of threads that were requeued.
501pub requeued_threads: usize,
502503/// Whether there are any threads remaining in the queue. This only returns
504 /// true if a thread was unparked.
505pub have_more_threads: bool,
506507/// This is set to true on average once every 0.5ms for any given key. It
508 /// should be used to switch to a fair unlocking mechanism for a particular
509 /// unlock.
510pub be_fair: bool,
511512/// Private field so new fields can be added without breakage.
513_sealed: (),
514}
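
// How a caller typically consumes `be_fair` (an illustrative sketch of a fair-unlock
// policy, not a description of any specific lock built on this module): when `be_fair`
// is true, the unlocking thread leaves the lock marked as held and hands ownership
// directly to the thread being unparked, instead of releasing it and letting the
// unparker immediately barge back in.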
515516/// Operation that `unpark_requeue` should perform.
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
518pub enum RequeueOp {
519/// Abort the operation without doing anything.
520Abort,
521522/// Unpark one thread and requeue the rest onto the target queue.
523UnparkOneRequeueRest,
524525/// Requeue all threads onto the target queue.
526RequeueAll,
527528/// Unpark one thread and leave the rest parked. No requeuing is done.
529UnparkOne,
530531/// Requeue one thread and leave the rest parked on the original queue.
532RequeueOne,
533}
534535/// Operation that `unpark_filter` should perform for each thread.
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
537pub enum FilterOp {
538/// Unpark the thread and continue scanning the list of parked threads.
539Unpark,
540541/// Don't unpark the thread and continue scanning the list of parked threads.
542Skip,
543544/// Don't unpark the thread and stop scanning the list of parked threads.
545Stop,
546}
547548/// A value which is passed from an unparker to a parked thread.
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
550pub struct UnparkToken(pub usize);
551552/// A value associated with a parked thread which can be used by `unpark_filter`.
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
554pub struct ParkToken(pub usize);
555556/// A default unpark token to use.
557pub const DEFAULT_UNPARK_TOKEN: UnparkToken = UnparkToken(0);
558559/// A default park token to use.
560pub const DEFAULT_PARK_TOKEN: ParkToken = ParkToken(0);
561562/// Parks the current thread in the queue associated with the given key.
563///
564/// The `validate` function is called while the queue is locked and can abort
565/// the operation by returning false. If `validate` returns true then the
566/// current thread is appended to the queue and the queue is unlocked.
567///
568/// The `before_sleep` function is called after the queue is unlocked but before
569/// the thread is put to sleep. The thread will then sleep until it is unparked
570/// or the given timeout is reached.
571///
572/// The `timed_out` function is also called while the queue is locked, but only
573/// if the timeout was reached. It is passed the key of the queue it was in when
574/// it timed out, which may be different from the original key if
575/// `unpark_requeue` was called. It is also passed a bool which indicates
576/// whether it was the last thread in the queue.
577///
578/// # Safety
579///
580/// You should only call this function with an address that you control, since
581/// you could otherwise interfere with the operation of other synchronization
582/// primitives.
583///
584/// The `validate` and `timed_out` functions are called while the queue is
585/// locked and must not panic or call into any function in `parking_lot`.
586///
587/// The `before_sleep` function is called outside the queue lock and is allowed
588/// to call `unpark_one`, `unpark_all`, `unpark_requeue` or `unpark_filter`, but
589/// it is not allowed to call `park` or panic.
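///
/// # Example
///
/// A minimal sketch of a one-shot event built on `park`/`unpark_all`. The flag, key
/// choice and memory orderings here are illustrative assumptions, not part of this API:
///
/// ```ignore
/// use std::sync::atomic::{AtomicBool, Ordering};
///
/// static FLAG: AtomicBool = AtomicBool::new(false);
///
/// // Waiter: sleep until FLAG has been set.
/// fn wait() {
///     let key = &FLAG as *const _ as usize;
///     while !FLAG.load(Ordering::Acquire) {
///         unsafe {
///             park(
///                 key,
///                 || !FLAG.load(Ordering::Relaxed), // validate: only park while unset
///                 || {},                            // before_sleep: nothing to do
///                 |_, _| {},                        // timed_out: unused, no timeout given
///                 DEFAULT_PARK_TOKEN,
///                 None,
///             );
///         }
///     }
/// }
///
/// // Signaller: set FLAG, then wake every waiter currently parked on it.
/// fn signal() {
///     FLAG.store(true, Ordering::Release);
///     unsafe { unpark_all(&FLAG as *const _ as usize, DEFAULT_UNPARK_TOKEN) };
/// }
/// ```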
590#[inline]
591pub unsafe fn park(
592 key: usize,
593 validate: impl FnOnce() -> bool,
594 before_sleep: impl FnOnce(),
595 timed_out: impl FnOnce(usize, bool),
596 park_token: ParkToken,
597 timeout: Option<Instant>,
598) -> ParkResult {
599// Grab our thread data, this also ensures that the hash table exists
600with_thread_data(|thread_data| {
601// Lock the bucket for the given key
602let bucket = lock_bucket(key);
603604// If the validation function fails, just return
605if !validate() {
606// SAFETY: We hold the lock here, as required
607bucket.mutex.unlock();
608return ParkResult::Invalid;
609 }
610611// Append our thread data to the queue and unlock the bucket
612thread_data.parked_with_timeout.set(timeout.is_some());
613thread_data.next_in_queue.set(ptr::null());
614thread_data.key.store(key, Ordering::Relaxed);
615thread_data.park_token.set(park_token);
616thread_data.parker.prepare_park();
617if !bucket.queue_head.get().is_null() {
618 (*bucket.queue_tail.get()).next_in_queue.set(thread_data);
619 } else {
620bucket.queue_head.set(thread_data);
621 }
622bucket.queue_tail.set(thread_data);
623// SAFETY: We hold the lock here, as required
624bucket.mutex.unlock();
625626// Invoke the pre-sleep callback
627before_sleep();
628629// Park our thread and determine whether we were woken up by an unpark
630 // or by our timeout. Note that this isn't precise: we can still be
631 // unparked since we are still in the queue.
632let unparked = match timeout {
633Some(timeout) => thread_data.parker.park_until(timeout),
634None => {
635thread_data.parker.park();
636// call deadlock detection on_unpark hook
637deadlock::on_unpark(thread_data);
638true
639}
640 };
641642// If we were unparked, return now
643if unparked {
644return ParkResult::Unparked(thread_data.unpark_token.get());
645 }
646647// Lock our bucket again. Note that the hashtable may have been rehashed in
648 // the meantime. Our key may also have changed if we were requeued.
649let (key, bucket) = lock_bucket_checked(&thread_data.key);
650651// Now we need to check again if we were unparked or timed out. Unlike the
652 // last check this is precise because we hold the bucket lock.
653if !thread_data.parker.timed_out() {
654// SAFETY: We hold the lock here, as required
655bucket.mutex.unlock();
656return ParkResult::Unparked(thread_data.unpark_token.get());
657 }
658659// We timed out, so we now need to remove our thread from the queue
660let mut link = &bucket.queue_head;
661let mut current = bucket.queue_head.get();
662let mut previous = ptr::null();
663let mut was_last_thread = true;
664while !current.is_null() {
665if current == thread_data {
666let next = (*current).next_in_queue.get();
667 link.set(next);
668if bucket.queue_tail.get() == current {
669 bucket.queue_tail.set(previous);
670 } else {
671// Scan the rest of the queue to see if there are any other
672 // entries with the given key.
673let mut scan = next;
674while !scan.is_null() {
675if (*scan).key.load(Ordering::Relaxed) == key {
676 was_last_thread = false;
677break;
678 }
679 scan = (*scan).next_in_queue.get();
680 }
681 }
682683// Callback to indicate that we timed out, and whether we were the
684 // last thread on the queue.
685timed_out(key, was_last_thread);
686break;
687 } else {
688if (*current).key.load(Ordering::Relaxed) == key {
689 was_last_thread = false;
690 }
691 link = &(*current).next_in_queue;
692 previous = current;
693 current = link.get();
694 }
695 }
696697// There should be no way for our thread to have been removed from the queue
698 // if we timed out.
        debug_assert!(!current.is_null());
700701// Unlock the bucket, we are done
702 // SAFETY: We hold the lock here, as required
703bucket.mutex.unlock();
        ParkResult::TimedOut
    })
706}
707708/// Unparks one thread from the queue associated with the given key.
709///
710/// The `callback` function is called while the queue is locked and before the
711/// target thread is woken up. The `UnparkResult` argument to the function
712/// indicates whether a thread was found in the queue and whether this was the
713/// last thread in the queue. This value is also returned by `unpark_one`.
714///
715/// The `callback` function should return an `UnparkToken` value which will be
716/// passed to the thread that is unparked. If no thread is unparked then the
717/// returned value is ignored.
718///
719/// # Safety
720///
721/// You should only call this function with an address that you control, since
722/// you could otherwise interfere with the operation of other synchronization
723/// primitives.
724///
725/// The `callback` function is called while the queue is locked and must not
726/// panic or call into any function in `parking_lot`.
727///
728/// The `parking_lot` functions are not re-entrant and calling this method
729/// from the context of an asynchronous signal handler may result in undefined
730/// behavior, including corruption of internal state and/or deadlocks.
731#[inline]
732pub unsafe fn unpark_one(
733 key: usize,
734 callback: impl FnOnce(UnparkResult) -> UnparkToken,
735) -> UnparkResult {
736// Lock the bucket for the given key
737let bucket = lock_bucket(key);
738739// Find a thread with a matching key and remove it from the queue
740let mut link = &bucket.queue_head;
741let mut current = bucket.queue_head.get();
742let mut previous = ptr::null();
743let mut result = UnparkResult::default();
744while !current.is_null() {
745if (*current).key.load(Ordering::Relaxed) == key {
746// Remove the thread from the queue
747let next = (*current).next_in_queue.get();
748 link.set(next);
749if bucket.queue_tail.get() == current {
750 bucket.queue_tail.set(previous);
751 } else {
752// Scan the rest of the queue to see if there are any other
753 // entries with the given key.
754let mut scan = next;
755while !scan.is_null() {
756if (*scan).key.load(Ordering::Relaxed) == key {
757 result.have_more_threads = true;
758break;
759 }
760 scan = (*scan).next_in_queue.get();
761 }
762 }
763764// Invoke the callback before waking up the thread
765result.unparked_threads = 1;
766 result.be_fair = (*bucket.fair_timeout.get()).should_timeout();
767let token = callback(result);
768769// Set the token for the target thread
770(*current).unpark_token.set(token);
771772// This is a bit tricky: we first lock the ThreadParker to prevent
773 // the thread from exiting and freeing its ThreadData if its wait
774 // times out. Then we unlock the queue since we don't want to keep
775 // the queue locked while we perform a system call. Finally we wake
776 // up the parked thread.
777let handle = (*current).parker.unpark_lock();
778// SAFETY: We hold the lock here, as required
779bucket.mutex.unlock();
780 handle.unpark();
781782return result;
783 } else {
784 link = &(*current).next_in_queue;
785 previous = current;
786 current = link.get();
787 }
788 }
789790// No threads with a matching key were found in the bucket
791callback(result);
792// SAFETY: We hold the lock here, as required
793bucket.mutex.unlock();
    result
}
796797/// Unparks all threads in the queue associated with the given key.
798///
799/// The given `UnparkToken` is passed to all unparked threads.
800///
801/// This function returns the number of threads that were unparked.
802///
803/// # Safety
804///
805/// You should only call this function with an address that you control, since
806/// you could otherwise interfere with the operation of other synchronization
807/// primitives.
808///
809/// The `parking_lot` functions are not re-entrant and calling this method
810/// from the context of an asynchronous signal handler may result in undefined
811/// behavior, including corruption of internal state and/or deadlocks.
812#[inline]
813pub unsafe fn unpark_all(key: usize, unpark_token: UnparkToken) -> usize {
814// Lock the bucket for the given key
815let bucket = lock_bucket(key);
816817// Remove all threads with the given key in the bucket
818let mut link = &bucket.queue_head;
819let mut current = bucket.queue_head.get();
820let mut previous = ptr::null();
821let mut threads = SmallVec::<[_; 8]>::new();
822while !current.is_null() {
823if (*current).key.load(Ordering::Relaxed) == key {
824// Remove the thread from the queue
825let next = (*current).next_in_queue.get();
826 link.set(next);
827if bucket.queue_tail.get() == current {
828 bucket.queue_tail.set(previous);
829 }
830831// Set the token for the target thread
832(*current).unpark_token.set(unpark_token);
833834// Don't wake up threads while holding the queue lock. See comment
835 // in unpark_one. For now just record which threads we need to wake
836 // up.
837threads.push((*current).parker.unpark_lock());
838 current = next;
839 } else {
840 link = &(*current).next_in_queue;
841 previous = current;
842 current = link.get();
843 }
844 }
845846// Unlock the bucket
847 // SAFETY: We hold the lock here, as required
848bucket.mutex.unlock();
849850// Now that we are outside the lock, wake up all the threads that we removed
851 // from the queue.
852let num_threads = threads.len();
853for handle in threads.into_iter() {
854 handle.unpark();
855 }
    num_threads
}
859860/// Removes all threads from the queue associated with `key_from`, optionally
861/// unparks the first one and requeues the rest onto the queue associated with
862/// `key_to`.
863///
864/// The `validate` function is called while both queues are locked. Its return
865/// value will determine which operation is performed, or whether the operation
866/// should be aborted. See `RequeueOp` for details about the different possible
867/// return values.
868///
869/// The `callback` function is also called while both queues are locked. It is
870/// passed the `RequeueOp` returned by `validate` and an `UnparkResult`
871/// indicating whether a thread was unparked and whether there are threads still
872/// parked in the new queue. This `UnparkResult` value is also returned by
873/// `unpark_requeue`.
874///
875/// The `callback` function should return an `UnparkToken` value which will be
876/// passed to the thread that is unparked. If no thread is unparked then the
877/// returned value is ignored.
878///
879/// # Safety
880///
881/// You should only call this function with an address that you control, since
882/// you could otherwise interfere with the operation of other synchronization
883/// primitives.
884///
885/// The `validate` and `callback` functions are called while the queue is locked
886/// and must not panic or call into any function in `parking_lot`.
887#[inline]
888pub unsafe fn unpark_requeue(
889 key_from: usize,
890 key_to: usize,
891 validate: impl FnOnce() -> RequeueOp,
892 callback: impl FnOnce(RequeueOp, UnparkResult) -> UnparkToken,
893) -> UnparkResult {
894// Lock the two buckets for the given key
895let (bucket_from, bucket_to) = lock_bucket_pair(key_from, key_to);
896897// If the validation function fails, just return
898let mut result = UnparkResult::default();
899let op = validate();
900if op == RequeueOp::Abort {
901// SAFETY: Both buckets are locked, as required.
902unlock_bucket_pair(bucket_from, bucket_to);
903return result;
904 }
905906// Remove all threads with the given key in the source bucket
907let mut link = &bucket_from.queue_head;
908let mut current = bucket_from.queue_head.get();
909let mut previous = ptr::null();
910let mut requeue_threads: *const ThreadData = ptr::null();
911let mut requeue_threads_tail: *const ThreadData = ptr::null();
912let mut wakeup_thread = None;
913while !current.is_null() {
914if (*current).key.load(Ordering::Relaxed) == key_from {
915// Remove the thread from the queue
916let next = (*current).next_in_queue.get();
917 link.set(next);
918if bucket_from.queue_tail.get() == current {
919 bucket_from.queue_tail.set(previous);
920 }
921922// Prepare the first thread for wakeup and requeue the rest.
923if (op == RequeueOp::UnparkOneRequeueRest || op == RequeueOp::UnparkOne)
924 && wakeup_thread.is_none()
925 {
926 wakeup_thread = Some(current);
927 result.unparked_threads = 1;
928 } else {
929if !requeue_threads.is_null() {
930 (*requeue_threads_tail).next_in_queue.set(current);
931 } else {
932 requeue_threads = current;
933 }
934 requeue_threads_tail = current;
935 (*current).key.store(key_to, Ordering::Relaxed);
936 result.requeued_threads += 1;
937 }
938if op == RequeueOp::UnparkOne || op == RequeueOp::RequeueOne {
939// Scan the rest of the queue to see if there are any other
940 // entries with the given key.
941let mut scan = next;
942while !scan.is_null() {
943if (*scan).key.load(Ordering::Relaxed) == key_from {
944 result.have_more_threads = true;
945break;
946 }
947 scan = (*scan).next_in_queue.get();
948 }
949break;
950 }
951 current = next;
952 } else {
953 link = &(*current).next_in_queue;
954 previous = current;
955 current = link.get();
956 }
957 }
958959// Add the requeued threads to the destination bucket
960if !requeue_threads.is_null() {
961 (*requeue_threads_tail).next_in_queue.set(ptr::null());
962if !bucket_to.queue_head.get().is_null() {
963 (*bucket_to.queue_tail.get())
964 .next_in_queue
965 .set(requeue_threads);
966 } else {
967bucket_to.queue_head.set(requeue_threads);
968 }
969bucket_to.queue_tail.set(requeue_threads_tail);
970 }
971972// Invoke the callback before waking up the thread
973if result.unparked_threads != 0 {
974result.be_fair = (*bucket_from.fair_timeout.get()).should_timeout();
975 }
976let token = callback(op, result);
977978// See comment in unpark_one for why we mess with the locking
979if let Some(wakeup_thread) = wakeup_thread {
980 (*wakeup_thread).unpark_token.set(token);
981let handle = (*wakeup_thread).parker.unpark_lock();
982// SAFETY: Both buckets are locked, as required.
983unlock_bucket_pair(bucket_from, bucket_to);
984handle.unpark();
985 } else {
986// SAFETY: Both buckets are locked, as required.
987unlock_bucket_pair(bucket_from, bucket_to);
988 }
    result
}
992993/// Unparks a number of threads from the front of the queue associated with
994/// `key` depending on the results of a filter function which inspects the
995/// `ParkToken` associated with each thread.
996///
997/// The `filter` function is called for each thread in the queue or until
998/// `FilterOp::Stop` is returned. This function is passed the `ParkToken`
999/// associated with a particular thread, which is unparked if `FilterOp::Unpark`
1000/// is returned.
1001///
1002/// The `callback` function is also called while both queues are locked. It is
1003/// passed an `UnparkResult` indicating the number of threads that were unparked
1004/// and whether there are still parked threads in the queue. This `UnparkResult`
1005/// value is also returned by `unpark_filter`.
1006///
1007/// The `callback` function should return an `UnparkToken` value which will be
1008/// passed to all threads that are unparked. If no thread is unparked then the
1009/// returned value is ignored.
1010///
1011/// # Safety
1012///
1013/// You should only call this function with an address that you control, since
1014/// you could otherwise interfere with the operation of other synchronization
1015/// primitives.
1016///
1017/// The `filter` and `callback` functions are called while the queue is locked
1018/// and must not panic or call into any function in `parking_lot`.
1019#[inline]
1020pub unsafe fn unpark_filter(
1021 key: usize,
1022mut filter: impl FnMut(ParkToken) -> FilterOp,
1023 callback: impl FnOnce(UnparkResult) -> UnparkToken,
1024) -> UnparkResult {
1025// Lock the bucket for the given key
1026let bucket = lock_bucket(key);
10271028// Go through the queue looking for threads with a matching key
1029let mut link = &bucket.queue_head;
1030let mut current = bucket.queue_head.get();
1031let mut previous = ptr::null();
1032let mut threads = SmallVec::<[_; 8]>::new();
1033let mut result = UnparkResult::default();
1034while !current.is_null() {
1035if (*current).key.load(Ordering::Relaxed) == key {
1036// Call the filter function with the thread's ParkToken
1037let next = (*current).next_in_queue.get();
1038match filter((*current).park_token.get()) {
1039 FilterOp::Unpark => {
1040// Remove the thread from the queue
1041link.set(next);
1042if bucket.queue_tail.get() == current {
1043 bucket.queue_tail.set(previous);
1044 }
10451046// Add the thread to our list of threads to unpark
1047threads.push((current, None));
10481049 current = next;
1050 }
1051 FilterOp::Skip => {
1052 result.have_more_threads = true;
1053 link = &(*current).next_in_queue;
1054 previous = current;
1055 current = link.get();
1056 }
1057 FilterOp::Stop => {
1058 result.have_more_threads = true;
1059break;
1060 }
1061 }
1062 } else {
1063 link = &(*current).next_in_queue;
1064 previous = current;
1065 current = link.get();
1066 }
1067 }
10681069// Invoke the callback before waking up the threads
1070result.unparked_threads = threads.len();
1071if result.unparked_threads != 0 {
1072result.be_fair = (*bucket.fair_timeout.get()).should_timeout();
1073 }
1074let token = callback(result);
10751076// Pass the token to all threads that are going to be unparked and prepare
1077 // them for unparking.
1078for t in threads.iter_mut() {
1079 (*t.0).unpark_token.set(token);
1080 t.1 = Some((*t.0).parker.unpark_lock());
1081 }
10821083// SAFETY: We hold the lock here, as required
1084bucket.mutex.unlock();
10851086// Now that we are outside the lock, wake up all the threads that we removed
1087 // from the queue.
1088for (_, handle) in threads.into_iter() {
1089 handle.unchecked_unwrap().unpark();
1090 }
    result
}
10941095/// \[Experimental\] Deadlock detection
1096///
1097/// Enabled via the `deadlock_detection` feature flag.
1098pub mod deadlock {
1099#[cfg(feature = "deadlock_detection")]
1100use super::deadlock_impl;
11011102#[cfg(feature = "deadlock_detection")]
1103pub(super) use super::deadlock_impl::DeadlockData;
11041105/// Acquire a resource identified by key in the deadlock detector
1106 /// Noop if `deadlock_detection` feature isn't enabled.
1107 ///
1108 /// # Safety
1109 ///
1110 /// Call after the resource is acquired
1111#[inline]
1112pub unsafe fn acquire_resource(_key: usize) {
1113#[cfg(feature = "deadlock_detection")]
1114deadlock_impl::acquire_resource(_key);
1115 }
11161117/// Release a resource identified by key in the deadlock detector.
1118 /// Noop if `deadlock_detection` feature isn't enabled.
1119 ///
1120 /// # Panics
1121 ///
1122 /// Panics if the resource was already released or wasn't acquired in this thread.
1123 ///
1124 /// # Safety
1125 ///
1126 /// Call before the resource is released
1127#[inline]
1128pub unsafe fn release_resource(_key: usize) {
1129#[cfg(feature = "deadlock_detection")]
1130deadlock_impl::release_resource(_key);
1131 }
11321133/// Returns all deadlocks detected *since* the last call.
    /// Each cycle consists of a vector of `DeadlockedThread`.
1135#[cfg(feature = "deadlock_detection")]
1136 #[inline]
1137pub fn check_deadlock() -> Vec<Vec<deadlock_impl::DeadlockedThread>> {
1138 deadlock_impl::check_deadlock()
1139 }
11401141#[inline]
1142pub(super) unsafe fn on_unpark(_td: &super::ThreadData) {
1143#[cfg(feature = "deadlock_detection")]
1144deadlock_impl::on_unpark(_td);
1145 }
1146}
11471148#[cfg(feature = "deadlock_detection")]
1149mod deadlock_impl {
1150use super::{get_hashtable, lock_bucket, with_thread_data, ThreadData, NUM_THREADS};
1151use crate::thread_parker::{ThreadParkerT, UnparkHandleT};
1152use crate::word_lock::WordLock;
1153use backtrace::Backtrace;
1154use petgraph;
1155use petgraph::graphmap::DiGraphMap;
1156use std::cell::{Cell, UnsafeCell};
1157use std::collections::HashSet;
1158use std::sync::atomic::Ordering;
1159use std::sync::mpsc;
1160use std::thread::ThreadId;
11611162/// Representation of a deadlocked thread
1163pub struct DeadlockedThread {
1164 thread_id: ThreadId,
1165 backtrace: Backtrace,
1166 }
11671168impl DeadlockedThread {
1169/// The system thread id
1170pub fn thread_id(&self) -> ThreadId {
1171self.thread_id
1172 }
11731174/// The thread backtrace
1175pub fn backtrace(&self) -> &Backtrace {
1176&self.backtrace
1177 }
1178 }
11791180pub struct DeadlockData {
1181// Currently owned resources (keys)
1182resources: UnsafeCell<Vec<usize>>,
11831184// Set when there's a pending callstack request
1185deadlocked: Cell<bool>,
11861187// Sender used to report the backtrace
1188backtrace_sender: UnsafeCell<Option<mpsc::Sender<DeadlockedThread>>>,
11891190// System thread id
1191thread_id: ThreadId,
1192 }
11931194impl DeadlockData {
1195pub fn new() -> Self {
1196 DeadlockData {
1197 resources: UnsafeCell::new(Vec::new()),
1198 deadlocked: Cell::new(false),
1199 backtrace_sender: UnsafeCell::new(None),
1200 thread_id: std::thread::current().id(),
1201 }
1202 }
1203 }
12041205pub(super) unsafe fn on_unpark(td: &ThreadData) {
1206if td.deadlock_data.deadlocked.get() {
1207let sender = (*td.deadlock_data.backtrace_sender.get()).take().unwrap();
1208 sender
1209 .send(DeadlockedThread {
1210 thread_id: td.deadlock_data.thread_id,
1211 backtrace: Backtrace::new(),
1212 })
1213 .unwrap();
1214// make sure to close this sender
1215drop(sender);
            // park until the end of time
1218td.parker.prepare_park();
1219 td.parker.park();
1220unreachable!("unparked deadlocked thread!");
1221 }
1222 }
12231224pub unsafe fn acquire_resource(key: usize) {
1225 with_thread_data(|thread_data| {
1226 (*thread_data.deadlock_data.resources.get()).push(key);
1227 });
1228 }
12291230pub unsafe fn release_resource(key: usize) {
1231 with_thread_data(|thread_data| {
1232let resources = &mut (*thread_data.deadlock_data.resources.get());
12331234// There is only one situation where we can fail to find the
1235 // resource: we are currently running TLS destructors and our
1236 // ThreadData has already been freed. There isn't much we can do
1237 // about it at this point, so just ignore it.
1238if let Some(p) = resources.iter().rposition(|x| *x == key) {
1239 resources.swap_remove(p);
1240 }
1241 });
1242 }
12431244pub fn check_deadlock() -> Vec<Vec<DeadlockedThread>> {
1245unsafe {
1246// fast pass
1247if check_wait_graph_fast() {
1248// double check
1249check_wait_graph_slow()
1250 } else {
1251 Vec::new()
1252 }
1253 }
1254 }
    // Simple algorithm that builds a wait graph of the threads and the resources,
1257 // then checks for the presence of cycles (deadlocks).
1258 // This variant isn't precise as it doesn't lock the entire table before checking
1259unsafe fn check_wait_graph_fast() -> bool {
1260let table = get_hashtable();
1261let thread_count = NUM_THREADS.load(Ordering::Relaxed);
1262let mut graph = DiGraphMap::<usize, ()>::with_capacity(thread_count * 2, thread_count * 2);
12631264for b in &(*table).entries[..] {
1265 b.mutex.lock();
1266let mut current = b.queue_head.get();
1267while !current.is_null() {
1268if !(*current).parked_with_timeout.get()
1269 && !(*current).deadlock_data.deadlocked.get()
1270 {
1271// .resources are waiting for their owner
1272for &resource in &(*(*current).deadlock_data.resources.get()) {
1273 graph.add_edge(resource, current as usize, ());
1274 }
1275// owner waits for resource .key
1276graph.add_edge(current as usize, (*current).key.load(Ordering::Relaxed), ());
1277 }
1278 current = (*current).next_in_queue.get();
1279 }
1280// SAFETY: We hold the lock here, as required
1281b.mutex.unlock();
1282 }
12831284 petgraph::algo::is_cyclic_directed(&graph)
1285 }
12861287#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Copy, Clone)]
1288enum WaitGraphNode {
1289 Thread(*const ThreadData),
1290 Resource(usize),
1291 }
12921293use self::WaitGraphNode::*;
12941295// Contrary to the _fast variant this locks the entries table before looking for cycles.
1296 // Returns all detected thread wait cycles.
1297 // Note that once a cycle is reported it's never reported again.
1298unsafe fn check_wait_graph_slow() -> Vec<Vec<DeadlockedThread>> {
1299static DEADLOCK_DETECTION_LOCK: WordLock = WordLock::new();
1300 DEADLOCK_DETECTION_LOCK.lock();
13011302let mut table = get_hashtable();
1303loop {
1304// Lock all buckets in the old table
1305for b in &table.entries[..] {
1306 b.mutex.lock();
1307 }
13081309// Now check if our table is still the latest one. Another thread could
1310 // have grown the hash table between us getting and locking the hash table.
1311let new_table = get_hashtable();
1312if new_table as *const _ == table as *const _ {
1313break;
1314 }
13151316// Unlock buckets and try again
1317for b in &table.entries[..] {
1318// SAFETY: We hold the lock here, as required
1319b.mutex.unlock();
1320 }
13211322 table = new_table;
1323 }
13241325let thread_count = NUM_THREADS.load(Ordering::Relaxed);
1326let mut graph =
1327 DiGraphMap::<WaitGraphNode, ()>::with_capacity(thread_count * 2, thread_count * 2);
13281329for b in &table.entries[..] {
1330let mut current = b.queue_head.get();
1331while !current.is_null() {
1332if !(*current).parked_with_timeout.get()
1333 && !(*current).deadlock_data.deadlocked.get()
1334 {
1335// .resources are waiting for their owner
1336for &resource in &(*(*current).deadlock_data.resources.get()) {
1337 graph.add_edge(Resource(resource), Thread(current), ());
1338 }
1339// owner waits for resource .key
1340graph.add_edge(
1341 Thread(current),
1342 Resource((*current).key.load(Ordering::Relaxed)),
1343 (),
1344 );
1345 }
1346 current = (*current).next_in_queue.get();
1347 }
1348 }
13491350for b in &table.entries[..] {
1351// SAFETY: We hold the lock here, as required
1352b.mutex.unlock();
1353 }
13541355// find cycles
1356let cycles = graph_cycles(&graph);
13571358let mut results = Vec::with_capacity(cycles.len());
13591360for cycle in cycles {
1361let (sender, receiver) = mpsc::channel();
1362for td in cycle {
1363let bucket = lock_bucket((*td).key.load(Ordering::Relaxed));
1364 (*td).deadlock_data.deadlocked.set(true);
1365*(*td).deadlock_data.backtrace_sender.get() = Some(sender.clone());
1366let handle = (*td).parker.unpark_lock();
1367// SAFETY: We hold the lock here, as required
1368bucket.mutex.unlock();
1369// unpark the deadlocked thread!
1370 // on unpark it'll notice the deadlocked flag and report back
1371handle.unpark();
1372 }
1373// make sure to drop our sender before collecting results
1374drop(sender);
1375 results.push(receiver.iter().collect());
1376 }
13771378 DEADLOCK_DETECTION_LOCK.unlock();
13791380 results
1381 }
13821383// normalize a cycle to start with the "smallest" node
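    // e.g. both [3, 1, 2] and [2, 3, 1] normalize (by rotation, not sorting) to [1, 2, 3],
    // so the same cycle discovered from different starting nodes is only counted once.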
1384fn normalize_cycle<T: Ord + Copy + Clone>(input: &[T]) -> Vec<T> {
1385let min_pos = input
1386 .iter()
1387 .enumerate()
1388 .min_by_key(|&(_, &t)| t)
1389 .map(|(p, _)| p)
1390 .unwrap_or(0);
1391 input
1392 .iter()
1393 .cycle()
1394 .skip(min_pos)
1395 .take(input.len())
1396 .cloned()
1397 .collect()
1398 }
13991400// returns all thread cycles in the wait graph
1401fn graph_cycles(g: &DiGraphMap<WaitGraphNode, ()>) -> Vec<Vec<*const ThreadData>> {
1402use petgraph::visit::depth_first_search;
1403use petgraph::visit::DfsEvent;
1404use petgraph::visit::NodeIndexable;
14051406let mut cycles = HashSet::new();
1407let mut path = Vec::with_capacity(g.node_bound());
1408// start from threads to get the correct threads cycle
1409let threads = g
1410 .nodes()
1411 .filter(|n| if let &Thread(_) = n { true } else { false });
14121413 depth_first_search(g, threads, |e| match e {
1414 DfsEvent::Discover(Thread(n), _) => path.push(n),
1415 DfsEvent::Finish(Thread(_), _) => {
1416 path.pop();
1417 }
1418 DfsEvent::BackEdge(_, Thread(n)) => {
1419let from = path.iter().rposition(|&i| i == n).unwrap();
1420 cycles.insert(normalize_cycle(&path[from..]));
1421 }
1422_ => (),
1423 });
14241425 cycles.iter().cloned().collect()
1426 }
1427}
14281429#[cfg(test)]
1430mod tests {
1431use super::{ThreadData, DEFAULT_PARK_TOKEN, DEFAULT_UNPARK_TOKEN};
1432use std::{
1433 ptr,
1434 sync::{
1435 atomic::{AtomicIsize, AtomicPtr, AtomicUsize, Ordering},
1436 Arc,
1437 },
1438 thread,
1439 time::Duration,
1440 };
14411442/// Calls a closure for every `ThreadData` currently parked on a given key
1443fn for_each(key: usize, mut f: impl FnMut(&ThreadData)) {
1444let bucket = super::lock_bucket(key);
14451446let mut current: *const ThreadData = bucket.queue_head.get();
1447while !current.is_null() {
1448let current_ref = unsafe { &*current };
1449if current_ref.key.load(Ordering::Relaxed) == key {
1450 f(current_ref);
1451 }
1452 current = current_ref.next_in_queue.get();
1453 }
14541455// SAFETY: We hold the lock here, as required
1456unsafe { bucket.mutex.unlock() };
1457 }
14581459macro_rules! test {
1460 ( $( $name:ident(
1461 repeats: $repeats:expr,
1462 latches: $latches:expr,
1463 delay: $delay:expr,
1464 threads: $threads:expr,
1465 single_unparks: $single_unparks:expr);
1466 )* ) => {
1467 $(#[test]
1468fn $name() {
1469let delay = Duration::from_micros($delay);
1470for _ in 0..$repeats {
1471 run_parking_test($latches, delay, $threads, $single_unparks);
1472 }
1473 })*
1474 };
1475 }
14761477test! {
1478 unpark_all_one_fast(
1479 repeats: 1000, latches: 1, delay: 0, threads: 1, single_unparks: 0
1480);
1481 unpark_all_hundred_fast(
1482 repeats: 100, latches: 1, delay: 0, threads: 100, single_unparks: 0
1483);
1484 unpark_one_one_fast(
1485 repeats: 1000, latches: 1, delay: 0, threads: 1, single_unparks: 1
1486);
1487 unpark_one_hundred_fast(
1488 repeats: 20, latches: 1, delay: 0, threads: 100, single_unparks: 100
1489);
1490 unpark_one_fifty_then_fifty_all_fast(
1491 repeats: 50, latches: 1, delay: 0, threads: 100, single_unparks: 50
1492);
1493 unpark_all_one(
1494 repeats: 100, latches: 1, delay: 10000, threads: 1, single_unparks: 0
1495);
1496 unpark_all_hundred(
1497 repeats: 100, latches: 1, delay: 10000, threads: 100, single_unparks: 0
1498);
1499 unpark_one_one(
1500 repeats: 10, latches: 1, delay: 10000, threads: 1, single_unparks: 1
1501);
1502 unpark_one_fifty(
1503 repeats: 1, latches: 1, delay: 10000, threads: 50, single_unparks: 50
1504);
1505 unpark_one_fifty_then_fifty_all(
1506 repeats: 2, latches: 1, delay: 10000, threads: 100, single_unparks: 50
1507);
1508 hundred_unpark_all_one_fast(
1509 repeats: 100, latches: 100, delay: 0, threads: 1, single_unparks: 0
1510);
1511 hundred_unpark_all_one(
1512 repeats: 1, latches: 100, delay: 10000, threads: 1, single_unparks: 0
1513);
1514 }
15151516fn run_parking_test(
1517 num_latches: usize,
1518 delay: Duration,
1519 num_threads: usize,
1520 num_single_unparks: usize,
1521 ) {
1522let mut tests = Vec::with_capacity(num_latches);
15231524for _ in 0..num_latches {
1525let test = Arc::new(SingleLatchTest::new(num_threads));
1526let mut threads = Vec::with_capacity(num_threads);
1527for _ in 0..num_threads {
1528let test = test.clone();
1529 threads.push(thread::spawn(move || test.run()));
1530 }
1531 tests.push((test, threads));
1532 }
15331534for unpark_index in 0..num_single_unparks {
1535 thread::sleep(delay);
1536for (test, _) in &tests {
1537 test.unpark_one(unpark_index);
1538 }
1539 }
15401541for (test, threads) in tests {
1542 test.finish(num_single_unparks);
1543for thread in threads {
1544 thread.join().expect("Test thread panic");
1545 }
1546 }
1547 }
15481549struct SingleLatchTest {
1550 semaphore: AtomicIsize,
1551 num_awake: AtomicUsize,
1552/// Holds the pointer to the last *unprocessed* woken up thread.
1553last_awoken: AtomicPtr<ThreadData>,
1554/// Total number of threads participating in this test.
1555num_threads: usize,
1556 }
15571558impl SingleLatchTest {
1559pub fn new(num_threads: usize) -> Self {
1560Self {
1561// This implements a fair (FIFO) semaphore, and it starts out unavailable.
1562semaphore: AtomicIsize::new(0),
1563 num_awake: AtomicUsize::new(0),
1564 last_awoken: AtomicPtr::new(ptr::null_mut()),
1565 num_threads,
1566 }
1567 }
15681569pub fn run(&self) {
1570// Get one slot from the semaphore
1571self.down();
15721573// Report back to the test verification code that this thread woke up
1574let this_thread_ptr = super::with_thread_data(|t| t as *const _ as *mut _);
1575self.last_awoken.store(this_thread_ptr, Ordering::SeqCst);
1576self.num_awake.fetch_add(1, Ordering::SeqCst);
1577 }
15781579pub fn unpark_one(&self, single_unpark_index: usize) {
1580// last_awoken should be null at all times except between self.up() and at the bottom
1581 // of this method where it's reset to null again
1582assert!(self.last_awoken.load(Ordering::SeqCst).is_null());
15831584let mut queue: Vec<*mut ThreadData> = Vec::with_capacity(self.num_threads);
1585 for_each(self.semaphore_addr(), |thread_data| {
1586 queue.push(thread_data as *const _ as *mut _);
1587 });
1588assert!(queue.len() <= self.num_threads - single_unpark_index);
15891590let num_awake_before_up = self.num_awake.load(Ordering::SeqCst);
15911592self.up();
15931594// Wait for a parked thread to wake up and update num_awake + last_awoken.
1595while self.num_awake.load(Ordering::SeqCst) != num_awake_before_up + 1 {
1596 thread::yield_now();
1597 }
15981599// At this point the other thread should have set last_awoken inside the run() method
1600let last_awoken = self.last_awoken.load(Ordering::SeqCst);
1601assert!(!last_awoken.is_null());
1602if !queue.is_empty() && queue[0] != last_awoken {
1603panic!(
1604"Woke up wrong thread:\n\tqueue: {:?}\n\tlast awoken: {:?}",
1605 queue, last_awoken
1606 );
1607 }
1608self.last_awoken.store(ptr::null_mut(), Ordering::SeqCst);
1609 }
16101611pub fn finish(&self, num_single_unparks: usize) {
        // The number of threads not unparked via unpark_one
1613let mut num_threads_left = self.num_threads.checked_sub(num_single_unparks).unwrap();
16141615// Wake remaining threads up with unpark_all. Has to be in a loop, because there might
        // still be threads that have not yet parked.
1617while num_threads_left > 0 {
1618let mut num_waiting_on_address = 0;
1619 for_each(self.semaphore_addr(), |_thread_data| {
1620 num_waiting_on_address += 1;
1621 });
1622assert!(num_waiting_on_address <= num_threads_left);
16231624let num_awake_before_unpark = self.num_awake.load(Ordering::SeqCst);
16251626let num_unparked =
1627unsafe { super::unpark_all(self.semaphore_addr(), DEFAULT_UNPARK_TOKEN) };
1628assert!(num_unparked >= num_waiting_on_address);
1629assert!(num_unparked <= num_threads_left);
16301631// Wait for all unparked threads to wake up and update num_awake + last_awoken.
1632while self.num_awake.load(Ordering::SeqCst)
1633 != num_awake_before_unpark + num_unparked
1634 {
1635 thread::yield_now()
1636 }
16371638 num_threads_left = num_threads_left.checked_sub(num_unparked).unwrap();
1639 }
1640// By now, all threads should have been woken up
1641assert_eq!(self.num_awake.load(Ordering::SeqCst), self.num_threads);
16421643// Make sure no thread is parked on our semaphore address
1644let mut num_waiting_on_address = 0;
1645 for_each(self.semaphore_addr(), |_thread_data| {
1646 num_waiting_on_address += 1;
1647 });
1648assert_eq!(num_waiting_on_address, 0);
1649 }
16501651pub fn down(&self) {
1652let old_semaphore_value = self.semaphore.fetch_sub(1, Ordering::SeqCst);
16531654if old_semaphore_value > 0 {
1655// We acquired the semaphore. Done.
1656return;
1657 }
16581659// We need to wait.
1660let validate = || true;
1661let before_sleep = || {};
1662let timed_out = |_, _| {};
1663unsafe {
1664super::park(
1665self.semaphore_addr(),
1666 validate,
1667 before_sleep,
1668 timed_out,
1669 DEFAULT_PARK_TOKEN,
1670None,
1671 );
1672 }
1673 }
16741675pub fn up(&self) {
1676let old_semaphore_value = self.semaphore.fetch_add(1, Ordering::SeqCst);
16771678// Check if anyone was waiting on the semaphore. If they were, then pass ownership to them.
1679if old_semaphore_value < 0 {
1680// We need to continue until we have actually unparked someone. It might be that
1681 // the thread we want to pass ownership to has decremented the semaphore counter,
1682 // but not yet parked.
1683loop {
1684match unsafe {
1685super::unpark_one(self.semaphore_addr(), |_| DEFAULT_UNPARK_TOKEN)
1686 .unparked_threads
1687 } {
                    1 => break,
                    0 => (),
1690 i => panic!("Should not wake up {} threads", i),
1691 }
1692 }
1693 }
1694 }
16951696fn semaphore_addr(&self) -> usize {
1697&self.semaphore as *const _ as usize
1698 }
1699 }
1700}