forked from rs-ipfs/rust-ipfs
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathkademlia.rs
203 lines (169 loc) · 7.42 KB
/
kademlia.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
use cid::{Cid, Codec};
use ipfs::{p2p::MultiaddrWithPeerId, Block, Node};
use libp2p::{kad::Quorum, multiaddr::Protocol, Multiaddr};
use multihash::Sha2_256;
use tokio::time::timeout;
use std::{convert::TryInto, time::Duration};
mod common;
use common::{interop::ForeignNode, spawn_nodes, Topology};
fn strip_peer_id(addr: Multiaddr) -> Multiaddr {
let MultiaddrWithPeerId { multiaddr, .. } = addr.try_into().unwrap();
multiaddr.into()
}
/// Check if `Ipfs::find_peer` works without DHT involvement.
#[tokio::test(max_threads = 1)]
async fn find_peer_local() {
    let nodes = spawn_nodes(2, Topology::None).await;
    nodes[0].connect(nodes[1].addrs[0].clone()).await.unwrap();

    // while nodes[0] is connected to nodes[1], they know each
    // other's addresses and can find them without using the DHT
    let mut discovered = nodes[0].find_peer(nodes[1].id.clone()).await.unwrap();

    // re-attach the peer id so the addresses are comparable with
    // the full ones that nodes[1] advertises
    for discovered_addr in discovered.iter_mut() {
        discovered_addr.push(Protocol::P2p(nodes[1].id.clone().into()));
        assert!(nodes[1].addrs.contains(discovered_addr));
    }
}
// starts the specified number of rust IPFS nodes connected in a chain.
#[cfg(all(not(feature = "test_go_interop"), not(feature = "test_js_interop")))]
async fn spawn_bootstrapped_nodes(n: usize) -> (Vec<Node>, Option<ForeignNode>) {
    // fire up `n` nodes
    let nodes = spawn_nodes(n, Topology::None).await;

    // register the nodes' addresses so they can bootstrap against
    // one another in a chain; node 0 is only aware of the existence
    // of node 1 and so on; bootstrap them eagerly/quickly, so that
    // they don't have a chance to form the full picture in the DHT
    for (i, node) in nodes.iter().enumerate() {
        // each node learns about its successor; the last node in the chain
        // has none, so it is pointed at its predecessor instead — it still
        // needs to know some address in order to bootstrap
        let neighbour = if i + 1 < n { &nodes[i + 1] } else { &nodes[n - 2] };
        node.add_peer(neighbour.id.clone(), neighbour.addrs[0].clone())
            .await
            .unwrap();
        node.bootstrap().await.unwrap();
    }

    // make sure that the nodes are not actively connected to each other
    // and that we are actually going to be testing the DHT here
    for node in &nodes {
        assert!([1usize, 2].contains(&node.peers().await.unwrap().len()));
    }

    (nodes, None)
}
// most of the setup is the same as in the not(feature = "test_X_interop") case, with
// the addition of a foreign node in the middle of the chain; the first half of the chain
// learns about the next peer, the foreign node being the last one, and the second half
// learns about the previous peer, the foreign node being the first one; a visualization:
// r[0] > r[1] > .. > foreign < .. < r[n - 3] < r[n - 2]
#[cfg(any(feature = "test_go_interop", feature = "test_js_interop"))]
async fn spawn_bootstrapped_nodes(n: usize) -> (Vec<Node>, Option<ForeignNode>) {
// start a foreign IPFS node
let foreign_node = ForeignNode::new();
// exclude one node to make room for the intermediary foreign node
let nodes = spawn_nodes(n - 1, Topology::None).await;
// skip the last index again, as there is a foreign node without one bound to it
for i in 0..(n - 1) {
// the two rust nodes adjacent to the middle of the chain are the ones
// that get told about the foreign node (see the visualization above)
let (next_id, next_addr) = if i == n / 2 - 1 || i == n / 2 {
println!("telling rust node {} about the foreign node", i);
(foreign_node.id.clone(), foreign_node.addrs[0].clone())
} else if i < n / 2 {
// first half of the chain: point at the next rust node
println!("telling rust node {} about rust node {}", i, i + 1);
(nodes[i + 1].id.clone(), nodes[i + 1].addrs[0].clone())
} else {
// second half of the chain: point at the previous rust node
println!("telling rust node {} about rust node {}", i, i - 1);
(nodes[i - 1].id.clone(), nodes[i - 1].addrs[0].clone())
};
nodes[i].add_peer(next_id, next_addr).await.unwrap();
nodes[i].bootstrap().await.unwrap();
}
// in this case we can't make sure that all the nodes only have 1 or 2 peers but it's not a big
// deal, since in reality this kind of extreme conditions are unlikely and we already test that
// in the pure-rust setup
(nodes, Some(foreign_node))
}
/// Check if `Ipfs::find_peer` works using DHT.
#[tokio::test(max_threads = 1)]
async fn dht_find_peer() {
    // works for numbers >=2, though 2 would essentially just
    // be the same as find_peer_local, so it should be higher
    const CHAIN_LEN: usize = 10;
    let (nodes, foreign_node) = spawn_bootstrapped_nodes(CHAIN_LEN).await;
    // in the interop case the foreign node occupies one slot of the chain,
    // so the last rust node sits one index earlier
    let last_index = if foreign_node.is_none() {
        CHAIN_LEN - 1
    } else {
        CHAIN_LEN - 2
    };

    // node 0 now tries to find the address of the very last node in the
    // chain; the chain should be long enough for it not to automatically
    // be connected to it after the bootstrap
    let target = &nodes[last_index];
    let found_addrs = nodes[0].find_peer(target.id.clone()).await.unwrap();

    let expected = strip_peer_id(target.addrs[0].clone());
    assert_eq!(found_addrs, vec![expected]);
}
/// `Ipfs::get_closest_peers` run from node 0 should discover every
/// other node of the bootstrapped chain.
#[tokio::test(max_threads = 1)]
async fn dht_get_closest_peers() {
    const CHAIN_LEN: usize = 10;
    let (nodes, _foreign_node) = spawn_bootstrapped_nodes(CHAIN_LEN).await;

    let closest = nodes[0]
        .get_closest_peers(nodes[0].id.clone())
        .await
        .unwrap();
    // all chain members except node 0 itself
    assert_eq!(closest.len(), CHAIN_LEN - 1);
}
/// Check that a well-known, widely provided block can be fetched through
/// the public DHT after bootstrapping against the default bootstrappers.
#[ignore = "targets an actual bootstrapper, so random failures can happen"]
#[tokio::test(max_threads = 1)]
async fn dht_popular_content_discovery() {
    let peer = Node::new("a").await;
    peer.restore_bootstrappers().await.unwrap();

    // the Cid of the IPFS logo
    let cid: Cid = "bafkreicncneocapbypwwe3gl47bzvr3pkpxmmobzn7zr2iaz67df4kjeiq"
        .parse()
        .unwrap();

    // BUGFIX: the previous `assert!(timeout(..).await.is_ok())` only checked
    // that the timeout didn't elapse — `get_block` resolving quickly to an
    // `Err` still produced an `Ok` outer result and the test passed. Unwrap
    // both layers so a failed fetch fails the test too.
    timeout(Duration::from_secs(10), peer.get_block(&cid))
        .await
        .expect("get_block timed out")
        .unwrap();
}
/// Check if Ipfs::{get_providers, provide} does its job.
#[tokio::test(max_threads = 1)]
async fn dht_providing() {
    const CHAIN_LEN: usize = 10;
    let (nodes, foreign_node) = spawn_bootstrapped_nodes(CHAIN_LEN).await;
    // the foreign node, when present, takes one slot of the chain
    let last_index = if foreign_node.is_none() {
        CHAIN_LEN - 1
    } else {
        CHAIN_LEN - 2
    };
    let last_node = &nodes[last_index];

    // the last node puts a block in order to have something to provide
    let data = b"hello block\n".to_vec().into_boxed_slice();
    let cid = Cid::new_v1(Codec::Raw, Sha2_256::digest(&data));
    let block = Block {
        cid: cid.clone(),
        data,
    };
    last_node.put_block(block).await.unwrap();

    // the last node then provides the Cid
    last_node.provide(cid.clone()).await.unwrap();

    // and the first node should be able to learn that the last one provides it
    let providers = nodes[0].get_providers(cid).await.unwrap();
    assert!(providers.contains(&last_node.id.clone()));
}
/// Check if Ipfs::{get, put} does its job.
#[tokio::test(max_threads = 1)]
async fn dht_get_put() {
    const CHAIN_LEN: usize = 10;
    let (nodes, foreign_node) = spawn_bootstrapped_nodes(CHAIN_LEN).await;
    // the foreign node, when present, takes one slot of the chain
    let last_index = if foreign_node.is_none() {
        CHAIN_LEN - 1
    } else {
        CHAIN_LEN - 2
    };

    let key = b"key".to_vec();
    let value = b"value".to_vec();
    let quorum = Quorum::One;

    // the last node puts a key+value record
    nodes[last_index]
        .dht_put(key.clone(), value.clone(), quorum)
        .await
        .unwrap();

    // and the first node should be able to get it
    let records = nodes[0].dht_get(key, quorum).await.unwrap();
    assert_eq!(records, vec![value]);
}