Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream-management: Split bind2, sasl2 and stream features #1063

Merged
merged 6 commits into from
Jan 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions packages/stream-management/bind2.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import { NS, makeEnableElement } from "./index.js";

export function setupBind2({ bind2, sm, failed, enabled }) {
bind2.use(
NS,
// https://xmpp.org/extensions/xep-0198.html#inline-examples
(_element) => {
return makeEnableElement({ sm });
},
async (element) => {
if (element.is("enabled")) {
enabled(element.attrs);
} else if (element.is("failed")) {
// const error = StreamError.fromElement(element)
failed();
}
},
);
}
122 changes: 9 additions & 113 deletions packages/stream-management/index.js
Original file line number Diff line number Diff line change
@@ -1,44 +1,26 @@
import XMPPError from "@xmpp/error";
import { EventEmitter, procedure } from "@xmpp/events";
import { EventEmitter } from "@xmpp/events";
import xml from "@xmpp/xml";
import { datetime } from "@xmpp/time";
import { setupBind2 } from "./bind2.js";
import { setupSasl2 } from "./sasl2.js";
import { setupStreamFeature } from "./stream-feature.js";

// https://xmpp.org/extensions/xep-0198.html

const NS = "urn:xmpp:sm:3";
export const NS = "urn:xmpp:sm:3";

