Skip to content

Commit

Permalink
nats: make it so our streams/kv's can have *arbitrary* names, rather …
Browse files Browse the repository at this point in the history
…than the very restricted subject segment names
  • Loading branch information
williamstein committed Feb 9, 2025
1 parent 017067e commit 58d7066
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 24 deletions.
13 changes: 9 additions & 4 deletions src/packages/backend/nats/sync.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
import { stream as createStream } from "@cocalc/nats/sync/stream";
import { dstream as createDstream } from "@cocalc/nats/sync/dstream";
import { kv as createKV } from "@cocalc/nats/sync/kv";
import { dkv as createDKV } from "@cocalc/nats/sync/dkv";
import { stream as createStream, type Stream } from "@cocalc/nats/sync/stream";
import {
dstream as createDstream,
type DStream,
} from "@cocalc/nats/sync/dstream";
import { kv as createKV, type KV } from "@cocalc/nats/sync/kv";
import { dkv as createDKV, type DKV } from "@cocalc/nats/sync/dkv";
import { getEnv } from "@cocalc/backend/nats/env";

export type { Stream, DStream, KV, DKV };

export async function stream(opts) {
return await createStream({ ...opts, env: await getEnv() });
}
Expand Down
16 changes: 11 additions & 5 deletions src/packages/nats/sync/dkv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import { reuseInFlight } from "@cocalc/util/reuse-in-flight";
import { GeneralDKV, type MergeFunction } from "./general-dkv";
import { userKvKey, type KVOptions } from "./kv";
import { jsName } from "@cocalc/nats/names";
import { sha1 } from "@cocalc/util/misc";

