diff --git a/src/packages/database/nats/changefeeds.ts b/src/packages/database/nats/changefeeds.ts index b0e118a636..67ed01fb19 100644 --- a/src/packages/database/nats/changefeeds.ts +++ b/src/packages/database/nats/changefeeds.ts @@ -8,6 +8,8 @@ require("@cocalc/database/nats/changefeeds").init() + echo 'require("@cocalc/database/nats/changefeeds").init()' | node + */ import getLogger from "@cocalc/backend/logger"; @@ -26,14 +28,13 @@ import jsonStableStringify from "json-stable-stringify"; import { reuseInFlight } from "@cocalc/util/reuse-in-flight"; import { uuid } from "@cocalc/util/misc"; import { delay } from "awaiting"; -import { Svcm, type ServiceMsg } from "@nats-io/services"; -import { type QueuedIterator } from "nats"; +import { Svcm } from "@nats-io/services"; const logger = getLogger("database:nats:changefeeds"); const jc = JSONCodec(); -let api: QueuedIterator | null = null; +let api: any | null = null; export async function init() { const subject = "hub.*.*.db"; logger.debug(`init -- subject='${subject}', options=`, { @@ -50,7 +51,7 @@ export async function init() { description: "CoCalc Database Service (changefeeds)", }); - const api = service.addEndpoint("api", { subject }); + api = service.addEndpoint("api", { subject }); for await (const mesg of api) { handleRequest(mesg, nc); @@ -58,8 +59,9 @@ export async function init() { } export function terminate() { - logger.debug("terminating"); + logger.debug("terminating service"); api?.stop(); + api = null; // also, stop reporting data into the streams cancelAllChangefeeds(); } @@ -133,14 +135,21 @@ function cancelAllChangefeeds() { const createChangefeed = reuseInFlight( async (opts, nc) => { const query = opts.query; - const hash = sha1(jsonStableStringify(query)); + // the query *AND* the user making it define the thing: + const user = { account_id: opts.account_id, project_id: opts.project_id }; + const hash = sha1( + jsonStableStringify({ + query, + ...user, + }), + ); const now = Date.now(); if (changefeedInterest[hash]) { changefeedInterest[hash] = now; - logger.debug("using existing changefeed for", queryTable(query)); + logger.debug("using existing changefeed for", queryTable(query), user); return; } - logger.debug("creating new changefeed for", queryTable(query)); + logger.debug("creating new changefeed for", queryTable(query), user); const changes = uuid(); changefeedHashes[changes] = hash; const env = { nc, jc, sha1 }; @@ -151,6 +160,7 @@ const createChangefeed = reuseInFlight( project_id: opts.project_id, atomic: true, }); + await synctable.init(); // if (global.z == null) { // global.z = {}; @@ -162,9 +172,17 @@ const createChangefeed = reuseInFlight( cb(err ?? "missing result"); return; } + const current = synctable.get(); + const databaseKeys = new Set(); for (const obj of rows) { + databaseKeys.add(synctable.getKey(obj)); synctable.set(obj); } + for (const key in current) { + if (!databaseKeys.has(key)) { + synctable.delete(key); + } + } cb(); }; @@ -177,7 +195,9 @@ const createChangefeed = reuseInFlight( return; } if (action == "insert" || action == "update") { - synctable.set(new_val); + const cur = synctable.get(new_val); + // logger.debug({ table: queryTable(query), action, new_val, old_val }); + synctable.set({ ...cur, ...new_val }); } else if (action == "delete") { synctable.delete(old_val); } else if (action == "close") { diff --git a/src/packages/nats/sync/synctable-kv-atomic.ts b/src/packages/nats/sync/synctable-kv-atomic.ts index f1f4a64b3b..39824127a1 100644 --- a/src/packages/nats/sync/synctable-kv-atomic.ts +++ b/src/packages/nats/sync/synctable-kv-atomic.ts @@ -65,7 +65,11 @@ export class SyncTableKVAtomic extends EventEmitter { this.set_state("connected"); }; - getKey = (obj): string => { + getKey = (obj_or_key): string => { + if (typeof obj_or_key == "string") { + return obj_or_key; + } + const obj = obj_or_key; if (this.primaryKeys.length === 1) { return toKey(obj[this.primaryKeys[0]] ?? "")!; } else { @@ -79,17 +83,17 @@ export class SyncTableKVAtomic extends EventEmitter { this.dkv.set(this.getKey(obj), obj); }; - delete = (obj) => { + delete = (obj_or_key) => { if (this.dkv == null) throw Error("closed"); - this.dkv.delete(this.getKey(obj)); + this.dkv.delete(this.getKey(obj_or_key)); }; - get = (obj?) => { + get = (obj_or_key?) => { if (this.dkv == null) throw Error("closed"); - if (obj == null) { + if (obj_or_key == null) { return this.dkv.get(); } - return this.dkv.get(this.getKey(obj)); + return this.dkv.get(this.getKey(obj_or_key)); }; close = () => { diff --git a/src/packages/sync/editor/string/test/client-test.ts b/src/packages/sync/editor/string/test/client-test.ts index 798f3a8d5f..4fa52d41b1 100644 --- a/src/packages/sync/editor/string/test/client-test.ts +++ b/src/packages/sync/editor/string/test/client-test.ts @@ -168,14 +168,14 @@ export class Client extends EventEmitter implements Client0 { _options: any, _throttle_changes?: number, ): Promise { - throw Error("not implemented"); + throw Error("synctable_database: not implemented"); } async synctable_nats(_query: any): Promise { - throw Error("not implemented"); + throw Error("synctable_nats: not implemented"); } async pubsub_nats(_query: any): Promise { - throw Error("not implemented"); + throw Error("pubsub_nats: not implemented"); } // account_id or project_id diff --git a/src/packages/sync/table/changefeed-nats.ts b/src/packages/sync/table/changefeed-nats.ts index f3f899c257..36987d3b79 100644 --- a/src/packages/sync/table/changefeed-nats.ts +++ b/src/packages/sync/table/changefeed-nats.ts @@ -28,7 +28,8 @@ export class NatsChangefeed extends EventEmitter { this.natsSynctable = await this.client.nats_client.changefeed(this.query); this.interest(); this.startWatch(); - return Object.values(this.natsSynctable.get()); + const v = this.natsSynctable.get(); + return Object.values(v); }; close = (): void => {