Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream-management: Implement requesting ACKs #1054

Merged
merged 43 commits into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
c4d4bb8
Implement stream management requesting ACKs
singpolyma Nov 14, 2023
6fa3127
Emit events on streamManagement object as suggested on https://github…
sonnyp Jan 8, 2025
710b167
Remove unecessary conditions to clearTimeout
sonnyp Jan 8, 2025
ce30e8b
prefer early return
sonnyp Jan 8, 2025
7ef1d8d
No more clone
singpolyma Jan 8, 2025
d76e479
Throw Error not string
singpolyma Jan 8, 2025
69d0752
Catch any disconnect failure
singpolyma Jan 8, 2025
0190cb6
Reformat this line
singpolyma Jan 8, 2025
b2bf99c
Make more timeouts configurable
singpolyma Jan 8, 2025
0f79b3b
Test failure with something in queue seperately
singpolyma Jan 8, 2025
efad5c1
Document events in readme
singpolyma Jan 8, 2025
8cb8dff
Fix comment position
singpolyma Jan 13, 2025
8c408ef
e2e test for stream management
singpolyma Jan 8, 2025
43a2d69
Use sendMany
singpolyma Jan 13, 2025
97e8d28
Change test name
singpolyma Jan 13, 2025
f076fd1
Split up resumed test
singpolyma Jan 13, 2025
f6e15ba
Teardown on disconnect
singpolyma Jan 13, 2025
2209036
Use promise helper
singpolyma Jan 13, 2025
c66696c
e2e test for fail event when offline
singpolyma Jan 13, 2025
bb3998a
e2e test for stanza retry
singpolyma Jan 13, 2025
4736120
e2e test auto reconnect
singpolyma Jan 13, 2025
b5eb66b
better docs
sonnyp Jan 15, 2025
74b7f0b
extract fn to root
sonnyp Jan 15, 2025
4a06393
connection: Fix race condition when socket closes after timeout (#1048)
sonnyp Jan 8, 2025
d865571
connection: Rename disconect() to _closeSocket()
sonnyp Jan 8, 2025
3177969
connection: Rename close() to _closeStream()
sonnyp Jan 8, 2025
96be425
connection: Make disconnect a public function
sonnyp Jan 8, 2025
a8298f7
Simplify status checks
sonnyp Jan 8, 2025
9e2ff4b
reconnect: Simplify code
sonnyp Jan 8, 2025
aa6b3c1
fast: Fix configurable saveToken and fetchToken (#1049)
sonnyp Jan 8, 2025
bfef115
Remove bundlesize (#1050)
sonnyp Jan 9, 2025
d1790a2
events: Add helpers for event listeners (#1051)
sonnyp Jan 9, 2025
f932be3
websocket: Remove ws dependency (#1052)
sonnyp Jan 9, 2025
fb5b799
tls: Use listeners()
sonnyp Jan 9, 2025
2d6e46f
Various code improvements (#1053)
sonnyp Jan 9, 2025
a738c4a
client: Do not allow PLAIN on insecure connection
sonnyp Jan 10, 2025
407da51
Replace socket.isSecure with socket.secure
sonnyp Jan 10, 2025
44ae9c9
Bump ltx
sonnyp Jan 12, 2025
9cb0556
fast: Emit error events on entity
sonnyp Jan 12, 2025
ced2057
client: pass all arguments to credentials function
sonnyp Jan 12, 2025
8935336
we dont need forceReconnect or socket.destory anymore
sonnyp Jan 15, 2025
15e060e
Merge branch 'main' into implement-sm-for-outgoing
sonnyp Jan 15, 2025
26228e7
fix
sonnyp Jan 16, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions packages/client-core/src/bind2/bind2.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ test("with function resource returning string", async () => {
test("with function resource throwing", async () => {
const error = new Error("foo");


function resource() {
throw error;
}
Expand Down Expand Up @@ -102,7 +101,6 @@ test("with function resource returning resolved promise", async () => {
test("with function resource returning rejected promise", async () => {
const error = new Error("foo");


async function resource() {
throw error;
}
Expand Down
45 changes: 44 additions & 1 deletion packages/stream-management/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,50 @@ When the session is resumed the `online` event is not emitted as session resumpt
However `entity.status` is set to `online`.
If the session fails to resume, entity will fallback to regular session establishment in which case `online` event will be emitted.

Automatically responds to acks but does not support requesting acks yet.
- Automatically responds to acks.
- Periodically request acks.
- If server fails to respond, triggers a reconnect.

## Events

### resumed

Indicates that the connection was resumed. When that happens the `online` event is not emitted but `xmpp.status` will be `online`.

```js
const xmpp = client(...);
const {streamManagement} = xmpp;

streamManagement.on('resumed', () => {
console.log("session resumed");
});
```

### fail

Indicates that a stanza failed to send to the server and will not be retried.

```js
const xmpp = client(...);
const {streamManagement} = xmpp;

streamManagement.on('fail', (stanza) => {
console.log("fail to send", stanza.toString());
});
```

### ack

Indicates that a stanza has been acknowledged by the server.

```js
const xmpp = client(...);
const {streamManagement} = xmpp;

streamManagement.on('ack', (stanza) => {
console.log("stanza acknowledge by the server", stanza.toString());
});
```

## References

Expand Down
110 changes: 102 additions & 8 deletions packages/stream-management/index.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import XMPPError from "@xmpp/error";
import { procedure } from "@xmpp/events";
import { EventEmitter, procedure } from "@xmpp/events";
import xml from "@xmpp/xml";
import { datetime } from "@xmpp/time";

// https://xmpp.org/extensions/xep-0198.html

Expand Down Expand Up @@ -45,24 +46,52 @@ export default function streamManagement({
bind2,
sasl2,
}) {
const sm = {
let timeoutTimeout = null;
let requestAckTimeout = null;

const sm = new EventEmitter();
Object.assign(sm, {
allowResume: true,
preferredMaximum: null,
enabled: false,
id: "",
outbound_q: [],
outbound: 0,
inbound: 0,
max: null,
};
timeout: 60_000,
requestAckInterval: 300_000,
debounceAckRequest: 100,
});

entity.on("disconnect", () => {
clearTimeout(timeoutTimeout);
clearTimeout(requestAckTimeout);
});

function resumed() {
async function resumed(resumed) {
sm.enabled = true;
const oldOutbound = sm.outbound;
for (let i = 0; i < resumed.attrs.h - oldOutbound; i++) {
let item = sm.outbound_q.shift();
sm.outbound++;
sm.emit("ack", item.stanza);
}
let q = sm.outbound_q;
sm.outbound_q = [];
// This will trigger the middleware and re-add to the queue
await entity.sendMany(q.map((item) => queueToStanza({ entity, item })));
sm.emit("resumed");
entity._ready(true);
}

function failed() {
sm.enabled = false;
sm.id = "";
let item;
while ((item = sm.outbound_q.shift())) {
sm.emit("fail", item.stanza);
}
sm.outbound = 0;
}

Expand All @@ -73,11 +102,20 @@ export default function streamManagement({
}

entity.on("online", () => {
if (sm.outbound_q.length > 0) {
throw new Error(
"Stream Management assertion failure, queue should be empty during online",
);
}
sm.outbound = 0;
sm.inbound = 0;
});

entity.on("offline", () => {
let item;
while ((item = sm.outbound_q.shift())) {
sm.emit("fail", item.stanza);
}
sm.outbound = 0;
sm.inbound = 0;
sm.enabled = false;
Expand All @@ -86,14 +124,20 @@ export default function streamManagement({

middleware.use((context, next) => {
const { stanza } = context;
clearTimeout(timeoutTimeout);
if (["presence", "message", "iq"].includes(stanza.name)) {
sm.inbound += 1;
} else if (stanza.is("r", NS)) {
// > When an <r/> element ("request") is received, the recipient MUST acknowledge it by sending an <a/> element to the sender containing a value of 'h' that is equal to the number of stanzas handled by the recipient of the <r/> element.
entity.send(xml("a", { xmlns: NS, h: sm.inbound })).catch(() => {});
} else if (stanza.is("a", NS)) {
// > When a party receives an <a/> element, it SHOULD keep a record of the 'h' value returned as the sequence number of the last handled outbound stanza for the current stream (and discard the previous value).
sm.outbound = stanza.attrs.h;
const oldOutbound = sm.outbound;
for (let i = 0; i < stanza.attrs.h - oldOutbound; i++) {
let item = sm.outbound_q.shift();
sm.outbound++;
sm.emit("ack", item.stanza);
}
}

return next();
Expand All @@ -105,6 +149,33 @@ export default function streamManagement({
if (sasl2) {
setupSasl2({ sasl2, sm, failed, resumed });
}

function requestAck() {
clearTimeout(timeoutTimeout);
if (sm.timeout) {
timeoutTimeout = setTimeout(
() => entity.disconnect().catch(() => {}),
sm.timeout,
);
}
entity.send(xml("r", { xmlns: NS })).catch(() => {});
// Periodically send r to check the connection
// If a stanza goes out it will cancel this and set a sooner timer
requestAckTimeout = setTimeout(requestAck, sm.requestAckInterval);
}

middleware.filter((context, next) => {
if (!sm.enabled) return next();
const { stanza } = context;
if (!["presence", "message", "iq"].includes(stanza.name)) return next();

sm.outbound_q.push({ stanza, stamp: datetime() });
// Debounce requests so we send only one after a big run of stanza together
clearTimeout(requestAckTimeout);
requestAckTimeout = setTimeout(requestAck, sm.debounceAckRequest);
return next();
});

if (streamFeatures) {
setupStreamFeature({
streamFeatures,
Expand Down Expand Up @@ -133,8 +204,8 @@ function setupStreamFeature({
// Resuming
if (sm.id) {
try {
await resume(entity, sm);
resumed();
const element = await resume(entity, sm);
await resumed(element);
return;
// If resumption fails, continue with session establishment
} catch {
Expand All @@ -149,6 +220,12 @@ function setupStreamFeature({

const promiseEnable = enable(entity, sm);

if (sm.outbound_q.length > 0) {
throw new Error(
"Stream Management assertion failure, queue should be empty after enable",
);
}

// > The counter for an entity's own sent stanzas is set to zero and started after sending either <enable/> or <enabled/>.
sm.outbound = 0;

Expand All @@ -172,7 +249,7 @@ function setupSasl2({ sasl2, sm, failed, resumed }) {
},
(element) => {
if (element.is("resumed")) {
resumed();
resumed(element);
} else if (element.is(failed)) {
// const error = StreamError.fromElement(element)
failed();
Expand All @@ -198,3 +275,20 @@ function setupBind2({ bind2, sm, failed, enabled }) {
},
);
}

function queueToStanza({ entity, item }) {
const { stanza, stamp } = item;
if (
stanza.name === "message" &&
!stanza.getChild("delay", "urn:xmpp:delay")
) {
stanza.append(
xml("delay", {
xmlns: "urn:xmpp:delay",
from: entity.jid.toString(),
stamp,
}),
);
}
return stanza;
}
3 changes: 2 additions & 1 deletion packages/stream-management/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
"dependencies": {
"@xmpp/error": "^0.14.0",
"@xmpp/events": "^0.14.0",
"@xmpp/xml": "^0.14.0"
"@xmpp/xml": "^0.14.0",
"@xmpp/time": "^0.14.0"
},
"engines": {
"node": ">= 20.10"
Expand Down
Loading
Loading