From 0350f64ac48a01280bf51c41f4c95cb43e1be02a Mon Sep 17 00:00:00 2001 From: Sonny Piers Date: Fri, 17 Jan 2025 17:19:57 +0100 Subject: [PATCH 1/4] stream-management: Send ack on close --- packages/connection/index.js | 45 +++++++++++++++++++ packages/middleware/README.md | 2 +- packages/stream-management/README.md | 13 ++++++ packages/stream-management/index.js | 16 ++++++- .../stream-management/stream-features.test.js | 15 +++++++ 5 files changed, 88 insertions(+), 3 deletions(-) diff --git a/packages/connection/index.js b/packages/connection/index.js index 5c1cfcbb..600ea1ff 100644 --- a/packages/connection/index.js +++ b/packages/connection/index.js @@ -285,6 +285,8 @@ class Connection extends EventEmitter { * https://tools.ietf.org/html/rfc7395#section-3.6 */ async _closeStream(timeout = this.timeout) { + await this.#runHooks("close"); + const fragment = this.footer(this.footerElement()); await this.write(fragment); @@ -360,6 +362,49 @@ class Connection extends EventEmitter { // Override socketParameters() {} + + /* Experimental hooks */ + #hooks = new Map(); + #hook_events = new Set(["close"]); + hook(event, handler /*priority = 0 TODO */) { + this.#assertHookEventName(event); + + if (!this.#hooks.has(event)) { + this.#hooks.set(event, new Set()); + } + + this.#hooks.get(event).add([handler]); + } + #assertHookEventName(event) { + if (!this.#hook_events.has(event)) { + throw new Error(`Hook event name "${event}" is unknown.`); + } + } + unhook(event, handler) { + this.#assertHookEventName(event); + const handlers = this.#hooks.get("event"); + const item = [...handlers].find((item) => item.handler === handler); + handlers.remove(item); + } + async #runHooks(event, ...args) { + this.#assertHookEventName(event); + + const hooks = this.#hooks.get(event); + if (!hooks) return; + + // TODO run hooks by priority + // run hooks with the same priority in parallel + + await Promise.all( + [...hooks].map(async ([handler]) => { + try { + await handler(...args); + } catch (err) { + this.emit("error", err); + } + }), + ); + } } // Override diff --git a/packages/middleware/README.md b/packages/middleware/README.md index 612f01a4..3f2f38af 100644 --- a/packages/middleware/README.md +++ b/packages/middleware/README.md @@ -6,7 +6,7 @@ Supports Node.js and browsers. ## Install -``` +```sh npm install @xmpp/middleware ``` diff --git a/packages/stream-management/README.md b/packages/stream-management/README.md index 8bb8bbd5..a5ea5054 100644 --- a/packages/stream-management/README.md +++ b/packages/stream-management/README.md @@ -13,6 +13,19 @@ If the session fails to resume, entity will fallback to regular session establis - Automatically responds to acks. - Periodically request acks. - If server fails to respond, triggers a reconnect. +- On reconnect retry sending the queue + +When a stanza is re-sent, a [delay element](https://xmpp.org/extensions/xep-0203.html) will be added to it. + +- `from` client jid +- `stamp` [date/time](https://xmpp.org/extensions/xep-0082.html) at which the stanza was meant to be sent + +```xml + +``` ## Events diff --git a/packages/stream-management/index.js b/packages/stream-management/index.js index 5d161d97..eec05d4a 100644 --- a/packages/stream-management/index.js +++ b/packages/stream-management/index.js @@ -63,11 +63,23 @@ export default function streamManagement({ requestAckInterval: 30_000, }); + async function sendAck() { + try { + await entity.send(xml("a", { xmlns: NS, h: sm.inbound })); + } catch {} + } + entity.on("disconnect", () => { clearTimeout(timeoutTimeout); clearTimeout(requestAckTimeout); }); + // It is RECOMMENDED that initiating entities (usually clients) send an element right before they gracefully close the stream, in order to inform the peer about received stanzas + entity.hook("close", async () => { + if (!sm.enabled) return; + await sendAck(); + }); + async function resumed(resumed) { sm.enabled = true; ackQueue(+resumed.attrs.h); @@ -127,14 +139,14 @@ export default function streamManagement({ sm.id = ""; }); - middleware.use((context, next) => { + middleware.use(async (context, next) => { const { stanza } = context; clearTimeout(timeoutTimeout); if (["presence", "message", "iq"].includes(stanza.name)) { sm.inbound += 1; } else if (stanza.is("r", NS)) { // > When an element ("request") is received, the recipient MUST acknowledge it by sending an element to the sender containing a value of 'h' that is equal to the number of stanzas handled by the recipient of the element. - entity.send(xml("a", { xmlns: NS, h: sm.inbound })).catch(() => {}); + await sendAck(); } else if (stanza.is("a", NS)) { // > When a party receives an element, it SHOULD keep a record of the 'h' value returned as the sequence number of the last handled outbound stanza for the current stream (and discard the previous value). ackQueue(+stanza.attrs.h); diff --git a/packages/stream-management/stream-features.test.js b/packages/stream-management/stream-features.test.js index 69f2fa64..79a07292 100644 --- a/packages/stream-management/stream-features.test.js +++ b/packages/stream-management/stream-features.test.js @@ -336,3 +336,18 @@ test("resume - failed with something in queue", async () => { expect(entity.streamManagement.outbound).toBe(0); expect(entity.streamManagement.outbound_q).toBeEmpty(); }); + +test("sends an element before closing", async () => { + const { entity, streamManagement } = mockClient(); + streamManagement.enabled = true; + streamManagement.inbound = 42; + entity.status = "online"; + + const promise_disconnect = entity.disconnect(); + + expect(await entity.catchOutgoing()).toEqual( + , + ); + + await promise_disconnect; +}); From 34e645b5b7f3e8b0bef47fb2267a038d08b98d34 Mon Sep 17 00:00:00 2001 From: Sonny Piers Date: Fri, 17 Jan 2025 17:39:32 +0100 Subject: [PATCH 2/4] f --- packages/stream-management/index.js | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/stream-management/index.js b/packages/stream-management/index.js index eec05d4a..6980585e 100644 --- a/packages/stream-management/index.js +++ b/packages/stream-management/index.js @@ -72,6 +72,7 @@ export default function streamManagement({ entity.on("disconnect", () => { clearTimeout(timeoutTimeout); clearTimeout(requestAckTimeout); + sm.enabled = false; }); // It is RECOMMENDED that initiating entities (usually clients) send an element right before they gracefully close the stream, in order to inform the peer about received stanzas From 874bbdee4f56f180e32ade752a7cb8c22ec92c68 Mon Sep 17 00:00:00 2001 From: Sonny Piers Date: Sat, 18 Jan 2025 16:46:20 +0100 Subject: [PATCH 3/4] stream-management: Code improvements --- packages/stream-management/bind2.js | 19 +++ packages/stream-management/index.js | 122 ++----------------- packages/stream-management/sasl2.js | 19 +++ packages/stream-management/stream-feature.js | 74 +++++++++++ server/prosody.cfg.lua | 1 + 5 files changed, 122 insertions(+), 113 deletions(-) create mode 100644 packages/stream-management/bind2.js create mode 100644 packages/stream-management/sasl2.js create mode 100644 packages/stream-management/stream-feature.js diff --git a/packages/stream-management/bind2.js b/packages/stream-management/bind2.js new file mode 100644 index 00000000..96e278a2 --- /dev/null +++ b/packages/stream-management/bind2.js @@ -0,0 +1,19 @@ +import { NS, makeEnableElement } from "./index.js"; + +export function setupBind2({ bind2, sm, failed, enabled }) { + bind2.use( + NS, + // https://xmpp.org/extensions/xep-0198.html#inline-examples + (_element) => { + return makeEnableElement({ sm }); + }, + async (element) => { + if (element.is("enabled")) { + enabled(element.attrs); + } else if (element.is("failed")) { + // const error = StreamError.fromElement(element) + failed(); + } + }, + ); +} diff --git a/packages/stream-management/index.js b/packages/stream-management/index.js index 6980585e..5fb03c31 100644 --- a/packages/stream-management/index.js +++ b/packages/stream-management/index.js @@ -1,44 +1,26 @@ -import XMPPError from "@xmpp/error"; -import { EventEmitter, procedure } from "@xmpp/events"; +import { EventEmitter } from "@xmpp/events"; import xml from "@xmpp/xml"; import { datetime } from "@xmpp/time"; +import { setupBind2 } from "./bind2.js"; +import { setupSasl2 } from "./sasl2.js"; +import { setupStreamFeature } from "./stream-feature.js"; // https://xmpp.org/extensions/xep-0198.html -const NS = "urn:xmpp:sm:3"; +export const NS = "urn:xmpp:sm:3"; -function makeEnableElement({ sm }) { +export function makeEnableElement({ sm }) { return xml("enable", { xmlns: NS, max: sm.preferredMaximum, - resume: sm.allowResume ? "true" : undefined, + resume: "true", }); } -function makeResumeElement({ sm }) { +export function makeResumeElement({ sm }) { return xml("resume", { xmlns: NS, h: sm.inbound, previd: sm.id }); } -function enable(entity, sm) { - return procedure(entity, makeEnableElement({ sm }), (element, done) => { - if (element.is("enabled", NS)) { - return done(element); - } else if (element.is("failed", NS)) { - throw XMPPError.fromElement(element); - } - }); -} - -async function resume(entity, sm) { - return procedure(entity, makeResumeElement({ sm }), (element, done) => { - if (element.is("resumed", NS)) { - return done(element); - } else if (element.is("failed", NS)) { - throw XMPPError.fromElement(element); - } - }); -} - export default function streamManagement({ streamFeatures, entity, @@ -51,7 +33,6 @@ export default function streamManagement({ const sm = new EventEmitter(); Object.assign(sm, { - allowResume: true, preferredMaximum: null, enabled: false, id: "", @@ -171,6 +152,7 @@ export default function streamManagement({ clearTimeout(requestAckTimeout); if (!sm.enabled) return; + if (!timeout) return; requestAckTimeout = setTimeout(requestAck, timeout); } @@ -218,92 +200,6 @@ export default function streamManagement({ return sm; } -function setupStreamFeature({ - streamFeatures, - sm, - entity, - resumed, - failed, - enabled, -}) { - // https://xmpp.org/extensions/xep-0198.html#enable - // For client-to-server connections, the client MUST NOT attempt to enable stream management until after it has completed Resource Binding unless it is resuming a previous session - streamFeatures.use("sm", NS, async (context, next) => { - // Resuming - if (sm.id) { - try { - const element = await resume(entity, sm); - await resumed(element); - return; - // If resumption fails, continue with session establishment - } catch { - failed(); - } - } - - // Enabling - - // Resource binding first - await next(); - - const promiseEnable = enable(entity, sm); - - if (sm.outbound_q.length > 0) { - throw new Error( - "Stream Management assertion failure, queue should be empty after enable", - ); - } - - // > The counter for an entity's own sent stanzas is set to zero and started after sending either or . - sm.outbound = 0; - - try { - const response = await promiseEnable; - enabled(response.attrs); - } catch { - sm.enabled = false; - } - - sm.inbound = 0; - }); -} - -function setupSasl2({ sasl2, sm, failed, resumed }) { - sasl2.use( - "urn:xmpp:sm:3", - (element) => { - if (!element.is("sm")) return; - if (sm.id) return makeResumeElement({ sm }); - }, - (element) => { - if (element.is("resumed")) { - resumed(element); - } else if (element.is(failed)) { - // const error = StreamError.fromElement(element) - failed(); - } - }, - ); -} - -function setupBind2({ bind2, sm, failed, enabled }) { - bind2.use( - "urn:xmpp:sm:3", - // https://xmpp.org/extensions/xep-0198.html#inline-examples - (_element) => { - return makeEnableElement({ sm }); - }, - (element) => { - if (element.is("enabled")) { - enabled(element.attrs); - } else if (element.is("failed")) { - // const error = StreamError.fromElement(element) - failed(); - } - }, - ); -} - function queueToStanza({ entity, item }) { const { stanza, stamp } = item; if ( diff --git a/packages/stream-management/sasl2.js b/packages/stream-management/sasl2.js new file mode 100644 index 00000000..877b857b --- /dev/null +++ b/packages/stream-management/sasl2.js @@ -0,0 +1,19 @@ +import { NS, makeResumeElement } from "./index.js"; + +export function setupSasl2({ sasl2, sm, failed, resumed }) { + sasl2.use( + NS, + (element) => { + if (!element.is("sm")) return; + if (sm.id) return makeResumeElement({ sm }); + }, + (element) => { + if (element.is("resumed")) { + resumed(element); + } else if (element.is(failed)) { + // const error = StreamError.fromElement(element) + failed(); + } + }, + ); +} diff --git a/packages/stream-management/stream-feature.js b/packages/stream-management/stream-feature.js new file mode 100644 index 00000000..9fc4e972 --- /dev/null +++ b/packages/stream-management/stream-feature.js @@ -0,0 +1,74 @@ +import XMPPError from "@xmpp/error"; +import { procedure } from "@xmpp/events"; + +import { NS, makeEnableElement, makeResumeElement } from "./index.js"; + +export function setupStreamFeature({ + streamFeatures, + sm, + entity, + resumed, + failed, + enabled, +}) { + // https://xmpp.org/extensions/xep-0198.html#enable + // For client-to-server connections, the client MUST NOT attempt to enable stream management until after it has completed Resource Binding unless it is resuming a previous session + streamFeatures.use("sm", NS, async (context, next) => { + // Resuming + if (sm.id) { + try { + const element = await resume(entity, sm); + await resumed(element); + return; + // If resumption fails, continue with session establishment + } catch { + failed(); + } + } + + // Enabling + + // Resource binding first + await next(); + + const promiseEnable = enable(entity, sm); + + if (sm.outbound_q.length > 0) { + throw new Error( + "Stream Management assertion failure, queue should be empty after enable", + ); + } + + // > The counter for an entity's own sent stanzas is set to zero and started after sending either or . + sm.outbound = 0; + + try { + const response = await promiseEnable; + enabled(response.attrs); + } catch { + sm.enabled = false; + } + + sm.inbound = 0; + }); +} + +export function enable(entity, sm) { + return procedure(entity, makeEnableElement({ sm }), (element, done) => { + if (element.is("enabled", NS)) { + return done(element); + } else if (element.is("failed", NS)) { + throw XMPPError.fromElement(element); + } + }); +} + +export async function resume(entity, sm) { + return procedure(entity, makeResumeElement({ sm }), (element, done) => { + if (element.is("resumed", NS)) { + return done(element); + } else if (element.is("failed", NS)) { + throw XMPPError.fromElement(element); + } + }); +} diff --git a/server/prosody.cfg.lua b/server/prosody.cfg.lua index 870055de..8a5a48b7 100644 --- a/server/prosody.cfg.lua +++ b/server/prosody.cfg.lua @@ -26,6 +26,7 @@ modules_enabled = { "sasl2_bind2"; "sasl2_sm"; "sasl2_fast"; + "csi_simple"; }; modules_disabled = { From d26aae4bc1e4c7353b240debc2e0d81adc079176 Mon Sep 17 00:00:00 2001 From: Sonny Piers Date: Sat, 18 Jan 2025 16:50:59 +0100 Subject: [PATCH 4/4] f --- .../stream-management/stream-features.test.js | 43 ----------------- .../stream-management.test.js | 46 +++++++++++++++++++ 2 files changed, 46 insertions(+), 43 deletions(-) create mode 100644 packages/stream-management/stream-management.test.js diff --git a/packages/stream-management/stream-features.test.js b/packages/stream-management/stream-features.test.js index 79a07292..85341398 100644 --- a/packages/stream-management/stream-features.test.js +++ b/packages/stream-management/stream-features.test.js @@ -124,34 +124,6 @@ test("enable - failed", async () => { expect(entity.streamManagement.enabled).toBe(false); }); -test("stanza ack", async () => { - const { entity } = mockClient(); - - entity.streamManagement.enabled = true; - - expect(entity.streamManagement.outbound).toBe(0); - expect(entity.streamManagement.outbound_q).toBeEmpty(); - // expect(entity.streamManagement.enabled).toBe(true); - - await entity.send(); - - expect(entity.streamManagement.outbound).toBe(0); - expect(entity.streamManagement.outbound_q).toHaveLength(1); - - let acks = 0; - entity.streamManagement.on("ack", (stanza) => { - expect(stanza.attrs.id).toBe("a"); - acks++; - }); - - entity.mockInput(); - await tick(); - - expect(acks).toBe(1); - expect(entity.streamManagement.outbound).toBe(1); - expect(entity.streamManagement.outbound_q).toHaveLength(0); -}); - test("resume - resumed", async () => { const { entity } = mockClient(); @@ -336,18 +308,3 @@ test("resume - failed with something in queue", async () => { expect(entity.streamManagement.outbound).toBe(0); expect(entity.streamManagement.outbound_q).toBeEmpty(); }); - -test("sends an element before closing", async () => { - const { entity, streamManagement } = mockClient(); - streamManagement.enabled = true; - streamManagement.inbound = 42; - entity.status = "online"; - - const promise_disconnect = entity.disconnect(); - - expect(await entity.catchOutgoing()).toEqual( - , - ); - - await promise_disconnect; -}); diff --git a/packages/stream-management/stream-management.test.js b/packages/stream-management/stream-management.test.js new file mode 100644 index 00000000..0cfe5fae --- /dev/null +++ b/packages/stream-management/stream-management.test.js @@ -0,0 +1,46 @@ +import { mockClient } from "@xmpp/test"; + +import { tick } from "@xmpp/events"; + +test("emits ack when the server ackownledge stanzas", async () => { + const { entity } = mockClient(); + + entity.streamManagement.enabled = true; + + expect(entity.streamManagement.outbound).toBe(0); + expect(entity.streamManagement.outbound_q).toBeEmpty(); + // expect(entity.streamManagement.enabled).toBe(true); + + await entity.send(); + + expect(entity.streamManagement.outbound).toBe(0); + expect(entity.streamManagement.outbound_q).toHaveLength(1); + + let acks = 0; + entity.streamManagement.on("ack", (stanza) => { + expect(stanza.attrs.id).toBe("a"); + acks++; + }); + + entity.mockInput(); + await tick(); + + expect(acks).toBe(1); + expect(entity.streamManagement.outbound).toBe(1); + expect(entity.streamManagement.outbound_q).toHaveLength(0); +}); + +test("sends an element before closing", async () => { + const { entity, streamManagement } = mockClient(); + streamManagement.enabled = true; + streamManagement.inbound = 42; + entity.status = "online"; + + const promise_disconnect = entity.disconnect(); + + expect(await entity.catchOutgoing()).toEqual( + , + ); + + await promise_disconnect; +});