Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream-management: Implement requesting ACKs #1005

Closed
wants to merge 21 commits into from
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions packages/client-core/src/bind2/bind2.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ test("with function resource returning string", async () => {
test("with function resource throwing", async () => {
const error = new Error("foo");


function resource() {
throw error;
}
Expand Down Expand Up @@ -102,7 +101,6 @@ test("with function resource returning resolved promise", async () => {
test("with function resource returning rejected promise", async () => {
const error = new Error("foo");


async function resource() {
throw error;
}
Expand Down
94 changes: 86 additions & 8 deletions packages/stream-management/index.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import XMPPError from "@xmpp/error";
import { procedure } from "@xmpp/events";
import { EventEmitter, procedure } from "@xmpp/events";
import xml from "@xmpp/xml";
import { datetime } from "@xmpp/time";

// https://xmpp.org/extensions/xep-0198.html

Expand Down Expand Up @@ -45,24 +46,50 @@ export default function streamManagement({
bind2,
sasl2,
}) {
const sm = {
let timeoutTimeout = null;
let requestAckTimeout = null;

const sm = new EventEmitter();
Object.assign(sm, {
allowResume: true,
preferredMaximum: null,
enabled: false,
id: "",
outbound_q: [],
outbound: 0,
inbound: 0,
max: null,
};
timeout: 60_000,
_teardown: () => {
clearTimeout(timeoutTimeout);
clearTimeout(requestAckTimeout);
},
});

function resumed() {
async function resumed(resumed) {
sm.enabled = true;
const oldOutbound = sm.outbound;
for (let i = 0; i < resumed.attrs.h - oldOutbound; i++) {
let stanza = sm.outbound_q.shift();
sm.outbound++;
sm.emit("ack", stanza);
}
let q = sm.outbound_q;
sm.outbound_q = [];
for (const item of q) {
await entity.send(item); // This will trigger the middleware and re-add to the queue
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what if entity.send rejects ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then resumed will fail. What context causes send to reject?

}
sm.emit("resumed");
entity._ready(true);
}

function failed() {
sm.enabled = false;
sm.id = "";
let stanza;
while ((stanza = sm.outbound_q.shift())) {
sm.emit("fail", stanza);
}
sm.outbound = 0;
}

Expand All @@ -73,11 +100,18 @@ export default function streamManagement({
}

entity.on("online", () => {
if (sm.outbound_q.length > 0) {
throw "Stream Management assertion failure, queue should be empty during online";
sonnyp marked this conversation as resolved.
Show resolved Hide resolved
}
sm.outbound = 0;
sm.inbound = 0;
});

entity.on("offline", () => {
let stanza;
while ((stanza = sm.outbound_q.shift())) {
sm.emit("fail", stanza);
}
sm.outbound = 0;
sm.inbound = 0;
sm.enabled = false;
Expand All @@ -86,14 +120,20 @@ export default function streamManagement({

middleware.use((context, next) => {
const { stanza } = context;
clearTimeout(timeoutTimeout);
if (["presence", "message", "iq"].includes(stanza.name)) {
sm.inbound += 1;
} else if (stanza.is("r", NS)) {
// > When an <r/> element ("request") is received, the recipient MUST acknowledge it by sending an <a/> element to the sender containing a value of 'h' that is equal to the number of stanzas handled by the recipient of the <r/> element.
entity.send(xml("a", { xmlns: NS, h: sm.inbound })).catch(() => {});
} else if (stanza.is("a", NS)) {
// > When a party receives an <a/> element, it SHOULD keep a record of the 'h' value returned as the sequence number of the last handled outbound stanza for the current stream (and discard the previous value).
sm.outbound = stanza.attrs.h;
const oldOutbound = sm.outbound;
for (let i = 0; i < stanza.attrs.h - oldOutbound; i++) {
let stanza = sm.outbound_q.shift();
sm.outbound++;
sm.emit("ack", stanza);
}
}

return next();
Expand All @@ -105,6 +145,42 @@ export default function streamManagement({
if (sasl2) {
setupSasl2({ sasl2, sm, failed, resumed });
}

function requestAck() {
clearTimeout(timeoutTimeout);
if (sm.timeout) {
timeoutTimeout = setTimeout(() => entity.disconnect(), sm.timeout);
sonnyp marked this conversation as resolved.
Show resolved Hide resolved
}
entity.send(xml("r", { xmlns: NS })).catch(() => {});
// Periodically send r to check the connection
// If a stanza goes out it will cancel this and set a sooner timer
requestAckTimeout = setTimeout(requestAck, 300_000);
sonnyp marked this conversation as resolved.
Show resolved Hide resolved
}

middleware.filter((context, next) => {
if (!sm.enabled) return next();
const { stanza } = context;
if (!["presence", "message", "iq"].includes(stanza.name)) return next();

let qStanza = stanza;
if (
qStanza.name === "message" &&
!qStanza.getChild("delay", "urn:xmpp:delay")
) {
qStanza = xml.clone(qStanza);
qStanza.c("delay", {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please use append

xmlns: "urn:xmpp:delay",
from: entity.jid.toString(),
stamp: datetime(),
});
}
sm.outbound_q.push(qStanza);
// Debounce requests so we send only one after a big run of stanza together
clearTimeout(requestAckTimeout);
requestAckTimeout = setTimeout(requestAck, 100);
sonnyp marked this conversation as resolved.
Show resolved Hide resolved
return next();
});

if (streamFeatures) {
setupStreamFeature({
streamFeatures,
Expand Down Expand Up @@ -133,8 +209,7 @@ function setupStreamFeature({
// Resuming
if (sm.id) {
try {
await resume(entity, sm);
resumed();
await resumed(await resume(entity, sm));
sonnyp marked this conversation as resolved.
Show resolved Hide resolved
return;
// If resumption fails, continue with session establishment
} catch {
Expand All @@ -150,6 +225,9 @@ function setupStreamFeature({
const promiseEnable = enable(entity, sm);

// > The counter for an entity's own sent stanzas is set to zero and started after sending either <enable/> or <enabled/>.
if (sm.outbound_q.length > 0) {
throw "Stream Management assertion failure, queue should be empty after enable";
sonnyp marked this conversation as resolved.
Show resolved Hide resolved
}
sm.outbound = 0;

try {
Expand All @@ -172,7 +250,7 @@ function setupSasl2({ sasl2, sm, failed, resumed }) {
},
(element) => {
if (element.is("resumed")) {
resumed();
resumed(element);
} else if (element.is(failed)) {
// const error = StreamError.fromElement(element)
failed();
Expand Down
3 changes: 2 additions & 1 deletion packages/stream-management/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
"dependencies": {
"@xmpp/error": "^0.14.0",
"@xmpp/events": "^0.14.0",
"@xmpp/xml": "^0.14.0"
"@xmpp/xml": "^0.14.0",
"@xmpp/time": "^0.14.0"
},
"engines": {
"node": ">= 20"
Expand Down
87 changes: 84 additions & 3 deletions packages/stream-management/stream-features.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ test("enable - enabled", async () => {
);

expect(entity.streamManagement.outbound).toBe(0);
expect(entity.streamManagement.outbound_q).toBeEmpty();
expect(entity.streamManagement.enabled).toBe(false);
expect(entity.streamManagement.id).toBe("");

Expand Down Expand Up @@ -73,6 +74,7 @@ test("enable - message - enabled", async () => {
);

expect(entity.streamManagement.outbound).toBe(0);
expect(entity.streamManagement.outbound_q).toBeEmpty();
expect(entity.streamManagement.enabled).toBe(false);
expect(entity.streamManagement.id).toBe("");

Expand Down Expand Up @@ -112,6 +114,7 @@ test("enable - failed", async () => {
);

expect(entity.streamManagement.outbound).toBe(0);
expect(entity.streamManagement.outbound_q).toBeEmpty();
entity.streamManagement.enabled = true;

entity.mockInput(
Expand All @@ -125,6 +128,54 @@ test("enable - failed", async () => {
expect(entity.streamManagement.enabled).toBe(false);
});

test("enable - enabled - message", async () => {
sonnyp marked this conversation as resolved.
Show resolved Hide resolved
const { entity } = mockClient();

entity.mockInput(
<features xmlns="http://etherx.jabber.org/streams">
<sm xmlns="urn:xmpp:sm:3" />
</features>,
);

expect(await entity.catchOutgoing()).toEqual(
<enable xmlns="urn:xmpp:sm:3" resume="true" />,
);

entity.mockInput(
<enabled
xmlns="urn:xmpp:sm:3"
id="some-long-sm-id"
location="[2001:41D0:1:A49b::1]:9222"
resume="true"
/>,
);

await tick();
sonnyp marked this conversation as resolved.
Show resolved Hide resolved

expect(entity.streamManagement.outbound).toBe(0);
expect(entity.streamManagement.outbound_q).toBeEmpty();
expect(entity.streamManagement.enabled).toBe(true);

await entity.send(<message id="a" />);
entity.streamManagement._teardown();

expect(entity.streamManagement.outbound).toBe(0);
expect(entity.streamManagement.outbound_q).toHaveLength(1);

let acks = 0;
entity.streamManagement.on("ack", (stanza) => {
expect(stanza.attrs.id).toBe("a");
acks++;
});

entity.mockInput(<a xmlns="urn:xmpp:sm:3" h="1" />);
await tick();

expect(acks).toBe(1);
expect(entity.streamManagement.outbound).toBe(1);
expect(entity.streamManagement.outbound_q).toHaveLength(0);
});

test("resume - resumed", async () => {
const { entity } = mockClient();

Expand All @@ -138,6 +189,7 @@ test("resume - resumed", async () => {
);

entity.streamManagement.outbound = 45;
entity.streamManagement.outbound_q = [<message id="a" />, <message id="b" />];

expect(await entity.catchOutgoing()).toEqual(
<resume xmlns="urn:xmpp:sm:3" previd="bar" h="0" />,
Expand All @@ -147,11 +199,31 @@ test("resume - resumed", async () => {

expect(entity.status).toBe("offline");

entity.mockInput(<resumed xmlns="urn:xmpp:sm:3" />);
entity.mockInput(<resumed xmlns="urn:xmpp:sm:3" h="46" />);

await tick();
let acks = 0;
entity.streamManagement.on("ack", (stanza) => {
expect(stanza.attrs.id).toBe("a");
acks++;
});

expect(await entity.catchOutgoing()).toEqual(<message id="b" />);

expect(entity.streamManagement.outbound).toBe(45);
let resumed = false;
entity.streamManagement.on("resumed", () => {
resumed = true;
});

await tick();
entity.streamManagement._teardown();

expect(resumed).toBe(true);
expect(acks).toBe(1);
expect(entity.streamManagement.outbound).toBe(46);
expect(entity.streamManagement.outbound_q).toHaveLength(1);
expect(
entity.streamManagement.outbound_q[0].getChild("delay", "urn:xmpp:delay"),
).not.toBeUndefined();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please move this to a new test

expect(entity.status).toBe("online");
});

Expand All @@ -162,6 +234,7 @@ test("resume - failed", async () => {
entity.streamManagement.id = "bar";
entity.streamManagement.enabled = true;
entity.streamManagement.outbound = 45;
entity.streamManagement.outbound_q = ["hai"];

entity.mockInput(
<features xmlns="http://etherx.jabber.org/streams">
Expand All @@ -179,10 +252,18 @@ test("resume - failed", async () => {
</failed>,
);

let failures = 0;
entity.streamManagement.on("fail", (failed) => {
failures++;
expect(failed).toBe("hai");
});
sonnyp marked this conversation as resolved.
Show resolved Hide resolved

await tick();

expect(failures).toBe(1);
expect(entity.status).toBe("bar");
expect(entity.streamManagement.id).toBe("");
expect(entity.streamManagement.enabled).toBe(false);
expect(entity.streamManagement.outbound).toBe(0);
expect(entity.streamManagement.outbound_q).toBeEmpty();
});
3 changes: 3 additions & 0 deletions packages/xml/index.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import Element from "ltx/lib/Element.js";
import clone from "ltx/lib/clone.js";
import createElement from "ltx/lib/createElement.js";
import Parser from "./lib/Parser.js";
import {
Expand All @@ -17,6 +18,7 @@ Object.assign(xml, {
Element,
createElement,
Parser,
clone,
escapeXML,
unescapeXML,
escapeXMLText,
Expand All @@ -29,6 +31,7 @@ export {
Element,
createElement,
Parser,
clone,
escapeXML,
unescapeXML,
escapeXMLText,
Expand Down
Loading