From 0cfb654da6624bc224d9438a769f2d51dd63c3a9 Mon Sep 17 00:00:00 2001 From: Sonny Piers Date: Thu, 9 Jan 2025 13:55:12 +0100 Subject: [PATCH] events: Add helpers for event listeners * Simplify code * Support EventTarget and EventEmitter --- packages/connection/index.js | 66 +++++++++----------------------- packages/events/index.js | 4 ++ packages/events/lib/listeners.js | 24 ++++++++++++ packages/events/lib/onoff.js | 21 ++++++++++ packages/events/lib/promise.js | 14 ++++--- packages/websocket/lib/Socket.js | 65 ++++++++++++------------------- 6 files changed, 102 insertions(+), 92 deletions(-) create mode 100644 packages/events/lib/listeners.js create mode 100644 packages/events/lib/onoff.js diff --git a/packages/connection/index.js b/packages/connection/index.js index 01f0d329..e0abecd2 100644 --- a/packages/connection/index.js +++ b/packages/connection/index.js @@ -1,4 +1,4 @@ -import { EventEmitter, promise } from "@xmpp/events"; +import { EventEmitter, promise, listeners } from "@xmpp/events"; import jid from "@xmpp/jid"; import xml from "@xmpp/xml"; import StreamError from "./lib/StreamError.js"; @@ -8,13 +8,14 @@ const NS_STREAM = "urn:ietf:params:xml:ns:xmpp-streams"; const NS_JABBER_STREAM = "http://etherx.jabber.org/streams"; class Connection extends EventEmitter { + #socketListeners = null; + #parserListeners = null; + constructor(options = {}) { super(); this.jid = null; this.timeout = 2000; this.options = options; - this.socketListeners = Object.create(null); - this.parserListeners = Object.create(null); this.status = "offline"; this.socket = null; this.parser = null; @@ -40,7 +41,7 @@ class Connection extends EventEmitter { this.parser.write(str); } - _onParserError(error) { + #onParserError(error) { // https://xmpp.org/rfcs/rfc6120.html#streams-error-conditions-bad-format // "This error can be used instead of the more specific XML-related errors, // such as , , , , @@ -62,32 +63,16 @@ class Connection extends EventEmitter { _attachSocket(socket) { this.socket = socket; - const listeners = this.socketListeners; - - listeners.data = this._onData.bind(this); - - listeners.close = this.#onSocketClosed.bind(this); - - listeners.connect = () => { - this._status("connect"); - }; - - listeners.error = (error) => { - this.emit("error", error); - }; - - this.socket.on("close", listeners.close); - this.socket.on("data", listeners.data); - this.socket.on("error", listeners.error); - this.socket.on("connect", listeners.connect); + this.#socketListeners ??= listeners(socket, { + data: this._onData.bind(this), + close: this.#onSocketClosed.bind(this), + connect: () => this._status("connect"), + error: (error) => this.emit("error", error), + }).subscribe(); } _detachSocket() { - const { socketListeners, socket } = this; - for (const k of Object.getOwnPropertyNames(socketListeners)) { - socket.removeListener(k, socketListeners[k]); - delete socketListeners[k]; - } + this.#socketListeners?.unsubscribe(); this.socket = null; } @@ -143,29 +128,16 @@ class Connection extends EventEmitter { _attachParser(parser) { this.parser = parser; - const listeners = this.parserListeners; - - listeners.element = this._onElement.bind(this); - listeners.error = this._onParserError.bind(this); - - listeners.end = this.#onStreamClosed.bind(this); - - listeners.start = (element) => { - this._status("open", element); - }; - - this.parser.on("error", listeners.error); - this.parser.on("element", listeners.element); - this.parser.on("end", listeners.end); - this.parser.on("start", listeners.start); + this.#parserListeners ??= listeners(parser, { + element: this._onElement.bind(this), + error: this.#onParserError.bind(this), + end: this.#onStreamClosed.bind(this), + start: (element) => this._status("open", element), + }).subscribe(); } _detachParser() { - const listeners = this.parserListeners; - for (const k of Object.getOwnPropertyNames(listeners)) { - this.parser.removeListener(k, listeners[k]); - delete listeners[k]; - } + this.#parserListeners?.unsubscribe(); this.parser = null; this.root = null; } diff --git a/packages/events/index.js b/packages/events/index.js index e952ad71..cbe287f7 100644 --- a/packages/events/index.js +++ b/packages/events/index.js @@ -6,6 +6,8 @@ import TimeoutError from "./lib/TimeoutError.js"; import promise from "./lib/promise.js"; import Deferred from "./lib/Deferred.js"; import procedure from "./lib/procedure.js"; +import listeners from "./lib/listeners.js"; +import onoff from "./lib/onoff.js"; export { EventEmitter, @@ -15,4 +17,6 @@ export { promise, Deferred, procedure, + listeners, + onoff, }; diff --git a/packages/events/lib/listeners.js b/packages/events/lib/listeners.js new file mode 100644 index 00000000..f4f0c34a --- /dev/null +++ b/packages/events/lib/listeners.js @@ -0,0 +1,24 @@ +import onoff from "./onoff.js"; + +export default function listeners(target, events) { + const { on, off } = onoff(target); + + return { + subscribed: false, + subscribe() { + if (this.subscribed) throw new Event("Listeners already subscribed"); + for (const [event, handler] of Object.entries(events)) { + on(event, handler); + } + this.subscribed = true; + return this; + }, + unsubscribe() { + for (const [event, handler] of Object.entries(events)) { + off(event, handler); + } + this.subscribed = false; + return this; + }, + }; +} diff --git a/packages/events/lib/onoff.js b/packages/events/lib/onoff.js new file mode 100644 index 00000000..56e95a69 --- /dev/null +++ b/packages/events/lib/onoff.js @@ -0,0 +1,21 @@ +const map = new WeakMap(); + +export default function onoff(target) { + let m = map.get(target); + + if (!m) { + const on = (target.addListener ?? target.addEventListener).bind(target); + const off = (target.removeListener ?? target.removeEventListener).bind( + target, + ); + const once = ( + target.once ?? + ((event, handler) => + target.addEventListener(event, handler, { once: true })) + ).bind(target); + m = { on, off, once }; + map.set(target, m); + } + + return m; +} diff --git a/packages/events/lib/promise.js b/packages/events/lib/promise.js index 69fb61b4..c741b9b7 100644 --- a/packages/events/lib/promise.js +++ b/packages/events/lib/promise.js @@ -1,13 +1,17 @@ +import onoff from "./onoff.js"; + import TimeoutError from "./TimeoutError.js"; -export default function promise(EE, event, rejectEvent = "error", timeout) { +export default function promise(target, event, rejectEvent = "error", timeout) { return new Promise((resolve, reject) => { let timeoutId; + const { off, once } = onoff(target); + const cleanup = () => { clearTimeout(timeoutId); - EE.removeListener(event, onEvent); - EE.removeListener(rejectEvent, onError); + off(event, onEvent); + off(rejectEvent, onError); }; function onError(reason) { @@ -20,9 +24,9 @@ export default function promise(EE, event, rejectEvent = "error", timeout) { cleanup(); } - EE.once(event, onEvent); + once(event, onEvent); if (rejectEvent) { - EE.once(rejectEvent, onError); + once(rejectEvent, onError); } if (timeout) { diff --git a/packages/websocket/lib/Socket.js b/packages/websocket/lib/Socket.js index 3227a11b..5b147d6b 100644 --- a/packages/websocket/lib/Socket.js +++ b/packages/websocket/lib/Socket.js @@ -1,5 +1,5 @@ import WS from "ws"; -import { EventEmitter } from "@xmpp/events"; +import { EventEmitter, listeners } from "@xmpp/events"; import { parseURI } from "@xmpp/connection/lib/util.js"; // eslint-disable-next-line n/no-unsupported-features/node-builtins @@ -8,10 +8,7 @@ const WebSocket = globalThis.WebSocket || WS; const CODE = "ECONNERROR"; export default class Socket extends EventEmitter { - constructor() { - super(); - this.listeners = Object.create(null); - } + #listeners = null; isSecure() { if (!this.url) return false; @@ -28,46 +25,34 @@ export default class Socket extends EventEmitter { _attachSocket(socket) { this.socket = socket; - const { listeners } = this; - listeners.open = () => { - this.emit("connect"); - }; - - listeners.message = ({ data }) => this.emit("data", data); - listeners.error = (event) => { - const { url } = this; - // WS - let { error } = event; - // DOM - if (!error) { - error = new Error(`WebSocket ${CODE} ${url}`); - error.errno = CODE; - error.code = CODE; - } + this.#listeners ??= listeners(socket, { + open: () => this.emit("connect"), + message: ({ data }) => this.emit("data", data), + error: (event) => { + const { url } = this; + // WS + let { error } = event; + // DOM + if (!error) { + error = new Error(`WebSocket ${CODE} ${url}`); + error.errno = CODE; + error.code = CODE; + } - error.event = event; - error.url = url; - this.emit("error", error); - }; - - listeners.close = (event) => { - this._detachSocket(); - this.emit("close", !event.wasClean, event); - }; - - this.socket.addEventListener("open", listeners.open); - this.socket.addEventListener("message", listeners.message); - this.socket.addEventListener("error", listeners.error); - this.socket.addEventListener("close", listeners.close); + error.event = event; + error.url = url; + this.emit("error", error); + }, + close: (event) => { + this._detachSocket(); + this.emit("close", !event.wasClean, event); + }, + }).subscribe(); } _detachSocket() { delete this.url; - const { socket, listeners } = this; - for (const k of Object.getOwnPropertyNames(listeners)) { - socket.removeEventListener(k, listeners[k]); - delete listeners[k]; - } + this.#listeners?.unsubscribe; delete this.socket; }