From c4d4bb825451a564e5d32a12ba4a0f270a3f7c3e Mon Sep 17 00:00:00 2001 From: Stephen Paul Weber Date: Tue, 14 Nov 2023 16:31:15 -0500 Subject: [PATCH 01/21] Implement stream management requesting ACKs Queue outgoing stanzas and periodically request ACK. Remove from the queue anything ack'd and notify of the ack so apps can know the stanza has for sure sent. On resume, anything not ack'd is re-sent. On reconnect, anything not ack'd notify of the failure to send this stanza so apps can know the stanza failed. Even when there is no traffic, send an at least every 5 minutes to check the connection. If there is no inbound traffic (such as an ) within timeout (default 60 seconds) then consider the connection disconnected. --- package-lock.json | 1 + packages/client-core/src/bind2/bind2.test.js | 2 - packages/stream-management/index.js | 86 ++++++++++++++++-- packages/stream-management/package.json | 3 +- .../stream-management/stream-features.test.js | 87 ++++++++++++++++++- packages/xml/index.js | 3 + 6 files changed, 171 insertions(+), 11 deletions(-) diff --git a/package-lock.json b/package-lock.json index 13d18a4d..de35cabd 100644 --- a/package-lock.json +++ b/package-lock.json @@ -14407,6 +14407,7 @@ "dependencies": { "@xmpp/error": "^0.14.0", "@xmpp/events": "^0.14.0", + "@xmpp/time": "^0.14.0", "@xmpp/xml": "^0.14.0" }, "engines": { diff --git a/packages/client-core/src/bind2/bind2.test.js b/packages/client-core/src/bind2/bind2.test.js index b0aac358..23c5fddb 100644 --- a/packages/client-core/src/bind2/bind2.test.js +++ b/packages/client-core/src/bind2/bind2.test.js @@ -66,7 +66,6 @@ test("with function resource returning string", async () => { test("with function resource throwing", async () => { const error = new Error("foo"); - function resource() { throw error; } @@ -102,7 +101,6 @@ test("with function resource returning resolved promise", async () => { test("with function resource returning rejected promise", async () => { const error = new Error("foo"); - async function resource() { throw error; } diff --git a/packages/stream-management/index.js b/packages/stream-management/index.js index 9f162ce3..dc5dc7d1 100644 --- a/packages/stream-management/index.js +++ b/packages/stream-management/index.js @@ -1,6 +1,7 @@ import XMPPError from "@xmpp/error"; import { procedure } from "@xmpp/events"; import xml from "@xmpp/xml"; +import { datetime } from "@xmpp/time"; // https://xmpp.org/extensions/xep-0198.html @@ -45,24 +46,49 @@ export default function streamManagement({ bind2, sasl2, }) { + let timeoutTimeout = null; + let requestAckTimeout = null; + const sm = { allowResume: true, preferredMaximum: null, enabled: false, id: "", + outbound_q: [], outbound: 0, inbound: 0, max: null, + timeout: 60_000, + _teardown: () => { + if (timeoutTimeout) clearTimeout(timeoutTimeout); + if (requestAckTimeout) clearTimeout(requestAckTimeout); + }, }; - function resumed() { + async function resumed(resumed) { sm.enabled = true; + const oldOutbound = sm.outbound; + for (let i = 0; i < resumed.attrs.h - oldOutbound; i++) { + let stanza = sm.outbound_q.shift(); + sm.outbound++; + entity.emit("stream-management/ack", stanza); + } + let q = sm.outbound_q; + sm.outbound_q = []; + for (const item of q) { + await entity.send(item); // This will trigger the middleware and re-add to the queue + } + entity.emit("stream-management/resumed"); entity._ready(true); } function failed() { sm.enabled = false; sm.id = ""; + let stanza; + while ((stanza = sm.outbound_q.shift())) { + entity.emit("stream-management/fail", stanza); + } sm.outbound = 0; } @@ -73,11 +99,18 @@ export default function streamManagement({ } entity.on("online", () => { + if (sm.outbound_q.length > 0) { + throw "Stream Management assertion failure, queue should be empty during online"; + } sm.outbound = 0; sm.inbound = 0; }); entity.on("offline", () => { + let stanza; + while ((stanza = sm.outbound_q.shift())) { + entity.emit("stream-management/fail", stanza); + } sm.outbound = 0; sm.inbound = 0; sm.enabled = false; @@ -86,6 +119,7 @@ export default function streamManagement({ middleware.use((context, next) => { const { stanza } = context; + if (timeoutTimeout) clearTimeout(timeoutTimeout); if (["presence", "message", "iq"].includes(stanza.name)) { sm.inbound += 1; } else if (stanza.is("r", NS)) { @@ -93,7 +127,12 @@ export default function streamManagement({ entity.send(xml("a", { xmlns: NS, h: sm.inbound })).catch(() => {}); } else if (stanza.is("a", NS)) { // > When a party receives an element, it SHOULD keep a record of the 'h' value returned as the sequence number of the last handled outbound stanza for the current stream (and discard the previous value). - sm.outbound = stanza.attrs.h; + const oldOutbound = sm.outbound; + for (let i = 0; i < stanza.attrs.h - oldOutbound; i++) { + let stanza = sm.outbound_q.shift(); + sm.outbound++; + entity.emit("stream-management/ack", stanza); + } } return next(); @@ -105,6 +144,41 @@ export default function streamManagement({ if (sasl2) { setupSasl2({ sasl2, sm, failed, resumed }); } + + function requestAck() { + if (timeoutTimeout) clearTimeout(timeoutTimeout); + if (sm.timeout) { + timeoutTimeout = setTimeout(() => entity.disconnect(), sm.timeout); + } + entity.send(xml("r", { xmlns: NS })).catch(() => {}); + // Periodically send r to check the connection + // If a stanza goes out it will cancel this and set a sooner timer + requestAckTimeout = setTimeout(requestAck, 300_000); + } + + middleware.filter((context, next) => { + const { stanza } = context; + if (sm.enabled && ["presence", "message", "iq"].includes(stanza.name)) { + let qStanza = stanza; + if ( + qStanza.name === "message" && + !qStanza.getChild("delay", "urn:xmpp:delay") + ) { + qStanza = xml.clone(qStanza); + qStanza.c("delay", { + xmlns: "urn:xmpp:delay", + from: entity.jid.toString(), + stamp: datetime(), + }); + } + sm.outbound_q.push(qStanza); + // Debounce requests so we send only one after a big run of stanza together + if (requestAckTimeout) clearTimeout(requestAckTimeout); + requestAckTimeout = setTimeout(requestAck, 100); + } + return next(); + }); + if (streamFeatures) { setupStreamFeature({ streamFeatures, @@ -133,8 +207,7 @@ function setupStreamFeature({ // Resuming if (sm.id) { try { - await resume(entity, sm); - resumed(); + await resumed(await resume(entity, sm)); return; // If resumption fails, continue with session establishment } catch { @@ -150,6 +223,9 @@ function setupStreamFeature({ const promiseEnable = enable(entity, sm); // > The counter for an entity's own sent stanzas is set to zero and started after sending either or . + if (sm.outbound_q.length > 0) { + throw "Stream Management assertion failure, queue should be empty after enable"; + } sm.outbound = 0; try { @@ -172,7 +248,7 @@ function setupSasl2({ sasl2, sm, failed, resumed }) { }, (element) => { if (element.is("resumed")) { - resumed(); + resumed(element); } else if (element.is(failed)) { // const error = StreamError.fromElement(element) failed(); diff --git a/packages/stream-management/package.json b/packages/stream-management/package.json index e9818cbb..72503e38 100644 --- a/packages/stream-management/package.json +++ b/packages/stream-management/package.json @@ -16,7 +16,8 @@ "dependencies": { "@xmpp/error": "^0.14.0", "@xmpp/events": "^0.14.0", - "@xmpp/xml": "^0.14.0" + "@xmpp/xml": "^0.14.0", + "@xmpp/time": "^0.14.0" }, "engines": { "node": ">= 20" diff --git a/packages/stream-management/stream-features.test.js b/packages/stream-management/stream-features.test.js index a8edc437..845e11f2 100644 --- a/packages/stream-management/stream-features.test.js +++ b/packages/stream-management/stream-features.test.js @@ -22,6 +22,7 @@ test("enable - enabled", async () => { ); expect(entity.streamManagement.outbound).toBe(0); + expect(entity.streamManagement.outbound_q).toBeEmpty(); expect(entity.streamManagement.enabled).toBe(false); expect(entity.streamManagement.id).toBe(""); @@ -73,6 +74,7 @@ test("enable - message - enabled", async () => { ); expect(entity.streamManagement.outbound).toBe(0); + expect(entity.streamManagement.outbound_q).toBeEmpty(); expect(entity.streamManagement.enabled).toBe(false); expect(entity.streamManagement.id).toBe(""); @@ -112,6 +114,7 @@ test("enable - failed", async () => { ); expect(entity.streamManagement.outbound).toBe(0); + expect(entity.streamManagement.outbound_q).toBeEmpty(); entity.streamManagement.enabled = true; entity.mockInput( @@ -125,6 +128,54 @@ test("enable - failed", async () => { expect(entity.streamManagement.enabled).toBe(false); }); +test("enable - enabled - message", async () => { + const { entity } = mockClient(); + + entity.mockInput( + + + , + ); + + expect(await entity.catchOutgoing()).toEqual( + , + ); + + entity.mockInput( + , + ); + + await tick(); + + expect(entity.streamManagement.outbound).toBe(0); + expect(entity.streamManagement.outbound_q).toBeEmpty(); + expect(entity.streamManagement.enabled).toBe(true); + + await entity.send(); + entity.streamManagement._teardown(); + + expect(entity.streamManagement.outbound).toBe(0); + expect(entity.streamManagement.outbound_q).toHaveLength(1); + + let acks = 0; + entity.on("stream-management/ack", (stanza) => { + expect(stanza.attrs.id).toBe("a"); + acks++; + }); + + entity.mockInput(); + await tick(); + + expect(acks).toBe(1); + expect(entity.streamManagement.outbound).toBe(1); + expect(entity.streamManagement.outbound_q).toHaveLength(0); +}); + test("resume - resumed", async () => { const { entity } = mockClient(); @@ -138,6 +189,7 @@ test("resume - resumed", async () => { ); entity.streamManagement.outbound = 45; + entity.streamManagement.outbound_q = [, ]; expect(await entity.catchOutgoing()).toEqual( , @@ -147,11 +199,31 @@ test("resume - resumed", async () => { expect(entity.status).toBe("offline"); - entity.mockInput(); + entity.mockInput(); - await tick(); + let acks = 0; + entity.on("stream-management/ack", (stanza) => { + expect(stanza.attrs.id).toBe("a"); + acks++; + }); + + expect(await entity.catchOutgoing()).toEqual(); - expect(entity.streamManagement.outbound).toBe(45); + let resumed = false; + entity.on("stream-management/resumed", () => { + resumed = true; + }); + + await tick(); + entity.streamManagement._teardown(); + + expect(resumed).toBe(true); + expect(acks).toBe(1); + expect(entity.streamManagement.outbound).toBe(46); + expect(entity.streamManagement.outbound_q).toHaveLength(1); + expect( + entity.streamManagement.outbound_q[0].getChild("delay", "urn:xmpp:delay"), + ).not.toBeUndefined(); expect(entity.status).toBe("online"); }); @@ -162,6 +234,7 @@ test("resume - failed", async () => { entity.streamManagement.id = "bar"; entity.streamManagement.enabled = true; entity.streamManagement.outbound = 45; + entity.streamManagement.outbound_q = ["hai"]; entity.mockInput( @@ -179,10 +252,18 @@ test("resume - failed", async () => { , ); + let failures = 0; + entity.on("stream-management/fail", (failed) => { + failures++; + expect(failed).toBe("hai"); + }); + await tick(); + expect(failures).toBe(1); expect(entity.status).toBe("bar"); expect(entity.streamManagement.id).toBe(""); expect(entity.streamManagement.enabled).toBe(false); expect(entity.streamManagement.outbound).toBe(0); + expect(entity.streamManagement.outbound_q).toBeEmpty(); }); diff --git a/packages/xml/index.js b/packages/xml/index.js index efb06e0c..7b512e86 100644 --- a/packages/xml/index.js +++ b/packages/xml/index.js @@ -1,4 +1,5 @@ import Element from "ltx/lib/Element.js"; +import clone from "ltx/lib/clone.js"; import createElement from "ltx/lib/createElement.js"; import Parser from "./lib/Parser.js"; import { @@ -17,6 +18,7 @@ Object.assign(xml, { Element, createElement, Parser, + clone, escapeXML, unescapeXML, escapeXMLText, @@ -29,6 +31,7 @@ export { Element, createElement, Parser, + clone, escapeXML, unescapeXML, escapeXMLText, From 6fa312782c49e5514c1aacc7d978abcd9da0092a Mon Sep 17 00:00:00 2001 From: Sonny Piers Date: Wed, 8 Jan 2025 10:13:58 +0100 Subject: [PATCH 02/21] Emit events on streamManagement object as suggested on https://github.com/xmppjs/xmpp.js/pull/1005#pullrequestreview-2499707865 --- packages/stream-management/index.js | 17 +++++++++-------- .../stream-management/stream-features.test.js | 8 ++++---- 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/packages/stream-management/index.js b/packages/stream-management/index.js index dc5dc7d1..a04759bf 100644 --- a/packages/stream-management/index.js +++ b/packages/stream-management/index.js @@ -1,5 +1,5 @@ import XMPPError from "@xmpp/error"; -import { procedure } from "@xmpp/events"; +import { EventEmitter, procedure } from "@xmpp/events"; import xml from "@xmpp/xml"; import { datetime } from "@xmpp/time"; @@ -49,7 +49,8 @@ export default function streamManagement({ let timeoutTimeout = null; let requestAckTimeout = null; - const sm = { + const sm = new EventEmitter(); + Object.assign(sm, { allowResume: true, preferredMaximum: null, enabled: false, @@ -63,7 +64,7 @@ export default function streamManagement({ if (timeoutTimeout) clearTimeout(timeoutTimeout); if (requestAckTimeout) clearTimeout(requestAckTimeout); }, - }; + }); async function resumed(resumed) { sm.enabled = true; @@ -71,14 +72,14 @@ export default function streamManagement({ for (let i = 0; i < resumed.attrs.h - oldOutbound; i++) { let stanza = sm.outbound_q.shift(); sm.outbound++; - entity.emit("stream-management/ack", stanza); + sm.emit("ack", stanza); } let q = sm.outbound_q; sm.outbound_q = []; for (const item of q) { await entity.send(item); // This will trigger the middleware and re-add to the queue } - entity.emit("stream-management/resumed"); + sm.emit("resumed"); entity._ready(true); } @@ -87,7 +88,7 @@ export default function streamManagement({ sm.id = ""; let stanza; while ((stanza = sm.outbound_q.shift())) { - entity.emit("stream-management/fail", stanza); + sm.emit("fail", stanza); } sm.outbound = 0; } @@ -109,7 +110,7 @@ export default function streamManagement({ entity.on("offline", () => { let stanza; while ((stanza = sm.outbound_q.shift())) { - entity.emit("stream-management/fail", stanza); + sm.emit("fail", stanza); } sm.outbound = 0; sm.inbound = 0; @@ -131,7 +132,7 @@ export default function streamManagement({ for (let i = 0; i < stanza.attrs.h - oldOutbound; i++) { let stanza = sm.outbound_q.shift(); sm.outbound++; - entity.emit("stream-management/ack", stanza); + sm.emit("ack", stanza); } } diff --git a/packages/stream-management/stream-features.test.js b/packages/stream-management/stream-features.test.js index 845e11f2..cb83a48e 100644 --- a/packages/stream-management/stream-features.test.js +++ b/packages/stream-management/stream-features.test.js @@ -163,7 +163,7 @@ test("enable - enabled - message", async () => { expect(entity.streamManagement.outbound_q).toHaveLength(1); let acks = 0; - entity.on("stream-management/ack", (stanza) => { + entity.streamManagement.on("ack", (stanza) => { expect(stanza.attrs.id).toBe("a"); acks++; }); @@ -202,7 +202,7 @@ test("resume - resumed", async () => { entity.mockInput(); let acks = 0; - entity.on("stream-management/ack", (stanza) => { + entity.streamManagement.on("ack", (stanza) => { expect(stanza.attrs.id).toBe("a"); acks++; }); @@ -210,7 +210,7 @@ test("resume - resumed", async () => { expect(await entity.catchOutgoing()).toEqual(); let resumed = false; - entity.on("stream-management/resumed", () => { + entity.streamManagement.on("resumed", () => { resumed = true; }); @@ -253,7 +253,7 @@ test("resume - failed", async () => { ); let failures = 0; - entity.on("stream-management/fail", (failed) => { + entity.streamManagement.on("fail", (failed) => { failures++; expect(failed).toBe("hai"); }); From 710b16754428ca5d0501e812e98d74a50710a715 Mon Sep 17 00:00:00 2001 From: Sonny Piers Date: Wed, 8 Jan 2025 10:17:20 +0100 Subject: [PATCH 03/21] Remove unecessary conditions to clearTimeout --- packages/stream-management/index.js | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/packages/stream-management/index.js b/packages/stream-management/index.js index a04759bf..8cc8c575 100644 --- a/packages/stream-management/index.js +++ b/packages/stream-management/index.js @@ -61,8 +61,8 @@ export default function streamManagement({ max: null, timeout: 60_000, _teardown: () => { - if (timeoutTimeout) clearTimeout(timeoutTimeout); - if (requestAckTimeout) clearTimeout(requestAckTimeout); + clearTimeout(timeoutTimeout); + clearTimeout(requestAckTimeout); }, }); @@ -120,7 +120,7 @@ export default function streamManagement({ middleware.use((context, next) => { const { stanza } = context; - if (timeoutTimeout) clearTimeout(timeoutTimeout); + clearTimeout(timeoutTimeout); if (["presence", "message", "iq"].includes(stanza.name)) { sm.inbound += 1; } else if (stanza.is("r", NS)) { @@ -147,7 +147,7 @@ export default function streamManagement({ } function requestAck() { - if (timeoutTimeout) clearTimeout(timeoutTimeout); + clearTimeout(timeoutTimeout); if (sm.timeout) { timeoutTimeout = setTimeout(() => entity.disconnect(), sm.timeout); } @@ -174,7 +174,7 @@ export default function streamManagement({ } sm.outbound_q.push(qStanza); // Debounce requests so we send only one after a big run of stanza together - if (requestAckTimeout) clearTimeout(requestAckTimeout); + clearTimeout(requestAckTimeout); requestAckTimeout = setTimeout(requestAck, 100); } return next(); From ce30e8b960fb4d87ec310a6095a5488744bfed53 Mon Sep 17 00:00:00 2001 From: Sonny Piers Date: Wed, 8 Jan 2025 10:25:18 +0100 Subject: [PATCH 04/21] prefer early return --- packages/stream-management/index.js | 35 +++++++++++++++-------------- 1 file changed, 18 insertions(+), 17 deletions(-) diff --git a/packages/stream-management/index.js b/packages/stream-management/index.js index 8cc8c575..74d2eee9 100644 --- a/packages/stream-management/index.js +++ b/packages/stream-management/index.js @@ -158,25 +158,26 @@ export default function streamManagement({ } middleware.filter((context, next) => { + if (!sm.enabled) return next(); const { stanza } = context; - if (sm.enabled && ["presence", "message", "iq"].includes(stanza.name)) { - let qStanza = stanza; - if ( - qStanza.name === "message" && - !qStanza.getChild("delay", "urn:xmpp:delay") - ) { - qStanza = xml.clone(qStanza); - qStanza.c("delay", { - xmlns: "urn:xmpp:delay", - from: entity.jid.toString(), - stamp: datetime(), - }); - } - sm.outbound_q.push(qStanza); - // Debounce requests so we send only one after a big run of stanza together - clearTimeout(requestAckTimeout); - requestAckTimeout = setTimeout(requestAck, 100); + if (!["presence", "message", "iq"].includes(stanza.name)) return next(); + + let qStanza = stanza; + if ( + qStanza.name === "message" && + !qStanza.getChild("delay", "urn:xmpp:delay") + ) { + qStanza = xml.clone(qStanza); + qStanza.c("delay", { + xmlns: "urn:xmpp:delay", + from: entity.jid.toString(), + stamp: datetime(), + }); } + sm.outbound_q.push(qStanza); + // Debounce requests so we send only one after a big run of stanza together + clearTimeout(requestAckTimeout); + requestAckTimeout = setTimeout(requestAck, 100); return next(); }); From 7ef1d8de6c5430e44904477c44d7be11ae3b25eb Mon Sep 17 00:00:00 2001 From: Stephen Paul Weber Date: Wed, 8 Jan 2025 10:50:23 -0500 Subject: [PATCH 05/21] No more clone Store the stamp seperately in the queue and then add it the first time we retry. Saves some work and a dependency. --- packages/stream-management/index.js | 44 ++++++++++--------- .../stream-management/stream-features.test.js | 20 ++++++--- packages/xml/index.js | 3 -- 3 files changed, 37 insertions(+), 30 deletions(-) diff --git a/packages/stream-management/index.js b/packages/stream-management/index.js index 74d2eee9..56c75bdb 100644 --- a/packages/stream-management/index.js +++ b/packages/stream-management/index.js @@ -66,18 +66,32 @@ export default function streamManagement({ }, }); + async function sendQueueItem({ stanza, stamp }) { + if ( + stanza.name === "message" && + !stanza.getChild("delay", "urn:xmpp:delay") + ) { + stanza.append(xml("delay", { + xmlns: "urn:xmpp:delay", + from: entity.jid.toString(), + stamp: stamp, + })); + } + await entity.send(stanza); + } + async function resumed(resumed) { sm.enabled = true; const oldOutbound = sm.outbound; for (let i = 0; i < resumed.attrs.h - oldOutbound; i++) { - let stanza = sm.outbound_q.shift(); + let item = sm.outbound_q.shift(); sm.outbound++; - sm.emit("ack", stanza); + sm.emit("ack", item.stanza); } let q = sm.outbound_q; sm.outbound_q = []; for (const item of q) { - await entity.send(item); // This will trigger the middleware and re-add to the queue + await sendQueueItem(item); // This will trigger the middleware and re-add to the queue } sm.emit("resumed"); entity._ready(true); @@ -86,9 +100,9 @@ export default function streamManagement({ function failed() { sm.enabled = false; sm.id = ""; - let stanza; - while ((stanza = sm.outbound_q.shift())) { - sm.emit("fail", stanza); + let item; + while ((item = sm.outbound_q.shift())) { + sm.emit("fail", item.stanza); } sm.outbound = 0; } @@ -130,9 +144,9 @@ export default function streamManagement({ // > When a party receives an element, it SHOULD keep a record of the 'h' value returned as the sequence number of the last handled outbound stanza for the current stream (and discard the previous value). const oldOutbound = sm.outbound; for (let i = 0; i < stanza.attrs.h - oldOutbound; i++) { - let stanza = sm.outbound_q.shift(); + let item = sm.outbound_q.shift(); sm.outbound++; - sm.emit("ack", stanza); + sm.emit("ack", item.stanza); } } @@ -162,19 +176,7 @@ export default function streamManagement({ const { stanza } = context; if (!["presence", "message", "iq"].includes(stanza.name)) return next(); - let qStanza = stanza; - if ( - qStanza.name === "message" && - !qStanza.getChild("delay", "urn:xmpp:delay") - ) { - qStanza = xml.clone(qStanza); - qStanza.c("delay", { - xmlns: "urn:xmpp:delay", - from: entity.jid.toString(), - stamp: datetime(), - }); - } - sm.outbound_q.push(qStanza); + sm.outbound_q.push({ stanza, stamp: datetime() }); // Debounce requests so we send only one after a big run of stanza together clearTimeout(requestAckTimeout); requestAckTimeout = setTimeout(requestAck, 100); diff --git a/packages/stream-management/stream-features.test.js b/packages/stream-management/stream-features.test.js index cb83a48e..f30c572d 100644 --- a/packages/stream-management/stream-features.test.js +++ b/packages/stream-management/stream-features.test.js @@ -189,7 +189,10 @@ test("resume - resumed", async () => { ); entity.streamManagement.outbound = 45; - entity.streamManagement.outbound_q = [, ]; + entity.streamManagement.outbound_q = [ + { stanza: , stamp: "1990-01-01T00:00:00Z" }, + { stanza: , stamp: "1990-01-01T00:00:00Z" }, + ]; expect(await entity.catchOutgoing()).toEqual( , @@ -207,7 +210,15 @@ test("resume - resumed", async () => { acks++; }); - expect(await entity.catchOutgoing()).toEqual(); + expect(await entity.catchOutgoing()).toEqual( + + + , + ); let resumed = false; entity.streamManagement.on("resumed", () => { @@ -221,9 +232,6 @@ test("resume - resumed", async () => { expect(acks).toBe(1); expect(entity.streamManagement.outbound).toBe(46); expect(entity.streamManagement.outbound_q).toHaveLength(1); - expect( - entity.streamManagement.outbound_q[0].getChild("delay", "urn:xmpp:delay"), - ).not.toBeUndefined(); expect(entity.status).toBe("online"); }); @@ -234,7 +242,7 @@ test("resume - failed", async () => { entity.streamManagement.id = "bar"; entity.streamManagement.enabled = true; entity.streamManagement.outbound = 45; - entity.streamManagement.outbound_q = ["hai"]; + entity.streamManagement.outbound_q = [{ stanza: "hai" }]; entity.mockInput( diff --git a/packages/xml/index.js b/packages/xml/index.js index 7b512e86..efb06e0c 100644 --- a/packages/xml/index.js +++ b/packages/xml/index.js @@ -1,5 +1,4 @@ import Element from "ltx/lib/Element.js"; -import clone from "ltx/lib/clone.js"; import createElement from "ltx/lib/createElement.js"; import Parser from "./lib/Parser.js"; import { @@ -18,7 +17,6 @@ Object.assign(xml, { Element, createElement, Parser, - clone, escapeXML, unescapeXML, escapeXMLText, @@ -31,7 +29,6 @@ export { Element, createElement, Parser, - clone, escapeXML, unescapeXML, escapeXMLText, From d76e479eed725a10ce1f1f7b9fc3e3b949a49223 Mon Sep 17 00:00:00 2001 From: Stephen Paul Weber Date: Wed, 8 Jan 2025 10:51:01 -0500 Subject: [PATCH 06/21] Throw Error not string --- packages/stream-management/index.js | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/packages/stream-management/index.js b/packages/stream-management/index.js index 56c75bdb..6273002b 100644 --- a/packages/stream-management/index.js +++ b/packages/stream-management/index.js @@ -115,7 +115,9 @@ export default function streamManagement({ entity.on("online", () => { if (sm.outbound_q.length > 0) { - throw "Stream Management assertion failure, queue should be empty during online"; + throw new Error( + "Stream Management assertion failure, queue should be empty during online", + ); } sm.outbound = 0; sm.inbound = 0; @@ -228,7 +230,9 @@ function setupStreamFeature({ // > The counter for an entity's own sent stanzas is set to zero and started after sending either or . if (sm.outbound_q.length > 0) { - throw "Stream Management assertion failure, queue should be empty after enable"; + throw new Error( + "Stream Management assertion failure, queue should be empty after enable", + ); } sm.outbound = 0; From 69d0752e6085a3ab51d6ddc98853206d2c0f72b8 Mon Sep 17 00:00:00 2001 From: Stephen Paul Weber Date: Wed, 8 Jan 2025 10:51:16 -0500 Subject: [PATCH 07/21] Catch any disconnect failure --- packages/stream-management/index.js | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/packages/stream-management/index.js b/packages/stream-management/index.js index 6273002b..e5f1ce69 100644 --- a/packages/stream-management/index.js +++ b/packages/stream-management/index.js @@ -165,7 +165,10 @@ export default function streamManagement({ function requestAck() { clearTimeout(timeoutTimeout); if (sm.timeout) { - timeoutTimeout = setTimeout(() => entity.disconnect(), sm.timeout); + timeoutTimeout = setTimeout( + () => entity.disconnect().catch(), + sm.timeout, + ); } entity.send(xml("r", { xmlns: NS })).catch(() => {}); // Periodically send r to check the connection From 0190cb6bb5013a8746e48f05f34e741c87217876 Mon Sep 17 00:00:00 2001 From: Stephen Paul Weber Date: Wed, 8 Jan 2025 10:51:28 -0500 Subject: [PATCH 08/21] Reformat this line --- packages/stream-management/index.js | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/stream-management/index.js b/packages/stream-management/index.js index e5f1ce69..dcae39fa 100644 --- a/packages/stream-management/index.js +++ b/packages/stream-management/index.js @@ -216,7 +216,8 @@ function setupStreamFeature({ // Resuming if (sm.id) { try { - await resumed(await resume(entity, sm)); + const element = await resume(entity, sm); + await resumed(element); return; // If resumption fails, continue with session establishment } catch { From b2bf99cc19cd96e5f78022b309df3016c409ff1d Mon Sep 17 00:00:00 2001 From: Stephen Paul Weber Date: Wed, 8 Jan 2025 10:55:45 -0500 Subject: [PATCH 09/21] Make more timeouts configurable --- packages/stream-management/index.js | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/packages/stream-management/index.js b/packages/stream-management/index.js index dcae39fa..158ed6ac 100644 --- a/packages/stream-management/index.js +++ b/packages/stream-management/index.js @@ -60,6 +60,8 @@ export default function streamManagement({ inbound: 0, max: null, timeout: 60_000, + requestAckInterval: 300_000, + debounceAckRequest: 100, _teardown: () => { clearTimeout(timeoutTimeout); clearTimeout(requestAckTimeout); @@ -173,7 +175,7 @@ export default function streamManagement({ entity.send(xml("r", { xmlns: NS })).catch(() => {}); // Periodically send r to check the connection // If a stanza goes out it will cancel this and set a sooner timer - requestAckTimeout = setTimeout(requestAck, 300_000); + requestAckTimeout = setTimeout(requestAck, sm.requestAckInterval); } middleware.filter((context, next) => { @@ -184,7 +186,7 @@ export default function streamManagement({ sm.outbound_q.push({ stanza, stamp: datetime() }); // Debounce requests so we send only one after a big run of stanza together clearTimeout(requestAckTimeout); - requestAckTimeout = setTimeout(requestAck, 100); + requestAckTimeout = setTimeout(requestAck, sm.debounceAckRequest); return next(); }); From 0f79b3b3ab717bc5a134ab785eacfd548c27a4ba Mon Sep 17 00:00:00 2001 From: Stephen Paul Weber Date: Wed, 8 Jan 2025 10:58:21 -0500 Subject: [PATCH 10/21] Test failure with something in queue seperately --- .../stream-management/stream-features.test.js | 34 +++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/packages/stream-management/stream-features.test.js b/packages/stream-management/stream-features.test.js index f30c572d..1439a25d 100644 --- a/packages/stream-management/stream-features.test.js +++ b/packages/stream-management/stream-features.test.js @@ -238,6 +238,40 @@ test("resume - resumed", async () => { test("resume - failed", async () => { const { entity } = mockClient(); + entity.status = "bar"; + entity.streamManagement.id = "bar"; + entity.streamManagement.enabled = true; + entity.streamManagement.outbound = 45; + entity.streamManagement.outbound_q = []; + + entity.mockInput( + + + , + ); + + expect(await entity.catchOutgoing()).toEqual( + , + ); + + entity.mockInput( + + + , + ); + + await tick(); + + expect(entity.status).toBe("bar"); + expect(entity.streamManagement.id).toBe(""); + expect(entity.streamManagement.enabled).toBe(false); + expect(entity.streamManagement.outbound).toBe(0); + expect(entity.streamManagement.outbound_q).toBeEmpty(); +}); + +test("resume - failed with something in queue", async () => { + const { entity } = mockClient(); + entity.status = "bar"; entity.streamManagement.id = "bar"; entity.streamManagement.enabled = true; From efad5c1e3dd7271888156bb3f8e9a682912451f5 Mon Sep 17 00:00:00 2001 From: Stephen Paul Weber Date: Wed, 8 Jan 2025 11:03:37 -0500 Subject: [PATCH 11/21] Document events in readme --- packages/stream-management/README.md | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/packages/stream-management/README.md b/packages/stream-management/README.md index 296b8ad4..d28596e5 100644 --- a/packages/stream-management/README.md +++ b/packages/stream-management/README.md @@ -10,7 +10,13 @@ When the session is resumed the `online` event is not emitted as session resumpt However `entity.status` is set to `online`. If the session fails to resume, entity will fallback to regular session establishment in which case `online` event will be emitted. -Automatically responds to acks but does not support requesting acks yet. +Automatically responds to acks and requests them. Also requests periodically even if you haven't sent anything. If server fails to respond to a request, the module triggers a reconnect. + +## Events + +**resumed**: Indicates that the connection was resumed (so online with no online event) +**fail**: Indicates that a stanza failed to send to the server and will not be retried +**ack**: Indicates that a stanza has been acknowledged by the server ## References From 8cb8dff2ae1d245cc2bff651a92eddcdece8c2af Mon Sep 17 00:00:00 2001 From: Stephen Paul Weber Date: Mon, 13 Jan 2025 11:09:38 -0500 Subject: [PATCH 12/21] Fix comment position --- packages/stream-management/index.js | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/stream-management/index.js b/packages/stream-management/index.js index 158ed6ac..eebdd5a2 100644 --- a/packages/stream-management/index.js +++ b/packages/stream-management/index.js @@ -234,12 +234,13 @@ function setupStreamFeature({ const promiseEnable = enable(entity, sm); - // > The counter for an entity's own sent stanzas is set to zero and started after sending either or . if (sm.outbound_q.length > 0) { throw new Error( "Stream Management assertion failure, queue should be empty after enable", ); } + + // > The counter for an entity's own sent stanzas is set to zero and started after sending either or . sm.outbound = 0; try { From 8c408ef572c9f16658960fe72ee3afbc840ce93a Mon Sep 17 00:00:00 2001 From: Stephen Paul Weber Date: Wed, 8 Jan 2025 11:36:43 -0500 Subject: [PATCH 13/21] e2e test for stream management --- test/stream-management.js | 37 +++++++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) create mode 100644 test/stream-management.js diff --git a/test/stream-management.js b/test/stream-management.js new file mode 100644 index 00000000..3db72ee1 --- /dev/null +++ b/test/stream-management.js @@ -0,0 +1,37 @@ +import { client } from "../packages/client/index.js"; +import debug from "../packages/debug/index.js"; +import server from "../server/index.js"; + +const username = "client"; +const password = "foobar"; +const credentials = { username, password }; +const domain = "localhost"; + +let xmpp; + +afterEach(async () => { + await xmpp?.stop(); + await server.reset(); +}); + +test("client ack stanzas", async () => { + expect.assertions(1); + + await server.enableModules(["smacks"]); + await server.restart(); + + xmpp = client({ credentials, service: domain }); + debug(xmpp); + + xmpp.streamManagement.on("ack", (el) => { + expect(el.attrs.id).toEqual("ping"); + xmpp.streamManagement._teardown(); + }); + + await xmpp.start(); + await xmpp.send( + + + , + ); +}); From 43a2d69180511ce727497d94d9da73ac6591e987 Mon Sep 17 00:00:00 2001 From: Stephen Paul Weber Date: Mon, 13 Jan 2025 11:29:05 -0500 Subject: [PATCH 14/21] Use sendMany --- packages/stream-management/index.js | 21 +++++++++++---------- packages/test/mockClient.js | 5 +++++ 2 files changed, 16 insertions(+), 10 deletions(-) diff --git a/packages/stream-management/index.js b/packages/stream-management/index.js index eebdd5a2..02bfb7f7 100644 --- a/packages/stream-management/index.js +++ b/packages/stream-management/index.js @@ -68,18 +68,20 @@ export default function streamManagement({ }, }); - async function sendQueueItem({ stanza, stamp }) { + function queueToStanza({ stanza, stamp }) { if ( stanza.name === "message" && !stanza.getChild("delay", "urn:xmpp:delay") ) { - stanza.append(xml("delay", { - xmlns: "urn:xmpp:delay", - from: entity.jid.toString(), - stamp: stamp, - })); + stanza.append( + xml("delay", { + xmlns: "urn:xmpp:delay", + from: entity.jid.toString(), + stamp: stamp, + }), + ); } - await entity.send(stanza); + return stanza; } async function resumed(resumed) { @@ -92,9 +94,8 @@ export default function streamManagement({ } let q = sm.outbound_q; sm.outbound_q = []; - for (const item of q) { - await sendQueueItem(item); // This will trigger the middleware and re-add to the queue - } + // This will trigger the middleware and re-add to the queue + await entity.sendMany(q.map((item) => queueToStanza(item))); sm.emit("resumed"); entity._ready(true); } diff --git a/packages/test/mockClient.js b/packages/test/mockClient.js index c7025edd..45eff046 100644 --- a/packages/test/mockClient.js +++ b/packages/test/mockClient.js @@ -5,6 +5,11 @@ import context from "./context.js"; export default function mockClient(options) { const xmpp = client(options); xmpp.send = Connection.prototype.send; + xmpp.sendMany = async (stanzas) => { + for (const stanza of stanzas) { + await xmpp.send(stanza); + } + }; const ctx = context(xmpp); return Object.assign(xmpp, ctx); } From 97e8d283e8a44e0ac47e139396cb9be6734a5004 Mon Sep 17 00:00:00 2001 From: Stephen Paul Weber Date: Mon, 13 Jan 2025 11:30:19 -0500 Subject: [PATCH 15/21] Change test name --- packages/stream-management/stream-features.test.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/stream-management/stream-features.test.js b/packages/stream-management/stream-features.test.js index 1439a25d..7b42b4f0 100644 --- a/packages/stream-management/stream-features.test.js +++ b/packages/stream-management/stream-features.test.js @@ -128,7 +128,7 @@ test("enable - failed", async () => { expect(entity.streamManagement.enabled).toBe(false); }); -test("enable - enabled - message", async () => { +test("stanza ack", async () => { const { entity } = mockClient(); entity.mockInput( From f076fd1e544447370a1fa0ebf0ebfe4d3e6b30b4 Mon Sep 17 00:00:00 2001 From: Stephen Paul Weber Date: Mon, 13 Jan 2025 11:32:41 -0500 Subject: [PATCH 16/21] Split up resumed test --- .../stream-management/stream-features.test.js | 53 +++++++++++++++++++ 1 file changed, 53 insertions(+) diff --git a/packages/stream-management/stream-features.test.js b/packages/stream-management/stream-features.test.js index 7b42b4f0..757fe358 100644 --- a/packages/stream-management/stream-features.test.js +++ b/packages/stream-management/stream-features.test.js @@ -220,6 +220,59 @@ test("resume - resumed", async () => { , ); + await tick(); + entity.streamManagement._teardown(); + + expect(acks).toBe(1); + expect(entity.streamManagement.outbound).toBe(46); + expect(entity.streamManagement.outbound_q).toHaveLength(1); + expect(entity.status).toBe("online"); +}); + +test("resumed event", async () => { + const { entity } = mockClient(); + + entity.status = "offline"; + entity.streamManagement.id = "bar"; + + entity.mockInput( + + + , + ); + + entity.streamManagement.outbound = 45; + entity.streamManagement.outbound_q = [ + { stanza: , stamp: "1990-01-01T00:00:00Z" }, + { stanza: , stamp: "1990-01-01T00:00:00Z" }, + ]; + + expect(await entity.catchOutgoing()).toEqual( + , + ); + + expect(entity.streamManagement.enabled).toBe(false); + + expect(entity.status).toBe("offline"); + + entity.mockInput(); + + let acks = 0; + entity.streamManagement.on("ack", (stanza) => { + expect(stanza.attrs.id).toBe("a"); + acks++; + }); + + expect(await entity.catchOutgoing()).toEqual( + + + , + ); + let resumed = false; entity.streamManagement.on("resumed", () => { resumed = true; From f6e15ba1b559e5dd052057dd402d0f21260d48b4 Mon Sep 17 00:00:00 2001 From: Stephen Paul Weber Date: Mon, 13 Jan 2025 11:36:02 -0500 Subject: [PATCH 17/21] Teardown on disconnect --- packages/stream-management/index.js | 9 +++++---- packages/stream-management/stream-features.test.js | 3 --- test/stream-management.js | 1 - 3 files changed, 5 insertions(+), 8 deletions(-) diff --git a/packages/stream-management/index.js b/packages/stream-management/index.js index 02bfb7f7..68059037 100644 --- a/packages/stream-management/index.js +++ b/packages/stream-management/index.js @@ -62,10 +62,11 @@ export default function streamManagement({ timeout: 60_000, requestAckInterval: 300_000, debounceAckRequest: 100, - _teardown: () => { - clearTimeout(timeoutTimeout); - clearTimeout(requestAckTimeout); - }, + }); + + entity.on("disconnect", () => { + clearTimeout(timeoutTimeout); + clearTimeout(requestAckTimeout); }); function queueToStanza({ stanza, stamp }) { diff --git a/packages/stream-management/stream-features.test.js b/packages/stream-management/stream-features.test.js index 757fe358..3eefbbe9 100644 --- a/packages/stream-management/stream-features.test.js +++ b/packages/stream-management/stream-features.test.js @@ -157,7 +157,6 @@ test("stanza ack", async () => { expect(entity.streamManagement.enabled).toBe(true); await entity.send(); - entity.streamManagement._teardown(); expect(entity.streamManagement.outbound).toBe(0); expect(entity.streamManagement.outbound_q).toHaveLength(1); @@ -221,7 +220,6 @@ test("resume - resumed", async () => { ); await tick(); - entity.streamManagement._teardown(); expect(acks).toBe(1); expect(entity.streamManagement.outbound).toBe(46); @@ -279,7 +277,6 @@ test("resumed event", async () => { }); await tick(); - entity.streamManagement._teardown(); expect(resumed).toBe(true); expect(acks).toBe(1); diff --git a/test/stream-management.js b/test/stream-management.js index 3db72ee1..e277c7ba 100644 --- a/test/stream-management.js +++ b/test/stream-management.js @@ -25,7 +25,6 @@ test("client ack stanzas", async () => { xmpp.streamManagement.on("ack", (el) => { expect(el.attrs.id).toEqual("ping"); - xmpp.streamManagement._teardown(); }); await xmpp.start(); From 2209036c6a5511cc6808761f1099e6d78261d7b8 Mon Sep 17 00:00:00 2001 From: Stephen Paul Weber Date: Mon, 13 Jan 2025 11:41:52 -0500 Subject: [PATCH 18/21] Use promise helper --- test/stream-management.js | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/test/stream-management.js b/test/stream-management.js index e277c7ba..72bda9a6 100644 --- a/test/stream-management.js +++ b/test/stream-management.js @@ -1,4 +1,5 @@ import { client } from "../packages/client/index.js"; +import { promise } from "../packages/events/index.js"; import debug from "../packages/debug/index.js"; import server from "../server/index.js"; @@ -15,22 +16,20 @@ afterEach(async () => { }); test("client ack stanzas", async () => { - expect.assertions(1); - await server.enableModules(["smacks"]); await server.restart(); xmpp = client({ credentials, service: domain }); debug(xmpp); - xmpp.streamManagement.on("ack", (el) => { - expect(el.attrs.id).toEqual("ping"); - }); - + const elP = promise(xmpp.streamManagement, "ack"); await xmpp.start(); await xmpp.send( , ); + + const el = await elP; + expect(el.attrs.id).toEqual("ping"); }); From c66696c4d9d0cddc7176a62bc7908696c47d3636 Mon Sep 17 00:00:00 2001 From: Stephen Paul Weber Date: Mon, 13 Jan 2025 11:46:55 -0500 Subject: [PATCH 19/21] e2e test for fail event when offline --- packages/stream-management/index.js | 6 +++--- test/stream-management.js | 23 +++++++++++++++++++++++ 2 files changed, 26 insertions(+), 3 deletions(-) diff --git a/packages/stream-management/index.js b/packages/stream-management/index.js index 68059037..ace127af 100644 --- a/packages/stream-management/index.js +++ b/packages/stream-management/index.js @@ -128,9 +128,9 @@ export default function streamManagement({ }); entity.on("offline", () => { - let stanza; - while ((stanza = sm.outbound_q.shift())) { - sm.emit("fail", stanza); + let item; + while ((item = sm.outbound_q.shift())) { + sm.emit("fail", item.stanza); } sm.outbound = 0; sm.inbound = 0; diff --git a/test/stream-management.js b/test/stream-management.js index 72bda9a6..a8669bfd 100644 --- a/test/stream-management.js +++ b/test/stream-management.js @@ -1,5 +1,6 @@ import { client } from "../packages/client/index.js"; import { promise } from "../packages/events/index.js"; +import { datetime } from "../packages/time/index.js"; import debug from "../packages/debug/index.js"; import server from "../server/index.js"; @@ -33,3 +34,25 @@ test("client ack stanzas", async () => { const el = await elP; expect(el.attrs.id).toEqual("ping"); }); + +test("client fail stanzas", async () => { + await server.enableModules(["smacks"]); + await server.restart(); + + xmpp = client({ credentials, service: domain }); + debug(xmpp); + + const elP = promise(xmpp.streamManagement, "fail"); + await xmpp.start(); + // Expect send but don't actually send to server, so it will fail + await xmpp.streamManagement.outbound_q.push({ + stanza: + + , + stamp: datetime() + }); + await xmpp.stop(); + + const el = await elP; + expect(el.attrs.id).toEqual("ping"); +}); From bb3998ada4bdaec6ee0dd4fa7ef65374cdec784a Mon Sep 17 00:00:00 2001 From: Stephen Paul Weber Date: Mon, 13 Jan 2025 11:51:57 -0500 Subject: [PATCH 20/21] e2e test for stanza retry --- test/stream-management.js | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/test/stream-management.js b/test/stream-management.js index a8669bfd..926bbc4d 100644 --- a/test/stream-management.js +++ b/test/stream-management.js @@ -56,3 +56,25 @@ test("client fail stanzas", async () => { const el = await elP; expect(el.attrs.id).toEqual("ping"); }); + +test("client retry stanzas", async () => { + await server.enableModules(["smacks"]); + await server.restart(); + + xmpp = client({ credentials, service: domain }); + debug(xmpp); + + const elP = promise(xmpp.streamManagement, "ack"); + await xmpp.start(); + // Add to queue but don't actually send so it can retry after disconnect + await xmpp.streamManagement.outbound_q.push({ + stanza: + + , + stamp: datetime() + }); + await xmpp.disconnect(); + + const el = await elP; + expect(el.attrs.id).toEqual("ping"); +}); From 473612001c37e36dcd50979a4f15dbbcb19f6d05 Mon Sep 17 00:00:00 2001 From: Stephen Paul Weber Date: Mon, 13 Jan 2025 12:30:46 -0500 Subject: [PATCH 21/21] e2e test auto reconnect Needs to use force-disconnect especially if we emulate an issue using pause --- packages/connection/index.js | 15 +++++++++++++++ packages/stream-management/index.js | 2 +- packages/tls/lib/Socket.js | 4 ++++ packages/websocket/lib/Socket.js | 4 ++++ test/stream-management.js | 22 ++++++++++++++++++++++ 5 files changed, 46 insertions(+), 1 deletion(-) diff --git a/packages/connection/index.js b/packages/connection/index.js index 77f53d2e..3f631860 100644 --- a/packages/connection/index.js +++ b/packages/connection/index.js @@ -252,6 +252,21 @@ class Connection extends EventEmitter { await promise(this.socket, "close", "error", timeout); } + /** + * Forcibly disconnects the socket + * https://xmpp.org/rfcs/rfc6120.html#streams-close + * https://tools.ietf.org/html/rfc7395#section-3.6 + */ + async forceDisconnect(timeout = this.timeout) { + if (!this.socket) return; + + this._status("disconnecting"); + this.socket.destroy(); + + // The 'disconnect' status is set by the socket 'close' listener + await promise(this.socket, "close", "error", timeout); + } + /** * Opens the stream */ diff --git a/packages/stream-management/index.js b/packages/stream-management/index.js index ace127af..0cc7fa8b 100644 --- a/packages/stream-management/index.js +++ b/packages/stream-management/index.js @@ -170,7 +170,7 @@ export default function streamManagement({ clearTimeout(timeoutTimeout); if (sm.timeout) { timeoutTimeout = setTimeout( - () => entity.disconnect().catch(), + () => entity.forceDisconnect().catch(), sm.timeout, ); } diff --git a/packages/tls/lib/Socket.js b/packages/tls/lib/Socket.js index e0bbfa27..9b60506c 100644 --- a/packages/tls/lib/Socket.js +++ b/packages/tls/lib/Socket.js @@ -63,6 +63,10 @@ class Socket extends EventEmitter { this.socket.end(); } + destroy() { + this.socket.destroy(); + } + write(data, fn) { this.socket.write(data, fn); } diff --git a/packages/websocket/lib/Socket.js b/packages/websocket/lib/Socket.js index 3227a11b..d48e5fa4 100644 --- a/packages/websocket/lib/Socket.js +++ b/packages/websocket/lib/Socket.js @@ -75,6 +75,10 @@ export default class Socket extends EventEmitter { this.socket.close(); } + destroy() { + this.socket.close(); + } + write(data, fn) { if (WebSocket === WS) { this.socket.send(data, fn); diff --git a/test/stream-management.js b/test/stream-management.js index 926bbc4d..5d8aacca 100644 --- a/test/stream-management.js +++ b/test/stream-management.js @@ -78,3 +78,25 @@ test("client retry stanzas", async () => { const el = await elP; expect(el.attrs.id).toEqual("ping"); }); + +test("client reconnect automatically", async () => { + await server.enableModules(["smacks"]); + await server.restart(); + + xmpp = client({ credentials, service: domain }); + xmpp.streamManagement.timeout = 10; + xmpp.streamManagement.debounceAckRequest = 1; + debug(xmpp); + + const resumedP = promise(xmpp.streamManagement, "resumed"); + await xmpp.start(); + await xmpp.send( + + + , + ); + xmpp.socket.socket.pause(); + + await resumedP; + expect().pass(); +});