//! Example for adding a custom protocol to an iroh node.
//!
//! We are building a very simple custom protocol here, and make our iroh nodes speak this protocol
//! in addition to the built-in protocols (blobs, gossip, docs).
//!
//! Our custom protocol allows querying the blob store of other nodes for text matches. For
//! this, we keep a very primitive index of the UTF-8 text of our blobs.
//!
//! The example is contrived - we only use memory nodes, our database is a hashmap in a mutex,
//! and our queries just match if the query string appears as-is in a blob.
//! Nevertheless, this shows how powerful systems can be built with custom protocols by also using
//! the existing iroh protocols (blobs in this case).
//!
//! ## Usage
//!
//! In one terminal, run
//!
//!     cargo run --example custom-protocol --features=examples -- listen "hello-world" "foo-bar" "hello-moon"
//!
//! This spawns an iroh node with three blobs. It will print the node's node id.
//!
//! In another terminal, run
//!
//!     cargo run --example custom-protocol --features=examples -- query <node-id> hello
//!
//! Replace <node-id> with the node id from above. This will connect to the listening node with our
//! custom protocol and query for the string `hello`. The listening node will return a list of
//! blob hashes that contain `hello`. We will then download all these blobs with iroh-blobs,
//! and then print a list of the hashes with their content.
//!
//! For this example, this will print:
//!
//!     moobakc6gao3ufmk: hello moon
//!     25eyd35hbigiqc4n: hello world
//!
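//! ## Wire format
//!
//! For reference, this diagram summarizes the byte exchange implemented in the code below
//! (it is a summary of this example, not a separate spec); the numbers come straight from
//! the handlers: queries are capped at 64 bytes, and each hash is exactly 32 bytes.
//!
//! ```text
//! querying node                        listening node
//!   open_bi  ------------------------>  accept_bi
//!   query (UTF-8, at most 64 bytes) ->
//!   finish   ------------------------>
//!            <-- hash (32 bytes) * 0..n
//!            <------------------ finish
//! ```
//!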
//! That's it! Follow along in the code below, we added a bunch of comments to explain things.

use std::{
    collections::HashMap,
    sync::{Arc, Mutex},
};

use anyhow::Result;
use clap::Parser;
use futures_lite::future::Boxed as BoxedFuture;
use iroh::{
    endpoint::Connection,
    protocol::{ProtocolHandler, Router},
    Endpoint, NodeId,
};
use iroh_blobs::{net_protocol::Blobs, rpc::client::blobs::MemClient, Hash};
use tracing_subscriber::{prelude::*, EnvFilter};

#[derive(Debug, Parser)]
pub struct Cli {
    #[clap(subcommand)]
    command: Command,
}

#[derive(Debug, Parser)]
pub enum Command {
    /// Spawn a node in listening mode.
    Listen {
        /// Each text string will be imported as a blob and inserted into the search database.
        text: Vec<String>,
    },
    /// Query a remote node for data and print the results.
    Query {
        /// The node id of the node we want to query.
        node_id: NodeId,
        /// The text we want to match.
        query: String,
    },
}

/// Each custom protocol is identified by its ALPN string.
///
/// The ALPN, or application-layer protocol negotiation, is exchanged in the connection handshake,
/// and the connection is aborted unless both nodes pass the same bytestring.
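///
/// Note: the trailing `/0` in our ALPN acts as a version. If the wire format sketched
/// above ever changes, bumping this string makes old and new nodes fail the handshake
/// cleanly instead of misinterpreting each other's streams.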
const ALPN: &[u8] = b"iroh-example/text-search/0";

#[tokio::main]
async fn main() -> Result<()> {
    setup_logging();
    let args = Cli::parse();

    // Build an in-memory node. For production code, you'd usually want a persistent node instead.
    let endpoint = Endpoint::builder().bind().await?;
    let builder = Router::builder(endpoint);
    let blobs = Blobs::memory().build(builder.endpoint());
    let builder = builder.accept(iroh_blobs::ALPN, blobs.clone());
    let blobs_client = blobs.client();

    // Build our custom protocol handler. The `builder` exposes access to various subsystems in the
    // iroh node. In our case, we need a blobs client and the endpoint.
    let proto = BlobSearch::new(blobs_client.clone(), builder.endpoint().clone());

    // Add our protocol, identified by our ALPN, to the node, and spawn the node.
    let builder = builder.accept(ALPN, proto.clone());
    let node = builder.spawn().await?;

    match args.command {
        Command::Listen { text } => {
            let node_id = node.endpoint().node_id();
            println!("our node id: {node_id}");

            // Insert the text strings as blobs and index them.
            for text in text.into_iter() {
                proto.insert_and_index(text).await?;
            }

            // Wait for Ctrl-C to be pressed.
            tokio::signal::ctrl_c().await?;
        }
        Command::Query { node_id, query } => {
            // Query the remote node.
            // This will send the query over our custom protocol, read hashes on the reply stream,
            // and download each hash over iroh-blobs.
            let hashes = proto.query_remote(node_id, &query).await?;

            // Print out our query results.
            for hash in hashes {
                read_and_print(blobs_client, hash).await?;
            }
        }
    }

    node.shutdown().await?;
    Ok(())
}

#[derive(Debug, Clone)]
struct BlobSearch {
    /// Client for the local iroh-blobs store.
    blobs: MemClient,
    /// Endpoint used to dial remote nodes.
    endpoint: Endpoint,
    /// Our primitive in-memory index: full blob text mapped to the blob's hash.
    index: Arc<Mutex<HashMap<String, Hash>>>,
}

impl ProtocolHandler for BlobSearch {
    /// The `accept` method is called for each incoming connection for our ALPN.
    ///
    /// The returned future runs on a newly spawned tokio task, so it can run as long as
    /// the connection lasts.
    fn accept(&self, connection: Connection) -> BoxedFuture<Result<()>> {
        let this = self.clone();
        // We have to return a boxed future from the handler.
        Box::pin(async move {
            // We can get the remote's node id from the connection.
            let node_id = connection.remote_node_id()?;
            println!("accepted connection from {node_id}");

            // Our protocol is a simple request-response protocol, so we expect the
            // connecting peer to open a single bi-directional stream.
            let (mut send, mut recv) = connection.accept_bi().await?;

            // We read the query from the receive stream, while enforcing a max query length.
            let query_bytes = recv.read_to_end(64).await?;

            // Now, we can perform the actual query on our local database.
            let query = String::from_utf8(query_bytes)?;
            let hashes = this.query_local(&query);

            // We want to return a list of hashes. We do the simplest thing possible, and just send
            // one hash after the other. Because the hashes have a fixed size of 32 bytes, this is
            // very easy to parse on the other end.
            for hash in hashes {
                send.write_all(hash.as_bytes()).await?;
            }

            // By calling `finish` on the send stream we signal that we will not send anything
            // further, which makes the receive stream on the other end terminate.
            send.finish()?;

            // By calling stopped we wait until the remote iroh Endpoint has acknowledged
            // all data. This does not mean the remote application has received all data
            // from the Endpoint.
            send.stopped().await?;
            Ok(())
        })
    }
}

impl BlobSearch {
    /// Create a new protocol handler.
    pub fn new(blobs: MemClient, endpoint: Endpoint) -> Arc<Self> {
        Arc::new(Self {
            blobs,
            endpoint,
            index: Default::default(),
        })
    }

    /// Query a remote node, download all matching blobs and print the results.
    pub async fn query_remote(&self, node_id: NodeId, query: &str) -> Result<Vec<Hash>> {
        // Establish a connection to the remote node.
        // We use the default node discovery in iroh, so we can connect by node id without
        // providing further information.
        let conn = self.endpoint.connect(node_id, ALPN).await?;

        // Open a bi-directional stream on our connection.
        let (mut send, mut recv) = conn.open_bi().await?;

        // Send our query.
        send.write_all(query.as_bytes()).await?;

        // Finish the send stream, signalling that no further data will be sent.
        // This makes the `read_to_end` call on the accepting side terminate.
        send.finish()?;

        // By calling stopped we wait until the remote iroh Endpoint has acknowledged all
        // data. This does not mean the remote application has received all data from the
        // Endpoint.
        send.stopped().await?;

        // In this example, we simply collect all results into a vector.
        // For real protocols, you'd usually want to return a stream of results instead.
        let mut out = vec![];

        // The response is sent as a list of 32-byte long hashes.
        // We simply read one after the other into a byte buffer.
        let mut hash_bytes = [0u8; 32];
        loop {
            // Read 32 bytes from the stream.
            match recv.read_exact(&mut hash_bytes).await {
                // FinishedEarly means that the remote side did not send further data,
                // so in this case we break our loop.
                Err(iroh::endpoint::ReadExactError::FinishedEarly(_)) => break,
                // Other errors are connection errors, so we bail.
                Err(err) => return Err(err.into()),
                Ok(_) => {}
            };
            // Upcast the raw bytes to the `Hash` type.
            let hash = Hash::from_bytes(hash_bytes);
            // Download the content via iroh-blobs.
            self.blobs.download(hash, node_id.into()).await?.await?;
            // Add the blob to our local database.
            self.add_to_index(hash).await?;
            out.push(hash);
        }
        Ok(out)
    }

    /// Query the local database.
    ///
    /// Returns the list of hashes of blobs which contain `query` literally.
    pub fn query_local(&self, query: &str) -> Vec<Hash> {
        let db = self.index.lock().unwrap();
        db.iter()
            .filter_map(|(text, hash)| text.contains(query).then_some(*hash))
            .collect::<Vec<_>>()
    }

    /// Insert a text string into the database.
    ///
    /// This first imports the text as a blob into the iroh blob store, and then inserts a
    /// reference to that hash in our (primitive) text database.
    pub async fn insert_and_index(&self, text: String) -> Result<Hash> {
        let hash = self.blobs.add_bytes(text.into_bytes()).await?.hash;
        self.add_to_index(hash).await?;
        Ok(hash)
    }

    /// Index a blob which is already in our blob store.
    ///
    /// This only indexes complete blobs that are smaller than 1 MiB.
    ///
    /// Returns `true` if the blob was indexed.
    async fn add_to_index(&self, hash: Hash) -> Result<bool> {
        let mut reader = self.blobs.read(hash).await?;
        // Skip blobs larger than 1 MiB.
        if reader.size() > 1024 * 1024 {
            return Ok(false);
        }
        let bytes = reader.read_to_bytes().await?;
        match String::from_utf8(bytes.to_vec()) {
            Ok(text) => {
                let mut db = self.index.lock().unwrap();
                db.insert(text, hash);
                Ok(true)
            }
            // Blobs that are not valid UTF-8 are skipped.
            Err(_err) => Ok(false),
        }
    }
}

/// Read a blob from the local blob store and print it to STDOUT.
async fn read_and_print(blobs: &MemClient, hash: Hash) -> Result<()> {
    let content = blobs.read_to_bytes(hash).await?;
    let message = String::from_utf8(content.to_vec())?;
    println!("{}: {message}", hash.fmt_short());
    Ok(())
}

/// Set the RUST_LOG env var to one of {debug,info,warn} to see logging.
fn setup_logging() {
    tracing_subscriber::registry()
        .with(tracing_subscriber::fmt::layer().with_writer(std::io::stderr))
        .with(EnvFilter::from_default_env())
        .try_init()
        .ok();
}
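
// A minimal smoke test for the local index, sketched with the same APIs used in `main`
// above (an in-memory blobs store and its client). This is an illustrative sketch, not
// part of the original example; it assumes the in-memory blobs client works for purely
// local operations without a spawned router.
#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn query_local_finds_substrings() -> Result<()> {
        // Set up an in-memory node, exactly like `main` does.
        let endpoint = Endpoint::builder().bind().await?;
        let blobs = Blobs::memory().build(&endpoint);
        let proto = BlobSearch::new(blobs.client().clone(), endpoint);

        // Index two strings; only the first contains "hello".
        let hello = proto.insert_and_index("hello world".to_string()).await?;
        proto.insert_and_index("foo bar".to_string()).await?;

        // Queries match literal substrings of the indexed text.
        assert_eq!(proto.query_local("hello"), vec![hello]);
        assert!(proto.query_local("baz").is_empty());
        Ok(())
    }
}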