From df28f5a0272642b06bd65aa1a8fe17505f0afb18 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Kl=C3=A4hn?= Date: Mon, 2 Dec 2024 21:10:37 +0100 Subject: [PATCH 01/19] add realtime API support --- backend/message.ts | 52 ++++++++++++++++++++++++++++++++++++++++++++++ types/message.ts | 1 + 2 files changed, 53 insertions(+) diff --git a/backend/message.ts b/backend/message.ts index 573ab89..dba1f87 100644 --- a/backend/message.ts +++ b/backend/message.ts @@ -1,4 +1,5 @@ import type { + RealtimeListener, ReceivedStatusUpdate, SendingStatusUpdate, Webxdc, @@ -23,20 +24,28 @@ type Connect = ( export type WebXdcMulti = { connect: Connect; sendUpdate: Webxdc["sendUpdate"]; + joinRealtimeChannel: Webxdc["joinRealtimeChannel"] }; export type UpdateDescr = [ReceivedStatusUpdate, string]; export type OnMessage = (message: Message) => void; +export type OnRealtime = (message: Message) => void; + export interface IProcessor { createClient(id: string): WebXdcMulti; clear(): void; removeClient(id: string): void; } +class Realtime { + constructor(public listener: (data: Uint8Array) => void = () => {}) {}; +} + class Client implements WebXdcMulti { updateListener: UpdateListenerMulti | null = null; + realtime: Realtime | null = null; clearListener: ClearListener | null = null; updateSerial: number | null = null; deleteListener: DeleteListener | null = null; @@ -50,6 +59,10 @@ class Client implements WebXdcMulti { this.processor.distribute(this.id, update, descr); } + sendRealtimeData(data: Uint8Array): void { + this.processor.distributeRealtime(this.id, data); + } + connect( listener: UpdateListenerMulti, serial: number, @@ -112,7 +125,29 @@ class Client implements WebXdcMulti { } this.updateListener([[update, descr]]); } + + receiveRealtime(data: Uint8Array) { + if (this.updateListener == null || this.updateSerial == null) { + return; + } + if (this.realtime && this.realtime.listener) + this.realtime?.listener(data) + } + joinRealtimeChannel(): RealtimeListener { + return { + setListener: (listener) => { + this.realtime = new Realtime(listener) + }, + leave: () => { + this.realtime = null + }, + send: (data) => { + this.sendRealtimeData(data) + } + } + } + clear() { if ( this.clearListener == null || @@ -153,6 +188,23 @@ class Processor implements IProcessor { this.clients.splice(client_index, 1); } + distributeRealtime( + instanceId: string, + data: Uint8Array, + ) { + this.onMessage({ + type: "realtime-sent", + instanceId: instanceId, + instanceColor: getColorForId(instanceId), + data, + timestamp: Date.now(), + }); + for (const client of this.clients) { + client.receiveRealtime(data); + } + } + + distribute( instanceId: string, update: SendingStatusUpdate, diff --git a/types/message.ts b/types/message.ts index 46f7801..5c18f92 100644 --- a/types/message.ts +++ b/types/message.ts @@ -17,5 +17,6 @@ export type UpdateMessage = export type Message = | UpdateMessage + | ({ type: "realtime-sent", data: Uint8Array} & InstanceMessage) | ({ type: "clear" } & InstanceMessage) | ({ type: "connect" } & InstanceMessage); From 846315edd8bb8941510a51b00cbbda017c88fd8e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Kl=C3=A4hn?= Date: Mon, 2 Dec 2024 21:10:53 +0100 Subject: [PATCH 02/19] add flake --- .envrc | 1 + .gitignore | 3 ++- flake.lock | 25 +++++++++++++++++++++++++ flake.nix | 20 ++++++++++++++++++++ 4 files changed, 48 insertions(+), 1 deletion(-) create mode 100644 .envrc create mode 100644 flake.lock create mode 100644 flake.nix diff --git a/.envrc b/.envrc new file mode 100644 index 0000000..3550a30 --- /dev/null +++ b/.envrc @@ -0,0 +1 @@ +use flake diff --git a/.gitignore b/.gitignore index 4241c17..829d632 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ node_modules dist -*~ \ No newline at end of file +*~ +.direnv diff --git a/flake.lock b/flake.lock new file mode 100644 index 0000000..e7ea89c --- /dev/null +++ b/flake.lock @@ -0,0 +1,25 @@ +{ + "nodes": { + "nixpkgs": { + "locked": { + "lastModified": 1731676054, + "narHash": "sha256-OZiZ3m8SCMfh3B6bfGC/Bm4x3qc1m2SVEAlkV6iY7Yg=", + "rev": "5e4fbfb6b3de1aa2872b76d49fafc942626e2add", + "revCount": 708622, + "type": "tarball", + "url": "https://api.flakehub.com/f/pinned/NixOS/nixpkgs/0.1.708622%2Brev-5e4fbfb6b3de1aa2872b76d49fafc942626e2add/0193363c-ab27-7bbd-af1d-3e6093ed5e2d/source.tar.gz" + }, + "original": { + "type": "tarball", + "url": "https://flakehub.com/f/NixOS/nixpkgs/0.1.%2A.tar.gz" + } + }, + "root": { + "inputs": { + "nixpkgs": "nixpkgs" + } + } + }, + "root": "root", + "version": 7 +} diff --git a/flake.nix b/flake.nix new file mode 100644 index 0000000..cfa0a3d --- /dev/null +++ b/flake.nix @@ -0,0 +1,20 @@ +{ + description = "A Nix-flake-based Node.js development environment"; + + inputs.nixpkgs.url = "https://flakehub.com/f/NixOS/nixpkgs/0.1.*.tar.gz"; + + outputs = { self, nixpkgs }: + let + supportedSystems = [ "x86_64-linux" "aarch64-linux" "x86_64-darwin" "aarch64-darwin" ]; + forEachSupportedSystem = f: nixpkgs.lib.genAttrs supportedSystems (system: f { + pkgs = import nixpkgs { inherit system; }; + }); + in + { + devShells = forEachSupportedSystem ({ pkgs }: { + default = pkgs.mkShell { + packages = with pkgs; [ node2nix nodejs nodePackages.pnpm nodePackages.typescript-language-server ]; + }; + }); + }; +} From 36620aefe68e60828913dd68c811295c3d8e2a05 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Kl=C3=A4hn?= Date: Mon, 2 Dec 2024 21:11:01 +0100 Subject: [PATCH 03/19] add self to contributors --- package.json | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/package.json b/package.json index 3c11f16..f351ef1 100644 --- a/package.json +++ b/package.json @@ -17,6 +17,10 @@ "name": "Martijn Faassen", "email": "faassen@startifact.com", "url": "http://blog.startifact.com" + }, + { + "name": "Sebastian Klähn", + "email": "inf@sebastian-klaehn.de" } ], "files": [ From 004f7c3bb91ada6d5f3cf37decd48d053b06141f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Kl=C3=A4hn?= Date: Mon, 2 Dec 2024 21:11:09 +0100 Subject: [PATCH 04/19] add test --- backend/message.test.ts | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/backend/message.test.ts b/backend/message.test.ts index f6a1ec0..0a234cb 100644 --- a/backend/message.test.ts +++ b/backend/message.test.ts @@ -54,6 +54,32 @@ test("distribute to self", () => { ]); }); + +test("Send realtime", () => { + const [getMessages, onMessage] = track(); + const processor = createProcessor(onMessage); + const client0 = processor.createClient("3001"); + const client1 = processor.createClient("3002"); + + const client0Heard: string[] = []; + const client1Heard: string[] = []; + + const rt0 = client0.joinRealtimeChannel() + const rt1 = client1.joinRealtimeChannel() + + const decoder = new TextDecoder() + rt0.setListener((data) => { client0Heard.push(decoder.decode(data))}) + rt1.setListener((data) => { client1Heard.push(decoder.decode(data))}) + + const encoder = new TextEncoder() + + rt0.send(new Uint8Array(encoder.encode("hi"))) + + expect(client0Heard).toMatchObject([ + "hi" + ]) +}); + test("distribute to self and other", () => { const [getMessages, onMessage] = track(); const processor = createProcessor(onMessage); From 42ba1bbf1ef1e673e803ac0002a664c5b69f63cd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Kl=C3=A4hn?= Date: Mon, 2 Dec 2024 21:22:25 +0100 Subject: [PATCH 05/19] remove deprecated description --- backend/message.test.ts | 144 ++++++++++++++++++++-------------------- 1 file changed, 72 insertions(+), 72 deletions(-) diff --git a/backend/message.test.ts b/backend/message.test.ts index 0a234cb..5d38336 100644 --- a/backend/message.test.ts +++ b/backend/message.test.ts @@ -26,10 +26,10 @@ test("distribute to self", () => { return true; }, 0); - client0.sendUpdate({ payload: "Hello" }, "update"); + client0.sendUpdate({ payload: "Hello" }, "") ; expect(client0Heard).toMatchObject([ - [{ payload: "Hello", serial: 1, max_serial: 1 }, "update"], + [{ payload: "Hello", serial: 1, max_serial: 1 }, ""], ]); expect(prepare(getMessages())).toEqual([ @@ -43,13 +43,13 @@ test("distribute to self", () => { serial: 1, max_serial: 1, }, - descr: "update", + descr: "", }, { type: "received", update: { payload: "Hello", serial: 1, max_serial: 1 }, instanceId: "3001", - descr: "update", + descr: "", }, ]); }); @@ -99,15 +99,15 @@ test("distribute to self and other", () => { return true; }, 0); - client0.sendUpdate({ payload: "Hello" }, "update"); - client1.sendUpdate({ payload: "Bye" }, "update 2"); + client0.sendUpdate({ payload: "Hello" }, ""); + client1.sendUpdate({ payload: "Bye" }, ""); expect(client0Heard).toMatchObject([ - [{ payload: "Hello", serial: 1, max_serial: 1 }, "update"], - [{ payload: "Bye", serial: 2, max_serial: 2 }, "update 2"], + [{ payload: "Hello", serial: 1, max_serial: 1 }, ""], + [{ payload: "Bye", serial: 2, max_serial: 2 }, ""], ]); expect(client1Heard).toMatchObject([ - [{ payload: "Hello", serial: 1, max_serial: 1 }, "update"], - [{ payload: "Bye", serial: 2, max_serial: 2 }, "update 2"], + [{ payload: "Hello", serial: 1, max_serial: 1 }, ""], + [{ payload: "Bye", serial: 2, max_serial: 2 }, ""], ]); expect(prepare(getMessages())).toEqual([ @@ -119,37 +119,37 @@ test("distribute to self and other", () => { type: "sent", instanceId: "3001", update: { payload: "Hello", serial: 1, max_serial: 1 }, - descr: "update", + descr: "", }, { type: "received", update: { payload: "Hello", serial: 1, max_serial: 1 }, instanceId: "3001", - descr: "update", + descr: "", }, { type: "received", update: { payload: "Hello", serial: 1, max_serial: 1 }, instanceId: "3002", - descr: "update", + descr: "", }, { type: "sent", instanceId: "3002", update: { payload: "Bye", serial: 2, max_serial: 2 }, - descr: "update 2", + descr: "", }, { type: "received", update: { payload: "Bye", serial: 2, max_serial: 2 }, instanceId: "3001", - descr: "update 2", + descr: "", }, { type: "received", update: { payload: "Bye", serial: 2, max_serial: 2 }, instanceId: "3002", - descr: "update 2", + descr: "", }, ]); }); @@ -172,14 +172,14 @@ test("setUpdateListener serial should skip older", () => { return true; }, 1); - client0.sendUpdate({ payload: "Hello" }, "update"); - client0.sendUpdate({ payload: "Bye" }, "update 2"); + client0.sendUpdate({ payload: "Hello" }, ""); + client0.sendUpdate({ payload: "Bye" }, ""); expect(client0Heard).toMatchObject([ - [{ payload: "Hello", serial: 1, max_serial: 1 }, "update"], - [{ payload: "Bye", serial: 2, max_serial: 2 }, "update 2"], + [{ payload: "Hello", serial: 1, max_serial: 1 }, ""], + [{ payload: "Bye", serial: 2, max_serial: 2 }, ""], ]); expect(client1Heard).toMatchObject([ - [{ payload: "Bye", serial: 2, max_serial: 2 }, "update 2"], + [{ payload: "Bye", serial: 2, max_serial: 2 }, ""], ]); }); @@ -197,12 +197,12 @@ test("other starts listening later", () => { return true; }, 0); - client0.sendUpdate({ payload: "Hello" }, "update"); - client0.sendUpdate({ payload: "Bye" }, "update 2"); + client0.sendUpdate({ payload: "Hello" }, ""); + client0.sendUpdate({ payload: "Bye" }, ""); expect(client0Heard).toMatchObject([ - [{ payload: "Hello", serial: 1, max_serial: 1 }, "update"], - [{ payload: "Bye", serial: 2, max_serial: 2 }, "update 2"], + [{ payload: "Hello", serial: 1, max_serial: 1 }, ""], + [{ payload: "Bye", serial: 2, max_serial: 2 }, ""], ]); // we only join later, so we haven't heard a thing yet expect(client1Heard).toMatchObject([]); @@ -213,12 +213,12 @@ test("other starts listening later", () => { }, 0); expect(client0Heard).toMatchObject([ - [{ payload: "Hello", serial: 1, max_serial: 1 }, "update"], - [{ payload: "Bye", serial: 2, max_serial: 2 }, "update 2"], + [{ payload: "Hello", serial: 1, max_serial: 1 }, ""], + [{ payload: "Bye", serial: 2, max_serial: 2 }, ""], ]); expect(client1Heard).toMatchObject([ - [{ payload: "Hello", serial: 1, max_serial: 2 }, "update"], - [{ payload: "Bye", serial: 2, max_serial: 2 }, "update 2"], + [{ payload: "Hello", serial: 1, max_serial: 2 }, ""], + [{ payload: "Bye", serial: 2, max_serial: 2 }, ""], ]); expect(prepare(getMessages())).toEqual([ @@ -228,25 +228,25 @@ test("other starts listening later", () => { type: "sent", instanceId: "3001", update: { payload: "Hello", serial: 1, max_serial: 1 }, - descr: "update", + descr: "", }, { type: "received", update: { payload: "Hello", serial: 1, max_serial: 1 }, instanceId: "3001", - descr: "update", + descr: "", }, { type: "sent", instanceId: "3001", update: { payload: "Bye", serial: 2, max_serial: 2 }, - descr: "update 2", + descr: "", }, { type: "received", update: { payload: "Bye", serial: 2, max_serial: 2 }, instanceId: "3001", - descr: "update 2", + descr: "", }, { type: "connect", instanceId: "3002" }, { type: "clear", instanceId: "3002" }, @@ -254,13 +254,13 @@ test("other starts listening later", () => { type: "received", update: { payload: "Hello", serial: 1, max_serial: 2 }, instanceId: "3002", - descr: "update", + descr: "", }, { type: "received", update: { payload: "Bye", serial: 2, max_serial: 2 }, instanceId: "3002", - descr: "update 2", + descr: "", }, ]); }); @@ -277,12 +277,12 @@ test("client is created later and needs to catch up", () => { return true; }, 0); - client0.sendUpdate({ payload: "Hello" }, "update"); - client0.sendUpdate({ payload: "Bye" }, "update 2"); + client0.sendUpdate({ payload: "Hello" }, ""); + client0.sendUpdate({ payload: "Bye" }, ""); expect(client0Heard).toMatchObject([ - [{ payload: "Hello", serial: 1, max_serial: 1 }, "update"], - [{ payload: "Bye", serial: 2, max_serial: 2 }, "update 2"], + [{ payload: "Hello", serial: 1, max_serial: 1 }, ""], + [{ payload: "Bye", serial: 2, max_serial: 2 }, ""], ]); // we only join later, so we haven't heard a thing yet @@ -295,12 +295,12 @@ test("client is created later and needs to catch up", () => { }, 0); expect(client0Heard).toMatchObject([ - [{ payload: "Hello", serial: 1, max_serial: 1 }, "update"], - [{ payload: "Bye", serial: 2, max_serial: 2 }, "update 2"], + [{ payload: "Hello", serial: 1, max_serial: 1 }, ""], + [{ payload: "Bye", serial: 2, max_serial: 2 }, ""], ]); expect(client1Heard).toMatchObject([ - [{ payload: "Hello", serial: 1, max_serial: 2 }, "update"], - [{ payload: "Bye", serial: 2, max_serial: 2 }, "update 2"], + [{ payload: "Hello", serial: 1, max_serial: 2 }, ""], + [{ payload: "Bye", serial: 2, max_serial: 2 }, ""], ]); }); @@ -317,11 +317,11 @@ test("other starts listening later but is partially caught up", () => { return true; }, 0); - client0.sendUpdate({ payload: "Hello" }, "update"); - client0.sendUpdate({ payload: "Bye" }, "update 2"); + client0.sendUpdate({ payload: "Hello" }, ""); + client0.sendUpdate({ payload: "Bye" }, ""); expect(client0Heard).toMatchObject([ - [{ payload: "Hello", serial: 1, max_serial: 1 }, "update"], - [{ payload: "Bye", serial: 2, max_serial: 2 }, "update 2"], + [{ payload: "Hello", serial: 1, max_serial: 1 }, ""], + [{ payload: "Bye", serial: 2, max_serial: 2 }, ""], ]); // we only join later, so we haven't heard a thing yet expect(client1Heard).toMatchObject([]); @@ -333,11 +333,11 @@ test("other starts listening later but is partially caught up", () => { }, 1); expect(client0Heard).toMatchObject([ - [{ payload: "Hello", serial: 1, max_serial: 1 }, "update"], - [{ payload: "Bye", serial: 2, max_serial: 2 }, "update 2"], + [{ payload: "Hello", serial: 1, max_serial: 1 }, ""], + [{ payload: "Bye", serial: 2, max_serial: 2 }, ""], ]); expect(client1Heard).toMatchObject([ - [{ payload: "Bye", serial: 2, max_serial: 2 }, "update 2"], + [{ payload: "Bye", serial: 2, max_serial: 2 }, ""], ]); }); @@ -511,16 +511,16 @@ test("connect with clear means we get no catchup if no new updates", () => { }, ); - client0.sendUpdate({ payload: "Hello" }, "update"); - client0.sendUpdate({ payload: "Bye" }, "update 2"); + client0.sendUpdate({ payload: "Hello" }, ""); + client0.sendUpdate({ payload: "Bye" }, ""); // now we clear processor.clear(); expect(client0Heard).toMatchObject([ "cleared", - [{ payload: "Hello", serial: 1, max_serial: 1 }, "update"], - [{ payload: "Bye", serial: 2, max_serial: 2 }, "update 2"], + [{ payload: "Hello", serial: 1, max_serial: 1 }, ""], + [{ payload: "Bye", serial: 2, max_serial: 2 }, ""], "cleared", ]); @@ -542,8 +542,8 @@ test("connect with clear means we get no catchup if no new updates", () => { expect(client0Heard).toMatchObject([ "cleared", - [{ payload: "Hello", serial: 1, max_serial: 1 }, "update"], - [{ payload: "Bye", serial: 2, max_serial: 2 }, "update 2"], + [{ payload: "Hello", serial: 1, max_serial: 1 }, ""], + [{ payload: "Bye", serial: 2, max_serial: 2 }, ""], "cleared", ]); expect(client1Heard).toMatchObject(["cleared"]); @@ -569,8 +569,8 @@ test("connect with clear means catchup only with updates after clear", () => { }, ); - client0.sendUpdate({ payload: "Hello" }, "update"); - client0.sendUpdate({ payload: "Bye" }, "update 2"); + client0.sendUpdate({ payload: "Hello" }, ""); + client0.sendUpdate({ payload: "Bye" }, ""); processor.clear(); @@ -579,8 +579,8 @@ test("connect with clear means catchup only with updates after clear", () => { expect(client0Heard).toMatchObject([ "cleared", - [{ payload: "Hello", serial: 1, max_serial: 1 }, "update"], - [{ payload: "Bye", serial: 2, max_serial: 2 }, "update 2"], + [{ payload: "Hello", serial: 1, max_serial: 1 }, ""], + [{ payload: "Bye", serial: 2, max_serial: 2 }, ""], "cleared", [{ payload: "Aftermath", serial: 1, max_serial: 1 }, "update 3"], ]); @@ -603,8 +603,8 @@ test("connect with clear means catchup only with updates after clear", () => { expect(client0Heard).toMatchObject([ "cleared", - [{ payload: "Hello", serial: 1, max_serial: 1 }, "update"], - [{ payload: "Bye", serial: 2, max_serial: 2 }, "update 2"], + [{ payload: "Hello", serial: 1, max_serial: 1 }, ""], + [{ payload: "Bye", serial: 2, max_serial: 2 }, ""], "cleared", [{ payload: "Aftermath", serial: 1, max_serial: 1 }, "update 3"], ]); @@ -620,25 +620,25 @@ test("connect with clear means catchup only with updates after clear", () => { type: "sent", instanceId: "3001", update: { payload: "Hello", serial: 1, max_serial: 1 }, - descr: "update", + descr: "", }, { type: "received", update: { payload: "Hello", serial: 1, max_serial: 1 }, instanceId: "3001", - descr: "update", + descr: "", }, { type: "sent", instanceId: "3001", update: { payload: "Bye", serial: 2, max_serial: 2 }, - descr: "update 2", + descr: "", }, { type: "received", update: { payload: "Bye", serial: 2, max_serial: 2 }, instanceId: "3001", - descr: "update 2", + descr: "", }, { type: "clear", instanceId: "3001" }, { @@ -683,9 +683,9 @@ test("distribute to self and other, but other was disconnected", () => { return false; }, 0); - client0.sendUpdate({ payload: "Hello" }, "update"); + client0.sendUpdate({ payload: "Hello" }, ""); expect(client0Heard).toMatchObject([ - [{ payload: "Hello", serial: 1, max_serial: 1 }, "update"], + [{ payload: "Hello", serial: 1, max_serial: 1 }, ""], ]); expect(client1Heard).toMatchObject([]); @@ -698,13 +698,13 @@ test("distribute to self and other, but other was disconnected", () => { type: "sent", instanceId: "3001", update: { payload: "Hello", serial: 1, max_serial: 1 }, - descr: "update", + descr: "", }, { type: "received", update: { payload: "Hello", serial: 1, max_serial: 1 }, instanceId: "3001", - descr: "update", + descr: "", }, ]); }); @@ -773,7 +773,7 @@ test("instanceColor", () => { return true; }, 0); - client0.sendUpdate({ payload: "Hello" }, "update"); + client0.sendUpdate({ payload: "Hello" }, ""); const instanceColors = getMessages().map((message) => message.instanceColor); expect(instanceColors).toEqual([ @@ -815,7 +815,7 @@ test("timestamp", async () => { await waitFor(10); - client0.sendUpdate({ payload: "Hello" }, "update"); + client0.sendUpdate({ payload: "Hello" }, ""); await waitFor(10); From b8f78dc2123b1f1e71bf4aa1a5e87e5f0fbccd8d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Kl=C3=A4hn?= Date: Mon, 2 Dec 2024 23:23:18 +0100 Subject: [PATCH 06/19] improve test --- backend/message.test.ts | 15 ++++++++------- backend/message.ts | 6 ++---- 2 files changed, 10 insertions(+), 11 deletions(-) diff --git a/backend/message.test.ts b/backend/message.test.ts index 5d38336..5166d6f 100644 --- a/backend/message.test.ts +++ b/backend/message.test.ts @@ -26,7 +26,7 @@ test("distribute to self", () => { return true; }, 0); - client0.sendUpdate({ payload: "Hello" }, "") ; + client0.sendUpdate({ payload: "Hello" }, ""); expect(client0Heard).toMatchObject([ [{ payload: "Hello", serial: 1, max_serial: 1 }, ""], @@ -63,21 +63,22 @@ test("Send realtime", () => { const client0Heard: string[] = []; const client1Heard: string[] = []; - + const rt0 = client0.joinRealtimeChannel() const rt1 = client1.joinRealtimeChannel() const decoder = new TextDecoder() - rt0.setListener((data) => { client0Heard.push(decoder.decode(data))}) - rt1.setListener((data) => { client1Heard.push(decoder.decode(data))}) + rt0.setListener((data) => { client0Heard.push(decoder.decode(data)) }) + rt1.setListener((data) => { client1Heard.push(decoder.decode(data)) }) const encoder = new TextEncoder() - + rt0.send(new Uint8Array(encoder.encode("hi"))) - - expect(client0Heard).toMatchObject([ + + expect(client1Heard).toMatchObject([ "hi" ]) + expect(client0Heard).toMatchObject([]) }); test("distribute to self and other", () => { diff --git a/backend/message.ts b/backend/message.ts index dba1f87..8a96478 100644 --- a/backend/message.ts +++ b/backend/message.ts @@ -127,9 +127,6 @@ class Client implements WebXdcMulti { } receiveRealtime(data: Uint8Array) { - if (this.updateListener == null || this.updateSerial == null) { - return; - } if (this.realtime && this.realtime.listener) this.realtime?.listener(data) } @@ -200,7 +197,8 @@ class Processor implements IProcessor { timestamp: Date.now(), }); for (const client of this.clients) { - client.receiveRealtime(data); + if (client.id != instanceId) + client.receiveRealtime(data); } } From f5e2ebf7f5de602d9e530ebcf7f8bbf1258fb06a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Kl=C3=A4hn?= Date: Tue, 3 Dec 2024 19:34:16 +0100 Subject: [PATCH 07/19] remove one more instance of deprecated description --- backend/app.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/app.ts b/backend/app.ts index a0e1365..80d8c95 100644 --- a/backend/app.ts +++ b/backend/app.ts @@ -84,7 +84,7 @@ export function createFrontend( app.post("/fake-update", (req, res) => { const instanceId = Array.from(instances.instances.keys())[0]; const instance = instances.instances.get(instanceId); - instance?.webXdc.sendUpdate({ payload: req.body }, "fake update"); + instance?.webXdc.sendUpdate({ payload: req.body }, ""); res.json({ status: "ok", }); From f18599bce3179700ca10573fc1a6c2f60b5125c4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Kl=C3=A4hn?= Date: Tue, 3 Dec 2024 22:12:59 +0100 Subject: [PATCH 08/19] update webxdc types --- package-lock.json | 17 +++++++++-------- package.json | 2 +- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/package-lock.json b/package-lock.json index 2f95e6c..dd434f5 100644 --- a/package-lock.json +++ b/package-lock.json @@ -46,7 +46,7 @@ "@types/node": "^18.0.0", "@types/node-fetch": "^2.6.2", "@types/wait-on": "^5.3.1", - "@webxdc/types": "^2.0.0", + "@webxdc/types": "^2.0.1", "babel-loader": "^8.2.5", "babel-preset-solid": "~1.5.0", "concurrently": "^7.2.2", @@ -3759,10 +3759,11 @@ } }, "node_modules/@webxdc/types": { - "version": "2.0.0", - "resolved": "https://registry.npmjs.org/@webxdc/types/-/types-2.0.0.tgz", - "integrity": "sha512-gtddNq2PsUZZwESXOjpI3sHG185fm7QXEolm9sWVIM1iV0sD4Zs8jLL1MKjuYn04/WuX8jVDKDGBDv4qLimhlA==", - "dev": true + "version": "2.1.2", + "resolved": "https://registry.npmjs.org/@webxdc/types/-/types-2.1.2.tgz", + "integrity": "sha512-oklcyHvUXqCS5JwbPVaN8tt7nSB8ffRmyrUlVt0HeSn4kDyNE46oKSbw3KtrDzl530KHnm67LfcK/AjWbBoXUA==", + "dev": true, + "license": "unlicense" }, "node_modules/@xtuc/ieee754": { "version": "1.2.0", @@ -13889,9 +13890,9 @@ "requires": {} }, "@webxdc/types": { - "version": "2.0.0", - "resolved": "https://registry.npmjs.org/@webxdc/types/-/types-2.0.0.tgz", - "integrity": "sha512-gtddNq2PsUZZwESXOjpI3sHG185fm7QXEolm9sWVIM1iV0sD4Zs8jLL1MKjuYn04/WuX8jVDKDGBDv4qLimhlA==", + "version": "2.1.2", + "resolved": "https://registry.npmjs.org/@webxdc/types/-/types-2.1.2.tgz", + "integrity": "sha512-oklcyHvUXqCS5JwbPVaN8tt7nSB8ffRmyrUlVt0HeSn4kDyNE46oKSbw3KtrDzl530KHnm67LfcK/AjWbBoXUA==", "dev": true }, "@xtuc/ieee754": { diff --git a/package.json b/package.json index f351ef1..2722229 100644 --- a/package.json +++ b/package.json @@ -58,7 +58,7 @@ "@types/node": "^18.0.0", "@types/node-fetch": "^2.6.2", "@types/wait-on": "^5.3.1", - "@webxdc/types": "^2.0.0", + "@webxdc/types": "^2.0.1", "babel-loader": "^8.2.5", "babel-preset-solid": "~1.5.0", "concurrently": "^7.2.2", From 4ee5d89abc6addef032cf8129efdd9ba83d03b7d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Kl=C3=A4hn?= Date: Tue, 3 Dec 2024 22:13:10 +0100 Subject: [PATCH 09/19] use webxdc.js implementation --- backend/message.ts | 75 ++++++++++++++++++++++++++++++---------------- 1 file changed, 50 insertions(+), 25 deletions(-) diff --git a/backend/message.ts b/backend/message.ts index 8a96478..11cbd59 100644 --- a/backend/message.ts +++ b/backend/message.ts @@ -1,9 +1,10 @@ -import type { +import { RealtimeListener, ReceivedStatusUpdate, SendingStatusUpdate, Webxdc, } from "@webxdc/types"; +import { ReadLine } from "readline"; import type { Message } from "../types/message"; import { getColorForId } from "./color"; @@ -39,13 +40,46 @@ export interface IProcessor { removeClient(id: string): void; } -class Realtime { - constructor(public listener: (data: Uint8Array) => void = () => {}) {}; +class RTL implements RealtimeListener { + private trashed = false; + private listener: (data: Uint8Array) => void = (data) => { } + + constructor(private sendHook: (data: Uint8Array) => void) {} + + is_trashed(): boolean { + return this.trashed; + } + + receive(data: Uint8Array) { + if (this.trashed) { + throw new Error( + "realtime listener is trashed and can no longer be used", + ); + } + if (this.listener) { + this.listener(data); + } + } + + setListener(listener: (data: Uint8Array) => void) { + this.listener = listener; + } + + send(data: Uint8Array) { + if (!(data instanceof Uint8Array)) { + throw new Error("realtime listener data must be a Uint8Array"); + } + this.sendHook(data) + } + + leave() { + this.trashed = true; + } } class Client implements WebXdcMulti { updateListener: UpdateListenerMulti | null = null; - realtime: Realtime | null = null; + realtime: RTL | null = null; clearListener: ClearListener | null = null; updateSerial: number | null = null; deleteListener: DeleteListener | null = null; @@ -53,7 +87,7 @@ class Client implements WebXdcMulti { constructor( public processor: Processor, public id: string, - ) {} + ) { } sendUpdate(update: SendingStatusUpdate, descr: string): void { this.processor.distribute(this.id, update, descr); @@ -125,26 +159,17 @@ class Client implements WebXdcMulti { } this.updateListener([[update, descr]]); } - - receiveRealtime(data: Uint8Array) { - if (this.realtime && this.realtime.listener) - this.realtime?.listener(data) - } joinRealtimeChannel(): RealtimeListener { - return { - setListener: (listener) => { - this.realtime = new Realtime(listener) - }, - leave: () => { - this.realtime = null - }, - send: (data) => { - this.sendRealtimeData(data) - } + if (this.realtime && !this.realtime.is_trashed) { + throw new Error("The old realtime instance has to be trashed first") } + this.realtime = new RTL((data) => { + this.sendRealtimeData(data) + }) + return this.realtime } - + clear() { if ( this.clearListener == null || @@ -171,7 +196,7 @@ class Processor implements IProcessor { updates: UpdateDescr[] = []; clearInstanceIds: Set = new Set(); - constructor(public onMessage: OnMessage) {} + constructor(public onMessage: OnMessage) { } createClient(id: string): WebXdcMulti { const client = new Client(this, id); @@ -197,8 +222,8 @@ class Processor implements IProcessor { timestamp: Date.now(), }); for (const client of this.clients) { - if (client.id != instanceId) - client.receiveRealtime(data); + if (client.id != instanceId && client.realtime) + client.realtime.receive(data); } } @@ -250,6 +275,6 @@ class Processor implements IProcessor { } } -export function createProcessor(onMessage: OnMessage = () => {}): IProcessor { +export function createProcessor(onMessage: OnMessage = () => { }): IProcessor { return new Processor(onMessage); } From 128e2be8e5aad1e64d17abf23c407584e645a070 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Kl=C3=A4hn?= Date: Tue, 3 Dec 2024 22:40:56 +0100 Subject: [PATCH 10/19] remove one more unused descr --- backend/instance.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/instance.ts b/backend/instance.ts index 5a7f57f..7f66400 100644 --- a/backend/instance.ts +++ b/backend/instance.ts @@ -125,7 +125,7 @@ export class Instances { const parsed = JSON.parse(msg); // XXX should validate parsed if (isSendUpdateMessage(parsed)) { - instance.webXdc.sendUpdate(parsed.update, parsed.descr); + instance.webXdc.sendUpdate(parsed.update, ""); } else if (isSetUpdateListenerMessage(parsed)) { instance.webXdc.connect( (updates) => { From af584a0dc9c5450bd4e40acc15bf2d716342dfc7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Kl=C3=A4hn?= Date: Tue, 3 Dec 2024 22:41:18 +0100 Subject: [PATCH 11/19] fix typo in contributors --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index 2722229..fde2f46 100644 --- a/package.json +++ b/package.json @@ -20,7 +20,7 @@ }, { "name": "Sebastian Klähn", - "email": "inf@sebastian-klaehn.de" + "email": "info@sebastian-klaehn.de" } ], "files": [ From f7a3a58fb47e29acf51f8c1f9f582fbfefe8efa1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Kl=C3=A4hn?= Date: Wed, 4 Dec 2024 12:26:50 +0100 Subject: [PATCH 12/19] deprecate more descr --- sim/create.ts | 4 ++-- sim/webxdc.test.ts | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/sim/create.ts b/sim/create.ts index 1445970..4034477 100644 --- a/sim/create.ts +++ b/sim/create.ts @@ -50,8 +50,8 @@ export function createWebXdc( const webXdc: WebXdc = { sendUpdate: (update, descr) => { - transport.send({ type: "sendUpdate", update, descr }); - log("send", { update, descr }); + transport.send({ type: "sendUpdate", update }); + log("send", { update }); }, setUpdateListener: (listener, serial = 0): Promise => { transport.onMessage((message) => { diff --git a/sim/webxdc.test.ts b/sim/webxdc.test.ts index 7a198d6..07ef6d4 100644 --- a/sim/webxdc.test.ts +++ b/sim/webxdc.test.ts @@ -27,8 +27,8 @@ class FakeTransport implements Transport { send(data: any) { if (data.type === "sendUpdate") { - const { update, descr } = data; - this.client.sendUpdate(update, descr); + const { update} = data; + this.client.sendUpdate(update, ""); } else if (data.type === "setUpdateListener") { this.client.connect( (updates) => { @@ -106,7 +106,7 @@ test("webxdc sends", async () => { }, 0); fakeTransport.connect(); await promise; - webXdc.sendUpdate({ payload: "hello" }, "sent 1"); + webXdc.sendUpdate({ payload: "hello" }, ""); expect(updates).toEqual([ { payload: "hello", @@ -148,7 +148,7 @@ test("webxdc distributes", async () => { fakeTransportB.connect(); await promiseB; - webXdcA.sendUpdate({ payload: "hello" }, "sent 1"); + webXdcA.sendUpdate({ payload: "hello" }, ""); expect(updatesA).toEqual([ { payload: "hello", From 677dfe80317910f0220d55f2c71c8cef53e2b317 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Kl=C3=A4hn?= Date: Wed, 4 Dec 2024 12:27:50 +0100 Subject: [PATCH 13/19] bring frontend webxdc implementation on par with newest webxdcjs --- backend/message.ts | 6 +++--- sim/create.ts | 40 ++++++++++++++++++++++++---------------- sim/webxdc.test.ts | 2 ++ sim/webxdc.ts | 2 +- 4 files changed, 30 insertions(+), 20 deletions(-) diff --git a/backend/message.ts b/backend/message.ts index 11cbd59..cb207da 100644 --- a/backend/message.ts +++ b/backend/message.ts @@ -4,7 +4,6 @@ import { SendingStatusUpdate, Webxdc, } from "@webxdc/types"; -import { ReadLine } from "readline"; import type { Message } from "../types/message"; import { getColorForId } from "./color"; @@ -25,6 +24,7 @@ type Connect = ( export type WebXdcMulti = { connect: Connect; sendUpdate: Webxdc["sendUpdate"]; + sendRealtimeData: (data: Uint8Array) => void; joinRealtimeChannel: Webxdc["joinRealtimeChannel"] }; @@ -40,9 +40,9 @@ export interface IProcessor { removeClient(id: string): void; } -class RTL implements RealtimeListener { +export class RTL implements RealtimeListener { private trashed = false; - private listener: (data: Uint8Array) => void = (data) => { } + private listener: (data: Uint8Array) => void = () => { } constructor(private sendHook: (data: Uint8Array) => void) {} diff --git a/sim/create.ts b/sim/create.ts index 4034477..390991b 100644 --- a/sim/create.ts +++ b/sim/create.ts @@ -1,5 +1,5 @@ -import { WebXdc, ReceivedStatusUpdate } from "@webxdc/types"; - +import { Webxdc, ReceivedStatusUpdate } from "@webxdc/types"; +import { RTL } from "../backend/message" type UpdatesMessage = { type: "updates"; updates: ReceivedStatusUpdate[]; @@ -44,11 +44,11 @@ type Log = (...args: any[]) => void; export function createWebXdc( transport: Transport, - log: Log = () => {}, -): WebXdc { + log: Log = () => { }, +): Webxdc { let resolveUpdateListenerPromise: (() => void) | null = null; - const webXdc: WebXdc = { + const webXdc: Webxdc = { sendUpdate: (update, descr) => { transport.send({ type: "sendUpdate", update }); log("send", { update }); @@ -92,17 +92,14 @@ export function createWebXdc( ); } - /** @type {(file: Blob) => Promise} */ - const blob_to_base64 = (file) => { + const blob_to_base64 = (file: Blob) => { const data_start = ";base64,"; return new Promise((resolve, reject) => { const reader = new FileReader(); reader.readAsDataURL(file); reader.onload = () => { - /** @type {string} */ - //@ts-ignore - let data = reader.result; - resolve(data.slice(data.indexOf(data_start) + data_start.length)); + let data: string = reader.result as string; + resolve(data!.slice(data!.indexOf(data_start) + data_start.length)); }; reader.onerror = () => reject(reader.error); }); @@ -143,13 +140,11 @@ export function createWebXdc( ); } } - const msg = `The app would now close and the user would select a chat to send this message:\nText: ${ - content.text ? `"${content.text}"` : "No Text" - }\nFile: ${ - content.file + const msg = `The app would now close and the user would select a chat to send this message:\nText: ${content.text ? `"${content.text}"` : "No Text" + }\nFile: ${content.file ? `${content.file.name} - ${base64Content.length} bytes` : "No File" - }`; + }`; if (content.file) { const confirmed = confirm( msg + "\n\nDownload the file in the browser instead?", @@ -191,6 +186,19 @@ export function createWebXdc( console.log(element); return promise; }, + + joinRealtimeChannel: () => { + return new RTL((data) => { + transport.send({ type: "sendRealtime", data }); + log("send realtime", { data }); + }); + }, + getAllUpdates: () => { + console.log("[Webxdc] WARNING: getAllUpdates() is deprecated."); + return Promise.resolve([]); + }, + sendUpdateInterval: 1000, + sendUpdateMaxSize: 999999, selfAddr: transport.address(), selfName: transport.name(), }; diff --git a/sim/webxdc.test.ts b/sim/webxdc.test.ts index 07ef6d4..cc09a46 100644 --- a/sim/webxdc.test.ts +++ b/sim/webxdc.test.ts @@ -29,6 +29,8 @@ class FakeTransport implements Transport { if (data.type === "sendUpdate") { const { update} = data; this.client.sendUpdate(update, ""); + } else if (data.type === "sendRealtime") { + this.client.sendRealtimeData(data.data) } else if (data.type === "setUpdateListener") { this.client.connect( (updates) => { diff --git a/sim/webxdc.ts b/sim/webxdc.ts index f7739a8..e44d586 100644 --- a/sim/webxdc.ts +++ b/sim/webxdc.ts @@ -1,4 +1,4 @@ -import { WebXdc } from "@webxdc/types"; +import { Webxdc } from "@webxdc/types"; import { Transport, TransportMessageCallback, From 39d10ba1661edd4a3338bcca2b14d083a93f215b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Kl=C3=A4hn?= Date: Thu, 5 Dec 2024 22:42:12 +0100 Subject: [PATCH 14/19] implement frontend simulator --- backend/instance.ts | 17 +++++++++++++++++ backend/message.ts | 35 +++++++++++++++++++++-------------- sim/create.ts | 30 ++++++++++++++++++++++++------ sim/webxdc.test.ts | 2 ++ sim/webxdc.ts | 2 +- types/message.ts | 2 +- 6 files changed, 66 insertions(+), 22 deletions(-) diff --git a/backend/instance.ts b/backend/instance.ts index 7f66400..1da035a 100644 --- a/backend/instance.ts +++ b/backend/instance.ts @@ -22,6 +22,11 @@ type SendUpdateMessage = { descr: string; }; +type SendRealtimeMessage = { + type: "sendRealtime"; + data: Uint8Array +}; + type SetUpdateListenerMessage = { type: "setUpdateListener"; serial: number; @@ -126,6 +131,10 @@ export class Instances { // XXX should validate parsed if (isSendUpdateMessage(parsed)) { instance.webXdc.sendUpdate(parsed.update, ""); + } else if (isSendRealtimeMessage(parsed)) { + instance.webXdc.sendRealtimeData(parsed.data); + } else if (isJoinRealtimeMessage(parsed)) { + instance.webXdc.joinRealtimeChannel(); } else if (isSetUpdateListenerMessage(parsed)) { instance.webXdc.connect( (updates) => { @@ -217,6 +226,14 @@ function isSendUpdateMessage(value: any): value is SendUpdateMessage { return value.type === "sendUpdate"; } +function isSendRealtimeMessage(value: any): value is SendRealtimeMessage { + return value.type === "sendRealtime"; +} + +function isJoinRealtimeMessage(value: any): value is { type: "joinRealtime" } { + return value.type === "joinRealtime"; +} + function isSetUpdateListenerMessage( value: any, ): value is SetUpdateListenerMessage { diff --git a/backend/message.ts b/backend/message.ts index cb207da..460d873 100644 --- a/backend/message.ts +++ b/backend/message.ts @@ -1,5 +1,5 @@ import { - RealtimeListener, + RealtimeListener as WebxdcRealtimeListener, ReceivedStatusUpdate, SendingStatusUpdate, Webxdc, @@ -40,11 +40,14 @@ export interface IProcessor { removeClient(id: string): void; } -export class RTL implements RealtimeListener { - private trashed = false; - private listener: (data: Uint8Array) => void = () => { } - - constructor(private sendHook: (data: Uint8Array) => void) {} +export class RealtimeListener implements WebxdcRealtimeListener { + private trashed = false; + private listener: (data: Uint8Array) => void = () => { } + + constructor( + private sendHook: (data: Uint8Array) => void = () => { }, + private leaveHook: () => void = () => { } + ) { } is_trashed(): boolean { return this.trashed; @@ -73,13 +76,14 @@ export class RTL implements RealtimeListener { } leave() { + this.leaveHook() this.trashed = true; } } class Client implements WebXdcMulti { updateListener: UpdateListenerMulti | null = null; - realtime: RTL | null = null; + realtime: RealtimeListener | null = null; clearListener: ClearListener | null = null; updateSerial: number | null = null; deleteListener: DeleteListener | null = null; @@ -161,10 +165,8 @@ class Client implements WebXdcMulti { } joinRealtimeChannel(): RealtimeListener { - if (this.realtime && !this.realtime.is_trashed) { - throw new Error("The old realtime instance has to be trashed first") - } - this.realtime = new RTL((data) => { + console.log("joined realtime") + this.realtime = new RealtimeListener((data) => { this.sendRealtimeData(data) }) return this.realtime @@ -214,20 +216,25 @@ class Processor implements IProcessor { instanceId: string, data: Uint8Array, ) { + console.log("distributing") this.onMessage({ - type: "realtime-sent", + type: "sendRealtime", instanceId: instanceId, instanceColor: getColorForId(instanceId), data, timestamp: Date.now(), }); for (const client of this.clients) { - if (client.id != instanceId && client.realtime) + if (client.id != instanceId) { + if (!client.realtime) { + continue + console.warn(`client ${instanceId} has not joined realtime`) + } client.realtime.receive(data); + } } } - distribute( instanceId: string, update: SendingStatusUpdate, diff --git a/sim/create.ts b/sim/create.ts index 390991b..8fcc35b 100644 --- a/sim/create.ts +++ b/sim/create.ts @@ -1,10 +1,15 @@ -import { Webxdc, ReceivedStatusUpdate } from "@webxdc/types"; -import { RTL } from "../backend/message" +import { Webxdc, ReceivedStatusUpdate, RealtimeListener as WebxdcRealtimeListener, RealtimeListener } from "@webxdc/types"; +import { RealtimeListener as RTL } from "../backend/message" type UpdatesMessage = { type: "updates"; updates: ReceivedStatusUpdate[]; }; +type SendRealtimeMessage = { + type: "sendRealtime" + data: Uint8Array +}; + type ClearMessage = { type: "clear"; }; @@ -23,7 +28,7 @@ type DeleteMessage = { type: "delete"; }; -type Message = UpdatesMessage | ClearMessage | InfoMessage | DeleteMessage; +type Message = UpdatesMessage | ClearMessage | InfoMessage | DeleteMessage | SendRealtimeMessage; export type TransportMessageCallback = (message: Message) => void; @@ -47,7 +52,7 @@ export function createWebXdc( log: Log = () => { }, ): Webxdc { let resolveUpdateListenerPromise: (() => void) | null = null; - + let realtime: RTL | null = null const webXdc: Webxdc = { sendUpdate: (update, descr) => { transport.send({ type: "sendUpdate", update }); @@ -64,6 +69,9 @@ export function createWebXdc( resolveUpdateListenerPromise(); resolveUpdateListenerPromise = null; } + } else if (isRealtimeMessage(message)) { + console.log("received realtime data at frontend") + realtime!.receive(message.data) } else if (isClearMessage(message)) { log("clear"); transport.clear(); @@ -73,6 +81,8 @@ export function createWebXdc( } else if (isDeleteMessage(message)) { log("delete"); window.top?.close(); + } else { + log("error", `Unhandled message ${message}`) } }); transport.onConnect(() => { @@ -188,10 +198,14 @@ export function createWebXdc( }, joinRealtimeChannel: () => { - return new RTL((data) => { - transport.send({ type: "sendRealtime", data }); + transport.send({type: "joinRealtime"}) + realtime = new RTL((data) => { + transport.send({ type: "sendRealtime", data } as SendRealtimeMessage); log("send realtime", { data }); + }, () => { + realtime = null }); + return realtime }, getAllUpdates: () => { console.log("[Webxdc] WARNING: getAllUpdates() is deprecated."); @@ -209,6 +223,10 @@ function isUpdatesMessage(data: Message): data is UpdatesMessage { return data.type === "updates"; } +function isRealtimeMessage(data: Message): data is SendRealtimeMessage { + return data.type === "sendRealtime"; +} + function isClearMessage(data: Message): data is ClearMessage { return data.type === "clear"; } diff --git a/sim/webxdc.test.ts b/sim/webxdc.test.ts index cc09a46..dc70995 100644 --- a/sim/webxdc.test.ts +++ b/sim/webxdc.test.ts @@ -31,6 +31,8 @@ class FakeTransport implements Transport { this.client.sendUpdate(update, ""); } else if (data.type === "sendRealtime") { this.client.sendRealtimeData(data.data) + }else if (data.type === "joinRealtime") { + this.client.joinRealtimeChannel() } else if (data.type === "setUpdateListener") { this.client.connect( (updates) => { diff --git a/sim/webxdc.ts b/sim/webxdc.ts index e44d586..7d0130b 100644 --- a/sim/webxdc.ts +++ b/sim/webxdc.ts @@ -114,7 +114,7 @@ export class DevServerTransport implements Transport { } } -function getWebXdc(): WebXdc { +function getWebXdc(): Webxdc { return (window as any).webxdc; } diff --git a/types/message.ts b/types/message.ts index 5c18f92..c516a10 100644 --- a/types/message.ts +++ b/types/message.ts @@ -17,6 +17,6 @@ export type UpdateMessage = export type Message = | UpdateMessage - | ({ type: "realtime-sent", data: Uint8Array} & InstanceMessage) + | ({ type: "sendRealtime", data: Uint8Array} & InstanceMessage) | ({ type: "clear" } & InstanceMessage) | ({ type: "connect" } & InstanceMessage); From a21618bf97b0aca4836ee8076de51cba5df6626c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Kl=C3=A4hn?= Date: Fri, 6 Dec 2024 11:40:19 +0100 Subject: [PATCH 15/19] add connect --- backend/message.ts | 37 ++++++++++++++++++++++++++++++++++++- sim/webxdc.test.ts | 9 +++++++++ types/message.ts | 6 ++++-- 3 files changed, 49 insertions(+), 3 deletions(-) diff --git a/backend/message.ts b/backend/message.ts index 460d873..4bc7033 100644 --- a/backend/message.ts +++ b/backend/message.ts @@ -13,6 +13,7 @@ type UpdateListenerMulti = ( type ClearListener = () => boolean; type DeleteListener = () => boolean; +type RealtimeListenerListener = (data: Uint8Array) => boolean type Connect = ( updateListener: UpdateListenerMulti, @@ -83,6 +84,7 @@ export class RealtimeListener implements WebxdcRealtimeListener { class Client implements WebXdcMulti { updateListener: UpdateListenerMulti | null = null; + realtimeListener: RealtimeListenerListener | null = null; realtime: RealtimeListener | null = null; clearListener: ClearListener | null = null; updateSerial: number | null = null; @@ -101,6 +103,32 @@ class Client implements WebXdcMulti { this.processor.distributeRealtime(this.id, data); } + connectRealtime(listener: RealtimeListenerListener) { + this.processor.onMessage({ + type: "connect-realtime", + instanceId: this.id, + instanceColor: getColorForId(this.id), + timestamp: Date.now(), + }); + + + const realtimeListener= (data: Uint8Array) => { + const hasReceived = listener(data); + if (hasReceived) { + this.processor.onMessage({ + type: "realtime-received", + data, + instanceId: this.id, + instanceColor: getColorForId(this.id), + timestamp: Date.now(), + }); + } + return hasReceived; + }; + + this.realtimeListener = realtimeListener + } + connect( listener: UpdateListenerMulti, serial: number, @@ -164,6 +192,13 @@ class Client implements WebXdcMulti { this.updateListener([[update, descr]]); } + receiveRealtime(data: Uint8Array) { + if (this.realtimeListener == null) { + return; + } + this.realtimeListener(data); + } + joinRealtimeChannel(): RealtimeListener { console.log("joined realtime") this.realtime = new RealtimeListener((data) => { @@ -230,7 +265,7 @@ class Processor implements IProcessor { continue console.warn(`client ${instanceId} has not joined realtime`) } - client.realtime.receive(data); + client.receiveRealtime(data) } } } diff --git a/sim/webxdc.test.ts b/sim/webxdc.test.ts index dc70995..4e24315 100644 --- a/sim/webxdc.test.ts +++ b/sim/webxdc.test.ts @@ -44,6 +44,15 @@ class FakeTransport implements Transport { } return true; }, + () => { + if (this.messageCallback != null) { + this.messageCallback({ + type: "sendRealtime", + data, + }); + } + return true; + }, data.serial, () => { if (this.messageCallback != null) { diff --git a/types/message.ts b/types/message.ts index c516a10..2a9067a 100644 --- a/types/message.ts +++ b/types/message.ts @@ -17,6 +17,8 @@ export type UpdateMessage = export type Message = | UpdateMessage - | ({ type: "sendRealtime", data: Uint8Array} & InstanceMessage) + | ({ type: "sendRealtime", data: Uint8Array } & InstanceMessage) | ({ type: "clear" } & InstanceMessage) - | ({ type: "connect" } & InstanceMessage); + | ({ type: "connect" } & InstanceMessage) + | ({ type: "connect-realtime" } & InstanceMessage) + | ({ type: "realtime-received"} & InstanceMessage); From 65715d69f58d42c2bc6d3813b56de4b72320ca18 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Kl=C3=A4hn?= Date: Fri, 6 Dec 2024 12:33:42 +0100 Subject: [PATCH 16/19] receiving messages between devices --- backend/instance.ts | 11 +++++++---- backend/message.test.ts | 7 +++---- backend/message.ts | 26 ++++++++------------------ sim/create.ts | 28 ++++++++++++++++++---------- types/message.ts | 2 +- 5 files changed, 37 insertions(+), 37 deletions(-) diff --git a/backend/instance.ts b/backend/instance.ts index 1da035a..f1b1bd2 100644 --- a/backend/instance.ts +++ b/backend/instance.ts @@ -133,8 +133,11 @@ export class Instances { instance.webXdc.sendUpdate(parsed.update, ""); } else if (isSendRealtimeMessage(parsed)) { instance.webXdc.sendRealtimeData(parsed.data); - } else if (isJoinRealtimeMessage(parsed)) { - instance.webXdc.joinRealtimeChannel(); + } else if (isSetRealtimeListenerMessage(parsed)) { + instance.webXdc.connectRealtime((data) => { + console.warn("broadcasting leeeeeel") + return broadcast(wss, JSON.stringify({ type: "sendRealtime", data })); + }); } else if (isSetUpdateListenerMessage(parsed)) { instance.webXdc.connect( (updates) => { @@ -230,8 +233,8 @@ function isSendRealtimeMessage(value: any): value is SendRealtimeMessage { return value.type === "sendRealtime"; } -function isJoinRealtimeMessage(value: any): value is { type: "joinRealtime" } { - return value.type === "joinRealtime"; +function isSetRealtimeListenerMessage(value: any): value is { type: "setRealtimeListener" } { + return value.type === "setRealtimeListener"; } function isSetUpdateListenerMessage( diff --git a/backend/message.test.ts b/backend/message.test.ts index 5166d6f..b9d365b 100644 --- a/backend/message.test.ts +++ b/backend/message.test.ts @@ -64,7 +64,7 @@ test("Send realtime", () => { const client0Heard: string[] = []; const client1Heard: string[] = []; - const rt0 = client0.joinRealtimeChannel() + /* const rt0 = client0.joinRealtimeChannel() const rt1 = client1.joinRealtimeChannel() const decoder = new TextDecoder() @@ -78,9 +78,8 @@ test("Send realtime", () => { expect(client1Heard).toMatchObject([ "hi" ]) - expect(client0Heard).toMatchObject([]) + expect(client0Heard).toMatchObject([]) */ }); - test("distribute to self and other", () => { const [getMessages, onMessage] = track(); const processor = createProcessor(onMessage); @@ -576,7 +575,7 @@ test("connect with clear means catchup only with updates after clear", () => { processor.clear(); // the aftermath update, which the newly connecting client should get - client0.sendUpdate({ payload: "Aftermath" }, "update 3"); + client0.sendUpdate({ payload: "Aftermath" }, ""); expect(client0Heard).toMatchObject([ "cleared", diff --git a/backend/message.ts b/backend/message.ts index 4bc7033..ab1e9da 100644 --- a/backend/message.ts +++ b/backend/message.ts @@ -24,9 +24,9 @@ type Connect = ( export type WebXdcMulti = { connect: Connect; + connectRealtime: (listener: RealtimeListenerListener) => void; sendUpdate: Webxdc["sendUpdate"]; sendRealtimeData: (data: Uint8Array) => void; - joinRealtimeChannel: Webxdc["joinRealtimeChannel"] }; export type UpdateDescr = [ReceivedStatusUpdate, string]; @@ -46,7 +46,8 @@ export class RealtimeListener implements WebxdcRealtimeListener { private listener: (data: Uint8Array) => void = () => { } constructor( - private sendHook: (data: Uint8Array) => void = () => { }, + public sendHook: (data: Uint8Array) => void = () => { }, + public setListenerHook: () => void = () => { }, private leaveHook: () => void = () => { } ) { } @@ -66,6 +67,7 @@ export class RealtimeListener implements WebxdcRealtimeListener { } setListener(listener: (data: Uint8Array) => void) { + this.setListenerHook() this.listener = listener; } @@ -95,8 +97,8 @@ class Client implements WebXdcMulti { public id: string, ) { } - sendUpdate(update: SendingStatusUpdate, descr: string): void { - this.processor.distribute(this.id, update, descr); + sendUpdate(update: SendingStatusUpdate, descr: ""): void { + this.processor.distribute(this.id, update, ""); } sendRealtimeData(data: Uint8Array): void { @@ -104,6 +106,7 @@ class Client implements WebXdcMulti { } connectRealtime(listener: RealtimeListenerListener) { + console.warn("connecting realtime") this.processor.onMessage({ type: "connect-realtime", instanceId: this.id, @@ -111,7 +114,6 @@ class Client implements WebXdcMulti { timestamp: Date.now(), }); - const realtimeListener= (data: Uint8Array) => { const hasReceived = listener(data); if (hasReceived) { @@ -196,17 +198,10 @@ class Client implements WebXdcMulti { if (this.realtimeListener == null) { return; } + console.warn("hiiiii"); this.realtimeListener(data); } - joinRealtimeChannel(): RealtimeListener { - console.log("joined realtime") - this.realtime = new RealtimeListener((data) => { - this.sendRealtimeData(data) - }) - return this.realtime - } - clear() { if ( this.clearListener == null || @@ -251,7 +246,6 @@ class Processor implements IProcessor { instanceId: string, data: Uint8Array, ) { - console.log("distributing") this.onMessage({ type: "sendRealtime", instanceId: instanceId, @@ -261,10 +255,6 @@ class Processor implements IProcessor { }); for (const client of this.clients) { if (client.id != instanceId) { - if (!client.realtime) { - continue - console.warn(`client ${instanceId} has not joined realtime`) - } client.receiveRealtime(data) } } diff --git a/sim/create.ts b/sim/create.ts index 8fcc35b..cac6868 100644 --- a/sim/create.ts +++ b/sim/create.ts @@ -1,4 +1,4 @@ -import { Webxdc, ReceivedStatusUpdate, RealtimeListener as WebxdcRealtimeListener, RealtimeListener } from "@webxdc/types"; +import { Webxdc, ReceivedStatusUpdate, RealtimeListener as WebxdcRealtimeListener } from "@webxdc/types"; import { RealtimeListener as RTL } from "../backend/message" type UpdatesMessage = { type: "updates"; @@ -37,7 +37,7 @@ export type TransportConnectCallback = () => void; export type Transport = { send(data: any): void; onMessage(callback: TransportMessageCallback): void; - onConnect(callback: TransportConnectCallback): void; + onConnect(callback: TransportConnectCallback): void; // Socket connection cb clear(): void; address(): string; name(): string; @@ -54,7 +54,7 @@ export function createWebXdc( let resolveUpdateListenerPromise: (() => void) | null = null; let realtime: RTL | null = null const webXdc: Webxdc = { - sendUpdate: (update, descr) => { + sendUpdate: (update) => { transport.send({ type: "sendUpdate", update }); log("send", { update }); }, @@ -70,6 +70,8 @@ export function createWebXdc( resolveUpdateListenerPromise = null; } } else if (isRealtimeMessage(message)) { + // TODO: move this out of setUpdateListener because otherwise + // You have to set an update listener such that realtime works console.log("received realtime data at frontend") realtime!.receive(message.data) } else if (isClearMessage(message)) { @@ -198,13 +200,19 @@ export function createWebXdc( }, joinRealtimeChannel: () => { - transport.send({type: "joinRealtime"}) - realtime = new RTL((data) => { - transport.send({ type: "sendRealtime", data } as SendRealtimeMessage); - log("send realtime", { data }); - }, () => { - realtime = null - }); + realtime = new RTL(() => { }, + () => { + transport.send({ type: "setRealtimeListener" }) + }, + () => { + realtime = null + }); + transport.onConnect(() => { + realtime!.sendHook = (data) => { + transport.send({ type: "sendRealtime", data } as SendRealtimeMessage); + log("send realtime", { data }); + } + }) return realtime }, getAllUpdates: () => { diff --git a/types/message.ts b/types/message.ts index 2a9067a..812f0b1 100644 --- a/types/message.ts +++ b/types/message.ts @@ -21,4 +21,4 @@ export type Message = | ({ type: "clear" } & InstanceMessage) | ({ type: "connect" } & InstanceMessage) | ({ type: "connect-realtime" } & InstanceMessage) - | ({ type: "realtime-received"} & InstanceMessage); + | ({ type: "realtime-received", data: Uint8Array} & InstanceMessage); From 82d3514500e83e1159aaf0dbcfc366f425862f96 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Kl=C3=A4hn?= Date: Sun, 8 Dec 2024 11:38:30 +0100 Subject: [PATCH 17/19] cleanup logs --- backend/instance.ts | 1 - backend/message.ts | 2 -- 2 files changed, 3 deletions(-) diff --git a/backend/instance.ts b/backend/instance.ts index f1b1bd2..f011dd4 100644 --- a/backend/instance.ts +++ b/backend/instance.ts @@ -135,7 +135,6 @@ export class Instances { instance.webXdc.sendRealtimeData(parsed.data); } else if (isSetRealtimeListenerMessage(parsed)) { instance.webXdc.connectRealtime((data) => { - console.warn("broadcasting leeeeeel") return broadcast(wss, JSON.stringify({ type: "sendRealtime", data })); }); } else if (isSetUpdateListenerMessage(parsed)) { diff --git a/backend/message.ts b/backend/message.ts index ab1e9da..acebfc9 100644 --- a/backend/message.ts +++ b/backend/message.ts @@ -106,7 +106,6 @@ class Client implements WebXdcMulti { } connectRealtime(listener: RealtimeListenerListener) { - console.warn("connecting realtime") this.processor.onMessage({ type: "connect-realtime", instanceId: this.id, @@ -198,7 +197,6 @@ class Client implements WebXdcMulti { if (this.realtimeListener == null) { return; } - console.warn("hiiiii"); this.realtimeListener(data); } From f6f90f017bb2da8273577b37b5381c22633931f6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Kl=C3=A4hn?= Date: Sun, 8 Dec 2024 11:38:45 +0100 Subject: [PATCH 18/19] convert to propert Uint8Array --- sim/create.ts | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/sim/create.ts b/sim/create.ts index cac6868..d27dbb1 100644 --- a/sim/create.ts +++ b/sim/create.ts @@ -1,4 +1,4 @@ -import { Webxdc, ReceivedStatusUpdate, RealtimeListener as WebxdcRealtimeListener } from "@webxdc/types"; +import { Webxdc, ReceivedStatusUpdate } from "@webxdc/types"; import { RealtimeListener as RTL } from "../backend/message" type UpdatesMessage = { type: "updates"; @@ -72,8 +72,9 @@ export function createWebXdc( } else if (isRealtimeMessage(message)) { // TODO: move this out of setUpdateListener because otherwise // You have to set an update listener such that realtime works - console.log("received realtime data at frontend") - realtime!.receive(message.data) + // Conversion to any because the actual data is a dict representation of Uint8Array + // This is due to JSON.stringify conversion. + realtime!.receive(new Uint8Array(Object.values(message.data as any))) } else if (isClearMessage(message)) { log("clear"); transport.clear(); From f5974eab61ee4f71072bdc12dca21c66e4251427 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Kl=C3=A4hn?= Date: Sun, 8 Dec 2024 11:38:57 +0100 Subject: [PATCH 19/19] typscript lint warning fixes --- sim/webxdc.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sim/webxdc.ts b/sim/webxdc.ts index 7d0130b..cb547b1 100644 --- a/sim/webxdc.ts +++ b/sim/webxdc.ts @@ -20,7 +20,7 @@ export class DevServerTransport implements Transport { constructor(url: string) { this.socket = new WebSocket(url); - this.promise = new Promise((resolve, reject) => { + this.promise = new Promise((resolve) => { this.resolveInfo = resolve; }); } @@ -71,7 +71,7 @@ export class DevServerTransport implements Transport { return new Promise((resolve, reject) => { const name = result?.name; console.log(`Deleting indexedDB database: ${name}`); - const request = window.indexedDB.deleteDatabase(name); + const request = window.indexedDB.deleteDatabase(name!); request.onsuccess = (ev) => resolve(ev); request.onerror = (ev) => reject(ev); });