Skip to content

Commit 0428e31

Browse files
committed
added ticketing system
1 parent fb5ed11 commit 0428e31

File tree

2 files changed

+139
-119
lines changed

2 files changed

+139
-119
lines changed

Diff for: futures-util/src/lock/rwlock.rs

+132-119
Original file line numberDiff line numberDiff line change
@@ -5,38 +5,49 @@ use std::cell::UnsafeCell;
55
use std::fmt;
66
use std::ops::{Deref, DerefMut};
77
use std::pin::Pin;
8-
use std::process;
98
use std::sync::atomic::{AtomicUsize, Ordering};
109

10+
struct State {
11+
ins: AtomicUsize,
12+
out: AtomicUsize,
13+
}
14+
1115
/// A futures-aware read-write lock.
1216
pub struct RwLock<T: ?Sized> {
13-
state: AtomicUsize,
17+
read_state: State,
18+
write_state: State,
1419
readers: WaiterSet,
1520
writers: WaiterSet,
1621
value: UnsafeCell<T>,
1722
}
1823

1924
impl<T: ?Sized> fmt::Debug for RwLock<T> {
2025
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
21-
let state = self.state.load(Ordering::SeqCst);
22-
f.debug_struct("RwLock")
23-
.field("is_locked", &((state & IS_LOCKED) != 0))
24-
.field("readers", &((state & READ_COUNT) >> 1))
25-
.finish()
26+
f.debug_struct("RwLock").finish()
2627
}
2728
}
2829

2930
#[allow(clippy::identity_op)]
30-
const IS_LOCKED: usize = 1 << 0;
31-
const ONE_READER: usize = 1 << 1;
32-
const READ_COUNT: usize = !(ONE_READER - 1);
33-
const MAX_READERS: usize = usize::max_value() >> 1;
31+
const PHASE: usize = 1 << 0;
32+
const ONE_WRITER: usize = 1 << 1;
33+
const ONE_READER: usize = 1 << 2;
34+
const WRITE_BITS: usize = ONE_WRITER | PHASE;
35+
36+
// Sentinel for when no slot in the `Slab` has been dedicated to this object.
37+
const WAIT_KEY_NONE: usize = usize::max_value();
3438

