diff --git a/api/latica/api.js b/api/latica/api.js index 22108cbe5..9c28f0a63 100644 --- a/api/latica/api.js +++ b/api/latica/api.js @@ -357,6 +357,9 @@ async function api (options = {}, events, dgram) { sub.peers.set(peer.peerId, ee) const isStateChange = !oldPeer || change + + _peer.onDebug(_peer.peerId, `<-- API CONNECTION JOIN (scid=${scid}, peerId=${peer.peerId.slice(0, 6)})`) + sub._emit('#join', ee, packet, isStateChange) }) diff --git a/api/latica/index.js b/api/latica/index.js index e230247ee..7dbd0e462 100644 --- a/api/latica/index.js +++ b/api/latica/index.js @@ -935,7 +935,8 @@ export class Peer { requesterPeerId: this.peerId, natType: this.natType, address: this.address, - port: this.port + port: this.port, + key: [cid, scid].join(':') } }) @@ -1091,7 +1092,7 @@ export class Peer { /** * @return {undefined} */ - async sync (peer) { + async sync (peer, ptime = Date.now()) { if (typeof peer === 'string') { peer = this.peers.find(p => p.peerId === peer) } @@ -1099,7 +1100,7 @@ export class Peer { const rinfo = peer?.proxy || peer this.lastSync = Date.now() - const summary = await this.cache.summarize('', this.cachePredicate) + const summary = await this.cache.summarize('', this.cachePredicate(ptime)) this._onDebug(`-> SYNC START (dest=${peer.peerId.slice(0, 8)}, to=${rinfo.address}:${rinfo.port})`) if (this.onSyncStart) this.onSyncStart(peer, rinfo.port, rinfo.address) @@ -1107,7 +1108,7 @@ export class Peer { // if we are out of sync send our cache summary const data = await Packet.encode(new PacketSync({ message: Cache.encodeSummary(summary), - usr4: Buffer.from(String(Date.now())) + usr4: Buffer.from(String(ptime)) })) this.send(data, rinfo.port, rinfo.address, peer.socket) @@ -1157,13 +1158,13 @@ export class Peer { * from the cache when receiving a request to sync. that can be overridden * */ - cachePredicate (packet) { - if (packet.usr4.byteLength < 8 || packet.usr4.byteLength > 16) return + cachePredicate (ts) { + const max = Date.now() - Packet.ttl + const T = Math.min(ts || max, max) - const timestamp = parseInt(Buffer.from(packet.usr4).toString(), 10) - const ts = Math.min(Packet.ttl, timestamp) - - return packet.version === VERSION && ts > Date.now() - Packet.ttl + return packet => { + return packet.version === VERSION && packet.timestamp > T + } } /** @@ -1246,13 +1247,20 @@ export class Peer { this.lastSync = Date.now() const pid = packet.packetId.toString('hex') + let ptime = Date.now() + + if (packet.usr4.byteLength > 8 || packet.usr4.byteLength < 16) { + const usr4 = parseInt(Buffer.from(packet.usr4).toString(), 10) + ptime = Math.min(ptime - Packet.ttl, usr4) + } + if (!isBufferLike(packet.message)) return if (this.gate.has(pid)) return this.gate.set(pid, 1) const remote = Cache.decodeSummary(packet.message) - const local = await this.cache.summarize(remote.prefix, this.cachePredicate) + const local = await this.cache.summarize(remote.prefix, this.cachePredicate(ptime)) if (!remote || !remote.hash || !local || !local.hash || local.hash === remote.hash) { if (this.onSyncFinished) this.onSyncFinished(packet, port, address) @@ -1278,7 +1286,7 @@ export class Peer { if (!key.startsWith(local.prefix + i.toString(16))) continue const packet = Packet.from(p) - if (!this.cachePredicate(packet)) continue + if (!this.cachePredicate(ptime)(packet)) continue const pid = packet.packetId.toString('hex') this._onDebug(`-> SYNC SEND PACKET (type=data, packetId=${pid.slice(0, 8)}, to=${address}:${port})`) @@ -1289,7 +1297,7 @@ export class Peer { // // need more details about what exactly isn't synce'd // - const nextLevel = await this.cache.summarize(local.prefix + i.toString(16), this.cachePredicate) + const nextLevel = await this.cache.summarize(local.prefix + i.toString(16), this.cachePredicate(ptime)) const data = await Packet.encode(new PacketSync({ message: Cache.encodeSummary(nextLevel), usr4: Buffer.from(String(Date.now())) @@ -1450,7 +1458,7 @@ export class Peer { delete message.isProbe } - const { hash } = await this.cache.summarize('', this.cachePredicate) + const { hash } = await this.cache.summarize('', this.cachePredicate()) message.cacheSummaryHash = hash const packetPong = new PacketPong({ message }) @@ -1645,7 +1653,7 @@ export class Peer { if (this.onIntro) this.onIntro(packet, peer, peerPort, peerAddress) const pingId = randomBytes(6).toString('hex').padStart(12, '0') - const { hash } = await this.cache.summarize('', this.cachePredicate) + const { hash } = await this.cache.summarize('', this.cachePredicate()) const props = { clusterId, diff --git a/api/latica/packets.js b/api/latica/packets.js index 50008b963..6e0dae1c5 100644 --- a/api/latica/packets.js +++ b/api/latica/packets.js @@ -494,6 +494,7 @@ export class PacketJoin extends Packet { requesterPeerId: { required: true, type: 'string', assert: Peer.isValidPeerId }, natType: { required: true, type: 'number', assert: NAT.isValid }, address: { required: true, type: 'string' }, + key: { type: 'string' }, port: { required: true, type: 'number', assert: isValidPort }, isConnection: { type: 'boolean' } }) diff --git a/api/latica/proxy.js b/api/latica/proxy.js index 92e740f1a..4ea1ebe7d 100644 --- a/api/latica/proxy.js +++ b/api/latica/proxy.js @@ -263,12 +263,14 @@ class PeerWorkerProxy { if (arg?.constructor.name === 'RemotePeer' || arg?.constructor.name === 'Peer') { args[i] = { // what details we want to expose outside of the protocol - peerId: arg.peerId, address: arg.address, - port: arg.port, - natType: arg.natType, clusters: arg.clusters, - connected: arg.connected + connected: arg.connected, + lastRequest: arg.lastRequest, + lastUpdate: arg.lastUpdate, + natType: arg.natType, + peerId: arg.peerId, + port: arg.port } delete args[i].localPeer // don't copy this over