|
8 | 8 | use cell::{Cell, UnsafeCell};
|
9 | 9 | use ptr;
|
10 | 10 | use sync::atomic::{AtomicPtr, AtomicUsize, Ordering, ATOMIC_USIZE_INIT};
|
11 |
| -use thread::LocalKey; |
12 | 11 | use time::{Duration, Instant};
|
13 | 12 | use sys_common::thread_parker::ThreadParker;
|
14 | 13 | use super::util::UncheckedOptionExt;
|
@@ -140,24 +139,26 @@ impl ThreadData {
|
140 | 139 | }
|
141 | 140 | }
|
142 | 141 |
|
143 |
| -// Returns a ThreadData structure for the current thread |
144 |
| -unsafe fn get_thread_data(local: &mut Option<ThreadData>) -> &ThreadData { |
145 |
| - // Try to read from thread-local storage, but return None if the TLS has |
146 |
| - // already been destroyed. |
147 |
| - fn try_get_tls(key: &'static LocalKey<ThreadData>) -> Option<*const ThreadData> { |
148 |
| - key.try_with(|x| x as *const ThreadData).ok() |
149 |
| - } |
150 |
| - |
| 142 | +// Invokes the given closure with a reference to the current thread `ThreadData`. |
| 143 | +fn with_thread_data<F, T>(f: F) -> T |
| 144 | +where |
| 145 | + F: FnOnce(&ThreadData) -> T, |
| 146 | +{ |
151 | 147 | // Unlike word_lock::ThreadData, parking_lot::ThreadData is always expensive
|
152 | 148 | // to construct. Try to use a thread-local version if possible.
|
| 149 | + let mut thread_data_ptr = ptr::null(); |
153 | 150 | thread_local!(static THREAD_DATA: ThreadData = ThreadData::new());
|
154 |
| - if let Some(tls) = try_get_tls(&THREAD_DATA) { |
155 |
| - return &*tls; |
| 151 | + if let Ok(tls_thread_data) = THREAD_DATA.try_with(|x| x as *const ThreadData) { |
| 152 | + thread_data_ptr = tls_thread_data; |
156 | 153 | }
|
157 | 154 |
|
158 | 155 | // Otherwise just create a ThreadData on the stack
|
159 |
| - *local = Some(ThreadData::new()); |
160 |
| - local.as_ref().unwrap() |
| 156 | + let mut thread_data_storage = None; |
| 157 | + if thread_data_ptr.is_null() { |
| 158 | + thread_data_ptr = thread_data_storage.get_or_insert_with(ThreadData::new); |
| 159 | + } |
| 160 | + |
| 161 | + f(unsafe { &*thread_data_ptr }) |
161 | 162 | }
|
162 | 163 |
|
163 | 164 | impl Drop for ThreadData {
|
@@ -552,104 +553,103 @@ unsafe fn park_internal(
|
552 | 553 | timeout: Option<Instant>,
|
553 | 554 | ) -> ParkResult {
|
554 | 555 | // Grab our thread data, this also ensures that the hash table exists
|
555 |
| - let mut thread_data = None; |
556 |
| - let thread_data = get_thread_data(&mut thread_data); |
| 556 | + with_thread_data(|thread_data| { |
| 557 | + // Lock the bucket for the given key |
| 558 | + let bucket = lock_bucket(key); |
557 | 559 |
|
558 |
| - // Lock the bucket for the given key |
559 |
| - let bucket = lock_bucket(key); |
| 560 | + // If the validation function fails, just return |
| 561 | + if !validate() { |
| 562 | + bucket.mutex.unlock(); |
| 563 | + return ParkResult::Invalid; |
| 564 | + } |
560 | 565 |
|
561 |
| - // If the validation function fails, just return |
562 |
| - if !validate() { |
| 566 | + // Append our thread data to the queue and unlock the bucket |
| 567 | + thread_data.parked_with_timeout.set(timeout.is_some()); |
| 568 | + thread_data.next_in_queue.set(ptr::null()); |
| 569 | + thread_data.key.store(key, Ordering::Relaxed); |
| 570 | + thread_data.park_token.set(park_token); |
| 571 | + thread_data.parker.prepare_park(); |
| 572 | + if !bucket.queue_head.get().is_null() { |
| 573 | + (*bucket.queue_tail.get()).next_in_queue.set(thread_data); |
| 574 | + } else { |
| 575 | + bucket.queue_head.set(thread_data); |
| 576 | + } |
| 577 | + bucket.queue_tail.set(thread_data); |
563 | 578 | bucket.mutex.unlock();
|
564 |
| - return ParkResult::Invalid; |
565 |
| - } |
566 | 579 |
|
567 |
| - // Append our thread data to the queue and unlock the bucket |
568 |
| - thread_data.parked_with_timeout.set(timeout.is_some()); |
569 |
| - thread_data.next_in_queue.set(ptr::null()); |
570 |
| - thread_data.key.store(key, Ordering::Relaxed); |
571 |
| - thread_data.park_token.set(park_token); |
572 |
| - thread_data.parker.prepare_park(); |
573 |
| - if !bucket.queue_head.get().is_null() { |
574 |
| - (*bucket.queue_tail.get()).next_in_queue.set(thread_data); |
575 |
| - } else { |
576 |
| - bucket.queue_head.set(thread_data); |
577 |
| - } |
578 |
| - bucket.queue_tail.set(thread_data); |
579 |
| - bucket.mutex.unlock(); |
580 |
| - |
581 |
| - // Invoke the pre-sleep callback |
582 |
| - before_sleep(); |
| 580 | + // Invoke the pre-sleep callback |
| 581 | + before_sleep(); |
| 582 | + |
| 583 | + // Park our thread and determine whether we were woken up by an unpark or by |
| 584 | + // our timeout. Note that this isn't precise: we can still be unparked since |
| 585 | + // we are still in the queue. |
| 586 | + let unparked = match timeout { |
| 587 | + Some(timeout) => thread_data.parker.park_until(timeout), |
| 588 | + None => { |
| 589 | + thread_data.parker.park(); |
| 590 | + true |
| 591 | + } |
| 592 | + }; |
583 | 593 |
|
584 |
| - // Park our thread and determine whether we were woken up by an unpark or by |
585 |
| - // our timeout. Note that this isn't precise: we can still be unparked since |
586 |
| - // we are still in the queue. |
587 |
| - let unparked = match timeout { |
588 |
| - Some(timeout) => thread_data.parker.park_until(timeout), |
589 |
| - None => { |
590 |
| - thread_data.parker.park(); |
591 |
| - true |
| 594 | + // If we were unparked, return now |
| 595 | + if unparked { |
| 596 | + return ParkResult::Unparked(thread_data.unpark_token.get()); |
592 | 597 | }
|
593 |
| - }; |
594 |
| - |
595 |
| - // If we were unparked, return now |
596 |
| - if unparked { |
597 |
| - return ParkResult::Unparked(thread_data.unpark_token.get()); |
598 |
| - } |
599 | 598 |
|
600 |
| - // Lock our bucket again. Note that the hashtable may have been rehashed in |
601 |
| - // the meantime. Our key may also have changed if we were requeued. |
602 |
| - let (key, bucket) = lock_bucket_checked(&thread_data.key); |
| 599 | + // Lock our bucket again. Note that the hashtable may have been rehashed in |
| 600 | + // the meantime. Our key may also have changed if we were requeued. |
| 601 | + let (key, bucket) = lock_bucket_checked(&thread_data.key); |
603 | 602 |
|
604 |
| - // Now we need to check again if we were unparked or timed out. Unlike the |
605 |
| - // last check this is precise because we hold the bucket lock. |
606 |
| - if !thread_data.parker.timed_out() { |
607 |
| - bucket.mutex.unlock(); |
608 |
| - return ParkResult::Unparked(thread_data.unpark_token.get()); |
609 |
| - } |
| 603 | + // Now we need to check again if we were unparked or timed out. Unlike the |
| 604 | + // last check this is precise because we hold the bucket lock. |
| 605 | + if !thread_data.parker.timed_out() { |
| 606 | + bucket.mutex.unlock(); |
| 607 | + return ParkResult::Unparked(thread_data.unpark_token.get()); |
| 608 | + } |
610 | 609 |
|
611 |
| - // We timed out, so we now need to remove our thread from the queue |
612 |
| - let mut link = &bucket.queue_head; |
613 |
| - let mut current = bucket.queue_head.get(); |
614 |
| - let mut previous = ptr::null(); |
615 |
| - while !current.is_null() { |
616 |
| - if current == thread_data { |
617 |
| - let next = (*current).next_in_queue.get(); |
618 |
| - link.set(next); |
619 |
| - let mut was_last_thread = true; |
620 |
| - if bucket.queue_tail.get() == current { |
621 |
| - bucket.queue_tail.set(previous); |
622 |
| - } else { |
623 |
| - // Scan the rest of the queue to see if there are any other |
624 |
| - // entries with the given key. |
625 |
| - let mut scan = next; |
626 |
| - while !scan.is_null() { |
627 |
| - if (*scan).key.load(Ordering::Relaxed) == key { |
628 |
| - was_last_thread = false; |
629 |
| - break; |
| 610 | + // We timed out, so we now need to remove our thread from the queue |
| 611 | + let mut link = &bucket.queue_head; |
| 612 | + let mut current = bucket.queue_head.get(); |
| 613 | + let mut previous = ptr::null(); |
| 614 | + while !current.is_null() { |
| 615 | + if current == thread_data { |
| 616 | + let next = (*current).next_in_queue.get(); |
| 617 | + link.set(next); |
| 618 | + let mut was_last_thread = true; |
| 619 | + if bucket.queue_tail.get() == current { |
| 620 | + bucket.queue_tail.set(previous); |
| 621 | + } else { |
| 622 | + // Scan the rest of the queue to see if there are any other |
| 623 | + // entries with the given key. |
| 624 | + let mut scan = next; |
| 625 | + while !scan.is_null() { |
| 626 | + if (*scan).key.load(Ordering::Relaxed) == key { |
| 627 | + was_last_thread = false; |
| 628 | + break; |
| 629 | + } |
| 630 | + scan = (*scan).next_in_queue.get(); |
630 | 631 | }
|
631 |
| - scan = (*scan).next_in_queue.get(); |
632 | 632 | }
|
633 |
| - } |
634 | 633 |
|
635 |
| - // Callback to indicate that we timed out, and whether we were the |
636 |
| - // last thread on the queue. |
637 |
| - timed_out(key, was_last_thread); |
638 |
| - break; |
639 |
| - } else { |
640 |
| - link = &(*current).next_in_queue; |
641 |
| - previous = current; |
642 |
| - current = link.get(); |
| 634 | + // Callback to indicate that we timed out, and whether we were the |
| 635 | + // last thread on the queue. |
| 636 | + timed_out(key, was_last_thread); |
| 637 | + break; |
| 638 | + } else { |
| 639 | + link = &(*current).next_in_queue; |
| 640 | + previous = current; |
| 641 | + current = link.get(); |
| 642 | + } |
643 | 643 | }
|
644 |
| - } |
645 | 644 |
|
646 |
| - // There should be no way for our thread to have been removed from the queue |
647 |
| - // if we timed out. |
648 |
| - debug_assert!(!current.is_null()); |
| 645 | + // There should be no way for our thread to have been removed from the queue |
| 646 | + // if we timed out. |
| 647 | + debug_assert!(!current.is_null()); |
649 | 648 |
|
650 |
| - // Unlock the bucket, we are done |
651 |
| - bucket.mutex.unlock(); |
652 |
| - ParkResult::TimedOut |
| 649 | + // Unlock the bucket, we are done |
| 650 | + bucket.mutex.unlock(); |
| 651 | + ParkResult::TimedOut |
| 652 | + }) |
653 | 653 | }
|
654 | 654 |
|
655 | 655 | /// Unparks one thread from the queue associated with the given key.
|
|
0 commit comments