Skip to content

Commit 6fee8d8

Browse files
authored
feat: add recurring card scheduler (#39)
Co-authored-by: Joao Carreiro <joaocarreiro@gmail.com>
1 parent a5bfd50 commit 6fee8d8

20 files changed

Lines changed: 780 additions & 10 deletions

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
## Unreleased
44

5+
- Add atomically claimed recurring card intervals with constant-time catch-up, crash-recoverable leases, scheduler/API proof, and coalescing for active or capacity-blocked runs, thanks @Jhacarreiro.
56
- Reuse `@openclaw/libterminal` for terminal protocol codecs, Worker relays, Ghostty assets, and browser lifecycle while keeping Crabfleet authorization and session policy local.
67
- Fix automated Worker deployments by converging the app Custom Domain with the DNS-scoped deployment token instead of requiring zone-route access from the Worker token.
78

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ Crabfleet gives OpenClaw maintainers a fleet dashboard where every Codex crabbox
1010

1111
- **Fleet-first workflow.** Create repo-ready Crabboxes from the app, SSH, or the Go CLI and see org Codex instances grouped by person.
1212
- **Board-based workflow.** Create cards from prompts, GitHub issues, or PRs. Track them through Todo, Running, Human Review, and Done lanes.
13+
- **Recurring cards.** Give API-created cards a bounded interval schedule; due occurrences use the normal run-attempt path and coalesce while a run or capacity limit blocks dispatch.
1314
- **Issue/PR lookup.** Type `#123` in search to preview matching GitHub issues or PRs across enabled OpenClaw repos and create a card from the match.
1415
- **Codex run control.** Start durable run attempts, track heartbeats, watch the Ghostty WASM session grid, and take over only when the selected runtime advertises that capability.
1516
- **Interactive Crabboxes.** Start a standalone Codex CLI workspace for manual cloud work and attach it in the same fullscreen Ghostty grid or WebVNC.

docs/api.md

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ Every card may include:
113113
- `changes`: changed file summary; list responses omit diff patches
114114
- `run`: active run attempt, including `selectionReason` and `capabilities`
115115
- `logs`: last 80 events
116+
- `schedule`, `nextRunAt`, and `lastScheduledRunAt`: recurring cadence and persisted scheduler evidence
116117

117118
## GitHub Lookup
118119

@@ -152,7 +153,8 @@ Maintainer+. Creates a card.
152153
"repo": "openclaw/crabfleet",
153154
"source": "Prompt",
154155
"runtime": "auto",
155-
"policy": ""
156+
"policy": "",
157+
"schedule": { "kind": "interval", "everyMs": 86400000 }
156158
}
157159
```
158160

@@ -164,8 +166,27 @@ Fields:
164166
- `source`: optional `Prompt`, `Issue`, or `PR`.
165167
- `runtime`: optional `auto`, `container`, or `crabbox`.
166168
- `policy`: optional. Blank, `default`, or `repo_default` uses a valid repo workflow policy, then `open_pr`.
169+
- `schedule`: optional interval schedule. `everyMs` must be an integer from 60000 through 2678400000. Optional `startAt` is a non-negative Unix epoch millisecond timestamp. The first occurrence is the first cadence-aligned timestamp after creation unless `startAt` is in the future.
167170

168-
Invalid explicit merge policies return `400`.
171+
Invalid explicit merge policies or schedules return `400`.
172+
173+
Scheduled cards include `schedule`, `nextRunAt`, and `lastScheduledRunAt` in card responses. Due occurrences are atomically claimed and advanced. If an active run or capacity limit prevents dispatch, the occurrence is recorded as skipped and the next cadence remains aligned.
174+
175+
### POST /api/admin/scheduler/tick
176+
177+
Owner only. Runs one recurring-card scheduler tick and returns bounded counters:
178+
179+
```json
180+
{
181+
"status": "ok",
182+
"now": 1779000000000,
183+
"scanned": 1,
184+
"claimed": 1,
185+
"queued": 1,
186+
"skipped": 0,
187+
"invalid": 0
188+
}
189+
```
169190

170191
### POST /api/cards/:id/actions
171192

docs/index.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,3 +118,11 @@ Owners can evaluate `CRABBOX.md` for enabled repos. Valid workflow config sets r
118118
## Status
119119

120120
Deployed and actively used by OpenClaw. See [Current Boundaries](#current-boundaries) for the remaining deliberately unimplemented product behaviors.
121+
122+
## Recurring Cards
123+
124+
Crabfleet supports recurring cards for operational jobs that should run on a cadence without a human repeatedly pressing Start. A card can include a schedule object such as:
125+
126+
{ "kind": "interval", "everyMs": 86400000 }
127+
128+
The Worker scheduled handler and owner-only `/api/admin/scheduler/tick` endpoint atomically claim due occurrences, use the existing run scheduler, and advance `nextRunAt` with constant-time catch-up. Concurrent ticks cannot dispatch the same occurrence twice. If a card already has an active run or the fleet is at capacity, that occurrence is recorded as skipped and the cadence advances instead of retrying every minute.

docs/spec.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -408,3 +408,15 @@ Every push to `main` runs checks, tests, builds, migrations, deploys, and endpoi
408408
- [GitHub Actions Sessions](/github-actions-sessions/)
409409
- [Native macOS Client](/macos-native-client/)
410410
- [Fleet v2 Implementation Record](/spec-v2/)
411+
412+
## Recurring cards
413+
414+
Recurring work is modeled as a card schedule, not as an infinite process loop.
415+
416+
MVP schedule shape:
417+
418+
{ "kind": "interval", "everyMs": 86400000 }
419+
420+
When `nextRunAt` is due, the scheduler claims that exact occurrence with a five-minute recovery lease, queues a normal run attempt when capacity allows, and advances to the first cadence-aligned timestamp after the tick. Catch-up is constant-time. Concurrent ticks cannot claim the same occurrence, and an interrupted claim becomes retryable after its lease expires. Active runs and capacity blocks coalesce the current occurrence instead of causing per-minute retries.
421+
422+
This allows daily operational sweeps, maintenance checks, and recurring repair jobs to remain visible in the normal card/run history.
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
ALTER TABLE cards ADD COLUMN schedule_json TEXT NOT NULL DEFAULT '';
2+
ALTER TABLE cards ADD COLUMN next_run_at INTEGER;
3+
ALTER TABLE cards ADD COLUMN last_scheduled_run_at INTEGER;
4+
ALTER TABLE cards ADD COLUMN schedule_claimed_at INTEGER;
5+
6+
CREATE INDEX IF NOT EXISTS idx_cards_recurring_due
7+
ON cards(next_run_at)
8+
WHERE schedule_json != '' AND next_run_at IS NOT NULL;

src/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ export default {
169169
env: RuntimeEnv,
170170
context: ExecutionContext,
171171
): Promise<void> {
172-
new WorkerApplication(env).runtime.schedule(context);
172+
new WorkerApplication(env).schedule(context);
173173
},
174174
} satisfies ExportedHandler<RuntimeEnv>;
175175

src/worker/card-lifecycle-service.ts

Lines changed: 159 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,18 @@ import {
55
cardRuntimeOptions,
66
mergePolicyOptions,
77
type Card,
8+
type DueRecurringCard,
9+
type RecurringSchedulerTickResult,
810
type RunAttempt,
911
type WorkflowConfig,
1012
} from "./card-model.ts";
13+
import {
14+
cardScheduleSummary,
15+
initialRecurringRunAt,
16+
nextRecurringRunAt,
17+
normalizeCardSchedule,
18+
parseStoredCardSchedule,
19+
} from "./card-schedule.ts";
1120
import { badRequest, notFound } from "./http.ts";
1221
import type { User } from "./models.ts";
1322
import { normalizeRepo } from "./repositories.ts";
@@ -20,6 +29,7 @@ export type CardCreateInput = {
2029
source?: unknown;
2130
runtime?: unknown;
2231
policy?: unknown;
32+
schedule?: unknown;
2333
};
2434

2535
export type CardRuntimeDescriptor = {
@@ -51,6 +61,24 @@ export type CardLifecycleStore = {
5161
input: Omit<Card, "logs" | "changes" | "run"> & { actor: string; updatedAt: number },
5262
): Promise<void>;
5363
nextRunAttempt(cardId: string): Promise<number>;
64+
readDueRecurringCards(
65+
now: number,
66+
staleBefore: number,
67+
limit: number,
68+
): Promise<DueRecurringCard[]>;
69+
claimRecurringOccurrence(
70+
cardId: string,
71+
dueAt: number,
72+
claimedAt: number,
73+
staleBefore: number,
74+
): Promise<boolean>;
75+
completeRecurringOccurrence(
76+
cardId: string,
77+
dueAt: number,
78+
claimedAt: number,
79+
nextRunAt: number,
80+
): Promise<boolean>;
81+
disableRecurringSchedule(cardId: string, dueAt: number, claimedAt: number): Promise<boolean>;
5482
claimRun(input: CardRunClaimInput): Promise<"claimed" | "capacity" | "active">;
5583
heartbeatRun(runId: string, actorName: string, now: number, message: string): Promise<void>;
5684
moveCard(cardId: string, lane: string, startedAt: number | null, now: number): Promise<void>;
@@ -70,6 +98,9 @@ export type CardLifecycleServiceDependencies = {
7098
isConstraintError(error: unknown): boolean;
7199
};
72100

101+
const recurringSchedulerBatchSize = 25;
102+
const recurringSchedulerClaimLeaseMs = 5 * 60_000;
103+
73104
export class CardLifecycleService {
74105
private readonly dependencies: CardLifecycleServiceDependencies;
75106

@@ -100,6 +131,7 @@ export class CardLifecycleService {
100131
const source = oneOf(input.source, ["Prompt", "Issue", "PR"], "Prompt");
101132
const runtime = oneOf(input.runtime, cardRuntimeOptions, "auto");
102133
const policy = resolveCardPolicy(input.policy, workflowConfig);
134+
const schedule = createCardSchedule(input.schedule);
103135
const owner = user.login ?? user.email ?? user.subject;
104136
for (let attempt = 0; attempt < 3; attempt += 1) {
105137
const id = await this.dependencies.store.nextCardId();
@@ -116,6 +148,9 @@ export class CardLifecycleService {
116148
owner,
117149
startedAt: null,
118150
createdAt: now,
151+
schedule,
152+
nextRunAt: schedule ? initialRecurringRunAt(schedule, now) : null,
153+
lastScheduledRunAt: null,
119154
updatedAt: now,
120155
actor: actor(user),
121156
});
@@ -136,11 +171,11 @@ export class CardLifecycleService {
136171
if (action === "start" || action === "pulse") {
137172
const wasRunning = card.lane === "Running";
138173
if (!wasRunning) {
139-
if (!(await this.claimRunning(user, card, now))) return this.result(cardId);
174+
if ((await this.claimRunning(user, card, now)) !== "claimed") return this.result(cardId);
140175
} else if (card.run && activeRunStatuses.includes(card.run.status)) {
141176
await this.dependencies.store.heartbeatRun(card.run.id, actorName, now + 2, "heartbeat ok");
142177
return this.result(cardId);
143-
} else if (!(await this.claimRunning(user, card, now))) {
178+
} else if ((await this.claimRunning(user, card, now)) !== "claimed") {
144179
return this.result(cardId);
145180
}
146181
await this.dependencies.store.appendEvent(card.id, actorName, "heartbeat ok", now + 3);
@@ -202,8 +237,110 @@ export class CardLifecycleService {
202237
await this.dependencies.store.reconcileStalledRuns(now, threshold, actor(systemUser()));
203238
}
204239

205-
private async claimRunning(user: User, card: Card, now: number): Promise<boolean> {
240+
async runRecurringScheduler(
241+
now = this.dependencies.now(),
242+
): Promise<RecurringSchedulerTickResult> {
206243
await this.reconcileStalledRuns(now);
244+
const staleBefore = now - recurringSchedulerClaimLeaseMs;
245+
const dueCards = await this.dependencies.store.readDueRecurringCards(
246+
now,
247+
staleBefore,
248+
recurringSchedulerBatchSize,
249+
);
250+
const result: RecurringSchedulerTickResult = {
251+
status: "ok",
252+
now,
253+
scanned: dueCards.length,
254+
claimed: 0,
255+
queued: 0,
256+
skipped: 0,
257+
invalid: 0,
258+
};
259+
const system = systemUser();
260+
const actorName = actor(system);
261+
262+
for (const due of dueCards) {
263+
let schedule;
264+
let nextRunAt: number | null = null;
265+
try {
266+
schedule = parseStoredCardSchedule(due.scheduleJson);
267+
nextRunAt = schedule ? nextRecurringRunAt(schedule, due.dueAt, now) : null;
268+
} catch {
269+
schedule = null;
270+
}
271+
const claimed = await this.dependencies.store.claimRecurringOccurrence(
272+
due.id,
273+
due.dueAt,
274+
now,
275+
staleBefore,
276+
);
277+
if (!claimed) continue;
278+
result.claimed += 1;
279+
280+
if (!schedule || nextRunAt === null) {
281+
if (await this.dependencies.store.disableRecurringSchedule(due.id, due.dueAt, now)) {
282+
result.invalid += 1;
283+
await this.dependencies.store.appendEvent(
284+
due.id,
285+
actorName,
286+
"recurring schedule invalid; disabled pending maintainer review",
287+
now,
288+
);
289+
}
290+
continue;
291+
}
292+
293+
const card = await this.dependencies.store.readCard(due.id);
294+
let runResult: "claimed" | "capacity" | "active" | "failed" = "active";
295+
try {
296+
if (card) {
297+
runResult = await this.claimRunning(system, card, now, {
298+
pulseExisting: false,
299+
reconcileStalled: false,
300+
});
301+
}
302+
} catch {
303+
// A repo can be disabled after card creation. Advance this occurrence so it
304+
// cannot poison every scheduler batch until a maintainer fixes the card.
305+
runResult = "failed";
306+
}
307+
const completed = await this.dependencies.store.completeRecurringOccurrence(
308+
due.id,
309+
due.dueAt,
310+
now,
311+
nextRunAt,
312+
);
313+
if (!completed) continue;
314+
315+
if (runResult === "claimed") {
316+
result.queued += 1;
317+
await this.dependencies.store.appendEvent(
318+
due.id,
319+
actorName,
320+
`recurring schedule queued (${cardScheduleSummary(schedule)}); next ${new Date(nextRunAt).toISOString()}`,
321+
now + 4,
322+
);
323+
} else {
324+
result.skipped += 1;
325+
await this.dependencies.store.appendEvent(
326+
due.id,
327+
actorName,
328+
`recurring occurrence skipped (${runResult === "failed" ? "dispatch failed" : runResult}); next ${new Date(nextRunAt).toISOString()}`,
329+
now + 4,
330+
);
331+
}
332+
}
333+
334+
return result;
335+
}
336+
337+
private async claimRunning(
338+
user: User,
339+
card: Card,
340+
now: number,
341+
options: { pulseExisting?: boolean; reconcileStalled?: boolean } = {},
342+
): Promise<"claimed" | "capacity" | "active"> {
343+
if (options.reconcileStalled !== false) await this.reconcileStalledRuns(now);
207344
const currentCard = (await this.dependencies.store.readCard(card.id)) ?? card;
208345
await this.dependencies.requireRepo(currentCard.repo);
209346
const settings = await this.dependencies.readSettings();
@@ -213,8 +350,15 @@ export class CardLifecycleService {
213350
? currentCard.run
214351
: null;
215352
if (existingRun) {
216-
await this.dependencies.store.heartbeatRun(existingRun.id, actor(user), now, "heartbeat ok");
217-
return true;
353+
if (options.pulseExisting !== false) {
354+
await this.dependencies.store.heartbeatRun(
355+
existingRun.id,
356+
actor(user),
357+
now,
358+
"heartbeat ok",
359+
);
360+
}
361+
return "active";
218362
}
219363

220364
const workflow = await this.dependencies.ensureWorkflow(currentCard.repo, now);
@@ -234,7 +378,7 @@ export class CardLifecycleService {
234378
const message =
235379
claimed === "capacity" ? `capacity blocked at cap ${cap}` : "run already active";
236380
await this.dependencies.store.appendEvent(currentCard.id, actor(user), message, now);
237-
return false;
381+
return claimed;
238382
}
239383
await this.dependencies.store.appendEvent(
240384
currentCard.id,
@@ -248,7 +392,7 @@ export class CardLifecycleService {
248392
`runtime=${descriptor.runtime} policy=${currentCard.policy} workflow=${workflow?.status ?? "unseen"} reason=${descriptor.reason}`,
249393
now + 2,
250394
);
251-
return true;
395+
return "claimed";
252396
}
253397

254398
private async result(cardId: string): Promise<{ card: Card }> {
@@ -307,6 +451,14 @@ function stallThresholdMs(settings: Record<string, string>): number {
307451
return Number.isFinite(parsed) && parsed >= 60_000 ? parsed : 5 * 60 * 1000;
308452
}
309453

454+
function createCardSchedule(input: unknown) {
455+
try {
456+
return normalizeCardSchedule(input);
457+
} catch (error) {
458+
throw badRequest(error instanceof Error ? error.message : "invalid schedule");
459+
}
460+
}
461+
310462
function systemUser(): User {
311463
return {
312464
subject: "system:crabfleet",

0 commit comments

Comments
 (0)