Skip to content

Commit

Permalink
change polling interval config name
Browse files Browse the repository at this point in the history
  • Loading branch information
timgit committed Jul 23, 2024
1 parent 5659faf commit fbdb9ad
Show file tree
Hide file tree
Showing 8 changed files with 24 additions and 34 deletions.
12 changes: 4 additions & 8 deletions docs/readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ Emitted at most once every 2 seconds when workers are active and jobs are enteri
{
id: 'fc738fb0-1de5-4947-b138-40d6a790749e',
name: 'my-queue',
options: { newJobCheckInterval: 2000 },
options: { pollingInterval: 2000 },
state: 'active',
count: 1,
createdOn: 1620149137015,
Expand Down Expand Up @@ -792,13 +792,9 @@ The default concurrency for `work()` is 1 job every 2 seconds. Both the interval

How often workers will poll the queue table for jobs. Available in the constructor as a default or per worker in `work()`.

* **newJobCheckInterval**, int
* **pollingIntervalSeconds**, int

Interval to check for new jobs in milliseconds, must be >=100

* **newJobCheckIntervalSeconds**, int

Interval to check for new jobs in seconds, must be >=1
Interval to check for new jobs in seconds, must be >=0.5 (500ms)

* Default: 2 seconds

Expand Down Expand Up @@ -964,7 +960,7 @@ The promise will resolve on a successful failure state assignment, or reject if
## `notifyWorker(id)`

Notifies a worker by id to bypass the job polling interval (see `newJobCheckInterval`) for this iteration in the loop.
Notifies a worker by id to bypass the job polling interval (see `pollingIntervalSeconds`) for this iteration in the loop.

## `getQueueSize(name [, options])`

Expand Down
21 changes: 8 additions & 13 deletions src/attorney.js
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ function checkWorkArgs (name, args, defaults) {

options = { ...options }

applyNewJobCheckInterval(options, defaults)
applyPollingInterval(options, defaults)

assert(!('teamConcurrency' in options) ||
(Number.isInteger(options.teamConcurrency) && options.teamConcurrency >= 1 && options.teamConcurrency <= 1000),
Expand Down Expand Up @@ -163,7 +163,7 @@ function getConfig (value) {
applyDeleteConfig(config)
applyMonitoringConfig(config)

applyNewJobCheckInterval(config)
applyPollingInterval(config)
applyExpirationConfig(config)
applyRetentionConfig(config)

Expand Down Expand Up @@ -279,18 +279,13 @@ function applyRetryConfig (config, defaults) {
config.retryBackoffDefault = defaults?.retryBackoff
}

function applyNewJobCheckInterval (config, defaults) {
assert(!('newJobCheckInterval' in config) || config.newJobCheckInterval >= 500,
'configuration assert: newJobCheckInterval must be at least every 500ms')
function applyPollingInterval (config, defaults) {
assert(!('pollingIntervalSeconds' in config) || config.pollingIntervalSeconds >= 0.5,
'configuration assert: pollingIntervalSeconds must be at least every 500ms')

assert(!('newJobCheckIntervalSeconds' in config) || config.newJobCheckIntervalSeconds >= 1,
'configuration assert: newJobCheckIntervalSeconds must be at least every second')

config.newJobCheckInterval = ('newJobCheckIntervalSeconds' in config)
? config.newJobCheckIntervalSeconds * 1000
: ('newJobCheckInterval' in config)
? config.newJobCheckInterval
: defaults?.newJobCheckInterval || 2000
config.pollingInterval = ('pollingIntervalSeconds' in config)
? config.pollingIntervalSeconds * 1000
: defaults?.pollingInterval || 2000
}

function applyMaintenanceConfig (config) {
Expand Down
2 changes: 1 addition & 1 deletion src/manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ class Manager extends EventEmitter {
}

const {
newJobCheckInterval: interval = this.config.newJobCheckInterval,
pollingInterval: interval = this.config.pollingInterval,
batchSize,
teamSize = 1,
teamConcurrency = 1,
Expand Down
2 changes: 1 addition & 1 deletion src/timekeeper.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class Timekeeper extends EventEmitter {
} catch {}

const options = {
newJobCheckIntervalSeconds: this.config.cronWorkerIntervalSeconds,
pollingIntervalSeconds: this.config.cronWorkerIntervalSeconds,
teamSize: 50,
teamConcurrency: 5
}
Expand Down
2 changes: 1 addition & 1 deletion test/failureTest.js
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ describe('failure', function () {
const jobId = await boss.send(queue)
const message = 'mhmm'

await boss.work(queue, { newJobCheckInterval: 500 }, async () => {
await boss.work(queue, { pollingIntervalSeconds: 0.5 }, async () => {
const err = { message }
err.myself = err
throw err
Expand Down
2 changes: 1 addition & 1 deletion test/retryTest.js
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ describe('retries', function () {
let processCount = 0
const retryLimit = 5

await boss.work(queue, { newJobCheckInterval: 500 }, async () => {
await boss.work(queue, { pollingIntervalSeconds: 0.5 }, async () => {
++processCount
throw new Error('retry')
})
Expand Down
14 changes: 7 additions & 7 deletions test/workTest.js
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ describe('work', function () {
const boss = this.test.boss = await helper.start(this.test.bossConfig)
const queue = this.test.bossConfig.schema

const newJobCheckIntervalSeconds = 1
const pollingIntervalSeconds = 1
const timeout = 5000
let processCount = 0
const jobCount = 10
Expand All @@ -60,11 +60,11 @@ describe('work', function () {
await boss.send(queue)
}

await boss.work(queue, { newJobCheckIntervalSeconds }, () => processCount++)
await boss.work(queue, { pollingIntervalSeconds }, () => processCount++)

await delay(timeout)

assert.strictEqual(processCount, timeout / 1000 / newJobCheckIntervalSeconds)
assert.strictEqual(processCount, timeout / 1000 / pollingIntervalSeconds)
})

it('should honor when a worker is notified', async function () {
Expand All @@ -75,7 +75,7 @@ describe('work', function () {

await boss.send(queue)

const workerId = await boss.work(queue, { newJobCheckIntervalSeconds: 5 }, () => processCount++)
const workerId = await boss.work(queue, { pollingIntervalSeconds: 5 }, () => processCount++)

await delay(500)

Expand Down Expand Up @@ -118,7 +118,7 @@ describe('work', function () {
await boss.send(queue)
await boss.send(queue)

const id = await boss.work(queue, { newJobCheckInterval: 500 }, async () => {
const id = await boss.work(queue, { pollingIntervalSeconds: 0.5 }, async () => {
receivedCount++
await boss.offWork({ id })
})
Expand Down Expand Up @@ -256,7 +256,7 @@ describe('work', function () {
const options = {
teamSize: 4,
teamConcurrency: 2,
newJobCheckInterval: 500,
pollingIntervalSeconds: 0.5,
teamRefill: true
}

Expand Down Expand Up @@ -284,7 +284,7 @@ describe('work', function () {
const options = {
teamSize: 4,
teamConcurrency: 2,
newJobCheckInterval: 500,
pollingIntervalSeconds: 0.5,
teamRefill: true
}

Expand Down
3 changes: 1 addition & 2 deletions types.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,7 @@ declare namespace PgBoss {
type ScheduleOptions = SendOptions & { tz?: string }

interface JobPollingOptions {
newJobCheckInterval?: number;
newJobCheckIntervalSeconds?: number;
pollingIntervalSeconds?: number;
}

interface CommonJobFetchOptions {
Expand Down

0 comments on commit fbdb9ad

Please sign in to comment.