Skip to content

Commit 803097a

Browse files
authored
Merge branch 'master' into waker_changes3
2 parents 2d3ce08 + 446134c commit 803097a

File tree

21 files changed

+252
-93
lines changed

21 files changed

+252
-93
lines changed

futures-channel/src/lib.rs

+1-2
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,7 @@
77

88
#![cfg_attr(not(feature = "std"), no_std)]
99

10-
#![warn(missing_docs, missing_debug_implementations)]
11-
#![deny(bare_trait_objects)]
10+
#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
1211

1312
#![doc(html_root_url = "https://rust-lang-nursery.github.io/futures-api-docs/0.3.0-alpha.12/futures_channel")]
1413

futures-channel/src/mpsc/mod.rs

+122-50
Original file line numberDiff line numberDiff line change
@@ -94,11 +94,8 @@ use crate::mpsc::queue::Queue;
9494

9595
mod queue;
9696

97-
/// The transmission end of a bounded mpsc channel.
98-
///
99-
/// This value is created by the [`channel`](channel) function.
10097
#[derive(Debug)]
101-
pub struct Sender<T> {
98+
struct SenderInner<T> {
10299
// Channel state shared between the sender and receiver.
103100
inner: Arc<Inner<T>>,
104101

@@ -112,14 +109,20 @@ pub struct Sender<T> {
112109
maybe_parked: bool,
113110
}
114111

115-
// We never project Pin<&mut Sender> to `Pin<&mut T>`
116-
impl<T> Unpin for Sender<T> {}
112+
// We never project Pin<&mut SenderInner> to `Pin<&mut T>`
113+
impl<T> Unpin for SenderInner<T> {}
114+
115+
/// The transmission end of a bounded mpsc channel.
116+
///
117+
/// This value is created by the [`channel`](channel) function.
118+
#[derive(Debug)]
119+
pub struct Sender<T>(Option<SenderInner<T>>);
117120

118121
/// The transmission end of an unbounded mpsc channel.
119122
///
120123
/// This value is created by the [`unbounded`](unbounded) function.
121124
#[derive(Debug)]
122-
pub struct UnboundedSender<T>(Sender<T>);
125+
pub struct UnboundedSender<T>(Option<SenderInner<T>>);
123126

124127
trait AssertKinds: Send + Sync + Clone {}
125128
impl AssertKinds for UnboundedSender<u32> {}
@@ -357,7 +360,8 @@ pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
357360
// Check that the requested buffer size does not exceed the maximum buffer
358361
// size permitted by the system.
359362
assert!(buffer < MAX_BUFFER, "requested buffer size too large");
360-
channel2(Some(buffer))
363+
let (tx, rx) = channel2(Some(buffer));
364+
(Sender(Some(tx)), rx)
361365
}
362366

363367
/// Creates an unbounded mpsc channel for communicating between asynchronous
@@ -372,10 +376,10 @@ pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
372376
/// process to run out of memory. In this case, the process will be aborted.
373377
pub fn unbounded<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
374378
let (tx, rx) = channel2(None);
375-
(UnboundedSender(tx), UnboundedReceiver(rx))
379+
(UnboundedSender(Some(tx)), UnboundedReceiver(rx))
376380
}
377381

