Skip to content

Commit 3ba0f16

Browse files
committed
feat(sds): adds ephemeral messages, delivered message callback and event
1 parent ea6daae commit 3ba0f16

File tree

3 files changed

+140
-13
lines changed

3 files changed

+140
-13
lines changed

packages/sds/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
"node": ">=20"
6060
},
6161
"dependencies": {
62+
"@libp2p/interface": "^2.7.0",
6263
"@noble/hashes": "^1.7.1",
6364
"@waku/message-hash": "^0.1.18",
6465
"@waku/proto": "^0.0.9",

packages/sds/src/sds.spec.ts

Lines changed: 58 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@ import { DefaultBloomFilter } from "./bloom.js";
55
import {
66
DEFAULT_BLOOM_FILTER_OPTIONS,
77
Message,
8-
MessageChannel
8+
MessageChannel,
9+
MessageChannelEvent
910
} from "./sds.js";
1011

1112
const channelId = "test-channel";
@@ -399,12 +400,10 @@ describe("MessageChannel", function () {
399400
it("should remove messages without delivering if timeout is exceeded", async () => {
400401
const causalHistorySize = (channelA as any).causalHistorySize;
401402
// Create a channel with very very short timeout
402-
const channelC: MessageChannel = new MessageChannel(
403-
channelId,
404-
causalHistorySize,
405-
true,
406-
10
407-
);
403+
const channelC: MessageChannel = new MessageChannel(channelId, {
404+
receivedMessageTimeoutEnabled: true,
405+
receivedMessageTimeout: 10
406+
});
408407

409408
for (const m of messagesA) {
410409
await channelA.sendMessage(utf8ToBytes(m), callback);
@@ -547,4 +546,56 @@ describe("MessageChannel", function () {
547546
);
548547
});
549548
});
549+
550+
describe("Ephemeral messages", () => {
551+
beforeEach(() => {
552+
channelA = new MessageChannel(channelId);
553+
});
554+
555+
it("should be sent without a timestamp, causal history, or bloom filter", () => {
556+
const timestampBefore = (channelA as any).lamportTimestamp;
557+
channelA.sendEphemeralMessage(new Uint8Array(), (message) => {
558+
expect(message.lamportTimestamp).to.equal(undefined);
559+
expect(message.causalHistory).to.deep.equal([]);
560+
expect(message.bloomFilter).to.equal(undefined);
561+
return true;
562+
});
563+
564+
const outgoingBuffer = (channelA as any).outgoingBuffer as Message[];
565+
expect(outgoingBuffer.length).to.equal(0);
566+
567+
const timestampAfter = (channelA as any).lamportTimestamp;
568+
expect(timestampAfter).to.equal(timestampBefore);
569+
});
570+
571+
it("should be delivered immediately if received", async () => {
572+
let deliveredMessageId: string | undefined;
573+
let sentMessage: Message | undefined;
574+
575+
const channelB = new MessageChannel(channelId, {
576+
deliveredMessageCallback: (messageId) => {
577+
deliveredMessageId = messageId;
578+
}
579+
});
580+
581+
const waitForMessageDelivered = new Promise<string>((resolve) => {
582+
channelB.addEventListener(
583+
MessageChannelEvent.MessageDelivered,
584+
(event) => {
585+
resolve(event.detail);
586+
}
587+
);
588+
589+
channelA.sendEphemeralMessage(utf8ToBytes(messagesA[0]), (message) => {
590+
sentMessage = message;
591+
channelB.receiveMessage(message);
592+
return true;
593+
});
594+
});
595+
596+
const eventMessageId = await waitForMessageDelivered;
597+
expect(deliveredMessageId).to.equal(sentMessage?.messageId);
598+
expect(eventMessageId).to.equal(sentMessage?.messageId);
599+
});
600+
});
550601
});

packages/sds/src/sds.ts

Lines changed: 81 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,17 @@
1+
import { TypedEventEmitter } from "@libp2p/interface";
12
import { sha256 } from "@noble/hashes/sha256";
23
import { bytesToHex } from "@noble/hashes/utils";
34
import { proto_sds_message } from "@waku/proto";
45

56
import { DefaultBloomFilter } from "./bloom.js";
67

8+
export enum MessageChannelEvent {
9+
MessageDelivered = "messageDelivered"
10+
}
11+
type MessageChannelEvents = {
12+
[MessageChannelEvent.MessageDelivered]: CustomEvent<string>;
13+
};
14+
715
export type Message = proto_sds_message.SdsMessage;
816
export type ChannelId = string;
917

@@ -15,7 +23,14 @@ export const DEFAULT_BLOOM_FILTER_OPTIONS = {
1523
const DEFAULT_CAUSAL_HISTORY_SIZE = 2;
1624
const DEFAULT_RECEIVED_MESSAGE_TIMEOUT = 1000 * 60 * 5; // 5 minutes
1725

18-
export class MessageChannel {
26+
interface MessageChannelOptions {
27+
causalHistorySize?: number;
28+
receivedMessageTimeoutEnabled?: boolean;
29+
receivedMessageTimeout?: number;
30+
deliveredMessageCallback?: (messageId: string) => void;
31+
}
32+
33+
export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
1934
private lamportTimestamp: number;
2035
private filter: DefaultBloomFilter;
2136
private outgoingBuffer: Message[];
@@ -26,23 +41,31 @@ export class MessageChannel {
2641
private causalHistorySize: number;
2742
private acknowledgementCount: number;
2843
private timeReceived: Map<string, number>;
44+
private receivedMessageTimeoutEnabled: boolean;
45+
private receivedMessageTimeout: number;
46+
private deliveredMessageCallback?: (messageId: string) => void;
2947

3048
public constructor(
3149
channelId: ChannelId,
32-
causalHistorySize: number = DEFAULT_CAUSAL_HISTORY_SIZE,
33-
private receivedMessageTimeoutEnabled: boolean = false,
34-
private receivedMessageTimeout: number = DEFAULT_RECEIVED_MESSAGE_TIMEOUT
50+
options: MessageChannelOptions = {}
3551
) {
52+
super();
3653
this.channelId = channelId;
3754
this.lamportTimestamp = 0;
3855
this.filter = new DefaultBloomFilter(DEFAULT_BLOOM_FILTER_OPTIONS);
3956
this.outgoingBuffer = [];
4057
this.acknowledgements = new Map();
4158
this.incomingBuffer = [];
4259
this.messageIdLog = [];
43-
this.causalHistorySize = causalHistorySize;
60+
this.causalHistorySize =
61+
options.causalHistorySize ?? DEFAULT_CAUSAL_HISTORY_SIZE;
4462
this.acknowledgementCount = this.getAcknowledgementCount();
4563
this.timeReceived = new Map();
64+
this.receivedMessageTimeoutEnabled =
65+
options.receivedMessageTimeoutEnabled ?? false;
66+
this.receivedMessageTimeout =
67+
options.receivedMessageTimeout ?? DEFAULT_RECEIVED_MESSAGE_TIMEOUT;
68+
this.deliveredMessageCallback = options.deliveredMessageCallback;
4669
}
4770

4871
public static getMessageId(payload: Uint8Array): string {
@@ -95,6 +118,36 @@ export class MessageChannel {
95118
}
96119
}
97120

121+
/**
122+
* Sends a short-lived message without synchronization or reliability requirements.
123+
*
124+
* Sends a message without a timestamp, causal history, or bloom filter.
125+
* Ephemeral messages are not added to the outgoing buffer.
126+
* Upon reception, ephemeral messages are delivered immediately without
127+
* checking for causal dependencies or including in the local log.
128+
*
129+
* See https://rfc.vac.dev/vac/raw/sds/#ephemeral-messages
130+
*
131+
* @param payload - The payload to send.
132+
* @param callback - A callback function that returns a boolean indicating whether the message was sent successfully.
133+
*/
134+
public sendEphemeralMessage(
135+
payload: Uint8Array,
136+
callback?: (message: Message) => boolean
137+
): void {
138+
const message: Message = {
139+
messageId: MessageChannel.getMessageId(payload),
140+
channelId: this.channelId,
141+
content: payload,
142+
lamportTimestamp: undefined,
143+
causalHistory: [],
144+
bloomFilter: undefined
145+
};
146+
147+
if (callback) {
148+
callback(message);
149+
}
150+
}
98151
/**
99152
* Process a received SDS message for this channel.
100153
*
@@ -110,6 +163,11 @@ export class MessageChannel {
110163
* @param message - The received SDS message.
111164
*/
112165
public receiveMessage(message: Message): void {
166+
if (!message.lamportTimestamp) {
167+
// Messages with no timestamp are ephemeral messages and should be delivered immediately
168+
this.deliverMessage(message);
169+
return;
170+
}
113171
// review ack status
114172
this.reviewAckStatus(message);
115173
// add to bloom filter (skip for messages with empty content)
@@ -241,13 +299,19 @@ export class MessageChannel {
241299

242300
// See https://rfc.vac.dev/vac/raw/sds/#deliver-message
243301
private deliverMessage(message: Message): void {
302+
this.notifyDeliveredMessage(message.messageId);
303+
244304
const messageLamportTimestamp = message.lamportTimestamp ?? 0;
245305
if (messageLamportTimestamp > this.lamportTimestamp) {
246306
this.lamportTimestamp = messageLamportTimestamp;
247307
}
248308

249-
if (message.content?.length === 0) {
309+
if (
310+
message.content?.length === 0 ||
311+
message.lamportTimestamp === undefined
312+
) {
250313
// Messages with empty content are sync messages.
314+
// Messages with no timestamp are ephemeral messages.
251315
// They are not added to the local log or bloom filter.
252316
return;
253317
}
@@ -312,4 +376,15 @@ export class MessageChannel {
312376
private getAcknowledgementCount(): number {
313377
return 2;
314378
}
379+
380+
private notifyDeliveredMessage(messageId: string): void {
381+
if (this.deliveredMessageCallback) {
382+
this.deliveredMessageCallback(messageId);
383+
}
384+
this.dispatchEvent(
385+
new CustomEvent(MessageChannelEvent.MessageDelivered, {
386+
detail: messageId
387+
})
388+
);
389+
}
315390
}

0 commit comments

Comments
 (0)