Skip to content

feat(sds): adds ephemeral messages, delivered message callback and event #2302

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/sds/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
"node": ">=20"
},
"dependencies": {
"@libp2p/interface": "^2.7.0",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's make it strictly 2.7.0

"@noble/hashes": "^1.7.1",
"@waku/message-hash": "^0.1.18",
"@waku/proto": "^0.0.9",
Expand Down
65 changes: 58 additions & 7 deletions packages/sds/src/sds.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ import { DefaultBloomFilter } from "./bloom.js";
import {
DEFAULT_BLOOM_FILTER_OPTIONS,
Message,
MessageChannel
MessageChannel,
MessageChannelEvent
} from "./sds.js";

const channelId = "test-channel";
Expand Down Expand Up @@ -399,12 +400,10 @@ describe("MessageChannel", function () {
it("should remove messages without delivering if timeout is exceeded", async () => {
const causalHistorySize = (channelA as any).causalHistorySize;
// Create a channel with very very short timeout
const channelC: MessageChannel = new MessageChannel(
channelId,
causalHistorySize,
true,
10
);
const channelC: MessageChannel = new MessageChannel(channelId, {
receivedMessageTimeoutEnabled: true,
receivedMessageTimeout: 10
});

for (const m of messagesA) {
await channelA.sendMessage(utf8ToBytes(m), callback);
Expand Down Expand Up @@ -547,4 +546,56 @@ describe("MessageChannel", function () {
);
});
});

describe("Ephemeral messages", () => {
beforeEach(() => {
channelA = new MessageChannel(channelId);
});

it("should be sent without a timestamp, causal history, or bloom filter", () => {
const timestampBefore = (channelA as any).lamportTimestamp;
channelA.sendEphemeralMessage(new Uint8Array(), (message) => {
expect(message.lamportTimestamp).to.equal(undefined);
expect(message.causalHistory).to.deep.equal([]);
expect(message.bloomFilter).to.equal(undefined);
return true;
});

const outgoingBuffer = (channelA as any).outgoingBuffer as Message[];
expect(outgoingBuffer.length).to.equal(0);

const timestampAfter = (channelA as any).lamportTimestamp;
expect(timestampAfter).to.equal(timestampBefore);
});

it("should be delivered immediately if received", async () => {
let deliveredMessageId: string | undefined;
let sentMessage: Message | undefined;

const channelB = new MessageChannel(channelId, {
deliveredMessageCallback: (messageId) => {
deliveredMessageId = messageId;
}
});

const waitForMessageDelivered = new Promise<string>((resolve) => {
channelB.addEventListener(
MessageChannelEvent.MessageDelivered,
(event) => {
resolve(event.detail);
}
);

channelA.sendEphemeralMessage(utf8ToBytes(messagesA[0]), (message) => {
sentMessage = message;
channelB.receiveMessage(message);
return true;
});
});

const eventMessageId = await waitForMessageDelivered;
expect(deliveredMessageId).to.equal(sentMessage?.messageId);
expect(eventMessageId).to.equal(sentMessage?.messageId);
});
});
});
87 changes: 81 additions & 6 deletions packages/sds/src/sds.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,17 @@
import { TypedEventEmitter } from "@libp2p/interface";
import { sha256 } from "@noble/hashes/sha256";
import { bytesToHex } from "@noble/hashes/utils";
import { proto_sds_message } from "@waku/proto";

import { DefaultBloomFilter } from "./bloom.js";

export enum MessageChannelEvent {
MessageDelivered = "messageDelivered"
}
type MessageChannelEvents = {
[MessageChannelEvent.MessageDelivered]: CustomEvent<string>;
};

export type Message = proto_sds_message.SdsMessage;
export type ChannelId = string;

Expand All @@ -15,7 +23,14 @@ export const DEFAULT_BLOOM_FILTER_OPTIONS = {
const DEFAULT_CAUSAL_HISTORY_SIZE = 2;
const DEFAULT_RECEIVED_MESSAGE_TIMEOUT = 1000 * 60 * 5; // 5 minutes

export class MessageChannel {
interface MessageChannelOptions {
causalHistorySize?: number;
receivedMessageTimeoutEnabled?: boolean;
receivedMessageTimeout?: number;
deliveredMessageCallback?: (messageId: string) => void;
}

export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
private lamportTimestamp: number;
private filter: DefaultBloomFilter;
private outgoingBuffer: Message[];
Expand All @@ -26,23 +41,31 @@ export class MessageChannel {
private causalHistorySize: number;
private acknowledgementCount: number;
private timeReceived: Map<string, number>;
private receivedMessageTimeoutEnabled: boolean;
private receivedMessageTimeout: number;
private deliveredMessageCallback?: (messageId: string) => void;

public constructor(
channelId: ChannelId,
causalHistorySize: number = DEFAULT_CAUSAL_HISTORY_SIZE,
private receivedMessageTimeoutEnabled: boolean = false,
private receivedMessageTimeout: number = DEFAULT_RECEIVED_MESSAGE_TIMEOUT
options: MessageChannelOptions = {}
) {
super();
this.channelId = channelId;
this.lamportTimestamp = 0;
this.filter = new DefaultBloomFilter(DEFAULT_BLOOM_FILTER_OPTIONS);
this.outgoingBuffer = [];
this.acknowledgements = new Map();
this.incomingBuffer = [];
this.messageIdLog = [];
this.causalHistorySize = causalHistorySize;
this.causalHistorySize =
options.causalHistorySize ?? DEFAULT_CAUSAL_HISTORY_SIZE;
this.acknowledgementCount = this.getAcknowledgementCount();
this.timeReceived = new Map();
this.receivedMessageTimeoutEnabled =
options.receivedMessageTimeoutEnabled ?? false;
this.receivedMessageTimeout =
options.receivedMessageTimeout ?? DEFAULT_RECEIVED_MESSAGE_TIMEOUT;
this.deliveredMessageCallback = options.deliveredMessageCallback;
}

public static getMessageId(payload: Uint8Array): string {
Expand Down Expand Up @@ -95,6 +118,36 @@ export class MessageChannel {
}
}

/**
* Sends a short-lived message without synchronization or reliability requirements.
*
* Sends a message without a timestamp, causal history, or bloom filter.
* Ephemeral messages are not added to the outgoing buffer.
* Upon reception, ephemeral messages are delivered immediately without
* checking for causal dependencies or including in the local log.
*
* See https://rfc.vac.dev/vac/raw/sds/#ephemeral-messages
*
* @param payload - The payload to send.
* @param callback - A callback function that returns a boolean indicating whether the message was sent successfully.
*/
public sendEphemeralMessage(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

generally curious why you separate ephemeral from regular messages and not doing single send entry?

Copy link
Member Author

@adklempner adklempner Mar 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Initially I tried that but decided that:

  1. the logic is different enough that it doesn't look clean in the code
  2. the concept is different enough that I want it to be as explicit/obvious as possible for the library consumer when an ephemeral message is being used. I don't believe it will be used often but we will see

payload: Uint8Array,
callback?: (message: Message) => boolean
): void {
const message: Message = {
messageId: MessageChannel.getMessageId(payload),
channelId: this.channelId,
content: payload,
lamportTimestamp: undefined,
causalHistory: [],
bloomFilter: undefined
};

if (callback) {
callback(message);
}
}
/**
* Process a received SDS message for this channel.
*
Expand All @@ -110,6 +163,11 @@ export class MessageChannel {
* @param message - The received SDS message.
*/
public receiveMessage(message: Message): void {
if (!message.lamportTimestamp) {
// Messages with no timestamp are ephemeral messages and should be delivered immediately
this.deliverMessage(message);
return;
}
// review ack status
this.reviewAckStatus(message);
// add to bloom filter (skip for messages with empty content)
Expand Down Expand Up @@ -241,13 +299,19 @@ export class MessageChannel {

// See https://rfc.vac.dev/vac/raw/sds/#deliver-message
private deliverMessage(message: Message): void {
this.notifyDeliveredMessage(message.messageId);

const messageLamportTimestamp = message.lamportTimestamp ?? 0;
if (messageLamportTimestamp > this.lamportTimestamp) {
this.lamportTimestamp = messageLamportTimestamp;
}

if (message.content?.length === 0) {
if (
message.content?.length === 0 ||
message.lamportTimestamp === undefined
) {
// Messages with empty content are sync messages.
// Messages with no timestamp are ephemeral messages.
// They are not added to the local log or bloom filter.
return;
}
Expand Down Expand Up @@ -312,4 +376,15 @@ export class MessageChannel {
private getAcknowledgementCount(): number {
return 2;
}

private notifyDeliveredMessage(messageId: string): void {
if (this.deliveredMessageCallback) {
this.deliveredMessageCallback(messageId);
}
this.dispatchEvent(
new CustomEvent(MessageChannelEvent.MessageDelivered, {
detail: messageId
})
);
}
}
Loading