Skip to content

Commit

Permalink
Implement stream management requesting ACKs
Browse files Browse the repository at this point in the history
Queue outgoing stanzas and periodically request ACK.  Remove from the
queue anything ack'd and notify of the ack so apps can know the stanza
has for sure sent.

On resume, anything not ack'd is re-sent.  On reconnect, anything not
ack'd notify of the failure to send this stanza so apps can know the
stanza failed.

Even when there is no traffic, send an <r/> at least every 5 minutes to check the
connection.  If there is no inbound traffic (such as an <a/>) within
timeout (default 60 seconds) then consider the connection disconnected.
  • Loading branch information
singpolyma committed Jan 7, 2025
1 parent aaaa4b1 commit c4d4bb8
Show file tree
Hide file tree
Showing 6 changed files with 171 additions and 11 deletions.
1 change: 1 addition & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions packages/client-core/src/bind2/bind2.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ test("with function resource returning string", async () => {
test("with function resource throwing", async () => {
const error = new Error("foo");


function resource() {
throw error;
}
Expand Down Expand Up @@ -102,7 +101,6 @@ test("with function resource returning resolved promise", async () => {
test("with function resource returning rejected promise", async () => {
const error = new Error("foo");


async function resource() {
throw error;
}
Expand Down
86 changes: 81 additions & 5 deletions packages/stream-management/index.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import XMPPError from "@xmpp/error";
import { procedure } from "@xmpp/events";
import xml from "@xmpp/xml";
import { datetime } from "@xmpp/time";

// https://xmpp.org/extensions/xep-0198.html

Expand Down Expand Up @@ -45,24 +46,49 @@ export default function streamManagement({
bind2,
sasl2,
}) {
let timeoutTimeout = null;
let requestAckTimeout = null;

const sm = {
allowResume: true,
preferredMaximum: null,
enabled: false,
id: "",
outbound_q: [],
outbound: 0,
inbound: 0,
max: null,
timeout: 60_000,
_teardown: () => {
if (timeoutTimeout) clearTimeout(timeoutTimeout);
if (requestAckTimeout) clearTimeout(requestAckTimeout);
},
};

function resumed() {
async function resumed(resumed) {
sm.enabled = true;
const oldOutbound = sm.outbound;
for (let i = 0; i < resumed.attrs.h - oldOutbound; i++) {
let stanza = sm.outbound_q.shift();
sm.outbound++;
entity.emit("stream-management/ack", stanza);
}
let q = sm.outbound_q;
sm.outbound_q = [];
for (const item of q) {
await entity.send(item); // This will trigger the middleware and re-add to the queue
}
entity.emit("stream-management/resumed");
entity._ready(true);
}

function failed() {
sm.enabled = false;
sm.id = "";
let stanza;
while ((stanza = sm.outbound_q.shift())) {
entity.emit("stream-management/fail", stanza);
}
sm.outbound = 0;
}

Expand All @@ -73,11 +99,18 @@ export default function streamManagement({
}

entity.on("online", () => {
if (sm.outbound_q.length > 0) {
throw "Stream Management assertion failure, queue should be empty during online";
}
sm.outbound = 0;
sm.inbound = 0;
});

entity.on("offline", () => {
let stanza;
while ((stanza = sm.outbound_q.shift())) {
entity.emit("stream-management/fail", stanza);
}
sm.outbound = 0;
sm.inbound = 0;
sm.enabled = false;
Expand All @@ -86,14 +119,20 @@ export default function streamManagement({

middleware.use((context, next) => {
const { stanza } = context;
if (timeoutTimeout) clearTimeout(timeoutTimeout);
if (["presence", "message", "iq"].includes(stanza.name)) {
sm.inbound += 1;
} else if (stanza.is("r", NS)) {
// > When an <r/> element ("request") is received, the recipient MUST acknowledge it by sending an <a/> element to the sender containing a value of 'h' that is equal to the number of stanzas handled by the recipient of the <r/> element.
entity.send(xml("a", { xmlns: NS, h: sm.inbound })).catch(() => {});
} else if (stanza.is("a", NS)) {
// > When a party receives an <a/> element, it SHOULD keep a record of the 'h' value returned as the sequence number of the last handled outbound stanza for the current stream (and discard the previous value).
sm.outbound = stanza.attrs.h;
const oldOutbound = sm.outbound;
for (let i = 0; i < stanza.attrs.h - oldOutbound; i++) {
let stanza = sm.outbound_q.shift();
sm.outbound++;
entity.emit("stream-management/ack", stanza);
}
}

return next();
Expand All @@ -105,6 +144,41 @@ export default function streamManagement({
if (sasl2) {
setupSasl2({ sasl2, sm, failed, resumed });
}

function requestAck() {
if (timeoutTimeout) clearTimeout(timeoutTimeout);
if (sm.timeout) {
timeoutTimeout = setTimeout(() => entity.disconnect(), sm.timeout);
}
entity.send(xml("r", { xmlns: NS })).catch(() => {});
// Periodically send r to check the connection
// If a stanza goes out it will cancel this and set a sooner timer
requestAckTimeout = setTimeout(requestAck, 300_000);
}

middleware.filter((context, next) => {
const { stanza } = context;
if (sm.enabled && ["presence", "message", "iq"].includes(stanza.name)) {
let qStanza = stanza;
if (
qStanza.name === "message" &&
!qStanza.getChild("delay", "urn:xmpp:delay")
) {
qStanza = xml.clone(qStanza);
qStanza.c("delay", {
xmlns: "urn:xmpp:delay",
from: entity.jid.toString(),
stamp: datetime(),
});
}
sm.outbound_q.push(qStanza);
// Debounce requests so we send only one after a big run of stanza together
if (requestAckTimeout) clearTimeout(requestAckTimeout);
requestAckTimeout = setTimeout(requestAck, 100);
}
return next();
});

if (streamFeatures) {
setupStreamFeature({
streamFeatures,
Expand Down Expand Up @@ -133,8 +207,7 @@ function setupStreamFeature({
// Resuming
if (sm.id) {
try {
await resume(entity, sm);
resumed();
await resumed(await resume(entity, sm));
return;
// If resumption fails, continue with session establishment
} catch {
Expand All @@ -150,6 +223,9 @@ function setupStreamFeature({
const promiseEnable = enable(entity, sm);

// > The counter for an entity's own sent stanzas is set to zero and started after sending either <enable/> or <enabled/>.
if (sm.outbound_q.length > 0) {
throw "Stream Management assertion failure, queue should be empty after enable";
}
sm.outbound = 0;

try {
Expand All @@ -172,7 +248,7 @@ function setupSasl2({ sasl2, sm, failed, resumed }) {
},
(element) => {
if (element.is("resumed")) {
resumed();
resumed(element);
} else if (element.is(failed)) {
// const error = StreamError.fromElement(element)
failed();
Expand Down
3 changes: 2 additions & 1 deletion packages/stream-management/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
"dependencies": {
"@xmpp/error": "^0.14.0",
"@xmpp/events": "^0.14.0",
"@xmpp/xml": "^0.14.0"
"@xmpp/xml": "^0.14.0",
"@xmpp/time": "^0.14.0"
},
"engines": {
"node": ">= 20"
Expand Down
87 changes: 84 additions & 3 deletions packages/stream-management/stream-features.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ test("enable - enabled", async () => {
);

expect(entity.streamManagement.outbound).toBe(0);
expect(entity.streamManagement.outbound_q).toBeEmpty();
expect(entity.streamManagement.enabled).toBe(false);
expect(entity.streamManagement.id).toBe("");

Expand Down Expand Up @@ -73,6 +74,7 @@ test("enable - message - enabled", async () => {
);

expect(entity.streamManagement.outbound).toBe(0);
expect(entity.streamManagement.outbound_q).toBeEmpty();
expect(entity.streamManagement.enabled).toBe(false);
expect(entity.streamManagement.id).toBe("");

Expand Down Expand Up @@ -112,6 +114,7 @@ test("enable - failed", async () => {
);

expect(entity.streamManagement.outbound).toBe(0);
expect(entity.streamManagement.outbound_q).toBeEmpty();
entity.streamManagement.enabled = true;

entity.mockInput(
Expand All @@ -125,6 +128,54 @@ test("enable - failed", async () => {
expect(entity.streamManagement.enabled).toBe(false);
});

test("enable - enabled - message", async () => {
const { entity } = mockClient();

entity.mockInput(
<features xmlns="http://etherx.jabber.org/streams">
<sm xmlns="urn:xmpp:sm:3" />
</features>,
);

expect(await entity.catchOutgoing()).toEqual(
<enable xmlns="urn:xmpp:sm:3" resume="true" />,
);

entity.mockInput(
<enabled
xmlns="urn:xmpp:sm:3"
id="some-long-sm-id"
location="[2001:41D0:1:A49b::1]:9222"
resume="true"
/>,
);

await tick();

expect(entity.streamManagement.outbound).toBe(0);
expect(entity.streamManagement.outbound_q).toBeEmpty();
expect(entity.streamManagement.enabled).toBe(true);

await entity.send(<message id="a" />);
entity.streamManagement._teardown();

expect(entity.streamManagement.outbound).toBe(0);
expect(entity.streamManagement.outbound_q).toHaveLength(1);

let acks = 0;
entity.on("stream-management/ack", (stanza) => {
expect(stanza.attrs.id).toBe("a");
acks++;
});

entity.mockInput(<a xmlns="urn:xmpp:sm:3" h="1" />);
await tick();

expect(acks).toBe(1);
expect(entity.streamManagement.outbound).toBe(1);
expect(entity.streamManagement.outbound_q).toHaveLength(0);
});

test("resume - resumed", async () => {
const { entity } = mockClient();

Expand All @@ -138,6 +189,7 @@ test("resume - resumed", async () => {
);

entity.streamManagement.outbound = 45;
entity.streamManagement.outbound_q = [<message id="a" />, <message id="b" />];

expect(await entity.catchOutgoing()).toEqual(
<resume xmlns="urn:xmpp:sm:3" previd="bar" h="0" />,
Expand All @@ -147,11 +199,31 @@ test("resume - resumed", async () => {

expect(entity.status).toBe("offline");

entity.mockInput(<resumed xmlns="urn:xmpp:sm:3" />);
entity.mockInput(<resumed xmlns="urn:xmpp:sm:3" h="46" />);

await tick();
let acks = 0;
entity.on("stream-management/ack", (stanza) => {
expect(stanza.attrs.id).toBe("a");
acks++;
});

expect(await entity.catchOutgoing()).toEqual(<message id="b" />);

expect(entity.streamManagement.outbound).toBe(45);
let resumed = false;
entity.on("stream-management/resumed", () => {
resumed = true;
});

await tick();
entity.streamManagement._teardown();

expect(resumed).toBe(true);
expect(acks).toBe(1);
expect(entity.streamManagement.outbound).toBe(46);
expect(entity.streamManagement.outbound_q).toHaveLength(1);
expect(
entity.streamManagement.outbound_q[0].getChild("delay", "urn:xmpp:delay"),
).not.toBeUndefined();
expect(entity.status).toBe("online");
});

Expand All @@ -162,6 +234,7 @@ test("resume - failed", async () => {
entity.streamManagement.id = "bar";
entity.streamManagement.enabled = true;
entity.streamManagement.outbound = 45;
entity.streamManagement.outbound_q = ["hai"];

entity.mockInput(
<features xmlns="http://etherx.jabber.org/streams">
Expand All @@ -179,10 +252,18 @@ test("resume - failed", async () => {
</failed>,
);

let failures = 0;
entity.on("stream-management/fail", (failed) => {
failures++;
expect(failed).toBe("hai");
});

await tick();

expect(failures).toBe(1);
expect(entity.status).toBe("bar");
expect(entity.streamManagement.id).toBe("");
expect(entity.streamManagement.enabled).toBe(false);
expect(entity.streamManagement.outbound).toBe(0);
expect(entity.streamManagement.outbound_q).toBeEmpty();
});
3 changes: 3 additions & 0 deletions packages/xml/index.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import Element from "ltx/lib/Element.js";
import clone from "ltx/lib/clone.js";
import createElement from "ltx/lib/createElement.js";
import Parser from "./lib/Parser.js";
import {
Expand All @@ -17,6 +18,7 @@ Object.assign(xml, {
Element,
createElement,
Parser,
clone,
escapeXML,
unescapeXML,
escapeXMLText,
Expand All @@ -29,6 +31,7 @@ export {
Element,
createElement,
Parser,
clone,
escapeXML,
unescapeXML,
escapeXMLText,
Expand Down

0 comments on commit c4d4bb8

Please sign in to comment.