Skip to content

Commit c1e6971

Browse files
authored
Merge pull request #9334 from drmingdrmer/50-drop-watcher
feat(meta/watch): remove stream when a watch client is dropped
2 parents 482a40e + 47be88d commit c1e6971

File tree

6 files changed

+179
-73
lines changed

6 files changed

+179
-73
lines changed

src/meta/service/src/api/grpc/grpc_service.rs

+14-3
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ use crate::version::from_digit_ver;
5858
use crate::version::to_digit_ver;
5959
use crate::version::METASRV_SEMVER;
6060
use crate::version::MIN_METACLI_SEMVER;
61+
use crate::watcher::WatchStream;
6162

6263
pub struct MetaServiceImpl {
6364
token: GrpcToken,
@@ -239,10 +240,20 @@ impl MetaService for MetaServiceImpl {
239240
) -> Result<Response<Self::WatchStream>, Status> {
240241
let (tx, rx) = mpsc::channel(4);
241242

242-
self.meta_node.add_watcher(request.into_inner(), tx);
243+
let mn = &self.meta_node;
243244

244-
let output_stream = tokio_stream::wrappers::ReceiverStream::new(rx);
245-
Ok(Response::new(Box::pin(output_stream) as Self::WatchStream))
245+
let add_res = mn.add_watcher(request.into_inner(), tx).await;
246+
247+
match add_res {
248+
Ok(watcher) => {
249+
let stream = WatchStream::new(rx, watcher, mn.dispatcher_handle.clone());
250+
Ok(Response::new(Box::pin(stream) as Self::WatchStream))
251+
}
252+
Err(e) => {
253+
// TODO: test error return.
254+
Err(Status::invalid_argument(e))
255+
}
256+
}
246257
}
247258

248259
async fn transaction(

src/meta/service/src/meta_service/raftmeta.rs

+19-6
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ use common_meta_types::MetaOperationError;
6161
use common_meta_types::MetaStartupError;
6262
use common_meta_types::Node;
6363
use common_meta_types::NodeId;
64+
use futures::channel::oneshot;
6465
use itertools::Itertools;
6566
use openraft::Config;
6667
use openraft::LogId;
@@ -86,7 +87,7 @@ use crate::store::RaftStoreBare;
8687
use crate::watcher::DispatcherSender;
8788
use crate::watcher::EventDispatcher;
8889
use crate::watcher::EventDispatcherHandle;
89-
use crate::watcher::WatchEvent;
90+
use crate::watcher::Watcher;
9091
use crate::watcher::WatcherSender;
9192
use crate::Opened;
9293

@@ -1085,10 +1086,22 @@ impl MetaNode {
10851086
Ok(resp)
10861087
}
10871088

1088-
pub(crate) fn add_watcher(&self, request: WatchRequest, tx: WatcherSender) {
1089-
let _ = self
1090-
.dispatcher_handle
1091-
.tx
1092-
.send(WatchEvent::AddWatcher((request, tx)));
1089+
pub(crate) async fn add_watcher(
1090+
&self,
1091+
request: WatchRequest,
1092+
tx: WatcherSender,
1093+
) -> Result<Watcher, &'static str> {
1094+
let (resp_tx, resp_rx) = oneshot::channel();
1095+
1096+
self.dispatcher_handle.request(|d: &mut EventDispatcher| {
1097+
let add_res = d.add_watcher(request, tx);
1098+
let _ = resp_tx.send(add_res);
1099+
});
1100+
1101+
let recv_res = resp_rx.await;
1102+
match recv_res {
1103+
Ok(add_res) => add_res,
1104+
Err(_e) => Err("dispatcher closed"),
1105+
}
10931106
}
10941107
}

src/meta/service/src/watcher/mod.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@ mod watcher_stream;
1818
pub(crate) use watcher_manager::DispatcherSender;
1919
pub(crate) use watcher_manager::EventDispatcher;
2020
pub use watcher_manager::EventDispatcherHandle;
21-
pub(crate) use watcher_manager::WatchEvent;
2221
pub use watcher_manager::WatcherId;
2322
pub use watcher_manager::WatcherSender;
24-
pub use watcher_stream::WatcherInfo;
25-
pub use watcher_stream::WatcherStream;
23+
pub use watcher_stream::WatchStream;
24+
pub use watcher_stream::WatchStreamHandle;
25+
pub use watcher_stream::Watcher;

src/meta/service/src/watcher/watcher_manager.rs

+33-30
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ use core::ops::Range;
1515

1616
use common_base::base::tokio;
1717
use common_base::base::tokio::sync::mpsc;
18-
use common_base::base::tokio::sync::mpsc::Sender;
1918
use common_base::base::tokio::sync::oneshot;
2019
use common_base::rangemap::RangeMap;
2120
use common_base::rangemap::RangeMapKey;
@@ -31,23 +30,22 @@ use tonic::Status;
3130
use tracing::info;
3231
use tracing::warn;
3332

34-
use super::WatcherStream;
33+
use super::WatchStreamHandle;
3534
use crate::metrics::network_metrics;
3635
use crate::metrics::server_metrics;
36+
use crate::watcher::Watcher;
3737

3838
pub type WatcherId = i64;
3939

4040
/// A sender for dispatcher to send event to interested watchers.
41-
pub type WatcherSender = Sender<Result<WatchResponse, Status>>;
41+
pub type WatcherSender = mpsc::Sender<Result<WatchResponse, Status>>;
4242

4343
/// A sender for event source, such as raft state machine, to send event to [`EventDispatcher`].
4444
#[derive(Clone, Debug)]
4545
pub(crate) struct DispatcherSender(pub(crate) mpsc::UnboundedSender<WatchEvent>);
4646

47+
/// An event sent to EventDispatcher.
4748
pub(crate) enum WatchEvent {
48-
/// Inform the dispatcher to add a new watcher.
49-
AddWatcher((WatchRequest, WatcherSender)),
50-
5149
/// Submit a kv change event to dispatcher
5250
KVChange(Change<Vec<u8>, String>),
5351

@@ -59,6 +57,7 @@ pub(crate) enum WatchEvent {
5957
},
6058
}
6159

60+
#[derive(Clone, Debug)]
6261
pub struct EventDispatcherHandle {
6362
/// For sending event or command to the dispatcher.
6463
pub(crate) tx: mpsc::UnboundedSender<WatchEvent>,
@@ -94,7 +93,7 @@ pub struct EventDispatcher {
9493
event_rx: mpsc::UnboundedReceiver<WatchEvent>,
9594

9695
/// map range to WatcherId
97-
watcher_range_map: RangeMap<String, WatcherId, WatcherStream>,
96+
watcher_range_map: RangeMap<String, WatcherId, WatchStreamHandle>,
9897

9998
current_watcher_id: WatcherId,
10099
}
@@ -120,9 +119,6 @@ impl EventDispatcher {
120119
loop {
121120
if let Some(event) = self.event_rx.recv().await {
122121
match event {
123-
WatchEvent::AddWatcher((req, tx)) => {
124-
self.add_watcher(req, tx).await;
125-
}
126122
WatchEvent::KVChange(kv_change) => {
127123
self.dispatch_event(kv_change).await;
128124
}
@@ -178,57 +174,64 @@ impl EventDispatcher {
178174
watcher_id, err
179175
);
180176
remove_range_keys.push(RangeMapKey::new(
181-
stream.watcher.key.clone()..stream.watcher.key_end.clone(),
177+
stream.watcher.key_range.clone(),
182178
watcher_id,
183179
));
184180
};
185181
}
186182

187183
// TODO: when a watcher stream is dropped, send a event to remove the watcher explicitly
188184
for range_key in remove_range_keys {
189-
self.remove_watcher(range_key);
185+
self.remove_watcher(&range_key);
190186
}
191187
}
192188

193189
#[tracing::instrument(level = "debug", skip(self))]
194-
pub async fn add_watcher(&mut self, create: WatchRequest, tx: WatcherSender) {
195-
info!("create_watcher_stream: {:?}", create);
196-
197-
let range = match EventDispatcher::get_range_key(create.key.clone(), &create.key_end) {
190+
pub fn add_watcher(
191+
&mut self,
192+
create: WatchRequest,
193+
tx: WatcherSender,
194+
) -> Result<Watcher, &'static str> {
195+
info!("add_watcher: {:?}", create);
196+
197+
let range = match EventDispatcher::build_key_range(create.key.clone(), &create.key_end) {
198198
Ok(range) => range,
199-
Err(_) => return,
199+
Err(e) => return Err(e),
200200
};
201201

202202
self.current_watcher_id += 1;
203203
let watcher_id = self.current_watcher_id;
204204
let filter: FilterType = create.filter_type();
205205

206-
let watcher_stream = WatcherStream::new(
207-
watcher_id,
208-
filter,
209-
tx,
210-
range.start.clone(),
211-
range.end.clone(),
212-
);
206+
let watcher = Watcher::new(watcher_id, filter, range.clone());
207+
let stream_handle = WatchStreamHandle::new(watcher.clone(), tx);
213208

214209
self.watcher_range_map
215-
.insert(range, watcher_id, watcher_stream);
210+
.insert(range, watcher_id, stream_handle);
216211

217212
server_metrics::incr_watchers(1);
213+
214+
Ok(watcher)
218215
}
219216

220-
#[tracing::instrument(level = "debug", skip(self))]
221-
fn remove_watcher(&mut self, key: RangeMapKey<String, WatcherId>) {
222-
self.watcher_range_map.remove_by_key(&key);
217+
#[tracing::instrument(level = "debug", skip_all)]
218+
pub fn remove_watcher(&mut self, key: &RangeMapKey<String, WatcherId>) {
219+
info!("remove_watcher: {:?}", key);
220+
221+
self.watcher_range_map.remove_by_key(key);
223222

223+
// TODO: decrease it only when the key is actually removed
224224
server_metrics::incr_watchers(-1);
225225
}
226226

227-
fn get_range_key(key: String, key_end: &Option<String>) -> Result<Range<String>, bool> {
227+
fn build_key_range(
228+
key: String,
229+
key_end: &Option<String>,
230+
) -> Result<Range<String>, &'static str> {
228231
match key_end {
229232
Some(key_end) => {
230233
if &key > key_end {
231-
return Err(false);
234+
return Err("empty range");
232235
}
233236
Ok(key..key_end.to_string())
234237
}

src/meta/service/src/watcher/watcher_stream.rs

+85-23
Original file line numberDiff line numberDiff line change
@@ -12,48 +12,54 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::ops::Range;
16+
use std::pin::Pin;
17+
use std::task::Context;
18+
use std::task::Poll;
19+
1520
use common_base::base::tokio::sync::mpsc::error::SendError;
21+
use common_base::base::tokio::sync::mpsc::Receiver;
22+
use common_base::rangemap::RangeMapKey;
1623
use common_meta_types::protobuf::watch_request::FilterType;
1724
use common_meta_types::protobuf::WatchResponse;
25+
use futures::Stream;
1826
use tonic::Status;
1927

2028
use super::WatcherId;
2129
use super::WatcherSender;
30+
use crate::watcher::EventDispatcherHandle;
2231

2332
/// Attributes of a watcher that is interested in kv change events.
24-
pub struct WatcherInfo {
33+
#[derive(Clone, Debug)]
34+
pub struct Watcher {
2535
pub id: WatcherId,
2636

37+
/// Defines how to filter keys with `key_range`.
2738
pub filter_type: FilterType,
2839

29-
pub key: String,
30-
31-
pub key_end: String,
40+
/// The range of key this watcher is interested in.
41+
pub key_range: Range<String>,
3242
}
3343

34-
pub struct WatcherStream {
35-
pub watcher: WatcherInfo,
44+
impl Watcher {
45+
pub fn new(id: WatcherId, filter_type: FilterType, key_range: Range<String>) -> Self {
46+
Self {
47+
id,
48+
filter_type,
49+
key_range,
50+
}
51+
}
52+
}
3653

54+
/// A handle of a watching stream, for feeding messages to the stream.
55+
pub struct WatchStreamHandle {
56+
pub watcher: Watcher,
3757
tx: WatcherSender,
3858
}
3959

40-
impl WatcherStream {
41-
pub fn new(
42-
id: WatcherId,
43-
filter_type: FilterType,
44-
tx: WatcherSender,
45-
key: String,
46-
key_end: String,
47-
) -> Self {
48-
WatcherStream {
49-
watcher: WatcherInfo {
50-
id,
51-
filter_type,
52-
key,
53-
key_end,
54-
},
55-
tx,
56-
}
60+
impl WatchStreamHandle {
61+
pub fn new(watcher: Watcher, tx: WatcherSender) -> Self {
62+
WatchStreamHandle { watcher, tx }
5763
}
5864

5965
pub async fn send(
@@ -63,3 +69,59 @@ impl WatcherStream {
6369
self.tx.send(Ok(resp)).await
6470
}
6571
}
72+
73+
/// A wrapper around [`tokio::sync::mpsc::Receiver`] that implements [`Stream`].
74+
#[derive(Debug)]
75+
pub struct WatchStream<T> {
76+
inner: Receiver<T>,
77+
watcher: Watcher,
78+
dispatcher: EventDispatcherHandle,
79+
}
80+
81+
impl<T> Drop for WatchStream<T> {
82+
fn drop(&mut self) {
83+
let rng = self.watcher.key_range.clone();
84+
let id = self.watcher.id;
85+
86+
self.dispatcher.request(move |d| {
87+
let key = RangeMapKey::new(rng, id);
88+
d.remove_watcher(&key)
89+
})
90+
}
91+
}
92+
93+
impl<T> WatchStream<T> {
94+
/// Create a new `WatcherStream`.
95+
pub fn new(rx: Receiver<T>, watcher: Watcher, dispatcher: EventDispatcherHandle) -> Self {
96+
Self {
97+
inner: rx,
98+
watcher,
99+
dispatcher,
100+
}
101+
}
102+
103+
/// Closes the receiving half of a channel without dropping it.
104+
pub fn close(&mut self) {
105+
self.inner.close()
106+
}
107+
}
108+
109+
impl<T> Stream for WatchStream<T> {
110+
type Item = T;
111+
112+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
113+
self.inner.poll_recv(cx)
114+
}
115+
}
116+
117+
impl<T> AsRef<Receiver<T>> for WatchStream<T> {
118+
fn as_ref(&self) -> &Receiver<T> {
119+
&self.inner
120+
}
121+
}
122+
123+
impl<T> AsMut<Receiver<T>> for WatchStream<T> {
124+
fn as_mut(&mut self) -> &mut Receiver<T> {
125+
&mut self.inner
126+
}
127+
}

0 commit comments

Comments
 (0)