Skip to content

Commit 79ab386

Browse files
committed
sync: add len and is_empty methods to mpsc senders
1 parent a82bdee commit 79ab386

File tree

4 files changed

+156
-2
lines changed

4 files changed

+156
-2
lines changed

tokio/src/sync/mpsc/bounded.rs

+40-1
Original file line numberDiff line numberDiff line change
@@ -515,7 +515,6 @@ impl<T> Receiver<T> {
515515
/// tx.send(0).await.unwrap();
516516
/// assert!(!rx.is_empty());
517517
/// }
518-
///
519518
/// ```
520519
pub fn is_empty(&self) -> bool {
521520
self.chan.is_empty()
@@ -1060,6 +1059,46 @@ impl<T> Sender<T> {
10601059
self.chan.is_closed()
10611060
}
10621061

1062+
/// Checks if a channel is empty.
1063+
///
1064+
/// This method returns `true` if the channel has no messages.
1065+
///
1066+
/// # Examples
1067+
/// ```
1068+
/// use tokio::sync::mpsc;
1069+
///
1070+
/// #[tokio::main]
1071+
/// async fn main() {
1072+
/// let (tx, rx) = mpsc::unbounded_channel();
1073+
/// assert!(rx.is_empty());
1074+
///
1075+
/// tx.send(0).await.unwrap();
1076+
/// assert!(!rx.is_empty());
1077+
/// }
1078+
/// ```
1079+
pub fn is_empty(&self) -> bool {
1080+
self.chan.semaphore().bound - self.chan.semaphore().semaphore.available_permits() == 0
1081+
}
1082+
1083+
/// Returns the number of messages in the channel.
1084+
///
1085+
/// # Examples
1086+
/// ```
1087+
/// use tokio::sync::mpsc;
1088+
///
1089+
/// #[tokio::main]
1090+
/// async fn main() {
1091+
/// let (tx, rx) = mpsc::unbounded_channel();
1092+
/// assert_eq!(0, rx.len());
1093+
///
1094+
/// tx.send(0).await.unwrap();
1095+
/// assert_eq!(1, rx.len());
1096+
/// }
1097+
/// ```
1098+
pub fn len(&self) -> usize {
1099+
self.chan.semaphore().bound - self.chan.semaphore().semaphore.available_permits()
1100+
}
1101+
10631102
/// Waits for channel capacity. Once capacity to send one message is
10641103
/// available, it is reserved for the caller.
10651104
///

tokio/src/sync/mpsc/chan.rs

+18
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ pub(crate) trait Semaphore {
4444

4545
fn add_permits(&self, n: usize);
4646

47+
fn available_permits(&self) -> usize;
48+
4749
fn close(&self);
4850

4951
fn is_closed(&self) -> bool;
@@ -193,6 +195,14 @@ impl<T, S> Tx<T, S> {
193195
}
194196

195197
impl<T, S: Semaphore> Tx<T, S> {
198+
pub(super) fn is_empty(&self) -> bool {
199+
self.inner.semaphore.available_permits() == 0
200+
}
201+
202+
pub(super) fn len(&self) -> usize {
203+
self.inner.semaphore.available_permits()
204+
}
205+
196206
pub(crate) fn is_closed(&self) -> bool {
197207
self.inner.semaphore.is_closed()
198208
}
@@ -576,6 +586,10 @@ impl Semaphore for bounded::Semaphore {
576586
self.semaphore.release(n)
577587
}
578588

589+
fn available_permits(&self) -> usize {
590+
self.semaphore.available_permits()
591+
}
592+
579593
fn is_idle(&self) -> bool {
580594
self.semaphore.available_permits() == self.bound
581595
}
@@ -610,6 +624,10 @@ impl Semaphore for unbounded::Semaphore {
610624
}
611625
}
612626

627+
fn available_permits(&self) -> usize {
628+
self.0.load(Acquire) >> 1
629+
}
630+
613631
fn is_idle(&self) -> bool {
614632
self.0.load(Acquire) >> 1 == 0
615633
}

tokio/src/sync/mpsc/unbounded.rs

+40-1
Original file line numberDiff line numberDiff line change
@@ -382,7 +382,6 @@ impl<T> UnboundedReceiver<T> {
382382
/// tx.send(0).unwrap();
383383
/// assert!(!rx.is_empty());
384384
/// }
385-
///
386385
/// ```
387386
pub fn is_empty(&self) -> bool {
388387
self.chan.is_empty()
@@ -577,6 +576,46 @@ impl<T> UnboundedSender<T> {
577576
}
578577
}
579578

579+
/// Checks if a channel is empty.
580+
///
581+
/// This method returns `true` if the channel has no messages.
582+
///
583+
/// # Examples
584+
/// ```
585+
/// use tokio::sync::mpsc;
586+
///
587+
/// #[tokio::main]
588+
/// async fn main() {
589+
/// let (tx, rx) = mpsc::unbounded_channel();
590+
/// assert!(tx.is_empty());
591+
///
592+
/// tx.send(0).unwrap();
593+
/// assert!(!tx.is_empty());
594+
/// }
595+
/// ```
596+
pub fn is_empty(&self) -> bool {
597+
self.chan.is_empty()
598+
}
599+
600+
/// Returns the number of messages in the channel.
601+
///
602+
/// # Examples
603+
/// ```
604+
/// use tokio::sync::mpsc;
605+
///
606+
/// #[tokio::main]
607+
/// async fn main() {
608+
/// let (tx, rx) = mpsc::unbounded_channel();
609+
/// assert_eq!(0, tx.len());
610+
///
611+
/// tx.send(0).unwrap();
612+
/// assert_eq!(1, tx.len());
613+
/// }
614+
/// ```
615+
pub fn len(&self) -> usize {
616+
self.chan.len()
617+
}
618+
580619
/// Completes when the receiver has dropped.
581620
///
582621
/// This allows the producers to get notified when interest in the produced

tokio/tests/sync_mpsc.rs

+58
Original file line numberDiff line numberDiff line change
@@ -1039,6 +1039,64 @@ async fn test_tx_capacity() {
10391039
assert_eq!(tx.max_capacity(), 10);
10401040
}
10411041

1042+
#[tokio::test]
1043+
async fn test_bounded_tx_len() {
1044+
let (tx, mut rx) = mpsc::channel::<()>(10);
1045+
1046+
// initially len should be 0
1047+
assert_eq!(tx.len(), 0);
1048+
assert!(tx.is_empty());
1049+
1050+
// queue one message, and len should be 1
1051+
tx.send(()).await.unwrap();
1052+
assert_eq!(tx.len(), 1);
1053+
assert!(!tx.is_empty());
1054+
1055+
// queue a second message, and len should be 2
1056+
tx.send(()).await.unwrap();
1057+
assert_eq!(tx.len(), 2);
1058+
assert!(!tx.is_empty());
1059+
1060+
// consume a message, and len should be 1
1061+
let _ = rx.recv().await;
1062+
assert_eq!(tx.len(), 1);
1063+
assert!(!tx.is_empty());
1064+
1065+
// consume a final message, and len should be 0
1066+
let _ = rx.recv().await;
1067+
assert_eq!(tx.len(), 0);
1068+
assert!(tx.is_empty());
1069+
}
1070+
1071+
#[tokio::test]
1072+
async fn test_unbounded_tx_len() {
1073+
let (tx, mut rx) = mpsc::unbounded_channel();
1074+
1075+
// initially len should be 0
1076+
assert_eq!(tx.len(), 0);
1077+
assert!(tx.is_empty());
1078+
1079+
// queue one message, and len should be 1
1080+
tx.send(()).unwrap();
1081+
assert_eq!(tx.len(), 1);
1082+
assert!(!tx.is_empty());
1083+
1084+
// queue a second message, and len should be 2
1085+
tx.send(()).unwrap();
1086+
assert_eq!(tx.len(), 2);
1087+
assert!(!tx.is_empty());
1088+
1089+
// consume a message, and len should be 1
1090+
let _ = rx.recv().await;
1091+
assert_eq!(tx.len(), 1);
1092+
assert!(!tx.is_empty());
1093+
1094+
// consume a final message, and len should be 0
1095+
let _ = rx.recv().await;
1096+
assert_eq!(tx.len(), 0);
1097+
assert!(tx.is_empty());
1098+
}
1099+
10421100
#[tokio::test]
10431101
async fn test_rx_is_closed_when_calling_close_with_sender() {
10441102
// is_closed should return true after calling close but still has a sender

0 commit comments

Comments
 (0)