Skip to content

Commit

Permalink
nats: add better time support to kv and stream
Browse files Browse the repository at this point in the history
  • Loading branch information
williamstein committed Feb 9, 2025
1 parent fa07410 commit df98481
Show file tree
Hide file tree
Showing 9 changed files with 115 additions and 8 deletions.
10 changes: 10 additions & 0 deletions src/packages/nats/sync/dkv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,16 @@ export class DKV extends EventEmitter {
this.generalDKV.delete(`${this.prefix}.${this.sha1(key)}`);
};

// server assigned time
time = (key?: string) => {
if (this.generalDKV == null) {
throw Error("closed");
}
return this.generalDKV.time(
key ? `${this.prefix}.${this.sha1(key)}` : undefined,
);
};

get = (key?) => {
if (this.generalDKV == null) {
throw Error("closed");
Expand Down
9 changes: 9 additions & 0 deletions src/packages/nats/sync/dstream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import { delay } from "awaiting";
import { map as awaitMap } from "awaiting";
import { isNumericString } from "@cocalc/util/misc";
import { sha1 } from "@cocalc/util/misc";
import { millis } from "@cocalc/nats/util";

const MAX_PARALLEL = 50;

Expand Down Expand Up @@ -101,6 +102,14 @@ export class DStream extends EventEmitter {
return this.raw[n]?.seq;
};

time = (n) => {
const r = this.raw[n];
if (r == null) {
return;
}
return new Date(millis(r?.info.timestampNanos));
};

get length() {
return this.messages.length + Object.keys(this.local).length;
}
Expand Down
7 changes: 7 additions & 0 deletions src/packages/nats/sync/general-dkv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,13 @@ export class GeneralDKV extends EventEmitter {
return x;
};

time = (key?: string) => {
if (this.kv == null) {
throw Error("closed");
}
return this.kv.time(key);
};

private assertValidKey = (key) => {
if (this.kv == null) {
throw Error("closed");
Expand Down
2 changes: 1 addition & 1 deletion src/packages/nats/sync/general-kv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ export class GeneralKV extends EventEmitter {
}
};

time = (key?) => {
time = (key?: string) => {
if (key == null) {
return this.times;
} else {
Expand Down
10 changes: 10 additions & 0 deletions src/packages/nats/sync/kv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,16 @@ export class KV extends EventEmitter {
await this.generalKV.clear();
};

// server assigned time
time = (key?: string) => {
if (this.generalKV == null) {
throw Error("closed");
}
return this.generalKV.time(
key ? `${this.prefix}.${this.sha1(key)}` : undefined,
);
};

get = (key?) => {
if (this.generalKV == null) {
throw Error("closed");
Expand Down
61 changes: 61 additions & 0 deletions src/packages/nats/sync/open-files.ts
Original file line number Diff line number Diff line change
Expand Up @@ -232,3 +232,64 @@ export class OpenFiles {
};
};
}

import { dkv, type DKV } from "@cocalc/nats/sync/dkv";
export class OpenFiles2 {
private project_id: string;
private env: NatsEnv;
private dkv?: DKV;

constructor({ env, project_id }: { env: NatsEnv; project_id: string }) {
this.env;
this.project_id = project_id;
}

init = async () => {
this.dkv = await dkv({
name: "open-files",
project_id: this.project_id,
env: this.env,
});
};

close = () => {
if (this.dkv == null) {
return;
}
this.dkv.close();
delete this.dkv;
};

// When a client has a file open, they should periodically
// touch it to indicate that it is open.
// updates timestamp and ensures open=true.
// do we need compute server?
// touch = async ({ path }: { path: string }) => {
// const { dkv } = this;
// if (dkv == null) {
// throw Error("closed");
// }
// cur = dkv.get(path);
// const newValue = { ...cur, path };

// // just read and write it back, which updates the timestamp
// // no encode/decode needed.
// const obj = { ...validObject(obj0), open: true };
// const key = this.getKey(obj);
// const kv = await this.getKv();
// const mesg = await kv.get(key);
// if (mesg == null || mesg.sm.data.length == 0) {
// // no current entry -- create new
// await this.set(obj);
// } else {
// const cur = this.decode(mesg, true);
// const newValue = { ...cur, ...obj };
// if (!isEqual(cur, newValue)) {
// await this.set(newValue);
// } else {
// // update existing by just rewriting it back; this updates timestamp too
// await kv.put(key, mesg.sm.data);
// }
// }
// };
}
13 changes: 11 additions & 2 deletions src/packages/nats/sync/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ import { type NatsEnv } from "@cocalc/nats/types";
import { jetstreamManager, jetstream } from "@nats-io/jetstream";
import { reuseInFlight } from "@cocalc/util/reuse-in-flight";
import { jsName, streamSubject } from "@cocalc/nats/names";
import { nanos, type Nanos } from "@cocalc/nats/util";
import { nanos, type Nanos, millis } from "@cocalc/nats/util";
import { delay } from "awaiting";
import { throttle } from "lodash";
import { isNumericString } from "@cocalc/util/misc";
Expand Down Expand Up @@ -215,11 +215,20 @@ export class Stream extends EventEmitter {
}
};

// get sequence number of n-th message in stream
// get server assigned global sequence number of n-th message in stream
seq = (n) => {
return this.raw[n]?.seq;
};

// get server assigned time of n-th message in stream
time = (n): Date | undefined => {
const r = this.raw[n];
if (r == null) {
return;
}
return new Date(millis(r?.info.timestampNanos));
};

get length() {
return this.messages.length;
}
Expand Down
3 changes: 2 additions & 1 deletion src/packages/project/nats/open-files.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@ DEVELOPMENT:
await cc.client.nats_client.projectApi({project_id:'81e0c408-ac65-4114-bad5-5f4b6539bd0e'}).system.terminate({service:'open-files'})
Set env variables as in a project, then:
Set env variables as in a project (see api/index.ts ), then:
> require("@cocalc/project/nats/open-files").init()
*/

import { OpenFiles, Entry } from "@cocalc/nats/sync/open-files";
Expand Down
8 changes: 4 additions & 4 deletions src/packages/project/nats/sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,18 @@ import { project_id } from "@cocalc/project/data";

export type { Stream, DStream, KV, DKV };

export async function stream(opts) {
export async function stream(opts): Promise<Stream> {
return await createStream({ project_id, env: await getEnv(), ...opts });
}

export async function dstream(opts) {
export async function dstream(opts): Promise<DStream> {
return await createDstream({ project_id, env: await getEnv(), ...opts });
}

export async function kv(opts) {
export async function kv(opts): Promise<KV> {
return await createKV({ project_id, env: await getEnv(), ...opts });
}

export async function dkv(opts) {
export async function dkv(opts): Promise<DKV> {
return await createDKV({ project_id, env: await getEnv(), ...opts });
}

0 comments on commit df98481

Please sign in to comment.