Skip to content

Commit 8df0e55

Browse files
committed
added ticketing system
1 parent fb5ed11 commit 8df0e55

File tree

2 files changed

+131
-120
lines changed

2 files changed

+131
-120
lines changed

Diff for: futures-util/src/lock/rwlock.rs

+124-120
Original file line numberDiff line numberDiff line change
@@ -5,38 +5,49 @@ use std::cell::UnsafeCell;
55
use std::fmt;
66
use std::ops::{Deref, DerefMut};
77
use std::pin::Pin;
8-
use std::process;
98
use std::sync::atomic::{AtomicUsize, Ordering};
109

10+
struct State {
11+
ins: AtomicUsize,
12+
out: AtomicUsize,
13+
}
14+
1115
/// A futures-aware read-write lock.
1216
pub struct RwLock<T: ?Sized> {
13-
state: AtomicUsize,
17+
read_state: State,
18+
write_state: State,
1419
readers: WaiterSet,
1520
writers: WaiterSet,
1621
value: UnsafeCell<T>,
1722
}
1823

1924
impl<T: ?Sized> fmt::Debug for RwLock<T> {
2025
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
21-
let state = self.state.load(Ordering::SeqCst);
22-
f.debug_struct("RwLock")
23-
.field("is_locked", &((state & IS_LOCKED) != 0))
24-
.field("readers", &((state & READ_COUNT) >> 1))
25-
.finish()
26+
f.debug_struct("RwLock").finish()
2627
}
2728
}
2829

2930
#[allow(clippy::identity_op)]
30-
const IS_LOCKED: usize = 1 << 0;
31-
const ONE_READER: usize = 1 << 1;
32-
const READ_COUNT: usize = !(ONE_READER - 1);
33-
const MAX_READERS: usize = usize::max_value() >> 1;
31+
const PHASE: usize = 1 << 0;
32+
const ONE_WRITER: usize = 1 << 1;
33+
const ONE_READER: usize = 1 << 2;
34+
const WRITE_BITS: usize = ONE_WRITER | PHASE;
35+
36+
// Sentinel for when no slot in the `Slab` has been dedicated to this object.
37+
const WAIT_KEY_NONE: usize = usize::max_value();
3438

