Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 34 additions & 24 deletions packages/durably/src/durably.ts
Original file line number Diff line number Diff line change
Expand Up @@ -415,8 +415,8 @@ interface DurablyState<
leaseMs: number
leaseRenewIntervalMs: number
retainRunsMs: number | null
lastPurgeAt: number
releaseBrowserSingleton: () => void
runIdleMaintenance: () => Promise<void>
}

/**
Expand Down Expand Up @@ -918,30 +918,8 @@ function createDurablyInstance<
const workerId = options?.workerId ?? defaultWorkerId()
const now = new Date().toISOString()

await storage.releaseExpiredLeases(now)

const run = await storage.claimNext(workerId, now, state.leaseMs)
if (!run) {
// Auto-purge old terminal runs if retainRuns is configured.
// Runs after claimNext so purge never serializes with job claiming.
// lastPurgeAt starts at 0, so the first idle cycle purges immediately.
if (
state.retainRunsMs !== null &&
Date.now() - state.lastPurgeAt >= PURGE_INTERVAL_MS
) {
const purgeNow = Date.now()
state.lastPurgeAt = purgeNow
const cutoff = new Date(purgeNow - state.retainRunsMs).toISOString()
storage
.purgeRuns({ olderThan: cutoff, limit: 100 })
.catch((error) => {
eventEmitter.emit({
type: 'worker:error',
error: getErrorMessage(error),
context: 'auto-purge',
})
})
}
return false
}

Expand All @@ -957,14 +935,21 @@ function createDurablyInstance<
const maxRuns = options?.maxRuns ?? Number.POSITIVE_INFINITY
let processed = 0

let reachedIdle = false
while (processed < maxRuns) {
const didProcess = await this.processOne({ workerId })
if (!didProcess) {
reachedIdle = true
break
}
processed++
}

// Run maintenance only when actually idle, not when maxRuns was hit
if (reachedIdle) {
await state.runIdleMaintenance()
}

return processed
},

Expand Down Expand Up @@ -1057,6 +1042,30 @@ export function createDurably<
}) as typeof db.destroy
const eventEmitter = createEventEmitter()
const jobRegistry = createJobRegistry()
let lastPurgeAt = 0

const runIdleMaintenance = async (): Promise<void> => {
try {
const now = new Date().toISOString()
await storage.releaseExpiredLeases(now)

if (config.retainRunsMs !== null) {
const purgeNow = Date.now()
if (purgeNow - lastPurgeAt >= PURGE_INTERVAL_MS) {
lastPurgeAt = purgeNow
const cutoff = new Date(purgeNow - config.retainRunsMs).toISOString()
await storage.purgeRuns({ olderThan: cutoff, limit: 100 })
}
}
} catch (error) {
eventEmitter.emit({
type: 'worker:error',
error: getErrorMessage(error),
context: 'idle-maintenance',
})
}
}

let processOneImpl:
| ((options?: { workerId?: string }) => Promise<boolean>)
| null = null
Expand All @@ -1068,6 +1077,7 @@ export function createDurably<
}
return processOneImpl(runtimeOptions)
},
runIdleMaintenance,
)

const state: DurablyState<TLabels> = {
Expand All @@ -1083,8 +1093,8 @@ export function createDurably<
leaseMs: config.leaseMs,
leaseRenewIntervalMs: config.leaseRenewIntervalMs,
retainRunsMs: config.retainRunsMs,
lastPurgeAt: 0,
releaseBrowserSingleton,
runIdleMaintenance,
}

const instance = createDurablyInstance<Record<string, never>, TLabels>(
Expand Down
12 changes: 10 additions & 2 deletions packages/durably/src/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ export interface Worker {
export function createWorker(
config: WorkerConfig,
processOne: (options?: { workerId?: string }) => Promise<boolean>,
onIdle?: () => Promise<void>,
): Worker {
let running = false
let pollingTimeout: ReturnType<typeof setTimeout> | null = null
Expand All @@ -32,9 +33,16 @@ export function createWorker(
return
}

const cycle = (async () => {
const didProcess = await processOne({ workerId: activeWorkerId })
if (!didProcess && onIdle && running) {
await onIdle()
}
})()
inFlight = cycle

try {
inFlight = processOne({ workerId: activeWorkerId }).then(() => undefined)
await inFlight
await cycle
} finally {
inFlight = null
}
Expand Down
14 changes: 7 additions & 7 deletions packages/durably/tests/shared/purge.shared.ts
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,9 @@ export function createPurgeTests(createDialect: () => Dialect) {

await d.migrate()

// Process the run deterministically without starting the polling loop
// Process the run deterministically without starting the polling loop.
// Use processOne (not processUntilIdle) so idle maintenance doesn't run yet,
// keeping lastPurgeAt at 0 for the purge assertion below.
const run = await d.jobs.testJob.trigger({})
await d.processOne()
expect((await d.getRun(run.id))?.status).toBe('completed')
Expand All @@ -256,12 +258,10 @@ export function createPurgeTests(createDialect: () => Dialect) {
.where('id', '=', run.id)
.execute()

// processOne returns false (no pending runs) and triggers auto-purge
// on the idle path. lastPurgeAt starts at 0 so purge fires immediately.
await d.processOne()

// Auto-purge is fire-and-forget, give it a tick to complete
await new Promise((r) => setTimeout(r, 50))
// processUntilIdle runs idle maintenance (including auto-purge) after
// draining. lastPurgeAt is still 0 (processOne doesn't run maintenance)
// so purge fires immediately.
await d.processUntilIdle()

expect(await d.getRun(run.id)).toBeNull()

Expand Down