Skip to content

Commit

Permalink
chore(api): update protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
heapwolf committed Jul 18, 2024
1 parent 6b37ad6 commit f693911
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 19 deletions.
3 changes: 3 additions & 0 deletions api/latica/api.js
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,9 @@ async function api (options = {}, events, dgram) {

sub.peers.set(peer.peerId, ee)
const isStateChange = !oldPeer || change

_peer.onDebug(_peer.peerId, `<-- API CONNECTION JOIN (scid=${scid}, peerId=${peer.peerId.slice(0, 6)})`)

sub._emit('#join', ee, packet, isStateChange)
})

Expand Down
38 changes: 23 additions & 15 deletions api/latica/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -935,7 +935,8 @@ export class Peer {
requesterPeerId: this.peerId,
natType: this.natType,
address: this.address,
port: this.port
port: this.port,
key: [cid, scid].join(':')
}
})

Expand Down Expand Up @@ -1091,23 +1092,23 @@ export class Peer {
/**
* @return {undefined}
*/
async sync (peer) {
async sync (peer, ptime = Date.now()) {
if (typeof peer === 'string') {
peer = this.peers.find(p => p.peerId === peer)
}

const rinfo = peer?.proxy || peer

this.lastSync = Date.now()
const summary = await this.cache.summarize('', this.cachePredicate)
const summary = await this.cache.summarize('', this.cachePredicate(ptime))

this._onDebug(`-> SYNC START (dest=${peer.peerId.slice(0, 8)}, to=${rinfo.address}:${rinfo.port})`)
if (this.onSyncStart) this.onSyncStart(peer, rinfo.port, rinfo.address)

// if we are out of sync send our cache summary
const data = await Packet.encode(new PacketSync({
message: Cache.encodeSummary(summary),
usr4: Buffer.from(String(Date.now()))
usr4: Buffer.from(String(ptime))
}))

this.send(data, rinfo.port, rinfo.address, peer.socket)
Expand Down Expand Up @@ -1157,13 +1158,13 @@ export class Peer {
* from the cache when receiving a request to sync. that can be overridden
*
*/
cachePredicate (packet) {
if (packet.usr4.byteLength < 8 || packet.usr4.byteLength > 16) return
cachePredicate (ts) {
const max = Date.now() - Packet.ttl
const T = Math.min(ts || max, max)

const timestamp = parseInt(Buffer.from(packet.usr4).toString(), 10)
const ts = Math.min(Packet.ttl, timestamp)

return packet.version === VERSION && ts > Date.now() - Packet.ttl
return packet => {
return packet.version === VERSION && packet.timestamp > T
}
}

/**
Expand Down Expand Up @@ -1246,13 +1247,20 @@ export class Peer {
this.lastSync = Date.now()
const pid = packet.packetId.toString('hex')

let ptime = Date.now()

if (packet.usr4.byteLength > 8 || packet.usr4.byteLength < 16) {
const usr4 = parseInt(Buffer.from(packet.usr4).toString(), 10)
ptime = Math.min(ptime - Packet.ttl, usr4)
}

if (!isBufferLike(packet.message)) return
if (this.gate.has(pid)) return

this.gate.set(pid, 1)

const remote = Cache.decodeSummary(packet.message)
const local = await this.cache.summarize(remote.prefix, this.cachePredicate)
const local = await this.cache.summarize(remote.prefix, this.cachePredicate(ptime))

if (!remote || !remote.hash || !local || !local.hash || local.hash === remote.hash) {
if (this.onSyncFinished) this.onSyncFinished(packet, port, address)
Expand All @@ -1278,7 +1286,7 @@ export class Peer {
if (!key.startsWith(local.prefix + i.toString(16))) continue

const packet = Packet.from(p)
if (!this.cachePredicate(packet)) continue
if (!this.cachePredicate(ptime)(packet)) continue

const pid = packet.packetId.toString('hex')
this._onDebug(`-> SYNC SEND PACKET (type=data, packetId=${pid.slice(0, 8)}, to=${address}:${port})`)
Expand All @@ -1289,7 +1297,7 @@ export class Peer {
//
// need more details about what exactly isn't synce'd
//
const nextLevel = await this.cache.summarize(local.prefix + i.toString(16), this.cachePredicate)
const nextLevel = await this.cache.summarize(local.prefix + i.toString(16), this.cachePredicate(ptime))
const data = await Packet.encode(new PacketSync({
message: Cache.encodeSummary(nextLevel),
usr4: Buffer.from(String(Date.now()))
Expand Down Expand Up @@ -1450,7 +1458,7 @@ export class Peer {
delete message.isProbe
}

const { hash } = await this.cache.summarize('', this.cachePredicate)
const { hash } = await this.cache.summarize('', this.cachePredicate())
message.cacheSummaryHash = hash

const packetPong = new PacketPong({ message })
Expand Down Expand Up @@ -1645,7 +1653,7 @@ export class Peer {
if (this.onIntro) this.onIntro(packet, peer, peerPort, peerAddress)

const pingId = randomBytes(6).toString('hex').padStart(12, '0')
const { hash } = await this.cache.summarize('', this.cachePredicate)
const { hash } = await this.cache.summarize('', this.cachePredicate())

const props = {
clusterId,
Expand Down
1 change: 1 addition & 0 deletions api/latica/packets.js
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,7 @@ export class PacketJoin extends Packet {
requesterPeerId: { required: true, type: 'string', assert: Peer.isValidPeerId },
natType: { required: true, type: 'number', assert: NAT.isValid },
address: { required: true, type: 'string' },
key: { type: 'string' },
port: { required: true, type: 'number', assert: isValidPort },
isConnection: { type: 'boolean' }
})
Expand Down
10 changes: 6 additions & 4 deletions api/latica/proxy.js
Original file line number Diff line number Diff line change
Expand Up @@ -263,12 +263,14 @@ class PeerWorkerProxy {

if (arg?.constructor.name === 'RemotePeer' || arg?.constructor.name === 'Peer') {
args[i] = { // what details we want to expose outside of the protocol
peerId: arg.peerId,
address: arg.address,
port: arg.port,
natType: arg.natType,
clusters: arg.clusters,
connected: arg.connected
connected: arg.connected,
lastRequest: arg.lastRequest,
lastUpdate: arg.lastUpdate,
natType: arg.natType,
peerId: arg.peerId,
port: arg.port
}

delete args[i].localPeer // don't copy this over
Expand Down

0 comments on commit f693911

Please sign in to comment.