diff --git a/packages/stream-management/bind2.js b/packages/stream-management/bind2.js
new file mode 100644
index 00000000..96e278a2
--- /dev/null
+++ b/packages/stream-management/bind2.js
@@ -0,0 +1,19 @@
+import { NS, makeEnableElement } from "./index.js";
+
+export function setupBind2({ bind2, sm, failed, enabled }) {
+ bind2.use(
+ NS,
+ // https://xmpp.org/extensions/xep-0198.html#inline-examples
+ (_element) => {
+ return makeEnableElement({ sm });
+ },
+ async (element) => {
+ if (element.is("enabled")) {
+ enabled(element.attrs);
+ } else if (element.is("failed")) {
+ // const error = StreamError.fromElement(element)
+ failed();
+ }
+ },
+ );
+}
diff --git a/packages/stream-management/index.js b/packages/stream-management/index.js
index 6980585e..5fb03c31 100644
--- a/packages/stream-management/index.js
+++ b/packages/stream-management/index.js
@@ -1,44 +1,26 @@
-import XMPPError from "@xmpp/error";
-import { EventEmitter, procedure } from "@xmpp/events";
+import { EventEmitter } from "@xmpp/events";
import xml from "@xmpp/xml";
import { datetime } from "@xmpp/time";
+import { setupBind2 } from "./bind2.js";
+import { setupSasl2 } from "./sasl2.js";
+import { setupStreamFeature } from "./stream-feature.js";
// https://xmpp.org/extensions/xep-0198.html
-const NS = "urn:xmpp:sm:3";
+export const NS = "urn:xmpp:sm:3";
-function makeEnableElement({ sm }) {
+export function makeEnableElement({ sm }) {
return xml("enable", {
xmlns: NS,
max: sm.preferredMaximum,
- resume: sm.allowResume ? "true" : undefined,
+ resume: "true",
});
}
-function makeResumeElement({ sm }) {
+export function makeResumeElement({ sm }) {
return xml("resume", { xmlns: NS, h: sm.inbound, previd: sm.id });
}
-function enable(entity, sm) {
- return procedure(entity, makeEnableElement({ sm }), (element, done) => {
- if (element.is("enabled", NS)) {
- return done(element);
- } else if (element.is("failed", NS)) {
- throw XMPPError.fromElement(element);
- }
- });
-}
-
-async function resume(entity, sm) {
- return procedure(entity, makeResumeElement({ sm }), (element, done) => {
- if (element.is("resumed", NS)) {
- return done(element);
- } else if (element.is("failed", NS)) {
- throw XMPPError.fromElement(element);
- }
- });
-}
-
export default function streamManagement({
streamFeatures,
entity,
@@ -51,7 +33,6 @@ export default function streamManagement({
const sm = new EventEmitter();
Object.assign(sm, {
- allowResume: true,
preferredMaximum: null,
enabled: false,
id: "",
@@ -171,6 +152,7 @@ export default function streamManagement({
clearTimeout(requestAckTimeout);
if (!sm.enabled) return;
+ if (!timeout) return;
requestAckTimeout = setTimeout(requestAck, timeout);
}
@@ -218,92 +200,6 @@ export default function streamManagement({
return sm;
}
-function setupStreamFeature({
- streamFeatures,
- sm,
- entity,
- resumed,
- failed,
- enabled,
-}) {
- // https://xmpp.org/extensions/xep-0198.html#enable
- // For client-to-server connections, the client MUST NOT attempt to enable stream management until after it has completed Resource Binding unless it is resuming a previous session
- streamFeatures.use("sm", NS, async (context, next) => {
- // Resuming
- if (sm.id) {
- try {
- const element = await resume(entity, sm);
- await resumed(element);
- return;
- // If resumption fails, continue with session establishment
- } catch {
- failed();
- }
- }
-
- // Enabling
-
- // Resource binding first
- await next();
-
- const promiseEnable = enable(entity, sm);
-
- if (sm.outbound_q.length > 0) {
- throw new Error(
- "Stream Management assertion failure, queue should be empty after enable",
- );
- }
-
- // > The counter for an entity's own sent stanzas is set to zero and started after sending either or .
- sm.outbound = 0;
-
- try {
- const response = await promiseEnable;
- enabled(response.attrs);
- } catch {
- sm.enabled = false;
- }
-
- sm.inbound = 0;
- });
-}
-
-function setupSasl2({ sasl2, sm, failed, resumed }) {
- sasl2.use(
- "urn:xmpp:sm:3",
- (element) => {
- if (!element.is("sm")) return;
- if (sm.id) return makeResumeElement({ sm });
- },
- (element) => {
- if (element.is("resumed")) {
- resumed(element);
- } else if (element.is(failed)) {
- // const error = StreamError.fromElement(element)
- failed();
- }
- },
- );
-}
-
-function setupBind2({ bind2, sm, failed, enabled }) {
- bind2.use(
- "urn:xmpp:sm:3",
- // https://xmpp.org/extensions/xep-0198.html#inline-examples
- (_element) => {
- return makeEnableElement({ sm });
- },
- (element) => {
- if (element.is("enabled")) {
- enabled(element.attrs);
- } else if (element.is("failed")) {
- // const error = StreamError.fromElement(element)
- failed();
- }
- },
- );
-}
-
function queueToStanza({ entity, item }) {
const { stanza, stamp } = item;
if (
diff --git a/packages/stream-management/sasl2.js b/packages/stream-management/sasl2.js
new file mode 100644
index 00000000..877b857b
--- /dev/null
+++ b/packages/stream-management/sasl2.js
@@ -0,0 +1,19 @@
+import { NS, makeResumeElement } from "./index.js";
+
+export function setupSasl2({ sasl2, sm, failed, resumed }) {
+ sasl2.use(
+ NS,
+ (element) => {
+ if (!element.is("sm")) return;
+ if (sm.id) return makeResumeElement({ sm });
+ },
+ (element) => {
+ if (element.is("resumed")) {
+ resumed(element);
+ } else if (element.is(failed)) {
+ // const error = StreamError.fromElement(element)
+ failed();
+ }
+ },
+ );
+}
diff --git a/packages/stream-management/stream-feature.js b/packages/stream-management/stream-feature.js
new file mode 100644
index 00000000..9fc4e972
--- /dev/null
+++ b/packages/stream-management/stream-feature.js
@@ -0,0 +1,74 @@
+import XMPPError from "@xmpp/error";
+import { procedure } from "@xmpp/events";
+
+import { NS, makeEnableElement, makeResumeElement } from "./index.js";
+
+export function setupStreamFeature({
+ streamFeatures,
+ sm,
+ entity,
+ resumed,
+ failed,
+ enabled,
+}) {
+ // https://xmpp.org/extensions/xep-0198.html#enable
+ // For client-to-server connections, the client MUST NOT attempt to enable stream management until after it has completed Resource Binding unless it is resuming a previous session
+ streamFeatures.use("sm", NS, async (context, next) => {
+ // Resuming
+ if (sm.id) {
+ try {
+ const element = await resume(entity, sm);
+ await resumed(element);
+ return;
+ // If resumption fails, continue with session establishment
+ } catch {
+ failed();
+ }
+ }
+
+ // Enabling
+
+ // Resource binding first
+ await next();
+
+ const promiseEnable = enable(entity, sm);
+
+ if (sm.outbound_q.length > 0) {
+ throw new Error(
+ "Stream Management assertion failure, queue should be empty after enable",
+ );
+ }
+
+ // > The counter for an entity's own sent stanzas is set to zero and started after sending either or .
+ sm.outbound = 0;
+
+ try {
+ const response = await promiseEnable;
+ enabled(response.attrs);
+ } catch {
+ sm.enabled = false;
+ }
+
+ sm.inbound = 0;
+ });
+}
+
+export function enable(entity, sm) {
+ return procedure(entity, makeEnableElement({ sm }), (element, done) => {
+ if (element.is("enabled", NS)) {
+ return done(element);
+ } else if (element.is("failed", NS)) {
+ throw XMPPError.fromElement(element);
+ }
+ });
+}
+
+export async function resume(entity, sm) {
+ return procedure(entity, makeResumeElement({ sm }), (element, done) => {
+ if (element.is("resumed", NS)) {
+ return done(element);
+ } else if (element.is("failed", NS)) {
+ throw XMPPError.fromElement(element);
+ }
+ });
+}
diff --git a/packages/stream-management/stream-features.test.js b/packages/stream-management/stream-features.test.js
index 79a07292..bf2fac21 100644
--- a/packages/stream-management/stream-features.test.js
+++ b/packages/stream-management/stream-features.test.js
@@ -124,34 +124,6 @@ test("enable - failed", async () => {
expect(entity.streamManagement.enabled).toBe(false);
});
-test("stanza ack", async () => {
- const { entity } = mockClient();
-
- entity.streamManagement.enabled = true;
-
- expect(entity.streamManagement.outbound).toBe(0);
- expect(entity.streamManagement.outbound_q).toBeEmpty();
- // expect(entity.streamManagement.enabled).toBe(true);
-
- await entity.send();
-
- expect(entity.streamManagement.outbound).toBe(0);
- expect(entity.streamManagement.outbound_q).toHaveLength(1);
-
- let acks = 0;
- entity.streamManagement.on("ack", (stanza) => {
- expect(stanza.attrs.id).toBe("a");
- acks++;
- });
-
- entity.mockInput();
- await tick();
-
- expect(acks).toBe(1);
- expect(entity.streamManagement.outbound).toBe(1);
- expect(entity.streamManagement.outbound_q).toHaveLength(0);
-});
-
test("resume - resumed", async () => {
const { entity } = mockClient();
diff --git a/packages/stream-management/stream-management.test.js b/packages/stream-management/stream-management.test.js
new file mode 100644
index 00000000..0cfe5fae
--- /dev/null
+++ b/packages/stream-management/stream-management.test.js
@@ -0,0 +1,46 @@
+import { mockClient } from "@xmpp/test";
+
+import { tick } from "@xmpp/events";
+
+test("emits ack when the server ackownledge stanzas", async () => {
+ const { entity } = mockClient();
+
+ entity.streamManagement.enabled = true;
+
+ expect(entity.streamManagement.outbound).toBe(0);
+ expect(entity.streamManagement.outbound_q).toBeEmpty();
+ // expect(entity.streamManagement.enabled).toBe(true);
+
+ await entity.send();
+
+ expect(entity.streamManagement.outbound).toBe(0);
+ expect(entity.streamManagement.outbound_q).toHaveLength(1);
+
+ let acks = 0;
+ entity.streamManagement.on("ack", (stanza) => {
+ expect(stanza.attrs.id).toBe("a");
+ acks++;
+ });
+
+ entity.mockInput();
+ await tick();
+
+ expect(acks).toBe(1);
+ expect(entity.streamManagement.outbound).toBe(1);
+ expect(entity.streamManagement.outbound_q).toHaveLength(0);
+});
+
+test("sends an element before closing", async () => {
+ const { entity, streamManagement } = mockClient();
+ streamManagement.enabled = true;
+ streamManagement.inbound = 42;
+ entity.status = "online";
+
+ const promise_disconnect = entity.disconnect();
+
+ expect(await entity.catchOutgoing()).toEqual(
+ ,
+ );
+
+ await promise_disconnect;
+});
diff --git a/server/prosody.cfg.lua b/server/prosody.cfg.lua
index 870055de..8a5a48b7 100644
--- a/server/prosody.cfg.lua
+++ b/server/prosody.cfg.lua
@@ -26,6 +26,7 @@ modules_enabled = {
"sasl2_bind2";
"sasl2_sm";
"sasl2_fast";
+ "csi_simple";
};
modules_disabled = {