3539
impl<T> RwLock<T> {
3640
/// Creates a new futures-aware read-write lock.
3741
pub fn new(t: T) -> RwLock<T> {
3842
RwLock {
39-
state: AtomicUsize::new(0),
43+
read_state: State {
44+
ins: AtomicUsize::new(0),
45+
out: AtomicUsize::new(0),
46+
},
47+
write_state: State {
48+
ins: AtomicUsize::new(0),
49+
out: AtomicUsize::new(0),
50+
},
4051
readers: WaiterSet::new(),
4152
writers: WaiterSet::new(),
4253
value: UnsafeCell::new(t),
@@ -59,52 +70,14 @@ impl<T> RwLock<T> {
5970
}
6071

6172
impl<T: ?Sized> RwLock<T> {
62-
/// Attempt to acquire a lock with shared read access immediately.
63-
///
64-
/// If the lock is currently held by a writer, this will return `None`.
65-
pub fn try_read(&self) -> Option<RwLockReadGuard<'_, T>> {
66-
let mut state = self.state.load(Ordering::Acquire);
67-
68-
loop {
69-
if state & IS_LOCKED != 0 {
70-
return None;
71-
}
72-
73-
if state > MAX_READERS {
74-
process::abort();
75-
}
76-
77-
match self.state.compare_exchange_weak(
78-
state,
79-
state + ONE_READER,
80-
Ordering::SeqCst,
81-
Ordering::SeqCst,
82-
) {
83-
Ok(_) => return Some(RwLockReadGuard { rwlock: self }),
84-
Err(s) => state = s,
85-
}
86-
}
87-
}
88-
89-
/// Attempt to acquire a lock with exclusive write access immediately.
90-
///
91-
/// If there are any other locks, either for read or write access, this
92-
/// will return `None`.
93-
pub fn try_write(&self) -> Option<RwLockWriteGuard<'_, T>> {
94-
if self.state.compare_and_swap(0, IS_LOCKED, Ordering::SeqCst) == 0 {
95-
Some(RwLockWriteGuard { rwlock: self })
96-
} else {
97-
None
98-
}
99-
}
100-
10173
/// Acquire a read access lock asynchronously.
10274
///
10375
/// This method returns a future that will resolve once all write access
10476
/// locks have been dropped.
10577
pub fn read(&self) -> RwLockReadFuture<'_, T> {
10678
RwLockReadFuture {
10779
rwlock: Some(self),
80+
ticket: None,
10881
wait_key: WAIT_KEY_NONE,
10982
}
11083
}
@@ -116,6 +89,7 @@ impl<T: ?Sized> RwLock<T> {
11689
pub fn write(&self) -> RwLockWriteFuture<'_, T> {
11790
RwLockWriteFuture {
11891
rwlock: Some(self),
92+
tickets: (None, None),
11993
wait_key: WAIT_KEY_NONE,
12094
}
12195
}
@@ -133,22 +107,20 @@ impl<T: ?Sized> RwLock<T> {
133107
///
134108
/// let mut rwlock = RwLock::new(0);
135109
/// *rwlock.get_mut() = 10;
136-
/// assert_eq!(*rwlock.lock().await, 10);
110+
/// assert_eq!(*rwlock.read().await, 10);
137111
/// # });
138112
/// ```
139113
pub fn get_mut(&mut self) -> &mut T {
140114
unsafe { &mut *self.value.get() }
141115
}
142116
}
143117

144-
// Sentinel for when no slot in the `Slab` has been dedicated to this object.
145-
const WAIT_KEY_NONE: usize = usize::max_value();
146-
147118
/// A future which resolves when the target read access lock has been successfully
148119
/// acquired.
149120
pub struct RwLockReadFuture<'a, T: ?Sized> {
150121
// `None` indicates that the mutex was successfully acquired.
151122
rwlock: Option<&'a RwLock<T>>,
123+
ticket: Option<usize>,
152124
wait_key: usize,
153125
}
154126

