Skip to content

Commit 25525a8

Browse files
authored
Merge pull request #17 from n0-computer/pluggable-gc-with-exemptions
feat: Pluggable gc with exemptions
2 parents b9dbf2c + 24a1172 commit 25525a8

File tree

2 files changed

+138
-53
lines changed

2 files changed

+138
-53
lines changed

src/net_protocol.rs

+66-53
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,15 @@
44
#![allow(missing_docs)]
55

66
use std::{
7-
collections::BTreeMap,
7+
collections::{BTreeMap, BTreeSet},
88
fmt::Debug,
9+
ops::DerefMut,
910
sync::{Arc, OnceLock},
1011
};
1112

12-
use anyhow::{anyhow, Result};
13+
use anyhow::{anyhow, bail, Result};
1314
use futures_lite::future::Boxed as BoxedFuture;
15+
use futures_util::future::BoxFuture;
1416
use iroh::{endpoint::Connecting, protocol::ProtocolHandler, Endpoint, NodeAddr};
1517
use iroh_base::hash::{BlobFormat, Hash};
1618
use serde::{Deserialize, Serialize};
@@ -23,27 +25,32 @@ use crate::{
2325
Stats,
2426
},
2527
provider::EventSender,
28+
store::GcConfig,
2629
util::{
27-
local_pool::LocalPoolHandle,
30+
local_pool::{self, LocalPoolHandle},
2831
progress::{AsyncChannelProgressSender, ProgressSender},
2932
SetTagOption,
3033
},
3134
HashAndFormat, TempTag,
3235
};
3336

