Skip to content
This repository was archived by the owner on Oct 23, 2022. It is now read-only.

add a DHT mode param #298

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions http/src/main.rs
Original file line number Diff line number Diff line change
@@ -2,7 +2,7 @@ use std::num::NonZeroU16;
use std::path::PathBuf;
use structopt::StructOpt;

use ipfs::{Ipfs, IpfsOptions, IpfsTypes, UninitializedIpfs};
use ipfs::{DhtMode, Ipfs, IpfsOptions, IpfsTypes, UninitializedIpfs};
use ipfs_http::{config, v0};

#[derive(Debug, StructOpt)]
@@ -134,8 +134,14 @@ fn main() {
let mut rt = tokio::runtime::Runtime::new().expect("Failed to create event loop");

rt.block_on(async move {
let opts: IpfsOptions =
IpfsOptions::new(home.clone().into(), keypair, Vec::new(), false, None);
let opts: IpfsOptions = IpfsOptions::new(
home.clone().into(),
keypair,
Vec::new(),
false,
None,
DhtMode::Client,
);

let (ipfs, task): (Ipfs<ipfs::TestTypes>, _) = UninitializedIpfs::new(opts, None)
.await
45 changes: 28 additions & 17 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -46,7 +46,7 @@ use self::dag::IpldDag;
pub use self::error::Error;
use self::ipns::Ipns;
pub use self::p2p::pubsub::{PubsubMessage, SubscriptionStream};
use self::p2p::{create_swarm, SwarmOptions, TSwarm};
use self::p2p::{create_swarm, TSwarm};
pub use self::p2p::{Connection, ConnectionTarget};
pub use self::path::IpfsPath;
pub use self::repo::RepoTypes;
@@ -75,19 +75,31 @@ impl RepoTypes for TestTypes {
type TDataStore = repo::mem::MemDataStore;
}

#[derive(Clone, Copy, Debug, PartialEq)]
/// The way the IPFS node operates within the Kademlia DHT.
pub enum DhtMode {
/// As a client, able to discover peers and content.
Client,
/// As a server, able to both discover peers and data
/// and also provide them to the network's DHT.
Server,
}

/// Ipfs options
#[derive(Clone)]
pub struct IpfsOptions {
/// The path of the ipfs repo.
pub ipfs_path: PathBuf,
/// The keypair used with libp2p.
pub keypair: Keypair,
pub keypair: DebuggableKeypair<Keypair>,
/// Nodes dialed during startup.
pub bootstrap: Vec<(Multiaddr, PeerId)>,
/// Enables mdns for peer discovery when true.
pub mdns: bool,
/// Custom Kademlia protocol name.
pub kad_protocol: Option<String>,
/// DHT mode.
pub dht_mode: DhtMode,
}

impl fmt::Debug for IpfsOptions {
@@ -97,7 +109,7 @@ impl fmt::Debug for IpfsOptions {
fmt.debug_struct("IpfsOptions")
.field("ipfs_path", &self.ipfs_path)
.field("bootstrap", &self.bootstrap)
.field("keypair", &DebuggableKeypair(&self.keypair))
.field("keypair", &self.keypair)
.field("mdns", &self.mdns)
.field("kad_protocol", &self.kad_protocol)
.finish()
@@ -109,18 +121,19 @@ impl IpfsOptions {
pub fn inmemory_with_generated_keys() -> Self {
Self {
ipfs_path: std::env::temp_dir().into(),
keypair: Keypair::generate_ed25519(),
keypair: DebuggableKeypair(Keypair::generate_ed25519()),
mdns: Default::default(),
bootstrap: Default::default(),
kad_protocol: Default::default(),
dht_mode: DhtMode::Client,
}
}
}

/// Workaround for libp2p::identity::Keypair missing a Debug impl, works with references and owned
/// keypairs.
#[derive(Clone)]
struct DebuggableKeypair<I: Borrow<Keypair>>(I);
pub struct DebuggableKeypair<I: Borrow<Keypair>>(I);

impl<I: Borrow<Keypair>> fmt::Debug for DebuggableKeypair<I> {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
@@ -147,13 +160,15 @@ impl IpfsOptions {
bootstrap: Vec<(Multiaddr, PeerId)>,
mdns: bool,
kad_protocol: Option<String>,
dht_mode: DhtMode,
) -> Self {
Self {
ipfs_path,
keypair,
keypair: DebuggableKeypair(keypair),
bootstrap,
mdns,
kad_protocol,
dht_mode,
}
}
}
@@ -194,7 +209,7 @@ impl Default for IpfsOptions {
.join("rust-ipfs")
.join("config.json");
let config = ConfigFile::new(config_path).unwrap();
let keypair = config.secio_key_pair();
let keypair = DebuggableKeypair(config.secio_key_pair());
let bootstrap = config.bootstrap();

IpfsOptions {
@@ -203,6 +218,7 @@ impl Default for IpfsOptions {
bootstrap,
mdns: true,
kad_protocol: None,
dht_mode: DhtMode::Client,
}
}
}
@@ -221,8 +237,8 @@ impl<Types: IpfsTypes> Clone for Ipfs<Types> {
#[derive(Debug)]
pub struct IpfsInner<Types: IpfsTypes> {
pub span: Span,
options: IpfsOptions,
repo: Repo<Types>,
keys: DebuggableKeypair<Keypair>,
to_task: Sender<IpfsEvent>,
}

@@ -267,7 +283,6 @@ enum IpfsEvent {
pub struct UninitializedIpfs<Types: IpfsTypes> {
repo: Repo<Types>,
span: Span,
keys: Keypair,
options: IpfsOptions,
repo_events: Receiver<RepoEvent>,
}
@@ -282,13 +297,11 @@ impl<Types: IpfsTypes> UninitializedIpfs<Types> {
pub async fn new(options: IpfsOptions, span: Option<Span>) -> Self {
let repo_options = RepoOptions::from(&options);
let (repo, repo_events) = create_repo(repo_options);
let keys = options.keypair.clone();
let span = span.unwrap_or_else(|| tracing::trace_span!("ipfs"));

UninitializedIpfs {
repo,
span,
keys,
options,
repo_events,
}
@@ -306,24 +319,22 @@ impl<Types: IpfsTypes> UninitializedIpfs<Types> {
let UninitializedIpfs {
repo,
span,
keys,
repo_events,
..
options,
} = self;

repo.init().await?;

let (to_task, receiver) = channel::<IpfsEvent>(1);

let ipfs = Ipfs(Arc::new(IpfsInner {
options: options.clone(),
span,
repo,
keys: DebuggableKeypair(keys),
to_task,
}));

let swarm_options = SwarmOptions::from(&self.options);
let swarm = create_swarm(swarm_options, ipfs.clone()).await;
let swarm = create_swarm(ipfs.clone()).await;

let fut = IpfsFuture {
repo_events: repo_events.fuse(),
@@ -537,7 +548,7 @@ impl<Types: IpfsTypes> Ipfs<Types> {
.send(IpfsEvent::GetAddresses(tx))
.await?;
let addresses = rx.await?;
Ok((self.keys.get_ref().public(), addresses))
Ok((self.options.keypair.get_ref().public(), addresses))
})
.await
}
25 changes: 12 additions & 13 deletions src/p2p/behaviour.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use super::pubsub::Pubsub;
use super::swarm::{Connection, ConnectionTarget, Disconnector, SwarmApi};
use crate::p2p::SwarmOptions;
use crate::repo::BlockPut;
use crate::subscription::{SubscriptionFuture, SubscriptionRegistry};
use crate::{Ipfs, IpfsTypes};
@@ -336,27 +335,28 @@ impl<Types: IpfsTypes> NetworkBehaviourEventProcess<IdentifyEvent> for Behaviour

impl<Types: IpfsTypes> Behaviour<Types> {
/// Create a Kademlia behaviour with the IPFS bootstrap nodes.
pub async fn new(options: SwarmOptions, ipfs: Ipfs<Types>) -> Self {
info!("net: starting with peer id {}", options.peer_id);
pub async fn new(ipfs: Ipfs<Types>) -> Self {
let peer_id = ipfs.options.keypair.get_ref().public().into_peer_id();
info!("net: starting with peer id {}", peer_id);

let mdns = if options.mdns {
let mdns = if ipfs.options.mdns {
Some(Mdns::new().expect("Failed to create mDNS service"))
} else {
None
}
.into();

let store = MemoryStore::new(options.peer_id.to_owned());
let store = MemoryStore::new(peer_id.to_owned());

let mut kad_config = KademliaConfig::default();
kad_config.disjoint_query_paths(true);
kad_config.set_query_timeout(std::time::Duration::from_secs(300));
if let Some(protocol) = options.kad_protocol {
kad_config.set_protocol_name(protocol.into_bytes());
if let Some(ref protocol) = ipfs.options.kad_protocol {
kad_config.set_protocol_name(protocol.clone().into_bytes());
}
let mut kademlia = Kademlia::with_config(options.peer_id.to_owned(), store, kad_config);
let mut kademlia = Kademlia::with_config(peer_id.to_owned(), store, kad_config);

for (addr, peer_id) in &options.bootstrap {
for (addr, peer_id) in &ipfs.options.bootstrap {
kademlia.add_address(peer_id, addr.to_owned());
}

@@ -365,9 +365,9 @@ impl<Types: IpfsTypes> Behaviour<Types> {
let identify = Identify::new(
"/ipfs/0.1.0".into(),
"rust-ipfs".into(),
options.keypair.public(),
ipfs.options.keypair.get_ref().public(),
);
let pubsub = Pubsub::new(options.peer_id);
let pubsub = Pubsub::new(peer_id);
let swarm = SwarmApi::default();

Behaviour {
@@ -480,8 +480,7 @@ impl<Types: IpfsTypes> Behaviour<Types> {

/// Create a IPFS behaviour with the IPFS bootstrap nodes.
pub async fn build_behaviour<TIpfsTypes: IpfsTypes>(
options: SwarmOptions,
ipfs: Ipfs<TIpfsTypes>,
) -> Behaviour<TIpfsTypes> {
Behaviour::new(options, ipfs).await
Behaviour::new(ipfs).await
}
41 changes: 5 additions & 36 deletions src/p2p/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
//! P2P handling for IPFS nodes.
use crate::{Ipfs, IpfsOptions, IpfsTypes};
use libp2p::identity::Keypair;
use crate::{Ipfs, IpfsTypes};
use libp2p::Swarm;
use libp2p::{Multiaddr, PeerId};
use tracing::Span;

mod behaviour;
@@ -14,46 +12,17 @@ pub use swarm::{Connection, ConnectionTarget};

pub type TSwarm<T> = Swarm<behaviour::Behaviour<T>>;

pub struct SwarmOptions {
pub keypair: Keypair,
pub peer_id: PeerId,
pub bootstrap: Vec<(Multiaddr, PeerId)>,
pub mdns: bool,
pub kad_protocol: Option<String>,
}

impl From<&IpfsOptions> for SwarmOptions {
fn from(options: &IpfsOptions) -> Self {
let keypair = options.keypair.clone();
let peer_id = keypair.public().into_peer_id();
let bootstrap = options.bootstrap.clone();
let mdns = options.mdns;
let kad_protocol = options.kad_protocol.clone();

SwarmOptions {
keypair,
peer_id,
bootstrap,
mdns,
kad_protocol,
}
}
}

/// Creates a new IPFS swarm.
pub async fn create_swarm<TIpfsTypes: IpfsTypes>(
options: SwarmOptions,
ipfs: Ipfs<TIpfsTypes>,
) -> TSwarm<TIpfsTypes> {
let peer_id = options.peer_id.clone();
pub async fn create_swarm<TIpfsTypes: IpfsTypes>(ipfs: Ipfs<TIpfsTypes>) -> TSwarm<TIpfsTypes> {
let peer_id = ipfs.options.keypair.get_ref().public().into_peer_id();

// Set up an encrypted TCP transport over the Mplex protocol.
let transport = transport::build_transport(options.keypair.clone());
let transport = transport::build_transport(ipfs.options.keypair.get_ref().clone());

let swarm_span = ipfs.0.span.clone();

// Create a Kademlia behaviour
let behaviour = behaviour::build_behaviour(options, ipfs).await;
let behaviour = behaviour::build_behaviour(ipfs).await;

// Create a Swarm
let mut swarm = libp2p::swarm::SwarmBuilder::new(transport, behaviour, peer_id)