From 0c235fb4fa97f552c759551c9daab815a96eeaa9 Mon Sep 17 00:00:00 2001 From: heapwolf Date: Tue, 30 Jan 2024 08:09:06 +0100 Subject: [PATCH] (protocol): improve emit method --- api/stream-relay/sugar.js | 29 +++++++++++++++++++++++------ 1 file changed, 23 insertions(+), 6 deletions(-) diff --git a/api/stream-relay/sugar.js b/api/stream-relay/sugar.js index 83c9521dee..0fef434235 100644 --- a/api/stream-relay/sugar.js +++ b/api/stream-relay/sugar.js @@ -2,7 +2,7 @@ import { wrap, Encryption, sha256, NAT, RemotePeer } from './index.js' import { sodium } from '../crypto.js' import { Buffer } from '../buffer.js' import { isBufferLike } from '../util.js' -import { CACHE_TTL } from './packets.js' +import { Packet, CACHE_TTL } from './packets.js' /** * Creates and manages a network bus for communication. @@ -249,15 +249,32 @@ export default (dgram, events) => { sub.emit = async (eventName, value, opts = {}) => { opts.clusterId = opts.clusterId || clusterId - opts.subclusterId = opts.subclusterId || subclusterId + opts.subclusterId = opts.subclusterId || sub.subclusterId const args = await pack(eventName, value, opts) - for (const p of sub.peers.values()) { - await p._peer.write(sub.sharedKey, args) - } + if (sub.peers.values().length) { + let packets = [] + + for (const p of sub.peers.values()) { + const r = await p._peer.write(sub.sharedKey, args) + if (packets.length === 0) packets = r + } + + for (const packet of packets) { + const p = Packet.from(packet) + _peer.cache.insert(packet.packetId.toString('hex'), p) - return await _peer.publish(sub.sharedKey, args) + _peer.unpublished[packet.packetId.toString('hex')] = Date.now() + if (globalThis.navigator && !globalThis.navigator.onLine) continue + + _peer.mcast(packet) + } + return packets + } else { + const packets = await _peer.publish(sub.sharedKey, args) + return packets + } } sub.on = async (eventName, cb) => {