From 1165c33a34d3ebb99bd96984116e6cf422d5830c Mon Sep 17 00:00:00 2001 From: Sonny Date: Fri, 24 Jan 2025 15:59:18 +0100 Subject: [PATCH] stream-management: Split bind2, sasl2 and stream features (#1063) --- packages/stream-management/bind2.js | 19 +++ packages/stream-management/index.js | 122 ++---------------- packages/stream-management/sasl2.js | 19 +++ packages/stream-management/stream-feature.js | 74 +++++++++++ .../stream-management/stream-features.test.js | 28 ---- .../stream-management.test.js | 46 +++++++ server/prosody.cfg.lua | 1 + 7 files changed, 168 insertions(+), 141 deletions(-) create mode 100644 packages/stream-management/bind2.js create mode 100644 packages/stream-management/sasl2.js create mode 100644 packages/stream-management/stream-feature.js create mode 100644 packages/stream-management/stream-management.test.js diff --git a/packages/stream-management/bind2.js b/packages/stream-management/bind2.js new file mode 100644 index 00000000..96e278a2 --- /dev/null +++ b/packages/stream-management/bind2.js @@ -0,0 +1,19 @@ +import { NS, makeEnableElement } from "./index.js"; + +export function setupBind2({ bind2, sm, failed, enabled }) { + bind2.use( + NS, + // https://xmpp.org/extensions/xep-0198.html#inline-examples + (_element) => { + return makeEnableElement({ sm }); + }, + async (element) => { + if (element.is("enabled")) { + enabled(element.attrs); + } else if (element.is("failed")) { + // const error = StreamError.fromElement(element) + failed(); + } + }, + ); +} diff --git a/packages/stream-management/index.js b/packages/stream-management/index.js index 6980585e..5fb03c31 100644 --- a/packages/stream-management/index.js +++ b/packages/stream-management/index.js @@ -1,44 +1,26 @@ -import XMPPError from "@xmpp/error"; -import { EventEmitter, procedure } from "@xmpp/events"; +import { EventEmitter } from "@xmpp/events"; import xml from "@xmpp/xml"; import { datetime } from "@xmpp/time"; +import { setupBind2 } from "./bind2.js"; +import { setupSasl2 } from "./sasl2.js"; +import { setupStreamFeature } from "./stream-feature.js"; // https://xmpp.org/extensions/xep-0198.html -const NS = "urn:xmpp:sm:3"; +export const NS = "urn:xmpp:sm:3"; -function makeEnableElement({ sm }) { +export function makeEnableElement({ sm }) { return xml("enable", { xmlns: NS, max: sm.preferredMaximum, - resume: sm.allowResume ? "true" : undefined, + resume: "true", }); } -function makeResumeElement({ sm }) { +export function makeResumeElement({ sm }) { return xml("resume", { xmlns: NS, h: sm.inbound, previd: sm.id }); } -function enable(entity, sm) { - return procedure(entity, makeEnableElement({ sm }), (element, done) => { - if (element.is("enabled", NS)) { - return done(element); - } else if (element.is("failed", NS)) { - throw XMPPError.fromElement(element); - } - }); -} - -async function resume(entity, sm) { - return procedure(entity, makeResumeElement({ sm }), (element, done) => { - if (element.is("resumed", NS)) { - return done(element); - } else if (element.is("failed", NS)) { - throw XMPPError.fromElement(element); - } - }); -} - export default function streamManagement({ streamFeatures, entity, @@ -51,7 +33,6 @@ export default function streamManagement({ const sm = new EventEmitter(); Object.assign(sm, { - allowResume: true, preferredMaximum: null, enabled: false, id: "", @@ -171,6 +152,7 @@ export default function streamManagement({ clearTimeout(requestAckTimeout); if (!sm.enabled) return; + if (!timeout) return; requestAckTimeout = setTimeout(requestAck, timeout); } @@ -218,92 +200,6 @@ export default function streamManagement({ return sm; } -function setupStreamFeature({ - streamFeatures, - sm, - entity, - resumed, - failed, - enabled, -}) { - // https://xmpp.org/extensions/xep-0198.html#enable - // For client-to-server connections, the client MUST NOT attempt to enable stream management until after it has completed Resource Binding unless it is resuming a previous session - streamFeatures.use("sm", NS, async (context, next) => { - // Resuming - if (sm.id) { - try { - const element = await resume(entity, sm); - await resumed(element); - return; - // If resumption fails, continue with session establishment - } catch { - failed(); - } - } - - // Enabling - - // Resource binding first - await next(); - - const promiseEnable = enable(entity, sm); - - if (sm.outbound_q.length > 0) { - throw new Error( - "Stream Management assertion failure, queue should be empty after enable", - ); - } - - // > The counter for an entity's own sent stanzas is set to zero and started after sending either or . - sm.outbound = 0; - - try { - const response = await promiseEnable; - enabled(response.attrs); - } catch { - sm.enabled = false; - } - - sm.inbound = 0; - }); -} - -function setupSasl2({ sasl2, sm, failed, resumed }) { - sasl2.use( - "urn:xmpp:sm:3", - (element) => { - if (!element.is("sm")) return; - if (sm.id) return makeResumeElement({ sm }); - }, - (element) => { - if (element.is("resumed")) { - resumed(element); - } else if (element.is(failed)) { - // const error = StreamError.fromElement(element) - failed(); - } - }, - ); -} - -function setupBind2({ bind2, sm, failed, enabled }) { - bind2.use( - "urn:xmpp:sm:3", - // https://xmpp.org/extensions/xep-0198.html#inline-examples - (_element) => { - return makeEnableElement({ sm }); - }, - (element) => { - if (element.is("enabled")) { - enabled(element.attrs); - } else if (element.is("failed")) { - // const error = StreamError.fromElement(element) - failed(); - } - }, - ); -} - function queueToStanza({ entity, item }) { const { stanza, stamp } = item; if ( diff --git a/packages/stream-management/sasl2.js b/packages/stream-management/sasl2.js new file mode 100644 index 00000000..877b857b --- /dev/null +++ b/packages/stream-management/sasl2.js @@ -0,0 +1,19 @@ +import { NS, makeResumeElement } from "./index.js"; + +export function setupSasl2({ sasl2, sm, failed, resumed }) { + sasl2.use( + NS, + (element) => { + if (!element.is("sm")) return; + if (sm.id) return makeResumeElement({ sm }); + }, + (element) => { + if (element.is("resumed")) { + resumed(element); + } else if (element.is(failed)) { + // const error = StreamError.fromElement(element) + failed(); + } + }, + ); +} diff --git a/packages/stream-management/stream-feature.js b/packages/stream-management/stream-feature.js new file mode 100644 index 00000000..9fc4e972 --- /dev/null +++ b/packages/stream-management/stream-feature.js @@ -0,0 +1,74 @@ +import XMPPError from "@xmpp/error"; +import { procedure } from "@xmpp/events"; + +import { NS, makeEnableElement, makeResumeElement } from "./index.js"; + +export function setupStreamFeature({ + streamFeatures, + sm, + entity, + resumed, + failed, + enabled, +}) { + // https://xmpp.org/extensions/xep-0198.html#enable + // For client-to-server connections, the client MUST NOT attempt to enable stream management until after it has completed Resource Binding unless it is resuming a previous session + streamFeatures.use("sm", NS, async (context, next) => { + // Resuming + if (sm.id) { + try { + const element = await resume(entity, sm); + await resumed(element); + return; + // If resumption fails, continue with session establishment + } catch { + failed(); + } + } + + // Enabling + + // Resource binding first + await next(); + + const promiseEnable = enable(entity, sm); + + if (sm.outbound_q.length > 0) { + throw new Error( + "Stream Management assertion failure, queue should be empty after enable", + ); + } + + // > The counter for an entity's own sent stanzas is set to zero and started after sending either or . + sm.outbound = 0; + + try { + const response = await promiseEnable; + enabled(response.attrs); + } catch { + sm.enabled = false; + } + + sm.inbound = 0; + }); +} + +export function enable(entity, sm) { + return procedure(entity, makeEnableElement({ sm }), (element, done) => { + if (element.is("enabled", NS)) { + return done(element); + } else if (element.is("failed", NS)) { + throw XMPPError.fromElement(element); + } + }); +} + +export async function resume(entity, sm) { + return procedure(entity, makeResumeElement({ sm }), (element, done) => { + if (element.is("resumed", NS)) { + return done(element); + } else if (element.is("failed", NS)) { + throw XMPPError.fromElement(element); + } + }); +} diff --git a/packages/stream-management/stream-features.test.js b/packages/stream-management/stream-features.test.js index 79a07292..bf2fac21 100644 --- a/packages/stream-management/stream-features.test.js +++ b/packages/stream-management/stream-features.test.js @@ -124,34 +124,6 @@ test("enable - failed", async () => { expect(entity.streamManagement.enabled).toBe(false); }); -test("stanza ack", async () => { - const { entity } = mockClient(); - - entity.streamManagement.enabled = true; - - expect(entity.streamManagement.outbound).toBe(0); - expect(entity.streamManagement.outbound_q).toBeEmpty(); - // expect(entity.streamManagement.enabled).toBe(true); - - await entity.send(); - - expect(entity.streamManagement.outbound).toBe(0); - expect(entity.streamManagement.outbound_q).toHaveLength(1); - - let acks = 0; - entity.streamManagement.on("ack", (stanza) => { - expect(stanza.attrs.id).toBe("a"); - acks++; - }); - - entity.mockInput(); - await tick(); - - expect(acks).toBe(1); - expect(entity.streamManagement.outbound).toBe(1); - expect(entity.streamManagement.outbound_q).toHaveLength(0); -}); - test("resume - resumed", async () => { const { entity } = mockClient(); diff --git a/packages/stream-management/stream-management.test.js b/packages/stream-management/stream-management.test.js new file mode 100644 index 00000000..0cfe5fae --- /dev/null +++ b/packages/stream-management/stream-management.test.js @@ -0,0 +1,46 @@ +import { mockClient } from "@xmpp/test"; + +import { tick } from "@xmpp/events"; + +test("emits ack when the server ackownledge stanzas", async () => { + const { entity } = mockClient(); + + entity.streamManagement.enabled = true; + + expect(entity.streamManagement.outbound).toBe(0); + expect(entity.streamManagement.outbound_q).toBeEmpty(); + // expect(entity.streamManagement.enabled).toBe(true); + + await entity.send(); + + expect(entity.streamManagement.outbound).toBe(0); + expect(entity.streamManagement.outbound_q).toHaveLength(1); + + let acks = 0; + entity.streamManagement.on("ack", (stanza) => { + expect(stanza.attrs.id).toBe("a"); + acks++; + }); + + entity.mockInput(); + await tick(); + + expect(acks).toBe(1); + expect(entity.streamManagement.outbound).toBe(1); + expect(entity.streamManagement.outbound_q).toHaveLength(0); +}); + +test("sends an element before closing", async () => { + const { entity, streamManagement } = mockClient(); + streamManagement.enabled = true; + streamManagement.inbound = 42; + entity.status = "online"; + + const promise_disconnect = entity.disconnect(); + + expect(await entity.catchOutgoing()).toEqual( + , + ); + + await promise_disconnect; +}); diff --git a/server/prosody.cfg.lua b/server/prosody.cfg.lua index 870055de..8a5a48b7 100644 --- a/server/prosody.cfg.lua +++ b/server/prosody.cfg.lua @@ -26,6 +26,7 @@ modules_enabled = { "sasl2_bind2"; "sasl2_sm"; "sasl2_fast"; + "csi_simple"; }; modules_disabled = {