From ee361dc968cde84dfb4295d840c534d36bcecfc3 Mon Sep 17 00:00:00 2001 From: Sonny Date: Fri, 17 Jan 2025 09:54:30 +0100 Subject: [PATCH] stream-management: Request ack on connect (#1056) --- package.json | 4 +- packages/client-core/src/fast/fast.js | 14 ++-- packages/client/README.md | 2 +- packages/client/example.js | 2 +- packages/stream-management/index.js | 63 +++++++++------ test/stream-management.js | 106 -------------------------- test/stream-management.test.js | 2 +- 7 files changed, 52 insertions(+), 141 deletions(-) delete mode 100644 test/stream-management.js diff --git a/package.json b/package.json index 26c130c5..912abbae 100644 --- a/package.json +++ b/package.json @@ -35,8 +35,8 @@ "selfsigned": "^2.4.1" }, "scripts": { - "test": "node --experimental-websocket ./node_modules/.bin/jest", - "e2e": "NODE_TLS_REJECT_UNAUTHORIZED=0 node --experimental-websocket ./node_modules/.bin/jest --runInBand --config e2e.config.cjs", + "test": "node --experimental-websocket ./node_modules/.bin/jest --forceExit", + "e2e": "NODE_TLS_REJECT_UNAUTHORIZED=0 node --experimental-websocket ./node_modules/.bin/jest --forceExit --runInBand --config e2e.config.cjs", "preversion": "make bundle" }, "engines": { diff --git a/packages/client-core/src/fast/fast.js b/packages/client-core/src/fast/fast.js index 1599b948..6b2da50c 100644 --- a/packages/client-core/src/fast/fast.js +++ b/packages/client-core/src/fast/fast.js @@ -60,10 +60,8 @@ export default function fast({ sasl2, entity }) { } const { token } = credentials; - // Invalid or unavailable token if (!isTokenValid(token, fast.mechanisms)) { - requestToken(streamFeatures); - return false; + return onInvalidToken(); } try { @@ -90,13 +88,17 @@ export default function fast({ sasl2, entity }) { err instanceof SASLError && ["not-authorized", "credentials-expired"].includes(err.condition) ) { - await this.delete(); - requestToken(streamFeatures); - return false; + return onInvalidToken(); } entity.emit("error", err); return false; } + + async function onInvalidToken() { + await fast.delete(); + requestToken(streamFeatures); + return false; + } }, }); diff --git a/packages/client/README.md b/packages/client/README.md index f3a61d35..736ebfe6 100644 --- a/packages/client/README.md +++ b/packages/client/README.md @@ -58,7 +58,7 @@ xmpp.on("offline", () => { xmpp.on("stanza", onStanza); async function onStanza(stanza) { if (stanza.is("message")) { - xmpp.off("stanza", onStanza); + xmpp.removeListener("stanza", onStanza); await xmpp.send(xml("presence", { type: "unavailable" })); await xmpp.stop(); } diff --git a/packages/client/example.js b/packages/client/example.js index 9d56999c..dc3359b1 100644 --- a/packages/client/example.js +++ b/packages/client/example.js @@ -28,7 +28,7 @@ xmpp.on("offline", () => { xmpp.on("stanza", onStanza); async function onStanza(stanza) { if (stanza.is("message")) { - xmpp.off("stanza", onStanza); + xmpp.removeListener("stanza", onStanza); await xmpp.send(xml("presence", { type: "unavailable" })); await xmpp.stop(); } diff --git a/packages/stream-management/index.js b/packages/stream-management/index.js index 78d76f23..5d161d97 100644 --- a/packages/stream-management/index.js +++ b/packages/stream-management/index.js @@ -60,8 +60,7 @@ export default function streamManagement({ inbound: 0, max: null, timeout: 60_000, - requestAckInterval: 300_000, - debounceAckRequest: 100, + requestAckInterval: 30_000, }); entity.on("disconnect", () => { @@ -71,23 +70,32 @@ export default function streamManagement({ async function resumed(resumed) { sm.enabled = true; - const oldOutbound = sm.outbound; - for (let i = 0; i < resumed.attrs.h - oldOutbound; i++) { - let item = sm.outbound_q.shift(); - sm.outbound++; - sm.emit("ack", item.stanza); - } + ackQueue(+resumed.attrs.h); let q = sm.outbound_q; sm.outbound_q = []; // This will trigger the middleware and re-add to the queue await entity.sendMany(q.map((item) => queueToStanza({ entity, item }))); sm.emit("resumed"); entity._ready(true); + scheduleRequestAck(); } function failed() { sm.enabled = false; sm.id = ""; + failQueue(); + } + + function ackQueue(n) { + const oldOutbound = sm.outbound; + for (let i = 0; i < +n - oldOutbound; i++) { + const item = sm.outbound_q.shift(); + sm.outbound++; + sm.emit("ack", item.stanza); + } + } + + function failQueue() { let item; while ((item = sm.outbound_q.shift())) { sm.emit("fail", item.stanza); @@ -99,6 +107,7 @@ export default function streamManagement({ sm.enabled = true; sm.id = id; sm.max = max; + scheduleRequestAck(); } entity.on("online", () => { @@ -112,11 +121,7 @@ export default function streamManagement({ }); entity.on("offline", () => { - let item; - while ((item = sm.outbound_q.shift())) { - sm.emit("fail", item.stanza); - } - sm.outbound = 0; + failQueue(); sm.inbound = 0; sm.enabled = false; sm.id = ""; @@ -132,14 +137,11 @@ export default function streamManagement({ entity.send(xml("a", { xmlns: NS, h: sm.inbound })).catch(() => {}); } else if (stanza.is("a", NS)) { // > When a party receives an element, it SHOULD keep a record of the 'h' value returned as the sequence number of the last handled outbound stanza for the current stream (and discard the previous value). - const oldOutbound = sm.outbound; - for (let i = 0; i < stanza.attrs.h - oldOutbound; i++) { - let item = sm.outbound_q.shift(); - sm.outbound++; - sm.emit("ack", item.stanza); - } + ackQueue(+stanza.attrs.h); } + scheduleRequestAck(); + return next(); }); @@ -150,8 +152,22 @@ export default function streamManagement({ setupSasl2({ sasl2, sm, failed, resumed }); } + // Periodically send r to check the connection + // If a stanza goes out it will cancel this and set a sooner timer + function scheduleRequestAck(timeout = sm.requestAckInterval) { + clearTimeout(requestAckTimeout); + + if (!sm.enabled) return; + + requestAckTimeout = setTimeout(requestAck, timeout); + } + function requestAck() { clearTimeout(timeoutTimeout); + clearTimeout(requestAckTimeout); + + if (!sm.enabled) return; + if (sm.timeout) { timeoutTimeout = setTimeout( () => entity.disconnect().catch(() => {}), @@ -159,9 +175,8 @@ export default function streamManagement({ ); } entity.send(xml("r", { xmlns: NS })).catch(() => {}); - // Periodically send r to check the connection - // If a stanza goes out it will cancel this and set a sooner timer - requestAckTimeout = setTimeout(requestAck, sm.requestAckInterval); + + scheduleRequestAck(); } middleware.filter((context, next) => { @@ -171,8 +186,8 @@ export default function streamManagement({ sm.outbound_q.push({ stanza, stamp: datetime() }); // Debounce requests so we send only one after a big run of stanza together - clearTimeout(requestAckTimeout); - requestAckTimeout = setTimeout(requestAck, sm.debounceAckRequest); + queueMicrotask(requestAck); + return next(); }); diff --git a/test/stream-management.js b/test/stream-management.js deleted file mode 100644 index ef66faed..00000000 --- a/test/stream-management.js +++ /dev/null @@ -1,106 +0,0 @@ -import { client } from "../packages/client/index.js"; -import { promise } from "../packages/events/index.js"; -import { datetime } from "../packages/time/index.js"; -import debug from "../packages/debug/index.js"; -import server from "../server/index.js"; - -const username = "client"; -const password = "foobar"; -const credentials = { username, password }; -const domain = "localhost"; - -let xmpp; - -afterEach(async () => { - await xmpp?.stop(); - await server.reset(); -}); - -test("client ack stanzas", async () => { - await server.enableModules(["smacks"]); - await server.restart(); - - xmpp = client({ credentials, service: domain }); - debug(xmpp); - - const elP = promise(xmpp.streamManagement, "ack"); - await xmpp.start(); - await xmpp.send( - - - , - ); - - const el = await elP; - expect(el.attrs.id).toEqual("ping"); -}); - -test("client fail stanzas", async () => { - await server.enableModules(["smacks"]); - await server.restart(); - - xmpp = client({ credentials, service: domain }); - debug(xmpp); - - const elP = promise(xmpp.streamManagement, "fail"); - await xmpp.start(); - // Expect send but don't actually send to server, so it will fail - await xmpp.streamManagement.outbound_q.push({ - stanza: ( - - - - ), - stamp: datetime(), - }); - await xmpp.stop(); - - const el = await elP; - expect(el.attrs.id).toEqual("ping"); -}); - -test("client retry stanzas", async () => { - await server.enableModules(["smacks"]); - await server.restart(); - - xmpp = client({ credentials, service: domain }); - debug(xmpp); - - const elP = promise(xmpp.streamManagement, "ack"); - await xmpp.start(); - // Add to queue but don't actually send so it can retry after disconnect - await xmpp.streamManagement.outbound_q.push({ - stanza: ( - - - - ), - stamp: datetime(), - }); - await xmpp.disconnect(); - - const el = await elP; - expect(el.attrs.id).toEqual("ping"); -}); - -test("client reconnects when server fails to ack", async () => { - await server.enableModules(["smacks"]); - await server.restart(); - - xmpp = client({ credentials, service: domain }); - xmpp.streamManagement.timeout = 10; - xmpp.streamManagement.debounceAckRequest = 1; - debug(xmpp); - - const resumedP = promise(xmpp.streamManagement, "resumed"); - await xmpp.start(); - await xmpp.send( - - - , - ); - xmpp.socket.socket.pause(); - - await resumedP; - expect().pass(); -}); diff --git a/test/stream-management.test.js b/test/stream-management.test.js index 3033ded8..ce328a0b 100644 --- a/test/stream-management.test.js +++ b/test/stream-management.test.js @@ -94,7 +94,7 @@ test( xmpp = client({ credentials, service: domain }); xmpp.streamManagement.timeout = 10; xmpp.streamManagement.debounceAckRequest = 1; - debug(xmpp, true); + debug(xmpp); const promise_resumed = promise(xmpp.streamManagement, "resumed"); await xmpp.start();