Skip to content

Commit b37c1b4

Browse files
committed
Add example how one would implement expiring tags
1 parent 25af32b commit b37c1b4

File tree

1 file changed

+155
-0
lines changed

1 file changed

+155
-0
lines changed

examples/expiring-tags.rs

Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
use std::time::{Duration, SystemTime};
2+
3+
use chrono::Utc;
4+
use futures_lite::StreamExt;
5+
use iroh::endpoint;
6+
use iroh_blobs::store::GcConfig;
7+
use iroh_blobs::{hashseq::HashSeq, BlobFormat, HashAndFormat};
8+
use iroh_blobs::Hash;
9+
10+
use iroh_blobs::rpc::client::blobs::MemClient as BlobsClient;
11+
use tokio::signal::ctrl_c;
12+
13+
/// Using an iroh rpc client, create a tag that is marked to expire at `expiry` for all the given hashes.
14+
///
15+
/// The tag name will be `prefix`- followed by the expiry date in iso8601 format (e.g. `expiry-2025-01-01T12:00:00Z`).
16+
///
17+
async fn create_expiring_tag(
18+
iroh: &BlobsClient,
19+
hashes: &[Hash],
20+
prefix: &str,
21+
expiry: SystemTime,
22+
) -> anyhow::Result<()> {
23+
let expiry = chrono::DateTime::<chrono::Utc>::from(expiry);
24+
let expiry = expiry.to_rfc3339_opts(chrono::SecondsFormat::Secs, true);
25+
let tagname = format!("{}-{}", prefix, expiry);
26+
let batch = iroh.batch().await?;
27+
let tt = if hashes.is_empty() {
28+
return Ok(());
29+
} else if hashes.len() == 1 {
30+
let hash = hashes[0];
31+
batch.temp_tag(HashAndFormat::raw(hash)).await?
32+
} else {
33+
let hs = hashes.into_iter().copied().collect::<HashSeq>();
34+
batch
35+
.add_bytes_with_opts(hs.into_inner(), BlobFormat::HashSeq)
36+
.await?
37+
};
38+
batch.persist_to(tt, tagname.as_str().into()).await?;
39+
println!("Created tag {}", tagname);
40+
Ok(())
41+
}
42+
43+
async fn delete_expired_tags(iroh: &BlobsClient, prefix: &str) -> anyhow::Result<()> {
44+
let mut tags = iroh.tags().list().await?;
45+
let prefix = format!("{}-", prefix);
46+
let now = chrono::Utc::now();
47+
let mut to_delete = Vec::new();
48+
while let Some(tag) = tags.next().await {
49+
let tag = tag?.name;
50+
if let Some(rest) = tag.0.strip_prefix(prefix.as_bytes()) {
51+
let Ok(expiry) = std::str::from_utf8(rest) else {
52+
tracing::warn!("Tag {} does have non utf8 expiry", tag);
53+
continue;
54+
};
55+
let Ok(expiry) = chrono::DateTime::parse_from_rfc3339(expiry) else {
56+
tracing::warn!("Tag {} does have invalid expiry date", tag);
57+
continue;
58+
};
59+
let expiry = expiry.with_timezone(&Utc);
60+
if expiry < now {
61+
to_delete.push(tag);
62+
}
63+
}
64+
}
65+
for tag in to_delete {
66+
println!("Deleting expired tag {}", tag);
67+
iroh.tags().delete(tag).await?;
68+
}
69+
Ok(())
70+
}
71+
72+
async fn print_tags_task(blobs: BlobsClient) -> anyhow::Result<()> {
73+
loop {
74+
let now = chrono::Utc::now();
75+
let mut tags = blobs.tags().list().await?;
76+
println!("Tags at {}:\n", now);
77+
while let Some(tag) = tags.next().await {
78+
let tag = tag?;
79+
println!(" {:?}", tag);
80+
}
81+
println!();
82+
tokio::time::sleep(Duration::from_secs(5)).await;
83+
}
84+
}
85+
86+
async fn print_blobs_task(blobs: BlobsClient) -> anyhow::Result<()> {
87+
loop {
88+
let now = chrono::Utc::now();
89+
let mut blobs = blobs.list().await?;
90+
println!("Blobs at {}:\n", now);
91+
while let Some(info) = blobs.next().await {
92+
println!(" {:?}", info?);
93+
}
94+
println!();
95+
tokio::time::sleep(Duration::from_secs(5)).await;
96+
}
97+
}
98+
99+
async fn delete_expired_tags_task(blobs: BlobsClient, prefix: &str, ) -> anyhow::Result<()> {
100+
loop {
101+
delete_expired_tags(&blobs, prefix).await?;
102+
tokio::time::sleep(Duration::from_secs(5)).await;
103+
}
104+
}
105+
106+
#[tokio::main]
107+
async fn main() -> anyhow::Result<()> {
108+
tracing_subscriber::fmt::init();
109+
let endpoint = endpoint::Endpoint::builder().bind().await?;
110+
let store = iroh_blobs::store::fs::Store::load("blobs").await?;
111+
let blobs = iroh_blobs::net_protocol::Blobs::builder(store)
112+
.build(&endpoint);
113+
// enable gc with a short period
114+
blobs.start_gc(GcConfig {
115+
period: Duration::from_secs(1),
116+
done_callback: None,
117+
})?;
118+
// create a router and add blobs as a service
119+
//
120+
// You can skip this if you don't want to serve the data over the network.
121+
let router = iroh::protocol::Router::builder(endpoint)
122+
.accept(iroh_blobs::ALPN, blobs.clone())
123+
.spawn().await?;
124+
125+
// setup: add some data and tag it
126+
{
127+
// add several blobs and tag them with an expiry date 10 seconds in the future
128+
let batch = blobs.client().batch().await?;
129+
let a = batch.add_bytes("blob 1".as_bytes()).await?;
130+
let b = batch.add_bytes("blob 2".as_bytes()).await?;
131+
let expires_at = SystemTime::now().checked_add(Duration::from_secs(10)).unwrap();
132+
create_expiring_tag(blobs.client(), &[*a.hash(), *b.hash()], "expiring", expires_at).await?;
133+
134+
// add a single blob and tag it with an expiry date 60 seconds in the future
135+
let c = batch.add_bytes("blob 3".as_bytes()).await?;
136+
let expires_at = SystemTime::now().checked_add(Duration::from_secs(60)).unwrap();
137+
create_expiring_tag(blobs.client(), &[*c.hash()], "expiring", expires_at).await?;
138+
// batch goes out of scope, so data is only protected by the tags we created
139+
}
140+
let client = blobs.client().clone();
141+
142+
// delete expired tags every 5 seconds
143+
let check_task = tokio::spawn(delete_expired_tags_task(client.clone(), "expiring"));
144+
// print tags every 5 seconds
145+
let print_tags_task = tokio::spawn(print_tags_task(client.clone()));
146+
// print blobs every 5 seconds
147+
let print_blobs_task = tokio::spawn(print_blobs_task(client));
148+
149+
ctrl_c().await?;
150+
router.shutdown().await?;
151+
check_task.abort();
152+
print_tags_task.abort();
153+
print_blobs_task.abort();
154+
Ok(())
155+
}

0 commit comments

Comments
 (0)