Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream-management: Request ack on connect #1056

Merged
merged 8 commits into from
Jan 17, 2025
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions packages/client-core/src/fast/fast.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,8 @@ export default function fast({ sasl2, entity }) {
}

const { token } = credentials;
// Invalid or unavailable token
if (!isTokenValid(token, fast.mechanisms)) {
requestToken(streamFeatures);
return false;
return onInvalidToken();
}

try {
Expand All @@ -90,13 +88,17 @@ export default function fast({ sasl2, entity }) {
err instanceof SASLError &&
["not-authorized", "credentials-expired"].includes(err.condition)
) {
await this.delete();
requestToken(streamFeatures);
return false;
return onInvalidToken();
}
entity.emit("error", err);
return false;
}

async function onInvalidToken() {
await fast.delete();
requestToken(streamFeatures);
return false;
}
},
});

Expand Down
2 changes: 1 addition & 1 deletion packages/client/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ xmpp.on("offline", () => {
xmpp.on("stanza", onStanza);
async function onStanza(stanza) {
if (stanza.is("message")) {
xmpp.off("stanza", onStanza);
xmpp.removeListener("stanza", onStanza);
await xmpp.send(xml("presence", { type: "unavailable" }));
await xmpp.stop();
}
Expand Down
2 changes: 1 addition & 1 deletion packages/client/example.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ xmpp.on("offline", () => {
xmpp.on("stanza", onStanza);
async function onStanza(stanza) {
if (stanza.is("message")) {
xmpp.off("stanza", onStanza);
xmpp.removeListener("stanza", onStanza);
await xmpp.send(xml("presence", { type: "unavailable" }));
await xmpp.stop();
}
Expand Down
63 changes: 39 additions & 24 deletions packages/stream-management/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,7 @@ export default function streamManagement({
inbound: 0,
max: null,
timeout: 60_000,
requestAckInterval: 300_000,
debounceAckRequest: 100,
requestAckInterval: 30_000,
});

entity.on("disconnect", () => {
Expand All @@ -71,23 +70,32 @@ export default function streamManagement({

async function resumed(resumed) {
sm.enabled = true;
const oldOutbound = sm.outbound;
for (let i = 0; i < resumed.attrs.h - oldOutbound; i++) {
let item = sm.outbound_q.shift();
sm.outbound++;
sm.emit("ack", item.stanza);
}
ackQueue(+resumed.attrs.h);
let q = sm.outbound_q;
sm.outbound_q = [];
// This will trigger the middleware and re-add to the queue
await entity.sendMany(q.map((item) => queueToStanza({ entity, item })));
sm.emit("resumed");
entity._ready(true);
scheduleRequestAck();
}

function failed() {
sm.enabled = false;
sm.id = "";
failQueue();
}

function ackQueue(n) {
const oldOutbound = sm.outbound;
for (let i = 0; i < +n - oldOutbound; i++) {
const item = sm.outbound_q.shift();
sm.outbound++;
sm.emit("ack", item.stanza);
}
}

function failQueue() {
let item;
while ((item = sm.outbound_q.shift())) {
sm.emit("fail", item.stanza);
Expand All @@ -99,6 +107,7 @@ export default function streamManagement({
sm.enabled = true;
sm.id = id;
sm.max = max;
scheduleRequestAck();
}

entity.on("online", () => {
Expand All @@ -112,11 +121,7 @@ export default function streamManagement({
});

entity.on("offline", () => {
let item;
while ((item = sm.outbound_q.shift())) {
sm.emit("fail", item.stanza);
}
sm.outbound = 0;
failQueue();
sm.inbound = 0;
sm.enabled = false;
sm.id = "";
Expand All @@ -132,14 +137,11 @@ export default function streamManagement({
entity.send(xml("a", { xmlns: NS, h: sm.inbound })).catch(() => {});
} else if (stanza.is("a", NS)) {
// > When a party receives an <a/> element, it SHOULD keep a record of the 'h' value returned as the sequence number of the last handled outbound stanza for the current stream (and discard the previous value).
const oldOutbound = sm.outbound;
for (let i = 0; i < stanza.attrs.h - oldOutbound; i++) {
let item = sm.outbound_q.shift();
sm.outbound++;
sm.emit("ack", item.stanza);
}
ackQueue(+stanza.attrs.h);
}

scheduleRequestAck();

return next();
});

Expand All @@ -150,18 +152,31 @@ export default function streamManagement({
setupSasl2({ sasl2, sm, failed, resumed });
}

// Periodically send r to check the connection
// If a stanza goes out it will cancel this and set a sooner timer
function scheduleRequestAck(timeout = sm.requestAckInterval) {
clearTimeout(requestAckTimeout);

if (!sm.enabled) return;

requestAckTimeout = setTimeout(requestAck, timeout);
}

function requestAck() {
clearTimeout(timeoutTimeout);
clearTimeout(requestAckTimeout);

if (!sm.enabled) return;

if (sm.timeout) {
timeoutTimeout = setTimeout(
() => entity.disconnect().catch(() => {}),
sm.timeout,
);
}
entity.send(xml("r", { xmlns: NS })).catch(() => {});
// Periodically send r to check the connection
// If a stanza goes out it will cancel this and set a sooner timer
requestAckTimeout = setTimeout(requestAck, sm.requestAckInterval);

scheduleRequestAck();
}

middleware.filter((context, next) => {
Expand All @@ -171,8 +186,8 @@ export default function streamManagement({

sm.outbound_q.push({ stanza, stamp: datetime() });
// Debounce requests so we send only one after a big run of stanza together
clearTimeout(requestAckTimeout);
requestAckTimeout = setTimeout(requestAck, sm.debounceAckRequest);
queueMicrotask(requestAck);
Comment on lines -174 to +189
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@singpolyma ok for you?

I removed debounceAckRequest, will try to hook into sendMany later

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does this do? I don' see this queueMicrotask defined anywhere? Will it still debounce or does it send a request after every single stanza now?


return next();
});

Expand Down
106 changes: 0 additions & 106 deletions test/stream-management.js

This file was deleted.

2 changes: 1 addition & 1 deletion test/stream-management.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ test(
xmpp = client({ credentials, service: domain });
xmpp.streamManagement.timeout = 10;
xmpp.streamManagement.debounceAckRequest = 1;
debug(xmpp, true);
debug(xmpp);

const promise_resumed = promise(xmpp.streamManagement, "resumed");
await xmpp.start();
Expand Down
Loading