diff --git a/.circleci/config.yml b/.circleci/config.yml index 1ca9c60d8b..7738aa70bb 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -10,7 +10,7 @@ jobs: - checkout - restore_cache: keys: - - fluence01-{{ checksum "server/Cargo.lock" }} + - fluence01-{{ checksum "Cargo.lock" }} - run: | rustup toolchain install stable rustup component add rustfmt @@ -23,7 +23,7 @@ jobs: paths: - ~/.cargo - ~/.rustup - key: fluence01-{{ checksum "server/Cargo.lock" }} + key: fluence01-{{ checksum "Cargo.lock" }} workflows: version: 2 diff --git a/.gitattributes b/.gitattributes index 6bcb07f82b..a9336a01b7 100644 --- a/.gitattributes +++ b/.gitattributes @@ -1,2 +1 @@ *.wasm filter=lfs diff=lfs merge=lfs -text -bench/* linguist-detectable=false diff --git a/.gitmodules b/.gitmodules deleted file mode 100644 index d7c884b0ee..0000000000 --- a/.gitmodules +++ /dev/null @@ -1,4 +0,0 @@ -[submodule "docs/book"] - path = docs/book - url = git@github.com:fluencelabs/docs.git - branch = master diff --git a/Cargo.lock b/Cargo.lock index 357aa4cd1d..6a3033e8f0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -84,9 +84,9 @@ checksum = "85bb70cc08ec97ca5450e6eba421deeea5f172c0fc61f78b5357b2a8e8be195f" [[package]] name = "arc-swap" -version = "0.4.7" +version = "0.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4d25d88fd6b8041580a654f9d0c581a047baee2b3efee13275f2fc392fc75034" +checksum = "b585a98a234c46fc563103e9278c9391fde1f4e6850334da895d27edb9580f62" [[package]] name = "arrayref" @@ -128,6 +128,36 @@ dependencies = [ "syn", ] +[[package]] +name = "async-h1" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "effd501febce09534b924aa471e6a7fd689071fee63659473413f62a1979ae56" +dependencies = [ + "async-std", + "byte-pool", + "futures-core", + "http-types", + "httparse", + "lazy_static", + "log", + "pin-project-lite", + "url 2.1.1", +] + +[[package]] +name = "async-sse" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6de73294dd1287b32b10f4c884186446353048f183071dff38c1481f82c053b3" +dependencies = [ + "async-std", + "http-types", + "log", + "memchr", + "pin-project-lite", +] + [[package]] name = "async-std" version = "1.5.0" @@ -345,6 +375,16 @@ version = "3.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2e8c087f005730276d1096a652e92a8bacee2e2472bcc9715a74d2bec38b5820" +[[package]] +name = "byte-pool" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9342e102eac8b1879fbedf9a7e0572c40b0cc5805b663c4d4ca791cae0bae221" +dependencies = [ + "crossbeam-queue", + "stable_deref_trait", +] + [[package]] name = "byte-tools" version = "0.3.1" @@ -471,6 +511,16 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "245097e9a4535ee1e3e3931fcfcd55a796a44c643e8596ff6566d68f09b87bbc" +[[package]] +name = "cookie" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "888604f00b3db336d2af898ec3c1d5d0ddf5e6d462220f2ededc33a87ac4bbd5" +dependencies = [ + "time", + "url 1.7.2", +] + [[package]] name = "core-foundation" version = "0.7.0" @@ -717,6 +767,16 @@ dependencies = [ "termcolor", ] +[[package]] +name = "error-chain" +version = "0.12.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d371106cc88ffdfb1eabd7111e432da544f16f3e2d7bf1dfe8bf575f1df045cd" +dependencies = [ + "backtrace", + "version_check", +] + [[package]] name = "faas-api" version = "0.0.5" @@ -737,7 +797,7 @@ dependencies = [ "serde 1.0.111", "serde_json", "trust-graph", - "url", + "url 2.1.1", ] [[package]] @@ -795,6 +855,7 @@ dependencies = [ "async-timer", "bencher", "bs58", + "clap", "config", "ctrlc-adapter", "env_logger", @@ -827,6 +888,7 @@ dependencies = [ "async-std", "async-timer", "bs58", + "clap", "ctrlc-adapter", "env_logger", "faas-api", @@ -887,12 +949,14 @@ dependencies = [ "fluence-libp2p", "futures 0.3.5", "futures-util", + "http-types", "itertools 0.9.0", "libp2p", "log", "multihash", "once_cell", "parity-multiaddr", + "prometheus", "prost", "quickcheck", "rand 0.7.3", @@ -900,9 +964,10 @@ dependencies = [ "serde_json", "serde_with", "smallvec 1.4.0", + "tide", "trust-graph", "unsigned-varint", - "url", + "url 2.1.1", "uuid", "void", ] @@ -1219,6 +1284,43 @@ dependencies = [ "http", ] +[[package]] +name = "http-service" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c574f90a9567a12b72e9071ab541704ce8d2bea45a950c33991556a1880cd32" +dependencies = [ + "async-std", + "http-types", +] + +[[package]] +name = "http-service-h1" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b10c11288f86378ecdab7d468e5313304f8dfcffee3107f151f946e3b00271df" +dependencies = [ + "async-h1", + "async-std", + "http-service", + "http-types", +] + +[[package]] +name = "http-types" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05af75a78dfeb163d472b1d27bebb6a8845917a069accdf53a9bed47aaff9bfc" +dependencies = [ + "anyhow", + "async-std", + "cookie", + "infer", + "omnom", + "pin-project-lite", + "url 2.1.1", +] + [[package]] name = "httparse" version = "1.3.4" @@ -1271,6 +1373,17 @@ dependencies = [ "tokio-tls", ] +[[package]] +name = "idna" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38f09e0f0b1fb55fdee1f17470ad800da77af5186a1a76c026b679358b7e844e" +dependencies = [ + "matches", + "unicode-bidi", + "unicode-normalization", +] + [[package]] name = "idna" version = "0.2.0" @@ -1291,6 +1404,15 @@ dependencies = [ "autocfg 1.0.0", ] +[[package]] +name = "infer" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d55c406a76164eb346a829ed4b97b73cb06259078eca01adeb12e8ca308d4123" +dependencies = [ + "byteorder 1.3.4", +] + [[package]] name = "iovec" version = "0.1.4" @@ -1770,7 +1892,7 @@ dependencies = [ "rustls", "rw-stream-sink", "soketto", - "url", + "url 2.1.1", "webpki", "webpki-roots 0.18.0", ] @@ -2140,6 +2262,15 @@ version = "0.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9cbca9424c482ee628fa549d9c812e2cd22f1180b9222c9200fdfa6eb31aecb2" +[[package]] +name = "omnom" +version = "2.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6b216cee2e0d6e680f73158d15468c80b39e571c11669cd90556f9a644e9fd3" +dependencies = [ + "memchr", +] + [[package]] name = "once_cell" version = "1.4.0" @@ -2174,9 +2305,9 @@ checksum = "77af24da69f9d9341038eba93a073b1fdaaa1b788221b00a69bce9e762cb32de" [[package]] name = "openssl-sys" -version = "0.9.58" +version = "0.9.57" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a842db4709b604f0fe5d1170ae3565899be2ad3d9cbc72dedc789ac0511f78de" +checksum = "7410fef80af8ac071d4f63755c0ab89ac3df0fd1ea91f1d1f37cf5cec4395990" dependencies = [ "autocfg 1.0.0", "cc", @@ -2195,11 +2326,11 @@ dependencies = [ "byteorder 1.3.4", "data-encoding", "multihash", - "percent-encoding", + "percent-encoding 2.1.0", "serde 1.0.111", "static_assertions 1.1.0", "unsigned-varint", - "url", + "url 2.1.1", ] [[package]] @@ -2285,6 +2416,12 @@ dependencies = [ "winapi 0.3.8", ] +[[package]] +name = "percent-encoding" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "31010dd2e1ac33d5b46a5b413495239882813e0369f8ed8a5e266f173602f831" + [[package]] name = "percent-encoding" version = "2.1.0" @@ -2303,18 +2440,18 @@ dependencies = [ [[package]] name = "pin-project" -version = "0.4.20" +version = "0.4.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e75373ff9037d112bb19bc61333a06a159eaeb217660dcfbea7d88e1db823919" +checksum = "ba3a1acf4a3e70849f8a673497ef984f043f95d2d8252dcdf74d54e6a1e47e8a" dependencies = [ "pin-project-internal", ] [[package]] name = "pin-project-internal" -version = "0.4.20" +version = "0.4.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "10b4b44893d3c370407a1d6a5cfde7c41ae0478e31c516c85f67eb3adc51be6d" +checksum = "194e88048b71a3e02eb4ee36a6995fed9b8236c11a7bb9f7247a9d9835b3f265" dependencies = [ "proc-macro2", "quote", @@ -2353,9 +2490,9 @@ checksum = "7e0456befd48169b9f13ef0f0ad46d492cf9d2dbb918bcf38e01eed4ce3ec5e4" [[package]] name = "proc-macro-nested" -version = "0.1.5" +version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0afe1bd463b9e9ed51d0e0f0b50b6b146aec855c56fd182bb242388710a9b6de" +checksum = "8e946095f9d3ed29ec38de908c22f95d9ac008e424c7bcae54c75a79c527c694" [[package]] name = "proc-macro2" @@ -2468,9 +2605,9 @@ dependencies = [ [[package]] name = "quote" -version = "1.0.7" +version = "1.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aa563d17ecb180e500da1cfd2b028310ac758de548efdd203e18f283af693f37" +checksum = "54a21852a652ad6f610c9510194f398ff6f8692e334fd1145fed931f7fbe44ea" dependencies = [ "proc-macro2", ] @@ -2728,13 +2865,13 @@ dependencies = [ "mime", "mime_guess", "native-tls", - "percent-encoding", + "percent-encoding 2.1.0", "pin-project-lite", "serde 1.0.111", "serde_urlencoded", "tokio", "tokio-tls", - "url", + "url 2.1.1", "wasm-bindgen", "wasm-bindgen-futures", "web-sys", @@ -2756,6 +2893,12 @@ dependencies = [ "winapi 0.3.8", ] +[[package]] +name = "route-recognizer" +version = "0.1.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ea509065eb0b3c446acdd0102f0d46567dc30902dc0be91d6552035d92b0f4f8" + [[package]] name = "rust-ini" version = "0.13.0" @@ -2968,6 +3111,18 @@ dependencies = [ "serde 1.0.111", ] +[[package]] +name = "serde_qs" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d43eef44996bbe16e99ac720e1577eefa16f7b76b5172165c98ced20ae9903e1" +dependencies = [ + "data-encoding", + "error-chain", + "percent-encoding 1.0.1", + "serde 1.0.111", +] + [[package]] name = "serde_test" version = "0.8.23" @@ -2986,7 +3141,7 @@ dependencies = [ "dtoa", "itoa", "serde 1.0.111", - "url", + "url 2.1.1", ] [[package]] @@ -3141,6 +3296,12 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" +[[package]] +name = "stable_deref_trait" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dba1a27d3efae4351c8051072d619e3ade2820635c3958d826bfea39d59b54c8" + [[package]] name = "static_assertions" version = "0.3.4" @@ -3264,6 +3425,29 @@ dependencies = [ "lazy_static", ] +[[package]] +name = "tide" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cf955af2fe3c761498002bd8d0b677795a262a62fc761968d5de7e4de7912a99" +dependencies = [ + "async-sse", + "async-std", + "cookie", + "futures-core", + "http-service", + "http-service-h1", + "http-types", + "kv-log-macro", + "mime", + "mime_guess", + "route-recognizer", + "serde 1.0.111", + "serde_json", + "serde_qs", + "url 2.1.1", +] + [[package]] name = "time" version = "0.1.43" @@ -3460,15 +3644,26 @@ version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a" +[[package]] +name = "url" +version = "1.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dd4e7c0d531266369519a4aa4f399d748bd37043b00bde1e4ff1f60a120b355a" +dependencies = [ + "idna 0.1.5", + "matches", + "percent-encoding 1.0.1", +] + [[package]] name = "url" version = "2.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "829d4a8476c35c9bf0bbce5a3b23f4106f79728039b726d292bb93bc106787cb" dependencies = [ - "idna", + "idna 0.2.0", "matches", - "percent-encoding", + "percent-encoding 2.1.0", "serde 1.0.111", ] diff --git a/client/fluence-ipfs/Cargo.toml b/client/fluence-ipfs/Cargo.toml index 32f450ba20..2e415e2570 100644 --- a/client/fluence-ipfs/Cargo.toml +++ b/client/fluence-ipfs/Cargo.toml @@ -28,5 +28,6 @@ itertools = "0.9.0" uuid = { version = "0.8.1", features = ["serde", "v4"] } async-timer = { version = "0.7.3", features = ["stream"] } once_cell = "1.3.1" +clap = "2.33" [dev-dependencies] diff --git a/client/fluence-ipfs/src/lib.rs b/client/fluence-ipfs/src/lib.rs index fb6d4e19b6..949b375262 100644 --- a/client/fluence-ipfs/src/lib.rs +++ b/client/fluence-ipfs/src/lib.rs @@ -28,7 +28,7 @@ use async_timer::Interval; use faas_api::{relay, Address, FunctionCall, Protocol}; -use fluence_client::{Client, ClientCommand, ClientEvent}; +use fluence_client::{Client, ClientEvent}; use futures::{channel::oneshot::Receiver, select, FutureExt, StreamExt}; use libp2p::identity::ed25519::Keypair; use libp2p::PeerId; @@ -129,7 +129,7 @@ pub async fn run_ipfs_multiaddr_service( bootstrap_id.clone().unwrap(), client.peer_id.clone(), reply_to, msg_id, &ipfs, &client.key_pair ); if let Some(node) = bootstrap_id.clone() { - client.send(ClientCommand::Call { node, call }) + client.send(call, node) } else { log::warn!("Can't send {} reply: bootstrap hasn't connected yed", IPFS_SERVICE.deref()); } @@ -150,10 +150,7 @@ pub async fn run_ipfs_multiaddr_service( let call = register_call(client.peer_id.clone(), bootstrap_id.clone(), IPFS_SERVICE_ID, &client.key_pair); log::info!("Sending register call {:?}", call); - client.send(ClientCommand::Call { - node: bootstrap_id, - call, - }); + client.send(call, bootstrap_id); } } _ = stop.next() => { diff --git a/client/rust-libp2p/Cargo.toml b/client/rust-libp2p/Cargo.toml index 6eaf928d07..4e1fe0845e 100644 --- a/client/rust-libp2p/Cargo.toml +++ b/client/rust-libp2p/Cargo.toml @@ -26,6 +26,7 @@ rand = "0.7.3" uuid = { version = "0.8.1", features = ["v4"] } itertools = "0.9.0" bencher = "0.1.5" +clap = "2.33.1" [dev-dependencies] rand = "0.7.3" diff --git a/client/rust-libp2p/src/client.rs b/client/rust-libp2p/src/client.rs index 39d3b8cc43..50ff4d8956 100644 --- a/client/rust-libp2p/src/client.rs +++ b/client/rust-libp2p/src/client.rs @@ -14,11 +14,9 @@ * limitations under the License. */ -use crate::{ - behaviour::ClientBehaviour, command::ClientCommand, function_call_api::FunctionCallApi, - ClientEvent, -}; +use crate::{behaviour::ClientBehaviour, function_call_api::FunctionCallApi, ClientEvent}; use async_std::{task, task::JoinHandle}; +use faas_api::{relay, Address, FunctionCall, Protocol}; use fluence_libp2p::{ build_memory_transport, build_transport, types::{Inlet, OneshotOutlet, Outlet}, @@ -45,12 +43,18 @@ impl Transport { } } +#[derive(Debug)] +struct Command { + node: PeerId, + call: FunctionCall, +} + #[derive(Debug)] pub struct Client { pub key_pair: ed25519::Keypair, pub peer_id: PeerId, /// Channel to send commands to node - relay_outlet: Outlet, + relay_outlet: Outlet, /// Stream of messages received from node client_inlet: Fuse>, stop_outlet: OneshotOutlet<()>, @@ -58,7 +62,7 @@ pub struct Client { impl Client { fn new( - relay_outlet: Outlet, + relay_outlet: Outlet, client_inlet: Inlet, stop_outlet: OneshotOutlet<()>, ) -> Self { @@ -74,8 +78,8 @@ impl Client { } } - pub fn send(&self, cmd: ClientCommand) { - if let Err(err) = self.relay_outlet.unbounded_send(cmd) { + pub fn send(&self, call: FunctionCall, node: PeerId) { + if let Err(err) = self.relay_outlet.unbounded_send(Command { node, call }) { let err_msg = format!("{:?}", err); let msg = err.into_inner(); log::warn!("Unable to send msg {:?}: {:?}", msg, err_msg) @@ -92,6 +96,16 @@ impl Client { } } + pub fn relay_address(&self, node: PeerId) -> Address { + let addr = relay!(node, self.peer_id.clone()); + let sig = self.sign(addr.path().as_bytes()); + addr.append(Protocol::Signature(sig)) + } + + pub fn sign(&self, bytes: &[u8]) -> Vec { + self.key_pair.sign(bytes) + } + fn dial( &self, relay: Multiaddr, @@ -178,12 +192,9 @@ impl Client { Ok((client, task)) } - fn send_to_node>( - swarm: &mut S, - cmd: ClientCommand, - ) { + fn send_to_node>(swarm: &mut S, cmd: Command) { match cmd { - ClientCommand::Call { node, call } => swarm.call(node, call), + Command { node, call } => swarm.call(node, call), } } diff --git a/client/rust-libp2p/src/command.rs b/client/rust-libp2p/src/command.rs index 22f34cd75c..ea902346d9 100644 --- a/client/rust-libp2p/src/command.rs +++ b/client/rust-libp2p/src/command.rs @@ -15,17 +15,19 @@ */ use faas_api::FunctionCall; -use fluence_libp2p::peerid_serializer; -use libp2p::PeerId; use serde::{Deserialize, Serialize}; /// Describes commands sent from client to relay node; also see `ToNodeNetworkMsg` #[derive(Clone, Debug, Serialize, Deserialize)] -#[serde(tag = "command", content = "body")] +#[serde(tag = "command")] pub enum ClientCommand { - Call { - #[serde(with = "peerid_serializer")] - node: PeerId, - call: FunctionCall, - }, + Call { call: FunctionCall }, +} + +impl Into for ClientCommand { + fn into(self) -> FunctionCall { + match self { + ClientCommand::Call { call } => call, + } + } } diff --git a/client/rust-libp2p/src/main.rs b/client/rust-libp2p/src/main.rs index 875a298357..017c122564 100644 --- a/client/rust-libp2p/src/main.rs +++ b/client/rust-libp2p/src/main.rs @@ -27,9 +27,11 @@ )] use async_std::task; +use clap::{App, Arg}; use ctrlc_adapter::block_until_ctrlc; -use faas_api::{relay, service, FunctionCall}; +use faas_api::{service, Address, FunctionCall}; use fluence_client::{Client, ClientCommand, ClientEvent}; +use futures::task::Poll; use futures::{ channel::{mpsc, mpsc::UnboundedReceiver, oneshot}, prelude::*, @@ -39,16 +41,21 @@ use futures::{ use libp2p::PeerId; use parity_multiaddr::Multiaddr; use std::error::Error; -use std::time::Duration; fn main() -> Result<(), Box> { env_logger::builder().format_timestamp_micros().init(); - let relay_addr: Multiaddr = std::env::args() - .nth(1) - .expect("multiaddr of relay peer should be provided by the first argument") + let args = &[Arg::from_usage(" 'Multiaddr of the Fluence node'").required(true)]; + + let arg_matches = App::new("Connect to fluence server") + .args(args) + .get_matches(); + + let relay_addr: Multiaddr = arg_matches + .value_of("multiaddr") + .expect("multiaddr is required") .parse() - .expect("provided wrong Multiaddr"); + .expect("provided incorrect Multiaddr"); let (exit_sender, exit_receiver) = oneshot::channel::<()>(); @@ -79,13 +86,19 @@ async fn run_client( let mut stdin_cmds = stdin_cmds.into_stream().fuse(); let mut stop = exit_receiver.into_stream().fuse(); + let mut node: Option = None; + loop { select!( cmd = stdin_cmds.select_next_some() => { match cmd { Ok(cmd) => { - client.send(cmd); - print!("\n"); + if let Some(node) = &node { + client.send(cmd.into(), node.clone()); + print!("\n"); + } else { + print!("Not connected yet!"); + } }, Err(e) => println!("incorrect string provided: {:?}", e) } @@ -94,7 +107,8 @@ async fn run_client( match incoming { Some(ClientEvent::NewConnection{ peer_id, ..}) => { log::info!("Connected to {}", peer_id); - print_example(&client.peer_id, peer_id); + node = Some(peer_id.clone()); + print_example(client.relay_address(peer_id.clone())); } Some(msg) => println!("Received\n{}\n", serde_json::to_string_pretty(&msg).unwrap()), None => { @@ -126,20 +140,34 @@ fn read_cmds_from_stdin() -> UnboundedReceiver(); - - // blocking happens in 'for' below - for cmd in stream { - cmd_sender.unbounded_send(cmd).expect("send cmd"); - task::sleep(Duration::from_nanos(10)).await; // return Poll::Pending from future's fn poll - } + let mut stream = Deserializer::from_reader(stdin).into_iter().fuse(); + + let cmd_sender = cmd_sender.clone(); + task::spawn(async move { + futures::future::poll_fn(|cx| { + // Read parsed command from JSON stream (blocking) + let cmd = stream.next(); + if let Some(cmd) = cmd { + // Send command to select! that reads from `cmd_recv` + cmd_sender.unbounded_send(cmd).expect("send cmd"); + // Call waker to respawn this future + cx.waker().clone().wake(); + return Poll::Pending; + } else { + // Return Ready so await below completes + return Poll::Ready(()); + } + }) + .await; + }) + .await; } }); cmd_recv } -fn print_example(peer_id: &PeerId, bootstrap: PeerId) { +fn print_example(reply_to: Address) { use serde_json::json; use std::time::SystemTime; fn show(cmd: ClientCommand) { @@ -155,42 +183,39 @@ fn print_example(peer_id: &PeerId, bootstrap: PeerId) { .as_millis() .to_string(); - let call_multiaddr = ClientCommand::Call { - node: bootstrap.clone(), + let call_identify = ClientCommand::Call { call: FunctionCall { uuid: uuid(), - target: Some(service!("IPFS.multiaddr")), - reply_to: Some(relay!(bootstrap.clone(), peer_id.clone())), + target: Some(service!("identify")), + reply_to: Some(reply_to.clone()), arguments: json!({ "hash": "QmFile", "msg_id": time }), - name: Some("call multiaddr".to_string()), + name: Some("call identify".to_string()), }, }; let register_ipfs_get = ClientCommand::Call { - node: bootstrap.clone(), call: FunctionCall { uuid: uuid(), target: Some(service!("provide")), - reply_to: Some(relay!(bootstrap.clone(), peer_id.clone())), + reply_to: Some(reply_to.clone()), arguments: json!({ "service_id": "IPFS.get_QmFile3", "msg_id": time }), name: Some("register service".to_string()), }, }; let call_ipfs_get = ClientCommand::Call { - node: bootstrap.clone(), call: FunctionCall { uuid: uuid(), target: Some(service!("IPFS.get_QmFile3")), - reply_to: Some(relay!(bootstrap, peer_id.clone())), + reply_to: Some(reply_to.clone()), arguments: serde_json::Value::Null, name: Some("call ipfs get".to_string()), }, }; println!("Possible messages:"); - println!("\n### call IPFS.multiaddr"); - show(call_multiaddr); + println!("\n### call identify"); + show(call_identify); println!("\n### Register IPFS.get service"); show(register_ipfs_get); println!("\n### Call IPFS.get service"); diff --git a/client/rust-libp2p/tests/endurance.rs b/client/rust-libp2p/tests/endurance.rs index b1a9b0ff75..150b0c113b 100644 --- a/client/rust-libp2p/tests/endurance.rs +++ b/client/rust-libp2p/tests/endurance.rs @@ -15,8 +15,8 @@ */ use config::{Config, File}; -use faas_api::{relay, service, FunctionCall, Protocol}; -use fluence_client::{Client, ClientCommand, ClientEvent}; +use faas_api::{service, Address, FunctionCall, Protocol}; +use fluence_client::{Client, ClientEvent}; use fluence_libp2p::peerid_serializer; use libp2p::PeerId; use log::LevelFilter; @@ -135,7 +135,8 @@ fn endurance() { let mut periodic = interval(service.period); (&mut periodic).await; - provider.send(registration(service.node.peer_id.clone(), provider.peer_id.clone(), service.id.clone())); + let reply_to = provider.relay_address(service.node.peer_id.clone()); + provider.send(registration(reply_to, service.id.clone()), service.node.peer_id.clone()); log::info!("{: <14} - Provider sent registration", prefix); task::sleep(Duration::from_secs(pause)).await; @@ -153,7 +154,7 @@ fn endurance() { report(ConsumerConnected); let sent = Instant::now(); - consumer.send(service_call(node.peer_id.clone(), service.id.clone())); + consumer.send(service_call(service.id.clone()), node.peer_id.clone()); log::info!("{: <14}🌐 {: <8} ⇨ {: <8} - Consumer sent service call", prefix, node.name, service.node.name); if wait_call(&mut provider, &service).await.success() { @@ -248,30 +249,24 @@ fn uuid() -> String { uuid::Uuid::new_v4().to_string() } -fn service_call(node: PeerId, service_id: String) -> ClientCommand { - ClientCommand::Call { - node, - call: FunctionCall { - uuid: uuid(), - target: Some(service!(service_id)), - reply_to: None, - arguments: serde_json::Value::Null, - name: Some("call service".into()), - }, +fn service_call(service_id: String) -> FunctionCall { + FunctionCall { + uuid: uuid(), + target: Some(service!(service_id)), + reply_to: None, + arguments: serde_json::Value::Null, + name: Some("call service".into()), } } -fn registration(node: PeerId, client: PeerId, service_id: String) -> ClientCommand { +fn registration(reply_to: Address, service_id: String) -> FunctionCall { use serde_json::json; - ClientCommand::Call { - node: node.clone(), - call: FunctionCall { - uuid: uuid(), - target: Some(service!("provide")), - reply_to: Some(relay!(node, client)), - arguments: json!({ "service_id": service_id }), - name: Some("registration".into()), - }, + FunctionCall { + uuid: uuid(), + target: Some(service!("provide")), + reply_to: Some(reply_to), + arguments: json!({ "service_id": service_id }), + name: Some("registration".into()), } } diff --git a/crates/ctrlc-adapter/src/lib.rs b/crates/ctrlc-adapter/src/lib.rs index 5ca01f9fef..33f19d369b 100644 --- a/crates/ctrlc-adapter/src/lib.rs +++ b/crates/ctrlc-adapter/src/lib.rs @@ -20,6 +20,7 @@ pub fn block_until_ctrlc() { let ctrlc_outlet = std::cell::RefCell::new(Some(ctrlc_outlet)); ctrlc::set_handler(move || { + println!("ctrlc fired!"); if let Some(outlet) = ctrlc_outlet.borrow_mut().take() { outlet.send(()).expect("sending shutdown signal failed"); } diff --git a/crates/libp2p/src/macros.rs b/crates/libp2p/src/macros.rs index 902da85ac5..224dc340ac 100644 --- a/crates/libp2p/src/macros.rs +++ b/crates/libp2p/src/macros.rs @@ -18,7 +18,7 @@ /// Intended to simplify simple polling functions that just return internal events from a /// internal queue. macro_rules! event_polling { - ($func_name:ident, $event_field_name:ident, $poll_type:ty) => { + ($func_name:ident, $event_field_name:ident, $poll_type:ty$(, $tick:ident)?) => { fn $func_name( &mut self, _: &mut std::task::Context, @@ -26,6 +26,8 @@ macro_rules! event_polling { ) -> std::task::Poll<$poll_type> { use std::task::Poll; + $(self.$tick())?; + if let Some(event) = self.$event_field_name.pop_front() { return Poll::Ready(event); } diff --git a/server/Cargo.toml b/server/Cargo.toml index bfe9fabd2d..1fc808dfaa 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -30,9 +30,12 @@ clap = "2.33.0" void = "1.0.2" smallvec = "1.3.0" prost = "0.6.1" -once_cell = "1.3.1" +once_cell = "1.4.0" uuid = { version = "0.8.1", features = ["v4"] } url = "2.1.1" +tide = "0.8.1" +prometheus = "0.9.0" +http-types = "1.2.0" [dev-dependencies] rand = "0.7.3" diff --git a/server/src/behaviour/bootstrapper.rs b/server/src/behaviour/bootstrapper.rs index cec1d01ec6..96fbe6dcff 100644 --- a/server/src/behaviour/bootstrapper.rs +++ b/server/src/behaviour/bootstrapper.rs @@ -20,28 +20,28 @@ use libp2p::swarm::NetworkBehaviourEventProcess; impl NetworkBehaviourEventProcess for ServerBehaviour { fn inject_event(&mut self, event: BootstrapperEvent) { + // TODO: do not reconnect to boostraps all the time, make it stop after a few minutes after node was started + // In other words, reconnect first 5 minutes or so, then stop. No reason to treat bootstrap nodes in a special way anymore. match event { - BootstrapperEvent::BootstrapConnected { peer_id, .. } => { - log::debug!( - "Bootstrap connected {}, triggering bootstrap procedure", - peer_id - ); + BootstrapperEvent::RunBootstrap => { + log::debug!("Running bootstrap procedure"); + // TODO: refactor out "thin bootstrap": only look ourselves in kademlia self.bootstrap() } - BootstrapperEvent::BootstrapDisconnected { peer_id, multiaddr } => { - log::info!("Bootstrap disconnected {}, reconnecting", peer_id); - self.dial(multiaddr); - self.dial_peer(peer_id); - } - BootstrapperEvent::ReachFailure { - multiaddr, error, .. + BootstrapperEvent::ReconnectToBootstrap { + peer_id, + multiaddr, + error, } => { - log::warn!( - "Failed to reach bootstrap at {:?}: {}, reconnecting", - &multiaddr, - error + log::info!( + "Bootstrap disconnected {} {}, reconnecting", + peer_id.as_ref().map(|p| p.to_string()).unwrap_or_default(), + error.unwrap_or_default() ); self.dial(multiaddr); + if let Some(peer_id) = peer_id { + self.dial_peer(peer_id) + } } } } diff --git a/server/src/behaviour/identify.rs b/server/src/behaviour/identify.rs index faf75cb41a..2edac31bfd 100644 --- a/server/src/behaviour/identify.rs +++ b/server/src/behaviour/identify.rs @@ -73,7 +73,7 @@ fn filter_addresses(addresses: Vec) -> Vec { let exists_global = addresses.iter().any(is_global_maddr); if !exists_global { - log::warn!("No globally-reachable IP addresses found. Are we running on localhost?"); + log::debug!("No globally-reachable IP addresses found. Are we running on localhost?"); // If there are no global addresses, we are most likely running locally // So take loopback address, and go with it. addresses.into_iter().filter(is_local_maddr).collect() diff --git a/server/src/behaviour/server_behaviour.rs b/server/src/behaviour/server_behaviour.rs index 77f7e0a7b4..878e1eb42b 100644 --- a/server/src/behaviour/server_behaviour.rs +++ b/server/src/behaviour/server_behaviour.rs @@ -26,6 +26,7 @@ use libp2p::{ PeerId, }; use parity_multiaddr::Multiaddr; +use prometheus::Registry; use std::collections::VecDeque; use trust_graph::TrustGraph; @@ -50,9 +51,10 @@ impl ServerBehaviour { listening_addresses: Vec, trust_graph: TrustGraph, bootstrap_nodes: Vec, + registry: Option<&Registry>, ) -> Self { let config = RouterConfig::new(key_pair.clone(), local_peer_id, listening_addresses); - let router = FunctionRouter::new(config, trust_graph); + let router = FunctionRouter::new(config, trust_graph, registry); let local_public_key = PublicKey::Ed25519(key_pair.public()); let identity = Identify::new( "/fluence/faas/1.0.0".into(), diff --git a/server/src/bootstrapper/behaviour.rs b/server/src/bootstrapper/behaviour.rs index 71471dba58..49d19041ff 100644 --- a/server/src/bootstrapper/behaviour.rs +++ b/server/src/bootstrapper/behaviour.rs @@ -24,13 +24,21 @@ use libp2p::PeerId; use parity_multiaddr::Multiaddr; use std::collections::{HashSet, VecDeque}; use std::error::Error; +use std::mem; +use std::time::{Duration, Instant}; pub type SwarmEventType = generate_swarm_event_type!(Bootstrapper); +// TODO: make it exponential +static RECONNECT_DELAY: Option = Some(Duration::from_millis(1500)); +static BOOTSTRAP_DELAY: Duration = Duration::from_millis(10000); +static BOOTSTRAP_MAX_DELAY: Duration = Duration::from_secs(60); pub struct Bootstrapper { pub bootstrap_nodes: HashSet, bootstrap_peers: HashSet, + delayed_events: Vec<(Option, SwarmEventType)>, events: VecDeque, + bootstrap_scheduled: Option<(Instant, Duration)>, } impl Bootstrapper { @@ -38,13 +46,60 @@ impl Bootstrapper { Self { bootstrap_nodes: bootstrap_nodes.into_iter().collect(), bootstrap_peers: Default::default(), + delayed_events: Default::default(), events: Default::default(), + bootstrap_scheduled: None, } } - fn push_event(&mut self, event: BootstrapperEvent) { - self.events - .push_back(NetworkBehaviourAction::GenerateEvent(event)); + fn push_event(&mut self, event: BootstrapperEvent, delay: Option) { + let event = NetworkBehaviourAction::GenerateEvent(event); + let deadline = delay.map(|d| Instant::now() + d); + self.delayed_events.push((deadline, event)); + } + + /// Schedule sending of `RunBootstrap` event after a `BOOTSTRAP_DELAY` + fn schedule_bootstrap(&mut self) { + match self.bootstrap_scheduled { + Some((scheduled, mut delay)) if delay < BOOTSTRAP_MAX_DELAY => { + // Delay bootstrap by `elapsed` + delay += scheduled.elapsed() + } + Some(_) => { /* maximum delay reached */ } + mut empty => { + empty.replace((Instant::now(), BOOTSTRAP_DELAY)); + } + }; + } + + /// Send `RunBootstrap` if delay is reached + fn trigger_bootstrap(&mut self, now: Instant) { + if let Some(&(scheduled, delay)) = self.bootstrap_scheduled.as_ref() { + if now >= scheduled + delay { + self.push_event(BootstrapperEvent::RunBootstrap, None) + } + } + } + + /// Send delayed events for which delay was reached + fn complete_delayed(&mut self, now: Instant) { + let delayed = mem::replace(&mut self.delayed_events, vec![]); + + let (ready, not_ready) = delayed.into_iter().partition(|(deadline, _)| { + let ready = deadline.map(|d| d >= now).unwrap_or(true); + ready + }); + + self.delayed_events = not_ready; + self.events = ready.into_iter().map(|(_, e)| e).collect(); + } + + /// Called on each poll + fn on_poll(&mut self) { + let now = Instant::now(); + + self.complete_delayed(now); + self.trigger_bootstrap(now); } } @@ -79,10 +134,7 @@ impl NetworkBehaviour for Bootstrapper { if self.bootstrap_nodes.contains(maddr) || self.bootstrap_peers.contains(peer_id) { self.bootstrap_peers.insert(peer_id.clone()); - self.push_event(BootstrapperEvent::BootstrapConnected { - peer_id: peer_id.clone(), - multiaddr: maddr.clone(), - }); + self.schedule_bootstrap(); } } @@ -98,10 +150,14 @@ impl NetworkBehaviour for Bootstrapper { }; if self.bootstrap_nodes.contains(maddr) || self.bootstrap_peers.contains(peer_id) { - self.push_event(BootstrapperEvent::BootstrapDisconnected { - peer_id: peer_id.clone(), - multiaddr: maddr.clone(), - }); + self.push_event( + BootstrapperEvent::ReconnectToBootstrap { + peer_id: Some(peer_id.clone()), + multiaddr: maddr.clone(), + error: None, + }, + RECONNECT_DELAY, + ); } } @@ -117,13 +173,16 @@ impl NetworkBehaviour for Bootstrapper { || peer_id.map_or(false, |id| self.bootstrap_peers.contains(id)); if is_bootstrap { - self.push_event(BootstrapperEvent::ReachFailure { - peer_id: peer_id.cloned(), - multiaddr: maddr.clone(), - error: format!("{:?}", error), - }); + self.push_event( + BootstrapperEvent::ReconnectToBootstrap { + peer_id: peer_id.cloned(), + multiaddr: maddr.clone(), + error: Some(format!("{:?}", error)), + }, + RECONNECT_DELAY, + ); } } - event_polling!(poll, events, SwarmEventType); + event_polling!(poll, events, SwarmEventType, on_poll); } diff --git a/server/src/bootstrapper/event.rs b/server/src/bootstrapper/event.rs index 0d9f2d8b3e..6ee8a49657 100644 --- a/server/src/bootstrapper/event.rs +++ b/server/src/bootstrapper/event.rs @@ -19,19 +19,12 @@ use parity_multiaddr::Multiaddr; #[derive(Debug, Clone)] pub enum BootstrapperEvent { - BootstrapConnected { - peer_id: PeerId, - multiaddr: Multiaddr, - }, - - BootstrapDisconnected { - peer_id: PeerId, - multiaddr: Multiaddr, - }, + RunBootstrap, - ReachFailure { + // Command to reconnect to specified bootstrap + ReconnectToBootstrap { peer_id: Option, multiaddr: Multiaddr, - error: String, + error: Option, }, } diff --git a/server/src/config/fluence_config.rs b/server/src/config/fluence_config.rs index 6efc7c0cc0..9a25714751 100644 --- a/server/src/config/fluence_config.rs +++ b/server/src/config/fluence_config.rs @@ -65,6 +65,9 @@ pub struct ServerConfig { /// External address to advertise via identify protocol pub external_address: Option, + + /// Prometheus port + pub prometheus_port: u16, } impl Default for ServerConfig { @@ -76,6 +79,7 @@ impl Default for ServerConfig { bootstrap_nodes: vec![], websocket_port: 9999, external_address: None, + prometheus_port: 18080, } } } diff --git a/server/src/function/dht_names.rs b/server/src/function/dht_names.rs index 80b55a943e..7d4075c7c5 100644 --- a/server/src/function/dht_names.rs +++ b/server/src/function/dht_names.rs @@ -214,7 +214,7 @@ impl FunctionRouter { name: &Address, provider: &Address, ) -> Result<(), libp2p::kad::store::Error> { - let record = ProviderRecord::new(provider, &self.config.keypair); + let record = ProviderRecord::signed(provider, &self.config.keypair); self.kademlia .put_record(Record::new(name, record.into()), Quorum::Majority) .map(|_| ()) @@ -227,4 +227,11 @@ impl FunctionRouter { Quorum::N(NonZeroUsize::new(GET_QUORUM_N).unwrap()), ); } + + /// Remove ourselves from providers of this record, and replicate this to DHT + pub(super) fn unpublish_name(&mut self, name: Address) { + let key = (&name).into(); + self.kademlia.remove_record(&key); + self.kademlia.replicate_record(key) + } } diff --git a/server/src/function/peers.rs b/server/src/function/peers.rs index a320a41ad9..25b98e48e1 100644 --- a/server/src/function/peers.rs +++ b/server/src/function/peers.rs @@ -16,6 +16,7 @@ use super::wait_peer::WaitPeer; use super::FunctionRouter; +use crate::function::waiting_queues::Enqueued; use faas_api::{FunctionCall, Protocol}; use libp2p::{ swarm::{DialPeerCondition, NetworkBehaviour, NetworkBehaviourAction}, @@ -42,9 +43,10 @@ impl FunctionRouter { /// Query for peers closest to the `peer_id` as DHT key, enqueue call until response fn query_closest(&mut self, peer_id: PeerId, call: WaitPeer) { - self.wait_peer.enqueue(peer_id.clone(), call); - // TODO: don't call get_closest_peers if there are already some calls waiting for it - self.kademlia.get_closest_peers(peer_id); + // Don't call get_closest_peers if there are already some calls waiting for it + if let Enqueued::New = self.wait_peer.enqueue(peer_id.clone(), call) { + self.kademlia.get_closest_peers(peer_id.as_bytes()); + } } /// Send all calls waiting for this peer to be found @@ -62,6 +64,11 @@ impl FunctionRouter { Ok(peer_id) => peer_id, }; + if !peer_id.is_inlining() { + // Warn about Qm... PeerId + log::warn!("Found closest peers for non-inlining peer id: {}", peer_id); + } + // Forward to `peer_id` let calls = self.wait_peer.remove_with(&peer_id, |wp| wp.found()); for call in calls { @@ -147,19 +154,12 @@ impl FunctionRouter { /// Whether peer is in the routing table pub(super) fn is_routable(&mut self, peer_id: &PeerId) -> bool { - // TODO: Checking `is_connected` inside `is_routable` smells... - let connected = self.is_connected(peer_id); - - let kad = self.kademlia.addresses_of_peer(peer_id); - let in_kad = !kad.is_empty(); - - log::debug!( - "peer {} in routing table: Connected? {} Kademlia {:?}", - peer_id, - connected, - kad - ); - connected || in_kad + // TODO (only relevant to local clients): + // Is it possible for a client to be routable via swarm, but not via kademlia? + // If so, need to ask swarm if client is routable + let in_kad = !self.kademlia.addresses_of_peer(peer_id).is_empty(); + log::debug!("peer {} in routing table? {:?}", peer_id, in_kad); + in_kad } /// Whether given peer id is equal to ours diff --git a/server/src/function/provider_record.rs b/server/src/function/provider_record.rs index 590956e0c2..c6cad76fdd 100644 --- a/server/src/function/provider_record.rs +++ b/server/src/function/provider_record.rs @@ -45,7 +45,7 @@ pub struct ProviderRecord { impl ProviderRecord { // Create ProviderRecord, signing address path (without schema) with passed keypair - pub fn new(address: &Address, kp: &Keypair) -> Self { + pub fn signed(address: &Address, kp: &Keypair) -> Self { let address = address.path(); let signature = kp.sign(address.as_bytes()); @@ -107,7 +107,7 @@ mod tests { let relay = RandomPeerId::random(); let address: Address = relay!(relay, client); - let rec = ProviderRecord::new(&address, &keypair); + let rec = ProviderRecord::signed(&address, &keypair); let encoded: Vec = rec.clone().into(); let decoded: ProviderRecord = encoded.as_slice().into(); diff --git a/server/src/function/router.rs b/server/src/function/router.rs index d2e46a87cb..a19a2ed70a 100644 --- a/server/src/function/router.rs +++ b/server/src/function/router.rs @@ -32,6 +32,7 @@ use libp2p::{ PeerId, }; use parity_multiaddr::Multiaddr; +use prometheus::Registry; use std::collections::{HashMap, HashSet, VecDeque}; use trust_graph::TrustGraph; use uuid::Uuid; @@ -75,20 +76,27 @@ pub struct FunctionRouter { // TODO: move public methods to a trait impl FunctionRouter { - pub(crate) fn new(config: RouterConfig, trust_graph: TrustGraph) -> Self { + pub(crate) fn new( + config: RouterConfig, + trust_graph: TrustGraph, + registry: Option<&Registry>, + ) -> Self { let mut cfg = KademliaConfig::default(); cfg.set_query_timeout(Duration::from_secs(5)) .set_max_packet_size(100 * 4096 * 4096) // 100 Mb .set_replication_factor(std::num::NonZeroUsize::new(5).unwrap()) .set_connection_idle_timeout(Duration::from_secs(2_628_000_000)); // ~month let store = MemoryStore::new(config.peer_id.clone()); - let kademlia = Kademlia::with_config( + let mut kademlia = Kademlia::with_config( config.keypair.clone(), config.peer_id.clone(), store, cfg, trust_graph, ); + if let Some(registry) = registry { + kademlia.enable_metrics(registry); + } Self { config, @@ -151,13 +159,15 @@ impl FunctionRouter { return; } Client(id) if is_local => { - let id = id.clone(); - let client = target.next().unwrap(); + let client_id = id.clone(); + let client_protocol = target.next().unwrap(); // Remove signature from target match target.next() { Some(Signature(_)) => { - let target = Address::cons(client, target); - self.send_to(id, Routable, call.with_target(target)); + let target = Address::cons(client_protocol, target); + // TODO: Why Routable? If it's a local client, it should be connected + // It can't be Routable, because it's not reachable via Kademlia + self.send_to(client_id, Routable, call.with_target(target)); } Some(other) => { let path = target.join(""); @@ -198,12 +208,12 @@ impl FunctionRouter { let status = self.peer_status(&to); // Check if peer is in expected (or better) status if status < expected { - // TODO: this error is not helpful. Example of helpful error: "Peer wasn't found via GetClosestPeers" - // consider custom errors for different pairs of (status, expected) + // TODO: This error is not helpful. Example of helpful error: "Peer wasn't found via GetClosestPeers". + // Consider custom errors for different pairs of (status, expected) #[rustfmt::skip] - let err_msg = format!("unexpected status. Got {:?} expected {:?}", status, expected); + let err_msg = format!("Unexpected status for {}. Got {:?} expected {:?}", to, status, expected); #[rustfmt::skip] - log::error!("Can't send call {:?} to peer {}: {}", call, to, err_msg); + log::error!("Can't send call {:?}: {}", call, err_msg); self.send_error_on_call(call, err_msg); return; } diff --git a/server/src/function/services.rs b/server/src/function/services.rs index 70efc0ad95..1d6cb6bd9f 100644 --- a/server/src/function/services.rs +++ b/server/src/function/services.rs @@ -143,10 +143,13 @@ impl FunctionRouter { } } - // Removes all names that resolve to an address containing `resolvee` + /// Removes all names that resolve to an address containing `resolvee` pub(super) fn remove_halted_names(&mut self, resolvee: &PeerId) { use Protocol::*; + // TODO: use drain_filter once available https://github.com/rust-lang/rust/issues/59618 + let mut removed = vec![]; + self.provided_names.retain(|k, v| { let protocols = v.protocols(); let halted = protocols.iter().any(|p| match p { @@ -156,13 +159,19 @@ impl FunctionRouter { }); if halted { log::info!( - "Removing halted name {}. forward_to: {} due to peer {} disconnection", + "Unpublishing halted name {}. forward_to: {} due to peer {} disconnection", k, v, resolvee ); + // TODO: avoid clone? + removed.push(k.clone()) } !halted }); + + for name in removed { + self.unpublish_name(name); + } } } diff --git a/server/src/kademlia/memory_store.rs b/server/src/kademlia/memory_store.rs index d6344fe84f..614dd183b2 100644 --- a/server/src/kademlia/memory_store.rs +++ b/server/src/kademlia/memory_store.rs @@ -117,19 +117,33 @@ impl MultiRecord { // Take max of expiration times, and set it to `self.expires`. // `None` means the record will never expire. if let Some(current) = self.expires.as_ref() { - if mrec.expires.map_or(false, |their| current.le(&their)) { + // If `mrec.expires` is None, mark current record to never expire + if mrec.expires.map_or(true, |their| current.le(&their)) { self.expires = mrec.expires; } } } /// Return whether this record is of `SimpleRecord` kind - pub fn simple(&self) -> bool { + pub fn is_simple(&self) -> bool { match self.kind { MultiRecordKind::SimpleRecord => true, MultiRecordKind::MultiRecord => false, } } + + /// Remove values by specified publisher + pub fn remove(&mut self, publisher: &PeerId) { + self.values.retain(|_, p| p != publisher) + } + + pub fn is_expired(&self, now: Instant) -> bool { + self.expires.map_or(false, |t| now >= t) + } + + pub fn is_empty(&self) -> bool { + self.values.is_empty() + } } /// In-memory implementation of a `RecordStore`. @@ -237,7 +251,7 @@ impl<'a> RecordStore<'a> for MemoryStore { match self.records.entry(key) { hash_map::Entry::Occupied(mut e) => { - if mrec.simple() { + if mrec.is_simple() { // Replace if mrec is of simple kind e.insert(mrec); } else { @@ -257,9 +271,19 @@ impl<'a> RecordStore<'a> for MemoryStore { } fn remove(&'a mut self, k: &Key) { - // TODO: removing everything here, for MultiRecord should remove only unexpired? - // TODO: Add method 'remove_expired' and use it instead of remove when removing due to ttl - self.records.remove(k); + if let Some(rec) = self.records.get_mut(k) { + if rec.is_expired(Instant::now()) { + // Whole record expired, remove it + self.records.remove(k); + } else { + // Remove ourselves from multirecord + rec.remove(self.local_key.preimage()); + if rec.is_empty() { + // If record is now empty, remove it + self.records.remove(k); + } + } + } } fn records(&'a self) -> Self::RecordsIter { @@ -380,16 +404,35 @@ mod tests { impl Arbitrary for NewRecord { fn arbitrary(g: &mut G) -> NewRecord { - NewRecord(Record { - key: NewKey::arbitrary(g).0, - value: Vec::arbitrary(g), - publisher: Some(PeerId::random()), - expires: if g.gen() { - Some(Instant::now() + Duration::from_secs(g.gen_range(0, 60))) - } else { - None - }, - }) + fn gen_rec(g: &mut G) -> Record { + Record { + key: NewKey::arbitrary(g).0, + value: Vec::arbitrary(g), + publisher: Some(PeerId::random()), + expires: if g.gen() { + Some(Instant::now() + Duration::from_secs(g.gen_range(0, 60))) + } else { + None + }, + } + } + + let rec = if g.gen() { + gen_rec(g) + } else { + // 25 chosen without any reason, you can change it up to max_values_per_multi_record + let max_values = 25; + let mut rec = try_to_multirecord(gen_rec(g)).expect("reduced"); + for _ in 1..g.gen_range(1, max_values) { + let mut new_rec = try_to_multirecord(gen_rec(g)).expect("reduced"); + new_rec.key = rec.key.clone(); + rec.merge(new_rec, max_values); + } + + reduce_multirecord(rec) + }; + + NewRecord(rec) } } @@ -416,17 +459,75 @@ mod tests { } #[test] - fn put_get_remove_record() { + fn put_get_remove_unexpired_record() { fn prop(r: NewRecord) { let r = r.0; - let key = r.key.clone(); + let key = &r.key; + + let mut mrec = try_to_multirecord(r.clone()).expect("reduce ok"); + // Set record to never expire + mrec.expires = None; + let reduced = reduce_multirecord(mrec.clone()); let mut store = MemoryStore::new(PeerId::random()); - assert!(store.put(r.clone()).is_ok()); - let mrec = try_to_multirecord(r).expect("reduce ok"); - let reduced = reduce_multirecord(mrec); - assert_eq!(Some(Cow::Owned(reduced)), store.get(&key)); - store.remove(&key); - assert!(store.get(&key).is_none()); + assert!(store.put(reduced.clone()).is_ok()); + + let from_store = store.get(key).expect("contains").into_owned(); + assert_eq!(from_store.key, reduced.key); + assert_eq!(from_store.publisher, reduced.publisher); + assert_eq!(from_store.expires, reduced.expires); + + store.remove(key); + let after_remove = store.get(key); + + let local_values: Vec<_> = mrec + .values + .values() + .filter(|&v| v == store.local_key.preimage()) + .collect(); + + if local_values.len() == mrec.values.len() { + // If all values stored in multirecord were published by store.local_key + // then record would be removed completely + assert_eq!(after_remove, None) + } else { + // There were non-local values in mrecord, can't be empty + let after_remove = after_remove.expect("non empty").into_owned(); + let after_remove = try_to_multirecord(after_remove).expect("must be a multirecord"); + // only local values should be deleted + assert_eq!( + after_remove.values.len(), + mrec.values.len() - local_values.len(), + "local values must be deleted" + ); + } + } + quickcheck(prop as fn(_)) + } + + #[test] + fn put_get_remove_expired_record() { + fn prop(r: NewRecord) { + let r = r.0; + let key = &r.key; + + let mut mrec = try_to_multirecord(r.clone()).expect("reduce ok"); + // Set record to expire right now + let now = Instant::now(); + mrec.expires = Some(now); + let reduced = reduce_multirecord(mrec.clone()); + + let mut store = MemoryStore::new(PeerId::random()); + assert!(store.put(reduced.clone()).is_ok()); + + let from_store = store.get(key).expect("contains").into_owned(); + + assert_eq!(from_store.key, reduced.key); + assert_eq!(from_store.publisher, reduced.publisher); + // We can't compare expires directly due to serialization quirks + assert!(from_store.expires.expect("expires defined") >= now); + + store.remove(key); + assert!(store.get(key).is_none()); } quickcheck(prop as fn(_)) } diff --git a/server/src/kademlia/record.rs b/server/src/kademlia/record.rs index 197094708b..fd16ed4f46 100644 --- a/server/src/kademlia/record.rs +++ b/server/src/kademlia/record.rs @@ -52,6 +52,7 @@ impl From for MultiRecordProto { .map(|(v, p)| ValueProto::new(v, p.into_bytes())) .collect(); + // TODO: calculate absolute timestamp (i.e., `t - EPOCH`) here let ttl = expires.map(|t| { let now = Instant::now(); if t > now { @@ -93,9 +94,17 @@ impl TryInto for MultiRecordProto { .collect::>() .map_err(|e: Error| Box::new(e))?; // TODO: get rid of explicit boxing - let expires = ttl - .filter(|&ttl| ttl > 0) - .map(|ttl| Instant::now() + Duration::from_secs(ttl as u64)); + // ttl == 0 means record doesn't expire + // ttl == 1 means record already expired + let expires = ttl.filter(|&ttl| ttl > 0).map(|ttl| { + if ttl > 1 { + let dur = Duration::from_secs(ttl as u64); + Instant::now() + dur + } else { + // ttl == 1 means record already expired + Instant::now() + } + }); Ok(MultiRecord { key: key.into(), diff --git a/server/src/server.rs b/server/src/server.rs index 8331faf867..98e24db85c 100644 --- a/server/src/server.rs +++ b/server/src/server.rs @@ -19,6 +19,7 @@ use crate::config::ServerConfig; use fluence_libp2p::{build_transport, types::OneshotOutlet}; use async_std::task; +use futures::future::BoxFuture; use futures::{channel::oneshot, select, stream::StreamExt, FutureExt}; use libp2p::{ identity::ed25519::{self, Keypair}, @@ -26,13 +27,16 @@ use libp2p::{ PeerId, Swarm, TransportError, }; use parity_multiaddr::{Multiaddr, Protocol}; +use prometheus::Registry; use std::io; +use std::net::IpAddr; use trust_graph::TrustGraph; // TODO: documentation pub struct Server { swarm: Swarm, config: ServerConfig, + registry: Registry, } impl Server { @@ -47,6 +51,7 @@ impl Server { log::info!("server peer id = {}", local_peer_id); let trust_graph = TrustGraph::new(root_weights); + let registry = Registry::new(); let mut swarm = { let behaviour = ServerBehaviour::new( @@ -55,6 +60,7 @@ impl Server { config.external_addresses(), trust_graph, config.bootstrap_nodes.clone(), + Some(®istry), ); let key_pair = libp2p::identity::Keypair::Ed25519(key_pair); let transport = build_transport(key_pair, socket_timeout); @@ -68,7 +74,11 @@ impl Server { .into_iter() .for_each(|addr| Swarm::add_external_address(&mut swarm, addr)); - let node_service = Self { swarm, config }; + let node_service = Self { + swarm, + config, + registry, + }; Box::new(node_service) } @@ -82,9 +92,15 @@ impl Server { self.swarm.dial_bootstrap_nodes(); task::spawn(async move { + let mut metrics = Self::start_metrics_endpoint( + self.registry, + (self.config.listen_ip, self.config.prometheus_port), + ) + .fuse(); loop { select!( _ = self.swarm.select_next_some() => {}, + _ = metrics => {}, _ = exit_inlet.next() => { break } @@ -95,6 +111,38 @@ impl Server { exit_outlet } + pub fn start_metrics_endpoint( + registry: Registry, + listen_addr: (IpAddr, u16), + ) -> BoxFuture<'static, io::Result<()>> { + use http_types::{Error, StatusCode::InternalServerError}; + use prometheus::{Encoder, TextEncoder}; + + let mut app = tide::with_state(registry); + app.at("/metrics") + .get(|req: tide::Request| async move { + let mut buffer = vec![]; + let encoder = TextEncoder::new(); + let metric_families = req.state().gather(); + + encoder + .encode(&metric_families, &mut buffer) + .map_err(|err| { + let msg = format!("Error encoding prometheus metrics: {:?}", err); + log::warn!("{}", msg); + Error::from_str(InternalServerError, msg) + })?; + + String::from_utf8(buffer).map_err(|err| { + let msg = format!("Error encoding prometheus metrics: {:?}", err); + log::warn!("{}", msg); + Error::from_str(InternalServerError, msg) + }) + }); + + Box::pin(app.listen(listen_addr)) + } + /// Starts node service listener. #[inline] fn listen(&mut self) -> Result<(), TransportError> { diff --git a/server/tests/routing.rs b/server/tests/routing.rs index e504d048fa..eb350b2ab6 100644 --- a/server/tests/routing.rs +++ b/server/tests/routing.rs @@ -27,18 +27,17 @@ unreachable_patterns )] -mod utils; - -use crate::utils::*; - use faas_api::{service, FunctionCall, Protocol}; +use fluence_client::Transport; use libp2p::{identity::PublicKey::Ed25519, PeerId}; -use serde_json::Value; -use trust_graph::{current_time, Certificate}; - use parity_multiaddr::Multiaddr; +use serde_json::Value; use std::str::FromStr; use std::thread::sleep; +use trust_graph::{current_time, Certificate}; + +use crate::utils::*; +mod utils; #[test] // Send calls between clients through relays @@ -103,7 +102,6 @@ fn invalid_relay_signature() { #[test] fn missing_relay_signature() { - enable_logs(); let (mut sender, receiver) = ConnectedClient::make_clients().expect("connect clients"); let target = Protocol::Peer(receiver.node.clone()) / receiver.client_address(); @@ -287,9 +285,12 @@ fn get_certs() { }; let swarm_count = 5; - let swarms = make_swarms_with(swarm_count, |bs, maddr| { - create_swarm(bs, maddr, Some(trust.clone())) - }); + let swarms = make_swarms_with( + swarm_count, + |bs, maddr| create_swarm(bs, maddr, Some(trust.clone()), Transport::Memory, None), + create_memory_maddr, + true, + ); sleep(KAD_TIMEOUT); let mut consumer = ConnectedClient::connect_to(swarms[1].1.clone()).expect("connect consumer"); let peer_id = PeerId::from(Ed25519(last_key)); @@ -324,9 +325,12 @@ fn add_certs() { }; let swarm_count = 5; - let swarms = make_swarms_with(swarm_count, |bs, maddr| { - create_swarm(bs, maddr, Some(trust.clone())) - }); + let swarms = make_swarms_with( + swarm_count, + |bs, maddr| create_swarm(bs, maddr, Some(trust.clone()), Transport::Memory, None), + create_memory_maddr, + true, + ); sleep(KAD_TIMEOUT); let mut registrar = ConnectedClient::connect_to(swarms[1].1.clone()).expect("connect consumer"); @@ -354,9 +358,12 @@ fn add_certs_invalid_signature() { }; let swarm_count = 5; - let swarms = make_swarms_with(swarm_count, |bs, maddr| { - create_swarm(bs, maddr, Some(trust.clone())) - }); + let swarms = make_swarms_with( + swarm_count, + |bs, maddr| create_swarm(bs, maddr, Some(trust.clone()), Transport::Memory, None), + create_memory_maddr, + true, + ); sleep(KAD_TIMEOUT); // invalidate signature in last trust in `cert` @@ -393,7 +400,7 @@ fn identify() { fn check_reply(consumer: &mut ConnectedClient, swarm_addr: &Multiaddr, msg_id: &str) { let reply = consumer.receive(); #[rustfmt::skip] - let reply_msg_id = reply.arguments.get("msg_id").expect("not empty").as_str().expect("str"); + let reply_msg_id = reply.arguments.get("msg_id").expect("not empty").as_str().expect("str"); assert_eq!(reply_msg_id, msg_id); let addrs = reply.arguments["addresses"].as_array().expect("not empty"); assert!(!addrs.is_empty()); diff --git a/server/tests/run_multiple_nodes.rs b/server/tests/run_multiple_nodes.rs new file mode 100644 index 0000000000..eef7cc42f6 --- /dev/null +++ b/server/tests/run_multiple_nodes.rs @@ -0,0 +1,97 @@ +/* + * Copyright 2020 Fluence Labs Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#![allow(unused_imports, dead_code)] + +use fluence_client::Transport; +use fluence_server::Server; +use prometheus::Registry; + +use crate::utils::*; +mod utils; + +#[test] +#[ignore] +fn main() { + use async_std::task; + use libp2p::core::multiaddr::{Multiaddr, Protocol}; + use rand::prelude::*; + use std::env; + use std::net::IpAddr; + + env_logger::init(); + + let count: usize = env::var("COUNT") + .unwrap_or("10".into()) + .parse() + .expect("count correct"); + + let host: IpAddr = env::var("HOST") + .unwrap_or("127.0.0.1".into()) + .parse() + .expect("host correct"); + + let port: u16 = env::var("PORT") + .unwrap_or("2000".into()) + .parse() + .expect("port correct"); + + // Max number of bootstrap nodes + let bs_max: usize = env::var("BS_MAX") + .unwrap_or("10".into()) + .parse() + .expect("bs correct"); + + // Boostrap nodes will be HOST:BS_PORT..HOST:BS_PORT+BS_MAX + let bs_port: Option = env::var("BS_PORT") + .map(|s| s.parse().expect("bs correct")) + .ok(); + + fn create_maddr(host: IpAddr, port: u16) -> Multiaddr { + let mut maddr = Multiaddr::from(host); + maddr.push(Protocol::Tcp(port)); + maddr + } + + let registry = Registry::new(); + + let mut idx = 0; + let mut rng = thread_rng(); + let external_bootstraps = bs_port.into_iter().flat_map(|p| { + (p..p + bs_max as u16) + .map(|p| create_maddr(host, p)) + .collect::>() + }); + + make_swarms_with( + count, + |bs, maddr| { + let rnd = bs.into_iter().choose_multiple(&mut rng, bs_max); + let bs: Vec<_> = rnd.into_iter().chain(external_bootstraps.clone()).collect(); + create_swarm(bs, maddr, None, Transport::Network, Some(®istry)) + }, + || { + let maddr = create_maddr(host, port + idx); + idx += 1; + maddr + }, + false, + ); + + log::info!("started /metrics at {}:{}", host, port - 1); + task::block_on(Server::start_metrics_endpoint(registry, (host, port - 1))) + .expect("Start /metrics endpoint"); +} diff --git a/server/tests/utils/connected_client.rs b/server/tests/utils/connected_client.rs index b0d7602c48..12ab88ba26 100644 --- a/server/tests/utils/connected_client.rs +++ b/server/tests/utils/connected_client.rs @@ -15,23 +15,23 @@ */ use crate::utils::misc::{make_swarms, CreatedSwarm, Result, SHORT_TIMEOUT, TIMEOUT}; -use faas_api::{relay, Address, FunctionCall, Protocol}; +use faas_api::{Address, FunctionCall, Protocol}; use async_std::future::timeout; use async_std::task; -use fluence_client::{Client, ClientCommand, ClientEvent, Transport}; +use fluence_client::{Client, ClientEvent, Transport}; use libp2p::PeerId; use parity_multiaddr::Multiaddr; #[derive(Debug)] -pub(crate) struct ConnectedClient { - pub(crate) client: Client, - pub(crate) node: PeerId, - pub(crate) node_address: Multiaddr, +pub struct ConnectedClient { + pub client: Client, + pub node: PeerId, + pub node_address: Multiaddr, } impl ConnectedClient { - pub(crate) fn connect_to(node_address: Multiaddr) -> Result { + pub fn connect_to(node_address: Multiaddr) -> Result { use core::result::Result; use std::io::{Error, ErrorKind}; @@ -57,7 +57,7 @@ impl ConnectedClient { Ok(task::block_on(timeout(TIMEOUT, connect))??) } - pub(crate) fn new() -> Result { + pub fn new() -> Result { let swarm = make_swarms(3).into_iter().next().unwrap(); let CreatedSwarm(node, addr1) = swarm; @@ -76,7 +76,7 @@ impl ConnectedClient { Ok(task::block_on(timeout(TIMEOUT, connect))?) } - pub(crate) fn make_clients() -> Result<(Self, Self)> { + pub fn make_clients() -> Result<(Self, Self)> { let swarms = make_swarms(3); let mut swarms = swarms.into_iter(); let CreatedSwarm(peer_id1, addr1) = swarms.next().expect("get swarm"); @@ -116,20 +116,15 @@ impl ConnectedClient { } pub fn relay_address(&self) -> Address { - let addr = relay!(self.node.clone(), self.client.peer_id.clone()); - let sig = self.sign(addr.path().as_bytes()); - addr.append(Protocol::Signature(sig)) + self.client.relay_address(self.node.clone()) } pub fn sign(&self, bytes: &[u8]) -> Vec { - self.client.key_pair.sign(bytes) + self.client.sign(bytes) } pub fn send(&self, call: FunctionCall) { - self.client.send(ClientCommand::Call { - node: self.node.clone(), - call, - }) + self.client.send(call, self.node.clone()) } pub fn receive(&mut self) -> FunctionCall { diff --git a/server/tests/utils/misc.rs b/server/tests/utils/misc.rs index 291e8bec08..3b56368a6c 100644 --- a/server/tests/utils/misc.rs +++ b/server/tests/utils/misc.rs @@ -16,9 +16,10 @@ use async_std::task; use faas_api::{service, Address, FunctionCall}; -use fluence_libp2p::build_memory_transport; +use fluence_libp2p::{build_memory_transport, build_transport}; use fluence_server::ServerBehaviour; +use fluence_client::Transport; use libp2p::{ identity::{ ed25519::{Keypair, PublicKey}, @@ -27,6 +28,7 @@ use libp2p::{ PeerId, Swarm, }; use parity_multiaddr::Multiaddr; +use prometheus::Registry; use serde_json::{json, Value}; use std::time::{Duration, Instant}; use trust_graph::{Certificate, TrustGraph}; @@ -34,12 +36,12 @@ use uuid::Uuid; /// Utility functions for tests. -pub(crate) type Result = core::result::Result>; -pub(crate) static TIMEOUT: Duration = Duration::from_secs(5); -pub(crate) static SHORT_TIMEOUT: Duration = Duration::from_millis(100); -pub(crate) static KAD_TIMEOUT: Duration = Duration::from_millis(500); +pub type Result = core::result::Result>; +pub static TIMEOUT: Duration = Duration::from_secs(5); +pub static SHORT_TIMEOUT: Duration = Duration::from_millis(100); +pub static KAD_TIMEOUT: Duration = Duration::from_millis(500); -pub(crate) fn certificates_call(peer_id: PeerId, reply_to: Address) -> FunctionCall { +pub fn certificates_call(peer_id: PeerId, reply_to: Address) -> FunctionCall { FunctionCall { uuid: uuid(), target: Some(service!("certificates")), @@ -49,7 +51,7 @@ pub(crate) fn certificates_call(peer_id: PeerId, reply_to: Address) -> FunctionC } } -pub(crate) fn add_certificates_call( +pub fn add_certificates_call( peer_id: PeerId, reply_to: Address, certs: Vec, @@ -68,7 +70,7 @@ pub(crate) fn add_certificates_call( } } -pub(crate) fn provide_call(service_id: &str, reply_to: Address) -> FunctionCall { +pub fn provide_call(service_id: &str, reply_to: Address) -> FunctionCall { FunctionCall { uuid: uuid(), target: Some(service!("provide")), @@ -78,7 +80,7 @@ pub(crate) fn provide_call(service_id: &str, reply_to: Address) -> FunctionCall } } -pub(crate) fn service_call(service_id: &str, consumer: Address) -> FunctionCall { +pub fn service_call(service_id: &str, consumer: Address) -> FunctionCall { FunctionCall { uuid: uuid(), target: Some(service!(service_id)), @@ -88,7 +90,7 @@ pub(crate) fn service_call(service_id: &str, consumer: Address) -> FunctionCall } } -pub(crate) fn reply_call(reply_to: Address) -> FunctionCall { +pub fn reply_call(reply_to: Address) -> FunctionCall { FunctionCall { uuid: uuid(), target: Some(reply_to), @@ -98,11 +100,11 @@ pub(crate) fn reply_call(reply_to: Address) -> FunctionCall { } } -pub(crate) fn uuid() -> String { +pub fn uuid() -> String { Uuid::new_v4().to_string() } -pub(crate) fn get_cert() -> Certificate { +pub fn get_cert() -> Certificate { use std::str::FromStr; Certificate::from_str( @@ -127,7 +129,7 @@ HFF3V9XXbhdTLWGVZkJYd9a7NyuD5BLWLdwc4EFBcCZa #[allow(dead_code)] // Enables logging, filtering out unnecessary details -pub(crate) fn enable_logs() { +pub fn enable_logs() { use log::LevelFilter::{Debug, Info}; env_logger::builder() @@ -151,14 +153,25 @@ pub(crate) fn enable_logs() { .ok(); } -pub(crate) struct CreatedSwarm(pub PeerId, pub Multiaddr); -pub(crate) fn make_swarms(n: usize) -> Vec { - make_swarms_with(n, |bs, maddr| create_swarm(bs, maddr, None)) +pub struct CreatedSwarm(pub PeerId, pub Multiaddr); +pub fn make_swarms(n: usize) -> Vec { + make_swarms_with( + n, + |bs, maddr| create_swarm(bs, maddr, None, Transport::Memory, None), + create_memory_maddr, + true, + ) } -pub(crate) fn make_swarms_with(n: usize, create_swarm: F) -> Vec +pub fn make_swarms_with( + n: usize, + mut create_swarm: F, + mut create_maddr: M, + wait_connected: bool, +) -> Vec where - F: Fn(Vec, Multiaddr) -> (PeerId, Swarm), + F: FnMut(Vec, Multiaddr) -> (PeerId, Swarm), + M: FnMut() -> Multiaddr, { use futures::stream::FuturesUnordered; use futures_util::StreamExt; @@ -207,7 +220,7 @@ where connected.fetch_add(1, Ordering::SeqCst); let total = connected.load(Ordering::Relaxed); if total % 10 == 0 { - log::trace!( + log::info!( "established {: <10} +{: <10} (= {:<5})", total, format_args!("{:.3}s", start.elapsed().as_secs_f32()), @@ -225,24 +238,28 @@ where .expect("drain"); }); - let now = Instant::now(); - while connected.load(Ordering::SeqCst) < (n * (n - 1)) {} - log::debug!("Connection took {}s", now.elapsed().as_secs_f32()); + if wait_connected { + let now = Instant::now(); + while connected.load(Ordering::SeqCst) < (n * (n - 1)) {} + log::info!("Connection took {}s", now.elapsed().as_secs_f32()); + } infos } #[derive(Default, Clone)] -pub(crate) struct Trust { - pub(crate) root_weights: Vec<(PublicKey, u32)>, - pub(crate) certificates: Vec, - pub(crate) cur_time: Duration, +pub struct Trust { + pub root_weights: Vec<(PublicKey, u32)>, + pub certificates: Vec, + pub cur_time: Duration, } -pub(crate) fn create_swarm( +pub fn create_swarm( bootstraps: Vec, listen_on: Multiaddr, trust: Option, + transport: Transport, + registry: Option<&Registry>, ) -> (PeerId, Swarm) { use libp2p::identity; @@ -266,10 +283,18 @@ pub(crate) fn create_swarm( vec![listen_on.clone()], trust_graph, bootstraps, + registry, ); - let transport = build_memory_transport(Ed25519(kp)); - - Swarm::new(transport, server, peer_id.clone()) + match transport { + Transport::Memory => { + Swarm::new(build_memory_transport(Ed25519(kp)), server, peer_id.clone()) + } + Transport::Network => Swarm::new( + build_transport(Ed25519(kp), Duration::from_secs(10)), + server, + peer_id.clone(), + ), + } }; Swarm::listen_on(&mut swarm, listen_on).unwrap(); @@ -277,7 +302,7 @@ pub(crate) fn create_swarm( (peer_id, swarm) } -fn create_maddr() -> Multiaddr { +pub fn create_memory_maddr() -> Multiaddr { use libp2p::core::multiaddr::Protocol; let port = 1 + rand::random::(); diff --git a/server/tests/utils/mod.rs b/server/tests/utils/mod.rs index b1c46fe6d8..9e4ba05d8c 100644 --- a/server/tests/utils/mod.rs +++ b/server/tests/utils/mod.rs @@ -17,5 +17,5 @@ mod connected_client; mod misc; -pub(crate) use connected_client::ConnectedClient; -pub(crate) use misc::*; +pub use connected_client::ConnectedClient; +pub use misc::*;