diff --git a/packages/connection/index.js b/packages/connection/index.js
index 5c1cfcbb..600ea1ff 100644
--- a/packages/connection/index.js
+++ b/packages/connection/index.js
@@ -285,6 +285,8 @@ class Connection extends EventEmitter {
* https://tools.ietf.org/html/rfc7395#section-3.6
*/
async _closeStream(timeout = this.timeout) {
+ await this.#runHooks("close");
+
const fragment = this.footer(this.footerElement());
await this.write(fragment);
@@ -360,6 +362,49 @@ class Connection extends EventEmitter {
// Override
socketParameters() {}
+
+ /* Experimental hooks */
+ #hooks = new Map();
+ #hook_events = new Set(["close"]);
+ hook(event, handler /*priority = 0 TODO */) {
+ this.#assertHookEventName(event);
+
+ if (!this.#hooks.has(event)) {
+ this.#hooks.set(event, new Set());
+ }
+
+ this.#hooks.get(event).add([handler]);
+ }
+ #assertHookEventName(event) {
+ if (!this.#hook_events.has(event)) {
+ throw new Error(`Hook event name "${event}" is unknown.`);
+ }
+ }
+ unhook(event, handler) {
+ this.#assertHookEventName(event);
+ const handlers = this.#hooks.get("event");
+ const item = [...handlers].find((item) => item.handler === handler);
+ handlers.remove(item);
+ }
+ async #runHooks(event, ...args) {
+ this.#assertHookEventName(event);
+
+ const hooks = this.#hooks.get(event);
+ if (!hooks) return;
+
+ // TODO run hooks by priority
+ // run hooks with the same priority in parallel
+
+ await Promise.all(
+ [...hooks].map(async ([handler]) => {
+ try {
+ await handler(...args);
+ } catch (err) {
+ this.emit("error", err);
+ }
+ }),
+ );
+ }
}
// Override
diff --git a/packages/middleware/README.md b/packages/middleware/README.md
index 612f01a4..3f2f38af 100644
--- a/packages/middleware/README.md
+++ b/packages/middleware/README.md
@@ -6,7 +6,7 @@ Supports Node.js and browsers.
## Install
-```
+```sh
npm install @xmpp/middleware
```
diff --git a/packages/stream-management/README.md b/packages/stream-management/README.md
index 8bb8bbd5..a5ea5054 100644
--- a/packages/stream-management/README.md
+++ b/packages/stream-management/README.md
@@ -13,6 +13,19 @@ If the session fails to resume, entity will fallback to regular session establis
- Automatically responds to acks.
- Periodically request acks.
- If server fails to respond, triggers a reconnect.
+- On reconnect retry sending the queue
+
+When a stanza is re-sent, a [delay element](https://xmpp.org/extensions/xep-0203.html) will be added to it.
+
+- `from` client jid
+- `stamp` [date/time](https://xmpp.org/extensions/xep-0082.html) at which the stanza was meant to be sent
+
+```xml
+
+```
## Events
diff --git a/packages/stream-management/index.js b/packages/stream-management/index.js
index 5d161d97..eec05d4a 100644
--- a/packages/stream-management/index.js
+++ b/packages/stream-management/index.js
@@ -63,11 +63,23 @@ export default function streamManagement({
requestAckInterval: 30_000,
});
+ async function sendAck() {
+ try {
+ await entity.send(xml("a", { xmlns: NS, h: sm.inbound }));
+ } catch {}
+ }
+
entity.on("disconnect", () => {
clearTimeout(timeoutTimeout);
clearTimeout(requestAckTimeout);
});
+ // It is RECOMMENDED that initiating entities (usually clients) send an element right before they gracefully close the stream, in order to inform the peer about received stanzas
+ entity.hook("close", async () => {
+ if (!sm.enabled) return;
+ await sendAck();
+ });
+
async function resumed(resumed) {
sm.enabled = true;
ackQueue(+resumed.attrs.h);
@@ -127,14 +139,14 @@ export default function streamManagement({
sm.id = "";
});
- middleware.use((context, next) => {
+ middleware.use(async (context, next) => {
const { stanza } = context;
clearTimeout(timeoutTimeout);
if (["presence", "message", "iq"].includes(stanza.name)) {
sm.inbound += 1;
} else if (stanza.is("r", NS)) {
// > When an element ("request") is received, the recipient MUST acknowledge it by sending an element to the sender containing a value of 'h' that is equal to the number of stanzas handled by the recipient of the element.
- entity.send(xml("a", { xmlns: NS, h: sm.inbound })).catch(() => {});
+ await sendAck();
} else if (stanza.is("a", NS)) {
// > When a party receives an element, it SHOULD keep a record of the 'h' value returned as the sequence number of the last handled outbound stanza for the current stream (and discard the previous value).
ackQueue(+stanza.attrs.h);
diff --git a/packages/stream-management/stream-features.test.js b/packages/stream-management/stream-features.test.js
index 69f2fa64..79a07292 100644
--- a/packages/stream-management/stream-features.test.js
+++ b/packages/stream-management/stream-features.test.js
@@ -336,3 +336,18 @@ test("resume - failed with something in queue", async () => {
expect(entity.streamManagement.outbound).toBe(0);
expect(entity.streamManagement.outbound_q).toBeEmpty();
});
+
+test("sends an element before closing", async () => {
+ const { entity, streamManagement } = mockClient();
+ streamManagement.enabled = true;
+ streamManagement.inbound = 42;
+ entity.status = "online";
+
+ const promise_disconnect = entity.disconnect();
+
+ expect(await entity.catchOutgoing()).toEqual(
+ ,
+ );
+
+ await promise_disconnect;
+});