diff --git a/.envrc b/.envrc new file mode 100644 index 0000000..3550a30 --- /dev/null +++ b/.envrc @@ -0,0 +1 @@ +use flake diff --git a/.gitignore b/.gitignore index 4241c17..829d632 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ node_modules dist -*~ \ No newline at end of file +*~ +.direnv diff --git a/backend/app.ts b/backend/app.ts index a0e1365..80d8c95 100644 --- a/backend/app.ts +++ b/backend/app.ts @@ -84,7 +84,7 @@ export function createFrontend( app.post("/fake-update", (req, res) => { const instanceId = Array.from(instances.instances.keys())[0]; const instance = instances.instances.get(instanceId); - instance?.webXdc.sendUpdate({ payload: req.body }, "fake update"); + instance?.webXdc.sendUpdate({ payload: req.body }, ""); res.json({ status: "ok", }); diff --git a/backend/instance.ts b/backend/instance.ts index 5a7f57f..f011dd4 100644 --- a/backend/instance.ts +++ b/backend/instance.ts @@ -22,6 +22,11 @@ type SendUpdateMessage = { descr: string; }; +type SendRealtimeMessage = { + type: "sendRealtime"; + data: Uint8Array +}; + type SetUpdateListenerMessage = { type: "setUpdateListener"; serial: number; @@ -125,7 +130,13 @@ export class Instances { const parsed = JSON.parse(msg); // XXX should validate parsed if (isSendUpdateMessage(parsed)) { - instance.webXdc.sendUpdate(parsed.update, parsed.descr); + instance.webXdc.sendUpdate(parsed.update, ""); + } else if (isSendRealtimeMessage(parsed)) { + instance.webXdc.sendRealtimeData(parsed.data); + } else if (isSetRealtimeListenerMessage(parsed)) { + instance.webXdc.connectRealtime((data) => { + return broadcast(wss, JSON.stringify({ type: "sendRealtime", data })); + }); } else if (isSetUpdateListenerMessage(parsed)) { instance.webXdc.connect( (updates) => { @@ -217,6 +228,14 @@ function isSendUpdateMessage(value: any): value is SendUpdateMessage { return value.type === "sendUpdate"; } +function isSendRealtimeMessage(value: any): value is SendRealtimeMessage { + return value.type === "sendRealtime"; +} + +function isSetRealtimeListenerMessage(value: any): value is { type: "setRealtimeListener" } { + return value.type === "setRealtimeListener"; +} + function isSetUpdateListenerMessage( value: any, ): value is SetUpdateListenerMessage { diff --git a/backend/message.test.ts b/backend/message.test.ts index f6a1ec0..b9d365b 100644 --- a/backend/message.test.ts +++ b/backend/message.test.ts @@ -26,10 +26,10 @@ test("distribute to self", () => { return true; }, 0); - client0.sendUpdate({ payload: "Hello" }, "update"); + client0.sendUpdate({ payload: "Hello" }, ""); expect(client0Heard).toMatchObject([ - [{ payload: "Hello", serial: 1, max_serial: 1 }, "update"], + [{ payload: "Hello", serial: 1, max_serial: 1 }, ""], ]); expect(prepare(getMessages())).toEqual([ @@ -43,17 +43,43 @@ test("distribute to self", () => { serial: 1, max_serial: 1, }, - descr: "update", + descr: "", }, { type: "received", update: { payload: "Hello", serial: 1, max_serial: 1 }, instanceId: "3001", - descr: "update", + descr: "", }, ]); }); + +test("Send realtime", () => { + const [getMessages, onMessage] = track(); + const processor = createProcessor(onMessage); + const client0 = processor.createClient("3001"); + const client1 = processor.createClient("3002"); + + const client0Heard: string[] = []; + const client1Heard: string[] = []; + + /* const rt0 = client0.joinRealtimeChannel() + const rt1 = client1.joinRealtimeChannel() + + const decoder = new TextDecoder() + rt0.setListener((data) => { client0Heard.push(decoder.decode(data)) }) + rt1.setListener((data) => { client1Heard.push(decoder.decode(data)) }) + + const encoder = new TextEncoder() + + rt0.send(new Uint8Array(encoder.encode("hi"))) + + expect(client1Heard).toMatchObject([ + "hi" + ]) + expect(client0Heard).toMatchObject([]) */ +}); test("distribute to self and other", () => { const [getMessages, onMessage] = track(); const processor = createProcessor(onMessage); @@ -73,15 +99,15 @@ test("distribute to self and other", () => { return true; }, 0); - client0.sendUpdate({ payload: "Hello" }, "update"); - client1.sendUpdate({ payload: "Bye" }, "update 2"); + client0.sendUpdate({ payload: "Hello" }, ""); + client1.sendUpdate({ payload: "Bye" }, ""); expect(client0Heard).toMatchObject([ - [{ payload: "Hello", serial: 1, max_serial: 1 }, "update"], - [{ payload: "Bye", serial: 2, max_serial: 2 }, "update 2"], + [{ payload: "Hello", serial: 1, max_serial: 1 }, ""], + [{ payload: "Bye", serial: 2, max_serial: 2 }, ""], ]); expect(client1Heard).toMatchObject([ - [{ payload: "Hello", serial: 1, max_serial: 1 }, "update"], - [{ payload: "Bye", serial: 2, max_serial: 2 }, "update 2"], + [{ payload: "Hello", serial: 1, max_serial: 1 }, ""], + [{ payload: "Bye", serial: 2, max_serial: 2 }, ""], ]); expect(prepare(getMessages())).toEqual([ @@ -93,37 +119,37 @@ test("distribute to self and other", () => { type: "sent", instanceId: "3001", update: { payload: "Hello", serial: 1, max_serial: 1 }, - descr: "update", + descr: "", }, { type: "received", update: { payload: "Hello", serial: 1, max_serial: 1 }, instanceId: "3001", - descr: "update", + descr: "", }, { type: "received", update: { payload: "Hello", serial: 1, max_serial: 1 }, instanceId: "3002", - descr: "update", + descr: "", }, { type: "sent", instanceId: "3002", update: { payload: "Bye", serial: 2, max_serial: 2 }, - descr: "update 2", + descr: "", }, { type: "received", update: { payload: "Bye", serial: 2, max_serial: 2 }, instanceId: "3001", - descr: "update 2", + descr: "", }, { type: "received", update: { payload: "Bye", serial: 2, max_serial: 2 }, instanceId: "3002", - descr: "update 2", + descr: "", }, ]); }); @@ -146,14 +172,14 @@ test("setUpdateListener serial should skip older", () => { return true; }, 1); - client0.sendUpdate({ payload: "Hello" }, "update"); - client0.sendUpdate({ payload: "Bye" }, "update 2"); + client0.sendUpdate({ payload: "Hello" }, ""); + client0.sendUpdate({ payload: "Bye" }, ""); expect(client0Heard).toMatchObject([ - [{ payload: "Hello", serial: 1, max_serial: 1 }, "update"], - [{ payload: "Bye", serial: 2, max_serial: 2 }, "update 2"], + [{ payload: "Hello", serial: 1, max_serial: 1 }, ""], + [{ payload: "Bye", serial: 2, max_serial: 2 }, ""], ]); expect(client1Heard).toMatchObject([ - [{ payload: "Bye", serial: 2, max_serial: 2 }, "update 2"], + [{ payload: "Bye", serial: 2, max_serial: 2 }, ""], ]); }); @@ -171,12 +197,12 @@ test("other starts listening later", () => { return true; }, 0); - client0.sendUpdate({ payload: "Hello" }, "update"); - client0.sendUpdate({ payload: "Bye" }, "update 2"); + client0.sendUpdate({ payload: "Hello" }, ""); + client0.sendUpdate({ payload: "Bye" }, ""); expect(client0Heard).toMatchObject([ - [{ payload: "Hello", serial: 1, max_serial: 1 }, "update"], - [{ payload: "Bye", serial: 2, max_serial: 2 }, "update 2"], + [{ payload: "Hello", serial: 1, max_serial: 1 }, ""], + [{ payload: "Bye", serial: 2, max_serial: 2 }, ""], ]); // we only join later, so we haven't heard a thing yet expect(client1Heard).toMatchObject([]); @@ -187,12 +213,12 @@ test("other starts listening later", () => { }, 0); expect(client0Heard).toMatchObject([ - [{ payload: "Hello", serial: 1, max_serial: 1 }, "update"], - [{ payload: "Bye", serial: 2, max_serial: 2 }, "update 2"], + [{ payload: "Hello", serial: 1, max_serial: 1 }, ""], + [{ payload: "Bye", serial: 2, max_serial: 2 }, ""], ]); expect(client1Heard).toMatchObject([ - [{ payload: "Hello", serial: 1, max_serial: 2 }, "update"], - [{ payload: "Bye", serial: 2, max_serial: 2 }, "update 2"], + [{ payload: "Hello", serial: 1, max_serial: 2 }, ""], + [{ payload: "Bye", serial: 2, max_serial: 2 }, ""], ]); expect(prepare(getMessages())).toEqual([ @@ -202,25 +228,25 @@ test("other starts listening later", () => { type: "sent", instanceId: "3001", update: { payload: "Hello", serial: 1, max_serial: 1 }, - descr: "update", + descr: "", }, { type: "received", update: { payload: "Hello", serial: 1, max_serial: 1 }, instanceId: "3001", - descr: "update", + descr: "", }, { type: "sent", instanceId: "3001", update: { payload: "Bye", serial: 2, max_serial: 2 }, - descr: "update 2", + descr: "", }, { type: "received", update: { payload: "Bye", serial: 2, max_serial: 2 }, instanceId: "3001", - descr: "update 2", + descr: "", }, { type: "connect", instanceId: "3002" }, { type: "clear", instanceId: "3002" }, @@ -228,13 +254,13 @@ test("other starts listening later", () => { type: "received", update: { payload: "Hello", serial: 1, max_serial: 2 }, instanceId: "3002", - descr: "update", + descr: "", }, { type: "received", update: { payload: "Bye", serial: 2, max_serial: 2 }, instanceId: "3002", - descr: "update 2", + descr: "", }, ]); }); @@ -251,12 +277,12 @@ test("client is created later and needs to catch up", () => { return true; }, 0); - client0.sendUpdate({ payload: "Hello" }, "update"); - client0.sendUpdate({ payload: "Bye" }, "update 2"); + client0.sendUpdate({ payload: "Hello" }, ""); + client0.sendUpdate({ payload: "Bye" }, ""); expect(client0Heard).toMatchObject([ - [{ payload: "Hello", serial: 1, max_serial: 1 }, "update"], - [{ payload: "Bye", serial: 2, max_serial: 2 }, "update 2"], + [{ payload: "Hello", serial: 1, max_serial: 1 }, ""], + [{ payload: "Bye", serial: 2, max_serial: 2 }, ""], ]); // we only join later, so we haven't heard a thing yet @@ -269,12 +295,12 @@ test("client is created later and needs to catch up", () => { }, 0); expect(client0Heard).toMatchObject([ - [{ payload: "Hello", serial: 1, max_serial: 1 }, "update"], - [{ payload: "Bye", serial: 2, max_serial: 2 }, "update 2"], + [{ payload: "Hello", serial: 1, max_serial: 1 }, ""], + [{ payload: "Bye", serial: 2, max_serial: 2 }, ""], ]); expect(client1Heard).toMatchObject([ - [{ payload: "Hello", serial: 1, max_serial: 2 }, "update"], - [{ payload: "Bye", serial: 2, max_serial: 2 }, "update 2"], + [{ payload: "Hello", serial: 1, max_serial: 2 }, ""], + [{ payload: "Bye", serial: 2, max_serial: 2 }, ""], ]); }); @@ -291,11 +317,11 @@ test("other starts listening later but is partially caught up", () => { return true; }, 0); - client0.sendUpdate({ payload: "Hello" }, "update"); - client0.sendUpdate({ payload: "Bye" }, "update 2"); + client0.sendUpdate({ payload: "Hello" }, ""); + client0.sendUpdate({ payload: "Bye" }, ""); expect(client0Heard).toMatchObject([ - [{ payload: "Hello", serial: 1, max_serial: 1 }, "update"], - [{ payload: "Bye", serial: 2, max_serial: 2 }, "update 2"], + [{ payload: "Hello", serial: 1, max_serial: 1 }, ""], + [{ payload: "Bye", serial: 2, max_serial: 2 }, ""], ]); // we only join later, so we haven't heard a thing yet expect(client1Heard).toMatchObject([]); @@ -307,11 +333,11 @@ test("other starts listening later but is partially caught up", () => { }, 1); expect(client0Heard).toMatchObject([ - [{ payload: "Hello", serial: 1, max_serial: 1 }, "update"], - [{ payload: "Bye", serial: 2, max_serial: 2 }, "update 2"], + [{ payload: "Hello", serial: 1, max_serial: 1 }, ""], + [{ payload: "Bye", serial: 2, max_serial: 2 }, ""], ]); expect(client1Heard).toMatchObject([ - [{ payload: "Bye", serial: 2, max_serial: 2 }, "update 2"], + [{ payload: "Bye", serial: 2, max_serial: 2 }, ""], ]); }); @@ -485,16 +511,16 @@ test("connect with clear means we get no catchup if no new updates", () => { }, ); - client0.sendUpdate({ payload: "Hello" }, "update"); - client0.sendUpdate({ payload: "Bye" }, "update 2"); + client0.sendUpdate({ payload: "Hello" }, ""); + client0.sendUpdate({ payload: "Bye" }, ""); // now we clear processor.clear(); expect(client0Heard).toMatchObject([ "cleared", - [{ payload: "Hello", serial: 1, max_serial: 1 }, "update"], - [{ payload: "Bye", serial: 2, max_serial: 2 }, "update 2"], + [{ payload: "Hello", serial: 1, max_serial: 1 }, ""], + [{ payload: "Bye", serial: 2, max_serial: 2 }, ""], "cleared", ]); @@ -516,8 +542,8 @@ test("connect with clear means we get no catchup if no new updates", () => { expect(client0Heard).toMatchObject([ "cleared", - [{ payload: "Hello", serial: 1, max_serial: 1 }, "update"], - [{ payload: "Bye", serial: 2, max_serial: 2 }, "update 2"], + [{ payload: "Hello", serial: 1, max_serial: 1 }, ""], + [{ payload: "Bye", serial: 2, max_serial: 2 }, ""], "cleared", ]); expect(client1Heard).toMatchObject(["cleared"]); @@ -543,18 +569,18 @@ test("connect with clear means catchup only with updates after clear", () => { }, ); - client0.sendUpdate({ payload: "Hello" }, "update"); - client0.sendUpdate({ payload: "Bye" }, "update 2"); + client0.sendUpdate({ payload: "Hello" }, ""); + client0.sendUpdate({ payload: "Bye" }, ""); processor.clear(); // the aftermath update, which the newly connecting client should get - client0.sendUpdate({ payload: "Aftermath" }, "update 3"); + client0.sendUpdate({ payload: "Aftermath" }, ""); expect(client0Heard).toMatchObject([ "cleared", - [{ payload: "Hello", serial: 1, max_serial: 1 }, "update"], - [{ payload: "Bye", serial: 2, max_serial: 2 }, "update 2"], + [{ payload: "Hello", serial: 1, max_serial: 1 }, ""], + [{ payload: "Bye", serial: 2, max_serial: 2 }, ""], "cleared", [{ payload: "Aftermath", serial: 1, max_serial: 1 }, "update 3"], ]); @@ -577,8 +603,8 @@ test("connect with clear means catchup only with updates after clear", () => { expect(client0Heard).toMatchObject([ "cleared", - [{ payload: "Hello", serial: 1, max_serial: 1 }, "update"], - [{ payload: "Bye", serial: 2, max_serial: 2 }, "update 2"], + [{ payload: "Hello", serial: 1, max_serial: 1 }, ""], + [{ payload: "Bye", serial: 2, max_serial: 2 }, ""], "cleared", [{ payload: "Aftermath", serial: 1, max_serial: 1 }, "update 3"], ]); @@ -594,25 +620,25 @@ test("connect with clear means catchup only with updates after clear", () => { type: "sent", instanceId: "3001", update: { payload: "Hello", serial: 1, max_serial: 1 }, - descr: "update", + descr: "", }, { type: "received", update: { payload: "Hello", serial: 1, max_serial: 1 }, instanceId: "3001", - descr: "update", + descr: "", }, { type: "sent", instanceId: "3001", update: { payload: "Bye", serial: 2, max_serial: 2 }, - descr: "update 2", + descr: "", }, { type: "received", update: { payload: "Bye", serial: 2, max_serial: 2 }, instanceId: "3001", - descr: "update 2", + descr: "", }, { type: "clear", instanceId: "3001" }, { @@ -657,9 +683,9 @@ test("distribute to self and other, but other was disconnected", () => { return false; }, 0); - client0.sendUpdate({ payload: "Hello" }, "update"); + client0.sendUpdate({ payload: "Hello" }, ""); expect(client0Heard).toMatchObject([ - [{ payload: "Hello", serial: 1, max_serial: 1 }, "update"], + [{ payload: "Hello", serial: 1, max_serial: 1 }, ""], ]); expect(client1Heard).toMatchObject([]); @@ -672,13 +698,13 @@ test("distribute to self and other, but other was disconnected", () => { type: "sent", instanceId: "3001", update: { payload: "Hello", serial: 1, max_serial: 1 }, - descr: "update", + descr: "", }, { type: "received", update: { payload: "Hello", serial: 1, max_serial: 1 }, instanceId: "3001", - descr: "update", + descr: "", }, ]); }); @@ -747,7 +773,7 @@ test("instanceColor", () => { return true; }, 0); - client0.sendUpdate({ payload: "Hello" }, "update"); + client0.sendUpdate({ payload: "Hello" }, ""); const instanceColors = getMessages().map((message) => message.instanceColor); expect(instanceColors).toEqual([ @@ -789,7 +815,7 @@ test("timestamp", async () => { await waitFor(10); - client0.sendUpdate({ payload: "Hello" }, "update"); + client0.sendUpdate({ payload: "Hello" }, ""); await waitFor(10); diff --git a/backend/message.ts b/backend/message.ts index 573ab89..acebfc9 100644 --- a/backend/message.ts +++ b/backend/message.ts @@ -1,4 +1,5 @@ -import type { +import { + RealtimeListener as WebxdcRealtimeListener, ReceivedStatusUpdate, SendingStatusUpdate, Webxdc, @@ -12,6 +13,7 @@ type UpdateListenerMulti = ( type ClearListener = () => boolean; type DeleteListener = () => boolean; +type RealtimeListenerListener = (data: Uint8Array) => boolean type Connect = ( updateListener: UpdateListenerMulti, @@ -22,21 +24,70 @@ type Connect = ( export type WebXdcMulti = { connect: Connect; + connectRealtime: (listener: RealtimeListenerListener) => void; sendUpdate: Webxdc["sendUpdate"]; + sendRealtimeData: (data: Uint8Array) => void; }; export type UpdateDescr = [ReceivedStatusUpdate, string]; export type OnMessage = (message: Message) => void; +export type OnRealtime = (message: Message) => void; + export interface IProcessor { createClient(id: string): WebXdcMulti; clear(): void; removeClient(id: string): void; } +export class RealtimeListener implements WebxdcRealtimeListener { + private trashed = false; + private listener: (data: Uint8Array) => void = () => { } + + constructor( + public sendHook: (data: Uint8Array) => void = () => { }, + public setListenerHook: () => void = () => { }, + private leaveHook: () => void = () => { } + ) { } + + is_trashed(): boolean { + return this.trashed; + } + + receive(data: Uint8Array) { + if (this.trashed) { + throw new Error( + "realtime listener is trashed and can no longer be used", + ); + } + if (this.listener) { + this.listener(data); + } + } + + setListener(listener: (data: Uint8Array) => void) { + this.setListenerHook() + this.listener = listener; + } + + send(data: Uint8Array) { + if (!(data instanceof Uint8Array)) { + throw new Error("realtime listener data must be a Uint8Array"); + } + this.sendHook(data) + } + + leave() { + this.leaveHook() + this.trashed = true; + } +} + class Client implements WebXdcMulti { updateListener: UpdateListenerMulti | null = null; + realtimeListener: RealtimeListenerListener | null = null; + realtime: RealtimeListener | null = null; clearListener: ClearListener | null = null; updateSerial: number | null = null; deleteListener: DeleteListener | null = null; @@ -44,10 +95,39 @@ class Client implements WebXdcMulti { constructor( public processor: Processor, public id: string, - ) {} + ) { } + + sendUpdate(update: SendingStatusUpdate, descr: ""): void { + this.processor.distribute(this.id, update, ""); + } + + sendRealtimeData(data: Uint8Array): void { + this.processor.distributeRealtime(this.id, data); + } + + connectRealtime(listener: RealtimeListenerListener) { + this.processor.onMessage({ + type: "connect-realtime", + instanceId: this.id, + instanceColor: getColorForId(this.id), + timestamp: Date.now(), + }); + + const realtimeListener= (data: Uint8Array) => { + const hasReceived = listener(data); + if (hasReceived) { + this.processor.onMessage({ + type: "realtime-received", + data, + instanceId: this.id, + instanceColor: getColorForId(this.id), + timestamp: Date.now(), + }); + } + return hasReceived; + }; - sendUpdate(update: SendingStatusUpdate, descr: string): void { - this.processor.distribute(this.id, update, descr); + this.realtimeListener = realtimeListener } connect( @@ -113,6 +193,13 @@ class Client implements WebXdcMulti { this.updateListener([[update, descr]]); } + receiveRealtime(data: Uint8Array) { + if (this.realtimeListener == null) { + return; + } + this.realtimeListener(data); + } + clear() { if ( this.clearListener == null || @@ -139,7 +226,7 @@ class Processor implements IProcessor { updates: UpdateDescr[] = []; clearInstanceIds: Set = new Set(); - constructor(public onMessage: OnMessage) {} + constructor(public onMessage: OnMessage) { } createClient(id: string): WebXdcMulti { const client = new Client(this, id); @@ -153,6 +240,24 @@ class Processor implements IProcessor { this.clients.splice(client_index, 1); } + distributeRealtime( + instanceId: string, + data: Uint8Array, + ) { + this.onMessage({ + type: "sendRealtime", + instanceId: instanceId, + instanceColor: getColorForId(instanceId), + data, + timestamp: Date.now(), + }); + for (const client of this.clients) { + if (client.id != instanceId) { + client.receiveRealtime(data) + } + } + } + distribute( instanceId: string, update: SendingStatusUpdate, @@ -200,6 +305,6 @@ class Processor implements IProcessor { } } -export function createProcessor(onMessage: OnMessage = () => {}): IProcessor { +export function createProcessor(onMessage: OnMessage = () => { }): IProcessor { return new Processor(onMessage); } diff --git a/flake.lock b/flake.lock new file mode 100644 index 0000000..e7ea89c --- /dev/null +++ b/flake.lock @@ -0,0 +1,25 @@ +{ + "nodes": { + "nixpkgs": { + "locked": { + "lastModified": 1731676054, + "narHash": "sha256-OZiZ3m8SCMfh3B6bfGC/Bm4x3qc1m2SVEAlkV6iY7Yg=", + "rev": "5e4fbfb6b3de1aa2872b76d49fafc942626e2add", + "revCount": 708622, + "type": "tarball", + "url": "https://api.flakehub.com/f/pinned/NixOS/nixpkgs/0.1.708622%2Brev-5e4fbfb6b3de1aa2872b76d49fafc942626e2add/0193363c-ab27-7bbd-af1d-3e6093ed5e2d/source.tar.gz" + }, + "original": { + "type": "tarball", + "url": "https://flakehub.com/f/NixOS/nixpkgs/0.1.%2A.tar.gz" + } + }, + "root": { + "inputs": { + "nixpkgs": "nixpkgs" + } + } + }, + "root": "root", + "version": 7 +} diff --git a/flake.nix b/flake.nix new file mode 100644 index 0000000..cfa0a3d --- /dev/null +++ b/flake.nix @@ -0,0 +1,20 @@ +{ + description = "A Nix-flake-based Node.js development environment"; + + inputs.nixpkgs.url = "https://flakehub.com/f/NixOS/nixpkgs/0.1.*.tar.gz"; + + outputs = { self, nixpkgs }: + let + supportedSystems = [ "x86_64-linux" "aarch64-linux" "x86_64-darwin" "aarch64-darwin" ]; + forEachSupportedSystem = f: nixpkgs.lib.genAttrs supportedSystems (system: f { + pkgs = import nixpkgs { inherit system; }; + }); + in + { + devShells = forEachSupportedSystem ({ pkgs }: { + default = pkgs.mkShell { + packages = with pkgs; [ node2nix nodejs nodePackages.pnpm nodePackages.typescript-language-server ]; + }; + }); + }; +} diff --git a/package-lock.json b/package-lock.json index 2f95e6c..dd434f5 100644 --- a/package-lock.json +++ b/package-lock.json @@ -46,7 +46,7 @@ "@types/node": "^18.0.0", "@types/node-fetch": "^2.6.2", "@types/wait-on": "^5.3.1", - "@webxdc/types": "^2.0.0", + "@webxdc/types": "^2.0.1", "babel-loader": "^8.2.5", "babel-preset-solid": "~1.5.0", "concurrently": "^7.2.2", @@ -3759,10 +3759,11 @@ } }, "node_modules/@webxdc/types": { - "version": "2.0.0", - "resolved": "https://registry.npmjs.org/@webxdc/types/-/types-2.0.0.tgz", - "integrity": "sha512-gtddNq2PsUZZwESXOjpI3sHG185fm7QXEolm9sWVIM1iV0sD4Zs8jLL1MKjuYn04/WuX8jVDKDGBDv4qLimhlA==", - "dev": true + "version": "2.1.2", + "resolved": "https://registry.npmjs.org/@webxdc/types/-/types-2.1.2.tgz", + "integrity": "sha512-oklcyHvUXqCS5JwbPVaN8tt7nSB8ffRmyrUlVt0HeSn4kDyNE46oKSbw3KtrDzl530KHnm67LfcK/AjWbBoXUA==", + "dev": true, + "license": "unlicense" }, "node_modules/@xtuc/ieee754": { "version": "1.2.0", @@ -13889,9 +13890,9 @@ "requires": {} }, "@webxdc/types": { - "version": "2.0.0", - "resolved": "https://registry.npmjs.org/@webxdc/types/-/types-2.0.0.tgz", - "integrity": "sha512-gtddNq2PsUZZwESXOjpI3sHG185fm7QXEolm9sWVIM1iV0sD4Zs8jLL1MKjuYn04/WuX8jVDKDGBDv4qLimhlA==", + "version": "2.1.2", + "resolved": "https://registry.npmjs.org/@webxdc/types/-/types-2.1.2.tgz", + "integrity": "sha512-oklcyHvUXqCS5JwbPVaN8tt7nSB8ffRmyrUlVt0HeSn4kDyNE46oKSbw3KtrDzl530KHnm67LfcK/AjWbBoXUA==", "dev": true }, "@xtuc/ieee754": { diff --git a/package.json b/package.json index 3c11f16..fde2f46 100644 --- a/package.json +++ b/package.json @@ -17,6 +17,10 @@ "name": "Martijn Faassen", "email": "faassen@startifact.com", "url": "http://blog.startifact.com" + }, + { + "name": "Sebastian Klähn", + "email": "info@sebastian-klaehn.de" } ], "files": [ @@ -54,7 +58,7 @@ "@types/node": "^18.0.0", "@types/node-fetch": "^2.6.2", "@types/wait-on": "^5.3.1", - "@webxdc/types": "^2.0.0", + "@webxdc/types": "^2.0.1", "babel-loader": "^8.2.5", "babel-preset-solid": "~1.5.0", "concurrently": "^7.2.2", diff --git a/sim/create.ts b/sim/create.ts index 1445970..d27dbb1 100644 --- a/sim/create.ts +++ b/sim/create.ts @@ -1,10 +1,15 @@ -import { WebXdc, ReceivedStatusUpdate } from "@webxdc/types"; - +import { Webxdc, ReceivedStatusUpdate } from "@webxdc/types"; +import { RealtimeListener as RTL } from "../backend/message" type UpdatesMessage = { type: "updates"; updates: ReceivedStatusUpdate[]; }; +type SendRealtimeMessage = { + type: "sendRealtime" + data: Uint8Array +}; + type ClearMessage = { type: "clear"; }; @@ -23,7 +28,7 @@ type DeleteMessage = { type: "delete"; }; -type Message = UpdatesMessage | ClearMessage | InfoMessage | DeleteMessage; +type Message = UpdatesMessage | ClearMessage | InfoMessage | DeleteMessage | SendRealtimeMessage; export type TransportMessageCallback = (message: Message) => void; @@ -32,7 +37,7 @@ export type TransportConnectCallback = () => void; export type Transport = { send(data: any): void; onMessage(callback: TransportMessageCallback): void; - onConnect(callback: TransportConnectCallback): void; + onConnect(callback: TransportConnectCallback): void; // Socket connection cb clear(): void; address(): string; name(): string; @@ -44,14 +49,14 @@ type Log = (...args: any[]) => void; export function createWebXdc( transport: Transport, - log: Log = () => {}, -): WebXdc { + log: Log = () => { }, +): Webxdc { let resolveUpdateListenerPromise: (() => void) | null = null; - - const webXdc: WebXdc = { - sendUpdate: (update, descr) => { - transport.send({ type: "sendUpdate", update, descr }); - log("send", { update, descr }); + let realtime: RTL | null = null + const webXdc: Webxdc = { + sendUpdate: (update) => { + transport.send({ type: "sendUpdate", update }); + log("send", { update }); }, setUpdateListener: (listener, serial = 0): Promise => { transport.onMessage((message) => { @@ -64,6 +69,12 @@ export function createWebXdc( resolveUpdateListenerPromise(); resolveUpdateListenerPromise = null; } + } else if (isRealtimeMessage(message)) { + // TODO: move this out of setUpdateListener because otherwise + // You have to set an update listener such that realtime works + // Conversion to any because the actual data is a dict representation of Uint8Array + // This is due to JSON.stringify conversion. + realtime!.receive(new Uint8Array(Object.values(message.data as any))) } else if (isClearMessage(message)) { log("clear"); transport.clear(); @@ -73,6 +84,8 @@ export function createWebXdc( } else if (isDeleteMessage(message)) { log("delete"); window.top?.close(); + } else { + log("error", `Unhandled message ${message}`) } }); transport.onConnect(() => { @@ -92,17 +105,14 @@ export function createWebXdc( ); } - /** @type {(file: Blob) => Promise} */ - const blob_to_base64 = (file) => { + const blob_to_base64 = (file: Blob) => { const data_start = ";base64,"; return new Promise((resolve, reject) => { const reader = new FileReader(); reader.readAsDataURL(file); reader.onload = () => { - /** @type {string} */ - //@ts-ignore - let data = reader.result; - resolve(data.slice(data.indexOf(data_start) + data_start.length)); + let data: string = reader.result as string; + resolve(data!.slice(data!.indexOf(data_start) + data_start.length)); }; reader.onerror = () => reject(reader.error); }); @@ -143,13 +153,11 @@ export function createWebXdc( ); } } - const msg = `The app would now close and the user would select a chat to send this message:\nText: ${ - content.text ? `"${content.text}"` : "No Text" - }\nFile: ${ - content.file + const msg = `The app would now close and the user would select a chat to send this message:\nText: ${content.text ? `"${content.text}"` : "No Text" + }\nFile: ${content.file ? `${content.file.name} - ${base64Content.length} bytes` : "No File" - }`; + }`; if (content.file) { const confirmed = confirm( msg + "\n\nDownload the file in the browser instead?", @@ -191,6 +199,29 @@ export function createWebXdc( console.log(element); return promise; }, + + joinRealtimeChannel: () => { + realtime = new RTL(() => { }, + () => { + transport.send({ type: "setRealtimeListener" }) + }, + () => { + realtime = null + }); + transport.onConnect(() => { + realtime!.sendHook = (data) => { + transport.send({ type: "sendRealtime", data } as SendRealtimeMessage); + log("send realtime", { data }); + } + }) + return realtime + }, + getAllUpdates: () => { + console.log("[Webxdc] WARNING: getAllUpdates() is deprecated."); + return Promise.resolve([]); + }, + sendUpdateInterval: 1000, + sendUpdateMaxSize: 999999, selfAddr: transport.address(), selfName: transport.name(), }; @@ -201,6 +232,10 @@ function isUpdatesMessage(data: Message): data is UpdatesMessage { return data.type === "updates"; } +function isRealtimeMessage(data: Message): data is SendRealtimeMessage { + return data.type === "sendRealtime"; +} + function isClearMessage(data: Message): data is ClearMessage { return data.type === "clear"; } diff --git a/sim/webxdc.test.ts b/sim/webxdc.test.ts index 7a198d6..4e24315 100644 --- a/sim/webxdc.test.ts +++ b/sim/webxdc.test.ts @@ -27,8 +27,12 @@ class FakeTransport implements Transport { send(data: any) { if (data.type === "sendUpdate") { - const { update, descr } = data; - this.client.sendUpdate(update, descr); + const { update} = data; + this.client.sendUpdate(update, ""); + } else if (data.type === "sendRealtime") { + this.client.sendRealtimeData(data.data) + }else if (data.type === "joinRealtime") { + this.client.joinRealtimeChannel() } else if (data.type === "setUpdateListener") { this.client.connect( (updates) => { @@ -40,6 +44,15 @@ class FakeTransport implements Transport { } return true; }, + () => { + if (this.messageCallback != null) { + this.messageCallback({ + type: "sendRealtime", + data, + }); + } + return true; + }, data.serial, () => { if (this.messageCallback != null) { @@ -106,7 +119,7 @@ test("webxdc sends", async () => { }, 0); fakeTransport.connect(); await promise; - webXdc.sendUpdate({ payload: "hello" }, "sent 1"); + webXdc.sendUpdate({ payload: "hello" }, ""); expect(updates).toEqual([ { payload: "hello", @@ -148,7 +161,7 @@ test("webxdc distributes", async () => { fakeTransportB.connect(); await promiseB; - webXdcA.sendUpdate({ payload: "hello" }, "sent 1"); + webXdcA.sendUpdate({ payload: "hello" }, ""); expect(updatesA).toEqual([ { payload: "hello", diff --git a/sim/webxdc.ts b/sim/webxdc.ts index f7739a8..cb547b1 100644 --- a/sim/webxdc.ts +++ b/sim/webxdc.ts @@ -1,4 +1,4 @@ -import { WebXdc } from "@webxdc/types"; +import { Webxdc } from "@webxdc/types"; import { Transport, TransportMessageCallback, @@ -20,7 +20,7 @@ export class DevServerTransport implements Transport { constructor(url: string) { this.socket = new WebSocket(url); - this.promise = new Promise((resolve, reject) => { + this.promise = new Promise((resolve) => { this.resolveInfo = resolve; }); } @@ -71,7 +71,7 @@ export class DevServerTransport implements Transport { return new Promise((resolve, reject) => { const name = result?.name; console.log(`Deleting indexedDB database: ${name}`); - const request = window.indexedDB.deleteDatabase(name); + const request = window.indexedDB.deleteDatabase(name!); request.onsuccess = (ev) => resolve(ev); request.onerror = (ev) => reject(ev); }); @@ -114,7 +114,7 @@ export class DevServerTransport implements Transport { } } -function getWebXdc(): WebXdc { +function getWebXdc(): Webxdc { return (window as any).webxdc; } diff --git a/types/message.ts b/types/message.ts index 46f7801..812f0b1 100644 --- a/types/message.ts +++ b/types/message.ts @@ -17,5 +17,8 @@ export type UpdateMessage = export type Message = | UpdateMessage + | ({ type: "sendRealtime", data: Uint8Array } & InstanceMessage) | ({ type: "clear" } & InstanceMessage) - | ({ type: "connect" } & InstanceMessage); + | ({ type: "connect" } & InstanceMessage) + | ({ type: "connect-realtime" } & InstanceMessage) + | ({ type: "realtime-received", data: Uint8Array} & InstanceMessage);