diff --git a/api/latica/index.js b/api/latica/index.js index 7dbd0e462..8c4fff8f1 100644 --- a/api/latica/index.js +++ b/api/latica/index.js @@ -213,6 +213,7 @@ export class Peer { natType = NAT.UNKNOWN nextNatType = NAT.UNKNOWN clusters = {} + syncs = {} reflectionId = null reflectionTimeout = null reflectionStage = 0 @@ -283,7 +284,7 @@ export class Peer { // // The purpose of this.config is to seperate transitioned state from initial state. // - this.config = { + this.config = { // TODO(@heapwolf): Object.freeze this maybe keepalive: DEFAULT_KEEP_ALIVE, ...config } @@ -585,6 +586,7 @@ export class Peer { return { peers, + syncs: this.syncs, config: this.config, data: [...this.cache.data.entries()], unpublished: this.unpublished @@ -617,8 +619,11 @@ export class Peer { } async reconnect () { - this.lastUpdate = 0 - this.requestReflection() + for (const cluster of Object.values(this.clusters)) { + for (const subcluster of Object.values(cluster)) { + this.join(subcluster.sharedKey, subcluster) + } + } } async disconnect () { @@ -1145,7 +1150,7 @@ export class Peer { if (this.gate.has(pid)) return this.returnRoutes.set(p.usr3.toString('hex'), {}) - this.gate.set(pid, 1) // don't accidentally spam + this.gate.set(pid, 1) // prevent accidental spam this._onDebug(`-> QUERY (type=question, query=${query}, packet=${pid.slice(0, 8)})`) @@ -1233,6 +1238,30 @@ export class Peer { if (firstContact && this.onConnection) { this.onConnection(packet, peer, port, address) + + const now = Date.now() + const key = [peer.address, peer.port].join(':') + let first = false + + // + // If you've never sync'd before, you can ask for 6 hours of data from + // other peers. If we have synced with a peer before we can just ask for + // data that they have seen since then, this will avoid the risk of + // spamming them and getting rate-limited. + // + if (!this.syncs[key]) { + this.syncs[key] = now - Packet.ttl + first = true + } + + const lastSyncSeconds = (now - this.syncs[key]) / 1000 + const syncWindow = this.config.syncWindow ?? 6000 + + if (first || now - this.syncs[key] > syncWindow) { + this.sync(peer.peerId, this.syncs[key]) + this._onDebug(`-> SYNC SEND (peerId=${peer.peerId.slice(0, 6)}, address=${key}, since=${lastSyncSeconds} seconds ago)`) + this.syncs[key] = now + } } } @@ -1332,11 +1361,8 @@ export class Peer { this.metrics.i[packet.type]++ const pid = packet.packetId.toString('hex') - if (this.gate.has(pid)) return - this.gate.set(pid, 1) - - const queryTimestamp = parseInt(packet.usr1.toString(), 10) const queryId = packet.usr3.toString('hex') + const queryTimestamp = parseInt(packet.usr1.toString(), 10) const queryType = parseInt(packet.usr4.toString(), 10) // if the timestamp in usr1 is older than now - 2s, bail @@ -1350,7 +1376,7 @@ export class Peer { // // receiving an answer // - if (this.returnRoutes.has(queryId)) { + if (this.returnRoutes.has(queryId) && type === 'answer') { rinfo = this.returnRoutes.get(queryId) let p = packet.copy() @@ -1365,7 +1391,8 @@ export class Peer { } if (!rinfo.address) return - } else { + } else if (type === 'question') { + if (this.gate.has(pid)) return // // receiving a query // @@ -1398,14 +1425,17 @@ export class Peer { p.usr3 = packet.usr3 // ensure the packet has the queryId p.usr4 = Buffer.from(String(2)) // mark it as an answer packet this.send(await Packet.encode(p), rinfo.port, rinfo.address) + this.gate.set(pid, 1) } return } } if (packet.hops >= this.maxHops) return + if (this.gate.has(pid)) return + this._onDebug('>> QUERY RELAY', port, address) - return await this.mcast(packet) + await this.mcast(packet) } /** @@ -1570,7 +1600,7 @@ export class Peer { }) } - this._mainLoop(Date.now()) + this._setTimeout(() => this._mainLoop(Date.now()), 1024) if (this.onNat) this.onNat(this.natType)