Skip to content

Commit

Permalink
nats dstream: add unit tests and fix bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
williamstein committed Feb 13, 2025
1 parent 26a7221 commit 4994b84
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 15 deletions.
69 changes: 66 additions & 3 deletions src/packages/backend/nats/test/dstream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,14 @@ describe("create a dstream and do some basic operations", () => {
expect(s[0]).toEqual(mesg);
});

it("verifies that unsaved changes works properly", async () => {
expect(s.hasUnsavedChanges()).toBe(true);
expect(s.unsavedChanges()).toEqual([mesg]);
await s.save();
expect(s.hasUnsavedChanges()).toBe(false);
expect(s.unsavedChanges()).toEqual([]);
});

it("confirm persistence: closes and re-opens stream and confirms message is still there", async () => {
const name = s.name;
await s.save();
Expand Down Expand Up @@ -117,13 +125,13 @@ describe("get sequence number and time of message", () => {
});

it("save and get server assigned sequence number", async () => {
await s.save();
s.save();
await once(s, "change");
const n = s.seq(0);
expect(n).toBeGreaterThan(0);
});

it("save and get server assigned time", async () => {
await s.save();
it("get server assigned time", async () => {
const t = s.time(0);
// since testing on the same machine as server, these times should be close:
expect(t.getTime() - Date.now()).toBeLessThan(5000);
Expand Down Expand Up @@ -168,4 +176,59 @@ describe("closing also saves by default, but not if autosave is off", () => {
});
});

describe("testing start_seq", () => {
const name = `test-${Math.random()}`;
let seq;
it("creates a stream and adds 3 messages, noting their assigned sequence numbers", async () => {
const s = await createDstream({ name, noAutosave: true });
s.push(1, 2, 3);
expect(s.get()).toEqual([1, 2, 3]);
// save, thus getting sequence numbers
s.save();
while (s.seq(2) == null) {
s.save();
await once(s, "change");
}
seq = [s.seq(0), s.seq(1), s.seq(2)];
// tests partly that these are integers...
const n = seq.reduce((a, b) => a + b, 0);
expect(typeof n).toBe("number");
expect(n).toBeGreaterThan(2);
await s.close();
});

let s;
it("it opens the stream but starting with the last sequence number, so only one message", async () => {
s = await createDstream({
name,
noAutosave: true,
start_seq: seq[2],
});
expect(s.length).toBe(1);
expect(s.get()).toEqual([3]);
});

it("it then pulls in the previous message, so now two messages are loaded", async () => {
await s.load({ start_seq: seq[1] });
expect(s.length).toBe(2);
expect(s.get()).toEqual([2, 3]);
});
});

describe("a little bit of a stress test", () => {
const name = `test-${Math.random()}`;
const count = 250;
let s;
it(`creates a stream and pushes ${count} messages`, async () => {
s = await createDstream({
name,
noAutosave: true,
});
for (let i = 0; i < count; i++) {
s.push({ i });
}
expect(s.length).toBe(count);
await s.save();
expect(s.length).toBe(count);
});
});
46 changes: 35 additions & 11 deletions src/packages/nats/sync/dstream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ export class DStream extends EventEmitter {
private messages: any[];
private raw: any[];
private noAutosave: boolean;
// TODO: using Map for these will be better because we use .length a bunch, which is O(n) instead of O(1).
private local: { [id: string]: { mesg: any; subject?: string } } = {};
private saved: { [seq: number]: any } = {};

constructor(opts: DStreamOptions) {
super();
Expand All @@ -66,8 +68,9 @@ export class DStream extends EventEmitter {
if (this.stream == null) {
throw Error("closed");
}
this.stream.on("change", (...args) => {
this.emit("change", ...args);
this.stream.on("change", (mesg, raw) => {
delete this.saved[raw.seq];
this.emit("change", mesg);
});
await this.stream.init();
this.emit("connected");
Expand Down Expand Up @@ -99,19 +102,31 @@ export class DStream extends EventEmitter {
if (n == null) {
return [
...this.messages,
...Object.values(this.saved),
...Object.values(this.local).map((x) => x.mesg),
];
} else {
return (
this.messages[n] ??
Object.values(this.local)[n - this.messages.length]?.mesg
);
if (n < this.messages.length) {
return this.messages[n];
}
const v = Object.keys(this.saved);
if (n < v.length + this.messages.length) {
return v[n - this.messages.length];
}
return Object.values(this.local)[n - this.messages.length - v.length]
?.mesg;
}
};

// sequence number of n-th message
seq = (n) => {
return this.raw[n]?.seq;
if (n < this.raw.length) {
return this.raw[n]?.seq;
}
const v = Object.keys(this.saved);
if (n < v.length + this.raw.length) {
return parseInt(v[n - this.raw.length]);
}
};

time = (n) => {
Expand All @@ -123,7 +138,11 @@ export class DStream extends EventEmitter {
};

get length() {
return this.messages.length + Object.keys(this.local).length;
return (
this.messages.length +
Object.keys(this.saved).length +
Object.keys(this.local).length
);
}

publish = (mesg, subject?: string) => {
Expand Down Expand Up @@ -151,7 +170,7 @@ export class DStream extends EventEmitter {
};

unsavedChanges = () => {
return Object.values(this.local);
return Object.values(this.local).map(({ mesg }) => mesg);
};

save = reuseInFlight(async () => {
Expand Down Expand Up @@ -180,7 +199,11 @@ export class DStream extends EventEmitter {
const { mesg, subject } = this.local[id];
try {
// @ts-ignore
await this.stream.publish(mesg, subject, { msgID: id });
const { seq } = await this.stream.publish(mesg, subject, { msgID: id });
if ((this.raw[this.raw.length - 1]?.seq ?? -1) < seq) {
// it still isn't in this.raw
this.saved[seq] = mesg;
}
delete this.local[id];
} catch (err) {
if (err.code == "REJECT") {
Expand All @@ -197,7 +220,8 @@ export class DStream extends EventEmitter {
await awaitMap(Object.keys(this.local), MAX_PARALLEL, f);
});

load = async (opts) => {
// load older messages starting at start_seq
load = async (opts: { start_seq: number }) => {
if (this.stream == null) {
throw Error("closed");
}
Expand Down
2 changes: 1 addition & 1 deletion src/packages/nats/sync/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ export class Stream extends EventEmitter {
this.messages.push(mesg);
this.raw.push(raw);
if (!noEmit) {
this.emit("change", mesg);
this.emit("change", mesg, raw);
}
};

Expand Down

0 comments on commit 4994b84

Please sign in to comment.