diff --git a/futures-util/src/compat/compat03as01.rs b/futures-util/src/compat/compat03as01.rs index a04bebaa9c..614fcaba97 100644 --- a/futures-util/src/compat/compat03as01.rs +++ b/futures-util/src/compat/compat03as01.rs @@ -4,11 +4,21 @@ use futures_01::{ StartSend as StartSend01, Stream as Stream01, }; use futures_core::{ - task as task03, TryFuture as TryFuture03, TryStream as TryStream03, + task::{ + self as task03, + RawWaker, + RawWakerVTable, + }, + TryFuture as TryFuture03, + TryStream as TryStream03, }; use futures_sink::Sink as Sink03; -use crate::task::ArcWake as ArcWake03; -use std::{pin::Pin, sync::Arc}; +use crate::task::{ArcWake as ArcWake03, WakerRef}; +use std::{ + mem, + pin::Pin, + sync::Arc, +}; /// Converts a futures 0.3 [`TryFuture`](futures_core::future::TryFuture), /// [`TryStream`](futures_core::stream::TryStream) or @@ -108,6 +118,7 @@ where } } +#[derive(Clone)] struct Current(task01::Task); impl Current { @@ -115,12 +126,32 @@ impl Current { Current(task01::current()) } - fn as_waker(&self) -> task03::Waker { - // For simplicity reasons wrap the Waker into an Arc. - // We can optimize this again later on and reintroduce WakerLt<'a> which - // derefs to Waker, and where cloning it through RawWakerVTable returns - // an Arc version - ArcWake03::into_waker(Arc::new(Current(self.0.clone()))) + fn as_waker(&self) -> WakerRef<'_> { + unsafe fn ptr_to_current<'a>(ptr: *const ()) -> &'a Current { + &*(ptr as *const Current) + } + fn current_to_ptr(current: &Current) -> *const () { + current as *const Current as *const () + } + + unsafe fn clone(ptr: *const ()) -> RawWaker { + // Lazily create the `Arc` only when the waker is actually cloned. + // FIXME: remove `transmute` when a `Waker` -> `RawWaker` conversion + // function is landed in `core`. + mem::transmute::( + Arc::new(ptr_to_current(ptr).clone()).into_waker() + ) + } + unsafe fn drop(_: *const ()) {} + unsafe fn wake(ptr: *const ()) { + ptr_to_current(ptr).0.notify() + } + + let ptr = current_to_ptr(self); + let vtable = &RawWakerVTable { clone, drop, wake }; + unsafe { + WakerRef::new(task03::Waker::new_unchecked(RawWaker::new(ptr, vtable))) + } } } diff --git a/futures-util/src/task/arc_wake.rs b/futures-util/src/task/arc_wake.rs index b0b88f98b4..aee4285986 100644 --- a/futures-util/src/task/arc_wake.rs +++ b/futures-util/src/task/arc_wake.rs @@ -1,16 +1,7 @@ +use std::mem; use std::sync::Arc; use std::task::{Waker, RawWaker, RawWakerVTable}; -macro_rules! waker_vtable { - ($ty:ident) => { - &RawWakerVTable { - clone: clone_arc_raw::<$ty>, - drop: drop_arc_raw::<$ty>, - wake: wake_arc_raw::<$ty>, - } - }; -} - /// A way of waking up a specific task. /// /// By implementing this trait, types that are expected to be wrapped in an `Arc` @@ -32,9 +23,9 @@ pub trait ArcWake { /// /// If `wake()` is called on the returned `Waker`, /// the `wake()` function that is defined inside this trait will get called. - fn into_waker(wake: Arc) -> Waker where Self: Sized + fn into_waker(self: Arc) -> Waker where Self: Sized { - let ptr = Arc::into_raw(wake) as *const(); + let ptr = Arc::into_raw(self) as *const(); unsafe { Waker::new_unchecked(RawWaker::new(ptr, waker_vtable!(Self))) @@ -42,29 +33,33 @@ pub trait ArcWake { } } +// FIXME: panics on Arc::clone / refcount changes could wreak havoc on the +// code here. We should guard against this by aborting. + unsafe fn increase_refcount(data: *const()) { // Retain Arc by creating a copy let arc: Arc = Arc::from_raw(data as *const T); let arc_clone = arc.clone(); // Forget the Arcs again, so that the refcount isn't decrased - let _ = Arc::into_raw(arc); - let _ = Arc::into_raw(arc_clone); + mem::forget(arc); + mem::forget(arc_clone); } -unsafe fn clone_arc_raw(data: *const()) -> RawWaker { +// used by `waker_ref` +pub(super) unsafe fn clone_arc_raw(data: *const()) -> RawWaker { increase_refcount::(data); RawWaker::new(data, waker_vtable!(T)) } unsafe fn drop_arc_raw(data: *const()) { - // Drop Arc - let _: Arc = Arc::from_raw(data as *const T); + drop(Arc::::from_raw(data as *const T)) } -unsafe fn wake_arc_raw(data: *const()) { +// used by `waker_ref` +pub(super) unsafe fn wake_arc_raw(data: *const()) { let arc: Arc = Arc::from_raw(data as *const T); - ArcWake::wake(&arc); // TODO: If this panics, the refcount is too big - let _ = Arc::into_raw(arc); + ArcWake::wake(&arc); + mem::forget(arc); } #[cfg(test)] @@ -115,4 +110,4 @@ mod tests { drop(w1); assert_eq!(1, Arc::strong_count(&some_w)); } -} \ No newline at end of file +} diff --git a/futures-util/src/task/mod.rs b/futures-util/src/task/mod.rs index af7e783d33..580b0e12c8 100644 --- a/futures-util/src/task/mod.rs +++ b/futures-util/src/task/mod.rs @@ -1,5 +1,17 @@ //! Task notification +/// A macro for creating a `RawWaker` vtable for a type that implements +/// the `ArcWake` trait. +macro_rules! waker_vtable { + ($ty:ident) => { + &RawWakerVTable { + clone: clone_arc_raw::<$ty>, + drop: drop_arc_raw::<$ty>, + wake: wake_arc_raw::<$ty>, + } + }; +} + #[cfg(feature = "std")] mod arc_wake; #[cfg(feature = "std")] diff --git a/futures-util/src/task/waker_ref.rs b/futures-util/src/task/waker_ref.rs index 1b39b3e66c..9608831b6f 100644 --- a/futures-util/src/task/waker_ref.rs +++ b/futures-util/src/task/waker_ref.rs @@ -1,10 +1,10 @@ #![allow(clippy::cast_ptr_alignment)] // clippy is too strict here +use super::arc_wake::{ArcWake, clone_arc_raw, wake_arc_raw}; use std::marker::PhantomData; use std::ops::Deref; use std::sync::Arc; use std::task::{Waker, RawWaker, RawWakerVTable}; -use super::ArcWake; /// A [`Waker`](::std::task::Waker) that is only valid for a given lifetime. /// @@ -38,28 +38,8 @@ impl<'a> Deref for WakerRef<'a> { } } -// Another reference vtable which doesn't do decrement the refcount on drop. -// However on clone it will create a vtable which equals a Waker, and on wake -// it will call the nonlocal wake function. -macro_rules! ref_vtable { - ($ty:ident) => { - &RawWakerVTable { - clone: clone_arc_raw::<$ty>, - drop: noop, - wake: wake_arc_raw::<$ty>, - } - }; -} - -macro_rules! owned_vtable { - ($ty:ident) => { - &RawWakerVTable { - clone: clone_arc_raw::<$ty>, - drop: drop_arc_raw::<$ty>, - wake: wake_arc_raw::<$ty>, - } - }; -} +#[inline] +unsafe fn noop(_data: *const ()) {} /// Creates a reference to a [`Waker`](::std::task::Waker) /// from a local [`wake`](::std::task::Wake). @@ -75,36 +55,16 @@ where // This is potentially not stable let ptr = &*wake as &W as *const W as *const(); + // Similar to `waker_vtable`, but with a no-op `drop` function. + // Clones of the resulting `RawWaker` will still be dropped normally. + let vtable = &RawWakerVTable { + clone: clone_arc_raw::, + drop: noop, + wake: wake_arc_raw::, + }; + let waker = unsafe { - Waker::new_unchecked(RawWaker::new(ptr, ref_vtable!(W))) + Waker::new_unchecked(RawWaker::new(ptr, vtable)) }; WakerRef::new(waker) } - -unsafe fn noop(_data: *const()) { -} - -unsafe fn increase_refcount(data: *const()) { - // Retain Arc by creating a copy - let arc: Arc = Arc::from_raw(data as *const T); - let arc_clone = arc.clone(); - // Forget the Arcs again, so that the refcount isn't decrased - let _ = Arc::into_raw(arc); - let _ = Arc::into_raw(arc_clone); -} - -unsafe fn clone_arc_raw(data: *const()) -> RawWaker { - increase_refcount::(data); - RawWaker::new(data, owned_vtable!(T)) -} - -unsafe fn drop_arc_raw(data: *const()) { - // Drop Arc - let _: Arc = Arc::from_raw(data as *const T); -} - -unsafe fn wake_arc_raw(data: *const()) { - let arc: Arc = Arc::from_raw(data as *const T); - ArcWake::wake(&arc); // TODO: If this panics, the refcount is too big - let _ = Arc::into_raw(arc); -} \ No newline at end of file