diff --git a/package.json b/package.json
index 26c130c5..912abbae 100644
--- a/package.json
+++ b/package.json
@@ -35,8 +35,8 @@
"selfsigned": "^2.4.1"
},
"scripts": {
- "test": "node --experimental-websocket ./node_modules/.bin/jest",
- "e2e": "NODE_TLS_REJECT_UNAUTHORIZED=0 node --experimental-websocket ./node_modules/.bin/jest --runInBand --config e2e.config.cjs",
+ "test": "node --experimental-websocket ./node_modules/.bin/jest --forceExit",
+ "e2e": "NODE_TLS_REJECT_UNAUTHORIZED=0 node --experimental-websocket ./node_modules/.bin/jest --forceExit --runInBand --config e2e.config.cjs",
"preversion": "make bundle"
},
"engines": {
diff --git a/packages/client-core/src/fast/fast.js b/packages/client-core/src/fast/fast.js
index 1599b948..6b2da50c 100644
--- a/packages/client-core/src/fast/fast.js
+++ b/packages/client-core/src/fast/fast.js
@@ -60,10 +60,8 @@ export default function fast({ sasl2, entity }) {
}
const { token } = credentials;
- // Invalid or unavailable token
if (!isTokenValid(token, fast.mechanisms)) {
- requestToken(streamFeatures);
- return false;
+ return onInvalidToken();
}
try {
@@ -90,13 +88,17 @@ export default function fast({ sasl2, entity }) {
err instanceof SASLError &&
["not-authorized", "credentials-expired"].includes(err.condition)
) {
- await this.delete();
- requestToken(streamFeatures);
- return false;
+ return onInvalidToken();
}
entity.emit("error", err);
return false;
}
+
+ async function onInvalidToken() {
+ await fast.delete();
+ requestToken(streamFeatures);
+ return false;
+ }
},
});
diff --git a/packages/client/README.md b/packages/client/README.md
index f3a61d35..736ebfe6 100644
--- a/packages/client/README.md
+++ b/packages/client/README.md
@@ -58,7 +58,7 @@ xmpp.on("offline", () => {
xmpp.on("stanza", onStanza);
async function onStanza(stanza) {
if (stanza.is("message")) {
- xmpp.off("stanza", onStanza);
+ xmpp.removeListener("stanza", onStanza);
await xmpp.send(xml("presence", { type: "unavailable" }));
await xmpp.stop();
}
diff --git a/packages/client/example.js b/packages/client/example.js
index 9d56999c..dc3359b1 100644
--- a/packages/client/example.js
+++ b/packages/client/example.js
@@ -28,7 +28,7 @@ xmpp.on("offline", () => {
xmpp.on("stanza", onStanza);
async function onStanza(stanza) {
if (stanza.is("message")) {
- xmpp.off("stanza", onStanza);
+ xmpp.removeListener("stanza", onStanza);
await xmpp.send(xml("presence", { type: "unavailable" }));
await xmpp.stop();
}
diff --git a/packages/stream-management/index.js b/packages/stream-management/index.js
index 78d76f23..5d161d97 100644
--- a/packages/stream-management/index.js
+++ b/packages/stream-management/index.js
@@ -60,8 +60,7 @@ export default function streamManagement({
inbound: 0,
max: null,
timeout: 60_000,
- requestAckInterval: 300_000,
- debounceAckRequest: 100,
+ requestAckInterval: 30_000,
});
entity.on("disconnect", () => {
@@ -71,23 +70,32 @@ export default function streamManagement({
async function resumed(resumed) {
sm.enabled = true;
- const oldOutbound = sm.outbound;
- for (let i = 0; i < resumed.attrs.h - oldOutbound; i++) {
- let item = sm.outbound_q.shift();
- sm.outbound++;
- sm.emit("ack", item.stanza);
- }
+ ackQueue(+resumed.attrs.h);
let q = sm.outbound_q;
sm.outbound_q = [];
// This will trigger the middleware and re-add to the queue
await entity.sendMany(q.map((item) => queueToStanza({ entity, item })));
sm.emit("resumed");
entity._ready(true);
+ scheduleRequestAck();
}
function failed() {
sm.enabled = false;
sm.id = "";
+ failQueue();
+ }
+
+ function ackQueue(n) {
+ const oldOutbound = sm.outbound;
+ for (let i = 0; i < +n - oldOutbound; i++) {
+ const item = sm.outbound_q.shift();
+ sm.outbound++;
+ sm.emit("ack", item.stanza);
+ }
+ }
+
+ function failQueue() {
let item;
while ((item = sm.outbound_q.shift())) {
sm.emit("fail", item.stanza);
@@ -99,6 +107,7 @@ export default function streamManagement({
sm.enabled = true;
sm.id = id;
sm.max = max;
+ scheduleRequestAck();
}
entity.on("online", () => {
@@ -112,11 +121,7 @@ export default function streamManagement({
});
entity.on("offline", () => {
- let item;
- while ((item = sm.outbound_q.shift())) {
- sm.emit("fail", item.stanza);
- }
- sm.outbound = 0;
+ failQueue();
sm.inbound = 0;
sm.enabled = false;
sm.id = "";
@@ -132,14 +137,11 @@ export default function streamManagement({
entity.send(xml("a", { xmlns: NS, h: sm.inbound })).catch(() => {});
} else if (stanza.is("a", NS)) {
// > When a party receives an element, it SHOULD keep a record of the 'h' value returned as the sequence number of the last handled outbound stanza for the current stream (and discard the previous value).
- const oldOutbound = sm.outbound;
- for (let i = 0; i < stanza.attrs.h - oldOutbound; i++) {
- let item = sm.outbound_q.shift();
- sm.outbound++;
- sm.emit("ack", item.stanza);
- }
+ ackQueue(+stanza.attrs.h);
}
+ scheduleRequestAck();
+
return next();
});
@@ -150,8 +152,22 @@ export default function streamManagement({
setupSasl2({ sasl2, sm, failed, resumed });
}
+ // Periodically send r to check the connection
+ // If a stanza goes out it will cancel this and set a sooner timer
+ function scheduleRequestAck(timeout = sm.requestAckInterval) {
+ clearTimeout(requestAckTimeout);
+
+ if (!sm.enabled) return;
+
+ requestAckTimeout = setTimeout(requestAck, timeout);
+ }
+
function requestAck() {
clearTimeout(timeoutTimeout);
+ clearTimeout(requestAckTimeout);
+
+ if (!sm.enabled) return;
+
if (sm.timeout) {
timeoutTimeout = setTimeout(
() => entity.disconnect().catch(() => {}),
@@ -159,9 +175,8 @@ export default function streamManagement({
);
}
entity.send(xml("r", { xmlns: NS })).catch(() => {});
- // Periodically send r to check the connection
- // If a stanza goes out it will cancel this and set a sooner timer
- requestAckTimeout = setTimeout(requestAck, sm.requestAckInterval);
+
+ scheduleRequestAck();
}
middleware.filter((context, next) => {
@@ -171,8 +186,8 @@ export default function streamManagement({
sm.outbound_q.push({ stanza, stamp: datetime() });
// Debounce requests so we send only one after a big run of stanza together
- clearTimeout(requestAckTimeout);
- requestAckTimeout = setTimeout(requestAck, sm.debounceAckRequest);
+ queueMicrotask(requestAck);
+
return next();
});
diff --git a/test/stream-management.js b/test/stream-management.js
deleted file mode 100644
index ef66faed..00000000
--- a/test/stream-management.js
+++ /dev/null
@@ -1,106 +0,0 @@
-import { client } from "../packages/client/index.js";
-import { promise } from "../packages/events/index.js";
-import { datetime } from "../packages/time/index.js";
-import debug from "../packages/debug/index.js";
-import server from "../server/index.js";
-
-const username = "client";
-const password = "foobar";
-const credentials = { username, password };
-const domain = "localhost";
-
-let xmpp;
-
-afterEach(async () => {
- await xmpp?.stop();
- await server.reset();
-});
-
-test("client ack stanzas", async () => {
- await server.enableModules(["smacks"]);
- await server.restart();
-
- xmpp = client({ credentials, service: domain });
- debug(xmpp);
-
- const elP = promise(xmpp.streamManagement, "ack");
- await xmpp.start();
- await xmpp.send(
-
-
- ,
- );
-
- const el = await elP;
- expect(el.attrs.id).toEqual("ping");
-});
-
-test("client fail stanzas", async () => {
- await server.enableModules(["smacks"]);
- await server.restart();
-
- xmpp = client({ credentials, service: domain });
- debug(xmpp);
-
- const elP = promise(xmpp.streamManagement, "fail");
- await xmpp.start();
- // Expect send but don't actually send to server, so it will fail
- await xmpp.streamManagement.outbound_q.push({
- stanza: (
-
-
-
- ),
- stamp: datetime(),
- });
- await xmpp.stop();
-
- const el = await elP;
- expect(el.attrs.id).toEqual("ping");
-});
-
-test("client retry stanzas", async () => {
- await server.enableModules(["smacks"]);
- await server.restart();
-
- xmpp = client({ credentials, service: domain });
- debug(xmpp);
-
- const elP = promise(xmpp.streamManagement, "ack");
- await xmpp.start();
- // Add to queue but don't actually send so it can retry after disconnect
- await xmpp.streamManagement.outbound_q.push({
- stanza: (
-
-
-
- ),
- stamp: datetime(),
- });
- await xmpp.disconnect();
-
- const el = await elP;
- expect(el.attrs.id).toEqual("ping");
-});
-
-test("client reconnects when server fails to ack", async () => {
- await server.enableModules(["smacks"]);
- await server.restart();
-
- xmpp = client({ credentials, service: domain });
- xmpp.streamManagement.timeout = 10;
- xmpp.streamManagement.debounceAckRequest = 1;
- debug(xmpp);
-
- const resumedP = promise(xmpp.streamManagement, "resumed");
- await xmpp.start();
- await xmpp.send(
-
-
- ,
- );
- xmpp.socket.socket.pause();
-
- await resumedP;
- expect().pass();
-});
diff --git a/test/stream-management.test.js b/test/stream-management.test.js
index 3033ded8..ce328a0b 100644
--- a/test/stream-management.test.js
+++ b/test/stream-management.test.js
@@ -94,7 +94,7 @@ test(
xmpp = client({ credentials, service: domain });
xmpp.streamManagement.timeout = 10;
xmpp.streamManagement.debounceAckRequest = 1;
- debug(xmpp, true);
+ debug(xmpp);
const promise_resumed = promise(xmpp.streamManagement, "resumed");
await xmpp.start();