3539
impl<T> RwLock<T> {
3640
/// Creates a new futures-aware read-write lock.
3741
pub fn new(t: T) -> RwLock<T> {
3842
RwLock {
39-
state: AtomicUsize::new(0),
43+
read_state: State {
44+
ins: AtomicUsize::new(0),
45+
out: AtomicUsize::new(0),
46+
},
47+
write_state: State {
48+
ins: AtomicUsize::new(0),
49+
out: AtomicUsize::new(0),
50+
},
4051
readers: WaiterSet::new(),
4152
writers: WaiterSet::new(),
4253
value: UnsafeCell::new(t),
@@ -59,52 +70,14 @@ impl<T> RwLock<T> {
5970
}
6071

6172
impl<T: ?Sized> RwLock<T> {
62-
/// Attempt to acquire a lock with shared read access immediately.
63-
///
64-
/// If the lock is currently held by a writer, this will return `None`.
65-
pub fn try_read(&self) -> Option<RwLockReadGuard<'_, T>> {
66-
let mut state = self.state.load(Ordering::Acquire);
67-
68-
loop {
69-
if state & IS_LOCKED != 0 {
70-
return None;
71-
}
72-
73-
if state > MAX_READERS {
74-
process::abort();
75-
}
76-
77-
match self.state.compare_exchange_weak(
78-
state,
79-
state + ONE_READER,
80-
Ordering::SeqCst,
81-
Ordering::SeqCst,
82-
) {
83-
Ok(_) => return Some(RwLockReadGuard { rwlock: self }),
84-
Err(s) => state = s,
85-
}
86-
}
87-
}
88-
89-
/// Attempt to acquire a lock with exclusive write access immediately.
90-
///
91-
/// If there are any other locks, either for read or write access, this
92-
/// will return `None`.
93-
pub fn try_write(&self) -> Option<RwLockWriteGuard<'_, T>> {
94-
if self.state.compare_and_swap(0, IS_LOCKED, Ordering::SeqCst) == 0 {
95-
Some(RwLockWriteGuard { rwlock: self })
96-
} else {
97-
None
98-
}
99-
}
100-
10173
/// Acquire a read access lock asynchronously.
10274
///
10375
/// This method returns a future that will resolve once all write access
10476
/// locks have been dropped.
10577
pub fn read(&self) -> RwLockReadFuture<'_, T> {
10678
RwLockReadFuture {
10779
rwlock: Some(self),
80+
ticket: None,
10881
wait_key: WAIT_KEY_NONE,
10982
}
11083
}
@@ -116,6 +89,7 @@ impl<T: ?Sized> RwLock<T> {
11689
pub fn write(&self) -> RwLockWriteFuture<'_, T> {
11790
RwLockWriteFuture {
11891
rwlock: Some(self),
92+
ticket: None,
11993
wait_key: WAIT_KEY_NONE,
12094
}
12195
}
@@ -133,22 +107,35 @@ impl<T: ?Sized> RwLock<T> {
133107
///
134108
/// let mut rwlock = RwLock::new(0);
135109
/// *rwlock.get_mut() = 10;
136-
/// assert_eq!(*rwlock.lock().await, 10);
110+
/// assert_eq!(*rwlock.read().await, 10);
137111
/// # });
138112
/// ```
139113
pub fn get_mut(&mut self) -> &mut T {
140114
unsafe { &mut *self.value.get() }
141115
}
142116
}
143117

144-
// Sentinel for when no slot in the `Slab` has been dedicated to this object.
145-
const WAIT_KEY_NONE: usize = usize::max_value();
118+
#[derive(Debug)]
119+
enum Ticket {
120+
Read(usize),
121+
Write(usize),
122+
}
123+
124+
impl Ticket {
125+
fn value(&self) -> usize {
126+
match self {
127+
Ticket::Read(value) => *value,
128+
Ticket::Write(value) => *value,
129+
}
130+
}
131+
}
146132

147133
/// A future which resolves when the target read access lock has been successfully
148134
/// acquired.
149135
pub struct RwLockReadFuture<'a, T: ?Sized> {
150136
// `None` indicates that the mutex was successfully acquired.
151137
rwlock: Option<&'a RwLock<T>>,
138+
ticket: Option<Ticket>,
152139
wait_key: usize,
153140
}
154141

