Skip to content

Commit

Permalink
nats: new streams were hanging on startup
Browse files Browse the repository at this point in the history
  • Loading branch information
williamstein committed Feb 9, 2025
1 parent 1f2a475 commit e583350
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 16 deletions.
8 changes: 4 additions & 4 deletions src/packages/backend/nats/sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,17 @@ import { getEnv } from "@cocalc/backend/nats/env";
export type { Stream, DStream, KV, DKV };

export async function stream(opts) {
return await createStream({ ...opts, env: await getEnv() });
return await createStream({ env: await getEnv(), ...opts });
}

export async function dstream(opts) {
return await createDstream({ ...opts, env: await getEnv() });
return await createDstream({ env: await getEnv(), ...opts });
}

export async function kv(opts) {
return await createKV({ ...opts, env: await getEnv() });
return await createKV({ env: await getEnv(), ...opts });
}

export async function dkv(opts) {
return await createDKV({ ...opts, env: await getEnv() });
return await createDKV({ env: await getEnv(), ...opts });
}
3 changes: 3 additions & 0 deletions src/packages/nats/sync/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,9 @@ export class Stream extends EventEmitter {
// First we get info so we know how many messages
// are already in the stream:
const info = await consumer.info();
if (info.num_pending == 0) {
return consumer;
}
const fetch = await consumer.fetch();
this.watch = fetch;
let i = 0;
Expand Down
13 changes: 8 additions & 5 deletions src/packages/project/nats/api/index.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
/*
How to do development (so in a dev project doing cc-in-cc dev).
0. From the browser, terminate this api server running in the project already, if any
0. From the browser, terminate this api server running in the project:
await cc.client.nats_client.projectApi({project_id:'00847397-d6a8-4cb0-96a8-6ef64ac3e6cf'}).system.terminate({service:'api'})
> await cc.client.nats_client.projectApi({project_id:'00847397-d6a8-4cb0-96a8-6ef64ac3e6cf'}).system.terminate({service:'api'})
{status: 'terminated', service: 'api'}
1. Open a terminal in the project itself, which sets up the required environment variables, e.g.,
- COCALC_NATS_JWT -- this has the valid JWT issued to grant the project rights to use nats
- COCALC_PROJECT_ID
You can type the following into the miniterminal in a project and copy the output into a terminal here to
setup the same environment and make starting this server act like this part of a project.
You can type the following into the miniterminal in a project and copy the output into
a terminal here to setup the same environment and make starting this server act like
this part of a project.
export | grep -E "COCALC|HOME"
Expand All @@ -21,7 +24,7 @@ setup the same environment and make starting this server act like this part of a
or just run node and paste
require("@cocalc/project/nats/api").init()
require("@cocalc/project/nats/api/index").init()
if you want to easily be able to grab some state, e.g., global.x = {...} in some code.
Expand Down
9 changes: 5 additions & 4 deletions src/packages/project/nats/sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,22 @@ import {
import { kv as createKV, type KV } from "@cocalc/nats/sync/kv";
import { dkv as createDKV, type DKV } from "@cocalc/nats/sync/dkv";
import { getEnv } from "./env";
import { project_id } from "@cocalc/project/data";

export type { Stream, DStream, KV, DKV };

export async function stream(opts) {
return await createStream({ ...opts, env: await getEnv() });
return await createStream({ project_id, env: await getEnv(), ...opts });
}

export async function dstream(opts) {
return await createDstream({ ...opts, env: await getEnv() });
return await createDstream({ project_id, env: await getEnv(), ...opts });
}

export async function kv(opts) {
return await createKV({ ...opts, env: await getEnv() });
return await createKV({ project_id, env: await getEnv(), ...opts });
}

export async function dkv(opts) {
return await createDKV({ ...opts, env: await getEnv() });
return await createDKV({ project_id, env: await getEnv(), ...opts });
}
7 changes: 4 additions & 3 deletions src/packages/project/nats/terminal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import { JSONCodec } from "nats";
import { getLogger } from "@cocalc/project/logger";
import { readlink, realpath } from "node:fs/promises";
import { dstream, type DStream } from "@cocalc/project/nats/sync";
import { project_id } from "@cocalc/project/data";
import { getSubject } from "./names";
import getConnection from "./connection";

Expand Down Expand Up @@ -141,7 +140,7 @@ class Session {
};

createStream = async () => {
this.stream = await dstream({ name: this.streamName, project_id });
this.stream = await dstream({ name: this.streamName });
};

init = async () => {
Expand All @@ -160,15 +159,17 @@ class Session {
args.push(path_split(initFilename).tail);
}
const cwd = getCWD(head, this.options.cwd);
logger.debug("creating pty with size", this.size);
logger.debug("creating pty");
this.pty = spawn(command, args, {
cwd,
env,
rows: this.size?.rows,
cols: this.size?.cols,
});
this.state = "running";
logger.debug("creating stream");
await this.createStream();
logger.debug("connect stream to pty");
this.pty.onData((data) => {
this.handleBackendMessages(data);
this.stream.publish({ data });
Expand Down

0 comments on commit e583350

Please sign in to comment.