@@ -157,6 +129,7 @@ impl<T: ?Sized> fmt::Debug for RwLockReadFuture<'_, T> {
157129
f.debug_struct("RwLockReadFuture")
158130
.field("was_acquired", &self.rwlock.is_none())
159131
.field("rwlock", &self.rwlock)
132+
.field("ticket", &self.ticket)
160133
.field(
161134
"wait_key",
162135
&(if self.wait_key == WAIT_KEY_NONE {
@@ -183,37 +156,43 @@ impl<'a, T: ?Sized> Future for RwLockReadFuture<'a, T> {
183156
.rwlock
184157
.expect("polled RwLockReadFuture after completion");
185158

186-
if let Some(lock) = rwlock.try_read() {
159+
// The ticket is defined by the write bits stored within the read-in count
160+
let ticket = *self.ticket.get_or_insert_with(|| {
161+
rwlock
162+
.read_state
163+
.ins
164+
.fetch_add(ONE_READER, Ordering::SeqCst)
165+
& WRITE_BITS
166+
});
167+
168+
// Safe to create guard when either there are no writers (ticket == 0) or if
169+
// at least one of the two write bits change.
170+
// Writers always wait until the current reader phase completes before acquiring
171+
// the lock; thus the PHASE bit both maintains the read-write condition and
172+
// prevents deadlock in the case that this line isn't reached before a writer sets
173+
// the ONE_WRITER bit.
174+
if ticket == 0 || ticket != rwlock.read_state.ins.load(Ordering::Relaxed) & WRITE_BITS {
187175
if self.wait_key != WAIT_KEY_NONE {
188176
rwlock.readers.remove(self.wait_key);
189177
}
190178
self.rwlock = None;
191-
return Poll::Ready(lock);
192-
}
193-
194-
if self.wait_key == WAIT_KEY_NONE {
195-
self.wait_key = rwlock.readers.insert(cx.waker());
179+
Poll::Ready(RwLockReadGuard { rwlock })
196180
} else {
197-
rwlock.readers.register(self.wait_key, cx.waker());
198-
}
199-
200-
// Ensure that we haven't raced `RwLockWriteGuard::drop`'s unlock path by
201-
// attempting to acquire the lock again.
202-
if let Some(lock) = rwlock.try_read() {
203-
rwlock.readers.remove(self.wait_key);
204-
self.rwlock = None;
205-
return Poll::Ready(lock);
181+
if self.wait_key == WAIT_KEY_NONE {
182+
self.wait_key = rwlock.readers.insert(cx.waker());
183+
} else {
184+
rwlock.readers.register(self.wait_key, cx.waker());
185+
}
186+
Poll::Pending
206187
}
207-
208-
Poll::Pending
209188
}
210189
}
211190

212191
impl<T: ?Sized> Drop for RwLockReadFuture<'_, T> {
213192
fn drop(&mut self) {
214-
if let Some(rwlock) = self.rwlock {
193+
if let Some(_) = self.rwlock {
215194
if self.wait_key != WAIT_KEY_NONE {
216-
rwlock.readers.remove(self.wait_key);
195+
panic!("RwLockReadFuture dropped before completion");
217196
}
218197
}
219198
}
@@ -223,6 +202,9 @@ impl<T: ?Sized> Drop for RwLockReadFuture<'_, T> {
223202
/// acquired.
224203
pub struct RwLockWriteFuture<'a, T: ?Sized> {
225204
rwlock: Option<&'a RwLock<T>>,
205+
// The left ticket is used when waiting for other writers to finish, the right
206+
// ticket is used when waiting on the current reader phase to finish.
207+
tickets: (Option<usize>, Option<usize>),
226208
wait_key: usize,
227209
}
228210

@@ -257,42 +239,72 @@ impl<'a, T: ?Sized> Future for RwLockWriteFuture<'a, T> {
257239
.rwlock
258240
.expect("polled RwLockWriteFuture after completion");
259241

260-
if let Some(lock) = rwlock.try_write() {
261-
if self.wait_key != WAIT_KEY_NONE {
262-
rwlock.writers.remove(self.wait_key);
242+
match self.tickets {
243+
(None, None) => {
244+
let ticket = rwlock
245+
.write_state
246+
.ins
247+
.fetch_add(1, Ordering::SeqCst);
248+
self.tickets.0 = Some(ticket);
249+
if ticket == rwlock.write_state.out.load(Ordering::Relaxed) {
250+
// Note that the WRITE_BITS are always cleared at this point.
251+
let ticket = rwlock
252+
.read_state
253+
.ins
254+
.fetch_add(ONE_WRITER | (ticket & PHASE), Ordering::SeqCst);
255+
self.tickets.1 = Some(ticket);
256+
if ticket == rwlock.read_state.out.load(Ordering::Relaxed) {
257+
self.rwlock = None;
258+
Poll::Ready(RwLockWriteGuard { rwlock })
259+
} else {
260+
self.wait_key = rwlock.writers.insert(cx.waker());
261+
Poll::Pending
262+
}
263+
} else {
264+
self.wait_key = rwlock.writers.insert(cx.waker());
265+
Poll::Pending
266+
}
267+
}
268+
(Some(ticket), None) => {
269+
if ticket == rwlock.write_state.out.load(Ordering::Relaxed) {
270+
// Note that the WRITE_BITS are always cleared at this point.
271+
let ticket = rwlock
272+
.read_state
273+
.ins
274+
.fetch_add(ONE_WRITER | (ticket & PHASE), Ordering::SeqCst);
275+
self.tickets.1 = Some(ticket);
276+
if ticket == rwlock.read_state.out.load(Ordering::Relaxed) {
277+
rwlock.writers.remove(self.wait_key);
278+
self.rwlock = None;
279+
Poll::Ready(RwLockWriteGuard { rwlock })
280+
} else {
281+
rwlock.writers.register(self.wait_key, cx.waker());
282+
Poll::Pending
283+
}
284+
} else {
285+
rwlock.writers.register(self.wait_key, cx.waker());
286+
Poll::Pending
287+
}
288+
}
289+
(_, Some(ticket)) => {
290+
if ticket == rwlock.read_state.out.load(Ordering::Relaxed) {
291+
rwlock.writers.remove(self.wait_key);
292+
self.rwlock = None;
293+
Poll::Ready(RwLockWriteGuard { rwlock })
294+
} else {
295+
rwlock.writers.register(self.wait_key, cx.waker());
296+
Poll::Pending
297+
}
263298
}
264-
self.rwlock = None;
265-
return Poll::Ready(lock);
266-
}
267-
268-
if self.wait_key == WAIT_KEY_NONE {
269-
self.wait_key = rwlock.writers.insert(cx.waker());
270-
} else {
271-
rwlock.writers.register(self.wait_key, cx.waker());
272-
}
273-
274-
// Ensure that we haven't raced `RwLockWriteGuard::drop` or
275-
// `RwLockReadGuard::drop`'s unlock path by attempting to acquire
276-
// the lock again.
277-
if let Some(lock) = rwlock.try_write() {
278-
rwlock.writers.remove(self.wait_key);
279-
self.rwlock = None;
280-
return Poll::Ready(lock);
281299
}
282-
283-
Poll::Pending
284300
}
285301
}
286302

287303
impl<T: ?Sized> Drop for RwLockWriteFuture<'_, T> {
288304
fn drop(&mut self) {
289-
if let Some(rwlock) = self.rwlock {
305+
if let Some(_) = self.rwlock {
290306
if self.wait_key != WAIT_KEY_NONE {
291-
// This future was dropped before it acquired the rwlock.
292-
//
293-
// Remove ourselves from the map, waking up another waiter if we
294-
// had been awoken to acquire the lock.
295-
rwlock.writers.cancel(self.wait_key);
307+
panic!("RwLockWriteFuture dropped before completion");
296308
}
297309
}
298310
}
@@ -316,15 +328,17 @@ impl<T: ?Sized + fmt::Debug> fmt::Debug for RwLockReadGuard<'_, T> {
316328

317329
impl<T: ?Sized> Drop for RwLockReadGuard<'_, T> {
318330
fn drop(&mut self) {
319-
let old_state = self.rwlock.state.fetch_sub(ONE_READER, Ordering::SeqCst);
320-
if old_state & READ_COUNT == ONE_READER {
321-
self.rwlock.writers.notify_any();
322-
}
331+
self.rwlock
332+
.read_state
333+
.out
334+
.fetch_add(ONE_READER, Ordering::SeqCst);
335+
self.rwlock.writers.notify_all();
323336
}
324337
}
325338

326339
impl<T: ?Sized> Deref for RwLockReadGuard<'_, T> {
327340
type Target = T;
341+
328342
fn deref(&self) -> &T {
329343
unsafe { &*self.rwlock.value.get() }
330344
}
@@ -354,10 +368,10 @@ impl<T: ?Sized + fmt::Debug> fmt::Debug for RwLockWriteGuard<'_, T> {
354368

355369
impl<T: ?Sized> Drop for RwLockWriteGuard<'_, T> {
356370
fn drop(&mut self) {
357-
self.rwlock.state.store(0, Ordering::SeqCst);
358-
if !self.rwlock.readers.notify_all() {
359-
self.rwlock.writers.notify_any();
360-
}
371+
self.rwlock.read_state.ins.fetch_and(!WRITE_BITS, Ordering::Relaxed);
372+
self.rwlock.write_state.out.fetch_add(1, Ordering::Relaxed);
373+
self.rwlock.writers.notify_all();
374+
self.rwlock.readers.notify_all();
361375
}
362376
}
363377

@@ -388,13 +402,3 @@ unsafe impl<T: ?Sized + Sync> Sync for RwLockReadGuard<'_, T> {}
388402

389403
unsafe impl<T: ?Sized + Send> Send for RwLockWriteGuard<'_, T> {}
390404
unsafe impl<T: ?Sized + Sync> Sync for RwLockWriteGuard<'_, T> {}
391-
392-
#[test]
393-
fn test_rwlock_guard_debug_not_recurse() {
394-
let rwlock = RwLock::new(42);
395-
let guard = rwlock.try_read().unwrap();
396-
let _ = format!("{:?}", guard);
397-
drop(guard);
398-
let guard = rwlock.try_write().unwrap();
399-
let _ = format!("{:?}", guard);
400-
}

0 commit comments

Comments
 (0)