Add SAFETY comments and notes for unsafe blocks #79

Open · wants to merge 1 commit into base: master
17 changes: 14 additions & 3 deletions src/header.rs
@@ -62,8 +62,10 @@ impl<M> Header<M> {
let state = self.state.fetch_or(NOTIFYING, Ordering::AcqRel);

// If the task was not notifying or registering an awaiter...
// Note: The NOTIFYING and REGISTERING bits act as locks on the awaiter UnsafeCell.
if state & (NOTIFYING | REGISTERING) == 0 {
// Take the waker out.
// SAFETY: self.awaiter is tied to atomic ordering operations on self.state.
let waker = unsafe { (*self.awaiter.get()).take() };

// Unset the bit indicating that the task is notifying its awaiter.
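
The two bits used above form a tiny lock protocol that is worth seeing in isolation. Below is a minimal, self-contained sketch of the notify side, with hypothetical `Slot`, `NOTIFYING`, and `REGISTERING` names; the real `Header<M>` carries more state bits and uses a CAS loop, so treat this as an illustration rather than the crate's actual code.

```rust
use std::cell::UnsafeCell;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::Waker;

const NOTIFYING: usize = 1 << 0;
const REGISTERING: usize = 1 << 1;

struct Slot {
    state: AtomicUsize,
    awaiter: UnsafeCell<Option<Waker>>,
}

// SAFETY: access to `awaiter` is serialized by the lock bits in `state`.
unsafe impl Sync for Slot {}

impl Slot {
    fn notify(&self) {
        // Setting NOTIFYING "locks" the awaiter cell against registrations.
        let state = self.state.fetch_or(NOTIFYING, Ordering::AcqRel);
        if state & (NOTIFYING | REGISTERING) == 0 {
            // SAFETY: neither lock bit was previously set, so no other
            // thread touches the cell until we clear NOTIFYING.
            let waker = unsafe { (*self.awaiter.get()).take() };
            self.state.fetch_and(!NOTIFYING, Ordering::Release);
            if let Some(w) = waker {
                w.wake();
            }
        }
        // Otherwise another thread holds a bit; the NOTIFYING bit we set
        // tells it to perform the wake itself (see the registration hunks).
    }
}
```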
@@ -92,9 +94,10 @@ impl<M> Header<M> {
let mut state = self.state.fetch_or(0, Ordering::Acquire);

loop {
- // There can't be two concurrent registrations because `Task` can only be polled
- // by a unique pinned reference.
- debug_assert!(state & REGISTERING == 0);
+ // There can't be two concurrent registrations because `Task` can only be polled by a
+ // unique pinned reference. We enforce this here rather than marking the whole function
+ // unsafe.
+ assert!(state & REGISTERING == 0);

// If we're in the notifying state at this moment, just wake and return without
// registering.
@@ -119,6 +122,8 @@
}

// Put the waker into the awaiter field.
// SAFETY: We have OR'd the state of the header with REGISTERING so we have a lock on
// self.awaiter and can write to it.
unsafe {
abort_on_panic(|| (*self.awaiter.get()) = Some(waker.clone()));
}
@@ -130,6 +135,12 @@
loop {
// If there was a notification, take the waker out of the awaiter field.
if state & NOTIFYING != 0 {
// SAFETY: We have guaranteed that self.state is or'd with NOTIFYING, which
// prevents everyone else from writing to self.awaiter. So we know that we won't
// race with any writes from other threads.
// We can't reach this branch on the first loop through, but we can if someone
// notifies the task while we are in the middle of registering. Normally, they
// would also take a waker, but they won't if we have the NOTIFYING bit set.
if let Some(w) = unsafe { (*self.awaiter.get()).take() } {
abort_on_panic(|| waker = Some(w));
}
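
And the matching registration side of that handoff, continuing the hypothetical sketch above; the real code closes the remaining window between the load and the final clear with a compare-exchange loop, so this only illustrates the take-the-waker-back case the comment describes.

```rust
impl Slot {
    fn register(&self, waker: &Waker) {
        let state = self.state.fetch_or(REGISTERING, Ordering::AcqRel);
        // Only one registration can be in flight at a time.
        assert!(state & REGISTERING == 0);

        // SAFETY: REGISTERING is set, so we own the cell.
        unsafe { *self.awaiter.get() = Some(waker.clone()) };

        // If a notifier raced in, it saw REGISTERING and deferred the wake
        // to us: take the waker back out while we still hold the lock.
        let mut pending = None;
        if self.state.load(Ordering::Acquire) & NOTIFYING != 0 {
            // SAFETY: the notifier never touches the cell once it has
            // observed REGISTERING.
            pending = unsafe { (*self.awaiter.get()).take() };
        }

        self.state.fetch_and(!(REGISTERING | NOTIFYING), Ordering::Release);
        if let Some(w) = pending {
            w.wake();
        }
    }
}
```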
70 changes: 69 additions & 1 deletion src/raw.rs
@@ -123,6 +123,9 @@ impl<F, T, S, M> RawTask<F, T, S, M> {
let offset_r = offset_union;

TaskLayout {
// SAFETY: layout came from a Layout::extend call, which dynamically checks the
// invariants for StdLayout and returns None if they are not met. The leap_unwrap!
// would have panicked before this point.
layout: unsafe { layout.into_std() },
offset_s,
offset_f,
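
For readers unfamiliar with the computation this comment refers to: a C-style layout of `Header`, then `S`, then `union { F, T }` can be derived with the standard library's `Layout::extend`, which checks the alignment and overflow invariants dynamically. A hedged sketch with stand-in field types (the crate's own layout helpers and `leap_unwrap!` differ in detail):

```rust
use std::alloc::Layout;

// Stand-ins for Header<M>, the schedule closure S, and union { F, T }.
struct Header { state: usize }
struct Sched;
type Slot = [u8; 16];

fn task_layout() -> Option<(Layout, usize, usize)> {
    let layout = Layout::new::<Header>();
    // extend() fails cleanly instead of producing an invalid layout.
    let (layout, offset_s) = layout.extend(Layout::new::<Sched>()).ok()?;
    let (layout, offset_union) = layout.extend(Layout::new::<Slot>()).ok()?;
    // Round the total size up to alignment, as a C struct would be padded.
    Some((layout.pad_to_align(), offset_s, offset_union))
}
```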
@@ -167,6 +170,8 @@
Some(p) => p,
};

// SAFETY: task_layout.layout has the correct layout for a C-style struct of Header
// followed by S followed by union { F, T }.
let raw = Self::from_ptr(ptr.as_ptr());

let crate::Builder {
@@ -176,6 +181,10 @@
} = builder;

// Write the header as the first field of the task.
// SAFETY: This write is OK because it's through a mutable pointer to a Header<M> that
// is definitely properly aligned and points to enough memory for a Header<M>. We
// didn't pass our pointer through any const references or other const-ifying
// operations, so the provenance is good.
(raw.header as *mut Header<M>).write(Header {
state: AtomicUsize::new(SCHEDULED | TASK | REFERENCE),
awaiter: UnsafeCell::new(None),
@@ -195,25 +204,37 @@
});

// Write the schedule function as the third field of the task.
// SAFETY: raw.schedule is likewise non-null, properly aligned, and valid for writes of
// size_of::<S>().
(raw.schedule as *mut S).write(schedule);

// Generate the future, now that the metadata has been pinned in place.
// SAFETY: Dereferencing raw.header is OK because it's properly initialized since we
// wrote to it.
let future = abort_on_panic(|| future(&(*raw.header).metadata));

// Write the future as the fourth field of the task.
// SAFETY: This write is OK because raw.future is non-null, properly aligned, and valid
// for writes of size_of::<F>(). Because we're not casting anything here, we know it's
// the right type.
raw.future.write(future);

ptr
}
}

/// Creates a `RawTask` from a raw task pointer.
///
/// `ptr` must point to a region whose size and alignment match the task layout, since
/// doing pointer arithmetic that leaves the region or creating unaligned pointers is UB.
#[inline]
- pub(crate) fn from_ptr(ptr: *const ()) -> Self {
+ pub(crate) unsafe fn from_ptr(ptr: *const ()) -> Self {
let task_layout = Self::task_layout();
let p = ptr as *const u8;

unsafe {
// SAFETY: We're just picking apart the given pointer into its constituent fields.
// These do correctly correspond to the fields as laid out in task_layout.
Self {
header: p as *const Header<M>,
schedule: p.add(task_layout.offset_s) as *const S,
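
To make the new `unsafe` contract concrete: the field recovery is plain pointer arithmetic, which is only defined while the results stay inside one allocation and respect each field's alignment. A standalone, hedged sketch with hypothetical types and offsets:

```rust
// Hypothetical stand-ins for the task's constituent field types.
struct Header;
struct Sched;
struct Fut;

struct Parts {
    header: *const Header,
    schedule: *const Sched,
    future: *const Fut,
}

// SAFETY contract: `ptr` must point to an allocation laid out with the
// header at offset 0 and the other fields at the given offsets; `add` is
// UB if a result leaves the allocation or is misaligned.
unsafe fn split(ptr: *const (), offset_s: usize, offset_f: usize) -> Parts {
    let p = ptr as *const u8;
    Parts {
        header: p as *const Header,
        schedule: p.add(offset_s) as *const Sched,
        future: p.add(offset_f) as *const Fut,
    }
}
```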
@@ -229,6 +250,8 @@
Self::TASK_LAYOUT
}
/// Wakes a waker.
///
/// Assumes ptr points to a valid task.
unsafe fn wake(ptr: *const ()) {
// This is just an optimization. If the schedule function has captured variables, then
// we'll do less reference counting if we wake the waker by reference and then drop it.
@@ -240,6 +263,8 @@

let raw = Self::from_ptr(ptr);

// SAFETY: This is just loading the state. Note that this does implicitly create an
// &AtomicUsize, which is intentional.
let mut state = (*raw.header).state.load(Ordering::Acquire);

loop {
@@ -295,6 +320,8 @@
}

/// Wakes a waker by reference.
///
/// Assumes ptr points to a valid task.
unsafe fn wake_by_ref(ptr: *const ()) {
let raw = Self::from_ptr(ptr);

@@ -346,6 +373,8 @@
// because the schedule function cannot be destroyed while the waker is
// still alive.
let task = Runnable::from_raw(NonNull::new_unchecked(ptr as *mut ()));
// SAFETY: The task is still alive, so we can call its schedule
// function.
(*raw.schedule).schedule(task, ScheduleInfo::new(false));
}

@@ -394,9 +423,17 @@
(*raw.header)
.state
.store(SCHEDULED | CLOSED | REFERENCE, Ordering::Release);
// SAFETY: ptr still points to a valid task even though its refcount has dropped
// to zero.
// NOTE: We should make sure that the executor is properly dropping scheduled tasks
// with a refcount of zero.
Self::schedule(ptr, ScheduleInfo::new(false));
} else {
// Otherwise, destroy the task right away.
// NOTE: This isn't going to drop the output/result of the future. That must already
// have been dealt with, so every caller of drop_waker needs to be audited. It looks
// like whoever sets the TASK bit to zero is affirming that they have moved or dropped
// the output/result.
Self::destroy(ptr);
}
}
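
The decision those notes describe boils down to a small branch. A hedged sketch with hypothetical bit constants: when the waker drops the last reference, an abandoned task is handed back to the executor so the future (and any output) is dropped through the normal polling path, while a completed or closed task can be destroyed outright because its output has already been moved out or dropped.

```rust
const COMPLETED: usize = 1 << 4;
const CLOSED: usize = 1 << 5;

fn on_last_waker_ref(state: usize, schedule: impl FnOnce(), destroy: impl FnOnce()) {
    if state & (COMPLETED | CLOSED) == 0 {
        // Abandoned mid-flight: close it and push it back so the executor
        // drops the future for us.
        schedule();
    } else {
        // Whoever cleared the TASK bit already moved or dropped the output,
        // so it is safe to free the allocation immediately.
        destroy();
    }
}
```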
@@ -435,6 +472,8 @@
}

let task = Runnable::from_raw(NonNull::new_unchecked(ptr as *mut ()));
// NOTE: The schedule function has to drop tasks with a refcount of zero. That's not
// happening in this function, so it has to be happening in the schedule member function.
(*raw.schedule).schedule(task, info);
}

@@ -459,6 +498,9 @@
///
/// The schedule function will be dropped, and the task will then get deallocated.
/// The task must be closed before this function is called.
///
/// NOTE: Whoever calls this function must have already dealt with the return value of the
/// future, or with its error if it failed. We are not going to drop it!
#[inline]
unsafe fn destroy(ptr: *const ()) {
let raw = Self::from_ptr(ptr);
@@ -467,13 +509,18 @@
// We need a safeguard against panics because destructors can panic.
abort_on_panic(|| {
// Drop the header along with the metadata.
// SAFETY: This points to a valid Header<M> that we have permission to move out of and
// drop.
(raw.header as *mut Header<M>).drop_in_place();

// Drop the schedule function.
// SAFETY: This points to a valid S that we have permission to move out of and drop.
(raw.schedule as *mut S).drop_in_place();
});

// Finally, deallocate the memory reserved by the task.
// SAFETY: We know that ptr was allocated with layout task_layout.layout, so deallocating
// it with the same layout is correct.
alloc::alloc::dealloc(ptr as *mut u8, task_layout.layout);
}
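
The teardown order documented here is the usual drop-in-place-then-deallocate pattern; a minimal hedged sketch with hypothetical offsets:

```rust
use std::alloc::{dealloc, Layout};
use std::ptr;

// SAFETY contract: `ptr` owns an allocation of exactly `layout`, with an
// initialized H at offset 0 and an initialized S at `offset_s`, and the
// future/output slot has already been dealt with by the caller.
unsafe fn destroy_sketch<H, S>(ptr: *mut u8, offset_s: usize, layout: Layout) {
    // Run the destructors in place; the memory itself is freed afterwards.
    ptr::drop_in_place(ptr as *mut H);
    ptr::drop_in_place(ptr.add(offset_s) as *mut S);
    // Must use the same layout the allocation was created with.
    dealloc(ptr, layout);
}
```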

@@ -482,9 +529,11 @@
/// If polling its future panics, the task will be closed and the panic will be propagated into
/// the caller.
unsafe fn run(ptr: *const ()) -> bool {
// SAFETY: As long as it's a pointer to a valid task, we can get the raw form of it.
let raw = Self::from_ptr(ptr);

// Create a context from the raw task pointer and the vtable inside its header.
// SAFETY: The implementation of RAW_WAKER_VTABLE is correct.
let waker = ManuallyDrop::new(Waker::from_raw(RawWaker::new(ptr, &Self::RAW_WAKER_VTABLE)));
let cx = &mut Context::from_waker(&waker);
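
For context on what that vtable promise covers, here is a minimal sketch of how a `RawWakerVTable` is wired into a `Waker`; the no-op bodies stand in for the real `clone_waker`/`wake`/`wake_by_ref`/`drop_waker`, which additionally manage the reference count.

```rust
use std::task::{RawWaker, RawWakerVTable, Waker};

unsafe fn clone(ptr: *const ()) -> RawWaker {
    // The real implementation bumps the task's reference count here.
    RawWaker::new(ptr, &VTABLE)
}
unsafe fn wake(_ptr: *const ()) {}
unsafe fn wake_by_ref(_ptr: *const ()) {}
unsafe fn drop_waker(_ptr: *const ()) {}

static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake_by_ref, drop_waker);

fn make_waker(ptr: *const ()) -> Waker {
    // SAFETY: the functions above uphold the RawWaker contract, which is
    // exactly what the SAFETY comment in run() asserts of the real vtable.
    unsafe { Waker::from_raw(RawWaker::new(ptr, &VTABLE)) }
}
```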

@@ -507,6 +556,8 @@
}

// Drop the task reference.
// SAFETY: This pointer is definitely alive. The Waker that is registered into the
// executor holds it.
Self::drop_ref(ptr);

// Notify the awaiter that the future has been dropped.
@@ -563,7 +614,10 @@
match poll {
Poll::Ready(out) => {
// Replace the future with its output.
// SAFETY: We have exclusive access to the task so we can drop the future for it.
Self::drop_future(ptr);
// SAFETY: raw.output definitely points to a valid memory location to hold the
// Output type of the future.
raw.output.write(out);

// The task is now completed.
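
Concretely, "replace the future with its output" works because the future and the output share one storage slot. A hedged sketch of that reuse with a `ManuallyDrop` union (types hypothetical):

```rust
use std::mem::ManuallyDrop;

union Slot<F, T> {
    future: ManuallyDrop<F>,
    output: ManuallyDrop<T>,
}

// SAFETY contract: `slot` currently holds an initialized future and the
// caller has exclusive access to it.
unsafe fn complete<F, T>(slot: *mut Slot<F, T>, out: T) {
    // Drop the future in place, then write the output into the same bytes.
    ManuallyDrop::drop(&mut (*slot).future);
    (*slot).output = ManuallyDrop::new(out);
}
```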
@@ -593,10 +647,12 @@
// Take the awaiter out.
let mut awaiter = None;
if state & AWAITER != 0 {
// SAFETY: This is safe for the same reasons stated earlier.
awaiter = (*raw.header).take(None);
}

// Drop the task reference.
// SAFETY: We "own" the ref to this task and are allowed to drop it.
Self::drop_ref(ptr);

// Notify the awaiter that the future has been dropped.
@@ -625,6 +681,9 @@
if state & CLOSED != 0 && !future_dropped {
// The thread that closed the task didn't drop the future because it was
// running, so now it's our responsibility to do so.
// SAFETY: This is corroborated by header.rs, which states that closing a
// task doesn't drop the future; it just marks the task closed and puts it
// back in the polling queue so a poller can drop it.
Self::drop_future(ptr);
future_dropped = true;
}
@@ -648,6 +707,8 @@
}

// Drop the task reference.
// SAFETY: We're allowed to drop the ref as stated earlier. We
// checked that it won't accidentally be double-dropped.
Self::drop_ref(ptr);

// Notify the awaiter that the future has been dropped.
@@ -657,10 +718,13 @@
} else if state & SCHEDULED != 0 {
// The thread that woke the task up didn't reschedule it because
// it was running, so now it's our responsibility to do so.
// SAFETY: ptr definitely points to a valid task that hasn't been
// dropped. It has its SCHEDULED bit set.
Self::schedule(ptr, ScheduleInfo::new(true));
return true;
} else {
// Drop the task reference.
// SAFETY: We still hold a reference, so we're allowed to drop it.
Self::drop_ref(ptr);
}
break;
@@ -697,6 +761,7 @@
if state & CLOSED != 0 {
// The thread that closed the task didn't drop the future because it
// was running, so now it's our responsibility to do so.
// SAFETY: If poll panicked then the thread didn't drop the future.
RawTask::<F, T, S, M>::drop_future(ptr);

// Mark the task as not running and not scheduled.
@@ -711,6 +776,7 @@
}

// Drop the task reference.
// SAFETY: We still have permission to drop a ref.
RawTask::<F, T, S, M>::drop_ref(ptr);

// Notify the awaiter that the future has been dropped.
@@ -729,6 +795,8 @@
) {
Ok(state) => {
// Drop the future because the task is now closed.
// SAFETY: This is effectively the same situation as earlier.
// TODO: DRY this up by refactoring.
RawTask::<F, T, S, M>::drop_future(ptr);

// Take the awaiter out.