Skip to content

Commit

Permalink
nats kv: work in progress
Browse files Browse the repository at this point in the history
  • Loading branch information
williamstein committed Feb 7, 2025
1 parent 9770162 commit f4579bc
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 39 deletions.
79 changes: 52 additions & 27 deletions src/packages/nats/sync/eventually-consistent-kv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,60 +6,61 @@ DEVELOPMENT:
~/cocalc/src/packages/server$ node
Welcome to Node.js v18.17.1.
Type ".help" for more information.
> env = await require("@cocalc/backend/nats/env").getEnv(); a = require("@cocalc/nats/sync/eventually-consistent-kv"); s = new a.EventuallyConsistentKV({name:'test',env,filter:['foo.>'],merge:({parent,local,remote})=>local}); await s.init();
> env = await require("@cocalc/backend/nats/env").getEnv(); a = require("@cocalc/nats/sync/eventually-consistent-kv"); s = new a.EventuallyConsistentKV({name:'test',env,filter:['foo.>'],resolve:({parent,local,remote})=>{return {...remote,...local}}}); await s.init();
*/

import { EventEmitter } from "events";
import { KV } from "./kv";
import { reuseInFlight } from "@cocalc/util/reuse-in-flight";
import { type NatsEnv } from "@cocalc/nats/types";
import { isEqual } from "lodash";

const TOMBSTONE = Symbol("tombstone");

export class EventuallyConsistentKV extends EventEmitter {
private kv: KV;
private local: { [key: string]: any } = {};
private merge: (opts: { parent; local; remote }) => any;
private resolve: (opts: { ancestor; local; remote }) => any;
private changed: Set<string> = new Set();

constructor({
name,
env,
filter,
merge,
resolve,
options,
}: {
name: string;
env: NatsEnv;
// conflict resolution
merge: (opts: { parent; local; remote }) => any;
resolve: (opts: { ancestor; local; remote }) => any;
// filter: optionally restrict to subset of named kv store matching these subjects.
// NOTE: any key name that you *set or delete* should match one of these
filter?: string | string[];
options?;
}) {
super();
this.merge = merge;
this.resolve = resolve;
this.kv = new KV({ name, env, filter, options });
return new Proxy(this, {
set(target, prop, value) {
if (!target.kv.isValidKey(String(prop))) {
throw Error(`set: key (=${String(prop)}) must match the filter`);
}
target.set(prop, value);
return true;
},
get(target, prop) {
const x =
target[prop] ?? target.local[String(prop)] ?? target.kv.get(prop);
return x === TOMBSTONE ? undefined : x;
},
});
}

init = reuseInFlight(async () => {
this.kv.on("change", this.handleRemoteChange);
await this.kv.init();
});

private handleRemoteChange = (key, remote, ancestor) => {
const local = this.local[key];
if (local !== undefined) {
const value = this.resolve({ local, remote, ancestor });
if (isEqual(value, remote)) {
delete this.local[key];
} else {
this.local[key] = value ?? TOMBSTONE;
}
}
};

get = () => {
const x = { ...this.kv.get(), ...this.local };
for (const key in this.local) {
Expand All @@ -70,29 +71,53 @@ export class EventuallyConsistentKV extends EventEmitter {
return x;
};

delete = (key) => {
this.local[key] = TOMBSTONE;
this.changed.add(key);
};

set = (...args) => {
if (args.length == 2) {
this.local[args[0]] = args[1] ?? TOMBSTONE;
return;
this.changed.add(args[0]);
} else {
const obj = args[0];
for (const key in obj) {
this.local[key] = obj[key] ?? TOMBSTONE;
this.changed.add(key);
}
}
const obj = args[0];
for (const key in obj) {
this.local[key] = obj[key] ?? TOMBSTONE;
this.tryToSave();
};

private tryToSave = async () => {
try {
await this.save();
} catch (err) {
console.log("problem saving", err);
}
if (Object.keys(this.local).length > 0) {
setTimeout(this.tryToSave, 100);
}
};

save = async () => {
private save = reuseInFlight(async () => {
this.changed.clear();
const obj = { ...this.local };
for (const key in obj) {
if (obj[key] === TOMBSTONE) {
obj[key] = undefined;
await this.kv.delete(key);
delete obj[key];
if (!this.changed.has(key)) {
delete this.local[key];
}
}
}
await this.kv.set(obj);
for (const key in obj) {
if (obj[key] === this.local[key]) {
if (obj[key] === this.local[key] && !this.changed.has(key)) {
delete this.local[key];
}
}
};
});
}
24 changes: 12 additions & 12 deletions src/packages/nats/sync/kv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,15 +110,15 @@ export class KV extends EventEmitter {
: typeof filter == "string"
? [filter]
: filter;
return new Proxy(this, {
set(target, prop, value) {
target.setOne(prop, value);
return true;
},
get(target, prop) {
return target[prop] ?? target.all?.[String(prop)];
},
});
// return new Proxy(this, {
// set(target, prop, value) {
// target.setOne(prop, value);
// return true;
// },
// get(target, prop) {
// return target[prop] ?? target.all?.[String(prop)];
// },
// });
}

init = reuseInFlight(async () => {
Expand Down Expand Up @@ -147,8 +147,7 @@ export class KV extends EventEmitter {
private startWatch = async () => {
// watch for changes
this.watch = await this.kv.watch({
// we assume that we ONLY delete old items which are not relevant
ignoreDeletes: true,
ignoreDeletes: false,
include: "updates",
key: this.filter,
});
Expand All @@ -159,6 +158,7 @@ export class KV extends EventEmitter {
return;
}
this.revisions[key] = revision;
const prev = this.all[key];
if (value.length == 0) {
// delete
delete this.all[key];
Expand All @@ -167,7 +167,7 @@ export class KV extends EventEmitter {
this.all[key] = this.env.jc.decode(value);
this.times[key] = sm.time;
}
this.emit("change", key, this.all[key]);
this.emit("change", key, this.all[key], prev);
}
};

Expand Down

0 comments on commit f4579bc

Please sign in to comment.