Skip to content

Commit

Permalink
Merge pull request #91 from oliver-oloughlin/patch/catch-cron-errors
Browse files Browse the repository at this point in the history
catch errors that occur within exitOn and setInterval callbacks + upd…
  • Loading branch information
oliver-oloughlin authored Oct 10, 2023
2 parents 4597cf1 + 69091c1 commit 1796fe6
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 9 deletions.
10 changes: 5 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -704,11 +704,11 @@ db.listenQueue(async (data) => {
### cron()

Create a cron job that will run on interval, either indefinitely or until an
exit condition is met. Interval defaults to 1 second if not set. Like with queue
listeners, there can be multiple cron jobs defined.
exit condition is met. Interval defaults to 1 hour if not set. Like with queue
listeners, multiple cron jobs can be created.

```ts
// Will repeat indefinitely with 1 second interval
// Will repeat indefinitely with 1 hour interval
db.cron(() => console.log("Hello World!"))

// First job starts with a 10 second delay, after that there is a 5 second delay between jobs
Expand All @@ -722,8 +722,8 @@ db.cron(() => console.log("I terminate after running 10 times"), {
// If this is set it will override the fixed interval
setInterval: ({ count }) => count * 500

// Count starts at 0 and is given before the current job is run
exit: ({ count }) => count === 10,
// Count starts at 0, exitOn is run before the current job
exitOn: ({ count }) => count === 10,
})
```

Expand Down
32 changes: 28 additions & 4 deletions src/kvdex.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,14 +93,17 @@ export function kvdex<const T extends SchemaDefinition>(

// Add queue listener
kv.listenQueue(async (msg) => {
// Parse queue message
const parsed = parseQueueMessage(msg)
if (!parsed.ok) {
return
}

// Find correct queue handlers
const { __data__, __handlerId__ } = parsed.msg
const handlers = queueHandlers.get(__handlerId__)

// Run queue handlers
await allFulfilled(handlers?.map((handler) => handler(__data__)) ?? [])
})
}
Expand Down Expand Up @@ -343,6 +346,7 @@ export class KvDex<const T extends Schema<SchemaDefinition>> {
*
* @param job - Work that will be run for each job interval.
* @param options - Cron options.
* @returns Cron job ID.
*/
async cron(
job: (msg: CronMessage) => unknown,
Expand Down Expand Up @@ -379,16 +383,33 @@ export class KvDex<const T extends Schema<SchemaDefinition>> {
// Add cron job listener
this.listenQueue<CronMessage>(async (msg) => {
// Check if exit criteria is met, end repeating cron job if true
const exit = await options?.exitOn?.(msg) ?? false
let exit = false
try {
exit = await options?.exitOn?.(msg) ?? false
} catch (e) {
console.error(
`An error was caught while running exitOn callback for cron job {ID = ${id}}`,
e,
)
}

if (exit) {
await options?.onExit?.(msg)
return
}

// Set the next interval
const interval = options?.setInterval
? await options?.setInterval!(msg)
: options?.interval ?? DEFAULT_CRON_INTERVAL
let interval = DEFAULT_CRON_INTERVAL
try {
interval = options?.setInterval
? await options?.setInterval!(msg)
: options?.interval ?? DEFAULT_CRON_INTERVAL
} catch (e) {
console.error(
`An error was caught while running setInterval callback for cron job {ID = ${id}}`,
e,
)
}

await allFulfilled([
// Enqueue next job
Expand All @@ -411,6 +432,9 @@ export class KvDex<const T extends Schema<SchemaDefinition>> {
isFirstJob: true,
enqueueTimestamp: new Date(),
}, options?.startDelay)

// Return the cron job id
return id
}
}

Expand Down

0 comments on commit 1796fe6

Please sign in to comment.