Skip to content
This repository was archived by the owner on Jul 21, 2023. It is now read-only.

feat: add types and update all deps #214

Merged
merged 2 commits into from
Feb 8, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .aegir.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
module.exports = {
bundlesize: { maxSize: '222kB' }
bundlesize: { maxSize: '170kB' }
}


12 changes: 11 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,18 +1,28 @@
language: node_js
cache: npm

branches:
only:
- master
- /^release\/.*$/

stages:
- check
- test
- cov

node_js:
- '10'
- 'lts/*'
- 'node'

os:
- linux
- osx

before_install:
# modules with pre-built binaries may not have deployed versions for bleeding-edge node so this lets us fall back to building from source
- npm install -g node-pre-gyp

script: npx nyc -s npm run test:node -- --bail
after_success: npx nyc report --reporter=text-lcov > coverage.lcov && npx codecov

28 changes: 16 additions & 12 deletions package.json
Original file line number Diff line number Diff line change
@@ -8,7 +8,7 @@
"lint": "aegir lint",
"test": "aegir test -t node",
"test:node": "aegir test -t node",
"build": "aegir build",
"prepare": "aegir build --no-bundle",
"docs": "aegir docs",
"release": "aegir release --docs -t node",
"release-minor": "aegir release --type minor --docs -t node",
@@ -39,25 +39,28 @@
"node": ">=12.0.0",
"npm": ">=6.0.0"
},
"eslintConfig": {
"extends": "ipfs"
},
"homepage": "https://github.com/libp2p/js-libp2p-kad-dht",
"types": "dist/src/index.d.ts",
"dependencies": {
"abort-controller": "^3.0.0",
"async": "^2.6.2",
"base32.js": "~0.1.0",
"cids": "^1.1.5",
"debug": "^4.3.1",
"err-code": "^2.0.3",
"err-code": "^3.0.0",
"hashlru": "^2.3.0",
"heap": "~0.2.6",
"interface-datastore": "^3.0.3",
"interface-datastore": "^3.0.4",
"it-first": "^1.0.4",
"it-length-prefixed": "^3.1.0",
"it-pipe": "^1.1.0",
"k-bucket": "^5.0.0",
"libp2p-crypto": "^0.19.0",
"libp2p-interfaces": "^0.8.2",
"libp2p-record": "^0.9.0",
"libp2p-record": "^0.10.0",
"multiaddr": "^8.1.2",
"multihashing-async": "^2.0.1",
"multihashing-async": "^2.1.0",
"p-filter": "^2.1.0",
"p-map": "^4.0.0",
"p-queue": "^6.6.2",
@@ -68,19 +71,20 @@
"protons": "^2.0.0",
"streaming-iterables": "^5.0.4",
"uint8arrays": "^2.0.5",
"varint": "^5.0.0",
"varint": "^6.0.0",
"xor-distance": "^2.0.0"
},
"devDependencies": {
"aegir": "^25.0.0",
"@types/debug": "^4.1.5",
"aegir": "^30.3.0",
"async-iterator-all": "^1.0.0",
"chai": "^4.2.0",
"chai-checkmark": "^1.0.1",
"datastore-level": "^2.0.0",
"delay": "^4.3.0",
"datastore-level": "^4.0.0",
"delay": "^5.0.0",
"dirty-chai": "^2.0.1",
"it-pair": "^1.0.0",
"libp2p": "^0.28.5",
"libp2p": "^0.30.7",
"lodash": "^4.17.11",
"lodash.random": "^3.2.0",
"lodash.range": "^3.2.0",
117 changes: 69 additions & 48 deletions src/content-fetching/index.js
Original file line number Diff line number Diff line change
@@ -3,16 +3,26 @@
const errcode = require('err-code')
const pTimeout = require('p-timeout')
const uint8ArrayEquals = require('uint8arrays/equals')
const uint8ArrayToString = require('uint8arrays/to-string')
const libp2pRecord = require('libp2p-record')

const c = require('../constants')
const Query = require('../query')

const utils = require('../utils')

const Record = libp2pRecord.Record

/**
* @typedef {import('peer-id')} PeerId
* @typedef {import('../query').DHTQueryResult} DHTQueryResult
*/

/**
* @param {import('../')} dht
*/
module.exports = (dht) => {
/**
* @param {Uint8Array} key
* @param {Uint8Array} rec
*/
const putLocal = async (key, rec) => { // eslint-disable-line require-await
return dht.datastore.put(utils.bufferToKey(key), rec)
}
@@ -22,30 +32,26 @@ module.exports = (dht) => {
* the local datastore.
*
* @param {Uint8Array} key
* @returns {Promise<Record>}
*
* @private
*/
const getLocal = async (key) => {
dht._log('getLocal %b', key)
dht._log(`getLocal ${uint8ArrayToString(key, 'base32')}`)

const raw = await dht.datastore.get(utils.bufferToKey(key))
dht._log('found %b in local datastore', key)
dht._log(`found ${uint8ArrayToString(key, 'base32')} in local datastore`)

const rec = Record.deserialize(raw)

await dht._verifyRecordLocally(rec)

return rec
}

/**
* Send the best record found to any peers that have an out of date record.
*
* @param {Uint8Array} key
* @param {Array<Object>} vals - values retrieved from the DHT
* @param {Object} best - the best record that was found
* @returns {Promise}
*
* @private
* @param {import('../query').DHTQueryValue[]} vals - values retrieved from the DHT
* @param {Uint8Array} best - the best record that was found
*/
const sendCorrectionRecord = async (key, vals, best) => {
const fixupRec = await utils.createPutRecord(key, best)
@@ -78,10 +84,9 @@ module.exports = (dht) => {
return {
/**
* Store the given key/value pair locally, in the datastore.
*
* @param {Uint8Array} key
* @param {Uint8Array} rec - encoded record
* @returns {Promise<void>}
* @private
*/
async _putLocal (key, rec) { // eslint-disable-line require-await
return putLocal(key, rec)
@@ -92,9 +97,8 @@ module.exports = (dht) => {
*
* @param {Uint8Array} key
* @param {Uint8Array} value
* @param {Object} [options] - put options
* @param {object} [options] - put options
* @param {number} [options.minPeers] - minimum number of peers required to successfully put (default: closestPeers.length)
* @returns {Promise<void>}
*/
async put (key, value, options = {}) {
dht._log('PutValue %b', key)
@@ -134,9 +138,8 @@ module.exports = (dht) => {
* Times out after 1 minute by default.
*
* @param {Uint8Array} key
* @param {Object} [options] - get options
* @param {object} [options] - get options
* @param {number} [options.timeout] - optional timeout (default: 60000)
* @returns {Promise<Uint8Array>}
*/
async get (key, options = {}) {
options.timeout = options.timeout || c.minute
@@ -173,16 +176,15 @@ module.exports = (dht) => {
*
* @param {Uint8Array} key
* @param {number} nvals
* @param {Object} [options] - get options
* @param {object} [options] - get options
* @param {number} [options.timeout] - optional timeout (default: 60000)
* @returns {Promise<Array<{from: PeerId, val: Uint8Array}>>}
*/
async getMany (key, nvals, options = {}) {
options.timeout = options.timeout || c.minute

dht._log('getMany %b (%s)', key, nvals)

let vals = []
const vals = []
let localRec

try {
@@ -204,9 +206,8 @@ module.exports = (dht) => {
return vals
}

const paths = []
const id = await utils.convertBuffer(key)
const rtp = dht.routingTable.closestPeers(id, this.kBucketSize)
const rtp = dht.routingTable.closestPeers(id, dht.kBucketSize)

dht._log('peers in rt: %d', rtp.length)

@@ -220,15 +221,23 @@ module.exports = (dht) => {
return vals
}

// we have peers, lets do the actual query to them
const query = new Query(dht, key, (pathIndex, numPaths) => {
// This function body runs once per disjoint path
const pathSize = utils.pathSize(nvals - vals.length, numPaths)
const pathVals = []
paths.push(pathVals)
const valsLength = vals.length

// Here we return the query function to use on this particular disjoint path
return async (peer) => {
/**
* @param {number} pathIndex
* @param {number} numPaths
*/
function createQuery (pathIndex, numPaths) {
// This function body runs once per disjoint path
const pathSize = utils.pathSize(nvals - valsLength, numPaths)
let queryResults = 0

/**
* Here we return the query function to use on this particular disjoint path
*
* @param {PeerId} peer
*/
async function disjointPathQuery (peer) {
let rec, peers, lookupErr
try {
const results = await dht._getValueOrPeers(peer, key)
@@ -242,37 +251,49 @@ module.exports = (dht) => {
lookupErr = err
}

const res = { closerPeers: peers }
/** @type {import('../query').QueryResult} */
const res = {
closerPeers: peers
}

if (rec && rec.value) {
vals.push({
val: rec.value,
from: peer
})

if ((rec && rec.value) || lookupErr) {
pathVals.push({
val: rec && rec.value,
queryResults++
} else if (lookupErr) {
vals.push({
err: lookupErr,
from: peer
})

queryResults++
}

// enough is enough
if (pathVals.length >= pathSize) {
if (queryResults >= pathSize) {
res.pathComplete = true
}

return res
}
})

let error
try {
await pTimeout(query.run(rtp), options.timeout)
} catch (err) {
error = err
return disjointPathQuery
}
query.stop()

// combine vals from each path
vals = [].concat.apply(vals, paths).slice(0, nvals)
// we have peers, lets send the actual query to them
const query = new Query(dht, key, createQuery)

if (error && vals.length === 0) {
throw error
try {
await pTimeout(query.run(rtp), options.timeout)
} catch (err) {
if (vals.length === 0) {
throw err
}
} finally {
query.stop()
}

return vals
93 changes: 67 additions & 26 deletions src/content-routing/index.js
Original file line number Diff line number Diff line change
@@ -9,13 +9,21 @@ const Message = require('../message')
const Query = require('../query')
const utils = require('../utils')

/**
* @typedef {import('cids')} CID
* @typedef {import('peer-id')} PeerId
* @typedef {import('multiaddr')} Multiaddr
*/

/**
* @param {import('../')} dht
*/
module.exports = (dht) => {
/**
* Check for providers from a single node.
*
* @param {PeerId} peer
* @param {CID} key
* @returns {Promise<Message>}
*
* @private
*/
@@ -26,13 +34,14 @@ module.exports = (dht) => {

return {
/**
* Announce to the network that we can provide given key's value.
* Announce to the network that we can provide the value for a given key
*
* @param {CID} key
* @returns {Promise<void>}
*/
async provide (key) {
dht._log('provide: %s', key.toBaseEncodedString())
dht._log(`provide: ${key}`)

/** @type {Error[]} */
const errors = []

// Add peer as provider
@@ -45,49 +54,65 @@ module.exports = (dht) => {
multiaddrs
}]

// Notify closest peers
await utils.mapParallel(dht.getClosestPeers(key.bytes), async (peer) => {
dht._log('putProvider %s to %s', key.toBaseEncodedString(), peer.toB58String())
/**
* @param {PeerId} peer
*/
async function mapPeer (peer) {
dht._log(`putProvider ${key} to ${peer.toB58String()}`)
try {
await dht.network.sendMessage(peer, msg)
} catch (err) {
errors.push(err)
}
})
}

// Notify closest peers
await utils.mapParallel(dht.getClosestPeers(key.bytes), mapPeer)

if (errors.length) {
// TODO:
// This should be infrequent. This means a peer we previously connected
// to failed to exchange the provide message. If getClosestPeers was an
// iterator, we could continue to pull until we announce to kBucketSize peers.
throw errcode(new Error(`Failed to provide to ${errors.length} of ${dht.kBucketSize} peers`, 'ERR_SOME_PROVIDES_FAILED'), { errors })
throw errcode(new Error(`Failed to provide to ${errors.length} of ${dht.kBucketSize} peers`), 'ERR_SOME_PROVIDES_FAILED', { errors })
}
},

/**
* Search the dht for up to `K` providers of the given CID.
*
* @param {CID} key
* @param {Object} options - findProviders options
* @param {number} options.timeout - how long the query should maximally run, in milliseconds (default: 60000)
* @param {number} options.maxNumProviders - maximum number of providers to find
* @param {Object} [options] - findProviders options
* @param {number} [options.timeout=60000] - how long the query should maximally run, in milliseconds
* @param {number} [options.maxNumProviders=5] - maximum number of providers to find
* @returns {AsyncIterable<{ id: PeerId, multiaddrs: Multiaddr[] }>}
*/
async * findProviders (key, options = {}) {
async * findProviders (key, options = { timeout: 60000, maxNumProviders: 5 }) {
const providerTimeout = options.timeout || c.minute
const n = options.maxNumProviders || c.K

dht._log('findProviders %s', key.toBaseEncodedString())
dht._log(`findProviders ${key}`)

const out = new LimitedPeerList(n)
const provs = await dht.providers.getProviders(key)

provs.forEach((id) => {
const peerData = dht.peerStore.get(id) || {}
out.push({
id: peerData.id || id,
multiaddrs: (peerData.addresses || []).map((address) => address.multiaddr)
provs
.forEach(id => {
const peerData = dht.peerStore.get(id)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
const peerData = dht.peerStore.get(id)
const peer = dht.peerStore.get(id)

We initially were going with the peerData naming for peerStore records, but we decided after the DHT refactor to just call peer and make it consistent here later on.


if (peerData) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if (peerData) {
if (peer) {

out.push({
id: peerData.id,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
id: peerData.id,
id: peer.id,

multiaddrs: peerData.addresses
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
multiaddrs: peerData.addresses
multiaddrs: peer.addresses

.map((address) => address.multiaddr)
})
} else {
out.push({
id,
multiaddrs: []
})
}
})
})

// All done
if (out.length >= n) {
@@ -99,21 +124,34 @@ module.exports = (dht) => {
}

// need more, query the network
/** @type {LimitedPeerList[]} */
const paths = []
const query = new Query(dht, key.bytes, (pathIndex, numPaths) => {

/**
*
* @param {number} pathIndex
* @param {number} numPaths
*/
function makePath (pathIndex, numPaths) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When we refactor the query logic, we should get rid of this makePath functions within the main API. Perhaps creating a set of ready to go types of query and then each function uses what it is looking for

// This function body runs once per disjoint path
const pathSize = utils.pathSize(n - out.length, numPaths)
const pathProviders = new LimitedPeerList(pathSize)
paths.push(pathProviders)

// Here we return the query function to use on this particular disjoint path
return async (peer) => {
/**
* The query function to use on this particular disjoint path
*
* @param {PeerId} peer
*/
async function queryDisjointPath (peer) {
const msg = await findProvidersSingle(peer, key)
const provs = msg.providerPeers
dht._log('(%s) found %s provider entries', dht.peerId.toB58String(), provs.length)
dht._log(`Found ${provs.length} provider entries for ${key}`)

provs.forEach((prov) => {
pathProviders.push({ id: prov.id })
pathProviders.push({
...prov
})
})

// hooray we have all that we want
@@ -124,8 +162,11 @@ module.exports = (dht) => {
// it looks like we want some more
return { closerPeers: msg.closerPeers }
}
})

return queryDisjointPath
}

const query = new Query(dht, key.bytes, makePath)
const peers = dht.routingTable.closestPeers(key.bytes, dht.kBucketSize)

try {
180 changes: 98 additions & 82 deletions src/index.js

Large diffs are not rendered by default.

37 changes: 32 additions & 5 deletions src/message/index.js
Original file line number Diff line number Diff line change
@@ -2,19 +2,31 @@

const PeerId = require('peer-id')
const multiaddr = require('multiaddr')
// @ts-ignore
const protons = require('protons')
const { Record } = require('libp2p-record')
const pbm = protons(require('./dht.proto'))

const MESSAGE_TYPE = pbm.Message.MessageType
const CONNECTION_TYPE = pbm.Message.ConnectionType

/**
* @typedef {0|1|2|3|4} ConnectionType
*
* @typedef {object} PBPeer
* @property {Uint8Array} id
* @property {Uint8Array[]} addrs
* @property {ConnectionType} connection
*
* @typedef {import('../index').PeerData} PeerData
*/

/**
* Represents a single DHT control message.
*/
class Message {
/**
* @param {MessageType} type
* @param {MESSAGE_TYPE} type
* @param {Uint8Array} key
* @param {number} level
*/
@@ -26,8 +38,12 @@ class Message {
this.type = type
this.key = key
this._clusterLevelRaw = level

/** @type {PeerData[]} */
this.closerPeers = []
/** @type {PeerData[]} */
this.providerPeers = []
/** @type {import('libp2p-record').Record | null} */
this.record = null
}

@@ -49,15 +65,17 @@ class Message {

/**
* Encode into protobuf
* @returns {Uint8Array}
*/
serialize () {
const obj = {
key: this.key,
type: this.type,
clusterLevelRaw: this._clusterLevelRaw,
closerPeers: this.closerPeers.map(toPbPeer),
providerPeers: this.providerPeers.map(toPbPeer)
providerPeers: this.providerPeers.map(toPbPeer),

/** @type {Uint8Array | undefined} */
record: undefined
}

if (this.record) {
@@ -75,7 +93,6 @@ class Message {
* Decode from protobuf
*
* @param {Uint8Array} raw
* @returns {Message}
*/
static deserialize (raw) {
const dec = pbm.Message.decode(raw)
@@ -84,6 +101,7 @@ class Message {

msg.closerPeers = dec.closerPeers.map(fromPbPeer)
msg.providerPeers = dec.providerPeers.map(fromPbPeer)

if (dec.record) {
msg.record = Record.deserialize(dec.record)
}
@@ -95,14 +113,23 @@ class Message {
Message.TYPES = MESSAGE_TYPE
Message.CONNECTION_TYPES = CONNECTION_TYPE

/**
* @param {PeerData} peer
*/
function toPbPeer (peer) {
return {
/** @type {PBPeer} */
const output = {
id: peer.id.id,
addrs: (peer.multiaddrs || []).map((m) => m.bytes),
connection: CONNECTION_TYPE.CONNECTED
}

return output
}

/**
* @param {PBPeer} peer
*/
function fromPbPeer (peer) {
return {
id: new PeerId(peer.id),
59 changes: 34 additions & 25 deletions src/network.js
Original file line number Diff line number Diff line change
@@ -2,10 +2,11 @@

const errcode = require('err-code')

const pipe = require('it-pipe')
const { pipe } = require('it-pipe')
const lp = require('it-length-prefixed')
const pTimeout = require('p-timeout')
const { consume } = require('streaming-iterables')
const first = require('it-first')

const MulticodecTopology = require('libp2p-interfaces/src/topology/multicodec-topology')

@@ -14,17 +15,22 @@ const c = require('./constants')
const Message = require('./message')
const utils = require('./utils')

/**
* @typedef {import('peer-id')} PeerId
* @typedef {import('libp2p-interfaces/src/stream-muxer/types').MuxedStream} MuxedStream
*/

/**
* Handle network operations for the dht
*/
class Network {
/**
* Create a new network.
* Create a new network
*
* @param {KadDHT} self
* @param {import('./index')} dht
*/
constructor (self) {
this.dht = self
constructor (dht) {
this.dht = dht
this.readMessageTimeout = c.READ_MESSAGE_TIMEOUT
this._log = utils.logger(this.dht.peerId, 'net')
this._rpc = rpc(this.dht)
@@ -33,8 +39,7 @@ class Network {
}

/**
* Start the network.
* @returns {Promise<void>}
* Start the network
*/
async start () {
if (this._running) {
@@ -65,8 +70,7 @@ class Network {
}

/**
* Stop all network activity.
* @returns {Promise<void>}
* Stop all network activity
*/
async stop () {
if (!this.dht.isStarted && !this.isStarted) {
@@ -75,13 +79,15 @@ class Network {
this._running = false

// unregister protocol and handlers
await this.dht.registrar.unregister(this._registrarId)
if (this._registrarId) {
await this.dht.registrar.unregister(this._registrarId)
}
}

/**
* Is the network online?
*
* @type {bool}
* @type {boolean}
*/
get isStarted () {
return this._running
@@ -90,7 +96,7 @@ class Network {
/**
* Are all network components there?
*
* @type {bool}
* @type {boolean}
*/
get isConnected () {
// TODO add a way to check if switch has started or not
@@ -99,9 +105,8 @@ class Network {

/**
* Registrar notifies a connection successfully with dht protocol.
* @private
* @param {PeerId} peerId remote peer id
* @returns {Promise<void>}
*
* @param {PeerId} peerId - remote peer id
*/
async _onPeerConnected (peerId) {
await this.dht._add(peerId)
@@ -110,10 +115,10 @@ class Network {

/**
* Send a request and record RTT for latency measurements.
*
* @async
* @param {PeerId} to - The peer that should receive a message
* @param {Message} msg - The message to send.
* @returns {Promise<Message>}
*/
async sendRequest (to, msg) {
// TODO: record latency
@@ -139,7 +144,6 @@ class Network {
*
* @param {PeerId} to
* @param {Message} msg
* @returns {Promise<void>}
*/
async sendMessage (to, msg) {
if (!this.isConnected) {
@@ -163,10 +167,8 @@ class Network {
* If no response is received after the specified timeout
* this will error out.
*
* @param {DuplexIterable} stream - the stream to use
* @param {MuxedStream} stream - the stream to use
* @param {Uint8Array} msg - the message to send
* @returns {Promise<Message>}
* @private
*/
async _writeReadMessage (stream, msg) { // eslint-disable-line require-await
return pTimeout(
@@ -178,10 +180,8 @@ class Network {
/**
* Write a message to the given stream.
*
* @param {DuplexIterable} stream - the stream to use
* @param {MuxedStream} stream - the stream to use
* @param {Uint8Array} msg - the message to send
* @returns {Promise<void>}
* @private
*/
_writeMessage (stream, msg) {
return pipe(
@@ -193,15 +193,24 @@ class Network {
}
}

/**
* @param {MuxedStream} stream
* @param {Uint8Array} msg
*/
async function writeReadMessage (stream, msg) {
const res = await pipe(
[msg],
lp.encode(),
stream,
lp.decode(),
/**
* @param {AsyncIterable<Uint8Array>} source
*/
async source => {
for await (const chunk of source) {
return chunk.slice()
const buf = await first(source)

if (buf) {
return buf.slice()
}
}
)
18 changes: 9 additions & 9 deletions src/peer-list/index.js
Original file line number Diff line number Diff line change
@@ -1,60 +1,60 @@
'use strict'

/**
* @typedef {import('peer-id')} PeerId
* @typedef {import('../').PeerData} PeerData
*/

/**
* A list of unique peers.
*/
class PeerList {
constructor () {
/** @type {PeerData[]} */
this.list = []
}

/**
* Add a new peer. Returns `true` if it was a new one
*
* @param {PeerData} peerData
* @returns {bool}
*/
push (peerData) {
if (!this.has(peerData.id)) {
this.list.push(peerData)

return true
}

return false
}

/**
* Check if this PeerData is already in here.
*
* @param {PeerId} peerId
* @returns {bool}
*/
has (peerId) {
const match = this.list.find((i) => i.id.isEqual(peerId))
const match = this.list.find((i) => i.id.equals(peerId))
return Boolean(match)
}

/**
* Get the list as an array.
*
* @returns {Array<PeerData>}
*/
toArray () {
return this.list.slice()
}

/**
* Remove the last element
*
* @returns {PeerData}
*/
pop () {
return this.list.pop()
}

/**
* The length of the list
*
* @type {number}
*/
get length () {
return this.list.length
6 changes: 5 additions & 1 deletion src/peer-list/limited-peer-list.js
Original file line number Diff line number Diff line change
@@ -2,6 +2,10 @@

const PeerList = require('.')

/**
* @typedef {import('../').PeerData} PeerData
*/

/**
* Like PeerList but with a length restriction.
*/
@@ -20,12 +24,12 @@ class LimitedPeerList extends PeerList {
* Add a PeerData if it fits in the list
*
* @param {PeerData} peerData
* @returns {bool}
*/
push (peerData) {
if (this.length < this.limit) {
return super.push(peerData)
}

return false
}
}
16 changes: 12 additions & 4 deletions src/peer-list/peer-distance-list.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
'use strict'

// @ts-ignore
const distance = require('xor-distance')
const utils = require('../utils')
const pMap = require('p-map')
const uint8ArrayEquals = require('uint8arrays/equals')

/**
* @typedef {import('peer-id')} PeerId
* @typedef {import('../').PeerData} PeerData
*/

/**
* Maintains a list of peerIds sorted by distance from a DHT key.
*/
@@ -18,6 +24,8 @@ class PeerDistanceList {
constructor (originDhtKey, capacity) {
this.originDhtKey = originDhtKey
this.capacity = capacity

/** @type {{ peerId: PeerId, distance: Uint8Array }[]} */
this.peerDistances = []
}

@@ -39,7 +47,6 @@ class PeerDistanceList {
* Add a peerId to the list.
*
* @param {PeerId} peerId
* @returns {Promise<void>}
*/
async add (peerId) {
if (this.peerDistances.find(pd => uint8ArrayEquals(pd.peerId.id, peerId.id))) {
@@ -61,8 +68,7 @@ class PeerDistanceList {
* Indicates whether any of the peerIds passed as a parameter are closer
* to the origin key than the furthest peerId in the PeerDistanceList.
*
* @param {Array<PeerId>} peerIds
* @returns {Boolean}
* @param {PeerId[]} peerIds
*/
async anyCloser (peerIds) {
if (!peerIds.length) {
@@ -74,14 +80,16 @@ class PeerDistanceList {
}

const dhtKeys = await pMap(peerIds, (peerId) => utils.convertPeerId(peerId))

const furthestDistance = this.peerDistances[this.peerDistances.length - 1].distance

for (const dhtKey of dhtKeys) {
const keyDistance = distance(this.originDhtKey, dhtKey)

if (distance.compare(keyDistance, furthestDistance) < 0) {
return true
}
}

return false
}
}
7 changes: 6 additions & 1 deletion src/peer-list/peer-queue.js
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
'use strict'

// @ts-ignore
const Heap = require('heap')
// @ts-ignore
const distance = require('xor-distance')
const debug = require('debug')

const utils = require('../utils')

const log = debug('libp2p:dht:peer-queue')

/**
* @typedef {import('peer-id')} PeerId
*/

/**
* PeerQueue is a heap that sorts its entries (PeerIds) by their
* xor distance to the inital provided key.
@@ -52,7 +58,6 @@ class PeerQueue {
* Add a new PeerId to the queue.
*
* @param {PeerId} id
* @returns {Promise}
*/
async enqueue (id) {
log('enqueue %s', id.toB58String())
77 changes: 49 additions & 28 deletions src/peer-routing/index.js
Original file line number Diff line number Diff line change
@@ -5,22 +5,30 @@ const pTimeout = require('p-timeout')

const PeerId = require('peer-id')
const crypto = require('libp2p-crypto')
const uint8ArrayToString = require('uint8arrays/to-string')

const c = require('../constants')
const Message = require('../message')
const Query = require('../query')

const utils = require('../utils')

/**
* @typedef {import('multiaddr')} Multiaddr
*/

/**
* @param {import('../index')} dht
*/
module.exports = (dht) => {
/**
* Look if we are connected to a peer with the given id.
* Returns its id and addresses, if found, otherwise `undefined`.
*
* @param {PeerId} peer
* @returns {Promise<{ id: PeerId, multiaddrs: Multiaddr[] }>}
*/
const findPeerLocal = async (peer) => {
dht._log('findPeerLocal %s', peer.toB58String())
dht._log(`findPeerLocal ${peer.toB58String()}`)
const p = await dht.routingTable.find(peer)

const peerData = p && dht.peerStore.get(p)
@@ -35,6 +43,7 @@ module.exports = (dht) => {

/**
* Get a value via rpc call for the given parameters.
*
* @param {PeerId} peer
* @param {Uint8Array} key
* @returns {Promise<Message>}
@@ -47,14 +56,15 @@ module.exports = (dht) => {

/**
* Find close peers for a given peer
*
* @param {Uint8Array} key
* @param {PeerId} peer
* @returns {Promise<Array<{ id: PeerId, multiaddrs: Multiaddr[] }>>}
* @private
*/

const closerPeersSingle = async (key, peer) => {
dht._log('closerPeersSingle %b from %s', key, peer.toB58String())
dht._log(`closerPeersSingle ${uint8ArrayToString(key, 'base32')} from ${peer.toB58String()}`)
const msg = await dht.peerRouting._findPeerSingle(peer, new PeerId(key))

return msg.closerPeers
@@ -68,23 +78,22 @@ module.exports = (dht) => {

/**
* Get the public key directly from a node.
*
* @param {PeerId} peer
* @returns {Promise<PublicKey>}
* @private
*/
const getPublicKeyFromNode = async (peer) => {
const pkKey = utils.keyForPublicKey(peer)
const msg = await getValueSingle(peer, pkKey)

if (!msg.record || !msg.record.value) {
throw errcode(`Node not responding with its public key: ${peer.toB58String()}`, 'ERR_INVALID_RECORD')
throw errcode(new Error(`Node not responding with its public key: ${peer.toB58String()}`), 'ERR_INVALID_RECORD')
}

const recPeer = PeerId.createFromPubKey(msg.record.value)
const recPeer = await PeerId.createFromPubKey(msg.record.value)

// compare hashes of the pub key
if (!recPeer.isEqual(peer)) {
throw errcode('public key does not match id', 'ERR_PUBLIC_KEY_DOES_NOT_MATCH_ID')
if (!recPeer.equals(peer)) {
throw errcode(new Error('public key does not match id'), 'ERR_PUBLIC_KEY_DOES_NOT_MATCH_ID')
}

return recPeer.pubKey
@@ -93,6 +102,7 @@ module.exports = (dht) => {
return {
/**
* Ask peer `peer` if they know where the peer with id `target` is.
*
* @param {PeerId} peer
* @param {PeerId} target
* @returns {Promise<Message>}
@@ -107,12 +117,13 @@ module.exports = (dht) => {

/**
* Search for a peer with the given ID.
*
* @param {PeerId} id
* @param {Object} options - findPeer options
* @param {number} options.timeout - how long the query should maximally run, in milliseconds (default: 60000)
* @param {Object} [options] - findPeer options
* @param {number} [options.timeout=60000] - how long the query should maximally run, in milliseconds
* @returns {Promise<{ id: PeerId, multiaddrs: Multiaddr[] }>}
*/
async findPeer (id, options = {}) {
async findPeer (id, options = { timeout: 60000 }) {
options.timeout = options.timeout || c.minute
dht._log('findPeer %s', id.toB58String())

@@ -148,10 +159,13 @@ module.exports = (dht) => {

// query the network
const query = new Query(dht, id.id, () => {
// There is no distinction between the disjoint paths,
// so there are no per-path variables in dht scope.
// Just return the actual query function.
return async (peer) => {
/**
* There is no distinction between the disjoint paths, so there are no per-path
* variables in dht scope. Just return the actual query function.
*
* @param {PeerId} peer
*/
const queryFn = async (peer) => {
const msg = await this._findPeerSingle(peer, id)
const match = msg.closerPeers.find((p) => p.id.isEqual(id))

@@ -167,20 +181,20 @@ module.exports = (dht) => {
closerPeers: msg.closerPeers
}
}

return queryFn
})

let error, result
let result
try {
result = await pTimeout(query.run(peers), options.timeout)
} catch (err) {
error = err
} finally {
query.stop()
}
query.stop()
if (error) throw error

let success = false
result.paths.forEach((result) => {
if (result.success) {
if (result.success && result.peer) {
success = true
dht.peerStore.addressBook.add(result.peer.id, result.peer.multiaddrs)
}
@@ -193,6 +207,10 @@ module.exports = (dht) => {

const peerData = dht.peerStore.get(id)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
const peerData = dht.peerStore.get(id)
const peer = dht.peerStore.get(id)


if (!peerData) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if (!peerData) {
if (!peer) {

throw errcode(new Error('No peer found in peer store'), 'ERR_NOT_FOUND')
}

return {
id: peerData.id,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
id: peerData.id,
id: peer.id,

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are other occurrences in this file, I will not flag all of them

multiaddrs: peerData.addresses.map((address) => address.multiaddr)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
multiaddrs: peerData.addresses.map((address) => address.multiaddr)
multiaddrs: peer.addresses.map((address) => address.multiaddr)

@@ -201,9 +219,10 @@ module.exports = (dht) => {

/**
* Kademlia 'node lookup' operation.
*
* @param {Uint8Array} key
* @param {Object} [options]
* @param {boolean} [options.shallow] shallow query (default: false)
* @param {boolean} [options.shallow=false] - shallow query
* @returns {AsyncIterable<PeerId>}
*/
async * getClosestPeers (key, options = { shallow: false }) {
@@ -240,21 +259,23 @@ module.exports = (dht) => {

/**
* Get the public key for the given peer id.
*
* @param {PeerId} peer
* @returns {Promise<PubKey>}
*/
async getPublicKey (peer) {
dht._log('getPublicKey %s', peer.toB58String())

// local check
const peerData = dht.peerStore.get(peer)

if (peerData && peerData.id.pubKey) {
dht._log('getPublicKey: found local copy')
return peerData.id.pubKey
}

// try the node directly
let pk

try {
pk = await getPublicKeyFromNode(peer)
} catch (err) {
@@ -264,10 +285,10 @@ module.exports = (dht) => {
pk = crypto.keys.unmarshalPublicKey(value)
}

peerData.id = new PeerId(peer.id, null, pk)
const addrs = peerData.addresses.map((address) => address.multiaddr)
dht.peerStore.addressBook.add(peerData.id, addrs)
dht.peerStore.keyBook.set(peerData.id, pk)
const peerId = new PeerId(peer.id, undefined, pk)
const addrs = ((peerData && peerData.addresses) || []).map((address) => address.multiaddr)
dht.peerStore.addressBook.add(peerId, addrs)
dht.peerStore.keyBook.set(peerId, pk)

return pk
}
36 changes: 20 additions & 16 deletions src/providers.js
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
'use strict'

const cache = require('hashlru')
// @ts-ignore
const varint = require('varint')
const PeerId = require('peer-id')
const { Key } = require('interface-datastore')
const { default: Queue } = require('p-queue')
const c = require('./constants')
const utils = require('./utils')

/**
* @typedef {import('cids')} CID
* @typedef {import('interface-datastore').Datastore} Datastore
*/

/**
* This class manages known providers.
* A provider is a peer that we know to have the content for a given CID.
@@ -22,7 +28,7 @@ const utils = require('./utils')
*/
class Providers {
/**
* @param {Object} datastore
* @param {Datastore} datastore
* @param {PeerId} [self]
* @param {number} [cacheSize=256]
*/
@@ -52,14 +58,14 @@ class Providers {
*/
this.lruCacheSize = cacheSize || c.PROVIDERS_LRU_CACHE_SIZE

// @ts-ignore hashlru types are wrong
this.providers = cache(this.lruCacheSize)

this.syncQueue = new Queue({ concurrency: 1 })
}

/**
* Start the provider cleanup service
* @returns {void}
*/
start () {
this._cleaner = setInterval(
@@ -70,11 +76,12 @@ class Providers {

/**
* Release any resources.
* @returns {void}
*/
stop () {
clearInterval(this._cleaner)
this._cleaner = null
if (this._cleaner) {
clearInterval(this._cleaner)
this._cleaner = null
}
}

/**
@@ -148,7 +155,7 @@ class Providers {
* Get the currently known provider peer ids for a given CID.
*
* @param {CID} cid
* @returns {Promise<Map<String, Date>>}
* @returns {Promise<Map<string, Date>>}
*
* @private
*/
@@ -175,7 +182,7 @@ class Providers {
const provs = await this._getProvidersMap(cid)

this._log('loaded %s provs', provs.size)
const now = Date.now()
const now = new Date()
provs.set(utils.encodeBase32(provider.id), now)

const dsKey = makeProviderKey(cid)
@@ -220,10 +227,7 @@ function makeProviderKey (cid) {
* @param {Datastore} store
* @param {CID} cid
* @param {PeerId} peer
* @param {number} time
* @returns {Promise<void>}
*
* @private
* @param {Date} time
*/
async function writeProviderEntry (store, cid, peer, time) { // eslint-disable-line require-await
const dsKey = [
@@ -233,17 +237,14 @@ async function writeProviderEntry (store, cid, peer, time) { // eslint-disable-l
].join('')

const key = new Key(dsKey)
const buffer = Uint8Array.from(varint.encode(time))
const buffer = Uint8Array.from(varint.encode(time.getTime()))
return store.put(key, buffer)
}

/**
* Parse the CID and provider peer id from the key
*
* @param {DKey} key
* @returns {Object} object with peer id and cid
*
* @private
* @param {import('interface-datastore').Key} key
*/
function parseProviderKey (key) {
const parts = key.toString().split('/')
@@ -276,6 +277,9 @@ async function loadProviders (store, cid) {
return providers
}

/**
* @param {Uint8Array} buf
*/
function readTime (buf) {
return varint.decode(buf)
}
4 changes: 4 additions & 0 deletions src/query-manager.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
'use strict'

/**
* @typedef {import('./query')} Query
*/

/**
* Keeps track of all running queries.
*/
61 changes: 37 additions & 24 deletions src/query/index.js
Original file line number Diff line number Diff line change
@@ -5,34 +5,45 @@ const mh = require('multihashing-async').multihash
const utils = require('../utils')
const Run = require('./run')

/**
* @typedef {import('peer-id')} PeerId
* @typedef {{from: PeerId, val: Uint8Array}} DHTQueryValue
* @typedef {{from: PeerId, err: Error}} DHTQueryError
* @typedef {DHTQueryValue | DHTQueryError} DHTQueryResult
* @typedef {import('../').PeerData} PeerData
*
* @typedef {{ pathComplete?: boolean, queryComplete?: boolean, closerPeers?: PeerData[], peer?: PeerData, success?: boolean }} QueryResult
*/

/**
* User-supplied function to set up an individual disjoint path. Per-path
* query state should be held in this function's closure.
*
* Accepts the numeric index from zero to numPaths - 1 and returns a function
* to call on each peer in the query.
*
* @typedef {(pathIndex: number, numPaths: number) => QueryFunc } MakeQueryFunc
*/

/**
* Query function
*
* @typedef {(peer: PeerId) => Promise<QueryResult> } QueryFunc
*/

/**
* Divide peers up into disjoint paths (subqueries). Any peer can only be used once over all paths.
* Within each path, query peers from closest to farthest away.
*/
class Query {
/**
* User-supplied function to set up an individual disjoint path. Per-path
* query state should be held in this function's closure.
* @typedef {makePath} function
* @param {number} pathNum - Numeric index from zero to numPaths - 1
* @returns {queryFunc} - Function to call on each peer in the query
*/

/**
* Query function.
* @typedef {queryFunc} function
* @param {PeerId} next - Peer to query
* @param {function(Error, Object)} callback - Query result callback
*/

/**
* Create a new query. The makePath function is called once per disjoint path, so that per-path
* variables can be created in that scope. makePath then returns the actual query function (queryFunc) to
* use when on that path.
*
* @param {DHT} dht - DHT instance
* @param {import('../index')} dht - DHT instance
* @param {Uint8Array} key
* @param {makePath} makePath - Called to set up each disjoint path. Must return the query function.
* @param {MakeQueryFunc} makePath - Called to set up each disjoint path. Must return the query function.
*/
constructor (dht, key, makePath) {
this.dht = dht
@@ -49,8 +60,7 @@ class Query {
/**
* Run this query, start with the given list of peers first.
*
* @param {Array<PeerId>} peers
* @returns {Promise}
* @param {PeerId[]} peers
*/
async run (peers) { // eslint-disable-line require-await
if (!this.dht._queryManager.running) {
@@ -96,7 +106,7 @@ class Query {
* Stop the query.
*/
stop () {
this._log(`query:done in ${Date.now() - this._startTime}ms`)
this._log(`query:done in ${Date.now() - (this._startTime || 0)}ms`)

if (this._run) {
this._log(`${this._run.errors.length} of ${this._run.peersSeen.size} peers errored (${this._run.errors.length / this._run.peersSeen.size * 100}% fail rate)`)
@@ -106,11 +116,14 @@ class Query {
return
}

this._run.removeListener('start', this._onStart)
this._run.removeListener('complete', this._onComplete)

this.running = false
this._run && this._run.stop()

if (this._run) {
this._run.removeListener('start', this._onStart)
this._run.removeListener('complete', this._onComplete)
this._run.stop()
}

this.dht._queryManager.queryCompleted(this)
}
}
30 changes: 16 additions & 14 deletions src/query/path.js
Original file line number Diff line number Diff line change
@@ -9,46 +9,47 @@ const utils = require('../utils')
// This should help reduce the high end call times of queries
const QUERY_FUNC_TIMEOUT = 30e3

/**
* @typedef {import('peer-id')} PeerId
*/

/**
* Manages a single Path through the DHT.
*/
class Path {
/**
* Creates a Path.
*
* @param {Run} run
* @param {queryFunc} queryFunc
* @param {import('./run')} run
* @param {import('./index').QueryFunc} queryFunc
*/
constructor (run, queryFunc) {
this.run = run
this.queryFunc = utils.withTimeout(queryFunc, QUERY_FUNC_TIMEOUT)
if (!this.queryFunc) throw new Error('Path requires a `queryFn` to be specified')
if (typeof this.queryFunc !== 'function') throw new Error('Path expected `queryFn` to be a function. Got ' + typeof this.queryFunc)

/**
* @type {Array<PeerId>}
*/
/** @type {PeerId[]} */
this.initialPeers = []

/**
* @type {PeerQueue}
*/
/** @type {PeerQueue | null} */
this.peersToQuery = null

/** @type {import('./index').QueryResult | null} */
this.res = null
}

/**
* Add a peer to the set of peers that are used to intialize the path.
*
* @param {PeerId} peer
*/
addInitialPeer (peer) {
this.initialPeers.push(peer)
}

/**
* Execute the path.
*
* @returns {Promise}
*
* Execute the path
*/
async execute () {
// Create a queue of peers ordered by distance from the key
@@ -63,7 +64,6 @@ class Path {
* Add a peer to the peers to be queried.
*
* @param {PeerId} peer
* @returns {Promise<void>}
*/
async addPeerToQuery (peer) {
// Don't add self
@@ -77,7 +77,9 @@ class Path {
return
}

await this.peersToQuery.enqueue(peer)
if (this.peersToQuery) {
await this.peersToQuery.enqueue(peer)
}
}
}

31 changes: 21 additions & 10 deletions src/query/run.js
Original file line number Diff line number Diff line change
@@ -4,32 +4,42 @@ const PeerDistanceList = require('../peer-list/peer-distance-list')
const EventEmitter = require('events')

const Path = require('./path')
const WorkerQueue = require('./workerQueue')
const WorkerQueue = require('./worker-queue')
const utils = require('../utils')

/**
* @typedef {import('peer-id')} PeerId
*/

/**
* Manages a single run of the query.
*/
class Run extends EventEmitter {
/**
* Creates a Run.
*
* @param {Query} query
* @param {import('./index')} query
*/
constructor (query) {
super()

this.query = query

this.running = false

/** @type {WorkerQueue[]} */
this.workers = []

// The peers that have been queried (including error responses)
this.peersSeen = new Set()

// The errors received when querying peers
/** @type {Error[]} */
this.errors = []

// The closest K peers that have been queried successfully
// (this member is initialized when the worker queues start)
/** @type {PeerDistanceList | null} */
this.peersQueried = null
}

@@ -50,11 +60,10 @@ class Run extends EventEmitter {
/**
* Execute the run with the given initial set of peers.
*
* @param {Array<PeerId>} peers
* @returns {Promise}
* @param {PeerId[]} peers
*/

async execute (peers) {
/** @type {import('./path')[]} */
const paths = [] // array of states per disjoint path

// Create disjoint paths
@@ -73,7 +82,9 @@ class Run extends EventEmitter {

const res = {
// The closest K peers we were able to query successfully
finalSet: new Set(this.peersQueried.peers),
finalSet: new Set(this.peersQueried && this.peersQueried.peers),

/** @type {import('./index').QueryResult[]} */
paths: []
}

@@ -172,22 +183,22 @@ class Run extends EventEmitter {
* stop querying on that `worker`.
*
* @param {WorkerQueue} worker
* @returns {Promise<Boolean>}
* @returns {Promise<boolean>}
*/
async continueQuerying (worker) {
// If we haven't queried K peers yet, keep going
if (this.peersQueried.length < this.peersQueried.capacity) {
if (this.peersQueried && this.peersQueried.length < this.peersQueried.capacity) {
return true
}

// Get all the peers that are currently being queried.
// Note that this function gets called right after a peer has been popped
// off the head of the closest peers queue so it will include that peer.
const running = worker.queue.workersList().map(i => i.data)
const running = Array.from(worker.queuedPeerIds)

// Check if any of the peers that are currently being queried are closer
// to the key than the peers we've already queried
const someCloser = await this.peersQueried.anyCloser(running)
const someCloser = this.peersQueried && await this.peersQueried.anyCloser(running)

// Some are closer, the worker should keep going
if (someCloser) {
132 changes: 86 additions & 46 deletions src/query/workerQueue.js → src/query/worker-queue.js
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
'use strict'

const queue = require('async/queue')
const promiseToCallback = require('promise-to-callback')
const { default: Queue } = require('p-queue')

/**
* @typedef {import('peer-id')} PeerId
*/

class WorkerQueue {
/**
* Creates a new WorkerQueue.
*
* @param {DHT} dht
* @param {Run} run
* @param {Object} path
* @param {function} log
* @param {import('../index')} dht
* @param {import('./run')} run
* @param {import('./path')} path
* @param {Function & {error: Function}} log
*/
constructor (dht, run, path, log) {
this.dht = dht
@@ -22,39 +25,42 @@ class WorkerQueue {
this.queue = this.setupQueue()
// a container for resolve/reject functions that will be populated
// when execute() is called

/** @type {{ resolve: (result?: any) => void, reject: (err: Error) => void} | null} */
this.execution = null

/** @type {Set<PeerId>} */
this.queuedPeerIds = new Set()
}

/**
* Create the underlying async queue.
*
* @returns {Object}
* @returns {Queue}
*/
setupQueue () {
const q = queue((peer, cb) => {
promiseToCallback(this.processNext(peer))(cb)
}, this.concurrency)

// If there's an error, stop the worker
q.error = (err) => {
this.log.error('queue', err)
this.stop(err)
}
const q = new Queue({
concurrency: this.concurrency
})

// When all peers in the queue have been processed, stop the worker
q.drain = () => {
this.log('queue:drain')
this.stop()
}
q.on('idle', () => {
if (this.path.peersToQuery && !this.path.peersToQuery.length) {
this.log('queue:drain')
this.stop()
}
})

// When a space opens up in the queue, add some more peers
q.unsaturated = () => {
if (this.running) {
this.fill()
q.on('next', () => {
if (!this.running) {
return
}
}

q.buffer = 0
if (q.pending < this.concurrency) {
this.fill()
}
})

return q
}
@@ -63,34 +69,41 @@ class WorkerQueue {
* Stop the worker, optionally providing an error to pass to the worker's
* callback.
*
* @param {Error} err
* @param {Error} [err]
*/
stop (err) {
if (!this.running) {
return
}

this.running = false
this.queue.kill()
this.queue.clear()
this.log('worker:stop, %d workers still running', this.run.workers.filter(w => w.running).length)
if (err) {
this.execution.reject(err)
} else {
this.execution.resolve()

if (this.execution) {
if (err) {
this.execution.reject(err)
} else {
this.execution.resolve()
}
}
}

/**
* Use the queue from async to keep `concurrency` amount items running
* per path.
*
* @return {Promise<void>}
* @returns {Promise<void>}
*/
async execute () {
this.running = true
// store the promise resolution functions to be resolved at end of queue
this.execution = {}
const execPromise = new Promise((resolve, reject) => Object.assign(this.execution, { resolve, reject }))
this.execution = null
const execPromise = new Promise((resolve, reject) => {
this.execution = {
resolve, reject
}
})
// start queue
this.fill()
// await completion
@@ -102,24 +115,42 @@ class WorkerQueue {
* worker queue concurrency.
* Note that we don't want to take any more than those required to satisfy
* concurrency from the peers-to-query queue, because we always want to
* query the closest peers to the key first, and new peers are continously
* query the closest peers to the key first, and new peers are continuously
* being added to the peers-to-query queue.
*/
fill () {
if (!this.path.peersToQuery) {
return
}

// Note:
// - queue.running(): number of items that are currently running
// - queue.length(): the number of items that are waiting to be run
while (this.queue.running() + this.queue.length() < this.concurrency &&
this.path.peersToQuery.length > 0) {
this.queue.push(this.path.peersToQuery.dequeue())
// - queue.pending: number of items that are currently running
// - queue.size: the number of items that are waiting to be run
while (this.queue.pending + this.queue.size < this.concurrency && this.path.peersToQuery.length > 0) {
const peer = this.path.peersToQuery.dequeue()

// store the peer id so we can potentially abort early
this.queuedPeerIds.add(peer)

this.queue.add(
() => {
return this.processNext(peer)
.catch(err => {
this.log.error('queue', err)
this.stop(err)
})
.finally(() => {
this.queuedPeerIds.delete(peer)
})
}
)
}
}

/**
* Process the next peer in the queue
*
* @param {PeerId} peer
* @returns {Promise<void>}
*/
async processNext (peer) {
if (!this.running) {
@@ -203,8 +234,6 @@ class WorkerQueue {
* Execute a query on the next peer.
*
* @param {PeerId} peer
* @returns {Promise<void>}
* @private
*/
async execQuery (peer) {
let res, queryError
@@ -225,12 +254,17 @@ class WorkerQueue {
}

// Add the peer to the closest peers we have successfully queried
await this.run.peersQueried.add(peer)
this.run.peersQueried && await this.run.peersQueried.add(peer)

if (!res) {
return
}

// If the query indicates that this path or the whole query is complete
// set the path result and bail out
if (res.pathComplete || res.queryComplete) {
this.path.res = res

return {
pathComplete: res.pathComplete,
queryComplete: res.queryComplete
@@ -239,14 +273,20 @@ class WorkerQueue {

// If there are closer peers to query, add them to the queue
if (res.closerPeers && res.closerPeers.length > 0) {
await Promise.all(res.closerPeers.map(async (closer) => {
/**
* @param {import('../').PeerData} closer
*/
const queryCloser = async (closer) => {
// don't add ourselves
if (this.dht._isSelf(closer.id)) {
return
}

this.dht._peerDiscovered(closer.id, closer.multiaddrs)
await this.path.addPeerToQuery(closer.id)
}))
}

await Promise.all(res.closerPeers.map(queryCloser))
}
}
}
23 changes: 10 additions & 13 deletions src/random-walk.js
Original file line number Diff line number Diff line change
@@ -3,23 +3,22 @@
const crypto = require('libp2p-crypto')
const multihashing = require('multihashing-async')
const PeerId = require('peer-id')
const AbortController = require('abort-controller')
const { AbortController } = require('abort-controller')
const errcode = require('err-code')
const times = require('p-times')
const c = require('./constants')
const { logger } = require('./utils')

/**
* @typedef {import('./')} DHT
* @typedef {import('./').RandomWalkOptions} RandomWalkOptions
*/

class RandomWalk {
/**
* @constructor
* @class
* @param {DHT} dht
* @param {object} options
* @param {randomWalkOptions.enabled} options.enabled
* @param {randomWalkOptions.queriesPerPeriod} options.queriesPerPeriod
* @param {randomWalkOptions.interval} options.interval
* @param {randomWalkOptions.timeout} options.timeout
* @param {randomWalkOptions.delay} options.delay
* @param {DHT} options.dht
* @param {RandomWalkOptions} options
*/
constructor (dht, options) {
if (!dht) {
@@ -93,7 +92,6 @@ class RandomWalk {
*
* @param {number} queries
* @param {number} walkTimeout
* @returns {Promise}
*
* @private
*/
@@ -142,8 +140,7 @@ class RandomWalk {
* @param {PeerId} id
* @param {object} options
* @param {number} options.timeout
* @param {AbortControllerSignal} options.signal
* @returns {Promise}
* @param {AbortSignal} options.signal
*
* @private
*/
@@ -165,7 +162,7 @@ class RandomWalk {
this.log('query:found', peer)

// wait what, there was something found? Lucky day!
throw errcode(`random-walk: ACTUALLY FOUND PEER: ${peer}, ${id.toB58String()}`, 'ERR_FOUND_RANDOM_PEER')
throw errcode(new Error(`random-walk: ACTUALLY FOUND PEER: ${peer}, ${id.toB58String()}`), 'ERR_FOUND_RANDOM_PEER')
}

/**
44 changes: 26 additions & 18 deletions src/routing.js
Original file line number Diff line number Diff line change
@@ -1,12 +1,21 @@
'use strict'

// @ts-ignore
const KBucket = require('k-bucket')

const utils = require('./utils')

/**
* @typedef {import('peer-id')} PeerId
*
* @typedef {object} KBucketPeer
* @property {Uint8Array} id
* @property {PeerId} peer
*/

/**
* A wrapper around `k-bucket`, to provide easy store and
* retrival for peers.
* retrieval for peers.
*/
class RoutingTable {
/**
@@ -20,7 +29,9 @@ class RoutingTable {
this._onInit(kBucketSize)
}

// -- Private Methods
/**
* @param {number} kBucketSize
*/
async _onInit (kBucketSize) {
const selfKey = await utils.convertPeerId(this.self)

@@ -36,21 +47,21 @@ class RoutingTable {
/**
* Called on the `ping` event from `k-bucket`.
* Currently this just removes the oldest contact from
* the list, without acutally pinging the individual peers.
* the list, without actually pinging the individual peers.
* This is the same as go does, but should probably
* be upgraded to actually ping the individual peers.
*
* @param {Array<Object>} oldContacts
* @param {Object} newContact
* @returns {undefined}
* @private
* @param {KBucketPeer[]} oldContacts
* @param {KBucketPeer} newContact
*/
_onPing (oldContacts, newContact) {
// just use the first one (k-bucket sorts from oldest to newest)
const oldest = oldContacts[0]

// remove the oldest one
this.kb.remove(oldest.id)
if (oldest) {
// remove the oldest one
this.kb.remove(oldest.id)
}

// add the new one
this.kb.add(newContact)
@@ -60,8 +71,6 @@ class RoutingTable {

/**
* Amount of currently stored peers.
*
* @type {number}
*/
get size () {
return this.kb.count()
@@ -71,13 +80,13 @@ class RoutingTable {
* Find a specific peer by id.
*
* @param {PeerId} peer
* @returns {Promise<PeerId>}
* @returns {Promise<PeerId | undefined>}
*/
async find (peer) {
const key = await utils.convertPeerId(peer)
const closest = this.closestPeer(key)

if (closest && closest.isEqual(peer)) {
if (closest && peer.equals(closest)) {
return closest
}
}
@@ -86,7 +95,6 @@ class RoutingTable {
* Retrieve the closest peers to the given key.
*
* @param {Uint8Array} key
* @returns {PeerId|undefined}
*/
closestPeer (key) {
const res = this.closestPeers(key, 1)
@@ -100,17 +108,18 @@ class RoutingTable {
*
* @param {Uint8Array} key
* @param {number} count
* @returns {Array<PeerId>}
*/
closestPeers (key, count) {
return this.kb.closest(key, count).map((p) => p.peer)
/** @type {KBucketPeer[]} */
const closest = this.kb.closest(key, count)

return closest.map(p => p.peer)
}

/**
* Add or update the routing table with the given peer.
*
* @param {PeerId} peer
* @returns {Promise<void>}
*/
async add (peer) {
const id = await utils.convertPeerId(peer)
@@ -122,7 +131,6 @@ class RoutingTable {
* Remove a given peer from the table.
*
* @param {PeerId} peer
* @returns {Promise<void>}
*/
async remove (peer) {
const id = await utils.convertPeerId(peer)
14 changes: 12 additions & 2 deletions src/rpc/handlers/add-provider.js
Original file line number Diff line number Diff line change
@@ -5,22 +5,30 @@ const errcode = require('err-code')

const utils = require('../../utils')

/**
* @typedef {import('peer-id')} PeerId
* @typedef {import('../../message')} Message
*/

/**
* @param {import('../../index')} dht
*/
module.exports = (dht) => {
const log = utils.logger(dht.peerId, 'rpc:add-provider')
/**
* Process `AddProvider` DHT messages.
*
* @param {PeerId} peerId
* @param {Message} msg
* @returns {Promise<void>}
*/
return async function addProvider (peerId, msg) { // eslint-disable-line require-await
async function addProvider (peerId, msg) { // eslint-disable-line require-await
log('start')

if (!msg.key || msg.key.length === 0) {
throw errcode(new Error('Missing key'), 'ERR_MISSING_KEY')
}

/** @type {CID} */
let cid
try {
cid = new CID(msg.key)
@@ -58,4 +66,6 @@ module.exports = (dht) => {
// https://github.com/libp2p/js-libp2p-kad-dht/issues/128
return dht.providers.addProvider(cid, peerId)
}

return addProvider
}
15 changes: 12 additions & 3 deletions src/rpc/handlers/find-node.js
Original file line number Diff line number Diff line change
@@ -4,6 +4,13 @@ const uint8ArrayEquals = require('uint8arrays/equals')
const Message = require('../../message')
const utils = require('../../utils')

/**
* @typedef {import('peer-id')} PeerId
*/

/**
* @param {import('../../index')} dht
*/
module.exports = (dht) => {
const log = utils.logger(dht.peerId, 'rpc:find-node')

@@ -12,15 +19,15 @@ module.exports = (dht) => {
*
* @param {PeerId} peerId
* @param {Message} msg
* @returns {Promise<Message>}
*/
return async function findNode (peerId, msg) {
async function findNode (peerId, msg) {
log('start')

let closer
if (uint8ArrayEquals(msg.key, dht.peerId.id)) {
closer = [{
id: dht.peerId
id: dht.peerId,
multiaddrs: []
}]
} else {
closer = await dht._betterPeersToQuery(msg, peerId)
@@ -36,4 +43,6 @@ module.exports = (dht) => {

return response
}

return findNode
}
25 changes: 20 additions & 5 deletions src/rpc/handlers/get-providers.js
Original file line number Diff line number Diff line change
@@ -6,6 +6,13 @@ const errcode = require('err-code')
const Message = require('../../message')
const utils = require('../../utils')

/**
* @typedef {import('peer-id')} PeerId
*/

/**
* @param {import('../../index')} dht
*/
module.exports = (dht) => {
const log = utils.logger(dht.peerId, 'rpc:get-providers')

@@ -14,9 +21,8 @@ module.exports = (dht) => {
*
* @param {PeerId} peerId
* @param {Message} msg
* @returns {Promise<Message>}
*/
return async function getProviders (peerId, msg) {
async function getProviders (peerId, msg) {
let cid
try {
cid = new CID(msg.key)
@@ -33,12 +39,19 @@ module.exports = (dht) => {
dht._betterPeersToQuery(msg, peerId)
])

const providerPeers = peers.map((peerId) => ({ id: peerId }))
const closerPeers = closer.map((c) => ({ id: c.id }))
const providerPeers = peers.map((peerId) => ({
id: peerId,
multiaddrs: []
}))
const closerPeers = closer.map((c) => ({
id: c.id,
multiaddrs: []
}))

if (has) {
providerPeers.push({
id: dht.peerId
id: dht.peerId,
multiaddrs: []
})
}

@@ -55,4 +68,6 @@ module.exports = (dht) => {
log('got %s providers %s closerPeers', providerPeers.length, closerPeers.length)
return response
}

return getProviders
}
11 changes: 10 additions & 1 deletion src/rpc/handlers/get-value.js
Original file line number Diff line number Diff line change
@@ -7,6 +7,13 @@ const errcode = require('err-code')
const Message = require('../../message')
const utils = require('../../utils')

/**
* @typedef {import('peer-id')} PeerId
*/

/**
* @param {import('../../index')} dht
*/
module.exports = (dht) => {
const log = utils.logger(dht.peerId, 'rpc:get-value')

@@ -17,7 +24,7 @@ module.exports = (dht) => {
* @param {Message} msg
* @returns {Promise<Message>}
*/
return async function getValue (peerId, msg) {
async function getValue (peerId, msg) {
const key = msg.key

log('key: %b', key)
@@ -64,4 +71,6 @@ module.exports = (dht) => {

return response
}

return getValue
}
12 changes: 7 additions & 5 deletions src/rpc/handlers/index.js
Original file line number Diff line number Diff line change
@@ -2,6 +2,10 @@

const T = require('../../message').TYPES

/**
*
* @param {import('../../index')} dht
*/
module.exports = (dht) => {
const handlers = {
[T.GET_VALUE]: require('./get-value')(dht),
@@ -16,12 +20,10 @@ module.exports = (dht) => {
* Get the message handler matching the passed in type.
*
* @param {number} type
*
* @returns {function(PeerId, Message, function(Error, Message))}
*
* @private
*/
return function getMessageHandler (type) {
function getMessageHandler (type) {
return handlers[type]
}

return getMessageHandler
}
13 changes: 11 additions & 2 deletions src/rpc/handlers/ping.js
Original file line number Diff line number Diff line change
@@ -2,6 +2,14 @@

const utils = require('../../utils')

/**
* @typedef {import('peer-id')} PeerId
* @typedef {import('../../message')} Message
*/

/**
* @param {import('../../index')} dht
*/
module.exports = (dht) => {
const log = utils.logger(dht.peerId, 'rpc:ping')

@@ -10,10 +18,11 @@ module.exports = (dht) => {
*
* @param {PeerId} peerId
* @param {Message} msg
* @returns {Message}
*/
return function ping (peerId, msg) {
function ping (peerId, msg) {
log('from %s', peerId.toB58String())
return msg
}

return ping
}
13 changes: 11 additions & 2 deletions src/rpc/handlers/put-value.js
Original file line number Diff line number Diff line change
@@ -3,6 +3,14 @@
const utils = require('../../utils')
const errcode = require('err-code')

/**
* @typedef {import('peer-id')} PeerId
* @typedef {import('../../message')} Message
*/

/**
* @param {import('../../index')} dht
*/
module.exports = (dht) => {
const log = utils.logger(dht.peerId, 'rpc:put-value')

@@ -11,9 +19,8 @@ module.exports = (dht) => {
*
* @param {PeerId} peerId
* @param {Message} msg
* @returns {Promise<Message>}
*/
return async function putValue (peerId, msg) {
async function putValue (peerId, msg) {
const key = msg.key
log('key: %b', key)

@@ -36,4 +43,6 @@ module.exports = (dht) => {

return msg
}

return putValue
}
31 changes: 21 additions & 10 deletions src/rpc/index.js
Original file line number Diff line number Diff line change
@@ -1,23 +1,29 @@
'use strict'

const pipe = require('it-pipe')
const { pipe } = require('it-pipe')
const lp = require('it-length-prefixed')

const Message = require('../message')
const handlers = require('./handlers')
const utils = require('../utils')

/**
* @typedef {import('peer-id')} PeerId
* @typedef {import('libp2p-interfaces/src/stream-muxer/types').MuxedStream} MuxedStream
*/

/**
* @param {import('../index')} dht
*/
module.exports = (dht) => {
const log = utils.logger(dht.peerId, 'rpc')
const getMessageHandler = handlers(dht)

/**
* Process incoming DHT messages.
*
* @param {PeerId} peerId
* @param {Message} msg
* @returns {Promise<Message>}
*
* @private
*/
async function handleMessage (peerId, msg) {
// get handler & execute it
@@ -38,13 +44,13 @@ module.exports = (dht) => {
}

/**
* Handle incoming streams on the dht protocol.
* @param {Object} props
* @param {DuplexStream} props.stream
* @param {Connection} props.connection connection
* @returns {Promise<void>}
* Handle incoming streams on the dht protocol
*
* @param {object} props
* @param {MuxedStream} props.stream
* @param {import('libp2p-interfaces/src/connection').Connection} props.connection
*/
return async function onIncomingStream ({ stream, connection }) {
async function onIncomingStream ({ stream, connection }) {
const peerId = connection.remotePeer

try {
@@ -59,6 +65,9 @@ module.exports = (dht) => {
await pipe(
stream.source,
lp.decode(),
/**
* @param {AsyncIterable<Uint8Array>} source
*/
source => (async function * () {
for await (const msg of source) {
// handle the message
@@ -75,4 +84,6 @@ module.exports = (dht) => {
stream.sink
)
}

return onIncomingStream
}
55 changes: 30 additions & 25 deletions src/utils.js
Original file line number Diff line number Diff line change
@@ -4,7 +4,7 @@ const debug = require('debug')
const multihashing = require('multihashing-async')
const mh = multihashing.multihash
const { Key } = require('interface-datastore')
const base32 = require('base32.js')
// @ts-ignore
const distance = require('xor-distance')
const pMap = require('p-map')
const { Record } = require('libp2p-record')
@@ -57,10 +57,16 @@ exports.keyForPublicKey = (peer) => {
])
}

/**
* @param {Uint8Array} key
*/
exports.isPublicKeyKey = (key) => {
return uint8ArrayToString(key.slice(0, 4)) === '/pk/'
}

/**
* @param {Uint8Array} key
*/
exports.fromPublicKeyKey = (key) => {
return new PeerId(key.slice(4))
}
@@ -76,30 +82,29 @@ exports.now = () => {

/**
* Encode a given Uint8Array into a base32 string.
*
* @param {Uint8Array} buf
* @returns {string}
*/
exports.encodeBase32 = (buf) => {
const enc = new base32.Encoder()
return enc.write(buf).finalize()
return uint8ArrayToString(buf, 'base32')
}

/**
* Decode a given base32 string into a Uint8Array.
*
* @param {string} raw
* @returns {Uint8Array}
*/
exports.decodeBase32 = (raw) => {
const dec = new base32.Decoder()
return Uint8Array.from(dec.write(raw).finalize())
return uint8ArrayFromString(raw, 'base32')
}

/**
* Sort peers by distance to the given `target`.
*
* @param {Array<PeerId>} peers
* @param {Uint8Array} target
* @returns {Array<PeerId>}
*/
exports.sortClosestPeers = async (peers, target) => {
const distances = await pMap(peers, async (peer) => {
@@ -117,9 +122,8 @@ exports.sortClosestPeers = async (peers, target) => {
/**
* Compare function to sort an array of elements which have a distance property which is the xor distance to a given element.
*
* @param {Object} a
* @param {Object} b
* @returns {number}
* @param {{ distance: Uint8Array }} a
* @param {{ distance: Uint8Array }} b
*/
exports.xorCompare = (a, b) => {
return distance.compare(a.distance, b.distance)
@@ -131,7 +135,6 @@ exports.xorCompare = (a, b) => {
*
* @param {number} resultsWanted
* @param {number} numPaths - total number of paths
* @returns {number}
*/
exports.pathSize = (resultsWanted, numPaths) => {
return Math.ceil(resultsWanted / numPaths)
@@ -156,9 +159,6 @@ exports.createPutRecord = (key, value) => {
*
* @param {PeerId} [id]
* @param {string} [subsystem]
* @returns {debug}
*
* @private
*/
exports.logger = (id, subsystem) => {
const name = ['libp2p', 'dht']
@@ -174,8 +174,9 @@ exports.logger = (id, subsystem) => {
return mh.toB58String(v)
}

const logger = debug(name.join(':'))
logger.error = debug(name.concat(['error']).join(':'))
const logger = Object.assign(debug(name.join(':')), {
error: debug(name.concat(['error']).join(':'))
})

return logger
}
@@ -190,14 +191,16 @@ exports.TimeoutError = class TimeoutError extends Error {
* Creates an async function that calls the given `asyncFn` and Errors
* if it does not resolve within `time` ms
*
* @param {Function} [asyncFn]
* @param {Number} [time]
* @returns {Function}
*
* @private
* @template T
* @param {(...args: any[]) => Promise<T>} asyncFn
* @param {number} [time]
*/
exports.withTimeout = (asyncFn, time) => {
return async (...args) => { // eslint-disable-line require-await
/**
* @param {...any} args
* @returns {Promise<T>}
*/
function timeoutFn (...args) {
return Promise.race([
asyncFn(...args),
new Promise((resolve, reject) => {
@@ -207,18 +210,20 @@ exports.withTimeout = (asyncFn, time) => {
})
])
}

return timeoutFn
}

/**
* Iterates the given `asyncIterator` and runs each item through the given `asyncFn` in parallel.
* Returns a promise that resolves when all items of the `asyncIterator` have been passed
* through `asyncFn`.
*
* @param {AsyncIterable} [asyncIterator]
* @param {Function} [asyncFn]
* @returns {Array}
* @template T
* @template O
*
* @private
* @param {AsyncIterable<T>} asyncIterator
* @param {(arg0: T) => Promise<O>} asyncFn
*/
exports.mapParallel = async function (asyncIterator, asyncFn) {
const tasks = []
5 changes: 2 additions & 3 deletions test/kad-utils.spec.js
Original file line number Diff line number Diff line change
@@ -4,11 +4,11 @@
const chai = require('chai')
chai.use(require('dirty-chai'))
const expect = chai.expect
const base32 = require('base32.js')
const PeerId = require('peer-id')
const distance = require('xor-distance')
const uint8ArrayConcat = require('uint8arrays/concat')
const uint8ArrayFromString = require('uint8arrays/from-string')
const uint8ArrayToString = require('uint8arrays/to-string')

const utils = require('../src/utils')
const createPeerId = require('./utils/create-peer-id')
@@ -20,9 +20,8 @@ describe('kad utils', () => {

const key = utils.bufferToKey(buf)

const enc = new base32.Encoder()
expect(key.toString())
.to.equal('/' + enc.write(buf).finalize())
.to.equal('/' + uint8ArrayToString(buf, 'base32'))
})
})

2 changes: 1 addition & 1 deletion test/query/index.spec.js
Original file line number Diff line number Diff line change
@@ -46,7 +46,7 @@ describe('Query', () => {
})

before('create a dht', () => {
const peerStore = new PeerStore()
const peerStore = new PeerStore({ peerId: ourPeerId })
dht = new DHT({
dialer: {},
peerStore,
4 changes: 2 additions & 2 deletions test/rpc/handlers/add-provider.spec.js
Original file line number Diff line number Diff line change
@@ -46,7 +46,7 @@ describe('rpc - handlers - AddProvider', () => {
error: 'ERR_INVALID_CID'
}]

await Promise.all(tests.map((t) => {
tests.forEach((t) => {
it(t.error.toString(), async () => {
try {
await handler(dht)(peerIds[0], t.message)
@@ -57,7 +57,7 @@ describe('rpc - handlers - AddProvider', () => {
}
throw new Error()
})
}))
})
})

it('ignore providers that do not match the sender', async () => {
15 changes: 10 additions & 5 deletions test/simulation/index.js
Original file line number Diff line number Diff line change
@@ -45,7 +45,7 @@ let topIds // Closest 20 peerIds in the network
console.log('Total Nodes=%d, Dead Nodes=%d, Max Siblings per Peer=%d', NUM_PEERS, NUM_DEAD_NODES, MAX_PEERS_KNOWN)
console.log('Starting %d runs with concurrency %d...', RUNS, ALPHA)
const topRunIds = []
for (var i = 0; i < RUNS; i++) {
for (let i = 0; i < RUNS; i++) {
const { closestPeers, runTime } = await GetClosestPeersSimulation()
const foundIds = closestPeers.map(peerId => peerId.toB58String())
const intersection = foundIds.filter(topIdFilter)
@@ -132,7 +132,8 @@ async function GetClosestPeersSimulation () {

/**
* Create `num` PeerIds
* @param {integer} num How many peers to create
*
* @param {integer} num - How many peers to create
* @returns {Array<PeerId>}
*/
function createPeers (num) {
@@ -148,6 +149,7 @@ function createPeers (num) {

/**
* Creates a mock network
*
* @param {Array<PeerId>} peers
* @returns {Network}
*/
@@ -182,6 +184,7 @@ async function MockNetwork (peers) {

/**
* Returns a random integer between `min` and `max`
*
* @param {number} min
* @param {number} max
* @returns {int}
@@ -192,8 +195,9 @@ function randomInteger (min, max) {

/**
* Return a unique array of random `num` members from `list`
* @param {Array<any>} list array to pull random members from
* @param {number} num number of random members to get
*
* @param {Array<any>} list - array to pull random members from
* @param {number} num - number of random members to get
* @returns {Array<any>}
*/
function randomMembers (list, num) {
@@ -213,7 +217,8 @@ function randomMembers (list, num) {

/**
* Finds the common members of all arrays
* @param {Array<Array>} arrays An array of arrays to find common members
*
* @param {Array<Array>} arrays - An array of arrays to find common members
* @returns {Array<any>}
*/
function getCommonMembers (arrays) {
3 changes: 2 additions & 1 deletion test/utils/create-peer-id.js
Original file line number Diff line number Diff line change
@@ -4,7 +4,8 @@ const PeerId = require('peer-id')

/**
* Creates multiple PeerIds
* @param {number} length The number of `PeerId` to create
*
* @param {number} length - The number of `PeerId` to create
* @returns {Promise<Array<PeerId>>}
*/
function createPeerId (length) {
8 changes: 4 additions & 4 deletions test/utils/test-dht.js
Original file line number Diff line number Diff line change
@@ -27,8 +27,10 @@ class TestDHT {
}

async _spawnOne (index, options = {}, autoStart = true) {
const [peerId] = await createPeerId(1)

const regRecord = {}
const peerStore = new PeerStore()
const peerStore = new PeerStore({ peerId })

// Disable random walk by default for more controlled testing
options = {
@@ -39,8 +41,6 @@ class TestDHT {
...options
}

const [peerId] = await createPeerId(1)

const connectToPeer = (localDHT, peer) => {
const remotePeerB58 = peer.toB58String()
const remoteDht = this.nodes.find(
@@ -156,7 +156,7 @@ class TestDHT {

// Check routing tables
return Promise.all(routingTableChecks.map(check => {
pRetry(check, { retries: 50 })
return pRetry(check, { retries: 50 })
}))
}

1 change: 1 addition & 0 deletions test/utils/to-buffer.js
Original file line number Diff line number Diff line change
@@ -2,6 +2,7 @@

/**
* Converts BufferList messages to Uint8Arrays
*
* @param {*} source
* @returns {AsyncGenerator}
*/
9 changes: 9 additions & 0 deletions tsconfig.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"extends": "aegir/src/config/tsconfig.aegir.json",
"compilerOptions": {
"outDir": "dist"
},
"include": [
"src"
]
}