Skip to content
This repository was archived by the owner on Oct 23, 2022. It is now read-only.

fix restore_bootstrappers doesn't enable content discovery #406

Merged
merged 16 commits into from
Oct 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Next

* fix: restore_bootstrappers doesn't enable content discovery [#406]

[#406]: https://github.com/rs-ipfs/rust-ipfs/pull/406

# 0.2.0

First real release, with big changes and feature improvements. Started tracking
Expand Down
67 changes: 48 additions & 19 deletions examples/fetch_and_cat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,27 @@ async fn main() {
// The other connecting or connected peer must be providing the requested CID or this will hang
// forever.

let (path, target) = match parse_options() {
let (bootstrappers, path, target) = match parse_options() {
Ok(Some(tuple)) => tuple,
Ok(None) => {
eprintln!("Usage: fetch_and_cat <IPFS_PATH | CID> [MULTIADDR]");
eprintln!(
"Example will accept connections and print all bytes of the unixfs file to \
stdout."
"Usage: fetch_and_cat [--default-bootstrappers] <IPFS_PATH | CID> [MULTIADDR]"
);
eprintln!();
eprintln!(
"Example will try to find the file by the given IPFS_PATH and print its contents to stdout."
);
eprintln!();
eprintln!("The example has three modes in the order of precedence:");
eprintln!(
"1. When --default-bootstrappers is given, use default bootstrappers to find the content"
);
eprintln!(
"2. When IPFS_PATH and MULTIADDR are given, connect to MULTIADDR to get the file"
);
eprintln!(
"3. When only IPFS_PATH is given, wait to be connected to by another ipfs node"
);
eprintln!("If second argument is present, it is expected to be a Multiaddr with \
peer_id. The given Multiaddr will be connected to instead of awaiting an incoming connection.");
exit(0);
}
Err(e) => {
Expand All @@ -54,7 +65,11 @@ async fn main() {
// the libp2p.
tokio::task::spawn(fut);

if let Some(target) = target {
if bootstrappers == BootstrapperOption::RestoreDefault {
// applications wishing to find content on the global IPFS swarm should restore the latest
// bootstrappers which are hopefully updated between releases
ipfs.restore_bootstrappers().await.unwrap();
} else if let Some(target) = target {
ipfs.connect(target).await.unwrap();
} else {
let (_, addresses) = ipfs.identity().await.unwrap();
Expand All @@ -81,20 +96,12 @@ async fn main() {
pin_mut!(stream);

let mut stdout = tokio::io::stdout();
let mut total = 0;

loop {
// This could be made more performant by polling the stream while writing to stdout.
match stream.next().await {
Some(Ok(bytes)) => {
total += bytes.len();
stdout.write_all(&bytes).await.unwrap();

eprintln!(
"Received: {:>12} bytes, Total: {:>12} bytes",
bytes.len(),
total
);
}
Some(Err(e)) => {
eprintln!("Error: {}", e);
Expand All @@ -103,12 +110,34 @@ async fn main() {
None => break,
}
}
}

eprintln!("Total received: {} bytes", total);
#[derive(PartialEq)]
enum BootstrapperOption {
RestoreDefault,
ConnectionsOnly,
}

fn parse_options() -> Result<Option<(IpfsPath, Option<MultiaddrWithPeerId>)>, Error> {
let mut args = env::args().skip(1);
fn parse_options(
) -> Result<Option<(BootstrapperOption, IpfsPath, Option<MultiaddrWithPeerId>)>, Error> {
let mut args = env::args().skip(1).peekable();

// by default use only the manual connections
let mut bootstrappers = BootstrapperOption::ConnectionsOnly;

while let Some(option) = args.peek() {
if !option.starts_with("--") {
break;
}

let option = args.next().expect("already checked when peeking");

if option == "--default-bootstrappers" {
bootstrappers = BootstrapperOption::RestoreDefault;
} else {
return Err(anyhow::format_err!("unknown option: {}", option));
}
}

let path = if let Some(path) = args.next() {
path.parse::<IpfsPath>()
Expand All @@ -129,5 +158,5 @@ fn parse_options() -> Result<Option<(IpfsPath, Option<MultiaddrWithPeerId>)>, Er
None
};

Ok(Some((path, target)))
Ok(Some((bootstrappers, path, target)))
}
27 changes: 21 additions & 6 deletions http/src/v0/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use warp::{query, Filter, Rejection, Reply};

#[derive(Debug, Serialize)]
#[serde(rename_all = "PascalCase")]
struct Response {
peers: Vec<String>,
struct Response<S: AsRef<str>> {
peers: Vec<S>,
}

#[derive(Debug, Deserialize)]
Expand Down Expand Up @@ -48,7 +48,8 @@ pub struct BootstrapAddQuery {
timeout: Option<StringSerialized<humantime::Duration>>,
}

// used in both bootstrap_add_query and bootstrap_restore_query
// optionally timed-out wrapper around [`Ipfs::restore_bootstrappers`] with stringified errors, used
// in both bootstrap_add_query and bootstrap_restore_query
async fn restore_helper<T: IpfsTypes>(
ipfs: Ipfs<T>,
timeout: &Option<StringSerialized<humantime::Duration>>,
Expand Down Expand Up @@ -82,7 +83,14 @@ async fn bootstrap_add_query<T: IpfsTypes>(
.map_err(StringError::from)?
.to_string()]
} else if default == Some(true) {
restore_helper(ipfs, &timeout).await?
// HTTP api documents `?default=true` as deprecated
let _ = restore_helper(ipfs, &timeout).await?;

// return a list of all known bootstrap nodes as js-ipfs does
ipfs::config::BOOTSTRAP_NODES
.iter()
.map(|&s| String::from(s))
.collect()
} else {
return Err(warp::reject::custom(StringError::from(
"invalid query string",
Expand All @@ -94,6 +102,7 @@ async fn bootstrap_add_query<T: IpfsTypes>(
Ok(warp::reply::json(&response))
}

/// https://docs.ipfs.io/reference/http/api/#api-v0-bootstrap-add
pub fn bootstrap_add<T: IpfsTypes>(
ipfs: &Ipfs<T>,
) -> impl Filter<Extract = (impl Reply,), Error = Rejection> + Clone {
Expand All @@ -107,7 +116,8 @@ pub struct BootstrapClearQuery {
timeout: Option<StringSerialized<humantime::Duration>>,
}

// used in both bootstrap_clear_query and bootstrap_rm_query
// optionally timed-out wrapper over [`Ipfs::clear_bootstrappers`] used in both
// `bootstrap_clear_query` and `bootstrap_rm_query`.
async fn clear_helper<T: IpfsTypes>(
ipfs: Ipfs<T>,
timeout: &Option<StringSerialized<humantime::Duration>>,
Expand Down Expand Up @@ -192,12 +202,17 @@ async fn bootstrap_restore_query<T: IpfsTypes>(
ipfs: Ipfs<T>,
query: BootstrapRestoreQuery,
) -> Result<impl Reply, Rejection> {
let peers = restore_helper(ipfs, &query.timeout).await?;
let _ = restore_helper(ipfs, &query.timeout).await?;

// similar to add?default=true; returns a list of all bootstrap nodes, not only the added ones
let peers = ipfs::config::BOOTSTRAP_NODES.to_vec();
let response = Response { peers };

Ok(warp::reply::json(&response))
}

/// https://docs.ipfs.io/reference/http/api/#api-v0-bootstrap-add-default, similar functionality
/// also available via /bootstrap/add?default=true through [`bootstrap_add`].
pub fn bootstrap_restore<T: IpfsTypes>(
ipfs: &Ipfs<T>,
) -> impl Filter<Extract = (impl Reply,), Error = Rejection> + Clone {
Expand Down
16 changes: 16 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,20 @@
//! Static configuration (the bootstrap node(s)).

/// The supported bootstrap nodes (/dnsaddr is not yet supported). This will be updated to contain
/// the latest known supported IPFS bootstrap peers.
// FIXME: it would be nice to parse these into MultiaddrWithPeerId with const fn.
pub const BOOTSTRAP_NODES: &[&str] =
&["/ip4/104.131.131.82/tcp/4001/p2p/QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ"];

#[cfg(test)]
mod tests {
use crate::p2p::MultiaddrWithPeerId;

#[test]
fn bootstrap_nodes_are_multiaddr_with_peerid() {
super::BOOTSTRAP_NODES
.iter()
.try_for_each(|s| s.parse::<MultiaddrWithPeerId>().map(|_| ()))
.unwrap();
}
}
6 changes: 5 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
//! [js-ipfs]: https://github.com/ipfs/js-ipfs/
//#![deny(missing_docs)]

mod config;
pub mod config;
pub mod dag;
pub mod error;
#[macro_use]
Expand Down Expand Up @@ -1113,6 +1113,8 @@ impl<Types: IpfsTypes> Ipfs<Types> {
}

/// Extend the list of used bootstrapper nodes with an additional address.
/// Return value cannot be used to determine if the `addr` was a new bootstrapper, subject to
/// change.
pub async fn add_bootstrapper(&self, addr: MultiaddrWithPeerId) -> Result<Multiaddr, Error> {
async move {
let (tx, rx) = oneshot_channel();
Expand All @@ -1129,6 +1131,8 @@ impl<Types: IpfsTypes> Ipfs<Types> {
}

/// Remove an address from the currently used list of bootstrapper nodes.
/// Return value cannot be used to determine if the `addr` was an actual bootstrapper, subject to
/// change.
pub async fn remove_bootstrapper(&self, addr: MultiaddrWithPeerId) -> Result<Multiaddr, Error> {
async move {
let (tx, rx) = oneshot_channel();
Expand Down
74 changes: 60 additions & 14 deletions src/p2p/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -607,12 +607,15 @@ impl<Types: IpfsTypes> Behaviour<Types> {
addr: MultiaddrWithPeerId,
) -> Result<Multiaddr, anyhow::Error> {
let ret = addr.clone().into();
self.swarm.bootstrappers.insert(addr.clone());
let MultiaddrWithPeerId {
multiaddr: _,
peer_id,
} = addr.clone();
self.kademlia.add_address(&peer_id, addr.into());
if self.swarm.bootstrappers.insert(addr.clone()) {
let MultiaddrWithPeerId {
multiaddr: ma,
peer_id,
} = addr;
self.kademlia.add_address(&peer_id, ma.into());
// the return value of add_address doesn't implement Debug
trace!(peer_id=%peer_id, "tried to add a bootstrapper");
}
Ok(ret)
}

Expand All @@ -621,24 +624,67 @@ impl<Types: IpfsTypes> Behaviour<Types> {
addr: MultiaddrWithPeerId,
) -> Result<Multiaddr, anyhow::Error> {
let ret = addr.clone().into();
self.swarm.bootstrappers.remove(&addr);
if self.swarm.bootstrappers.remove(&addr) {
let peer_id = addr.peer_id;
let prefix: Multiaddr = addr.multiaddr.into();

if let Some(e) = self.kademlia.remove_address(&peer_id, &prefix) {
info!(peer_id=%peer_id, status=?e.status, "removed bootstrapper");
} else {
warn!(peer_id=%peer_id, "attempted to remove an unknown bootstrapper");
}
}
Ok(ret)
}

pub fn clear_bootstrappers(&mut self) -> Vec<Multiaddr> {
self.swarm.bootstrappers.drain().map(|a| a.into()).collect()
let removed = self.swarm.bootstrappers.drain();
let mut ret = Vec::with_capacity(removed.len());

for addr_with_peer_id in removed {
let peer_id = &addr_with_peer_id.peer_id;
let prefix: Multiaddr = addr_with_peer_id.multiaddr.clone().into();

if let Some(e) = self.kademlia.remove_address(peer_id, &prefix) {
info!(peer_id=%peer_id, status=?e.status, "cleared bootstrapper");
ret.push(addr_with_peer_id.into());
} else {
error!(peer_id=%peer_id, "attempted to clear an unknown bootstrapper");
}
}

ret
}

pub fn restore_bootstrappers(&mut self) -> Result<Vec<Multiaddr>, anyhow::Error> {
let mut ret = Vec::new();

for addr in BOOTSTRAP_NODES {
let addr = addr.parse::<MultiaddrWithPeerId>().unwrap();
self.swarm.bootstrappers.insert(addr);
let addr = addr
.parse::<MultiaddrWithPeerId>()
.expect("see test bootstrap_nodes_are_multiaddr_with_peerid");
if self.swarm.bootstrappers.insert(addr.clone()) {
let MultiaddrWithPeerId {
multiaddr: ma,
peer_id,
} = addr.clone();

// this is intentionally the multiaddr without peerid turned into plain multiaddr:
// libp2p cannot dial addresses which include peerids.
let ma: Multiaddr = ma.into();

// same as with add_bootstrapper: the return value from kademlia.add_address
// doesn't implement Debug
self.kademlia.add_address(&peer_id, ma.clone());
trace!(peer_id=%peer_id, "tried to restore a bootstrapper");

// report with the peerid
let reported: Multiaddr = addr.into();
ret.push(reported);
}
}

Ok(BOOTSTRAP_NODES
.iter()
.map(|addr| addr.parse().unwrap())
.collect())
Ok(ret)
}
}

Expand Down
15 changes: 2 additions & 13 deletions tests/kademlia.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use cid::{Cid, Codec};
use ipfs::{p2p::MultiaddrWithPeerId, Block, Node};
use libp2p::{kad::Quorum, multiaddr::Protocol, Multiaddr, PeerId};
use libp2p::{kad::Quorum, multiaddr::Protocol, Multiaddr};
use multihash::Sha2_256;
use tokio::time::timeout;

Expand Down Expand Up @@ -139,20 +139,9 @@ async fn dht_get_closest_peers() {
#[ignore = "targets an actual bootstrapper, so random failures can happen"]
#[tokio::test(max_threads = 1)]
async fn dht_popular_content_discovery() {
let (bootstrapper_id, bootstrapper_addr): (PeerId, Multiaddr) = (
"QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ"
.parse()
.unwrap(),
"/ip4/104.131.131.82/tcp/4001".parse().unwrap(),
);

let peer = Node::new("a").await;

// connect it to one of the well-known bootstrappers
assert!(peer
.add_peer(bootstrapper_id, bootstrapper_addr)
.await
.is_ok());
peer.restore_bootstrappers().await.unwrap();

// the Cid of the IPFS logo
let cid: Cid = "bafkreicncneocapbypwwe3gl47bzvr3pkpxmmobzn7zr2iaz67df4kjeiq"
Expand Down