378-
fn channel2<T>(buffer: Option<usize>) -> (Sender<T>, Receiver<T>) {
382+
fn channel2<T>(buffer: Option<usize>) -> (SenderInner<T>, Receiver<T>) {
379383
let inner = Arc::new(Inner {
380384
buffer,
381385
state: AtomicUsize::new(INIT_STATE),
@@ -385,7 +389,7 @@ fn channel2<T>(buffer: Option<usize>) -> (Sender<T>, Receiver<T>) {
385389
recv_task: AtomicWaker::new(),
386390
});
387391

388-
let tx = Sender {
392+
let tx = SenderInner {
389393
inner: inner.clone(),
390394
sender_task: Arc::new(Mutex::new(SenderTask::new())),
391395
maybe_parked: false,
@@ -404,10 +408,10 @@ fn channel2<T>(buffer: Option<usize>) -> (Sender<T>, Receiver<T>) {
404408
*
405409
*/
406410

407-
impl<T> Sender<T> {
411+
impl<T> SenderInner<T> {
408412
/// Attempts to send a message on this `Sender`, returning the message
409413
/// if there was an error.
410-
pub fn try_send(&mut self, msg: T) -> Result<(), TrySendError<T>> {
414+
fn try_send(&mut self, msg: T) -> Result<(), TrySendError<T>> {
411415
// If the sender is currently blocked, reject the message
412416
if !self.poll_unparked(None).is_ready() {
413417
return Err(TrySendError {
@@ -422,16 +426,6 @@ impl<T> Sender<T> {
422426
self.do_send_b(msg)
423427
}
424428

425-
/// Send a message on the channel.
426-
///
427-
/// This function should only be called after
428-
/// [`poll_ready`](Sender::poll_ready) has reported that the channel is
429-
/// ready to receive a message.
430-
pub fn start_send(&mut self, msg: T) -> Result<(), SendError> {
431-
self.try_send(msg)
432-
.map_err(|e| e.err)
433-
}
434-
435429
// Do the send without failing.
436430
// Can be called only by bounded sender.
437431
fn do_send_b(&mut self, msg: T)
@@ -484,7 +478,7 @@ impl<T> Sender<T> {
484478
Poll::Ready(Ok(()))
485479
} else {
486480
Poll::Ready(Err(SendError {
487-
kind: SendErrorKind::Full,
481+
kind: SendErrorKind::Disconnected,
488482
}))
489483
}
490484
}
@@ -559,7 +553,7 @@ impl<T> Sender<T> {
559553
/// capacity, in which case the current task is queued to be notified once
560554
/// capacity is available;
561555
/// - `Err(SendError)` if the receiver has been dropped.
562-
pub fn poll_ready(
556+
fn poll_ready(
563557
&mut self,
564558
waker: &Waker
565559
) -> Poll<Result<(), SendError>> {
@@ -574,12 +568,12 @@ impl<T> Sender<T> {
574568
}
575569

576570
/// Returns whether this channel is closed without needing a context.
577-
pub fn is_closed(&self) -> bool {
571+
fn is_closed(&self) -> bool {
578572
!decode_state(self.inner.state.load(SeqCst)).is_open
579573
}
580574

581575
/// Closes this channel from the sender side, preventing any new messages.
582-
pub fn close_channel(&mut self) {
576+
fn close_channel(&self) {
583577
// There's no need to park this sender, its dropping,
584578
// and we don't want to check for capacity, so skip
585579
// that stuff from `do_send`.
@@ -615,43 +609,116 @@ impl<T> Sender<T> {
615609
}
616610
}
617611

612+
impl<T> Sender<T> {
613+
/// Attempts to send a message on this `Sender`, returning the message
614+
/// if there was an error.
615+
pub fn try_send(&mut self, msg: T) -> Result<(), TrySendError<T>> {
616+
if let Some(inner) = &mut self.0 {
617+
inner.try_send(msg)
618+
} else {
619+
Err(TrySendError {
620+
err: SendError {
621+
kind: SendErrorKind::Disconnected,
622+
},
623+
val: msg,
624+
})
625+
}
626+
}
627+
628+
/// Send a message on the channel.
629+
///
630+
/// This function should only be called after
631+
/// [`poll_ready`](Sender::poll_ready) has reported that the channel is
632+
/// ready to receive a message.
633+
pub fn start_send(&mut self, msg: T) -> Result<(), SendError> {
634+
self.try_send(msg)
635+
.map_err(|e| e.err)
636+
}
637+
638+
/// Polls the channel to determine if there is guaranteed capacity to send
639+
/// at least one item without waiting.
640+
///
641+
/// # Return value
642+
///
643+
/// This method returns:
644+
///
645+
/// - `Ok(Async::Ready(_))` if there is sufficient capacity;
646+
/// - `Ok(Async::Pending)` if the channel may not have
647+
/// capacity, in which case the current task is queued to be notified once
648+
/// capacity is available;
649+
/// - `Err(SendError)` if the receiver has been dropped.
650+
pub fn poll_ready(
651+
&mut self,
652+
lw: &LocalWaker
653+
) -> Poll<Result<(), SendError>> {
654+
let inner = self.0.as_mut().ok_or(SendError {
655+
kind: SendErrorKind::Disconnected,
656+
})?;
657+
inner.poll_ready(lw)
658+
}
659+
660+
/// Returns whether this channel is closed without needing a context.
661+
pub fn is_closed(&self) -> bool {
662+
self.0.as_ref().map(SenderInner::is_closed).unwrap_or(true)
663+
}
664+
665+
/// Closes this channel from the sender side, preventing any new messages.
666+
pub fn close_channel(&mut self) {
667+
if let Some(inner) = &mut self.0 {
668+
inner.close_channel();
669+
}
670+
}
671+
672+
/// Disconnects this sender from the channel, closing it if there are no more senders left.
673+
pub fn disconnect(&mut self) {
674+
self.0 = None;
675+
}
676+
}
677+
618678
impl<T> UnboundedSender<T> {
619679
/// Check if the channel is ready to receive a message.
620680
pub fn poll_ready(
621681
&self,
622682
_: &Waker,
623683
) -> Poll<Result<(), SendError>> {
624-
self.0.poll_ready_nb()
684+
let inner = self.0.as_ref().ok_or(SendError {
685+
kind: SendErrorKind::Disconnected,
686+
})?;
687+
inner.poll_ready_nb()
625688
}
626689

627690
/// Returns whether this channel is closed without needing a context.
628691
pub fn is_closed(&self) -> bool {
629-
self.0.is_closed()
692+
self.0.as_ref().map(SenderInner::is_closed).unwrap_or(true)
630693
}
631694

632695
/// Closes this channel from the sender side, preventing any new messages.
633696
pub fn close_channel(&self) {
634-
self.0.inner.set_closed();
635-
self.0.inner.recv_task.wake();
697+
if let Some(inner) = &self.0 {
698+
inner.close_channel();
699+
}
700+
}
701+
702+
/// Disconnects this sender from the channel, closing it if there are no more senders left.
703+
pub fn disconnect(&mut self) {
704+
self.0 = None;
636705
}
637706

638707
// Do the send without parking current task.
639708
fn do_send_nb(&self, msg: T) -> Result<(), TrySendError<T>> {
640-
match self.0.inc_num_messages() {
641-
Some(_num_messages) => {}
642-
None => {
643-
return Err(TrySendError {
644-
err: SendError {
645-
kind: SendErrorKind::Disconnected,
646-
},
647-
val: msg,
648-
});
649-
},
650-
};
651-
652-
self.0.queue_push_and_signal(msg);
709+
if let Some(inner) = &self.0 {
710+
if inner.inc_num_messages().is_some() {
711+
inner.queue_push_and_signal(msg);
712+
return Ok(());
713+
}
714+
}
653715

654-
Ok(())
716+
Err(TrySendError {
717+
err: SendError {
718+
kind: SendErrorKind::Disconnected,
719+
},
720+
val: msg,
721+
})
655722
}
656723

657724
/// Send a message on the channel.
@@ -673,15 +740,20 @@ impl<T> UnboundedSender<T> {
673740
}
674741
}
675742

743+
impl<T> Clone for Sender<T> {
744+
fn clone(&self) -> Sender<T> {
745+
Sender(self.0.clone())
746+
}
747+
}
748+
676749
impl<T> Clone for UnboundedSender<T> {
677750
fn clone(&self) -> UnboundedSender<T> {
678751
UnboundedSender(self.0.clone())
679752
}
680753
}
681754

682-
683-
impl<T> Clone for Sender<T> {
684-
fn clone(&self) -> Sender<T> {
755+
impl<T> Clone for SenderInner<T> {
756+
fn clone(&self) -> SenderInner<T> {
685757
// Since this atomic op isn't actually guarding any memory and we don't
686758
// care about any orderings besides the ordering on the single atomic
687759
// variable, a relaxed ordering is acceptable.
@@ -701,7 +773,7 @@ impl<T> Clone for Sender<T> {
701773
// The ABA problem doesn't matter here. We only care that the
702774
// number of senders never exceeds the maximum.
703775
if actual == curr {
704-
return Sender {
776+
return SenderInner {
705777
inner: self.inner.clone(),
706778
sender_task: Arc::new(Mutex::new(SenderTask::new())),
707779
maybe_parked: false,
@@ -713,7 +785,7 @@ impl<T> Clone for Sender<T> {
713785
}
714786
}
715787

716-
impl<T> Drop for Sender<T> {
788+
impl<T> Drop for SenderInner<T> {
717789
fn drop(&mut self) {
718790
// Ordering between variables don't matter here
719791
let prev = self.inner.num_senders.fetch_sub(1, SeqCst);

futures-channel/tests/channel.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use futures::executor::block_on;
55
use futures::future::poll_fn;
66
use futures::stream::StreamExt;
77
use futures::sink::SinkExt;
8-
use std::sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT, Ordering};
8+
use std::sync::atomic::{AtomicUsize, Ordering};
99
use std::thread;
1010

1111
#[test]
@@ -52,7 +52,7 @@ fn drop_rx() {
5252

5353
#[test]
5454
fn drop_order() {
55-
static DROPS: AtomicUsize = ATOMIC_USIZE_INIT;
55+
static DROPS: AtomicUsize = AtomicUsize::new(0);
5656
let (mut tx, rx) = mpsc::channel(1);
5757

5858
struct A;

0 commit comments

Comments
 (0)