Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions packages/openclaw/src/gateway-status.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,25 @@ describe('gateway-status: createGatewayStatusManager', () => {
expect(gatewayHeartbeat).toHaveBeenCalledTimes(1); // not 2
});

it('does not renew a late activation that finishes after gateway_stop', () => {
// Simulate the bounded stale-online race: a %gateway-start poke was in
// flight, gateway_stop latched the manager stopped, then the activation
// continuation resumed late. Even if it marks activated and asks for a
// heartbeat, stopped must win so any ship-side online state expires at
// the original lease instead of being renewed forever.
manager.markStarting();
manager.stopHeartbeat();
manager.markStopped();

manager.markActivated();
manager.startHeartbeat();

vi.advanceTimersByTime(120_000);
expect(gatewayHeartbeat).not.toHaveBeenCalled();
expect(manager.stopped).toBe(true);
expect(manager.activated).toBe(true);
});

it('skips the heartbeat poke when api-client params are not published', () => {
// Clear the params the suite beforeEach published. The per-tick body
// must bail before configuring/pokeing when the shared slot is empty.
Expand Down
148 changes: 147 additions & 1 deletion packages/openclaw/src/monitor/computing-presence.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ import {
} from './computing-presence.js';

const {
clearConversationPresence,
createComputingStatus,
getComputingStatusText,
serializeComputingStatus,
setConversationPresence,
} = vi.hoisted(() => ({
clearConversationPresence: vi.fn(async () => {}),
createComputingStatus: vi.fn(({ thinking, toolCalls }) => ({
thinking,
toolCalls,
Expand All @@ -24,6 +26,7 @@ const {
}));

vi.mock('@tloncorp/api', () => ({
clearConversationPresence,
createComputingStatus,
getComputingStatusText,
serializeComputingStatus,
Expand All @@ -35,7 +38,7 @@ describe('createComputingPresenceTracker', () => {
vi.clearAllMocks();
});

test('publishes presence with an empty disclose array', async () => {
test('publishes active presence with an explicit backend timeout', async () => {
const reporter = createComputingPresenceReporter();

await reporter.publish({
Expand All @@ -48,6 +51,7 @@ describe('createComputingPresenceTracker', () => {
conversationId: '~nec',
topic: 'computing',
disclose: [],
timeout: '~m1.s30',
display: {
text: 'Computing',
blob: {
Expand All @@ -56,6 +60,23 @@ describe('createComputingPresenceTracker', () => {
},
},
});
expect(clearConversationPresence).not.toHaveBeenCalled();
});

test('clears computing presence when publishing idle state', async () => {
const reporter = createComputingPresenceReporter();

await reporter.publish({
conversationId: '~nec',
thinking: false,
toolNames: [],
});

expect(clearConversationPresence).toHaveBeenCalledWith({
conversationId: '~nec',
topic: 'computing',
});
expect(setConversationPresence).not.toHaveBeenCalled();
});

test('publishes thinking state for a new run and sends thinking false when the run stops', async () => {
Expand Down Expand Up @@ -167,6 +188,48 @@ describe('createComputingPresenceTracker', () => {
expect(reporter.publish).toHaveBeenCalledTimes(1);
});

test('republishes unchanged active state after the keepalive window so ship presence does not expire', async () => {
vi.useFakeTimers();

try {
const reporter = {
publish: vi.fn(async () => {}),
};

const tracker = createComputingPresenceTracker({ reporter });

await tracker.refreshRun({
conversationId: '~nec',
runId: 'run-1',
});

expect(reporter.publish).toHaveBeenCalledTimes(1);

await vi.advanceTimersByTimeAsync(29_999);
await tracker.refreshRun({
conversationId: '~nec',
runId: 'run-1',
});

expect(reporter.publish).toHaveBeenCalledTimes(1);

await vi.advanceTimersByTimeAsync(1);
await tracker.refreshRun({
conversationId: '~nec',
runId: 'run-1',
});

expect(reporter.publish).toHaveBeenCalledTimes(2);
expect(reporter.publish).toHaveBeenLastCalledWith({
conversationId: '~nec',
thinking: true,
toolNames: [],
});
} finally {
vi.useRealTimers();
}
});

test('unions active runs in the same conversation', async () => {
const reporter = {
publish: vi.fn(async () => {}),
Expand Down Expand Up @@ -217,6 +280,89 @@ describe('createComputingPresenceTracker', () => {
expect(reporter.publish).toHaveBeenCalledTimes(4);
});

test('keepalive refresh does not resurrect a stopped run', async () => {
const reporter = {
publish: vi.fn(async () => {}),
};

const tracker = createComputingPresenceTracker({
reporter,
minUpdateIntervalMs: 0,
});

await tracker.refreshRun({
conversationId: '~nec',
runId: 'run-1',
});

await tracker.stopRun({
conversationId: '~nec',
runId: 'run-1',
});

expect(reporter.publish).toHaveBeenLastCalledWith({
conversationId: '~nec',
thinking: false,
toolNames: [],
});

await tracker.refreshRun({
conversationId: '~nec',
runId: 'run-1',
});

expect(reporter.publish).toHaveBeenCalledTimes(2);
});

test('a tool call resumes a stopped run', async () => {
const reporter = {
publish: vi.fn(async () => {}),
};

const tracker = createComputingPresenceTracker({
reporter,
minUpdateIntervalMs: 0,
});

await tracker.refreshRun({
conversationId: '~nec',
runId: 'run-1',
});

await tracker.stopRun({
conversationId: '~nec',
runId: 'run-1',
});

await tracker.addToolCall({
conversationId: '~nec',
runId: 'run-1',
toolName: 'exec',
});

expect(reporter.publish).toHaveBeenLastCalledWith({
conversationId: '~nec',
thinking: true,
toolNames: ['exec'],
});

await tracker.refreshRun({
conversationId: '~nec',
runId: 'run-1',
});

await tracker.stopRun({
conversationId: '~nec',
runId: 'run-1',
});

expect(reporter.publish).toHaveBeenLastCalledWith({
conversationId: '~nec',
thinking: false,
toolNames: [],
});
});

test('does not republish thinking false when a stopped run is already missing', async () => {
const reporter = {
publish: vi.fn(async () => {}),
Expand Down
85 changes: 83 additions & 2 deletions packages/openclaw/src/monitor/computing-presence.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import {
clearConversationPresence,
createComputingStatus,
getComputingStatusText,
serializeComputingStatus,
setConversationPresence,
} from '@tloncorp/api';
import { dr, render } from '@urbit/aura';
import type { RuntimeEnv } from 'openclaw/plugin-sdk';

import { describeError } from '../urbit/errors.js';
Expand All @@ -21,6 +23,19 @@ type PublishParams = {
type PublishedState = Omit<PublishParams, 'conversationId'>;

const DEFAULT_MIN_UPDATE_INTERVAL_MS = 1_000;
const ACTIVE_PRESENCE_TIMEOUT_SECS = 90;
const ACTIVE_PRESENCE_TIMEOUT = render(
'dr',
dr.fromSeconds(BigInt(ACTIVE_PRESENCE_TIMEOUT_SECS))
);
// Active %computing entries carry an explicit 90s ship-side timeout. Re-publish
// unchanged active state well inside that window so long healthy runs stay
// visible, while disrupted gateways still age out without a final clear poke.
const DEFAULT_MAX_PUBLISH_AGE_MS = 30_000;
// Stopped runIds remembered per conversation so a late keepalive refresh
// cannot resurrect a run that was just stopped. Capped because tombstones
// only matter for the few seconds until the keepalive loop fully stops.
const STOPPED_RUN_MEMORY = 8;

export type ComputingPresenceReporter = {
publish: (params: PublishParams) => Promise<void>;
Expand All @@ -34,13 +49,22 @@ function normalizeToolName(toolName?: string | null) {
export function createComputingPresenceReporter(): ComputingPresenceReporter {
return {
publish: async ({ conversationId, thinking, toolNames }) => {
if (!thinking) {
await clearConversationPresence({
conversationId,
topic: 'computing',
});
return;
}

const toolCalls = toolNames.map((toolName) => ({ toolName }));
const status = createComputingStatus({ thinking, toolCalls });

await setConversationPresence({
conversationId,
topic: 'computing',
disclose: [],
timeout: ACTIVE_PRESENCE_TIMEOUT,
display: {
text: getComputingStatusText(status),
blob: serializeComputingStatus({ thinking, toolCalls }),
Expand All @@ -54,18 +78,57 @@ export function createComputingPresenceTracker(params?: {
reporter?: ComputingPresenceReporter;
runtime?: RuntimeEnv;
minUpdateIntervalMs?: number;
maxPublishAgeMs?: number;
}) {
const reporter = params?.reporter ?? createComputingPresenceReporter();
const runtime = params?.runtime;
const minUpdateIntervalMs = Math.max(
0,
params?.minUpdateIntervalMs ?? DEFAULT_MIN_UPDATE_INTERVAL_MS
);
const maxPublishAgeMs = Math.max(
minUpdateIntervalMs,
params?.maxPublishAgeMs ?? DEFAULT_MAX_PUBLISH_AGE_MS
);
const conversations = new Map<string, Map<string, RunState>>();
const lastPublishedState = new Map<string, PublishedState>();
const lastPublishedAt = new Map<string, number>();
const pendingState = new Map<string, PublishedState>();
const pendingTimers = new Map<string, ReturnType<typeof setTimeout>>();
const stoppedRuns = new Map<string, Set<string>>();

const markRunStopped = (conversationId: string, runId: string) => {
let stopped = stoppedRuns.get(conversationId);
if (!stopped) {
stopped = new Set();
stoppedRuns.set(conversationId, stopped);
Comment thread
latter-bolden marked this conversation as resolved.
}

stopped.delete(runId);
stopped.add(runId);
while (stopped.size > STOPPED_RUN_MEMORY) {
const oldest = stopped.values().next().value;
if (oldest === undefined) {
break;
}
stopped.delete(oldest);
}
};

const isRunStopped = (conversationId: string, runId: string) =>
stoppedRuns.get(conversationId)?.has(runId) ?? false;

const clearRunStopped = (conversationId: string, runId: string) => {
const stopped = stoppedRuns.get(conversationId);
if (!stopped) {
return;
}

stopped.delete(runId);
if (stopped.size === 0) {
stoppedRuns.delete(conversationId);
}
};

const clonePublishedState = (state: PublishedState): PublishedState => ({
thinking: state.thinking,
Expand Down Expand Up @@ -110,6 +173,13 @@ export function createComputingPresenceTracker(params?: {
conversationId,
...state,
});
if (!state.thinking) {
// idle is the terminal state; drop the records so the maps do not grow
// unboundedly across the gateway's lifetime
lastPublishedState.delete(conversationId);
lastPublishedAt.delete(conversationId);
return;
}
lastPublishedState.set(conversationId, clonePublishedState(state));
lastPublishedAt.set(conversationId, Date.now());
};
Expand All @@ -134,8 +204,12 @@ export function createComputingPresenceTracker(params?: {
state: PublishedState
) => {
if (statesEqual(lastPublishedState.get(conversationId), state)) {
clearPending(conversationId);
return;
const publishedAt = lastPublishedAt.get(conversationId) ?? 0;
if (Date.now() - publishedAt < maxPublishAgeMs) {
clearPending(conversationId);
return;
}
// fall through: re-publish before the ship-side presence expires
}

if (minUpdateIntervalMs === 0) {
Expand Down Expand Up @@ -244,6 +318,10 @@ export function createComputingPresenceTracker(params?: {

return {
refreshRun: async (params: { conversationId: string; runId: string }) => {
if (isRunStopped(params.conversationId, params.runId)) {
return;
}

await safelySync(params.conversationId, 'refresh', async () => {
ensureRun(params.conversationId, params.runId);
await syncConversation(params.conversationId);
Expand All @@ -261,6 +339,8 @@ export function createComputingPresenceTracker(params?: {
}

await safelySync(params.conversationId, 'update', async () => {
// real activity resumes a previously stopped run
clearRunStopped(params.conversationId, params.runId);
const run = ensureRun(params.conversationId, params.runId);
if (!run.toolNames.includes(toolName)) {
run.toolNames.push(toolName);
Expand All @@ -285,6 +365,7 @@ export function createComputingPresenceTracker(params?: {
},

stopRun: async (params: { conversationId: string; runId: string }) => {
markRunStopped(params.conversationId, params.runId);
await safelySync(params.conversationId, 'clear', async () => {
const runs = conversations.get(params.conversationId);
if (!runs) {
Expand Down
Loading
Loading