Skip to content

Commit

Permalink
nats changefeeds -- fix some bugs with rewrite; this makes sense fina…
Browse files Browse the repository at this point in the history
…lly and works well
  • Loading branch information
williamstein committed Feb 10, 2025
1 parent c1d2f6c commit c903fa1
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 19 deletions.
38 changes: 29 additions & 9 deletions src/packages/database/nats/changefeeds.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
require("@cocalc/database/nats/changefeeds").init()
echo 'require("@cocalc/database/nats/changefeeds").init()' | node
*/

import getLogger from "@cocalc/backend/logger";
Expand All @@ -26,14 +28,13 @@ import jsonStableStringify from "json-stable-stringify";
import { reuseInFlight } from "@cocalc/util/reuse-in-flight";
import { uuid } from "@cocalc/util/misc";
import { delay } from "awaiting";
import { Svcm, type ServiceMsg } from "@nats-io/services";
import { type QueuedIterator } from "nats";
import { Svcm } from "@nats-io/services";

const logger = getLogger("database:nats:changefeeds");

const jc = JSONCodec();

let api: QueuedIterator<ServiceMsg> | null = null;
let api: any | null = null;
export async function init() {
const subject = "hub.*.*.db";
logger.debug(`init -- subject='${subject}', options=`, {
Expand All @@ -50,16 +51,17 @@ export async function init() {
description: "CoCalc Database Service (changefeeds)",
});

const api = service.addEndpoint("api", { subject });
api = service.addEndpoint("api", { subject });

for await (const mesg of api) {
handleRequest(mesg, nc);
}
}

export function terminate() {
logger.debug("terminating");
logger.debug("terminating service");
api?.stop();
api = null;
// also, stop reporting data into the streams
cancelAllChangefeeds();
}
Expand Down Expand Up @@ -133,14 +135,21 @@ function cancelAllChangefeeds() {
const createChangefeed = reuseInFlight(
async (opts, nc) => {
const query = opts.query;
const hash = sha1(jsonStableStringify(query));
// the query *AND* the user making it define the thing:
const user = { account_id: opts.account_id, project_id: opts.project_id };
const hash = sha1(
jsonStableStringify({
query,
...user,
}),
);
const now = Date.now();
if (changefeedInterest[hash]) {
changefeedInterest[hash] = now;
logger.debug("using existing changefeed for", queryTable(query));
logger.debug("using existing changefeed for", queryTable(query), user);
return;
}
logger.debug("creating new changefeed for", queryTable(query));
logger.debug("creating new changefeed for", queryTable(query), user);
const changes = uuid();
changefeedHashes[changes] = hash;
const env = { nc, jc, sha1 };
Expand All @@ -151,6 +160,7 @@ const createChangefeed = reuseInFlight(
project_id: opts.project_id,
atomic: true,
});

await synctable.init();
// if (global.z == null) {
// global.z = {};
Expand All @@ -162,9 +172,17 @@ const createChangefeed = reuseInFlight(
cb(err ?? "missing result");
return;
}
const current = synctable.get();
const databaseKeys = new Set<string>();
for (const obj of rows) {
databaseKeys.add(synctable.getKey(obj));
synctable.set(obj);
}
for (const key in current) {
if (!databaseKeys.has(key)) {
synctable.delete(key);
}
}
cb();
};

Expand All @@ -177,7 +195,9 @@ const createChangefeed = reuseInFlight(
return;
}
if (action == "insert" || action == "update") {
synctable.set(new_val);
const cur = synctable.get(new_val);
// logger.debug({ table: queryTable(query), action, new_val, old_val });
synctable.set({ ...cur, ...new_val });
} else if (action == "delete") {
synctable.delete(old_val);
} else if (action == "close") {
Expand Down
16 changes: 10 additions & 6 deletions src/packages/nats/sync/synctable-kv-atomic.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,11 @@ export class SyncTableKVAtomic extends EventEmitter {
this.set_state("connected");
};

getKey = (obj): string => {
getKey = (obj_or_key): string => {
if (typeof obj_or_key == "string") {
return obj_or_key;
}
const obj = obj_or_key;
if (this.primaryKeys.length === 1) {
return toKey(obj[this.primaryKeys[0]] ?? "")!;
} else {
Expand All @@ -79,17 +83,17 @@ export class SyncTableKVAtomic extends EventEmitter {
this.dkv.set(this.getKey(obj), obj);
};

delete = (obj) => {
delete = (obj_or_key) => {
if (this.dkv == null) throw Error("closed");
this.dkv.delete(this.getKey(obj));
this.dkv.delete(this.getKey(obj_or_key));
};

get = (obj?) => {
get = (obj_or_key?) => {
if (this.dkv == null) throw Error("closed");
if (obj == null) {
if (obj_or_key == null) {
return this.dkv.get();
}
return this.dkv.get(this.getKey(obj));
return this.dkv.get(this.getKey(obj_or_key));
};

close = () => {
Expand Down
6 changes: 3 additions & 3 deletions src/packages/sync/editor/string/test/client-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -168,14 +168,14 @@ export class Client extends EventEmitter implements Client0 {
_options: any,
_throttle_changes?: number,
): Promise<SyncTable> {
throw Error("not implemented");
throw Error("synctable_database: not implemented");
}

async synctable_nats(_query: any): Promise<SyncTable> {
throw Error("not implemented");
throw Error("synctable_nats: not implemented");
}
async pubsub_nats(_query: any): Promise<SyncTable> {
throw Error("not implemented");
throw Error("pubsub_nats: not implemented");
}

// account_id or project_id
Expand Down
3 changes: 2 additions & 1 deletion src/packages/sync/table/changefeed-nats.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ export class NatsChangefeed extends EventEmitter {
this.natsSynctable = await this.client.nats_client.changefeed(this.query);
this.interest();
this.startWatch();
return Object.values(this.natsSynctable.get());
const v = this.natsSynctable.get();
return Object.values(v);
};

close = (): void => {
Expand Down

0 comments on commit c903fa1

Please sign in to comment.