export interface DKVOptions extends KVOptions {
merge: MergeFunction;
Expand All @@ -24,15 +25,20 @@ export interface DKVOptions extends KVOptions {
export class DKV extends EventEmitter {
generalDKV?: GeneralDKV;
name: string;
private prefix: string;

constructor({ name, account_id, project_id, merge, env }: DKVOptions) {
super();
if (env == null) {
throw Error("env must not be null");
}
// name of the jetstream key:value store.
const kvname = jsName({ account_id, project_id });
this.name = name;
this.prefix = (env.sha1 ?? sha1)(name);
this.generalDKV = new GeneralDKV({
name: kvname,
filter: `${name}.>`,
filter: `${this.prefix}.>`,
env,
merge,
});
Expand Down Expand Up @@ -81,7 +87,7 @@ export class DKV extends EventEmitter {
if (this.generalDKV == null) {
throw Error("closed");
}
this.generalDKV.delete(`${this.name}.${key}`);
this.generalDKV.delete(`${this.prefix}.${key}`);
};

get = (key?) => {
Expand All @@ -92,19 +98,19 @@ export class DKV extends EventEmitter {
const obj = this.generalDKV.get();
const x: any = {};
for (const k in obj) {
x[k.slice(this.name.length + 1)] = obj[k];
x[k.slice(this.prefix.length + 1)] = obj[k];
}
return x;
} else {
return this.generalDKV.get(`${this.name}.${key}`);
return this.generalDKV.get(`${this.prefix}.${key}`);
}
};

set = (key: string, value: any) => {
if (this.generalDKV == null) {
throw Error("closed");
}
this.generalDKV.set(`${this.name}.${key}`, value);
this.generalDKV.set(`${this.prefix}.${key}`, value);
};
}

Expand Down
8 changes: 6 additions & 2 deletions src/packages/nats/sync/dstream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,20 @@ import { reuseInFlight } from "@cocalc/util/reuse-in-flight";
import { delay } from "awaiting";
import { map as awaitMap } from "awaiting";
import { isNumericString } from "@cocalc/util/misc";
import { sha1 } from "@cocalc/util/misc";

const MAX_PARALLEL = 50;

export class DStream extends EventEmitter {
public readonly name: string;
private stream?: Stream;
private messages: any[];
private raw: any[];
private local: { [id: string]: { mesg: any; subject?: string } } = {};

constructor(opts: StreamOptions) {
super();
this.name = opts.name;
this.stream = new Stream(opts);
this.messages = this.stream.messages;
this.raw = this.stream.raw;
Expand Down Expand Up @@ -175,12 +178,13 @@ export const dstream = reuseInFlight(
const { account_id, project_id, name } = options;
const jsname = jsName({ account_id, project_id });
const subjects = streamSubject({ account_id, project_id });
const filter = subjects.replace(">", name);
const filter = subjects.replace(">", (options.env.sha1 ?? sha1)(name));
const key = userStreamOptionsKey(options);
if (dstreamCache[key] == null) {
const dstream = new DStream({
...options,
name: jsname,
name,
jsname,
subjects,
subject: filter,
filter,
Expand Down
13 changes: 8 additions & 5 deletions src/packages/nats/sync/kv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import { type NatsEnv } from "@cocalc/nats/types";
import { reuseInFlight } from "@cocalc/util/reuse-in-flight";
import { GeneralKV } from "./general-kv";
import { jsName } from "@cocalc/nats/names";
import { sha1 } from "@cocalc/util/misc";

export interface KVOptions {
name: string;
Expand All @@ -27,15 +28,17 @@ export interface KVOptions {
export class KV extends EventEmitter {
generalKV?: GeneralKV;
name: string;
private prefix: string;

constructor({ name, account_id, project_id, env }: KVOptions) {
super();
// name of the jetstream key:value store.
const kvname = jsName({ account_id, project_id });
this.name = name;
this.prefix = (env.sha1 ?? sha1)(name);
this.generalKV = new GeneralKV({
name: kvname,
filter: `${name}.>`,
filter: `${this.prefix}.>`,
env,
});
this.init();
Expand Down Expand Up @@ -83,7 +86,7 @@ export class KV extends EventEmitter {
if (this.generalKV == null) {
throw Error("closed");
}
this.generalKV.delete(`${this.name}.${key}`);
this.generalKV.delete(`${this.prefix}.${key}`);
};

get = (key?) => {
Expand All @@ -94,19 +97,19 @@ export class KV extends EventEmitter {
const obj = this.generalKV.get();
const x: any = {};
for (const k in obj) {
x[k.slice(this.name.length + 1)] = obj[k];
x[k.slice(this.prefix.length + 1)] = obj[k];
}
return x;
} else {
return this.generalKV.get(`${this.name}.${key}`);
return this.generalKV.get(`${this.prefix}.${key}`);
}
};

set = async (key: string, value: any) => {
if (this.generalKV == null) {
throw Error("closed");
}
await this.generalKV.set(`${this.name}.${key}`, value);
await this.generalKV.set(`${this.prefix}.${key}`, value);
};
}

Expand Down
24 changes: 16 additions & 8 deletions src/packages/nats/sync/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import { delay } from "awaiting";
import { throttle } from "lodash";
import { isNumericString } from "@cocalc/util/misc";
import { map as awaitMap } from "awaiting";
import { sha1 } from "@cocalc/util/misc";

const MAX_PARALLEL = 50;

Expand Down Expand Up @@ -88,7 +89,10 @@ interface FilteredStreamLimitOptions {
}

export interface StreamOptions {
// what it's called by us
name: string;
// actually name of the jetstream in NATS
jsname: string;
// subject = default subject used for publishing; defaults to filter if filter doesn't have any wildcard
subjects: string | string[];
subject?: string;
Expand All @@ -102,6 +106,7 @@ export interface StreamOptions {

export class Stream extends EventEmitter {
public readonly name: string;
public readonly jsname: string;
private natsStreamOptions?;
private limits: FilteredStreamLimitOptions;
private subjects: string | string[];
Expand All @@ -119,6 +124,7 @@ export class Stream extends EventEmitter {

constructor({
name,
jsname,
env,
subject,
subjects,
Expand All @@ -132,6 +138,7 @@ export class Stream extends EventEmitter {
// create a jetstream client so we can publish to the stream
this.js = jetstream(env.nc);
this.name = name;
this.jsname = jsname;
this.natsStreamOptions = natsStreamOptions;
if (
subject == null &&
Expand Down Expand Up @@ -178,12 +185,12 @@ export class Stream extends EventEmitter {
};
try {
this.stream = await this.jsm.streams.add({
name: this.name,
name: this.jsname,
...options,
});
} catch (err) {
// probably already exists, so try to modify to have the requested properties.
this.stream = await this.jsm.streams.update(this.name, options);
this.stream = await this.jsm.streams.update(this.jsname, options);
}
this.startFetch();
});
Expand Down Expand Up @@ -251,11 +258,11 @@ export class Stream extends EventEmitter {
} else {
startOptions = {};
}
const { name } = await jsm.consumers.add(this.name, {
const { name } = await jsm.consumers.add(this.jsname, {
...options,
...startOptions,
});
return await js.consumers.get(this.name, name);
return await js.consumers.get(this.jsname, name);
};

private startFetch = async (options?) => {
Expand Down Expand Up @@ -363,12 +370,12 @@ export class Stream extends EventEmitter {
index = this.raw.length - 1;
// everything
// console.log("purge everything");
await this.jsm.streams.purge(this.name, {
await this.jsm.streams.purge(this.jsname, {
filter: this.filter,
});
} else {
const { seq } = this.raw[index + 1];
await this.jsm.streams.purge(this.name, {
await this.jsm.streams.purge(this.jsname, {
filter: this.filter,
seq,
});
Expand Down Expand Up @@ -511,12 +518,13 @@ export const stream = reuseInFlight(
const { account_id, project_id, name } = options;
const jsname = jsName({ account_id, project_id });
const subjects = streamSubject({ account_id, project_id });
const filter = subjects.replace(">", name);
const filter = subjects.replace(">", (options.env.sha1 ?? sha1)(name));
const key = userStreamOptionsKey(options);
if (streamCache[key] == null) {
const stream = new Stream({
...options,
name: jsname,
name,
jsname,
subjects,
subject: filter,
filter,
Expand Down

0 comments on commit 58d7066

Please sign in to comment.