From c4d4bb825451a564e5d32a12ba4a0f270a3f7c3e Mon Sep 17 00:00:00 2001 From: Stephen Paul Weber Date: Tue, 14 Nov 2023 16:31:15 -0500 Subject: [PATCH] Implement stream management requesting ACKs Queue outgoing stanzas and periodically request ACK. Remove from the queue anything ack'd and notify of the ack so apps can know the stanza has for sure sent. On resume, anything not ack'd is re-sent. On reconnect, anything not ack'd notify of the failure to send this stanza so apps can know the stanza failed. Even when there is no traffic, send an at least every 5 minutes to check the connection. If there is no inbound traffic (such as an ) within timeout (default 60 seconds) then consider the connection disconnected. --- package-lock.json | 1 + packages/client-core/src/bind2/bind2.test.js | 2 - packages/stream-management/index.js | 86 ++++++++++++++++-- packages/stream-management/package.json | 3 +- .../stream-management/stream-features.test.js | 87 ++++++++++++++++++- packages/xml/index.js | 3 + 6 files changed, 171 insertions(+), 11 deletions(-) diff --git a/package-lock.json b/package-lock.json index 13d18a4d..de35cabd 100644 --- a/package-lock.json +++ b/package-lock.json @@ -14407,6 +14407,7 @@ "dependencies": { "@xmpp/error": "^0.14.0", "@xmpp/events": "^0.14.0", + "@xmpp/time": "^0.14.0", "@xmpp/xml": "^0.14.0" }, "engines": { diff --git a/packages/client-core/src/bind2/bind2.test.js b/packages/client-core/src/bind2/bind2.test.js index b0aac358..23c5fddb 100644 --- a/packages/client-core/src/bind2/bind2.test.js +++ b/packages/client-core/src/bind2/bind2.test.js @@ -66,7 +66,6 @@ test("with function resource returning string", async () => { test("with function resource throwing", async () => { const error = new Error("foo"); - function resource() { throw error; } @@ -102,7 +101,6 @@ test("with function resource returning resolved promise", async () => { test("with function resource returning rejected promise", async () => { const error = new Error("foo"); - async function resource() { throw error; } diff --git a/packages/stream-management/index.js b/packages/stream-management/index.js index 9f162ce3..dc5dc7d1 100644 --- a/packages/stream-management/index.js +++ b/packages/stream-management/index.js @@ -1,6 +1,7 @@ import XMPPError from "@xmpp/error"; import { procedure } from "@xmpp/events"; import xml from "@xmpp/xml"; +import { datetime } from "@xmpp/time"; // https://xmpp.org/extensions/xep-0198.html @@ -45,24 +46,49 @@ export default function streamManagement({ bind2, sasl2, }) { + let timeoutTimeout = null; + let requestAckTimeout = null; + const sm = { allowResume: true, preferredMaximum: null, enabled: false, id: "", + outbound_q: [], outbound: 0, inbound: 0, max: null, + timeout: 60_000, + _teardown: () => { + if (timeoutTimeout) clearTimeout(timeoutTimeout); + if (requestAckTimeout) clearTimeout(requestAckTimeout); + }, }; - function resumed() { + async function resumed(resumed) { sm.enabled = true; + const oldOutbound = sm.outbound; + for (let i = 0; i < resumed.attrs.h - oldOutbound; i++) { + let stanza = sm.outbound_q.shift(); + sm.outbound++; + entity.emit("stream-management/ack", stanza); + } + let q = sm.outbound_q; + sm.outbound_q = []; + for (const item of q) { + await entity.send(item); // This will trigger the middleware and re-add to the queue + } + entity.emit("stream-management/resumed"); entity._ready(true); } function failed() { sm.enabled = false; sm.id = ""; + let stanza; + while ((stanza = sm.outbound_q.shift())) { + entity.emit("stream-management/fail", stanza); + } sm.outbound = 0; } @@ -73,11 +99,18 @@ export default function streamManagement({ } entity.on("online", () => { + if (sm.outbound_q.length > 0) { + throw "Stream Management assertion failure, queue should be empty during online"; + } sm.outbound = 0; sm.inbound = 0; }); entity.on("offline", () => { + let stanza; + while ((stanza = sm.outbound_q.shift())) { + entity.emit("stream-management/fail", stanza); + } sm.outbound = 0; sm.inbound = 0; sm.enabled = false; @@ -86,6 +119,7 @@ export default function streamManagement({ middleware.use((context, next) => { const { stanza } = context; + if (timeoutTimeout) clearTimeout(timeoutTimeout); if (["presence", "message", "iq"].includes(stanza.name)) { sm.inbound += 1; } else if (stanza.is("r", NS)) { @@ -93,7 +127,12 @@ export default function streamManagement({ entity.send(xml("a", { xmlns: NS, h: sm.inbound })).catch(() => {}); } else if (stanza.is("a", NS)) { // > When a party receives an element, it SHOULD keep a record of the 'h' value returned as the sequence number of the last handled outbound stanza for the current stream (and discard the previous value). - sm.outbound = stanza.attrs.h; + const oldOutbound = sm.outbound; + for (let i = 0; i < stanza.attrs.h - oldOutbound; i++) { + let stanza = sm.outbound_q.shift(); + sm.outbound++; + entity.emit("stream-management/ack", stanza); + } } return next(); @@ -105,6 +144,41 @@ export default function streamManagement({ if (sasl2) { setupSasl2({ sasl2, sm, failed, resumed }); } + + function requestAck() { + if (timeoutTimeout) clearTimeout(timeoutTimeout); + if (sm.timeout) { + timeoutTimeout = setTimeout(() => entity.disconnect(), sm.timeout); + } + entity.send(xml("r", { xmlns: NS })).catch(() => {}); + // Periodically send r to check the connection + // If a stanza goes out it will cancel this and set a sooner timer + requestAckTimeout = setTimeout(requestAck, 300_000); + } + + middleware.filter((context, next) => { + const { stanza } = context; + if (sm.enabled && ["presence", "message", "iq"].includes(stanza.name)) { + let qStanza = stanza; + if ( + qStanza.name === "message" && + !qStanza.getChild("delay", "urn:xmpp:delay") + ) { + qStanza = xml.clone(qStanza); + qStanza.c("delay", { + xmlns: "urn:xmpp:delay", + from: entity.jid.toString(), + stamp: datetime(), + }); + } + sm.outbound_q.push(qStanza); + // Debounce requests so we send only one after a big run of stanza together + if (requestAckTimeout) clearTimeout(requestAckTimeout); + requestAckTimeout = setTimeout(requestAck, 100); + } + return next(); + }); + if (streamFeatures) { setupStreamFeature({ streamFeatures, @@ -133,8 +207,7 @@ function setupStreamFeature({ // Resuming if (sm.id) { try { - await resume(entity, sm); - resumed(); + await resumed(await resume(entity, sm)); return; // If resumption fails, continue with session establishment } catch { @@ -150,6 +223,9 @@ function setupStreamFeature({ const promiseEnable = enable(entity, sm); // > The counter for an entity's own sent stanzas is set to zero and started after sending either or . + if (sm.outbound_q.length > 0) { + throw "Stream Management assertion failure, queue should be empty after enable"; + } sm.outbound = 0; try { @@ -172,7 +248,7 @@ function setupSasl2({ sasl2, sm, failed, resumed }) { }, (element) => { if (element.is("resumed")) { - resumed(); + resumed(element); } else if (element.is(failed)) { // const error = StreamError.fromElement(element) failed(); diff --git a/packages/stream-management/package.json b/packages/stream-management/package.json index e9818cbb..72503e38 100644 --- a/packages/stream-management/package.json +++ b/packages/stream-management/package.json @@ -16,7 +16,8 @@ "dependencies": { "@xmpp/error": "^0.14.0", "@xmpp/events": "^0.14.0", - "@xmpp/xml": "^0.14.0" + "@xmpp/xml": "^0.14.0", + "@xmpp/time": "^0.14.0" }, "engines": { "node": ">= 20" diff --git a/packages/stream-management/stream-features.test.js b/packages/stream-management/stream-features.test.js index a8edc437..845e11f2 100644 --- a/packages/stream-management/stream-features.test.js +++ b/packages/stream-management/stream-features.test.js @@ -22,6 +22,7 @@ test("enable - enabled", async () => { ); expect(entity.streamManagement.outbound).toBe(0); + expect(entity.streamManagement.outbound_q).toBeEmpty(); expect(entity.streamManagement.enabled).toBe(false); expect(entity.streamManagement.id).toBe(""); @@ -73,6 +74,7 @@ test("enable - message - enabled", async () => { ); expect(entity.streamManagement.outbound).toBe(0); + expect(entity.streamManagement.outbound_q).toBeEmpty(); expect(entity.streamManagement.enabled).toBe(false); expect(entity.streamManagement.id).toBe(""); @@ -112,6 +114,7 @@ test("enable - failed", async () => { ); expect(entity.streamManagement.outbound).toBe(0); + expect(entity.streamManagement.outbound_q).toBeEmpty(); entity.streamManagement.enabled = true; entity.mockInput( @@ -125,6 +128,54 @@ test("enable - failed", async () => { expect(entity.streamManagement.enabled).toBe(false); }); +test("enable - enabled - message", async () => { + const { entity } = mockClient(); + + entity.mockInput( + + + , + ); + + expect(await entity.catchOutgoing()).toEqual( + , + ); + + entity.mockInput( + , + ); + + await tick(); + + expect(entity.streamManagement.outbound).toBe(0); + expect(entity.streamManagement.outbound_q).toBeEmpty(); + expect(entity.streamManagement.enabled).toBe(true); + + await entity.send(); + entity.streamManagement._teardown(); + + expect(entity.streamManagement.outbound).toBe(0); + expect(entity.streamManagement.outbound_q).toHaveLength(1); + + let acks = 0; + entity.on("stream-management/ack", (stanza) => { + expect(stanza.attrs.id).toBe("a"); + acks++; + }); + + entity.mockInput(); + await tick(); + + expect(acks).toBe(1); + expect(entity.streamManagement.outbound).toBe(1); + expect(entity.streamManagement.outbound_q).toHaveLength(0); +}); + test("resume - resumed", async () => { const { entity } = mockClient(); @@ -138,6 +189,7 @@ test("resume - resumed", async () => { ); entity.streamManagement.outbound = 45; + entity.streamManagement.outbound_q = [, ]; expect(await entity.catchOutgoing()).toEqual( , @@ -147,11 +199,31 @@ test("resume - resumed", async () => { expect(entity.status).toBe("offline"); - entity.mockInput(); + entity.mockInput(); - await tick(); + let acks = 0; + entity.on("stream-management/ack", (stanza) => { + expect(stanza.attrs.id).toBe("a"); + acks++; + }); + + expect(await entity.catchOutgoing()).toEqual(); - expect(entity.streamManagement.outbound).toBe(45); + let resumed = false; + entity.on("stream-management/resumed", () => { + resumed = true; + }); + + await tick(); + entity.streamManagement._teardown(); + + expect(resumed).toBe(true); + expect(acks).toBe(1); + expect(entity.streamManagement.outbound).toBe(46); + expect(entity.streamManagement.outbound_q).toHaveLength(1); + expect( + entity.streamManagement.outbound_q[0].getChild("delay", "urn:xmpp:delay"), + ).not.toBeUndefined(); expect(entity.status).toBe("online"); }); @@ -162,6 +234,7 @@ test("resume - failed", async () => { entity.streamManagement.id = "bar"; entity.streamManagement.enabled = true; entity.streamManagement.outbound = 45; + entity.streamManagement.outbound_q = ["hai"]; entity.mockInput( @@ -179,10 +252,18 @@ test("resume - failed", async () => { , ); + let failures = 0; + entity.on("stream-management/fail", (failed) => { + failures++; + expect(failed).toBe("hai"); + }); + await tick(); + expect(failures).toBe(1); expect(entity.status).toBe("bar"); expect(entity.streamManagement.id).toBe(""); expect(entity.streamManagement.enabled).toBe(false); expect(entity.streamManagement.outbound).toBe(0); + expect(entity.streamManagement.outbound_q).toBeEmpty(); }); diff --git a/packages/xml/index.js b/packages/xml/index.js index efb06e0c..7b512e86 100644 --- a/packages/xml/index.js +++ b/packages/xml/index.js @@ -1,4 +1,5 @@ import Element from "ltx/lib/Element.js"; +import clone from "ltx/lib/clone.js"; import createElement from "ltx/lib/createElement.js"; import Parser from "./lib/Parser.js"; import { @@ -17,6 +18,7 @@ Object.assign(xml, { Element, createElement, Parser, + clone, escapeXML, unescapeXML, escapeXMLText, @@ -29,6 +31,7 @@ export { Element, createElement, Parser, + clone, escapeXML, unescapeXML, escapeXMLText,