diff --git a/package-lock.json b/package-lock.json index 13d18a4d..de35cabd 100644 --- a/package-lock.json +++ b/package-lock.json @@ -14407,6 +14407,7 @@ "dependencies": { "@xmpp/error": "^0.14.0", "@xmpp/events": "^0.14.0", + "@xmpp/time": "^0.14.0", "@xmpp/xml": "^0.14.0" }, "engines": { diff --git a/packages/client-core/src/bind2/bind2.test.js b/packages/client-core/src/bind2/bind2.test.js index b0aac358..23c5fddb 100644 --- a/packages/client-core/src/bind2/bind2.test.js +++ b/packages/client-core/src/bind2/bind2.test.js @@ -66,7 +66,6 @@ test("with function resource returning string", async () => { test("with function resource throwing", async () => { const error = new Error("foo"); - function resource() { throw error; } @@ -102,7 +101,6 @@ test("with function resource returning resolved promise", async () => { test("with function resource returning rejected promise", async () => { const error = new Error("foo"); - async function resource() { throw error; } diff --git a/packages/stream-management/index.js b/packages/stream-management/index.js index 9f162ce3..dc5dc7d1 100644 --- a/packages/stream-management/index.js +++ b/packages/stream-management/index.js @@ -1,6 +1,7 @@ import XMPPError from "@xmpp/error"; import { procedure } from "@xmpp/events"; import xml from "@xmpp/xml"; +import { datetime } from "@xmpp/time"; // https://xmpp.org/extensions/xep-0198.html @@ -45,24 +46,49 @@ export default function streamManagement({ bind2, sasl2, }) { + let timeoutTimeout = null; + let requestAckTimeout = null; + const sm = { allowResume: true, preferredMaximum: null, enabled: false, id: "", + outbound_q: [], outbound: 0, inbound: 0, max: null, + timeout: 60_000, + _teardown: () => { + if (timeoutTimeout) clearTimeout(timeoutTimeout); + if (requestAckTimeout) clearTimeout(requestAckTimeout); + }, }; - function resumed() { + async function resumed(resumed) { sm.enabled = true; + const oldOutbound = sm.outbound; + for (let i = 0; i < resumed.attrs.h - oldOutbound; i++) { + let stanza = sm.outbound_q.shift(); + sm.outbound++; + entity.emit("stream-management/ack", stanza); + } + let q = sm.outbound_q; + sm.outbound_q = []; + for (const item of q) { + await entity.send(item); // This will trigger the middleware and re-add to the queue + } + entity.emit("stream-management/resumed"); entity._ready(true); } function failed() { sm.enabled = false; sm.id = ""; + let stanza; + while ((stanza = sm.outbound_q.shift())) { + entity.emit("stream-management/fail", stanza); + } sm.outbound = 0; } @@ -73,11 +99,18 @@ export default function streamManagement({ } entity.on("online", () => { + if (sm.outbound_q.length > 0) { + throw "Stream Management assertion failure, queue should be empty during online"; + } sm.outbound = 0; sm.inbound = 0; }); entity.on("offline", () => { + let stanza; + while ((stanza = sm.outbound_q.shift())) { + entity.emit("stream-management/fail", stanza); + } sm.outbound = 0; sm.inbound = 0; sm.enabled = false; @@ -86,6 +119,7 @@ export default function streamManagement({ middleware.use((context, next) => { const { stanza } = context; + if (timeoutTimeout) clearTimeout(timeoutTimeout); if (["presence", "message", "iq"].includes(stanza.name)) { sm.inbound += 1; } else if (stanza.is("r", NS)) { @@ -93,7 +127,12 @@ export default function streamManagement({ entity.send(xml("a", { xmlns: NS, h: sm.inbound })).catch(() => {}); } else if (stanza.is("a", NS)) { // > When a party receives an element, it SHOULD keep a record of the 'h' value returned as the sequence number of the last handled outbound stanza for the current stream (and discard the previous value). - sm.outbound = stanza.attrs.h; + const oldOutbound = sm.outbound; + for (let i = 0; i < stanza.attrs.h - oldOutbound; i++) { + let stanza = sm.outbound_q.shift(); + sm.outbound++; + entity.emit("stream-management/ack", stanza); + } } return next(); @@ -105,6 +144,41 @@ export default function streamManagement({ if (sasl2) { setupSasl2({ sasl2, sm, failed, resumed }); } + + function requestAck() { + if (timeoutTimeout) clearTimeout(timeoutTimeout); + if (sm.timeout) { + timeoutTimeout = setTimeout(() => entity.disconnect(), sm.timeout); + } + entity.send(xml("r", { xmlns: NS })).catch(() => {}); + // Periodically send r to check the connection + // If a stanza goes out it will cancel this and set a sooner timer + requestAckTimeout = setTimeout(requestAck, 300_000); + } + + middleware.filter((context, next) => { + const { stanza } = context; + if (sm.enabled && ["presence", "message", "iq"].includes(stanza.name)) { + let qStanza = stanza; + if ( + qStanza.name === "message" && + !qStanza.getChild("delay", "urn:xmpp:delay") + ) { + qStanza = xml.clone(qStanza); + qStanza.c("delay", { + xmlns: "urn:xmpp:delay", + from: entity.jid.toString(), + stamp: datetime(), + }); + } + sm.outbound_q.push(qStanza); + // Debounce requests so we send only one after a big run of stanza together + if (requestAckTimeout) clearTimeout(requestAckTimeout); + requestAckTimeout = setTimeout(requestAck, 100); + } + return next(); + }); + if (streamFeatures) { setupStreamFeature({ streamFeatures, @@ -133,8 +207,7 @@ function setupStreamFeature({ // Resuming if (sm.id) { try { - await resume(entity, sm); - resumed(); + await resumed(await resume(entity, sm)); return; // If resumption fails, continue with session establishment } catch { @@ -150,6 +223,9 @@ function setupStreamFeature({ const promiseEnable = enable(entity, sm); // > The counter for an entity's own sent stanzas is set to zero and started after sending either or . + if (sm.outbound_q.length > 0) { + throw "Stream Management assertion failure, queue should be empty after enable"; + } sm.outbound = 0; try { @@ -172,7 +248,7 @@ function setupSasl2({ sasl2, sm, failed, resumed }) { }, (element) => { if (element.is("resumed")) { - resumed(); + resumed(element); } else if (element.is(failed)) { // const error = StreamError.fromElement(element) failed(); diff --git a/packages/stream-management/package.json b/packages/stream-management/package.json index e9818cbb..72503e38 100644 --- a/packages/stream-management/package.json +++ b/packages/stream-management/package.json @@ -16,7 +16,8 @@ "dependencies": { "@xmpp/error": "^0.14.0", "@xmpp/events": "^0.14.0", - "@xmpp/xml": "^0.14.0" + "@xmpp/xml": "^0.14.0", + "@xmpp/time": "^0.14.0" }, "engines": { "node": ">= 20" diff --git a/packages/stream-management/stream-features.test.js b/packages/stream-management/stream-features.test.js index a8edc437..845e11f2 100644 --- a/packages/stream-management/stream-features.test.js +++ b/packages/stream-management/stream-features.test.js @@ -22,6 +22,7 @@ test("enable - enabled", async () => { ); expect(entity.streamManagement.outbound).toBe(0); + expect(entity.streamManagement.outbound_q).toBeEmpty(); expect(entity.streamManagement.enabled).toBe(false); expect(entity.streamManagement.id).toBe(""); @@ -73,6 +74,7 @@ test("enable - message - enabled", async () => { ); expect(entity.streamManagement.outbound).toBe(0); + expect(entity.streamManagement.outbound_q).toBeEmpty(); expect(entity.streamManagement.enabled).toBe(false); expect(entity.streamManagement.id).toBe(""); @@ -112,6 +114,7 @@ test("enable - failed", async () => { ); expect(entity.streamManagement.outbound).toBe(0); + expect(entity.streamManagement.outbound_q).toBeEmpty(); entity.streamManagement.enabled = true; entity.mockInput( @@ -125,6 +128,54 @@ test("enable - failed", async () => { expect(entity.streamManagement.enabled).toBe(false); }); +test("enable - enabled - message", async () => { + const { entity } = mockClient(); + + entity.mockInput( + + + , + ); + + expect(await entity.catchOutgoing()).toEqual( + , + ); + + entity.mockInput( + , + ); + + await tick(); + + expect(entity.streamManagement.outbound).toBe(0); + expect(entity.streamManagement.outbound_q).toBeEmpty(); + expect(entity.streamManagement.enabled).toBe(true); + + await entity.send(); + entity.streamManagement._teardown(); + + expect(entity.streamManagement.outbound).toBe(0); + expect(entity.streamManagement.outbound_q).toHaveLength(1); + + let acks = 0; + entity.on("stream-management/ack", (stanza) => { + expect(stanza.attrs.id).toBe("a"); + acks++; + }); + + entity.mockInput(); + await tick(); + + expect(acks).toBe(1); + expect(entity.streamManagement.outbound).toBe(1); + expect(entity.streamManagement.outbound_q).toHaveLength(0); +}); + test("resume - resumed", async () => { const { entity } = mockClient(); @@ -138,6 +189,7 @@ test("resume - resumed", async () => { ); entity.streamManagement.outbound = 45; + entity.streamManagement.outbound_q = [, ]; expect(await entity.catchOutgoing()).toEqual( , @@ -147,11 +199,31 @@ test("resume - resumed", async () => { expect(entity.status).toBe("offline"); - entity.mockInput(); + entity.mockInput(); - await tick(); + let acks = 0; + entity.on("stream-management/ack", (stanza) => { + expect(stanza.attrs.id).toBe("a"); + acks++; + }); + + expect(await entity.catchOutgoing()).toEqual(); - expect(entity.streamManagement.outbound).toBe(45); + let resumed = false; + entity.on("stream-management/resumed", () => { + resumed = true; + }); + + await tick(); + entity.streamManagement._teardown(); + + expect(resumed).toBe(true); + expect(acks).toBe(1); + expect(entity.streamManagement.outbound).toBe(46); + expect(entity.streamManagement.outbound_q).toHaveLength(1); + expect( + entity.streamManagement.outbound_q[0].getChild("delay", "urn:xmpp:delay"), + ).not.toBeUndefined(); expect(entity.status).toBe("online"); }); @@ -162,6 +234,7 @@ test("resume - failed", async () => { entity.streamManagement.id = "bar"; entity.streamManagement.enabled = true; entity.streamManagement.outbound = 45; + entity.streamManagement.outbound_q = ["hai"]; entity.mockInput( @@ -179,10 +252,18 @@ test("resume - failed", async () => { , ); + let failures = 0; + entity.on("stream-management/fail", (failed) => { + failures++; + expect(failed).toBe("hai"); + }); + await tick(); + expect(failures).toBe(1); expect(entity.status).toBe("bar"); expect(entity.streamManagement.id).toBe(""); expect(entity.streamManagement.enabled).toBe(false); expect(entity.streamManagement.outbound).toBe(0); + expect(entity.streamManagement.outbound_q).toBeEmpty(); }); diff --git a/packages/xml/index.js b/packages/xml/index.js index efb06e0c..7b512e86 100644 --- a/packages/xml/index.js +++ b/packages/xml/index.js @@ -1,4 +1,5 @@ import Element from "ltx/lib/Element.js"; +import clone from "ltx/lib/clone.js"; import createElement from "ltx/lib/createElement.js"; import Parser from "./lib/Parser.js"; import { @@ -17,6 +18,7 @@ Object.assign(xml, { Element, createElement, Parser, + clone, escapeXML, unescapeXML, escapeXMLText, @@ -29,6 +31,7 @@ export { Element, createElement, Parser, + clone, escapeXML, unescapeXML, escapeXMLText,