Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

events: Add helpers for event listeners #1051

Merged
merged 3 commits into from
Jan 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 21 additions & 47 deletions packages/connection/index.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { EventEmitter, promise } from "@xmpp/events";
import { EventEmitter, promise, listeners } from "@xmpp/events";
import jid from "@xmpp/jid";
import xml from "@xmpp/xml";
import StreamError from "./lib/StreamError.js";
Expand All @@ -8,13 +8,14 @@ const NS_STREAM = "urn:ietf:params:xml:ns:xmpp-streams";
const NS_JABBER_STREAM = "http://etherx.jabber.org/streams";

class Connection extends EventEmitter {
#socketListeners = null;
#parserListeners = null;

constructor(options = {}) {
super();
this.jid = null;
this.timeout = 2000;
this.options = options;
this.socketListeners = Object.create(null);
this.parserListeners = Object.create(null);
this.status = "offline";
this.socket = null;
this.parser = null;
Expand All @@ -40,7 +41,7 @@ class Connection extends EventEmitter {
this.parser.write(str);
}

_onParserError(error) {
#onParserError(error) {
// https://xmpp.org/rfcs/rfc6120.html#streams-error-conditions-bad-format
// "This error can be used instead of the more specific XML-related errors,
// such as <bad-namespace-prefix/>, <invalid-xml/>, <not-well-formed/>, <restricted-xml/>,
Expand All @@ -62,32 +63,17 @@ class Connection extends EventEmitter {

_attachSocket(socket) {
this.socket = socket;
const listeners = this.socketListeners;

listeners.data = this._onData.bind(this);

listeners.close = this.#onSocketClosed.bind(this);

listeners.connect = () => {
this._status("connect");
};

listeners.error = (error) => {
this.emit("error", error);
};

this.socket.on("close", listeners.close);
this.socket.on("data", listeners.data);
this.socket.on("error", listeners.error);
this.socket.on("connect", listeners.connect);
this.#socketListeners ??= listeners({
data: this._onData.bind(this),
close: this.#onSocketClosed.bind(this),
connect: () => this._status("connect"),
error: (error) => this.emit("error", error),
});
this.#socketListeners.subscribe(this.socket);
}

_detachSocket() {
const { socketListeners, socket } = this;
for (const k of Object.getOwnPropertyNames(socketListeners)) {
socket.removeListener(k, socketListeners[k]);
delete socketListeners[k];
}
this.socket && this.#socketListeners?.unsubscribe(this.socket);
this.socket = null;
}

Expand Down Expand Up @@ -143,29 +129,17 @@ class Connection extends EventEmitter {

_attachParser(parser) {
this.parser = parser;
const listeners = this.parserListeners;

listeners.element = this._onElement.bind(this);
listeners.error = this._onParserError.bind(this);

listeners.end = this.#onStreamClosed.bind(this);

listeners.start = (element) => {
this._status("open", element);
};

this.parser.on("error", listeners.error);
this.parser.on("element", listeners.element);
this.parser.on("end", listeners.end);
this.parser.on("start", listeners.start);
this.#parserListeners ??= listeners({
element: this._onElement.bind(this),
error: this.#onParserError.bind(this),
end: this.#onStreamClosed.bind(this),
start: (element) => this._status("open", element),
});
this.#parserListeners.subscribe(this.parser);
}

_detachParser() {
const listeners = this.parserListeners;
for (const k of Object.getOwnPropertyNames(listeners)) {
this.parser.removeListener(k, listeners[k]);
delete listeners[k];
}
this.parser && this.#parserListeners?.unsubscribe(this.parser);
this.parser = null;
this.root = null;
}
Expand Down
4 changes: 4 additions & 0 deletions packages/events/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import TimeoutError from "./lib/TimeoutError.js";
import promise from "./lib/promise.js";
import Deferred from "./lib/Deferred.js";
import procedure from "./lib/procedure.js";
import listeners from "./lib/listeners.js";
import onoff from "./lib/onoff.js";

export {
EventEmitter,
Expand All @@ -15,4 +17,6 @@ export {
promise,
Deferred,
procedure,
listeners,
onoff,
};
18 changes: 18 additions & 0 deletions packages/events/lib/listeners.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import onoff from "./onoff.js";

export default function listeners(events) {
return {
subscribe(target) {
const { on } = onoff(target);
for (const [event, handler] of Object.entries(events)) {
on(event, handler);
}
},
unsubscribe(target) {
const { off } = onoff(target);
for (const [event, handler] of Object.entries(events)) {
off(event, handler);
}
},
};
}
21 changes: 21 additions & 0 deletions packages/events/lib/onoff.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
const map = new WeakMap();

export default function onoff(target) {
let m = map.get(target);

if (!m) {
const on = (target.addEventListener ?? target.addListener).bind(target);
const off = (target.removeEventListener ?? target.removeListener).bind(
target,
);
const once = (
target.once ??
((event, handler) =>
target.addEventListener(event, handler, { once: true }))
).bind(target);
m = { on, off, once };
map.set(target, m);
}

return m;
}
14 changes: 9 additions & 5 deletions packages/events/lib/promise.js
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
import onoff from "./onoff.js";

import TimeoutError from "./TimeoutError.js";

export default function promise(EE, event, rejectEvent = "error", timeout) {
export default function promise(target, event, rejectEvent = "error", timeout) {
return new Promise((resolve, reject) => {
let timeoutId;

const { off, once } = onoff(target);

const cleanup = () => {
clearTimeout(timeoutId);
EE.removeListener(event, onEvent);
EE.removeListener(rejectEvent, onError);
off(event, onEvent);
off(rejectEvent, onError);
};

function onError(reason) {
Expand All @@ -20,9 +24,9 @@ export default function promise(EE, event, rejectEvent = "error", timeout) {
cleanup();
}

EE.once(event, onEvent);
once(event, onEvent);
if (rejectEvent) {
EE.once(rejectEvent, onError);
once(rejectEvent, onError);
}

if (timeout) {
Expand Down
72 changes: 30 additions & 42 deletions packages/websocket/lib/Socket.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import WS from "ws";
import { EventEmitter } from "@xmpp/events";
import { EventEmitter, listeners } from "@xmpp/events";
import { parseURI } from "@xmpp/connection/lib/util.js";

// eslint-disable-next-line n/no-unsupported-features/node-builtins
Expand All @@ -8,10 +8,9 @@ const WebSocket = globalThis.WebSocket || WS;
const CODE = "ECONNERROR";

export default class Socket extends EventEmitter {
constructor() {
super();
this.listeners = Object.create(null);
}
#listeners = null;
socket = null;
url = null;

isSecure() {
if (!this.url) return false;
Expand All @@ -28,47 +27,36 @@ export default class Socket extends EventEmitter {

_attachSocket(socket) {
this.socket = socket;
const { listeners } = this;
listeners.open = () => {
this.emit("connect");
};

listeners.message = ({ data }) => this.emit("data", data);
listeners.error = (event) => {
const { url } = this;
// WS
let { error } = event;
// DOM
if (!error) {
error = new Error(`WebSocket ${CODE} ${url}`);
error.errno = CODE;
error.code = CODE;
}
this.#listeners ??= listeners({
open: () => this.emit("connect"),
message: ({ data }) => this.emit("data", data),
error: (event) => {
const { url } = this;
// WS
let { error } = event;
// DOM
if (!error) {
error = new Error(event.message || `WebSocket ${CODE} ${url}`);
error.errno = CODE;
error.code = CODE;
}

error.event = event;
error.url = url;
this.emit("error", error);
};

listeners.close = (event) => {
this._detachSocket();
this.emit("close", !event.wasClean, event);
};

this.socket.addEventListener("open", listeners.open);
this.socket.addEventListener("message", listeners.message);
this.socket.addEventListener("error", listeners.error);
this.socket.addEventListener("close", listeners.close);
error.event = event;
error.url = url;
this.emit("error", error);
},
close: (event) => {
this._detachSocket();
this.emit("close", !event.wasClean, event);
},
});
this.#listeners.subscribe(this.socket);
}

_detachSocket() {
delete this.url;
const { socket, listeners } = this;
for (const k of Object.getOwnPropertyNames(listeners)) {
socket.removeEventListener(k, listeners[k]);
delete listeners[k];
}
delete this.socket;
this.url = null;
this.socket && this.#listeners?.unsubscribe(this.socket);
this.socket = null;
}

end() {
Expand Down
Loading