From f4579bc880c86332f6e97bb3a77f2907b3620653 Mon Sep 17 00:00:00 2001 From: William Stein Date: Fri, 7 Feb 2025 00:44:44 +0000 Subject: [PATCH] nats kv: work in progress --- .../nats/sync/eventually-consistent-kv.ts | 79 ++++++++++++------- src/packages/nats/sync/kv.ts | 24 +++--- 2 files changed, 64 insertions(+), 39 deletions(-) diff --git a/src/packages/nats/sync/eventually-consistent-kv.ts b/src/packages/nats/sync/eventually-consistent-kv.ts index adcb606b7f..1e6252b102 100644 --- a/src/packages/nats/sync/eventually-consistent-kv.ts +++ b/src/packages/nats/sync/eventually-consistent-kv.ts @@ -6,60 +6,61 @@ DEVELOPMENT: ~/cocalc/src/packages/server$ node Welcome to Node.js v18.17.1. Type ".help" for more information. -> env = await require("@cocalc/backend/nats/env").getEnv(); a = require("@cocalc/nats/sync/eventually-consistent-kv"); s = new a.EventuallyConsistentKV({name:'test',env,filter:['foo.>'],merge:({parent,local,remote})=>local}); await s.init(); +> env = await require("@cocalc/backend/nats/env").getEnv(); a = require("@cocalc/nats/sync/eventually-consistent-kv"); s = new a.EventuallyConsistentKV({name:'test',env,filter:['foo.>'],resolve:({parent,local,remote})=>{return {...remote,...local}}}); await s.init(); */ import { EventEmitter } from "events"; import { KV } from "./kv"; import { reuseInFlight } from "@cocalc/util/reuse-in-flight"; import { type NatsEnv } from "@cocalc/nats/types"; +import { isEqual } from "lodash"; const TOMBSTONE = Symbol("tombstone"); export class EventuallyConsistentKV extends EventEmitter { private kv: KV; private local: { [key: string]: any } = {}; - private merge: (opts: { parent; local; remote }) => any; + private resolve: (opts: { ancestor; local; remote }) => any; + private changed: Set = new Set(); constructor({ name, env, filter, - merge, + resolve, options, }: { name: string; env: NatsEnv; // conflict resolution - merge: (opts: { parent; local; remote }) => any; + resolve: (opts: { ancestor; local; remote }) => any; // filter: optionally restrict to subset of named kv store matching these subjects. // NOTE: any key name that you *set or delete* should match one of these filter?: string | string[]; options?; }) { super(); - this.merge = merge; + this.resolve = resolve; this.kv = new KV({ name, env, filter, options }); - return new Proxy(this, { - set(target, prop, value) { - if (!target.kv.isValidKey(String(prop))) { - throw Error(`set: key (=${String(prop)}) must match the filter`); - } - target.set(prop, value); - return true; - }, - get(target, prop) { - const x = - target[prop] ?? target.local[String(prop)] ?? target.kv.get(prop); - return x === TOMBSTONE ? undefined : x; - }, - }); } init = reuseInFlight(async () => { + this.kv.on("change", this.handleRemoteChange); await this.kv.init(); }); + private handleRemoteChange = (key, remote, ancestor) => { + const local = this.local[key]; + if (local !== undefined) { + const value = this.resolve({ local, remote, ancestor }); + if (isEqual(value, remote)) { + delete this.local[key]; + } else { + this.local[key] = value ?? TOMBSTONE; + } + } + }; + get = () => { const x = { ...this.kv.get(), ...this.local }; for (const key in this.local) { @@ -70,29 +71,53 @@ export class EventuallyConsistentKV extends EventEmitter { return x; }; + delete = (key) => { + this.local[key] = TOMBSTONE; + this.changed.add(key); + }; + set = (...args) => { if (args.length == 2) { this.local[args[0]] = args[1] ?? TOMBSTONE; - return; + this.changed.add(args[0]); + } else { + const obj = args[0]; + for (const key in obj) { + this.local[key] = obj[key] ?? TOMBSTONE; + this.changed.add(key); + } } - const obj = args[0]; - for (const key in obj) { - this.local[key] = obj[key] ?? TOMBSTONE; + this.tryToSave(); + }; + + private tryToSave = async () => { + try { + await this.save(); + } catch (err) { + console.log("problem saving", err); + } + if (Object.keys(this.local).length > 0) { + setTimeout(this.tryToSave, 100); } }; - save = async () => { + private save = reuseInFlight(async () => { + this.changed.clear(); const obj = { ...this.local }; for (const key in obj) { if (obj[key] === TOMBSTONE) { - obj[key] = undefined; + await this.kv.delete(key); + delete obj[key]; + if (!this.changed.has(key)) { + delete this.local[key]; + } } } await this.kv.set(obj); for (const key in obj) { - if (obj[key] === this.local[key]) { + if (obj[key] === this.local[key] && !this.changed.has(key)) { delete this.local[key]; } } - }; + }); } diff --git a/src/packages/nats/sync/kv.ts b/src/packages/nats/sync/kv.ts index d83fb16a6b..78034ced10 100644 --- a/src/packages/nats/sync/kv.ts +++ b/src/packages/nats/sync/kv.ts @@ -110,15 +110,15 @@ export class KV extends EventEmitter { : typeof filter == "string" ? [filter] : filter; - return new Proxy(this, { - set(target, prop, value) { - target.setOne(prop, value); - return true; - }, - get(target, prop) { - return target[prop] ?? target.all?.[String(prop)]; - }, - }); + // return new Proxy(this, { + // set(target, prop, value) { + // target.setOne(prop, value); + // return true; + // }, + // get(target, prop) { + // return target[prop] ?? target.all?.[String(prop)]; + // }, + // }); } init = reuseInFlight(async () => { @@ -147,8 +147,7 @@ export class KV extends EventEmitter { private startWatch = async () => { // watch for changes this.watch = await this.kv.watch({ - // we assume that we ONLY delete old items which are not relevant - ignoreDeletes: true, + ignoreDeletes: false, include: "updates", key: this.filter, }); @@ -159,6 +158,7 @@ export class KV extends EventEmitter { return; } this.revisions[key] = revision; + const prev = this.all[key]; if (value.length == 0) { // delete delete this.all[key]; @@ -167,7 +167,7 @@ export class KV extends EventEmitter { this.all[key] = this.env.jc.decode(value); this.times[key] = sm.time; } - this.emit("change", key, this.all[key]); + this.emit("change", key, this.all[key], prev); } };