34-
// pub type ProtectCb = Box<dyn Fn(&mut BTreeSet<Hash>) -> BoxFuture<()> + Send + Sync>;
35-
//
36-
// #[derive(derive_more::Debug)]
37-
// enum GcState {
38-
// Initial(#[debug(skip)] Vec<ProtectCb>),
39-
// Started(#[allow(dead_code)] Option<local_pool::Run<()>>),
40-
// }
41-
//
42-
// impl Default for GcState {
43-
// fn default() -> Self {
44-
// Self::Initial(Vec::new())
45-
// }
46-
// }
37+
/// A callback that blobs calls with a mutable set, to which it adds the hashes that must not be garbage collected.
38+
pub type ProtectCb = Box<dyn Fn(&mut BTreeSet<Hash>) -> BoxFuture<()> + Send + Sync>;
39+
40+
/// The state of the gc loop.
41+
#[derive(derive_more::Debug)]
42+
enum GcState {
43+
// The gc loop is not yet running. Other protocols can still add protect callbacks.
44+
Initial(#[debug(skip)] Vec<ProtectCb>),
45+
// Gc loop is running. No more protect callbacks can be added.
46+
Started(#[allow(dead_code)] Option<local_pool::Run<()>>),
47+
}
48+
49+
impl Default for GcState {
50+
fn default() -> Self {
51+
Self::Initial(Vec::new())
52+
}
53+
}
4754

4855
#[derive(Debug)]
4956
pub struct Blobs<S> {
@@ -53,6 +60,7 @@ pub struct Blobs<S> {
5360
downloader: Downloader,
5461
batches: tokio::sync::Mutex<BlobBatches>,
5562
endpoint: Endpoint,
63+
gc_state: Arc<std::sync::Mutex<GcState>>,
5664
#[cfg(feature = "rpc")]
5765
pub(crate) rpc_handler: Arc<OnceLock<crate::rpc::RpcHandler>>,
5866
}
@@ -184,6 +192,7 @@ impl<S: crate::store::Store> Blobs<S> {
184192
downloader,
185193
endpoint,
186194
batches: Default::default(),
195+
gc_state: Default::default(),
187196
#[cfg(feature = "rpc")]
188197
rpc_handler: Arc::new(OnceLock::new()),
189198
}
@@ -205,43 +214,47 @@ impl<S: crate::store::Store> Blobs<S> {
205214
&self.endpoint
206215
}
207216

208-
// pub fn add_protected(&self, cb: ProtectCb) -> Result<()> {
209-
// let mut state = self.gc_state.lock().unwrap();
210-
// match &mut *state {
211-
// GcState::Initial(cbs) => {
212-
// cbs.push(cb);
213-
// }
214-
// GcState::Started(_) => {
215-
// anyhow::bail!("cannot add protected blobs after gc has started");
216-
// }
217-
// }
218-
// Ok(())
219-
// }
220-
//
221-
// pub fn start_gc(&self, config: GcConfig) -> Result<()> {
222-
// let mut state = self.gc_state.lock().unwrap();
223-
// let protected = match state.deref_mut() {
224-
// GcState::Initial(items) => std::mem::take(items),
225-
// GcState::Started(_) => anyhow::bail!("gc already started"),
226-
// };
227-
// let protected = Arc::new(protected);
228-
// let protected_cb = move || {
229-
// let protected = protected.clone();
230-
// async move {
231-
// let mut set = BTreeSet::new();
232-
// for cb in protected.iter() {
233-
// cb(&mut set).await;
234-
// }
235-
// set
236-
// }
237-
// };
238-
// let store = self.store.clone();
239-
// let run = self
240-
// .rt
241-
// .spawn(move || async move { store.gc_run(config, protected_cb).await });
242-
// *state = GcState::Started(Some(run));
243-
// Ok(())
244-
// }
217+
/// Add a callback that protects hashes from garbage collection by inserting them into the provided set.
218+
///
219+
/// This can only be called before the garbage collector has started; otherwise it returns an error.
220+
pub fn add_protected(&self, cb: ProtectCb) -> Result<()> {
221+
let mut state = self.gc_state.lock().unwrap();
222+
match &mut *state {
223+
GcState::Initial(cbs) => {
224+
cbs.push(cb);
225+
}
226+
GcState::Started(_) => {
227+
anyhow::bail!("cannot add protected blobs after gc has started");
228+
}
229+
}
230+
Ok(())
231+
}
232+
233+
/// Start garbage collection with the given settings. Returns an error if gc has already been started.
234+
pub fn start_gc(&self, config: GcConfig) -> Result<()> {
235+
let mut state = self.gc_state.lock().unwrap();
236+
let protected = match state.deref_mut() {
237+
GcState::Initial(items) => std::mem::take(items),
238+
GcState::Started(_) => bail!("gc already started"),
239+
};
240+
let protected = Arc::new(protected);
241+
let protected_cb = move || {
242+
let protected = protected.clone();
243+
async move {
244+
let mut set = BTreeSet::new();
245+
for cb in protected.iter() {
246+
cb(&mut set).await;
247+
}
248+
set
249+
}
250+
};
251+
let store = self.store.clone();
252+
let run = self
253+
.rt
254+
.spawn(move || async move { store.gc_run(config, protected_cb).await });
255+
*state = GcState::Started(Some(run));
256+
Ok(())
257+
}
245258

246259
pub(crate) async fn batches(&self) -> tokio::sync::MutexGuard<'_, BlobBatches> {
247260
self.batches.lock().await

tests/blobs.rs

+72
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
#![cfg(feature = "net_protocol")]
2+
use std::{
3+
sync::{Arc, Mutex},
4+
time::Duration,
5+
};
6+
7+
use iroh::Endpoint;
8+
use iroh_blobs::{net_protocol::Blobs, store::GcConfig, util::local_pool::LocalPool};
9+
use testresult::TestResult;
10+
11+
#[tokio::test]
12+
async fn blobs_gc_smoke() -> TestResult<()> {
13+
let pool = LocalPool::default();
14+
let endpoint = Endpoint::builder().bind().await?;
15+
let blobs = Blobs::memory().build(pool.handle(), &endpoint);
16+
let client = blobs.clone().client();
17+
blobs.start_gc(GcConfig {
18+
period: Duration::from_millis(1),
19+
done_callback: None,
20+
})?;
21+
let h1 = client.add_bytes(b"test".to_vec()).await?;
22+
tokio::time::sleep(Duration::from_millis(100)).await;
23+
assert!(client.has(h1.hash).await?);
24+
client.tags().delete(h1.tag).await?;
25+
tokio::time::sleep(Duration::from_millis(100)).await;
26+
assert!(!client.has(h1.hash).await?);
27+
Ok(())
28+
}
29+
30+
#[tokio::test]
31+
async fn blobs_gc_protected() -> TestResult<()> {
32+
let pool = LocalPool::default();
33+
let endpoint = Endpoint::builder().bind().await?;
34+
let blobs = Blobs::memory().build(pool.handle(), &endpoint);
35+
let client: iroh_blobs::rpc::client::blobs::Client<
36+
quic_rpc::transport::flume::FlumeConnector<
37+
iroh_blobs::rpc::proto::Response,
38+
iroh_blobs::rpc::proto::Request,
39+
>,
40+
> = blobs.clone().client();
41+
let h1 = client.add_bytes(b"test".to_vec()).await?;
42+
let protected = Arc::new(Mutex::new(Vec::new()));
43+
blobs.add_protected(Box::new({
44+
let protected = protected.clone();
45+
move |x| {
46+
let protected = protected.clone();
47+
Box::pin(async move {
48+
let protected = protected.lock().unwrap();
49+
for h in protected.as_slice() {
50+
x.insert(*h);
51+
}
52+
})
53+
}
54+
}))?;
55+
blobs.start_gc(GcConfig {
56+
period: Duration::from_millis(1),
57+
done_callback: None,
58+
})?;
59+
tokio::time::sleep(Duration::from_millis(100)).await;
60+
// protected from gc due to tag
61+
assert!(client.has(h1.hash).await?);
62+
protected.lock().unwrap().push(h1.hash);
63+
client.tags().delete(h1.tag).await?;
64+
tokio::time::sleep(Duration::from_millis(100)).await;
65+
// protected from gc due to being in protected set
66+
assert!(client.has(h1.hash).await?);
67+
protected.lock().unwrap().clear();
68+
tokio::time::sleep(Duration::from_millis(100)).await;
69+
// not protected, must be gone
70+
assert!(!client.has(h1.hash).await?);
71+
Ok(())
72+
}

0 commit comments

Comments
 (0)