Skip to content

Commit

Permalink
stream-management: Split bind2, sasl2 and stream features (#1063)
Browse files Browse the repository at this point in the history
  • Loading branch information
sonnyp authored Jan 24, 2025
1 parent dba0d4a commit 1165c33
Show file tree
Hide file tree
Showing 7 changed files with 168 additions and 141 deletions.
19 changes: 19 additions & 0 deletions packages/stream-management/bind2.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import { NS, makeEnableElement } from "./index.js";

export function setupBind2({ bind2, sm, failed, enabled }) {
bind2.use(
NS,
// https://xmpp.org/extensions/xep-0198.html#inline-examples
(_element) => {
return makeEnableElement({ sm });
},
async (element) => {
if (element.is("enabled")) {
enabled(element.attrs);
} else if (element.is("failed")) {
// const error = StreamError.fromElement(element)
failed();
}
},
);
}
122 changes: 9 additions & 113 deletions packages/stream-management/index.js
Original file line number Diff line number Diff line change
@@ -1,44 +1,26 @@
import XMPPError from "@xmpp/error";
import { EventEmitter, procedure } from "@xmpp/events";
import { EventEmitter } from "@xmpp/events";
import xml from "@xmpp/xml";
import { datetime } from "@xmpp/time";
import { setupBind2 } from "./bind2.js";
import { setupSasl2 } from "./sasl2.js";
import { setupStreamFeature } from "./stream-feature.js";

// https://xmpp.org/extensions/xep-0198.html

const NS = "urn:xmpp:sm:3";
export const NS = "urn:xmpp:sm:3";

function makeEnableElement({ sm }) {
export function makeEnableElement({ sm }) {
return xml("enable", {
xmlns: NS,
max: sm.preferredMaximum,
resume: sm.allowResume ? "true" : undefined,
resume: "true",
});
}

function makeResumeElement({ sm }) {
export function makeResumeElement({ sm }) {
return xml("resume", { xmlns: NS, h: sm.inbound, previd: sm.id });
}

function enable(entity, sm) {
return procedure(entity, makeEnableElement({ sm }), (element, done) => {
if (element.is("enabled", NS)) {
return done(element);
} else if (element.is("failed", NS)) {
throw XMPPError.fromElement(element);
}
});
}

async function resume(entity, sm) {
return procedure(entity, makeResumeElement({ sm }), (element, done) => {
if (element.is("resumed", NS)) {
return done(element);
} else if (element.is("failed", NS)) {
throw XMPPError.fromElement(element);
}
});
}

export default function streamManagement({
streamFeatures,
entity,
Expand All @@ -51,7 +33,6 @@ export default function streamManagement({

const sm = new EventEmitter();
Object.assign(sm, {
allowResume: true,
preferredMaximum: null,
enabled: false,
id: "",
Expand Down Expand Up @@ -171,6 +152,7 @@ export default function streamManagement({
clearTimeout(requestAckTimeout);

if (!sm.enabled) return;
if (!timeout) return;

requestAckTimeout = setTimeout(requestAck, timeout);
}
Expand Down Expand Up @@ -218,92 +200,6 @@ export default function streamManagement({
return sm;
}

function setupStreamFeature({
streamFeatures,
sm,
entity,
resumed,
failed,
enabled,
}) {
// https://xmpp.org/extensions/xep-0198.html#enable
// For client-to-server connections, the client MUST NOT attempt to enable stream management until after it has completed Resource Binding unless it is resuming a previous session
streamFeatures.use("sm", NS, async (context, next) => {
// Resuming
if (sm.id) {
try {
const element = await resume(entity, sm);
await resumed(element);
return;
// If resumption fails, continue with session establishment
} catch {
failed();
}
}

// Enabling

// Resource binding first
await next();

const promiseEnable = enable(entity, sm);

if (sm.outbound_q.length > 0) {
throw new Error(
"Stream Management assertion failure, queue should be empty after enable",
);
}

// > The counter for an entity's own sent stanzas is set to zero and started after sending either <enable/> or <enabled/>.
sm.outbound = 0;

try {
const response = await promiseEnable;
enabled(response.attrs);
} catch {
sm.enabled = false;
}

sm.inbound = 0;
});
}

function setupSasl2({ sasl2, sm, failed, resumed }) {
sasl2.use(
"urn:xmpp:sm:3",
(element) => {
if (!element.is("sm")) return;
if (sm.id) return makeResumeElement({ sm });
},
(element) => {
if (element.is("resumed")) {
resumed(element);
} else if (element.is(failed)) {
// const error = StreamError.fromElement(element)
failed();
}
},
);
}

function setupBind2({ bind2, sm, failed, enabled }) {
bind2.use(
"urn:xmpp:sm:3",
// https://xmpp.org/extensions/xep-0198.html#inline-examples
(_element) => {
return makeEnableElement({ sm });
},
(element) => {
if (element.is("enabled")) {
enabled(element.attrs);
} else if (element.is("failed")) {
// const error = StreamError.fromElement(element)
failed();
}
},
);
}

function queueToStanza({ entity, item }) {
const { stanza, stamp } = item;
if (
Expand Down
19 changes: 19 additions & 0 deletions packages/stream-management/sasl2.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import { NS, makeResumeElement } from "./index.js";

export function setupSasl2({ sasl2, sm, failed, resumed }) {
sasl2.use(
NS,
(element) => {
if (!element.is("sm")) return;
if (sm.id) return makeResumeElement({ sm });
},
(element) => {
if (element.is("resumed")) {
resumed(element);
} else if (element.is(failed)) {
// const error = StreamError.fromElement(element)
failed();
}
},
);
}
74 changes: 74 additions & 0 deletions packages/stream-management/stream-feature.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import XMPPError from "@xmpp/error";
import { procedure } from "@xmpp/events";

import { NS, makeEnableElement, makeResumeElement } from "./index.js";

export function setupStreamFeature({
streamFeatures,
sm,
entity,
resumed,
failed,
enabled,
}) {
// https://xmpp.org/extensions/xep-0198.html#enable
// For client-to-server connections, the client MUST NOT attempt to enable stream management until after it has completed Resource Binding unless it is resuming a previous session
streamFeatures.use("sm", NS, async (context, next) => {
// Resuming
if (sm.id) {
try {
const element = await resume(entity, sm);
await resumed(element);
return;
// If resumption fails, continue with session establishment
} catch {
failed();
}
}

// Enabling

// Resource binding first
await next();

const promiseEnable = enable(entity, sm);

if (sm.outbound_q.length > 0) {
throw new Error(
"Stream Management assertion failure, queue should be empty after enable",
);
}

// > The counter for an entity's own sent stanzas is set to zero and started after sending either <enable/> or <enabled/>.
sm.outbound = 0;

try {
const response = await promiseEnable;
enabled(response.attrs);
} catch {
sm.enabled = false;
}

sm.inbound = 0;
});
}

export function enable(entity, sm) {
return procedure(entity, makeEnableElement({ sm }), (element, done) => {
if (element.is("enabled", NS)) {
return done(element);
} else if (element.is("failed", NS)) {
throw XMPPError.fromElement(element);
}
});
}

export async function resume(entity, sm) {
return procedure(entity, makeResumeElement({ sm }), (element, done) => {
if (element.is("resumed", NS)) {
return done(element);
} else if (element.is("failed", NS)) {
throw XMPPError.fromElement(element);
}
});
}
28 changes: 0 additions & 28 deletions packages/stream-management/stream-features.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -124,34 +124,6 @@ test("enable - failed", async () => {
expect(entity.streamManagement.enabled).toBe(false);
});

test("stanza ack", async () => {
const { entity } = mockClient();

entity.streamManagement.enabled = true;

expect(entity.streamManagement.outbound).toBe(0);
expect(entity.streamManagement.outbound_q).toBeEmpty();
// expect(entity.streamManagement.enabled).toBe(true);

await entity.send(<message id="a" />);

expect(entity.streamManagement.outbound).toBe(0);
expect(entity.streamManagement.outbound_q).toHaveLength(1);

let acks = 0;
entity.streamManagement.on("ack", (stanza) => {
expect(stanza.attrs.id).toBe("a");
acks++;
});

entity.mockInput(<a xmlns="urn:xmpp:sm:3" h="1" />);
await tick();

expect(acks).toBe(1);
expect(entity.streamManagement.outbound).toBe(1);
expect(entity.streamManagement.outbound_q).toHaveLength(0);
});

test("resume - resumed", async () => {
const { entity } = mockClient();

Expand Down
46 changes: 46 additions & 0 deletions packages/stream-management/stream-management.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import { mockClient } from "@xmpp/test";

import { tick } from "@xmpp/events";

test("emits ack when the server ackownledge stanzas", async () => {
const { entity } = mockClient();

entity.streamManagement.enabled = true;

expect(entity.streamManagement.outbound).toBe(0);
expect(entity.streamManagement.outbound_q).toBeEmpty();
// expect(entity.streamManagement.enabled).toBe(true);

await entity.send(<message id="a" />);

expect(entity.streamManagement.outbound).toBe(0);
expect(entity.streamManagement.outbound_q).toHaveLength(1);

let acks = 0;
entity.streamManagement.on("ack", (stanza) => {
expect(stanza.attrs.id).toBe("a");
acks++;
});

entity.mockInput(<a xmlns="urn:xmpp:sm:3" h="1" />);
await tick();

expect(acks).toBe(1);
expect(entity.streamManagement.outbound).toBe(1);
expect(entity.streamManagement.outbound_q).toHaveLength(0);
});

test("sends an <a/> element before closing", async () => {
const { entity, streamManagement } = mockClient();
streamManagement.enabled = true;
streamManagement.inbound = 42;
entity.status = "online";

const promise_disconnect = entity.disconnect();

expect(await entity.catchOutgoing()).toEqual(
<a xmlns="urn:xmpp:sm:3" h={streamManagement.inbound} />,
);

await promise_disconnect;
});
1 change: 1 addition & 0 deletions server/prosody.cfg.lua
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ modules_enabled = {
"sasl2_bind2";
"sasl2_sm";
"sasl2_fast";
"csi_simple";
};

modules_disabled = {
Expand Down

0 comments on commit 1165c33

Please sign in to comment.