Skip to content

Commit

Permalink
stream-management: Send ack on close (#1059)
Browse files Browse the repository at this point in the history
  • Loading branch information
sonnyp authored Jan 18, 2025
1 parent ee361dc commit 20291dc
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 3 deletions.
45 changes: 45 additions & 0 deletions packages/connection/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,8 @@ class Connection extends EventEmitter {
* https://tools.ietf.org/html/rfc7395#section-3.6
*/
async _closeStream(timeout = this.timeout) {
await this.#runHooks("close");

const fragment = this.footer(this.footerElement());

await this.write(fragment);
Expand Down Expand Up @@ -360,6 +362,49 @@ class Connection extends EventEmitter {

// Override
socketParameters() {}

/* Experimental hooks */
#hooks = new Map();
#hook_events = new Set(["close"]);
hook(event, handler /*priority = 0 TODO */) {
this.#assertHookEventName(event);

if (!this.#hooks.has(event)) {
this.#hooks.set(event, new Set());
}

this.#hooks.get(event).add([handler]);
}
#assertHookEventName(event) {
if (!this.#hook_events.has(event)) {
throw new Error(`Hook event name "${event}" is unknown.`);
}
}
unhook(event, handler) {
this.#assertHookEventName(event);
const handlers = this.#hooks.get("event");
const item = [...handlers].find((item) => item.handler === handler);
handlers.remove(item);
}
async #runHooks(event, ...args) {
this.#assertHookEventName(event);

const hooks = this.#hooks.get(event);
if (!hooks) return;

// TODO run hooks by priority
// run hooks with the same priority in parallel

await Promise.all(
[...hooks].map(async ([handler]) => {
try {
await handler(...args);
} catch (err) {
this.emit("error", err);
}
}),
);
}
}

// Override
Expand Down
2 changes: 1 addition & 1 deletion packages/middleware/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ Supports Node.js and browsers.

## Install

```
```sh
npm install @xmpp/middleware
```

Expand Down
13 changes: 13 additions & 0 deletions packages/stream-management/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,19 @@ If the session fails to resume, entity will fallback to regular session establis
- Automatically responds to acks.
- Periodically request acks.
- If server fails to respond, triggers a reconnect.
- On reconnect retry sending the queue

When a stanza is re-sent, a [delay element](https://xmpp.org/extensions/xep-0203.html) will be added to it.

- `from` client jid
- `stamp` [date/time](https://xmpp.org/extensions/xep-0082.html) at which the stanza was meant to be sent

```xml
<delay xmlns="urn:xmpp:delay"
from="[email protected]/resource"
stamp="1990-01-01T00:00:00Z"
/>
```

## Events

Expand Down
17 changes: 15 additions & 2 deletions packages/stream-management/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,22 @@ export default function streamManagement({
requestAckInterval: 30_000,
});

async function sendAck() {
try {
await entity.send(xml("a", { xmlns: NS, h: sm.inbound }));
} catch {}
}

entity.on("disconnect", () => {
clearTimeout(timeoutTimeout);
clearTimeout(requestAckTimeout);
sm.enabled = false;
});

// It is RECOMMENDED that initiating entities (usually clients) send an element right before they gracefully close the stream, in order to inform the peer about received stanzas
entity.hook("close", async () => {
if (!sm.enabled) return;
await sendAck();
});

async function resumed(resumed) {
Expand Down Expand Up @@ -127,14 +140,14 @@ export default function streamManagement({
sm.id = "";
});

middleware.use((context, next) => {
middleware.use(async (context, next) => {
const { stanza } = context;
clearTimeout(timeoutTimeout);
if (["presence", "message", "iq"].includes(stanza.name)) {
sm.inbound += 1;
} else if (stanza.is("r", NS)) {
// > When an <r/> element ("request") is received, the recipient MUST acknowledge it by sending an <a/> element to the sender containing a value of 'h' that is equal to the number of stanzas handled by the recipient of the <r/> element.
entity.send(xml("a", { xmlns: NS, h: sm.inbound })).catch(() => {});
await sendAck();
} else if (stanza.is("a", NS)) {
// > When a party receives an <a/> element, it SHOULD keep a record of the 'h' value returned as the sequence number of the last handled outbound stanza for the current stream (and discard the previous value).
ackQueue(+stanza.attrs.h);
Expand Down
15 changes: 15 additions & 0 deletions packages/stream-management/stream-features.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -336,3 +336,18 @@ test("resume - failed with something in queue", async () => {
expect(entity.streamManagement.outbound).toBe(0);
expect(entity.streamManagement.outbound_q).toBeEmpty();
});

test("sends an <a/> element before closing", async () => {
const { entity, streamManagement } = mockClient();
streamManagement.enabled = true;
streamManagement.inbound = 42;
entity.status = "online";

const promise_disconnect = entity.disconnect();

expect(await entity.catchOutgoing()).toEqual(
<a xmlns="urn:xmpp:sm:3" h={streamManagement.inbound} />,
);

await promise_disconnect;
});

0 comments on commit 20291dc

Please sign in to comment.