Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce broadcast API for event sharing #884

Merged
merged 21 commits into from
Sep 3, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
f6ec48a
Implement subscription to broadcast events
gwbaik9717 Aug 20, 2024
6284cfe
Implement publishing broadcast events
gwbaik9717 Aug 20, 2024
0472c0a
Decode broadcasted payload
gwbaik9717 Aug 21, 2024
a1442b7
Add validation logic for serializable payload
gwbaik9717 Aug 21, 2024
323f981
Add test case for throwing error when trying to broadcast unserialize…
gwbaik9717 Aug 21, 2024
b692441
Fix bug in subscribeBroadcastEvent method
gwbaik9717 Aug 21, 2024
66c996a
Add test case for successfully broadcast serializeable payload
gwbaik9717 Aug 22, 2024
bc8b66c
Merge branch 'main' into broadcast-api
gwbaik9717 Aug 22, 2024
b0b2766
Add test cases for subscribing and unsubscribing broadcast events
gwbaik9717 Aug 22, 2024
06d37fa
Modify interface for subscribing broadcast events
gwbaik9717 Aug 26, 2024
8060985
Refactor the broadcast method to be called directly by the document o…
gwbaik9717 Aug 26, 2024
4d516be
Fix lint errors
gwbaik9717 Aug 26, 2024
69650c0
Refactor test code to use EventCollector
gwbaik9717 Aug 27, 2024
86035b8
Refactor by removing unnecessary broadcastEventHanlders
gwbaik9717 Aug 28, 2024
41da599
Refactor test codes
chacha912 Aug 28, 2024
bc49062
Merge branch 'main' into broadcast-api
gwbaik9717 Aug 28, 2024
5ac5e2b
Refactor Broadcast Subscription Interface to Enable Manual Topic Comp…
gwbaik9717 Aug 29, 2024
fe3c0e9
Remove client from document to prevent circular references
gwbaik9717 Aug 29, 2024
e88e3c1
Handle the case when broadcast event fails
gwbaik9717 Aug 30, 2024
43ae22c
Refactor test code to remove undeterministic Promise
gwbaik9717 Aug 30, 2024
ffe525f
Fix bug where publisher receives its own broadcast event
gwbaik9717 Sep 1, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions packages/sdk/src/client/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import {
import { OpSource } from '@yorkie-js-sdk/src/document/operation/operation';
import { createAuthInterceptor } from '@yorkie-js-sdk/src/client/auth_interceptor';
import { createMetricInterceptor } from '@yorkie-js-sdk/src/client/metric_interceptor';
import { validateSerializable } from '../util/validator';

/**
* `SyncMode` defines synchronization modes for the PushPullChanges API.
Expand Down Expand Up @@ -584,6 +585,59 @@ export class Client {
return this.conditions[condition];
}

/**
* `broadcast` broadcasts the given payload to the given topic.
*/
public broadcast<T, P extends Indexable>(
sejongk marked this conversation as resolved.
Show resolved Hide resolved
doc: Document<T, P>,
topic: string,
payload: any,
): Promise<void> {
if (!this.isActive()) {
throw new YorkieError(
Code.ErrClientNotActivated,
`${this.key} is not active`,
);
}
const attachment = this.attachmentMap.get(doc.getKey());
if (!attachment) {
throw new YorkieError(
Code.ErrDocumentNotAttached,
`${doc.getKey()} is not attached`,
);
}

if (!validateSerializable(payload)) {
throw new YorkieError(
Code.ErrInvalidArgument,
'payload is not serializable',
);
}

return this.enqueueTask(async () => {
return this.rpcClient
.broadcast(
{
clientId: this.id!,
documentId: attachment.docID,
topic,
payload: new TextEncoder().encode(JSON.stringify(payload)),
},
{ headers: { 'x-shard-key': `${this.apiKey}/${doc.getKey()}` } },
)
.then(() => {
logger.info(
`[BC] c:"${this.getKey()}" broadcasts d:"${doc.getKey()}" t:"${topic}"`,
);
})
.catch((err) => {
logger.error(`[BC] c:"${this.getKey()}" err :`, err);
this.handleConnectError(err);
throw err;
});
});
}

/**
* `runSyncLoop` runs the sync loop. The sync loop pushes local changes to
* the server and pulls remote changes from the server.
Expand Down
75 changes: 73 additions & 2 deletions packages/sdk/src/document/document.ts
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,11 @@ export enum DocEventType {
* `PresenceChanged` means that the presences of the client has updated.
*/
PresenceChanged = 'presence-changed',

/**
* `Broadcast` means that the message is broadcasted to clients who subscribe to the event.
*/
Broadcast = 'broadcast',
}