@@ -157,6 +144,7 @@ impl<T: ?Sized> fmt::Debug for RwLockReadFuture<'_, T> {
157144
f.debug_struct("RwLockReadFuture")
158145
.field("was_acquired", &self.rwlock.is_none())
159146
.field("rwlock", &self.rwlock)
147+
.field("ticket", &self.ticket)
160148
.field(
161149
"wait_key",
162150
&(if self.wait_key == WAIT_KEY_NONE {
@@ -183,37 +171,39 @@ impl<'a, T: ?Sized> Future for RwLockReadFuture<'a, T> {
183171
.rwlock
184172
.expect("polled RwLockReadFuture after completion");
185173

186-
if let Some(lock) = rwlock.try_read() {
174+
// The ticket is defined by the write bits stored within the read-in count
175+
let ticket = self.ticket.get_or_insert_with(|| {
176+
Ticket::Read(rwlock.read_state.ins.fetch_add(ONE_READER, Ordering::SeqCst) & WRITE_BITS)
177+
}).value();
178+
179+
// Safe to create guard when either there are no writers (ticket == 0) or if
180+
// at least one of the two write bits change.
181+
// Writers always wait until the current reader phase completes before acquiring
182+
// the lock; thus the PHASE bit both maintains the read-write condition and
183+
// prevents deadlock in the case that this line isn't reached before a writer sets
184+
// the ONE_WRITER bit.
185+
if ticket == 0 || ticket != rwlock.read_state.ins.load(Ordering::Relaxed) & WRITE_BITS {
187186
if self.wait_key != WAIT_KEY_NONE {
188187
rwlock.readers.remove(self.wait_key);
189188
}
190189
self.rwlock = None;
191-
return Poll::Ready(lock);
192-
}
193-
194-
if self.wait_key == WAIT_KEY_NONE {
195-
self.wait_key = rwlock.readers.insert(cx.waker());
190+
Poll::Ready(RwLockReadGuard { rwlock })
196191
} else {
197-
rwlock.readers.register(self.wait_key, cx.waker());
198-
}
199-
200-
// Ensure that we haven't raced `RwLockWriteGuard::drop`'s unlock path by
201-
// attempting to acquire the lock again.
202-
if let Some(lock) = rwlock.try_read() {
203-
rwlock.readers.remove(self.wait_key);
204-
self.rwlock = None;
205-
return Poll::Ready(lock);
192+
if self.wait_key == WAIT_KEY_NONE {
193+
self.wait_key = rwlock.readers.insert(cx.waker());
194+
} else {
195+
rwlock.readers.register(self.wait_key, cx.waker());
196+
}
197+
Poll::Pending
206198
}
207-
208-
Poll::Pending
209199
}
210200
}
211201

212202
impl<T: ?Sized> Drop for RwLockReadFuture<'_, T> {
213203
fn drop(&mut self) {
214-
if let Some(rwlock) = self.rwlock {
204+
if let Some(_) = self.rwlock {
215205
if self.wait_key != WAIT_KEY_NONE {
216-
rwlock.readers.remove(self.wait_key);
206+
panic!("RwLockReadFuture dropped before completion");
217207
}
218208
}
219209
}
@@ -223,6 +213,7 @@ impl<T: ?Sized> Drop for RwLockReadFuture<'_, T> {
223213
/// acquired.
224214
pub struct RwLockWriteFuture<'a, T: ?Sized> {
225215
rwlock: Option<&'a RwLock<T>>,
216+
ticket: Option<Ticket>,
226217
wait_key: usize,
227218
}
228219

@@ -257,42 +248,72 @@ impl<'a, T: ?Sized> Future for RwLockWriteFuture<'a, T> {
257248
.rwlock
258249
.expect("polled RwLockWriteFuture after completion");
259250

260-
if let Some(lock) = rwlock.try_write() {
261-
if self.wait_key != WAIT_KEY_NONE {
262-
rwlock.writers.remove(self.wait_key);
251+
match self.ticket {
252+
None => {
253+
let ticket = rwlock
254+
.write_state
255+
.ins
256+
.fetch_add(1, Ordering::SeqCst);
257+
self.ticket = Some(Ticket::Write(ticket));
258+
if ticket == rwlock.write_state.out.load(Ordering::Relaxed) {
259+
// Note that the WRITE_BITS are always cleared at this point.
260+
let ticket = rwlock
261+
.read_state
262+
.ins
263+
.fetch_add(ONE_WRITER | (ticket & PHASE), Ordering::SeqCst);
264+
self.ticket = Some(Ticket::Read(ticket));
265+
if ticket == rwlock.read_state.out.load(Ordering::Relaxed) {
266+
self.rwlock = None;
267+
Poll::Ready(RwLockWriteGuard { rwlock })
268+
} else {
269+
self.wait_key = rwlock.writers.insert(cx.waker());
270+
Poll::Pending
271+
}
272+
} else {
273+
self.wait_key = rwlock.writers.insert(cx.waker());
274+
Poll::Pending
275+
}
276+
}
277+
Some(Ticket::Write(ticket)) => {
278+
if ticket == rwlock.write_state.out.load(Ordering::Relaxed) {
279+
// Note that the WRITE_BITS are always cleared at this point.
280+
let ticket = rwlock
281+
.read_state
282+
.ins
283+
.fetch_add(ONE_WRITER | (ticket & PHASE), Ordering::SeqCst);
284+
self.ticket = Some(Ticket::Read(ticket));
285+
if ticket == rwlock.read_state.out.load(Ordering::Relaxed) {
286+
rwlock.writers.remove(self.wait_key);
287+
self.rwlock = None;
288+
Poll::Ready(RwLockWriteGuard { rwlock })
289+
} else {
290+
rwlock.writers.register(self.wait_key, cx.waker());
291+
Poll::Pending
292+
}
293+
} else {
294+
rwlock.writers.register(self.wait_key, cx.waker());
295+
Poll::Pending
296+
}
297+
}
298+
Some(Ticket::Read(ticket)) => {
299+
if ticket == rwlock.read_state.out.load(Ordering::Relaxed) {
300+
rwlock.writers.remove(self.wait_key);
301+
self.rwlock = None;
302+
Poll::Ready(RwLockWriteGuard { rwlock })
303+
} else {
304+
rwlock.writers.register(self.wait_key, cx.waker());
305+
Poll::Pending
306+
}
263307
}
264-
self.rwlock = None;
265-
return Poll::Ready(lock);
266-
}
267-
268-
if self.wait_key == WAIT_KEY_NONE {
269-
self.wait_key = rwlock.writers.insert(cx.waker());
270-
} else {
271-
rwlock.writers.register(self.wait_key, cx.waker());
272-
}
273-
274-
// Ensure that we haven't raced `RwLockWriteGuard::drop` or
275-
// `RwLockReadGuard::drop`'s unlock path by attempting to acquire
276-
// the lock again.
277-
if let Some(lock) = rwlock.try_write() {
278-
rwlock.writers.remove(self.wait_key);
279-
self.rwlock = None;
280-
return Poll::Ready(lock);
281308
}
282-
283-
Poll::Pending
284309
}
285310
}
286311

287312
impl<T: ?Sized> Drop for RwLockWriteFuture<'_, T> {
288313
fn drop(&mut self) {
289-
if let Some(rwlock) = self.rwlock {
314+
if let Some(_) = self.rwlock {
290315
if self.wait_key != WAIT_KEY_NONE {
291-
// This future was dropped before it acquired the rwlock.
292-
//
293-
// Remove ourselves from the map, waking up another waiter if we
294-
// had been awoken to acquire the lock.
295-
rwlock.writers.cancel(self.wait_key);
316+
panic!("RwLockWriteFuture dropped before completion");
296317
}
297318
}
298319
}
@@ -316,15 +337,17 @@ impl<T: ?Sized + fmt::Debug> fmt::Debug for RwLockReadGuard<'_, T> {
316337

317338
impl<T: ?Sized> Drop for RwLockReadGuard<'_, T> {
318339
fn drop(&mut self) {
319-
let old_state = self.rwlock.state.fetch_sub(ONE_READER, Ordering::SeqCst);
320-
if old_state & READ_COUNT == ONE_READER {
321-
self.rwlock.writers.notify_any();
322-
}
340+
self.rwlock
341+
.read_state
342+
.out
343+
.fetch_add(ONE_READER, Ordering::SeqCst);
344+
self.rwlock.writers.notify_all();
323345
}
324346
}
325347

326348
impl<T: ?Sized> Deref for RwLockReadGuard<'_, T> {
327349
type Target = T;
350+
328351
fn deref(&self) -> &T {
329352
unsafe { &*self.rwlock.value.get() }
330353
}
@@ -354,10 +377,10 @@ impl<T: ?Sized + fmt::Debug> fmt::Debug for RwLockWriteGuard<'_, T> {
354377

355378
impl<T: ?Sized> Drop for RwLockWriteGuard<'_, T> {
356379
fn drop(&mut self) {
357-
self.rwlock.state.store(0, Ordering::SeqCst);
358-
if !self.rwlock.readers.notify_all() {
359-
self.rwlock.writers.notify_any();
360-
}
380+
self.rwlock.read_state.ins.fetch_and(!WRITE_BITS, Ordering::Relaxed);
381+
self.rwlock.write_state.out.fetch_add(1, Ordering::Relaxed);
382+
self.rwlock.writers.notify_all();
383+
self.rwlock.readers.notify_all();
361384
}
362385
}
363386

@@ -388,13 +411,3 @@ unsafe impl<T: ?Sized + Sync> Sync for RwLockReadGuard<'_, T> {}
388411

389412
unsafe impl<T: ?Sized + Send> Send for RwLockWriteGuard<'_, T> {}
390413
unsafe impl<T: ?Sized + Sync> Sync for RwLockWriteGuard<'_, T> {}
391-
392-
#[test]
393-
fn test_rwlock_guard_debug_not_recurse() {
394-
let rwlock = RwLock::new(42);
395-
let guard = rwlock.try_read().unwrap();
396-
let _ = format!("{:?}", guard);
397-
drop(guard);
398-
let guard = rwlock.try_write().unwrap();
399-
let _ = format!("{:?}", guard);
400-
}

0 commit comments

Comments
 (0)