Skip to content

Commit

Permalink
stream-management: Request ack on connect (#1056)
Browse files Browse the repository at this point in the history
  • Loading branch information
sonnyp authored Jan 17, 2025
1 parent 9ecdf1e commit ee361dc
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 141 deletions.
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@
"selfsigned": "^2.4.1"
},
"scripts": {
"test": "node --experimental-websocket ./node_modules/.bin/jest",
"e2e": "NODE_TLS_REJECT_UNAUTHORIZED=0 node --experimental-websocket ./node_modules/.bin/jest --runInBand --config e2e.config.cjs",
"test": "node --experimental-websocket ./node_modules/.bin/jest --forceExit",
"e2e": "NODE_TLS_REJECT_UNAUTHORIZED=0 node --experimental-websocket ./node_modules/.bin/jest --forceExit --runInBand --config e2e.config.cjs",
"preversion": "make bundle"
},
"engines": {
Expand Down
14 changes: 8 additions & 6 deletions packages/client-core/src/fast/fast.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,8 @@ export default function fast({ sasl2, entity }) {
}

const { token } = credentials;
// Invalid or unavailable token
if (!isTokenValid(token, fast.mechanisms)) {
requestToken(streamFeatures);
return false;
return onInvalidToken();
}

try {
Expand All @@ -90,13 +88,17 @@ export default function fast({ sasl2, entity }) {
err instanceof SASLError &&
["not-authorized", "credentials-expired"].includes(err.condition)
) {
await this.delete();
requestToken(streamFeatures);
return false;
return onInvalidToken();
}
entity.emit("error", err);
return false;
}

async function onInvalidToken() {
await fast.delete();
requestToken(streamFeatures);
return false;
}
},
});

Expand Down
2 changes: 1 addition & 1 deletion packages/client/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ xmpp.on("offline", () => {
xmpp.on("stanza", onStanza);
async function onStanza(stanza) {
if (stanza.is("message")) {
xmpp.off("stanza", onStanza);
xmpp.removeListener("stanza", onStanza);
await xmpp.send(xml("presence", { type: "unavailable" }));
await xmpp.stop();
}
Expand Down
2 changes: 1 addition & 1 deletion packages/client/example.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ xmpp.on("offline", () => {
xmpp.on("stanza", onStanza);
async function onStanza(stanza) {
if (stanza.is("message")) {
xmpp.off("stanza", onStanza);
xmpp.removeListener("stanza", onStanza);
await xmpp.send(xml("presence", { type: "unavailable" }));
await xmpp.stop();
}
Expand Down
63 changes: 39 additions & 24 deletions packages/stream-management/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,7 @@ export default function streamManagement({
inbound: 0,
max: null,
timeout: 60_000,
requestAckInterval: 300_000,
debounceAckRequest: 100,
requestAckInterval: 30_000,
});

entity.on("disconnect", () => {
Expand All @@ -71,23 +70,32 @@ export default function streamManagement({

async function resumed(resumed) {
sm.enabled = true;
const oldOutbound = sm.outbound;
for (let i = 0; i < resumed.attrs.h - oldOutbound; i++) {
let item = sm.outbound_q.shift();
sm.outbound++;
sm.emit("ack", item.stanza);
}
ackQueue(+resumed.attrs.h);
let q = sm.outbound_q;
sm.outbound_q = [];
// This will trigger the middleware and re-add to the queue
await entity.sendMany(q.map((item) => queueToStanza({ entity, item })));
sm.emit("resumed");
entity._ready(true);
scheduleRequestAck();
}

function failed() {
sm.enabled = false;
sm.id = "";
failQueue();
}

function ackQueue(n) {
const oldOutbound = sm.outbound;
for (let i = 0; i < +n - oldOutbound; i++) {
const item = sm.outbound_q.shift();
sm.outbound++;
sm.emit("ack", item.stanza);
}
}

function failQueue() {
let item;
while ((item = sm.outbound_q.shift())) {
sm.emit("fail", item.stanza);
Expand All @@ -99,6 +107,7 @@ export default function streamManagement({
sm.enabled = true;
sm.id = id;
sm.max = max;
scheduleRequestAck();
}

entity.on("online", () => {
Expand All @@ -112,11 +121,7 @@ export default function streamManagement({
});

entity.on("offline", () => {
let item;
while ((item = sm.outbound_q.shift())) {
sm.emit("fail", item.stanza);
}
sm.outbound = 0;
failQueue();
sm.inbound = 0;
sm.enabled = false;
sm.id = "";
Expand All @@ -132,14 +137,11 @@ export default function streamManagement({
entity.send(xml("a", { xmlns: NS, h: sm.inbound })).catch(() => {});
} else if (stanza.is("a", NS)) {
// > When a party receives an <a/> element, it SHOULD keep a record of the 'h' value returned as the sequence number of the last handled outbound stanza for the current stream (and discard the previous value).
const oldOutbound = sm.outbound;
for (let i = 0; i < stanza.attrs.h - oldOutbound; i++) {
let item = sm.outbound_q.shift();
sm.outbound++;
sm.emit("ack", item.stanza);
}
ackQueue(+stanza.attrs.h);
}

scheduleRequestAck();

return next();
});

Expand All @@ -150,18 +152,31 @@ export default function streamManagement({
setupSasl2({ sasl2, sm, failed, resumed });
}

// Periodically send r to check the connection
// If a stanza goes out it will cancel this and set a sooner timer
function scheduleRequestAck(timeout = sm.requestAckInterval) {
clearTimeout(requestAckTimeout);

if (!sm.enabled) return;

requestAckTimeout = setTimeout(requestAck, timeout);
}

function requestAck() {
clearTimeout(timeoutTimeout);
clearTimeout(requestAckTimeout);

if (!sm.enabled) return;

if (sm.timeout) {
timeoutTimeout = setTimeout(
() => entity.disconnect().catch(() => {}),
sm.timeout,
);
}
entity.send(xml("r", { xmlns: NS })).catch(() => {});
// Periodically send r to check the connection
// If a stanza goes out it will cancel this and set a sooner timer
requestAckTimeout = setTimeout(requestAck, sm.requestAckInterval);

scheduleRequestAck();
}

middleware.filter((context, next) => {
Expand All @@ -171,8 +186,8 @@ export default function streamManagement({

sm.outbound_q.push({ stanza, stamp: datetime() });
// Debounce requests so we send only one after a big run of stanza together
clearTimeout(requestAckTimeout);
requestAckTimeout = setTimeout(requestAck, sm.debounceAckRequest);
queueMicrotask(requestAck);

return next();
});

Expand Down
106 changes: 0 additions & 106 deletions test/stream-management.js

This file was deleted.

2 changes: 1 addition & 1 deletion test/stream-management.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ test(
xmpp = client({ credentials, service: domain });
xmpp.streamManagement.timeout = 10;
xmpp.streamManagement.debounceAckRequest = 1;
debug(xmpp, true);
debug(xmpp);

const promise_resumed = promise(xmpp.streamManagement, "resumed");
await xmpp.start();
Expand Down

0 comments on commit ee361dc

Please sign in to comment.