From 6fa312782c49e5514c1aacc7d978abcd9da0092a Mon Sep 17 00:00:00 2001 From: Sonny Piers Date: Wed, 8 Jan 2025 10:13:58 +0100 Subject: [PATCH] Emit events on streamManagement object as suggested on https://github.com/xmppjs/xmpp.js/pull/1005#pullrequestreview-2499707865 --- packages/stream-management/index.js | 17 +++++++++-------- .../stream-management/stream-features.test.js | 8 ++++---- 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/packages/stream-management/index.js b/packages/stream-management/index.js index dc5dc7d1..a04759bf 100644 --- a/packages/stream-management/index.js +++ b/packages/stream-management/index.js @@ -1,5 +1,5 @@ import XMPPError from "@xmpp/error"; -import { procedure } from "@xmpp/events"; +import { EventEmitter, procedure } from "@xmpp/events"; import xml from "@xmpp/xml"; import { datetime } from "@xmpp/time"; @@ -49,7 +49,8 @@ export default function streamManagement({ let timeoutTimeout = null; let requestAckTimeout = null; - const sm = { + const sm = new EventEmitter(); + Object.assign(sm, { allowResume: true, preferredMaximum: null, enabled: false, @@ -63,7 +64,7 @@ export default function streamManagement({ if (timeoutTimeout) clearTimeout(timeoutTimeout); if (requestAckTimeout) clearTimeout(requestAckTimeout); }, - }; + }); async function resumed(resumed) { sm.enabled = true; @@ -71,14 +72,14 @@ export default function streamManagement({ for (let i = 0; i < resumed.attrs.h - oldOutbound; i++) { let stanza = sm.outbound_q.shift(); sm.outbound++; - entity.emit("stream-management/ack", stanza); + sm.emit("ack", stanza); } let q = sm.outbound_q; sm.outbound_q = []; for (const item of q) { await entity.send(item); // This will trigger the middleware and re-add to the queue } - entity.emit("stream-management/resumed"); + sm.emit("resumed"); entity._ready(true); } @@ -87,7 +88,7 @@ export default function streamManagement({ sm.id = ""; let stanza; while ((stanza = sm.outbound_q.shift())) { - entity.emit("stream-management/fail", stanza); + sm.emit("fail", stanza); } sm.outbound = 0; } @@ -109,7 +110,7 @@ export default function streamManagement({ entity.on("offline", () => { let stanza; while ((stanza = sm.outbound_q.shift())) { - entity.emit("stream-management/fail", stanza); + sm.emit("fail", stanza); } sm.outbound = 0; sm.inbound = 0; @@ -131,7 +132,7 @@ export default function streamManagement({ for (let i = 0; i < stanza.attrs.h - oldOutbound; i++) { let stanza = sm.outbound_q.shift(); sm.outbound++; - entity.emit("stream-management/ack", stanza); + sm.emit("ack", stanza); } } diff --git a/packages/stream-management/stream-features.test.js b/packages/stream-management/stream-features.test.js index 845e11f2..cb83a48e 100644 --- a/packages/stream-management/stream-features.test.js +++ b/packages/stream-management/stream-features.test.js @@ -163,7 +163,7 @@ test("enable - enabled - message", async () => { expect(entity.streamManagement.outbound_q).toHaveLength(1); let acks = 0; - entity.on("stream-management/ack", (stanza) => { + entity.streamManagement.on("ack", (stanza) => { expect(stanza.attrs.id).toBe("a"); acks++; }); @@ -202,7 +202,7 @@ test("resume - resumed", async () => { entity.mockInput(); let acks = 0; - entity.on("stream-management/ack", (stanza) => { + entity.streamManagement.on("ack", (stanza) => { expect(stanza.attrs.id).toBe("a"); acks++; }); @@ -210,7 +210,7 @@ test("resume - resumed", async () => { expect(await entity.catchOutgoing()).toEqual(); let resumed = false; - entity.on("stream-management/resumed", () => { + entity.streamManagement.on("resumed", () => { resumed = true; }); @@ -253,7 +253,7 @@ test("resume - failed", async () => { ); let failures = 0; - entity.on("stream-management/fail", (failed) => { + entity.streamManagement.on("fail", (failed) => { failures++; expect(failed).toBe("hai"); });