Skip to content

Commit

Permalink
nats: switching to new stream for terminal -- work in progress that d…
Browse files Browse the repository at this point in the history
…oesn't work yet
  • Loading branch information
williamstein committed Feb 9, 2025
1 parent fa1aff2 commit 34f1457
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 104 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ export class Terminal<T extends CodeEditorState = CodeEditorState> {
}
}

close(): void {
close = (): void => {
this.assert_not_closed();
this.set_connection_status("disconnected");
this.state = "closed";
Expand All @@ -233,7 +233,7 @@ export class Terminal<T extends CodeEditorState = CodeEditorState> {
}
close(this);
this.state = "closed";
}
};

private disconnect(): void {
if (this.conn === undefined) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ import { JSONCodec } from "nats.ws";
import { reuseInFlight } from "@cocalc/util/reuse-in-flight";
import { uuid } from "@cocalc/util/misc";
import { delay } from "awaiting";
import { projectStreamName, projectSubject } from "@cocalc/nats/names";
import { type DStream } from "@cocalc/nats/sync/dstream";
import { projectSubject } from "@cocalc/nats/names";

const jc = JSONCodec();
const client = uuid();
Expand All @@ -13,10 +14,9 @@ export class NatsTerminalConnection extends EventEmitter {
private project_id: string;
//private compute_server_id: number;
private path: string;
private subject: string;
private cmd_subject: string;
private state: null | "running" | "init" | "closed";
private consumer?;
private stream?: DStream;
// keep = optional number of messages to retain between clients/sessions/view, i.e.,
// "amount of history". This is global to all terminals in the project.
private keep?: number;
Expand Down Expand Up @@ -51,12 +51,6 @@ export class NatsTerminalConnection extends EventEmitter {
this.openPaths = openPaths;
this.closePaths = closePaths;
this.project = webapp_client.nats_client.projectApi({ project_id });
this.subject = projectSubject({
project_id,
compute_server_id,
service: "terminal",
path,
});
this.cmd_subject = projectSubject({
project_id,
compute_server_id,
Expand Down Expand Up @@ -109,7 +103,9 @@ export class NatsTerminalConnection extends EventEmitter {
};

end = () => {
// todo
this.stream?.close();
delete this.stream;
// todo -- anything else?
this.state = "closed";
};

Expand All @@ -118,42 +114,23 @@ export class NatsTerminalConnection extends EventEmitter {
await this.project.terminal.create({ path: this.path });
});

private getConsumer = async () => {
private getStream = async () => {
// TODO: idempotent, but move to project
const { nats_client } = webapp_client;
const streamName = projectStreamName({
return await nats_client.dstream({
name: `terminal-${this.path}`,
project_id: this.project_id,
service: "terminal",
});
const nc = await nats_client.getConnection();
const js = nats_client.jetstream.jetstream(nc);
// consumer doesn't exist, so setup everything.
const jsm = await nats_client.jetstream.jetstreamManager(nc);
// making an ephemeral consumer for just one subject (e.g., this terminal frame)
const { name } = await jsm.consumers.add(streamName, {
filter_subject: this.subject,
});
return await js.consumers.get(streamName, name);
};

init = async () => {
this.state = "init";
await this.start();
this.consumer = await this.getConsumer();
this.stream = await this.getStream();
this.consumeDataStream();
this.subscribeToCommands();
};

private handle = (mesg) => {
if (this.state == "closed") {
return true;
}
const x = jc.decode(mesg.data) as any;
if (x?.data != null) {
this.emit("data", x?.data);
}
};

private subscribeToCommands = async () => {
const nc = await webapp_client.nats_client.getConnection();
const sub = nc.subscribe(this.cmd_subject);
Expand Down Expand Up @@ -184,33 +161,22 @@ export class NatsTerminalConnection extends EventEmitter {
}
};

private consumeDataStream = async () => {
if (this.consumer == null) {
private handleStreamMessage = (mesg) => {
const data = mesg?.data;
if (data) {
this.emit("data", data);
}
};

private consumeDataStream = () => {
if (this.stream == null) {
return;
}
const messages = await this.consumer.fetch({
max_messages: 100000, // should only be a few hundred in practice
expires: 1000,
});
for await (const mesg of messages) {
if (this.handle(mesg)) {
return;
}
if (mesg.info.pending == 0) {
// no further messages pending, so switch to consuming below
// TODO: I don't know if there is some chance to miss a message?
// This is a *terminal* so purely visual so not too critical.
break;
}
for (const mesg of this.stream.get()) {
this.handleStreamMessage(mesg);
}

this.setReady();

for await (const mesg of await this.consumer.consume()) {
if (this.handle(mesg)) {
return;
}
}
this.stream.on("change", this.handleStreamMessage);
};

private setReady = async () => {
Expand Down
25 changes: 20 additions & 5 deletions src/packages/nats/sync/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,12 @@ export class Stream extends EventEmitter {
// probably already exists, so try to modify to have the requested properties.
this.stream = await this.jsm.streams.update(this.jsname, options);
}
this.startFetch();
const consumer = await this.fetchInitialData();
if (this.stream == null) {
// closed *during* initial load
return;
}
this.watchForNewData(consumer);
});

get = (n?) => {
Expand Down Expand Up @@ -265,7 +270,7 @@ export class Stream extends EventEmitter {
return await js.consumers.get(this.jsname, name);
};

private startFetch = async (options?) => {
private fetchInitialData = async (options?) => {
const consumer = await this.getConsumer(options);
// This goes in two stages:
// STAGE 1: Get what is in the stream now.
Expand All @@ -284,6 +289,10 @@ export class Stream extends EventEmitter {
break;
}
}
return consumer;
};

private watchForNewData = async (consumer) => {
if (this.stream == null) {
// closed *during* initial load
return;
Expand Down Expand Up @@ -319,12 +328,18 @@ export class Stream extends EventEmitter {
this.watch.stop(); // stop current watch
// make new one:
const start_seq = this.raw[this.raw.length - 1]?.seq + 1;
this.startFetch({ start_seq });
return; // because startFetch creates a new consumer monitor loop
const consumer = await this.fetchInitialData({ start_seq });
if (this.stream == null) {
// closed
return;
}
this.watchForNewData(consumer);

return; // because watchForNewData creates a new consumer monitor loop
}
}
await delay(CONSUMER_MONITOR_INTERVAL);
}
await delay(CONSUMER_MONITOR_INTERVAL);
};

private decode = (raw) => {
Expand Down
26 changes: 26 additions & 0 deletions src/packages/project/nats/sync.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import { stream as createStream, type Stream } from "@cocalc/nats/sync/stream";
import {
dstream as createDstream,
type DStream,
} from "@cocalc/nats/sync/dstream";
import { kv as createKV, type KV } from "@cocalc/nats/sync/kv";
import { dkv as createDKV, type DKV } from "@cocalc/nats/sync/dkv";
import { getEnv } from "./env";

export type { Stream, DStream, KV, DKV };

export async function stream(opts) {
return await createStream({ ...opts, env: await getEnv() });
}

export async function dstream(opts) {
return await createDstream({ ...opts, env: await getEnv() });
}

export async function kv(opts) {
return await createKV({ ...opts, env: await getEnv() });
}

export async function dkv(opts) {
return await createDKV({ ...opts, env: await getEnv() });
}
53 changes: 12 additions & 41 deletions src/packages/project/nats/terminal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,15 @@ import { console_init_filename, len } from "@cocalc/util/misc";
import { exists } from "@cocalc/backend/misc/async-utils-node";
import { reuseInFlight } from "@cocalc/util/reuse-in-flight";
import { JSONCodec } from "nats";
import { jetstreamManager } from "@nats-io/jetstream";
import { getLogger } from "@cocalc/project/logger";
import { readlink, realpath } from "node:fs/promises";
import { dstream, type DStream } from "@cocalc/project/nats/sync";
import { project_id } from "@cocalc/project/data";
import { getSubject } from "./names";
import getConnection from "./connection";
import { getSubject, getStreamName } from "./names";

const logger = getLogger("server:nats:terminal");

const DEFAULT_KEEP = 300;
const MIN_KEEP = 5;
const MAX_KEEP = 2000;
const EXIT_MESSAGE = "\r\n\r\n[Process completed - press any key]\r\n\r\n";
const DEFAULT_COMMAND = "/bin/bash";
const INFINITY = 999999;
Expand Down Expand Up @@ -86,7 +84,6 @@ export async function terminalCommand({ path, cmd, ...args }) {
}

class Session {
private nc;
private path: string;
private options;
private pty?;
Expand All @@ -95,21 +92,17 @@ class Session {
public subject: string;
private cmd_subject: string;
private state: "running" | "off" = "off";
private stream: DStream;
private streamName: string;
private keep: number;
private nc;

constructor({ path, options, nc }) {
logger.debug("create session ", { path, options });
this.nc = nc;
this.path = path;
this.options = options;
this.keep = Math.max(
MIN_KEEP,
Math.min(this.options.keep ?? DEFAULT_KEEP, MAX_KEEP),
);
this.subject = getSubject({ service: "terminal", path });
this.cmd_subject = getSubject({ service: "terminal-cmd", path });
this.streamName = getStreamName({ service: "terminal" });
this.streamName = `terminal-${path}`;
this.nc = nc;
}

write = async (data) => {
Expand All @@ -121,6 +114,7 @@ class Session {

restart = async () => {
this.pty?.destroy();
this.stream?.close();
delete this.pty;
await this.init();
};
Expand All @@ -147,24 +141,7 @@ class Session {
};

createStream = async () => {
// idempotent so don't have to check if there is already a stream
const nc = this.nc;
const jsm = await jetstreamManager(nc);
try {
await jsm.streams.add({
name: this.streamName,
subjects: [getSubject({ service: "terminal" }) + ".>"],
compression: "s2",
max_msgs_per_subject: this.keep,
});
} catch (_err) {
// probably already exists
await jsm.streams.update(this.streamName, {
subjects: [getSubject({ service: "terminal" }) + ".>"],
compression: "s2" as any,
max_msgs_per_subject: this.keep,
});
}
this.stream = await dstream({ name: this.streamName, project_id });
};

init = async () => {
Expand Down Expand Up @@ -194,19 +171,15 @@ class Session {
await this.createStream();
this.pty.onData((data) => {
this.handleBackendMessages(data);
this.publish({ data });
this.stream.publish({ data });
});
this.pty.onExit((status) => {
this.publish({ data: EXIT_MESSAGE });
this.publish({ ...status, exit: true });
this.stream.publish({ data: EXIT_MESSAGE });
this.stream.publish({ ...status, exit: true });
this.state = "off";
});
};

private publish = (mesg) => {
this.nc.publish(this.subject, jc.encode(mesg));
};

private publishCommand = (mesg) => {
this.nc.publish(this.cmd_subject, jc.encode(mesg));
};
Expand Down Expand Up @@ -319,13 +292,11 @@ class Session {
if (i == -1) {
// continue to wait... unless too long
if (this.backendMessagesBuffer.length > 10000) {
console.log("huge reset");
this.resetBackendMessagesBuffer();
}
return;
}
const s = this.backendMessagesBuffer.slice(5, i);
console.log("endup up with ", { s });
this.resetBackendMessagesBuffer();
logger.debug(
`handle_backend_message: parsing JSON payload ${JSON.stringify(s)}`,
Expand Down

0 comments on commit 34f1457

Please sign in to comment.