/**
Expand All @@ -191,7 +196,8 @@ export type DocEvent<P extends Indexable = Indexable, T = OperationInfo> =
| InitializedEvent<P>
| WatchedEvent<P>
| UnwatchedEvent<P>
| PresenceChangedEvent<P>;
| PresenceChangedEvent<P>
| BroadcastEvent;

/**
* `TransactionEvent` represents document events that occur within
Expand Down Expand Up @@ -371,6 +377,11 @@ export interface PresenceChangedEvent<P extends Indexable>
value: { clientID: ActorID; presence: P };
}

export interface BroadcastEvent extends BaseDocEvent {
type: DocEventType.Broadcast;
value: { topic: string; payload: any };
}

type DocEventCallbackMap<P extends Indexable> = {
default: NextFn<
| SnapshotEvent
Expand Down Expand Up @@ -589,6 +600,15 @@ export class Document<T, P extends Indexable = Indexable> {
*/
private isUpdating: boolean;

/**
* `broadcastEventHandlers` is a map of broadcast event handlers.
* The key is the topic of the broadcast event, and the value is the handler.
*/
private broadcastEventHandlers: Map<
string,
(topic: string, payload: any) => void
>;

constructor(key: string, opts?: DocumentOptions) {
this.opts = opts || {};

Expand Down Expand Up @@ -616,6 +636,8 @@ export class Document<T, P extends Indexable = Indexable> {
redo: this.redo.bind(this),
};

this.broadcastEventHandlers = new Map();

setupDevtools(this);
}

Expand Down Expand Up @@ -1027,6 +1049,36 @@ export class Document<T, P extends Indexable = Indexable> {
throw new YorkieError(Code.ErrInvalidArgument, `"${arg1}" is not a valid`);
}

/**
* subscribeBroadcastEvent registers a callback to subscribe to broadcast events
* on the document. The callback will be called when the document receives a
* broadcast event with the given topic.
*/
public subscribeBroadcastEvent(
sejongk marked this conversation as resolved.
Show resolved Hide resolved
topic: string,
handler: (topic: string, payload: any) => void,
error?: ErrorFn,
): Unsubscribe {
this.broadcastEventHandlers.set(topic, handler);

const unsubscribe = this.eventStream.subscribe((event) => {
for (const docEvent of event) {
if (docEvent.type !== DocEventType.Broadcast) {
continue;
}

if (docEvent.value.topic === topic) {
handler(topic, docEvent.value.payload);
}
}
}, error);

return () => {
unsubscribe();
this.broadcastEventHandlers.delete(topic);
};
}

/**
* `publish` triggers an event in this document, which can be received by
* callback functions from document.subscribe().
Expand Down Expand Up @@ -1468,7 +1520,8 @@ export class Document<T, P extends Indexable = Indexable> {

if (resp.body.case === 'event') {
const { type, publisher } = resp.body.value;
const event: Array<WatchedEvent<P> | UnwatchedEvent<P>> = [];
const event: Array<WatchedEvent<P> | UnwatchedEvent<P> | BroadcastEvent> =
[];
if (type === PbDocEventType.DOCUMENT_WATCHED) {
this.addOnlineClient(publisher);
// NOTE(chacha912): We added to onlineClients, but we won't trigger watched event
Expand All @@ -1495,6 +1548,16 @@ export class Document<T, P extends Indexable = Indexable> {
value: { clientID: publisher, presence },
});
}
} else if (type === PbDocEventType.DOCUMENT_BROADCAST) {
if (resp.body.value.body) {
const { topic, payload } = resp.body.value.body;
const decoder = new TextDecoder();

event.push({
type: DocEventType.Broadcast,
value: { topic, payload: JSON.parse(decoder.decode(payload)) },
});
}
}

if (event.length > 0) {
Expand Down Expand Up @@ -1584,6 +1647,14 @@ export class Document<T, P extends Indexable = Indexable> {
const { clientID, presence } = event.value;
this.presences.set(clientID, presence);
}

if (event.type === DocEventType.Broadcast) {
const { topic, payload } = event.value;
const handler = this.broadcastEventHandlers.get(topic);
if (handler) {
handler(topic, payload);
}
}
}

/**
Expand Down
31 changes: 31 additions & 0 deletions packages/sdk/src/util/validator.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2024 The Yorkie Authors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/**
* `validateSerializable` returns whether the given value is serializable or not.
*/
export const validateSerializable = (value: any): boolean => {
try {
const serialized = JSON.stringify(value);

if (serialized === undefined) {
return false;
}
} catch (error) {
return false;
}
return true;
};
126 changes: 126 additions & 0 deletions packages/sdk/test/integration/client_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -863,4 +863,130 @@ describe.sequential('Client', function () {
assert.equal(d1.toSortedJSON(), d2.toSortedJSON());
}, task.name);
});

