Skip to content

Commit

Permalink
Emit events on streamManagement object as suggested on #1005 (review)
Browse files Browse the repository at this point in the history
  • Loading branch information
sonnyp committed Jan 8, 2025
1 parent c4d4bb8 commit 6fa3127
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 12 deletions.
17 changes: 9 additions & 8 deletions packages/stream-management/index.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import XMPPError from "@xmpp/error";
import { procedure } from "@xmpp/events";
import { EventEmitter, procedure } from "@xmpp/events";
import xml from "@xmpp/xml";
import { datetime } from "@xmpp/time";

Expand Down Expand Up @@ -49,7 +49,8 @@ export default function streamManagement({
let timeoutTimeout = null;
let requestAckTimeout = null;

const sm = {
const sm = new EventEmitter();
Object.assign(sm, {
allowResume: true,
preferredMaximum: null,
enabled: false,
Expand All @@ -63,22 +64,22 @@ export default function streamManagement({
if (timeoutTimeout) clearTimeout(timeoutTimeout);
if (requestAckTimeout) clearTimeout(requestAckTimeout);
},
};
});

async function resumed(resumed) {
sm.enabled = true;
const oldOutbound = sm.outbound;
for (let i = 0; i < resumed.attrs.h - oldOutbound; i++) {
let stanza = sm.outbound_q.shift();
sm.outbound++;
entity.emit("stream-management/ack", stanza);
sm.emit("ack", stanza);
}
let q = sm.outbound_q;
sm.outbound_q = [];
for (const item of q) {
await entity.send(item); // This will trigger the middleware and re-add to the queue
}
entity.emit("stream-management/resumed");
sm.emit("resumed");
entity._ready(true);
}

Expand All @@ -87,7 +88,7 @@ export default function streamManagement({
sm.id = "";
let stanza;
while ((stanza = sm.outbound_q.shift())) {
entity.emit("stream-management/fail", stanza);
sm.emit("fail", stanza);
}
sm.outbound = 0;
}
Expand All @@ -109,7 +110,7 @@ export default function streamManagement({
entity.on("offline", () => {
let stanza;
while ((stanza = sm.outbound_q.shift())) {
entity.emit("stream-management/fail", stanza);
sm.emit("fail", stanza);
}
sm.outbound = 0;
sm.inbound = 0;
Expand All @@ -131,7 +132,7 @@ export default function streamManagement({
for (let i = 0; i < stanza.attrs.h - oldOutbound; i++) {
let stanza = sm.outbound_q.shift();
sm.outbound++;
entity.emit("stream-management/ack", stanza);
sm.emit("ack", stanza);
}
}

Expand Down
8 changes: 4 additions & 4 deletions packages/stream-management/stream-features.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ test("enable - enabled - message", async () => {
expect(entity.streamManagement.outbound_q).toHaveLength(1);

let acks = 0;
entity.on("stream-management/ack", (stanza) => {
entity.streamManagement.on("ack", (stanza) => {
expect(stanza.attrs.id).toBe("a");
acks++;
});
Expand Down Expand Up @@ -202,15 +202,15 @@ test("resume - resumed", async () => {
entity.mockInput(<resumed xmlns="urn:xmpp:sm:3" h="46" />);

let acks = 0;
entity.on("stream-management/ack", (stanza) => {
entity.streamManagement.on("ack", (stanza) => {
expect(stanza.attrs.id).toBe("a");
acks++;
});

expect(await entity.catchOutgoing()).toEqual(<message id="b" />);

let resumed = false;
entity.on("stream-management/resumed", () => {
entity.streamManagement.on("resumed", () => {
resumed = true;
});

Expand Down Expand Up @@ -253,7 +253,7 @@ test("resume - failed", async () => {
);

let failures = 0;
entity.on("stream-management/fail", (failed) => {
entity.streamManagement.on("fail", (failed) => {
failures++;
expect(failed).toBe("hai");
});
Expand Down

0 comments on commit 6fa3127

Please sign in to comment.