Skip to content

Commit 7979543

Browse files
Adam Lesinskicramertj
authored andcommitted
Add Drop impl to futures::channel::mpsc::UnboundedReceiver
Pre-0.3.2, clients could expect to drop an UnboundedReceiver and observe that incoming messages are dropped and the channel closed. This change re-introduces this behavior for UnboundedReceiver and adds tests to prevent regressions. Note that a drop impl already exists for BoundedReceiver.
1 parent 09d7001 commit 7979543

File tree

2 files changed

+55
-0
lines changed

2 files changed

+55
-0
lines changed

futures-channel/src/mpsc/mod.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1211,6 +1211,18 @@ impl<T> Stream for UnboundedReceiver<T> {
12111211
}
12121212
}
12131213

1214+
impl<T> Drop for UnboundedReceiver<T> {
1215+
fn drop(&mut self) {
1216+
// Drain the channel of all pending messages
1217+
self.close();
1218+
if self.inner.is_some() {
1219+
while let Poll::Ready(Some(..)) = self.next_message() {
1220+
// ...
1221+
}
1222+
}
1223+
}
1224+
}
1225+
12141226
/*
12151227
*
12161228
* ===== impl Inner =====

futures-channel/tests/mpsc-close.rs

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use futures::channel::mpsc;
22
use futures::executor::block_on;
33
use futures::sink::SinkExt;
44
use futures::stream::StreamExt;
5+
use std::sync::Arc;
56
use std::thread;
67

78
#[test]
@@ -99,3 +100,45 @@ fn multiple_senders_close_channel() {
99100
assert_eq!(block_on(rx.next()), None);
100101
}
101102
}
103+
104+
#[test]
105+
fn single_receiver_drop_closes_channel_and_drains() {
106+
{
107+
let ref_count = Arc::new(0);
108+
let weak_ref = Arc::downgrade(&ref_count);
109+
110+
let (sender, receiver) = mpsc::unbounded();
111+
sender.unbounded_send(ref_count).expect("failed to send");
112+
113+
// Verify that the sent message is still live.
114+
assert!(weak_ref.upgrade().is_some());
115+
116+
drop(receiver);
117+
118+
// The sender should know the channel is closed.
119+
assert!(sender.is_closed());
120+
121+
// Verify that the sent message has been dropped.
122+
assert!(weak_ref.upgrade().is_none());
123+
}
124+
125+
{
126+
let ref_count = Arc::new(0);
127+
let weak_ref = Arc::downgrade(&ref_count);
128+
129+
let (mut sender, receiver) = mpsc::channel(1);
130+
sender.try_send(ref_count).expect("failed to send");
131+
132+
// Verify that the sent message is still live.
133+
assert!(weak_ref.upgrade().is_some());
134+
135+
drop(receiver);
136+
137+
// The sender should know the channel is closed.
138+
assert!(sender.is_closed());
139+
140+
// Verify that the sent message has been dropped.
141+
assert!(weak_ref.upgrade().is_none());
142+
assert!(sender.is_closed());
143+
}
144+
}

0 commit comments

Comments
 (0)