Skip to content

Commit

Permalink
nats dkv -- more unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
williamstein committed Feb 11, 2025
1 parent 664afdf commit 3e61607
Show file tree
Hide file tree
Showing 4 changed files with 138 additions and 72 deletions.
161 changes: 110 additions & 51 deletions src/packages/backend/nats/test/dkv.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import { dkv as createDkv } from "@cocalc/backend/nats/sync";
import { once } from "@cocalc/util/async-utils";

import { delay } from "awaiting";

describe("create a public dkv and do basic operations", () => {
let kv;
const name = `test-${Math.random()}`;
Expand Down Expand Up @@ -118,16 +120,15 @@ describe("check server assigned times", () => {
describe("test deleting and clearing a dkv", () => {
let kv1;
let kv2;
const name = `test-${Math.random()}`;

const reset = () => {
kv1.clear();
kv2.clear();
const reset = async () => {
const name = `test-${Math.random()}`;
kv1 = await createDkv({ name }, { noCache: true });
kv2 = await createDkv({ name }, { noCache: true });
};

it("creates the dkv twice without caching so can make sure sync works", async () => {
kv1 = await createDkv({ name }, { noCache: true });
kv2 = await createDkv({ name }, { noCache: true });
await reset();
expect(kv1.get()).toEqual({});
expect(kv2.get()).toEqual({});
expect(kv1 === kv2).toBe(false);
Expand All @@ -147,85 +148,143 @@ describe("test deleting and clearing a dkv", () => {
});

it("adds an entry, clears it and confirms", async () => {
reset();
await reset();

kv1.foo = "bar";
kv1.foo10 = "bar";
await once(kv2, "change");
expect(kv2.foo).toBe(kv1.foo);
expect(kv2.foo10).toBe(kv1.foo10);
kv2.clear();
expect(kv2.has("foo")).toBe(false);
expect(kv2.has("foo10")).toBe(false);
await once(kv1, "change");
expect(kv1.has("foo")).toBe(false);
expect(kv1.has("foo10")).toBe(false);
});

it("adds an entry, syncs, adds another local entry (not sync'd), clears in sync and confirms NOT everything was cleared", async () => {
reset();
kv1.foo = Math.random();
await reset();
kv1["foo"] = Math.random();
await kv1.save();
if (kv2.foo != kv1.foo) {
if (kv2["foo"] != kv1["foo"]) {
await once(kv2, "change");
}
expect(kv2.foo).toBe(kv1.foo);
kv1.xxx = "yyy";
expect(kv2.xxx).toBe(undefined);
// this ONLY clears foo, not xxx
expect(kv2["foo"]).toBe(kv1["foo"]);
kv1["bar"] = "yyy";
expect(kv2["bar"]).toBe(undefined);
// this ONLY clears 'foo', not 'bar'
kv2.clear();
await once(kv1, "change");
expect(kv1.has("xxx")).toBe(true);
expect(kv1.has("bar")).toBe(true);
});

it("adds an entry, syncs, adds another local entry (not sync'd), clears in first one, and confirms everything was cleared", async () => {
reset();
await reset();

kv1.foo = Math.random();
const key = Math.random();
kv1[key] = Math.random();
await kv1.save();
if (kv2.foo != kv1.foo) {
if (kv2[key] != kv1[key]) {
await once(kv2, "change");
}
kv1.xxx = "yyy";
expect(kv2.xxx).toBe(undefined);
const key2 = Math.random();
kv1[key2] = "yyy";
expect(kv2[key2]).toBe(undefined);
// this ONLY clears foo, not xxx
kv1.clear();
expect(kv1.has("xxx")).toBe(false);
expect(kv1.has(key2)).toBe(false);
});
});

describe("set several items, confirm exist, save, and confirm they are still there", () => {
describe("set several items, confirm write worked, save, and confirm they are still there after save", () => {
const name = `test-${Math.random()}`;
const count = 10;
const count = 100;
// the time thresholds should be trivial for only 100 items
it(`adds ${count} entries`, async () => {
const kv = await createDkv({ name });
expect(kv.get()).toEqual({});
const obj: any = {};
const t0 = Date.now();
for (let i = 0; i < count; i++) {
obj[`${i}`] = i;
kv.set(`${i}`, i);
}
expect(Date.now() - t0).toBeLessThan(50);
expect(Object.keys(kv.get()).length).toEqual(count);
expect(kv.get()).toEqual(obj);
await kv.save();
expect(Date.now() - t0).toBeLessThan(500);
expect(Object.keys(kv.get()).length).toEqual(count);
// the local state maps should also get cleared quickly,
// but there is no event for this, so we loop:
// @ts-ignore: saved is private
while (Object.keys(kv.generalDKV.saved).length > 0) {
await delay(5);
}
// @ts-ignore: local is private
expect(kv.generalDKV.local).toEqual({});
// @ts-ignore: saved is private
expect(kv.generalDKV.saved).toEqual({});
});
});

describe("do an insert and clear test", () => {
const name = `test-${Math.random()}`;
const count = 100;
it(`adds ${count} entries, saves, clears, and confirms empty`, async () => {
const kv = await createDkv({ name });
expect(kv.get()).toEqual({});
for (let i = 0; i < count; i++) {
kv[`${i}`] = i;
}
console.log(kv.get());
expect(Object.keys(kv.get()).length).toEqual(count);
await kv.save();
console.log(kv.get());
expect(Object.keys(kv.get()).length).toEqual(count);
kv.clear();
expect(kv.get()).toEqual({});
await kv.save();
expect(kv.get()).toEqual({});
});
});

// import { delay } from "awaiting";

// describe("do a large insert and clear stress test", () => {
// const name = `test-${Math.random()}`;
// const count = 10;
// it(`adds ${count} entries, saves, clears, and confirms empty`, async () => {
// const kv = await createDkv({ name });
// expect(kv.get()).toEqual({});
// for (let i = 0; i < count; i++) {
// kv[`${i}`] = i;
// }
// console.log(kv.get());
// expect(Object.keys(kv.get()).length).toEqual(count);
// await kv.save();
// console.log(kv.get());
// expect(Object.keys(kv.get()).length).toEqual(count);
// kv.clear();
// expect(kv.get()).toEqual({});
// await kv.save();
// expect(kv.get()).toEqual({});
// });
// });
describe("create many distinct clients at once, write to all of them, and see that that results are merged", () => {
const name = `test-${Math.random()}`;
const count = 5;
const clients: any[] = [];

it(`creates the ${count} clients`, async () => {
for (let i = 0; i < count; i++) {
clients[i] = await createDkv({ name }, { noCache: true });
}
});

// what the combination should be
let combined: any = {};
it("writes a separate key/value for each client", () => {
for (let i = 0; i < count; i++) {
clients[i].set(`${i}`, i);
combined[`${i}`] = i;
expect(clients[i].get(`${i}`)).toEqual(i);
}
});

it("saves and checks that everybody has the combined values", async () => {
for (const kv of clients) {
await kv.save();
}
let done = false;
let i = 0;
while (!done && i < 50) {
done = true;
i += 1;
for (const client of clients) {
if (client.length != count) {
done = false;
await delay(10);
break;
}
}
}
for (const client of clients) {
expect(client.length).toEqual(count);
expect(client.get()).toEqual(combined);
}
});
});
5 changes: 5 additions & 0 deletions src/packages/nats/sync/dkv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,11 @@ export class DKV extends EventEmitter {
}
};

get length() {
// not efficient?
return Object.keys(this.get()).length;
}

set = (key: string, value: any) => {
if (this.generalDKV == null) {
throw Error("closed");
Expand Down
40 changes: 19 additions & 21 deletions src/packages/nats/sync/general-dkv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,8 @@ export type MergeFunction = (opts: {
export class GeneralDKV extends EventEmitter {
private kv?: GeneralKV;
private merge?: MergeFunction;
// local values that have NOT been saved to NATS:
private local: { [key: string]: any } = {};
// local values that HAVE been saved to NATS but may not yet be in this.kv.get()
private localSaved: { [key: string]: any } = {};
// these may have been changed locally:
private saved: { [key: string]: any } = {};
private changed: Set<string> = new Set();

constructor({
Expand Down Expand Up @@ -152,15 +149,14 @@ export class GeneralDKV extends EventEmitter {
};

private handleRemoteChange = ({ key, value: remote, prev }) => {
// local = value we have NOT saved:
const local = this.local[key] ?? this.localSaved[key];
const local = this.local[key];
let value: any = remote;
if (local !== undefined) {
if (isEqual(local, remote)) {
// we have a local change, but it's the same change as remote, so just
// forget about our local change.
delete this.local[key];
delete this.localSaved[key];
delete this.saved[key];
} else {
try {
value = this.merge?.({ key, local, remote, prev }) ?? local;
Expand All @@ -179,11 +175,10 @@ export class GeneralDKV extends EventEmitter {
if (isEqual(value, remote)) {
// no change, so forget our local value
delete this.local[key];
delete this.localSaved[key];
delete this.saved[key];
} else {
// resolve with the new value, or if it is undefined, a TOMBSTONE, meaning choice is to delete.
this.local[key] = value ?? TOMBSTONE;
delete this.localSaved[key];
}
}
}
Expand All @@ -196,26 +191,31 @@ export class GeneralDKV extends EventEmitter {
}
if (key != null) {
this.assertValidKey(key);
const local = this.local[key] ?? this.localSaved[key];
const local = this.local[key];
if (local === TOMBSTONE) {
return undefined;
}
return local ?? this.kv.get(key);
}
const x = { ...this.kv.get(), ...this.local };
for (const key in this.local) {
if ((this.local[key] ?? this.localSaved[key]) === TOMBSTONE) {
if (this.local[key] === TOMBSTONE) {
delete x[key];
}
}
return x;
};

get length() {
// not efficient
return Object.keys(this.get()).length;
}

has = (key: string): boolean => {
if (this.kv == null) {
throw Error("closed");
}
const a = this.local[key] ?? this.localSaved[key];
const a = this.local[key];
if (a === TOMBSTONE) {
return false;
}
Expand Down Expand Up @@ -267,14 +267,12 @@ export class GeneralDKV extends EventEmitter {
if (args.length == 2) {
this.assertValidKey(args[0]);
this.local[args[0]] = args[1] ?? TOMBSTONE;
delete this.localSaved[args[0]];
this.changed.add(args[0]);
} else {
const obj = args[0];
for (const key in obj) {
this.assertValidKey(key);
this.local[key] = obj[key] ?? TOMBSTONE;
delete this.localSaved[key];
this.changed.add(key);
}
}
Expand All @@ -285,11 +283,13 @@ export class GeneralDKV extends EventEmitter {
if (this.kv == null) {
return false;
}
return this.changed.size > 0 || Object.keys(this.local).length > 0;
return this.unsavedChanges().length > 0;
};

unsavedChanges = () => {
return Object.keys(this.local);
return Object.keys(this.local).filter(
(key) => this.local[key] !== this.saved[key],
);
};

save = reuseInFlight(async () => {
Expand Down Expand Up @@ -320,8 +320,7 @@ export class GeneralDKV extends EventEmitter {
await this.kv.delete(key);
delete obj[key];
if (!this.changed.has(key)) {
this.localSaved[key] = this.local[key];
delete this.local[key];
this.saved[key] = this.local[key];
}
}
}
Expand All @@ -334,14 +333,13 @@ export class GeneralDKV extends EventEmitter {
await this.kv.set(key, obj[key]);
if (obj[key] === this.local[key] && !this.changed.has(key)) {
// successfully saved this
this.localSaved[key] = this.local[key];
delete this.local[key];
this.saved[key] = this.local[key];
}
} catch (err) {
if (err.code == "REJECT" && err.key) {
const value = this.local[err.key];
delete this.local[err.key]; // can never save this.
delete this.localSaved[err.key];
delete this.saved[err.key]; // can never save this.
this.emit("reject", { key: err.key, value });
}
throw err;
Expand Down
4 changes: 4 additions & 0 deletions src/packages/nats/sync/general-kv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,10 @@ export class GeneralKV extends EventEmitter {
}
};

get length() {
return Object.keys(this.all ?? {}).length;
}

has = (key: string): boolean => {
return this.all?.[key] !== undefined;
};
Expand Down

0 comments on commit 3e61607

Please sign in to comment.