function makeEnableElement({ sm }) {
export function makeEnableElement({ sm }) {
return xml("enable", {
xmlns: NS,
max: sm.preferredMaximum,
resume: sm.allowResume ? "true" : undefined,
resume: "true",
});
}

function makeResumeElement({ sm }) {
export function makeResumeElement({ sm }) {
return xml("resume", { xmlns: NS, h: sm.inbound, previd: sm.id });
}

function enable(entity, sm) {
return procedure(entity, makeEnableElement({ sm }), (element, done) => {
if (element.is("enabled", NS)) {
return done(element);
} else if (element.is("failed", NS)) {
throw XMPPError.fromElement(element);
}
});
}

async function resume(entity, sm) {
return procedure(entity, makeResumeElement({ sm }), (element, done) => {
if (element.is("resumed", NS)) {
return done(element);
} else if (element.is("failed", NS)) {
throw XMPPError.fromElement(element);
}
});
}

export default function streamManagement({
streamFeatures,
entity,
Expand All @@ -51,7 +33,6 @@ export default function streamManagement({

const sm = new EventEmitter();
Object.assign(sm, {
allowResume: true,
preferredMaximum: null,
enabled: false,
id: "",
Expand Down Expand Up @@ -171,6 +152,7 @@ export default function streamManagement({
clearTimeout(requestAckTimeout);

if (!sm.enabled) return;
if (!timeout) return;

requestAckTimeout = setTimeout(requestAck, timeout);
}
Expand Down Expand Up @@ -218,92 +200,6 @@ export default function streamManagement({
return sm;
}

function setupStreamFeature({
streamFeatures,
sm,
entity,
resumed,
failed,
enabled,
}) {
// https://xmpp.org/extensions/xep-0198.html#enable
// For client-to-server connections, the client MUST NOT attempt to enable stream management until after it has completed Resource Binding unless it is resuming a previous session
streamFeatures.use("sm", NS, async (context, next) => {
// Resuming
if (sm.id) {
try {
const element = await resume(entity, sm);
await resumed(element);
return;
// If resumption fails, continue with session establishment
} catch {
failed();
}
}

// Enabling

// Resource binding first
await next();

const promiseEnable = enable(entity, sm);

if (sm.outbound_q.length > 0) {
throw new Error(
"Stream Management assertion failure, queue should be empty after enable",
);
}

// > The counter for an entity's own sent stanzas is set to zero and started after sending either <enable/> or <enabled/>.
sm.outbound = 0;

try {
const response = await promiseEnable;
enabled(response.attrs);
} catch {
sm.enabled = false;
}

sm.inbound = 0;
});
}

function setupSasl2({ sasl2, sm, failed, resumed }) {
sasl2.use(
"urn:xmpp:sm:3",
(element) => {
if (!element.is("sm")) return;
if (sm.id) return makeResumeElement({ sm });
},
(element) => {
if (element.is("resumed")) {
resumed(element);
} else if (element.is(failed)) {
// const error = StreamError.fromElement(element)
failed();
}
},
);
}

function setupBind2({ bind2, sm, failed, enabled }) {
bind2.use(
"urn:xmpp:sm:3",
// https://xmpp.org/extensions/xep-0198.html#inline-examples
(_element) => {
return makeEnableElement({ sm });
},
(element) => {
if (element.is("enabled")) {
enabled(element.attrs);
} else if (element.is("failed")) {
// const error = StreamError.fromElement(element)
failed();
}
},
);
}

function queueToStanza({ entity, item }) {
const { stanza, stamp } = item;
if (
Expand Down
19 changes: 19 additions & 0 deletions packages/stream-management/sasl2.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import { NS, makeResumeElement } from "./index.js";

export function setupSasl2({ sasl2, sm, failed, resumed }) {
sasl2.use(
NS,
(element) => {
if (!element.is("sm")) return;
if (sm.id) return makeResumeElement({ sm });
},
(element) => {
if (element.is("resumed")) {
resumed(element);
} else if (element.is(failed)) {
// const error = StreamError.fromElement(element)
failed();
}
},
);
}
74 changes: 74 additions & 0 deletions packages/stream-management/stream-feature.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import XMPPError from "@xmpp/error";
import { procedure } from "@xmpp/events";

import { NS, makeEnableElement, makeResumeElement } from "./index.js";

export function setupStreamFeature({
streamFeatures,
sm,
entity,
resumed,
failed,
enabled,
}) {
// https://xmpp.org/extensions/xep-0198.html#enable
// For client-to-server connections, the client MUST NOT attempt to enable stream management until after it has completed Resource Binding unless it is resuming a previous session
streamFeatures.use("sm", NS, async (context, next) => {
// Resuming
if (sm.id) {
try {
const element = await resume(entity, sm);
await resumed(element);
return;
// If resumption fails, continue with session establishment
} catch {
failed();
}
}

// Enabling

// Resource binding first
await next();

const promiseEnable = enable(entity, sm);

if (sm.outbound_q.length > 0) {
throw new Error(
"Stream Management assertion failure, queue should be empty after enable",
);
}

// > The counter for an entity's own sent stanzas is set to zero and started after sending either <enable/> or <enabled/>.
sm.outbound = 0;

try {
const response = await promiseEnable;
enabled(response.attrs);
} catch {
sm.enabled = false;
}

sm.inbound = 0;
});
}

export function enable(entity, sm) {
return procedure(entity, makeEnableElement({ sm }), (element, done) => {
if (element.is("enabled", NS)) {
return done(element);
} else if (element.is("failed", NS)) {
throw XMPPError.fromElement(element);
}
});
}

export async function resume(entity, sm) {
return procedure(entity, makeResumeElement({ sm }), (element, done) => {
if (element.is("resumed", NS)) {
return done(element);
} else if (element.is("failed", NS)) {
throw XMPPError.fromElement(element);
}
});
}
28 changes: 0 additions & 28 deletions packages/stream-management/stream-features.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -124,34 +124,6 @@ test("enable - failed", async () => {
expect(entity.streamManagement.enabled).toBe(false);
});

test("stanza ack", async () => {
const { entity } = mockClient();

entity.streamManagement.enabled = true;

expect(entity.streamManagement.outbound).toBe(0);
expect(entity.streamManagement.outbound_q).toBeEmpty();
// expect(entity.streamManagement.enabled).toBe(true);

await entity.send(<message id="a" />);

expect(entity.streamManagement.outbound).toBe(0);
expect(entity.streamManagement.outbound_q).toHaveLength(1);

let acks = 0;
entity.streamManagement.on("ack", (stanza) => {
expect(stanza.attrs.id).toBe("a");
acks++;
});

entity.mockInput(<a xmlns="urn:xmpp:sm:3" h="1" />);
await tick();

expect(acks).toBe(1);
expect(entity.streamManagement.outbound).toBe(1);
expect(entity.streamManagement.outbound_q).toHaveLength(0);
});

test("resume - resumed", async () => {
const { entity } = mockClient();

Expand Down
46 changes: 46 additions & 0 deletions packages/stream-management/stream-management.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import { mockClient } from "@xmpp/test";

import { tick } from "@xmpp/events";

test("emits ack when the server ackownledge stanzas", async () => {
const { entity } = mockClient();

entity.streamManagement.enabled = true;

expect(entity.streamManagement.outbound).toBe(0);
expect(entity.streamManagement.outbound_q).toBeEmpty();
// expect(entity.streamManagement.enabled).toBe(true);

await entity.send(<message id="a" />);

expect(entity.streamManagement.outbound).toBe(0);
expect(entity.streamManagement.outbound_q).toHaveLength(1);

let acks = 0;
entity.streamManagement.on("ack", (stanza) => {
expect(stanza.attrs.id).toBe("a");
acks++;
});

entity.mockInput(<a xmlns="urn:xmpp:sm:3" h="1" />);
await tick();

expect(acks).toBe(1);
expect(entity.streamManagement.outbound).toBe(1);
expect(entity.streamManagement.outbound_q).toHaveLength(0);
});

test("sends an <a/> element before closing", async () => {
const { entity, streamManagement } = mockClient();
streamManagement.enabled = true;
streamManagement.inbound = 42;
entity.status = "online";

const promise_disconnect = entity.disconnect();

expect(await entity.catchOutgoing()).toEqual(
<a xmlns="urn:xmpp:sm:3" h={streamManagement.inbound} />,
);

await promise_disconnect;
});
1 change: 1 addition & 0 deletions server/prosody.cfg.lua
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ modules_enabled = {
"sasl2_bind2";
"sasl2_sm";
"sasl2_fast";
"csi_simple";
};

modules_disabled = {
Expand Down
Loading