it('Should successfully broadcast serializeable payload', async ({
task,
}) => {
const cli = new yorkie.Client(testRPCAddr);
await cli.activate();

const doc = new yorkie.Document<{ t: Text }>(toDocKey(`${task.name}`));
await cli.attach(doc);

const broadcastTopic = 'test';
const payload = { a: 1, b: '2' };

expect(async () =>
cli.broadcast(doc, broadcastTopic, payload),
).not.toThrow();

await cli.deactivate();
});

it('Should throw error when broadcasting unserializeable payload', async ({
task,
}) => {
const cli = new yorkie.Client(testRPCAddr);
await cli.activate();

const doc = new yorkie.Document<{ t: Text }>(toDocKey(`${task.name}`));
await cli.attach(doc);

// broadcast unserializable payload
const payload = () => {};
const broadcastTopic = 'test';

expect(async () =>
cli.broadcast(doc, broadcastTopic, payload),
).rejects.toThrowErrorCode(Code.ErrInvalidArgument);

await cli.deactivate();
});
});

it('Should trigger the handler for a subscribed broadcast event', async ({
task,
}) => {
await withTwoClientsAndDocuments<{ t: Text }>(
async (c1, d1, c2, d2) => {
const spy = vi.fn();

const broadcastTopic = 'test';
const unsubscribe = d2.subscribeBroadcastEvent(broadcastTopic, spy);

const payload = { a: 1, b: '2' };
await c1.broadcast(d1, broadcastTopic, payload);

// Assuming that every subscriber can receive the broadcast event within 1000ms.
await new Promise((res) => setTimeout(res, 1000));
chacha912 marked this conversation as resolved.
Show resolved Hide resolved

expect(spy.mock.calls.length).toBe(1);
expect(spy.mock.calls[0][0]).toBe(broadcastTopic);
expect(spy.mock.calls[0][1]).toEqual(payload);

unsubscribe();
},
task.name,
SyncMode.Realtime,
);
});

it('Should not trigger the handler for an unsubscribed broadcast event', async ({
task,
}) => {
await withTwoClientsAndDocuments<{ t: Text }>(
async (c1, d1, c2, d2) => {
const spy = vi.fn();

const broadcastTopic1 = 'test1';
const broadcastTopic2 = 'test2';

const unsubscribe = d2.subscribeBroadcastEvent(broadcastTopic2, spy);

const payload = { a: 1, b: '2' };
await c1.broadcast(d1, broadcastTopic1, payload);

// Assuming that every subscriber can receive the broadcast event within 1000ms.
await new Promise((res) => setTimeout(res, 1000));

expect(spy.mock.calls.length).toBe(0);

unsubscribe();
},
task.name,
SyncMode.Realtime,
);
});

it('Should not trigger the handler for a broadcast event after unsubscribing', async ({
task,
}) => {
await withTwoClientsAndDocuments<{ t: Text }>(
async (c1, d1, c2, d2) => {
const spy = vi.fn();

const broadcastTopic = 'test';
const unsubscribe = d2.subscribeBroadcastEvent(broadcastTopic, spy);

const payload = { a: 1, b: '2' };
await c1.broadcast(d1, broadcastTopic, payload);

// Assuming that every subscriber can receive the broadcast event within 1000ms.
await new Promise((res) => setTimeout(res, 1000));

expect(spy.mock.calls.length).toBe(1);

unsubscribe();

await c1.broadcast(d1, broadcastTopic, payload);

// Assuming that every subscriber can receive the broadcast event within 1000ms.
await new Promise((res) => setTimeout(res, 1000));

// No change in the number of calls
expect(spy.mock.calls.length).toBe(1);
},
task.name,
SyncMode.Realtime,
);
});
5 changes: 3 additions & 2 deletions packages/sdk/test/integration/integration_helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ export async function withTwoClientsAndDocuments<T>(
d2: Document<T>,
) => Promise<void>,
title: string,
syncMode: SyncMode = SyncMode.Manual,
): Promise<void> {
const client1 = new yorkie.Client(testRPCAddr);
const client2 = new yorkie.Client(testRPCAddr);
Expand All @@ -31,8 +32,8 @@ export async function withTwoClientsAndDocuments<T>(
const doc1 = new yorkie.Document<T>(docKey);
const doc2 = new yorkie.Document<T>(docKey);

await client1.attach(doc1, { syncMode: SyncMode.Manual });
await client2.attach(doc2, { syncMode: SyncMode.Manual });
await client1.attach(doc1, { syncMode });
await client2.attach(doc2, { syncMode });

await callback(client1, doc1, client2, doc2);

Expand Down
Loading